AI API Fallback Implementation

Practical patterns for falling back between AI API providers: working code examples, error handling strategies, and a production-ready design for robust AI applications.

1. Simple Sequential Fallback

Try each provider in order and return the first response that succeeds:

import asyncio
from typing import List, Dict, Any

class SimpleFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers
    
    async def generate_text(self, prompt: str, model: str = None) -> str:
        """Generate text with simple sequential fallback"""
        
        for provider in self.providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                print(f"✅ Success with {provider['name']}")
                return result
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue
        
        raise Exception("All providers failed")
    
    async def _call_provider(self, provider: Dict[str, Any], prompt: str, model: str) -> str:
        """Provider-specific request logic (a sketch appears in section 2)"""
        raise NotImplementedError

# Usage
providers = [
    {'name': 'openai', 'api_key': 'sk-...', 'endpoint': 'https://api.openai.com/v1'},
    {'name': 'anthropic', 'api_key': 'sk-ant-...', 'endpoint': 'https://api.anthropic.com'},
    {'name': 'together', 'api_key': '...', 'endpoint': 'https://api.together.xyz'}
]

fallback = SimpleFallback(providers)
result = asyncio.run(fallback.generate_text("Hello, world!"))

2. Parallel Fallback (First Response Wins)

Send requests to multiple providers simultaneously and use the first response:

import asyncio
import aiohttp
from typing import List, Dict, Any

class ParallelFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers
    
    async def generate_text(self, prompt: str, model: str = None) -> str:
        """Generate text with parallel fallback"""
        
        # Create tasks for all providers
        tasks = []
        for provider in self.providers:
            task = asyncio.create_task(
                self._call_provider_with_timeout(provider, prompt, model)
            )
            tasks.append(task)
        
        # Wait for the first successful response
        for completed_task in asyncio.as_completed(tasks, timeout=30):
            try:
                result = await completed_task
                # Cancel remaining tasks
                for task in tasks:
                    if not task.done():
                        task.cancel()
                return result
            except Exception as e:
                print(f"Provider failed: {e}")
                continue
        
        raise Exception("All providers failed")
    
    async def _call_provider_with_timeout(self, provider: Dict, prompt: str, model: str) -> str:
        """Call provider with timeout"""
        try:
            return await asyncio.wait_for(
                self._call_provider(provider, prompt, model),
                timeout=10
            )
        except asyncio.TimeoutError:
            raise Exception(f"Timeout for {provider['name']}")

3. Intelligent Fallback with Health Checks

Monitor provider health and route to healthy providers:

import time
from typing import Dict, List, Any

class HealthAwareFallback:
    def __init__(self, providers: List[Dict[str, Any]]):
        self.providers = providers
        self.health_status = {p['name']: True for p in providers}
        self.failure_counts = {p['name']: 0 for p in providers}
        self.last_failure = {p['name']: 0.0 for p in providers}
        self.failure_threshold = 3
        self.recovery_time = 300  # 5 minutes
    
    async def generate_text(self, prompt: str, model: str = None) -> str:
        """Generate text with health-aware fallback"""
        
        # Get healthy providers
        healthy_providers = [
            p for p in self.providers 
            if self._is_healthy(p['name'])
        ]
        
        if not healthy_providers:
            # Try all providers if none are healthy
            healthy_providers = self.providers
        
        for provider in healthy_providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                self._mark_success(provider['name'])
                return result
            except Exception as e:
                self._mark_failure(provider['name'])
                print(f"❌ {provider['name']} failed: {e}")
                continue
        
        raise Exception("All providers failed")
    
    def _is_healthy(self, provider_name: str) -> bool:
        """Check if provider is healthy"""
        if not self.health_status[provider_name]:
            # Check if recovery time has passed
            time_since_failure = time.time() - self.last_failure[provider_name]
            if time_since_failure > self.recovery_time:
                self.health_status[provider_name] = True
                return True
            return False
        return True
    
    def _mark_success(self, provider_name: str):
        """Mark provider as successful and reset its failure count"""
        self.health_status[provider_name] = True
        self.failure_counts[provider_name] = 0
    
    def _mark_failure(self, provider_name: str):
        """Record a failure; mark unhealthy once the threshold is reached"""
        self.failure_counts[provider_name] += 1
        self.last_failure[provider_name] = time.time()
        if self.failure_counts[provider_name] >= self.failure_threshold:
            self.health_status[provider_name] = False

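Usage is unchanged from the sequential version; after three consecutive failures a provider is skipped for five minutes, then retried (provider configs as in the first example):

# Hypothetical usage: unhealthy providers are skipped until recovery_time passes
fallback = HealthAwareFallback(providers)
result = asyncio.run(fallback.generate_text("Summarize this article."))
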
4. Fallback with Caching

Implement caching to reduce API calls and provide fallback responses:

import hashlib
import time
from typing import Any, Dict, List, Optional

class CachedFallback:
    def __init__(self, providers: List[Dict[str, Any]], cache: Dict[str, Any] = None):
        self.providers = providers
        self.cache = cache or {}
        self.cache_ttl = 3600  # 1 hour
    
    async def generate_text(self, prompt: str, model: str = None) -> str:
        """Generate text with caching and fallback"""
        
        # Check cache first
        cache_key = self._create_cache_key(prompt, model)
        cached_result = self._get_from_cache(cache_key)
        
        if cached_result:
            print("✅ Returning cached result")
            return cached_result
        
        # Try providers
        for provider in self.providers:
            try:
                result = await self._call_provider(provider, prompt, model)
                
                # Cache the result
                self._cache_result(cache_key, result)
                
                return result
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue
        
        # Fall back to a stale cached copy if one exists (TTL deliberately ignored)
        fallback_entry = self.cache.get(f"fallback:{cache_key}")
        
        if fallback_entry:
            print("⚠️ Returning stale cached fallback")
            return fallback_entry['result']
        
        raise Exception("All providers failed and no cache available")
    
    def _create_cache_key(self, prompt: str, model: str) -> str:
        """Create cache key from prompt and model"""
        content = f"{prompt}:{model or 'default'}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def _get_from_cache(self, key: str) -> Optional[str]:
        """Get result from cache"""
        if key in self.cache:
            entry = self.cache[key]
            if time.time() - entry['timestamp'] < self.cache_ttl:
                return entry['result']
            else:
                del self.cache[key]
        return None
    
    def _cache_result(self, key: str, result: str):
        """Cache a result, plus a stale copy kept as a last-resort fallback"""
        entry = {
            'result': result,
            'timestamp': time.time()
        }
        self.cache[key] = entry
        self.cache[f"fallback:{key}"] = entry
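
The first call for a given prompt hits a provider and populates both cache entries; an identical call within the TTL is served locally, and the stale fallback copy is used only when every provider fails (prompt text is arbitrary):

cached = CachedFallback(providers)
first = asyncio.run(cached.generate_text("What is a fallback?"))   # hits a provider
second = asyncio.run(cached.generate_text("What is a fallback?"))  # served from cache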

Error Handling Strategies

1. Retry with Exponential Backoff

import asyncio
import random

async def retry_with_backoff(func, max_retries: int = 3, base_delay: float = 1.0):
    """Retry function with exponential backoff"""
    
    for attempt in range(max_retries):
        try:
            return await func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise e
            
            # Calculate delay with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Retry {attempt + 1}/{max_retries} in {delay:.2f}s")
            await asyncio.sleep(delay)
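
To combine this with the fallback chain, wrap an individual provider call; the names below reuse earlier sketches and are illustrative:

# Retry one provider a few times before the chain moves to the next
result = await retry_with_backoff(
    lambda: fallback._call_provider(providers[0], "Hello, world!", None),
    max_retries=3
)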

2. Circuit Breaker Pattern

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED
    
    async def call(self, func):
        """Call function with circuit breaker protection"""
        
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open")
        
        try:
            result = await func()
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e
    
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

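Wrapping a provider call then looks like this (names are illustrative; the production example below keeps one breaker per provider, so a flaky provider trips without affecting the others):

breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60)

async def guarded_call(prompt: str) -> str:
    # Fails fast while the circuit is open instead of hammering the provider
    return await breaker.call(lambda: fallback._call_provider(providers[0], prompt, None))
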
Real-World Implementation Example

import hashlib
import os
from typing import Dict

class ProductionAIFallback:
    def __init__(self):
        self.providers = [
            {
                'name': 'openai',
                'api_key': os.getenv('OPENAI_API_KEY'),
                'endpoint': 'https://api.openai.com/v1/chat/completions',
                'models': ['gpt-4', 'gpt-3.5-turbo']
            },
            {
                'name': 'anthropic',
                'api_key': os.getenv('ANTHROPIC_API_KEY'),
                'endpoint': 'https://api.anthropic.com/v1/messages',
                'models': ['claude-3-opus', 'claude-3-sonnet']
            },
            {
                'name': 'together',
                'api_key': os.getenv('TOGETHER_API_KEY'),
                'endpoint': 'https://api.together.xyz/v1/completions',
                'models': ['llama-2-7b', 'mistral-7b']
            }
        ]
        
        self.circuit_breakers = {
            p['name']: CircuitBreaker() for p in self.providers
        }
        self.cache = {}
    
    async def generate_text(self, prompt: str, model: str = None) -> str:
        """Generate text with production-ready fallback"""
        
        # Check cache first
        cache_key = hashlib.md5(f"{prompt}:{model}".encode()).hexdigest()
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # Try each provider with circuit breaker
        for provider in self.providers:
            try:
                circuit_breaker = self.circuit_breakers[provider['name']]
                
                async def call_provider():
                    return await self._call_provider(provider, prompt, model)
                
                result = await circuit_breaker.call(call_provider)
                
                # Cache successful result (and a copy for last-resort fallback)
                self.cache[cache_key] = result
                self.cache[f"fallback:{cache_key}"] = result
                return result
                
            except Exception as e:
                print(f"❌ {provider['name']} failed: {e}")
                continue
        
        # Return cached fallback if available
        fallback_key = f"fallback:{cache_key}"
        if fallback_key in self.cache:
            return self.cache[fallback_key]
        
        raise Exception("All providers failed")
    
    async def _call_provider(self, provider: Dict, prompt: str, model: str) -> str:
        """Call a specific provider"""
        # Implementation varies by provider; see the aiohttp sketch in the
        # parallel fallback section. Raising keeps a silent None out of the cache.
        raise NotImplementedError
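
Usage mirrors the simpler classes; API keys come from environment variables, so nothing sensitive lives in code:

# Hypothetical usage (requires the *_API_KEY environment variables above)
ai = ProductionAIFallback()
answer = asyncio.run(ai.generate_text("Explain API fallbacks.", model="gpt-4"))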

Best Practices

  1. Monitor and Log: Track all fallback events and provider performance
  2. Test Regularly: Simulate provider failures to ensure fallback works
  3. Use Timeouts: Prevent hanging requests from blocking your application
  4. Implement Caching: Reduce API calls and provide fallback responses
  5. Handle All Errors: Don't just catch generic exceptions; distinguish timeouts, rate limits, and auth errors (see the sketch after this list)
  6. Document Dependencies: Keep track of provider-specific requirements
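
For practice 5, here is what differentiated handling can look like inside the provider loop: retryable conditions (timeouts, rate limits) move on to the next provider, while non-retryable ones (bad credentials) fail fast. The exception classes assume an aiohttp-based _call_provider, as sketched earlier:

# Inside the provider loop (illustrative; map to your HTTP client's errors)
try:
    return await self._call_provider(provider, prompt, model)
except asyncio.TimeoutError:
    continue                          # slow provider: try the next one
except aiohttp.ClientResponseError as e:
    if e.status == 401:
        raise                         # bad credentials: retrying won't help
    continue                          # 429s and 5xx: keep falling back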

Conclusion

Implementing robust AI API fallback is essential for production applications. Start with simple sequential fallback and gradually add more sophisticated features like parallel execution, health monitoring, and caching as your needs grow.

The key is to always have a plan for when things go wrong and to test your fallback mechanisms regularly.
