Effective error recovery is crucial for maintaining reliable AI services. This guide covers comprehensive error handling strategies, recovery mechanisms, and best practices for managing AI model errors and failures.
## Managed Error Recovery
Tetrate Agent Router Service provides error recovery capabilities with intelligent error classification, automatic recovery mechanisms, and advanced monitoring. This managed service handles the complexity of error recovery so that your AI systems recover from failures automatically and maintain high availability.
- Intelligent error classification
- Automatic recovery mechanisms
- Advanced monitoring and alerting
- Proactive error prevention
## Understanding AI Model Errors
### Common Error Types
- API Errors: HTTP errors, rate limits, authentication failures
- Model Errors: Inference failures, timeout errors, quality issues
- Infrastructure Errors: Network issues, service outages, resource constraints
- Data Errors: Invalid input, format issues, content violations
- Business Logic Errors: Cost limits, quota exhaustion, policy violations
### Error Classification Framework
```python
import time
import asyncio
from typing import Dict, Optional, Callable
from enum import Enum


class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


class ErrorType(Enum):
    TRANSIENT = "transient"    # Temporary, can be retried
    PERMANENT = "permanent"    # Permanent, needs a different approach
    RATE_LIMIT = "rate_limit"  # Rate limiting, needs backoff
    AUTHENTICATION = "auth"    # Authentication issues
    NETWORK = "network"        # Network connectivity issues
    MODEL = "model"            # Model-specific errors
    DATA = "data"              # Data-related errors


class AIError:
    def __init__(self, error_type: ErrorType, message: str, severity: ErrorSeverity,
                 retryable: bool = True, context: Optional[Dict] = None):
        self.error_type = error_type
        self.message = message
        self.severity = severity
        self.retryable = retryable
        self.context = context or {}
        self.timestamp = time.time()
        self.retry_count = 0

    def __str__(self):
        return f"{self.error_type.value}: {self.message} (Severity: {self.severity.value})"


class ErrorClassifier:
    def __init__(self):
        self.error_patterns = {
            'rate_limit': [
                'rate limit', 'too many requests', '429', 'quota exceeded'
            ],
            'authentication': [
                'unauthorized', '401', '403', 'invalid api key', 'authentication failed'
            ],
            'network': [
                'timeout', 'connection refused', 'network error', 'dns resolution'
            ],
            'model': [
                'model error', 'inference failed', 'model unavailable', 'model timeout'
            ],
            'data': [
                'invalid input', 'content policy', 'malformed request', 'data validation'
            ]
        }

    def classify_error(self, error_message: str, http_status: int = None) -> AIError:
        """Classify an error based on its message and HTTP status."""
        error_message_lower = error_message.lower()

        # Check for rate limiting
        if any(pattern in error_message_lower for pattern in self.error_patterns['rate_limit']) or http_status == 429:
            return AIError(
                ErrorType.RATE_LIMIT,
                error_message,
                ErrorSeverity.MEDIUM,
                retryable=True,
                context={'http_status': http_status}
            )

        # Check for authentication errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['authentication']) or http_status in (401, 403):
            return AIError(
                ErrorType.AUTHENTICATION,
                error_message,
                ErrorSeverity.HIGH,
                retryable=False,
                context={'http_status': http_status}
            )

        # Check for network errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['network']):
            return AIError(
                ErrorType.NETWORK,
                error_message,
                ErrorSeverity.MEDIUM,
                retryable=True,
                context={'http_status': http_status}
            )

        # Check for model errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['model']):
            return AIError(
                ErrorType.MODEL,
                error_message,
                ErrorSeverity.HIGH,
                retryable=True,
                context={'http_status': http_status}
            )

        # Check for data errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['data']):
            return AIError(
                ErrorType.DATA,
                error_message,
                ErrorSeverity.LOW,
                retryable=False,
                context={'http_status': http_status}
            )

        # Default: treat unknown errors as transient and retryable
        return AIError(
            ErrorType.TRANSIENT,
            error_message,
            ErrorSeverity.MEDIUM,
            retryable=True,
            context={'http_status': http_status}
        )
```
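A minimal usage sketch (the error string and status code are illustrative): feeding a provider error through the classifier yields a structured `AIError` that downstream recovery logic can branch on.

```python
# Illustrative only: classify a hypothetical 429 response from a provider
classifier = ErrorClassifier()
error = classifier.classify_error("Too many requests: rate limit exceeded", http_status=429)

print(error)            # rate_limit: Too many requests: rate limit exceeded (Severity: medium)
print(error.retryable)  # True -> safe to retry with backoff
```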
## Error Recovery Strategies
### 1. Retry Mechanisms
```python
class RetryManager:
    def __init__(self):
        self.retry_configs = {
            ErrorType.TRANSIENT: {
                'max_retries': 3,
                'base_delay': 1.0,
                'max_delay': 10.0,
                'backoff_factor': 2.0
            },
            ErrorType.RATE_LIMIT: {
                'max_retries': 5,
                'base_delay': 5.0,
                'max_delay': 60.0,
                'backoff_factor': 2.0
            },
            ErrorType.NETWORK: {
                'max_retries': 3,
                'base_delay': 2.0,
                'max_delay': 15.0,
                'backoff_factor': 1.5
            },
            ErrorType.MODEL: {
                'max_retries': 2,
                'base_delay': 3.0,
                'max_delay': 20.0,
                'backoff_factor': 2.0
            }
        }

    async def execute_with_retry(self, operation: Callable, *args, **kwargs) -> Dict:
        """Execute an operation with retry logic."""
        error = None
        ai_error = None
        retry_count = 0
        # Start with the transient config; switch to the error-specific
        # config once the first failure has been classified
        retry_config = self.retry_configs[ErrorType.TRANSIENT]

        while retry_count <= retry_config['max_retries']:
            try:
                result = await operation(*args, **kwargs)
                return {
                    'success': True,
                    'result': result,
                    'retry_count': retry_count
                }
            except Exception as e:
                error = e
                ai_error = self._classify_exception(e)

                # Non-retryable errors fail immediately
                if not ai_error.retryable:
                    break

                # Use the retry config for this error type, if one exists
                retry_config = self.retry_configs.get(ai_error.error_type, retry_config)

                # Stop if we've exhausted the retries for this error type
                if retry_count >= retry_config['max_retries']:
                    break

                # Exponential backoff, capped at max_delay
                delay = min(
                    retry_config['base_delay'] * (retry_config['backoff_factor'] ** retry_count),
                    retry_config['max_delay']
                )
                print(f"Retry {retry_count + 1}/{retry_config['max_retries']} after {delay:.1f}s: {ai_error}")

                await asyncio.sleep(delay)
                retry_count += 1

        return {
            'success': False,
            'error': ai_error,
            'retry_count': retry_count,
            'final_error': str(error)
        }

    def _classify_exception(self, exception: Exception) -> AIError:
        """Classify an exception as an AIError."""
        error_message = str(exception)

        # Try to extract an HTTP status from the exception
        http_status = None
        if hasattr(exception, 'status_code'):
            http_status = exception.status_code
        elif hasattr(exception, 'response'):
            http_status = getattr(exception.response, 'status_code', None)

        classifier = ErrorClassifier()
        return classifier.classify_error(error_message, http_status)
```
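To illustrate the retry loop, here is a minimal sketch; `flaky_call` is a hypothetical stand-in for a real provider call, and the raised `TimeoutError` is classified as a network error and retried with backoff.

```python
async def flaky_call() -> str:
    # Hypothetical stand-in for a provider call; always times out here
    raise TimeoutError("timeout while waiting for model response")

async def main():
    manager = RetryManager()
    outcome = await manager.execute_with_retry(flaky_call)
    # After the network-error retries are exhausted: success=False, retry_count=3
    print(outcome['success'], outcome['retry_count'], outcome['final_error'])

asyncio.run(main())
```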
### 2. Circuit Breaker Pattern
```python
class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60,
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    async def call(self, operation: Callable, *args, **kwargs) -> Dict:
        """Execute an operation with circuit breaker protection."""
        if self.state == 'OPEN':
            if self._should_attempt_reset():
                self.state = 'HALF_OPEN'
            else:
                return {
                    'success': False,
                    'error': 'Circuit breaker is OPEN',
                    'state': self.state
                }
        try:
            result = await operation(*args, **kwargs)
            self._on_success()
            return {
                'success': True,
                'result': result,
                'state': self.state
            }
        except self.expected_exception as e:
            self._on_failure()
            return {
                'success': False,
                'error': str(e),
                'state': self.state
            }

    def _on_success(self):
        """Handle a successful operation: close the circuit."""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'

    def _on_failure(self):
        """Handle a failed operation: open the circuit at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'

    def _should_attempt_reset(self) -> bool:
        """Check whether enough time has passed to probe with a trial request."""
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def get_status(self) -> Dict:
        """Get the current circuit breaker status."""
        return {
            'state': self.state,
            'failure_count': self.failure_count,
            'last_failure_time': self.last_failure_time,
            'failure_threshold': self.failure_threshold,
            'recovery_timeout': self.recovery_timeout
        }
```
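A short sketch of the breaker in action, with a hypothetical `call_model` that always fails: after `failure_threshold` consecutive failures the circuit opens and later calls are rejected immediately without touching the upstream.

```python
async def call_model() -> str:
    # Hypothetical upstream call that always fails in this sketch
    raise ConnectionError("connection refused by provider")

async def main():
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
    for attempt in range(5):
        outcome = await breaker.call(call_model)
        # Attempts 0-2 hit the upstream; attempts 3-4 are short-circuited
        # with state OPEN until recovery_timeout elapses
        print(attempt, outcome['state'], outcome['error'])

asyncio.run(main())
```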
### 3. Fallback Strategies
```python
import hashlib


class FallbackManager:
    def __init__(self):
        self.fallback_strategies = {
            ErrorType.RATE_LIMIT: self._handle_rate_limit_fallback,
            ErrorType.MODEL: self._handle_model_fallback,
            ErrorType.NETWORK: self._handle_network_fallback,
            ErrorType.AUTHENTICATION: self._handle_auth_fallback
        }
        self.cached_responses = {}

    async def execute_with_fallback(self, primary_operation: Callable,
                                    fallback_operation: Callable = None,
                                    *args, **kwargs) -> Dict:
        """Execute an operation with a fallback strategy."""
        try:
            # Try the primary operation
            result = await primary_operation(*args, **kwargs)
            return {
                'success': True,
                'result': result,
                'strategy': 'primary'
            }
        except Exception as e:
            ai_error = self._classify_exception(e)

            # Check for an error-type-specific fallback strategy
            if ai_error.error_type in self.fallback_strategies:
                fallback_result = await self.fallback_strategies[ai_error.error_type](e, *args, **kwargs)
                if fallback_result['success']:
                    return fallback_result

            # Try the generic fallback operation
            if fallback_operation:
                try:
                    fallback_result = await fallback_operation(*args, **kwargs)
                    return {
                        'success': True,
                        'result': fallback_result,
                        'strategy': 'generic_fallback'
                    }
                except Exception as fallback_error:
                    return {
                        'success': False,
                        'error': f"Both primary and fallback failed: {str(e)}, {str(fallback_error)}",
                        'strategy': 'failed'
                    }

            return {
                'success': False,
                'error': str(e),
                'strategy': 'no_fallback'
            }

    def _classify_exception(self, exception: Exception) -> AIError:
        """Classify an exception using the shared ErrorClassifier."""
        http_status = getattr(exception, 'status_code', None)
        return ErrorClassifier().classify_error(str(exception), http_status)

    async def _handle_rate_limit_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle rate limit errors with a fallback."""
        # Use a cached response if one is available
        cache_key = self._generate_cache_key(*args, **kwargs)
        if cache_key in self.cached_responses:
            return {
                'success': True,
                'result': self.cached_responses[cache_key],
                'strategy': 'cached_response'
            }
        # Otherwise try an alternative provider or model
        return await self._try_alternative_provider(*args, **kwargs)

    async def _handle_model_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle model errors by trying a simpler/faster model."""
        return await self._try_simpler_model(*args, **kwargs)

    async def _handle_network_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle network errors by trying an alternative endpoint or region."""
        return await self._try_alternative_endpoint(*args, **kwargs)

    async def _handle_auth_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle authentication errors by trying alternative credentials."""
        return await self._try_alternative_credentials(*args, **kwargs)

    def _generate_cache_key(self, *args, **kwargs) -> str:
        """Generate a cache key for a request."""
        key_data = str(args) + str(sorted(kwargs.items()))
        return hashlib.md5(key_data.encode()).hexdigest()

    async def _try_alternative_provider(self, *args, **kwargs) -> Dict:
        """Try an alternative AI provider (implementation depends on available providers)."""
        return {'success': False, 'error': 'No alternative provider available'}

    async def _try_simpler_model(self, *args, **kwargs) -> Dict:
        """Try a simpler/faster model (implementation depends on available models)."""
        return {'success': False, 'error': 'No simpler model available'}

    async def _try_alternative_endpoint(self, *args, **kwargs) -> Dict:
        """Try an alternative endpoint (implementation depends on available endpoints)."""
        return {'success': False, 'error': 'No alternative endpoint available'}

    async def _try_alternative_credentials(self, *args, **kwargs) -> Dict:
        """Try alternative credentials (implementation depends on available credentials)."""
        return {'success': False, 'error': 'No alternative credentials available'}
```
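A minimal sketch of the fallback path, with hypothetical `primary` and `backup` operations: the raised message matches the classifier's model-error patterns, the model-specific fallback stub declines, and the generic fallback answers.

```python
async def primary() -> str:
    # Message matches the 'model' patterns, so this is classified as a model error
    raise RuntimeError("model unavailable")

async def backup() -> str:
    return "response from a smaller backup model"

async def main():
    manager = FallbackManager()
    outcome = await manager.execute_with_fallback(primary, backup)
    print(outcome['strategy'], '->', outcome['result'])  # generic_fallback -> response ...

asyncio.run(main())
```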
## Error Monitoring and Alerting
### 1. Error Tracking System
```python
class ErrorTracker:
    def __init__(self):
        self.error_log = []
        self.error_counts = {}
        self.alert_thresholds = {
            'error_rate': 0.05,  # 5% error rate
            'consecutive_failures': 10,
            'critical_errors': 5
        }
        # Assumed request volume used to approximate the error rate;
        # in production, divide by the actual request count instead
        self.assumed_hourly_requests = 2000

    async def log_error(self, error: AIError, context: Dict = None):
        """Log an error for tracking and check alert thresholds."""
        error_entry = {
            'error': error,
            'context': context or {},
            'timestamp': time.time()
        }
        self.error_log.append(error_entry)

        # Update error counts
        error_key = f"{error.error_type.value}_{error.severity.value}"
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1

        # Check whether any alerts should fire
        await self._check_alerts(error_entry)

    async def _check_alerts(self, error_entry: Dict):
        """Check if alerts should be triggered."""
        # Approximate the error rate over the last hour
        recent_errors = [
            e for e in self.error_log
            if time.time() - e['timestamp'] < 3600
        ]
        if len(recent_errors) > 100:  # need sufficient data
            error_rate = len(recent_errors) / self.assumed_hourly_requests
            if error_rate > self.alert_thresholds['error_rate']:
                await self._send_alert('high_error_rate', {
                    'error_rate': error_rate,
                    'threshold': self.alert_thresholds['error_rate']
                })

        # Check for consecutive high-severity failures
        consecutive_failures = 0
        for error in reversed(self.error_log):
            if error['error'].severity in (ErrorSeverity.HIGH, ErrorSeverity.CRITICAL):
                consecutive_failures += 1
            else:
                break
        if consecutive_failures >= self.alert_thresholds['consecutive_failures']:
            await self._send_alert('consecutive_failures', {
                'count': consecutive_failures,
                'threshold': self.alert_thresholds['consecutive_failures']
            })

        # Check for critical errors
        critical_errors = [
            e for e in recent_errors
            if e['error'].severity == ErrorSeverity.CRITICAL
        ]
        if len(critical_errors) >= self.alert_thresholds['critical_errors']:
            await self._send_alert('critical_errors', {
                'count': len(critical_errors),
                'threshold': self.alert_thresholds['critical_errors']
            })

    async def _send_alert(self, alert_type: str, data: Dict):
        """Send an alert notification via configured channels (email, Slack, etc.)."""
        alert = {
            'type': alert_type,
            'data': data,
            'timestamp': time.time(),
            'severity': 'high' if alert_type in ('critical_errors', 'consecutive_failures') else 'medium'
        }
        print(f"🚨 ALERT: {alert['type']} - {alert['data']}")

    def get_error_summary(self, hours: int = 24) -> Dict:
        """Get an error summary for the specified time period."""
        cutoff_time = time.time() - (hours * 3600)
        recent_errors = [
            e for e in self.error_log
            if e['timestamp'] >= cutoff_time
        ]
        error_summary = {
            'total_errors': len(recent_errors),
            'error_types': {},
            'severity_distribution': {},
            'most_common_errors': []
        }

        # Count by error type and severity
        for error_entry in recent_errors:
            error_type = error_entry['error'].error_type.value
            error_summary['error_types'][error_type] = error_summary['error_types'].get(error_type, 0) + 1
            severity = error_entry['error'].severity.value
            error_summary['severity_distribution'][severity] = error_summary['severity_distribution'].get(severity, 0) + 1

        # Find the most common error messages
        error_messages = {}
        for error_entry in recent_errors:
            message = error_entry['error'].message
            error_messages[message] = error_messages.get(message, 0) + 1
        error_summary['most_common_errors'] = sorted(
            error_messages.items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]

        return error_summary
```
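A brief usage sketch (the context values are illustrative); note that `log_error` is a coroutine here because it may need to await `_send_alert`:

```python
async def main():
    tracker = ErrorTracker()
    await tracker.log_error(
        AIError(ErrorType.RATE_LIMIT, "quota exceeded for project", ErrorSeverity.MEDIUM),
        context={'request_id': 'req-123'}  # illustrative context
    )
    print(tracker.get_error_summary(hours=1))

asyncio.run(main())
```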
### 2. Error Recovery Dashboard
```python
class ErrorRecoveryDashboard:
    def __init__(self):
        self.error_tracker = ErrorTracker()
        self.circuit_breakers = {}
        self.recovery_metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'recovered_requests': 0
        }

    async def track_request(self, request_id: str, operation: Callable, *args, **kwargs) -> Dict:
        """Track a request with error recovery."""
        self.recovery_metrics['total_requests'] += 1
        try:
            # Try the primary operation
            result = await operation(*args, **kwargs)
            self.recovery_metrics['successful_requests'] += 1
            return {
                'request_id': request_id,
                'success': True,
                'result': result,
                'strategy': 'primary'
            }
        except Exception as e:
            self.recovery_metrics['failed_requests'] += 1

            # Log the error
            ai_error = self._classify_exception(e)
            await self.error_tracker.log_error(ai_error, {
                'request_id': request_id,
                'args': args,
                'kwargs': kwargs
            })

            # Attempt recovery
            recovery_result = await self._attempt_recovery(ai_error, operation, *args, **kwargs)
            if recovery_result['success']:
                self.recovery_metrics['recovered_requests'] += 1
            recovery_result['request_id'] = request_id
            return recovery_result

    async def _attempt_recovery(self, error: AIError, operation: Callable, *args, **kwargs) -> Dict:
        """Attempt to recover from an error."""
        # One circuit breaker per operation/error-type pair
        circuit_breaker_key = f"{operation.__name__}_{error.error_type.value}"
        if circuit_breaker_key not in self.circuit_breakers:
            self.circuit_breakers[circuit_breaker_key] = CircuitBreaker()
        circuit_breaker = self.circuit_breakers[circuit_breaker_key]

        # Fail fast if the circuit is open and not yet ready for a probe
        if circuit_breaker.state == 'OPEN' and not circuit_breaker._should_attempt_reset():
            return {'success': False, 'error': 'Circuit breaker is OPEN', 'strategy': 'circuit_open'}

        # First try with retry, recording the outcome in the breaker
        retry_manager = RetryManager()
        retry_result = await retry_manager.execute_with_retry(operation, *args, **kwargs)
        if retry_result['success']:
            circuit_breaker._on_success()
            return retry_result
        circuit_breaker._on_failure()

        # If retry fails, try with fallback
        fallback_manager = FallbackManager()
        return await fallback_manager.execute_with_fallback(
            operation, None, *args, **kwargs
        )

    def _classify_exception(self, exception: Exception) -> AIError:
        """Classify an exception using the shared ErrorClassifier."""
        http_status = getattr(exception, 'status_code', None)
        return ErrorClassifier().classify_error(str(exception), http_status)

    def get_dashboard_data(self) -> Dict:
        """Get dashboard data for monitoring."""
        return {
            'metrics': self.recovery_metrics,
            'error_summary': self.error_tracker.get_error_summary(),
            'circuit_breakers': {
                key: breaker.get_status()
                for key, breaker in self.circuit_breakers.items()
            },
            'success_rate': (
                self.recovery_metrics['successful_requests'] + self.recovery_metrics['recovered_requests']
            ) / max(self.recovery_metrics['total_requests'], 1) * 100,
            'recovery_rate': (
                self.recovery_metrics['recovered_requests'] / max(self.recovery_metrics['failed_requests'], 1) * 100
            )
        }
```
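Tying it together, a minimal sketch with a hypothetical `generate` operation shows how the dashboard records outcomes and reports aggregate rates:

```python
async def generate() -> str:
    # Hypothetical operation; a real one would call your AI provider
    return "hello from the model"

async def main():
    dashboard = ErrorRecoveryDashboard()
    outcome = await dashboard.track_request("req-1", generate)
    print(outcome['strategy'])                           # primary
    data = dashboard.get_dashboard_data()
    print(f"success rate: {data['success_rate']:.0f}%")  # 100%

asyncio.run(main())
```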
## Best Practices
- Classify Errors: Implement proper error classification for targeted recovery
- Use Retry Logic: Implement exponential backoff for transient errors (ideally with jitter; see the sketch after this list)
- Circuit Breakers: Use circuit breakers to prevent cascading failures
- Fallback Strategies: Have multiple fallback options for different error types
- Monitor Errors: Track error patterns and recovery success rates
- Alert on Issues: Set up alerts for critical error patterns
- Test Recovery: Regularly test error recovery mechanisms
- Document Procedures: Maintain runbooks for manual error recovery
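The `RetryManager` above uses deterministic exponential backoff; adding random jitter is a common refinement that prevents many clients from retrying in lockstep after a shared outage. A minimal sketch of full-jitter backoff:

```python
import random

def backoff_delay(retry_count: int, base: float = 1.0,
                  factor: float = 2.0, cap: float = 30.0) -> float:
    """Full-jitter backoff: sleep a random duration between zero and the
    capped exponential delay so that concurrent clients desynchronize."""
    return random.uniform(0, min(base * (factor ** retry_count), cap))

# Example: delays drawn for the first four retries
print([round(backoff_delay(i), 2) for i in range(4)])
```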
## Conclusion
Effective AI model error recovery requires a comprehensive approach that includes proper error classification, retry mechanisms, circuit breakers, and fallback strategies. By implementing these techniques and best practices, you can maintain high availability and reliability for your AI services.
The key is to understand the different types of errors, implement appropriate recovery strategies for each, and continuously monitor and improve your error handling processes.