← 返回文章列表

大模型应用开发教程(九):应用架构与生产部署

2024-07-18·5 分钟阅读

大模型应用开发教程(九):应用架构与生产部署

前言

恭喜你完成前面八章的学习!本章作为教程的收官之作,将聚焦于将 AI 应用从原型推向生产的核心技能——架构设计、性能优化、成本控制和监控告警。

生产级架构设计

整体架构

┌─────────────────────────────────────────────────────────────────┐
│                      生产级 AI 应用架构                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────┐    ┌─────────┐    ┌─────────────────────────────┐ │
│  │ 客户端  │───→│ CDN/WAF │───→│        API Gateway          │ │
│  └─────────┘    └─────────┘    │  - 限流  - 认证  - 路由     │ │
│                                └─────────────────────────────┘ │
│                                           │                     │
│              ┌────────────────────────────┼────────────────┐    │
│              ↓                            ↓                ↓    │
│  ┌───────────────────┐    ┌───────────────────┐  ┌────────────┐│
│  │   应用服务层      │    │    AI 服务层      │  │ 管理服务   ││
│  │ - 业务逻辑        │    │ - LLM 调用        │  │ - 配置管理 ││
│  │ - 用户管理        │    │ - RAG 检索        │  │ - 监控面板 ││
│  │ - 会话管理        │    │ - Agent 执行      │  │ - 日志分析 ││
│  └───────────────────┘    └───────────────────┘  └────────────┘│
│              │                      │                          │
│              └──────────┬───────────┘                          │
│                         ↓                                      │
│  ┌─────────────────────────────────────────────────────────────┐│
│  │                      数据层                                  ││
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐   ││
│  │  │PostgreSQL│ │  Redis   │ │ 向量数据库│ │ 对象存储     │   ││
│  │  │ 业务数据 │ │ 缓存/队列│ │ 知识库   │ │ 文件/模型    │   ││
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────────┘   ││
│  └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘

微服务架构示例

# docker-compose.yml
version: '3.8'

services:
  # API 网关
  gateway:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api-service
      - ai-service

  # 应用服务
  api-service:
    build: ./api
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/app
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

  # AI 服务
  ai-service:
    build: ./ai
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - MODEL_CACHE_DIR=/models
    volumes:
      - model-cache:/models
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  # 向量数据库
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant-storage:/qdrant/storage

  # PostgreSQL
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    volumes:
      - postgres-data:/var/lib/postgresql/data

  # Redis
  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data

  # 监控
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3001:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

volumes:
  postgres-data:
  redis-data:
  qdrant-storage:
  model-cache:

性能优化策略

1. 请求优化

import asyncio
from typing import List
from openai import AsyncOpenAI

