An API gateway serves as a centralized entry point for all AI model requests, providing essential services like authentication, rate limiting, request transformation, monitoring, and routing. This architecture pattern is crucial for managing multiple AI providers, ensuring security, and optimizing performance.
## Managed Gateway Solution
Tetrate Agent Router Service provides a fully managed API gateway for AI models with built-in authentication, rate limiting, request transformation, and intelligent routing. This service eliminates the need to build and maintain your own gateway infrastructure while providing enterprise-grade reliability and monitoring.
- Built-in authentication and authorization
- Rate limiting and request transformation
- Intelligent routing and load balancing
- Enterprise-grade reliability
## Why Use an API Gateway for AI Models?
API gateways provide several critical benefits for AI model management:
- Centralized Control: Single point of management for all AI requests
- Security: Unified authentication and authorization
- Rate Limiting: Prevent abuse and control costs
- Request Transformation: Standardize different provider APIs
- Monitoring: Comprehensive logging and analytics
- Load Balancing: Distribute requests across providers
- Caching: Reduce redundant API calls
:::tip Gateway Benefits
An API gateway can substantially reduce integration and development time, improve your security posture, and support high availability (99.9%+ uptime) through intelligent routing and failover.
:::
## Core Gateway Architecture
### 1. Basic Gateway Structure
```python
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Dict, Any, Optional
import asyncio
import time
import json
import hashlib


class AIGateway:
    """API Gateway for AI model management.

    Note: several helpers referenced below (_validate_token,
    _get_user_permissions, _check_quota, _detect_provider,
    _transform_for_anthropic, _transform_for_google, _log_error,
    _get_fallback_provider, _estimate_cost, and the completion/models
    handlers) are application-specific hooks left for you to implement.
    """

    def __init__(self):
        self.app = FastAPI(title="AI Model Gateway")
        self.security = HTTPBearer()
        self.providers = {}
        self.rate_limiters = {}
        self.cache = {}
        self.metrics = {}

        # Set up routes
        self._setup_routes()

    def _setup_routes(self):
        """Set up API routes"""

        @self.app.post("/v1/chat/completions")
        async def chat_completions(
            request: Request,
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_chat_request(request, credentials)

        @self.app.post("/v1/completions")
        async def completions(
            request: Request,
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_completion_request(request, credentials)

        @self.app.get("/v1/models")
        async def list_models(
            credentials: HTTPAuthorizationCredentials = Depends(self.security)
        ):
            return await self._handle_models_request(credentials)

        @self.app.get("/health")
        async def health_check():
            return {"status": "healthy", "timestamp": time.time()}

    async def _handle_chat_request(self, request: Request, credentials: HTTPAuthorizationCredentials):
        """Handle chat completion requests"""
        # Extract request data
        request_data = await request.json()

        # Authenticate and authorize
        user_id = await self._authenticate(credentials.credentials)
        await self._authorize(user_id, "chat_completions")

        # Check rate limits
        await self._check_rate_limit(user_id, "chat_completions")

        # Serve from cache when possible (only non-streaming responses are cached)
        cache_key = self._create_cache_key(request_data)
        cached = self.cache.get(cache_key)
        if cached and time.time() - cached['timestamp'] < cached['ttl']:
            return cached['response']

        # Transform request if needed
        transformed_request = await self._transform_request(request_data)

        # Route to appropriate provider
        provider = await self._select_provider(transformed_request)

        # Execute request
        response = await self._execute_request(provider, transformed_request)

        # Transform response
        transformed_response = await self._transform_response(response)

        # Cache if appropriate
        await self._cache_response(request_data, transformed_response)

        # Log metrics
        await self._log_metrics(user_id, provider, request_data, response)

        return transformed_response

    async def _authenticate(self, token: str) -> str:
        """Authenticate the request and return the user ID"""
        # Implement your authentication logic here
        # This could involve JWT validation, API key lookup, etc.
        if not token:
            raise HTTPException(status_code=401, detail="Authentication required")

        try:
            # In production, use proper JWT validation
            user_id = self._validate_token(token)
            return user_id
        except Exception as e:
            raise HTTPException(status_code=401, detail=f"Invalid token: {str(e)}")

    async def _authorize(self, user_id: str, action: str):
        """Authorize the user for the requested action"""
        # Implement your authorization logic here
        # Check user permissions, quotas, etc.
        user_permissions = await self._get_user_permissions(user_id)

        if action not in user_permissions:
            raise HTTPException(status_code=403, detail="Insufficient permissions")

        # Check quotas
        if not await self._check_quota(user_id, action):
            raise HTTPException(status_code=429, detail="Quota exceeded")

    async def _check_rate_limit(self, user_id: str, action: str):
        """Check rate limits for the user"""
        if user_id not in self.rate_limiters:
            self.rate_limiters[user_id] = {}

        if action not in self.rate_limiters[user_id]:
            # RateLimiter is defined in the next section
            self.rate_limiters[user_id][action] = RateLimiter(
                max_requests=100,
                window_seconds=60
            )

        rate_limiter = self.rate_limiters[user_id][action]
        if not rate_limiter.allow_request():
            raise HTTPException(status_code=429, detail="Rate limit exceeded")

    async def _transform_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Transform the request to a provider-specific format"""
        # Standardize the request format
        transformed = {
            'model': request_data.get('model', 'gpt-3.5-turbo'),
            'messages': request_data.get('messages', []),
            'max_tokens': request_data.get('max_tokens', 1000),
            'temperature': request_data.get('temperature', 0.7),
            'stream': request_data.get('stream', False)
        }

        # Add provider-specific transformations
        provider = self._detect_provider(transformed['model'])
        if provider == 'anthropic':
            transformed = self._transform_for_anthropic(transformed)
        elif provider == 'google':
            transformed = self._transform_for_google(transformed)

        return transformed

    async def _select_provider(self, request_data: Dict[str, Any]) -> str:
        """Select the appropriate provider for the request"""
        model = request_data.get('model', 'gpt-3.5-turbo')

        # Simple provider mapping based on model-name prefix
        provider_mapping = {
            'gpt-': 'openai',
            'claude-': 'anthropic',
            'gemini-': 'google',
            'llama-': 'together'
        }

        for prefix, provider in provider_mapping.items():
            if model.startswith(prefix):
                return provider

        # Default to OpenAI
        return 'openai'

    async def _execute_request(self, provider: str, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute the request with the selected provider"""
        if provider not in self.providers:
            raise HTTPException(status_code=503, detail=f"Provider {provider} not available")

        try:
            provider_client = self.providers[provider]
            response = await provider_client.generate(request_data)
            return response
        except Exception as e:
            # Log the error and try fallback
            await self._log_error(provider, request_data, str(e))

            # Try fallback provider
            fallback_provider = await self._get_fallback_provider(provider)
            if fallback_provider:
                return await self._execute_request(fallback_provider, request_data)

            raise HTTPException(status_code=503, detail="All providers unavailable")

    async def _transform_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
        """Transform the provider response to a standard format"""
        # Standardize response format
        transformed = {
            'id': response.get('id', f"chatcmpl-{int(time.time())}"),
            'object': 'chat.completion',
            'created': int(time.time()),
            'model': response.get('model', 'unknown'),
            'choices': response.get('choices', []),
            'usage': response.get('usage', {})
        }
        return transformed

    def _create_cache_key(self, request_data: Dict[str, Any]) -> str:
        """Create a deterministic cache key from request data"""
        request_str = json.dumps(request_data, sort_keys=True)
        return hashlib.md5(request_str.encode()).hexdigest()

    async def _cache_response(self, request_data: Dict[str, Any], response: Dict[str, Any]):
        """Cache the response if appropriate"""
        # Only cache non-streaming responses
        if request_data.get('stream', False):
            return

        # Create cache key
        cache_key = self._create_cache_key(request_data)

        # Cache for 1 hour
        self.cache[cache_key] = {
            'response': response,
            'timestamp': time.time(),
            'ttl': 3600
        }

    async def _log_metrics(self, user_id: str, provider: str, request: Dict, response: Dict):
        """Log metrics for monitoring and analytics"""
        if user_id not in self.metrics:
            self.metrics[user_id] = {
                'requests': 0,
                'tokens_used': 0,
                'cost': 0.0,
                'providers': {}
            }

        # Update metrics
        self.metrics[user_id]['requests'] += 1

        # Calculate tokens and cost
        usage = response.get('usage', {})
        tokens_used = usage.get('total_tokens', 0)
        self.metrics[user_id]['tokens_used'] += tokens_used

        # Estimate cost
        cost = self._estimate_cost(provider, request.get('model'), tokens_used)
        self.metrics[user_id]['cost'] += cost

        # Update provider metrics
        if provider not in self.metrics[user_id]['providers']:
            self.metrics[user_id]['providers'][provider] = 0
        self.metrics[user_id]['providers'][provider] += 1
```
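The gateway expects each registered provider to expose an async `generate(request_data)` coroutine. As a rough sketch of that interface using the `openai` v1 SDK (the class name matches the `OpenAIProvider` used in the implementation example below, but the body here is illustrative, not the article's canonical adapter):

```python
from openai import AsyncOpenAI


class OpenAIProvider:
    """Minimal provider adapter matching the gateway's `generate` interface"""

    def __init__(self, api_key: str):
        self.client = AsyncOpenAI(api_key=api_key)

    async def generate(self, request_data: dict) -> dict:
        # Forward the already-transformed request to the OpenAI API
        response = await self.client.chat.completions.create(**request_data)
        return response.model_dump()
```

Adapters for other providers (Anthropic, Together, etc.) follow the same shape: accept the transformed request dict, call the vendor SDK, and return a plain dict.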
### 2. Rate Limiting Implementation
```python
import time
from collections import deque
from typing import Deque, Dict


class RateLimiter:
    """Rate limiter with a sliding window"""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Deque[float] = deque()

    def allow_request(self) -> bool:
        """Check if a request is allowed"""
        current_time = time.time()

        # Remove old requests outside the window
        while self.requests and current_time - self.requests[0] > self.window_seconds:
            self.requests.popleft()

        # Check if we're under the limit
        if len(self.requests) < self.max_requests:
            self.requests.append(current_time)
            return True

        return False

    def get_remaining_requests(self) -> int:
        """Get the number of remaining requests in the current window"""
        current_time = time.time()

        # Remove old requests
        while self.requests and current_time - self.requests[0] > self.window_seconds:
            self.requests.popleft()

        return max(0, self.max_requests - len(self.requests))

    def get_reset_time(self) -> float:
        """Get the time in seconds until the rate limit resets"""
        if not self.requests:
            return 0.0
        return max(0.0, self.requests[0] + self.window_seconds - time.time())


class TieredRateLimiter:
    """Rate limiter with different tiers for different user types"""

    def __init__(self):
        self.tiers = {
            'free': {'requests_per_minute': 10, 'requests_per_hour': 100},
            'basic': {'requests_per_minute': 60, 'requests_per_hour': 1000},
            'pro': {'requests_per_minute': 300, 'requests_per_hour': 10000},
            'enterprise': {'requests_per_minute': 1000, 'requests_per_hour': 100000}
        }
        self.limiters = {}

    def get_limiter(self, user_id: str, tier: str) -> Dict[str, RateLimiter]:
        """Get the per-minute and per-hour rate limiters for a user and tier"""
        key = f"{user_id}:{tier}"

        if key not in self.limiters:
            tier_config = self.tiers.get(tier, self.tiers['free'])
            self.limiters[key] = {
                'minute': RateLimiter(tier_config['requests_per_minute'], 60),
                'hour': RateLimiter(tier_config['requests_per_hour'], 3600)
            }

        return self.limiters[key]

    def check_rate_limit(self, user_id: str, tier: str) -> bool:
        """Check if a request is allowed for the user and tier"""
        limiters = self.get_limiter(user_id, tier)

        # Check both minute and hour limits
        minute_allowed = limiters['minute'].allow_request()
        hour_allowed = limiters['hour'].allow_request()

        return minute_allowed and hour_allowed
```
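To see the tiers in action, here is a quick, self-contained check (the user ID and tier are hypothetical):

```python
limiter = TieredRateLimiter()

# Each call consumes one slot in both the minute and hour windows
for i in range(12):
    allowed = limiter.check_rate_limit("user-123", "free")
    print(f"request {i + 1}: {'allowed' if allowed else 'rejected'}")
# With the 'free' tier (10 requests/minute), requests 11 and 12 are rejected
```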
### 3. Request Transformation
```python
from typing import Any, Dict, List


class RequestTransformer:
    """Transform requests between different provider formats"""

    def __init__(self):
        self.provider_formats = {
            'openai': self._openai_format,
            'anthropic': self._anthropic_format,
            'google': self._google_format,
            'together': self._together_format
        }

    def transform_request(self, request: Dict[str, Any], target_provider: str) -> Dict[str, Any]:
        """Transform a request to the target provider's format"""
        if target_provider not in self.provider_formats:
            raise ValueError(f"Unsupported provider: {target_provider}")

        transformer = self.provider_formats[target_provider]
        return transformer(request)

    def _openai_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to OpenAI format"""
        return {
            'model': request.get('model', 'gpt-3.5-turbo'),
            'messages': request.get('messages', []),
            'max_tokens': request.get('max_tokens', 1000),
            'temperature': request.get('temperature', 0.7),
            'stream': request.get('stream', False)
        }

    def _anthropic_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Anthropic format"""
        return {
            'model': request.get('model', 'claude-3-haiku-20240307'),
            'max_tokens': request.get('max_tokens', 1000),
            'messages': request.get('messages', []),
            'temperature': request.get('temperature', 0.7)
        }

    def _google_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Google format"""
        return {
            'model': request.get('model', 'gemini-pro'),
            'contents': self._convert_messages_to_contents(request.get('messages', [])),
            'generationConfig': {
                'maxOutputTokens': request.get('max_tokens', 1000),
                'temperature': request.get('temperature', 0.7)
            }
        }

    def _together_format(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Transform to Together AI format"""
        return {
            'model': request.get('model', 'togethercomputer/llama-2-7b-chat'),
            'prompt': self._convert_messages_to_prompt(request.get('messages', [])),
            'max_tokens': request.get('max_tokens', 1000),
            'temperature': request.get('temperature', 0.7)
        }

    def _convert_messages_to_contents(self, messages: List[Dict]) -> List[Dict]:
        """Convert OpenAI-style messages to Google contents format"""
        contents = []
        for message in messages:
            contents.append({
                'parts': [{'text': message['content']}],
                'role': message['role']
            })
        return contents

    def _convert_messages_to_prompt(self, messages: List[Dict]) -> str:
        """Convert OpenAI-style messages to a Together AI prompt string"""
        prompt = ""
        for message in messages:
            if message['role'] == 'user':
                prompt += f"User: {message['content']}\n"
            elif message['role'] == 'assistant':
                prompt += f"Assistant: {message['content']}\n"
            elif message['role'] == 'system':
                prompt += f"System: {message['content']}\n"
        return prompt.strip()
```
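As a quick illustration, the same OpenAI-style request can be reshaped for Google's API:

```python
transformer = RequestTransformer()

openai_style_request = {
    'model': 'gemini-pro',
    'messages': [{'role': 'user', 'content': 'Hello!'}],
    'max_tokens': 256,
}

google_request = transformer.transform_request(openai_style_request, 'google')
# {'model': 'gemini-pro',
#  'contents': [{'parts': [{'text': 'Hello!'}], 'role': 'user'}],
#  'generationConfig': {'maxOutputTokens': 256, 'temperature': 0.7}}
print(google_request)
```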
### 4. Authentication and Authorization
```python
import jwt  # PyJWT
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional


class AuthManager:
    """Manage authentication and authorization"""

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        self.user_permissions = {}
        self.user_quotas = {}

    def create_token(self, user_id: str, permissions: List[str], expires_in: int = 3600) -> str:
        """Create a JWT token for a user"""
        payload = {
            'user_id': user_id,
            'permissions': permissions,
            'exp': datetime.utcnow() + timedelta(seconds=expires_in),
            'iat': datetime.utcnow()
        }
        return jwt.encode(payload, self.secret_key, algorithm='HS256')

    def validate_token(self, token: str) -> Dict[str, Any]:
        """Validate a JWT token and return its payload"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload
        except jwt.ExpiredSignatureError:
            raise ValueError("Token has expired")
        except jwt.InvalidTokenError:
            raise ValueError("Invalid token")

    def check_permission(self, user_id: str, action: str) -> bool:
        """Check if a user has permission for an action"""
        user_perms = self.user_permissions.get(user_id, [])
        return action in user_perms

    def set_user_permissions(self, user_id: str, permissions: List[str]):
        """Set permissions for a user"""
        self.user_permissions[user_id] = permissions

    def check_quota(self, user_id: str, action: str) -> bool:
        """Check if a user has quota remaining for an action"""
        user_quota = self.user_quotas.get(user_id, {})
        action_quota = user_quota.get(action, {'limit': 1000, 'used': 0})
        return action_quota['used'] < action_quota['limit']

    def increment_quota_usage(self, user_id: str, action: str, amount: int = 1):
        """Increment quota usage for a user"""
        if user_id not in self.user_quotas:
            self.user_quotas[user_id] = {}

        if action not in self.user_quotas[user_id]:
            self.user_quotas[user_id][action] = {'limit': 1000, 'used': 0}

        self.user_quotas[user_id][action]['used'] += amount
```
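A quick round trip through the manager (with a placeholder secret) shows the intended flow:

```python
auth = AuthManager(secret_key="change-me")  # placeholder secret for illustration

auth.set_user_permissions("user-123", ["chat_completions"])
token = auth.create_token("user-123", ["chat_completions"], expires_in=600)

payload = auth.validate_token(token)
assert payload["user_id"] == "user-123"
assert auth.check_permission("user-123", "chat_completions")
```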
### 5. Caching Layer
```python
import redis
import pickle
import json
import hashlib
from typing import Any, Dict, Optional


class CacheManager:
    """Manage response caching"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour

    def _create_cache_key(self, request_data: Dict[str, Any]) -> str:
        """Create a cache key from request data"""
        # Create a deterministic string from the request
        request_str = json.dumps(request_data, sort_keys=True)
        return hashlib.md5(request_str.encode()).hexdigest()

    async def get_cached_response(self, request_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Get the cached response for a request, if any"""
        cache_key = self._create_cache_key(request_data)

        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return pickle.loads(cached_data)
        except Exception as e:
            print(f"Cache retrieval error: {e}")

        return None

    async def cache_response(self, request_data: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None):
        """Cache the response for a request"""
        # Don't cache streaming responses
        if request_data.get('stream', False):
            return

        cache_key = self._create_cache_key(request_data)
        ttl = ttl or self.default_ttl

        try:
            # Serialize the response and store it with an expiry
            serialized_response = pickle.dumps(response)
            self.redis_client.setex(cache_key, ttl, serialized_response)
        except Exception as e:
            print(f"Cache storage error: {e}")

    async def invalidate_cache(self, pattern: str = "*"):
        """Invalidate cache entries matching a pattern"""
        try:
            keys = self.redis_client.keys(pattern)
            if keys:
                self.redis_client.delete(*keys)
        except Exception as e:
            print(f"Cache invalidation error: {e}")
```
## Implementation Example
Here's a complete example of setting up and using the API gateway:
```python
import os
import asyncio
import uvicorn


async def setup_ai_gateway() -> AIGateway:
    """Set up the complete AI gateway"""
    # Initialize gateway
    gateway = AIGateway()

    # Register provider adapters (see the OpenAIProvider sketch above;
    # AnthropicProvider and TogetherProvider follow the same interface)
    gateway.providers = {
        'openai': OpenAIProvider(os.getenv('OPENAI_API_KEY')),
        'anthropic': AnthropicProvider(os.getenv('ANTHROPIC_API_KEY')),
        'together': TogetherProvider(os.getenv('TOGETHER_API_KEY'))
    }

    # Wire up the supporting components
    gateway.auth_manager = AuthManager(os.getenv('JWT_SECRET_KEY'))
    gateway.tiered_rate_limiter = TieredRateLimiter()
    gateway.cache_manager = CacheManager(os.getenv('REDIS_URL', 'redis://localhost:6379'))
    gateway.request_transformer = RequestTransformer()

    return gateway


def run_gateway():
    """Run the AI gateway"""
    gateway = asyncio.run(setup_ai_gateway())

    # uvicorn.run blocks, so it is called from synchronous code
    uvicorn.run(
        gateway.app,
        host="0.0.0.0",
        port=8000,
        log_level="info"
    )


if __name__ == "__main__":
    run_gateway()
```
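Once the gateway is running, any HTTP client can call it through the OpenAI-compatible route. Here is a minimal sketch using `requests`, assuming a token issued by `AuthManager.create_token`:

```python
import requests

token = "..."  # a JWT issued by AuthManager.create_token

response = requests.post(
    "http://localhost:8000/v1/chat/completions",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": "Summarize API gateways in one line."}],
        "max_tokens": 100,
    },
    timeout=30,
)
print(response.json())
```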
## Monitoring and Analytics
```python
from collections import defaultdict
from typing import Any, Dict


class GatewayMonitor:
    """Monitor gateway performance and usage"""

    def __init__(self):
        self.metrics = {
            'requests_total': 0,
            'requests_by_provider': defaultdict(int),
            'requests_by_user': defaultdict(int),
            'response_times': [],
            'errors': defaultdict(int),
            'cache_hits': 0,
            'cache_misses': 0
        }

    def record_request(self, user_id: str, provider: str, response_time: float, success: bool):
        """Record request metrics"""
        self.metrics['requests_total'] += 1
        self.metrics['requests_by_provider'][provider] += 1
        self.metrics['requests_by_user'][user_id] += 1
        self.metrics['response_times'].append(response_time)

        if not success:
            self.metrics['errors'][provider] += 1

    def record_cache_hit(self):
        """Record a cache hit"""
        self.metrics['cache_hits'] += 1

    def record_cache_miss(self):
        """Record a cache miss"""
        self.metrics['cache_misses'] += 1

    def get_metrics_report(self) -> Dict[str, Any]:
        """Generate a metrics report"""
        total_requests = self.metrics['requests_total']
        response_times = self.metrics['response_times']
        avg_response_time = sum(response_times) / len(response_times) if response_times else 0

        cache_lookups = self.metrics['cache_hits'] + self.metrics['cache_misses']
        cache_hit_rate = self.metrics['cache_hits'] / cache_lookups if cache_lookups > 0 else 0

        return {
            'total_requests': total_requests,
            'average_response_time': avg_response_time,
            'cache_hit_rate': cache_hit_rate,
            'requests_by_provider': dict(self.metrics['requests_by_provider']),
            'top_users': dict(sorted(
                self.metrics['requests_by_user'].items(),
                key=lambda x: x[1], reverse=True
            )[:10]),
            'error_rates': {
                provider: count / total_requests
                for provider, count in self.metrics['errors'].items()
            } if total_requests else {}
        }
```
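A short exercise of the monitor (with made-up numbers) shows the shape of the report:

```python
monitor = GatewayMonitor()

monitor.record_request("user-123", "openai", response_time=0.42, success=True)
monitor.record_request("user-123", "anthropic", response_time=0.78, success=False)
monitor.record_cache_hit()
monitor.record_cache_miss()

report = monitor.get_metrics_report()
print(report['cache_hit_rate'])   # 0.5
print(report['error_rates'])      # {'anthropic': 0.5}
```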
## Best Practices
### 1. Security Configuration
```yaml
# security_config.yaml
security:
  authentication:
    type: "jwt"
    secret_key: "${JWT_SECRET_KEY}"
    token_expiry: 3600
  authorization:
    enabled: true
    permission_model: "rbac"
    default_permissions: ["read"]
  rate_limiting:
    enabled: true
    default_limit: 100
    burst_limit: 200
    window_size: 60
  cors:
    enabled: true
    allowed_origins: ["https://yourdomain.com"]
    allowed_methods: ["GET", "POST"]
    allowed_headers: ["*"]
```
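One way to consume this file is sketched below: the `${JWT_SECRET_KEY}` placeholder is expanded from the environment, and the loader function name is our own, not part of the gateway code:

```python
import os
import yaml  # PyYAML


def load_security_config(path: str = "security_config.yaml") -> dict:
    """Load the security config, expanding ${ENV_VAR} placeholders."""
    with open(path) as f:
        raw = f.read()
    # os.path.expandvars substitutes $VAR / ${VAR} from the environment
    return yaml.safe_load(os.path.expandvars(raw))


config = load_security_config()
print(config["security"]["rate_limiting"]["default_limit"])  # 100
```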
### 2. Performance Optimization
```python
from typing import Dict, List


class PerformanceOptimizer:
    """Optimize gateway performance"""

    def __init__(self):
        self.connection_pool = {}
        self.request_batching = {}

    async def optimize_connections(self, provider: str, max_connections: int = 10):
        """Optimize connection pooling for a provider"""
        # Implement connection pooling here
        pass

    async def batch_requests(self, requests: List[Dict]) -> List[Dict]:
        """Batch similar requests for efficiency"""
        # Implement request batching here
        pass

    async def preload_models(self, models: List[str]):
        """Preload frequently used models"""
        # Implement model preloading here
        pass
```
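The methods above are left as stubs. As one illustration, `optimize_connections` could maintain a pooled `aiohttp` session per provider; this sketch assumes `aiohttp` is installed and is only one possible approach:

```python
import aiohttp


class PooledConnectionOptimizer(PerformanceOptimizer):
    async def optimize_connections(self, provider: str, max_connections: int = 10):
        """Create (or reuse) a pooled HTTP session for the provider"""
        if provider not in self.connection_pool:
            # TCPConnector caps concurrent connections; the session reuses them
            connector = aiohttp.TCPConnector(limit=max_connections)
            self.connection_pool[provider] = aiohttp.ClientSession(connector=connector)
        return self.connection_pool[provider]
```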
## Conclusion
An API gateway for AI models provides essential infrastructure for:
- Centralized Management: Single point of control for all AI requests
- Enhanced Security: Unified authentication and authorization
- Cost Control: Rate limiting and quota management
- Performance Optimization: Caching and load balancing
- Monitoring: Comprehensive analytics and observability
- Scalability: Easy addition of new providers and features
:::tip Implementation Priority
Start with basic authentication and rate limiting, then add caching and advanced routing features as your needs grow.
:::
The key to success is implementing the gateway incrementally, starting with core security and rate limiting features, then gradually adding more sophisticated capabilities based on your specific requirements and usage patterns.