大模型应用开发教程(九):应用架构与生产部署
2024-07-18·5 分钟阅读
大模型应用开发教程(九):应用架构与生产部署
前言
恭喜你完成前面八章的学习!本章作为教程的收官之作,将聚焦于将 AI 应用从原型推向生产的核心技能——架构设计、性能优化、成本控制和监控告警。
生产级架构设计
整体架构
┌─────────────────────────────────────────────────────────────────┐
│ 生产级 AI 应用架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ ┌─────────┐ ┌─────────────────────────────┐ │
│ │ 客户端 │───→│ CDN/WAF │───→│ API Gateway │ │
│ └─────────┘ └─────────┘ │ - 限流 - 认证 - 路由 │ │
│ └─────────────────────────────┘ │
│ │ │
│ ┌────────────────────────────┼────────────────┐ │
│ ↓ ↓ ↓ │
│ ┌───────────────────┐ ┌───────────────────┐ ┌────────────┐│
│ │ 应用服务层 │ │ AI 服务层 │ │ 管理服务 ││
│ │ - 业务逻辑 │ │ - LLM 调用 │ │ - 配置管理 ││
│ │ - 用户管理 │ │ - RAG 检索 │ │ - 监控面板 ││
│ │ - 会话管理 │ │ - Agent 执行 │ │ - 日志分析 ││
│ └───────────────────┘ └───────────────────┘ └────────────┘│
│ │ │ │
│ └──────────┬───────────┘ │
│ ↓ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ 数据层 ││
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────────┐ ││
│ │ │PostgreSQL│ │ Redis │ │ 向量数据库│ │ 对象存储 │ ││
│ │ │ 业务数据 │ │ 缓存/队列│ │ 知识库 │ │ 文件/模型 │ ││
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────────┘ ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
微服务架构示例
# docker-compose.yml
version: '3.8'

services:
  # API gateway: terminates HTTP/HTTPS and routes traffic to backend services.
  gateway:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
    depends_on:
      - api-service
      - ai-service

  # Application service: business logic, backed by PostgreSQL and Redis.
  api-service:
    build: ./api
    environment:
      # NOTE(review): credentials are hard-coded for the demo — use env files
      # or secrets management in production.
      - DATABASE_URL=postgresql://user:pass@postgres:5432/app
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis

  # AI service: calls LLM providers and keeps a local model cache on a GPU node.
  ai-service:
    build: ./ai
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - MODEL_CACHE_DIR=/models
    volumes:
      - model-cache:/models
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  # Vector database holding the knowledge base.
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant-storage:/qdrant/storage

  # PostgreSQL: primary relational store for business data.
  postgres:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=app
    volumes:
      - postgres-data:/var/lib/postgresql/data

  # Redis: cache and queue backend.
  redis:
    image: redis:7-alpine
    volumes:
      - redis-data:/data

  # Monitoring stack: Prometheus scrapes metrics, Grafana visualizes them.
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml

  grafana:
    image: grafana/grafana
    ports:
      - "3001:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin

volumes:
  postgres-data:
  redis-data:
  qdrant-storage:
  model-cache:
性能优化策略
1. 请求优化
import asyncio
from typing import List
from openai import AsyncOpenAI
class BatchProcessor:
    """Send prompts to the chat-completions API in rate-friendly waves."""

    def __init__(self, batch_size: int = 10, delay: float = 0.1):
        # batch_size: number of prompts dispatched concurrently per wave.
        # delay: pause (seconds) between waves to stay under rate limits.
        self.client = AsyncOpenAI()
        self.batch_size = batch_size
        self.delay = delay

    async def process_single(self, prompt: str) -> str:
        """Run one chat completion and return the assistant's text."""
        completion = await self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": prompt}]
        )
        return completion.choices[0].message.content

    async def process_batch(self, prompts: List[str]) -> List[str]:
        """Process all prompts wave by wave, preserving input order.

        NOTE(review): with return_exceptions=True the result list may
        contain Exception instances alongside strings — callers should
        check each element.
        """
        collected = []
        total = len(prompts)
        for start in range(0, total, self.batch_size):
            wave = prompts[start:start + self.batch_size]
            wave_results = await asyncio.gather(
                *(self.process_single(p) for p in wave),
                return_exceptions=True
            )
            collected.extend(wave_results)
            # Throttle between waves, but not after the last one.
            if start + self.batch_size < total:
                await asyncio.sleep(self.delay)
        return collected