class BatchProcessor:
    """批量请求处理器"""
    
    def __init__(self, batch_size: int = 10, delay: float = 0.1):
        self.client = AsyncOpenAI()
        self.batch_size = batch_size
        self.delay = delay
    
    async def process_single(self, prompt: str) -> str:
        """处理单个请求"""
        response = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content
    
    async def process_batch(self, prompts: List[str]) -> List[str]:
        """批量处理"""
        results = []
        
        for i in range(0, len(prompts), self.batch_size):
            batch = prompts[i:i + self.batch_size]
            tasks = [self.process_single(p) for p in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            results.extend(batch_results)
            
            if i + self.batch_size < len(prompts):
                await asyncio.sleep(self.delay)
        
        return results

2. 缓存策略

import hashlib
import json
from typing import Optional, Any
import redis

class SmartCache:
    """智能缓存系统"""
    
    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl
    
    def _generate_key(self, messages: list, model: str, **kwargs) -> str:
        """生成缓存键"""
        key_data = {
            "messages": messages,
            "model": model,
            **{k: v for k, v in kwargs.items() if k in ["temperature", "max_tokens"]}
        }
        return hashlib.md5(json.dumps(key_data, sort_keys=True).encode()).hexdigest()
    
    def get(self, messages: list, model: str, **kwargs) -> Optional[str]:
        """获取缓存"""
        key = self._generate_key(messages, model, **kwargs)
        cached = self.redis.get(key)
        return cached.decode() if cached else None
    
    def set(self, messages: list, model: str, response: str, **kwargs):
        """设置缓存"""
        key = self._generate_key(messages, model, **kwargs)
        self.redis.setex(key, self.ttl, response)
    
    def get_or_compute(
        self,
        messages: list,
        model: str,
        compute_fn,
        **kwargs
    ) -> Any:
        """获取缓存或计算"""
        cached = self.get(messages, model, **kwargs)
        if cached:
            return cached
        
        result = compute_fn()
        self.set(messages, model, result, **kwargs)
        return result

3. 并发控制

import asyncio
from dataclasses import dataclass

@dataclass
class RateLimitConfig:
    """速率限制配置"""
    requests_per_second: float = 10.0
    tokens_per_minute: int = 100000
    max_concurrent: int = 20

class RateLimiter:
    """速率限制器"""
    
    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.request_times: list[float] = []
        self.token_usage: list[tuple[float, int]] = []
    
    async def acquire(self, tokens: int = 0):
        """获取执行许可"""
        await self.semaphore.acquire()
        
        now = asyncio.get_event_loop().time()
        
        # 检查请求速率
        self.request_times = [t for t in self.request_times if now - t < 1]
        if len(self.request_times) >= self.config.requests_per_second:
            sleep_time = 1 - (now - self.request_times[0])
            await asyncio.sleep(sleep_time)
        
        # 检查 Token 使用量
        if tokens > 0:
            self.token_usage = [(t, u) for t, u in self.token_usage if now - t < 60]
            total_tokens = sum(u for _, u in self.token_usage)
            if total_tokens + tokens > self.config.tokens_per_minute:
                sleep_time = 60 - (now - self.token_usage[0][0])
                await asyncio.sleep(sleep_time)
        
        self.request_times.append(now)
        if tokens > 0:
            self.token_usage.append((now, tokens))
    
    def release(self):
        """释放许可"""
        self.semaphore.release()
    
    async def __aenter__(self):
        await self.acquire()
        return self
    
    async def __aexit__(self, *args):
        self.release()

成本控制

Token 使用监控

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, List
import json

@dataclass
class UsageRecord:
    """使用记录"""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float
    user_id: str = None
    conversation_id: str = None

class CostTracker:
    """成本追踪器"""
    
    # 定价表(2024年参考)
    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
        "claude-3-5-sonnet": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
        "claude-3-5-haiku": {"input": 0.80 / 1_000_000, "output": 4.00 / 1_000_000},
    }
    
    def __init__(self):
        self.records: List[UsageRecord] = []
        self.daily_budget: float = 100.0  # 日预算
        self.alert_threshold: float = 0.8  # 告警阈值
    
    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """计算成本"""
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        return (
            input_tokens * pricing["input"] +
            output_tokens * pricing["output"]
        )
    
    def record(self, model: str, input_tokens: int, output_tokens: int, **kwargs):
        """记录使用"""
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            **kwargs
        )
        
        self.records.append(record)
        
        # 检查预算
        self._check_budget()
    
    def _check_budget(self):
        """检查预算"""
        today = datetime.now().date()
        today_cost = sum(
            r.cost for r in self.records
            if r.timestamp.date() == today
        )
        
        if today_cost >= self.daily_budget * self.alert_threshold:
            self._send_alert(today_cost)
    
    def _send_alert(self, current_cost: float):
        """发送告警"""
        print(f"⚠️ 成本告警:今日已消费 ${current_cost:.2f}")
    
    def get_summary(self, period: str = "day") -> Dict:
        """获取统计摘要"""
        now = datetime.now()
        
        if period == "day":
            start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        elif period == "week":
            start = now - timedelta(days=7)
        elif period == "month":
            start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        else:
            start = datetime.min
        
        filtered = [r for r in self.records if r.timestamp >= start]
        
        return {
            "period": period,
            "total_cost": sum(r.cost for r in filtered),
            "total_tokens": sum(r.input_tokens + r.output_tokens for r in filtered),
            "request_count": len(filtered),
            "by_model": self._group_by_model(filtered)
        }
    
    def _group_by_model(self, records: List[UsageRecord]) -> Dict:
        """按模型分组统计"""
        result = {}
        for r in records:
            if r.model not in result:
                result[r.model] = {"cost": 0, "tokens": 0, "count": 0}
            result[r.model]["cost"] += r.cost
            result[r.model]["tokens"] += r.input_tokens + r.output_tokens
            result[r.model]["count"] += 1
        return result

