AI Model Error Handling Techniques

Effective error handling is crucial for building reliable AI applications. This guide lists the errors AI model APIs commonly return, then walks through retries with exponential backoff, circuit breakers, graceful degradation, error classification, and error monitoring.

Managed Error Handling

Tetrate Agent Router Service provides advanced error handling capabilities with automatic retry mechanisms, circuit breakers, and graceful degradation. This managed service handles the complexity of error management, ensuring your AI applications remain resilient even when individual components fail.

  • Automatic retry with exponential backoff
  • Circuit breaker pattern implementation
  • Graceful degradation strategies
  • Comprehensive error monitoring

Common AI Model Errors

  • Rate Limiting: 429 errors from API providers
  • Authentication Failures: Invalid or expired API keys
  • Network Issues: Timeouts and connection problems
  • Model Errors: Invalid requests or model-specific issues
  • Quota Exhaustion: Monthly or daily limits reached
  • Service Outages: Provider downtime
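Several of these categories can be recognized from the HTTP status code alone. The sketch below is illustrative only: the function name `categorize_status` and the category strings are assumptions, and providers differ in which codes they return. Quota exhaustion is left out because providers signal it in different ways, so it usually has to be detected from the response body.

```python
def categorize_status(status_code: int) -> str:
    """Map an HTTP status code to a coarse error category (illustrative only)."""
    if status_code == 429:
        return "rate_limit"        # too many requests; usually retryable after a pause
    if status_code in (401, 403):
        return "authentication"    # invalid, expired, or unauthorized credentials
    if status_code in (408, 504):
        return "network"           # request or gateway timeout
    if status_code in (400, 404, 422):
        return "model_error"       # malformed request or unknown model
    if 500 <= status_code < 600:
        return "service_outage"    # provider-side failure
    return "unknown"


for code in (429, 401, 504, 400, 503):
    print(code, categorize_status(code))
```

A status-based check like this is a first pass; the classification section later in this guide shows how message text can refine it.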

Error Handling Strategies

1. Retry with Exponential Backoff

import asyncio
import random
import time
from typing import Callable, Any, Optional

class RetryHandler:
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0, max_delay: float = 60.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
    
    async def retry_with_backoff(self, func: Callable, *args, **kwargs) -> Any:
        """Retry function with exponential backoff"""
        
        last_exception = None
        
        for attempt in range(self.max_retries + 1):
            try:
                return await func(*args, **kwargs)
                
            except Exception as e:
                last_exception = e
                
                # Don't retry on certain errors
                if self._should_not_retry(e):
                    raise e
                
                # Don't sleep on last attempt
                if attempt == self.max_retries:
                    break
                
                # Calculate delay with jitter
                delay = min(
                    self.base_delay * (2 ** attempt) + random.uniform(0, 1),
                    self.max_delay
                )
                
                print(f"Retry {attempt + 1}/{self.max_retries} in {delay:.2f}s due to: {e}")
                await asyncio.sleep(delay)
        
        raise last_exception
    
    def _should_not_retry(self, exception: Exception) -> bool:
        """Determine if an exception should not be retried"""
        message = str(exception).lower()

        # Rate limit errors are transient and should be retried, so check them
        # first; otherwise the "limit" match below would reject them
        if "rate limit" in message or "too many requests" in message:
            return False

        # Don't retry authentication errors
        if "authentication" in message or "unauthorized" in message:
            return True

        # Don't retry invalid request errors
        if "invalid" in message or "bad request" in message:
            return True

        # Don't retry quota exceeded errors
        if "quota" in message or "limit" in message:
            return True

        return False

## Usage

retry_handler = RetryHandler(max_retries=3, base_delay=1.0)

async def call_ai_model(prompt: str) -> str:
    # Implementation that might fail
    pass

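async def main():
    try:
        result = await retry_handler.retry_with_backoff(call_ai_model, "Hello, world!")
    except Exception as e:
        print(f"All retries failed: {e}")

# `await` is only valid inside a coroutine, so run the example with asyncio.run
asyncio.run(main())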
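To make the backoff arithmetic concrete, the sketch below computes the delay before each retry using the same formula as `RetryHandler` but without the random jitter. The helper name `backoff_schedule` is an assumption introduced for illustration.

```python
def backoff_schedule(max_retries: int = 3, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list:
    """Return the jitter-free delay before each retry: base_delay * 2**attempt, capped."""
    return [min(base_delay * (2 ** attempt), max_delay) for attempt in range(max_retries)]


print(backoff_schedule())               # [1.0, 2.0, 4.0]
print(backoff_schedule(max_retries=8))  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

With the defaults, three retries wait roughly 1, 2, and 4 seconds. The jitter in `RetryHandler` adds up to one more second to each delay so that many clients failing at the same moment do not all retry in lockstep.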

2. Circuit Breaker Pattern

import time
from enum import Enum
from typing import Any, Callable, Dict

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, reject requests
    HALF_OPEN = "half_open"  # Testing if service recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, recovery_timeout: int = 60, expected_exception: type = Exception):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        
        self.failure_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED
    
    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Call function with circuit breaker protection"""
        
        if self.state == CircuitState.OPEN:
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                print("🔄 Circuit breaker half-open, testing recovery")
            else:
                raise Exception("Circuit breaker is open")
        
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
            
        except self.expected_exception as e:
            self._on_failure()
            raise e
    
    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0
        self.state = CircuitState.CLOSED
    
    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = time.time()
        
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"❌ Circuit breaker opened after {self.failure_count} failures")
    
    def get_state(self) -> CircuitState:
        """Get current circuit breaker state"""
        return self.state

## Usage

circuit_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

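import asyncio

async def main():
    try:
        result = await circuit_breaker.call(call_ai_model, "Hello, world!")
    except Exception as e:
        print(f"Request failed: {e}")

# `await` is only valid inside a coroutine, so run the example with asyncio.run
asyncio.run(main())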
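The state transitions are easier to follow with a clock that can be advanced by hand. The sketch below is a stripped-down, synchronous variant written for illustration, not a replacement for the class above: `MiniBreaker`, `CircuitOpenError`, and the injectable `clock` parameter are all assumptions. It shows the closed, open, and half-open phases in order.

```python
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open (hypothetical name)."""


class MiniBreaker:
    """Minimal synchronous breaker with an injectable clock for deterministic tests."""

    def __init__(self, failure_threshold=2, recovery_timeout=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failure_count = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                raise CircuitOpenError("circuit open, call rejected")
            # Timeout elapsed: fall through and allow one trial call (half-open)
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            if self.failure_count >= self.failure_threshold:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and clears the failure count
        self.failure_count = 0
        self.opened_at = None
        return result


now = [0.0]  # fake clock value, advanced manually below
breaker = MiniBreaker(failure_threshold=2, recovery_timeout=30.0, clock=lambda: now[0])


def flaky():
    raise ConnectionError("provider unreachable")


for _ in range(2):  # two failures reach the threshold and open the circuit
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

try:
    breaker.call(flaky)  # rejected without calling flaky()
except CircuitOpenError as e:
    print(f"Rejected: {e}")

now[0] = 31.0  # advance past the recovery timeout
print(breaker.call(lambda: "ok"))  # trial call succeeds and closes the circuit
```

Raising a dedicated exception type for rejected calls, rather than a generic `Exception`, lets callers tell "the provider failed" apart from "the breaker refused to try".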

3. Graceful Degradation

from typing import Any, Callable, Dict

class GracefulDegradation:
    def __init__(self, fallback_strategies: Dict[str, Callable]):
        self.fallback_strategies = fallback_strategies
        self.degradation_level = 0
    
    async def execute_with_degradation(self, primary_func: Callable, *args, **kwargs) -> Any:
        """Execute with graceful degradation"""
        
        # Try primary function first
        try:
            result = await primary_func(*args, **kwargs)
            self.degradation_level = 0
            return result
            
        except Exception as e:
            print(f"Primary function failed: {e}")
            return await self._try_fallbacks(*args, **kwargs)
    
    async def _try_fallbacks(self, *args, **kwargs) -> Any:
        """Try fallback strategies in order"""
        
        for level, fallback_func in self.fallback_strategies.items():
            try:
                print(f"🔄 Trying fallback level {level}")
                result = await fallback_func(*args, **kwargs)
                self.degradation_level = int(level)
                return result
                
            except Exception as e:
                print(f"Fallback level {level} failed: {e}")
                continue
        
        # All fallbacks failed
        raise Exception("All fallback strategies failed")
    
    def get_degradation_level(self) -> int:
        """Get current degradation level"""
        return self.degradation_level

## Usage

async def fallback_model_1(prompt: str) -> str:
    # Use a simpler model
    return "Fallback response 1"

async def fallback_model_2(prompt: str) -> str:
    # Use cached responses
    return "Fallback response 2"

async def fallback_model_3(prompt: str) -> str:
    # Return static response
    return "Service temporarily unavailable"

fallback_strategies = {
    "1": fallback_model_1,
    "2": fallback_model_2,
    "3": fallback_model_3
}

degradation = GracefulDegradation(fallback_strategies)

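import asyncio

async def main():
    try:
        result = await degradation.execute_with_degradation(call_ai_model, "Hello, world!")
        print(f"Result: {result}")
        print(f"Degradation level: {degradation.get_degradation_level()}")
    except Exception as e:
        print(f"All strategies failed: {e}")

# `await` is only valid inside a coroutine, so run the example with asyncio.run
asyncio.run(main())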
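The "use cached responses" fallback above returns a placeholder string. One way it could work in practice is sketched below, assuming an unbounded in-memory dictionary keyed by prompt; the names `_response_cache` and `call_with_cache_fallback` are hypothetical, and a production cache would need size limits and expiry.

```python
import asyncio

# Hypothetical in-memory cache keyed by prompt; a real system would bound its size
_response_cache: dict = {}


async def call_with_cache_fallback(model_func, prompt: str):
    """Return (response, from_cache); reuse the last good response if the call fails."""
    try:
        response = await model_func(prompt)
    except Exception:
        if prompt in _response_cache:
            return _response_cache[prompt], True
        raise  # nothing cached, so let the next fallback level handle it
    _response_cache[prompt] = response
    return response, False


async def demo():
    calls = {"count": 0}

    async def flaky_model(prompt: str) -> str:
        # Succeeds on the first call, then simulates an outage
        calls["count"] += 1
        if calls["count"] > 1:
            raise ConnectionError("provider unavailable")
        return f"AI response to: {prompt}"

    print(await call_with_cache_fallback(flaky_model, "Hello"))  # fresh response
    print(await call_with_cache_fallback(flaky_model, "Hello"))  # served from cache


asyncio.run(demo())
```

Returning the `from_cache` flag alongside the response lets the caller tell users that the answer may be stale.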

4. Error Classification and Handling

import asyncio
import random
from enum import Enum
from typing import Any, Dict, List, Optional

class ErrorType(Enum):
    RATE_LIMIT = "rate_limit"
    AUTHENTICATION = "authentication"
    NETWORK = "network"
    MODEL_ERROR = "model_error"
    QUOTA_EXCEEDED = "quota_exceeded"
    UNKNOWN = "unknown"

class ErrorClassifier:
    def __init__(self):
        self.error_patterns = {
            ErrorType.RATE_LIMIT: [
                "rate limit", "429", "too many requests", "throttled"
            ],
            ErrorType.AUTHENTICATION: [
                "authentication", "unauthorized", "401", "403", "invalid api key"
            ],
            ErrorType.NETWORK: [
                "timeout", "connection", "network", "dns", "ssl"
            ],
            ErrorType.MODEL_ERROR: [
                "invalid request", "model", "parameter", "400"
            ],
            ErrorType.QUOTA_EXCEEDED: [
                "quota", "limit", "billing", "usage"
            ]
        }
    
    def classify_error(self, exception: Exception) -> ErrorType:
        """Classify an exception by type"""
        error_message = str(exception).lower()
        
        for error_type, patterns in self.error_patterns.items():
            for pattern in patterns:
                if pattern in error_message:
                    return error_type
        
        return ErrorType.UNKNOWN

class ErrorHandler:
    def __init__(self):
        self.classifier = ErrorClassifier()
        self.handlers = {
            ErrorType.RATE_LIMIT: self._handle_rate_limit,
            ErrorType.AUTHENTICATION: self._handle_authentication,
            ErrorType.NETWORK: self._handle_network,
            ErrorType.MODEL_ERROR: self._handle_model_error,
            ErrorType.QUOTA_EXCEEDED: self._handle_quota_exceeded,
            ErrorType.UNKNOWN: self._handle_unknown
        }
    
    async def handle_error(self, exception: Exception, context: Dict = None) -> Optional[Any]:
        """Handle error based on classification"""
        error_type = self.classifier.classify_error(exception)
        handler = self.handlers.get(error_type, self._handle_unknown)
        
        return await handler(exception, context or {})
    
    async def _handle_rate_limit(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle rate limit errors"""
        print("🔄 Rate limit detected, pausing before the caller retries")
        # Simple randomized pause; use RetryHandler for true exponential backoff
        await asyncio.sleep(random.uniform(1, 5))
        return None
    
    async def _handle_authentication(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle authentication errors"""
        print("🔐 Authentication error, check API keys")
        # Could implement API key rotation here
        raise exception  # Don't retry auth errors
    
    async def _handle_network(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle network errors"""
        print("🌐 Network error, will retry")
        # Network errors are usually transient
        return None
    
    async def _handle_model_error(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle model-specific errors"""
        print("🤖 Model error, check request parameters")
        # Could implement request validation or model switching
        raise exception
    
    async def _handle_quota_exceeded(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle quota exceeded errors"""
        print("💰 Quota exceeded, switch to fallback model")
        # Switch to different model or provider
        return await self._get_fallback_response(context)
    
    async def _handle_unknown(self, exception: Exception, context: Dict) -> Optional[Any]:
        """Handle unknown errors"""
        print(f"❓ Unknown error: {exception}")
        return None
    
    async def _get_fallback_response(self, context: Dict) -> str:
        """Get fallback response when quota is exceeded"""
        return "Service temporarily unavailable due to quota limits"
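Matching on message text is convenient but fragile, since an unrelated message can happen to contain a word like "connection" or "limit". Where a client library raises distinct exception classes, classifying by type is usually more reliable. The sketch below assumes hypothetical classes `RateLimitError`, `AuthenticationError`, and `InvalidRequestError`; real libraries define their own names.

```python
import asyncio


# Hypothetical exception classes; real client libraries define their own
class RateLimitError(Exception):
    pass


class AuthenticationError(Exception):
    pass


class InvalidRequestError(Exception):
    pass


# Exception types treated as transient in this sketch
RETRYABLE_TYPES = (RateLimitError, asyncio.TimeoutError, TimeoutError, ConnectionError)


def is_retryable(exception: Exception) -> bool:
    """Decide retryability from the exception's type, not its message."""
    return isinstance(exception, RETRYABLE_TYPES)


for exc in (
    RateLimitError("429"),
    AuthenticationError("bad key"),
    ConnectionResetError("reset by peer"),
    InvalidRequestError("connection parameter is invalid"),
):
    print(type(exc).__name__, is_retryable(exc))
```

The last case shows the difference: its message contains "connection", which a substring check would read as a network error, but its type marks it as a non-retryable request problem.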

5. Comprehensive Error Handling System

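# Builds on RetryHandler, CircuitBreaker, ErrorHandler, and GracefulDegradation defined above
import random
from typing import Any, Callable

class AIErrorHandler: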
    def __init__(self):
        self.retry_handler = RetryHandler(max_retries=3)
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)
        self.error_handler = ErrorHandler()
        self.degradation = GracefulDegradation({
            "1": self._fallback_model_1,
            "2": self._fallback_model_2,
            "3": self._fallback_model_3
        })
    
    async def execute_with_error_handling(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with comprehensive error handling"""
        
        try:
            # Try with circuit breaker and retry
            async def protected_call():
                return await self.circuit_breaker.call(func, *args, **kwargs)
            
            result = await self.retry_handler.retry_with_backoff(protected_call)
            return result
            
        except Exception as e:
            # Handle specific error types
            handled_result = await self.error_handler.handle_error(e, {
                'function': func.__name__,
                'args': args,
                'kwargs': kwargs
            })
            
            if handled_result is not None:
                return handled_result
            
            # Try graceful degradation
            try:
                return await self.degradation.execute_with_degradation(func, *args, **kwargs)
            except Exception as degradation_error:
                # All error handling failed
                raise Exception(f"All error handling strategies failed: {e} -> {degradation_error}")
    
    async def _fallback_model_1(self, *args, **kwargs) -> str:
        """First fallback: use simpler model"""
        return "Fallback response from simpler model"
    
    async def _fallback_model_2(self, *args, **kwargs) -> str:
        """Second fallback: use cached responses"""
        return "Fallback response from cache"
    
    async def _fallback_model_3(self, *args, **kwargs) -> str:
        """Third fallback: static response"""
        return "Service temporarily unavailable"

## Usage

ai_error_handler = AIErrorHandler()

async def call_ai_model(prompt: str) -> str:
    # Simulate AI model call that might fail
    if random.random() < 0.3:  # 30% chance of failure
        raise Exception("Rate limit exceeded")
    return f"AI response to: {prompt}"

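import asyncio

async def main():
    try:
        result = await ai_error_handler.execute_with_error_handling(
            call_ai_model, "Hello, world!"
        )
        print(f"Success: {result}")
    except Exception as e:
        print(f"All error handling failed: {e}")

# `await` is only valid inside a coroutine, so run the example with asyncio.run
asyncio.run(main())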

Error Monitoring and Logging

1. Error Tracking

import logging
from datetime import datetime
from typing import Dict

class ErrorTracker:
    def __init__(self):
        self.error_counts = {}
        self.error_history = []
        self.logger = logging.getLogger(__name__)
    
    def track_error(self, error: Exception, context: Dict = None):
        """Track an error occurrence"""
        error_type = type(error).__name__
        timestamp = datetime.now()
        
        # Update error counts
        self.error_counts[error_type] = self.error_counts.get(error_type, 0) + 1
        
        # Record error details
        error_record = {
            'timestamp': timestamp,
            'error_type': error_type,
            'error_message': str(error),
            'context': context or {}
        }
        
        self.error_history.append(error_record)
        
        # Log error
        self.logger.error(f"Error tracked: {error_type} - {error}")
        
        # Keep only recent errors
        if len(self.error_history) > 1000:
            self.error_history.pop(0)
    
    def get_error_summary(self) -> Dict:
        """Get summary of recent errors"""
        return {
            'total_errors': len(self.error_history),
            'error_counts': self.error_counts,
            'recent_errors': self.error_history[-10:]  # Last 10 errors
        }
    
    def get_error_rate(self, window_minutes: int = 60) -> float:
        """Calculate error rate in recent window"""
        cutoff_time = datetime.now().timestamp() - (window_minutes * 60)
        
        recent_errors = [
            e for e in self.error_history
            if e['timestamp'].timestamp() > cutoff_time
        ]
        
        return len(recent_errors) / window_minutes  # errors per minute
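`ErrorTracker` trims its history with `list.pop(0)`, which shifts every remaining element. A possible alternative, sketched below under the assumption that only timestamps and error type names need to be kept, is `collections.deque` with `maxlen`, which discards the oldest entry automatically. The class name `BoundedErrorLog` is hypothetical.

```python
from collections import deque
from datetime import datetime, timedelta
from typing import Optional


class BoundedErrorLog:
    """Keeps only the most recent `max_entries` errors."""

    def __init__(self, max_entries: int = 1000):
        self.entries = deque(maxlen=max_entries)  # oldest entries drop off automatically

    def record(self, error: Exception, when: Optional[datetime] = None) -> None:
        self.entries.append((when or datetime.now(), type(error).__name__))

    def count_since(self, cutoff: datetime) -> int:
        """Count recorded errors with a timestamp later than `cutoff`."""
        return sum(1 for timestamp, _ in self.entries if timestamp > cutoff)


log = BoundedErrorLog(max_entries=3)
start = datetime(2024, 1, 1, 12, 0, 0)
for minute in range(5):
    log.record(TimeoutError("slow"), when=start + timedelta(minutes=minute))

print(len(log.entries))                               # 3
print(log.count_since(start + timedelta(minutes=3)))  # 1
```

Passing the timestamp explicitly, as the loop does, also makes window calculations testable without waiting for real time to pass.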

Best Practices

  1. Classify Errors: Differentiate between retryable and non-retryable errors
  2. Use Circuit Breakers: Prevent cascading failures
  3. Implement Graceful Degradation: Provide fallback responses
  4. Monitor Error Patterns: Track and analyze error occurrences
  5. Set Appropriate Timeouts: Don't let requests hang indefinitely
  6. Log Everything: Maintain detailed error logs for debugging
  7. Test Error Scenarios: Regularly test error handling mechanisms
  8. Document Error Responses: Maintain clear documentation of error handling
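None of the earlier examples enforces a timeout, so here is a minimal sketch of practice 5 using the standard library's `asyncio.wait_for`. The function `slow_model_call` and its `delay` parameter are stand-ins invented to simulate a slow provider; a real call would go in its place.

```python
import asyncio


async def slow_model_call(prompt: str, delay: float) -> str:
    # Stand-in for a real provider call; `delay` simulates latency
    await asyncio.sleep(delay)
    return f"AI response to: {prompt}"


async def call_with_timeout(prompt: str, delay: float, timeout_seconds: float) -> str:
    """Cancel the call and raise TimeoutError if it runs longer than timeout_seconds."""
    try:
        return await asyncio.wait_for(slow_model_call(prompt, delay), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Re-raise with a message naming the limit that was exceeded
        raise TimeoutError(f"timeout: model call exceeded {timeout_seconds}s") from None


async def demo():
    print(await call_with_timeout("Hello", delay=0.01, timeout_seconds=1.0))
    try:
        await call_with_timeout("Hello", delay=0.5, timeout_seconds=0.05)
    except TimeoutError as e:
        print(f"Timed out: {e}")


asyncio.run(demo())
```

Because the raised message contains the word "timeout", the `ErrorClassifier` shown earlier would file it under network errors, which are treated as retryable.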

Conclusion

Effective error handling is essential for building reliable AI applications. By implementing comprehensive error handling strategies including retries, circuit breakers, graceful degradation, and proper monitoring, you can ensure your AI applications remain robust even when individual components fail.

The key is to start with basic error handling and gradually add more sophisticated features as your needs grow.

← Back to Learning Center