Monitoring AI Model Health

Master AI model health monitoring strategies. Learn metrics, alerting, and implementation patterns for comprehensive AI system monitoring.

Comprehensive health monitoring is essential for maintaining reliable AI model deployments. This guide covers monitoring strategies, key metrics, alerting systems, and implementation patterns.

🚀

Managed Monitoring Solution

Tetrate Agent Router Service provides comprehensive AI model health monitoring with real-time metrics, alerting, and cross-provider visibility. This managed service offers built-in monitoring dashboards, automated health checks, and intelligent alerting to ensure your AI systems remain healthy and performant.

  • Real-time metrics and dashboards
  • Automated health checks
  • Intelligent alerting systems
  • Cross-provider visibility
Learn More →

Why AI Model Health Monitoring Matters

  • Early Detection: Identify issues before they impact users
  • Performance Optimization: Track and improve model performance
  • Cost Management: Monitor usage and costs
  • Reliability: Ensure high availability and uptime

Key Health Metrics

1. Response Time Metrics

import time
import statistics
from typing import Dict, List, Optional

class ResponseTimeMonitor:
    """Keep a sliding window of response times per model and summarize them.

    For every model ID the most recent ``window_size`` samples are retained.
    P95/P99 values are refreshed each time a sample is recorded, but only once
    at least ten samples are available; until then they are reported as 0.
    """

    def __init__(self, window_size: int = 100):
        """Create an empty monitor.

        Args:
            window_size: Maximum number of samples retained per model.
        """
        self.window_size = window_size
        self.response_times = {}  # model_id -> list of the most recent samples
        self.p95_times = {}       # model_id -> last computed P95
        self.p99_times = {}       # model_id -> last computed P99

    def record_response_time(self, model_id: str, response_time: float):
        """Record one response time for a model and refresh its percentiles.

        Args:
            model_id: Identifier of the model the sample belongs to.
            response_time: Measured duration, in whatever unit the caller uses
                consistently.
        """
        times = self.response_times.setdefault(model_id, [])
        times.append(response_time)

        # Keep only recent measurements
        overflow = len(times) - self.window_size
        if overflow > 0:
            del times[:overflow]

        self._update_percentiles(model_id)

    @staticmethod
    def _nearest_rank(sorted_times: List[float], percent: int) -> float:
        """Return the nearest-rank percentile of an ascending, non-empty list."""
        # ceil(n * percent / 100) in integer arithmetic, so no float rounding
        # can push the rank up by one.
        rank = -(-len(sorted_times) * percent // 100)
        return sorted_times[max(rank, 1) - 1]

    def _update_percentiles(self, model_id: str):
        """Update P95 and P99 response times for ``model_id``.

        Uses the nearest-rank definition: the P-th percentile is the smallest
        sample such that at least P% of the samples are less than or equal to
        it. With 100 samples that is the 95th / 99th smallest value, so P99 is
        no longer always identical to the maximum.
        """
        times = self.response_times[model_id]
        if len(times) >= 10:  # Need at least 10 samples for percentiles
            sorted_times = sorted(times)
            self.p95_times[model_id] = self._nearest_rank(sorted_times, 95)
            self.p99_times[model_id] = self._nearest_rank(sorted_times, 99)

    def get_metrics(self, model_id: str) -> Dict:
        """Get response time metrics for a model.

        Returns:
            A dict with ``avg_response_time``, ``p95_response_time``,
            ``p99_response_time``, ``min_response_time``,
            ``max_response_time`` and ``sample_count``. All values are 0 when
            no samples exist for ``model_id``; the percentile entries are 0
            until percentiles have been computed.
        """
        times = self.response_times.get(model_id, [])

        if not times:
            return {
                'avg_response_time': 0,
                'p95_response_time': 0,
                'p99_response_time': 0,
                'min_response_time': 0,
                'max_response_time': 0,
                'sample_count': 0
            }

        return {
            'avg_response_time': statistics.mean(times),
            'p95_response_time': self.p95_times.get(model_id, 0),
            'p99_response_time': self.p99_times.get(model_id, 0),
            'min_response_time': min(times),
            'max_response_time': max(times),
            'sample_count': len(times)
        }

2. Error Rate Monitoring

class ErrorRateMonitor:
    """Track request outcomes per model over a sliding time window.

    Request and error timestamps older than ``window_seconds`` are discarded
    both when a request is recorded and when the error rate is read, so the
    reported rate never reflects traffic from outside the window.

    The per-type error counts are cumulative: they are not pruned together
    with the timestamps.
    """

    def __init__(self, window_size: int = 1000, window_seconds: float = 3600):
        """Create an empty monitor.

        Args:
            window_size: Stored on the instance but not used to limit the
                retained data. NOTE(review): the intended meaning (a sample
                cap?) is not visible here; retention is purely time based.
            window_seconds: Length of the sliding window in seconds
                (default: one hour).
        """
        self.window_size = window_size
        self.window_seconds = window_seconds
        self.total_requests = {}  # model_id -> timestamps of all requests
        self.error_requests = {}  # model_id -> timestamps of failed requests
        self.error_types = {}     # model_id -> {error_type: count}

    def record_request(self, model_id: str, success: bool, error_type: Optional[str] = None):
        """Record a request and its outcome.

        Args:
            model_id: Identifier of the model that served the request.
            success: ``False`` marks the request as an error.
            error_type: Optional label counted in the error breakdown; only
                used when ``success`` is ``False``.
        """
        if model_id not in self.total_requests:
            self.total_requests[model_id] = []
            self.error_requests[model_id] = []
            self.error_types[model_id] = {}

        # One timestamp for both lists so a request and its error always fall
        # on the same side of the window cutoff.
        now = time.time()
        self.total_requests[model_id].append(now)

        if not success:
            self.error_requests[model_id].append(now)
            if error_type:
                counts = self.error_types[model_id]
                counts[error_type] = counts.get(error_type, 0) + 1

        self._clean_old_data(model_id)

    def _clean_old_data(self, model_id: str):
        """Drop timestamps of ``model_id`` that fell out of the time window."""
        cutoff_time = time.time() - self.window_seconds

        self.total_requests[model_id] = [
            t for t in self.total_requests[model_id]
            if t > cutoff_time
        ]
        self.error_requests[model_id] = [
            t for t in self.error_requests[model_id]
            if t > cutoff_time
        ]

    def get_error_rate(self, model_id: str) -> float:
        """Return the fraction of requests inside the window that failed.

        Returns 0.0 for an unknown model or when the window holds no requests.
        """
        if model_id not in self.total_requests:
            return 0.0

        # Prune on read as well: without this, the rate would stay frozen at
        # its last value once a model stops receiving requests.
        self._clean_old_data(model_id)

        total = len(self.total_requests[model_id])
        if total == 0:
            return 0.0

        return len(self.error_requests[model_id]) / total

    def get_error_breakdown(self, model_id: str) -> Dict[str, int]:
        """Return a copy of the cumulative error counts keyed by error type."""
        # A copy keeps callers from mutating the monitor's internal counters.
        return dict(self.error_types.get(model_id, {}))

3. Throughput Monitoring

class ThroughputMonitor:
    """Measure request throughput per model over a sliding time window."""

    def __init__(self, window_size: int = 60):  # 60 seconds
        """Create an empty monitor.

        Args:
            window_size: Length of the sliding window in seconds.
        """
        self.window_size = window_size
        self.request_timestamps = {}  # model_id -> timestamps inside the window

    def record_request(self, model_id: str):
        """Record a request timestamp for ``model_id``."""
        self.request_timestamps.setdefault(model_id, []).append(time.time())
        self._clean_old_data(model_id)

    def _clean_old_data(self, model_id: str):
        """Remove timestamps older than ``window_size`` seconds."""
        cutoff_time = time.time() - self.window_size
        self.request_timestamps[model_id] = [
            t for t in self.request_timestamps[model_id]
            if t > cutoff_time
        ]

    def get_requests_per_second(self, model_id: str) -> float:
        """Return the average requests per second over the window.

        The rate is the number of requests seen in the last ``window_size``
        seconds divided by ``window_size``. Dividing by the window length
        (rather than by the gap between the first and last request) keeps a
        short burst or a single request from being reported as a very high
        rate. Old timestamps are pruned first, so the value falls back to 0.0
        once traffic stops. During the first window after start-up the rate
        is therefore an underestimate.

        Returns:
            0.0 for an unknown model, an empty window or a non-positive
            ``window_size``; otherwise the rate as a float.
        """
        if model_id not in self.request_timestamps or self.window_size <= 0:
            return 0.0

        self._clean_old_data(model_id)
        return len(self.request_timestamps[model_id]) / self.window_size

Health Check Implementation

1. Basic Health Check

import aiohttp
import asyncio
from typing import Dict, List

class AIHealthChecker:
    """Periodically probe a list of models and track whether each is healthy.

    The class calls ``self._call_model(model, prompt)`` but does not define
    it; an awaitable implementation has to be supplied by a subclass or
    assigned on the instance.
    """

    def __init__(self, models: List[Dict], check_interval: float = 30, timeout: float = 10):
        """Create a checker for ``models``.

        Args:
            models: Model descriptors; each must contain an ``'id'`` key.
            check_interval: Seconds to wait between two rounds of checks.
            timeout: Maximum number of seconds a single probe may take.
        """
        self.models = models
        self.health_status = {model['id']: True for model in models}
        self.last_check = {model['id']: 0 for model in models}
        self.check_interval = check_interval  # seconds
        self.timeout = timeout  # seconds

    async def check_model_health(self, model: Dict) -> bool:
        """Check health of a specific model.

        Sends a minimal prompt and treats the model as healthy when a truthy
        response arrives within ``self.timeout`` seconds.

        Returns:
            ``True`` if healthy; ``False`` on an empty response, a timeout or
            any exception raised by the call (the failure is printed).
        """
        try:
            # Simple health check request
            health_prompt = "Hello"

            # wait_for enforces the deadline: a call that never returns is
            # cancelled instead of blocking the health check forever.
            response = await asyncio.wait_for(
                self._call_model(model, health_prompt),
                timeout=self.timeout
            )
            return bool(response)

        except asyncio.TimeoutError:
            print(f"Health check failed for {model['id']}: timed out after {self.timeout}s")
            return False
        except Exception as e:
            print(f"Health check failed for {model['id']}: {e}")
            return False

    async def run_health_checks(self):
        """Run health checks for all models, forever.

        All models are probed concurrently, then the loop sleeps for
        ``check_interval`` seconds. An unexpected exception in one model's
        check is printed and does not stop the loop or the other checks.
        """
        while True:
            models = list(self.models)  # snapshot so results line up with models
            results = await asyncio.gather(
                *(self._check_single_model(model) for model in models),
                return_exceptions=True
            )

            # gather(return_exceptions=True) hands failures back as values;
            # report them so they are not lost silently.
            for model, result in zip(models, results):
                if isinstance(result, Exception):
                    print(f"Health check task crashed for {model.get('id')}: {result!r}")

            await asyncio.sleep(self.check_interval)

    async def _check_single_model(self, model: Dict):
        """Check one model, store the result and print status transitions."""
        is_healthy = await self.check_model_health(model)
        was_healthy = self.health_status[model['id']]

        self.health_status[model['id']] = is_healthy
        self.last_check[model['id']] = time.time()

        # Log status changes
        if is_healthy and not was_healthy:
            print(f"✅ {model['id']} recovered")
        elif not is_healthy and was_healthy:
            print(f"❌ {model['id']} became unhealthy")

2. Advanced Health Check with Metrics

class AdvancedHealthChecker:
    """Health checker that also feeds response-time, error and throughput monitors.

    The class calls ``self._call_model(model, prompt)`` but does not define
    it; an awaitable implementation has to be supplied by a subclass or
    assigned on the instance.
    """

    def __init__(self, models: List[Dict], call_timeout: float = 10.0):
        """Create a checker for ``models``.

        Args:
            models: Model descriptors; each must contain an ``'id'`` key.
            call_timeout: Maximum number of seconds a probe call may take
                before it is cancelled and counted as a failure.
        """
        self.models = models
        self.call_timeout = call_timeout
        self.response_monitor = ResponseTimeMonitor()
        self.error_monitor = ErrorRateMonitor()
        self.throughput_monitor = ThroughputMonitor()
        self.health_status = {model['id']: True for model in models}

    async def comprehensive_health_check(self, model: Dict) -> Dict:
        """Perform comprehensive health check.

        Probes the model once, records the outcome in the monitors, and
        evaluates response time, response length and the aggregated error
        rate.

        Returns:
            A dict with ``model_id``, ``timestamp``, ``overall_healthy``,
            ``metrics`` (keys ``response_time``, ``error_rate``,
            ``throughput``) and ``issues`` (human-readable problem list).
            ``metrics`` is filled in on the failure path too, so consumers
            can always index into it.
        """
        model_id = model['id']
        health_result = {
            'model_id': model_id,
            'timestamp': time.time(),
            'overall_healthy': True,
            'metrics': {},
            'issues': []
        }
        issues = health_result['issues']

        try:
            # Test basic functionality
            test_prompt = "Generate a simple response"
            start_time = time.monotonic()  # monotonic: immune to wall-clock jumps

            # Bound the call so a hung provider cannot stall the caller.
            response = await asyncio.wait_for(
                self._call_model(model, test_prompt),
                timeout=self.call_timeout
            )
            response_time = time.monotonic() - start_time

            # Record metrics
            self.response_monitor.record_response_time(model_id, response_time)
            self.throughput_monitor.record_request(model_id)
            self.error_monitor.record_request(model_id, True)

            # Check response time
            if response_time > 5:  # 5 second threshold
                issues.append(f"High response time: {response_time:.2f}s")

            # Check response quality
            if not response or len(response) < 10:
                issues.append("Poor response quality")

        except Exception as e:
            self.error_monitor.record_request(model_id, False, type(e).__name__)
            # Some exceptions (e.g. timeouts) carry no message; fall back to
            # the exception type so the issue text is never blank.
            issues.append(f"Health check failed: {str(e) or type(e).__name__}")

        # Get aggregated metrics (also after a failed probe)
        metrics = {
            'response_time': self.response_monitor.get_metrics(model_id),
            'error_rate': self.error_monitor.get_error_rate(model_id),
            'throughput': self.throughput_monitor.get_requests_per_second(model_id)
        }
        health_result['metrics'] = metrics

        # Check error rate
        if metrics['error_rate'] > 0.05:  # 5% threshold
            issues.append(f"High error rate: {metrics['error_rate']:.2%}")

        # Any recorded issue makes the model unhealthy.
        health_result['overall_healthy'] = not issues

        # Update health status
        self.health_status[model_id] = health_result['overall_healthy']

        return health_result

Alerting System

1. Alert Manager

from enum import Enum
import smtplib
from email.mime.text import MIMEText

class AlertSeverity(Enum):
    """Severity level attached to an alert.

    AlertManager.send_alert embeds the member's string value in the alert ID
    and chooses the delivery channels based on the member.
    """
    # Delivered through the info path of AlertManager
    INFO = "info"
    # Delivered through the warning path of AlertManager
    WARNING = "warning"
    # Delivered through the critical path of AlertManager
    CRITICAL = "critical"

class AlertManager:
    """Dispatch alerts to notification channels with a per-key cooldown.

    The channel senders ``_send_email_alert``, ``_send_slack_alert`` and
    ``_send_pagerduty_alert`` are called but not defined in this class; they
    have to be supplied by a subclass or assigned on the instance.
    """

    def __init__(self, config: Dict, cooldown_seconds: float = 300):
        """Create an alert manager.

        Args:
            config: Channel configuration; stored as-is on the instance.
            cooldown_seconds: How long alerts sharing a cooldown key are
                suppressed after one was sent (default: 5 minutes).
        """
        self.config = config
        self.cooldown_seconds = cooldown_seconds
        self.alert_history = []
        self.alert_cooldowns = {}  # cooldown key -> time until which it is muted

    async def send_alert(self, model_id: str, severity: AlertSeverity, message: str,
                         *, cooldown_key: Optional[str] = None):
        """Send an alert unless an equivalent one is still in cooldown.

        Args:
            model_id: Model the alert is about.
            severity: Determines which channels are used.
            message: Human-readable alert text.
            cooldown_key: Optional key used for spam suppression. Defaults to
                ``"<model_id>_<severity>"``, so repeated alerts of the same
                severity for the same model are muted during the cooldown.
                Pass a more specific key to keep distinct alert kinds from
                muting each other.
        """
        now = time.time()

        # The cooldown must be keyed by something that is stable between
        # alerts. The alert ID embeds the current timestamp, so using it as
        # the key would never match a later alert.
        if cooldown_key is None:
            cooldown_key = f"{model_id}_{severity.value}"

        # Check cooldown
        if self._is_in_cooldown(cooldown_key):
            return

        alert = {
            'id': f"{model_id}_{severity.value}_{int(now)}",
            'model_id': model_id,
            'severity': severity,
            'message': message,
            'timestamp': now
        }

        # Send alert based on severity
        if severity == AlertSeverity.CRITICAL:
            await self._send_critical_alert(alert)
        elif severity == AlertSeverity.WARNING:
            await self._send_warning_alert(alert)
        else:
            await self._send_info_alert(alert)

        # Record alert (only reached when sending did not raise)
        self.alert_history.append(alert)
        self._set_cooldown(cooldown_key)

    async def _send_critical_alert(self, alert: Dict):
        """Send critical alert via email, Slack and PagerDuty, in that order."""
        # Email
        await self._send_email_alert(alert, "CRITICAL")

        # Slack/Teams
        await self._send_slack_alert(alert)

        # PagerDuty
        await self._send_pagerduty_alert(alert)

    async def _send_warning_alert(self, alert: Dict):
        """Send warning alert via email and Slack."""
        await self._send_email_alert(alert, "WARNING")
        await self._send_slack_alert(alert)

    async def _send_info_alert(self, alert: Dict):
        """Send info alert via Slack only."""
        await self._send_slack_alert(alert)

    def _is_in_cooldown(self, alert_id: str) -> bool:
        """Return True while the cooldown stored under key ``alert_id`` is active."""
        cooldown_until = self.alert_cooldowns.get(alert_id)
        return cooldown_until is not None and time.time() < cooldown_until

    def _set_cooldown(self, alert_id: str):
        """Start a cooldown of ``cooldown_seconds`` under key ``alert_id``."""
        self.alert_cooldowns[alert_id] = time.time() + self.cooldown_seconds

2. Threshold-Based Alerting

class ThresholdAlerting:
    """Compare a metrics snapshot against fixed limits and alert on breaches."""

    def __init__(self, alert_manager: AlertManager):
        """Create the checker.

        Args:
            alert_manager: Receives one ``send_alert`` call per breached
                threshold.
        """
        self.alert_manager = alert_manager
        self.thresholds = {
            'response_time_p95': 3.0,  # seconds
            'response_time_p99': 5.0,  # seconds
            'error_rate': 0.05,  # 5%
            'throughput_min': 1.0,  # requests per second
            # Not evaluated by check_thresholds.
            'health_check_failures': 3  # consecutive failures
        }

    async def check_thresholds(self, model_id: str, metrics: Dict):
        """Check metrics against thresholds and send alerts.

        Args:
            model_id: Model the metrics belong to.
            metrics: Snapshot with optional keys ``response_time`` (a dict
                holding ``p95_response_time`` / ``p99_response_time``),
                ``error_rate`` and ``throughput``. A metric that is missing
                is skipped instead of raising, so a partial snapshot (for
                example after a failed health check) still gets the checks
                it has data for.
        """
        response_time = metrics.get('response_time') or {}

        # Check response time P95
        p95_time = response_time.get('p95_response_time')
        if p95_time is not None and p95_time > self.thresholds['response_time_p95']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.WARNING,
                f"P95 response time ({p95_time:.2f}s) exceeds threshold ({self.thresholds['response_time_p95']}s)"
            )

        # Check response time P99
        p99_time = response_time.get('p99_response_time')
        if p99_time is not None and p99_time > self.thresholds['response_time_p99']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.CRITICAL,
                f"P99 response time ({p99_time:.2f}s) exceeds threshold ({self.thresholds['response_time_p99']}s)"
            )

        # Check error rate
        error_rate = metrics.get('error_rate')
        if error_rate is not None and error_rate > self.thresholds['error_rate']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.CRITICAL,
                f"Error rate ({error_rate:.2%}) exceeds threshold ({self.thresholds['error_rate']:.2%})"
            )

        # Check throughput
        throughput = metrics.get('throughput')
        if throughput is not None and throughput < self.thresholds['throughput_min']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.WARNING,
                f"Throughput ({throughput:.2f} req/s) below minimum ({self.thresholds['throughput_min']} req/s)"
            )