成本优化策略

class CostOptimizer:
    """成本优化器"""
    
    def __init__(self, cost_tracker: CostTracker):
        self.tracker = cost_tracker
    
    def select_model(self, task_complexity: str) -> str:
        """根据任务复杂度选择模型"""
        model_mapping = {
            "simple": "gpt-4o-mini",      # 简单任务
            "medium": "gpt-4o-mini",      # 中等任务
            "complex": "gpt-4o",          # 复杂任务
            "reasoning": "claude-3-5-sonnet"  # 推理任务
        }
        return model_mapping.get(task_complexity, "gpt-4o-mini")
    
    def optimize_prompt(self, prompt: str) -> str:
        """优化提示词以减少 Token"""
        # 移除多余空格
        prompt = " ".join(prompt.split())
        # 移除重复内容
        # ... 其他优化
        return prompt
    
    def should_use_cache(self, prompt: str) -> bool:
        """判断是否应该使用缓存"""
        # 相似请求使用缓存
        return len(prompt) > 100

监控告警

Prometheus 指标

from prometheus_client import Counter, Histogram, Gauge
import time

# 定义指标
REQUEST_COUNT = Counter(
    'llm_requests_total',
    'Total LLM API requests',
    ['model', 'status']
)

REQUEST_LATENCY = Histogram(
    'llm_request_latency_seconds',
    'LLM API request latency',
    ['model'],
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)

TOKEN_USAGE = Counter(
    'llm_tokens_total',
    'Total tokens used',
    ['model', 'type']  # type: input/output
)

COST_TOTAL = Counter(
    'llm_cost_total_dollars',
    'Total cost in dollars',
    ['model']
)

ACTIVE_CONVERSATIONS = Gauge(
    'active_conversations',
    'Number of active conversations'
)


class MonitoredLLMClient:
    """带监控的 LLM 客户端"""
    
    def __init__(self, client):
        self.client = client
    
    async def chat(self, model: str, messages: list, **kwargs):
        """带监控的对话"""
        start_time = time.time()
        status = "success"
        
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            
            # 记录 Token 使用
            TOKEN_USAGE.labels(model=model, type="input").inc(
                response.usage.prompt_tokens
            )
            TOKEN_USAGE.labels(model=model, type="output").inc(
                response.usage.completion_tokens
            )
            
            return response
            
        except Exception as e:
            status = "error"
            raise
            
        finally:
            # 记录请求
            REQUEST_COUNT.labels(model=model, status=status).inc()
            
            # 记录延迟
            latency = time.time() - start_time
            REQUEST_LATENCY.labels(model=model).observe(latency)

Grafana 仪表板

{
  "dashboard": {
    "title": "AI Application Monitoring",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_requests_total[5m])"
          }
        ]
      },
      {
        "title": "Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m]))"
          }
        ]
      },
      {
        "title": "Token Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(llm_tokens_total[1h])"
          }
        ]
      },
      {
        "title": "Cost",
        "type": "stat",
        "targets": [
          {
            "expr": "sum(llm_cost_total_dollars)"
          }
        ]
      }
    ]
  }
}

