Advanced Architectures Overview
Advanced architectures handle complex requirements: multi-vector embeddings, temporal search, ensemble methods, and optimized indexing strategies. Together these techniques improve performance, manage complexity, and let systems scale to sophisticated applications.
The diagram shows the main components: multi-vector embeddings handle document complexity, temporal search handles time-sensitive data, and ensembles improve accuracy and robustness.
Multi-vector Embeddings
Multi-vector embeddings represent each document with several vectors. Each vector captures a different aspect of the document, which improves retrieval coverage and handles complex, multi-topic documents.
Common methods include sentence-level, chunk-level, and aspect-based embeddings. Each captures different information; combined, they improve retrieval.
# Multi-vector Embeddings
def create_multi_vectors(document):
    # Multiple embedding strategies
    sentence_embs = embed_sentences(document)
    chunk_embs = embed_chunks(document)
    aspect_embs = embed_aspects(document)
    return {'sentences': sentence_embs,
            'chunks': chunk_embs,
            'aspects': aspect_embs}

def multi_vector_search(query, multi_vectors):
    query_emb = embed_query(query)
    # Search across all vector types
    all_results = []
    for doc_id, vectors in multi_vectors.items():
        for vec_type, embs in vectors.items():
            scores = compute_similarity(query_emb, embs)
            all_results.append((doc_id, vec_type, max(scores)))
    # Aggregate and rank
    return aggregate_results(all_results)
Multi-vector embeddings improve coverage by capturing document complexity, enabling better retrieval.
Temporal Search Patterns
Temporal search handles time-sensitive information. It considers document timestamps, prioritizes recent content, and enables time-based queries.
Common patterns include recency boosting, time-weighted scoring, and temporal filtering. Each handles time differently; combined, they improve temporal relevance.
# Temporal Search
import numpy as np

def temporal_search(query, documents, timestamps, recency_weight=0.3):
    # Relevance scores
    relevance = np.asarray(compute_relevance(query, documents))
    # Recency scores: newer documents score higher
    max_time = max(timestamps)
    recency = np.asarray([1.0 / (1 + (max_time - t).days) for t in timestamps])
    recency = normalize(recency)
    # Combined scores
    scores = (1 - recency_weight) * relevance + recency_weight * recency
    return rank_by_scores(scores)
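Temporal filtering, the third pattern, can be sketched separately; a minimal example, assuming the timestamps are datetime objects as above:

# Temporal filtering: drop candidates outside a date window before scoring
from datetime import datetime, timedelta

def temporal_filter(documents, timestamps, max_age_days=30):
    cutoff = datetime.now() - timedelta(days=max_age_days)
    # Keep only documents newer than the cutoff
    return [(doc, ts) for doc, ts in zip(documents, timestamps) if ts >= cutoff]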
Temporal search handles time-sensitive queries by prioritizing recent information, improving temporal relevance.
Ensemble Methods
Ensemble methods combine multiple models to improve performance, reduce variance, and increase robustness.
The main approaches are voting, averaging, and stacking. Voting combines discrete predictions, averaging combines probabilities, and stacking trains a meta-learner to combine base-model outputs.
# Ensemble Methods
def ensemble_predict(models, meta_learner, input_data):
    predictions = [model.predict(input_data) for model in models]
    # Voting: majority class across models
    voted = majority_vote(predictions)
    # Averaging: mean prediction across models
    averaged = np.mean(predictions, axis=0)
    # Stacking: a meta-learner combines the base predictions
    stacked = meta_learner.predict(np.column_stack(predictions))
    return voted, averaged, stacked
Ensemble methods improve performance by combining model strengths and offsetting individual weaknesses.
Detailed Ensemble Techniques
Voting ensembles combine predictions from multiple models: hard voting takes the majority class, while soft voting averages predicted probabilities. Voting works well when models are diverse, since their individual errors tend to cancel.
Averaging ensembles average predictions directly: numeric predictions for regression, probability distributions for classification. Averaging reduces variance and improves stability.
Stacking trains a meta-learner on the base models' predictions, learning the optimal combination. It often performs best but requires more data.
# Detailed Ensemble Implementation
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import SVC
import numpy as np

class EnsembleMethods:
    def __init__(self):
        self.models = []

    def hard_voting(self, models, X):
        """Hard voting ensemble"""
        predictions = np.array([model.predict(X) for model in models])
        # Majority vote per sample
        final_pred = []
        for i in range(X.shape[0]):
            votes = predictions[:, i]
            final_pred.append(np.bincount(votes.astype(int)).argmax())
        return np.array(final_pred)

    def soft_voting(self, models, X):
        """Soft voting ensemble"""
        probabilities = [m.predict_proba(X) for m in models
                         if hasattr(m, 'predict_proba')]
        # Average probabilities across models
        avg_proba = np.mean(probabilities, axis=0)
        return np.argmax(avg_proba, axis=1)

    def stacking_ensemble(self, base_models, meta_model, X_train, y_train, X_test):
        """Stacking ensemble: meta-learner trained on base-model predictions"""
        train_preds, test_preds = [], []
        for model in base_models:
            model.fit(X_train, y_train)
            if hasattr(model, 'predict_proba'):
                train_preds.append(model.predict_proba(X_train))
                test_preds.append(model.predict_proba(X_test))
            else:
                train_preds.append(model.predict(X_train).reshape(-1, 1))
                test_preds.append(model.predict(X_test).reshape(-1, 1))
        # Train the meta-learner on base predictions over the training set,
        # then predict from base predictions over the test set
        meta_model.fit(np.column_stack(train_preds), y_train)
        return meta_model.predict(np.column_stack(test_preds))

    def bagging_ensemble(self, base_model, X_train, y_train, n_estimators=10):
        """Bootstrap aggregating"""
        from sklearn.utils import resample
        models = []
        for i in range(n_estimators):
            # Bootstrap sample
            X_boot, y_boot = resample(X_train, y_train, random_state=i)
            # Train a fresh copy of the base model on the bootstrap sample
            model = type(base_model)(**base_model.get_params())
            model.fit(X_boot, y_boot)
            models.append(model)
        return models

    def predict_bagging(self, models, X):
        """Predict using bagging ensemble"""
        predictions = [model.predict(X) for model in models]
        # Average predictions
        return np.mean(predictions, axis=0)

# Example
ensemble = EnsembleMethods()
base_models = [
    LogisticRegression(),
    DecisionTreeClassifier(),
    SVC(probability=True),
]
meta_model = LogisticRegression()
# hard_pred = ensemble.hard_voting(base_models, X_test)
# soft_pred = ensemble.soft_voting(base_models, X_test)
# stacked_pred = ensemble.stacking_ensemble(base_models, meta_model, X_train, y_train, X_test)
Ensemble Selection and Optimization
Select diverse base models: different algorithms learn different patterns, and different architectures capture different features. Diversity is what makes an ensemble work; near-identical models add little.
Optimize ensemble size. More models improve performance but increase computation, with diminishing returns after a point; typical ensembles use 5-20 models. Test different sizes to find the optimum, as in the sketch below.
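One simple way to test sizes is to grow a bagged ensemble and track validation accuracy. A minimal sketch using the EnsembleMethods class above (the X_train/y_train/X_val/y_val splits and binary 0/1 labels are assumed):

# Sketch: validation accuracy as ensemble size grows
ensemble = EnsembleMethods()
models = ensemble.bagging_ensemble(DecisionTreeClassifier(), X_train, y_train, n_estimators=20)
for size in [1, 5, 10, 20]:
    preds = ensemble.predict_bagging(models[:size], X_val)  # Averaged predictions
    accuracy = np.mean(np.round(preds) == y_val)            # Round for binary labels
    print(f"{size} models: validation accuracy {accuracy:.3f}")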
Weight ensemble members. Some models perform better than others, so assign them higher weights, learned from validation data. A weighted combination usually outperforms a uniform one.
# Ensemble Optimization
from scipy.optimize import minimize

class EnsembleOptimizer:
    def __init__(self):
        self.model_weights = None

    def _collect_predictions(self, models, X):
        """Gather predictions (probabilities where available) from all models"""
        preds = []
        for model in models:
            if hasattr(model, 'predict_proba'):
                preds.append(model.predict_proba(X))
            else:
                preds.append(model.predict(X))
        return np.array(preds)

    def optimize_weights(self, models, X_val, y_val):
        """Optimize ensemble weights on validation data"""
        predictions = self._collect_predictions(models, X_val)

        # Objective: classification error of the weighted combination
        def objective(weights):
            weighted_pred = np.tensordot(weights, predictions, axes=1)
            if weighted_pred.ndim > 1:
                weighted_pred = np.argmax(weighted_pred, axis=1)
            return np.mean(weighted_pred != y_val)

        # Constraints: weights sum to 1, each weight in [0, 1]
        constraints = {'type': 'eq', 'fun': lambda w: np.sum(w) - 1}
        bounds = [(0, 1) for _ in range(len(models))]
        # Initial weights (equal)
        initial_weights = np.ones(len(models)) / len(models)

        result = minimize(objective, initial_weights, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        self.model_weights = result.x
        return self.model_weights

    def weighted_ensemble_predict(self, models, X):
        """Predict using optimized weights"""
        predictions = self._collect_predictions(models, X)
        weighted_pred = np.tensordot(self.model_weights, predictions, axes=1)
        if weighted_pred.ndim > 1:
            return np.argmax(weighted_pred, axis=1)
        return weighted_pred

# Example
optimizer = EnsembleOptimizer()
# weights = optimizer.optimize_weights(base_models, X_val, y_val)
# weighted_pred = optimizer.weighted_ensemble_predict(base_models, X_test)
Advanced Indexing Strategies
Advanced indexing optimizes search performance using specialized data structures that handle high-dimensional vectors and scale to large datasets.
Key strategies include HNSW graphs, IVF partitioning, and product quantization. Each optimizes a different point on the speed/memory/recall tradeoff; combined, they enable scale.
# Advanced Indexing (assumes `vectors` is a float32 array of shape (n, dimension))
import faiss

# HNSW index: graph-based approximate search
index_hnsw = faiss.IndexHNSWFlat(dimension, 16)  # M=16 neighbors per node
index_hnsw.add(vectors)

# IVF index: inverted file with a coarse quantizer
quantizer = faiss.IndexFlatL2(dimension)
index_ivf = faiss.IndexIVFFlat(quantizer, dimension, 100)  # nlist=100 clusters
index_ivf.train(vectors)
index_ivf.add(vectors)

# Product quantization: compressed vector storage
index_pq = faiss.IndexPQ(dimension, 8, 8)  # M=8 subquantizers, 8 bits each
index_pq.train(vectors)
index_pq.add(vectors)
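At query time, each index exposes a speed/recall knob. A short usage sketch, assuming queries is a float32 array with the same dimension as the indexed vectors:

# Query-time tuning
index_ivf.nprobe = 10                  # Search 10 of the 100 IVF clusters
index_hnsw.hnsw.efSearch = 64          # Wider HNSW search beam, better recall
distances, ids = index_ivf.search(queries, 5)  # Top-5 neighbors per query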
Advanced indexing optimizes performance and enables search over large datasets at scale.
Complex Architecture Designs
Complex architectures combine multiple techniques, optimizing for specific requirements and balancing tradeoffs to enable sophisticated applications.
Designs include multi-stage retrieval, cascading models, and adaptive systems. Each handles complexity differently; combined, they enable advanced applications.
# Complex Architecture
class AdvancedSearchSystem:
    def __init__(self):
        self.retrievers = [semantic_retriever, keyword_retriever, hybrid_retriever]
        self.reranker = cross_encoder_reranker
        self.generator = llm_generator

    def search(self, query):
        # Multi-stage retrieval
        candidates = []
        for retriever in self.retrievers:
            results = retriever.retrieve(query, top_k=20)
            candidates.extend(results)
        # Deduplicate
        candidates = deduplicate(candidates)
        # Rerank
        reranked = self.reranker.rerank(query, candidates, top_k=10)
        # Generate
        context = format_context(reranked)
        answer = self.generator.generate(query, context)
        return answer
Complex architectures enable advanced applications by combining techniques effectively and optimizing for application requirements.
Detailed Architecture Design Patterns
Multi-stage retrieval uses multiple passes: a first stage of fast approximate search retrieves a large candidate set, and a second stage of accurate reranking selects the final results. This balances speed and accuracy.
Cascading models run multiple models in sequence. Early, fast models filter candidates cheaply; later, slower models provide accurate predictions. Each model occupies a different point on the speed-accuracy tradeoff, which optimizes overall performance.
Adaptive systems adjust their behavior dynamically: they monitor performance metrics, switch strategies based on conditions, and optimize for the current workload, improving efficiency.
# Detailed Architecture Patterns
class MultiStageRetrieval:
    def __init__(self):
        self.fast_retriever = FastApproximateRetriever()  # Fast, approximate
        self.accurate_reranker = AccurateReranker()       # Slow, accurate

    def retrieve(self, query, top_k=10):
        # Stage 1: fast approximate retrieval over a large candidate set
        candidates = self.fast_retriever.retrieve(query, top_k=100)
        # Stage 2: accurate reranking of the candidates
        return self.accurate_reranker.rerank(query, candidates, top_k=top_k)

class CascadingModels:
    def __init__(self):
        self.fast_model = FastModel()          # Quick filtering
        self.accurate_model = AccurateModel()  # Precise prediction

    def predict(self, input_data):
        # Stage 1: fast filtering
        fast_prediction = self.fast_model.predict(input_data)
        # Escalate to the accurate model only when confidence is low
        if fast_prediction.confidence < 0.8:
            return self.accurate_model.predict(input_data)
        return fast_prediction

class AdaptiveSystem:
    def __init__(self):
        self.strategies = {
            'fast': FastStrategy(),
            'balanced': BalancedStrategy(),
            'accurate': AccurateStrategy(),
        }
        self.current_strategy = 'balanced'
        self.metrics = {'latency': [], 'accuracy': []}

    def adapt(self):
        # Switch strategy based on recent latency and accuracy
        avg_latency = np.mean(self.metrics['latency'][-100:])
        avg_accuracy = np.mean(self.metrics['accuracy'][-100:])
        if avg_latency > 1.0:
            self.current_strategy = 'fast'
        elif avg_accuracy < 0.8:
            self.current_strategy = 'accurate'
        else:
            self.current_strategy = 'balanced'

    def process(self, input_data):
        result = self.strategies[self.current_strategy].process(input_data)
        self.metrics['latency'].append(result.latency)
        self.metrics['accuracy'].append(result.accuracy)
        self.adapt()
        return result
Attention Mechanisms
Attention mechanisms let models focus on the most relevant information. Self-attention relates every position in a sequence to every other position, cross-attention connects two different sequences, and multi-head attention runs several attention computations in parallel to capture multiple patterns.
The diagram shows the attention types: self-attention connects all tokens within a sequence, cross-attention connects queries to keys from another sequence, and multi-head attention uses parallel heads, each capturing different relationships.
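PyTorch provides a ready-made multi-head attention module; a minimal sketch of self- and cross-attention (the sizes here are illustrative):

import torch
import torch.nn as nn

# Multi-head self-attention: query, key, and value are the same sequence
mha = nn.MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True)
x = torch.randn(1, 10, 512)           # (batch, seq_len, embed_dim)
out, attn_weights = mha(x, x, x)      # Self-attention
print(out.shape, attn_weights.shape)  # (1, 10, 512), (1, 10, 10)

# Cross-attention: queries from one sequence, keys/values from another
src = torch.randn(1, 15, 512)
out_cross, _ = mha(x, src, src)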
Detailed Attention Mechanism Mathematics
Self-attention computes attention(Q, K, V) = softmax(QKᵀ / √dₖ) V, where Q, K, and V are the query, key, and value matrices and each row represents a token position. QKᵀ computes the similarity between all pairs of positions, dividing by √dₖ keeps the dot products from growing too large, softmax converts the scores to probabilities, and V supplies the content being attended to.
Scaled dot-product attention is computationally efficient and works well in practice, but it requires O(n²) computation for sequence length n, which limits the maximum sequence length.
Attention weights show what each position attends to. They are interpretable: they reveal the model's focus, help debug models, and enable visualization.
# Detailed Attention Mathematics
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt

class AttentionMechanismDetailed(nn.Module):
    def __init__(self, d_model, d_k=None, d_v=None):
        super().__init__()
        self.d_model = d_model
        self.d_k = d_k if d_k else d_model
        self.d_v = d_v if d_v else d_model
        self.W_q = nn.Linear(d_model, self.d_k)
        self.W_k = nn.Linear(d_model, self.d_k)
        self.W_v = nn.Linear(d_model, self.d_v)

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Compute scaled dot-product attention"""
        # Similarity between all query/key position pairs, scaled by sqrt(d_k)
        scores = torch.matmul(Q, K.transpose(-2, -1)) / np.sqrt(self.d_k)
        # Apply mask if provided
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        # Softmax converts scores to attention probabilities
        attention_weights = torch.softmax(scores, dim=-1)
        # Apply to values
        output = torch.matmul(attention_weights, V)
        return output, attention_weights

    def forward(self, x, mask=None):
        """Forward pass"""
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        return self.scaled_dot_product_attention(Q, K, V, mask)

    def visualize_attention(self, attention_weights, tokens):
        """Visualize attention weights"""
        weights = attention_weights[0].detach().numpy()  # First sample
        plt.figure(figsize=(10, 8))
        plt.imshow(weights, cmap='Blues')
        plt.xlabel('Key Position')
        plt.ylabel('Query Position')
        plt.title('Attention Weights Visualization')
        plt.colorbar()
        plt.show()

# Example
attention = AttentionMechanismDetailed(d_model=512)
x = torch.randn(1, 10, 512)  # batch=1, seq_len=10, d_model=512
output, weights = attention(x)
print("Output shape:", output.shape)
print("Attention weights shape:", weights.shape)
print("Attention weights sum (should be 1):", weights.sum(dim=-1)[0, 0].item())
Attention Variants and Optimizations
Sparse attention reduces computation by attending to a subset of positions, using fixed patterns or learned sparsity. It scales to longer sequences while largely maintaining quality.
Linear attention uses kernel methods to reduce complexity from O(n²) to O(n). It approximates softmax attention, enabling longer sequences at some cost in accuracy.
Flash attention optimizes memory usage by computing attention in blocks, reducing memory from O(n²) to O(n). It speeds up training and enables larger batch sizes; a PyTorch entry point is shown after the sketch below.
# Attention Variants
class SparseAttention(nn.Module):
    def __init__(self, d_model, window_size=3):
        super().__init__()
        self.d_model = d_model
        self.window_size = window_size

    def forward(self, x):
        """Local window attention"""
        batch_size, seq_len, d_model = x.shape
        output = torch.zeros_like(x)
        for i in range(seq_len):
            # Attend only to a local window around position i
            start = max(0, i - self.window_size)
            end = min(seq_len, i + self.window_size + 1)
            window = x[:, start:end, :]
            # Scaled dot-product attention within the window
            scores = torch.matmul(x[:, i:i+1, :], window.transpose(-2, -1)) / np.sqrt(d_model)
            weights = torch.softmax(scores, dim=-1)
            output[:, i:i+1, :] = torch.matmul(weights, window)
        return output

class LinearAttention(nn.Module):
    def __init__(self, d_model):
        super().__init__()
        self.d_model = d_model
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)

    def forward(self, x):
        """Linear attention using a kernel feature map"""
        Q = self.W_q(x)
        K = self.W_k(x)
        V = self.W_v(x)
        # ReLU as the kernel feature map
        Q_kernel = torch.relu(Q)
        K_kernel = torch.relu(K)
        # Linear complexity: aggregate K and V once, reuse for every query
        KV = torch.matmul(K_kernel.transpose(-2, -1), V)  # (batch, d, d)
        Z = torch.matmul(Q_kernel, KV)                    # (batch, n, d)
        # Normalize by the summed kernel keys
        normalizer = torch.matmul(Q_kernel, K_kernel.sum(dim=-2, keepdim=True).transpose(-2, -1))
        return Z / (normalizer + 1e-8)

# Compare complexities
print("Standard attention: O(n²) complexity")
print("Sparse attention: O(n×w) complexity where w is window size")
print("Linear attention: O(n) complexity")
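Flash attention itself is a fused GPU kernel rather than a Python-level algorithm. In PyTorch 2.x it is reachable through torch.nn.functional.scaled_dot_product_attention, which dispatches to a flash kernel when hardware and dtypes allow:

import torch
import torch.nn.functional as F

# Computes standard attention, but via a fused memory-efficient kernel
Q = torch.randn(1, 8, 128, 64)  # (batch, heads, seq_len, head_dim)
K = torch.randn(1, 8, 128, 64)
V = torch.randn(1, 8, 128, 64)
out = F.scaled_dot_product_attention(Q, K, V)
print(out.shape)  # torch.Size([1, 8, 128, 64])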
Encoder-Decoder Architectures
Encoder-decoder architectures process input-output sequence pairs: the encoder processes the input, the decoder generates the output, and attention connects them.
The diagram shows the structure: the encoder processes the source sequence, the decoder generates the target sequence, and cross-attention connects the two, enabling sequence-to-sequence tasks.
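As a minimal sketch, PyTorch's built-in nn.Transformer wires the encoder, decoder, and cross-attention together (the sizes here are illustrative):

import torch
import torch.nn as nn

# Encoder processes the source; decoder attends to encoder outputs
model = nn.Transformer(d_model=512, nhead=8,
                       num_encoder_layers=2, num_decoder_layers=2,
                       batch_first=True)
src = torch.randn(1, 12, 512)  # Source sequence (batch, src_len, d_model)
tgt = torch.randn(1, 7, 512)   # Target sequence so far (batch, tgt_len, d_model)
out = model(src, tgt)          # One output vector per target position
print(out.shape)               # torch.Size([1, 7, 512])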
Graph Neural Networks
Graph neural networks process graph-structured data of nodes and edges, capturing the relationships between entities. They suit domains such as social networks and knowledge graphs.
The diagram shows the graph structure: nodes represent entities, edges represent relationships, and the network processes graph information to learn from those relationships.
Detailed Graph Neural Network Implementation
Graph neural networks aggregate information from each node's neighbors and update the node representations, capturing the graph's structure.
Message passing is the core mechanism: each node collects messages containing its neighbors' features, an aggregation step combines the messages, and an update function computes the new node representation.
# Detailed GNN Implementation
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing

class GCNLayer(MessagePassing):
    """Graph Convolutional Network layer"""
    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='add')
        self.lin = nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        # Linear transformation, then message passing
        x = self.lin(x)
        return self.propagate(edge_index, x=x)

    def message(self, x_j):
        """Message from neighbor j to node i"""
        return x_j

    def update(self, aggr_out):
        """Update node representation with the aggregated messages"""
        return aggr_out

class GraphNeuralNetwork(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=2):
        super().__init__()
        self.layers = nn.ModuleList()
        # First layer
        self.layers.append(GCNLayer(input_dim, hidden_dim))
        # Hidden layers
        for _ in range(num_layers - 2):
            self.layers.append(GCNLayer(hidden_dim, hidden_dim))
        # Output layer
        self.layers.append(GCNLayer(hidden_dim, output_dim))

    def forward(self, x, edge_index):
        for layer in self.layers:
            x = F.relu(layer(x, edge_index))
        return x

# Example: a small graph with 5 nodes and bidirectional edges
num_nodes = 5
x = torch.randn(num_nodes, 10)  # Node features
edge_index = torch.tensor([[0, 1, 1, 2, 2, 3, 3, 4],
                           [1, 0, 2, 1, 3, 2, 4, 3]], dtype=torch.long)

gnn = GraphNeuralNetwork(input_dim=10, hidden_dim=16, output_dim=2, num_layers=2)
output = gnn(x, edge_index)
print("GNN output shape:", output.shape)
GNN Variants and Applications
Graph Convolutional Networks are grounded in spectral graph theory: they filter signals defined on graphs. They work well for node classification and scale to large graphs.
Graph Attention Networks use attention to learn the importance of each neighbor, adapting to different graph structures and improving performance on many tasks.
GraphSAGE samples and aggregates neighbors, which works for large graphs and generalizes to unseen nodes, enabling inductive learning.
# GNN Variants
class GraphAttentionLayer(nn.Module):
    """Graph Attention Network layer"""
    def __init__(self, in_features, out_features, dropout=0.1, alpha=0.2):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.dropout = dropout
        self.alpha = alpha
        self.W = nn.Parameter(torch.empty(size=(in_features, out_features)))
        self.a = nn.Parameter(torch.empty(size=(2 * out_features, 1)))
        self.reset_parameters()

    def reset_parameters(self):
        nn.init.xavier_uniform_(self.W.data, gain=1.414)
        nn.init.xavier_uniform_(self.a.data, gain=1.414)

    def forward(self, h, adj):
        Wh = torch.mm(h, self.W)
        e = self._prepare_attentional_mechanism_input(Wh)
        e = F.leaky_relu(e, negative_slope=self.alpha)
        # Mask non-edges so attention only covers actual neighbors
        e = e.masked_fill(adj == 0, -1e9)
        attention = F.softmax(e, dim=1)
        attention = F.dropout(attention, self.dropout, training=self.training)
        return torch.matmul(attention, Wh)

    def _prepare_attentional_mechanism_input(self, Wh):
        # Broadcast source and target contributions into an N x N score matrix
        Wh1 = torch.matmul(Wh, self.a[:self.out_features, :])
        Wh2 = torch.matmul(Wh, self.a[self.out_features:, :])
        return Wh1 + Wh2.T

# GraphSAGE implementation
class GraphSAGELayer(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.linear = nn.Linear(in_features * 2, out_features)

    def forward(self, x, adj, sample_size=5):
        # Sample neighbors and aggregate their features
        sampled_neighbors = self.sample_neighbors(adj, sample_size)
        neighbor_features = x[sampled_neighbors]
        aggregated = torch.mean(neighbor_features, dim=1)
        # Concatenate self and neighborhood representations, then transform
        combined = torch.cat([x, aggregated], dim=1)
        return F.relu(self.linear(combined))

    def sample_neighbors(self, adj, sample_size):
        # Simplified: uniform random node sampling (a real implementation
        # would sample from each node's actual neighbors in adj)
        return torch.randint(0, adj.size(0), (adj.size(0), sample_size))

# Example usage
# gat_layer = GraphAttentionLayer(10, 16)
# sage_layer = GraphSAGELayer(10, 16)
Diffusion Models
Diffusion models generate data through iterative denoising: a forward process gradually adds noise, and a learned reverse process removes it. They generate high-quality images and audio.
The diagram shows the diffusion process: the forward pass adds noise gradually, the reverse pass removes it iteratively, and sampling the reverse process generates new data.
Detailed Diffusion Model Implementation
Diffusion models learn to reverse a noising process. The forward process adds Gaussian noise: q(x_t | x_{t-1}) = N(x_t; √(1-β_t) x_{t-1}, β_t I), where β_t is the noise schedule. It increases over time until the data becomes pure noise.
The reverse process learns to denoise: p_θ(x_{t-1} | x_t) predicts the previous step. The model learns to predict the added noise and subtract it, recovering the original data.
The training objective minimizes the noise-prediction error, L = E[||ε − ε_θ(x_t, t)||²], where ε is the actual noise and ε_θ the model's prediction at each timestep.
# Detailed Diffusion Model Implementation
import torch
import torch.nn as nn

class DiffusionModel(nn.Module):
    def __init__(self, input_dim, hidden_dim=256, num_timesteps=1000):
        super().__init__()
        self.input_dim = input_dim
        self.num_timesteps = num_timesteps
        # Noise schedule
        self.betas = self.linear_beta_schedule(num_timesteps)
        self.alphas = 1.0 - self.betas
        self.alphas_cumprod = torch.cumprod(self.alphas, dim=0)
        # Noise prediction network (+1 input for the timestep)
        self.network = nn.Sequential(
            nn.Linear(input_dim + 1, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, input_dim),
        )

    def linear_beta_schedule(self, timesteps, start=0.0001, end=0.02):
        """Linear noise schedule"""
        return torch.linspace(start, end, timesteps)

    def forward_process(self, x_0, t):
        """Forward diffusion process: add noise at timestep t"""
        sqrt_acp = torch.sqrt(self.alphas_cumprod[t]).unsqueeze(-1)
        sqrt_one_minus_acp = torch.sqrt(1.0 - self.alphas_cumprod[t]).unsqueeze(-1)
        noise = torch.randn_like(x_0)
        x_t = sqrt_acp * x_0 + sqrt_one_minus_acp * noise
        return x_t, noise

    def reverse_process(self, x_t, t):
        """Reverse diffusion process: one denoising step"""
        # Predict the noise added at timestep t
        t_tensor = t.float().unsqueeze(-1)
        predicted_noise = self.network(torch.cat([x_t, t_tensor], dim=-1))

        alpha_t = self.alphas[t].unsqueeze(-1)
        alpha_cumprod_t = self.alphas_cumprod[t].unsqueeze(-1)
        beta_t = self.betas[t].unsqueeze(-1)

        # Predict x_{t-1} by subtracting the scaled predicted noise
        pred_x_prev = (1.0 / torch.sqrt(alpha_t)) * (
            x_t - beta_t / torch.sqrt(1.0 - alpha_cumprod_t) * predicted_noise)

        if t[0] > 0:
            # Add posterior noise except at the final step
            posterior_variance = beta_t * (1.0 - self.alphas_cumprod[t - 1].unsqueeze(-1)) / (1.0 - alpha_cumprod_t)
            noise = torch.randn_like(x_t)
            pred_x_prev = pred_x_prev + torch.sqrt(posterior_variance) * noise
        return pred_x_prev

    def sample(self, shape, device):
        """Generate samples by iterative denoising from pure noise"""
        x = torch.randn(shape, device=device)
        for t in range(self.num_timesteps - 1, -1, -1):
            t_tensor = torch.full((shape[0],), t, device=device, dtype=torch.long)
            x = self.reverse_process(x, t_tensor)
        return x

# Example
model = DiffusionModel(input_dim=784)  # For flattened 28x28 images
x_0 = torch.randn(32, 784)             # Batch of 32 samples
t = torch.randint(0, 1000, (32,))

# Forward process
x_t, noise = model.forward_process(x_0, t)
print("Noisy sample shape:", x_t.shape)

# Reverse process
x_prev = model.reverse_process(x_t, t)
print("Denoised sample shape:", x_prev.shape)
Diffusion Model Training and Sampling
Training samples random timesteps, adds the corresponding noise to the data, predicts that noise, and minimizes the prediction error. The process is straightforward.
Sampling starts from pure noise and iteratively denoises, each step removing a little noise until the data is recovered. Many steps are required for quality.
DDPM uses the full, fixed number of steps; DDIM uses far fewer, accelerating sampling while maintaining quality and enabling faster generation.
# Diffusion Training and Sampling
def train_diffusion_model(model, dataloader, num_epochs=100, device='cuda'):
    """Train the noise-prediction network"""
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    model = model.to(device)
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:
            x_0 = batch.to(device)
            batch_size = x_0.shape[0]
            # Sample random timesteps
            t = torch.randint(0, model.num_timesteps, (batch_size,), device=device)
            # Forward process: add noise
            x_t, noise = model.forward_process(x_0, t)
            # Predict the added noise
            t_tensor = t.float().unsqueeze(-1)
            predicted_noise = model.network(torch.cat([x_t, t_tensor], dim=-1))
            # Minimize the noise-prediction error
            loss = nn.functional.mse_loss(predicted_noise, noise)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch + 1}, Loss: {total_loss / len(dataloader):.4f}")

def sample_ddpm(model, num_samples=10, device='cuda'):
    """Sample using DDPM (all timesteps)"""
    shape = (num_samples, model.input_dim)
    x = torch.randn(shape, device=device)
    for t in range(model.num_timesteps - 1, -1, -1):
        t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.long)
        x = model.reverse_process(x, t_tensor)
    return x

def sample_ddim(model, num_samples=10, num_steps=50, device='cuda'):
    """Sample on a reduced step schedule (DDIM-style, faster)"""
    shape = (num_samples, model.input_dim)
    x = torch.randn(shape, device=device)
    # Use fewer, evenly spaced steps
    step_size = model.num_timesteps // num_steps
    for i in range(num_steps - 1, -1, -1):
        t = i * step_size
        t_tensor = torch.full((num_samples,), t, device=device, dtype=torch.long)
        x = model.reverse_process(x, t_tensor)
    return x

# Example
# train_diffusion_model(model, dataloader)
# samples_ddpm = sample_ddpm(model, num_samples=10)
# samples_ddim = sample_ddim(model, num_samples=10, num_steps=50)
Reinforcement Learning
Reinforcement learning learns from interaction: an agent takes actions, the environment provides rewards, and the policy improves over time. It powers game playing and robotics.
The diagram shows the RL loop: the agent observes the state, takes an action, receives a reward from the environment, and updates its policy; the process repeats as learning continues.
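The loop maps directly onto the Gymnasium API; a minimal sketch with a random placeholder policy (swap in a learned agent for the sampled action):

import gymnasium as gym

# Minimal agent-environment loop
env = gym.make("CartPole-v1")
state, info = env.reset()
for _ in range(200):
    action = env.action_space.sample()  # Agent takes an action (random here)
    state, reward, terminated, truncated, info = env.step(action)  # Environment responds
    if terminated or truncated:
        state, info = env.reset()       # Start a new episode
env.close()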
Detailed Reinforcement Learning Algorithms
Q-learning learns the action-value function Q(s, a), an estimate of expected return, using the Bellman equation Q(s, a) = r + γ max Q(s', a'). It learns the optimal policy and works for discrete action spaces.
Policy gradient methods learn the policy directly, maximizing expected return via gradient ascent. They handle continuous actions but typically require more samples.
Actor-critic combines the two: the actor learns a policy while the critic learns a value function that guides the actor's updates, reducing variance and improving learning. A minimal actor-critic sketch follows the code below.
# Detailed RL Implementation
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class QNetwork(nn.Module):
    """Q-learning network"""
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class QLearning:
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon=1.0):
        self.q_network = QNetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = 0.995
        self.epsilon_min = 0.01

    def select_action(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.q_network.fc3.out_features)
        with torch.no_grad():
            q_values = self.q_network(torch.FloatTensor(state))
        return q_values.argmax().item()

    def update(self, state, action, reward, next_state, done):
        """Update the Q-network toward the Bellman target"""
        current_q = self.q_network(torch.FloatTensor(state))[action]
        if done:
            target_q = torch.tensor(float(reward))
        else:
            with torch.no_grad():
                next_q = self.q_network(torch.FloatTensor(next_state)).max()
            target_q = reward + self.gamma * next_q
        loss = nn.functional.mse_loss(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Decay exploration over time
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return loss.item()

class PolicyGradient:
    """Policy gradient method (REINFORCE)"""
    def __init__(self, state_dim, action_dim, lr=0.001):
        self.policy_network = nn.Sequential(
            nn.Linear(state_dim, 64),
            nn.ReLU(),
            nn.Linear(64, action_dim),
            nn.Softmax(dim=-1),
        )
        self.optimizer = optim.Adam(self.policy_network.parameters(), lr=lr)

    def select_action(self, state):
        """Sample an action from the policy"""
        probs = self.policy_network(torch.FloatTensor(state))
        dist = torch.distributions.Categorical(probs)
        action = dist.sample()
        # Keep the log-probability as a tensor so gradients flow in update()
        return action.item(), dist.log_prob(action)

    def update(self, states, actions, rewards, log_probs):
        """Update the policy using REINFORCE"""
        # Discounted returns, computed backward through the episode
        returns = []
        G = 0
        for reward in reversed(rewards):
            G = reward + 0.99 * G
            returns.insert(0, G)
        returns = torch.FloatTensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # Policy gradient: increase log-prob of actions with high return
        policy_loss = [-log_prob * G for log_prob, G in zip(log_probs, returns)]
        loss = torch.stack(policy_loss).sum()
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

# Example
# q_learning = QLearning(state_dim=4, action_dim=2)
# policy_gradient = PolicyGradient(state_dim=4, action_dim=2)
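The code above stops at Q-learning and REINFORCE. A minimal one-step actor-critic sketch, using the TD error as the advantage, might look like this (an illustrative addition under the same conventions, not part of the original code):

class ActorCritic:
    """One-step actor-critic: the critic's TD error guides the actor"""
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.actor = nn.Sequential(nn.Linear(state_dim, 64), nn.ReLU(),
                                   nn.Linear(64, action_dim), nn.Softmax(dim=-1))
        self.critic = nn.Sequential(nn.Linear(state_dim, 64), nn.ReLU(),
                                    nn.Linear(64, 1))
        self.optimizer = optim.Adam(list(self.actor.parameters()) +
                                    list(self.critic.parameters()), lr=lr)
        self.gamma = gamma

    def update(self, state, action, reward, next_state, done):
        state_t = torch.FloatTensor(state)
        value = self.critic(state_t)
        with torch.no_grad():
            next_value = 0.0 if done else self.critic(torch.FloatTensor(next_state))
        # TD error serves as the advantage estimate
        td_target = reward + self.gamma * next_value
        advantage = td_target - value
        # Actor: raise the log-prob of the taken action, weighted by advantage
        log_prob = torch.log(self.actor(state_t)[action])
        actor_loss = -log_prob * advantage.detach()
        # Critic: regress the value toward the TD target
        critic_loss = advantage.pow(2)
        loss = actor_loss + critic_loss
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()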
RL Training Strategies
Experience replay stores past experiences in a memory buffer and samples them randomly, breaking the correlation between consecutive samples. It improves sample efficiency and enables off-policy learning.
Target networks stabilize learning: a separate, slowly updated network provides the target values, reducing training instability and improving convergence.
# RL Training Strategies
from collections import deque
import random

class ExperienceReplay:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Store an experience tuple"""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Sample a random batch of experiences (breaks sample correlation)"""
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

class DQNWithReplay:
    """Deep Q-Network with experience replay and a target network"""
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
        self.q_network = QNetwork(state_dim, action_dim)
        self.target_network = QNetwork(state_dim, action_dim)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        self.gamma = gamma
        self.replay_buffer = ExperienceReplay()
        self.update_target_frequency = 100
        self.steps = 0

    def update(self, batch_size=32):
        """Update from a replayed batch"""
        if len(self.replay_buffer) < batch_size:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(batch_size)
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.BoolTensor(dones)

        current_q = self.q_network(states).gather(1, actions.unsqueeze(1))
        # Target values come from the slowly updated target network
        with torch.no_grad():
            next_q = self.target_network(next_states).max(1)[0]
        target_q = rewards + self.gamma * next_q * (~dones)

        loss = nn.functional.mse_loss(current_q.squeeze(), target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Periodically sync the target network
        self.steps += 1
        if self.steps % self.update_target_frequency == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())
        return loss.item()

# Example
# dqn = DQNWithReplay(state_dim=4, action_dim=2)
# dqn.replay_buffer.push(state, action, reward, next_state, done)
# loss = dqn.update(batch_size=32)
Real-World Application Examples
E-commerce recommendation systems use several of these techniques together: multi-vector product embeddings capture multiple aspects, user embeddings capture preferences, temporal search prioritizes recent products, and ensembles combine multiple recommenders, improving recommendation quality.
Search engines use hybrid retrieval: keyword search handles exact matches, semantic search handles meaning, reranking improves the ordering, and multi-stage retrieval balances speed and accuracy, providing comprehensive results.
# Real-World Application: E-commerce Recommendation
from datetime import datetime

class ECommerceRecommendation:
    def __init__(self):
        self.product_embeddings = {}  # Multiple embeddings per product
        self.user_embeddings = {}
        self.retriever = HybridRetriever()
        self.reranker = CrossEncoderReranker()

    def get_recommendations(self, user_id, query=None, top_k=10):
        """Get product recommendations"""
        # Multi-vector product search
        if query:
            query_emb = self.embed_query(query)
            candidates = self.retriever.retrieve(query_emb, top_k=50)
        else:
            # User-based recommendations
            user_emb = self.user_embeddings[user_id]
            candidates = self.retriever.retrieve(user_emb, top_k=50)
        # Temporal boosting (prioritize recent products)
        candidates = self.apply_temporal_boosting(candidates)
        # Rerank
        return self.reranker.rerank(query or user_emb, candidates, top_k=top_k)

    def apply_temporal_boosting(self, candidates, recency_weight=0.3):
        """Boost recent products"""
        for candidate in candidates:
            days_old = (datetime.now() - candidate.created_date).days
            recency_score = 1.0 / (1 + days_old)
            candidate.score = (1 - recency_weight) * candidate.score + recency_weight * recency_score
        return sorted(candidates, key=lambda x: x.score, reverse=True)

# Real-World Application: Enterprise Search
class EnterpriseSearchSystem:
    def __init__(self):
        self.semantic_retriever = SemanticRetriever()
        self.keyword_retriever = KeywordRetriever()
        self.reranker = LearnedToRankReranker()
        self.generator = LLMGenerator()

    def search(self, query, filters=None, top_k=10):
        """Enterprise search with metadata filters"""
        # Hybrid retrieval
        semantic_results = self.semantic_retriever.retrieve(query, top_k=50)
        keyword_results = self.keyword_retriever.retrieve(query, top_k=50)
        # Combine and deduplicate
        all_candidates = self.combine_results(semantic_results, keyword_results)
        # Apply filters (department, date, type, etc.)
        if filters:
            all_candidates = self.apply_filters(all_candidates, filters)
        # Rerank
        return self.reranker.rerank(query, all_candidates, top_k=top_k)

    def apply_filters(self, candidates, filters):
        """Apply metadata filters"""
        filtered = []
        for candidate in candidates:
            if all(candidate.metadata.get(k) == v for k, v in filters.items()):
                filtered.append(candidate)
        return filtered
Summary
Advanced architectures handle complex requirements: multi-vector embeddings improve coverage, temporal search handles time, ensemble methods improve performance, advanced indexing enables scale, and complex architectures combine these techniques into sophisticated applications.