name: "senior-ml-engineer"
description: ML engineering skill for productionizing models, building MLOps pipelines, and integrating LLMs. Covers model deployment, feature stores, drift monitoring, RAG systems, and cost optimization. Use when the user asks about deploying ML models to production, setting up MLOps infrastructure (MLflow, Kubeflow, Kubernetes, Docker), monitoring model performance or drift, building RAG pipelines, or integrating LLM APIs with retry logic and cost controls. Focused on production and operational concerns rather than model research or initial training.
triggers:
- MLOps pipeline
- model deployment
- feature store
- model monitoring
- drift detection
- RAG system
- LLM integration
- model serving
- A/B testing ML
- automated retraining
Senior ML Engineer
Production ML engineering patterns for model deployment, MLOps infrastructure, and LLM integration.
Table of Contents

- Model Deployment Workflow
- MLOps Pipeline Setup
- LLM Integration Workflow
- RAG System Implementation
- Model Monitoring
- Reference Documentation
- Tech Stack
Model Deployment Workflow
Deploy a trained model to production with monitoring:
1. Export model to standardized format (ONNX, TorchScript, SavedModel)
2. Package model with dependencies in a Docker container
3. Deploy to staging environment
4. Run integration tests against staging
5. Deploy canary (5% traffic) to production
6. Monitor latency and error rates for 1 hour
7. Promote to full production if metrics pass

Validation: p95 latency < 100ms, error rate < 0.1%
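The promotion gate above (p95 latency and error-rate thresholds) can be sketched as a simple check; the function name and default thresholds are illustrative, not from a specific tool:

```python
def should_promote(p95_latency_ms: float, error_rate: float,
                   max_p95_ms: float = 100.0, max_error_rate: float = 0.001) -> bool:
    """Return True only if the canary met both validation thresholds."""
    return p95_latency_ms < max_p95_ms and error_rate < max_error_rate
```

In practice this check runs against metrics aggregated over the full canary window, not a single sample.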
Container Template
```dockerfile
FROM python:3.11-slim
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY model/ /app/model/
COPY src/ /app/src/
HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1
EXPOSE 8080
CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"]
```
Serving Options
| Option | Latency | Throughput | Use Case |
|---|---|---|---|
| FastAPI + Uvicorn | Low | Medium | REST APIs, small models |
| Triton Inference Server | Very Low | Very High | GPU inference, batching |
| TensorFlow Serving | Low | High | TensorFlow models |
| TorchServe | Low | High | PyTorch models |
| Ray Serve | Medium | High | Complex pipelines, multi-model |
MLOps Pipeline Setup
Establish automated training and deployment:
1. Configure feature store (Feast, Tecton) for training data
2. Set up experiment tracking (MLflow, Weights & Biases)
3. Create training pipeline with hyperparameter logging
4. Register model in model registry with version metadata
5. Configure staging deployment triggered by registry events
6. Set up A/B testing infrastructure for model comparison
7. Enable drift monitoring with alerting

Validation: New models are automatically evaluated against the baseline
Feature Store Pattern
```python
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

user = Entity(name="user_id", value_type=ValueType.INT64)

user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet"),
)
```
Retraining Triggers
| Trigger | Detection | Action |
|---|---|---|
| Scheduled | Cron (weekly/monthly) | Full retrain |
| Performance drop | Accuracy < threshold | Immediate retrain |
| Data drift | PSI > 0.2 | Evaluate, then retrain |
| New data volume | X new samples | Incremental update |
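A minimal dispatcher over the drift and performance triggers in the table; the threshold defaults are illustrative and should come from your monitoring config:

```python
def retraining_action(psi: float, accuracy: float,
                      accuracy_floor: float = 0.9,
                      psi_threshold: float = 0.2) -> str:
    """Map monitoring signals to the actions listed in the trigger table."""
    if accuracy < accuracy_floor:
        # Performance drop takes priority: retrain immediately
        return "immediate_retrain"
    if psi > psi_threshold:
        # Drift alone warrants evaluation before committing to a retrain
        return "evaluate_then_retrain"
    return "no_action"
```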
LLM Integration Workflow
Integrate LLM APIs into production applications:
1. Create provider abstraction layer for vendor flexibility
2. Implement retry logic with exponential backoff
3. Configure fallback to secondary provider
4. Set up token counting and context truncation
5. Add response caching for repeated queries
6. Implement cost tracking per request
7. Add structured output validation with Pydantic

Validation: Response parses correctly, cost within budget
Provider Abstraction
```python
from abc import ABC, abstractmethod

from tenacity import retry, stop_after_attempt, wait_exponential

class LLMProvider(ABC):
    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        ...

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str:
    return provider.complete(prompt)
```
Cost Management
| Provider | Input Cost | Output Cost |
|---|---|---|
| GPT-4 | $0.03/1K | $0.06/1K |
| GPT-3.5 | $0.0005/1K | $0.0015/1K |
| Claude 3 Opus | $0.015/1K | $0.075/1K |
| Claude 3 Haiku | $0.00025/1K | $0.00125/1K |
RAG System Implementation
Build retrieval-augmented generation pipeline:
1. Choose vector database (Pinecone, Qdrant, Weaviate)
2. Select embedding model based on quality/cost tradeoff
3. Implement document chunking strategy
4. Create ingestion pipeline with metadata extraction
5. Build retrieval with query embedding
6. Add reranking for relevance improvement
7. Format context and send to LLM

Validation: Response references retrieved context, no hallucinations
Vector Database Selection
| Database | Hosting | Scale | Latency | Best For |
|---|---|---|---|---|
| Pinecone | Managed | High | Low | Production, managed |
| Qdrant | Both | High | Very Low | Performance-critical |
| Weaviate | Both | High | Low | Hybrid search |
| Chroma | Self-hosted | Medium | Low | Prototyping |
| pgvector | Self-hosted | Medium | Medium | Existing Postgres |
Chunking Strategies
| Strategy | Chunk Size | Overlap | Best For |
|---|---|---|---|
| Fixed | 500-1000 tokens | 50-100 | General text |
| Sentence | 3-5 sentences | 1 sentence | Structured text |
| Semantic | Variable | Based on meaning | Research papers |
| Recursive | Hierarchical | Parent-child | Long documents |
Model Monitoring
Monitor production models for drift and degradation:
1. Set up latency tracking (p50, p95, p99)
2. Configure error rate alerting
3. Implement input data drift detection
4. Track prediction distribution shifts
5. Log ground truth when available
6. Compare model versions with A/B metrics
7. Set up automated retraining triggers

Validation: Alerts fire before user-visible degradation
Drift Detection
```python
from scipy.stats import ks_2samp

def detect_drift(reference, current, threshold=0.05):
    statistic, p_value = ks_2samp(reference, current)
    return {
        "drift_detected": p_value < threshold,
        "ks_statistic": statistic,
        "p_value": p_value,
    }
```
Alert Thresholds
| Metric | Warning | Critical |
|---|---|---|
| p95 latency | > 100ms | > 200ms |
| Error rate | > 0.1% | > 1% |
| PSI (drift) | > 0.1 | > 0.2 |
| Accuracy drop | > 2% | > 5% |
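The alert table can be encoded directly; the dictionary below mirrors the thresholds above (metric keys are illustrative names):

```python
# (warning, critical) thresholds, mirroring the alert table
THRESHOLDS = {
    "p95_latency_ms": (100, 200),
    "error_rate": (0.001, 0.01),
    "psi": (0.1, 0.2),
}

def alert_level(metric: str, value: float) -> str:
    """Classify a metric reading against its warning/critical thresholds."""
    warn, crit = THRESHOLDS[metric]
    if value > crit:
        return "critical"
    if value > warn:
        return "warning"
    return "ok"
```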
Reference Documentation
MLOps Production Patterns
references/mlops_production_patterns.md contains:
- Model deployment pipeline with Kubernetes manifests
- Feature store architecture with Feast examples
- Model monitoring with drift detection code
- A/B testing infrastructure with traffic splitting
- Automated retraining pipeline with MLflow
LLM Integration Guide
references/llm_integration_guide.md contains:
- Provider abstraction layer pattern
- Retry and fallback strategies with tenacity
- Prompt engineering templates (few-shot, CoT)
- Token optimization with tiktoken
- Cost calculation and tracking
RAG System Architecture
references/rag_system_architecture.md contains:
- RAG pipeline implementation with code
- Vector database comparison and integration
- Chunking strategies (fixed, semantic, recursive)
- Embedding model selection guide
- Hybrid search and reranking patterns
Model Deployment Pipeline
```shell
python scripts/model_deployment_pipeline.py --model model.pkl --target staging
```
Generates deployment artifacts: Dockerfile, Kubernetes manifests, health checks.
RAG System Builder
```shell
python scripts/rag_system_builder.py --config rag_config.yaml --analyze
```
Scaffolds RAG pipeline with vector store integration and retrieval logic.
ML Monitoring Suite
```shell
python scripts/ml_monitoring_suite.py --config monitoring.yaml --deploy
```
Sets up drift detection, alerting, and performance dashboards.
Tech Stack
| Category | Tools |
|---|---|
| ML Frameworks | PyTorch, TensorFlow, Scikit-learn, XGBoost |
| LLM Frameworks | LangChain, LlamaIndex, DSPy |
| MLOps | MLflow, Weights & Biases, Kubeflow |
| Data | Spark, Airflow, dbt, Kafka |
| Deployment | Docker, Kubernetes, Triton |
| Databases | PostgreSQL, BigQuery, Pinecone, Redis |
LLM Integration Guide
Production patterns for integrating Large Language Models into applications.
Table of Contents

- API Integration Patterns
- Prompt Engineering
- Token Optimization
- Cost Management
- Error Handling
API Integration Patterns
Provider Abstraction Layer
```python
from abc import ABC, abstractmethod
from typing import Dict, List

from anthropic import Anthropic
from openai import OpenAI

class LLMProvider(ABC):
    """Abstract base class for LLM providers."""

    @abstractmethod
    def complete(self, prompt: str, **kwargs) -> str:
        ...

    @abstractmethod
    def chat(self, messages: List[Dict], **kwargs) -> str:
        ...

class OpenAIProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.client = OpenAI(api_key=api_key)
        self.model = model

    def complete(self, prompt: str, **kwargs) -> str:
        response = self.client.completions.create(
            model=self.model,
            prompt=prompt,
            **kwargs
        )
        return response.choices[0].text

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            **kwargs
        )
        return response.choices[0].message.content

class AnthropicProvider(LLMProvider):
    def __init__(self, api_key: str, model: str = "claude-3-opus"):
        self.client = Anthropic(api_key=api_key)
        self.model = model

    def complete(self, prompt: str, **kwargs) -> str:
        # Anthropic's API is chat-based; wrap the prompt as a single user message.
        return self.chat([{"role": "user", "content": prompt}], **kwargs)

    def chat(self, messages: List[Dict], **kwargs) -> str:
        response = self.client.messages.create(
            model=self.model,
            messages=messages,
            **kwargs
        )
        return response.content[0].text
```

Retry and Fallback Strategy
```python
import logging

from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
def call_llm_with_retry(provider: LLMProvider, prompt: str) -> str:
    """Call LLM with exponential backoff retry."""
    return provider.complete(prompt)

def call_with_fallback(
    primary: LLMProvider,
    fallback: LLMProvider,
    prompt: str
) -> str:
    """Try primary provider, fall back on failure."""
    try:
        return call_llm_with_retry(primary, prompt)
    except Exception as e:
        logger.warning(f"Primary provider failed: {e}, using fallback")
        return call_llm_with_retry(fallback, prompt)
```
Prompt Engineering
Prompt Templates
| Pattern | Use Case | Structure |
|---|---|---|
| Zero-shot | Simple tasks | Task description + input |
| Few-shot | Complex tasks | Examples + task + input |
| Chain-of-thought | Reasoning | "Think step by step" + task |
| Role-based | Specialized output | System role + task |
Few-Shot Template
```python
FEW_SHOT_TEMPLATE = """
You are a sentiment classifier. Classify the sentiment as positive, negative, or neutral.

Examples:
Input: "This product is amazing, I love it!"
Output: positive

Input: "Terrible experience, waste of money."
Output: negative

Input: "The product arrived on time."
Output: neutral

Now classify:
Input: "{user_input}"
Output:"""

def classify_sentiment(text: str, provider: LLMProvider) -> str:
    prompt = FEW_SHOT_TEMPLATE.format(user_input=text)
    response = provider.complete(prompt, max_tokens=10, temperature=0)
    return response.strip().lower()
```

System Prompts for Consistency
```python
from typing import Dict, List

SYSTEM_PROMPT = """You are a helpful assistant that answers questions about our product.

Guidelines:
- Be concise and direct
- Use bullet points for lists
- If unsure, say "I don't have that information"
- Never make up information
- Keep responses under 200 words

Product context:
{product_context}
"""

def create_chat_messages(user_query: str, context: str) -> List[Dict]:
    return [
        {"role": "system", "content": SYSTEM_PROMPT.format(product_context=context)},
        {"role": "user", "content": user_query},
    ]
```
Token Optimization
Token Counting
```python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens for a given text and model."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))

def truncate_to_token_limit(text: str, max_tokens: int, model: str = "gpt-4") -> str:
    """Truncate text to fit within token limit."""
    encoding = tiktoken.encoding_for_model(model)
    tokens = encoding.encode(text)
    if len(tokens) <= max_tokens:
        return text
    return encoding.decode(tokens[:max_tokens])
```

Context Window Management
| Model | Context Window | Effective Limit |
|---|---|---|
| GPT-4 | 8,192 | ~6,000 (leave room for response) |
| GPT-4-32k | 32,768 | ~28,000 |
| Claude 3 | 200,000 | ~180,000 |
| Llama 3 | 8,192 | ~6,000 |
Chunking Strategy
```python
from typing import List

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks
```
Cost Management
Cost Calculation
| Provider | Input Cost | Output Cost | Example (1K input + 1K output) |
|---|---|---|---|
| GPT-4 | $0.03/1K | $0.06/1K | $0.09 |
| GPT-3.5 | $0.0005/1K | $0.0015/1K | $0.002 |
| Claude 3 Opus | $0.015/1K | $0.075/1K | $0.09 |
| Claude 3 Haiku | $0.00025/1K | $0.00125/1K | $0.0015 |
Cost Tracking
```python
from dataclasses import dataclass

@dataclass
class LLMUsage:
    input_tokens: int
    output_tokens: int
    model: str
    cost: float

def calculate_cost(
    input_tokens: int,
    output_tokens: int,
    model: str,
) -> float:
    """Calculate cost based on token usage (prices per 1K tokens)."""
    PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
    }
    prices = PRICING.get(model, {"input": 0.01, "output": 0.03})
    input_cost = (input_tokens / 1000) * prices["input"]
    output_cost = (output_tokens / 1000) * prices["output"]
    return input_cost + output_cost
```

Cost Optimization Strategies
- Use smaller models for simple tasks - GPT-3.5 for classification, GPT-4 for reasoning
- Cache common responses - Store results for repeated queries
- Batch requests - Combine multiple items in a single prompt
- Truncate context - Only include relevant information
- Set a max_tokens limit - Prevent runaway responses
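A sketch of the second strategy (caching common responses); the in-memory dict is illustrative — production systems typically back this with Redis and a TTL:

```python
import hashlib

class ResponseCache:
    """Cache LLM responses keyed on (model, prompt)."""

    def __init__(self):
        self._store = {}

    def _key(self, model: str, prompt: str) -> str:
        # Hash the pair so keys stay small regardless of prompt length
        return hashlib.sha256(f"{model}:{prompt}".encode()).hexdigest()

    def get(self, model: str, prompt: str):
        """Return a cached response, or None on a miss."""
        return self._store.get(self._key(model, prompt))

    def put(self, model: str, prompt: str, response: str) -> None:
        self._store[self._key(model, prompt)] = response
```

Check the cache before calling the provider, and only pay for the API call on a miss.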
Error Handling
Common Error Types
| Error | Cause | Handling |
|---|---|---|
| RateLimitError | Too many requests | Exponential backoff |
| InvalidRequestError | Bad input | Validate before sending |
| AuthenticationError | Invalid API key | Check credentials |
| ServiceUnavailable | Provider down | Fallback to alternative |
| ContextLengthExceeded | Input too long | Truncate or chunk |
Error Handling Pattern
```python
import logging
import time

from openai import APIError, RateLimitError

logger = logging.getLogger(__name__)

def safe_llm_call(provider: LLMProvider, prompt: str, max_retries: int = 3) -> str:
    """Safely call LLM with comprehensive error handling."""
    for attempt in range(max_retries):
        try:
            return provider.complete(prompt)
        except RateLimitError:
            wait_time = 2 ** attempt
            logger.warning(f"Rate limited, waiting {wait_time}s")
            time.sleep(wait_time)
        except APIError as e:
            if e.status_code >= 500:
                logger.warning(f"Server error: {e}, retrying...")
                time.sleep(1)
            else:
                raise
    raise Exception(f"Failed after {max_retries} attempts")
```

Response Validation
```python
import json
from typing import List

from pydantic import BaseModel, ValidationError

class StructuredResponse(BaseModel):
    answer: str
    confidence: float
    sources: List[str]

def parse_structured_response(response: str) -> StructuredResponse:
    """Parse and validate LLM JSON response."""
    try:
        data = json.loads(response)
        return StructuredResponse(**data)
    except json.JSONDecodeError:
        raise ValueError("Response is not valid JSON")
    except ValidationError as e:
        raise ValueError(f"Response validation failed: {e}")
```

MLOps Production Patterns
Production ML infrastructure patterns for model deployment, monitoring, and lifecycle management.
Table of Contents

- Model Deployment Pipeline
- Feature Store Architecture
- Model Monitoring
- A/B Testing Infrastructure
- Automated Retraining
Model Deployment Pipeline
Deployment Workflow
1. Export trained model to standardized format (ONNX, TorchScript, SavedModel)
2. Package model with dependencies in a Docker container
3. Deploy to staging environment
4. Run integration tests against staging
5. Deploy canary (5% traffic) to production
6. Monitor latency and error rates for 1 hour
7. Promote to full production if metrics pass

Validation: p95 latency < 100ms, error rate < 0.1%
Container Structure
```dockerfile
FROM python:3.11-slim

# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy model artifacts
COPY model/ /app/model/
COPY src/ /app/src/

# Health check endpoint
HEALTHCHECK CMD curl -f http://localhost:8080/health || exit 1

EXPOSE 8080
CMD ["uvicorn", "src.server:app", "--host", "0.0.0.0", "--port", "8080"]
```

Model Serving Options
| Option | Latency | Throughput | Use Case |
|---|---|---|---|
| FastAPI + Uvicorn | Low | Medium | REST APIs, small models |
| Triton Inference Server | Very Low | Very High | GPU inference, batching |
| TensorFlow Serving | Low | High | TensorFlow models |
| TorchServe | Low | High | PyTorch models |
| Ray Serve | Medium | High | Complex pipelines, multi-model |
Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: model-serving
spec:
  replicas: 3
  selector:
    matchLabels:
      app: model-serving
  template:
    metadata:
      labels:
        app: model-serving
    spec:
      containers:
      - name: model
        image: model:v1.0.0
        resources:
          requests:
            memory: "2Gi"
            cpu: "1"
          limits:
            memory: "4Gi"
            cpu: "2"
        readinessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
```
Feature Store Architecture
Feature Store Components
| Component | Purpose | Tools |
|---|---|---|
| Offline Store | Training data, batch features | BigQuery, Snowflake, S3 |
| Online Store | Low-latency serving | Redis, DynamoDB, Feast |
| Feature Registry | Metadata, lineage | Feast, Tecton, Hopsworks |
| Transformation | Feature engineering | Spark, Flink, dbt |
Feature Pipeline Workflow
1. Define feature schema in registry
2. Implement transformation logic (SQL or Python)
3. Backfill historical features to offline store
4. Schedule incremental updates
5. Materialize to online store for serving
6. Monitor feature freshness and quality

Validation: Feature values within expected ranges, no nulls in required fields
Feature Definition Example
```python
from datetime import timedelta

from feast import Entity, Feature, FeatureView, FileSource, ValueType

user = Entity(name="user_id", value_type=ValueType.INT64)

user_features = FeatureView(
    name="user_features",
    entities=["user_id"],
    ttl=timedelta(days=1),
    features=[
        Feature(name="purchase_count_30d", dtype=ValueType.INT64),
        Feature(name="avg_order_value", dtype=ValueType.FLOAT),
        Feature(name="days_since_last_purchase", dtype=ValueType.INT64),
    ],
    online=True,
    source=FileSource(path="data/user_features.parquet"),
)
```
Model Monitoring
Monitoring Dimensions
| Dimension | Metrics | Alert Threshold |
|---|---|---|
| Latency | p50, p95, p99 | p95 > 100ms |
| Throughput | requests/sec | < 80% baseline |
| Errors | error rate, 5xx count | > 0.1% |
| Data Drift | PSI, KS statistic | PSI > 0.2 |
| Model Drift | accuracy, AUC decay | > 5% drop |
Data Drift Detection
```python
import numpy as np
from scipy.stats import ks_2samp

def detect_drift(reference: np.ndarray, current: np.ndarray, threshold: float = 0.05):
    """Detect distribution drift using the two-sample Kolmogorov-Smirnov test."""
    statistic, p_value = ks_2samp(reference, current)
    drift_detected = p_value < threshold
    return {
        "drift_detected": drift_detected,
        "ks_statistic": statistic,
        "p_value": p_value,
        "threshold": threshold,
    }
```

Monitoring Dashboard Metrics
Infrastructure:
- Request latency (p50, p95, p99)
- Requests per second
- Error rate by type
- CPU/memory utilization
- GPU utilization (if applicable)

Model Performance:
- Prediction distribution
- Feature value distributions
- Model output confidence
- Ground truth vs predictions (when available)
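PSI is used as a drift threshold throughout this document but never defined. A common binned formulation is sketched below; the bin count and the `eps` smoothing (which avoids log-of-zero on empty bins) are implementation choices:

```python
import numpy as np

def population_stability_index(reference, current, bins: int = 10,
                               eps: float = 1e-6) -> float:
    """PSI between two samples, binned on the reference distribution."""
    edges = np.histogram_bin_edges(reference, bins=bins)
    # Fraction of each sample falling in each reference-derived bin
    ref_pct = np.histogram(reference, bins=edges)[0] / len(reference) + eps
    cur_pct = np.histogram(current, bins=edges)[0] / len(current) + eps
    return float(np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct)))
```

The usual rule of thumb matches the alert table: PSI < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 significant drift.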
A/B Testing Infrastructure
Experiment Workflow
1. Define experiment hypothesis and success metrics
2. Calculate required sample size for statistical power
3. Configure traffic split (control vs treatment)
4. Deploy treatment model alongside control
5. Route traffic based on user/session hash
6. Collect metrics for both variants
7. Run statistical significance test

Validation: p-value < 0.05, minimum sample size reached
Traffic Splitting
```python
import hashlib

def get_variant(user_id: str, experiment: str, control_pct: float = 0.5) -> str:
    """Deterministic traffic splitting based on user ID."""
    hash_input = f"{user_id}:{experiment}"
    hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
    bucket = (hash_value % 100) / 100.0
    return "control" if bucket < control_pct else "treatment"
```

Metrics Collection
| Metric Type | Examples | Collection Method |
|---|---|---|
| Primary | Conversion rate, revenue | Event logging |
| Secondary | Latency, engagement | Request logs |
| Guardrail | Error rate, crashes | Monitoring system |
Automated Retraining
Retraining Triggers
| Trigger | Detection Method | Action |
|---|---|---|
| Scheduled | Cron (weekly/monthly) | Full retrain |
| Performance drop | Accuracy < threshold | Immediate retrain |
| Data drift | PSI > 0.2 | Evaluate, then retrain |
| New data volume | X new samples | Incremental update |
Retraining Pipeline
1. Trigger detection (schedule, drift, performance)
2. Fetch latest training data from feature store
3. Run training job with hyperparameter config
4. Evaluate model on holdout set
5. Compare against production model
6. If improved: register new model version
7. Deploy to staging for validation
8. Promote to production via canary

Validation: New model outperforms baseline on key metrics
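The champion/challenger comparison in steps 5-6 can be sketched as a simple gate; the metric names and the zero minimum-lift default are illustrative, not prescribed:

```python
def challenger_wins(champion: dict, challenger: dict,
                    metrics=("auc", "accuracy"), min_lift: float = 0.0) -> bool:
    """Register the challenger only if it beats the champion on every key metric."""
    return all(challenger[m] > champion[m] + min_lift for m in metrics)
```

A positive `min_lift` avoids churning model versions over noise-level improvements.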
MLflow Model Registry Integration
```python
import mlflow

def register_model(model, metrics: dict, model_name: str):
    """Register trained model with MLflow."""
    with mlflow.start_run():
        # Log metrics
        for name, value in metrics.items():
            mlflow.log_metric(name, value)
        # Log model
        mlflow.sklearn.log_model(model, "model")
        # Register in model registry
        model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
        mlflow.register_model(model_uri, model_name)
```

RAG System Architecture
Retrieval-Augmented Generation patterns for production applications.
Table of Contents

- RAG Pipeline Architecture
- Vector Database Selection
- Chunking Strategies
- Embedding Models
- Retrieval Optimization
RAG Pipeline Architecture
Basic RAG Flow
1. Receive user query
2. Generate query embedding
3. Search vector database for relevant chunks
4. Rerank retrieved chunks by relevance
5. Format context with retrieved chunks
6. Send prompt to LLM with context
7. Return generated response

Validation: Response references retrieved context, no hallucinations
Pipeline Components
```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Document:
    content: str
    metadata: dict
    embedding: Optional[List[float]] = None

@dataclass
class RetrievalResult:
    document: Document
    score: float

class RAGPipeline:
    def __init__(
        self,
        embedder: "Embedder",          # interfaces defined elsewhere in this guide
        vector_store: "VectorStore",
        llm: "LLMProvider",
        reranker: "Reranker" = None,
    ):
        self.embedder = embedder
        self.vector_store = vector_store
        self.llm = llm
        self.reranker = reranker

    def query(self, question: str, top_k: int = 5) -> str:
        # 1. Embed query
        query_embedding = self.embedder.embed(question)
        # 2. Retrieve relevant documents (over-fetch to give the reranker options)
        results = self.vector_store.search(query_embedding, top_k=top_k * 2)
        # 3. Rerank if available
        if self.reranker:
            results = self.reranker.rerank(question, results)[:top_k]
        else:
            results = results[:top_k]
        # 4. Build context
        context = self._build_context(results)
        # 5. Generate response
        prompt = self._build_prompt(question, context)
        return self.llm.complete(prompt)

    def _build_context(self, results: List[RetrievalResult]) -> str:
        return "\n\n".join(
            f"[Source {i + 1}]: {r.document.content}"
            for i, r in enumerate(results)
        )

    def _build_prompt(self, question: str, context: str) -> str:
        return f"""Answer the question based on the context provided.

Context:
{context}

Question: {question}

Answer:"""
```
Vector Database Selection
Comparison Matrix
| Database | Hosting | Scale | Latency | Cost | Best For |
|---|---|---|---|---|---|
| Pinecone | Managed | High | Low | $$ | Production, managed |
| Weaviate | Both | High | Low | $ | Hybrid search |
| Qdrant | Both | High | Very Low | $ | Performance-critical |
| Chroma | Self-hosted | Medium | Low | Free | Prototyping |
| pgvector | Self-hosted | Medium | Medium | Free | Existing Postgres |
| Milvus | Both | Very High | Low | $ | Large-scale |
Pinecone Integration
```python
from typing import List

import pinecone  # pre-3.0 pinecone-client API (pinecone.init / pinecone.Index)

class PineconeVectorStore:
    def __init__(self, api_key: str, environment: str, index_name: str):
        pinecone.init(api_key=api_key, environment=environment)
        self.index = pinecone.Index(index_name)

    def upsert(self, documents: List[Document], batch_size: int = 100):
        """Upsert documents in batches."""
        vectors = [
            (doc.metadata["id"], doc.embedding, doc.metadata)
            for doc in documents
        ]
        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch)

    def search(self, embedding: List[float], top_k: int = 5) -> List[RetrievalResult]:
        """Search for similar vectors."""
        results = self.index.query(
            vector=embedding,
            top_k=top_k,
            include_metadata=True
        )
        return [
            RetrievalResult(
                document=Document(
                    content=match.metadata.get("content", ""),
                    metadata=match.metadata
                ),
                score=match.score
            )
            for match in results.matches
        ]
```
Chunking Strategies
Strategy Comparison
| Strategy | Chunk Size | Overlap | Best For |
|---|---|---|---|
| Fixed | 500-1000 tokens | 50-100 | General text |
| Sentence | 3-5 sentences | 1 sentence | Structured text |
| Paragraph | Natural breaks | None | Documents with clear structure |
| Semantic | Variable | Based on meaning | Research papers |
| Recursive | Hierarchical | Parent-child | Long documents |
Recursive Character Splitter
```python
from typing import List

from langchain.text_splitter import RecursiveCharacterTextSplitter

def create_chunks(
    text: str,
    chunk_size: int = 1000,
    chunk_overlap: int = 100,
) -> List[str]:
    """Split text using recursive character splitting."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
    )
    return splitter.split_text(text)
```

Semantic Chunking
```python
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer

def semantic_chunk(
    sentences: List[str],
    embedder: SentenceTransformer,
    threshold: float = 0.7,
) -> List[List[str]]:
    """Group consecutive sentences whose cosine similarity stays above a threshold."""
    embeddings = embedder.encode(sentences)
    chunks = []
    current_chunk = [sentences[0]]
    current_embedding = embeddings[0]
    for i in range(1, len(sentences)):
        similarity = np.dot(current_embedding, embeddings[i]) / (
            np.linalg.norm(current_embedding) * np.linalg.norm(embeddings[i])
        )
        if similarity >= threshold:
            current_chunk.append(sentences[i])
            # Running mean keeps the chunk centroid up to date
            current_embedding = np.mean([current_embedding, embeddings[i]], axis=0)
        else:
            chunks.append(current_chunk)
            current_chunk = [sentences[i]]
            current_embedding = embeddings[i]
    chunks.append(current_chunk)
    return chunks
```
Embedding Models
Model Comparison
| Model | Dimensions | Quality | Speed | Cost |
|---|---|---|---|---|
| text-embedding-3-large | 3072 | Excellent | Medium | $0.13/1M |
| text-embedding-3-small | 1536 | Good | Fast | $0.02/1M |
| BGE-large | 1024 | Excellent | Medium | Free |
| all-MiniLM-L6-v2 | 384 | Good | Very Fast | Free |
| Cohere embed-v3 | 1024 | Excellent | Medium | $0.10/1M |
Embedding with Caching
```python
import hashlib
from typing import List

from openai import OpenAI

class CachedEmbedder:
    def __init__(self, model_name: str = "text-embedding-3-small"):
        self.client = OpenAI()
        self.model = model_name
        self._cache = {}

    def embed(self, text: str) -> List[float]:
        """Embed text with caching."""
        cache_key = hashlib.md5(text.encode()).hexdigest()
        if cache_key in self._cache:
            return self._cache[cache_key]
        response = self.client.embeddings.create(
            model=self.model,
            input=text
        )
        embedding = response.data[0].embedding
        self._cache[cache_key] = embedding
        return embedding

    def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Embed multiple texts in a single API call."""
        response = self.client.embeddings.create(
            model=self.model,
            input=texts
        )
        return [item.embedding for item in response.data]
```
Retrieval Optimization
Hybrid Search
Combine dense (vector) and sparse (keyword) retrieval:
```python
from typing import List

from rank_bm25 import BM25Okapi

class HybridRetriever:
    def __init__(
        self,
        vector_store: VectorStore,
        documents: List[Document],
        alpha: float = 0.5,
    ):
        self.vector_store = vector_store
        self.alpha = alpha  # Weight for vector search
        # Build BM25 index
        tokenized = [doc.content.lower().split() for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
        self.documents = documents

    def search(self, query: str, query_embedding: List[float], top_k: int = 5):
        # Vector search
        vector_results = self.vector_store.search(query_embedding, top_k=top_k * 2)
        # BM25 search
        tokenized_query = query.lower().split()
        bm25_scores = self.bm25.get_scores(tokenized_query)
        # Combine scores
        combined = {}
        for result in vector_results:
            doc_id = result.document.metadata["id"]
            combined[doc_id] = self.alpha * result.score
        for i, score in enumerate(bm25_scores):
            doc_id = self.documents[i].metadata["id"]
            if doc_id in combined:
                combined[doc_id] += (1 - self.alpha) * score
            else:
                combined[doc_id] = (1 - self.alpha) * score
        # Sort by combined score; returns document ids, to be mapped back to Documents
        sorted_ids = sorted(combined.keys(), key=lambda x: combined[x], reverse=True)
        return sorted_ids[:top_k]
```

Reranking
```python
from typing import List

from sentence_transformers import CrossEncoder

class Reranker:
    def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-12-v2"):
        self.model = CrossEncoder(model_name)

    def rerank(
        self,
        query: str,
        results: List[RetrievalResult],
        top_k: int = 5,
    ) -> List[RetrievalResult]:
        """Rerank results using a cross-encoder."""
        pairs = [(query, r.document.content) for r in results]
        scores = self.model.predict(pairs)
        # Update scores and sort
        for i, score in enumerate(scores):
            results[i].score = float(score)
        return sorted(results, key=lambda x: x.score, reverse=True)[:top_k]
```

Query Expansion
```python
from typing import List

def expand_query(query: str, llm: LLMProvider) -> List[str]:
    """Generate query variations for better retrieval."""
    prompt = f"""Generate 3 alternative phrasings of this question for search.
Return only the questions, one per line.

Original: {query}

Alternatives:"""
    response = llm.complete(prompt, max_tokens=150)
    alternatives = [q.strip() for q in response.strip().split("\n") if q.strip()]
    return [query] + alternatives[:3]
```
"""
Ml Monitoring Suite
Production-grade tool for senior ml/ai engineer
"""
import os
import sys
import json
import logging
import argparse
from pathlib import Path
from typing import Dict, List, Optional
from datetime import datetime
logging.basicConfig(
level = logging. INFO ,
format = ' %(asctime)s - %(levelname)s - %(message)s '
)
logger = logging.getLogger( __name__ )
class MlMonitoringSuite :
"""Production-grade ml monitoring suite"""
def __init__ (self, config: Dict):
self .config = config
self .results = {
'status' : 'initialized' ,
'start_time' : datetime.now().isoformat(),
'processed_items' : 0
}
logger.info( f "Initialized {self . __class__ . __name__} " )
def validate_config (self) -> bool :
"""Validate configuration"""
logger.info( "Validating configuration..." )
# Add validation logic
logger.info( "Configuration validated" )
return True
def process (self) -> Dict:
"""Main processing logic"""
logger.info( "Starting processing..." )
try :
self .validate_config()
# Main processing
result = self ._execute()
self .results[ 'status' ] = 'completed'
self .results[ 'end_time' ] = datetime.now().isoformat()
logger.info( "Processing completed successfully" )
return self .results
except Exception as e:
self .results[ 'status' ] = 'failed'
self .results[ 'error' ] = str (e)
logger.error( f "Processing failed: { e } " )
raise
def _execute (self) -> Dict:
"""Execute main logic"""
# Implementation here
return { 'success' : True }
def main ():
"""Main entry point"""
parser = argparse.ArgumentParser(
description = "Ml Monitoring Suite"
)
parser.add_argument( '--input' , '-i' , required = True , help = 'Input path' )
parser.add_argument( '--output' , '-o' , required = True , help = 'Output path' )
parser.add_argument( '--config' , '-c' , help = 'Configuration file' )
parser.add_argument( '--verbose' , '-v' , action = 'store_true' , help = 'Verbose output' )
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging. DEBUG )
try :
config = {
'input' : args.input,
'output' : args.output
}
processor = MlMonitoringSuite(config)
results = processor.process()
print (json.dumps(results, indent = 2 ))
sys.exit( 0 )
except Exception as e:
logger.error( f "Fatal error: { e } " )
sys.exit( 1 )
if __name__ == '__main__' :
main()
scripts/model_deployment_pipeline.py

```python
#!/usr/bin/env python3
"""
Model Deployment Pipeline

Production-grade tool for the senior ML/AI engineer skill.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from typing import Dict

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class ModelDeploymentPipeline:
    """Production-grade model deployment pipeline."""

    def __init__(self, config: Dict):
        self.config = config
        self.results = {
            'status': 'initialized',
            'start_time': datetime.now().isoformat(),
            'processed_items': 0
        }
        logger.info(f"Initialized {self.__class__.__name__}")

    def validate_config(self) -> bool:
        """Validate configuration."""
        logger.info("Validating configuration...")
        # Add validation logic
        logger.info("Configuration validated")
        return True

    def process(self) -> Dict:
        """Main processing logic."""
        logger.info("Starting processing...")
        try:
            self.validate_config()
            # Main processing
            result = self._execute()
            self.results['status'] = 'completed'
            self.results['end_time'] = datetime.now().isoformat()
            logger.info("Processing completed successfully")
            return self.results
        except Exception as e:
            self.results['status'] = 'failed'
            self.results['error'] = str(e)
            logger.error(f"Processing failed: {e}")
            raise

    def _execute(self) -> Dict:
        """Execute main logic."""
        # Implementation here
        return {'success': True}


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="Model Deployment Pipeline")
    parser.add_argument('--input', '-i', required=True, help='Input path')
    parser.add_argument('--output', '-o', required=True, help='Output path')
    parser.add_argument('--config', '-c', help='Configuration file')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        config = {'input': args.input, 'output': args.output}
        processor = ModelDeploymentPipeline(config)
        results = processor.process()
        print(json.dumps(results, indent=2))
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
```
scripts/rag_system_builder.py

```python
#!/usr/bin/env python3
"""
RAG System Builder

Production-grade tool for the senior ML/AI engineer skill.
"""
import argparse
import json
import logging
import sys
from datetime import datetime
from typing import Dict

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class RagSystemBuilder:
    """Production-grade RAG system builder."""

    def __init__(self, config: Dict):
        self.config = config
        self.results = {
            'status': 'initialized',
            'start_time': datetime.now().isoformat(),
            'processed_items': 0
        }
        logger.info(f"Initialized {self.__class__.__name__}")

    def validate_config(self) -> bool:
        """Validate configuration."""
        logger.info("Validating configuration...")
        # Add validation logic
        logger.info("Configuration validated")
        return True

    def process(self) -> Dict:
        """Main processing logic."""
        logger.info("Starting processing...")
        try:
            self.validate_config()
            # Main processing
            result = self._execute()
            self.results['status'] = 'completed'
            self.results['end_time'] = datetime.now().isoformat()
            logger.info("Processing completed successfully")
            return self.results
        except Exception as e:
            self.results['status'] = 'failed'
            self.results['error'] = str(e)
            logger.error(f"Processing failed: {e}")
            raise

    def _execute(self) -> Dict:
        """Execute main logic."""
        # Implementation here
        return {'success': True}


def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(description="RAG System Builder")
    parser.add_argument('--input', '-i', required=True, help='Input path')
    parser.add_argument('--output', '-o', required=True, help='Output path')
    parser.add_argument('--config', '-c', help='Configuration file')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    try:
        config = {'input': args.input, 'output': args.output}
        processor = RagSystemBuilder(config)
        results = processor.process()
        print(json.dumps(results, indent=2))
        sys.exit(0)
    except Exception as e:
        logger.error(f"Fatal error: {e}")
        sys.exit(1)


if __name__ == '__main__':
    main()
```