告警规则

# prometheus/alerts.yml
groups:
  - name: ai_application
    rules:
      - alert: HighErrorRate
        expr: rate(llm_requests_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} per second"

      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"

      - alert: BudgetExceeded
        expr: sum(llm_cost_total_dollars) > 100
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Daily budget exceeded"
          description: "Total cost is ${{ $value }}"

安全最佳实践

1. API Key 管理

import os
from cryptography.fernet import Fernet

class SecureConfig:
    """安全配置管理"""
    
    def __init__(self, encryption_key: bytes = None):
        self.key = encryption_key or Fernet.generate_key()
        self.cipher = Fernet(self.key)
    
    def encrypt_key(self, api_key: str) -> bytes:
        """加密 API Key"""
        return self.cipher.encrypt(api_key.encode())
    
    def decrypt_key(self, encrypted: bytes) -> str:
        """解密 API Key"""
        return self.cipher.decrypt(encrypted).decode()

2. 输入验证

import re
from typing import Optional

class InputValidator:
    """输入验证器"""
    
    MAX_INPUT_LENGTH = 10000
    FORBIDDEN_PATTERNS = [
        r"ignore\s+previous\s+instructions",
        r"system\s*:",
        r"<\|.*?\|>",
    ]
    
    @classmethod
    def validate(cls, user_input: str) -> tuple[bool, Optional[str]]:
        """验证用户输入"""
        
        # 检查长度
        if len(user_input) > cls.MAX_INPUT_LENGTH:
            return False, "输入过长"
        
        # 检查禁止模式
        for pattern in cls.FORBIDDEN_PATTERNS:
            if re.search(pattern, user_input, re.IGNORECASE):
                return False, "输入包含禁止内容"
        
        return True, None

总结与展望

教程回顾

恭喜你完成了整个教程!让我们回顾一下学到的内容:

┌─────────────────────────────────────────────────────────┐
│                    知识体系总览                          │
├─────────────────────────────────────────────────────────┤
│  【基础篇】                                              │
│  第一章:大模型概述与发展历程                            │
│  第二章:主流大模型介绍与选择                            │
│  第三章:API 调用基础                                    │
├─────────────────────────────────────────────────────────┤
│  【实践篇】                                              │
│  第四章:Prompt Engineering 提示词工程                   │
│  第五章:大模型 API 集成开发实战                         │
│  第六章:构建第一个 AI 应用                              │
├─────────────────────────────────────────────────────────┤
│  【进阶篇】                                              │
│  第七章:RAG 检索增强生成                                │
│  第八章:Agent 智能体开发                                │
├─────────────────────────────────────────────────────────┤
│  【生产篇】                                              │
│  第九章:应用架构与生产部署                              │
└─────────────────────────────────────────────────────────┘

学习路径建议

入门 → 进阶 → 实战 → 专家
  │       │       │       │
  │       │       │       └── 研究前沿论文,参与开源项目
  │       │       │
  │       │       └── 开发完整项目,部署上线
  │       │
  │       └── 深入 RAG、Agent 等高级技术
  │
  └── 掌握 API 调用和提示词工程基础

持续学习资源

  1. 官方文档:OpenAI、Anthropic、LangChain 官方文档
  2. 论文追踪:arXiv、Papers with Code
  3. 开源项目:LangChain、LlamaIndex、AutoGPT
  4. 社区:GitHub、Discord、Twitter/X

未来发展方向

  • 多模态 AI:文本、图像、音频、视频的统一理解
  • 具身智能:AI 与物理世界的交互
  • AGI 探索:通用人工智能的研究
  • AI 安全:对齐、可解释性、安全部署

感谢你的学习!希望这个教程能够帮助你开启 AI 应用开发的旅程。

如有问题或建议,欢迎交流讨论!

分享: