Agent Designer
Design and orchestrate multi-agent AI systems — define agent roles, communication protocols, tool use patterns, and failure recovery strategies.
What this skill does
Design sophisticated AI teams by defining clear roles, communication rules, and safety nets for complex workflows. You will produce a structured blueprint that enables multiple AI teammates to collaborate seamlessly on research, analysis, or operational tasks. Reach for this skill when simple single-bot setups fail to handle the scale or complexity of your project.
name: "agent-designer"
description: "Agent Designer - Multi-Agent System Architecture"
Agent Designer - Multi-Agent System Architecture
Tier: POWERFUL
Category: Engineering
Tags: AI agents, architecture, system design, orchestration, multi-agent systems
Overview
Agent Designer is a comprehensive toolkit for designing, architecting, and evaluating multi-agent systems. It provides structured approaches to agent architecture patterns, tool design principles, communication strategies, and performance evaluation frameworks for building robust, scalable AI agent systems.
Core Capabilities
1. Agent Architecture Patterns
Single Agent Pattern
- Use Case: Simple, focused tasks with clear boundaries
- Pros: Minimal complexity, easy debugging, predictable behavior
- Cons: Limited scalability, single point of failure
- Implementation: Direct user-agent interaction with comprehensive tool access
Supervisor Pattern
- Use Case: Hierarchical task decomposition with centralized control
- Architecture: One supervisor agent coordinating multiple specialist agents
- Pros: Clear command structure, centralized decision making
- Cons: Supervisor bottleneck, complex coordination logic
- Implementation: Supervisor receives tasks, delegates to specialists, aggregates results
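The supervisor flow above can be sketched in a few lines. This is a minimal illustration, not the skill's implementation: the agent roles and the `(role, payload)` task format are assumptions made for the example.

```python
# Minimal supervisor-pattern sketch: one coordinator routes subtasks to
# named specialists and aggregates their results. Roles are illustrative.
from typing import Callable, Dict, List, Tuple


class Supervisor:
    def __init__(self, specialists: Dict[str, Callable[[str], str]]):
        self.specialists = specialists  # role name -> handler function

    def run(self, subtasks: List[Tuple[str, str]]) -> List[str]:
        """Delegate (role, payload) subtasks and aggregate the results."""
        results = []
        for role, payload in subtasks:
            handler = self.specialists[role]  # centralized routing decision
            results.append(handler(payload))
        return results


supervisor = Supervisor({
    "research": lambda q: f"findings for {q}",
    "summarize": lambda t: f"summary of {t}",
})
print(supervisor.run([("research", "topic A"), ("summarize", "doc B")]))
```

Note how all routing decisions pass through one place — which is exactly why the supervisor can become a bottleneck at scale.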
Swarm Pattern
- Use Case: Distributed problem solving with peer-to-peer collaboration
- Architecture: Multiple autonomous agents with shared objectives
- Pros: High parallelism, fault tolerance, emergent intelligence
- Cons: Complex coordination, potential conflicts, harder to predict
- Implementation: Agent discovery, consensus mechanisms, distributed task allocation
Hierarchical Pattern
- Use Case: Complex systems with multiple organizational layers
- Architecture: Tree structure with managers and workers at different levels
- Pros: Natural organizational mapping, clear responsibilities
- Cons: Communication overhead, potential bottlenecks at each level
- Implementation: Multi-level delegation with feedback loops
Pipeline Pattern
- Use Case: Sequential processing with specialized stages
- Architecture: Agents arranged in processing pipeline
- Pros: Clear data flow, specialized optimization per stage
- Cons: Sequential bottlenecks, rigid processing order
- Implementation: Message queues between stages, state handoffs
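As a minimal sketch of the pipeline pattern, each stage below is a plain function and a driver hands state from one stage to the next — a synchronous stand-in for the message queues a real deployment would use. Stage names are illustrative.

```python
# Pipeline-pattern sketch: the output of each stage becomes the input of
# the next (state handoff), mirroring an ETL-style agent pipeline.
from typing import Any, Callable, List


def run_pipeline(stages: List[Callable[[Any], Any]], payload: Any) -> Any:
    for stage in stages:
        payload = stage(payload)  # handoff to the next specialized stage
    return payload


extract = lambda doc: doc.split()
transform = lambda words: [w.lower() for w in words]
load = lambda words: {"tokens": words, "count": len(words)}

print(run_pipeline([extract, transform, load], "Agent Systems"))
```

The rigidity noted above is visible here: stage order is fixed, and a slow stage stalls everything behind it.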
2. Agent Role Definition
Role Specification Framework
- Identity: Name, purpose statement, core competencies
- Responsibilities: Primary tasks, decision boundaries, success criteria
- Capabilities: Required tools, knowledge domains, processing limits
- Interfaces: Input/output formats, communication protocols
- Constraints: Security boundaries, resource limits, operational guidelines
Common Agent Archetypes
Coordinator Agent
- Orchestrates multi-agent workflows
- Makes high-level decisions and resource allocation
- Monitors system health and performance
- Handles escalations and conflict resolution
Specialist Agent
- Deep expertise in specific domain (code, data, research)
- Optimized tools and knowledge for specialized tasks
- High-quality output within narrow scope
- Clear handoff protocols for out-of-scope requests
Interface Agent
- Handles external interactions (users, APIs, systems)
- Protocol translation and format conversion
- Authentication and authorization management
- User experience optimization
Monitor Agent
- System health monitoring and alerting
- Performance metrics collection and analysis
- Anomaly detection and reporting
- Compliance and audit trail maintenance
3. Tool Design Principles
Schema Design
- Input Validation: Strong typing, required vs optional parameters
- Output Consistency: Standardized response formats, error handling
- Documentation: Clear descriptions, usage examples, edge cases
- Versioning: Backward compatibility, migration paths
Error Handling Patterns
- Graceful Degradation: Partial functionality when dependencies fail
- Retry Logic: Exponential backoff, circuit breakers, max attempts
- Error Propagation: Structured error responses, error classification
- Recovery Strategies: Fallback methods, alternative approaches
Idempotency Requirements
- Safe Operations: Read operations with no side effects
- Idempotent Writes: Same operation can be safely repeated
- State Management: Version tracking, conflict resolution
- Atomicity: All-or-nothing operation completion
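One common way to make writes safely repeatable is an idempotency key: a retried operation with the same key returns the stored result instead of re-running. The in-memory store below is an illustrative stand-in for a real database.

```python
# Idempotent-write sketch: the same keyed operation applies at most once;
# replays (e.g. after a retry) return the recorded result, no side effects.
class IdempotentStore:
    def __init__(self):
        self._results = {}

    def apply(self, idempotency_key: str, operation):
        if idempotency_key in self._results:
            return self._results[idempotency_key]  # safe replay
        result = operation()
        self._results[idempotency_key] = result
        return result


store = IdempotentStore()
calls = []
op = lambda: calls.append(1) or len(calls)  # side effect we must not repeat

assert store.apply("task-42", op) == 1
assert store.apply("task-42", op) == 1  # retried safely; op ran only once
```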
4. Communication Patterns
Message Passing
- Asynchronous Messaging: Decoupled agents, message queues
- Message Format: Structured payloads with metadata
- Delivery Guarantees: At-least-once, exactly-once semantics
- Routing: Direct messaging, publish-subscribe, broadcast
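A structured payload with metadata might look like the envelope below. The field names are assumptions for illustration, not a fixed protocol; the unique `id` is what delivery-guarantee machinery would use to deduplicate redelivered messages.

```python
# Message-envelope sketch: payload plus routing and delivery metadata.
import json
import uuid
from datetime import datetime, timezone


def make_message(sender: str, recipient: str, msg_type: str, payload: dict) -> dict:
    return {
        "id": str(uuid.uuid4()),  # unique id enables exactly-once dedup
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "sender": sender,
        "recipient": recipient,   # direct routing; a topic would fit pub-sub
        "type": msg_type,
        "payload": payload,
    }


msg = make_message("planner", "researcher", "task.assign", {"query": "AI news"})
print(json.dumps(msg, indent=2))
```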
Shared State
- State Stores: Centralized data repositories
- Consistency Models: Strong, eventual, weak consistency
- Access Patterns: Read-heavy, write-heavy, mixed workloads
- Conflict Resolution: Last-writer-wins, merge strategies
Event-Driven Architecture
- Event Sourcing: Immutable event logs, state reconstruction
- Event Types: Domain events, system events, integration events
- Event Processing: Real-time, batch, stream processing
- Event Schema: Versioned event formats, backward compatibility
5. Guardrails and Safety
Input Validation
- Schema Enforcement: Required fields, type checking, format validation
- Content Filtering: Harmful content detection, PII scrubbing
- Rate Limiting: Request throttling, resource quotas
- Authentication: Identity verification, authorization checks
Output Filtering
- Content Moderation: Harmful content removal, quality checks
- Consistency Validation: Logic checks, constraint verification
- Formatting: Standardized output formats, clean presentation
- Audit Logging: Decision trails, compliance records
Human-in-the-Loop
- Approval Workflows: Critical decision checkpoints
- Escalation Triggers: Confidence thresholds, risk assessment
- Override Mechanisms: Human judgment precedence
- Feedback Loops: Human corrections improve system behavior
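The escalation triggers above can be sketched as a simple routing function. The threshold value and the set of high-risk actions are illustrative assumptions — real systems would tune both from risk assessments.

```python
# Human-in-the-loop sketch: low-confidence or high-risk decisions are
# escalated to a human reviewer instead of being auto-approved.
CONFIDENCE_THRESHOLD = 0.8                      # illustrative cutoff
HIGH_RISK_ACTIONS = {"delete_data", "send_payment"}


def route_decision(action: str, confidence: float) -> str:
    if action in HIGH_RISK_ACTIONS:
        return "escalate"        # risk trigger: always needs approval
    if confidence < CONFIDENCE_THRESHOLD:
        return "escalate"        # confidence trigger
    return "auto_approve"


assert route_decision("send_email", 0.95) == "auto_approve"
assert route_decision("send_email", 0.60) == "escalate"
assert route_decision("send_payment", 0.99) == "escalate"
```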
6. Evaluation Frameworks
Task Completion Metrics
- Success Rate: Percentage of tasks completed successfully
- Partial Completion: Progress measurement for complex tasks
- Task Classification: Success criteria by task type
- Failure Analysis: Root cause identification and categorization
Quality Assessment
- Output Quality: Accuracy, relevance, completeness measures
- Consistency: Response variability across similar inputs
- Coherence: Logical flow and internal consistency
- User Satisfaction: Feedback scores, usage patterns
Cost Analysis
- Token Usage: Input/output token consumption per task
- API Costs: External service usage and charges
- Compute Resources: CPU, memory, storage utilization
- Time-to-Value: Cost per successful task completion
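Time-to-value is easy to compute from per-task records. The sketch below uses field names matching the evaluator's log format; note that spend on failed tasks still counts toward the numerator.

```python
# Cost-per-successful-task sketch: total spend divided by success count.
def cost_per_success(logs: list) -> float:
    total_cost = sum(log["cost_usd"] for log in logs)
    successes = sum(1 for log in logs if log["status"] == "success")
    return total_cost / successes if successes else float("inf")


logs = [
    {"status": "success", "cost_usd": 0.08},
    {"status": "failure", "cost_usd": 0.05},  # failed spend still counts
    {"status": "success", "cost_usd": 0.07},
]
print(round(cost_per_success(logs), 3))
```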
Latency Distribution
- Response Time: End-to-end task completion time
- Processing Stages: Bottleneck identification per stage
- Queue Times: Wait times in processing pipelines
- Resource Contention: Impact of concurrent operations
7. Orchestration Strategies
Centralized Orchestration
- Workflow Engine: Central coordinator manages all agents
- State Management: Centralized workflow state tracking
- Decision Logic: Complex routing and branching rules
- Monitoring: Comprehensive visibility into all operations
Decentralized Orchestration
- Peer-to-Peer: Agents coordinate directly with each other
- Service Discovery: Dynamic agent registration and lookup
- Consensus Protocols: Distributed decision making
- Fault Tolerance: No single point of failure
Hybrid Approaches
- Domain Boundaries: Centralized within domains, federated across
- Hierarchical Coordination: Multiple orchestration levels
- Context-Dependent: Strategy selection based on task type
- Load Balancing: Distribute coordination responsibility
8. Memory Patterns
Short-Term Memory
- Context Windows: Working memory for current tasks
- Session State: Temporary data for ongoing interactions
- Cache Management: Performance optimization strategies
- Memory Pressure: Handling capacity constraints
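A minimal way to handle memory pressure in a context window is to evict the oldest turns once a token budget is exceeded. The whitespace-based token estimate below is a deliberately crude assumption; a real agent would use its model's tokenizer.

```python
# Short-term-memory sketch: bounded context buffer, oldest-first eviction.
from collections import deque


class ContextWindow:
    def __init__(self, max_tokens: int):
        self.max_tokens = max_tokens
        self.turns = deque()
        self._tokens = 0

    def add(self, text: str):
        cost = len(text.split())               # crude token estimate
        self.turns.append((text, cost))
        self._tokens += cost
        while self._tokens > self.max_tokens:  # memory pressure: evict oldest
            _, old_cost = self.turns.popleft()
            self._tokens -= old_cost

    def render(self) -> str:
        return " ".join(text for text, _ in self.turns)


ctx = ContextWindow(max_tokens=4)
for turn in ["a b", "c d", "e f"]:
    ctx.add(turn)
print(ctx.render())  # the oldest turn "a b" has been evicted
```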
Long-Term Memory
- Persistent Storage: Durable data across sessions
- Knowledge Base: Accumulated domain knowledge
- Experience Replay: Learning from past interactions
- Memory Consolidation: Transferring from short to long-term
Shared Memory
- Collaborative Knowledge: Shared learning across agents
- Synchronization: Consistency maintenance strategies
- Access Control: Permission-based memory access
- Memory Partitioning: Isolation between agent groups
9. Scaling Considerations
Horizontal Scaling
- Agent Replication: Multiple instances of same agent type
- Load Distribution: Request routing across agent instances
- Resource Pooling: Shared compute and storage resources
- Geographic Distribution: Multi-region deployments
Vertical Scaling
- Capability Enhancement: More powerful individual agents
- Tool Expansion: Broader tool access per agent
- Context Expansion: Larger working memory capacity
- Processing Power: Higher throughput per agent
Performance Optimization
- Caching Strategies: Response caching, tool result caching
- Parallel Processing: Concurrent task execution
- Resource Optimization: Efficient resource utilization
- Bottleneck Elimination: Systematic performance tuning
10. Failure Handling
Retry Mechanisms
- Exponential Backoff: Increasing delays between retries
- Jitter: Random delay variation to prevent thundering herd
- Maximum Attempts: Bounded retry behavior
- Retry Conditions: Transient vs permanent failure classification
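The first three bullets combine into the standard "full jitter" schedule: exponential growth, a cap, a random draw per attempt, and a bounded attempt count. The base and cap values below are illustrative.

```python
# Retry sketch: capped exponential backoff with full jitter.
import random


def backoff_delays(max_attempts: int, base: float = 1.0, cap: float = 30.0):
    """Yield a randomized delay (seconds) before each retry attempt."""
    for attempt in range(max_attempts):
        ceiling = min(cap, base * (2 ** attempt))  # 1, 2, 4, 8, ... capped
        yield random.uniform(0, ceiling)           # jitter avoids thundering herd


delays = list(backoff_delays(5))
print([round(d, 2) for d in delays])
```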
Fallback Strategies
- Graceful Degradation: Reduced functionality when systems fail
- Alternative Approaches: Different methods for same goals
- Default Responses: Safe fallback behaviors
- User Communication: Clear failure messaging
Circuit Breakers
- Failure Detection: Monitoring failure rates and response times
- State Management: Open, closed, half-open circuit states
- Recovery Testing: Gradual return to normal operation
- Cascading Failure Prevention: Protecting upstream systems
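The three circuit states can be sketched as below. The failure threshold, recovery window, and the explicitly injected clock (`now`) are simplifications for illustration; production breakers typically track failure *rates* over sliding windows.

```python
# Circuit-breaker sketch: closed -> open on repeated failure, open ->
# half-open after a recovery window, half-open -> closed on one success.
class CircuitBreaker:
    def __init__(self, failure_threshold: int = 3, recovery_time: float = 30.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"
        self.opened_at = None

    def call(self, func, now: float):
        if self.state == "open":
            if now - self.opened_at >= self.recovery_time:
                self.state = "half_open"   # probe with a single trial request
            else:
                raise RuntimeError("circuit open: failing fast")
        try:
            result = func()
        except Exception:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state = "open"        # trip: protect the dependency
                self.opened_at = now
            raise
        self.failures = 0
        self.state = "closed"              # recovery confirmed
        return result
```

Failing fast while open is what prevents a struggling downstream service from dragging its callers down with it.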
Implementation Guidelines
Architecture Decision Process
- Requirements Analysis: Understand system goals, constraints, scale
- Pattern Selection: Choose appropriate architecture pattern
- Agent Design: Define roles, responsibilities, interfaces
- Tool Architecture: Design tool schemas and error handling
- Communication Design: Select message patterns and protocols
- Safety Implementation: Build guardrails and validation
- Evaluation Planning: Define success metrics and monitoring
- Deployment Strategy: Plan scaling and failure handling
Quality Assurance
- Testing Strategy: Unit, integration, and system testing approaches
- Monitoring: Real-time system health and performance tracking
- Documentation: Architecture documentation and runbooks
- Security Review: Threat modeling and security assessments
Continuous Improvement
- Performance Monitoring: Ongoing system performance analysis
- User Feedback: Incorporating user experience improvements
- A/B Testing: Controlled experiments for system improvements
- Knowledge Base Updates: Continuous learning and adaptation
This skill provides the foundation for designing robust, scalable multi-agent systems that can handle complex tasks while maintaining safety, reliability, and performance at scale.
Agent Designer - Multi-Agent System Architecture Toolkit

A comprehensive toolkit for designing, architecting, and evaluating multi-agent systems. Provides structured approaches to agent architecture patterns, tool design principles, communication strategies, and performance evaluation frameworks.
Overview
The Agent Designer skill includes three core components:
- Agent Planner (agent_planner.py) - Designs multi-agent system architectures
- Tool Schema Generator (tool_schema_generator.py) - Creates structured tool schemas
- Agent Evaluator (agent_evaluator.py) - Evaluates system performance and identifies optimizations
Quick Start
1. Design a Multi-Agent Architecture
# Use sample requirements or create your own
python agent_planner.py assets/sample_system_requirements.json -o my_architecture
# This generates:
# - my_architecture.json (complete architecture)
# - my_architecture_diagram.mmd (Mermaid diagram)
# - my_architecture_roadmap.json (implementation plan)
2. Generate Tool Schemas
# Use sample tool descriptions or create your own
python tool_schema_generator.py assets/sample_tool_descriptions.json -o my_tools
# This generates:
# - my_tools.json (complete schemas)
# - my_tools_openai.json (OpenAI format)
# - my_tools_anthropic.json (Anthropic format)
# - my_tools_validation.json (validation rules)
# - my_tools_examples.json (usage examples)
3. Evaluate System Performance
# Use sample execution logs or your own
python agent_evaluator.py assets/sample_execution_logs.json -o evaluation
# This generates:
# - evaluation.json (complete report)
# - evaluation_summary.json (executive summary)
# - evaluation_recommendations.json (optimization suggestions)
# - evaluation_errors.json (error analysis)
Detailed Usage
Agent Planner
The Agent Planner designs multi-agent architectures based on system requirements.
Input Format
Create a JSON file with system requirements:
{
"goal": "Your system's primary objective",
"description": "Detailed system description",
"tasks": ["List", "of", "required", "tasks"],
"constraints": {
"max_response_time": 30000,
"budget_per_task": 1.0,
"quality_threshold": 0.9
},
"team_size": 6,
"performance_requirements": {
"high_throughput": true,
"fault_tolerance": true,
"low_latency": false
},
"safety_requirements": [
"Input validation and sanitization",
"Output content filtering"
]
}
Command Line Options
python agent_planner.py <input_file> [OPTIONS]
Options:
-o, --output PREFIX Output file prefix (default: agent_architecture)
--format FORMAT Output format: json, both (default: both)
Output Files
- Architecture JSON: Complete system design with agents, communication topology, and scaling strategy
- Mermaid Diagram: Visual representation of the agent architecture
- Implementation Roadmap: Phased implementation plan with timelines and risks
Architecture Patterns
The planner automatically selects from these patterns based on requirements:
- Single Agent: Simple, focused tasks (1 agent)
- Supervisor: Hierarchical delegation (2-8 agents)
- Swarm: Peer-to-peer collaboration (3-20 agents)
- Hierarchical: Multi-level management (5-50 agents)
- Pipeline: Sequential processing (3-15 agents)
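The team-size ranges above suggest a first-pass filter like the sketch below. This is an illustrative preselection only — the actual planner also weighs tasks, constraints, and performance requirements.

```python
# Pattern preselection sketch: filter candidate patterns by team size,
# using the ranges documented above.
PATTERN_TEAM_SIZES = {
    "single_agent": (1, 1),
    "supervisor": (2, 8),
    "swarm": (3, 20),
    "hierarchical": (5, 50),
    "pipeline": (3, 15),
}


def candidate_patterns(team_size: int) -> list:
    return [name for name, (lo, hi) in PATTERN_TEAM_SIZES.items()
            if lo <= team_size <= hi]


print(candidate_patterns(6))  # several patterns fit a 6-agent team
```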
Tool Schema Generator
Generates structured tool schemas compatible with OpenAI and Anthropic formats.
Input Format
Create a JSON file with tool descriptions:
{
"tools": [
{
"name": "tool_name",
"purpose": "What the tool does",
"category": "Tool category (search, data, api, etc.)",
"inputs": [
{
"name": "parameter_name",
"type": "string",
"description": "Parameter description",
"required": true,
"examples": ["example1", "example2"]
}
],
"outputs": [
{
"name": "result_field",
"type": "object",
"description": "Output description"
}
],
"error_conditions": ["List of possible errors"],
"side_effects": ["List of side effects"],
"idempotent": true,
"rate_limits": {
"requests_per_minute": 60
}
}
]
}
Command Line Options
python tool_schema_generator.py <input_file> [OPTIONS]
Options:
-o, --output PREFIX Output file prefix (default: tool_schemas)
--format FORMAT Output format: json, both (default: both)
--validate Validate generated schemas
Output Files
- Complete Schemas: All schemas with validation and examples
- OpenAI Format: Schemas compatible with OpenAI function calling
- Anthropic Format: Schemas compatible with Anthropic tool use
- Validation Rules: Input validation specifications
- Usage Examples: Example calls and responses
Schema Features
- Input Validation: Comprehensive parameter validation rules
- Error Handling: Structured error response formats
- Rate Limiting: Configurable rate limit specifications
- Documentation: Auto-generated usage examples
- Security: Built-in security considerations
Agent Evaluator
Analyzes agent execution logs to identify performance issues and optimization opportunities.
Input Format
Create a JSON file with execution logs:
{
"execution_logs": [
{
"task_id": "unique_task_identifier",
"agent_id": "agent_identifier",
"task_type": "task_category",
"start_time": "2024-01-15T09:00:00Z",
"end_time": "2024-01-15T09:02:34Z",
"duration_ms": 154000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2300,
"success": true
}
],
"results": {
"summary": "Task results",
"quality_score": 0.92
},
"tokens_used": {
"input_tokens": 1250,
"output_tokens": 2800,
"total_tokens": 4050
},
"cost_usd": 0.081,
"error_details": null,
"tools_used": ["web_search"],
"retry_count": 0
}
]
}
Command Line Options
python agent_evaluator.py <input_file> [OPTIONS]
Options:
-o, --output PREFIX Output file prefix (default: evaluation_report)
--format FORMAT Output format: json, both (default: both)
--detailed Include detailed analysis in output
Output Files
- Complete Report: Comprehensive performance analysis
- Executive Summary: High-level metrics and health assessment
- Optimization Recommendations: Prioritized improvement suggestions
- Error Analysis: Detailed error patterns and solutions
Evaluation Metrics
Performance Metrics:
- Task success rate and completion times
- Token usage and cost efficiency
- Error rates and retry patterns
- Throughput and latency distributions
System Health:
- Overall health score (poor/fair/good/excellent)
- SLA compliance tracking
- Resource utilization analysis
- Trend identification
Bottleneck Analysis:
- Agent performance bottlenecks
- Tool usage inefficiencies
- Communication overhead
- Resource constraints
Architecture Patterns Guide
When to Use Each Pattern
Single Agent
- Best for: Simple, focused tasks with clear boundaries
- Team size: 1 agent
- Complexity: Low
- Examples: Personal assistant, document summarizer, simple automation
Supervisor
- Best for: Hierarchical task decomposition with quality control
- Team size: 2-8 agents
- Complexity: Medium
- Examples: Research coordinator with specialists, content review workflow
Swarm
- Best for: Distributed problem solving with high fault tolerance
- Team size: 3-20 agents
- Complexity: High
- Examples: Parallel data processing, distributed research, competitive analysis
Hierarchical
- Best for: Large-scale operations with organizational structure
- Team size: 5-50 agents
- Complexity: Very High
- Examples: Enterprise workflows, complex business processes
Pipeline
- Best for: Sequential processing with specialized stages
- Team size: 3-15 agents
- Complexity: Medium
- Examples: Data ETL pipelines, content processing workflows
Best Practices
System Design
- Start Simple: Begin with simpler patterns and evolve
- Clear Responsibilities: Define distinct roles for each agent
- Robust Communication: Design reliable message passing
- Error Handling: Plan for failures and recovery
- Monitor Everything: Implement comprehensive observability
Tool Design
- Single Responsibility: Each tool should have one clear purpose
- Input Validation: Validate all inputs thoroughly
- Idempotency: Design operations to be safely repeatable
- Error Recovery: Provide clear error messages and recovery paths
- Documentation: Include comprehensive usage examples
Performance Optimization
- Measure First: Use the evaluator to identify actual bottlenecks
- Optimize Bottlenecks: Focus on highest-impact improvements
- Cache Strategically: Cache expensive operations and results
- Parallel Processing: Identify opportunities for parallelization
- Resource Management: Monitor and optimize resource usage
Sample Files
The assets/ directory contains sample files to help you get started:
- sample_system_requirements.json: Example system requirements for a research platform
- sample_tool_descriptions.json: Example tool descriptions for common operations
- sample_execution_logs.json: Example execution logs from a running system
The expected_outputs/ directory shows expected results from processing these samples.
References
See the references/ directory for detailed documentation:
- agent_architecture_patterns.md: Comprehensive catalog of architecture patterns
- tool_design_best_practices.md: Best practices for tool design and implementation
- evaluation_methodology.md: Detailed methodology for system evaluation
Integration Examples
With OpenAI
import json
from openai import OpenAI

# Load generated OpenAI schemas
with open('my_tools_openai.json') as f:
    schemas = json.load(f)

# Use with OpenAI tool calling (openai>=1.0 client; the generated
# "functions" entries are wrapped in the current tools format)
client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Search for AI news"}],
    tools=[{"type": "function", "function": fn} for fn in schemas['functions']],
)
With Anthropic Claude
import json
import anthropic

# Load generated Anthropic schemas
with open('my_tools_anthropic.json') as f:
    schemas = json.load(f)

# Use with Anthropic tool use (max_tokens is required by the Messages API)
client = anthropic.Anthropic()
response = client.messages.create(
    model="claude-3-opus-20240229",
    max_tokens=1024,
    messages=[{"role": "user", "content": "Search for AI news"}],
    tools=schemas['tools'],
)
Troubleshooting
Common Issues
"No valid architecture pattern found"
- Check that team_size is reasonable (1-50)
- Ensure tasks list is not empty
- Verify performance_requirements are valid
"Tool schema validation failed"
- Check that all required fields are present
- Ensure parameter types are valid
- Verify enum values are provided as arrays
"Insufficient execution logs"
- Ensure logs contain required fields (task_id, agent_id, status)
- Check that timestamps are in ISO 8601 format
- Verify token usage fields are numeric
Performance Tips
- Large Systems: For systems with >20 agents, consider breaking into subsystems
- Complex Tools: Tools with >10 parameters may need simplification
- Log Volume: For >1000 log entries, consider sampling for faster analysis
Contributing
This skill is part of the claude-skills repository. To contribute:
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests and documentation
- Submit a pull request
License
This project is licensed under the MIT License - see the main repository for details.
Support
For issues and questions:
- Check the troubleshooting section above
- Review the reference documentation in references/
- Create an issue in the claude-skills repository
#!/usr/bin/env python3
"""
Agent Evaluator - Multi-Agent System Performance Analysis
Takes agent execution logs (task, actions taken, results, time, tokens used)
and evaluates performance: task success rate, average cost per task, latency
distribution, error patterns, tool usage efficiency, identifies bottlenecks
and improvement opportunities.
Input: execution logs JSON
Output: performance report + bottleneck analysis + optimization recommendations
"""
import json
import argparse
import sys
import statistics
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import re
@dataclass
class ExecutionLog:
"""Single execution log entry"""
task_id: str
agent_id: str
task_type: str
task_description: str
start_time: str
end_time: str
duration_ms: int
status: str # success, failure, partial, timeout
actions: List[Dict[str, Any]]
results: Dict[str, Any]
tokens_used: Dict[str, int] # input_tokens, output_tokens, total_tokens
cost_usd: float
error_details: Optional[Dict[str, Any]]
tools_used: List[str]
retry_count: int
metadata: Dict[str, Any]
@dataclass
class PerformanceMetrics:
"""Performance metrics for an agent or system"""
total_tasks: int
successful_tasks: int
failed_tasks: int
partial_tasks: int
timeout_tasks: int
success_rate: float
failure_rate: float
average_duration_ms: float
median_duration_ms: float
percentile_95_duration_ms: float
min_duration_ms: int
max_duration_ms: int
total_tokens_used: int
average_tokens_per_task: float
total_cost_usd: float
average_cost_per_task: float
cost_per_token: float
throughput_tasks_per_hour: float
error_rate: float
retry_rate: float
@dataclass
class ErrorAnalysis:
"""Error pattern analysis"""
error_type: str
count: int
percentage: float
affected_agents: List[str]
affected_task_types: List[str]
common_patterns: List[str]
suggested_fixes: List[str]
impact_level: str # high, medium, low
@dataclass
class BottleneckAnalysis:
"""System bottleneck analysis"""
bottleneck_type: str # agent, tool, communication, resource
location: str
severity: str # critical, high, medium, low
description: str
impact_on_performance: Dict[str, float]
affected_workflows: List[str]
optimization_suggestions: List[str]
estimated_improvement: Dict[str, float]
@dataclass
class OptimizationRecommendation:
"""Performance optimization recommendation"""
category: str # performance, cost, reliability, scalability
priority: str # high, medium, low
title: str
description: str
implementation_effort: str # low, medium, high
expected_impact: Dict[str, Any]
estimated_cost_savings: Optional[float]
estimated_performance_gain: Optional[float]
implementation_steps: List[str]
risks: List[str]
prerequisites: List[str]
@dataclass
class EvaluationReport:
"""Complete evaluation report"""
summary: Dict[str, Any]
system_metrics: PerformanceMetrics
agent_metrics: Dict[str, PerformanceMetrics]
task_type_metrics: Dict[str, PerformanceMetrics]
tool_usage_analysis: Dict[str, Any]
error_analysis: List[ErrorAnalysis]
bottleneck_analysis: List[BottleneckAnalysis]
optimization_recommendations: List[OptimizationRecommendation]
trends_analysis: Dict[str, Any]
cost_breakdown: Dict[str, Any]
sla_compliance: Dict[str, Any]
metadata: Dict[str, Any]
class AgentEvaluator:
"""Evaluate multi-agent system performance from execution logs"""
def __init__(self):
self.error_patterns = self._define_error_patterns()
self.performance_thresholds = self._define_performance_thresholds()
self.cost_benchmarks = self._define_cost_benchmarks()
def _define_error_patterns(self) -> Dict[str, Dict[str, Any]]:
"""Define common error patterns and their classifications"""
return {
"timeout": {
"patterns": [r"timeout", r"timed out", r"deadline exceeded"],
"category": "performance",
"severity": "high",
"common_fixes": [
"Increase timeout values",
"Optimize slow operations",
"Add retry logic with exponential backoff",
"Parallelize independent operations"
]
},
"rate_limit": {
"patterns": [r"rate limit", r"too many requests", r"quota exceeded"],
"category": "resource",
"severity": "medium",
"common_fixes": [
"Implement request throttling",
"Add circuit breaker pattern",
"Use request queuing",
"Negotiate higher limits"
]
},
"authentication": {
"patterns": [r"unauthorized", r"authentication failed", r"invalid credentials"],
"category": "security",
"severity": "high",
"common_fixes": [
"Check credential rotation",
"Implement token refresh logic",
"Add authentication retry",
"Verify permission scopes"
]
},
"network": {
"patterns": [r"connection refused", r"network error", r"dns resolution"],
"category": "infrastructure",
"severity": "high",
"common_fixes": [
"Add network retry logic",
"Implement fallback endpoints",
"Use connection pooling",
"Add health checks"
]
},
"validation": {
"patterns": [r"validation error", r"invalid input", r"schema violation"],
"category": "data",
"severity": "medium",
"common_fixes": [
"Strengthen input validation",
"Add data sanitization",
"Improve error messages",
"Add input examples"
]
},
"resource": {
"patterns": [r"out of memory", r"disk full", r"cpu overload"],
"category": "resource",
"severity": "critical",
"common_fixes": [
"Scale up resources",
"Optimize memory usage",
"Add resource monitoring",
"Implement graceful degradation"
]
}
}
def _define_performance_thresholds(self) -> Dict[str, Any]:
"""Define performance thresholds for different metrics"""
return {
"success_rate": {"excellent": 0.98, "good": 0.95, "acceptable": 0.90, "poor": 0.80},
"average_duration": {"excellent": 1000, "good": 3000, "acceptable": 10000, "poor": 30000},
"error_rate": {"excellent": 0.01, "good": 0.03, "acceptable": 0.05, "poor": 0.10},
"retry_rate": {"excellent": 0.05, "good": 0.10, "acceptable": 0.20, "poor": 0.40},
"cost_per_task": {"excellent": 0.01, "good": 0.05, "acceptable": 0.10, "poor": 0.25},
"throughput": {"excellent": 100, "good": 50, "acceptable": 20, "poor": 5} # tasks per hour
}
def _define_cost_benchmarks(self) -> Dict[str, Any]:
"""Define cost benchmarks for different operations"""
return {
"token_costs": {
"gpt-4": {"input": 0.00003, "output": 0.00006},
"gpt-3.5-turbo": {"input": 0.000002, "output": 0.000002},
"claude-3": {"input": 0.000015, "output": 0.000075}
},
"operation_costs": {
"simple_task": 0.005,
"complex_task": 0.050,
"research_task": 0.020,
"analysis_task": 0.030,
"generation_task": 0.015
}
}
def parse_execution_logs(self, logs_data: List[Dict[str, Any]]) -> List[ExecutionLog]:
"""Parse raw execution logs into structured format"""
logs = []
for log_entry in logs_data:
try:
log = ExecutionLog(
task_id=log_entry.get("task_id", ""),
agent_id=log_entry.get("agent_id", ""),
task_type=log_entry.get("task_type", "unknown"),
task_description=log_entry.get("task_description", ""),
start_time=log_entry.get("start_time", ""),
end_time=log_entry.get("end_time", ""),
duration_ms=log_entry.get("duration_ms", 0),
status=log_entry.get("status", "unknown"),
actions=log_entry.get("actions", []),
results=log_entry.get("results", {}),
tokens_used=log_entry.get("tokens_used", {"total_tokens": 0}),
cost_usd=log_entry.get("cost_usd", 0.0),
error_details=log_entry.get("error_details"),
tools_used=log_entry.get("tools_used", []),
retry_count=log_entry.get("retry_count", 0),
metadata=log_entry.get("metadata", {})
)
logs.append(log)
except Exception as e:
print(f"Warning: Failed to parse log entry: {e}", file=sys.stderr)
continue
return logs
def calculate_performance_metrics(self, logs: List[ExecutionLog]) -> PerformanceMetrics:
"""Calculate performance metrics from execution logs"""
if not logs:
return PerformanceMetrics(
total_tasks=0, successful_tasks=0, failed_tasks=0, partial_tasks=0,
timeout_tasks=0, success_rate=0.0, failure_rate=0.0,
average_duration_ms=0.0, median_duration_ms=0.0, percentile_95_duration_ms=0.0,
min_duration_ms=0, max_duration_ms=0, total_tokens_used=0,
average_tokens_per_task=0.0, total_cost_usd=0.0, average_cost_per_task=0.0,
cost_per_token=0.0, throughput_tasks_per_hour=0.0, error_rate=0.0, retry_rate=0.0
)
total_tasks = len(logs)
successful_tasks = sum(1 for log in logs if log.status == "success")
failed_tasks = sum(1 for log in logs if log.status == "failure")
partial_tasks = sum(1 for log in logs if log.status == "partial")
timeout_tasks = sum(1 for log in logs if log.status == "timeout")
success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0.0
failure_rate = (failed_tasks + timeout_tasks) / total_tasks if total_tasks > 0 else 0.0
durations = [log.duration_ms for log in logs if log.duration_ms > 0]
if durations:
average_duration_ms = statistics.mean(durations)
median_duration_ms = statistics.median(durations)
percentile_95_duration_ms = self._percentile(durations, 95)
min_duration_ms = min(durations)
max_duration_ms = max(durations)
else:
average_duration_ms = median_duration_ms = percentile_95_duration_ms = 0.0
min_duration_ms = max_duration_ms = 0
total_tokens = sum(log.tokens_used.get("total_tokens", 0) for log in logs)
average_tokens_per_task = total_tokens / total_tasks if total_tasks > 0 else 0.0
total_cost = sum(log.cost_usd for log in logs)
average_cost_per_task = total_cost / total_tasks if total_tasks > 0 else 0.0
cost_per_token = total_cost / total_tokens if total_tokens > 0 else 0.0
# Calculate throughput (tasks per hour)
if len(logs) > 1:
start_times = [log.start_time for log in logs if log.start_time]
end_times = [log.end_time for log in logs if log.end_time]
if start_times and end_times:
try:
start_dt = datetime.fromisoformat(min(start_times).replace("Z", "+00:00"))
end_dt = datetime.fromisoformat(max(end_times).replace("Z", "+00:00"))
time_diff_hours = (end_dt - start_dt).total_seconds() / 3600
throughput_tasks_per_hour = total_tasks / time_diff_hours if time_diff_hours > 0 else 0.0
except ValueError:
throughput_tasks_per_hour = 0.0
else:
throughput_tasks_per_hour = 0.0
else:
throughput_tasks_per_hour = 0.0
error_rate = sum(1 for log in logs if log.error_details) / total_tasks if total_tasks > 0 else 0.0
retry_rate = sum(1 for log in logs if log.retry_count > 0) / total_tasks if total_tasks > 0 else 0.0
return PerformanceMetrics(
total_tasks=total_tasks,
successful_tasks=successful_tasks,
failed_tasks=failed_tasks,
partial_tasks=partial_tasks,
timeout_tasks=timeout_tasks,
success_rate=success_rate,
failure_rate=failure_rate,
average_duration_ms=average_duration_ms,
median_duration_ms=median_duration_ms,
percentile_95_duration_ms=percentile_95_duration_ms,
min_duration_ms=min_duration_ms,
max_duration_ms=max_duration_ms,
total_tokens_used=total_tokens,
average_tokens_per_task=average_tokens_per_task,
total_cost_usd=total_cost,
average_cost_per_task=average_cost_per_task,
cost_per_token=cost_per_token,
throughput_tasks_per_hour=throughput_tasks_per_hour,
error_rate=error_rate,
retry_rate=retry_rate
)
def _percentile(self, data: List[float], percentile: int) -> float:
"""Calculate percentile value from data"""
if not data:
return 0.0
sorted_data = sorted(data)
index = (percentile / 100) * (len(sorted_data) - 1)
if index.is_integer():
return sorted_data[int(index)]
else:
lower_index = int(index)
upper_index = lower_index + 1
weight = index - lower_index
return sorted_data[lower_index] * (1 - weight) + sorted_data[upper_index] * weight
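# Worked example of the interpolation above (illustrative values): for
# durations [10, 20, 30, 40] at the 50th percentile, index = (50/100) * 3 = 1.5,
# so lower_index = 1, weight = 0.5, and the result interpolates between
# ranks 1 and 2: 20 * 0.5 + 30 * 0.5 = 25.0.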
def analyze_errors(self, logs: List[ExecutionLog]) -> List[ErrorAnalysis]:
"""Analyze error patterns in execution logs"""
error_analyses = []
# Collect all errors
errors = []
for log in logs:
if log.error_details:
errors.append({
"error": log.error_details,
"agent_id": log.agent_id,
"task_type": log.task_type,
"task_id": log.task_id
})
if not errors:
return error_analyses
# Group errors by pattern
error_groups = defaultdict(list)
unclassified_errors = []
for error in errors:
error_message = str(error.get("error", {})).lower()
classified = False
for pattern_name, pattern_info in self.error_patterns.items():
for pattern in pattern_info["patterns"]:
if re.search(pattern, error_message):
error_groups[pattern_name].append(error)
classified = True
break
if classified:
break
if not classified:
unclassified_errors.append(error)
# Analyze each error group
total_errors = len(errors)
for error_type, error_list in error_groups.items():
count = len(error_list)
percentage = (count / total_errors) * 100 if total_errors > 0 else 0.0
affected_agents = list(set(error["agent_id"] for error in error_list))
affected_task_types = list(set(error["task_type"] for error in error_list))
# Extract common patterns from error messages
common_patterns = self._extract_common_patterns([str(e["error"]) for e in error_list])
# Get suggested fixes
pattern_info = self.error_patterns.get(error_type, {})
suggested_fixes = pattern_info.get("common_fixes", [])
# Determine impact level
if percentage > 20 or pattern_info.get("severity") == "critical":
impact_level = "high"
elif percentage > 10 or pattern_info.get("severity") == "high":
impact_level = "medium"
else:
impact_level = "low"
error_analysis = ErrorAnalysis(
error_type=error_type,
count=count,
percentage=percentage,
affected_agents=affected_agents,
affected_task_types=affected_task_types,
common_patterns=common_patterns,
suggested_fixes=suggested_fixes,
impact_level=impact_level
)
error_analyses.append(error_analysis)
# Handle unclassified errors
if unclassified_errors:
count = len(unclassified_errors)
percentage = (count / total_errors) * 100
error_analysis = ErrorAnalysis(
error_type="unclassified",
count=count,
percentage=percentage,
affected_agents=list(set(error["agent_id"] for error in unclassified_errors)),
affected_task_types=list(set(error["task_type"] for error in unclassified_errors)),
common_patterns=self._extract_common_patterns([str(e["error"]) for e in unclassified_errors]),
suggested_fixes=["Review and classify error patterns", "Add specific error handling"],
impact_level="medium" if percentage > 10 else "low"
)
error_analyses.append(error_analysis)
# Sort by impact and count
error_analyses.sort(key=lambda x: (x.impact_level == "high", x.count), reverse=True)
return error_analyses
def _extract_common_patterns(self, error_messages: List[str]) -> List[str]:
"""Extract common patterns from error messages"""
if not error_messages:
return []
# Simple pattern extraction - find common phrases
word_counts = Counter()
for message in error_messages:
words = re.findall(r'\w+', message.lower())
for word in words:
if len(word) > 3: # Ignore short words
word_counts[word] += 1
# Return most common words/patterns
common_patterns = [word for word, count in word_counts.most_common(5)
if count > 1]
return common_patterns
def identify_bottlenecks(self, logs: List[ExecutionLog],
agent_metrics: Dict[str, PerformanceMetrics]) -> List[BottleneckAnalysis]:
"""Identify system bottlenecks"""
bottlenecks = []
# Agent performance bottlenecks
for agent_id, metrics in agent_metrics.items():
if metrics.success_rate < 0.8:
severity = "critical" if metrics.success_rate < 0.5 else "high"
bottlenecks.append(BottleneckAnalysis(
bottleneck_type="agent",
location=agent_id,
severity=severity,
description=f"Agent {agent_id} has low success rate ({metrics.success_rate:.1%})",
impact_on_performance={
"success_rate_impact": (0.95 - metrics.success_rate) * 100,
"cost_impact": metrics.average_cost_per_task * metrics.failed_tasks
},
affected_workflows=self._get_agent_workflows(agent_id, logs),
optimization_suggestions=[
"Review and improve agent logic",
"Add better error handling",
"Optimize tool usage",
"Consider agent specialization"
],
estimated_improvement={
"success_rate_gain": min(0.15, 0.95 - metrics.success_rate),
"cost_reduction": metrics.average_cost_per_task * 0.2
}
))
if metrics.average_duration_ms > 30000: # 30 seconds
severity = "high" if metrics.average_duration_ms > 60000 else "medium"
bottlenecks.append(BottleneckAnalysis(
bottleneck_type="agent",
location=agent_id,
severity=severity,
description=f"Agent {agent_id} has high latency ({metrics.average_duration_ms/1000:.1f}s avg)",
impact_on_performance={
"latency_impact": metrics.average_duration_ms - 10000,
"throughput_impact": max(0, 50 - metrics.total_tasks)
},
affected_workflows=self._get_agent_workflows(agent_id, logs),
optimization_suggestions=[
"Profile and optimize slow operations",
"Implement caching strategies",
"Parallelize independent tasks",
"Optimize API calls"
],
estimated_improvement={
"latency_reduction": min(0.5, (metrics.average_duration_ms - 10000) / metrics.average_duration_ms),
"throughput_gain": 1.3
}
))
# Tool usage bottlenecks
tool_usage = self._analyze_tool_usage(logs)
for tool, usage_stats in tool_usage.items():
if usage_stats.get("error_rate", 0) > 0.2:
bottlenecks.append(BottleneckAnalysis(
bottleneck_type="tool",
location=tool,
severity="high" if usage_stats["error_rate"] > 0.4 else "medium",
description=f"Tool {tool} has high error rate ({usage_stats['error_rate']:.1%})",
impact_on_performance={
"reliability_impact": usage_stats["error_rate"] * usage_stats["usage_count"],
"retry_overhead": usage_stats.get("retry_count", 0) * 1000 # ms
},
affected_workflows=usage_stats.get("affected_workflows", []),
optimization_suggestions=[
"Review tool implementation",
"Add better error handling for tool",
"Implement tool fallbacks",
"Consider alternative tools"
],
estimated_improvement={
"error_reduction": usage_stats["error_rate"] * 0.7,
"performance_gain": 1.2
}
))
# Communication bottlenecks
communication_analysis = self._analyze_communication_patterns(logs)
if communication_analysis.get("high_latency_communications", 0) > 5:
bottlenecks.append(BottleneckAnalysis(
bottleneck_type="communication",
location="inter_agent_communication",
severity="medium",
description="High latency in inter-agent communications detected",
impact_on_performance={
"communication_overhead": communication_analysis.get("avg_communication_latency", 0),
"coordination_efficiency": 0.8 # Assumed impact
},
affected_workflows=communication_analysis.get("affected_workflows", []),
optimization_suggestions=[
"Optimize message serialization",
"Implement message batching",
"Add communication caching",
"Consider direct communication patterns"
],
estimated_improvement={
"communication_latency_reduction": 0.4,
"overall_efficiency_gain": 1.15
}
))
# Resource bottlenecks
resource_analysis = self._analyze_resource_usage(logs)
if resource_analysis.get("high_token_usage_tasks", 0) > 10:
bottlenecks.append(BottleneckAnalysis(
bottleneck_type="resource",
location="token_usage",
severity="medium",
description="High token usage detected in multiple tasks",
impact_on_performance={
"cost_impact": resource_analysis.get("excess_token_cost", 0),
"latency_impact": resource_analysis.get("token_processing_overhead", 0)
},
affected_workflows=resource_analysis.get("high_usage_workflows", []),
optimization_suggestions=[
"Optimize prompt engineering",
"Implement response caching",
"Use more efficient models for simple tasks",
"Add token usage monitoring"
],
estimated_improvement={
"cost_reduction": 0.3,
"efficiency_gain": 1.1
}
))
# Sort bottlenecks by severity and impact
severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
bottlenecks.sort(key=lambda x: (severity_order[x.severity],
-sum(x.impact_on_performance.values())))
return bottlenecks
def _get_agent_workflows(self, agent_id: str, logs: List[ExecutionLog]) -> List[str]:
"""Get workflows affected by a specific agent"""
workflows = set()
for log in logs:
if log.agent_id == agent_id:
workflows.add(log.task_type)
return list(workflows)
def _analyze_tool_usage(self, logs: List[ExecutionLog]) -> Dict[str, Dict[str, Any]]:
"""Analyze tool usage patterns"""
tool_stats = defaultdict(lambda: {
"usage_count": 0,
"error_count": 0,
"total_duration": 0,
"affected_workflows": set(),
"retry_count": 0
})
for log in logs:
for tool in log.tools_used:
stats = tool_stats[tool]
stats["usage_count"] += 1
stats["total_duration"] += log.duration_ms
stats["affected_workflows"].add(log.task_type)
if log.error_details:
stats["error_count"] += 1
if log.retry_count > 0:
stats["retry_count"] += log.retry_count
# Calculate derived metrics
result = {}
for tool, stats in tool_stats.items():
result[tool] = {
"usage_count": stats["usage_count"],
"error_rate": stats["error_count"] / stats["usage_count"] if stats["usage_count"] > 0 else 0,
"avg_duration": stats["total_duration"] / stats["usage_count"] if stats["usage_count"] > 0 else 0,
"affected_workflows": list(stats["affected_workflows"]),
"retry_count": stats["retry_count"]
}
return result
def _analyze_communication_patterns(self, logs: List[ExecutionLog]) -> Dict[str, Any]:
"""Analyze communication patterns between agents"""
# This is a simplified analysis - in a real system, you'd have more detailed communication logs
communication_actions = []
for log in logs:
for action in log.actions:
if action.get("type") in ["message", "delegate", "coordinate", "respond"]:
communication_actions.append({
"duration": action.get("duration_ms", 0),
"success": action.get("success", True),
"workflow": log.task_type
})
if not communication_actions:
return {}
avg_latency = sum(action["duration"] for action in communication_actions) / len(communication_actions)
high_latency_count = sum(1 for action in communication_actions if action["duration"] > 5000)
return {
"total_communications": len(communication_actions),
"avg_communication_latency": avg_latency,
"high_latency_communications": high_latency_count,
"affected_workflows": list(set(action["workflow"] for action in communication_actions))
}
def _analyze_resource_usage(self, logs: List[ExecutionLog]) -> Dict[str, Any]:
"""Analyze resource usage patterns"""
token_usage = [log.tokens_used.get("total_tokens", 0) for log in logs]
if not token_usage:
return {}
avg_tokens = sum(token_usage) / len(token_usage)
high_usage_threshold = avg_tokens * 2
high_usage_tasks = sum(1 for tokens in token_usage if tokens > high_usage_threshold)
# Estimate excess cost
excess_tokens = sum(max(0, tokens - avg_tokens) for tokens in token_usage)
excess_cost = excess_tokens * 0.00002 # Rough estimate
return {
"avg_token_usage": avg_tokens,
"high_token_usage_tasks": high_usage_tasks,
"excess_token_cost": excess_cost,
"token_processing_overhead": high_usage_tasks * 500, # Estimated overhead in ms
"high_usage_workflows": [log.task_type for log in logs
if log.tokens_used.get("total_tokens", 0) > high_usage_threshold]
}
def generate_optimization_recommendations(self,
system_metrics: PerformanceMetrics,
error_analyses: List[ErrorAnalysis],
bottlenecks: List[BottleneckAnalysis]) -> List[OptimizationRecommendation]:
"""Generate optimization recommendations based on analysis"""
recommendations = []
# Performance optimization recommendations
if system_metrics.success_rate < 0.9:
recommendations.append(OptimizationRecommendation(
category="reliability",
priority="high",
title="Improve System Reliability",
description=f"System success rate is {system_metrics.success_rate:.1%}, below target of 90%",
implementation_effort="medium",
expected_impact={
"success_rate_improvement": min(0.1, 0.95 - system_metrics.success_rate),
"cost_reduction": system_metrics.average_cost_per_task * 0.15
},
estimated_cost_savings=system_metrics.total_cost_usd * 0.1,
estimated_performance_gain=1.2,
implementation_steps=[
"Identify and fix top error patterns",
"Implement better error handling and retries",
"Add comprehensive monitoring and alerting",
"Implement graceful degradation patterns"
],
risks=["Temporary increase in complexity", "Potential initial performance overhead"],
prerequisites=["Error analysis completion", "Monitoring infrastructure"]
))
# Cost optimization recommendations
if system_metrics.average_cost_per_task > 0.1:
recommendations.append(OptimizationRecommendation(
category="cost",
priority="medium",
title="Optimize Token Usage and Costs",
description=f"Average cost per task (${system_metrics.average_cost_per_task:.3f}) is above optimal range",
implementation_effort="low",
expected_impact={
"cost_reduction": system_metrics.average_cost_per_task * 0.3,
"efficiency_improvement": 1.15
},
estimated_cost_savings=system_metrics.total_cost_usd * 0.3,
estimated_performance_gain=1.05,
implementation_steps=[
"Implement prompt optimization",
"Add response caching for repeated queries",
"Use smaller models for simple tasks",
"Implement token usage monitoring and alerts"
],
risks=["Potential quality reduction with smaller models"],
prerequisites=["Token usage analysis", "Caching infrastructure"]
))
# Performance optimization recommendations
if system_metrics.average_duration_ms > 10000:
recommendations.append(OptimizationRecommendation(
category="performance",
priority="high",
title="Reduce Task Latency",
description=f"Average task duration ({system_metrics.average_duration_ms/1000:.1f}s) exceeds target",
implementation_effort="high",
expected_impact={
"latency_reduction": min(0.5, (system_metrics.average_duration_ms - 5000) / system_metrics.average_duration_ms),
"throughput_improvement": 1.5
},
estimated_performance_gain=1.4,
implementation_steps=[
"Profile and optimize slow operations",
"Implement parallel processing where possible",
"Add caching for expensive operations",
"Optimize API calls and reduce round trips"
],
risks=["Increased system complexity", "Potential resource usage increase"],
prerequisites=["Performance profiling tools", "Caching infrastructure"]
))
# Error-based recommendations
high_impact_errors = [ea for ea in error_analyses if ea.impact_level == "high"]
if high_impact_errors:
for error_analysis in high_impact_errors[:3]: # Top 3 high impact errors
recommendations.append(OptimizationRecommendation(
category="reliability",
priority="high",
title=f"Address {error_analysis.error_type.title()} Errors",
description=f"{error_analysis.error_type.title()} errors occur in {error_analysis.percentage:.1f}% of cases",
implementation_effort="medium",
expected_impact={
"error_reduction": error_analysis.percentage / 100,
"reliability_improvement": 1.1
},
estimated_cost_savings=system_metrics.total_cost_usd * (error_analysis.percentage / 100) * 0.5,
implementation_steps=error_analysis.suggested_fixes,
risks=["May require significant code changes"],
prerequisites=["Root cause analysis", "Testing framework"]
))
# Bottleneck-based recommendations
critical_bottlenecks = [b for b in bottlenecks if b.severity in ["critical", "high"]]
for bottleneck in critical_bottlenecks[:2]: # Top 2 critical bottlenecks
recommendations.append(OptimizationRecommendation(
category="performance",
priority="high" if bottleneck.severity == "critical" else "medium",
title=f"Address {bottleneck.bottleneck_type.title()} Bottleneck",
description=bottleneck.description,
implementation_effort="medium",
expected_impact=bottleneck.estimated_improvement,
estimated_performance_gain=list(bottleneck.estimated_improvement.values())[0] if bottleneck.estimated_improvement else 1.1,
implementation_steps=bottleneck.optimization_suggestions,
risks=["System downtime during implementation", "Potential cascade effects"],
prerequisites=["Impact assessment", "Rollback plan"]
))
# Scalability recommendations
if system_metrics.throughput_tasks_per_hour < 20:
recommendations.append(OptimizationRecommendation(
category="scalability",
priority="medium",
title="Improve System Scalability",
description="Current throughput indicates potential scalability issues",
implementation_effort="high",
expected_impact={
"throughput_improvement": 2.0,
"scalability_headroom": 5.0
},
estimated_performance_gain=2.0,
implementation_steps=[
"Implement horizontal scaling for agents",
"Add load balancing and resource pooling",
"Optimize resource allocation algorithms",
"Implement auto-scaling policies"
],
risks=["High implementation complexity", "Increased operational overhead"],
prerequisites=["Infrastructure scaling capability", "Monitoring and metrics"]
))
# Sort recommendations by priority and impact
priority_order = {"high": 0, "medium": 1, "low": 2}
recommendations.sort(key=lambda x: (
priority_order[x.priority],
-x.estimated_performance_gain if x.estimated_performance_gain else 0,
-x.estimated_cost_savings if x.estimated_cost_savings else 0
))
return recommendations
def generate_report(self, logs: List[ExecutionLog]) -> EvaluationReport:
"""Generate complete evaluation report"""
# Calculate system metrics
system_metrics = self.calculate_performance_metrics(logs)
# Calculate per-agent metrics
agents = set(log.agent_id for log in logs)
agent_metrics = {}
for agent_id in agents:
agent_logs = [log for log in logs if log.agent_id == agent_id]
agent_metrics[agent_id] = self.calculate_performance_metrics(agent_logs)
# Calculate per-task-type metrics
task_types = set(log.task_type for log in logs)
task_type_metrics = {}
for task_type in task_types:
task_logs = [log for log in logs if log.task_type == task_type]
task_type_metrics[task_type] = self.calculate_performance_metrics(task_logs)
# Analyze tool usage
tool_usage_analysis = self._analyze_tool_usage(logs)
# Analyze errors
error_analysis = self.analyze_errors(logs)
# Identify bottlenecks
bottleneck_analysis = self.identify_bottlenecks(logs, agent_metrics)
# Generate optimization recommendations
optimization_recommendations = self.generate_optimization_recommendations(
system_metrics, error_analysis, bottleneck_analysis)
# Generate trends analysis (simplified)
trends_analysis = self._generate_trends_analysis(logs)
# Generate cost breakdown
cost_breakdown = self._generate_cost_breakdown(logs, agent_metrics)
# Check SLA compliance
sla_compliance = self._check_sla_compliance(system_metrics)
# Create summary
summary = {
"evaluation_period": {
"start_time": min(log.start_time for log in logs if log.start_time) if logs else None,
"end_time": max(log.end_time for log in logs if log.end_time) if logs else None,
"total_duration_hours": system_metrics.total_tasks / system_metrics.throughput_tasks_per_hour if system_metrics.throughput_tasks_per_hour > 0 else 0
},
"overall_health": self._assess_overall_health(system_metrics),
"key_findings": self._extract_key_findings(system_metrics, error_analysis, bottleneck_analysis),
"critical_issues": len([b for b in bottleneck_analysis if b.severity == "critical"]),
"improvement_opportunities": len(optimization_recommendations)
}
# Create metadata
metadata = {
"generated_at": datetime.now().isoformat(),
"evaluator_version": "1.0",
"total_logs_processed": len(logs),
"agents_analyzed": len(agents),
"task_types_analyzed": len(task_types),
"analysis_completeness": "full"
}
return EvaluationReport(
summary=summary,
system_metrics=system_metrics,
agent_metrics=agent_metrics,
task_type_metrics=task_type_metrics,
tool_usage_analysis=tool_usage_analysis,
error_analysis=error_analysis,
bottleneck_analysis=bottleneck_analysis,
optimization_recommendations=optimization_recommendations,
trends_analysis=trends_analysis,
cost_breakdown=cost_breakdown,
sla_compliance=sla_compliance,
metadata=metadata
)
def _generate_trends_analysis(self, logs: List[ExecutionLog]) -> Dict[str, Any]:
"""Generate trends analysis (simplified version)"""
# Group logs by time periods (daily)
daily_metrics = defaultdict(list)
for log in logs:
if log.start_time:
try:
date = log.start_time.split('T')[0] # Extract date part
daily_metrics[date].append(log)
except Exception:
continue
trends = {}
if len(daily_metrics) > 1:
daily_success_rates = {}
daily_avg_durations = {}
daily_costs = {}
for date, date_logs in daily_metrics.items():
if date_logs:
metrics = self.calculate_performance_metrics(date_logs)
daily_success_rates[date] = metrics.success_rate
daily_avg_durations[date] = metrics.average_duration_ms
daily_costs[date] = metrics.total_cost_usd
trends = {
"daily_success_rates": daily_success_rates,
"daily_avg_durations": daily_avg_durations,
"daily_costs": daily_costs,
"trend_direction": {
"success_rate": "stable", # Simplified
"duration": "stable",
"cost": "stable"
}
}
return trends
def _generate_cost_breakdown(self, logs: List[ExecutionLog],
agent_metrics: Dict[str, PerformanceMetrics]) -> Dict[str, Any]:
"""Generate cost breakdown analysis"""
total_cost = sum(log.cost_usd for log in logs)
# Cost by agent
agent_costs = {}
for agent_id, metrics in agent_metrics.items():
agent_costs[agent_id] = metrics.total_cost_usd
# Cost by task type
task_type_costs = defaultdict(float)
for log in logs:
task_type_costs[log.task_type] += log.cost_usd
# Token cost breakdown
total_tokens = sum(log.tokens_used.get("total_tokens", 0) for log in logs)
return {
"total_cost": total_cost,
"cost_by_agent": dict(agent_costs),
"cost_by_task_type": dict(task_type_costs),
"cost_per_token": total_cost / total_tokens if total_tokens > 0 else 0,
"top_cost_drivers": sorted(task_type_costs.items(), key=lambda x: x[1], reverse=True)[:5]
}
def _check_sla_compliance(self, metrics: PerformanceMetrics) -> Dict[str, Any]:
"""Check SLA compliance"""
compliance = {
"success_rate": {
"target": 0.95,
"actual": metrics.success_rate,
"compliant": metrics.success_rate >= 0.95,
"gap": max(0, 0.95 - metrics.success_rate)
},
"average_latency": {
"target": 10000, # 10 seconds
"actual": metrics.average_duration_ms,
"compliant": metrics.average_duration_ms <= 10000,
"gap": max(0, metrics.average_duration_ms - 10000)
},
"error_rate": {
"target": 0.05, # 5%
"actual": metrics.error_rate,
"compliant": metrics.error_rate <= 0.05,
"gap": max(0, metrics.error_rate - 0.05)
}
}
overall_compliance = all(sla["compliant"] for sla in compliance.values())
return {
"overall_compliant": overall_compliance,
"sla_details": compliance,
"compliance_score": sum(1 for sla in compliance.values() if sla["compliant"]) / len(compliance)
}
def _assess_overall_health(self, metrics: PerformanceMetrics) -> str:
"""Assess overall system health"""
health_score = 0
# Success rate contribution (40%)
if metrics.success_rate >= 0.95:
health_score += 40
elif metrics.success_rate >= 0.90:
health_score += 30
elif metrics.success_rate >= 0.80:
health_score += 20
else:
health_score += 10
# Performance contribution (30%)
if metrics.average_duration_ms <= 5000:
health_score += 30
elif metrics.average_duration_ms <= 10000:
health_score += 20
elif metrics.average_duration_ms <= 30000:
health_score += 15
else:
health_score += 5
# Error rate contribution (20%)
if metrics.error_rate <= 0.02:
health_score += 20
elif metrics.error_rate <= 0.05:
health_score += 15
elif metrics.error_rate <= 0.10:
health_score += 10
else:
health_score += 0
# Cost efficiency contribution (10%)
if metrics.cost_per_token <= 0.00005:
health_score += 10
elif metrics.cost_per_token <= 0.0001:
health_score += 7
else:
health_score += 3
if health_score >= 85:
return "excellent"
elif health_score >= 70:
return "good"
elif health_score >= 50:
return "fair"
else:
return "poor"
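# Worked example of the weighting above (illustrative inputs): a system with
# success_rate 0.92 scores 30, average duration 8s scores 20, error_rate 0.04
# scores 15, and cost_per_token 0.00008 scores 7 — a total of 72, which maps
# to "good".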
def _extract_key_findings(self, metrics: PerformanceMetrics,
errors: List[ErrorAnalysis],
bottlenecks: List[BottleneckAnalysis]) -> List[str]:
"""Extract key findings from analysis"""
findings = []
# Performance findings
if metrics.success_rate < 0.9:
findings.append(f"Success rate ({metrics.success_rate:.1%}) below target")
if metrics.average_duration_ms > 15000:
findings.append(f"High average latency ({metrics.average_duration_ms/1000:.1f}s)")
# Error findings
high_impact_errors = [e for e in errors if e.impact_level == "high"]
if high_impact_errors:
findings.append(f"{len(high_impact_errors)} high-impact error patterns identified")
# Bottleneck findings
critical_bottlenecks = [b for b in bottlenecks if b.severity == "critical"]
if critical_bottlenecks:
findings.append(f"{len(critical_bottlenecks)} critical bottlenecks found")
# Cost findings
if metrics.cost_per_token > 0.0001:
findings.append("Token usage costs above optimal range")
return findings
def main():
parser = argparse.ArgumentParser(description="Multi-Agent System Performance Evaluator")
parser.add_argument("input_file", help="JSON file with execution logs")
parser.add_argument("-o", "--output", help="Output file prefix (default: evaluation_report)")
parser.add_argument("--format", choices=["json", "both"], default="both",
help="Output format")
parser.add_argument("--detailed", action="store_true",
help="Include detailed analysis in output")
args = parser.parse_args()
try:
# Load execution logs
with open(args.input_file, 'r') as f:
logs_data = json.load(f)
# Parse logs
evaluator = AgentEvaluator()
logs = evaluator.parse_execution_logs(logs_data.get("execution_logs", []))
if not logs:
print("No valid execution logs found in input file", file=sys.stderr)
sys.exit(1)
# Generate evaluation report
report = evaluator.generate_report(logs)
# Prepare output
output_data = asdict(report)
# Output files
output_prefix = args.output or "evaluation_report"
if args.format in ["json", "both"]:
with open(f"{output_prefix}.json", 'w') as f:
json.dump(output_data, f, indent=2, default=str)
print(f"JSON report written to {output_prefix}.json")
if args.format == "both":
# Generate separate detailed files
# Performance summary
summary_data = {
"summary": report.summary,
"system_metrics": asdict(report.system_metrics),
"sla_compliance": report.sla_compliance
}
with open(f"{output_prefix}_summary.json", 'w') as f:
json.dump(summary_data, f, indent=2, default=str)
print(f"Summary report written to {output_prefix}_summary.json")
# Recommendations
recommendations_data = {
"optimization_recommendations": [asdict(rec) for rec in report.optimization_recommendations],
"bottleneck_analysis": [asdict(b) for b in report.bottleneck_analysis]
}
with open(f"{output_prefix}_recommendations.json", 'w') as f:
json.dump(recommendations_data, f, indent=2)
print(f"Recommendations written to {output_prefix}_recommendations.json")
# Error analysis
error_data = {
"error_analysis": [asdict(e) for e in report.error_analysis],
"error_summary": {
"total_errors": sum(e.count for e in report.error_analysis),
"high_impact_errors": len([e for e in report.error_analysis if e.impact_level == "high"])
}
}
with open(f"{output_prefix}_errors.json", 'w') as f:
json.dump(error_data, f, indent=2)
print(f"Error analysis written to {output_prefix}_errors.json")
# Print executive summary
print(f"\n{'='*60}")
print(f"AGENT SYSTEM EVALUATION REPORT")
print(f"{'='*60}")
print(f"Overall Health: {report.summary['overall_health'].upper()}")
print(f"Total Tasks: {report.system_metrics.total_tasks}")
print(f"Success Rate: {report.system_metrics.success_rate:.1%}")
print(f"Average Duration: {report.system_metrics.average_duration_ms/1000:.1f}s")
print(f"Total Cost: ${report.system_metrics.total_cost_usd:.2f}")
print(f"Agents Analyzed: {len(report.agent_metrics)}")
print(f"\nKey Findings:")
for finding in report.summary['key_findings']:
print(f" • {finding}")
print(f"\nTop Recommendations:")
high_priority_recs = [r for r in report.optimization_recommendations if r.priority == "high"][:3]
for i, rec in enumerate(high_priority_recs, 1):
print(f" {i}. {rec.title}")
if report.summary['critical_issues'] > 0:
print(f"\n⚠️ CRITICAL: {report.summary['critical_issues']} critical issues require immediate attention")
print(f"\n📊 Detailed reports available in generated files")
print(f"{'='*60}")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()

#!/usr/bin/env python3
"""
Agent Planner - Multi-Agent System Architecture Designer
Given a system description (goal, tasks, constraints, team size), designs a multi-agent
architecture: defines agent roles, responsibilities, capabilities needed, communication
topology, tool requirements. Generates architecture diagram (Mermaid).
Input: system requirements JSON
Output: agent architecture + role definitions + Mermaid diagram + implementation roadmap
"""
import json
import argparse
import sys
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
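# Illustrative shape of the requirements JSON this planner consumes (field
# names follow the SystemRequirements dataclass below; the values are made-up
# examples, and remaining fields such as performance_requirements and
# safety_requirements follow the same pattern):
# {
#   "goal": "Automate competitive research",
#   "description": "Gather, analyze, and summarize competitor activity",
#   "tasks": ["gather sources", "analyze findings", "draft report"],
#   "constraints": {"budget_usd": 50},
#   "team_size": 3
# }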
class AgentArchitecturePattern(Enum):
"""Supported agent architecture patterns"""
SINGLE_AGENT = "single_agent"
SUPERVISOR = "supervisor"
SWARM = "swarm"
HIERARCHICAL = "hierarchical"
PIPELINE = "pipeline"
class CommunicationPattern(Enum):
"""Agent communication patterns"""
DIRECT_MESSAGE = "direct_message"
SHARED_STATE = "shared_state"
EVENT_DRIVEN = "event_driven"
MESSAGE_QUEUE = "message_queue"
class AgentRole(Enum):
"""Standard agent role archetypes"""
COORDINATOR = "coordinator"
SPECIALIST = "specialist"
INTERFACE = "interface"
MONITOR = "monitor"
@dataclass
class Tool:
"""Tool definition for agents"""
name: str
description: str
input_schema: Dict[str, Any]
output_schema: Dict[str, Any]
capabilities: List[str]
reliability: str = "high" # high, medium, low
latency: str = "low" # low, medium, high
@dataclass
class AgentDefinition:
"""Complete agent definition"""
name: str
role: str
archetype: AgentRole
responsibilities: List[str]
capabilities: List[str]
tools: List[Tool]
communication_interfaces: List[str]
constraints: Dict[str, Any]
success_criteria: List[str]
dependencies: Optional[List[str]] = None
@dataclass
class CommunicationLink:
"""Communication link between agents"""
from_agent: str
to_agent: str
pattern: CommunicationPattern
data_format: str
frequency: str
criticality: str
@dataclass
class SystemRequirements:
"""Input system requirements"""
goal: str
description: str
tasks: List[str]
constraints: Dict[str, Any]
team_size: int
performance_requirements: Dict[str, Any]
safety_requirements: List[str]
integration_requirements: List[str]
scale_requirements: Dict[str, Any]
@dataclass
class ArchitectureDesign:
"""Complete architecture design output"""
pattern: AgentArchitecturePattern
agents: List[AgentDefinition]
communication_topology: List[CommunicationLink]
shared_resources: List[Dict[str, Any]]
guardrails: List[Dict[str, Any]]
scaling_strategy: Dict[str, Any]
failure_handling: Dict[str, Any]
class AgentPlanner:
"""Multi-agent system architecture planner"""
def __init__(self):
self.common_tools = self._define_common_tools()
self.pattern_heuristics = self._define_pattern_heuristics()
def _define_common_tools(self) -> Dict[str, Tool]:
"""Define commonly used tools across agents"""
return {
"web_search": Tool(
name="web_search",
description="Search the web for information",
input_schema={"type": "object", "properties": {"query": {"type": "string"}}},
output_schema={"type": "object", "properties": {"results": {"type": "array"}}},
capabilities=["research", "information_gathering"],
reliability="high",
latency="medium"
),
"code_executor": Tool(
name="code_executor",
description="Execute code in various languages",
input_schema={"type": "object", "properties": {"language": {"type": "string"}, "code": {"type": "string"}}},
output_schema={"type": "object", "properties": {"result": {"type": "string"}, "error": {"type": "string"}}},
capabilities=["code_execution", "testing", "automation"],
reliability="high",
latency="low"
),
"file_manager": Tool(
name="file_manager",
description="Manage files and directories",
input_schema={"type": "object", "properties": {"action": {"type": "string"}, "path": {"type": "string"}}},
output_schema={"type": "object", "properties": {"success": {"type": "boolean"}, "content": {"type": "string"}}},
capabilities=["file_operations", "data_management"],
reliability="high",
latency="low"
),
"data_analyzer": Tool(
name="data_analyzer",
description="Analyze and process data",
input_schema={"type": "object", "properties": {"data": {"type": "object"}, "analysis_type": {"type": "string"}}},
output_schema={"type": "object", "properties": {"insights": {"type": "array"}, "metrics": {"type": "object"}}},
capabilities=["data_analysis", "statistics", "visualization"],
reliability="high",
latency="medium"
),
"api_client": Tool(
name="api_client",
description="Make API calls to external services",
input_schema={"type": "object", "properties": {"url": {"type": "string"}, "method": {"type": "string"}, "data": {"type": "object"}}},
output_schema={"type": "object", "properties": {"response": {"type": "object"}, "status": {"type": "integer"}}},
capabilities=["integration", "external_services"],
reliability="medium",
latency="medium"
)
}
def _define_pattern_heuristics(self) -> Dict[AgentArchitecturePattern, Dict[str, Any]]:
"""Define heuristics for selecting architecture patterns"""
return {
AgentArchitecturePattern.SINGLE_AGENT: {
"team_size_range": (1, 1),
"task_complexity": "simple",
"coordination_overhead": "none",
"suitable_for": ["simple tasks", "prototyping", "single domain"],
"scaling_limit": "low"
},
AgentArchitecturePattern.SUPERVISOR: {
"team_size_range": (2, 8),
"task_complexity": "medium",
"coordination_overhead": "low",
"suitable_for": ["hierarchical tasks", "clear delegation", "quality control"],
"scaling_limit": "medium"
},
AgentArchitecturePattern.SWARM: {
"team_size_range": (3, 20),
"task_complexity": "high",
"coordination_overhead": "high",
"suitable_for": ["parallel processing", "distributed problem solving", "fault tolerance"],
"scaling_limit": "high"
},
AgentArchitecturePattern.HIERARCHICAL: {
"team_size_range": (5, 50),
"task_complexity": "very high",
"coordination_overhead": "medium",
"suitable_for": ["large organizations", "complex workflows", "enterprise systems"],
"scaling_limit": "very high"
},
AgentArchitecturePattern.PIPELINE: {
"team_size_range": (3, 15),
"task_complexity": "medium",
"coordination_overhead": "low",
"suitable_for": ["sequential processing", "data pipelines", "assembly line tasks"],
"scaling_limit": "medium"
}
}
def select_architecture_pattern(self, requirements: SystemRequirements) -> AgentArchitecturePattern:
"""Select the most appropriate architecture pattern based on requirements"""
team_size = requirements.team_size
task_count = len(requirements.tasks)
performance_reqs = requirements.performance_requirements
# Score each pattern based on requirements
pattern_scores = {}
for pattern, heuristics in self.pattern_heuristics.items():
score = 0
# Team size fit
min_size, max_size = heuristics["team_size_range"]
if min_size <= team_size <= max_size:
score += 3
elif abs(team_size - min_size) <= 2 or abs(team_size - max_size) <= 2:
score += 1
# Task complexity assessment
complexity_indicators = [
"parallel" in requirements.description.lower(),
"sequential" in requirements.description.lower(),
"hierarchical" in requirements.description.lower(),
"distributed" in requirements.description.lower(),
task_count > 5,
len(requirements.constraints) > 3
]
complexity_score = sum(complexity_indicators)
if pattern == AgentArchitecturePattern.SINGLE_AGENT and complexity_score <= 2:
score += 2
elif pattern == AgentArchitecturePattern.SUPERVISOR and 2 <= complexity_score <= 4:
score += 2
elif pattern == AgentArchitecturePattern.PIPELINE and "sequential" in requirements.description.lower():
score += 3
elif pattern == AgentArchitecturePattern.SWARM and "parallel" in requirements.description.lower():
score += 3
elif pattern == AgentArchitecturePattern.HIERARCHICAL and complexity_score >= 4:
score += 2
# Performance requirements
if performance_reqs.get("high_throughput", False) and pattern in [AgentArchitecturePattern.SWARM, AgentArchitecturePattern.PIPELINE]:
score += 2
if performance_reqs.get("fault_tolerance", False) and pattern == AgentArchitecturePattern.SWARM:
score += 2
if performance_reqs.get("low_latency", False) and pattern in [AgentArchitecturePattern.SINGLE_AGENT, AgentArchitecturePattern.PIPELINE]:
score += 1
pattern_scores[pattern] = score
# Select the highest scoring pattern
best_pattern = max(pattern_scores.items(), key=lambda x: x[1])[0]
return best_pattern
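    # Worked example (illustrative): with team_size=4 and "sequential" in the
    # description, PIPELINE scores +3 for team-size fit plus +3 for the
    # sequential keyword (6 total), while SUPERVISOR only gets +3 for
    # team-size fit, so PIPELINE is selected.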
def design_agents(self, requirements: SystemRequirements, pattern: AgentArchitecturePattern) -> List[AgentDefinition]:
"""Design individual agents based on requirements and architecture pattern"""
agents = []
if pattern == AgentArchitecturePattern.SINGLE_AGENT:
agents = self._design_single_agent(requirements)
elif pattern == AgentArchitecturePattern.SUPERVISOR:
agents = self._design_supervisor_agents(requirements)
elif pattern == AgentArchitecturePattern.SWARM:
agents = self._design_swarm_agents(requirements)
elif pattern == AgentArchitecturePattern.HIERARCHICAL:
agents = self._design_hierarchical_agents(requirements)
elif pattern == AgentArchitecturePattern.PIPELINE:
agents = self._design_pipeline_agents(requirements)
return agents
def _design_single_agent(self, requirements: SystemRequirements) -> List[AgentDefinition]:
"""Design a single general-purpose agent"""
all_tools = list(self.common_tools.values())
agent = AgentDefinition(
name="universal_agent",
role="Universal Task Handler",
archetype=AgentRole.SPECIALIST,
responsibilities=requirements.tasks,
capabilities=["general_purpose", "multi_domain", "adaptable"],
tools=all_tools,
communication_interfaces=["direct_user_interface"],
constraints={
"max_concurrent_tasks": 1,
"memory_limit": "high",
"response_time": "fast"
},
success_criteria=["complete all assigned tasks", "maintain quality standards", "respond within time limits"],
dependencies=[]
)
return [agent]
def _design_supervisor_agents(self, requirements: SystemRequirements) -> List[AgentDefinition]:
"""Design supervisor pattern agents"""
agents = []
# Create supervisor agent
supervisor = AgentDefinition(
name="supervisor_agent",
role="Task Coordinator and Quality Controller",
archetype=AgentRole.COORDINATOR,
responsibilities=[
"task_decomposition",
"delegation",
"progress_monitoring",
"quality_assurance",
"result_aggregation"
],
capabilities=["planning", "coordination", "evaluation", "decision_making"],
tools=[self.common_tools["file_manager"], self.common_tools["data_analyzer"]],
communication_interfaces=["user_interface", "agent_messaging"],
constraints={
"max_concurrent_supervisions": 5,
"decision_timeout": "30s"
},
success_criteria=["successful task completion", "optimal resource utilization", "quality standards met"],
dependencies=[]
)
agents.append(supervisor)
# Create specialist agents based on task domains
task_domains = self._identify_task_domains(requirements.tasks)
for i, domain in enumerate(task_domains[:requirements.team_size - 1]):
specialist = AgentDefinition(
name=f"{domain}_specialist",
role=f"{domain.title()} Specialist",
archetype=AgentRole.SPECIALIST,
responsibilities=[task for task in requirements.tasks if domain in task.lower()],
capabilities=[f"{domain}_expertise", "specialized_tools", "domain_knowledge"],
tools=self._select_tools_for_domain(domain),
communication_interfaces=["supervisor_messaging"],
constraints={
"domain_scope": domain,
"task_queue_size": 10
},
success_criteria=[f"excel in {domain} tasks", "maintain domain expertise", "provide quality output"],
dependencies=["supervisor_agent"]
)
agents.append(specialist)
return agents
def _design_swarm_agents(self, requirements: SystemRequirements) -> List[AgentDefinition]:
"""Design swarm pattern agents"""
agents = []
# Create peer agents with overlapping capabilities
agent_count = min(requirements.team_size, 10) # Reasonable swarm size
base_capabilities = ["collaboration", "consensus", "adaptation", "peer_communication"]
for i in range(agent_count):
agent = AgentDefinition(
name=f"swarm_agent_{i+1}",
role=f"Collaborative Worker #{i+1}",
archetype=AgentRole.SPECIALIST,
responsibilities=requirements.tasks, # All agents can handle all tasks
capabilities=base_capabilities + [f"specialization_{i%3}"], # Some specialization
tools=list(self.common_tools.values()),
communication_interfaces=["peer_messaging", "broadcast", "consensus_protocol"],
constraints={
"peer_discovery_timeout": "10s",
"consensus_threshold": 0.6,
"max_retries": 3
},
success_criteria=["contribute to group goals", "maintain peer relationships", "adapt to failures"],
dependencies=[f"swarm_agent_{j+1}" for j in range(agent_count) if j != i]
)
agents.append(agent)
return agents
def _design_hierarchical_agents(self, requirements: SystemRequirements) -> List[AgentDefinition]:
"""Design hierarchical pattern agents"""
agents = []
# Create management hierarchy
        levels = max(1, min(3, requirements.team_size // 3))  # Reasonable hierarchy depth, at least 1 to avoid division by zero
        agents_per_level = requirements.team_size // levels
# Top level manager
manager = AgentDefinition(
name="executive_manager",
role="Executive Manager",
archetype=AgentRole.COORDINATOR,
responsibilities=["strategic_planning", "resource_allocation", "performance_monitoring"],
capabilities=["leadership", "strategy", "resource_management", "oversight"],
tools=[self.common_tools["data_analyzer"], self.common_tools["file_manager"]],
communication_interfaces=["executive_dashboard", "management_messaging"],
constraints={"management_span": 5, "decision_authority": "high"},
success_criteria=["achieve system goals", "optimize resource usage", "maintain quality"],
dependencies=[]
)
agents.append(manager)
# Middle managers
for i in range(agents_per_level - 1):
middle_manager = AgentDefinition(
name=f"team_manager_{i+1}",
role=f"Team Manager #{i+1}",
archetype=AgentRole.COORDINATOR,
responsibilities=["team_coordination", "task_distribution", "progress_tracking"],
capabilities=["team_management", "coordination", "reporting"],
tools=[self.common_tools["file_manager"]],
communication_interfaces=["management_messaging", "team_messaging"],
constraints={"team_size": 3, "reporting_frequency": "hourly"},
success_criteria=["team performance", "task completion", "team satisfaction"],
dependencies=["executive_manager"]
)
agents.append(middle_manager)
# Workers
remaining_agents = requirements.team_size - len(agents)
for i in range(remaining_agents):
worker = AgentDefinition(
name=f"worker_agent_{i+1}",
role=f"Task Worker #{i+1}",
archetype=AgentRole.SPECIALIST,
responsibilities=["task_execution", "result_delivery", "status_reporting"],
capabilities=["task_execution", "specialized_skills", "reliability"],
tools=self._select_diverse_tools(),
communication_interfaces=["team_messaging"],
constraints={"task_focus": "single", "reporting_interval": "30min"},
success_criteria=["complete assigned tasks", "maintain quality", "meet deadlines"],
dependencies=[f"team_manager_{(i // 3) + 1}"]
)
agents.append(worker)
return agents
def _design_pipeline_agents(self, requirements: SystemRequirements) -> List[AgentDefinition]:
"""Design pipeline pattern agents"""
agents = []
# Create sequential processing stages
pipeline_stages = self._identify_pipeline_stages(requirements.tasks)
for i, stage in enumerate(pipeline_stages):
agent = AgentDefinition(
name=f"pipeline_stage_{i+1}_{stage}",
role=f"Pipeline Stage {i+1}: {stage.title()}",
archetype=AgentRole.SPECIALIST,
responsibilities=[f"process_{stage}", f"validate_{stage}_output", "handoff_to_next_stage"],
capabilities=[f"{stage}_processing", "quality_control", "data_transformation"],
tools=self._select_tools_for_stage(stage),
communication_interfaces=["pipeline_queue", "stage_messaging"],
constraints={
"processing_order": i + 1,
"batch_size": 10,
"stage_timeout": "5min"
},
success_criteria=[f"successfully process {stage}", "maintain data integrity", "meet throughput targets"],
dependencies=[f"pipeline_stage_{i}_{pipeline_stages[i-1]}"] if i > 0 else []
)
agents.append(agent)
return agents
def _identify_task_domains(self, tasks: List[str]) -> List[str]:
"""Identify distinct domains from task list"""
domains = []
domain_keywords = {
"research": ["research", "search", "find", "investigate", "analyze"],
"development": ["code", "build", "develop", "implement", "program"],
"data": ["data", "process", "analyze", "calculate", "compute"],
"communication": ["write", "send", "message", "communicate", "report"],
"file": ["file", "document", "save", "load", "manage"]
}
for domain, keywords in domain_keywords.items():
if any(keyword in " ".join(tasks).lower() for keyword in keywords):
domains.append(domain)
return domains[:5] # Limit to 5 domains
def _identify_pipeline_stages(self, tasks: List[str]) -> List[str]:
"""Identify pipeline stages from task list"""
# Common pipeline patterns
common_stages = ["input", "process", "transform", "validate", "output"]
# Try to infer stages from tasks
stages = []
task_text = " ".join(tasks).lower()
if "collect" in task_text or "gather" in task_text:
stages.append("collection")
if "process" in task_text or "transform" in task_text:
stages.append("processing")
if "analyze" in task_text or "evaluate" in task_text:
stages.append("analysis")
if "validate" in task_text or "check" in task_text:
stages.append("validation")
if "output" in task_text or "deliver" in task_text or "report" in task_text:
stages.append("output")
# Default to common stages if none identified
return stages if stages else common_stages[:min(5, len(tasks))]
def _select_tools_for_domain(self, domain: str) -> List[Tool]:
"""Select appropriate tools for a specific domain"""
domain_tools = {
"research": [self.common_tools["web_search"], self.common_tools["data_analyzer"]],
"development": [self.common_tools["code_executor"], self.common_tools["file_manager"]],
"data": [self.common_tools["data_analyzer"], self.common_tools["file_manager"]],
"communication": [self.common_tools["api_client"], self.common_tools["file_manager"]],
"file": [self.common_tools["file_manager"]]
}
return domain_tools.get(domain, [self.common_tools["api_client"]])
def _select_tools_for_stage(self, stage: str) -> List[Tool]:
"""Select appropriate tools for a pipeline stage"""
stage_tools = {
"input": [self.common_tools["api_client"], self.common_tools["file_manager"]],
"collection": [self.common_tools["web_search"], self.common_tools["api_client"]],
"process": [self.common_tools["code_executor"], self.common_tools["data_analyzer"]],
"processing": [self.common_tools["data_analyzer"], self.common_tools["code_executor"]],
"transform": [self.common_tools["data_analyzer"], self.common_tools["code_executor"]],
"analysis": [self.common_tools["data_analyzer"]],
"validate": [self.common_tools["data_analyzer"]],
"validation": [self.common_tools["data_analyzer"]],
"output": [self.common_tools["file_manager"], self.common_tools["api_client"]]
}
return stage_tools.get(stage, [self.common_tools["file_manager"]])
def _select_diverse_tools(self) -> List[Tool]:
"""Select a diverse set of tools for general purpose agents"""
return [
self.common_tools["file_manager"],
self.common_tools["code_executor"],
self.common_tools["data_analyzer"]
]
def design_communication_topology(self, agents: List[AgentDefinition], pattern: AgentArchitecturePattern) -> List[CommunicationLink]:
"""Design communication links between agents"""
links = []
if pattern == AgentArchitecturePattern.SINGLE_AGENT:
# No inter-agent communication needed
return []
elif pattern == AgentArchitecturePattern.SUPERVISOR:
supervisor = next(agent for agent in agents if agent.archetype == AgentRole.COORDINATOR)
specialists = [agent for agent in agents if agent.archetype == AgentRole.SPECIALIST]
for specialist in specialists:
# Bidirectional communication with supervisor
links.append(CommunicationLink(
from_agent=supervisor.name,
to_agent=specialist.name,
pattern=CommunicationPattern.DIRECT_MESSAGE,
data_format="json",
frequency="on_demand",
criticality="high"
))
links.append(CommunicationLink(
from_agent=specialist.name,
to_agent=supervisor.name,
pattern=CommunicationPattern.DIRECT_MESSAGE,
data_format="json",
frequency="on_completion",
criticality="high"
))
elif pattern == AgentArchitecturePattern.SWARM:
# All-to-all communication for swarm
for i, agent1 in enumerate(agents):
for j, agent2 in enumerate(agents):
if i != j:
links.append(CommunicationLink(
from_agent=agent1.name,
to_agent=agent2.name,
pattern=CommunicationPattern.EVENT_DRIVEN,
data_format="json",
frequency="periodic",
criticality="medium"
))
elif pattern == AgentArchitecturePattern.HIERARCHICAL:
# Hierarchical communication based on dependencies
for agent in agents:
if agent.dependencies:
for dependency in agent.dependencies:
links.append(CommunicationLink(
from_agent=dependency,
to_agent=agent.name,
pattern=CommunicationPattern.DIRECT_MESSAGE,
data_format="json",
frequency="scheduled",
criticality="high"
))
links.append(CommunicationLink(
from_agent=agent.name,
to_agent=dependency,
pattern=CommunicationPattern.DIRECT_MESSAGE,
data_format="json",
frequency="on_completion",
criticality="high"
))
elif pattern == AgentArchitecturePattern.PIPELINE:
# Sequential pipeline communication
for i in range(len(agents) - 1):
links.append(CommunicationLink(
from_agent=agents[i].name,
to_agent=agents[i + 1].name,
pattern=CommunicationPattern.MESSAGE_QUEUE,
data_format="json",
frequency="continuous",
criticality="high"
))
return links
def generate_mermaid_diagram(self, design: ArchitectureDesign) -> str:
"""Generate Mermaid diagram for the architecture"""
diagram = ["graph TD"]
# Add agent nodes
for agent in design.agents:
node_style = self._get_node_style(agent.archetype)
            # Quote labels so roles containing '#' or ':' stay valid Mermaid
            diagram.append(f'    {agent.name}["{agent.role}"]{node_style}')
# Add communication links
for link in design.communication_topology:
arrow_style = self._get_arrow_style(link.pattern, link.criticality)
diagram.append(f" {link.from_agent} {arrow_style} {link.to_agent}")
# Add styling
diagram.extend([
"",
" classDef coordinator fill:#e1f5fe,stroke:#01579b,stroke-width:2px",
" classDef specialist fill:#f3e5f5,stroke:#4a148c,stroke-width:2px",
" classDef interface fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px",
" classDef monitor fill:#fff3e0,stroke:#e65100,stroke-width:2px"
])
# Apply classes to nodes
for agent in design.agents:
class_name = agent.archetype.value
diagram.append(f" class {agent.name} {class_name}")
return "\n".join(diagram)
def _get_node_style(self, archetype: AgentRole) -> str:
"""Get node styling based on archetype"""
styles = {
AgentRole.COORDINATOR: ":::coordinator",
AgentRole.SPECIALIST: ":::specialist",
AgentRole.INTERFACE: ":::interface",
AgentRole.MONITOR: ":::monitor"
}
return styles.get(archetype, "")
    def _get_arrow_style(self, pattern: CommunicationPattern, criticality: str) -> str:
        """Get arrow styling based on communication pattern and criticality"""
        base_arrows = {
            CommunicationPattern.DIRECT_MESSAGE: "-->",
            CommunicationPattern.SHARED_STATE: "-.->",
            CommunicationPattern.EVENT_DRIVEN: "==>",
            CommunicationPattern.MESSAGE_QUEUE: "==>"
        }
        arrow = base_arrows.get(pattern, "-->")
        # Keep solid/thick arrows for high-criticality links and render the
        # rest as dotted links; naive character substitution here previously
        # produced invalid Mermaid syntax (e.g. "..>" and "::>")
        if criticality == "high":
            return arrow
        return "-.->"
def generate_implementation_roadmap(self, design: ArchitectureDesign, requirements: SystemRequirements) -> Dict[str, Any]:
"""Generate implementation roadmap"""
phases = []
# Phase 1: Core Infrastructure
phases.append({
"phase": 1,
"name": "Core Infrastructure",
"duration": "2-3 weeks",
"tasks": [
"Set up development environment",
"Implement basic agent framework",
"Create communication infrastructure",
"Set up monitoring and logging",
"Implement basic tools"
],
"deliverables": [
"Agent runtime framework",
"Communication layer",
"Basic monitoring dashboard"
]
})
# Phase 2: Agent Implementation
phases.append({
"phase": 2,
"name": "Agent Implementation",
"duration": "3-4 weeks",
"tasks": [
"Implement individual agent logic",
"Create agent-specific tools",
"Implement communication protocols",
"Add error handling and recovery",
"Create agent configuration system"
],
"deliverables": [
"Functional agent implementations",
"Tool integration",
"Configuration management"
]
})
# Phase 3: Integration and Testing
phases.append({
"phase": 3,
"name": "Integration and Testing",
"duration": "2-3 weeks",
"tasks": [
"Integrate all agents",
"End-to-end testing",
"Performance optimization",
"Security implementation",
"Documentation creation"
],
"deliverables": [
"Integrated system",
"Test suite",
"Performance benchmarks",
"Security audit report"
]
})
# Phase 4: Deployment and Monitoring
phases.append({
"phase": 4,
"name": "Deployment and Monitoring",
"duration": "1-2 weeks",
"tasks": [
"Production deployment",
"Monitoring setup",
"Alerting configuration",
"User training",
"Go-live support"
],
"deliverables": [
"Production system",
"Monitoring dashboard",
"Operational runbooks",
"Training materials"
]
})
return {
"total_duration": "8-12 weeks",
"phases": phases,
"critical_path": [
"Agent framework implementation",
"Communication layer development",
"Integration testing",
"Production deployment"
],
"risks": [
{
"risk": "Communication complexity",
"impact": "high",
"mitigation": "Start with simple protocols, iterate"
},
{
"risk": "Agent coordination failures",
"impact": "medium",
"mitigation": "Implement robust error handling and fallbacks"
},
{
"risk": "Performance bottlenecks",
"impact": "medium",
"mitigation": "Early performance testing and optimization"
}
],
"success_criteria": requirements.safety_requirements + [
"All agents operational",
"Communication working reliably",
"Performance targets met",
"Error rate below 1%"
]
}
def plan_system(self, requirements: SystemRequirements) -> Tuple[ArchitectureDesign, str, Dict[str, Any]]:
"""Main planning function"""
# Select architecture pattern
pattern = self.select_architecture_pattern(requirements)
# Design agents
agents = self.design_agents(requirements, pattern)
# Design communication topology
communication_topology = self.design_communication_topology(agents, pattern)
# Create complete design
design = ArchitectureDesign(
pattern=pattern,
agents=agents,
communication_topology=communication_topology,
shared_resources=[
{"type": "message_queue", "capacity": 1000},
{"type": "shared_memory", "size": "1GB"},
{"type": "event_store", "retention": "30 days"}
],
guardrails=[
{"type": "input_validation", "rules": "strict_schema_enforcement"},
{"type": "rate_limiting", "limit": "100_requests_per_minute"},
{"type": "output_filtering", "rules": "content_safety_check"}
],
scaling_strategy={
"horizontal_scaling": True,
"auto_scaling_triggers": ["cpu > 80%", "queue_depth > 100"],
"max_instances_per_agent": 5
},
failure_handling={
"retry_policy": "exponential_backoff",
"circuit_breaker": True,
"fallback_strategies": ["graceful_degradation", "human_escalation"]
}
)
# Generate Mermaid diagram
mermaid_diagram = self.generate_mermaid_diagram(design)
# Generate implementation roadmap
roadmap = self.generate_implementation_roadmap(design, requirements)
return design, mermaid_diagram, roadmap
def main():
parser = argparse.ArgumentParser(description="Multi-Agent System Architecture Planner")
parser.add_argument("input_file", help="JSON file with system requirements")
parser.add_argument("-o", "--output", help="Output file prefix (default: agent_architecture)")
parser.add_argument("--format", choices=["json", "yaml", "both"], default="both",
help="Output format")
args = parser.parse_args()
try:
# Load requirements
with open(args.input_file, 'r') as f:
requirements_data = json.load(f)
requirements = SystemRequirements(**requirements_data)
# Plan the system
planner = AgentPlanner()
design, mermaid_diagram, roadmap = planner.plan_system(requirements)
# Prepare output
output_data = {
"architecture_design": asdict(design),
"mermaid_diagram": mermaid_diagram,
"implementation_roadmap": roadmap,
"metadata": {
"generated_by": "agent_planner.py",
"requirements_file": args.input_file,
"architecture_pattern": design.pattern.value,
"agent_count": len(design.agents)
}
}
# Output files
output_prefix = args.output or "agent_architecture"
if args.format in ["json", "both"]:
with open(f"{output_prefix}.json", 'w') as f:
json.dump(output_data, f, indent=2, default=str)
print(f"JSON output written to {output_prefix}.json")
        if args.format == "both":
# Also create separate files for key components
with open(f"{output_prefix}_diagram.mmd", 'w') as f:
f.write(mermaid_diagram)
print(f"Mermaid diagram written to {output_prefix}_diagram.mmd")
with open(f"{output_prefix}_roadmap.json", 'w') as f:
json.dump(roadmap, f, indent=2)
print(f"Implementation roadmap written to {output_prefix}_roadmap.json")
# Print summary
print(f"\nArchitecture Summary:")
print(f"Pattern: {design.pattern.value}")
print(f"Agents: {len(design.agents)}")
print(f"Communication Links: {len(design.communication_topology)}")
print(f"Estimated Duration: {roadmap['total_duration']}")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
    main()

{
"execution_logs": [
{
"task_id": "task_001",
"agent_id": "research_agent_1",
"task_type": "web_research",
"task_description": "Research recent developments in artificial intelligence",
"start_time": "2024-01-15T09:00:00Z",
"end_time": "2024-01-15T09:02:34Z",
"duration_ms": 154000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2300,
"success": true,
"parameters": {
"query": "artificial intelligence developments 2024",
"limit": 10
}
},
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2100,
"success": true,
"parameters": {
"query": "machine learning breakthroughs recent",
"limit": 5
}
},
{
"type": "analysis",
"description": "Synthesize search results",
"duration_ms": 149600,
"success": true
}
],
"results": {
"summary": "Found 15 relevant sources covering recent AI developments including GPT-4 improvements, autonomous vehicle progress, and medical AI applications.",
"sources_found": 15,
"quality_score": 0.92
},
"tokens_used": {
"input_tokens": 1250,
"output_tokens": 2800,
"total_tokens": 4050
},
"cost_usd": 0.081,
"error_details": null,
"tools_used": ["web_search"],
"retry_count": 0,
"metadata": {
"user_id": "user_123",
"session_id": "session_abc",
"request_priority": "normal"
}
},
{
"task_id": "task_002",
"agent_id": "data_agent_1",
"task_type": "data_analysis",
"task_description": "Analyze sales performance data for Q4 2023",
"start_time": "2024-01-15T09:05:00Z",
"end_time": "2024-01-15T09:07:45Z",
"duration_ms": 165000,
"status": "success",
"actions": [
{
"type": "data_ingestion",
"description": "Load Q4 sales data",
"duration_ms": 5000,
"success": true
},
{
"type": "tool_call",
"tool_name": "data_analyzer",
"duration_ms": 155000,
"success": true,
"parameters": {
"analysis_type": "descriptive",
"target_column": "revenue"
}
},
{
"type": "visualization",
"description": "Generate charts and graphs",
"duration_ms": 5000,
"success": true
}
],
"results": {
"insights": [
"Revenue increased by 15% compared to Q3",
"December was the strongest month",
"Product category A led growth"
],
"charts_generated": 4,
"quality_score": 0.88
},
"tokens_used": {
"input_tokens": 3200,
"output_tokens": 1800,
"total_tokens": 5000
},
"cost_usd": 0.095,
"error_details": null,
"tools_used": ["data_analyzer"],
"retry_count": 0,
"metadata": {
"user_id": "user_456",
"session_id": "session_def",
"request_priority": "high"
}
},
{
"task_id": "task_003",
"agent_id": "document_agent_1",
"task_type": "document_processing",
"task_description": "Extract key information from research paper PDF",
"start_time": "2024-01-15T09:10:00Z",
"end_time": "2024-01-15T09:12:20Z",
"duration_ms": 140000,
"status": "partial",
"actions": [
{
"type": "tool_call",
"tool_name": "document_processor",
"duration_ms": 135000,
"success": true,
"parameters": {
"document_url": "https://example.com/research.pdf",
"processing_mode": "key_points"
}
},
{
"type": "validation",
"description": "Validate extracted content",
"duration_ms": 5000,
"success": false,
"error": "Content validation failed - missing abstract"
}
],
"results": {
"extracted_content": "Partial content extracted successfully",
"pages_processed": 12,
"validation_issues": ["Missing abstract section"],
"quality_score": 0.65
},
"tokens_used": {
"input_tokens": 5400,
"output_tokens": 3200,
"total_tokens": 8600
},
"cost_usd": 0.172,
"error_details": {
"error_type": "validation_error",
"error_message": "Document structure validation failed",
"affected_section": "abstract"
},
"tools_used": ["document_processor"],
"retry_count": 1,
"metadata": {
"user_id": "user_789",
"session_id": "session_ghi",
"request_priority": "normal"
}
},
{
"task_id": "task_004",
"agent_id": "communication_agent_1",
"task_type": "notification",
"task_description": "Send completion notification to project stakeholders",
"start_time": "2024-01-15T09:15:00Z",
"end_time": "2024-01-15T09:15:08Z",
"duration_ms": 8000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "notification_sender",
"duration_ms": 7500,
"success": true,
"parameters": {
"recipients": ["[email protected]", "[email protected]"],
"message": "Project analysis completed successfully",
"channel": "email"
}
}
],
"results": {
"notifications_sent": 2,
"delivery_confirmations": 2,
"quality_score": 1.0
},
"tokens_used": {
"input_tokens": 200,
"output_tokens": 150,
"total_tokens": 350
},
"cost_usd": 0.007,
"error_details": null,
"tools_used": ["notification_sender"],
"retry_count": 0,
"metadata": {
"user_id": "system",
"session_id": "session_jkl",
"request_priority": "normal"
}
},
{
"task_id": "task_005",
"agent_id": "research_agent_2",
"task_type": "web_research",
"task_description": "Research competitive landscape analysis",
"start_time": "2024-01-15T09:20:00Z",
"end_time": "2024-01-15T09:25:30Z",
"duration_ms": 330000,
"status": "failure",
"actions": [
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2800,
"success": true,
"parameters": {
"query": "competitive analysis software industry",
"limit": 15
}
},
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 30000,
"success": false,
"error": "Rate limit exceeded"
},
{
"type": "retry",
"description": "Wait and retry search",
"duration_ms": 60000,
"success": false
},
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 30000,
"success": false,
"error": "Service timeout"
}
],
"results": {
"partial_results": "Initial search completed, subsequent searches failed",
"sources_found": 8,
"quality_score": 0.3
},
"tokens_used": {
"input_tokens": 800,
"output_tokens": 400,
"total_tokens": 1200
},
"cost_usd": 0.024,
"error_details": {
"error_type": "service_timeout",
"error_message": "Web search service exceeded timeout limit",
"retry_attempts": 2
},
"tools_used": ["web_search"],
"retry_count": 2,
"metadata": {
"user_id": "user_101",
"session_id": "session_mno",
"request_priority": "high"
}
},
{
"task_id": "task_006",
"agent_id": "scheduler_agent_1",
"task_type": "task_scheduling",
"task_description": "Schedule weekly report generation",
"start_time": "2024-01-15T09:30:00Z",
"end_time": "2024-01-15T09:30:15Z",
"duration_ms": 15000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "task_scheduler",
"duration_ms": 12000,
"success": true,
"parameters": {
"task_definition": {
"action": "generate_report",
"parameters": {"report_type": "weekly_summary"}
},
"schedule": {
"type": "recurring",
"recurrence_pattern": "weekly"
}
}
},
{
"type": "validation",
"description": "Verify schedule creation",
"duration_ms": 3000,
"success": true
}
],
"results": {
"task_scheduled": true,
"next_execution": "2024-01-22T09:30:00Z",
"schedule_id": "sched_789",
"quality_score": 1.0
},
"tokens_used": {
"input_tokens": 300,
"output_tokens": 200,
"total_tokens": 500
},
"cost_usd": 0.01,
"error_details": null,
"tools_used": ["task_scheduler"],
"retry_count": 0,
"metadata": {
"user_id": "user_202",
"session_id": "session_pqr",
"request_priority": "low"
}
},
{
"task_id": "task_007",
"agent_id": "data_agent_2",
"task_type": "data_analysis",
"task_description": "Analyze customer satisfaction survey results",
"start_time": "2024-01-15T10:00:00Z",
"end_time": "2024-01-15T10:04:25Z",
"duration_ms": 265000,
"status": "timeout",
"actions": [
{
"type": "data_ingestion",
"description": "Load survey response data",
"duration_ms": 15000,
"success": true
},
{
"type": "tool_call",
"tool_name": "data_analyzer",
"duration_ms": 250000,
"success": false,
"error": "Operation timeout after 250 seconds"
}
],
"results": {
"partial_analysis": "Data loaded but analysis incomplete",
"records_processed": 5000,
"total_records": 15000,
"quality_score": 0.2
},
"tokens_used": {
"input_tokens": 8000,
"output_tokens": 1000,
"total_tokens": 9000
},
"cost_usd": 0.18,
"error_details": {
"error_type": "timeout",
"error_message": "Data analysis operation exceeded maximum allowed time",
"timeout_limit_ms": 250000
},
"tools_used": ["data_analyzer"],
"retry_count": 0,
"metadata": {
"user_id": "user_303",
"session_id": "session_stu",
"request_priority": "normal"
}
},
{
"task_id": "task_008",
"agent_id": "research_agent_1",
"task_type": "web_research",
"task_description": "Research industry best practices for remote work",
"start_time": "2024-01-15T10:30:00Z",
"end_time": "2024-01-15T10:33:15Z",
"duration_ms": 195000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2200,
"success": true,
"parameters": {
"query": "remote work best practices 2024",
"limit": 12
}
},
{
"type": "tool_call",
"tool_name": "web_search",
"duration_ms": 2400,
"success": true,
"parameters": {
"query": "hybrid work policies companies",
"limit": 8
}
},
{
"type": "content_synthesis",
"description": "Synthesize findings from multiple sources",
"duration_ms": 190400,
"success": true
}
],
"results": {
"comprehensive_report": "Detailed analysis of remote work best practices with industry examples",
"sources_analyzed": 20,
"key_insights": 8,
"quality_score": 0.94
},
"tokens_used": {
"input_tokens": 2800,
"output_tokens": 4200,
"total_tokens": 7000
},
"cost_usd": 0.14,
"error_details": null,
"tools_used": ["web_search"],
"retry_count": 0,
"metadata": {
"user_id": "user_404",
"session_id": "session_vwx",
"request_priority": "normal"
}
},
{
"task_id": "task_009",
"agent_id": "document_agent_2",
"task_type": "document_processing",
"task_description": "Process and summarize quarterly financial report",
"start_time": "2024-01-15T11:00:00Z",
"end_time": "2024-01-15T11:02:30Z",
"duration_ms": 150000,
"status": "success",
"actions": [
{
"type": "tool_call",
"tool_name": "document_processor",
"duration_ms": 145000,
"success": true,
"parameters": {
"document_url": "https://example.com/q4-financial-report.pdf",
"processing_mode": "summary",
"output_format": "json"
}
},
{
"type": "quality_check",
"description": "Validate summary completeness",
"duration_ms": 5000,
"success": true
}
],
"results": {
"executive_summary": "Q4 revenue grew 12% YoY with strong performance in all segments",
"key_metrics_extracted": 15,
"summary_length": 500,
"quality_score": 0.91
},
"tokens_used": {
"input_tokens": 6500,
"output_tokens": 2200,
"total_tokens": 8700
},
"cost_usd": 0.174,
"error_details": null,
"tools_used": ["document_processor"],
"retry_count": 0,
"metadata": {
"user_id": "user_505",
"session_id": "session_yzA",
"request_priority": "high"
}
},
{
"task_id": "task_010",
"agent_id": "communication_agent_2",
"task_type": "notification",
"task_description": "Send urgent system maintenance notification",
"start_time": "2024-01-15T11:30:00Z",
"end_time": "2024-01-15T11:30:45Z",
"duration_ms": 45000,
"status": "failure",
"actions": [
{
"type": "tool_call",
"tool_name": "notification_sender",
"duration_ms": 30000,
"success": false,
"error": "Authentication failed - invalid API key",
"parameters": {
"recipients": ["[email protected]"],
"message": "Scheduled maintenance tonight 11 PM - 2 AM",
"channel": "email",
"priority": "urgent"
}
},
{
"type": "retry",
"description": "Retry with backup credentials",
"duration_ms": 15000,
"success": false,
"error": "Backup authentication also failed"
}
],
"results": {
"notifications_sent": 0,
"delivery_failures": 1,
"quality_score": 0.0
},
"tokens_used": {
"input_tokens": 150,
"output_tokens": 50,
"total_tokens": 200
},
"cost_usd": 0.004,
"error_details": {
"error_type": "authentication_error",
"error_message": "Failed to authenticate with notification service",
"retry_attempts": 1
},
"tools_used": ["notification_sender"],
"retry_count": 1,
"metadata": {
"user_id": "system",
"session_id": "session_BcD",
"request_priority": "urgent"
}
}
]
}

{
"goal": "Build a comprehensive research and analysis platform that can gather information from multiple sources, analyze data, and generate detailed reports",
"description": "The system needs to handle complex research tasks involving web searches, data analysis, document processing, and collaborative report generation. It should be able to coordinate multiple specialists working in parallel while maintaining quality control and ensuring comprehensive coverage of research topics.",
"tasks": [
"Conduct multi-source web research on specified topics",
"Analyze and synthesize information from various sources",
"Perform data processing and statistical analysis",
"Generate visualizations and charts from data",
"Create comprehensive written reports",
"Fact-check and validate information accuracy",
"Coordinate parallel research streams",
"Handle real-time information updates",
"Manage research project timelines",
"Provide interactive research assistance"
],
"constraints": {
"max_response_time": 30000,
"budget_per_task": 1.0,
"quality_threshold": 0.9,
"concurrent_tasks": 10,
"data_retention_days": 90,
"security_level": "standard",
"compliance_requirements": ["GDPR", "data_minimization"]
},
"team_size": 6,
"performance_requirements": {
"high_throughput": true,
"fault_tolerance": true,
"low_latency": false,
"scalability": "medium",
"availability": 0.99
},
"safety_requirements": [
"Input validation and sanitization",
"Output content filtering",
"Rate limiting for external APIs",
"Error handling and graceful degradation",
"Human oversight for critical decisions",
"Audit logging for all operations"
],
"integration_requirements": [
"REST API endpoints for external systems",
"Webhook support for real-time updates",
"Database integration for data persistence",
"File storage for documents and media",
"Email notifications for important events",
"Dashboard for monitoring and control"
],
"scale_requirements": {
"initial_users": 50,
"peak_concurrent_users": 200,
"data_volume_gb": 100,
"requests_per_hour": 1000,
"geographic_regions": ["US", "EU"],
"growth_projection": "50% per year"
}
}

{
"tools": [
{
"name": "web_search",
"purpose": "Search the web for information on specified topics with customizable filters and result limits",
"category": "search",
"inputs": [
{
"name": "query",
"type": "string",
"description": "Search query string to find relevant information",
"required": true,
"min_length": 1,
"max_length": 500,
"examples": ["artificial intelligence trends", "climate change impact", "python programming tutorial"]
},
{
"name": "limit",
"type": "integer",
"description": "Maximum number of search results to return",
"required": false,
"default": 10,
"minimum": 1,
"maximum": 100
},
{
"name": "language",
"type": "string",
"description": "Language code for search results",
"required": false,
"default": "en",
"enum": ["en", "es", "fr", "de", "it", "pt", "zh", "ja"]
},
{
"name": "time_range",
"type": "string",
"description": "Time range filter for search results",
"required": false,
"enum": ["any", "day", "week", "month", "year"]
}
],
"outputs": [
{
"name": "results",
"type": "array",
"description": "Array of search result objects",
"items": {
"type": "object",
"properties": {
"title": {"type": "string"},
"url": {"type": "string"},
"snippet": {"type": "string"},
"relevance_score": {"type": "number"}
}
}
},
{
"name": "total_found",
"type": "integer",
"description": "Total number of results available"
}
],
"error_conditions": [
"Invalid query format",
"Network timeout",
"API rate limit exceeded",
"No results found",
"Service unavailable"
],
"side_effects": [
"Logs search query for analytics",
"May cache results temporarily"
],
"idempotent": true,
"rate_limits": {
"requests_per_minute": 60,
"requests_per_hour": 1000,
"burst_limit": 10
},
"dependencies": [
"search_api_service",
"content_filter_service"
],
"examples": [
{
"description": "Basic web search",
"input": {
"query": "machine learning algorithms",
"limit": 5
},
"expected_output": {
"results": [
{
"title": "Introduction to Machine Learning Algorithms",
"url": "https://example.com/ml-intro",
"snippet": "Machine learning algorithms are computational methods...",
"relevance_score": 0.95
}
],
"total_found": 1250
}
}
],
"security_requirements": [
"Query sanitization",
"Rate limiting by user",
"Content filtering"
]
},
{
"name": "data_analyzer",
"purpose": "Analyze structured data and generate statistical insights, trends, and visualizations",
"category": "data",
"inputs": [
{
"name": "data",
"type": "object",
"description": "Structured data to analyze in JSON format",
"required": true,
"properties": {
"columns": {"type": "array"},
"rows": {"type": "array"}
}
},
{
"name": "analysis_type",
"type": "string",
"description": "Type of analysis to perform",
"required": true,
"enum": ["descriptive", "correlation", "trend", "distribution", "outlier_detection"]
},
{
"name": "target_column",
"type": "string",
"description": "Primary column to focus analysis on",
"required": false
},
{
"name": "include_visualization",
"type": "boolean",
"description": "Whether to generate visualization data",
"required": false,
"default": true
}
],
"outputs": [
{
"name": "insights",
"type": "array",
"description": "Array of analytical insights and findings"
},
{
"name": "statistics",
"type": "object",
"description": "Statistical measures and metrics"
},
{
"name": "visualization_data",
"type": "object",
"description": "Data formatted for visualization creation"
}
],
"error_conditions": [
"Invalid data format",
"Insufficient data points",
"Missing required columns",
"Data type mismatch",
"Analysis timeout"
],
"side_effects": [
"May create temporary analysis files",
"Logs analysis parameters for optimization"
],
"idempotent": true,
"rate_limits": {
"requests_per_minute": 30,
"requests_per_hour": 500,
"burst_limit": 5
},
"dependencies": [
"statistics_engine",
"visualization_service"
],
"examples": [
{
"description": "Basic descriptive analysis",
"input": {
"data": {
"columns": ["age", "salary", "department"],
"rows": [
[25, 50000, "engineering"],
[30, 60000, "engineering"],
[28, 55000, "marketing"]
]
},
"analysis_type": "descriptive",
"target_column": "salary"
},
"expected_output": {
"insights": [
"Average salary is $55,000",
"Salary range: $50,000 - $60,000",
"Engineering department has higher average salary"
],
"statistics": {
"mean": 55000,
"median": 55000,
"std_dev": 5000
}
}
}
],
"security_requirements": [
"Data anonymization",
"Access control validation"
]
},
{
"name": "document_processor",
"purpose": "Process and extract information from various document formats including PDFs, Word docs, and plain text",
"category": "file",
"inputs": [
{
"name": "document_url",
"type": "string",
"description": "URL or path to the document to process",
"required": true,
"pattern": "^(https?://|file://|/)"
},
{
"name": "processing_mode",
"type": "string",
"description": "How to process the document",
"required": false,
"default": "full_text",
"enum": ["full_text", "summary", "key_points", "metadata_only"]
},
{
"name": "output_format",
"type": "string",
"description": "Desired output format",
"required": false,
"default": "json",
"enum": ["json", "markdown", "plain_text"]
},
{
"name": "language_detection",
"type": "boolean",
"description": "Whether to detect document language",
"required": false,
"default": true
}
],
"outputs": [
{
"name": "content",
"type": "string",
"description": "Extracted and processed document content"
},
{
"name": "metadata",
"type": "object",
"description": "Document metadata including author, creation date, etc."
},
{
"name": "language",
"type": "string",
"description": "Detected language of the document"
},
{
"name": "word_count",
"type": "integer",
"description": "Total word count in the document"
}
],
"error_conditions": [
"Document not found",
"Unsupported file format",
"Document corrupted or unreadable",
"Access permission denied",
"Document too large"
],
"side_effects": [
"May download and cache documents temporarily",
"Creates processing logs for debugging"
],
"idempotent": true,
"rate_limits": {
"requests_per_minute": 20,
"requests_per_hour": 300,
"burst_limit": 3
},
"dependencies": [
"document_parser_service",
"language_detection_service",
"file_storage_service"
],
"examples": [
{
"description": "Process PDF document for full text extraction",
"input": {
"document_url": "https://example.com/research-paper.pdf",
"processing_mode": "full_text",
"output_format": "markdown"
},
"expected_output": {
"content": "# Research Paper Title\n\nAbstract: This paper discusses...",
"metadata": {
"author": "Dr. Smith",
"creation_date": "2024-01-15",
"pages": 15
},
"language": "en",
"word_count": 3500
}
}
],
"security_requirements": [
"URL validation",
"File type verification",
"Malware scanning",
"Access control enforcement"
]
},
{
"name": "notification_sender",
"purpose": "Send notifications via multiple channels including email, SMS, and webhooks",
"category": "communication",
"inputs": [
{
"name": "recipients",
"type": "array",
"description": "List of recipient identifiers",
"required": true,
"min_items": 1,
"max_items": 100,
"items": {
"type": "string",
"pattern": "^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$|^\\+?[1-9]\\d{1,14}$"
}
},
{
"name": "message",
"type": "string",
"description": "Message content to send",
"required": true,
"min_length": 1,
"max_length": 10000
},
{
"name": "channel",
"type": "string",
"description": "Communication channel to use",
"required": false,
"default": "email",
"enum": ["email", "sms", "webhook", "push"]
},
{
"name": "priority",
"type": "string",
"description": "Message priority level",
"required": false,
"default": "normal",
"enum": ["low", "normal", "high", "urgent"]
},
{
"name": "template_id",
"type": "string",
"description": "Optional template ID for formatting",
"required": false
}
],
"outputs": [
{
"name": "delivery_status",
"type": "object",
"description": "Status of message delivery to each recipient"
},
{
"name": "message_id",
"type": "string",
"description": "Unique identifier for the sent message"
},
{
"name": "delivery_timestamp",
"type": "string",
"description": "ISO timestamp when message was sent"
}
],
"error_conditions": [
"Invalid recipient format",
"Message too long",
"Channel service unavailable",
"Authentication failure",
"Rate limit exceeded for channel"
],
"side_effects": [
"Sends actual notifications to recipients",
"Logs delivery attempts and results",
"Updates delivery statistics"
],
"idempotent": false,
"rate_limits": {
"requests_per_minute": 100,
"requests_per_hour": 2000,
"burst_limit": 20
},
"dependencies": [
"email_service",
"sms_service",
"webhook_service"
],
"examples": [
{
"description": "Send email notification",
"input": {
"recipients": ["[email protected]"],
"message": "Your report has been completed and is ready for review.",
"channel": "email",
"priority": "normal"
},
"expected_output": {
"delivery_status": {
"[email protected]": "delivered"
},
"message_id": "msg_12345",
"delivery_timestamp": "2024-01-15T10:30:00Z"
}
}
],
"security_requirements": [
"Recipient validation",
"Message content filtering",
"Rate limiting per user",
"Delivery confirmation"
]
},
{
"name": "task_scheduler",
"purpose": "Schedule and manage delayed or recurring tasks within the agent system",
"category": "compute",
"inputs": [
{
"name": "task_definition",
"type": "object",
"description": "Definition of the task to be scheduled",
"required": true,
"properties": {
"action": {"type": "string"},
"parameters": {"type": "object"},
"retry_policy": {"type": "object"}
}
},
{
"name": "schedule",
"type": "object",
"description": "Scheduling parameters for the task",
"required": true,
"properties": {
"type": {"type": "string", "enum": ["once", "recurring"]},
"execute_at": {"type": "string"},
"recurrence_pattern": {"type": "string"}
}
},
{
"name": "priority",
"type": "integer",
"description": "Task priority (1-10, higher is more urgent)",
"required": false,
"default": 5,
"minimum": 1,
"maximum": 10
}
],
"outputs": [
{
"name": "task_id",
"type": "string",
"description": "Unique identifier for the scheduled task"
},
{
"name": "next_execution",
"type": "string",
"description": "ISO timestamp of next scheduled execution"
},
{
"name": "status",
"type": "string",
"description": "Current status of the scheduled task"
}
],
"error_conditions": [
"Invalid schedule format",
"Past execution time specified",
"Task queue full",
"Invalid task definition",
"Scheduling service unavailable"
],
"side_effects": [
"Creates scheduled tasks in the system",
"May consume system resources for task storage",
"Updates scheduling metrics"
],
"idempotent": false,
"rate_limits": {
"requests_per_minute": 50,
"requests_per_hour": 1000,
"burst_limit": 10
},
"dependencies": [
"task_scheduler_service",
"task_executor_service"
],
"examples": [
{
"description": "Schedule a one-time report generation",
"input": {
"task_definition": {
"action": "generate_report",
"parameters": {
"report_type": "monthly_summary",
"recipients": ["[email protected]"]
}
},
"schedule": {
"type": "once",
"execute_at": "2024-02-01T09:00:00Z"
},
"priority": 7
},
"expected_output": {
"task_id": "task_67890",
"next_execution": "2024-02-01T09:00:00Z",
"status": "scheduled"
}
}
],
"security_requirements": [
"Task definition validation",
"User authorization for scheduling",
"Resource limit enforcement"
]
}
]
}

{
"architecture_design": {
"pattern": "supervisor",
"agents": [
{
"name": "supervisor_agent",
"role": "Task Coordinator and Quality Controller",
"archetype": "coordinator",
"responsibilities": [
"task_decomposition",
"delegation",
"progress_monitoring",
"quality_assurance",
"result_aggregation"
],
"capabilities": [
"planning",
"coordination",
"evaluation",
"decision_making"
],
"tools": [
{
"name": "file_manager",
"description": "Manage files and directories",
"input_schema": {
"type": "object",
"properties": {
"action": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"content": {
"type": "string"
}
}
},
"capabilities": [
"file_operations",
"data_management"
],
"reliability": "high",
"latency": "low"
},
{
"name": "data_analyzer",
"description": "Analyze and process data",
"input_schema": {
"type": "object",
"properties": {
"data": {
"type": "object"
},
"analysis_type": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"insights": {
"type": "array"
},
"metrics": {
"type": "object"
}
}
},
"capabilities": [
"data_analysis",
"statistics",
"visualization"
],
"reliability": "high",
"latency": "medium"
}
],
"communication_interfaces": [
"user_interface",
"agent_messaging"
],
"constraints": {
"max_concurrent_supervisions": 5,
"decision_timeout": "30s"
},
"success_criteria": [
"successful task completion",
"optimal resource utilization",
"quality standards met"
],
"dependencies": []
},
{
"name": "research_specialist",
"role": "Research Specialist",
"archetype": "specialist",
"responsibilities": [
"Conduct multi-source web research on specified topics",
"Handle real-time information updates"
],
"capabilities": [
"research_expertise",
"specialized_tools",
"domain_knowledge"
],
"tools": [
{
"name": "web_search",
"description": "Search the web for information",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"results": {
"type": "array"
}
}
},
"capabilities": [
"research",
"information_gathering"
],
"reliability": "high",
"latency": "medium"
},
{
"name": "data_analyzer",
"description": "Analyze and process data",
"input_schema": {
"type": "object",
"properties": {
"data": {
"type": "object"
},
"analysis_type": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"insights": {
"type": "array"
},
"metrics": {
"type": "object"
}
}
},
"capabilities": [
"data_analysis",
"statistics",
"visualization"
],
"reliability": "high",
"latency": "medium"
}
],
"communication_interfaces": [
"supervisor_messaging"
],
"constraints": {
"domain_scope": "research",
"task_queue_size": 10
},
"success_criteria": [
"excel in research tasks",
"maintain domain expertise",
"provide quality output"
],
"dependencies": [
"supervisor_agent"
]
},
{
"name": "data_specialist",
"role": "Data Specialist",
"archetype": "specialist",
"responsibilities": [
"Analyze and synthesize information from various sources",
"Perform data processing and statistical analysis",
"Generate visualizations and charts from data"
],
"capabilities": [
"data_expertise",
"specialized_tools",
"domain_knowledge"
],
"tools": [
{
"name": "data_analyzer",
"description": "Analyze and process data",
"input_schema": {
"type": "object",
"properties": {
"data": {
"type": "object"
},
"analysis_type": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"insights": {
"type": "array"
},
"metrics": {
"type": "object"
}
}
},
"capabilities": [
"data_analysis",
"statistics",
"visualization"
],
"reliability": "high",
"latency": "medium"
},
{
"name": "file_manager",
"description": "Manage files and directories",
"input_schema": {
"type": "object",
"properties": {
"action": {
"type": "string"
},
"path": {
"type": "string"
}
}
},
"output_schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"content": {
"type": "string"
}
}
},
"capabilities": [
"file_operations",
"data_management"
],
"reliability": "high",
"latency": "low"
}
],
"communication_interfaces": [
"supervisor_messaging"
],
"constraints": {
"domain_scope": "data",
"task_queue_size": 10
},
"success_criteria": [
"excel in data tasks",
"maintain domain expertise",
"provide quality output"
],
"dependencies": [
"supervisor_agent"
]
}
],
"communication_topology": [
{
"from_agent": "supervisor_agent",
"to_agent": "research_specialist",
"pattern": "direct_message",
"data_format": "json",
"frequency": "on_demand",
"criticality": "high"
},
{
"from_agent": "research_specialist",
"to_agent": "supervisor_agent",
"pattern": "direct_message",
"data_format": "json",
"frequency": "on_completion",
"criticality": "high"
},
{
"from_agent": "supervisor_agent",
"to_agent": "data_specialist",
"pattern": "direct_message",
"data_format": "json",
"frequency": "on_demand",
"criticality": "high"
},
{
"from_agent": "data_specialist",
"to_agent": "supervisor_agent",
"pattern": "direct_message",
"data_format": "json",
"frequency": "on_completion",
"criticality": "high"
}
],
"shared_resources": [
{
"type": "message_queue",
"capacity": 1000
},
{
"type": "shared_memory",
"size": "1GB"
},
{
"type": "event_store",
"retention": "30 days"
}
],
"guardrails": [
{
"type": "input_validation",
"rules": "strict_schema_enforcement"
},
{
"type": "rate_limiting",
"limit": "100_requests_per_minute"
},
{
"type": "output_filtering",
"rules": "content_safety_check"
}
],
"scaling_strategy": {
"horizontal_scaling": true,
"auto_scaling_triggers": [
"cpu > 80%",
"queue_depth > 100"
],
"max_instances_per_agent": 5
},
"failure_handling": {
"retry_policy": "exponential_backoff",
"circuit_breaker": true,
"fallback_strategies": [
"graceful_degradation",
"human_escalation"
]
}
},
"mermaid_diagram": "graph TD\n supervisor_agent[Task Coordinator and Quality Controller]:::coordinator\n research_specialist[Research Specialist]:::specialist\n data_specialist[Data Specialist]:::specialist\n supervisor_agent --> research_specialist\n research_specialist --> supervisor_agent\n supervisor_agent --> data_specialist\n data_specialist --> supervisor_agent\n\n classDef coordinator fill:#e1f5fe,stroke:#01579b,stroke-width:2px\n classDef specialist fill:#f3e5f5,stroke:#4a148c,stroke-width:2px\n classDef interface fill:#e8f5e8,stroke:#1b5e20,stroke-width:2px\n classDef monitor fill:#fff3e0,stroke:#e65100,stroke-width:2px\n class supervisor_agent coordinator\n class research_specialist specialist\n class data_specialist specialist",
"implementation_roadmap": {
"total_duration": "8-12 weeks",
"phases": [
{
"phase": 1,
"name": "Core Infrastructure",
"duration": "2-3 weeks",
"tasks": [
"Set up development environment",
"Implement basic agent framework",
"Create communication infrastructure",
"Set up monitoring and logging",
"Implement basic tools"
],
"deliverables": [
"Agent runtime framework",
"Communication layer",
"Basic monitoring dashboard"
]
},
{
"phase": 2,
"name": "Agent Implementation",
"duration": "3-4 weeks",
"tasks": [
"Implement individual agent logic",
"Create agent-specific tools",
"Implement communication protocols",
"Add error handling and recovery",
"Create agent configuration system"
],
"deliverables": [
"Functional agent implementations",
"Tool integration",
"Configuration management"
]
},
{
"phase": 3,
"name": "Integration and Testing",
"duration": "2-3 weeks",
"tasks": [
"Integrate all agents",
"End-to-end testing",
"Performance optimization",
"Security implementation",
"Documentation creation"
],
"deliverables": [
"Integrated system",
"Test suite",
"Performance benchmarks",
"Security audit report"
]
},
{
"phase": 4,
"name": "Deployment and Monitoring",
"duration": "1-2 weeks",
"tasks": [
"Production deployment",
"Monitoring setup",
"Alerting configuration",
"User training",
"Go-live support"
],
"deliverables": [
"Production system",
"Monitoring dashboard",
"Operational runbooks",
"Training materials"
]
}
],
"critical_path": [
"Agent framework implementation",
"Communication layer development",
"Integration testing",
"Production deployment"
],
"risks": [
{
"risk": "Communication complexity",
"impact": "high",
"mitigation": "Start with simple protocols, iterate"
},
{
"risk": "Agent coordination failures",
"impact": "medium",
"mitigation": "Implement robust error handling and fallbacks"
},
{
"risk": "Performance bottlenecks",
"impact": "medium",
"mitigation": "Early performance testing and optimization"
}
],
"success_criteria": [
"Input validation and sanitization",
"Output content filtering",
"Rate limiting for external APIs",
"Error handling and graceful degradation",
"Human oversight for critical decisions",
"Audit logging for all operations",
"All agents operational",
"Communication working reliably",
"Performance targets met",
"Error rate below 1%"
]
},
"metadata": {
"generated_by": "agent_planner.py",
"requirements_file": "sample_system_requirements.json",
"architecture_pattern": "supervisor",
"agent_count": 3
}
}

{
"summary": {
"evaluation_period": {
"start_time": "2024-01-15T09:00:00Z",
"end_time": "2024-01-15T11:30:45Z",
"total_duration_hours": 2.51
},
"overall_health": "good",
"key_findings": [
"Success rate (80.0%) below target",
"High average latency (16.9s)",
"2 high-impact error patterns identified"
],
"critical_issues": 0,
"improvement_opportunities": 6
},
"system_metrics": {
"total_tasks": 10,
"successful_tasks": 8,
"failed_tasks": 2,
"partial_tasks": 1,
"timeout_tasks": 1,
"success_rate": 0.8,
"failure_rate": 0.2,
"average_duration_ms": 169800.0,
"median_duration_ms": 152500.0,
"percentile_95_duration_ms": 330000.0,
"min_duration_ms": 8000,
"max_duration_ms": 330000,
"total_tokens_used": 53700,
"average_tokens_per_task": 5370.0,
"total_cost_usd": 1.074,
"average_cost_per_task": 0.1074,
"cost_per_token": 0.00002,
"throughput_tasks_per_hour": 3.98,
"error_rate": 0.3,
"retry_rate": 0.3
},
"agent_metrics": {
"research_agent_1": {
"total_tasks": 2,
"successful_tasks": 2,
"failed_tasks": 0,
"partial_tasks": 0,
"timeout_tasks": 0,
"success_rate": 1.0,
"failure_rate": 0.0,
"average_duration_ms": 174500.0,
"median_duration_ms": 174500.0,
"percentile_95_duration_ms": 195000.0,
"min_duration_ms": 154000,
"max_duration_ms": 195000,
"total_tokens_used": 11050,
"average_tokens_per_task": 5525.0,
"total_cost_usd": 0.221,
"average_cost_per_task": 0.1105,
"cost_per_token": 0.00002,
"throughput_tasks_per_hour": 11.49,
"error_rate": 0.0,
"retry_rate": 0.0
},
"data_agent_1": {
"total_tasks": 1,
"successful_tasks": 1,
"failed_tasks": 0,
"partial_tasks": 0,
"timeout_tasks": 0,
"success_rate": 1.0,
"failure_rate": 0.0,
"average_duration_ms": 165000.0,
"median_duration_ms": 165000.0,
"percentile_95_duration_ms": 165000.0,
"min_duration_ms": 165000,
"max_duration_ms": 165000,
"total_tokens_used": 5000,
"average_tokens_per_task": 5000.0,
"total_cost_usd": 0.095,
"average_cost_per_task": 0.095,
"cost_per_token": 0.000019,
"throughput_tasks_per_hour": 21.82,
"error_rate": 0.0,
"retry_rate": 0.0
},
"document_agent_1": {
"total_tasks": 1,
"successful_tasks": 0,
"failed_tasks": 0,
"partial_tasks": 1,
"timeout_tasks": 0,
"success_rate": 0.0,
"failure_rate": 0.0,
"average_duration_ms": 140000.0,
"median_duration_ms": 140000.0,
"percentile_95_duration_ms": 140000.0,
"min_duration_ms": 140000,
"max_duration_ms": 140000,
"total_tokens_used": 8600,
"average_tokens_per_task": 8600.0,
"total_cost_usd": 0.172,
"average_cost_per_task": 0.172,
"cost_per_token": 0.00002,
"throughput_tasks_per_hour": 25.71,
"error_rate": 1.0,
"retry_rate": 1.0
}
},
"task_type_metrics": {
"web_research": {
"total_tasks": 3,
"successful_tasks": 2,
"failed_tasks": 1,
"partial_tasks": 0,
"timeout_tasks": 0,
"success_rate": 0.667,
"failure_rate": 0.333,
"average_duration_ms": 226333.33,
"median_duration_ms": 195000.0,
"percentile_95_duration_ms": 330000.0,
"min_duration_ms": 154000,
"max_duration_ms": 330000,
"total_tokens_used": 12250,
"average_tokens_per_task": 4083.33,
"total_cost_usd": 0.245,
"average_cost_per_task": 0.082,
"cost_per_token": 0.00002,
"throughput_tasks_per_hour": 2.65,
"error_rate": 0.333,
"retry_rate": 0.333
},
"data_analysis": {
"total_tasks": 2,
"successful_tasks": 1,
"failed_tasks": 0,
"partial_tasks": 0,
"timeout_tasks": 1,
"success_rate": 0.5,
"failure_rate": 0.0,
"average_duration_ms": 215000.0,
"median_duration_ms": 215000.0,
"percentile_95_duration_ms": 265000.0,
"min_duration_ms": 165000,
"max_duration_ms": 265000,
"total_tokens_used": 14000,
"average_tokens_per_task": 7000.0,
"total_cost_usd": 0.275,
"average_cost_per_task": 0.138,
"cost_per_token": 0.0000196,
"throughput_tasks_per_hour": 1.86,
"error_rate": 0.5,
"retry_rate": 0.0
}
},
"tool_usage_analysis": {
"web_search": {
"usage_count": 3,
"error_rate": 0.333,
"avg_duration": 126666.67,
"affected_workflows": [
"web_research"
],
"retry_count": 2
},
"data_analyzer": {
"usage_count": 2,
"error_rate": 0.0,
"avg_duration": 205000.0,
"affected_workflows": [
"data_analysis"
],
"retry_count": 0
},
"document_processor": {
"usage_count": 2,
"error_rate": 0.0,
"avg_duration": 140000.0,
"affected_workflows": [
"document_processing"
],
"retry_count": 1
},
"notification_sender": {
"usage_count": 2,
"error_rate": 0.5,
"avg_duration": 18750.0,
"affected_workflows": [
"notification"
],
"retry_count": 1
},
"task_scheduler": {
"usage_count": 1,
"error_rate": 0.0,
"avg_duration": 12000.0,
"affected_workflows": [
"task_scheduling"
],
"retry_count": 0
}
},
"error_analysis": [
{
"error_type": "timeout",
"count": 2,
"percentage": 20.0,
"affected_agents": [
"research_agent_2",
"data_agent_2"
],
"affected_task_types": [
"web_research",
"data_analysis"
],
"common_patterns": [
"timeout",
"exceeded",
"limit"
],
"suggested_fixes": [
"Increase timeout values",
"Optimize slow operations",
"Add retry logic with exponential backoff",
"Parallelize independent operations"
],
"impact_level": "high"
},
{
"error_type": "authentication",
"count": 1,
"percentage": 10.0,
"affected_agents": [
"communication_agent_2"
],
"affected_task_types": [
"notification"
],
"common_patterns": [
"authentication",
"failed",
"invalid"
],
"suggested_fixes": [
"Check credential rotation",
"Implement token refresh logic",
"Add authentication retry",
"Verify permission scopes"
],
"impact_level": "high"
},
{
"error_type": "validation",
"count": 1,
"percentage": 10.0,
"affected_agents": [
"document_agent_1"
],
"affected_task_types": [
"document_processing"
],
"common_patterns": [
"validation",
"failed",
"missing"
],
"suggested_fixes": [
"Strengthen input validation",
"Add data sanitization",
"Improve error messages",
"Add input examples"
],
"impact_level": "medium"
}
],
"bottleneck_analysis": [
{
"bottleneck_type": "tool",
"location": "notification_sender",
"severity": "medium",
"description": "Tool notification_sender has high error rate (50.0%)",
"impact_on_performance": {
"reliability_impact": 1.0,
"retry_overhead": 1000
},
"affected_workflows": [
"notification"
],
"optimization_suggestions": [
"Review tool implementation",
"Add better error handling for tool",
"Implement tool fallbacks",
"Consider alternative tools"
],
"estimated_improvement": {
"error_reduction": 0.35,
"performance_gain": 1.2
}
},
{
"bottleneck_type": "tool",
"location": "web_search",
"severity": "medium",
"description": "Tool web_search has high error rate (33.3%)",
"impact_on_performance": {
"reliability_impact": 1.0,
"retry_overhead": 2000
},
"affected_workflows": [
"web_research"
],
"optimization_suggestions": [
"Review tool implementation",
"Add better error handling for tool",
"Implement tool fallbacks",
"Consider alternative tools"
],
"estimated_improvement": {
"error_reduction": 0.233,
"performance_gain": 1.2
}
}
],
"optimization_recommendations": [
{
"category": "reliability",
"priority": "high",
"title": "Improve System Reliability",
"description": "System success rate is 80.0%, below target of 90%",
"implementation_effort": "medium",
"expected_impact": {
"success_rate_improvement": 0.1,
"cost_reduction": 0.01611
},
"estimated_cost_savings": 0.1074,
"estimated_performance_gain": 1.2,
"implementation_steps": [
"Identify and fix top error patterns",
"Implement better error handling and retries",
"Add comprehensive monitoring and alerting",
"Implement graceful degradation patterns"
],
"risks": [
"Temporary increase in complexity",
"Potential initial performance overhead"
],
"prerequisites": [
"Error analysis completion",
"Monitoring infrastructure"
]
},
{
"category": "performance",
"priority": "high",
"title": "Reduce Task Latency",
"description": "Average task duration (169.8s) exceeds target",
"implementation_effort": "high",
"expected_impact": {
"latency_reduction": 0.49,
"throughput_improvement": 1.5
},
"estimated_performance_gain": 1.4,
"implementation_steps": [
"Profile and optimize slow operations",
"Implement parallel processing where possible",
"Add caching for expensive operations",
"Optimize API calls and reduce round trips"
],
"risks": [
"Increased system complexity",
"Potential resource usage increase"
],
"prerequisites": [
"Performance profiling tools",
"Caching infrastructure"
]
},
{
"category": "cost",
"priority": "medium",
"title": "Optimize Token Usage and Costs",
"description": "Average cost per task ($0.107) is above optimal range",
"implementation_effort": "low",
"expected_impact": {
"cost_reduction": 0.032,
"efficiency_improvement": 1.15
},
"estimated_cost_savings": 0.322,
"estimated_performance_gain": 1.05,
"implementation_steps": [
"Implement prompt optimization",
"Add response caching for repeated queries",
"Use smaller models for simple tasks",
"Implement token usage monitoring and alerts"
],
"risks": [
"Potential quality reduction with smaller models"
],
"prerequisites": [
"Token usage analysis",
"Caching infrastructure"
]
},
{
"category": "reliability",
"priority": "high",
"title": "Address Timeout Errors",
"description": "Timeout errors occur in 20.0% of cases",
"implementation_effort": "medium",
"expected_impact": {
"error_reduction": 0.2,
"reliability_improvement": 1.1
},
"estimated_cost_savings": 0.1074,
"implementation_steps": [
"Increase timeout values",
"Optimize slow operations",
"Add retry logic with exponential backoff",
"Parallelize independent operations"
],
"risks": [
"May require significant code changes"
],
"prerequisites": [
"Root cause analysis",
"Testing framework"
]
},
{
"category": "reliability",
"priority": "high",
"title": "Address Authentication Errors",
"description": "Authentication errors occur in 10.0% of cases",
"implementation_effort": "medium",
"expected_impact": {
"error_reduction": 0.1,
"reliability_improvement": 1.1
},
"estimated_cost_savings": 0.1074,
"implementation_steps": [
"Check credential rotation",
"Implement token refresh logic",
"Add authentication retry",
"Verify permission scopes"
],
"risks": [
"May require significant code changes"
],
"prerequisites": [
"Root cause analysis",
"Testing framework"
]
},
{
"category": "performance",
"priority": "medium",
"title": "Address Tool Bottleneck",
"description": "Tool notification_sender has high error rate (50.0%)",
"implementation_effort": "medium",
"expected_impact": {
"error_reduction": 0.35,
"performance_gain": 1.2
},
"estimated_performance_gain": 1.2,
"implementation_steps": [
"Review tool implementation",
"Add better error handling for tool",
"Implement tool fallbacks",
"Consider alternative tools"
],
"risks": [
"System downtime during implementation",
"Potential cascade effects"
],
"prerequisites": [
"Impact assessment",
"Rollback plan"
]
}
],
"trends_analysis": {
"daily_success_rates": {
"2024-01-15": 0.8
},
"daily_avg_durations": {
"2024-01-15": 169800.0
},
"daily_costs": {
"2024-01-15": 1.074
},
"trend_direction": {
"success_rate": "stable",
"duration": "stable",
"cost": "stable"
}
},
"cost_breakdown": {
"total_cost": 1.074,
"cost_by_agent": {
"research_agent_1": 0.221,
"research_agent_2": 0.024,
"data_agent_1": 0.095,
"data_agent_2": 0.18,
"document_agent_1": 0.172,
"document_agent_2": 0.174,
"communication_agent_1": 0.007,
"communication_agent_2": 0.004,
"scheduler_agent_1": 0.01
},
"cost_by_task_type": {
"web_research": 0.245,
"data_analysis": 0.275,
"document_processing": 0.346,
"notification": 0.011,
"task_scheduling": 0.01
},
"cost_per_token": 0.00002,
"top_cost_drivers": [
[
"document_processing",
0.346
],
[
"data_analysis",
0.275
],
[
"web_research",
0.245
],
[
"notification",
0.011
],
[
"task_scheduling",
0.01
]
]
},
"sla_compliance": {
"overall_compliant": false,
"sla_details": {
"success_rate": {
"target": 0.95,
"actual": 0.8,
"compliant": false,
"gap": 0.15
},
"average_latency": {
"target": 10000,
"actual": 169800.0,
"compliant": false,
"gap": 159800.0
},
"error_rate": {
"target": 0.05,
"actual": 0.3,
"compliant": false,
"gap": 0.25
}
},
"compliance_score": 0.0
},
"metadata": {
"generated_at": "2024-01-15T12:00:00Z",
"evaluator_version": "1.0",
"total_logs_processed": 10,
"agents_analyzed": 9,
"task_types_analyzed": 5,
"analysis_completeness": "full"
}
}
{
"tool_schemas": [
{
"name": "web_search",
"description": "Search the web for information on specified topics with customizable filters and result limits",
"openai_schema": {
"name": "web_search",
"description": "Search the web for information on specified topics with customizable filters and result limits",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query string to find relevant information",
"minLength": 1,
"maxLength": 500,
"examples": [
"artificial intelligence trends",
"climate change impact",
"python programming tutorial"
]
},
"limit": {
"type": "integer",
"description": "Maximum number of search results to return",
"minimum": 1,
"maximum": 100,
"default": 10
},
"language": {
"type": "string",
"description": "Language code for search results",
"enum": [
"en",
"es",
"fr",
"de",
"it",
"pt",
"zh",
"ja"
],
"default": "en"
},
"time_range": {
"type": "string",
"description": "Time range filter for search results",
"enum": [
"any",
"day",
"week",
"month",
"year"
]
}
},
"required": [
"query"
],
"additionalProperties": false
}
},
"anthropic_schema": {
"name": "web_search",
"description": "Search the web for information on specified topics with customizable filters and result limits",
"input_schema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query string to find relevant information",
"minLength": 1,
"maxLength": 500
},
"limit": {
"type": "integer",
"description": "Maximum number of search results to return",
"minimum": 1,
"maximum": 100
},
"language": {
"type": "string",
"description": "Language code for search results",
"enum": [
"en",
"es",
"fr",
"de",
"it",
"pt",
"zh",
"ja"
]
},
"time_range": {
"type": "string",
"description": "Time range filter for search results",
"enum": [
"any",
"day",
"week",
"month",
"year"
]
}
},
"required": [
"query"
]
}
},
"validation_rules": [
{
"parameter": "query",
"rules": {
"minLength": 1,
"maxLength": 500
}
},
{
"parameter": "limit",
"rules": {
"minimum": 1,
"maximum": 100
}
}
],
"error_responses": [
{
"error_code": "invalid_input",
"error_message": "Invalid input parameters provided",
"http_status": 400,
"retry_after": null,
"details": {
"validation_errors": []
}
},
{
"error_code": "authentication_required",
"error_message": "Authentication required to access this tool",
"http_status": 401,
"retry_after": null,
"details": null
},
{
"error_code": "rate_limit_exceeded",
"error_message": "Rate limit exceeded. Please try again later",
"http_status": 429,
"retry_after": 60,
"details": null
}
],
"rate_limits": {
"requests_per_minute": 60,
"requests_per_hour": 1000,
"requests_per_day": 10000,
"burst_limit": 10,
"cooldown_period": 60,
"rate_limit_key": "user_id"
},
"examples": [
{
"description": "Basic web search",
"input": {
"query": "machine learning algorithms",
"limit": 5
},
"expected_output": {
"results": [
{
"title": "Introduction to Machine Learning Algorithms",
"url": "https://example.com/ml-intro",
"snippet": "Machine learning algorithms are computational methods...",
"relevance_score": 0.95
}
],
"total_found": 1250
}
}
],
"metadata": {
"category": "search",
"idempotent": true,
"side_effects": [
"Logs search query for analytics",
"May cache results temporarily"
],
"dependencies": [
"search_api_service",
"content_filter_service"
],
"security_requirements": [
"Query sanitization",
"Rate limiting by user",
"Content filtering"
],
"generated_at": "2024-01-15T10:30:00Z",
"schema_version": "1.0",
"input_parameters": 4,
"output_parameters": 2,
"required_parameters": 1,
"optional_parameters": 3
}
},
{
"name": "data_analyzer",
"description": "Analyze structured data and generate statistical insights, trends, and visualizations",
"openai_schema": {
"name": "data_analyzer",
"description": "Analyze structured data and generate statistical insights, trends, and visualizations",
"parameters": {
"type": "object",
"properties": {
"data": {
"type": "object",
"description": "Structured data to analyze in JSON format",
"properties": {
"columns": {
"type": "array"
},
"rows": {
"type": "array"
}
},
"additionalProperties": false
},
"analysis_type": {
"type": "string",
"description": "Type of analysis to perform",
"enum": [
"descriptive",
"correlation",
"trend",
"distribution",
"outlier_detection"
]
},
"target_column": {
"type": "string",
"description": "Primary column to focus analysis on",
"maxLength": 1000
},
"include_visualization": {
"type": "boolean",
"description": "Whether to generate visualization data",
"default": true
}
},
"required": [
"data",
"analysis_type"
],
"additionalProperties": false
}
},
"anthropic_schema": {
"name": "data_analyzer",
"description": "Analyze structured data and generate statistical insights, trends, and visualizations",
"input_schema": {
"type": "object",
"properties": {
"data": {
"type": "object",
"description": "Structured data to analyze in JSON format"
},
"analysis_type": {
"type": "string",
"description": "Type of analysis to perform",
"enum": [
"descriptive",
"correlation",
"trend",
"distribution",
"outlier_detection"
]
},
"target_column": {
"type": "string",
"description": "Primary column to focus analysis on",
"maxLength": 1000
},
"include_visualization": {
"type": "boolean",
"description": "Whether to generate visualization data"
}
},
"required": [
"data",
"analysis_type"
]
}
},
"validation_rules": [
{
"parameter": "target_column",
"rules": {
"maxLength": 1000
}
}
],
"error_responses": [
{
"error_code": "invalid_input",
"error_message": "Invalid input parameters provided",
"http_status": 400,
"retry_after": null,
"details": {
"validation_errors": []
}
},
{
"error_code": "authentication_required",
"error_message": "Authentication required to access this tool",
"http_status": 401,
"retry_after": null,
"details": null
},
{
"error_code": "rate_limit_exceeded",
"error_message": "Rate limit exceeded. Please try again later",
"http_status": 429,
"retry_after": 60,
"details": null
}
],
"rate_limits": {
"requests_per_minute": 30,
"requests_per_hour": 500,
"requests_per_day": 5000,
"burst_limit": 5,
"cooldown_period": 60,
"rate_limit_key": "user_id"
},
"examples": [
{
"description": "Basic descriptive analysis",
"input": {
"data": {
"columns": [
"age",
"salary",
"department"
],
"rows": [
[
25,
50000,
"engineering"
],
[
30,
60000,
"engineering"
],
[
28,
55000,
"marketing"
]
]
},
"analysis_type": "descriptive",
"target_column": "salary"
},
"expected_output": {
"insights": [
"Average salary is $55,000",
"Salary range: $50,000 - $60,000",
"Engineering department has higher average salary"
],
"statistics": {
"mean": 55000,
"median": 55000,
"std_dev": 5000
}
}
}
],
"metadata": {
"category": "data",
"idempotent": true,
"side_effects": [
"May create temporary analysis files",
"Logs analysis parameters for optimization"
],
"dependencies": [
"statistics_engine",
"visualization_service"
],
"security_requirements": [
"Data anonymization",
"Access control validation"
],
"generated_at": "2024-01-15T10:30:00Z",
"schema_version": "1.0",
"input_parameters": 4,
"output_parameters": 3,
"required_parameters": 2,
"optional_parameters": 2
}
}
],
"metadata": {
"generated_by": "tool_schema_generator.py",
"input_file": "sample_tool_descriptions.json",
"tool_count": 2,
"generation_timestamp": "2024-01-15T10:30:00Z",
"schema_version": "1.0"
},
"validation_summary": {
"total_tools": 2,
"total_parameters": 8,
"total_validation_rules": 3,
"total_examples": 2
}
}
Agent Architecture Patterns Catalog
Overview
This document provides a comprehensive catalog of multi-agent system architecture patterns, their characteristics, use cases, and implementation considerations.
Pattern Categories
1. Single Agent Pattern
Description: One agent handles all system functionality
Structure: User → Agent ← Tools
Complexity: Low
Characteristics:
- Centralized decision making
- No inter-agent communication
- Simple state management
- Direct user interaction
Use Cases:
- Personal assistants
- Simple automation tasks
- Prototyping and development
- Domain-specific applications
Advantages:
- Simple to implement and debug
- Predictable behavior
- Low coordination overhead
- Clear responsibility model
Disadvantages:
- Limited scalability
- Single point of failure
- Resource bottlenecks
- Difficulty handling complex workflows
Implementation Patterns:
Agent {
    receive_request()
    process_task()
    use_tools()
    return_response()
}
2. Supervisor Pattern (Hierarchical Delegation)
Description: One supervisor coordinates multiple specialist agents
Structure: User → Supervisor → Specialists
Complexity: Medium
Characteristics:
- Central coordination
- Clear hierarchy
- Specialized capabilities
- Delegation and aggregation
Use Cases:
- Task decomposition scenarios
- Quality control workflows
- Resource allocation systems
- Project management
Advantages:
- Clear command structure
- Specialized expertise
- Centralized quality control
- Efficient resource allocation
Disadvantages:
- Supervisor bottleneck
- Complex coordination logic
- Single point of failure
- Limited parallelism
Implementation Patterns:
Supervisor {
    decompose_task()
    delegate_to_specialists()
    monitor_progress()
    aggregate_results()
    quality_control()
}

Specialist {
    receive_assignment()
    execute_specialized_task()
    report_results()
}
3. Swarm Pattern (Peer-to-Peer)
Description: Multiple autonomous agents collaborate as peers
Structure: Agent ↔ Agent ↔ Agent (interconnected)
Complexity: High
Characteristics:
- Distributed decision making
- Peer-to-peer communication
- Emergent behavior
- Self-organization
Use Cases:
- Distributed problem solving
- Parallel processing
- Fault-tolerant systems
- Research and exploration
Advantages:
- High fault tolerance
- Scalable parallelism
- Emergent intelligence
- No single point of failure
Disadvantages:
- Complex coordination
- Unpredictable behavior
- Difficult debugging
- Consensus overhead
Implementation Patterns:
SwarmAgent {
    discover_peers()
    share_information()
    negotiate_tasks()
    collaborate()
    adapt_behavior()
}

ConsensusProtocol {
    propose_action()
    vote()
    reach_agreement()
    execute_collective_decision()
}
4. Hierarchical Pattern (Multi-Level Management)
Description: Multiple levels of management and execution
Structure: Executive → Managers → Workers (tree structure)
Complexity: Very High
Characteristics:
- Multi-level hierarchy
- Distributed management
- Clear organizational structure
- Scalable command structure
Use Cases:
- Enterprise systems
- Large-scale operations
- Complex workflows
- Organizational modeling
Advantages:
- Natural organizational mapping
- Scalable structure
- Clear responsibilities
- Efficient resource management
Disadvantages:
- Communication overhead
- Multi-level bottlenecks
- Complex coordination
- Slower decision making
Implementation Patterns:
Executive {
    strategic_planning()
    resource_allocation()
    performance_monitoring()
}

Manager {
    tactical_planning()
    team_coordination()
    progress_reporting()
}

Worker {
    task_execution()
    status_reporting()
    resource_requests()
}
5. Pipeline Pattern (Sequential Processing)
Description: Agents arranged in a sequential processing pipeline
Structure: Input → Stage1 → Stage2 → Stage3 → Output
Complexity: Medium
Characteristics:
- Sequential processing
- Specialized stages
- Data flow architecture
- Clear processing order
Use Cases:
- Data processing pipelines
- Manufacturing workflows
- Content processing
- ETL operations
Advantages:
- Clear data flow
- Specialized optimization
- Predictable processing
- Easy to scale stages
Disadvantages:
- Sequential bottlenecks
- Rigid processing order
- Stage coupling
- Limited flexibility
Implementation Patterns:
PipelineStage {
    receive_input()
    process_data()
    validate_output()
    send_to_next_stage()
}

PipelineController {
    manage_flow()
    handle_errors()
    monitor_throughput()
    optimize_stages()
}
Pattern Selection Criteria
Team Size Considerations
- 1 Agent: Single Agent Pattern only
- 2-5 Agents: Supervisor, Pipeline
- 6-15 Agents: Swarm, Hierarchical, Pipeline
- 15+ Agents: Hierarchical, Large Swarm
Task Complexity
- Simple: Single Agent
- Medium: Supervisor, Pipeline
- Complex: Swarm, Hierarchical
- Very Complex: Hierarchical
Coordination Requirements
- None: Single Agent
- Low: Pipeline, Supervisor
- Medium: Hierarchical
- High: Swarm
Fault Tolerance Requirements
- Low: Single Agent, Pipeline
- Medium: Supervisor, Hierarchical
- High: Swarm
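The criteria above can be condensed into a small selection helper. This is an illustrative sketch: the function name, signature, and thresholds simply mirror the guidance in this catalog and are not prescriptive.

```python
def suggest_pattern(num_agents: int, coordination: str, fault_tolerance: str) -> str:
    """Suggest an architecture pattern from the selection criteria above.

    `coordination` and `fault_tolerance` take "none"/"low"/"medium"/"high";
    the decision order mirrors the catalog's guidance.
    """
    if num_agents == 1:
        return "single_agent"           # only option for a single agent
    if fault_tolerance == "high" or coordination == "high":
        return "swarm"                  # peer redundancy, no single point of failure
    if num_agents > 15:
        return "hierarchical"           # scalable multi-level command structure
    if coordination == "low":
        return "pipeline"               # simple stage-to-stage handoff
    return "supervisor"                 # default for small, coordinated teams
```

In practice the criteria interact (e.g. high fault tolerance may rule out a supervisor even for small teams), so a helper like this is a starting point for discussion, not a final decision.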
Hybrid Patterns
Hub-and-Spoke with Clusters
Combines supervisor pattern with swarm clusters
- Central coordinator
- Specialized swarm clusters
- Hierarchical communication
Pipeline with Parallel Stages
Pipeline stages that can process in parallel
- Sequential overall flow
- Parallel processing within stages
- Load balancing across stage instances
Hierarchical Swarms
Swarm behavior at each hierarchical level
- Distributed decision making
- Hierarchical coordination
- Multi-level autonomy
Communication Patterns by Architecture
Single Agent
- Direct user interface
- Tool API calls
- No inter-agent communication
Supervisor
- Command/response with specialists
- Progress reporting
- Result aggregation
Swarm
- Broadcast messaging
- Peer discovery
- Consensus protocols
- Information sharing
Hierarchical
- Upward reporting
- Downward delegation
- Lateral coordination
- Skip-level communication
Pipeline
- Stage-to-stage data flow
- Error propagation
- Status monitoring
- Flow control
Scaling Considerations
Horizontal Scaling
- Single Agent: Scale by replication
- Supervisor: Scale specialists
- Swarm: Add more peers
- Hierarchical: Add at appropriate levels
- Pipeline: Scale bottleneck stages
Vertical Scaling
- Single Agent: More powerful agent
- Supervisor: Enhanced supervisor capabilities
- Swarm: Smarter individual agents
- Hierarchical: Better management agents
- Pipeline: Optimize stage processing
Error Handling Patterns
Single Agent
- Retry logic
- Fallback behaviors
- User notification
Supervisor
- Specialist failure detection
- Task reassignment
- Result validation
Swarm
- Peer failure detection
- Consensus recalculation
- Self-healing behavior
Hierarchical
- Escalation procedures
- Skip-level communication
- Management override
Pipeline
- Stage failure recovery
- Data replay
- Circuit breakers
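Several of the recovery strategies above (retry logic, task reassignment, stage failure recovery) build on retrying with exponential backoff. A minimal sketch; the attempt count and delay schedule are illustrative defaults, and `sleep` is injectable so tests can avoid real waiting.

```python
import time


def retry_with_backoff(operation, max_attempts=3, base_delay=1.0,
                       sleep=time.sleep):
    """Retry a failing operation with exponentially growing delays."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise                          # out of attempts: surface the error
            sleep(base_delay * (2 ** attempt))  # 1s, 2s, 4s, ...
```

Production versions typically add jitter to the delay and retry only on error types known to be transient (timeouts, rate limits), not on validation failures.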
Performance Characteristics
| Pattern | Latency | Throughput | Scalability | Reliability | Complexity |
|---|---|---|---|---|---|
| Single Agent | Low | Low | Poor | Poor | Low |
| Supervisor | Medium | Medium | Good | Medium | Medium |
| Swarm | High | High | Excellent | Excellent | High |
| Hierarchical | Medium | High | Excellent | Good | Very High |
| Pipeline | Low | High | Good | Medium | Medium |
Best Practices by Pattern
Single Agent
- Keep scope focused
- Implement comprehensive error handling
- Use efficient tool selection
- Monitor resource usage
Supervisor
- Design clear delegation rules
- Implement progress monitoring
- Use timeout mechanisms
- Plan for specialist failures
Swarm
- Design simple interaction protocols
- Implement conflict resolution
- Monitor emergent behavior
- Plan for network partitions
Hierarchical
- Define clear role boundaries
- Implement efficient communication
- Plan escalation procedures
- Monitor span of control
Pipeline
- Optimize bottleneck stages
- Implement error recovery
- Use appropriate buffering
- Monitor flow rates
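The supervisor best practice of using timeout mechanisms can be sketched with a worker pool. `delegate_with_timeout` is a hypothetical helper, not part of any framework, and the simplification noted in the docstring matters in real deployments.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def delegate_with_timeout(task_fn, timeout_s=5.0):
    """Run a specialist task; return None if it exceeds the timeout,
    so the supervisor can reassign instead of blocking indefinitely.

    Note: on exit the executor still waits for the worker thread to
    finish; a production version would hand work to a long-lived pool
    and cancel or abandon the stale future.
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(task_fn)
        try:
            return future.result(timeout=timeout_s)
        except FutureTimeout:
            return None  # caller decides: retry, reassign, or escalate
```

Returning a sentinel rather than raising keeps the supervisor's delegation loop simple: a `None` result feeds directly into its task-reassignment path.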
Anti-Patterns to Avoid
God Agent
Single agent that tries to do everything
- Violates single responsibility
- Creates maintenance nightmare
- Poor scalability
Chatty Communication
Excessive inter-agent messaging
- Performance degradation
- Network congestion
- Poor scalability
Circular Dependencies
Agents depending on each other cyclically
- Deadlock potential
- Complex error handling
- Difficult debugging
Over-Centralization
Too much logic in coordinator
- Single point of failure
- Bottleneck creation
- Poor fault tolerance
Under-Specification
Unclear roles and responsibilities
- Coordination failures
- Duplicate work
- Inconsistent behavior
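The circular-dependency anti-pattern can be caught at design time by running a cycle check over the agent dependency graph. A depth-first-search sketch; the shape of the `dependencies` mapping is an assumption for illustration.

```python
def find_cycle(dependencies: dict[str, list[str]]) -> bool:
    """Return True if the agent dependency graph contains a cycle."""
    visiting, done = set(), set()

    def visit(agent):
        if agent in visiting:
            return True            # back edge: cycle found
        if agent in done:
            return False           # already proven acyclic from here
        visiting.add(agent)
        if any(visit(dep) for dep in dependencies.get(agent, [])):
            return True
        visiting.discard(agent)
        done.add(agent)
        return False

    return any(visit(a) for a in dependencies)
```

Running a check like this in CI whenever the agent topology changes turns a deadlock-at-runtime problem into a failing build.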
Conclusion
The choice of agent architecture pattern depends on multiple factors including team size, task complexity, coordination requirements, fault tolerance needs, and performance objectives. Each pattern has distinct trade-offs that must be carefully considered in the context of specific system requirements.
Success factors include:
- Clear role definitions
- Appropriate communication patterns
- Robust error handling
- Scalability planning
- Performance monitoring
The patterns can be combined and customized to meet specific needs, but maintaining clarity and avoiding unnecessary complexity should always be prioritized.
Multi-Agent System Evaluation Methodology
Overview
This document provides a comprehensive methodology for evaluating multi-agent systems across multiple dimensions including performance, reliability, cost-effectiveness, and user satisfaction. The methodology is designed to provide actionable insights for system optimization.
Evaluation Framework
Evaluation Dimensions
1. Task Performance
- Success Rate: Percentage of tasks completed successfully
- Completion Time: Time from task initiation to completion
- Quality Metrics: Accuracy, relevance, completeness of results
- Partial Success: Progress made on incomplete tasks
2. System Reliability
- Availability: System uptime and accessibility
- Error Rates: Frequency and types of errors
- Recovery Time: Time to recover from failures
- Fault Tolerance: System behavior under component failures
3. Cost Efficiency
- Resource Utilization: CPU, memory, network, storage usage
- Token Consumption: LLM API usage and costs
- Operational Costs: Infrastructure and maintenance costs
- Cost per Task: Economic efficiency per completed task
4. User Experience
- Response Time: User-perceived latency
- User Satisfaction: Qualitative feedback scores
- Usability: Ease of system interaction
- Predictability: Consistency of system behavior
5. Scalability
- Load Handling: Performance under increasing load
- Resource Scaling: Ability to scale resources dynamically
- Concurrency: Handling multiple simultaneous requests
- Degradation Patterns: Behavior at capacity limits
6. Security
- Access Control: Authentication and authorization effectiveness
- Data Protection: Privacy and confidentiality measures
- Audit Trail: Logging and monitoring completeness
- Vulnerability Assessment: Security weakness identification
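When a single headline number is needed, the six dimensions can be folded into a weighted composite score. A minimal sketch; the weights shown are assumptions to be tuned per deployment, not recommended values.

```python
def composite_score(dimension_scores: dict[str, float],
                    weights: dict[str, float]) -> float:
    """Weighted average of per-dimension scores, each normalized to [0, 1]."""
    total_weight = sum(weights.values())
    return sum(dimension_scores[d] * w for d, w in weights.items()) / total_weight


score = composite_score(
    {"performance": 0.8, "reliability": 0.6, "cost": 0.9},
    {"performance": 0.5, "reliability": 0.3, "cost": 0.2},
)
```

A composite score is useful for dashboards and trend lines, but decisions should still be made against the per-dimension values: averaging can hide a failing dimension behind strong ones.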
Metrics Collection
Core Metrics
Performance Metrics
{
"task_metrics": {
"task_id": "string",
"agent_id": "string",
"task_type": "string",
"start_time": "ISO 8601 timestamp",
"end_time": "ISO 8601 timestamp",
"duration_ms": "integer",
"status": "success|failure|partial|timeout",
"quality_score": "float 0-1",
"steps_completed": "integer",
"total_steps": "integer"
}
}
Resource Metrics
{
"resource_metrics": {
"timestamp": "ISO 8601 timestamp",
"agent_id": "string",
"cpu_usage_percent": "float",
"memory_usage_mb": "integer",
"network_bytes_sent": "integer",
"network_bytes_received": "integer",
"tokens_consumed": "integer",
"api_calls_made": "integer"
}
}
Error Metrics
{
"error_metrics": {
"timestamp": "ISO 8601 timestamp",
"error_type": "string",
"error_code": "string",
"error_message": "string",
"agent_id": "string",
"task_id": "string",
"severity": "critical|high|medium|low",
"recovery_action": "string",
"resolved": "boolean"
}
}
Advanced Metrics
Agent Collaboration Metrics
{
"collaboration_metrics": {
"timestamp": "ISO 8601 timestamp",
"initiating_agent": "string",
"target_agent": "string",
"interaction_type": "request|response|broadcast|delegate",
"latency_ms": "integer",
"success": "boolean",
"payload_size_bytes": "integer",
"context_shared": "boolean"
}
}
Tool Usage Metrics
{
"tool_metrics": {
"timestamp": "ISO 8601 timestamp",
"agent_id": "string",
"tool_name": "string",
"invocation_duration_ms": "integer",
"success": "boolean",
"error_type": "string|null",
"input_size_bytes": "integer",
"output_size_bytes": "integer",
"cached_result": "boolean"
}
}
Evaluation Methods
1. Synthetic Benchmarks
Task Complexity Levels
- Level 1 (Simple): Single-agent, single-tool tasks
- Level 2 (Moderate): Multi-tool tasks requiring coordination
- Level 3 (Complex): Multi-agent collaborative tasks
- Level 4 (Advanced): Long-running, multi-stage workflows
- Level 5 (Expert): Adaptive tasks requiring learning
Benchmark Task Categories
benchmark_categories:
  information_retrieval:
    - simple_web_search
    - multi_source_research
    - fact_verification
    - comparative_analysis
  content_generation:
    - text_summarization
    - creative_writing
    - technical_documentation
    - multilingual_translation
  data_processing:
    - data_cleaning
    - statistical_analysis
    - visualization_creation
    - report_generation
  problem_solving:
    - algorithm_development
    - optimization_tasks
    - troubleshooting
    - decision_support
  workflow_automation:
    - multi_step_processes
    - conditional_workflows
    - exception_handling
    - resource_coordination
Benchmark Execution
def run_benchmark_suite(agents, benchmark_tasks):
    results = {}
    for category, tasks in benchmark_tasks.items():
        category_results = []
        for task in tasks:
            task_result = execute_benchmark_task(
                agents=agents,
                task=task,
                timeout=task.max_duration,
                repetitions=task.repetitions
            )
            category_results.append(task_result)
        results[category] = analyze_category_results(category_results)
    return generate_benchmark_report(results)
2. A/B Testing
Test Design
ab_test_design:
  hypothesis: "New agent architecture improves task success rate"
  success_metrics:
    primary: "task_success_rate"
    secondary: ["response_time", "cost_per_task", "user_satisfaction"]
  test_configuration:
    control_group: "current_architecture"
    treatment_group: "new_architecture"
    traffic_split: 50/50
    duration_days: 14
    minimum_sample_size: 1000
  statistical_parameters:
    confidence_level: 0.95
    minimum_detectable_effect: 0.05
    statistical_power: 0.8
Analysis Framework
import numpy as np

def analyze_ab_test(control_data, treatment_data, metrics):
    results = {}
    for metric in metrics:
        control_values = extract_metric_values(control_data, metric)
        treatment_values = extract_metric_values(treatment_data, metric)
        # Statistical significance test
        stat_result = perform_statistical_test(
            control_values,
            treatment_values,
            test_type=determine_test_type(metric)
        )
        # Effect size calculation
        effect_size = calculate_effect_size(
            control_values,
            treatment_values
        )
        results[metric] = {
            "control_mean": np.mean(control_values),
            "treatment_mean": np.mean(treatment_values),
            "p_value": stat_result.p_value,
            "confidence_interval": stat_result.confidence_interval,
            "effect_size": effect_size,
            "practical_significance": assess_practical_significance(
                effect_size, metric
            )
        }
    return results
3. Load Testing
Load Test Scenarios
load_test_scenarios:
  baseline_load:
    concurrent_users: 10
    ramp_up_time: "5 minutes"
    duration: "30 minutes"
  normal_load:
    concurrent_users: 100
    ramp_up_time: "10 minutes"
    duration: "1 hour"
  peak_load:
    concurrent_users: 500
    ramp_up_time: "15 minutes"
    duration: "2 hours"
  stress_test:
    concurrent_users: 1000
    ramp_up_time: "20 minutes"
    duration: "1 hour"
  spike_test:
    phases:
      - { users: 100, duration: "10 minutes" }
      - { users: 1000, duration: "5 minutes" }  # Spike
      - { users: 100, duration: "15 minutes" }
Performance Thresholds
performance_thresholds:
  response_time:
    p50: 2000ms   # 50th percentile
    p90: 5000ms   # 90th percentile
    p95: 8000ms   # 95th percentile
    p99: 15000ms  # 99th percentile
  throughput:
    minimum: 10   # requests per second
    target: 50    # requests per second
  error_rate:
    maximum: 5%   # percentage of failed requests
  resource_utilization:
    cpu_max: 80%
    memory_max: 85%
    network_max: 70%
4. Real-World Evaluation
Production Monitoring
production_metrics:
  business_metrics:
    - task_completion_rate
    - user_retention_rate
    - feature_adoption_rate
    - time_to_value
  technical_metrics:
    - system_availability
    - mean_time_to_recovery
    - resource_efficiency
    - cost_per_transaction
  user_experience_metrics:
    - net_promoter_score
    - user_satisfaction_rating
    - task_abandonment_rate
    - help_desk_ticket_volume
Continuous Evaluation Pipeline
```python
class ContinuousEvaluationPipeline:
    def __init__(self, metrics_collector, analyzer, alerting):
        self.metrics_collector = metrics_collector
        self.analyzer = analyzer
        self.alerting = alerting

    def run_evaluation_cycle(self):
        # Collect recent metrics
        metrics = self.metrics_collector.collect_recent_metrics(
            time_window="1 hour"
        )
        # Analyze performance
        analysis = self.analyzer.analyze_metrics(metrics)
        # Check for anomalies
        anomalies = self.analyzer.detect_anomalies(
            metrics,
            baseline_window="24 hours"
        )
        # Generate alerts if needed
        if anomalies:
            self.alerting.send_alerts(anomalies)
        # Update performance baselines
        self.analyzer.update_baselines(metrics)
        return analysis
```
Analysis Techniques
1. Statistical Analysis
Descriptive Statistics
```python
def calculate_descriptive_stats(data):
    return {
        "count": len(data),
        "mean": np.mean(data),
        "median": np.median(data),
        "std_dev": np.std(data),
        "min": np.min(data),
        "max": np.max(data),
        "percentiles": {
            "p25": np.percentile(data, 25),
            "p50": np.percentile(data, 50),
            "p75": np.percentile(data, 75),
            "p90": np.percentile(data, 90),
            "p95": np.percentile(data, 95),
            "p99": np.percentile(data, 99)
        }
    }
```
Correlation Analysis
```python
def analyze_metric_correlations(metrics_df):
    correlation_matrix = metrics_df.corr()
    # Identify strong correlations
    strong_correlations = []
    for i in range(len(correlation_matrix.columns)):
        for j in range(i + 1, len(correlation_matrix.columns)):
            corr_value = correlation_matrix.iloc[i, j]
            if abs(corr_value) > 0.7:  # Strong correlation threshold
                strong_correlations.append({
                    "metric1": correlation_matrix.columns[i],
                    "metric2": correlation_matrix.columns[j],
                    "correlation": corr_value,
                    "strength": "strong" if abs(corr_value) > 0.8 else "moderate"
                })
    return strong_correlations
```
2. Trend Analysis
Time Series Analysis
```python
def analyze_performance_trends(time_series_data, metric):
    # Decompose time series
    decomposition = seasonal_decompose(
        time_series_data[metric],
        model='additive',
        period=24  # Daily seasonality
    )
    # Trend detection
    trend_slope = calculate_trend_slope(decomposition.trend)
    # Seasonality detection
    seasonal_patterns = identify_seasonal_patterns(decomposition.seasonal)
    # Anomaly detection
    anomalies = detect_anomalies_isolation_forest(time_series_data[metric])
    return {
        "trend_direction": (
            "increasing" if trend_slope > 0
            else "decreasing" if trend_slope < 0
            else "stable"
        ),
        "trend_strength": abs(trend_slope),
        "seasonal_patterns": seasonal_patterns,
        "anomalies": anomalies,
        "forecast": generate_forecast(time_series_data[metric], periods=24)
    }
```
3. Comparative Analysis
Multi-System Comparison
```python
def compare_systems(system_metrics_dict):
    comparison_results = {}
    metrics_to_compare = [
        "success_rate", "average_response_time",
        "cost_per_task", "error_rate"
    ]
    for metric in metrics_to_compare:
        metric_values = {
            system: metrics[metric]
            for system, metrics in system_metrics_dict.items()
        }
        # Rank systems by metric (higher is better only for success_rate)
        ranked_systems = sorted(
            metric_values.items(),
            key=lambda x: x[1],
            reverse=(metric in ["success_rate"])
        )
        # Calculate relative performance
        best_value = ranked_systems[0][1]
        relative_performance = {
            system: value / best_value if best_value > 0 else 0
            for system, value in metric_values.items()
        }
        comparison_results[metric] = {
            "rankings": ranked_systems,
            "relative_performance": relative_performance,
            "best_system": ranked_systems[0][0]
        }
    return comparison_results
```
Quality Assurance
1. Data Quality Validation
Data Completeness Checks
```python
def validate_data_completeness(metrics_data):
    completeness_report = {}
    required_fields = [
        "timestamp", "task_id", "agent_id",
        "duration_ms", "status", "success"
    ]
    for field in required_fields:
        missing_count = metrics_data[field].isnull().sum()
        total_count = len(metrics_data)
        completeness_percentage = (total_count - missing_count) / total_count * 100
        completeness_report[field] = {
            "completeness_percentage": completeness_percentage,
            "missing_count": missing_count,
            "status": "pass" if completeness_percentage >= 95 else "fail"
        }
    return completeness_report
```
Data Consistency Checks
```python
def validate_data_consistency(metrics_data):
    consistency_issues = []
    # Check timestamp ordering
    if not metrics_data['timestamp'].is_monotonic_increasing:
        consistency_issues.append("Timestamps are not in chronological order")
    # Check duration consistency
    duration_negative = (metrics_data['duration_ms'] < 0).sum()
    if duration_negative > 0:
        consistency_issues.append(f"Found {duration_negative} negative durations")
    # Check status-success consistency
    success_status_mismatch = (
        (metrics_data['status'] == 'success') != metrics_data['success']
    ).sum()
    if success_status_mismatch > 0:
        consistency_issues.append(
            f"Found {success_status_mismatch} status-success mismatches"
        )
    return consistency_issues
```
2. Evaluation Reliability
Reproducibility Framework
```python
class ReproducibleEvaluation:
    def __init__(self, config):
        self.config = config
        self.random_seed = config.get('random_seed', 42)

    def setup_environment(self):
        # Set random seeds
        random.seed(self.random_seed)
        np.random.seed(self.random_seed)
        # Configure logging
        self.setup_evaluation_logging()
        # Snapshot system state
        self.snapshot_system_state()

    def run_evaluation(self, test_suite):
        self.setup_environment()
        # Execute evaluation with full logging
        results = self.execute_test_suite(test_suite)
        # Verify reproducibility
        self.verify_reproducibility(results)
        return results
```
Reporting Framework
1. Executive Summary Report
Key Performance Indicators
```yaml
kpi_dashboard:
  overall_health_score: 85/100
  performance:
    task_success_rate: 94.2%
    average_response_time: 2.3s
    p95_response_time: 8.1s
  reliability:
    system_uptime: 99.8%
    error_rate: 2.1%
    mean_recovery_time: 45s
  cost_efficiency:
    cost_per_task: $0.05
    token_utilization: 78%
    resource_efficiency: 82%
  user_satisfaction:
    net_promoter_score: 42
    task_completion_rate: 89%
    user_retention_rate: 76%
```
Trend Indicators
```yaml
trend_analysis:
  performance_trends:
    success_rate: "↗ +2.3% vs last month"
    response_time: "↘ -15% vs last month"
    error_rate: "→ stable vs last month"
  cost_trends:
    total_cost: "↗ +8% vs last month"
    cost_per_task: "↘ -5% vs last month"
    efficiency: "↗ +12% vs last month"
```
2. Technical Deep-Dive Report
Performance Analysis
```markdown
## Performance Analysis

### Task Success Patterns
- **Overall Success Rate**: 94.2% (target: 95%)
- **By Task Type**:
  - Simple tasks: 98.1% success
  - Complex tasks: 87.4% success
  - Multi-agent tasks: 91.2% success

### Response Time Distribution
- **Median**: 1.8 seconds
- **95th Percentile**: 8.1 seconds
- **Peak Hours Impact**: +35% slower during 9-11 AM

### Error Analysis
- **Top Error Types**:
  1. Timeout errors (34% of failures)
  2. Rate limit exceeded (28% of failures)
  3. Invalid input (19% of failures)
```
Resource Utilization
```markdown
## Resource Utilization

### Compute Resources
- **CPU Utilization**: 45% average, 78% peak
- **Memory Usage**: 6.2GB average, 12.1GB peak
- **Network I/O**: 125 MB/s average

### API Usage
- **Token Consumption**: 2.4M tokens/day
- **Cost Breakdown**:
  - GPT-4: 68% of token costs
  - GPT-3.5: 28% of token costs
  - Other models: 4% of token costs
```
3. Actionable Recommendations
Performance Optimization
```yaml
recommendations:
  high_priority:
    - title: "Reduce timeout error rate"
      impact: "Could improve success rate by 2.1%"
      effort: "Medium"
      timeline: "2 weeks"
    - title: "Optimize complex task handling"
      impact: "Could improve complex task success by 5%"
      effort: "High"
      timeline: "4 weeks"
  medium_priority:
    - title: "Implement intelligent caching"
      impact: "Could reduce costs by 15%"
      effort: "Medium"
      timeline: "3 weeks"
```
Continuous Improvement Process
1. Evaluation Cadence
Regular Evaluation Schedule
```yaml
evaluation_schedule:
  real_time:
    frequency: "continuous"
    metrics: ["error_rate", "response_time", "system_health"]
  hourly:
    frequency: "every hour"
    metrics: ["throughput", "resource_utilization", "user_activity"]
  daily:
    frequency: "daily at 2 AM UTC"
    metrics: ["success_rates", "cost_analysis", "user_satisfaction"]
  weekly:
    frequency: "every Sunday"
    metrics: ["trend_analysis", "comparative_analysis", "capacity_planning"]
  monthly:
    frequency: "first Monday of month"
    metrics: ["comprehensive_evaluation", "benchmark_testing", "strategic_review"]
```
2. Performance Baseline Management
Baseline Update Process
```python
def update_performance_baselines(current_metrics, historical_baselines):
    updated_baselines = {}
    for metric, current_value in current_metrics.items():
        historical_values = historical_baselines.get(metric, [])
        historical_values.append(current_value)
        # Keep rolling window of last 30 days
        historical_values = historical_values[-30:]
        # Calculate new baseline
        baseline = {
            "mean": np.mean(historical_values),
            "std": np.std(historical_values),
            "p95": np.percentile(historical_values, 95),
            "trend": calculate_trend(historical_values)
        }
        updated_baselines[metric] = baseline
    return updated_baselines
```
Conclusion
Effective evaluation of multi-agent systems requires a comprehensive, multi-dimensional approach that combines quantitative metrics with qualitative assessments. The methodology should be:
- Comprehensive: Cover all aspects of system performance
- Continuous: Provide ongoing monitoring and evaluation
- Actionable: Generate specific, implementable recommendations
- Adaptable: Evolve with system changes and requirements
- Reliable: Produce consistent, reproducible results
Regular evaluation using this methodology will ensure multi-agent systems continue to meet user needs while optimizing for cost, performance, and reliability.
Tool Design Best Practices for Multi-Agent Systems
Overview
This document outlines comprehensive best practices for designing tools that work effectively within multi-agent systems. Tools are the primary interface between agents and external capabilities, making their design critical for system success.
Core Principles
1. Single Responsibility Principle
Each tool should have a clear, focused purpose:
- Do one thing well: Avoid multi-purpose tools that try to solve many problems
- Clear boundaries: Well-defined input/output contracts
- Predictable behavior: Consistent results for similar inputs
- Easy to understand: Purpose should be obvious from name and description
2. Idempotency
Tools should produce consistent results:
- Safe operations: Read operations should never modify state
- Repeatable operations: Same input should yield same output (when possible)
- State handling: Clear semantics for state-modifying operations
- Error recovery: Failed operations should be safely retryable
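One common way to make a state-modifying operation safely retryable is an idempotency key derived from the canonicalized input. The sketch below is illustrative rather than part of this skill: `idempotent_call` and its in-memory `_result_cache` are hypothetical names, and a production tool would back the cache with durable storage (e.g. Redis or a database).

```python
import hashlib
import json

# Hypothetical in-memory cache; a real tool would use durable storage.
_result_cache = {}

def idempotent_call(tool_fn, payload: dict):
    """Run a state-modifying tool at most once per distinct payload.

    The key is derived from the canonicalized input, so a retried call
    with the same arguments returns the cached result instead of
    repeating the side effect.
    """
    key = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode()
    ).hexdigest()
    if key not in _result_cache:
        _result_cache[key] = tool_fn(payload)
    return _result_cache[key]
```

A retried `create_order` call, for example, would then return the original order rather than creating a duplicate.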
3. Composability
Tools should work well together:
- Standard interfaces: Consistent input/output formats
- Minimal assumptions: Don't assume specific calling contexts
- Chain-friendly: Output of one tool can be input to another
- Modular design: Tools can be combined in different ways
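Chain-friendliness is easiest to see with a shared response envelope. This sketch assumes tools return a `{"success", "data"}` envelope (the function name `run_pipeline` and the envelope keys are illustrative conventions, not a fixed API):

```python
def run_pipeline(tools, initial_input):
    """Chain tools that share a {"success": ..., "data": ...} envelope.

    Each tool receives the previous tool's "data" payload, so tools
    honoring the convention can be recombined freely.
    """
    data = initial_input
    for tool in tools:
        result = tool(data)
        if not result.get("success"):
            return result  # propagate the failure envelope unchanged
        data = result["data"]
    return {"success": True, "data": data}
```

Because failures propagate unchanged, a caller only ever handles one response shape regardless of where the chain stopped.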
4. Robustness
Tools should handle edge cases gracefully:
- Input validation: Comprehensive validation of all inputs
- Error handling: Graceful degradation on failures
- Resource management: Proper cleanup and resource management
- Timeout handling: Operations should have reasonable timeouts
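Validation and timeout handling can be combined in a single wrapper around the tool function. A minimal sketch, assuming the error-envelope shape used elsewhere in this document (`robust_tool` is a hypothetical name; note the limitation that a timed-out worker thread keeps running in the background):

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def robust_tool(fn, args: dict, required: list, timeout_s: float = 10.0):
    """Wrap a tool call with required-parameter validation and a timeout."""
    missing = [k for k in required if k not in args]
    if missing:
        return {"success": False,
                "error": {"code": "MISSING_PARAMETER",
                          "message": f"Missing: {', '.join(missing)}"}}
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, **args)
    try:
        return {"success": True, "data": future.result(timeout=timeout_s)}
    except FutureTimeout:
        # Limitation of this sketch: the worker thread is abandoned, not killed.
        return {"success": False,
                "error": {"code": "TIMEOUT_ERROR",
                          "message": f"Exceeded {timeout_s}s"}}
    finally:
        pool.shutdown(wait=False)
```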
Input Schema Design
Schema Structure
```json
{
  "type": "object",
  "properties": {
    "parameter_name": {
      "type": "string",
      "description": "Clear, specific description",
      "examples": ["example1", "example2"],
      "minLength": 1,
      "maxLength": 1000
    }
  },
  "required": ["parameter_name"],
  "additionalProperties": false
}
```
Parameter Guidelines
Required vs Optional Parameters
- Required parameters: Essential for tool function
- Optional parameters: Provide additional control or customization
- Default values: Sensible defaults for optional parameters
- Parameter groups: Related parameters should be grouped logically
Parameter Types
- Primitives: string, number, boolean for simple values
- Arrays: For lists of similar items
- Objects: For complex structured data
- Enums: For fixed sets of valid values
- Unions: When multiple types are acceptable
Validation Rules
String validation:
- Length constraints (minLength, maxLength)
- Pattern matching for formats (email, URL, etc.)
- Character set restrictions
- Content filtering for security
Numeric validation:
- Range constraints (minimum, maximum)
- Multiple-of restrictions (multipleOf)
- Precision requirements
- Special value handling (NaN, infinity)
Array validation:
- Size constraints (minItems, maxItems)
- Item type validation
- Uniqueness requirements
- Ordering requirements
Object validation:
- Required property enforcement
- Additional property policies
- Nested validation rules
- Dependency validation
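To make the rules above concrete, here is a hand-rolled checker for a subset of them (length, range, enum, pattern). The function name `validate_param` is illustrative; production tools would typically delegate to a JSON Schema library instead.

```python
import re

def validate_param(value, rules: dict):
    """Check a value against a subset of the validation rules above.

    Returns a list of violation messages; an empty list means valid.
    """
    errors = []
    if isinstance(value, str):
        if "minLength" in rules and len(value) < rules["minLength"]:
            errors.append("too short")
        if "maxLength" in rules and len(value) > rules["maxLength"]:
            errors.append("too long")
        if "pattern" in rules and not re.match(rules["pattern"], value):
            errors.append("pattern mismatch")
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        if "minimum" in rules and value < rules["minimum"]:
            errors.append("below minimum")
        if "maximum" in rules and value > rules["maximum"]:
            errors.append("above maximum")
    if "enum" in rules and value not in rules["enum"]:
        errors.append("not in enum")
    return errors
```

Returning all violations at once, rather than failing on the first, gives the calling agent enough detail to correct its input in a single retry.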
Input Examples
Good Example:
```json
{
  "name": "search_web",
  "description": "Search the web for information",
  "parameters": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search query string",
        "minLength": 1,
        "maxLength": 500,
        "examples": ["latest AI developments", "weather forecast"]
      },
      "limit": {
        "type": "integer",
        "description": "Maximum number of results to return",
        "minimum": 1,
        "maximum": 100,
        "default": 10
      },
      "language": {
        "type": "string",
        "description": "Language code for search results",
        "enum": ["en", "es", "fr", "de"],
        "default": "en"
      }
    },
    "required": ["query"],
    "additionalProperties": false
  }
}
```
Bad Example:
```json
{
  "name": "do_stuff",
  "description": "Does various operations",
  "parameters": {
    "type": "object",
    "properties": {
      "data": {
        "type": "string",
        "description": "Some data"
      }
    },
    "additionalProperties": true
  }
}
```
Output Schema Design
Response Structure
```json
{
  "success": true,
  "data": {
    // Actual response data
  },
  "metadata": {
    "timestamp": "2024-01-15T10:30:00Z",
    "execution_time_ms": 234,
    "version": "1.0"
  },
  "warnings": [],
  "pagination": {
    "total": 100,
    "page": 1,
    "per_page": 10,
    "has_next": true
  }
}
```
Data Consistency
- Predictable structure: Same structure regardless of success/failure
- Type consistency: Same data types across different calls
- Null handling: Clear semantics for missing/null values
- Empty responses: Consistent handling of empty result sets
Metadata Inclusion
- Execution time: Performance monitoring
- Timestamps: Audit trails and debugging
- Version information: Compatibility tracking
- Request identifiers: Correlation and debugging
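A small helper can fill these metadata fields in automatically so every tool returns the same envelope. A sketch assuming the envelope structure shown above (`make_response` is a hypothetical name):

```python
import time
import uuid
from datetime import datetime, timezone

def make_response(data, started_at: float, version: str = "1.0"):
    """Build a consistent success envelope with metadata filled in.

    `started_at` is a time.monotonic() reading taken when handling began,
    used to compute execution time.
    """
    return {
        "success": True,
        "data": data,
        "metadata": {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "execution_time_ms": int((time.monotonic() - started_at) * 1000),
            "version": version,
            "request_id": f"req_{uuid.uuid4().hex[:8]}",
        },
        "warnings": [],
    }
```

Centralizing envelope construction keeps structure and types identical across tools, which is exactly the consistency the bullets above call for.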
Error Handling
Error Response Structure
```json
{
  "success": false,
  "error": {
    "code": "INVALID_INPUT",
    "message": "The provided query is too short",
    "details": {
      "field": "query",
      "provided_length": 0,
      "minimum_length": 1
    },
    "retry_after": null,
    "documentation_url": "https://docs.example.com/errors#INVALID_INPUT"
  },
  "request_id": "req_12345"
}
```
Error Categories
Client Errors (4xx equivalent)
- INVALID_INPUT: Malformed or invalid parameters
- MISSING_PARAMETER: Required parameter not provided
- VALIDATION_ERROR: Parameter fails validation rules
- AUTHENTICATION_ERROR: Invalid or missing credentials
- PERMISSION_ERROR: Insufficient permissions
- RATE_LIMIT_ERROR: Too many requests
Server Errors (5xx equivalent)
- INTERNAL_ERROR: Unexpected server error
- SERVICE_UNAVAILABLE: Downstream service unavailable
- TIMEOUT_ERROR: Operation timed out
- RESOURCE_EXHAUSTED: Out of resources (memory, disk, etc.)
- DEPENDENCY_ERROR: External dependency failed
Tool-Specific Errors
- DATA_NOT_FOUND: Requested data doesn't exist
- FORMAT_ERROR: Data in unexpected format
- PROCESSING_ERROR: Error during data processing
- CONFIGURATION_ERROR: Tool misconfiguration
Error Recovery Strategies
Retry Logic
```json
{
  "retry_policy": {
    "max_attempts": 3,
    "backoff_strategy": "exponential",
    "base_delay_ms": 1000,
    "max_delay_ms": 30000,
    "retryable_errors": [
      "TIMEOUT_ERROR",
      "SERVICE_UNAVAILABLE",
      "RATE_LIMIT_ERROR"
    ]
  }
}
```
Fallback Behaviors
- Graceful degradation: Partial results when possible
- Alternative approaches: Different methods to achieve same goal
- Cached responses: Return stale data if fresh data unavailable
- Default responses: Safe default when specific response impossible
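One way the retry policy above can be applied in code: retry only the retryable error codes, with exponential backoff capped at the maximum delay. `call_with_retry` and the injectable `sleep` parameter are illustrative choices, not a prescribed API:

```python
import time

RETRYABLE = {"TIMEOUT_ERROR", "SERVICE_UNAVAILABLE", "RATE_LIMIT_ERROR"}

def call_with_retry(tool_fn, policy=None, sleep=time.sleep):
    """Retry a tool call per the retry policy: exponential backoff,
    capped delay, retryable error codes only. `sleep` is injectable
    so tests can skip real waiting."""
    policy = policy or {"max_attempts": 3, "base_delay_ms": 1000,
                        "max_delay_ms": 30000}
    last = None
    for attempt in range(policy["max_attempts"]):
        last = tool_fn()
        # Non-retryable errors (and successes) return immediately.
        if last.get("success") or last["error"]["code"] not in RETRYABLE:
            return last
        if attempt < policy["max_attempts"] - 1:
            delay_ms = min(policy["base_delay_ms"] * 2 ** attempt,
                           policy["max_delay_ms"])
            sleep(delay_ms / 1000)
    return last
```

Returning immediately on non-retryable codes matters: retrying an `INVALID_INPUT` error wastes budget without changing the outcome.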
Security Considerations
Input Sanitization
- SQL injection prevention: Parameterized queries
- XSS prevention: HTML encoding of outputs
- Command injection prevention: Input validation and sandboxing
- Path traversal prevention: Path validation and restrictions
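Path traversal prevention is worth a concrete example: normalize the user-supplied path, then verify it still resolves inside the allowed base directory. A minimal sketch (`safe_join` is an illustrative name for one common guard, not the only valid approach):

```python
import os

def safe_join(base_dir: str, user_path: str) -> str:
    """Resolve a user-supplied relative path under base_dir, rejecting
    traversal attempts such as '../../etc/passwd'."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(os.path.join(base, user_path))
    # The resolved path must be base_dir itself or a descendant of it.
    if target != base and not target.startswith(base + os.sep):
        raise ValueError(f"Path escapes base directory: {user_path!r}")
    return target
```

Using `realpath` on both sides also defeats symlink-based escapes, which naive string checks on the raw input miss.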
Authentication and Authorization
- API key management: Secure storage and rotation
- Token validation: JWT validation and expiration
- Permission checking: Role-based access control
- Audit logging: Security event logging
Data Protection
- PII handling: Detection and protection of personal data
- Encryption: Data encryption in transit and at rest
- Data retention: Compliance with retention policies
- Access logging: Who accessed what data when
Performance Optimization
Response Time
- Caching strategies: Result caching for repeated requests
- Connection pooling: Reuse connections to external services
- Async processing: Non-blocking operations where possible
- Resource optimization: Efficient resource utilization
Throughput
- Batch operations: Support for bulk operations
- Parallel processing: Concurrent execution where safe
- Load balancing: Distribute load across instances
- Resource scaling: Auto-scaling based on demand
Resource Management
- Memory usage: Efficient memory allocation and cleanup
- CPU optimization: Avoid unnecessary computations
- Network efficiency: Minimize network round trips
- Storage optimization: Efficient data structures and storage
Testing Strategies
Unit Testing
```python
def test_search_web_valid_input():
    result = search_web("test query", limit=5)
    assert result["success"] is True
    assert len(result["data"]["results"]) <= 5

def test_search_web_invalid_input():
    result = search_web("", limit=5)
    assert result["success"] is False
    assert result["error"]["code"] == "INVALID_INPUT"
```
Integration Testing
- End-to-end workflows: Complete user scenarios
- External service mocking: Mock external dependencies
- Error simulation: Simulate various error conditions
- Performance testing: Load and stress testing
Contract Testing
- Schema validation: Validate against defined schemas
- Backward compatibility: Ensure changes don't break clients
- API versioning: Test multiple API versions
- Consumer-driven contracts: Test from consumer perspective
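A lightweight contract check can catch breaking changes before consumers do: assert that required keys are present with the expected types. The sketch below (`check_contract` is a hypothetical helper) is a stand-in for full JSON Schema contract testing:

```python
def check_contract(response: dict, contract: dict) -> list:
    """Verify a tool response honors its declared contract.

    `contract` maps required key names to expected Python types.
    Returns a list of violation messages; empty means compliant.
    """
    violations = []
    for key, expected_type in contract.items():
        if key not in response:
            violations.append(f"missing key: {key}")
        elif not isinstance(response[key], expected_type):
            violations.append(
                f"{key}: expected {expected_type.__name__}, "
                f"got {type(response[key]).__name__}")
    return violations
```

Running such checks in CI against the latest tool build is a cheap form of backward-compatibility testing.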
Documentation
Tool Documentation Template
````markdown
# Tool Name

## Description
Brief description of what the tool does.

## Parameters

### Required Parameters
- `parameter_name` (type): Description

### Optional Parameters
- `optional_param` (type, default: value): Description

## Response
Description of response format and data.

## Examples

### Basic Usage
Input:
```json
{
  "parameter_name": "value"
}
```
Output:
```json
{
  "success": true,
  "data": {...}
}
```

## Error Codes
- `ERROR_CODE`: Description of when this error occurs
````
### API Documentation
- **OpenAPI/Swagger specs:** Machine-readable API documentation
- **Interactive examples:** Runnable examples in documentation
- **Code samples:** Examples in multiple programming languages
- **Changelog:** Version history and breaking changes
## Versioning Strategy
### Semantic Versioning
- **Major version:** Breaking changes
- **Minor version:** New features, backward compatible
- **Patch version:** Bug fixes, no new features
### API Evolution
- **Deprecation policy:** How to deprecate old features
- **Migration guides:** Help users upgrade to new versions
- **Backward compatibility:** Support for old versions
- **Feature flags:** Gradual rollout of new features
## Monitoring and Observability
### Metrics Collection
- **Usage metrics:** Call frequency, success rates
- **Performance metrics:** Response times, throughput
- **Error metrics:** Error rates by type
- **Resource metrics:** CPU, memory, network usage
### Logging
```json
{
  "timestamp": "2024-01-15T10:30:00Z",
  "tool_name": "search_web",
  "request_id": "req_12345",
  "agent_id": "agent_001",
  "input_hash": "abc123",
  "execution_time_ms": 234,
  "success": true,
  "error_code": null
}
```
Alerting
- Error rate thresholds: Alert on high error rates
- Performance degradation: Alert on slow responses
- Resource exhaustion: Alert on resource limits
- Service availability: Alert on service downtime
Common Anti-Patterns
Tool Design Anti-Patterns
- God tools: Tools that try to do everything
- Chatty tools: Tools that require many calls for simple tasks
- Stateful tools: Tools that maintain state between calls
- Inconsistent interfaces: Tools with different conventions
Error Handling Anti-Patterns
- Silent failures: Failing without proper error reporting
- Generic errors: Non-descriptive error messages
- Inconsistent error formats: Different error structures
- No retry guidance: Not indicating if operation is retryable
Performance Anti-Patterns
- Synchronous everything: Not using async operations where appropriate
- No caching: Repeatedly fetching same data
- Resource leaks: Not properly cleaning up resources
- Unbounded operations: Operations that can run indefinitely
Best Practices Checklist
Design Phase
- Single, clear purpose
- Well-defined input/output contracts
- Comprehensive input validation
- Idempotent operations where possible
- Error handling strategy defined
Implementation Phase
- Robust error handling
- Input sanitization
- Resource management
- Timeout handling
- Logging implementation
Testing Phase
- Unit tests for all functionality
- Integration tests with dependencies
- Error condition testing
- Performance testing
- Security testing
Documentation Phase
- Complete API documentation
- Usage examples
- Error code documentation
- Performance characteristics
- Security considerations
Deployment Phase
- Monitoring setup
- Alerting configuration
- Performance baselines
- Security reviews
- Operational runbooks
Conclusion
Well-designed tools are the foundation of effective multi-agent systems. They should be reliable, secure, performant, and easy to use. Following these best practices will result in tools that agents can effectively compose to solve complex problems while maintaining system reliability and security.
#!/usr/bin/env python3
"""
Tool Schema Generator - Generate structured tool schemas for AI agents
Given a description of desired tools (name, purpose, inputs, outputs), generates
structured tool schemas compatible with OpenAI function calling format and
Anthropic tool use format. Includes: input validation rules, error response
formats, example calls, rate limit suggestions.
Input: tool descriptions JSON
Output: tool schemas (OpenAI + Anthropic format) + validation rules + example usage
"""
import json
import argparse
import sys
import re
from typing import Dict, List, Any, Optional, Union, Tuple
from dataclasses import dataclass, asdict
from enum import Enum
class ParameterType(Enum):
    """Parameter types for tool schemas"""
    STRING = "string"
    INTEGER = "integer"
    NUMBER = "number"
    BOOLEAN = "boolean"
    ARRAY = "array"
    OBJECT = "object"
    NULL = "null"


class ValidationRule(Enum):
    """Validation rule types"""
    REQUIRED = "required"
    MIN_LENGTH = "min_length"
    MAX_LENGTH = "max_length"
    PATTERN = "pattern"
    ENUM = "enum"
    MINIMUM = "minimum"
    MAXIMUM = "maximum"
    MIN_ITEMS = "min_items"
    MAX_ITEMS = "max_items"
    UNIQUE_ITEMS = "unique_items"
    FORMAT = "format"
@dataclass
class ParameterSpec:
    """Parameter specification for tool inputs/outputs"""
    name: str
    type: ParameterType
    description: str
    required: bool = False
    default: Any = None
    validation_rules: Optional[Dict[str, Any]] = None
    examples: Optional[List[Any]] = None
    deprecated: bool = False


@dataclass
class ErrorSpec:
    """Error specification for tool responses"""
    error_code: str
    error_message: str
    http_status: int
    retry_after: Optional[int] = None
    details: Optional[Dict[str, Any]] = None


@dataclass
class RateLimitSpec:
    """Rate limiting specification"""
    requests_per_minute: int
    requests_per_hour: int
    requests_per_day: int
    burst_limit: int
    cooldown_period: int
    rate_limit_key: str = "user_id"


@dataclass
class ToolDescription:
    """Input tool description"""
    name: str
    purpose: str
    category: str
    inputs: List[Dict[str, Any]]
    outputs: List[Dict[str, Any]]
    error_conditions: List[str]
    side_effects: List[str]
    idempotent: bool
    rate_limits: Dict[str, Any]
    dependencies: List[str]
    examples: List[Dict[str, Any]]
    security_requirements: List[str]


@dataclass
class ToolSchema:
    """Complete tool schema with validation and examples"""
    name: str
    description: str
    openai_schema: Dict[str, Any]
    anthropic_schema: Dict[str, Any]
    validation_rules: List[Dict[str, Any]]
    error_responses: List[ErrorSpec]
    rate_limits: RateLimitSpec
    examples: List[Dict[str, Any]]
    metadata: Dict[str, Any]
class ToolSchemaGenerator:
    """Generate structured tool schemas from descriptions"""

    def __init__(self):
        self.common_patterns = self._define_common_patterns()
        self.format_validators = self._define_format_validators()
        self.security_templates = self._define_security_templates()

    def _define_common_patterns(self) -> Dict[str, str]:
        """Define common regex patterns for validation"""
        return {
            "email": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
            "url": r"^https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)$",
            "uuid": r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$",
            "phone": r"^\+?1?[0-9]{10,15}$",
            "ip_address": r"^(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$",
            "date": r"^\d{4}-\d{2}-\d{2}$",
            "datetime": r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d{3})?Z?$",
            "slug": r"^[a-z0-9]+(?:-[a-z0-9]+)*$",
            "semantic_version": r"^(?P<major>0|[1-9]\d*)\.(?P<minor>0|[1-9]\d*)\.(?P<patch>0|[1-9]\d*)(?:-(?P<prerelease>(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+(?P<buildmetadata>[0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$"
        }
    def _define_format_validators(self) -> Dict[str, Dict[str, Any]]:
        """Define format validators for common data types"""
        return {
            "email": {
                "type": "string",
                "format": "email",
                "pattern": self.common_patterns["email"],
                "min_length": 5,
                "max_length": 254
            },
            "url": {
                "type": "string",
                "format": "uri",
                "pattern": self.common_patterns["url"],
                "min_length": 7,
                "max_length": 2048
            },
            "uuid": {
                "type": "string",
                "format": "uuid",
                "pattern": self.common_patterns["uuid"],
                "min_length": 36,
                "max_length": 36
            },
            "date": {
                "type": "string",
                "format": "date",
                "pattern": self.common_patterns["date"],
                "min_length": 10,
                "max_length": 10
            },
            "datetime": {
                "type": "string",
                "format": "date-time",
                "pattern": self.common_patterns["datetime"],
                "min_length": 19,
                "max_length": 30
            },
            "password": {
                "type": "string",
                "min_length": 8,
                "max_length": 128,
                "pattern": r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]"
            }
        }
    def _define_security_templates(self) -> Dict[str, Dict[str, Any]]:
        """Define security requirement templates"""
        return {
            "authentication_required": {
                "requires_auth": True,
                "auth_methods": ["bearer_token", "api_key"],
                "scope_required": ["read", "write"]
            },
            "rate_limited": {
                "rate_limits": {
                    "requests_per_minute": 60,
                    "requests_per_hour": 1000,
                    "burst_limit": 10
                }
            },
            "input_sanitization": {
                "sanitize_html": True,
                "validate_sql_injection": True,
                "escape_special_chars": True
            },
            "output_validation": {
                "validate_response_schema": True,
                "filter_sensitive_data": True,
                "content_type_validation": True
            }
        }
    def parse_tool_description(
        self, description: ToolDescription
    ) -> Tuple[List[ParameterSpec], List[ParameterSpec]]:
        """Parse tool description into structured input and output parameters"""
        input_params = []
        output_params = []
        # Parse input parameters
        for input_spec in description.inputs:
            param = self._parse_parameter_spec(input_spec)
            input_params.append(param)
        # Parse output parameters
        for output_spec in description.outputs:
            param = self._parse_parameter_spec(output_spec)
            output_params.append(param)
        return input_params, output_params
    def _parse_parameter_spec(self, param_spec: Dict[str, Any]) -> ParameterSpec:
        """Parse individual parameter specification"""
        name = param_spec.get("name", "")
        type_str = param_spec.get("type", "string")
        description = param_spec.get("description", "")
        required = param_spec.get("required", False)
        default = param_spec.get("default")
        examples = param_spec.get("examples", [])
        # Parse parameter type
        param_type = self._parse_parameter_type(type_str)
        # Generate validation rules
        validation_rules = self._generate_validation_rules(param_spec, param_type)
        return ParameterSpec(
            name=name,
            type=param_type,
            description=description,
            required=required,
            default=default,
            validation_rules=validation_rules,
            examples=examples
        )

    def _parse_parameter_type(self, type_str: str) -> ParameterType:
        """Parse parameter type from string"""
        type_mapping = {
            "str": ParameterType.STRING,
            "string": ParameterType.STRING,
            "text": ParameterType.STRING,
            "int": ParameterType.INTEGER,
            "integer": ParameterType.INTEGER,
            "float": ParameterType.NUMBER,
            "number": ParameterType.NUMBER,
            "bool": ParameterType.BOOLEAN,
            "boolean": ParameterType.BOOLEAN,
            "list": ParameterType.ARRAY,
            "array": ParameterType.ARRAY,
            "dict": ParameterType.OBJECT,
            "object": ParameterType.OBJECT,
            "null": ParameterType.NULL,
            "none": ParameterType.NULL
        }
        return type_mapping.get(type_str.lower(), ParameterType.STRING)
    def _generate_validation_rules(
        self, param_spec: Dict[str, Any], param_type: ParameterType
    ) -> Dict[str, Any]:
        """Generate validation rules for a parameter"""
        rules = {}
        # Type-specific validation
        if param_type == ParameterType.STRING:
            rules.update(self._generate_string_validation(param_spec))
        elif param_type == ParameterType.INTEGER:
            rules.update(self._generate_integer_validation(param_spec))
        elif param_type == ParameterType.NUMBER:
            rules.update(self._generate_number_validation(param_spec))
        elif param_type == ParameterType.ARRAY:
            rules.update(self._generate_array_validation(param_spec))
        elif param_type == ParameterType.OBJECT:
            rules.update(self._generate_object_validation(param_spec))
        # Common validation rules
        if param_spec.get("required", False):
            rules["required"] = True
        if "enum" in param_spec:
            rules["enum"] = param_spec["enum"]
        if "pattern" in param_spec:
            rules["pattern"] = param_spec["pattern"]
        else:
            # Fall back to a format detected from the name/description
            format_name = self._detect_format(
                param_spec.get("name", ""), param_spec.get("description", "")
            )
            if format_name in self.format_validators:
                rules.update(self.format_validators[format_name])
        return rules

    def _generate_string_validation(self, param_spec: Dict[str, Any]) -> Dict[str, Any]:
        """Generate string-specific validation rules"""
        rules = {}
        if "min_length" in param_spec:
            rules["minLength"] = param_spec["min_length"]
        elif "min_len" in param_spec:
            rules["minLength"] = param_spec["min_len"]
        else:
            # Infer from description
            desc = param_spec.get("description", "").lower()
            if "password" in desc:
                rules["minLength"] = 8
            elif "email" in desc:
                rules["minLength"] = 5
            elif "name" in desc:
                rules["minLength"] = 1
        if "max_length" in param_spec:
            rules["maxLength"] = param_spec["max_length"]
        elif "max_len" in param_spec:
            rules["maxLength"] = param_spec["max_len"]
        else:
            # Reasonable defaults
            desc = param_spec.get("description", "").lower()
            if "password" in desc:
                rules["maxLength"] = 128
            elif "email" in desc:
                rules["maxLength"] = 254
            elif "description" in desc or "content" in desc:
                rules["maxLength"] = 10000
            elif "name" in desc or "title" in desc:
                rules["maxLength"] = 255
            else:
                rules["maxLength"] = 1000
        return rules
    def _generate_integer_validation(self, param_spec: Dict[str, Any]) -> Dict[str, Any]:
        """Generate integer-specific validation rules"""
        rules = {}
        if "minimum" in param_spec:
            rules["minimum"] = param_spec["minimum"]
        elif "min" in param_spec:
            rules["minimum"] = param_spec["min"]
        else:
            # Infer from context
            name = param_spec.get("name", "").lower()
            desc = param_spec.get("description", "").lower()
            if any(word in name + desc for word in ["count", "quantity", "amount", "size", "limit"]):
                rules["minimum"] = 0
            elif "page" in name + desc:
                rules["minimum"] = 1
            elif "port" in name + desc:
                rules["minimum"] = 1
                rules["maximum"] = 65535
        if "maximum" in param_spec:
            rules["maximum"] = param_spec["maximum"]
        elif "max" in param_spec:
            rules["maximum"] = param_spec["max"]
        return rules

    def _generate_number_validation(self, param_spec: Dict[str, Any]) -> Dict[str, Any]:
        """Generate number-specific validation rules"""
        rules = {}
        if "minimum" in param_spec:
            rules["minimum"] = param_spec["minimum"]
        if "maximum" in param_spec:
            rules["maximum"] = param_spec["maximum"]
        if "exclusive_minimum" in param_spec:
            rules["exclusiveMinimum"] = param_spec["exclusive_minimum"]
        if "exclusive_maximum" in param_spec:
            rules["exclusiveMaximum"] = param_spec["exclusive_maximum"]
        if "multiple_of" in param_spec:
            rules["multipleOf"] = param_spec["multiple_of"]
        return rules
def _generate_array_validation(self, param_spec: Dict[str, Any]) -> Dict[str, Any]:
"""Generate array-specific validation rules"""
rules = {}
if "min_items" in param_spec:
rules["minItems"] = param_spec["min_items"]
elif "min_length" in param_spec:
rules["minItems"] = param_spec["min_length"]
else:
rules["minItems"] = 0
if "max_items" in param_spec:
rules["maxItems"] = param_spec["max_items"]
elif "max_length" in param_spec:
rules["maxItems"] = param_spec["max_length"]
else:
rules["maxItems"] = 1000 # Reasonable default
if param_spec.get("unique_items", False):
rules["uniqueItems"] = True
if "item_type" in param_spec:
rules["items"] = {"type": param_spec["item_type"]}
return rules
def _generate_object_validation(self, param_spec: Dict[str, Any]) -> Dict[str, Any]:
"""Generate object-specific validation rules"""
rules = {}
if "properties" in param_spec:
rules["properties"] = param_spec["properties"]
if "required_properties" in param_spec:
rules["required"] = param_spec["required_properties"]
if "additional_properties" in param_spec:
rules["additionalProperties"] = param_spec["additional_properties"]
else:
rules["additionalProperties"] = False
if "min_properties" in param_spec:
rules["minProperties"] = param_spec["min_properties"]
if "max_properties" in param_spec:
rules["maxProperties"] = param_spec["max_properties"]
return rules
def _detect_format(self, name: str, description: str) -> Optional[str]:
"""Detect parameter format from name and description"""
combined = (name + " " + description).lower()
format_indicators = {
"email": ["email", "e-mail", "email_address"],
"url": ["url", "uri", "link", "website", "endpoint"],
"uuid": ["uuid", "guid", "identifier", "id"],
"date": ["date", "birthday", "created_date", "modified_date"],
"datetime": ["datetime", "timestamp", "created_at", "updated_at"],
"password": ["password", "secret", "token", "api_key"]
}
for format_name, indicators in format_indicators.items():
if any(indicator in combined for indicator in indicators):
return format_name
return None
def generate_openai_schema(self, description: ToolDescription, input_params: List[ParameterSpec]) -> Dict[str, Any]:
"""Generate OpenAI function calling schema"""
properties = {}
required = []
for param in input_params:
prop_def = {
"type": param.type.value,
"description": param.description
}
# Add validation rules
if param.validation_rules:
prop_def.update(param.validation_rules)
# Add examples
if param.examples:
prop_def["examples"] = param.examples
# Add default value
if param.default is not None:
prop_def["default"] = param.default
properties[param.name] = prop_def
if param.required:
required.append(param.name)
schema = {
"name": description.name,
"description": description.purpose,
"parameters": {
"type": "object",
"properties": properties,
"required": required,
"additionalProperties": False
}
}
return schema
def generate_anthropic_schema(self, description: ToolDescription, input_params: List[ParameterSpec]) -> Dict[str, Any]:
"""Generate Anthropic tool use schema"""
input_schema = {
"type": "object",
"properties": {},
"required": []
}
for param in input_params:
prop_def = {
"type": param.type.value,
"description": param.description
}
# Add validation rules (Anthropic uses subset of JSON Schema)
if param.validation_rules:
# Filter to supported validation rules
supported_rules = ["minLength", "maxLength", "minimum", "maximum", "pattern", "enum", "items"]
for rule, value in param.validation_rules.items():
if rule in supported_rules:
prop_def[rule] = value
input_schema["properties"][param.name] = prop_def
if param.required:
input_schema["required"].append(param.name)
schema = {
"name": description.name,
"description": description.purpose,
"input_schema": input_schema
}
return schema
def generate_error_responses(self, description: ToolDescription) -> List[ErrorSpec]:
"""Generate error response specifications"""
error_specs = []
# Common errors
common_errors = [
{
"error_code": "invalid_input",
"error_message": "Invalid input parameters provided",
"http_status": 400,
"details": {"validation_errors": []}
},
{
"error_code": "authentication_required",
"error_message": "Authentication required to access this tool",
"http_status": 401
},
{
"error_code": "insufficient_permissions",
"error_message": "Insufficient permissions to perform this operation",
"http_status": 403
},
{
"error_code": "rate_limit_exceeded",
"error_message": "Rate limit exceeded. Please try again later",
"http_status": 429,
"retry_after": 60
},
{
"error_code": "internal_error",
"error_message": "Internal server error occurred",
"http_status": 500
},
{
"error_code": "service_unavailable",
"error_message": "Service temporarily unavailable",
"http_status": 503,
"retry_after": 300
}
]
# Add common errors
for error in common_errors:
error_specs.append(ErrorSpec(**error))
# Add tool-specific errors based on error conditions
for condition in description.error_conditions:
if "not found" in condition.lower():
error_specs.append(ErrorSpec(
error_code="resource_not_found",
error_message=f"Requested resource not found: {condition}",
http_status=404
))
elif "timeout" in condition.lower():
error_specs.append(ErrorSpec(
error_code="operation_timeout",
error_message=f"Operation timed out: {condition}",
http_status=408,
retry_after=30
))
elif "quota" in condition.lower() or "limit" in condition.lower():
error_specs.append(ErrorSpec(
error_code="quota_exceeded",
error_message=f"Quota or limit exceeded: {condition}",
http_status=429,
retry_after=3600
))
elif "dependency" in condition.lower():
error_specs.append(ErrorSpec(
error_code="dependency_failure",
error_message=f"Dependency service failure: {condition}",
http_status=502
))
return error_specs
def generate_rate_limits(self, description: ToolDescription) -> RateLimitSpec:
"""Generate rate limiting specification"""
rate_limits = description.rate_limits
# Default rate limits based on tool category
defaults = {
"search": {"rpm": 60, "rph": 1000, "rpd": 10000, "burst": 10},
"data": {"rpm": 30, "rph": 500, "rpd": 5000, "burst": 5},
"api": {"rpm": 100, "rph": 2000, "rpd": 20000, "burst": 20},
"file": {"rpm": 120, "rph": 3000, "rpd": 30000, "burst": 30},
"compute": {"rpm": 10, "rph": 100, "rpd": 1000, "burst": 3},
"communication": {"rpm": 30, "rph": 300, "rpd": 3000, "burst": 5}
}
category_defaults = defaults.get(description.category.lower(), defaults["api"])
return RateLimitSpec(
requests_per_minute=rate_limits.get("requests_per_minute", category_defaults["rpm"]),
requests_per_hour=rate_limits.get("requests_per_hour", category_defaults["rph"]),
requests_per_day=rate_limits.get("requests_per_day", category_defaults["rpd"]),
burst_limit=rate_limits.get("burst_limit", category_defaults["burst"]),
cooldown_period=rate_limits.get("cooldown_period", 60),
rate_limit_key=rate_limits.get("rate_limit_key", "user_id")
)
def generate_examples(self, description: ToolDescription, input_params: List[ParameterSpec]) -> List[Dict[str, Any]]:
"""Generate usage examples"""
examples = []
# Use provided examples if available
if description.examples:
for example in description.examples:
examples.append(example)
# Generate synthetic examples
if len(examples) == 0:
synthetic_example = self._generate_synthetic_example(description, input_params)
if synthetic_example:
examples.append(synthetic_example)
# Ensure we have multiple examples showing different scenarios
if len(examples) == 1 and len(input_params) > 1:
# Generate minimal example
minimal_example = self._generate_minimal_example(description, input_params)
if minimal_example and minimal_example != examples[0]:
examples.append(minimal_example)
return examples
def _generate_synthetic_example(self, description: ToolDescription, input_params: List[ParameterSpec]) -> Dict[str, Any]:
"""Generate a synthetic example based on parameter specifications"""
example_input = {}
for param in input_params:
if param.examples:
example_input[param.name] = param.examples[0]
elif param.default is not None:
example_input[param.name] = param.default
else:
example_input[param.name] = self._generate_example_value(param)
# Generate expected output based on tool purpose
expected_output = self._generate_example_output(description)
return {
"description": f"Example usage of {description.name}",
"input": example_input,
"expected_output": expected_output
}
def _generate_minimal_example(self, description: ToolDescription, input_params: List[ParameterSpec]) -> Optional[Dict[str, Any]]:
"""Generate minimal example with only required parameters"""
example_input = {}
for param in input_params:
if param.required:
if param.examples:
example_input[param.name] = param.examples[0]
else:
example_input[param.name] = self._generate_example_value(param)
if not example_input:
return None
expected_output = self._generate_example_output(description)
return {
"description": f"Minimal example of {description.name} with required parameters only",
"input": example_input,
"expected_output": expected_output
}
def _generate_example_value(self, param: ParameterSpec) -> Any:
"""Generate example value for a parameter"""
if param.type == ParameterType.STRING:
format_examples = {
"email": "[email protected]",
"url": "https://example.com",
"uuid": "123e4567-e89b-12d3-a456-426614174000",
"date": "2024-01-15",
"datetime": "2024-01-15T10:30:00Z"
}
# Check for format in validation rules
if param.validation_rules and "format" in param.validation_rules:
format_type = param.validation_rules["format"]
if format_type in format_examples:
return format_examples[format_type]
# Check for patterns or enum
if param.validation_rules:
if "enum" in param.validation_rules:
return param.validation_rules["enum"][0]
# Generate based on name/description
name_lower = param.name.lower()
if "name" in name_lower:
return "example_name"
elif "query" in name_lower or "search" in name_lower:
return "search query"
elif "path" in name_lower:
return "/path/to/resource"
elif "message" in name_lower:
return "Example message"
else:
return "example_value"
elif param.type == ParameterType.INTEGER:
if param.validation_rules:
min_val = param.validation_rules.get("minimum", 0)
max_val = param.validation_rules.get("maximum", 100)
return min(max(42, min_val), max_val)
return 42
elif param.type == ParameterType.NUMBER:
if param.validation_rules:
min_val = param.validation_rules.get("minimum", 0.0)
max_val = param.validation_rules.get("maximum", 100.0)
return min(max(42.5, min_val), max_val)
return 42.5
elif param.type == ParameterType.BOOLEAN:
return True
elif param.type == ParameterType.ARRAY:
return ["item1", "item2"]
elif param.type == ParameterType.OBJECT:
return {"key": "value"}
else:
return None
def _generate_example_output(self, description: ToolDescription) -> Dict[str, Any]:
"""Generate example output based on tool description"""
category = description.category.lower()
if category == "search":
return {
"results": [
{"title": "Example Result 1", "url": "https://example.com/1", "snippet": "Example snippet..."},
{"title": "Example Result 2", "url": "https://example.com/2", "snippet": "Another snippet..."}
],
"total_count": 2
}
elif category == "data":
return {
"data": [{"id": 1, "value": "example"}, {"id": 2, "value": "another"}],
"metadata": {"count": 2, "processed_at": "2024-01-15T10:30:00Z"}
}
elif category == "file":
return {
"success": True,
"file_path": "/path/to/file.txt",
"size": 1024,
"modified_at": "2024-01-15T10:30:00Z"
}
elif category == "api":
return {
"status": "success",
"data": {"result": "operation completed successfully"},
"timestamp": "2024-01-15T10:30:00Z"
}
else:
return {
"success": True,
"message": f"{description.name} executed successfully",
"result": "example result"
}
def generate_tool_schema(self, description: ToolDescription) -> ToolSchema:
"""Generate complete tool schema"""
# Parse parameters
input_params, output_params = self.parse_tool_description(description)
# Generate schemas
openai_schema = self.generate_openai_schema(description, input_params)
anthropic_schema = self.generate_anthropic_schema(description, input_params)
# Generate validation rules
validation_rules = []
for param in input_params:
if param.validation_rules:
validation_rules.append({
"parameter": param.name,
"rules": param.validation_rules
})
# Generate error responses
error_responses = self.generate_error_responses(description)
# Generate rate limits
rate_limits = self.generate_rate_limits(description)
# Generate examples
examples = self.generate_examples(description, input_params)
# Generate metadata
metadata = {
"category": description.category,
"idempotent": description.idempotent,
"side_effects": description.side_effects,
"dependencies": description.dependencies,
"security_requirements": description.security_requirements,
"generated_at": "2024-01-15T10:30:00Z",
"schema_version": "1.0",
"input_parameters": len(input_params),
"output_parameters": len(output_params),
"required_parameters": sum(1 for p in input_params if p.required),
"optional_parameters": sum(1 for p in input_params if not p.required)
}
return ToolSchema(
name=description.name,
description=description.purpose,
openai_schema=openai_schema,
anthropic_schema=anthropic_schema,
validation_rules=validation_rules,
error_responses=error_responses,
rate_limits=rate_limits,
examples=examples,
metadata=metadata
)
def main():
parser = argparse.ArgumentParser(description="Tool Schema Generator for AI Agents")
parser.add_argument("input_file", help="JSON file with tool descriptions")
parser.add_argument("-o", "--output", help="Output file prefix (default: tool_schemas)")
parser.add_argument("--format", choices=["json", "both"], default="both",
help="Output format")
parser.add_argument("--validate", action="store_true",
help="Validate generated schemas")
args = parser.parse_args()
try:
# Load tool descriptions
with open(args.input_file, 'r') as f:
tools_data = json.load(f)
# Parse tool descriptions
tool_descriptions = []
for tool_data in tools_data.get("tools", []):
tool_desc = ToolDescription(**tool_data)
tool_descriptions.append(tool_desc)
# Generate schemas
generator = ToolSchemaGenerator()
schemas = []
for description in tool_descriptions:
schema = generator.generate_tool_schema(description)
schemas.append(schema)
print(f"Generated schema for: {schema.name}")
# Prepare output
output_data = {
"tool_schemas": [asdict(schema) for schema in schemas],
"metadata": {
"generated_by": "tool_schema_generator.py",
"input_file": args.input_file,
"tool_count": len(schemas),
"generation_timestamp": "2024-01-15T10:30:00Z",
"schema_version": "1.0"
},
"validation_summary": {
"total_tools": len(schemas),
"total_parameters": sum(schema.metadata["input_parameters"] for schema in schemas),
"total_validation_rules": sum(len(schema.validation_rules) for schema in schemas),
"total_examples": sum(len(schema.examples) for schema in schemas)
}
}
# Output files
output_prefix = args.output or "tool_schemas"
if args.format in ["json", "both"]:
with open(f"{output_prefix}.json", 'w') as f:
json.dump(output_data, f, indent=2, default=str)
print(f"JSON output written to {output_prefix}.json")
if args.format == "both":
# Generate separate files for different formats
# OpenAI format
openai_schemas = {
"functions": [schema.openai_schema for schema in schemas]
}
with open(f"{output_prefix}_openai.json", 'w') as f:
json.dump(openai_schemas, f, indent=2)
print(f"OpenAI schemas written to {output_prefix}_openai.json")
# Anthropic format
anthropic_schemas = {
"tools": [schema.anthropic_schema for schema in schemas]
}
with open(f"{output_prefix}_anthropic.json", 'w') as f:
json.dump(anthropic_schemas, f, indent=2)
print(f"Anthropic schemas written to {output_prefix}_anthropic.json")
# Validation rules
validation_data = {
"validation_rules": {schema.name: schema.validation_rules for schema in schemas}
}
with open(f"{output_prefix}_validation.json", 'w') as f:
json.dump(validation_data, f, indent=2)
print(f"Validation rules written to {output_prefix}_validation.json")
# Usage examples
examples_data = {
"examples": {schema.name: schema.examples for schema in schemas}
}
with open(f"{output_prefix}_examples.json", 'w') as f:
json.dump(examples_data, f, indent=2)
print(f"Usage examples written to {output_prefix}_examples.json")
# Print summary
print(f"\nSchema Generation Summary:")
print(f"Tools processed: {len(schemas)}")
print(f"Total input parameters: {sum(schema.metadata['input_parameters'] for schema in schemas)}")
print(f"Total validation rules: {sum(len(schema.validation_rules) for schema in schemas)}")
print(f"Total examples generated: {sum(len(schema.examples) for schema in schemas)}")
# Validation if requested
if args.validate:
print("\nValidation Results:")
for schema in schemas:
validation_errors = []
# Basic validation checks
if not schema.openai_schema.get("parameters", {}).get("properties"):
validation_errors.append("Missing input parameters")
if not schema.examples:
validation_errors.append("No usage examples")
if not schema.validation_rules:
validation_errors.append("No validation rules defined")
if validation_errors:
print(f" {schema.name}: {', '.join(validation_errors)}")
else:
print(f" {schema.name}: ✓ Valid")
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
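The script reads an input JSON file whose top-level `tools` array holds one description per tool. The field names below are inferred from the attributes the script accesses on `ToolDescription` (`name`, `purpose`, `category`, `examples`, `error_conditions`, `rate_limits`, and so on); the full dataclass, defined earlier in the file, may require additional fields such as the parameter specs consumed by `parse_tool_description`. A minimal sketch of a plausible input file:

```python
import json

# Hypothetical minimal input for the generator. Field names are inferred from
# the attributes the script reads on ToolDescription; the real dataclass
# (defined earlier in the file) may expect more fields.
tools_input = {
    "tools": [
        {
            "name": "web_search",
            "purpose": "Search the web and return ranked results",
            "category": "search",
            "examples": [],
            "error_conditions": ["query timeout", "resource not found"],
            "rate_limits": {"requests_per_minute": 30},
            "idempotent": True,
            "side_effects": [],
            "dependencies": ["search-backend"],
            "security_requirements": ["api_key"],
        }
    ]
}

# Write the file the CLI expects as its positional argument.
with open("tools.json", "w") as f:
    json.dump(tools_input, f, indent=2)

# Then run, e.g.:
#   python tool_schema_generator.py tools.json -o my_schemas --format both --validate
```

With `--format both`, the run would emit the combined schema file plus separate OpenAI, Anthropic, validation, and examples files under the chosen prefix.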
Install this skill with:

npx skills add alirezarezvani/claude-skills --skill engineering/agent-designer
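For reference, the two schema dialects the generator emits differ mainly in where the JSON Schema lives: OpenAI nests it under `parameters`, Anthropic under `input_schema`. The self-contained sketch below mirrors that split without depending on the classes above; `build_schemas` and its plain-dict parameter format are hypothetical stand-ins for `ParameterSpec`:

```python
from typing import Any, Dict, List, Tuple

def build_schemas(
    name: str, purpose: str, params: List[Dict[str, Any]]
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    # Hypothetical helper mirroring generate_openai_schema /
    # generate_anthropic_schema, with plain dicts standing in
    # for ParameterSpec objects.
    properties = {
        p["name"]: {"type": p["type"], "description": p["description"]}
        for p in params
    }
    required = [p["name"] for p in params if p.get("required")]
    openai_schema = {
        "name": name,
        "description": purpose,
        "parameters": {  # OpenAI nests the JSON Schema under "parameters"
            "type": "object",
            "properties": properties,
            "required": required,
            "additionalProperties": False,
        },
    }
    anthropic_schema = {
        "name": name,
        "description": purpose,
        "input_schema": {  # Anthropic nests it under "input_schema"
            "type": "object",
            "properties": properties,
            "required": required,
        },
    }
    return openai_schema, anthropic_schema

openai_s, anthropic_s = build_schemas(
    "web_search",
    "Search the web and return ranked results",
    [{"name": "query", "type": "string",
      "description": "Search terms", "required": True}],
)
```

The shared `properties`/`required` core is why the generator can build both dialects from one parsed parameter list.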
Details

- Category: Development
- License: MIT
- Author: @alirezarezvani
- Source: GitHub
- Source file: engineering/agent-designer/SKILL.md