2. 缓存策略
import hashlib
import json
from typing import Optional, Any
import redis
class SmartCache:
    """Redis-backed response cache keyed on the request parameters.

    Keys are derived from (messages, model) plus the generation parameters
    that influence output (temperature, max_tokens), so identical requests
    hit the same entry.
    """

    def __init__(self, redis_url: str, ttl: int = 3600):
        self.redis = redis.from_url(redis_url)
        self.ttl = ttl  # entry lifetime in seconds

    def _generate_key(self, messages: list, model: str, **kwargs) -> str:
        """Build a deterministic cache key for a request.

        Only temperature/max_tokens are folded in; other kwargs are ignored
        on purpose since they do not change the completion text. MD5 is fine
        here — the key is an identifier, not a security hash.
        """
        key_data = {
            "messages": messages,
            "model": model,
            **{k: v for k, v in kwargs.items() if k in ["temperature", "max_tokens"]}
        }
        return hashlib.md5(json.dumps(key_data, sort_keys=True).encode()).hexdigest()

    def get(self, messages: list, model: str, **kwargs) -> Optional[str]:
        """Return the cached response, or None on a miss.

        Fixed: compare against None instead of truthiness so that a cached
        empty-string response still counts as a hit.
        """
        key = self._generate_key(messages, model, **kwargs)
        cached = self.redis.get(key)
        return cached.decode() if cached is not None else None

    def set(self, messages: list, model: str, response: str, **kwargs):
        """Store a response under the request's key with the configured TTL."""
        key = self._generate_key(messages, model, **kwargs)
        self.redis.setex(key, self.ttl, response)

    def get_or_compute(
        self,
        messages: list,
        model: str,
        compute_fn,
        **kwargs
    ) -> Any:
        """Return the cached value, or compute, cache, and return it.

        Fixed: explicit None check so a legitimately empty cached string
        does not trigger recomputation.
        """
        cached = self.get(messages, model, **kwargs)
        if cached is not None:
            return cached
        result = compute_fn()
        self.set(messages, model, result, **kwargs)
        return result
3. 并发控制
import asyncio
from dataclasses import dataclass
@dataclass
class RateLimitConfig:
    """Rate-limit configuration."""
    requests_per_second: float = 10.0   # request-rate ceiling (sliding 1 s window)
    tokens_per_minute: int = 100000     # token budget (sliding 60 s window)
    max_concurrent: int = 20            # in-flight request cap


