AI Model Performance Troubleshooting

Effective troubleshooting is essential for maintaining optimal AI model performance. This guide provides systematic approaches, diagnostic tools, and optimization techniques for identifying and resolving AI performance issues.

Managed Performance Solution

Tetrate Agent Router Service provides comprehensive performance monitoring and troubleshooting capabilities with real-time metrics, automated diagnostics, and optimization recommendations. This managed service helps identify and resolve performance issues before they impact your users.

  • Real-time performance monitoring
  • Automated diagnostics and alerts
  • Performance optimization recommendations
  • Cross-provider performance analysis

Understanding Performance Issues

Common Performance Problems

  • High Response Times: Slow model inference and API responses
  • Memory Issues: Excessive memory usage and out-of-memory errors
  • Throughput Bottlenecks: Limited requests per second
  • Cost Inefficiency: High costs for model usage
  • Inconsistent Performance: Variable response times and quality
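
Response time, throughput, and consistency can all be quantified from the same set of timing samples. The following is a minimal sketch, assuming a hypothetical `call_model` function that stands in for a real inference call; `measure_latency` and its result keys are illustrative names, not part of any library.

```python
import statistics
import time
from typing import Callable, Dict


def measure_latency(call: Callable[[], object], runs: int = 20) -> Dict[str, float]:
    """Time `call` repeatedly and summarize latency and throughput.

    `runs` must be at least 2 because statistics.quantiles needs two samples.
    """
    durations = []
    started = time.perf_counter()
    for _ in range(runs):
        t0 = time.perf_counter()
        call()
        durations.append(time.perf_counter() - t0)
    elapsed = time.perf_counter() - started

    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
    cuts = statistics.quantiles(durations, n=100, method="inclusive")
    return {
        "runs": runs,
        "mean_s": statistics.mean(durations),
        "p95_s": cuts[94],
        "max_s": max(durations),
        "throughput_rps": runs / elapsed if elapsed > 0 else float("inf"),
    }


def call_model() -> str:
    """Hypothetical stand-in for a real model or API call."""
    time.sleep(0.001)
    return "ok"


if __name__ == "__main__":
    print(measure_latency(call_model, runs=10))
```

A large gap between `mean_s` and `p95_s` is one sign of the inconsistent performance described above, since it means a minority of requests are much slower than the typical one.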

Performance Metrics Framework

import time
import psutil
import statistics
from typing import Dict, List, Optional

class PerformanceMetrics:
    def __init__(self):
        self.metrics = {
            'response_time': [],
            'memory_usage': [],
            'cpu_usage': [],
            'throughput': [],
            'error_rate': [],
            'cost_per_request': []
        }
        self.baseline_metrics = {}
    
    def record_metric(self, metric_type: str, value: float, timestamp: Optional[float] = None):
        """Record a performance metric"""
        if timestamp is None:
            timestamp = time.time()
        
        if metric_type in self.metrics:
            self.metrics[metric_type].append({
                'value': value,
                'timestamp': timestamp
            })
            
            # Keep only recent metrics (last 1000)
            if len(self.metrics[metric_type]) > 1000:
                self.metrics[metric_type].pop(0)
    
    def calculate_statistics(self, metric_type: str, window_minutes: int = 60) -> Dict:
        """Calculate statistics for a metric over a time window"""
        if metric_type not in self.metrics:
            return {}
        
        cutoff_time = time.time() - (window_minutes * 60)
        recent_metrics = [
            m for m in self.metrics[metric_type]
            if m['timestamp'] >= cutoff_time
        ]
        
        if not recent_metrics:
            return {}
        
        values = [m['value'] for m in recent_metrics]
        
        return {
            'count': len(values),
            # Raw samples in recording order; the pattern detectors read this key
            'values': values,
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'p95': self._calculate_percentile(values, 95),
            'p99': self._calculate_percentile(values, 99),
            'min': min(values),
            'max': max(values),
            'std_dev': statistics.stdev(values) if len(values) > 1 else 0
        }
    
    def _calculate_percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile of values"""
        if not values:
            return 0
        
        sorted_values = sorted(values)
        index = (percentile / 100) * (len(sorted_values) - 1)
        
        if index.is_integer():
            return sorted_values[int(index)]
        else:
            lower = sorted_values[int(index)]
            upper = sorted_values[int(index) + 1]
            return lower + (upper - lower) * (index - int(index))
    
    def set_baseline(self, metric_type: str, baseline_value: float):
        """Set baseline value for a metric"""
        self.baseline_metrics[metric_type] = baseline_value
    
    def compare_to_baseline(self, metric_type: str) -> Dict:
        """Compare current performance to baseline"""
        if metric_type not in self.baseline_metrics:
            return {}
        
        current_stats = self.calculate_statistics(metric_type)
        baseline = self.baseline_metrics[metric_type]
        
        if not current_stats:
            return {}
        
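        mean = current_stats['mean']
        # Lower is better for every tracked metric except throughput
        improved = mean > baseline if metric_type == 'throughput' else mean < baseline
        
        return {
            'baseline': baseline,
            'current_mean': mean,
            'difference': mean - baseline,
            # A zero baseline has no meaningful percentage change
            'percentage_change': ((mean - baseline) / baseline) * 100 if baseline else None,
            'status': 'improved' if improved else 'degraded'
        }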
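
To make the percentile and baseline arithmetic concrete, here is a small standalone sketch that uses the same linear interpolation as `_calculate_percentile`. The latency values are made up for illustration, and the helper names `percentile` and `percentage_change` are assumptions rather than part of the class above.

```python
from typing import List


def percentile(values: List[float], pct: float) -> float:
    """Percentile by linear interpolation between the two nearest ranks."""
    if not values:
        return 0.0
    ordered = sorted(values)
    index = (pct / 100) * (len(ordered) - 1)
    lower = int(index)
    upper = min(lower + 1, len(ordered) - 1)
    return ordered[lower] + (ordered[upper] - ordered[lower]) * (index - lower)


def percentage_change(current: float, baseline: float) -> float:
    """Relative change of `current` against a non-zero baseline, in percent."""
    if baseline == 0:
        raise ValueError("baseline must be non-zero")
    return (current - baseline) / baseline * 100


# Made-up response times in seconds: four typical requests and one slow one
latencies = [0.8, 1.0, 1.2, 1.5, 4.0]

mean = sum(latencies) / len(latencies)  # 1.7
p95 = percentile(latencies, 95)         # index 3.8, so 1.5 + (4.0 - 1.5) * 0.8, about 3.5

print(f"mean={mean:.2f}s p95={p95:.2f}s")
print(f"change vs 1.5s baseline: {percentage_change(mean, 1.5):+.1f}%")
```

The single slow request moves the mean only modestly but dominates the p95, which is why the severity checks later in this guide look at p95 rather than the mean.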

Systematic Troubleshooting Approach

1. Performance Issue Identification

class PerformanceTroubleshooter:
    def __init__(self):
        self.troubleshooting_steps = [
            'identify_symptoms',
            'gather_metrics',
            'analyze_patterns',
            'identify_root_cause',
            'implement_solution',
            'verify_fix'
        ]
        self.performance_metrics = PerformanceMetrics()
    
    async def troubleshoot_performance_issue(self, issue_description: str) -> Dict:
        """Systematic troubleshooting process"""
        troubleshooting_result = {
            'issue_description': issue_description,
            'steps_completed': [],
            'findings': {},
            'root_cause': None,
            'solution': None,
            'verification': None
        }
        
        # Step 1: Identify symptoms
        symptoms = await self._identify_symptoms(issue_description)
        troubleshooting_result['steps_completed'].append('identify_symptoms')
        troubleshooting_result['findings']['symptoms'] = symptoms
        
        # Step 2: Gather metrics
        metrics = await self._gather_performance_metrics(symptoms)
        troubleshooting_result['steps_completed'].append('gather_metrics')
        troubleshooting_result['findings']['metrics'] = metrics
        
        # Step 3: Analyze patterns
        patterns = await self._analyze_performance_patterns(metrics)
        troubleshooting_result['steps_completed'].append('analyze_patterns')
        troubleshooting_result['findings']['patterns'] = patterns
        
        # Step 4: Identify root cause
        root_cause = await self._identify_root_cause(patterns, symptoms)
        troubleshooting_result['steps_completed'].append('identify_root_cause')
        troubleshooting_result['root_cause'] = root_cause
        
        # Step 5: Implement solution
        solution = await self._implement_solution(root_cause)
        troubleshooting_result['steps_completed'].append('implement_solution')
        troubleshooting_result['solution'] = solution
        
        # Step 6: Verify fix
        verification = await self._verify_solution(solution, metrics)
        troubleshooting_result['steps_completed'].append('verify_fix')
        troubleshooting_result['verification'] = verification
        
        return troubleshooting_result
    
    async def _identify_symptoms(self, issue_description: str) -> Dict:
        """Identify specific symptoms from issue description"""
        symptoms = {
            'response_time_issues': False,
            'memory_issues': False,
            'throughput_issues': False,
            'cost_issues': False,
            'quality_issues': False
        }
        
        # Analyze issue description for keywords
        description_lower = issue_description.lower()
        
        if any(word in description_lower for word in ['slow', 'timeout', 'response time']):
            symptoms['response_time_issues'] = True
        
        if any(word in description_lower for word in ['memory', 'oom', 'out of memory']):
            symptoms['memory_issues'] = True
        
        if any(word in description_lower for word in ['throughput', 'requests per second', 'rps']):
            symptoms['throughput_issues'] = True
        
        if any(word in description_lower for word in ['cost', 'expensive', 'billing']):
            symptoms['cost_issues'] = True
        
        if any(word in description_lower for word in ['quality', 'accuracy', 'results']):
            symptoms['quality_issues'] = True
        
        return symptoms
    
    async def _gather_performance_metrics(self, symptoms: Dict) -> Dict:
        """Gather relevant performance metrics"""
        metrics = {}
        
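        # Gather system metrics
        # Note: cpu_percent(interval=1) blocks for one second; inside a running
        # event loop, consider offloading this call to a worker thread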
        metrics['system'] = {
            'cpu_usage': psutil.cpu_percent(interval=1),
            'memory_usage': psutil.virtual_memory().percent,
            'disk_usage': psutil.disk_usage('/').percent
        }
        
        # Gather application metrics
        if symptoms['response_time_issues']:
            metrics['response_time'] = self.performance_metrics.calculate_statistics('response_time')
        
        if symptoms['memory_issues']:
            metrics['memory_usage'] = self.performance_metrics.calculate_statistics('memory_usage')
        
        if symptoms['throughput_issues']:
            metrics['throughput'] = self.performance_metrics.calculate_statistics('throughput')
        
        if symptoms['cost_issues']:
            metrics['cost'] = self.performance_metrics.calculate_statistics('cost_per_request')
        
        return metrics
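
The troubleshooter calls `_verify_solution`, which this guide does not show, and it passes in the metrics gathered before the fix. Verification only means something when those are compared with measurements taken after the fix. The following standalone sketch shows one way to do that, assuming statistics dictionaries shaped like the output of `calculate_statistics`; the function name matches the call above, but the `min_improvement` parameter and the result keys are hypothetical.

```python
from typing import Dict


def verify_solution(before: Dict[str, float], after: Dict[str, float],
                    min_improvement: float = 0.10) -> Dict:
    """Check whether p95 improved by at least `min_improvement` (a fraction).

    Assumes a lower-is-better metric such as response time.
    """
    p95_before = before.get("p95", 0)
    p95_after = after.get("p95", 0)

    if p95_before <= 0:
        return {"verified": False, "reason": "no usable pre-fix p95"}

    improvement = (p95_before - p95_after) / p95_before
    return {
        "verified": improvement >= min_improvement,
        "p95_before": p95_before,
        "p95_after": p95_after,
        "improvement": improvement,
    }


if __name__ == "__main__":
    # Made-up numbers: p95 dropped from 4.0s to 2.0s after the fix
    print(verify_solution({"p95": 4.0}, {"p95": 2.0}))
```

The 10% default is an arbitrary illustration. A real threshold should sit above the normal run-to-run variation of the metric, so that noise is not mistaken for a fix.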

2. Performance Pattern Analysis

class PerformanceAnalyzer:
    def __init__(self):
        self.analysis_patterns = {
            'gradual_degradation': self._detect_gradual_degradation,
            'sudden_drop': self._detect_sudden_drop,
            'periodic_issues': self._detect_periodic_issues,
            'spikes': self._detect_performance_spikes
        }
    
    async def analyze_performance_patterns(self, metrics: Dict) -> Dict:
        """Analyze performance patterns to identify issues"""
        patterns = {}
        
        for pattern_name, pattern_func in self.analysis_patterns.items():
            try:
                pattern_result = await pattern_func(metrics)
                if pattern_result:
                    patterns[pattern_name] = pattern_result
            except Exception as e:
                print(f"Error analyzing pattern {pattern_name}: {e}")
        
        return patterns
    
    async def _detect_gradual_degradation(self, metrics: Dict) -> Optional[Dict]:
        """Detect gradual performance degradation"""
        if 'response_time' not in metrics:
            return None
        
        response_times = metrics['response_time']
        if response_times.get('count', 0) < 10:
            return None
        
        # Calculate trend over time
        trend = self._calculate_trend(response_times.get('values', []))
        
        if trend['slope'] > 0.1:  # Increasing response times
            return {
                'detected': True,
                'slope': trend['slope'],
                'severity': 'high' if trend['slope'] > 0.5 else 'medium',
                'description': f"Response times increasing by {trend['slope']:.2f} per request (in the unit of the recorded values)"
            }
        
        return None
    
    async def _detect_sudden_drop(self, metrics: Dict) -> Optional[Dict]:
        """Detect sudden performance drops"""
        if 'response_time' not in metrics:
            return None
        
        response_times = metrics['response_time']
        if response_times.get('count', 0) < 5:
            return None
        
        values = response_times.get('values', [])
        if len(values) < 5:
            return None
        
        # Check for sudden increase in response time
        recent_avg = statistics.mean(values[-5:])
        previous_avg = statistics.mean(values[-10:-5]) if len(values) >= 10 else values[0]
        
        # Guard against a zero previous average, which would divide by zero below
        if previous_avg > 0 and recent_avg > previous_avg * 2:  # 2x increase
            return {
                'detected': True,
                'increase_factor': recent_avg / previous_avg,
                'severity': 'high',
                'description': f"Response time increased by {recent_avg / previous_avg:.1f}x"
            }
        
        return None
    
    async def _detect_periodic_issues(self, metrics: Dict) -> Optional[Dict]:
        """Detect periodic performance issues"""
        # This would require more sophisticated time series analysis
        # For now, check for patterns in error rates
        if 'error_rate' not in metrics:
            return None
        
        error_rates = metrics['error_rate']
        if error_rates.get('count', 0) < 20:
            return None
        
        # Simple pattern detection
        values = error_rates.get('values', [])
        if len(values) < 20:
            return None
        
        # Check for recurring high error rates
        high_error_periods = [i for i, v in enumerate(values) if v > 0.1]  # >10% error rate
        
        if len(high_error_periods) > 3:
            return {
                'detected': True,
                'periods': len(high_error_periods),
                'severity': 'medium',
                'description': f"Detected {len(high_error_periods)} periods of high error rates"
            }
        
        return None
    
    async def _detect_performance_spikes(self, metrics: Dict) -> Optional[Dict]:
        """Detect isolated response time spikes"""
        # Backs the 'spikes' entry registered in __init__. The heuristic mirrors
        # the memory spike check: a sample more than 1.5x both of its neighbors.
        values = metrics.get('response_time', {}).get('values', [])
        if len(values) < 3:
            return None
        
        spike_indices = [
            i for i in range(1, len(values) - 1)
            if values[i] > values[i-1] * 1.5 and values[i] > values[i+1] * 1.5
        ]
        
        if spike_indices:
            return {
                'detected': True,
                'count': len(spike_indices),
                'severity': 'medium',
                'description': f"Detected {len(spike_indices)} response time spikes"
            }
        
        return None
    
    def _calculate_trend(self, values: List[float]) -> Dict:
        """Calculate trend of values"""
        if len(values) < 2:
            return {'slope': 0, 'intercept': 0}
        
        # Simple linear regression
        n = len(values)
        x = list(range(n))
        
        sum_x = sum(x)
        sum_y = sum(values)
        sum_xy = sum(x[i] * values[i] for i in range(n))
        sum_x2 = sum(x[i] ** 2 for i in range(n))
        
        slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
        intercept = (sum_y - slope * sum_x) / n
        
        return {'slope': slope, 'intercept': intercept}
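
The trend calculation is an ordinary least-squares fit of the samples against their position in the series. The sketch below reimplements just the slope as a standalone function (the name `linear_trend` is an assumption) and runs it on two made-up series, to show how the `0.1` threshold used for gradual degradation separates noise from a steady climb.

```python
from typing import List


def linear_trend(values: List[float]) -> float:
    """Least-squares slope of `values` against their index (0, 1, 2, ...)."""
    n = len(values)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(values) / n
    numerator = sum((i - mean_x) * (v - mean_y) for i, v in enumerate(values))
    denominator = sum((i - mean_x) ** 2 for i in range(n))
    return numerator / denominator


# Made-up response times in seconds
steady = [1.0, 1.1, 0.9, 1.0, 1.1, 0.9, 1.0, 1.1, 0.9, 1.0]  # noise around 1.0
degrading = [1.0 + 0.2 * i for i in range(10)]               # +0.2s per request

for name, series in [("steady", steady), ("degrading", degrading)]:
    slope = linear_trend(series)
    flagged = slope > 0.1
    print(f"{name}: slope={slope:+.4f} flagged={flagged}")
```

The slope is expressed in the unit of the samples per request, so a fixed threshold such as `0.1` means something very different for values recorded in seconds than for values recorded in milliseconds. The threshold needs to be chosen with the recording unit in mind.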

Specific Performance Issues and Solutions

1. High Response Time Troubleshooting

class ResponseTimeTroubleshooter:
    def __init__(self):
        self.response_time_thresholds = {
            'acceptable': 2.0,  # seconds
            'warning': 5.0,
            'critical': 10.0
        }
    
    async def troubleshoot_response_time(self, metrics: Dict) -> Dict:
        """Troubleshoot high response time issues"""
        response_time_stats = metrics.get('response_time', {})
        
        if not response_time_stats:
            return {'error': 'No response time metrics available'}
        
        p95_response_time = response_time_stats.get('p95', 0)
        
        # Determine severity
        if p95_response_time > self.response_time_thresholds['critical']:
            severity = 'critical'
        elif p95_response_time > self.response_time_thresholds['warning']:
            severity = 'warning'
        elif p95_response_time > self.response_time_thresholds['acceptable']:
            severity = 'notice'
        else:
            severity = 'normal'
        
        # Analyze potential causes
        causes = await self._analyze_response_time_causes(metrics)
        
        # Generate solutions
        solutions = await self._generate_response_time_solutions(causes, severity)
        
        return {
            'severity': severity,
            'p95_response_time': p95_response_time,
            'causes': causes,
            'solutions': solutions,
            'priority': 'high' if severity in ['critical', 'warning'] else 'medium'
        }
    
    async def _analyze_response_time_causes(self, metrics: Dict) -> List[Dict]:
        """Analyze potential causes of high response times"""
        causes = []
        
        # Check system resources
        system_metrics = metrics.get('system', {})
        
        if system_metrics.get('cpu_usage', 0) > 80:
            causes.append({
                'type': 'system_resource',
                'component': 'cpu',
                'description': 'High CPU usage may be causing response time delays',
                'confidence': 'high'
            })
        
        if system_metrics.get('memory_usage', 0) > 90:
            causes.append({
                'type': 'system_resource',
                'component': 'memory',
                'description': 'High memory usage may be causing swapping and delays',
                'confidence': 'high'
            })
        
        # Check network issues
        if 'network_latency' in metrics:
            network_latency = metrics['network_latency'].get('mean', 0)
            if network_latency > 100:  # 100ms
                causes.append({
                    'type': 'network',
                    'component': 'latency',
                    'description': f'High network latency ({network_latency:.1f}ms)',
                    'confidence': 'medium'
                })
        
        # Check model-specific issues
        if 'model_load_time' in metrics:
            load_time = metrics['model_load_time'].get('mean', 0)
            if load_time > 1.0:  # 1 second
                causes.append({
                    'type': 'model',
                    'component': 'load_time',
                    'description': f'Slow model loading ({load_time:.1f}s)',
                    'confidence': 'medium'
                })
        
        return causes
    
    async def _generate_response_time_solutions(self, causes: List[Dict], severity: str) -> List[Dict]:
        """Generate solutions for response time issues"""
        solutions = []
        
        for cause in causes:
            if cause['type'] == 'system_resource':
                if cause['component'] == 'cpu':
                    solutions.append({
                        'type': 'optimization',
                        'description': 'Optimize CPU usage through request batching and caching',
                        'implementation': 'medium',
                        'expected_improvement': '20-40%'
                    })
                elif cause['component'] == 'memory':
                    solutions.append({
                        'type': 'optimization',
                        'description': 'Implement memory management and garbage collection optimization',
                        'implementation': 'medium',
                        'expected_improvement': '15-30%'
                    })
            
            elif cause['type'] == 'network':
                solutions.append({
                    'type': 'infrastructure',
                    'description': 'Use CDN or move to closer geographic region',
                    'implementation': 'high',
                    'expected_improvement': '30-50%'
                })
            
            elif cause['type'] == 'model':
                solutions.append({
                    'type': 'model_optimization',
                    'description': 'Implement model caching and warm-up procedures',
                    'implementation': 'medium',
                    'expected_improvement': '40-60%'
                })
        
        # Add general solutions for critical issues
        if severity == 'critical':
            solutions.append({
                'type': 'emergency',
                'description': 'Implement request queuing and rate limiting',
                'implementation': 'low',
                'expected_improvement': 'Immediate relief'
            })
        
        return solutions
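
The emergency measure suggested for critical severity is request queuing and rate limiting. A token bucket is one common way to implement the rate-limiting half. The following is a minimal single-threaded sketch under stated assumptions: the class name, its parameters, and the injectable `clock` are illustrative, and a production limiter would also need locking and a policy for rejected requests.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow up to `rate` requests per second with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: int,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full so an initial burst is allowed
        self.clock = clock
        self.updated = clock()

    def allow(self) -> bool:
        """Return True and consume a token if the request may proceed."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False


if __name__ == "__main__":
    bucket = TokenBucket(rate=5, capacity=5)
    decisions = [bucket.allow() for _ in range(8)]
    print(decisions)  # the first five calls pass; later ones depend on elapsed time
```

Requests that are refused can be queued, retried after a delay, or rejected with an explicit "try again later" response. Any of these protects the model backend from overload while the underlying cause is being fixed.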

2. Memory Usage Troubleshooting

class MemoryTroubleshooter:
    def __init__(self):
        self.memory_thresholds = {
            'warning': 80,  # percentage
            'critical': 95
        }
    
    async def troubleshoot_memory_usage(self, metrics: Dict) -> Dict:
        """Troubleshoot memory usage issues"""
        memory_stats = metrics.get('memory_usage', {})
        system_memory = metrics.get('system', {}).get('memory_usage', 0)
        
        if not memory_stats and system_memory == 0:
            return {'error': 'No memory metrics available'}
        
        # Determine severity
        if system_memory > self.memory_thresholds['critical']:
            severity = 'critical'
        elif system_memory > self.memory_thresholds['warning']:
            severity = 'warning'
        else:
            severity = 'normal'
        
        # Analyze memory patterns
        memory_patterns = await self._analyze_memory_patterns(metrics)
        
        # Generate solutions
        solutions = await self._generate_memory_solutions(memory_patterns, severity)
        
        return {
            'severity': severity,
            'system_memory_usage': system_memory,
            'memory_patterns': memory_patterns,
            'solutions': solutions,
            'priority': 'high' if severity == 'critical' else 'medium'
        }
    
    async def _analyze_memory_patterns(self, metrics: Dict) -> Dict:
        """Analyze memory usage patterns"""
        patterns = {
            'memory_leak': False,
            'high_peak_usage': False,
            'gradual_increase': False,
            'spikes': False
        }
        
        memory_stats = metrics.get('memory_usage', {})
        if not memory_stats:
            return patterns
        
        values = memory_stats.get('values', [])
        if len(values) < 10:
            return patterns
        
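        # Check for a possible memory leak (gradual increase). The slope is
        # computed inline because this class has no _calculate_trend method.
        n = len(values)
        mean_x = (n - 1) / 2
        mean_y = statistics.mean(values)
        slope = (sum((i - mean_x) * (v - mean_y) for i, v in enumerate(values))
                 / sum((i - mean_x) ** 2 for i in range(n)))
        if slope > 0.1:  # Increasing memory usage
            patterns['gradual_increase'] = True
            patterns['memory_leak'] = True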
        
        # Check for high peak usage
        if max(values) > 90:
            patterns['high_peak_usage'] = True
        
        # Check for memory spikes
        if len(values) >= 3:
            for i in range(1, len(values) - 1):
                if values[i] > values[i-1] * 1.5 and values[i] > values[i+1] * 1.5:
                    patterns['spikes'] = True
                    break
        
        return patterns
    
    async def _generate_memory_solutions(self, patterns: Dict, severity: str) -> List[Dict]:
        """Generate solutions for memory issues"""
        solutions = []
        
        if patterns['memory_leak']:
            solutions.append({
                'type': 'debugging',
                'description': 'Investigate and fix memory leaks in application code',
                'implementation': 'high',
                'expected_improvement': 'Significant reduction in memory usage'
            })
        
        if patterns['high_peak_usage']:
            solutions.append({
                'type': 'optimization',
                'description': 'Implement memory pooling and object reuse',
                'implementation': 'medium',
                'expected_improvement': '20-40% reduction in peak usage'
            })
        
        if patterns['spikes']:
            solutions.append({
                'type': 'monitoring',
                'description': 'Add memory spike detection and alerting',
                'implementation': 'low',
                'expected_improvement': 'Better visibility into memory issues'
            })
        
        # Add general memory optimization
        solutions.append({
            'type': 'optimization',
            'description': 'Implement garbage collection tuning and memory limits',
            'implementation': 'medium',
            'expected_improvement': '10-30% improvement'
        })
        
        return solutions
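
When the pattern analysis points to a memory leak, the next step is finding which code is holding on to memory. Python's standard `tracemalloc` module can compare two snapshots and attribute the growth to source lines. The sketch below assumes a hypothetical `workload` callable that represents the suspect code path; `top_memory_growth` is an illustrative helper name.

```python
import tracemalloc
from typing import Any, Callable, List, Tuple


def top_memory_growth(workload: Callable[[], Any],
                      limit: int = 5) -> Tuple[List[Tuple[str, int]], Any]:
    """Run `workload` and report the source lines whose allocations grew most.

    Returns (report, result), where report is a list of ("file:line", bytes).
    """
    tracemalloc.start()
    before = tracemalloc.take_snapshot()
    result = workload()  # keep a reference so retained memory stays alive
    after = tracemalloc.take_snapshot()
    tracemalloc.stop()

    # compare_to sorts by the absolute size difference, largest first
    diffs = after.compare_to(before, "lineno")
    report = []
    for diff in diffs[:limit]:
        frame = diff.traceback[0]
        report.append((f"{frame.filename}:{frame.lineno}", diff.size_diff))
    return report, result


if __name__ == "__main__":
    # Hypothetical workload that retains about 1 MB of buffers
    report, _ = top_memory_growth(lambda: [bytes(1000) for _ in range(1000)])
    for location, growth in report:
        print(f"{growth:>10} bytes  {location}")
```

`tracemalloc` only sees allocations made through Python's memory allocator. Memory held by native extensions, such as model weights on a GPU or buffers inside a C library, will not appear here and needs a different tool.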

Performance Optimization Techniques

1. Caching Strategies

class PerformanceOptimizer:
    def __init__(self):
        self.optimization_strategies = {
            'caching': self._optimize_caching,
            'batching': self._optimize_batching,
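            # 'connection_pooling' and 'model_optimization' are not implemented in
            # this guide; register them here once the corresponding methods exist.
            # Referencing undefined methods would raise AttributeError on init.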
        }
    
    async def optimize_performance(self, issues: Dict) -> Dict:
        """Apply performance optimizations based on identified issues"""
        optimization_results = {}
        
        for strategy_name, strategy_func in self.optimization_strategies.items():
            try:
                result = await strategy_func(issues)
                if result:
                    optimization_results[strategy_name] = result
            except Exception as e:
                print(f"Error applying optimization {strategy_name}: {e}")
        
        return optimization_results
    
    async def _optimize_caching(self, issues: Dict) -> Dict:
        """Optimize caching for better performance"""
        cache_config = {
            'enable_response_caching': True,
            'cache_ttl': 3600,  # 1 hour
            'cache_size': '1GB',
            'cache_strategy': 'lru',
            'cache_warmup': True
        }
        
        # Implement caching based on issues
        if 'response_time' in issues:
            cache_config['cache_ttl'] = 7200  # 2 hours for response time issues
        
        return {
            'config': cache_config,
            'estimated_improvement': '20-40% response time reduction',
            'implementation_effort': 'medium'
        }
    
    async def _optimize_batching(self, issues: Dict) -> Dict:
        """Optimize request batching"""
        batching_config = {
            'enable_batching': True,
            'batch_size': 10,
            'batch_timeout': 100,  # milliseconds
            'batch_strategy': 'time_and_size'
        }
        
        # Adjust based on throughput issues
        if 'throughput' in issues:
            batching_config['batch_size'] = 20
            batching_config['batch_timeout'] = 50
        
        return {
            'config': batching_config,
            'estimated_improvement': '30-50% throughput increase',
            'implementation_effort': 'medium'
        }
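
The caching configuration above names an LRU strategy with a TTL but does not show a cache. The following is a minimal in-process sketch of that combination, assuming the cache is bounded by entry count rather than by bytes (unlike the `'1GB'` setting); `TTLCache` and its parameters are hypothetical names.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Hashable, Optional


class TTLCache:
    """LRU cache whose entries also expire `ttl_seconds` after being stored."""

    def __init__(self, max_entries: int = 1024, ttl_seconds: float = 3600,
                 clock: Callable[[], float] = time.monotonic):
        self._data: "OrderedDict[Hashable, tuple]" = OrderedDict()
        self._max_entries = max_entries
        self._ttl = ttl_seconds
        self._clock = clock

    def get(self, key: Hashable) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop it and report a miss
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = (value, self._clock() + self._ttl)
        self._data.move_to_end(key)
        while len(self._data) > self._max_entries:
            self._data.popitem(last=False)  # evict the least recently used entry


if __name__ == "__main__":
    cache = TTLCache(max_entries=2, ttl_seconds=3600)
    cache.put("prompt-a", "response-a")
    print(cache.get("prompt-a"))
    print(cache.get("prompt-b"))
```

Response caching only helps when identical requests recur and a stored answer is still acceptable, so the cache key should include everything that affects the output, such as the model name and sampling parameters.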

Best Practices

  1. Monitor Continuously: Implement comprehensive performance monitoring
  2. Set Baselines: Establish performance baselines for comparison
  3. Use Systematic Approach: Follow structured troubleshooting methodology
  4. Document Issues: Keep detailed records of performance issues and solutions
  5. Test Solutions: Verify fixes before deploying to production
  6. Optimize Proactively: Implement optimizations before issues become critical
  7. Use Multiple Tools: Combine different monitoring and analysis tools
  8. Learn from Incidents: Document lessons learned and improve processes

Conclusion

Effective AI model performance troubleshooting combines continuous monitoring, a systematic diagnostic process, and optimizations targeted at the identified root cause.

In practice, this means recording metrics, comparing them against a baseline, identifying the pattern behind a regression, and verifying each fix against fresh measurements before treating the issue as resolved.
