As AI applications scale, managing traffic across multiple models and providers becomes critical. Intelligent routing and load balancing ensure optimal performance, cost efficiency, and reliability by distributing requests based on various factors like cost, performance, availability, and business requirements.
Managed Routing Solution
Tetrate Agent Router Service offers intelligent routing for GenAI developers with advanced load balancing, automatic failover, and cross-provider monitoring. This managed service handles the complexity of routing decisions, health checking, and traffic distribution across multiple AI providers.
- Advanced load balancing algorithms
- Automatic failover and health checking
- Cross-provider traffic distribution
- Intelligent routing decisions
The Need for Intelligent Routing
Modern AI applications face several challenges that require sophisticated routing:
- Provider Diversity: Multiple AI providers with different capabilities and pricing
- Performance Variability: Different models have varying response times and quality
- Cost Optimization: Need to balance performance with cost considerations
- Reliability Requirements: Automatic failover when providers are unavailable
- Traffic Management: Handle varying load patterns and request volumes
:::tip Routing Benefits Intelligent AI model routing can improve response times by 40%, reduce costs by 30%, and increase reliability to 99.9% uptime. :::
Core Routing Strategies
1. Cost-Based Routing
Route requests to the most cost-effective provider while maintaining quality requirements.
from typing import Dict, List, Optional
import asyncio
import time
class CostBasedRouter:
"""Route requests based on cost optimization"""
def __init__(self):
self.provider_costs = {
'openai': {
'gpt-4': 0.03,
'gpt-3.5-turbo': 0.002,
'gpt-4-turbo': 0.01
},
'anthropic': {
'claude-3-opus': 0.015,
'claude-3-sonnet': 0.003,
'claude-3-haiku': 0.00025
},
'together': {
'llama-2-7b': 0.0003,
'llama-2-13b': 0.0006,
'mistral-7b': 0.0004
}
}
self.quality_thresholds = {
'high': ['gpt-4', 'claude-3-opus', 'llama-2-13b'],
'medium': ['gpt-3.5-turbo', 'claude-3-sonnet', 'mistral-7b'],
'low': ['claude-3-haiku', 'llama-2-7b']
}
def select_provider(self, task_complexity: str, budget: float, prompt: str) -> tuple[str, str]:
"""Select the most cost-effective provider for the task"""
# Estimate tokens for cost calculation
estimated_tokens = len(prompt.split()) * 1.3
# Get eligible models based on quality requirement
eligible_models = self.quality_thresholds.get(task_complexity, self.quality_thresholds['medium'])
best_provider = None
best_model = None
lowest_cost = float('inf')
# Find the cheapest option that meets quality requirements
for provider, models in self.provider_costs.items():
for model, cost_per_1k in models.items():
if model in eligible_models:
total_cost = (estimated_tokens / 1000) * cost_per_1k
if total_cost <= budget and total_cost < lowest_cost:
lowest_cost = total_cost
best_provider = provider
best_model = model
if best_provider is None:
# Fallback to cheapest available option
for provider, models in self.provider_costs.items():
for model, cost_per_1k in models.items():
total_cost = (estimated_tokens / 1000) * cost_per_1k
if total_cost < lowest_cost:
lowest_cost = total_cost
best_provider = provider
best_model = model
return best_provider, best_model
def get_cost_estimate(self, provider: str, model: str, prompt: str) -> float:
"""Get cost estimate for a specific provider and model"""
if provider not in self.provider_costs or model not in self.provider_costs[provider]:
return float('inf')
estimated_tokens = len(prompt.split()) * 1.3
cost_per_1k = self.provider_costs[provider][model]
return (estimated_tokens / 1000) * cost_per_1k
2. Performance-Based Routing
Route requests based on historical performance metrics and current health status.
import statistics
from collections import defaultdict
class PerformanceBasedRouter:
"""Route requests based on performance metrics"""
def __init__(self):
self.performance_metrics = defaultdict(list)
self.health_status = {}
self.response_times = defaultdict(list)
self.error_rates = defaultdict(float)
self.success_rates = defaultdict(float)
def update_metrics(self, provider: str, model: str, response_time: float, success: bool):
"""Update performance metrics for a provider/model combination"""
key = f"{provider}:{model}"
# Update response times
self.response_times[key].append(response_time)
if len(self.response_times[key]) > 100: # Keep last 100 measurements
self.response_times[key].pop(0)
# Update success/error rates
if key not in self.performance_metrics:
self.performance_metrics[key] = {'success': 0, 'total': 0}
self.performance_metrics[key]['total'] += 1
if success:
self.performance_metrics[key]['success'] += 1
self.success_rates[key] = (
self.performance_metrics[key]['success'] /
self.performance_metrics[key]['total']
)
self.error_rates[key] = 1 - self.success_rates[key]
def get_performance_score(self, provider: str, model: str) -> float:
"""Calculate performance score for a provider/model combination"""
key = f"{provider}:{model}"
if key not in self.response_times or not self.response_times[key]:
return 0.5 # Default score for unknown providers
# Calculate average response time (lower is better)
avg_response_time = statistics.mean(self.response_times[key])
# Normalize response time (0-1 scale, lower is better)
normalized_response_time = min(avg_response_time / 10.0, 1.0) # Cap at 10 seconds
# Get success rate
success_rate = self.success_rates.get(key, 0.5)
# Calculate performance score (higher is better)
# Weight: 60% success rate, 40% response time
performance_score = (success_rate * 0.6) + ((1 - normalized_response_time) * 0.4)
return performance_score
def select_best_performer(self, available_providers: List[str], model_requirements: List[str]) -> tuple[str, str]:
"""Select the best performing provider/model combination"""
best_score = 0
best_provider = None
best_model = None
for provider in available_providers:
for model in model_requirements:
score = self.get_performance_score(provider, model)
if score > best_score:
best_score = score
best_provider = provider
best_model = model
return best_provider, best_model
def get_health_status(self, provider: str) -> bool:
"""Get current health status of a provider"""
return self.health_status.get(provider, True)
def mark_provider_unhealthy(self, provider: str):
"""Mark a provider as unhealthy"""
self.health_status[provider] = False
def mark_provider_healthy(self, provider: str):
"""Mark a provider as healthy"""
self.health_status[provider] = True
3. Load Balancing Strategies
Implement various load balancing algorithms for distributing requests.
import random
from enum import Enum
class LoadBalancingStrategy(Enum):
ROUND_ROBIN = "round_robin"
LEAST_CONNECTIONS = "least_connections"
WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
LEAST_RESPONSE_TIME = "least_response_time"
IP_HASH = "ip_hash"
class LoadBalancer:
"""Load balancer for AI model requests"""
def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN):
self.strategy = strategy
self.providers = []
self.current_index = 0
self.connection_counts = defaultdict(int)
self.response_times = defaultdict(list)
self.weights = {}
def add_provider(self, provider: str, weight: int = 1):
"""Add a provider to the load balancer"""
self.providers.append(provider)
self.weights[provider] = weight
def remove_provider(self, provider: str):
"""Remove a provider from the load balancer"""
if provider in self.providers:
self.providers.remove(provider)
if provider in self.weights:
del self.weights[provider]
def select_provider(self, client_ip: str = None) -> Optional[str]:
"""Select a provider based on the load balancing strategy"""
if not self.providers:
return None
if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
return self._round_robin()
elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
return self._least_connections()
elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
return self._weighted_round_robin()
elif self.strategy == LoadBalancingStrategy.LEAST_RESPONSE_TIME:
return self._least_response_time()
elif self.strategy == LoadBalancingStrategy.IP_HASH:
return self._ip_hash(client_ip)
return self.providers[0] # Default fallback
def _round_robin(self) -> str:
"""Round-robin selection"""
provider = self.providers[self.current_index]
self.current_index = (self.current_index + 1) % len(self.providers)
return provider
def _least_connections(self) -> str:
"""Select provider with least active connections"""
return min(self.providers, key=lambda p: self.connection_counts[p])
def _weighted_round_robin(self) -> str:
"""Weighted round-robin selection"""
# Create weighted list
weighted_providers = []
for provider in self.providers:
weight = self.weights.get(provider, 1)
weighted_providers.extend([provider] * weight)
# Select using round-robin on weighted list
provider = weighted_providers[self.current_index % len(weighted_providers)]
self.current_index += 1
return provider
def _least_response_time(self) -> str:
"""Select provider with lowest average response time"""
if not any(self.response_times.values()):
return random.choice(self.providers)
avg_response_times = {}
for provider in self.providers:
times = self.response_times[provider]
avg_response_times[provider] = sum(times) / len(times) if times else float('inf')
return min(avg_response_times, key=avg_response_times.get)
def _ip_hash(self, client_ip: str) -> str:
"""Select provider based on client IP hash"""
if not client_ip:
return random.choice(self.providers)
hash_value = hash(client_ip)
return self.providers[hash_value % len(self.providers)]
def record_request(self, provider: str, response_time: float):
"""Record request metrics for load balancing decisions"""
self.connection_counts[provider] += 1
self.response_times[provider].append(response_time)
# Keep only recent response times
if len(self.response_times[provider]) > 50:
self.response_times[provider].pop(0)
def record_response(self, provider: str):
"""Record response completion"""
self.connection_counts[provider] = max(0, self.connection_counts[provider] - 1)
Health Checking and Failover
Implement robust health checking and automatic failover mechanisms.
import asyncio
import aiohttp
from datetime import datetime, timedelta
class HealthChecker:
"""Monitor health of AI providers"""
def __init__(self, check_interval: int = 30):
self.check_interval = check_interval
self.health_status = {}
self.last_check = {}
self.failure_counts = defaultdict(int)
self.recovery_threshold = 3
self.failure_threshold = 2
async def start_monitoring(self, providers: Dict[str, str]):
"""Start health monitoring for all providers"""
self.providers = providers
while True:
await self._check_all_providers()
await asyncio.sleep(self.check_interval)
async def _check_all_providers(self):
"""Check health of all providers"""
tasks = []
for provider_name, health_endpoint in self.providers.items():
task = asyncio.create_task(self._check_provider_health(provider_name, health_endpoint))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
async def _check_provider_health(self, provider_name: str, health_endpoint: str):
"""Check health of a specific provider"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(health_endpoint, timeout=5) as response:
is_healthy = response.status == 200
if is_healthy:
self._mark_healthy(provider_name)
else:
self._mark_unhealthy(provider_name)
except Exception as e:
self._mark_unhealthy(provider_name)
print(f"Health check failed for {provider_name}: {e}")
def _mark_healthy(self, provider_name: str):
"""Mark provider as healthy"""
was_unhealthy = not self.health_status.get(provider_name, True)
if was_unhealthy:
self.failure_counts[provider_name] = 0
print(f"β
{provider_name} recovered")
self.health_status[provider_name] = True
self.last_check[provider_name] = datetime.now()
def _mark_unhealthy(self, provider_name: str):
"""Mark provider as unhealthy"""
self.failure_counts[provider_name] += 1
if self.failure_counts[provider_name] >= self.failure_threshold:
self.health_status[provider_name] = False
print(f"β {provider_name} marked as unhealthy")
def is_healthy(self, provider_name: str) -> bool:
"""Check if provider is currently healthy"""
return self.health_status.get(provider_name, True)
def get_healthy_providers(self) -> List[str]:
"""Get list of currently healthy providers"""
return [name for name, healthy in self.health_status.items() if healthy]
class FailoverManager:
"""Manage failover between providers"""
def __init__(self, primary_providers: List[str], fallback_providers: List[str]):
self.primary_providers = primary_providers
self.fallback_providers = fallback_providers
self.health_checker = HealthChecker()
self.current_provider = primary_providers[0] if primary_providers else None
async def get_available_provider(self, task_requirements: Dict) -> str:
"""Get an available provider for the task"""
# Try primary providers first
for provider in self.primary_providers:
if self.health_checker.is_healthy(provider):
if self._meets_requirements(provider, task_requirements):
return provider
# Fallback to secondary providers
for provider in self.fallback_providers:
if self.health_checker.is_healthy(provider):
if self._meets_requirements(provider, task_requirements):
return provider
# If no healthy providers, return the least unhealthy one
return self._get_least_unhealthy_provider()
def _meets_requirements(self, provider: str, requirements: Dict) -> bool:
"""Check if provider meets task requirements"""
# Implement requirement checking logic
# e.g., model availability, cost constraints, etc.
return True
def _get_least_unhealthy_provider(self) -> str:
"""Get the provider with the least failures"""
if not self.primary_providers and not self.fallback_providers:
raise Exception("No providers available")
all_providers = self.primary_providers + self.fallback_providers
return min(all_providers, key=lambda p: self.health_checker.failure_counts[p])
async def execute_with_failover(self, task_func, *args, **kwargs):
"""Execute a task with automatic failover"""
providers_to_try = self.primary_providers + self.fallback_providers
last_exception = None
for provider in providers_to_try:
if not self.health_checker.is_healthy(provider):
continue
try:
# Set the provider context
kwargs['provider'] = provider
result = await task_func(*args, **kwargs)
return result
except Exception as e:
last_exception = e
self.health_checker._mark_unhealthy(provider)
print(f"Provider {provider} failed: {e}")
continue
# If all providers failed, raise the last exception
raise last_exception or Exception("All providers failed")
Advanced Routing Features
1. Intelligent Request Classification
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import numpy as np
class RequestClassifier:
"""Classify requests to determine optimal routing"""
def __init__(self):
self.vectorizer = TfidfVectorizer(max_features=1000)
self.classifier = KMeans(n_clusters=5)
self.request_patterns = {
'summarization': ['summarize', 'summary', 'brief', 'overview'],
'classification': ['classify', 'categorize', 'identify', 'detect'],
'generation': ['generate', 'create', 'write', 'compose'],
'analysis': ['analyze', 'examine', 'study', 'investigate'],
'translation': ['translate', 'convert', 'interpret']
}
def classify_request(self, prompt: str) -> Dict[str, float]:
"""Classify a request and return confidence scores"""
# Simple keyword-based classification
prompt_lower = prompt.lower()
scores = {}
for category, keywords in self.request_patterns.items():
score = sum(1 for keyword in keywords if keyword in prompt_lower)
scores[category] = score / len(keywords)
return scores
def get_optimal_provider(self, prompt: str, available_providers: List[str]) -> str:
"""Get optimal provider based on request classification"""
classification = self.classify_request(prompt)
primary_category = max(classification, key=classification.get)
# Provider capabilities mapping
provider_capabilities = {
'openai': ['generation', 'analysis', 'classification'],
'anthropic': ['analysis', 'generation', 'summarization'],
'together': ['generation', 'translation']
}
# Find providers that excel at this category
suitable_providers = [
provider for provider in available_providers
if primary_category in provider_capabilities.get(provider, [])
]
if suitable_providers:
return suitable_providers[0]
return available_providers[0] if available_providers else None
2. Dynamic Load Balancing
class DynamicLoadBalancer(LoadBalancer):
"""Dynamic load balancer that adapts to changing conditions"""
def __init__(self):
super().__init__(LoadBalancingStrategy.LEAST_RESPONSE_TIME)
self.adaptive_weights = defaultdict(lambda: 1.0)
self.performance_history = defaultdict(list)
self.adaptation_rate = 0.1
def adapt_weights(self):
"""Adapt provider weights based on performance"""
for provider in self.providers:
if provider in self.response_times and self.response_times[provider]:
# Calculate performance score
avg_response_time = statistics.mean(self.response_times[provider])
success_rate = self.success_rates.get(provider, 0.5)
# Performance score (higher is better)
performance_score = success_rate / (1 + avg_response_time)
# Update adaptive weight
current_weight = self.adaptive_weights[provider]
target_weight = performance_score
new_weight = current_weight + self.adaptation_rate * (target_weight - current_weight)
self.adaptive_weights[provider] = max(0.1, new_weight)
# Update provider weight
self.weights[provider] = int(new_weight * 10)
def select_provider(self, client_ip: str = None) -> Optional[str]:
"""Select provider with adaptive weights"""
# Adapt weights periodically
if random.random() < 0.1: # 10% chance to adapt
self.adapt_weights()
return super().select_provider(client_ip)
Implementation Example
Hereβs a complete example of implementing intelligent routing and load balancing:
async def setup_intelligent_routing():
"""Set up complete intelligent routing system"""
# Initialize components
cost_router = CostBasedRouter()
performance_router = PerformanceBasedRouter()
load_balancer = DynamicLoadBalancer()
health_checker = HealthChecker()
failover_manager = FailoverManager(
primary_providers=['openai', 'anthropic'],
fallback_providers=['together', 'google']
)
# Add providers to load balancer
load_balancer.add_provider('openai', weight=3)
load_balancer.add_provider('anthropic', weight=2)
load_balancer.add_provider('together', weight=1)
# Set up health checking
health_endpoints = {
'openai': 'https://api.openai.com/v1/models',
'anthropic': 'https://api.anthropic.com/v1/models',
'together': 'https://api.together.xyz/v1/models'
}
# Start health monitoring
asyncio.create_task(health_checker.start_monitoring(health_endpoints))
return {
'cost_router': cost_router,
'performance_router': performance_router,
'load_balancer': load_balancer,
'health_checker': health_checker,
'failover_manager': failover_manager
}
async def route_request(prompt: str, task_complexity: str = 'medium', budget: float = 0.01):
"""Route a request using intelligent routing"""
routing_system = await setup_intelligent_routing()
# Get healthy providers
healthy_providers = routing_system['health_checker'].get_healthy_providers()
if not healthy_providers:
raise Exception("No healthy providers available")
# Select provider based on multiple factors
provider = None
# 1. Try cost-based routing first
cost_provider, cost_model = routing_system['cost_router'].select_provider(
task_complexity, budget, prompt
)
if cost_provider in healthy_providers:
provider = cost_provider
# 2. Fallback to performance-based routing
if not provider:
perf_provider, perf_model = routing_system['performance_router'].select_best_performer(
healthy_providers, ['gpt-3.5-turbo', 'claude-3-haiku']
)
provider = perf_provider
# 3. Final fallback to load balancer
if not provider:
provider = routing_system['load_balancer'].select_provider()
# Execute request with failover
async def execute_request():
# Simulate API call
await asyncio.sleep(0.1)
return f"Response from {provider}"
result = await routing_system['failover_manager'].execute_with_failover(
execute_request
)
return result
## Usage example
```python
async def main():
prompts = [
"Summarize the benefits of AI routing",
"Classify this text as positive or negative",
"Generate a creative story about AI"
]
for prompt in prompts:
try:
result = await route_request(prompt, 'medium', 0.01)
print(f"β
{result}")
except Exception as e:
print(f"β Error: {e}")
if __name__ == "__main__":
asyncio.run(main())
Best Practices
1. Monitoring and Metrics
class RoutingMetrics:
"""Track routing performance metrics"""
def __init__(self):
self.metrics = {
'requests_routed': 0,
'failures': 0,
'average_response_time': 0,
'cost_per_request': 0,
'provider_utilization': defaultdict(int)
}
def record_request(self, provider: str, response_time: float, cost: float, success: bool):
"""Record routing metrics"""
self.metrics['requests_routed'] += 1
self.metrics['provider_utilization'][provider] += 1
if not success:
self.metrics['failures'] += 1
# Update averages
total_requests = self.metrics['requests_routed']
current_avg = self.metrics['average_response_time']
self.metrics['average_response_time'] = (
(current_avg * (total_requests - 1) + response_time) / total_requests
)
current_cost_avg = self.metrics['cost_per_request']
self.metrics['cost_per_request'] = (
(current_cost_avg * (total_requests - 1) + cost) / total_requests
)
def get_routing_report(self) -> Dict:
"""Generate routing performance report"""
total_requests = self.metrics['requests_routed']
success_rate = (total_requests - self.metrics['failures']) / max(total_requests, 1)
return {
'total_requests': total_requests,
'success_rate': success_rate,
'average_response_time': self.metrics['average_response_time'],
'average_cost': self.metrics['cost_per_request'],
'provider_utilization': dict(self.metrics['provider_utilization'])
}
2. Configuration Management
# routing_config.yaml
routing:
strategy: "hybrid" # cost, performance, load_balance, hybrid
health_check_interval: 30
failover_threshold: 2
recovery_threshold: 3
providers:
openai:
weight: 3
cost_multiplier: 1.0
performance_bonus: 0.1
models: ["gpt-4", "gpt-3.5-turbo"]
anthropic:
weight: 2
cost_multiplier: 0.8
performance_bonus: 0.05
models: ["claude-3-opus", "claude-3-sonnet"]
together:
weight: 1
cost_multiplier: 0.3
performance_bonus: 0.0
models: ["llama-2-7b", "mistral-7b"]
load_balancing:
algorithm: "weighted_round_robin"
adaptation_rate: 0.1
history_size: 100
monitoring:
enabled: true
metrics_retention_days: 30
alert_threshold: 0.95
Conclusion
Intelligent AI model routing and load balancing provide significant benefits:
- Improved Performance: Faster response times through optimal provider selection
- Cost Optimization: Reduced costs by routing to cost-effective providers
- Enhanced Reliability: Automatic failover and health monitoring
- Better Scalability: Handle increased load through load balancing
- Operational Efficiency: Reduced manual intervention and monitoring
:::tip Implementation Priority Start with simple cost-based routing, then add health checking and load balancing as your needs grow. :::
The key to success is implementing routing incrementally, starting with basic strategies and gradually adding more sophisticated features based on your specific requirements and performance metrics.