class RateLimiter:
    """Async rate limiter combining a concurrency semaphore with sliding
    windows over request count (1 s) and token usage (60 s).

    Intended to be used as an async context manager:

        async with limiter:
            ...
    """

    def __init__(self, config: RateLimitConfig):
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent)
        self.request_times: list[float] = []            # timestamps of recent requests
        self.token_usage: list[tuple[float, int]] = []  # (timestamp, tokens) pairs

    async def acquire(self, tokens: int = 0):
        """Block until a request (optionally consuming `tokens`) may proceed."""
        await self.semaphore.acquire()
        # Fixed: asyncio.get_event_loop() inside a coroutine is deprecated;
        # use the running loop's clock instead.
        loop = asyncio.get_running_loop()
        now = loop.time()
        # Requests-per-second sliding window.
        self.request_times = [t for t in self.request_times if now - t < 1]
        if len(self.request_times) >= self.config.requests_per_second:
            sleep_time = 1 - (now - self.request_times[0])
            if sleep_time > 0:  # fixed: never sleep a negative duration
                await asyncio.sleep(sleep_time)
            now = loop.time()  # fixed: refresh timestamp after sleeping
        # Tokens-per-minute sliding window.
        if tokens > 0:
            self.token_usage = [(t, u) for t, u in self.token_usage if now - t < 60]
            total_tokens = sum(u for _, u in self.token_usage)
            # Fixed: guard against an empty window — a single request larger
            # than the whole budget previously raised IndexError on
            # token_usage[0].
            if total_tokens + tokens > self.config.tokens_per_minute and self.token_usage:
                sleep_time = 60 - (now - self.token_usage[0][0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                now = loop.time()
        self.request_times.append(now)
        if tokens > 0:
            self.token_usage.append((now, tokens))

    def release(self):
        """Release the concurrency slot taken by acquire()."""
        self.semaphore.release()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        self.release()
成本控制
Token 使用监控
import json
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
@dataclass
class UsageRecord:
    """A single LLM API call's token usage and computed cost."""
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cost: float                           # USD, precomputed by CostTracker
    user_id: Optional[str] = None         # fixed annotation: default is None
    conversation_id: Optional[str] = None


class CostTracker:
    """In-memory tracker of per-request LLM spend with a daily budget alert.

    NOTE(review): records grow without bound for the process lifetime —
    persist or rotate them in a real deployment.
    """

    # Price per token in USD (2024 reference prices).
    PRICING = {
        "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
        "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
        "claude-3-5-sonnet": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
        "claude-3-5-haiku": {"input": 0.80 / 1_000_000, "output": 4.00 / 1_000_000},
    }

    def __init__(self):
        self.records: List[UsageRecord] = []
        self.daily_budget: float = 100.0    # USD per day
        self.alert_threshold: float = 0.8   # alert at 80% of the budget

    def calculate_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Return the USD cost; unknown models fall back to gpt-4o-mini pricing."""
        pricing = self.PRICING.get(model, self.PRICING["gpt-4o-mini"])
        return (
            input_tokens * pricing["input"] +
            output_tokens * pricing["output"]
        )

    def record(self, model: str, input_tokens: int, output_tokens: int, **kwargs):
        """Append a usage record and run the budget check.

        kwargs may carry user_id / conversation_id.
        """
        cost = self.calculate_cost(model, input_tokens, output_tokens)
        record = UsageRecord(
            timestamp=datetime.now(),
            model=model,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cost=cost,
            **kwargs
        )
        self.records.append(record)
        self._check_budget()

    def _check_budget(self):
        """Alert once today's spend crosses threshold x daily budget."""
        today = datetime.now().date()
        today_cost = sum(
            r.cost for r in self.records
            if r.timestamp.date() == today
        )
        if today_cost >= self.daily_budget * self.alert_threshold:
            self._send_alert(today_cost)

    def _send_alert(self, current_cost: float):
        """Emit a budget alert (stdout placeholder; wire to paging in prod)."""
        print(f"⚠️ 成本告警:今日已消费 ${current_cost:.2f}")

    def get_summary(self, period: str = "day") -> Dict:
        """Aggregate cost/tokens/requests for 'day', 'week', or 'month'.

        Fixed: 'week' previously raised NameError because timedelta was
        never imported alongside datetime.
        """
        now = datetime.now()
        if period == "day":
            start = now.replace(hour=0, minute=0, second=0, microsecond=0)
        elif period == "week":
            start = now - timedelta(days=7)
        elif period == "month":
            start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        else:
            # Any other label means "all time".
            start = datetime.min
        filtered = [r for r in self.records if r.timestamp >= start]
        return {
            "period": period,
            "total_cost": sum(r.cost for r in filtered),
            "total_tokens": sum(r.input_tokens + r.output_tokens for r in filtered),
            "request_count": len(filtered),
            "by_model": self._group_by_model(filtered)
        }

    def _group_by_model(self, records: List[UsageRecord]) -> Dict:
        """Sum cost/tokens/request count per model."""
        result = {}
        for r in records:
            if r.model not in result:
                result[r.model] = {"cost": 0, "tokens": 0, "count": 0}
            result[r.model]["cost"] += r.cost
            result[r.model]["tokens"] += r.input_tokens + r.output_tokens
            result[r.model]["count"] += 1
        return result
成本优化策略
class CostOptimizer:
    """Heuristics for trimming LLM spend: model routing, prompt slimming,
    and cache-eligibility checks."""

    def __init__(self, cost_tracker: CostTracker):
        self.tracker = cost_tracker  # kept for future usage-aware routing

    def select_model(self, task_complexity: str) -> str:
        """Map a task-complexity label to the cheapest adequate model.

        Unknown labels fall back to the cheapest model.
        """
        routing = {
            "simple": "gpt-4o-mini",         # simple tasks
            "medium": "gpt-4o-mini",         # medium tasks
            "complex": "gpt-4o",             # complex tasks
            "reasoning": "claude-3-5-sonnet" # reasoning tasks
        }
        return routing.get(task_complexity, "gpt-4o-mini")

    def optimize_prompt(self, prompt: str) -> str:
        """Collapse whitespace runs into single spaces to shave tokens."""
        # split() with no args drops all consecutive whitespace, including
        # newlines; join rebuilds with single spaces.
        return " ".join(prompt.split())

    def should_use_cache(self, prompt: str) -> bool:
        """Heuristic: only prompts longer than 100 chars are worth caching."""
        return len(prompt) > 100
监控告警
Prometheus 指标
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus metric definitions shared by the monitored LLM client below.
# Request counter, labelled by model and outcome ("success"/"error").
REQUEST_COUNT = Counter(
'llm_requests_total',
'Total LLM API requests',
['model', 'status']
)
# Latency histogram; buckets span fast cache hits (0.1 s) to long generations (30 s).
REQUEST_LATENCY = Histogram(
'llm_request_latency_seconds',
'LLM API request latency',
['model'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
# Token counter; the "type" label distinguishes input vs output tokens.
TOKEN_USAGE = Counter(
'llm_tokens_total',
'Total tokens used',
['model', 'type'] # type: input/output
)
# Cumulative spend in USD per model.
# NOTE(review): defined but never incremented by MonitoredLLMClient in the
# visible code — wire it up or drop it.
COST_TOTAL = Counter(
'llm_cost_total_dollars',
'Total cost in dollars',
['model']
)
# Gauge of currently active conversations.
# NOTE(review): also never updated in the visible code.
ACTIVE_CONVERSATIONS = Gauge(
'active_conversations',
'Number of active conversations'
)
class MonitoredLLMClient:
    """Wrapper around an async chat client that records Prometheus metrics
    (request count, latency, token usage) for every call."""

    def __init__(self, client):
        self.client = client

    async def chat(self, model: str, messages: list, **kwargs):
        """Forward a chat-completions call, instrumenting it with metrics.

        Token counters are updated on success; request count and latency
        are recorded on every outcome, including errors.
        """
        started = time.time()
        outcome = "success"
        try:
            response = await self.client.chat.completions.create(
                model=model,
                messages=messages,
                **kwargs
            )
            usage = response.usage
            TOKEN_USAGE.labels(model=model, type="input").inc(usage.prompt_tokens)
            TOKEN_USAGE.labels(model=model, type="output").inc(usage.completion_tokens)
            return response
        except Exception:
            outcome = "error"
            raise
        finally:
            # Recorded regardless of success or failure.
            REQUEST_COUNT.labels(model=model, status=outcome).inc()
            REQUEST_LATENCY.labels(model=model).observe(time.time() - started)
Grafana 仪表板
{
"dashboard": {
"title": "AI Application Monitoring",
"panels": [
{
"title": "Request Rate",
"type": "graph",
"targets": [
{
"expr": "rate(llm_requests_total[5m])"
}
]
},
{
"title": "Latency",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m]))"
}
]
},
{
"title": "Token Usage",
"type": "graph",
"targets": [
{
"expr": "rate(llm_tokens_total[1h])"
}
]
},
{
"title": "Cost",
"type": "stat",
"targets": [
{
"expr": "sum(llm_cost_total_dollars)"
}
]
}
]
}
}
告警规则
# prometheus/alerts.yml
groups:
  - name: ai_application
    rules:
      # Fire when more than 0.1 errored requests/s are observed over 5 minutes.
      - alert: HighErrorRate
        expr: rate(llm_requests_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value }} per second"
      # Fire when p95 latency stays above 10 s for 5 minutes.
      - alert: HighLatency
        expr: histogram_quantile(0.95, rate(llm_request_latency_seconds_bucket[5m])) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s"
      # NOTE(review): llm_cost_total_dollars is a monotonic counter, so this
      # expression compares lifetime spend, not daily spend — consider
      # increase(llm_cost_total_dollars[1d]) for a true daily budget. Kept
      # as-is pending confirmation.
      - alert: BudgetExceeded
        expr: sum(llm_cost_total_dollars) > 100
        for: 1h
        labels:
          severity: warning
        annotations:
          summary: "Daily budget exceeded"
          description: "Total cost is ${{ $value }}"
