AI Model Routing and Load Balancing: Intelligent Request Distribution

Master AI model routing and load balancing strategies. Learn intelligent routing algorithms, health checking, failover mechanisms, and performance optimization techniques.

As AI applications scale, managing traffic across multiple models and providers becomes critical. Intelligent routing and load balancing ensure optimal performance, cost efficiency, and reliability by distributing requests based on various factors like cost, performance, availability, and business requirements.

Managed Routing Solution

Tetrate Agent Router Service offers intelligent routing for GenAI developers with advanced load balancing, automatic failover, and cross-provider monitoring. This managed service handles the complexity of routing decisions, health checking, and traffic distribution across multiple AI providers.

  • Advanced load balancing algorithms
  • Automatic failover and health checking
  • Cross-provider traffic distribution
  • Intelligent routing decisions

The Need for Intelligent Routing

Modern AI applications face several challenges that require sophisticated routing:

  • Provider Diversity: Multiple AI providers with different capabilities and pricing
  • Performance Variability: Different models have varying response times and quality
  • Cost Optimization: Need to balance performance with cost considerations
  • Reliability Requirements: Automatic failover when providers are unavailable
  • Traffic Management: Handle varying load patterns and request volumes

:::tip Routing Benefits
Intelligent AI model routing can improve response times by 40%, reduce costs by 30%, and increase reliability to 99.9% uptime.
:::

Core Routing Strategies

1. Cost-Based Routing

Route requests to the most cost-effective provider while maintaining quality requirements.

from typing import Dict, List, Optional
import asyncio
import time

class CostBasedRouter:
    """Route requests based on cost optimization"""
    
    def __init__(self):
        self.provider_costs = {
            'openai': {
                'gpt-4': 0.03,
                'gpt-3.5-turbo': 0.002,
                'gpt-4-turbo': 0.01
            },
            'anthropic': {
                'claude-3-opus': 0.015,
                'claude-3-sonnet': 0.003,
                'claude-3-haiku': 0.00025
            },
            'together': {
                'llama-2-7b': 0.0003,
                'llama-2-13b': 0.0006,
                'mistral-7b': 0.0004
            }
        }
        self.quality_thresholds = {
            'high': ['gpt-4', 'claude-3-opus', 'llama-2-13b'],
            'medium': ['gpt-3.5-turbo', 'claude-3-sonnet', 'mistral-7b'],
            'low': ['claude-3-haiku', 'llama-2-7b']
        }
    
    def select_provider(self, task_complexity: str, budget: float, prompt: str) -> tuple[str, str]:
        """Select the most cost-effective provider for the task"""
        
        # Estimate tokens for cost calculation
        estimated_tokens = len(prompt.split()) * 1.3
        
        # Get eligible models based on quality requirement
        eligible_models = self.quality_thresholds.get(task_complexity, self.quality_thresholds['medium'])
        
        best_provider = None
        best_model = None
        lowest_cost = float('inf')
        
        # Find the cheapest option that meets quality requirements
        for provider, models in self.provider_costs.items():
            for model, cost_per_1k in models.items():
                if model in eligible_models:
                    total_cost = (estimated_tokens / 1000) * cost_per_1k
                    
                    if total_cost <= budget and total_cost < lowest_cost:
                        lowest_cost = total_cost
                        best_provider = provider
                        best_model = model
        
        if best_provider is None:
            # Fallback to cheapest available option
            for provider, models in self.provider_costs.items():
                for model, cost_per_1k in models.items():
                    total_cost = (estimated_tokens / 1000) * cost_per_1k
                    if total_cost < lowest_cost:
                        lowest_cost = total_cost
                        best_provider = provider
                        best_model = model
        
        return best_provider, best_model
    
    def get_cost_estimate(self, provider: str, model: str, prompt: str) -> float:
        """Get cost estimate for a specific provider and model"""
        if provider not in self.provider_costs or model not in self.provider_costs[provider]:
            return float('inf')
        
        estimated_tokens = len(prompt.split()) * 1.3
        cost_per_1k = self.provider_costs[provider][model]
        return (estimated_tokens / 1000) * cost_per_1k
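
To make the arithmetic inside `select_provider` concrete, here is a minimal standalone sketch of the same heuristic: a whitespace word count multiplied by 1.3, priced per 1K tokens. The helper names `estimate_tokens` and `estimate_cost` are illustrative and not part of the router above, and the prices are the sample values from `provider_costs`, not current list prices.

```python
def estimate_tokens(prompt: str, tokens_per_word: float = 1.3) -> float:
    """Rough token estimate from a whitespace word count (a heuristic, not a tokenizer)."""
    return len(prompt.split()) * tokens_per_word


def estimate_cost(prompt: str, cost_per_1k_tokens: float) -> float:
    """Estimated prompt cost for a given per-1K-token price."""
    return (estimate_tokens(prompt) / 1000) * cost_per_1k_tokens


if __name__ == "__main__":
    prompt = " ".join(["word"] * 200)  # a synthetic 200-word prompt
    sample_prices = [("gpt-4", 0.03), ("gpt-3.5-turbo", 0.002), ("claude-3-haiku", 0.00025)]
    for model, price in sample_prices:
        tokens = estimate_tokens(prompt)
        print(f"{model}: ~{tokens:.0f} tokens, estimated cost {estimate_cost(prompt, price):.6f}")
```

Note that this estimate covers only the prompt. Completion tokens are usually billed as well, often at a different rate, so a production router would likely need a separate output-token estimate.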

2. Performance-Based Routing

Route requests based on historical performance metrics and current health status.

import statistics
from collections import defaultdict

class PerformanceBasedRouter:
    """Route requests based on performance metrics"""
    
    def __init__(self):
        self.performance_metrics = {}  # key -> {'success': int, 'total': int}
        self.health_status = {}
        self.response_times = defaultdict(list)
        self.error_rates = defaultdict(float)
        self.success_rates = defaultdict(float)
    
    def update_metrics(self, provider: str, model: str, response_time: float, success: bool):
        """Update performance metrics for a provider/model combination"""
        key = f"{provider}:{model}"
        
        # Update response times
        self.response_times[key].append(response_time)
        if len(self.response_times[key]) > 100:  # Keep last 100 measurements
            self.response_times[key].pop(0)
        
        # Update success/error rates
        if key not in self.performance_metrics:
            self.performance_metrics[key] = {'success': 0, 'total': 0}
        
        self.performance_metrics[key]['total'] += 1
        if success:
            self.performance_metrics[key]['success'] += 1
        
        self.success_rates[key] = (
            self.performance_metrics[key]['success'] / 
            self.performance_metrics[key]['total']
        )
        self.error_rates[key] = 1 - self.success_rates[key]
    
    def get_performance_score(self, provider: str, model: str) -> float:
        """Calculate performance score for a provider/model combination"""
        key = f"{provider}:{model}"
        
        if key not in self.response_times or not self.response_times[key]:
            return 0.5  # Default score for unknown providers
        
        # Calculate average response time (lower is better)
        avg_response_time = statistics.mean(self.response_times[key])
        
        # Normalize response time (0-1 scale, lower is better)
        normalized_response_time = min(avg_response_time / 10.0, 1.0)  # Cap at 10 seconds
        
        # Get success rate
        success_rate = self.success_rates.get(key, 0.5)
        
        # Calculate performance score (higher is better)
        # Weight: 60% success rate, 40% response time
        performance_score = (success_rate * 0.6) + ((1 - normalized_response_time) * 0.4)
        
        return performance_score
    
    def select_best_performer(self, available_providers: List[str], model_requirements: List[str]) -> tuple[Optional[str], Optional[str]]:
        """Select the best performing provider/model combination"""

        best_score = -1.0  # Scores are in [0, 1], so any candidate beats this
        best_provider = None
        best_model = None

        for provider in available_providers:
            # Skip providers that have been marked unhealthy
            if not self.get_health_status(provider):
                continue

            for model in model_requirements:
                score = self.get_performance_score(provider, model)

                if score > best_score:
                    best_score = score
                    best_provider = provider
                    best_model = model

        # (None, None) when there is no healthy candidate
        return best_provider, best_model
    
    def get_health_status(self, provider: str) -> bool:
        """Get current health status of a provider"""
        return self.health_status.get(provider, True)
    
    def mark_provider_unhealthy(self, provider: str):
        """Mark a provider as unhealthy"""
        self.health_status[provider] = False
    
    def mark_provider_healthy(self, provider: str):
        """Mark a provider as healthy"""
        self.health_status[provider] = True
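
As a worked example of the scoring formula in `get_performance_score`, the sketch below isolates the 60/40 blend of success rate and normalized latency in one function. The function name and its keyword parameters are assumptions made for illustration; the weights and the 10-second cap come from the code above.

```python
def performance_score(success_rate: float, avg_response_time_s: float,
                      max_time_s: float = 10.0, success_weight: float = 0.6) -> float:
    """Blend success rate and latency into a 0-1 score (higher is better)."""
    # Latency is scaled to [0, 1] and capped, so anything slower than
    # max_time_s contributes nothing to the score.
    normalized_time = min(avg_response_time_s / max_time_s, 1.0)
    return success_rate * success_weight + (1 - normalized_time) * (1 - success_weight)


if __name__ == "__main__":
    candidates = {
        "reliable, moderate latency": (0.99, 1.0),
        "fast, less reliable": (0.90, 0.5),
        "perfect, but very slow": (1.00, 12.0),
    }
    for label, (rate, latency) in candidates.items():
        print(f"{label}: {performance_score(rate, latency):.3f}")
```

With these weights, a provider with 99% success at 1 s average latency scores 0.954, while a provider that never fails but takes longer than 10 s is capped at 0.6. Whether that trade-off is right depends on how latency-sensitive the workload is.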

3. Load Balancing Strategies

Implement various load balancing algorithms for distributing requests.

import random
from enum import Enum

class LoadBalancingStrategy(Enum):
    ROUND_ROBIN = "round_robin"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    LEAST_RESPONSE_TIME = "least_response_time"
    IP_HASH = "ip_hash"

class LoadBalancer:
    """Load balancer for AI model requests"""
    
    def __init__(self, strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN):
        self.strategy = strategy
        self.providers = []
        self.current_index = 0
        self.connection_counts = defaultdict(int)
        self.response_times = defaultdict(list)
        self.weights = {}
    
    def add_provider(self, provider: str, weight: int = 1):
        """Add a provider to the load balancer"""
        self.providers.append(provider)
        self.weights[provider] = weight
    
    def remove_provider(self, provider: str):
        """Remove a provider from the load balancer"""
        if provider in self.providers:
            self.providers.remove(provider)
            if provider in self.weights:
                del self.weights[provider]
    
    def select_provider(self, client_ip: str = None) -> Optional[str]:
        """Select a provider based on the load balancing strategy"""
        
        if not self.providers:
            return None
        
        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin()
        elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections()
        elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin()
        elif self.strategy == LoadBalancingStrategy.LEAST_RESPONSE_TIME:
            return self._least_response_time()
        elif self.strategy == LoadBalancingStrategy.IP_HASH:
            return self._ip_hash(client_ip)
        
        return self.providers[0]  # Default fallback
    
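    def _round_robin(self) -> str:
        """Round-robin selection"""
        # Wrap the index first: it can point past the end of the list after
        # remove_provider() has shrunk it.
        self.current_index %= len(self.providers)
        provider = self.providers[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.providers)
        return provider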
    
    def _least_connections(self) -> str:
        """Select provider with least active connections"""
        return min(self.providers, key=lambda p: self.connection_counts[p])
    
    def _weighted_round_robin(self) -> str:
        """Weighted round-robin selection"""
        # Create weighted list
        weighted_providers = []
        for provider in self.providers:
            weight = self.weights.get(provider, 1)
            weighted_providers.extend([provider] * weight)
        
        # Select using round-robin on weighted list
        provider = weighted_providers[self.current_index % len(weighted_providers)]
        self.current_index += 1
        return provider
    
    def _least_response_time(self) -> str:
        """Select provider with lowest average response time"""
        if not any(self.response_times.values()):
            return random.choice(self.providers)
        
        avg_response_times = {}
        for provider in self.providers:
            times = self.response_times[provider]
            avg_response_times[provider] = sum(times) / len(times) if times else float('inf')
        
        return min(avg_response_times, key=avg_response_times.get)
    
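    def _ip_hash(self, client_ip: str) -> str:
        """Select provider based on client IP hash"""
        if not client_ip:
            return random.choice(self.providers)

        # The built-in hash() is salted per process for strings, so it would not
        # keep a client on the same provider across restarts or worker processes.
        import hashlib  # imported here to keep this method self-contained
        digest = hashlib.sha256(client_ip.encode("utf-8")).digest()
        hash_value = int.from_bytes(digest[:8], "big")
        return self.providers[hash_value % len(self.providers)]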
    
    def record_request(self, provider: str, response_time: float):
        """Record request metrics for load balancing decisions"""
        self.connection_counts[provider] += 1
        self.response_times[provider].append(response_time)
        
        # Keep only recent response times
        if len(self.response_times[provider]) > 50:
            self.response_times[provider].pop(0)
    
    def record_response(self, provider: str):
        """Record response completion"""
        self.connection_counts[provider] = max(0, self.connection_counts[provider] - 1)
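
The weighted round-robin above expands the provider list by weight, which sends consecutive requests to the heaviest provider in a burst. A common alternative is smooth weighted round-robin, the interleaving variant known from nginx. The sketch below is a standalone illustration of that technique, not part of the `LoadBalancer` class; the class name and the sample weights are assumptions.

```python
from collections import Counter
from typing import Dict


class SmoothWeightedRoundRobin:
    """Interleaves picks in proportion to weight without building an expanded list."""

    def __init__(self, weights: Dict[str, int]):
        if not weights or any(w <= 0 for w in weights.values()):
            raise ValueError("weights must be a non-empty mapping of positive integers")
        self.weights = dict(weights)
        self.current = {name: 0 for name in weights}
        self.total = sum(weights.values())

    def next(self) -> str:
        # Every provider earns its weight on each call.
        for name, weight in self.weights.items():
            self.current[name] += weight
        # The provider with the highest running total is picked...
        chosen = max(self.current, key=self.current.get)
        # ...and pays back the total weight so the others get their turn.
        self.current[chosen] -= self.total
        return chosen


if __name__ == "__main__":
    balancer = SmoothWeightedRoundRobin({"openai": 3, "anthropic": 2, "together": 1})
    picks = [balancer.next() for _ in range(6)]
    print(picks)
    print(Counter(picks))
```

Over any full cycle (here, six picks) each provider is chosen exactly as often as its weight, but the picks are spread out rather than grouped.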

Health Checking and Failover

Implement robust health checking and automatic failover mechanisms.

import asyncio
import aiohttp
from datetime import datetime, timedelta

class HealthChecker:
    """Monitor health of AI providers"""
    
    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.health_status = {}
        self.last_check = {}
        self.failure_counts = defaultdict(int)
        self.recovery_threshold = 3  # Intended consecutive successes before recovery (not enforced below)
        self.failure_threshold = 2
    
    async def start_monitoring(self, providers: Dict[str, str]):
        """Start health monitoring for all providers"""
        self.providers = providers
        
        while True:
            await self._check_all_providers()
            await asyncio.sleep(self.check_interval)
    
    async def _check_all_providers(self):
        """Check health of all providers"""
        tasks = []
        for provider_name, health_endpoint in self.providers.items():
            task = asyncio.create_task(self._check_provider_health(provider_name, health_endpoint))
            tasks.append(task)
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _check_provider_health(self, provider_name: str, health_endpoint: str):
        """Check health of a specific provider"""
        try:
            # aiohttp expects a ClientTimeout object rather than a bare number
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession() as session:
                async with session.get(health_endpoint, timeout=timeout) as response:
                    is_healthy = response.status == 200

                    if is_healthy:
                        self._mark_healthy(provider_name)
                    else:
                        self._mark_unhealthy(provider_name)

        except Exception as e:
            self._mark_unhealthy(provider_name)
            print(f"Health check failed for {provider_name}: {e}")

    def _mark_healthy(self, provider_name: str):
        """Mark provider as healthy"""
        was_unhealthy = not self.health_status.get(provider_name, True)

        if was_unhealthy:
            print(f"✅ {provider_name} recovered")

        # Reset on every success so that only consecutive failures
        # count toward failure_threshold
        self.failure_counts[provider_name] = 0
        self.health_status[provider_name] = True
        self.last_check[provider_name] = datetime.now()

    def _mark_unhealthy(self, provider_name: str):
        """Mark provider as unhealthy"""
        self.failure_counts[provider_name] += 1
        self.last_check[provider_name] = datetime.now()

        if self.failure_counts[provider_name] >= self.failure_threshold:
            self.health_status[provider_name] = False
            print(f"❌ {provider_name} marked as unhealthy")
    
    def is_healthy(self, provider_name: str) -> bool:
        """Check if provider is currently healthy"""
        return self.health_status.get(provider_name, True)
    
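    def get_healthy_providers(self) -> List[str]:
        """Get list of currently healthy providers"""
        # Include configured providers that have not been checked yet (is_healthy
        # treats them as healthy) so the list is not empty before the first check.
        known = dict.fromkeys([*getattr(self, 'providers', {}), *self.health_status])
        return [name for name in known if self.is_healthy(name)]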

class FailoverManager:
    """Manage failover between providers"""
    
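    def __init__(self, primary_providers: List[str], fallback_providers: List[str],
                 health_checker: Optional[HealthChecker] = None):
        self.primary_providers = primary_providers
        self.fallback_providers = fallback_providers
        # Accept a shared HealthChecker so failover decisions use the monitored
        # state; a private, unmonitored checker is created only as a fallback.
        self.health_checker = health_checker or HealthChecker()
        self.current_provider = primary_providers[0] if primary_providers else None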
    
    async def get_available_provider(self, task_requirements: Dict) -> str:
        """Get an available provider for the task"""
        
        # Try primary providers first
        for provider in self.primary_providers:
            if self.health_checker.is_healthy(provider):
                if self._meets_requirements(provider, task_requirements):
                    return provider
        
        # Fallback to secondary providers
        for provider in self.fallback_providers:
            if self.health_checker.is_healthy(provider):
                if self._meets_requirements(provider, task_requirements):
                    return provider
        
        # If no healthy providers, return the least unhealthy one
        return self._get_least_unhealthy_provider()
    
    def _meets_requirements(self, provider: str, requirements: Dict) -> bool:
        """Check if provider meets task requirements"""
        # Implement requirement checking logic
        # e.g., model availability, cost constraints, etc.
        return True
    
    def _get_least_unhealthy_provider(self) -> str:
        """Get the provider with the least failures"""
        if not self.primary_providers and not self.fallback_providers:
            raise Exception("No providers available")
        
        all_providers = self.primary_providers + self.fallback_providers
        return min(all_providers, key=lambda p: self.health_checker.failure_counts[p])
    
    async def execute_with_failover(self, task_func, *args, **kwargs):
        """Execute a task with automatic failover"""
        
        providers_to_try = self.primary_providers + self.fallback_providers
        last_exception = None
        
        for provider in providers_to_try:
            if not self.health_checker.is_healthy(provider):
                continue
            
            try:
                # Set the provider context
                kwargs['provider'] = provider
                result = await task_func(*args, **kwargs)
                return result
                
            except Exception as e:
                last_exception = e
                self.health_checker._mark_unhealthy(provider)
                print(f"Provider {provider} failed: {e}")
                continue
        
        # If all providers failed, raise the last exception
        raise last_exception or Exception("All providers failed")
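
The `HealthChecker` above defines a `recovery_threshold`, but a provider is marked healthy again after a single passing check. One way to use both thresholds is to count consecutive results in each direction, so a flapping provider is not put back into rotation too early. The sketch below is a minimal, standalone illustration under that assumption; the `HealthState` class is hypothetical and not part of the classes above.

```python
from dataclasses import dataclass


@dataclass
class HealthState:
    """Tracks consecutive health-check results for one provider."""
    failure_threshold: int = 2    # consecutive failures before marking unhealthy
    recovery_threshold: int = 3   # consecutive successes before marking healthy again
    healthy: bool = True
    consecutive_failures: int = 0
    consecutive_successes: int = 0

    def record(self, check_passed: bool) -> bool:
        """Record one health-check result and return the resulting health flag."""
        if check_passed:
            self.consecutive_successes += 1
            self.consecutive_failures = 0
            if not self.healthy and self.consecutive_successes >= self.recovery_threshold:
                self.healthy = True
        else:
            self.consecutive_failures += 1
            self.consecutive_successes = 0
            if self.healthy and self.consecutive_failures >= self.failure_threshold:
                self.healthy = False
        return self.healthy


if __name__ == "__main__":
    state = HealthState()
    for result in [False, False, True, True, True]:
        print(f"check passed={result} -> healthy={state.record(result)}")
```

A single failed check during recovery resets the success streak, so the provider has to pass `recovery_threshold` checks in a row before it receives traffic again.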

Advanced Routing Features

1. Intelligent Request Classification

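# Note: scikit-learn and NumPy are only needed for the vectorizer and clusterer
# created in __init__. They are never fitted, and the keyword-based
# classify_request() below does not use them.
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import numpy as np

class RequestClassifier:
    """Classify requests to determine optimal routing"""

    def __init__(self):
        # Placeholders for a learned classifier (unused by the methods below)
        self.vectorizer = TfidfVectorizer(max_features=1000)
        self.classifier = KMeans(n_clusters=5)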
        self.request_patterns = {
            'summarization': ['summarize', 'summary', 'brief', 'overview'],
            'classification': ['classify', 'categorize', 'identify', 'detect'],
            'generation': ['generate', 'create', 'write', 'compose'],
            'analysis': ['analyze', 'examine', 'study', 'investigate'],
            'translation': ['translate', 'convert', 'interpret']
        }
    
    def classify_request(self, prompt: str) -> Dict[str, float]:
        """Classify a request and return confidence scores"""
        
        # Simple keyword-based classification
        prompt_lower = prompt.lower()
        scores = {}
        
        for category, keywords in self.request_patterns.items():
            score = sum(1 for keyword in keywords if keyword in prompt_lower)
            scores[category] = score / len(keywords)
        
        return scores
    
    def get_optimal_provider(self, prompt: str, available_providers: List[str]) -> Optional[str]:
        """Get optimal provider based on request classification"""

        if not available_providers:
            return None

        classification = self.classify_request(prompt)
        primary_category = max(classification, key=classification.get)

        # If no keyword matched, every score is 0 and max() just returns the
        # first category, so fall back to the first available provider instead.
        if classification[primary_category] == 0:
            return available_providers[0]

        # Provider capabilities mapping
        provider_capabilities = {
            'openai': ['generation', 'analysis', 'classification'],
            'anthropic': ['analysis', 'generation', 'summarization'],
            'together': ['generation', 'translation']
        }

        # Find providers that excel at this category
        suitable_providers = [
            provider for provider in available_providers
            if primary_category in provider_capabilities.get(provider, [])
        ]

        if suitable_providers:
            return suitable_providers[0]

        return available_providers[0]
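
To show how the keyword scores behave, the standalone sketch below applies the same rule as `classify_request` (the fraction of a category's keywords found in the prompt) to a subset of the patterns above. The function names are illustrative, and returning `None` when nothing matches is an assumption about how an unclassifiable prompt might be handled.

```python
from typing import Dict, Optional

REQUEST_PATTERNS = {
    "summarization": ["summarize", "summary", "brief", "overview"],
    "classification": ["classify", "categorize", "identify", "detect"],
    "generation": ["generate", "create", "write", "compose"],
}


def classify(prompt: str) -> Dict[str, float]:
    """Fraction of each category's keywords that appear in the prompt."""
    text = prompt.lower()
    return {
        category: sum(keyword in text for keyword in keywords) / len(keywords)
        for category, keywords in REQUEST_PATTERNS.items()
    }


def primary_category(prompt: str) -> Optional[str]:
    """Best-scoring category, or None when no keyword matched at all."""
    scores = classify(prompt)
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else None


if __name__ == "__main__":
    for prompt in [
        "Please summarize this report and give a brief overview",
        "What is the capital of France?",
        "Rewrite this paragraph",
    ]:
        print(f"{prompt!r}: {primary_category(prompt)} {classify(prompt)}")
```

Because matching is by substring, "Rewrite" counts as a hit for the `write` keyword. That is often acceptable for coarse routing, but word-boundary matching or a trained classifier may be a better fit when categories overlap.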

2. Dynamic Load Balancing

class DynamicLoadBalancer(LoadBalancer):
    """Dynamic load balancer that adapts to changing conditions"""
    
    def __init__(self):
        super().__init__(LoadBalancingStrategy.LEAST_RESPONSE_TIME)
        self.adaptive_weights = defaultdict(lambda: 1.0)
        self.performance_history = defaultdict(list)
        self.adaptation_rate = 0.1
        # provider -> success rate in [0, 1]; the base LoadBalancer does not
        # track this, so the caller is expected to keep it up to date
        self.success_rates = {}

    def adapt_weights(self):
        """Adapt provider weights based on performance"""

        for provider in self.providers:
            if provider in self.response_times and self.response_times[provider]:
                # Calculate performance score
                avg_response_time = statistics.mean(self.response_times[provider])
                success_rate = self.success_rates.get(provider, 0.5)

                # Performance score (higher is better)
                performance_score = success_rate / (1 + avg_response_time)

                # Move the adaptive weight a fraction of the way toward the target
                current_weight = self.adaptive_weights[provider]
                target_weight = performance_score

                new_weight = max(
                    0.1,
                    current_weight + self.adaptation_rate * (target_weight - current_weight)
                )
                self.adaptive_weights[provider] = new_weight

                # Update provider weight (at least 1 so the provider stays in rotation)
                self.weights[provider] = max(1, int(new_weight * 10))
    
    def select_provider(self, client_ip: str = None) -> Optional[str]:
        """Select provider with adaptive weights"""
        
        # Adapt weights periodically
        if random.random() < 0.1:  # 10% chance to adapt
            self.adapt_weights()
        
        return super().select_provider(client_ip)
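
The weight update in `adapt_weights` is an exponential smoothing step: each call moves the weight a fraction `adaptation_rate` of the way toward the current performance score. The sketch below isolates that step to show how quickly it converges. The function name `adapt_weight` and the sample target are illustrative assumptions.

```python
def adapt_weight(current: float, target: float, rate: float = 0.1, floor: float = 0.1) -> float:
    """Move `current` a fraction `rate` of the way toward `target`, never below `floor`."""
    return max(floor, current + rate * (target - current))


if __name__ == "__main__":
    weight = 1.0
    # Example target: success_rate / (1 + avg_response_time) = 0.8 / (1 + 1.0)
    target = 0.4
    for step in range(1, 6):
        weight = adapt_weight(weight, target)
        print(f"step {step}: weight = {weight:.4f}")
```

With a rate of 0.1 the remaining gap to the target shrinks by 10% per update, so a sudden change in provider performance takes a few dozen updates to be fully reflected. A higher rate reacts faster but is noisier.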

Implementation Example

Here's an end-to-end example that wires the routing, load balancing, health checking, and failover components together (the provider call itself is simulated):

async def setup_intelligent_routing():
    """Set up complete intelligent routing system"""
    
    # Initialize components
    cost_router = CostBasedRouter()
    performance_router = PerformanceBasedRouter()
    load_balancer = DynamicLoadBalancer()
    health_checker = HealthChecker()
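    failover_manager = FailoverManager(
        primary_providers=['openai', 'anthropic'],
        fallback_providers=['together', 'google']
    )
    # Share one health checker so failover sees the monitored health state
    failover_manager.health_checker = health_checker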
    
    # Add providers to load balancer
    load_balancer.add_provider('openai', weight=3)
    load_balancer.add_provider('anthropic', weight=2)
    load_balancer.add_provider('together', weight=1)
    
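    # Set up health checking
    # Note: these model-listing endpoints typically require an API key, so an
    # unauthenticated check is likely to fail; substitute the health or status
    # endpoint that fits your deployment.
    health_endpoints = {
        'openai': 'https://api.openai.com/v1/models',
        'anthropic': 'https://api.anthropic.com/v1/models',
        'together': 'https://api.together.xyz/v1/models'
    }

    # Start health monitoring; keep a reference so the background task is not
    # garbage-collected while it is still running
    monitor_task = asyncio.create_task(health_checker.start_monitoring(health_endpoints))

    return {
        'cost_router': cost_router,
        'performance_router': performance_router,
        'load_balancer': load_balancer,
        'health_checker': health_checker,
        'failover_manager': failover_manager,
        'monitor_task': monitor_task
    }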

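async def route_request(prompt: str, task_complexity: str = 'medium', budget: float = 0.01,
                        routing_system: Optional[Dict] = None):
    """Route a request using intelligent routing"""

    # Build the routing system once and pass it in where possible; creating it
    # per request starts a new health-monitoring task each time.
    if routing_system is None:
        routing_system = await setup_intelligent_routing()

    # Get healthy providers. Providers that have not been checked yet count as
    # healthy, so ask is_healthy() for every configured provider.
    health_checker = routing_system['health_checker']
    healthy_providers = [
        p for p in routing_system['load_balancer'].providers
        if health_checker.is_healthy(p)
    ]

    if not healthy_providers:
        raise Exception("No healthy providers available")

    # Select provider based on multiple factors
    provider = None

    # 1. Try cost-based routing first
    cost_provider, cost_model = routing_system['cost_router'].select_provider(
        task_complexity, budget, prompt
    )

    if cost_provider in healthy_providers:
        provider = cost_provider

    # 2. Fallback to performance-based routing
    if not provider:
        perf_provider, perf_model = routing_system['performance_router'].select_best_performer(
            healthy_providers, ['gpt-3.5-turbo', 'claude-3-haiku']
        )
        provider = perf_provider

    # 3. Final fallback to load balancer
    if not provider:
        provider = routing_system['load_balancer'].select_provider()

    # execute_with_failover() passes the provider it is trying as a keyword
    # argument, so the task has to accept it.
    async def execute_request(provider: str) -> str:
        # Simulate API call
        await asyncio.sleep(0.1)
        return f"Response from {provider}"

    # Try the routed provider first; on failure, fall back to the failover
    # manager's ordered provider list.
    try:
        return await execute_request(provider=provider)
    except Exception as e:
        print(f"Provider {provider} failed: {e}")
        health_checker._mark_unhealthy(provider)

    return await routing_system['failover_manager'].execute_with_failover(
        execute_request
    )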

## Usage example

```python
async def main():
    prompts = [
        "Summarize the benefits of AI routing",
        "Classify this text as positive or negative",
        "Generate a creative story about AI"
    ]

    for prompt in prompts:
        try:
            result = await route_request(prompt, 'medium', 0.01)
            print(f"✅ {result}")
        except Exception as e:
            print(f"❌ Error: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```

Best Practices

1. Monitoring and Metrics

class RoutingMetrics:
    """Track routing performance metrics"""
    
    def __init__(self):
        self.metrics = {
            'requests_routed': 0,
            'failures': 0,
            'average_response_time': 0,
            'cost_per_request': 0,
            'provider_utilization': defaultdict(int)
        }
    
    def record_request(self, provider: str, response_time: float, cost: float, success: bool):
        """Record routing metrics"""
        self.metrics['requests_routed'] += 1
        self.metrics['provider_utilization'][provider] += 1
        
        if not success:
            self.metrics['failures'] += 1
        
        # Update averages
        total_requests = self.metrics['requests_routed']
        current_avg = self.metrics['average_response_time']
        self.metrics['average_response_time'] = (
            (current_avg * (total_requests - 1) + response_time) / total_requests
        )
        
        current_cost_avg = self.metrics['cost_per_request']
        self.metrics['cost_per_request'] = (
            (current_cost_avg * (total_requests - 1) + cost) / total_requests
        )
    
    def get_routing_report(self) -> Dict:
        """Generate routing performance report"""
        total_requests = self.metrics['requests_routed']
        success_rate = (total_requests - self.metrics['failures']) / max(total_requests, 1)
        
        return {
            'total_requests': total_requests,
            'success_rate': success_rate,
            'average_response_time': self.metrics['average_response_time'],
            'average_cost': self.metrics['cost_per_request'],
            'provider_utilization': dict(self.metrics['provider_utilization'])
        }
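
The averages in `record_request` are maintained incrementally, so no per-request samples need to be stored. The standalone sketch below shows the same update in its equivalent form `mean + (x - mean) / n` and checks it against `statistics.mean`. The function name `running_mean` and the sample latencies are illustrative.

```python
import statistics
from typing import Iterable, Tuple


def running_mean(values: Iterable[float]) -> Tuple[float, int]:
    """Incrementally compute the mean without storing the samples."""
    mean, count = 0.0, 0
    for value in values:
        count += 1
        # Algebraically the same as (mean * (count - 1) + value) / count
        mean += (value - mean) / count
    return mean, count


if __name__ == "__main__":
    latencies = [0.42, 0.38, 1.90, 0.45, 0.40]  # made-up response times in seconds
    mean, count = running_mean(latencies)
    print(f"running mean over {count} samples: {mean:.4f}")
    print(f"statistics.mean:                  {statistics.mean(latencies):.4f}")
```

A single slow request (1.90 s here) pulls the mean up noticeably, which is one reason many teams also track latency percentiles alongside averages.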

2. Configuration Management

# routing_config.yaml
routing:
  strategy: "hybrid"  # cost, performance, load_balance, hybrid
  health_check_interval: 30
  failover_threshold: 2
  recovery_threshold: 3

providers:
  openai:
    weight: 3
    cost_multiplier: 1.0
    performance_bonus: 0.1
    models: ["gpt-4", "gpt-3.5-turbo"]
  
  anthropic:
    weight: 2
    cost_multiplier: 0.8
    performance_bonus: 0.05
    models: ["claude-3-opus", "claude-3-sonnet"]
  
  together:
    weight: 1
    cost_multiplier: 0.3
    performance_bonus: 0.0
    models: ["llama-2-7b", "mistral-7b"]

load_balancing:
  algorithm: "weighted_round_robin"
  adaptation_rate: 0.1
  history_size: 100

monitoring:
  enabled: true
  metrics_retention_days: 30
  alert_threshold: 0.95
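
A configuration file like this is easier to trust if it is validated at startup. The sketch below assumes the YAML has already been parsed into a plain dictionary (for example with PyYAML's `yaml.safe_load`) and checks only the `providers` section. The `ProviderConfig` dataclass, the `parse_providers` function, and the specific validation rules are assumptions for illustration, not a fixed schema.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class ProviderConfig:
    name: str
    weight: int
    cost_multiplier: float
    performance_bonus: float
    models: List[str]


def parse_providers(config: Dict[str, Any]) -> List[ProviderConfig]:
    """Validate the `providers` section of an already-parsed config dict."""
    providers = []
    for name, raw in config.get("providers", {}).items():
        provider = ProviderConfig(
            name=name,
            weight=int(raw["weight"]),
            cost_multiplier=float(raw["cost_multiplier"]),
            performance_bonus=float(raw.get("performance_bonus", 0.0)),
            models=list(raw["models"]),
        )
        if provider.weight < 1:
            raise ValueError(f"{name}: weight must be at least 1")
        if provider.cost_multiplier <= 0:
            raise ValueError(f"{name}: cost_multiplier must be positive")
        if not provider.models:
            raise ValueError(f"{name}: at least one model is required")
        providers.append(provider)
    if not providers:
        raise ValueError("no providers configured")
    return providers


if __name__ == "__main__":
    # Mirrors part of routing_config.yaml after parsing
    parsed = {
        "providers": {
            "openai": {"weight": 3, "cost_multiplier": 1.0, "performance_bonus": 0.1,
                       "models": ["gpt-4", "gpt-3.5-turbo"]},
            "together": {"weight": 1, "cost_multiplier": 0.3, "performance_bonus": 0.0,
                         "models": ["llama-2-7b", "mistral-7b"]},
        }
    }
    for provider in parse_providers(parsed):
        print(provider)
```

Failing fast on a bad weight or an empty model list keeps configuration mistakes from surfacing later as routing errors under load.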

Conclusion

Intelligent AI model routing and load balancing provide significant benefits:

  • Improved Performance: Faster response times through optimal provider selection
  • Cost Optimization: Reduced costs by routing to cost-effective providers
  • Enhanced Reliability: Automatic failover and health monitoring
  • Better Scalability: Handle increased load through load balancing
  • Operational Efficiency: Reduced manual intervention and monitoring

:::tip Implementation Priority
Start with simple cost-based routing, then add health checking and load balancing as your needs grow.
:::

The key to success is implementing routing incrementally, starting with basic strategies and gradually adding more sophisticated features based on your specific requirements and performance metrics.
