AI Model Error Recovery

Effective error recovery is crucial for maintaining reliable AI services. This guide covers comprehensive error handling strategies, recovery mechanisms, and best practices for managing AI model errors and failures.

Managed Error Recovery

Tetrate Agent Router Service provides comprehensive error recovery capabilities with intelligent error classification, automatic recovery mechanisms, and advanced monitoring. This managed service handles the complexity of error recovery, ensuring your AI systems automatically recover from failures and maintain high availability.

  • Intelligent error classification
  • Automatic recovery mechanisms
  • Advanced monitoring and alerting
  • Proactive error prevention

Understanding AI Model Errors

Common Error Types

  • API Errors: HTTP errors, rate limits, authentication failures
  • Model Errors: Inference failures, timeout errors, quality issues
  • Infrastructure Errors: Network issues, service outages, resource constraints
  • Data Errors: Invalid input, format issues, content violations
  • Business Logic Errors: Cost limits, quota exhaustion, policy violations

Error Classification Framework

import time
import asyncio
from typing import Dict, List, Optional, Callable
from enum import Enum

class ErrorSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class ErrorType(Enum):
    TRANSIENT = "transient"      # Temporary, can be retried
    PERMANENT = "permanent"      # Permanent, needs different approach
    RATE_LIMIT = "rate_limit"    # Rate limiting, needs backoff
    AUTHENTICATION = "auth"      # Authentication issues
    NETWORK = "network"          # Network connectivity issues
    MODEL = "model"              # Model-specific errors
    DATA = "data"                # Data-related errors

class AIError:
    def __init__(self, error_type: ErrorType, message: str, severity: ErrorSeverity, 
                 retryable: bool = True, context: Optional[Dict] = None):
        self.error_type = error_type
        self.message = message
        self.severity = severity
        self.retryable = retryable
        self.context = context or {}
        self.timestamp = time.time()
        self.retry_count = 0
    
    def __str__(self):
        return f"{self.error_type.value}: {self.message} (Severity: {self.severity.value})"

class ErrorClassifier:
    def __init__(self):
        self.error_patterns = {
            'rate_limit': [
                'rate limit', 'too many requests', '429', 'quota exceeded'
            ],
            'authentication': [
                'unauthorized', '401', '403', 'invalid api key', 'authentication failed'
            ],
            'network': [
                'timeout', 'connection refused', 'network error', 'dns resolution'
            ],
            'model': [
                'model error', 'inference failed', 'model unavailable', 'model timeout'
            ],
            'data': [
                'invalid input', 'content policy', 'malformed request', 'data validation'
            ]
        }
    
    def classify_error(self, error_message: str, http_status: Optional[int] = None) -> AIError:
        """Classify an error based on message and HTTP status"""
        error_message_lower = error_message.lower()
        
        # Check for rate limiting
        if any(pattern in error_message_lower for pattern in self.error_patterns['rate_limit']) or http_status == 429:
            return AIError(
                ErrorType.RATE_LIMIT,
                error_message,
                ErrorSeverity.MEDIUM,
                retryable=True,
                context={'http_status': http_status}
            )
        
        # Check for authentication errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['authentication']) or http_status in [401, 403]:
            return AIError(
                ErrorType.AUTHENTICATION,
                error_message,
                ErrorSeverity.HIGH,
                retryable=False,
                context={'http_status': http_status}
            )
        
        # Check for network errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['network']):
            return AIError(
                ErrorType.NETWORK,
                error_message,
                ErrorSeverity.MEDIUM,
                retryable=True,
                context={'http_status': http_status}
            )
        
        # Check for model errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['model']):
            return AIError(
                ErrorType.MODEL,
                error_message,
                ErrorSeverity.HIGH,
                retryable=True,
                context={'http_status': http_status}
            )
        
        # Check for data errors
        if any(pattern in error_message_lower for pattern in self.error_patterns['data']):
            return AIError(
                ErrorType.DATA,
                error_message,
                ErrorSeverity.LOW,
                retryable=False,
                context={'http_status': http_status}
            )
        
        # Default classification
        return AIError(
            ErrorType.TRANSIENT,
            error_message,
            ErrorSeverity.MEDIUM,
            retryable=True,
            context={'http_status': http_status}
        )
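
To see how the classifier behaves, here is a minimal usage sketch; the sample messages and status codes are illustrative:

# Illustrative check of the classifier above
classifier = ErrorClassifier()

samples = [
    ("Rate limit exceeded, too many requests", 429),
    ("Invalid API key provided", 401),
    ("Connection refused by upstream host", None),
    ("Request rejected by content policy", 400),
]

for message, status in samples:
    error = classifier.classify_error(message, http_status=status)
    print(f"{message!r} -> {error}, retryable={error.retryable}")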

Error Recovery Strategies

1. Retry Mechanisms

class RetryManager:
    def __init__(self):
        self.retry_configs = {
            ErrorType.TRANSIENT: {
                'max_retries': 3,
                'base_delay': 1.0,
                'max_delay': 10.0,
                'backoff_factor': 2.0
            },
            ErrorType.RATE_LIMIT: {
                'max_retries': 5,
                'base_delay': 5.0,
                'max_delay': 60.0,
                'backoff_factor': 2.0
            },
            ErrorType.NETWORK: {
                'max_retries': 3,
                'base_delay': 2.0,
                'max_delay': 15.0,
                'backoff_factor': 1.5
            },
            ErrorType.MODEL: {
                'max_retries': 2,
                'base_delay': 3.0,
                'max_delay': 20.0,
                'backoff_factor': 2.0
            }
        }
    
    async def execute_with_retry(self, operation: Callable, *args, **kwargs) -> Dict:
        """Execute an operation with retry logic"""
        error = None
        ai_error = None
        retry_count = 0
        
        # Start from the transient config; it is refined per error type once an error is classified
        retry_config = self.retry_configs[ErrorType.TRANSIENT]
        
        while True:
            try:
                result = await operation(*args, **kwargs)
                return {
                    'success': True,
                    'result': result,
                    'retry_count': retry_count
                }
            
            except Exception as e:
                error = e
                ai_error = self._classify_exception(e)
                
                # Check if error is retryable
                if not ai_error.retryable:
                    break
                
                # Get retry config for this error type
                error_retry_config = self.retry_configs.get(ai_error.error_type, retry_config)
                
                # Check if we've exceeded max retries for this error type
                if retry_count >= error_retry_config['max_retries']:
                    break
                
                # Calculate delay
                delay = min(
                    error_retry_config['base_delay'] * (error_retry_config['backoff_factor'] ** retry_count),
                    error_retry_config['max_delay']
                )
                
                print(f"Retry {retry_count + 1}/{error_retry_config['max_retries']} after {delay:.1f}s: {ai_error}")
                
                # Wait before retry
                await asyncio.sleep(delay)
                retry_count += 1
        
        return {
            'success': False,
            'error': ai_error,
            'retry_count': retry_count,
            'final_error': str(error)
        }
    
    def _classify_exception(self, exception: Exception) -> AIError:
        """Classify an exception as an AIError"""
        error_message = str(exception)
        
        # Try to extract HTTP status from exception
        http_status = None
        if hasattr(exception, 'status_code'):
            http_status = exception.status_code
        elif hasattr(exception, 'response'):
            http_status = getattr(exception.response, 'status_code', None)
        
        classifier = ErrorClassifier()
        return classifier.classify_error(error_message, http_status)
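
A minimal sketch of wrapping a call with the retry manager; call_model is a hypothetical stand-in for a real provider call that fails once before succeeding:

# Hypothetical flaky operation used to exercise the retry logic
attempts = {"count": 0}

async def call_model(prompt: str) -> str:
    attempts["count"] += 1
    if attempts["count"] < 2:
        raise RuntimeError("model timeout")  # classified as a model error
    return f"response to: {prompt}"

async def run_retry_example():
    retry_manager = RetryManager()
    outcome = await retry_manager.execute_with_retry(call_model, "Summarize this document")
    print(outcome)  # expected: success after one retry

asyncio.run(run_retry_example())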

2. Circuit Breaker Pattern

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60, 
                 expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    async def call(self, operation: Callable, *args, **kwargs) -> Dict:
        """Execute operation with circuit breaker protection"""
        if self.state == 'OPEN':
            if self._should_attempt_reset():
                self.state = 'HALF_OPEN'
            else:
                return {
                    'success': False,
                    'error': 'Circuit breaker is OPEN',
                    'state': self.state
                }
        
        try:
            result = await operation(*args, **kwargs)
            self._on_success()
            return {
                'success': True,
                'result': result,
                'state': self.state
            }
        
        except self.expected_exception as e:
            self._on_failure()
            return {
                'success': False,
                'error': str(e),
                'state': self.state
            }
    
    def _on_success(self):
        """Handle successful operation"""
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'
    
    def _on_failure(self):
        """Handle failed operation"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = 'OPEN'
    
    def _should_attempt_reset(self) -> bool:
        """Check if circuit breaker should attempt reset"""
        if self.last_failure_time is None:
            return True
        
        return time.time() - self.last_failure_time >= self.recovery_timeout
    
    def get_status(self) -> Dict:
        """Get circuit breaker status"""
        return {
            'state': self.state,
            'failure_count': self.failure_count,
            'last_failure_time': self.last_failure_time,
            'failure_threshold': self.failure_threshold,
            'recovery_timeout': self.recovery_timeout
        }
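
A short sketch of guarding calls with the breaker; flaky_call is a hypothetical operation that always fails, so the breaker opens after failure_threshold consecutive failures:

async def flaky_call():
    raise RuntimeError("service unavailable")  # always fails in this sketch

async def run_breaker_example():
    breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)
    for attempt in range(5):
        outcome = await breaker.call(flaky_call)
        print(f"attempt {attempt + 1}: success={outcome['success']}, state={outcome['state']}")
    print(breaker.get_status())

asyncio.run(run_breaker_example())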

3. Fallback Strategies

class FallbackManager:
    def __init__(self):
        self.fallback_strategies = {
            ErrorType.RATE_LIMIT: self._handle_rate_limit_fallback,
            ErrorType.MODEL: self._handle_model_fallback,
            ErrorType.NETWORK: self._handle_network_fallback,
            ErrorType.AUTHENTICATION: self._handle_auth_fallback
        }
        self.cached_responses = {}
        self.classifier = ErrorClassifier()
    
    async def execute_with_fallback(self, primary_operation: Callable, 
                                  fallback_operation: Optional[Callable] = None,
                                  *args, **kwargs) -> Dict:
        """Execute operation with fallback strategy"""
        try:
            # Try primary operation
            result = await primary_operation(*args, **kwargs)
            return {
                'success': True,
                'result': result,
                'strategy': 'primary'
            }
        
        except Exception as e:
            ai_error = self.classifier.classify_error(str(e), getattr(e, 'status_code', None))
            
            # Check if we have a specific fallback strategy
            if ai_error.error_type in self.fallback_strategies:
                fallback_result = await self.fallback_strategies[ai_error.error_type](e, *args, **kwargs)
                if fallback_result['success']:
                    return fallback_result
            
            # Try generic fallback operation
            if fallback_operation:
                try:
                    fallback_result = await fallback_operation(*args, **kwargs)
                    return {
                        'success': True,
                        'result': fallback_result,
                        'strategy': 'generic_fallback'
                    }
                except Exception as fallback_error:
                    return {
                        'success': False,
                        'error': f"Both primary and fallback failed: {str(e)}, {str(fallback_error)}",
                        'strategy': 'failed'
                    }
            
            return {
                'success': False,
                'error': str(e),
                'strategy': 'no_fallback'
            }
    
    async def _handle_rate_limit_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle rate limit errors with fallback"""
        # Try to use cached response if available
        cache_key = self._generate_cache_key(*args, **kwargs)
        if cache_key in self.cached_responses:
            return {
                'success': True,
                'result': self.cached_responses[cache_key],
                'strategy': 'cached_response'
            }
        
        # Try alternative provider or model
        return await self._try_alternative_provider(*args, **kwargs)
    
    async def _handle_model_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle model errors with fallback"""
        # Try simpler/faster model
        return await self._try_simpler_model(*args, **kwargs)
    
    async def _handle_network_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle network errors with fallback"""
        # Try alternative endpoint or region
        return await self._try_alternative_endpoint(*args, **kwargs)
    
    async def _handle_auth_fallback(self, error: Exception, *args, **kwargs) -> Dict:
        """Handle authentication errors with fallback"""
        # Try alternative API key or credentials
        return await self._try_alternative_credentials(*args, **kwargs)
    
    def _generate_cache_key(self, *args, **kwargs) -> str:
        """Generate cache key for request"""
        import hashlib
        key_data = str(args) + str(sorted(kwargs.items()))
        return hashlib.md5(key_data.encode()).hexdigest()
    
    async def _try_alternative_provider(self, *args, **kwargs) -> Dict:
        """Try alternative AI provider"""
        # Implementation would depend on available providers
        return {'success': False, 'error': 'No alternative provider available'}
    
    async def _try_simpler_model(self, *args, **kwargs) -> Dict:
        """Try simpler/faster model"""
        # Implementation would depend on available models
        return {'success': False, 'error': 'No simpler model available'}
    
    async def _try_alternative_endpoint(self, *args, **kwargs) -> Dict:
        """Try alternative endpoint"""
        # Implementation would depend on available endpoints
        return {'success': False, 'error': 'No alternative endpoint available'}
    
    async def _try_alternative_credentials(self, *args, **kwargs) -> Dict:
        """Try alternative credentials"""
        # Implementation would depend on available credentials
        return {'success': False, 'error': 'No alternative credentials available'}
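
A minimal usage sketch; primary_call and cheaper_model_call are hypothetical operations, and because the _try_* helpers above are stubs, the generic fallback path is the one exercised here:

async def primary_call(prompt: str) -> str:
    raise RuntimeError("model unavailable")  # always fails in this sketch

async def cheaper_model_call(prompt: str) -> str:
    return f"fallback answer for: {prompt}"

async def run_fallback_example():
    manager = FallbackManager()
    outcome = await manager.execute_with_fallback(
        primary_call, cheaper_model_call, "Summarize this document"
    )
    print(outcome)  # expected strategy: 'generic_fallback'

asyncio.run(run_fallback_example())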

Error Monitoring and Alerting

1. Error Tracking System

class ErrorTracker:
    def __init__(self):
        self.error_log = []
        self.error_counts = {}
        self.alert_thresholds = {
            'error_rate': 0.05,  # 5% error rate
            'consecutive_failures': 10,
            'critical_errors': 5
        }
    
    async def log_error(self, error: AIError, context: Optional[Dict] = None):
        """Log an error for tracking and run alert checks"""
        error_entry = {
            'error': error,
            'context': context or {},
            'timestamp': time.time()
        }
        
        self.error_log.append(error_entry)
        
        # Update error counts
        error_key = f"{error.error_type.value}_{error.severity.value}"
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
        
        # Check for alerts
        await self._check_alerts(error_entry)
    
    async def _check_alerts(self, error_entry: Dict):
        """Check if alerts should be triggered"""
        # Calculate error rate
        recent_errors = [
            e for e in self.error_log
            if time.time() - e['timestamp'] < 3600  # Last hour
        ]
        
        # Estimate the error rate against an assumed hourly request baseline;
        # in production, divide by the actual request count from your metrics system
        requests_per_hour_baseline = 1000
        if len(recent_errors) >= 10:  # Need sufficient data
            error_rate = len(recent_errors) / requests_per_hour_baseline
            
            if error_rate > self.alert_thresholds['error_rate']:
                await self._send_alert('high_error_rate', {
                    'error_rate': error_rate,
                    'threshold': self.alert_thresholds['error_rate']
                })
        
        # Check for consecutive failures
        consecutive_failures = 0
        for error in reversed(self.error_log):
            if error['error'].severity in [ErrorSeverity.HIGH, ErrorSeverity.CRITICAL]:
                consecutive_failures += 1
            else:
                break
        
        if consecutive_failures >= self.alert_thresholds['consecutive_failures']:
            await self._send_alert('consecutive_failures', {
                'count': consecutive_failures,
                'threshold': self.alert_thresholds['consecutive_failures']
            })
        
        # Check for critical errors
        critical_errors = [
            e for e in recent_errors
            if e['error'].severity == ErrorSeverity.CRITICAL
        ]
        
        if len(critical_errors) >= self.alert_thresholds['critical_errors']:
            await self._send_alert('critical_errors', {
                'count': len(critical_errors),
                'threshold': self.alert_thresholds['critical_errors']
            })
    
    async def _send_alert(self, alert_type: str, data: Dict):
        """Send alert notification"""
        alert = {
            'type': alert_type,
            'data': data,
            'timestamp': time.time(),
            'severity': 'high' if alert_type in ['critical_errors', 'consecutive_failures'] else 'medium'
        }
        
        # Send alert via configured channels (email, Slack, etc.)
        print(f"🚨 ALERT: {alert_type} - {data}")
    
    def get_error_summary(self, hours: int = 24) -> Dict:
        """Get error summary for the specified time period"""
        cutoff_time = time.time() - (hours * 3600)
        recent_errors = [
            e for e in self.error_log
            if e['timestamp'] >= cutoff_time
        ]
        
        error_summary = {
            'total_errors': len(recent_errors),
            'error_types': {},
            'severity_distribution': {},
            'most_common_errors': []
        }
        
        # Count by error type
        for error_entry in recent_errors:
            error_type = error_entry['error'].error_type.value
            error_summary['error_types'][error_type] = error_summary['error_types'].get(error_type, 0) + 1
            
            severity = error_entry['error'].severity.value
            error_summary['severity_distribution'][severity] = error_summary['severity_distribution'].get(severity, 0) + 1
        
        # Find most common errors
        error_messages = {}
        for error_entry in recent_errors:
            message = error_entry['error'].message
            error_messages[message] = error_messages.get(message, 0) + 1
        
        error_summary['most_common_errors'] = sorted(
            error_messages.items(),
            key=lambda x: x[1],
            reverse=True
        )[:5]
        
        return error_summary
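
A brief sketch of feeding classified errors into the tracker and reading back a summary; log_error is awaited because it runs the alert checks:

async def run_tracker_example():
    tracker = ErrorTracker()
    classifier = ErrorClassifier()

    # Log a couple of illustrative errors
    await tracker.log_error(classifier.classify_error("rate limit exceeded", 429),
                            {'request_id': 'req-1'})
    await tracker.log_error(classifier.classify_error("model unavailable"),
                            {'request_id': 'req-2'})

    summary = tracker.get_error_summary(hours=1)
    print(summary['total_errors'], summary['error_types'])

asyncio.run(run_tracker_example())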

2. Error Recovery Dashboard

class ErrorRecoveryDashboard:
    def __init__(self):
        self.error_tracker = ErrorTracker()
        self.classifier = ErrorClassifier()
        self.circuit_breakers = {}
        self.recovery_metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'recovered_requests': 0
        }
    
    async def track_request(self, request_id: str, operation: Callable, *args, **kwargs) -> Dict:
        """Track a request with error recovery"""
        self.recovery_metrics['total_requests'] += 1
        
        try:
            # Try primary operation
            result = await operation(*args, **kwargs)
            self.recovery_metrics['successful_requests'] += 1
            
            return {
                'request_id': request_id,
                'success': True,
                'result': result,
                'strategy': 'primary'
            }
        
        except Exception as e:
            self.recovery_metrics['failed_requests'] += 1
            
            # Classify and log the error
            ai_error = self.classifier.classify_error(str(e), getattr(e, 'status_code', None))
            await self.error_tracker.log_error(ai_error, {
                'request_id': request_id,
                'args': args,
                'kwargs': kwargs
            })
            
            # Try recovery
            recovery_result = await self._attempt_recovery(ai_error, operation, *args, **kwargs)
            
            if recovery_result['success']:
                self.recovery_metrics['recovered_requests'] += 1
            
            return recovery_result
    
    async def _attempt_recovery(self, error: AIError, operation: Callable, *args, **kwargs) -> Dict:
        """Attempt to recover from error"""
        # Use circuit breaker if available
        circuit_breaker_key = f"{operation.__name__}_{error.error_type.value}"
        
        if circuit_breaker_key not in self.circuit_breakers:
            self.circuit_breakers[circuit_breaker_key] = CircuitBreaker()
        
        # One breaker per operation/error type; its status is surfaced by get_dashboard_data()
        circuit_breaker = self.circuit_breakers[circuit_breaker_key]
        
        # Try with retry and fallback
        retry_manager = RetryManager()
        fallback_manager = FallbackManager()
        
        # First try with retry
        retry_result = await retry_manager.execute_with_retry(operation, *args, **kwargs)
        
        if retry_result['success']:
            return retry_result
        
        # If retry fails, try with fallback
        fallback_result = await fallback_manager.execute_with_fallback(
            operation, None, *args, **kwargs
        )
        
        return fallback_result
    
    def get_dashboard_data(self) -> Dict:
        """Get dashboard data for monitoring"""
        return {
            'metrics': self.recovery_metrics,
            'error_summary': self.error_tracker.get_error_summary(),
            'circuit_breakers': {
                key: breaker.get_status()
                for key, breaker in self.circuit_breakers.items()
            },
            'success_rate': (
                self.recovery_metrics['successful_requests'] + self.recovery_metrics['recovered_requests']
            ) / max(self.recovery_metrics['total_requests'], 1) * 100,
            'recovery_rate': (
                self.recovery_metrics['recovered_requests'] / max(self.recovery_metrics['failed_requests'], 1) * 100
            )
        }

Best Practices

  1. Classify Errors: Implement proper error classification for targeted recovery
  2. Use Retry Logic: Implement exponential backoff with jitter for transient errors (see the delay sketch after this list)
  3. Circuit Breakers: Use circuit breakers to prevent cascading failures
  4. Fallback Strategies: Have multiple fallback options for different error types
  5. Monitor Errors: Track error patterns and recovery success rates
  6. Alert on Issues: Set up alerts for critical error patterns
  7. Test Recovery: Regularly test error recovery mechanisms
  8. Document Procedures: Maintain runbooks for manual error recovery
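
As a companion to practice 2, a small sketch of computing a jittered exponential backoff delay; the parameter values are illustrative:

import random

def backoff_delay(retry_count: int, base_delay: float = 1.0,
                  backoff_factor: float = 2.0, max_delay: float = 30.0) -> float:
    """Exponential backoff with full jitter to avoid synchronized retry storms."""
    capped = min(base_delay * (backoff_factor ** retry_count), max_delay)
    return random.uniform(0, capped)

# Example: candidate delays for the first four retries
print([round(backoff_delay(i), 2) for i in range(4)])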

Conclusion

Effective AI model error recovery requires a comprehensive approach that includes proper error classification, retry mechanisms, circuit breakers, and fallback strategies. By implementing these techniques and best practices, you can maintain high availability and reliability for your AI services.

The key is to understand the different types of errors, implement appropriate recovery strategies for each, and continuously monitor and improve your error handling processes.