Dashboard and Visualization

1. Metrics Dashboard

class MetricsDashboard:
    """Keep a bounded history of metric snapshots per model and summarize it.

    Snapshots that lack any of ``response_time.avg_response_time``,
    ``error_rate`` or ``throughput`` are stored but left out of the averages,
    so a partial snapshot cannot make the summary methods raise.
    """

    def __init__(self, max_history: int = 1000):
        """Create an empty dashboard.

        Args:
            max_history: Maximum number of snapshots retained per model.
        """
        self.max_history = max_history
        self.metrics_history = {}  # model_id -> list of {'timestamp', 'metrics'}

    @staticmethod
    def _is_complete(entry: Dict) -> bool:
        """Return True if a history entry has every metric the summaries read."""
        metrics = entry.get('metrics') or {}
        response_time = metrics.get('response_time') or {}
        return (
            'avg_response_time' in response_time
            and 'error_rate' in metrics
            and 'throughput' in metrics
        )

    def add_metrics(self, model_id: str, metrics: Dict):
        """Append a timestamped metrics snapshot for ``model_id``."""
        history = self.metrics_history.setdefault(model_id, [])
        history.append({
            'timestamp': time.time(),
            'metrics': metrics
        })

        # Keep only the most recent max_history data points
        overflow = len(history) - self.max_history
        if overflow > 0:
            del history[:overflow]

    def get_model_summary(self, model_id: str) -> Dict:
        """Get summary metrics for a model.

        Returns:
            ``{}`` when the model has no complete snapshot. Otherwise a dict
            with ``avg_response_time``, ``avg_error_rate`` and
            ``avg_throughput`` (means over the complete snapshots),
            ``total_requests`` (number of stored snapshots, complete or not)
            and ``last_updated`` (timestamp of the newest snapshot).
        """
        history = self.metrics_history.get(model_id, [])
        usable = [h for h in history if self._is_complete(h)]

        if not usable:
            return {}

        # Calculate summary statistics
        response_times = [h['metrics']['response_time']['avg_response_time'] for h in usable]
        error_rates = [h['metrics']['error_rate'] for h in usable]
        throughputs = [h['metrics']['throughput'] for h in usable]

        return {
            'avg_response_time': statistics.mean(response_times),
            'avg_error_rate': statistics.mean(error_rates),
            'avg_throughput': statistics.mean(throughputs),
            'total_requests': len(history),
            'last_updated': history[-1]['timestamp']
        }

    def get_health_status(self) -> Dict:
        """Get health status of all models.

        Each model is classified from the complete snapshots among its last
        ten data points:

        * ``'unhealthy'``: mean error rate > 0.1 or mean response time > 10
        * ``'degraded'``: mean error rate > 0.05 or mean response time > 5
        * ``'healthy'``: otherwise
        * ``'unknown'``: no complete snapshot among the last ten
        """
        status = {}
        for model_id, history in self.metrics_history.items():
            recent_metrics = [m for m in history[-10:] if self._is_complete(m)]

            if not recent_metrics:
                status[model_id] = 'unknown'
                continue

            # Determine health based on recent metrics
            avg_error_rate = statistics.mean(
                [m['metrics']['error_rate'] for m in recent_metrics]
            )
            avg_response_time = statistics.mean(
                [m['metrics']['response_time']['avg_response_time'] for m in recent_metrics]
            )

            if avg_error_rate > 0.1 or avg_response_time > 10:
                status[model_id] = 'unhealthy'
            elif avg_error_rate > 0.05 or avg_response_time > 5:
                status[model_id] = 'degraded'
            else:
                status[model_id] = 'healthy'

        return status

