The AI Revolution in Synthetic Data Generation
Generative AI for synthetic data represents the cutting edge of artificial intelligence applications, transforming how organizations create, validate, and deploy realistic datasets. While traditional synthetic data generation relied on statistical sampling and rule-based systems, modern AI approaches use sophisticated neural networks to learn complex data patterns and produce data with a level of realism and utility that earlier methods could not reach.
Large language models, generative adversarial networks, and variational autoencoders have reshaped AI-driven synthetic data generation, enabling the creation of datasets that not only preserve statistical properties but also capture subtle relationships, semantic meaning, and contextual nuances that traditional methods often miss.
The Evolution from Statistical to AI-Powered Generation
The journey from basic random sampling to AI-powered generation represents a fundamental shift in how we approach data creation (a minimal baseline illustrating the starting point is sketched after this list):
- Traditional Statistical Methods: Rule-based generation with predefined distributions
- Machine Learning Enhancement: Pattern recognition to improve realism
- Deep Learning Revolution: Neural networks that understand complex relationships
- Generative AI Era: Models that create contextually aware, semantically meaningful data
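To make the contrast concrete, here is a minimal, illustrative sketch of that first stage: independent per-column sampling that matches each marginal distribution but discards cross-column structure, which is precisely what the neural approaches below are designed to recover. (The 5% noise factor and the column handling are arbitrary choices for illustration, not a reference implementation.)

import numpy as np
import pandas as pd

def sample_marginals(real: pd.DataFrame, n: int, seed: int = 0) -> pd.DataFrame:
    """Naive baseline: sample each column independently from its own distribution."""
    rng = np.random.default_rng(seed)
    synthetic = {}
    for col in real.columns:
        if real[col].dtype.kind in "biufc":  # numeric: resample with a little noise
            values = real[col].dropna().to_numpy()
            synthetic[col] = rng.choice(values, size=n) + rng.normal(0, values.std() * 0.05, size=n)
        else:  # categorical: sample from the empirical frequencies
            freqs = real[col].value_counts(normalize=True)
            synthetic[col] = rng.choice(freqs.index, size=n, p=freqs.to_numpy())
    return pd.DataFrame(synthetic)

This preserves every univariate histogram yet breaks every correlation between columns; the rest of this article is about methods that keep both.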
Large Language Models: The New Frontier
GPT and Transformer-Based Data Generation
Modern language models have transformed text-based synthetic data generation by understanding context, maintaining consistency, and generating human-like content at scale:
Text Data Generation with GPT Models
import asyncio
import json
import random
from datetime import datetime, timedelta

import openai


class GPTSyntheticDataGenerator:
    def __init__(self, api_key, model="gpt-4"):
        # An async client is required because the generation methods below are awaited
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model

    async def generate_customer_reviews(self, product_info, num_reviews=50, sentiment_distribution=None):
        """Generate realistic customer reviews using GPT-4."""
        if sentiment_distribution is None:
            sentiment_distribution = {"positive": 0.6, "neutral": 0.25, "negative": 0.15}

        prompt_template = """
        Generate {num_reviews} realistic customer reviews for the following product:

        Product: {product_name}
        Category: {category}
        Price: ${price}
        Key Features: {features}

        Review Requirements:
        - {positive_count} positive reviews (4-5 stars)
        - {neutral_count} neutral reviews (3 stars)
        - {negative_count} negative reviews (1-2 stars)
        - Vary review length from 20-200 words
        - Include specific product details and use cases
        - Make reviews feel authentic with natural language patterns
        - Include occasional typos and colloquialisms

        Format as JSON array with fields: reviewer_name, rating, title, review_text, helpful_votes, verified_purchase, review_date
        """

        # Calculate sentiment counts
        positive_count = int(num_reviews * sentiment_distribution["positive"])
        neutral_count = int(num_reviews * sentiment_distribution["neutral"])
        negative_count = num_reviews - positive_count - neutral_count

        prompt = prompt_template.format(
            num_reviews=num_reviews,
            product_name=product_info["name"],
            category=product_info["category"],
            price=product_info["price"],
            features=", ".join(product_info["features"]),
            positive_count=positive_count,
            neutral_count=neutral_count,
            negative_count=negative_count
        )

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": "You are an expert at generating realistic customer reviews that feel authentic and diverse."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.8,
            max_tokens=4000  # large num_reviews may exceed this limit; batch requests if needed
        )

        try:
            reviews = json.loads(response.choices[0].message.content)
            return self.enhance_review_metadata(reviews)
        except json.JSONDecodeError:
            # Fallback parsing if JSON is malformed
            return self.parse_reviews_fallback(response.choices[0].message.content)

    def parse_reviews_fallback(self, raw_text):
        """Salvage a JSON array embedded in surrounding prose; return [] if none is found."""
        start, end = raw_text.find("["), raw_text.rfind("]")
        if start != -1 and end > start:
            try:
                return self.enhance_review_metadata(json.loads(raw_text[start:end + 1]))
            except json.JSONDecodeError:
                pass
        return []

    def enhance_review_metadata(self, reviews):
        """Add realistic metadata to generated reviews."""
        enhanced_reviews = []
        for review in reviews:
            # Add realistic timestamps
            days_ago = random.randint(1, 365)
            review_date = datetime.now() - timedelta(days=days_ago)
            enhanced_review = {
                **review,
                "review_id": f"review_{random.randint(100000, 999999)}",
                "reviewer_id": f"user_{random.randint(10000, 99999)}",
                "review_date": review_date.isoformat(),
                "verified_purchase": random.choice([True, True, True, False]),  # 75% verified
                "helpful_votes": random.randint(0, 50),
                "total_votes": random.randint(0, 60),
                "reviewer_location": random.choice([
                    "United States", "Canada", "United Kingdom", "Australia",
                    "Germany", "France", "Japan", "Brazil"
                ]),
                "reviewer_rank": f"Top {random.randint(100, 10000)} Reviewer" if random.random() < 0.1 else None
            }
            enhanced_reviews.append(enhanced_review)
        return enhanced_reviews

    async def generate_conversational_data(self, scenario, num_conversations=25):
        """Generate realistic conversation datasets for chatbot training."""
        scenarios = {
            "customer_support": {
                "context": "Customer service interactions for e-commerce platform",
                "intents": ["order_status", "return_request", "product_inquiry", "billing_issue", "technical_support"],
                "tone": "professional, helpful, empathetic"
            },
            "sales_inquiry": {
                "context": "Sales conversations for B2B software platform",
                "intents": ["pricing_inquiry", "feature_demo", "trial_request", "competitor_comparison", "implementation_timeline"],
                "tone": "consultative, informative, persuasive"
            },
            "healthcare_support": {
                "context": "Patient support conversations for healthcare platform",
                "intents": ["appointment_scheduling", "symptom_inquiry", "medication_question", "insurance_verification", "test_results"],
                "tone": "caring, professional, HIPAA-compliant"
            }
        }

        scenario_config = scenarios.get(scenario, scenarios["customer_support"])

        prompt = f"""
        Generate {num_conversations} realistic conversation pairs between a customer/user and support agent.

        Context: {scenario_config["context"]}
        Tone: {scenario_config["tone"]}

        Include conversations covering these intents:
        {', '.join(scenario_config["intents"])}

        Requirements:
        - Natural conversation flow with multiple exchanges
        - Realistic user questions and agent responses
        - Varied conversation lengths (2-8 exchanges)
        - Include edge cases and complex scenarios
        - Maintain consistency within each conversation
        - Use natural language with appropriate formality level

        Format as JSON array with structure:
        {{
            "conversation_id": "conv_001",
            "intent": "order_status",
            "exchanges": [
                {{"speaker": "user", "message": "...", "timestamp": "..."}},
                {{"speaker": "agent", "message": "...", "timestamp": "..."}}
            ],
            "resolution_status": "resolved|escalated|ongoing",
            "satisfaction_score": 1-5
        }}
        """

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"You are an expert at generating realistic conversational data for {scenario} scenarios."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=6000
        )

        try:
            conversations = json.loads(response.choices[0].message.content)
            return self.enhance_conversation_metadata(conversations)
        except json.JSONDecodeError:
            return self.parse_conversations_fallback(response.choices[0].message.content)

    def enhance_conversation_metadata(self, conversations):
        """Ensure every conversation carries an identifier."""
        for i, conversation in enumerate(conversations, start=1):
            conversation.setdefault("conversation_id", f"conv_{i:03d}")
        return conversations

    def parse_conversations_fallback(self, raw_text):
        """Same salvage strategy as parse_reviews_fallback."""
        start, end = raw_text.find("["), raw_text.rfind("]")
        if start != -1 and end > start:
            try:
                return self.enhance_conversation_metadata(json.loads(raw_text[start:end + 1]))
            except json.JSONDecodeError:
                pass
        return []

    async def generate_structured_content(self, content_type, specifications):
        """Generate structured content like articles, documentation, or reports."""
        content_templates = {
            "blog_articles": {
                "prompt": """
                Create {count} blog articles about {topic} with the following specifications:
                - Target audience: {audience}
                - Tone: {tone}
                - Word count: {word_count} words each
                - Include SEO-optimized titles and meta descriptions
                - Add relevant tags and categories
                - Include call-to-action sections
                """,
                "fields": ["title", "meta_description", "content", "tags", "category", "author", "publish_date", "reading_time"]
            },
            "product_descriptions": {
                "prompt": """
                Generate {count} product descriptions for {category} products:
                - Include key features and benefits
                - Target keywords: {keywords}
                - Highlight unique selling points
                - Include technical specifications
                - Add compelling calls-to-action
                """,
                "fields": ["product_name", "short_description", "detailed_description", "features", "specifications", "price_range", "target_audience"]
            },
            "legal_documents": {
                "prompt": """
                Create {count} sample legal document templates for {document_type}:
                - Follow standard legal formatting
                - Include necessary clauses and provisions
                - Use appropriate legal terminology
                - Ensure compliance with {jurisdiction} law
                - Include placeholder fields for customization
                """,
                "fields": ["document_title", "document_type", "sections", "clauses", "effective_date", "jurisdiction", "template_fields"]
            }
        }

        template = content_templates.get(content_type, content_templates["blog_articles"])
        prompt = template["prompt"].format(**specifications)

        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": f"You are an expert content creator specializing in {content_type}."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=8000
        )

        return response.choices[0].message.content
# Usage example
generator = GPTSyntheticDataGenerator(api_key="your_openai_api_key")

# Generate product reviews
product_info = {
    "name": "MacBook Pro 16-inch M2 Pro",
    "category": "Laptops",
    "price": 2499,
    "features": ["M2 Pro chip", "16-inch Liquid Retina display", "1TB SSD", "32GB RAM"]
}
reviews = asyncio.run(generator.generate_customer_reviews(product_info, num_reviews=100))
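The conversation generator defined above follows the same calling convention; a hypothetical invocation with one of the built-in scenario keys might look like this:

# Hypothetical call to the conversation generator defined above
conversations = asyncio.run(
    generator.generate_conversational_data("customer_support", num_conversations=25)
)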
Advanced Text Generation with Context Awareness
import asyncio
import json
import random

import openai


class ContextAwareSyntheticGenerator:
    def __init__(self, api_key, model_name="gpt-4"):
        # The original snippet omitted client construction; an async client is
        # needed because generate_employee_records awaits its completions
        self.client = openai.AsyncOpenAI(api_key=api_key)
        self.model = model_name
        self.context_memory = {}

    async def generate_employee_records(self, company_context, num_employees=500):
        """Generate realistic employee data with company-specific context."""
        prompt = f"""
        Generate employee records for {company_context["company_name"]}, a {company_context["industry"]} company
        with {company_context["size"]} employees headquartered in {company_context["location"]}.

        Company Culture: {company_context["culture"]}
        Departments: {', '.join(company_context["departments"])}

        Generate {num_employees} employee records that reflect:
        - Realistic role distributions for this industry
        - Appropriate salary ranges for the location
        - Diverse backgrounds and experiences
        - Company-specific email domains and systems
        - Realistic tenure and promotion patterns
        - Skills that match industry requirements

        Include fields: employee_id, first_name, last_name, email, department, role,
        level, salary, hire_date, manager_id, skills, performance_rating, location
        """

        # Generate in batches to maintain context consistency
        all_employees = []
        batch_size = 50

        for batch_start in range(0, num_employees, batch_size):
            batch_size_actual = min(batch_size, num_employees - batch_start)
            batch_prompt = f"{prompt}\n\nGenerate employees {batch_start + 1} to {batch_start + batch_size_actual}."
            if batch_start > 0:
                batch_prompt += "\n\nMaintain consistency with previously generated employees and ensure realistic manager-subordinate relationships."

            response = await self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": "You are an expert HR data analyst who creates realistic employee datasets."},
                    {"role": "user", "content": batch_prompt}
                ],
                temperature=0.6,
                max_tokens=4000
            )

            batch_employees = json.loads(response.choices[0].message.content)
            all_employees.extend(batch_employees)

        return self.validate_employee_consistency(all_employees, company_context)

    def validate_employee_consistency(self, employees, company_context):
        """Ensure generated employee data maintains internal consistency."""
        # Validate manager relationships
        employee_ids = {emp["employee_id"] for emp in employees}
        for employee in employees:
            # Ensure managers exist in the dataset
            if employee.get("manager_id") and employee["manager_id"] not in employee_ids:
                # Assign a valid manager, or treat the employee as senior enough to lack one
                if employee["level"] in ["Senior", "Principal", "Director"]:
                    employee["manager_id"] = None  # Senior enough to have no manager in the dataset
                else:
                    # Find an appropriate manager within the same department
                    potential_managers = [
                        emp for emp in employees
                        if emp["department"] == employee["department"]
                        and emp["level"] in ["Senior", "Principal", "Director", "VP"]
                        and emp["employee_id"] != employee["employee_id"]
                    ]
                    if potential_managers:
                        employee["manager_id"] = random.choice(potential_managers)["employee_id"]

            # Validate salary ranges for role and location
            self.validate_salary_ranges(employee, company_context)

            # Ensure email consistency (requires an email_domain entry in company_context)
            employee["email"] = f"{employee['first_name'].lower()}.{employee['last_name'].lower()}@{company_context['email_domain']}"

        return employees

    def validate_salary_ranges(self, employee, company_context):
        """Validate and adjust salary ranges based on role, level, and location."""
        # Salary multipliers by level
        level_multipliers = {
            "Entry": 1.0,
            "Mid": 1.3,
            "Senior": 1.7,
            "Principal": 2.2,
            "Director": 2.8,
            "VP": 3.5,
            "C-Level": 5.0
        }

        # Base salary by department (in thousands of dollars)
        dept_base_salaries = {
            "Engineering": 85,
            "Product": 80,
            "Sales": 70,
            "Marketing": 65,
            "Operations": 60,
            "HR": 65,
            "Finance": 75,
            "Legal": 90
        }

        base_salary = dept_base_salaries.get(employee["department"], 65)
        level_multiplier = level_multipliers.get(employee["level"], 1.0)

        # Location adjustment
        location_multipliers = {
            "San Francisco": 1.4,
            "New York": 1.3,
            "Seattle": 1.2,
            "Austin": 1.1,
            "Denver": 1.0,
            "Remote": 0.95
        }
        location_multiplier = location_multipliers.get(employee["location"], 1.0)

        # Calculate the expected salary range with a 15% variance band
        expected_salary = base_salary * level_multiplier * location_multiplier * 1000
        variance = 0.15
        min_salary = expected_salary * (1 - variance)
        max_salary = expected_salary * (1 + variance)

        # Adjust if the current salary falls outside the reasonable range
        if employee["salary"] < min_salary or employee["salary"] > max_salary:
            employee["salary"] = int(random.uniform(min_salary, max_salary))

        return employee
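The class above never shows a call site, and two of its inputs are easy to miss: validate_employee_consistency expects an email_domain key in company_context, and the constructor (as amended here) takes an api_key so the class can build its own async client. A hypothetical usage sketch, with all context values purely illustrative:

# Hypothetical usage; every context value below is illustrative
company_context = {
    "company_name": "Acme Analytics",
    "industry": "B2B SaaS",
    "size": 500,
    "location": "Denver",
    "culture": "remote-first, engineering-driven",
    "departments": ["Engineering", "Product", "Sales", "Marketing", "HR", "Finance"],
    "email_domain": "acmeanalytics.example",
}
context_generator = ContextAwareSyntheticGenerator(api_key="your_openai_api_key")
employees = asyncio.run(context_generator.generate_employee_records(company_context, num_employees=200))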
Generative Adversarial Networks (GANs) for Tabular Data
GANs have revolutionized synthetic data generation by learning to create data that's indistinguishable from real data:
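Before the CTGAN wrapper below, it is worth seeing the adversarial loop at its simplest. This toy sketch trains a generator/discriminator pair on purely numeric rows (all layer sizes and learning rates are illustrative choices); CTGAN adds mode-specific normalization and conditional sampling on top of essentially this loop:

import torch
import torch.nn as nn

# Toy generator/discriminator pair for rows with `num_cols` numeric columns
latent_dim, num_cols = 16, 8
G = nn.Sequential(nn.Linear(latent_dim, 64), nn.ReLU(), nn.Linear(64, num_cols))
D = nn.Sequential(nn.Linear(num_cols, 64), nn.LeakyReLU(0.2), nn.Linear(64, 1))
opt_g = torch.optim.Adam(G.parameters(), lr=2e-4)
opt_d = torch.optim.Adam(D.parameters(), lr=2e-4)
bce = nn.BCEWithLogitsLoss()

def train_step(real_batch):
    n = real_batch.size(0)
    # 1) Discriminator: push real rows toward label 1, generated rows toward 0
    fake = G(torch.randn(n, latent_dim)).detach()
    d_loss = bce(D(real_batch), torch.ones(n, 1)) + bce(D(fake), torch.zeros(n, 1))
    opt_d.zero_grad()
    d_loss.backward()
    opt_d.step()
    # 2) Generator: update G so the discriminator scores its output as real
    fake = G(torch.randn(n, latent_dim))
    g_loss = bce(D(fake), torch.ones(n, 1))
    opt_g.zero_grad()
    g_loss.backward()
    opt_g.step()
    return d_loss.item(), g_loss.item()

# One step on a random stand-in batch (replace with scaled real rows)
d_loss, g_loss = train_step(torch.randn(128, num_cols))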
CTGAN Implementation for Realistic Tabular Data
import numpy as np
import pandas as pd
from ctgan import CTGAN
from sklearn.model_selection import train_test_split


class AdvancedCTGAN:
    def __init__(self, epochs=300, batch_size=500, generator_lr=2e-4, discriminator_lr=2e-4):
        self.ctgan = CTGAN(
            epochs=epochs,
            batch_size=batch_size,
            generator_lr=generator_lr,
            discriminator_lr=discriminator_lr,
            generator_decay=1e-6,
            discriminator_decay=1e-6,
            verbose=True
        )
        self.metadata = {}
        self.quality_metrics = {}

    def prepare_data(self, real_data, categorical_columns=None, datetime_columns=None):
        """Prepare real data for training with advanced preprocessing."""
        prepared_data = real_data.copy()

        # Decompose datetime columns into numeric parts CTGAN can model
        if datetime_columns:
            for col in datetime_columns:
                if col in prepared_data.columns:
                    parsed = pd.to_datetime(prepared_data[col])
                    prepared_data[f"{col}_year"] = parsed.dt.year
                    prepared_data[f"{col}_month"] = parsed.dt.month
                    prepared_data[f"{col}_day"] = parsed.dt.day
                    prepared_data[f"{col}_weekday"] = parsed.dt.weekday
                    prepared_data = prepared_data.drop(columns=[col])

        # Store metadata for post-processing
        self.metadata = {
            "categorical_columns": categorical_columns or [],
            "datetime_columns": datetime_columns or [],
            "numeric_columns": prepared_data.select_dtypes(include=[np.number]).columns.tolist(),
            "column_distributions": {}
        }

        # Analyze distributions for quality validation
        for col in prepared_data.columns:
            if prepared_data[col].dtype in ['object', 'category']:
                self.metadata["column_distributions"][col] = prepared_data[col].value_counts(normalize=True).to_dict()
            else:
                self.metadata["column_distributions"][col] = {
                    "mean": prepared_data[col].mean(),
                    "std": prepared_data[col].std(),
                    "min": prepared_data[col].min(),
                    "max": prepared_data[col].max(),
                    "percentiles": prepared_data[col].quantile([0.25, 0.5, 0.75]).to_dict()
                }

        return prepared_data

    def train_with_validation(self, real_data, validation_split=0.2):
        """Train CTGAN with a held-out split for quality evaluation."""
        # Split data for validation
        train_data, val_data = train_test_split(real_data, test_size=validation_split, random_state=42)

        # Train the model; categorical columns must be declared as discrete to CTGAN
        print("Training CTGAN model...")
        self.ctgan.fit(train_data, discrete_columns=self.metadata.get("categorical_columns", []))

        # Generate validation samples
        print("Generating validation samples...")
        synthetic_val = self.ctgan.sample(len(val_data))

        # Evaluate model quality
        print("Evaluating model quality...")
        self.quality_metrics = self.evaluate_quality(val_data, synthetic_val)
        return self.quality_metrics

    def generate_synthetic_data(self, num_samples, apply_constraints=True):
        """Generate synthetic data with optional constraint application."""
        print(f"Generating {num_samples} synthetic samples...")
        synthetic_data = self.ctgan.sample(num_samples)
        if apply_constraints:
            synthetic_data = self.apply_business_constraints(synthetic_data)
        return synthetic_data

    def apply_business_constraints(self, synthetic_data):
        """Apply business logic constraints to ensure realistic data."""
        constrained_data = synthetic_data.copy()

        # Example constraints (customize based on your data)
        # Ensure age is within reasonable bounds
        if "age" in constrained_data.columns:
            constrained_data["age"] = np.clip(constrained_data["age"], 18, 100)

        # Ensure income is positive
        if "income" in constrained_data.columns:
            constrained_data["income"] = np.maximum(constrained_data["income"], 0)

        # Ensure credit score is within the valid 300-850 range
        if "credit_score" in constrained_data.columns:
            constrained_data["credit_score"] = np.clip(constrained_data["credit_score"], 300, 850)

        # Business logic: credit limit should correlate with income and credit score
        if all(col in constrained_data.columns for col in ["income", "credit_score", "credit_limit"]):
            # Simplified business rule
            max_credit = (constrained_data["income"] * 0.3) * (constrained_data["credit_score"] / 700)
            constrained_data["credit_limit"] = np.minimum(constrained_data["credit_limit"], max_credit)

        return constrained_data

    def evaluate_quality(self, real_data, synthetic_data):
        """Comprehensive quality evaluation of synthetic data."""
        metrics = {}

        # Statistical similarity via SDV, if a compatible version is installed
        try:
            from sdv.metrics import evaluate
            metrics["overall_quality"] = evaluate(real_data, synthetic_data)
        except Exception as e:
            print(f"Warning: Could not compute SDV quality metrics: {e}")

        # Distribution comparison
        metrics["distribution_similarity"] = self.compare_distributions(real_data, synthetic_data)
        # Correlation preservation
        metrics["correlation_similarity"] = self.compare_correlations(real_data, synthetic_data)
        # Privacy assessment
        metrics["privacy_score"] = self.assess_privacy(real_data, synthetic_data)

        return metrics

    def compare_distributions(self, real_data, synthetic_data):
        """Compare distributions between real and synthetic data."""
        from scipy.stats import ks_2samp, chi2_contingency

        distribution_scores = {}
        for column in real_data.columns:
            if column not in synthetic_data.columns:
                continue
            if real_data[column].dtype in ['object', 'category']:
                # Categorical comparison using a chi-square test
                real_counts = real_data[column].value_counts()
                synthetic_counts = synthetic_data[column].value_counts()

                # Align categories across both datasets
                all_categories = set(real_counts.index) | set(synthetic_counts.index)
                real_aligned = [real_counts.get(cat, 0) for cat in all_categories]
                synthetic_aligned = [synthetic_counts.get(cat, 0) for cat in all_categories]

                if len(all_categories) > 1:
                    chi2, p_value = chi2_contingency([real_aligned, synthetic_aligned])[:2]
                    distribution_scores[column] = {"chi2": chi2, "p_value": p_value}
            else:
                # Numerical comparison using the Kolmogorov-Smirnov test
                ks_stat, p_value = ks_2samp(real_data[column].dropna(), synthetic_data[column].dropna())
                distribution_scores[column] = {"ks_statistic": ks_stat, "p_value": p_value}

        return distribution_scores

    def compare_correlations(self, real_data, synthetic_data):
        """Compare correlation matrices between real and synthetic data."""
        numeric_columns = real_data.select_dtypes(include=[np.number]).columns
        if len(numeric_columns) < 2:
            return {"error": "Not enough numeric columns for correlation analysis"}

        real_corr = real_data[numeric_columns].corr()
        synthetic_corr = synthetic_data[numeric_columns].corr()

        # Element-wise absolute differences between the two matrices
        correlation_diff = np.abs(real_corr - synthetic_corr)
        mean_diff = correlation_diff.mean().mean()
        max_diff = correlation_diff.max().max()

        return {
            "mean_correlation_difference": mean_diff,
            "max_correlation_difference": max_diff,
            "correlation_preserved": max_diff < 0.1
        }

    def assess_privacy(self, real_data, synthetic_data):
        """Assess privacy preservation using nearest-neighbor analysis."""
        from sklearn.neighbors import NearestNeighbors
        from sklearn.preprocessing import StandardScaler

        # Prepare numeric data for privacy assessment
        numeric_real = real_data.select_dtypes(include=[np.number])
        numeric_synthetic = synthetic_data.select_dtypes(include=[np.number])
        if numeric_real.empty:
            return {"error": "No numeric columns for privacy assessment"}

        # Standardize both sets with the same scaler and column order
        scaler = StandardScaler()
        real_scaled = scaler.fit_transform(numeric_real.fillna(0))
        synthetic_scaled = scaler.transform(numeric_synthetic[numeric_real.columns].fillna(0))

        # Distance from each synthetic record to its closest real record
        nn = NearestNeighbors(n_neighbors=1, metric='euclidean')
        nn.fit(real_scaled)
        distances, indices = nn.kneighbors(synthetic_scaled)

        mean_distance = np.mean(distances)
        min_distance = np.min(distances)

        # Privacy score (higher is better): saturates toward 1 as distances grow
        privacy_score = min(1.0, mean_distance / (mean_distance + 0.1))

        return {
            "mean_nearest_neighbor_distance": mean_distance,
            "min_nearest_neighbor_distance": min_distance,
            "privacy_score": privacy_score,
            "privacy_level": "high" if privacy_score > 0.8 else "medium" if privacy_score > 0.5 else "low"
        }
# Usage example
ctgan_generator = AdvancedCTGAN(epochs=500, batch_size=1000)

# Load and prepare your data
real_data = pd.read_csv("your_dataset.csv")
prepared_data = ctgan_generator.prepare_data(
    real_data,
    categorical_columns=["category", "region", "customer_type"],
    datetime_columns=["created_date", "last_updated"]
)

# Train with validation
quality_metrics = ctgan_generator.train_with_validation(prepared_data)
print("Quality Metrics:", quality_metrics)

# Generate synthetic data
synthetic_data = ctgan_generator.generate_synthetic_data(num_samples=10000)
Variational Autoencoders (VAEs) for Complex Data Generation
VAEs provide another powerful approach for generating synthetic data with controllable properties:
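For orientation, the objective minimized by the implementation below is the negative evidence lower bound (ELBO): a reconstruction term plus a KL penalty that keeps the learned latent distribution close to a standard normal prior. In the notation of the code, where the encoder outputs mu and logvar over a d-dimensional latent space:

loss(x) = ||x - x_hat||^2  -  (1/2) * sum_j (1 + logvar_j - mu_j^2 - exp(logvar_j))

This is exactly what the loss_function method computes: an MSE reconstruction term summed over features, plus the closed-form KL divergence between the approximate posterior and the prior.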
VAE Implementation for Mixed-Type Data
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset


class ConditionalVAE(nn.Module):
    def __init__(self, input_dim, latent_dim=20, hidden_dims=[128, 64], condition_dim=0):
        super().__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim
        self.condition_dim = condition_dim

        # Encoder: maps (input, condition) to hidden features
        encoder_input_dim = input_dim + condition_dim
        encoder_layers = []
        prev_dim = encoder_input_dim
        for hidden_dim in hidden_dims:
            encoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        self.encoder = nn.Sequential(*encoder_layers)
        self.fc_mu = nn.Linear(hidden_dims[-1], latent_dim)
        self.fc_logvar = nn.Linear(hidden_dims[-1], latent_dim)

        # Decoder: mirrors the encoder, mapping (latent, condition) back to inputs
        decoder_input_dim = latent_dim + condition_dim
        decoder_layers = []
        prev_dim = decoder_input_dim
        for hidden_dim in reversed(hidden_dims):
            decoder_layers.extend([
                nn.Linear(prev_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2)
            ])
            prev_dim = hidden_dim
        decoder_layers.append(nn.Linear(prev_dim, input_dim))
        self.decoder = nn.Sequential(*decoder_layers)

    def encode(self, x, condition=None):
        if condition is not None:
            x = torch.cat([x, condition], dim=1)
        h = self.encoder(x)
        return self.fc_mu(h), self.fc_logvar(h)

    def reparameterize(self, mu, logvar):
        # Sample z = mu + sigma * eps so gradients flow through mu and logvar
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def decode(self, z, condition=None):
        if condition is not None:
            z = torch.cat([z, condition], dim=1)
        return self.decoder(z)

    def forward(self, x, condition=None):
        mu, logvar = self.encode(x, condition)
        z = self.reparameterize(mu, logvar)
        recon_x = self.decode(z, condition)
        return recon_x, mu, logvar


class VAESyntheticDataGenerator:
    def __init__(self, latent_dim=20, hidden_dims=[128, 64], learning_rate=1e-3):
        self.latent_dim = latent_dim
        self.hidden_dims = hidden_dims
        self.learning_rate = learning_rate
        self.model = None
        self.preprocessors = {}
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def prepare_data(self, data, categorical_columns=None, condition_column=None):
        """Prepare data for VAE training with proper encoding."""
        processed_data = data.copy()

        # One-hot encode categorical variables
        if categorical_columns:
            for col in categorical_columns:
                if col in processed_data.columns:
                    dummies = pd.get_dummies(processed_data[col], prefix=col)
                    processed_data = pd.concat([processed_data, dummies], axis=1)
                    processed_data = processed_data.drop(columns=[col])

        # Handle the condition column separately
        condition_data = None
        if condition_column and condition_column in processed_data.columns:
            if processed_data[condition_column].dtype == 'object':
                condition_encoded = pd.get_dummies(processed_data[condition_column])
                condition_data = condition_encoded.values.astype(np.float32)
                self.preprocessors['condition_encoder'] = condition_encoded.columns
            else:
                condition_data = processed_data[condition_column].values.reshape(-1, 1).astype(np.float32)
            processed_data = processed_data.drop(columns=[condition_column])

        # Normalize all remaining features
        from sklearn.preprocessing import StandardScaler
        scaler = StandardScaler()
        scaled_data = scaler.fit_transform(processed_data)
        self.preprocessors['scaler'] = scaler
        self.preprocessors['feature_names'] = processed_data.columns.tolist()

        return scaled_data, condition_data

    def train(self, data, condition_data=None, epochs=200, batch_size=128):
        """Train the VAE model."""
        input_dim = data.shape[1]
        condition_dim = condition_data.shape[1] if condition_data is not None else 0

        self.model = ConditionalVAE(
            input_dim=input_dim,
            latent_dim=self.latent_dim,
            hidden_dims=self.hidden_dims,
            condition_dim=condition_dim
        ).to(self.device)

        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

        # Convert to tensors
        data_tensor = torch.FloatTensor(data).to(self.device)
        condition_tensor = torch.FloatTensor(condition_data).to(self.device) if condition_data is not None else None

        # Create dataset and dataloader
        if condition_tensor is not None:
            dataset = TensorDataset(data_tensor, condition_tensor)
        else:
            dataset = TensorDataset(data_tensor)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

        self.model.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                if condition_tensor is not None:
                    batch_data, batch_condition = batch
                else:
                    batch_data = batch[0]
                    batch_condition = None

                optimizer.zero_grad()
                recon_batch, mu, logvar = self.model(batch_data, batch_condition)
                loss = self.loss_function(recon_batch, batch_data, mu, logvar)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            if (epoch + 1) % 50 == 0:
                avg_loss = total_loss / len(dataloader)
                print(f"Epoch {epoch + 1}/{epochs}, Average Loss: {avg_loss:.4f}")

    def loss_function(self, recon_x, x, mu, logvar):
        """VAE loss: reconstruction error plus KL divergence."""
        # Reconstruction loss
        recon_loss = F.mse_loss(recon_x, x, reduction='sum')
        # KL divergence loss (closed form for a Gaussian posterior and prior)
        kl_loss = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
        return recon_loss + kl_loss

    def generate(self, num_samples, condition=None):
        """Generate synthetic data samples."""
        self.model.eval()
        with torch.no_grad():
            # Sample from the latent prior
            z = torch.randn(num_samples, self.latent_dim).to(self.device)

            # Handle conditioning
            if condition is not None:
                if isinstance(condition, np.ndarray):
                    condition_tensor = torch.FloatTensor(condition).to(self.device)
                else:
                    condition_tensor = condition.to(self.device)
                # Broadcast a single condition row across all samples
                if condition_tensor.shape[0] == 1 and num_samples > 1:
                    condition_tensor = condition_tensor.repeat(num_samples, 1)
            else:
                condition_tensor = None

            # Decode latent samples into feature space
            generated = self.model.decode(z, condition_tensor)

        # Convert back to numpy and denormalize with the stored scaler
        generated_np = generated.cpu().numpy()
        denormalized = self.preprocessors['scaler'].inverse_transform(generated_np)

        return pd.DataFrame(denormalized, columns=self.preprocessors['feature_names'])

    def generate_conditional_samples(self, conditions_dict, num_samples_per_condition=100):
        """Generate samples for specific conditions (output label -> condition value)."""
        all_samples = []
        for condition_name, condition_value in conditions_dict.items():
            # Encode the condition value
            if isinstance(condition_value, str) and 'condition_encoder' in self.preprocessors:
                # One-hot encode a categorical condition
                condition_encoded = np.zeros((1, len(self.preprocessors['condition_encoder'])))
                if condition_value in self.preprocessors['condition_encoder']:
                    idx = list(self.preprocessors['condition_encoder']).index(condition_value)
                    condition_encoded[0, idx] = 1
            else:
                condition_encoded = np.array([[condition_value]])

            # Generate samples for this condition
            samples = self.generate(num_samples_per_condition, condition_encoded)
            samples['condition'] = condition_name
            all_samples.append(samples)

        return pd.concat(all_samples, ignore_index=True)
# Usage example
vae_generator = VAESyntheticDataGenerator(latent_dim=32, hidden_dims=[256, 128, 64])

# Prepare data
data, condition_data = vae_generator.prepare_data(
    real_data,
    categorical_columns=['category', 'region'],
    condition_column='customer_type'
)

# Train the model
vae_generator.train(data, condition_data, epochs=300, batch_size=256)

# Generate conditional samples: each entry maps an output label to the
# customer_type value to condition on
conditional_samples = vae_generator.generate_conditional_samples(
    {
        'premium_customer': 'premium_customer',
        'standard_customer': 'standard_customer',
        'budget_customer': 'budget_customer'
    },
    num_samples_per_condition=1500
)

# Unconditional sampling requires a model trained without a condition column,
# since a conditional decoder expects a condition input at generation time
uncond_generator = VAESyntheticDataGenerator(latent_dim=32, hidden_dims=[256, 128, 64])
uncond_data, _ = uncond_generator.prepare_data(real_data, categorical_columns=['category', 'region'])
uncond_generator.train(uncond_data, epochs=300, batch_size=256)
synthetic_samples = uncond_generator.generate(num_samples=5000)
Quality Assurance and Validation
Multi-Dimensional Quality Assessment Framework
import numpy as np
import pandas as pd


class ComprehensiveQualityAssessment:
    def __init__(self):
        self.metrics = {}
        self.validation_results = {}

    def evaluate_synthetic_quality(self, real_data, synthetic_data, sensitive_columns=None):
        """Comprehensive quality evaluation framework."""
        results = {
            "statistical_fidelity": self.assess_statistical_fidelity(real_data, synthetic_data),
            "utility_preservation": self.assess_utility_preservation(real_data, synthetic_data),
            "privacy_protection": self.assess_privacy_protection(real_data, synthetic_data, sensitive_columns),
            "bias_detection": self.detect_bias_amplification(real_data, synthetic_data),
            "robustness": self.assess_robustness(real_data, synthetic_data),
            "overall_score": 0
        }

        # Calculate the weighted overall score
        weights = {
            "statistical_fidelity": 0.25,
            "utility_preservation": 0.25,
            "privacy_protection": 0.25,
            "bias_detection": 0.15,
            "robustness": 0.10
        }
        overall_score = sum(
            results[metric]["score"] * weight
            for metric, weight in weights.items()
            if "score" in results[metric]
        )
        results["overall_score"] = overall_score
        results["quality_grade"] = self.assign_quality_grade(overall_score)
        return results

    def assess_statistical_fidelity(self, real_data, synthetic_data):
        """Assess how well synthetic data preserves statistical properties."""
        from scipy.stats import ks_2samp, wasserstein_distance

        # Univariate distribution comparison
        univariate_scores = []
        for column in real_data.select_dtypes(include=[np.number]).columns:
            if column in synthetic_data.columns:
                # Kolmogorov-Smirnov test
                ks_stat, ks_p = ks_2samp(real_data[column].dropna(), synthetic_data[column].dropna())
                # Wasserstein distance (Earth Mover's Distance)
                wass_dist = wasserstein_distance(real_data[column].dropna(), synthetic_data[column].dropna())
                # Normalize the Wasserstein distance by the data range
                data_range = real_data[column].max() - real_data[column].min()
                normalized_wass = wass_dist / data_range if data_range > 0 else 0
                univariate_scores.append({
                    "column": column,
                    "ks_statistic": ks_stat,
                    "ks_p_value": ks_p,
                    "wasserstein_distance": normalized_wass,
                    "distribution_similarity": 1 - min(1, normalized_wass)
                })

        # Multivariate distribution comparison
        multivariate_score = self.assess_multivariate_similarity(real_data, synthetic_data)

        # Overall statistical fidelity score
        avg_univariate_similarity = np.mean([score["distribution_similarity"] for score in univariate_scores]) if univariate_scores else 0
        return {
            "univariate_similarity": avg_univariate_similarity,
            "multivariate_similarity": multivariate_score,
            "detailed_univariate": univariate_scores,
            "score": (avg_univariate_similarity + multivariate_score) / 2
        }

    def assess_multivariate_similarity(self, real_data, synthetic_data):
        """Assess multivariate distribution similarity via correlation matrices."""
        numeric_cols = real_data.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return 1.0

        real_corr = real_data[numeric_cols].corr()
        synthetic_corr = synthetic_data[numeric_cols].corr()

        # Frobenius norm of the correlation difference, normalized by the
        # largest possible difference (every entry off by 2)
        corr_diff = np.linalg.norm(real_corr - synthetic_corr, 'fro')
        max_possible_diff = np.linalg.norm(np.ones_like(real_corr) * 2, 'fro')
        correlation_similarity = 1 - (corr_diff / max_possible_diff)
        return max(0, correlation_similarity)

    def assess_utility_preservation(self, real_data, synthetic_data):
        """Assess how well synthetic data preserves utility for downstream tasks."""
        from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
        from sklearn.model_selection import cross_val_score
        from sklearn.preprocessing import LabelEncoder

        numeric_cols = real_data.select_dtypes(include=[np.number]).columns
        categorical_cols = real_data.select_dtypes(include=['object', 'category']).columns

        # For each candidate target, compare model performance when trained and
        # evaluated on real vs. synthetic data
        prediction_scores = []
        for target_col in numeric_cols[:3]:  # Test on the first 3 numeric columns
            try:
                # Features are all columns except the target
                feature_cols = [col for col in real_data.columns if col != target_col]
                real_features = real_data[feature_cols].copy()
                synthetic_features = synthetic_data[feature_cols].copy()

                # Encode categorical features consistently across both datasets
                for cat_col in categorical_cols:
                    if cat_col in feature_cols:
                        le = LabelEncoder()
                        # Fit on combined data to handle categories unique to either set
                        combined_values = pd.concat([real_features[cat_col], synthetic_features[cat_col]]).astype(str)
                        le.fit(combined_values)
                        real_features[cat_col] = le.transform(real_features[cat_col].astype(str))
                        synthetic_features[cat_col] = le.transform(synthetic_features[cat_col].astype(str))

                # Fill missing values
                real_features = real_features.fillna(0)
                synthetic_features = synthetic_features.fillna(0)

                # Choose classifier or regressor based on the target dtype
                if real_data[target_col].dtype in ['object', 'category']:
                    model = RandomForestClassifier(n_estimators=50, random_state=42)
                    le_target = LabelEncoder()
                    real_target = le_target.fit_transform(real_data[target_col].astype(str))
                    synthetic_target = le_target.transform(synthetic_data[target_col].astype(str))
                else:
                    model = RandomForestRegressor(n_estimators=50, random_state=42)
                    real_target = real_data[target_col]
                    synthetic_target = synthetic_data[target_col]

                # Cross-validated performance on real vs. synthetic data
                real_score = np.mean(cross_val_score(model, real_features, real_target, cv=3))
                synthetic_score = np.mean(cross_val_score(model, synthetic_features, synthetic_target, cv=3))

                # Utility preservation score
                utility_preservation = min(1.0, synthetic_score / real_score) if real_score > 0 else 0
                prediction_scores.append({
                    "target_column": target_col,
                    "real_performance": real_score,
                    "synthetic_performance": synthetic_score,
                    "utility_preservation": utility_preservation
                })
            except Exception as e:
                print(f"Warning: Could not assess utility for column {target_col}: {e}")

        avg_utility_preservation = np.mean([score["utility_preservation"] for score in prediction_scores]) if prediction_scores else 0
        return {
            "prediction_utility": avg_utility_preservation,
            "detailed_predictions": prediction_scores,
            "score": avg_utility_preservation
        }

    def assess_privacy_protection(self, real_data, synthetic_data, sensitive_columns=None):
        """Assess privacy protection through several complementary metrics."""
        # Membership inference attack resistance
        membership_score = self.assess_membership_inference_resistance(real_data, synthetic_data)
        # Attribute inference attack resistance
        attribute_score = self.assess_attribute_inference_resistance(real_data, synthetic_data, sensitive_columns)
        # K-anonymity assessment
        k_anonymity_score = self.assess_k_anonymity(real_data, synthetic_data, sensitive_columns)
        # Distance-based privacy
        distance_score = self.assess_distance_privacy(real_data, synthetic_data)

        overall_privacy_score = np.mean([membership_score, attribute_score, k_anonymity_score, distance_score])
        return {
            "membership_inference_resistance": membership_score,
            "attribute_inference_resistance": attribute_score,
            "k_anonymity": k_anonymity_score,
            "distance_privacy": distance_score,
            "score": overall_privacy_score
        }

    # NOTE: the four privacy sub-metrics below are deliberately simplified
    # stand-ins (the original referenced them without definitions); a
    # production system should simulate real membership and attribute
    # inference attacks instead of these proxies.
    def assess_distance_privacy(self, real_data, synthetic_data):
        """Nearest-neighbor distance between synthetic and real records (higher is safer)."""
        from sklearn.neighbors import NearestNeighbors
        from sklearn.preprocessing import StandardScaler

        numeric_real = real_data.select_dtypes(include=[np.number]).fillna(0)
        numeric_synthetic = synthetic_data.select_dtypes(include=[np.number]).fillna(0)
        if numeric_real.empty:
            return 1.0

        # Assumes the synthetic data shares the real data's numeric columns
        scaler = StandardScaler()
        real_scaled = scaler.fit_transform(numeric_real)
        synthetic_scaled = scaler.transform(numeric_synthetic[numeric_real.columns])

        nn = NearestNeighbors(n_neighbors=1).fit(real_scaled)
        distances, _ = nn.kneighbors(synthetic_scaled)
        mean_distance = float(np.mean(distances))
        return min(1.0, mean_distance / (mean_distance + 0.1))

    def assess_membership_inference_resistance(self, real_data, synthetic_data):
        """Simplified proxy: distance-based privacy stands in for an attack simulation."""
        return self.assess_distance_privacy(real_data, synthetic_data)

    def assess_attribute_inference_resistance(self, real_data, synthetic_data, sensitive_columns=None):
        """Simplified proxy, as above."""
        return self.assess_distance_privacy(real_data, synthetic_data)

    def assess_k_anonymity(self, real_data, synthetic_data, sensitive_columns=None, k=5):
        """Fraction of synthetic quasi-identifier combinations occurring at least k times."""
        cols = [c for c in (sensitive_columns or []) if c in synthetic_data.columns]
        if not cols:
            return 1.0
        group_sizes = synthetic_data.groupby(cols).size()
        return float((group_sizes >= k).mean())

    def assess_robustness(self, real_data, synthetic_data):
        """Simplified robustness check: fidelity should hold on a random half of the data."""
        half_real = real_data.sample(frac=0.5, random_state=0)
        half_synthetic = synthetic_data.sample(frac=0.5, random_state=0)
        fidelity = self.assess_statistical_fidelity(half_real, half_synthetic)
        return {"score": fidelity["score"]}

    def detect_bias_amplification(self, real_data, synthetic_data):
        """Detect whether synthetic data amplifies biases present in the real data."""
        # Statistical parity across groups
        categorical_cols = real_data.select_dtypes(include=['object', 'category']).columns

        if len(categorical_cols) >= 2:
            # Test for bias amplification between pairs of categorical variables
            bias_metrics = []
            for col1 in categorical_cols[:2]:  # Test the first 2 categorical columns
                for col2 in categorical_cols[:2]:
                    if col1 != col2:
                        # Conditional distributions of col2 given col1
                        real_conditional = pd.crosstab(real_data[col1], real_data[col2], normalize='index')
                        synthetic_conditional = pd.crosstab(synthetic_data[col1], synthetic_data[col2], normalize='index')

                        # Restrict to categories present in both datasets
                        common_values = real_conditional.index.intersection(synthetic_conditional.index)
                        common_cols = real_conditional.columns.intersection(synthetic_conditional.columns)
                        if len(common_values) > 0 and len(common_cols) > 0:
                            real_subset = real_conditional.loc[common_values, common_cols]
                            synthetic_subset = synthetic_conditional.loc[common_values, common_cols]
                            # Mean absolute deviation between conditional distributions
                            bias_amplification = float(np.abs(synthetic_subset - real_subset).to_numpy().mean())
                            bias_metrics.append({
                                "group_column": col1,
                                "target_column": col2,
                                "bias_amplification": bias_amplification
                            })

            avg_bias_amplification = np.mean([metric["bias_amplification"] for metric in bias_metrics]) if bias_metrics else 0
            bias_score = 1 - min(1, avg_bias_amplification * 2)  # Scale and invert
            return {
                "bias_amplification_score": bias_score,
                "detailed_bias_metrics": bias_metrics,
                "score": bias_score
            }

        return {
            "bias_amplification_score": 1.0,
            "note": "Insufficient categorical columns for bias assessment",
            "score": 1.0
        }

    def assign_quality_grade(self, overall_score):
        """Assign a quality grade based on the overall score."""
        if overall_score >= 0.9:
            return "A+ (Excellent)"
        elif overall_score >= 0.8:
            return "A (Very Good)"
        elif overall_score >= 0.7:
            return "B+ (Good)"
        elif overall_score >= 0.6:
            return "B (Acceptable)"
        elif overall_score >= 0.5:
            return "C (Needs Improvement)"
        else:
            return "D (Poor Quality)"
# Usage example
quality_assessor = ComprehensiveQualityAssessment()

# Evaluate synthetic data quality
quality_results = quality_assessor.evaluate_synthetic_quality(
    real_data=original_dataset,
    synthetic_data=generated_dataset,
    sensitive_columns=['age', 'income', 'location']
)

print(f"Overall Quality Score: {quality_results['overall_score']:.3f}")
print(f"Quality Grade: {quality_results['quality_grade']}")
print("\nDetailed Results:")
for metric, results in quality_results.items():
    if isinstance(results, dict) and 'score' in results:
        print(f"  {metric}: {results['score']:.3f}")
Harness the power of modern generative AI to create synthetic data with a degree of realism and utility that rule-based tools cannot match. The techniques above help your datasets strike a deliberate balance between statistical fidelity, privacy protection, and practical utility for your application.
Ready to Generate Your Data?
Start creating high-quality synthetic data in minutes with our powerful, AI-driven generator. No registration required, unlimited usage.