Comprehensive health monitoring is essential for maintaining reliable AI model deployments. This guide covers monitoring strategies, key metrics, alerting systems, and implementation patterns.
## Managed Monitoring Solution

Tetrate Agent Router Service provides comprehensive AI model health monitoring as a managed service, helping ensure your AI systems remain healthy and performant. It offers:
- Real-time metrics and dashboards
- Automated health checks
- Intelligent alerting systems
- Cross-provider visibility
## Why AI Model Health Monitoring Matters
- Early Detection: Identify issues before they impact users
- Performance Optimization: Track and improve model performance
- Cost Management: Monitor usage and costs
- Reliability: Ensure high availability and uptime
## Key Health Metrics
### 1. Response Time Metrics
```python
import time
import statistics
from typing import Dict, List, Optional

class ResponseTimeMonitor:
    def __init__(self, window_size: int = 100):
        self.window_size = window_size
        self.response_times: Dict[str, List[float]] = {}
        self.p95_times: Dict[str, float] = {}
        self.p99_times: Dict[str, float] = {}

    def record_response_time(self, model_id: str, response_time: float):
        """Record response time for a model"""
        if model_id not in self.response_times:
            self.response_times[model_id] = []
        self.response_times[model_id].append(response_time)
        # Keep only recent measurements
        if len(self.response_times[model_id]) > self.window_size:
            self.response_times[model_id].pop(0)
        # Update percentiles
        self._update_percentiles(model_id)

    def _update_percentiles(self, model_id: str):
        """Update P95 and P99 response times"""
        times = self.response_times[model_id]
        if len(times) >= 10:  # Need at least 10 samples for percentiles
            sorted_times = sorted(times)
            p95_index = int(len(sorted_times) * 0.95)
            p99_index = int(len(sorted_times) * 0.99)
            self.p95_times[model_id] = sorted_times[p95_index]
            self.p99_times[model_id] = sorted_times[p99_index]

    def get_metrics(self, model_id: str) -> Dict:
        """Get response time metrics for a model"""
        times = self.response_times.get(model_id, [])
        if not times:
            return {
                'avg_response_time': 0,
                'p95_response_time': 0,
                'p99_response_time': 0,
                'min_response_time': 0,
                'max_response_time': 0,
                'sample_count': 0
            }
        return {
            'avg_response_time': statistics.mean(times),
            'p95_response_time': self.p95_times.get(model_id, 0),
            'p99_response_time': self.p99_times.get(model_id, 0),
            'min_response_time': min(times),
            'max_response_time': max(times),
            'sample_count': len(times)
        }
```
### 2. Error Rate Monitoring
```python
class ErrorRateMonitor:
    def __init__(self, window_seconds: int = 3600):  # 1 hour sliding window
        self.window_seconds = window_seconds
        self.total_requests: Dict[str, List[float]] = {}
        self.error_requests: Dict[str, List[float]] = {}
        self.error_types: Dict[str, Dict[str, int]] = {}

    def record_request(self, model_id: str, success: bool, error_type: Optional[str] = None):
        """Record a request and its outcome"""
        if model_id not in self.total_requests:
            self.total_requests[model_id] = []
            self.error_requests[model_id] = []
            self.error_types[model_id] = {}
        # Record total request
        self.total_requests[model_id].append(time.time())
        # Record error if applicable
        if not success:
            self.error_requests[model_id].append(time.time())
            if error_type:
                self.error_types[model_id][error_type] = \
                    self.error_types[model_id].get(error_type, 0) + 1
        # Clean old data
        self._clean_old_data(model_id)

    def _clean_old_data(self, model_id: str):
        """Remove data outside the sliding time window"""
        cutoff_time = time.time() - self.window_seconds
        # Clean total requests
        self.total_requests[model_id] = [
            t for t in self.total_requests[model_id] if t > cutoff_time
        ]
        # Clean error requests
        self.error_requests[model_id] = [
            t for t in self.error_requests[model_id] if t > cutoff_time
        ]

    def get_error_rate(self, model_id: str) -> float:
        """Calculate error rate for a model"""
        total = len(self.total_requests.get(model_id, []))
        errors = len(self.error_requests.get(model_id, []))
        if total == 0:
            return 0.0
        return errors / total

    def get_error_breakdown(self, model_id: str) -> Dict[str, int]:
        """Get breakdown of error types"""
        return self.error_types.get(model_id, {})
```
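Usage follows the same pattern; error types are free-form strings (here a hypothetical `RateLimitError`):

```python
errors = ErrorRateMonitor()
for ok in [True, True, True, False, True]:
    errors.record_request('gpt-4', ok, error_type=None if ok else 'RateLimitError')

print(f"Error rate: {errors.get_error_rate('gpt-4'):.0%}")  # 20%
print(errors.get_error_breakdown('gpt-4'))                  # {'RateLimitError': 1}
```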
### 3. Throughput Monitoring
```python
class ThroughputMonitor:
    def __init__(self, window_size: int = 60):  # 60 seconds
        self.window_size = window_size
        self.request_timestamps: Dict[str, List[float]] = {}

    def record_request(self, model_id: str):
        """Record a request timestamp"""
        if model_id not in self.request_timestamps:
            self.request_timestamps[model_id] = []
        self.request_timestamps[model_id].append(time.time())
        self._clean_old_data(model_id)

    def _clean_old_data(self, model_id: str):
        """Remove old timestamps"""
        cutoff_time = time.time() - self.window_size
        self.request_timestamps[model_id] = [
            t for t in self.request_timestamps[model_id] if t > cutoff_time
        ]

    def get_requests_per_second(self, model_id: str) -> float:
        """Calculate requests per second"""
        timestamps = self.request_timestamps.get(model_id, [])
        if not timestamps:
            return 0.0
        # Calculate RPS over the observed span
        oldest_time = min(timestamps)
        newest_time = max(timestamps)
        time_span = newest_time - oldest_time
        if time_span == 0:
            # All requests arrived within the same instant
            return float(len(timestamps))
        return len(timestamps) / time_span
```
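A small demonstration (the sleeps simulate request spacing):

```python
throughput = ThroughputMonitor(window_size=60)
for _ in range(5):
    throughput.record_request('gpt-4')
    time.sleep(0.1)

# 5 requests over a ~0.4s span: roughly 12.5 req/s during the burst
print(f"{throughput.get_requests_per_second('gpt-4'):.1f} req/s")
```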
## Health Check Implementation
### 1. Basic Health Check
```python
import aiohttp
import asyncio
from typing import Dict, List

class AIHealthChecker:
    def __init__(self, models: List[Dict]):
        self.models = models
        self.health_status = {model['id']: True for model in models}
        self.last_check = {model['id']: 0 for model in models}
        self.check_interval = 30  # seconds

    async def check_model_health(self, model: Dict) -> bool:
        """Check health of a specific model"""
        try:
            # Simple health check request
            health_prompt = "Hello"
            start_time = time.time()
            # _call_model is provider-specific; a sketch follows below
            response = await self._call_model(model, health_prompt)
            response_time = time.time() - start_time
            # Check if response is valid
            if not response or response_time > 10:  # 10 second timeout
                return False
            return True
        except Exception as e:
            print(f"Health check failed for {model['id']}: {e}")
            return False

    async def run_health_checks(self):
        """Run health checks for all models"""
        while True:
            tasks = [
                asyncio.create_task(self._check_single_model(model))
                for model in self.models
            ]
            await asyncio.gather(*tasks, return_exceptions=True)
            await asyncio.sleep(self.check_interval)

    async def _check_single_model(self, model: Dict):
        """Check health of a single model"""
        is_healthy = await self.check_model_health(model)
        was_healthy = self.health_status[model['id']]
        self.health_status[model['id']] = is_healthy
        self.last_check[model['id']] = time.time()
        # Log status changes
        if is_healthy and not was_healthy:
            print(f"✅ {model['id']} recovered")
        elif not is_healthy and was_healthy:
            print(f"❌ {model['id']} became unhealthy")
```
### 2. Advanced Health Check with Metrics
```python
class AdvancedHealthChecker:
    def __init__(self, models: List[Dict]):
        self.models = models
        self.response_monitor = ResponseTimeMonitor()
        self.error_monitor = ErrorRateMonitor()
        self.throughput_monitor = ThroughputMonitor()
        self.health_status = {model['id']: True for model in models}

    async def comprehensive_health_check(self, model: Dict) -> Dict:
        """Perform comprehensive health check"""
        health_result = {
            'model_id': model['id'],
            'timestamp': time.time(),
            'overall_healthy': True,
            'metrics': {},
            'issues': []
        }
        try:
            # Test basic functionality (_call_model is the same
            # provider-specific coroutine sketched above)
            test_prompt = "Generate a simple response"
            start_time = time.time()
            response = await self._call_model(model, test_prompt)
            response_time = time.time() - start_time
            # Record metrics
            self.response_monitor.record_response_time(model['id'], response_time)
            self.throughput_monitor.record_request(model['id'])
            self.error_monitor.record_request(model['id'], True)
            # Check response time
            if response_time > 5:  # 5 second threshold
                health_result['issues'].append(f"High response time: {response_time:.2f}s")
                health_result['overall_healthy'] = False
            # Check response quality
            if not response or len(response) < 10:
                health_result['issues'].append("Poor response quality")
                health_result['overall_healthy'] = False
            # Get aggregated metrics
            health_result['metrics'] = {
                'response_time': self.response_monitor.get_metrics(model['id']),
                'error_rate': self.error_monitor.get_error_rate(model['id']),
                'throughput': self.throughput_monitor.get_requests_per_second(model['id'])
            }
            # Check error rate
            if health_result['metrics']['error_rate'] > 0.05:  # 5% threshold
                health_result['issues'].append(
                    f"High error rate: {health_result['metrics']['error_rate']:.2%}"
                )
                health_result['overall_healthy'] = False
        except Exception as e:
            self.error_monitor.record_request(model['id'], False, type(e).__name__)
            health_result['issues'].append(f"Health check failed: {e}")
            health_result['overall_healthy'] = False
        # Update health status
        self.health_status[model['id']] = health_result['overall_healthy']
        return health_result
```
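With `_call_model` implemented as above, a single check can be driven like this:

```python
async def main():
    models = [{'id': 'gpt-4', 'endpoint': 'https://api.openai.com/v1'}]
    checker = AdvancedHealthChecker(models)
    result = await checker.comprehensive_health_check(models[0])
    print(result['overall_healthy'], result['issues'])

asyncio.run(main())
```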
## Alerting System
### 1. Alert Manager
```python
from enum import Enum
import smtplib                          # used by the email sender (not shown)
from email.mime.text import MIMEText

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

class AlertManager:
    def __init__(self, config: Dict):
        self.config = config
        self.alert_history = []
        self.alert_cooldowns = {}  # Prevent alert spam

    async def send_alert(self, model_id: str, severity: AlertSeverity, message: str):
        """Send an alert"""
        # Cooldowns are keyed by model and severity (not by timestamp),
        # so repeated identical alerts are actually suppressed
        cooldown_key = f"{model_id}_{severity.value}"
        if self._is_in_cooldown(cooldown_key):
            return
        alert = {
            'id': f"{cooldown_key}_{int(time.time())}",
            'model_id': model_id,
            'severity': severity,
            'message': message,
            'timestamp': time.time()
        }
        # Send alert based on severity
        if severity == AlertSeverity.CRITICAL:
            await self._send_critical_alert(alert)
        elif severity == AlertSeverity.WARNING:
            await self._send_warning_alert(alert)
        else:
            await self._send_info_alert(alert)
        # Record alert
        self.alert_history.append(alert)
        self._set_cooldown(cooldown_key)

    async def _send_critical_alert(self, alert: Dict):
        """Send critical alert via multiple channels"""
        await self._send_email_alert(alert, "CRITICAL")  # Email
        await self._send_slack_alert(alert)              # Slack/Teams
        await self._send_pagerduty_alert(alert)          # PagerDuty

    async def _send_warning_alert(self, alert: Dict):
        """Send warning alert"""
        await self._send_email_alert(alert, "WARNING")
        await self._send_slack_alert(alert)

    async def _send_info_alert(self, alert: Dict):
        """Send info alert"""
        await self._send_slack_alert(alert)

    def _is_in_cooldown(self, cooldown_key: str) -> bool:
        """Check if alert is in cooldown period"""
        cooldown_until = self.alert_cooldowns.get(cooldown_key, 0)
        return time.time() < cooldown_until

    def _set_cooldown(self, cooldown_key: str):
        """Set cooldown for an alert"""
        cooldown_duration = 300  # 5 minutes
        self.alert_cooldowns[cooldown_key] = time.time() + cooldown_duration
```
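The per-channel senders (`_send_email_alert`, `_send_slack_alert`, `_send_pagerduty_alert`) are left to your infrastructure. A minimal `_send_slack_alert` sketch, assuming a standard Slack incoming-webhook URL stored under `config['slack']['webhook_url']`:

```python
# Inside AlertManager — one possible Slack sender using an incoming webhook.
async def _send_slack_alert(self, alert: Dict):
    """Post the alert text to a Slack incoming webhook."""
    webhook_url = self.config.get('slack', {}).get('webhook_url')
    if not webhook_url:
        return  # Slack not configured
    text = f"[{alert['severity'].value.upper()}] {alert['model_id']}: {alert['message']}"
    async with aiohttp.ClientSession() as session:
        # Slack incoming webhooks accept a simple {"text": ...} JSON payload
        async with session.post(webhook_url, json={'text': text}) as resp:
            resp.raise_for_status()
```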
### 2. Threshold-Based Alerting
```python
class ThresholdAlerting:
    def __init__(self, alert_manager: AlertManager):
        self.alert_manager = alert_manager
        self.thresholds = {
            'response_time_p95': 3.0,    # seconds
            'response_time_p99': 5.0,    # seconds
            'error_rate': 0.05,          # 5%
            'throughput_min': 1.0,       # requests per second
            'health_check_failures': 3   # consecutive failures
        }

    async def check_thresholds(self, model_id: str, metrics: Dict):
        """Check metrics against thresholds and send alerts"""
        if not metrics:
            return  # no data yet (e.g. the health check itself raised)
        # Check response time P95
        p95_time = metrics['response_time']['p95_response_time']
        if p95_time > self.thresholds['response_time_p95']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.WARNING,
                f"P95 response time ({p95_time:.2f}s) exceeds threshold "
                f"({self.thresholds['response_time_p95']}s)"
            )
        # Check response time P99
        p99_time = metrics['response_time']['p99_response_time']
        if p99_time > self.thresholds['response_time_p99']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.CRITICAL,
                f"P99 response time ({p99_time:.2f}s) exceeds threshold "
                f"({self.thresholds['response_time_p99']}s)"
            )
        # Check error rate
        error_rate = metrics['error_rate']
        if error_rate > self.thresholds['error_rate']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.CRITICAL,
                f"Error rate ({error_rate:.2%}) exceeds threshold "
                f"({self.thresholds['error_rate']:.2%})"
            )
        # Check throughput
        throughput = metrics['throughput']
        if throughput < self.thresholds['throughput_min']:
            await self.alert_manager.send_alert(
                model_id,
                AlertSeverity.WARNING,
                f"Throughput ({throughput:.2f} req/s) below minimum "
                f"({self.thresholds['throughput_min']} req/s)"
            )
```
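A quick check against a hand-built metrics dict (the shape matches what `comprehensive_health_check` produces; this assumes the AlertManager channel senders are implemented):

```python
async def demo():
    alerter = ThresholdAlerting(AlertManager(config={}))
    sample_metrics = {
        'response_time': {'p95_response_time': 4.2, 'p99_response_time': 4.8},
        'error_rate': 0.02,
        'throughput': 3.5,
    }
    # P95 of 4.2s exceeds the 3.0s threshold -> one WARNING alert fires
    await alerter.check_thresholds('gpt-4', sample_metrics)

asyncio.run(demo())
```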
## Dashboard and Visualization
### 1. Metrics Dashboard
```python
class MetricsDashboard:
    def __init__(self):
        self.metrics_history = {}

    def add_metrics(self, model_id: str, metrics: Dict):
        """Add metrics to dashboard"""
        if model_id not in self.metrics_history:
            self.metrics_history[model_id] = []
        self.metrics_history[model_id].append({
            'timestamp': time.time(),
            'metrics': metrics
        })
        # Keep only last 1000 data points
        if len(self.metrics_history[model_id]) > 1000:
            self.metrics_history[model_id].pop(0)

    def get_model_summary(self, model_id: str) -> Dict:
        """Get summary metrics for a model"""
        history = self.metrics_history.get(model_id, [])
        if not history:
            return {}
        # Calculate summary statistics
        response_times = [h['metrics']['response_time']['avg_response_time'] for h in history]
        error_rates = [h['metrics']['error_rate'] for h in history]
        throughputs = [h['metrics']['throughput'] for h in history]
        return {
            'avg_response_time': statistics.mean(response_times),
            'avg_error_rate': statistics.mean(error_rates),
            'avg_throughput': statistics.mean(throughputs),
            'data_points': len(history),  # number of recorded metric snapshots
            'last_updated': history[-1]['timestamp']
        }

    def get_health_status(self) -> Dict:
        """Get health status of all models"""
        status = {}
        for model_id in self.metrics_history:
            recent_metrics = self.metrics_history[model_id][-10:]  # Last 10 data points
            if not recent_metrics:
                status[model_id] = 'unknown'
                continue
            # Determine health based on recent metrics
            avg_error_rate = statistics.mean(
                [m['metrics']['error_rate'] for m in recent_metrics]
            )
            avg_response_time = statistics.mean(
                [m['metrics']['response_time']['avg_response_time'] for m in recent_metrics]
            )
            if avg_error_rate > 0.1 or avg_response_time > 10:
                status[model_id] = 'unhealthy'
            elif avg_error_rate > 0.05 or avg_response_time > 5:
                status[model_id] = 'degraded'
            else:
                status[model_id] = 'healthy'
        return status
```
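Feeding the dashboard one metrics snapshot and reading it back:

```python
dashboard = MetricsDashboard()
dashboard.add_metrics('gpt-4', {
    'response_time': {'avg_response_time': 0.8},
    'error_rate': 0.01,
    'throughput': 4.2,
})
print(dashboard.get_model_summary('gpt-4'))
print(dashboard.get_health_status())  # {'gpt-4': 'healthy'}
```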
## Implementation Example
```python
class ProductionAIHealthMonitor:
    def __init__(self, models: List[Dict], alert_config: Dict):
        self.models = models
        self.health_checker = AdvancedHealthChecker(models)
        self.alert_manager = AlertManager(alert_config)
        self.threshold_alerter = ThresholdAlerting(self.alert_manager)
        self.dashboard = MetricsDashboard()

    async def start_monitoring(self):
        """Start comprehensive health monitoring"""
        print("🚀 Starting AI health monitoring...")
        # Start health check loop
        health_task = asyncio.create_task(self._health_check_loop())
        # Start metrics collection
        metrics_task = asyncio.create_task(self._metrics_collection_loop())
        # Wait for both tasks
        await asyncio.gather(health_task, metrics_task)

    async def _health_check_loop(self):
        """Run periodic health checks"""
        while True:
            for model in self.models:
                try:
                    health_result = await self.health_checker.comprehensive_health_check(model)
                    # Update dashboard (skip if the check produced no metrics)
                    if health_result['metrics']:
                        self.dashboard.add_metrics(model['id'], health_result['metrics'])
                    # Check thresholds and send alerts
                    await self.threshold_alerter.check_thresholds(
                        model['id'], health_result['metrics']
                    )
                    # Send health status alerts
                    if not health_result['overall_healthy']:
                        await self.alert_manager.send_alert(
                            model['id'],
                            AlertSeverity.CRITICAL,
                            f"Model health check failed: {', '.join(health_result['issues'])}"
                        )
                except Exception as e:
                    print(f"Error in health check for {model['id']}: {e}")
            await asyncio.sleep(30)  # Check every 30 seconds

    async def _metrics_collection_loop(self):
        """Collect additional metrics"""
        while True:
            # Collect system metrics, cost metrics, etc.
            await asyncio.sleep(60)  # Collect every minute
```
## Usage
```python
import asyncio

models = [
    {'id': 'gpt-4', 'endpoint': 'https://api.openai.com/v1'},
    {'id': 'claude-3', 'endpoint': 'https://api.anthropic.com/v1'},
    {'id': 'gemini-pro', 'endpoint': 'https://generativelanguage.googleapis.com/v1'}
]

alert_config = {
    'email': {'smtp_server': 'smtp.gmail.com', 'port': 587},
    'slack': {'webhook_url': 'https://hooks.slack.com/...'},
    'pagerduty': {'api_key': '...'}
}

monitor = ProductionAIHealthMonitor(models, alert_config)
asyncio.run(monitor.start_monitoring())
```
## Best Practices
- Monitor Key Metrics: Response time, error rate, throughput, and availability
- Set Appropriate Thresholds: Base thresholds on historical data and business requirements
- Implement Alerting: Use multiple channels for critical alerts
- Avoid Alert Fatigue: Use cooldowns and escalation policies
- Visualize Data: Create dashboards for real-time monitoring
- Test Monitoring: Regularly test alerting and health checks
- Document Procedures: Maintain runbooks for common issues
- Automate Recovery: Implement automated responses to common problems (see the sketch below)
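As a starting point for automated recovery, a minimal sketch of one common pattern: pulling an unhealthy model out of a routing pool and restoring it once health checks pass again. The `AutoRecovery` class and its `routing_pool` are illustrative, not part of the monitor above:

```python
class AutoRecovery:
    """Pull unhealthy models out of rotation; restore them on recovery."""
    def __init__(self, routing_pool: List[str]):
        self.routing_pool = routing_pool        # models eligible for traffic
        self.quarantined: set = set()

    def on_health_change(self, model_id: str, is_healthy: bool):
        if not is_healthy and model_id in self.routing_pool:
            self.routing_pool.remove(model_id)  # stop routing traffic
            self.quarantined.add(model_id)
        elif is_healthy and model_id in self.quarantined:
            self.quarantined.remove(model_id)
            self.routing_pool.append(model_id)  # restore traffic
```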
## Conclusion
Comprehensive AI model health monitoring is essential for maintaining reliable AI services. By implementing proper metrics collection, health checks, alerting, and visualization, you can ensure your AI models remain healthy and performant.
The key is to start with basic monitoring and gradually add more sophisticated features as your needs grow.