API Gateway for AI Models: Centralized Management and Control

Learn how to build an API gateway for AI models. Master gateway architecture patterns, request transformation, authentication, rate limiting, and centralized management.

An API gateway serves as a centralized entry point for all AI model requests, providing essential services like authentication, rate limiting, request transformation, monitoring, and routing. This architecture pattern is crucial for managing multiple AI providers, ensuring security, and optimizing performance.

Managed Gateway Solution

Tetrate Agent Router Service provides a fully managed API gateway for AI models with built-in authentication, rate limiting, request transformation, and intelligent routing. This service eliminates the need to build and maintain your own gateway infrastructure while providing enterprise-grade reliability and monitoring.

  • Built-in authentication and authorization
  • Rate limiting and request transformation
  • Intelligent routing and load balancing
  • Enterprise-grade reliability

Why Use an API Gateway for AI Models?

API gateways provide several critical benefits for AI model management:

  • Centralized Control: Single point of management for all AI requests
  • Security: Unified authentication and authorization
  • Rate Limiting: Prevent abuse and control costs
  • Request Transformation: Standardize different provider APIs
  • Monitoring: Comprehensive logging and analytics
  • Load Balancing: Distribute requests across providers
  • Caching: Reduce redundant API calls

:::tip Gateway Benefits A well-designed API gateway can substantially reduce per-provider integration work, strengthen your security posture, and improve availability through intelligent routing and failover. :::
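
Because the gateway exposes a single OpenAI-compatible surface, client code stays the same no matter which provider ultimately serves the request. Here is a minimal client-side sketch, assuming the gateway built in this article runs on localhost:8000 and you hold a valid gateway token:

import requests

response = requests.post(
    "http://localhost:8000/v1/chat/completions",  # the gateway, not a provider
    headers={"Authorization": "Bearer <your-gateway-token>"},
    json={
        "model": "claude-3-haiku-20240307",  # the gateway routes this to Anthropic
        "messages": [{"role": "user", "content": "Hello"}],
    },
    timeout=30,
)
print(response.json())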

Core Gateway Architecture

1. Basic Gateway Structure

from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Dict, Any, Optional
import asyncio
import time
import json
import hashlib

class AIGateway:
    """API Gateway for AI model management"""
    
    def __init__(self):
        self.app = FastAPI(title="AI Model Gateway")
        self.security = HTTPBearer()
        self.providers = {}
        self.rate_limiters = {}
        self.cache = {}
        self.metrics = {}
        
        # Set up routes
        self._setup_routes()
    
    def _setup_routes(self):
        """Set up API routes"""
        
        @self.app.post("/v1/chat/completions")
        async def chat_completions(
            request: Request,
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_chat_request(request, credentials)
        
        @self.app.post("/v1/completions")
        async def completions(
            request: Request,
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_completion_request(request, credentials)
        
        @self.app.get("/v1/models")
        async def list_models(
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_models_request(credentials)
        
        @self.app.get("/health")
        async def health_check():
            return {"status": "healthy", "timestamp": time.time()}
    
    async def _handle_chat_request(self, request: Request, credentials: HTTPAuthorizationCredentials):
        """Handle chat completion requests"""
        
        # Extract request data
        request_data = await request.json()
        
        # Authenticate and authorize
        user_id = await self._authenticate(credentials.credentials)
        await self._authorize(user_id, "chat_completions")
        
        # Check rate limits
        await self._check_rate_limit(user_id, "chat_completions")
        
        # Transform request if needed
        transformed_request = await self._transform_request(request_data)
        
        # Route to appropriate provider
        provider = await self._select_provider(transformed_request)
        
        # Execute request
        response = await self._execute_request(provider, transformed_request)
        
        # Transform response
        transformed_response = await self._transform_response(response)
        
        # Cache if appropriate
        await self._cache_response(request_data, transformed_response)
        
        # Log metrics
        await self._log_metrics(user_id, provider, request_data, response)
        
        return transformed_response
    
    async def _authenticate(self, token: str) -> str:
        """Authenticate the request and return user ID"""
        # Implement your authentication logic here
        # This could involve JWT validation, API key lookup, etc.
        
        if not token:
            raise HTTPException(status_code=401, detail="Authentication required")
        
        # Example: simple token validation
        try:
            # In production, use proper JWT validation (see the AuthManager
            # class below); _validate_token is a placeholder for your own lookup
            user_id = self._validate_token(token)
            return user_id
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}")
    
    async def _authorize(self, user_id: str, action: str):
        """Authorize the user for the requested action"""
        # Implement your authorization logic here
        # Check user permissions, quotas, etc.
        
        user_permissions = await self._get_user_permissions(user_id)
        
        if action not in user_permissions:
            raise HTTPException(status_code=403, detail="Insufficient permissions")
        
        # Check quotas
        if not await self._check_quota(user_id, action):
            raise HTTPException(status_code=429, detail="Quota exceeded")
    
    async def _check_rate_limit(self, user_id: str, action: str):
        """Check rate limits for the user"""
        if user_id not in self.rate_limiters:
            self.rate_limiters[user_id] = {}
        
        if action not in self.rate_limiters[user_id]:
            self.rate_limiters[user_id][action] = RateLimiter(
                max_requests=100,
                window_seconds=60
            )
        
        rate_limiter = self.rate_limiters[user_id][action]
        
        if not rate_limiter.allow_request():
            raise HTTPException(status_code=429, detail="Rate limit exceeded")
    
    async def _transform_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform request to provider-specific format"""
        
        # Standardize the request format
        transformed = {
            'model': request_data.get('model', 'gpt-3.5-turbo'),
            'messages': request_data.get('messages', []),
            'max_tokens': request_data.get('max_tokens', 1000),
            'temperature': request_data.get('temperature', 0.7),
            'stream': request_data.get('stream', False)
        }
        
        # Add provider-specific transformations; _detect_provider and the
        # _transform_for_* helpers follow the same logic as the
        # RequestTransformer class shown in section 3
        provider = self._detect_provider(transformed['model'])
        if provider == 'anthropic':
            transformed = self._transform_for_anthropic(transformed)
        elif provider == 'google':
            transformed = self._transform_for_google(transformed)
        
        return transformed
    
    async def _select_provider(self, request_data: Dict[str, Any]) -> str:
        """Select the appropriate provider for the request"""
        
        model = request_data.get('model', 'gpt-3.5-turbo')
        
        # Simple provider mapping
        provider_mapping = {
            'gpt-': 'openai',
            'claude-': 'anthropic',
            'gemini-': 'google',
            'llama-': 'together'
        }
        
        for prefix, provider in provider_mapping.items():
            if model.startswith(prefix):
                return provider
        
        # Default to OpenAI
        return 'openai'
    
    async def _execute_request(self, provider: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the request with the selected provider"""
        
        if provider not in self.providers:
            raise HTTPException(status_code=503, detail=f"Provider {provider} not available")
        
        try:
            provider_client = self.providers[provider]
            response = await provider_client.generate(request_data)
            return response
        except Exception as e:
            # Log the error and try a fallback; _log_error and
            # _get_fallback_provider are left to your implementation
            await self._log_error(provider, request_data, str(e))
            
            # Try fallback provider
            fallback_provider = await self._get_fallback_provider(provider)
            if fallback_provider:
                return await self._execute_request(fallback_provider, request_data)
            
            raise HTTPException(status_code=503, detail="All providers unavailable")
    
    async def _transform_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """Transform provider response to standard format"""
        
        # Standardize response format
        transformed = {
            'id': response.get('id', f"chatcmpl-{int(time.time())}"),
            'object': 'chat.completion',
            'created': int(time.time()),
            'model': response.get('model', 'unknown'),
            'choices': response.get('choices', []),
            'usage': response.get('usage', {})
        }
        
        return transformed
    
    async def _cache_response(self, request_data: Dict[str, Any], response: Dict[str, Any]):
        """Cache the response if appropriate"""
        
        # Only cache non-streaming responses
        if request_data.get('stream', False):
            return
        
        # Create cache key
        cache_key = self._create_cache_key(request_data)
        
        # Cache for 1 hour (simple in-memory store; see CacheManager below for Redis)
        self.cache[cache_key] = {
            'response': response,
            'timestamp': time.time(),
            'ttl': 3600
        }
    
    def _create_cache_key(self, request_data: Dict[str, Any]) -> str:
        """Create a deterministic cache key from the request payload"""
        request_str = json.dumps(request_data, sort_keys=True)
        return hashlib.md5(request_str.encode()).hexdigest()
    
    async def _log_metrics(self, user_id: str, provider: str, request: Dict, response: Dict):
        """Log metrics for monitoring and analytics"""
        
        if user_id not in self.metrics:
            self.metrics[user_id] = {
                'requests': 0,
                'tokens_used': 0,
                'cost': 0.0,
                'providers': {}
            }
        
        # Update metrics
        self.metrics[user_id]['requests'] += 1
        
        # Calculate tokens and cost
        usage = response.get('usage', {})
        tokens_used = usage.get('total_tokens', 0)
        self.metrics[user_id]['tokens_used'] += tokens_used
        
        # Estimate cost (_estimate_cost would consult your per-provider price table)
        cost = self._estimate_cost(provider, request.get('model'), tokens_used)
        self.metrics[user_id]['cost'] += cost
        
        # Update provider metrics
        if provider not in self.metrics[user_id]['providers']:
            self.metrics[user_id]['providers'][provider] = 0
        self.metrics[user_id]['providers'][provider] += 1

2. Rate Limiting Implementation

import time
from collections import deque
from typing import Dict, Deque

class RateLimiter:
    """Rate limiter with sliding window"""
    
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Deque[float] = deque()
    
    def allow_request(self) -> bool:
        """Check if a request is allowed"""
        current_time = time.time()
        
        # Remove old requests outside the window
        while self.requests and current_time - self.requests[0] > self.window_seconds:
            self.requests.popleft()
        
        # Check if we're under the limit
        if len(self.requests) < self.max_requests:
            self.requests.append(current_time)
            return True
        
        return False
    
    def get_remaining_requests(self) -> int:
        """Get number of remaining requests in current window"""
        current_time = time.time()
        
        # Remove old requests
        while self.requests and current_time - self.requests[0] > self.window_seconds:
            self.requests.popleft()
        
        return max(0, self.max_requests - len(self.requests))
    
    def get_reset_time(self) -> float:
        """Get time until rate limit resets"""
        if not self.requests:
            return 0
        
        return self.requests[0] + self.window_seconds - time.time()
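
A quick demonstration of the sliding window, assuming a toy limit of 3 requests per 2 seconds:

limiter = RateLimiter(max_requests=3, window_seconds=2)

for i in range(5):
    print(i, limiter.allow_request())    # True, True, True, False, False

time.sleep(2.1)                          # wait for the window to slide past
print(limiter.allow_request())           # True again
print(limiter.get_remaining_requests())  # 2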

class TieredRateLimiter:
    """Rate limiter with different tiers for different user types"""
    
    def __init__(self):
        self.tiers = {
            'free': {'requests_per_minute': 10, 'requests_per_hour': 100},
            'basic': {'requests_per_minute': 60, 'requests_per_hour': 1000},
            'pro': {'requests_per_minute': 300, 'requests_per_hour': 10000},
            'enterprise': {'requests_per_minute': 1000, 'requests_per_hour': 100000}
        }
        self.limiters = {}
    
    def get_limiter(self, user_id: str, tier: str) -> Dict[str, RateLimiter]:
        """Get the per-window rate limiters for a user and tier"""
        key = f"{user_id}:{tier}"
        
        if key not in self.limiters:
            tier_config = self.tiers.get(tier, self.tiers['free'])
            self.limiters[key] = {
                'minute': RateLimiter(tier_config['requests_per_minute'], 60),
                'hour': RateLimiter(tier_config['requests_per_hour'], 3600)
            }
        
        return self.limiters[key]
    
    def check_rate_limit(self, user_id: str, tier: str) -> bool:
        """Check if request is allowed for user and tier"""
        limiters = self.get_limiter(user_id, tier)
        
        # Verify capacity on both windows before recording the request,
        # so a denial on one window does not consume a slot on the other
        if limiters['minute'].get_remaining_requests() == 0:
            return False
        if limiters['hour'].get_remaining_requests() == 0:
            return False
        
        return limiters['minute'].allow_request() and limiters['hour'].allow_request()

3. Request Transformation

from typing import Any, Dict, List

class RequestTransformer:
    """Transform requests between different provider formats"""
    
    def __init__(self):
        self.provider_formats = {
            'openai': self._openai_format,
            'anthropic': self._anthropic_format,
            'google': self._google_format,
            'together': self._together_format
        }
    
    def transform_request(self, request: Dict[str, Any], target_provider: str) -> Dict[str, Any]:
        """Transform request to target provider format"""
        
        if target_provider not in self.provider_formats:
            raise ValueError(f"Unsupported provider: {target_provider}")
        
        transformer = self.provider_formats[target_provider]
        return transformer(request)
    
    def _openai_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to OpenAI format"""
        return {
            'model': request.get('model', 'gpt-3.5-turbo'),
            'messages': request.get('messages', []),
            'max_tokens': request.get('max_tokens', 1000),
            'temperature': request.get('temperature', 0.7),
            'stream': request.get('stream', False)
        }
    
    def _anthropic_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Anthropic format"""
        return {
            'model': request.get('model', 'claude-3-haiku-20240307'),
            'max_tokens': request.get('max_tokens', 1000),
            'messages': request.get('messages', []),
            'temperature': request.get('temperature', 0.7)
        }
    
    def _google_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Google format"""
        return {
            'model': request.get('model', 'gemini-pro'),
            'contents': self._convert_messages_to_contents(request.get('messages', [])),
            'generationConfig': {
                'maxOutputTokens': request.get('max_tokens', 1000),
                'temperature': request.get('temperature', 0.7)
            }
        }
    
    def _together_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Together AI format"""
        return {
            'model': request.get('model', 'togethercomputer/llama-2-7b-chat'),
            'prompt': self._convert_messages_to_prompt(request.get('messages', [])),
            'max_tokens': request.get('max_tokens', 1000),
            'temperature': request.get('temperature', 0.7)
        }
    
    def _convert_messages_to_contents(self, messages: List[Dict]) -> List[Dict]:
        """Convert OpenAI-style messages to Google contents format"""
        contents = []
        for message in messages:
            contents.append({
                'parts': [{'text': message['content']}],
                # Gemini uses 'model' where OpenAI uses 'assistant'
                'role': 'model' if message['role'] == 'assistant' else message['role']
            })
        return contents
    
    def _convert_messages_to_prompt(self, messages: List[Dict]) -> str:
        """Convert OpenAI-style messages to Together AI prompt format"""
        prompt = ""
        for message in messages:
            if message['role'] == 'user':
                prompt += f"User: {message['content']}\n"
            elif message['role'] == 'assistant':
                prompt += f"Assistant: {message['content']}\n"
            elif message['role'] == 'system':
                prompt += f"System: {message['content']}\n"
        return prompt.strip()
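
To see the transformer in action, the sketch below renders one OpenAI-style request for two different targets (the model name is illustrative):

transformer = RequestTransformer()

openai_style = {
    'model': 'claude-3-haiku-20240307',
    'messages': [{'role': 'user', 'content': 'Summarize this article.'}],
    'max_tokens': 256,
}

print(transformer.transform_request(openai_style, 'anthropic'))  # messages-based payload
print(transformer.transform_request(openai_style, 'google'))     # contents/generationConfig payload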

4. Authentication and Authorization

import jwt
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

class AuthManager:
    """Manage authentication and authorization"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.user_permissions = {}
        self.user_quotas = {}
    
    def create_token(self, user_id: str, permissions: List[str], expires_in: int = 3600) -> str:
        """Create JWT token for user"""
        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'exp': datetime.utcnow() + timedelta(seconds=expires_in),
            'iat': datetime.utcnow()
        }
        
        return jwt.encode(payload, self.secret_key, algorithm='HS256')
    
    def validate_token(self, token: str) -> Dict[str, Any]:
        """Validate JWT token and return payload"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            raise ValueError("Token has expired")
        except jwt.InvalidTokenError:
            raise ValueError("Invalid token")
    
    def check_permission(self, user_id: str, action: str) -> bool:
        """Check if user has permission for action"""
        user_perms = self.user_permissions.get(user_id, [])
        return action in user_perms
    
    def set_user_permissions(self, user_id: str, permissions: List[str]):
        """Set permissions for user"""
        self.user_permissions[user_id] = permissions
    
    def check_quota(self, user_id: str, action: str) -> bool:
        """Check if user has quota remaining for action"""
        user_quota = self.user_quotas.get(user_id, {})
        action_quota = user_quota.get(action, {'limit': 1000, 'used': 0})
        
        return action_quota['used'] < action_quota['limit']
    
    def increment_quota_usage(self, user_id: str, action: str, amount: int = 1):
        """Increment quota usage for user"""
        if user_id not in self.user_quotas:
            self.user_quotas[user_id] = {}
        
        if action not in self.user_quotas[user_id]:
            self.user_quotas[user_id][action] = {'limit': 1000, 'used': 0}
        
        self.user_quotas[user_id][action]['used'] += amount
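
A short walkthrough of the token lifecycle, assuming a throwaway secret (in production, load it from the environment):

auth = AuthManager(secret_key="change-me")
auth.set_user_permissions("user-123", ["chat_completions"])

token = auth.create_token("user-123", ["chat_completions"], expires_in=900)
payload = auth.validate_token(token)
print(payload["user_id"], payload["permissions"])  # user-123 ['chat_completions']

print(auth.check_permission("user-123", "chat_completions"))  # True
auth.increment_quota_usage("user-123", "chat_completions")
print(auth.check_quota("user-123", "chat_completions"))       # True until the default 1000-call limit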

5. Caching Layer

import hashlib
import json
import pickle

import redis
from typing import Any, Dict, Optional

class CacheManager:
    """Manage response caching"""
    
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour
    
    def _create_cache_key(self, request_data: Dict[str, Any]) -> str:
        """Create cache key from request data"""
        # Create deterministic string from request
        request_str = json.dumps(request_data, sort_keys=True)
        return hashlib.md5(request_str.encode()).hexdigest()
    
    async def get_cached_response(self, request_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Get cached response for request"""
        cache_key = self._create_cache_key(request_data)
        
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return pickle.loads(cached_data)
        except Exception as e:
            print(f"Cache retrieval error: {e}")
        
        return None
    
    async def cache_response(self, request_data: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None):
        """Cache response for request"""
        cache_key = self._create_cache_key(request_data)
        ttl = ttl or self.default_ttl
        
        try:
            # Don't cache streaming responses
            if request_data.get('stream', False):
                return
            
            # Serialize response
            serialized_response = pickle.dumps(response)
            
            # Store in cache
            self.redis_client.setex(cache_key, ttl, serialized_response)
            
        except Exception as e:
            print(f"Cache storage error: {e}")
    
    async def invalidate_cache(self, pattern: str = "*"):
        """Invalidate cache entries matching pattern"""
        try:
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
        except Exception as e:
            print(f"Cache invalidation error: {e}")

Implementation Example

Here's a complete example of setting up and using the API gateway:

import asyncio
import os

import uvicorn

async def setup_ai_gateway():
    """Set up the complete AI gateway"""
    
    # Initialize gateway
    gateway = AIGateway()
    
    # Set up providers (OpenAIProvider, AnthropicProvider, and TogetherProvider
    # stand in for thin provider clients you supply; they are not defined here)
    gateway.providers = {
        'openai': OpenAIProvider(os.getenv('OPENAI_API_KEY')),
        'anthropic': AnthropicProvider(os.getenv('ANTHROPIC_API_KEY')),
        'together': TogetherProvider(os.getenv('TOGETHER_API_KEY'))
    }
    
    # Attach the supporting components built earlier in this article
    gateway.auth_manager = AuthManager(os.getenv('JWT_SECRET_KEY'))
    gateway.rate_limiter = TieredRateLimiter()
    gateway.cache_manager = CacheManager(os.getenv('REDIS_URL', 'redis://localhost:6379'))
    gateway.request_transformer = RequestTransformer()
    
    return gateway

async def run_gateway():
    """Run the AI gateway"""
    
    gateway = await setup_ai_gateway()
    
    # uvicorn.run() starts its own event loop, so inside an async function
    # build a Server and await it on the current loop instead
    config = uvicorn.Config(
        gateway.app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )
    await uvicorn.Server(config).serve()

if __name__ == "__main__":
    asyncio.run(run_gateway())


Monitoring and Analytics

from collections import defaultdict
from typing import Any, Dict

class GatewayMonitor:
    """Monitor gateway performance and usage"""
    
    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'requests_by_provider': defaultdict(int),
            'requests_by_user': defaultdict(int),
            'response_times': [],
            'errors': defaultdict(int),
            'cache_hits': 0,
            'cache_misses': 0
        }
    
    def record_request(self, user_id: str, provider: str, response_time: float, success: bool):
        """Record request metrics"""
        self.metrics['requests_total'] += 1
        self.metrics['requests_by_provider'][provider] += 1
        self.metrics['requests_by_user'][user_id] += 1
        self.metrics['response_times'].append(response_time)
        
        if not success:
            self.metrics['errors'][provider] += 1
    
    def record_cache_hit(self):
        """Record cache hit"""
        self.metrics['cache_hits'] += 1
    
    def record_cache_miss(self):
        """Record cache miss"""
        self.metrics['cache_misses'] += 1
    
    def get_metrics_report(self) -> Dict[str, Any]:
        """Generate metrics report"""
        total_requests = self.metrics['requests_total']
        avg_response_time = sum(self.metrics['response_times']) / len(self.metrics['response_times']) if self.metrics['response_times'] else 0
        cache_hit_rate = self.metrics['cache_hits'] / (self.metrics['cache_hits'] + self.metrics['cache_misses']) if (self.metrics['cache_hits'] + self.metrics['cache_misses']) > 0 else 0
        
        return {
            'total_requests': total_requests,
            'average_response_time': avg_response_time,
            'cache_hit_rate': cache_hit_rate,
            'requests_by_provider': dict(self.metrics['requests_by_provider']),
            'top_users': dict(sorted(self.metrics['requests_by_user'].items(), key=lambda x: x[1], reverse=True)[:10]),
            'error_rates': {provider: count/total_requests for provider, count in self.metrics['errors'].items()}
        }
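
Feeding the monitor from the request path is straightforward; a small illustration with made-up numbers:

monitor = GatewayMonitor()

monitor.record_request("user-123", "openai", response_time=0.42, success=True)
monitor.record_request("user-456", "anthropic", response_time=1.10, success=False)
monitor.record_cache_hit()
monitor.record_cache_miss()

print(monitor.get_metrics_report())  # totals, cache hit rate, per-provider error rates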

Best Practices

1. Security Configuration

# security_config.yaml
security:
  authentication:
    type: "jwt"
    secret_key: "${JWT_SECRET_KEY}"
    token_expiry: 3600
  
  authorization:
    enabled: true
    permission_model: "rbac"
    default_permissions: ["read"]
  
  rate_limiting:
    enabled: true
    default_limit: 100
    burst_limit: 200
    window_size: 60
  
  cors:
    enabled: true
    allowed_origins: ["https://yourdomain.com"]
    allowed_methods: ["GET", "POST"]
    allowed_headers: ["*"]
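
One way to consume this file is to expand the ${...} placeholders from the environment before parsing. A minimal sketch, assuming PyYAML is installed and the file is saved as security_config.yaml:

import os
import re
import yaml

def load_security_config(path: str) -> dict:
    with open(path) as f:
        raw = f.read()
    # Substitute ${VAR} placeholders with environment variables
    raw = re.sub(r"\$\{(\w+)\}", lambda m: os.environ.get(m.group(1), ""), raw)
    return yaml.safe_load(raw)

config = load_security_config("security_config.yaml")
print(config["security"]["rate_limiting"]["default_limit"])  # 100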

2. Performance Optimization

from typing import Dict, List

class PerformanceOptimizer:
    """Optimize gateway performance"""
    
    def __init__(self):
        self.connection_pool = {}
        self.request_batching = {}
    
    async def optimize_connections(self, provider: str, max_connections: int = 10):
        """Optimize connection pooling for provider"""
        # Implement connection pooling
        pass
    
    async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
        """Batch similar requests for efficiency"""
        # Implement request batching
        pass
    
    async def preload_models(self, models: List[str]):
        """Preload frequently used models"""
        # Implement model preloading
        pass
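
The stubs above are intentionally left open. As one concrete possibility, optimize_connections could hand out a shared httpx.AsyncClient per provider with a bounded pool; this is a drop-in body for the stub above, and httpx is an assumption here rather than part of the article's stack:

import httpx

async def optimize_connections(self, provider: str, max_connections: int = 10):
    """Reuse one pooled HTTP client per provider instead of reconnecting"""
    if provider not in self.connection_pool:
        self.connection_pool[provider] = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=max_connections),
            timeout=httpx.Timeout(30.0),
        )
    return self.connection_pool[provider]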

Conclusion

An API gateway for AI models provides essential infrastructure for:

  • Centralized Management: Single point of control for all AI requests
  • Enhanced Security: Unified authentication and authorization
  • Cost Control: Rate limiting and quota management
  • Performance Optimization: Caching and load balancing
  • Monitoring: Comprehensive analytics and observability
  • Scalability: Easy addition of new providers and features

:::tip Implementation Priority Start with basic authentication and rate limiting, then add caching and advanced routing features as your needs grow. :::

The key to success is implementing the gateway incrementally, starting with core security and rate limiting features, then gradually adding more sophisticated capabilities based on your specific requirements and usage patterns.
