Usage
```python
import asyncio
from typing import Any, Dict, List, Optional


class SimpleFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers

    async def generate_text(self, prompt: str, model: Optional[str] = None) -> str:
        """Generate text with simple sequential fallback."""
        for provider in self.providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                print(f"✅ Success with {provider['name']}")
                return result
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue
        raise Exception("All providers failed")


# Usage
providers = [
    {'name': 'openai', 'api_key': 'sk-...', 'endpoint': 'https://api.openai.com/v1'},
    {'name': 'anthropic', 'api_key': 'sk-ant-...', 'endpoint': 'https://api.anthropic.com'},
    {'name': 'together', 'api_key': '...', 'endpoint': 'https://api.together.xyz'}
]

fallback = SimpleFallback(providers)
result = asyncio.run(fallback.generate_text("Hello, world!"))
```
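Every class in this post delegates the actual HTTP request to a `_call_provider` helper, which is left abstract because the request and response shapes differ per provider. A minimal sketch for an OpenAI-style chat completions endpoint might look like the following; the payload layout and response parsing are assumptions, so adapt them to each provider's real API:

```python
import aiohttp
from typing import Any, Dict, Optional

async def _call_provider(self, provider: Dict[str, Any], prompt: str,
                         model: Optional[str] = None) -> str:
    """Hypothetical helper: POST to an OpenAI-style chat endpoint."""
    # Assumed request/response shapes -- adjust per provider.
    payload = {
        'model': model or provider.get('models', ['gpt-3.5-turbo'])[0],
        'messages': [{'role': 'user', 'content': prompt}],
    }
    headers = {'Authorization': f"Bearer {provider['api_key']}"}
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{provider['endpoint']}/chat/completions",
                                json=payload, headers=headers) as resp:
            resp.raise_for_status()
            data = await resp.json()
            return data['choices'][0]['message']['content']
```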
2. Parallel Fallback (Racing Requests)
Send requests to all providers simultaneously and use the first successful response:
```python
import asyncio
from typing import Any, Dict, List, Optional


class ParallelFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers

    async def generate_text(self, prompt: str, model: Optional[str] = None) -> str:
        """Generate text with parallel fallback."""
        # Create one task per provider so the requests run concurrently
        tasks = [
            asyncio.create_task(
                self._call_provider_with_timeout(provider, prompt, model)
            )
            for provider in self.providers
        ]

        try:
            # Wait for the first successful response
            for completed_task in asyncio.as_completed(tasks, timeout=30):
                try:
                    return await completed_task
                except Exception as e:
                    print(f"Provider failed: {e}")
                    continue
        finally:
            # Cancel anything still running, whether we succeeded or not
            for task in tasks:
                if not task.done():
                    task.cancel()

        raise Exception("All providers failed")

    async def _call_provider_with_timeout(self, provider: Dict[str, Any],
                                          prompt: str, model: Optional[str]) -> str:
        """Call a provider with a per-request timeout."""
        try:
            return await asyncio.wait_for(
                self._call_provider(provider, prompt, model),
                timeout=10
            )
        except asyncio.TimeoutError:
            raise Exception(f"Timeout for {provider['name']}")
```
3. Intelligent Fallback with Health Checks
Monitor provider health and route to healthy providers:
```python
import time
from typing import Any, Dict, List, Optional


class HealthAwareFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers
        self.health_status = {p['name']: True for p in providers}
        self.failure_count = {p['name']: 0 for p in providers}
        self.last_failure = {p['name']: 0.0 for p in providers}
        self.failure_threshold = 3
        self.recovery_time = 300  # 5 minutes

    async def generate_text(self, prompt: str, model: Optional[str] = None) -> str:
        """Generate text with health-aware fallback."""
        # Route to healthy providers first
        healthy_providers = [
            p for p in self.providers
            if self._is_healthy(p['name'])
        ]

        if not healthy_providers:
            # Last resort: try every provider if none look healthy
            healthy_providers = self.providers

        for provider in healthy_providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                self._mark_success(provider['name'])
                return result
            except Exception as e:
                self._mark_failure(provider['name'])
                print(f"❌ {provider['name']} failed: {e}")
                continue

        raise Exception("All providers failed")

    def _is_healthy(self, provider_name: str) -> bool:
        """Check if provider is healthy"""
        if not self.health_status[provider_name]:
            # Re-admit the provider once the recovery time has passed
            time_since_failure = time.time() - self.last_failure[provider_name]
            if time_since_failure > self.recovery_time:
                self.health_status[provider_name] = True
                self.failure_count[provider_name] = 0
                return True
            return False
        return True

    def _mark_success(self, provider_name: str):
        """Mark provider as successful"""
        self.failure_count[provider_name] = 0
        self.health_status[provider_name] = True

    def _mark_failure(self, provider_name: str):
        """Mark provider as failed"""
        self.failure_count[provider_name] += 1
        self.last_failure[provider_name] = time.time()

        # Mark as unhealthy after threshold consecutive failures
        if self.failure_count[provider_name] >= self.failure_threshold:
            self.health_status[provider_name] = False
```
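To see the recovery logic in action, the window can be shrunk and failures injected by hand (illustration only; in real use `_mark_failure` is driven by actual provider errors):

```python
import time

fallback = HealthAwareFallback(providers)
fallback.recovery_time = 5  # shrink the 5-minute window for demonstration

# Three consecutive failures trip the health flag...
for _ in range(3):
    fallback._mark_failure('openai')
print(fallback._is_healthy('openai'))  # False -> 'openai' is skipped

# ...until the recovery window elapses, after which it is retried.
time.sleep(6)
print(fallback._is_healthy('openai'))  # True
```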
4. Fallback with Caching
Implement caching to reduce API calls and provide fallback responses:
```python
import hashlib
import time
from typing import Any, Dict, List, Optional


class CachedFallback:
    def __init__(self, providers: List[Dict[str, Any]], cache: Dict[str, Any] = None):
        self.providers = providers
        self.cache = cache or {}
        self.cache_ttl = 3600  # 1 hour

    async def generate_text(self, prompt: str, model: Optional[str] = None) -> str:
        """Generate text with caching and fallback"""
        # Check cache first
        cache_key = self._create_cache_key(prompt, model)
        cached_result = self._get_from_cache(cache_key)
        if cached_result:
            print("✅ Returning cached result")
            return cached_result

        # Try providers
        for provider in self.providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                # Cache the result
                self._cache_result(cache_key, result)
                return result
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue

        # Return a stale cached copy as a last resort, if available
        fallback_key = f"fallback:{cache_key}"
        fallback_result = self._get_from_cache(fallback_key, ignore_ttl=True)
        if fallback_result:
            print("⚠️ Returning cached fallback")
            return fallback_result

        raise Exception("All providers failed and no cache available")

    def _create_cache_key(self, prompt: str, model: Optional[str]) -> str:
        """Create cache key from prompt and model"""
        content = f"{prompt}:{model or 'default'}"
        return hashlib.md5(content.encode()).hexdigest()

    def _get_from_cache(self, key: str, ignore_ttl: bool = False) -> Optional[str]:
        """Get result from cache"""
        if key in self.cache:
            entry = self.cache[key]
            if ignore_ttl or time.time() - entry['timestamp'] < self.cache_ttl:
                return entry['result']
            else:
                del self.cache[key]
        return None

    def _cache_result(self, key: str, result: str):
        """Cache a result, plus a long-lived fallback copy"""
        entry = {'result': result, 'timestamp': time.time()}
        self.cache[key] = entry
        self.cache[f"fallback:{key}"] = dict(entry)
```
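Usage is the same as the earlier classes; during a total outage the class degrades to the last known good answer instead of failing outright:

```python
import asyncio

fallback = CachedFallback(providers)

# First call hits a provider and populates both the TTL'd entry
# and the long-lived fallback copy.
print(asyncio.run(fallback.generate_text("What is RAG?")))

# A repeat within the TTL is served from cache with no API call.
print(asyncio.run(fallback.generate_text("What is RAG?")))
```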
Error Handling Strategies
1. Retry with Exponential Backoff
```python
import asyncio
import random


async def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 1.0):
    """Retry an async function with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception:
            if attempt == max_retries - 1:
                raise

            # Calculate delay with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1}/{max_retries} in {delay:.2f}s")
            await asyncio.sleep(delay)
```
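Backoff composes with any of the fallback classes above, for example wrapping a full fallback pass so transient, fleet-wide errors get a second chance:

```python
import asyncio

fallback = SimpleFallback(providers)

# Waits roughly 1s, 2s, 4s (plus jitter) between attempts.
result = asyncio.run(
    retry_with_backoff(lambda: fallback.generate_text("Hello!"), max_retries=3)
)
```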
2. Circuit Breaker Pattern
```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0.0
        self.state = CircuitState.CLOSED

    async def call(self, func):
        """Call an async function with circuit breaker protection"""
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                # Let one trial request through
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = await func()
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
```
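Putting a breaker in front of a provider keeps a flapping endpoint from being hammered while it is down; a minimal usage sketch:

```python
import asyncio

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)
fallback = SimpleFallback(providers)

async def guarded_call() -> str:
    # After 5 consecutive failures the breaker raises immediately
    # for 60 seconds instead of issuing real requests.
    return await breaker.call(lambda: fallback.generate_text("Hello!"))

print(asyncio.run(guarded_call()))
```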
Real-World Implementation Example
```python
import hashlib
import os
from typing import Any, Dict, List, Optional


class ProductionAIFallback:
    def __init__(self):
        self.providers = [
            {
                'name': 'openai',
                'api_key': os.getenv('OPENAI_API_KEY'),
                'endpoint': 'https://api.openai.com/v1/chat/completions',
                'models': ['gpt-4', 'gpt-3.5-turbo']
            },
            {
                'name': 'anthropic',
                'api_key': os.getenv('ANTHROPIC_API_KEY'),
                'endpoint': 'https://api.anthropic.com/v1/messages',
                'models': ['claude-3-opus', 'claude-3-sonnet']
            },
            {
                'name': 'together',
                'api_key': os.getenv('TOGETHER_API_KEY'),
                'endpoint': 'https://api.together.xyz/v1/completions',
                'models': ['llama-2-7b', 'mistral-7b']
            }
        ]
        self.circuit_breakers = {
            p['name']: CircuitBreaker() for p in self.providers
        }
        self.cache = {}

    async def generate_text(self, prompt: str, model: Optional[str] = None) -> str:
        """Generate text with production-ready fallback"""
        # Check cache first
        cache_key = hashlib.md5(f"{prompt}:{model}".encode()).hexdigest()
        if cache_key in self.cache:
            return self.cache[cache_key]

        # Try each provider behind its circuit breaker
        for provider in self.providers:
            try:
                circuit_breaker = self.circuit_breakers[provider['name']]

                async def call_provider():
                    return await self._call_provider(provider, prompt, model)

                result = await circuit_breaker.call(call_provider)

                # Cache the result, plus a fallback copy (in production the
                # primary entry would carry a TTL, as in CachedFallback)
                self.cache[cache_key] = result
                self.cache[f"fallback:{cache_key}"] = result
                return result
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue

        # Return cached fallback if available
        fallback_key = f"fallback:{cache_key}"
        if fallback_key in self.cache:
            return self.cache[fallback_key]

        raise Exception("All providers failed")

    async def _call_provider(self, provider: Dict[str, Any], prompt: str,
                             model: Optional[str]) -> str:
        """Call a specific provider"""
        # Implementation varies by provider; see the aiohttp sketch
        # earlier in this post for one possible shape.
        raise NotImplementedError
```
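Wiring it up (the `_call_provider` body still has to be filled in per provider, for instance with the hypothetical aiohttp sketch from earlier):

```python
import asyncio

ai = ProductionAIFallback()

async def main():
    try:
        answer = await ai.generate_text("Draft a status update.", model="gpt-4")
        print(answer)
    except Exception as e:
        print(f"All providers down and nothing cached: {e}")

asyncio.run(main())
```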
Best Practices
- Monitor and Log: Track all fallback events and provider performance (a minimal logging sketch follows this list)
- Test Regularly: Simulate provider failures to ensure fallback works
- Use Timeouts: Prevent hanging requests from blocking your application
- Implement Caching: Reduce API calls and provide fallback responses
- Handle All Errors: Don’t just catch generic exceptions
- Document Dependencies: Keep track of provider-specific requirements
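For the first item, routing fallback events through the standard `logging` module instead of `print` makes them filterable and easy to ship to a monitoring sink; a minimal sketch:

```python
import logging

logger = logging.getLogger("ai_fallback")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

# Inside the fallback loops, replace print() with structured log calls:
logger.warning("provider=%s failed: %s", "openai", "timeout")
logger.info("provider=%s succeeded after %d fallbacks", "anthropic", 1)
```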
Conclusion
Implementing robust AI API fallback is essential for production applications. Start with simple sequential fallback and gradually add more sophisticated features like parallel execution, health monitoring, and caching as your needs grow.
The key is to always have a plan for when things go wrong and to test your fallback mechanisms regularly.