Implementation Example

class ProductionAIHealthMonitor:
    """Wire health checks, threshold alerting and the dashboard into one loop."""

    def __init__(self, models: List[Dict], alert_config: Dict,
                 check_interval: float = 30, metrics_interval: float = 60):
        """Create the monitor and its collaborators.

        Args:
            models: Model descriptors; each must contain an ``'id'`` key.
            alert_config: Passed unchanged to ``AlertManager``.
            check_interval: Seconds between two rounds of health checks.
            metrics_interval: Seconds between two metrics-collection rounds.
        """
        self.models = models
        self.check_interval = check_interval
        self.metrics_interval = metrics_interval
        self.health_checker = AdvancedHealthChecker(models)
        self.alert_manager = AlertManager(alert_config)
        self.threshold_alerter = ThresholdAlerting(self.alert_manager)
        self.dashboard = MetricsDashboard()

    async def start_monitoring(self):
        """Start comprehensive health monitoring.

        Runs the health-check loop and the metrics-collection loop
        concurrently. Both loops are infinite, so this coroutine only returns
        if one of them raises or it is cancelled.
        """
        print("🚀 Starting AI health monitoring...")

        await asyncio.gather(
            self._health_check_loop(),
            self._metrics_collection_loop()
        )

    async def _health_check_loop(self):
        """Run periodic health checks.

        Models are checked one after another; a failure while handling one
        model is printed and does not affect the others.
        """
        while True:
            for model in self.models:
                try:
                    await self._check_model(model)
                except Exception as e:
                    print(f"Error in health check for {model['id']}: {e}")

            await asyncio.sleep(self.check_interval)

    async def _check_model(self, model: Dict):
        """Run one health check for ``model`` and act on the result."""
        health_result = await self.health_checker.comprehensive_health_check(model)
        metrics = health_result['metrics']

        # Send the health status alert first: it must go out even if the
        # metric handling below raises on an unexpected snapshot.
        if not health_result['overall_healthy']:
            await self.alert_manager.send_alert(
                model['id'],
                AlertSeverity.CRITICAL,
                f"Model health check failed: {', '.join(health_result['issues'])}"
            )

        # A failed check may carry no metrics; there is nothing to chart or
        # to compare against thresholds in that case.
        if metrics:
            # Update dashboard
            self.dashboard.add_metrics(model['id'], metrics)

            # Check thresholds and send alerts
            await self.threshold_alerter.check_thresholds(model['id'], metrics)

    async def _metrics_collection_loop(self):
        """Collect additional metrics (placeholder: currently only sleeps)."""
        while True:
            # Collect system metrics, cost metrics, etc.
            await asyncio.sleep(self.metrics_interval)

