Effective troubleshooting is essential for maintaining optimal AI model performance. This guide provides systematic approaches, diagnostic tools, and optimization techniques for identifying and resolving AI performance issues.
Managed Performance Solution
Tetrate Agent Router Service provides comprehensive performance monitoring and troubleshooting capabilities with real-time metrics, automated diagnostics, and optimization recommendations. This managed service helps identify and resolve performance issues before they impact your users.
- Real-time performance monitoring
- Automated diagnostics and alerts
- Performance optimization recommendations
- Cross-provider performance analysis
Understanding Performance Issues
Common Performance Problems
- High Response Times: Slow model inference and API responses
- Memory Issues: Excessive memory usage and out-of-memory errors
- Throughput Bottlenecks: Limited requests per second
- Cost Inefficiency: High costs for model usage
- Inconsistent Performance: Variable response times and quality
Performance Metrics Framework
import time
import psutil
import statistics
from typing import Dict, List, Optional
class PerformanceMetrics:
def __init__(self):
self.metrics = {
'response_time': [],
'memory_usage': [],
'cpu_usage': [],
'throughput': [],
'error_rate': [],
'cost_per_request': []
}
self.baseline_metrics = {}
def record_metric(self, metric_type: str, value: float, timestamp: float = None):
"""Record a performance metric"""
if timestamp is None:
timestamp = time.time()
if metric_type in self.metrics:
self.metrics[metric_type].append({
'value': value,
'timestamp': timestamp
})
# Keep only recent metrics (last 1000)
if len(self.metrics[metric_type]) > 1000:
self.metrics[metric_type].pop(0)
def calculate_statistics(self, metric_type: str, window_minutes: int = 60) -> Dict:
"""Calculate statistics for a metric over a time window"""
if metric_type not in self.metrics:
return {}
cutoff_time = time.time() - (window_minutes * 60)
recent_metrics = [
m for m in self.metrics[metric_type]
if m['timestamp'] >= cutoff_time
]
if not recent_metrics:
return {}
values = [m['value'] for m in recent_metrics]
return {
'count': len(values),
'mean': statistics.mean(values),
'median': statistics.median(values),
'p95': self._calculate_percentile(values, 95),
'p99': self._calculate_percentile(values, 99),
'min': min(values),
'max': max(values),
'std_dev': statistics.stdev(values) if len(values) > 1 else 0
}
def _calculate_percentile(self, values: List[float], percentile: int) -> float:
"""Calculate percentile of values"""
if not values:
return 0
sorted_values = sorted(values)
index = (percentile / 100) * (len(sorted_values) - 1)
if index.is_integer():
return sorted_values[int(index)]
else:
lower = sorted_values[int(index)]
upper = sorted_values[int(index) + 1]
return lower + (upper - lower) * (index - int(index))
def set_baseline(self, metric_type: str, baseline_value: float):
"""Set baseline value for a metric"""
self.baseline_metrics[metric_type] = baseline_value
def compare_to_baseline(self, metric_type: str) -> Dict:
"""Compare current performance to baseline"""
if metric_type not in self.baseline_metrics:
return {}
current_stats = self.calculate_statistics(metric_type)
baseline = self.baseline_metrics[metric_type]
if not current_stats:
return {}
return {
'baseline': baseline,
'current_mean': current_stats['mean'],
'difference': current_stats['mean'] - baseline,
'percentage_change': ((current_stats['mean'] - baseline) / baseline) * 100,
'status': 'improved' if current_stats['mean'] < baseline else 'degraded'
}
Systematic Troubleshooting Approach
1. Performance Issue Identification
class PerformanceTroubleshooter:
def __init__(self):
self.troubleshooting_steps = [
'identify_symptoms',
'gather_metrics',
'analyze_patterns',
'identify_root_cause',
'implement_solution',
'verify_fix'
]
self.performance_metrics = PerformanceMetrics()
async def troubleshoot_performance_issue(self, issue_description: str) -> Dict:
"""Systematic troubleshooting process"""
troubleshooting_result = {
'issue_description': issue_description,
'steps_completed': [],
'findings': {},
'root_cause': None,
'solution': None,
'verification': None
}
# Step 1: Identify symptoms
symptoms = await self._identify_symptoms(issue_description)
troubleshooting_result['steps_completed'].append('identify_symptoms')
troubleshooting_result['findings']['symptoms'] = symptoms
# Step 2: Gather metrics
metrics = await self._gather_performance_metrics(symptoms)
troubleshooting_result['steps_completed'].append('gather_metrics')
troubleshooting_result['findings']['metrics'] = metrics
# Step 3: Analyze patterns
patterns = await self._analyze_performance_patterns(metrics)
troubleshooting_result['steps_completed'].append('analyze_patterns')
troubleshooting_result['findings']['patterns'] = patterns
# Step 4: Identify root cause
root_cause = await self._identify_root_cause(patterns, symptoms)
troubleshooting_result['steps_completed'].append('identify_root_cause')
troubleshooting_result['root_cause'] = root_cause
# Step 5: Implement solution
solution = await self._implement_solution(root_cause)
troubleshooting_result['steps_completed'].append('implement_solution')
troubleshooting_result['solution'] = solution
# Step 6: Verify fix
verification = await self._verify_solution(solution, metrics)
troubleshooting_result['steps_completed'].append('verify_fix')
troubleshooting_result['verification'] = verification
return troubleshooting_result
async def _identify_symptoms(self, issue_description: str) -> Dict:
"""Identify specific symptoms from issue description"""
symptoms = {
'response_time_issues': False,
'memory_issues': False,
'throughput_issues': False,
'cost_issues': False,
'quality_issues': False
}
# Analyze issue description for keywords
description_lower = issue_description.lower()
if any(word in description_lower for word in ['slow', 'timeout', 'response time']):
symptoms['response_time_issues'] = True
if any(word in description_lower for word in ['memory', 'oom', 'out of memory']):
symptoms['memory_issues'] = True
if any(word in description_lower for word in ['throughput', 'requests per second', 'rps']):
symptoms['throughput_issues'] = True
if any(word in description_lower for word in ['cost', 'expensive', 'billing']):
symptoms['cost_issues'] = True
if any(word in description_lower for word in ['quality', 'accuracy', 'results']):
symptoms['quality_issues'] = True
return symptoms
async def _gather_performance_metrics(self, symptoms: Dict) -> Dict:
"""Gather relevant performance metrics"""
metrics = {}
# Gather system metrics
metrics['system'] = {
'cpu_usage': psutil.cpu_percent(interval=1),
'memory_usage': psutil.virtual_memory().percent,
'disk_usage': psutil.disk_usage('/').percent
}
# Gather application metrics
if symptoms['response_time_issues']:
metrics['response_time'] = self.performance_metrics.calculate_statistics('response_time')
if symptoms['memory_issues']:
metrics['memory_usage'] = self.performance_metrics.calculate_statistics('memory_usage')
if symptoms['throughput_issues']:
metrics['throughput'] = self.performance_metrics.calculate_statistics('throughput')
if symptoms['cost_issues']:
metrics['cost'] = self.performance_metrics.calculate_statistics('cost_per_request')
return metrics
2. Performance Pattern Analysis
class PerformanceAnalyzer:
def __init__(self):
self.analysis_patterns = {
'gradual_degradation': self._detect_gradual_degradation,
'sudden_drop': self._detect_sudden_drop,
'periodic_issues': self._detect_periodic_issues,
'spikes': self._detect_performance_spikes
}
async def analyze_performance_patterns(self, metrics: Dict) -> Dict:
"""Analyze performance patterns to identify issues"""
patterns = {}
for pattern_name, pattern_func in self.analysis_patterns.items():
try:
pattern_result = await pattern_func(metrics)
if pattern_result:
patterns[pattern_name] = pattern_result
except Exception as e:
print(f"Error analyzing pattern {pattern_name}: {e}")
return patterns
async def _detect_gradual_degradation(self, metrics: Dict) -> Optional[Dict]:
"""Detect gradual performance degradation"""
if 'response_time' not in metrics:
return None
response_times = metrics['response_time']
if response_times.get('count', 0) < 10:
return None
# Calculate trend over time
trend = self._calculate_trend(response_times.get('values', []))
if trend['slope'] > 0.1: # Increasing response times
return {
'detected': True,
'slope': trend['slope'],
'severity': 'high' if trend['slope'] > 0.5 else 'medium',
'description': f"Response times increasing by {trend['slope']:.2f}ms per request"
}
return None
async def _detect_sudden_drop(self, metrics: Dict) -> Optional[Dict]:
"""Detect sudden performance drops"""
if 'response_time' not in metrics:
return None
response_times = metrics['response_time']
if response_times.get('count', 0) < 5:
return None
values = response_times.get('values', [])
if len(values) < 5:
return None
# Check for sudden increase in response time
recent_avg = statistics.mean(values[-5:])
previous_avg = statistics.mean(values[-10:-5]) if len(values) >= 10 else values[0]
if recent_avg > previous_avg * 2: # 2x increase
return {
'detected': True,
'increase_factor': recent_avg / previous_avg,
'severity': 'high',
'description': f"Response time increased by {recent_avg / previous_avg:.1f}x"
}
return None
async def _detect_periodic_issues(self, metrics: Dict) -> Optional[Dict]:
"""Detect periodic performance issues"""
# This would require more sophisticated time series analysis
# For now, check for patterns in error rates
if 'error_rate' not in metrics:
return None
error_rates = metrics['error_rate']
if error_rates.get('count', 0) < 20:
return None
# Simple pattern detection
values = error_rates.get('values', [])
if len(values) < 20:
return None
# Check for recurring high error rates
high_error_periods = [i for i, v in enumerate(values) if v > 0.1] # >10% error rate
if len(high_error_periods) > 3:
return {
'detected': True,
'periods': len(high_error_periods),
'severity': 'medium',
'description': f"Detected {len(high_error_periods)} periods of high error rates"
}
return None
def _calculate_trend(self, values: List[float]) -> Dict:
"""Calculate trend of values"""
if len(values) < 2:
return {'slope': 0, 'intercept': 0}
# Simple linear regression
n = len(values)
x = list(range(n))
sum_x = sum(x)
sum_y = sum(values)
sum_xy = sum(x[i] * values[i] for i in range(n))
sum_x2 = sum(x[i] ** 2 for i in range(n))
slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
intercept = (sum_y - slope * sum_x) / n
return {'slope': slope, 'intercept': intercept}
Specific Performance Issues and Solutions
1. High Response Time Troubleshooting
class ResponseTimeTroubleshooter:
def __init__(self):
self.response_time_thresholds = {
'acceptable': 2.0, # seconds
'warning': 5.0,
'critical': 10.0
}
async def troubleshoot_response_time(self, metrics: Dict) -> Dict:
"""Troubleshoot high response time issues"""
response_time_stats = metrics.get('response_time', {})
if not response_time_stats:
return {'error': 'No response time metrics available'}
p95_response_time = response_time_stats.get('p95', 0)
# Determine severity
if p95_response_time > self.response_time_thresholds['critical']:
severity = 'critical'
elif p95_response_time > self.response_time_thresholds['warning']:
severity = 'warning'
elif p95_response_time > self.response_time_thresholds['acceptable']:
severity = 'notice'
else:
severity = 'normal'
# Analyze potential causes
causes = await self._analyze_response_time_causes(metrics)
# Generate solutions
solutions = await self._generate_response_time_solutions(causes, severity)
return {
'severity': severity,
'p95_response_time': p95_response_time,
'causes': causes,
'solutions': solutions,
'priority': 'high' if severity in ['critical', 'warning'] else 'medium'
}
async def _analyze_response_time_causes(self, metrics: Dict) -> List[Dict]:
"""Analyze potential causes of high response times"""
causes = []
# Check system resources
system_metrics = metrics.get('system', {})
if system_metrics.get('cpu_usage', 0) > 80:
causes.append({
'type': 'system_resource',
'component': 'cpu',
'description': 'High CPU usage may be causing response time delays',
'confidence': 'high'
})
if system_metrics.get('memory_usage', 0) > 90:
causes.append({
'type': 'system_resource',
'component': 'memory',
'description': 'High memory usage may be causing swapping and delays',
'confidence': 'high'
})
# Check network issues
if 'network_latency' in metrics:
network_latency = metrics['network_latency'].get('mean', 0)
if network_latency > 100: # 100ms
causes.append({
'type': 'network',
'component': 'latency',
'description': f'High network latency ({network_latency:.1f}ms)',
'confidence': 'medium'
})
# Check model-specific issues
if 'model_load_time' in metrics:
load_time = metrics['model_load_time'].get('mean', 0)
if load_time > 1.0: # 1 second
causes.append({
'type': 'model',
'component': 'load_time',
'description': f'Slow model loading ({load_time:.1f}s)',
'confidence': 'medium'
})
return causes
async def _generate_response_time_solutions(self, causes: List[Dict], severity: str) -> List[Dict]:
"""Generate solutions for response time issues"""
solutions = []
for cause in causes:
if cause['type'] == 'system_resource':
if cause['component'] == 'cpu':
solutions.append({
'type': 'optimization',
'description': 'Optimize CPU usage through request batching and caching',
'implementation': 'medium',
'expected_improvement': '20-40%'
})
elif cause['component'] == 'memory':
solutions.append({
'type': 'optimization',
'description': 'Implement memory management and garbage collection optimization',
'implementation': 'medium',
'expected_improvement': '15-30%'
})
elif cause['type'] == 'network':
solutions.append({
'type': 'infrastructure',
'description': 'Use CDN or move to closer geographic region',
'implementation': 'high',
'expected_improvement': '30-50%'
})
elif cause['type'] == 'model':
solutions.append({
'type': 'model_optimization',
'description': 'Implement model caching and warm-up procedures',
'implementation': 'medium',
'expected_improvement': '40-60%'
})
# Add general solutions for critical issues
if severity == 'critical':
solutions.append({
'type': 'emergency',
'description': 'Implement request queuing and rate limiting',
'implementation': 'low',
'expected_improvement': 'Immediate relief'
})
return solutions
2. Memory Usage Troubleshooting
class MemoryTroubleshooter:
def __init__(self):
self.memory_thresholds = {
'warning': 80, # percentage
'critical': 95
}
async def troubleshoot_memory_usage(self, metrics: Dict) -> Dict:
"""Troubleshoot memory usage issues"""
memory_stats = metrics.get('memory_usage', {})
system_memory = metrics.get('system', {}).get('memory_usage', 0)
if not memory_stats and system_memory == 0:
return {'error': 'No memory metrics available'}
# Determine severity
if system_memory > self.memory_thresholds['critical']:
severity = 'critical'
elif system_memory > self.memory_thresholds['warning']:
severity = 'warning'
else:
severity = 'normal'
# Analyze memory patterns
memory_patterns = await self._analyze_memory_patterns(metrics)
# Generate solutions
solutions = await self._generate_memory_solutions(memory_patterns, severity)
return {
'severity': severity,
'system_memory_usage': system_memory,
'memory_patterns': memory_patterns,
'solutions': solutions,
'priority': 'high' if severity == 'critical' else 'medium'
}
async def _analyze_memory_patterns(self, metrics: Dict) -> Dict:
"""Analyze memory usage patterns"""
patterns = {
'memory_leak': False,
'high_peak_usage': False,
'gradual_increase': False,
'spikes': False
}
memory_stats = metrics.get('memory_usage', {})
if not memory_stats:
return patterns
values = memory_stats.get('values', [])
if len(values) < 10:
return patterns
# Check for memory leak (gradual increase)
trend = self._calculate_trend(values)
if trend['slope'] > 0.1: # Increasing memory usage
patterns['gradual_increase'] = True
patterns['memory_leak'] = True
# Check for high peak usage
if max(values) > 90:
patterns['high_peak_usage'] = True
# Check for memory spikes
if len(values) >= 3:
for i in range(1, len(values) - 1):
if values[i] > values[i-1] * 1.5 and values[i] > values[i+1] * 1.5:
patterns['spikes'] = True
break
return patterns
async def _generate_memory_solutions(self, patterns: Dict, severity: str) -> List[Dict]:
"""Generate solutions for memory issues"""
solutions = []
if patterns['memory_leak']:
solutions.append({
'type': 'debugging',
'description': 'Investigate and fix memory leaks in application code',
'implementation': 'high',
'expected_improvement': 'Significant reduction in memory usage'
})
if patterns['high_peak_usage']:
solutions.append({
'type': 'optimization',
'description': 'Implement memory pooling and object reuse',
'implementation': 'medium',
'expected_improvement': '20-40% reduction in peak usage'
})
if patterns['spikes']:
solutions.append({
'type': 'monitoring',
'description': 'Add memory spike detection and alerting',
'implementation': 'low',
'expected_improvement': 'Better visibility into memory issues'
})
# Add general memory optimization
solutions.append({
'type': 'optimization',
'description': 'Implement garbage collection tuning and memory limits',
'implementation': 'medium',
'expected_improvement': '10-30% improvement'
})
return solutions
Performance Optimization Techniques
1. Caching Strategies
class PerformanceOptimizer:
def __init__(self):
self.optimization_strategies = {
'caching': self._optimize_caching,
'batching': self._optimize_batching,
'connection_pooling': self._optimize_connections,
'model_optimization': self._optimize_model
}
async def optimize_performance(self, issues: Dict) -> Dict:
"""Apply performance optimizations based on identified issues"""
optimization_results = {}
for strategy_name, strategy_func in self.optimization_strategies.items():
try:
result = await strategy_func(issues)
if result:
optimization_results[strategy_name] = result
except Exception as e:
print(f"Error applying optimization {strategy_name}: {e}")
return optimization_results
async def _optimize_caching(self, issues: Dict) -> Dict:
"""Optimize caching for better performance"""
cache_config = {
'enable_response_caching': True,
'cache_ttl': 3600, # 1 hour
'cache_size': '1GB',
'cache_strategy': 'lru',
'cache_warmup': True
}
# Implement caching based on issues
if 'response_time' in issues:
cache_config['cache_ttl'] = 7200 # 2 hours for response time issues
return {
'config': cache_config,
'estimated_improvement': '20-40% response time reduction',
'implementation_effort': 'medium'
}
async def _optimize_batching(self, issues: Dict) -> Dict:
"""Optimize request batching"""
batching_config = {
'enable_batching': True,
'batch_size': 10,
'batch_timeout': 100, # milliseconds
'batch_strategy': 'time_and_size'
}
# Adjust based on throughput issues
if 'throughput' in issues:
batching_config['batch_size'] = 20
batching_config['batch_timeout'] = 50
return {
'config': batching_config,
'estimated_improvement': '30-50% throughput increase',
'implementation_effort': 'medium'
}
Best Practices
- Monitor Continuously: Implement comprehensive performance monitoring
- Set Baselines: Establish performance baselines for comparison
- Use Systematic Approach: Follow structured troubleshooting methodology
- Document Issues: Keep detailed records of performance issues and solutions
- Test Solutions: Verify fixes before deploying to production
- Optimize Proactively: Implement optimizations before issues become critical
- Use Multiple Tools: Combine different monitoring and analysis tools
- Learn from Incidents: Document lessons learned and improve processes
Conclusion
Effective AI model performance troubleshooting requires a systematic approach, comprehensive monitoring, and targeted optimization strategies. By implementing these techniques and best practices, you can identify and resolve performance issues quickly and efficiently.
The key is to establish good monitoring practices, follow systematic troubleshooting procedures, and implement appropriate optimizations based on the specific issues identified.