Multi-Provider AI Failover

Master multi-provider AI failover strategies. Learn advanced patterns for switching between OpenAI, Anthropic, Google, and other AI providers seamlessly.

When your AI application depends on multiple providers (OpenAI, Anthropic, Google, etc.), implementing robust failover between them is crucial for maintaining reliability and performance.

Why Multi-Provider Failover Matters

  • Provider Diversity: Different providers have different strengths and pricing
  • Risk Mitigation: Avoid single points of failure
  • Cost Optimization: Route to the most cost-effective provider
  • Performance: Switch to faster providers when needed

Advanced Multi-Provider Architecture

1. Provider Abstraction Layer

Create a unified interface that abstracts provider differences:

from abc import ABC, abstractmethod
from typing import Dict, Any, List
import asyncio

class AIProvider(ABC):
    """Abstract base class for AI providers"""
    
    @abstractmethod
    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        pass
    
    @abstractmethod
    async def get_models(self) -> List[str]:
        pass
    
    @abstractmethod
    async def get_cost_estimate(self, prompt: str, model: str) -> float:
        pass
    
    @abstractmethod
    async def check_health(self) -> bool:
        pass

class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = openai.AsyncOpenAI(api_key=api_key)
    
    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs
            )
            return response.choices[0].message.content
        except Exception as e:
            raise ProviderError(f"OpenAI error: {str(e)}")
    
    async def check_health(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except:
            return False

class AnthropicProvider(AIProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = anthropic.AsyncAnthropic(api_key=api_key)
    
    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        try:
            response = await self.client.messages.create(
                model=model,
                max_tokens=kwargs.get('max_tokens', 1000),
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        except Exception as e:
            raise ProviderError(f"Anthropic error: {str(e)}")
    
    async def check_health(self) -> bool:
        try:
            # Anthropic doesn't have a models endpoint, so we'll use a simple ping
            return True
        except:
            return False

2. Intelligent Provider Selection

Implement smart routing based on multiple factors:

class MultiProviderRouter:
    def __init__(self, providers: Dict[str, AIProvider]):
        self.providers = providers
        self.health_status = {name: True for name in providers}
        self.performance_metrics = {name: {'latency': [], 'success_rate': 0.95} for name in providers}
        self.cost_metrics = {name: {'avg_cost': 0.0} for name in providers}
    
    async def select_provider(self, prompt: str, requirements: Dict[str, Any]) -> str:
        """Select the best provider based on requirements"""
        
        # Filter healthy providers
        healthy_providers = [name for name, healthy in self.health_status.items() if healthy]
        
        if not healthy_providers:
            raise Exception("No healthy providers available")
        
        # Score providers based on requirements
        provider_scores = {}
        
        for provider_name in healthy_providers:
            score = 0
            
            # Cost optimization
            if requirements.get('optimize_cost', False):
                cost_score = 1.0 / (self.cost_metrics[provider_name]['avg_cost'] + 0.001)
                score += cost_score * 0.4
            
            # Performance optimization
            if requirements.get('optimize_speed', False):
                latency = self.performance_metrics[provider_name]['latency']
                if latency:
                    avg_latency = sum(latency) / len(latency)
                    speed_score = 1.0 / (avg_latency + 0.1)
                    score += speed_score * 0.4
            
            # Reliability optimization
            success_rate = self.performance_metrics[provider_name]['success_rate']
            score += success_rate * 0.2
            
            provider_scores[provider_name] = score
        
        # Return the provider with the highest score
        return max(provider_scores, key=provider_scores.get)
    
    async def execute_with_failover(self, prompt: str, model: str, requirements: Dict[str, Any] = None) -> str:
        """Execute request with automatic failover"""
        
        requirements = requirements or {}
        attempted_providers = []
        
        while len(attempted_providers) < len(self.providers):
            try:
                # Select the best provider
                provider_name = await self.select_provider(prompt, requirements)
                
                if provider_name in attempted_providers:
                    # If we've tried all providers, break
                    break
                
                attempted_providers.append(provider_name)
                provider = self.providers[provider_name]
                
                # Execute the request
                start_time = time.time()
                result = await provider.generate_text(prompt, model)
                end_time = time.time()
                
                # Update metrics
                self._update_metrics(provider_name, end_time - start_time, True)
                
                return result
                
            except Exception as e:
                # Mark provider as unhealthy
                self.health_status[provider_name] = False
                self._update_metrics(provider_name, 0, False)
                
                print(f"Provider {provider_name} failed: {e}")
                continue
        
        raise Exception("All providers failed")
    
    def _update_metrics(self, provider_name: str, latency: float, success: bool):
        """Update performance metrics"""
        metrics = self.performance_metrics[provider_name]
        
        if success:
            metrics['latency'].append(latency)
            if len(metrics['latency']) > 100:
                metrics['latency'].pop(0)
        
        # Update success rate (simplified)
        if success:
            metrics['success_rate'] = min(1.0, metrics['success_rate'] + 0.001)
        else:
            metrics['success_rate'] = max(0.0, metrics['success_rate'] - 0.01)

3. Health Monitoring & Circuit Breakers

Implement comprehensive health monitoring:

import asyncio
import time
from typing import Dict, List

class HealthMonitor:
    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.health_status = {}
        self.failure_counts = {}
        self.last_check = {}
        self.circuit_breakers = {}
    
    async def start_monitoring(self, providers: Dict[str, AIProvider]):
        """Start continuous health monitoring"""
        while True:
            await self._check_all_providers(providers)
            await asyncio.sleep(self.check_interval)
    
    async def _check_all_providers(self, providers: Dict[str, AIProvider]):
        """Check health of all providers"""
        tasks = []
        for name, provider in providers.items():
            task = asyncio.create_task(self._check_provider_health(name, provider))
            tasks.append(task)
        
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _check_provider_health(self, name: str, provider: AIProvider):
        """Check health of a specific provider"""
        try:
            is_healthy = await provider.check_health()
            
            if is_healthy:
                self._mark_healthy(name)
            else:
                self._mark_unhealthy(name)
                
        except Exception as e:
            self._mark_unhealthy(name)
            print(f"Health check failed for {name}: {e}")
    
    def _mark_healthy(self, provider_name: str):
        """Mark provider as healthy"""
        was_unhealthy = not self.health_status.get(provider_name, True)
        
        if was_unhealthy:
            self.failure_counts[provider_name] = 0
            print(f"✅ {provider_name} recovered")
        
        self.health_status[provider_name] = True
        self.last_check[provider_name] = time.time()
    
    def _mark_unhealthy(self, provider_name: str):
        """Mark provider as unhealthy"""
        self.failure_counts[provider_name] = self.failure_counts.get(provider_name, 0) + 1
        
        if self.failure_counts[provider_name] >= 3:
            self.health_status[provider_name] = False
            print(f"❌ {provider_name} marked as unhealthy")
    
    def is_healthy(self, provider_name: str) -> bool:
        """Check if provider is currently healthy"""
        return self.health_status.get(provider_name, True)

Platform-Specific Considerations

OpenAI Failover

  • Handle rate limiting (429 errors)
  • Monitor quota usage
  • Use different API keys for redundancy

Anthropic Failover

  • Handle message format differences
  • Monitor API version compatibility
  • Use different models for fallback

Google AI Failover

  • Handle authentication token refresh
  • Monitor quota and billing
  • Use different project IDs

Implementation Example

async def setup_multi_provider_system():
    """Set up complete multi-provider failover system"""
    
    # Initialize providers
    providers = {
        'openai': OpenAIProvider(os.getenv('OPENAI_API_KEY')),
        'anthropic': AnthropicProvider(os.getenv('ANTHROPIC_API_KEY')),
        'google': GoogleProvider(os.getenv('GOOGLE_AI_API_KEY'))
    }
    
    # Initialize router and monitor
    router = MultiProviderRouter(providers)
    health_monitor = HealthMonitor()
    
    # Start health monitoring
    asyncio.create_task(health_monitor.start_monitoring(providers))
    
    return router

## Usage example

```python
async def example_usage():
    router = await setup_multi_provider_system()
    
    # Generate text with cost optimization
    result = await router.execute_with_failover(
        prompt="Explain quantum computing",
        model="gpt-4",
        requirements={'optimize_cost': True}
    )
    
    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(example_usage())

Best Practices

  1. Monitor Everything: Track health, performance, and costs for all providers
  2. Test Regularly: Simulate provider failures to ensure failover works
  3. Document Dependencies: Keep track of provider-specific requirements
  4. Plan for All Scenarios: Consider what happens when all providers fail
  5. Optimize for Your Use Case: Balance cost, speed, and reliability based on your needs

Conclusion

Multi-provider AI failover provides significant benefits for reliability and cost optimization. By implementing intelligent routing, health monitoring, and circuit breakers, you can ensure your AI applications remain robust even when individual providers experience issues.

The key is to start with a simple abstraction layer and gradually add more sophisticated features like intelligent routing and comprehensive monitoring as your needs grow.

← Back to Learning Center