安全最佳实践
1. API Key 管理
import os
from cryptography.fernet import Fernet
class SecureConfig:
    """Symmetric (Fernet) encryption helper for storing API keys at rest.

    NOTE(review): when no encryption_key is supplied, a fresh key is
    generated per instance — persist self.key somewhere, or previously
    encrypted values cannot be decrypted after a restart.
    """

    def __init__(self, encryption_key: bytes = None):
        # Falls back to a newly generated key when none is given.
        self.key = encryption_key or Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt_key(self, api_key: str) -> bytes:
        """Return the Fernet ciphertext of api_key."""
        return self.cipher.encrypt(api_key.encode())

    def decrypt_key(self, encrypted: bytes) -> str:
        """Invert encrypt_key, returning the plaintext API key."""
        return self.cipher.decrypt(encrypted).decode()
2. 输入验证
import re
from typing import Optional
class InputValidator:
    """Screens user input for excessive length and prompt-injection patterns."""

    # Hard cap on accepted input length (characters).
    MAX_INPUT_LENGTH = 10000
    # Patterns suggesting prompt-injection attempts; matched case-insensitively.
    FORBIDDEN_PATTERNS = [
        r"ignore\s+previous\s+instructions",
        r"system\s*:",
        r"<\|.*?\|>",
    ]

    @classmethod
    def validate(cls, user_input: str) -> tuple[bool, Optional[str]]:
        """Return (True, None) when the input passes, else (False, reason)."""
        if len(user_input) > cls.MAX_INPUT_LENGTH:
            return False, "输入过长"
        # any() short-circuits on the first forbidden match, like the
        # original explicit loop.
        hit = any(
            re.search(pattern, user_input, re.IGNORECASE)
            for pattern in cls.FORBIDDEN_PATTERNS
        )
        if hit:
            return False, "输入包含禁止内容"
        return True, None
