Effective error handling is crucial for building reliable AI applications. This guide covers comprehensive error handling strategies, retry mechanisms, graceful degradation, and implementation patterns.
Managed Error Handling
Tetrate Agent Router Service provides advanced error handling capabilities with automatic retry mechanisms, circuit breakers, and graceful degradation. This managed service handles the complexity of error management, ensuring your AI applications remain resilient even when individual components fail.
- Automatic retry with exponential backoff
- Circuit breaker pattern implementation
- Graceful degradation strategies
- Comprehensive error monitoring
Common AI Model Errors
- Rate Limiting: 429 errors from API providers
- Authentication Failures: Invalid or expired API keys
- Network Issues: Timeouts and connection problems
- Model Errors: Invalid requests or model-specific issues
- Quota Exhaustion: Monthly or daily limits reached
- Service Outages: Provider downtime
Error Handling Strategies
1. Retry with Exponential Backoff
import asyncio
import random
import time
from typing import Callable, Any, Optional
class RetryHandler:
def __init__(self, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
async def retry_with_backoff(self, func: Callable, *args, **kwargs) -> Any:
"""Retry function with exponential backoff"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
# Don't retry on certain errors
if self._should_not_retry(e):
raise e
# Don't sleep on last attempt
if attempt == self.max_retries:
break
# Calculate delay with jitter
delay = min(
self.base_delay * (2 ** attempt) + random.uniform(0, 1),
self.max_delay
)
print(f"Retry {attempt + 1}/{self.max_retries} in {delay:.2f}s due to: {e}")
await asyncio.sleep(delay)
raise last_exception
def _should_not_retry(self, exception: Exception) -> bool:
"""Determine if an exception should not be retried"""
# Don't retry authentication errors
if "authentication" in str(exception).lower() or "unauthorized" in str(exception).lower():
return True
# Don't retry invalid request errors
if "invalid" in str(exception).lower() or "bad request" in str(exception).lower():
return True
# Don't retry quota exceeded errors
if "quota" in str(exception).lower() or "limit" in str(exception).lower():
return True
return False
## Usage
retry_handler = RetryHandler(max_retries=3, base_delay=1.0)
async def call_ai_model(prompt: str) -> str:
# Implementation that might fail
pass
try:
result = await retry_handler.retry_with_backoff(call_ai_model, "Hello, world!")
except Exception as e:
print(f"All retries failed: {e}")
2. Circuit Breaker Pattern
import time
from enum import Enum
from typing import Dict, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, reject requests
HALF_OPEN = "half_open" # Testing if service recovered
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60, expected_exception: type = Exception):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.expected_exception = expected_exception
self.failure_count = 0
self.last_failure_time = 0
self.state = CircuitState.CLOSED
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Call function with circuit breaker protection"""
if self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
print("π Circuit breaker half-open, testing recovery")
else:
raise Exception("Circuit breaker is open")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except self.expected_exception as e:
self._on_failure()
raise e
def _on_success(self):
"""Handle successful call"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""Handle failed call"""
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f"β Circuit breaker opened after {self.failure_count} failures")
def get_state(self) -> CircuitState:
"""Get current circuit breaker state"""
return self.state
## Usage
circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
try:
result = await circuit_breaker.call(call_ai_model, "Hello, world!")
except Exception as e:
print(f"Request failed: {e}")
3. Graceful Degradation
class GracefulDegradation:
def __init__(self, fallback_strategies: Dict[str, Callable]):
self.fallback_strategies = fallback_strategies
self.degradation_level = 0
async def execute_with_degradation(self, primary_func: Callable, *args, **kwargs) -> Any:
"""Execute with graceful degradation"""
# Try primary function first
try:
result = await primary_func(*args, **kwargs)
self.degradation_level = 0
return result
except Exception as e:
print(f"Primary function failed: {e}")
return await self._try_fallbacks(*args, **kwargs)
async def _try_fallbacks(self, *args, **kwargs) -> Any:
"""Try fallback strategies in order"""
for level, fallback_func in self.fallback_strategies.items():
try:
print(f"π Trying fallback level {level}")
result = await fallback_func(*args, **kwargs)
self.degradation_level = int(level)
return result
except Exception as e:
print(f"Fallback level {level} failed: {e}")
continue
# All fallbacks failed
raise Exception("All fallback strategies failed")
def get_degradation_level(self) -> int:
"""Get current degradation level"""
return self.degradation_level
## Usage
async def fallback_model_1(prompt: str) -> str:
# Use a simpler model
return "Fallback response 1"
async def fallback_model_2(prompt: str) -> str:
# Use cached responses
return "Fallback response 2"
async def fallback_model_3(prompt: str) -> str:
# Return static response
return "Service temporarily unavailable"
fallback_strategies = {
"1": fallback_model_1,
"2": fallback_model_2,
"3": fallback_model_3
}
degradation = GracefulDegradation(fallback_strategies)
try:
result = await degradation.execute_with_degradation(call_ai_model, "Hello, world!")
print(f"Result: {result}")
print(f"Degradation level: {degradation.get_degradation_level()}")
except Exception as e:
print(f"All strategies failed: {e}")
4. Error Classification and Handling
from enum import Enum
from typing import Dict, List, Optional
class ErrorType(Enum):
RATE_LIMIT = "rate_limit"
AUTHENTICATION = "authentication"
NETWORK = "network"
MODEL_ERROR = "model_error"
QUOTA_EXCEEDED = "quota_exceeded"
UNKNOWN = "unknown"
class ErrorClassifier:
def __init__(self):
self.error_patterns = {
ErrorType.RATE_LIMIT: [
"rate limit", "429", "too many requests", "throttled"
],
ErrorType.AUTHENTICATION: [
"authentication", "unauthorized", "401", "403", "invalid api key"
],
ErrorType.NETWORK: [
"timeout", "connection", "network", "dns", "ssl"
],
ErrorType.MODEL_ERROR: [
"invalid request", "model", "parameter", "400"
],
ErrorType.QUOTA_EXCEEDED: [
"quota", "limit", "billing", "usage"
]
}
def classify_error(self, exception: Exception) -> ErrorType:
"""Classify an exception by type"""
error_message = str(exception).lower()
for error_type, patterns in self.error_patterns.items():
for pattern in patterns:
if pattern in error_message:
return error_type
return ErrorType.UNKNOWN
class ErrorHandler:
def __init__(self):
self.classifier = ErrorClassifier()
self.handlers = {
ErrorType.RATE_LIMIT: self._handle_rate_limit,
ErrorType.AUTHENTICATION: self._handle_authentication,
ErrorType.NETWORK: self._handle_network,
ErrorType.MODEL_ERROR: self._handle_model_error,
ErrorType.QUOTA_EXCEEDED: self._handle_quota_exceeded,
ErrorType.UNKNOWN: self._handle_unknown
}
async def handle_error(self, exception: Exception, context: Dict = None) -> Optional[Any]:
"""Handle error based on classification"""
error_type = self.classifier.classify_error(exception)
handler = self.handlers.get(error_type, self._handle_unknown)
return await handler(exception, context or {})
async def _handle_rate_limit(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle rate limit errors"""
print("π Rate limit detected, implementing backoff strategy")
# Implement exponential backoff
await asyncio.sleep(random.uniform(1, 5))
return None
async def _handle_authentication(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle authentication errors"""
print("π Authentication error, check API keys")
# Could implement API key rotation here
raise exception # Don't retry auth errors
async def _handle_network(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle network errors"""
print("π Network error, will retry")
# Network errors are usually transient
return None
async def _handle_model_error(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle model-specific errors"""
print("π€ Model error, check request parameters")
# Could implement request validation or model switching
raise exception
async def _handle_quota_exceeded(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle quota exceeded errors"""
print("π° Quota exceeded, switch to fallback model")
# Switch to different model or provider
return await self._get_fallback_response(context)
async def _handle_unknown(self, exception: Exception, context: Dict) -> Optional[Any]:
"""Handle unknown errors"""
print(f"β Unknown error: {exception}")
return None
async def _get_fallback_response(self, context: Dict) -> str:
"""Get fallback response when quota is exceeded"""
return "Service temporarily unavailable due to quota limits"
5. Comprehensive Error Handling System
class AIErrorHandler:
def __init__(self):
self.retry_handler = RetryHandler(max_retries=3)
self.circuit_breaker = CircuitBreaker(failure_threshold=5)
self.error_handler = ErrorHandler()
self.degradation = GracefulDegradation({
"1": self._fallback_model_1,
"2": self._fallback_model_2,
"3": self._fallback_model_3
})
async def execute_with_error_handling(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function with comprehensive error handling"""
try:
# Try with circuit breaker and retry
async def protected_call():
return await self.circuit_breaker.call(func, *args, **kwargs)
result = await self.retry_handler.retry_with_backoff(protected_call)
return result
except Exception as e:
# Handle specific error types
handled_result = await self.error_handler.handle_error(e, {
'function': func.__name__,
'args': args,
'kwargs': kwargs
})
if handled_result is not None:
return handled_result
# Try graceful degradation
try:
return await self.degradation.execute_with_degradation(func, *args, **kwargs)
except Exception as degradation_error:
# All error handling failed
raise Exception(f"All error handling strategies failed: {e} -> {degradation_error}")
async def _fallback_model_1(self, *args, **kwargs) -> str:
"""First fallback: use simpler model"""
return "Fallback response from simpler model"
async def _fallback_model_2(self, *args, **kwargs) -> str:
"""Second fallback: use cached responses"""
return "Fallback response from cache"
async def _fallback_model_3(self, *args, **kwargs) -> str:
"""Third fallback: static response"""
return "Service temporarily unavailable"
## Usage
ai_error_handler = AIErrorHandler()
async def call_ai_model(prompt: str) -> str:
# Simulate AI model call that might fail
if random.random() < 0.3: # 30% chance of failure
raise Exception("Rate limit exceeded")
return f"AI response to: {prompt}"
try:
result = await ai_error_handler.execute_with_error_handling(
call_ai_model, "Hello, world!"
)
print(f"Success: {result}")
except Exception as e:
print(f"All error handling failed: {e}")
Error Monitoring and Logging
1. Error Tracking
import logging
from datetime import datetime
class ErrorTracker:
def __init__(self):
self.error_counts = {}
self.error_history = []
self.logger = logging.getLogger(__name__)
def track_error(self, error: Exception, context: Dict = None):
"""Track an error occurrence"""
error_type = type(error).__name__
timestamp = datetime.now()
# Update error counts
self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
# Record error details
error_record = {
'timestamp': timestamp,
'error_type': error_type,
'error_message': str(error),
'context': context or {}
}
self.error_history.append(error_record)
# Log error
self.logger.error(f"Error tracked: {error_type} - {error}")
# Keep only recent errors
if len(self.error_history) > 1000:
self.error_history.pop(0)
def get_error_summary(self) -> Dict:
"""Get summary of recent errors"""
return {
'total_errors': len(self.error_history),
'error_counts': self.error_counts,
'recent_errors': self.error_history[-10:] # Last 10 errors
}
def get_error_rate(self, window_minutes: int = 60) -> float:
"""Calculate error rate in recent window"""
cutoff_time = datetime.now().timestamp() - (window_minutes * 60)
recent_errors = [
e for e in self.error_history
if e['timestamp'].timestamp() > cutoff_time
]
return len(recent_errors) / window_minutes # errors per minute
Best Practices
- Classify Errors: Differentiate between retryable and non-retryable errors
- Use Circuit Breakers: Prevent cascading failures
- Implement Graceful Degradation: Provide fallback responses
- Monitor Error Patterns: Track and analyze error occurrences
- Set Appropriate Timeouts: Donβt let requests hang indefinitely
- Log Everything: Maintain detailed error logs for debugging
- Test Error Scenarios: Regularly test error handling mechanisms
- Document Error Responses: Maintain clear documentation of error handling
Conclusion
Effective error handling is essential for building reliable AI applications. By implementing comprehensive error handling strategies including retries, circuit breakers, graceful degradation, and proper monitoring, you can ensure your AI applications remain robust even when individual components fail.
The key is to start with basic error handling and gradually add more sophisticated features as your needs grow.