## Usage

```python
# Example wiring: the models to monitor and where alerts should be delivered.
models = [
    {'id': 'gpt-4', 'endpoint': 'https://api.openai.com/v1'},
    {'id': 'claude-3', 'endpoint': 'https://api.anthropic.com/v1'},
    {'id': 'gemini-pro', 'endpoint': 'https://generativelanguage.googleapis.com/v1'}
]

alert_config = {
    'email': {'smtp_server': 'smtp.gmail.com', 'port': 587},
    'slack': {'webhook_url': 'https://hooks.slack.com/...'},
    'pagerduty': {'api_key': '...'}
}

monitor = ProductionAIHealthMonitor(models, alert_config)
# `await` is only valid inside a coroutine, so a plain script has to hand the
# coroutine to an event loop explicitly. This call blocks until monitoring
# stops.
asyncio.run(monitor.start_monitoring())

```

Best Practices

  1. Monitor Key Metrics: Response time, error rate, throughput, and availability
  2. Set Appropriate Thresholds: Base thresholds on historical data and business requirements
  3. Implement Alerting: Use multiple channels for critical alerts
  4. Avoid Alert Fatigue: Use cooldowns and escalation policies
  5. Visualize Data: Create dashboards for real-time monitoring
  6. Test Monitoring: Regularly test alerting and health checks
  7. Document Procedures: Maintain runbooks for common issues
  8. Automate Recovery: Implement automated responses to common problems

Conclusion

Comprehensive AI model health monitoring is essential for maintaining reliable AI services. By implementing proper metrics collection, health checks, alerting, and visualization, you can ensure your AI models remain healthy and performant.

The key is to start with basic monitoring and gradually add more sophisticated features as your needs grow.

← Back to Learning Center