总结与展望
教程回顾
恭喜你完成了整个教程!让我们回顾一下学到的内容:
┌─────────────────────────────────────────────────────────┐
│ 知识体系总览 │
├─────────────────────────────────────────────────────────┤
│ 【基础篇】 │
│ 第一章:大模型概述与发展历程 │
│ 第二章:主流大模型介绍与选择 │
│ 第三章:API 调用基础 │
├─────────────────────────────────────────────────────────┤
│ 【实践篇】 │
│ 第四章:Prompt Engineering 提示词工程 │
│ 第五章:大模型 API 集成开发实战 │
│ 第六章:构建第一个 AI 应用 │
├─────────────────────────────────────────────────────────┤
│ 【进阶篇】 │
│ 第七章:RAG 检索增强生成 │
│ 第八章:Agent 智能体开发 │
├─────────────────────────────────────────────────────────┤
│ 【生产篇】 │
│ 第九章:应用架构与生产部署 │
└─────────────────────────────────────────────────────────┘
学习路径建议
入门 → 进阶 → 实战 → 专家
│ │ │ │
│ │ │ └── 研究前沿论文,参与开源项目
│ │ │
│ │ └── 开发完整项目,部署上线
│ │
│ └── 深入 RAG、Agent 等高级技术
│
└── 掌握 API 调用和提示词工程基础
持续学习资源
- 官方文档:OpenAI、Anthropic、LangChain 官方文档
- 论文追踪:arXiv、Papers with Code
- 开源项目:LangChain、LlamaIndex、AutoGPT
- 社区:GitHub、Discord、Twitter/X
未来发展方向
- 多模态 AI:文本、图像、音频、视频的统一理解
- 具身智能:AI 与物理世界的交互
- AGI 探索:通用人工智能的研究
- AI 安全:对齐、可解释性、安全部署
感谢你的学习!希望这个教程能够帮助你开启 AI 应用开发的旅程。
如有问题或建议,欢迎交流讨论!
相关文章
Milvus底层原理(一):概述与架构设计
2026-03-10·9 分钟阅读
深入理解 Milvus 向量数据库的整体架构设计,探索存储计算分离、分布式查询、向量索引等核心原理,为后续深入学习 Milvus 底层实现奠定基础。
Milvus底层原理(七):数据模型与存储
2026-03-10·5 分钟阅读
深入理解 Milvus 的数据模型设计,掌握 Collection、Partition、Segment 的层次结构,了解列式存储格式和 Schema 设计原则。
Milvus底层原理(八):数据写入流程
2026-03-10·5 分钟阅读
深入理解 Milvus 的数据写入流程,掌握从客户端请求到数据持久化的完整链路,了解写入优化策略和数据一致性保证机制。