When your AI application depends on multiple providers (OpenAI, Anthropic, Google, etc.), implementing robust failover between them is crucial for maintaining reliability and performance.
## Why Multi-Provider Failover Matters
- Provider Diversity: Different providers have different strengths and pricing
- Risk Mitigation: Avoid single points of failure
- Cost Optimization: Route to the most cost-effective provider
- Performance: Switch to faster providers when needed
## Advanced Multi-Provider Architecture
### 1. Provider Abstraction Layer
Create a unified interface that abstracts provider differences:
```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

import anthropic
import openai


class ProviderError(Exception):
    """Raised when a provider request fails."""


class AIProvider(ABC):
    """Abstract base class for AI providers"""

    @abstractmethod
    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        ...

    @abstractmethod
    async def get_models(self) -> List[str]:
        ...

    @abstractmethod
    async def get_cost_estimate(self, prompt: str, model: str) -> float:
        ...

    @abstractmethod
    async def check_health(self) -> bool:
        ...


class OpenAIProvider(AIProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = openai.AsyncOpenAI(api_key=api_key)

    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                **kwargs,
            )
            return response.choices[0].message.content
        except Exception as e:
            raise ProviderError(f"OpenAI error: {e}") from e

    async def get_models(self) -> List[str]:
        models = await self.client.models.list()
        return [m.id for m in models.data]

    async def get_cost_estimate(self, prompt: str, model: str) -> float:
        # Placeholder: plug in your own token counting and pricing table
        return 0.0

    async def check_health(self) -> bool:
        try:
            await self.client.models.list()
            return True
        except Exception:
            return False


class AnthropicProvider(AIProvider):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = anthropic.AsyncAnthropic(api_key=api_key)

    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        try:
            response = await self.client.messages.create(
                model=model,
                max_tokens=kwargs.get("max_tokens", 1000),
                messages=[{"role": "user", "content": prompt}],
            )
            return response.content[0].text
        except Exception as e:
            raise ProviderError(f"Anthropic error: {e}") from e

    async def get_models(self) -> List[str]:
        # Populate with the Anthropic model IDs you plan to use
        return []

    async def get_cost_estimate(self, prompt: str, model: str) -> float:
        # Placeholder: plug in your own token counting and pricing table
        return 0.0

    async def check_health(self) -> bool:
        # No live probe here; substitute a lightweight request if you need
        # a real liveness check for Anthropic
        return True
```
### 2. Intelligent Provider Selection
Implement smart routing based on multiple factors:
```python
import time
from typing import Any, Dict, Optional


class MultiProviderRouter:
    def __init__(self, providers: Dict[str, AIProvider]):
        self.providers = providers
        self.health_status = {name: True for name in providers}
        self.performance_metrics = {
            name: {"latency": [], "success_rate": 0.95} for name in providers
        }
        self.cost_metrics = {name: {"avg_cost": 0.0} for name in providers}

    async def select_provider(self, prompt: str, requirements: Dict[str, Any]) -> str:
        """Select the best provider based on requirements"""
        # Only consider providers currently marked healthy
        healthy_providers = [name for name, healthy in self.health_status.items() if healthy]
        if not healthy_providers:
            raise RuntimeError("No healthy providers available")

        # Score each healthy provider against the requirements
        provider_scores = {}
        for provider_name in healthy_providers:
            score = 0.0

            # Cost optimization: cheaper providers score higher
            if requirements.get("optimize_cost", False):
                cost_score = 1.0 / (self.cost_metrics[provider_name]["avg_cost"] + 0.001)
                score += cost_score * 0.4

            # Speed optimization: lower average latency scores higher
            if requirements.get("optimize_speed", False):
                latency = self.performance_metrics[provider_name]["latency"]
                if latency:
                    avg_latency = sum(latency) / len(latency)
                    score += (1.0 / (avg_latency + 0.1)) * 0.4

            # Reliability: always factor in the observed success rate
            score += self.performance_metrics[provider_name]["success_rate"] * 0.2
            provider_scores[provider_name] = score

        # Return the provider with the highest score
        return max(provider_scores, key=provider_scores.get)

    async def execute_with_failover(
        self, prompt: str, model: str, requirements: Optional[Dict[str, Any]] = None
    ) -> str:
        """Execute a request, failing over to the next-best provider on error"""
        requirements = requirements or {}
        attempted_providers = []

        while len(attempted_providers) < len(self.providers):
            try:
                provider_name = await self.select_provider(prompt, requirements)
            except RuntimeError:
                break  # no healthy providers left
            if provider_name in attempted_providers:
                break  # the best remaining choice has already failed
            attempted_providers.append(provider_name)
            provider = self.providers[provider_name]

            try:
                # Execute and time the request
                start_time = time.time()
                result = await provider.generate_text(prompt, model)
                self._update_metrics(provider_name, time.time() - start_time, True)
                return result
            except Exception as e:
                # Mark the provider unhealthy and move on to the next one
                self.health_status[provider_name] = False
                self._update_metrics(provider_name, 0, False)
                print(f"Provider {provider_name} failed: {e}")

        raise RuntimeError("All providers failed")

    def _update_metrics(self, provider_name: str, latency: float, success: bool):
        """Update rolling latency and success-rate metrics"""
        metrics = self.performance_metrics[provider_name]
        if success:
            metrics["latency"].append(latency)
            if len(metrics["latency"]) > 100:
                metrics["latency"].pop(0)
            # Credit successes slowly, penalize failures more sharply (simplified)
            metrics["success_rate"] = min(1.0, metrics["success_rate"] + 0.001)
        else:
            metrics["success_rate"] = max(0.0, metrics["success_rate"] - 0.01)
```
### 3. Health Monitoring & Circuit Breakers
Implement comprehensive health monitoring:
```python
import asyncio
import time
from typing import Dict


class HealthMonitor:
    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.health_status = {}
        self.failure_counts = {}
        self.last_check = {}
        self.circuit_breakers = {}  # optional per-provider breakers (see the sketch below)

    async def start_monitoring(self, providers: Dict[str, AIProvider]):
        """Continuously poll the health of every provider"""
        while True:
            await self._check_all_providers(providers)
            await asyncio.sleep(self.check_interval)

    async def _check_all_providers(self, providers: Dict[str, AIProvider]):
        """Check the health of all providers concurrently"""
        tasks = [
            asyncio.create_task(self._check_provider_health(name, provider))
            for name, provider in providers.items()
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _check_provider_health(self, name: str, provider: AIProvider):
        """Check the health of a specific provider"""
        try:
            if await provider.check_health():
                self._mark_healthy(name)
            else:
                self._mark_unhealthy(name)
        except Exception as e:
            self._mark_unhealthy(name)
            print(f"Health check failed for {name}: {e}")

    def _mark_healthy(self, provider_name: str):
        """Mark provider as healthy and reset its failure count"""
        if not self.health_status.get(provider_name, True):
            print(f"✅ {provider_name} recovered")
        self.failure_counts[provider_name] = 0
        self.health_status[provider_name] = True
        self.last_check[provider_name] = time.time()

    def _mark_unhealthy(self, provider_name: str):
        """Record a failure; mark unhealthy after three consecutive failures"""
        self.failure_counts[provider_name] = self.failure_counts.get(provider_name, 0) + 1
        self.last_check[provider_name] = time.time()
        if self.failure_counts[provider_name] >= 3:
            self.health_status[provider_name] = False
            print(f"❌ {provider_name} marked as unhealthy")

    def is_healthy(self, provider_name: str) -> bool:
        """Check if provider is currently healthy (healthy by default)"""
        return self.health_status.get(provider_name, True)
```
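The section heading promises circuit breakers, but the monitor so far only counts consecutive failures. The sketch below is one minimal way to add that behavior; the `CircuitBreaker` class, its thresholds, and the half-open handling are illustrative assumptions rather than part of the monitor itself.

```python
import time
from typing import Optional


class CircuitBreaker:
    """Minimal circuit breaker: open after N failures, allow a retry after a cooldown."""

    def __init__(self, failure_threshold: int = 3, reset_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.opened_at: Optional[float] = None

    def allow_request(self) -> bool:
        if self.opened_at is None:
            return True  # closed: requests flow normally
        # Open: permit a single trial request once the cooldown has elapsed (half-open)
        return time.time() - self.opened_at >= self.reset_timeout

    def record_success(self):
        self.failure_count = 0
        self.opened_at = None

    def record_failure(self):
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.time()
```

A router can call `allow_request()` before dispatching to a provider and `record_success()` or `record_failure()` afterwards, so a flapping provider is skipped for a cooldown period instead of being probed on every request.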
## Platform-Specific Considerations
### OpenAI Failover
- Handle rate limiting (429 errors); see the retry sketch below this list
- Monitor quota usage
- Use different API keys for redundancy
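Rate limits are usually transient, so it is often worth retrying a 429 on the same provider with exponential backoff before failing over, while authentication or quota errors are better handed straight to the failover logic. Below is a minimal sketch of that distinction; the helper name and retry counts are illustrative, and it assumes the wrappers above chain the underlying SDK exception (the `raise ... from e` pattern) so the cause can be inspected.

```python
import asyncio

import openai


async def call_with_rate_limit_retry(
    provider: AIProvider, prompt: str, model: str, max_retries: int = 3
) -> str:
    """Retry 429s on the same provider with backoff; re-raise anything else
    so the router's failover logic can move on to another provider."""
    for attempt in range(max_retries):
        try:
            return await provider.generate_text(prompt, model)
        except ProviderError as err:
            retryable = isinstance(err.__cause__, openai.RateLimitError)
            if retryable and attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, ...
                continue
            raise
```

The same idea applies to the other providers: classify each SDK's rate-limit exception as retryable and treat everything else as a reason to fail over.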
### Anthropic Failover
- Handle message format differences
- Monitor API version compatibility
- Use different models for fallback
### Google AI Failover
- Handle authentication token refresh
- Monitor quota and billing
- Use different project IDs
## Implementation Example
```python
import asyncio
import os


async def setup_multi_provider_system() -> MultiProviderRouter:
    """Set up the complete multi-provider failover system"""
    # Initialize providers (GoogleProvider is assumed to implement the same AIProvider interface)
    providers = {
        "openai": OpenAIProvider(os.getenv("OPENAI_API_KEY")),
        "anthropic": AnthropicProvider(os.getenv("ANTHROPIC_API_KEY")),
        "google": GoogleProvider(os.getenv("GOOGLE_AI_API_KEY")),
    }

    # Initialize router and health monitor
    router = MultiProviderRouter(providers)
    health_monitor = HealthMonitor()

    # Start background health monitoring (keep a reference so the task is not garbage-collected)
    router.monitor_task = asyncio.create_task(health_monitor.start_monitoring(providers))

    return router
```
## Usage example
```python
async def example_usage():
    router = await setup_multi_provider_system()

    # Generate text with cost optimization enabled
    result = await router.execute_with_failover(
        prompt="Explain quantum computing",
        model="gpt-4",
        requirements={"optimize_cost": True},
    )
    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(example_usage())
```
## Best Practices
- Monitor Everything: Track health, performance, and costs for all providers
- Test Regularly: Simulate provider failures to ensure failover works (see the sketch after this list)
- Document Dependencies: Keep track of provider-specific requirements
- Plan for All Scenarios: Consider what happens when all providers fail
- Optimize for Your Use Case: Balance cost, speed, and reliability based on your needs
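As a concrete version of the testing advice above, a stub provider that always fails makes it easy to exercise the failover path end to end. The `FailingProvider` and `test_failover` names below are illustrative; everything else reuses the classes defined earlier.

```python
import asyncio
import os


class FailingProvider(AIProvider):
    """Stub provider that always fails, used to exercise the failover path."""

    async def generate_text(self, prompt: str, model: str, **kwargs) -> str:
        raise ProviderError("simulated outage")

    async def get_models(self):
        return []

    async def get_cost_estimate(self, prompt: str, model: str) -> float:
        return 0.0

    async def check_health(self) -> bool:
        return False


async def test_failover():
    # One dead provider plus one real one: the router should still return a result
    router = MultiProviderRouter({
        "dead": FailingProvider(),
        "openai": OpenAIProvider(os.getenv("OPENAI_API_KEY")),
    })
    result = await router.execute_with_failover("Say hello", "gpt-4")
    assert result  # failover produced a response despite the dead provider


if __name__ == "__main__":
    asyncio.run(test_failover())
```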
## Conclusion
Multi-provider AI failover provides significant benefits for reliability and cost optimization. By implementing intelligent routing, health monitoring, and circuit breakers, you can ensure your AI applications remain robust even when individual providers experience issues.
The key is to start with a simple abstraction layer and gradually add more sophisticated features like intelligent routing and comprehensive monitoring as your needs grow.