← 返回文章列表

从零到一实现生产级 MCP Gateway(四):认证与授权

2025-03-07·8 分钟阅读

从零到一实现生产级 MCP Gateway(四):认证授权实现

前言

在企业环境中,MCP Gateway 需要严格的认证授权机制来保护敏感工具和资源。本章将深入实现 JWT Token、API Key 双认证机制,以及基于角色的访问控制(RBAC)权限模型。

设计思路:为什么 Gateway 需要认证授权?

问题背景

没有认证的 MCP Gateway 就像一个"敞开大门"的服务:

┌─────────────────────────────────────────────────────────────────────┐
│                    无认证的风险                                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  风险场景 1:敏感数据泄露                                            │
│  - 任何人可以调用 read_file 读取任意文件                            │
│  - 任何人可以调用 list_resources 列出所有资源                       │
│                                                                      │
│  风险场景 2:资源滥用                                                │
│  - 恶意用户无限调用工具,消耗资源                                    │
│  - 无限制访问 LLM API,产生高额费用                                  │
│                                                                      │
│  风险场景 3:权限越界                                                │
│  - 普通用户调用管理员工具                                            │
│  - 访问不属于自己的项目资源                                          │
│                                                                      │
│  风险场景 4:无法追踪                                                │
│  - 不知道谁在什么时候做了什么                                        │
│  - 出问题时无法追责                                                  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

认证 vs 授权

┌─────────────────────────────────────────────────────────────────────┐
│                    认证与授权的区别                                   │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  认证(Authentication):你是谁?                                    │
│  - 验证用户/服务身份                                                 │
│  - 产出:身份标识(user_id, client_id)                             │
│  - 方式:API Key、JWT、OAuth 2.0                                    │
│                                                                      │
│  授权(Authorization):你能做什么?                                 │
│  - 验证用户权限                                                      │
│  - 产出:权限列表(roles, permissions)                              │
│  - 方式:RBAC、ABAC、ACL                                            │
│                                                                      │
│  关系:                                                              │
│  请求 → 认证(获取身份)→ 授权(检查权限)→ 执行/拒绝                │
│                                                                      │
│  示例:                                                              │
│  请求: { Authorization: "Bearer jwt_token" }                         │
│  认证: JWT 解析出 { user_id: "alice", role: "developer" }            │
│  授权: 检查 developer 角色是否有权限调用 write_file                  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

为什么选择 JWT + API Key 双模式?

┌─────────────────────────────────────────────────────────────────────┐
│                    认证方式对比                                       │
├─────────────────────────────────────────────────────────────────────┤
│                                                                      │
│  方式 A:API Key                                                     │
│  - 优点:简单,适合服务间调用                                        │
│  - 缺点:无法携带用户信息,需要额外存储                              │
│  - 适用:后端服务、CI/CD                                            │
│                                                                      │
│  方式 B:JWT                                                         │
│  - 优点:无状态,携带用户信息,支持过期                              │
│  - 缺点:无法主动失效,Token 泄露风险                                │
│  - 适用:用户登录、前端应用                                          │
│                                                                      │
│  方式 C:OAuth 2.0                                                   │
│  - 优点:标准化,支持第三方授权                                      │
│  - 缺点:复杂,需要额外服务                                          │
│  - 适用:公开平台、SaaS                                              │
│                                                                      │
│  本文选择:JWT + API Key 双模式                                      │
│  - JWT:用户登录场景                                                │
│  - API Key:服务间调用场景                                          │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

方案对比:权限控制模型

方案一:ACL(访问控制列表)

# 每个资源维护一个访问列表
acl = {
    "/project/1/read_file": ["alice", "bob"],
    "/project/2/read_file": ["charlie"],
}

def check_permission(user: str, resource: str) -> bool:
    return user in acl.get(resource, [])

优点:细粒度控制
缺点:管理复杂,资源多时性能差
适用:资源数量少的场景

方案二:RBAC(基于角色的访问控制)

# 用户 -> 角色 -> 权限
roles = {
    "admin": ["read", "write", "delete"],
    "developer": ["read", "write"],
    "viewer": ["read"],
}

user_roles = {
    "alice": ["admin"],
    "bob": ["developer"],
}

def check_permission(user: str, permission: str) -> bool:
    for role in user_roles.get(user, []):
        if permission in roles[role]:
            return True
    return False

优点:管理简单,易于理解
缺点:不够灵活
适用:大多数企业应用(本文选择)

方案三:ABAC(基于属性的访问控制)

# 基于属性动态判断
def check_permission(user: User, resource: Resource, action: str) -> bool:
    policy = """
    user.role == "admin" OR
    (user.department == resource.department AND action in ["read", "write"])
    """
    return evaluate(policy, {
        "user": user,
        "resource": resource,
        "action": action,
    })

优点:灵活,支持复杂规则
缺点:实现复杂,性能开销大
适用:复杂权限场景

常见陷阱与解决方案

陷阱一:JWT Secret 使用弱密钥

问题描述

# 危险:使用简单字符串作为 Secret
SECRET = "my-secret-key"  # 可被暴力破解

# 危险:硬编码在代码中
SECRET = "super_secret_123"  # 泄露后无法更改

解决方案:使用强密钥 + 环境变量

import os
import secrets

# 生成强密钥
# python -c "import secrets; print(secrets.token_urlsafe(32))"
SECRET_KEY = os.environ.get("JWT_SECRET_KEY")

if not SECRET_KEY:
    raise ValueError("JWT_SECRET_KEY environment variable not set")

# 签名时使用
token = jwt.encode(payload, SECRET_KEY, algorithm="HS256")

陷阱二:忽略 JWT 过期检查

问题描述

# 只解析,不验证过期时间
payload = jwt.decode(token, SECRET_KEY, options={"verify_exp": False})

# 危险:已过期的 Token 仍然有效

解决方案:始终验证过期时间

try:
    payload = jwt.decode(
        token,
        SECRET_KEY,
        algorithms=["HS256"],
        options={"require": ["exp", "iat"]},  # 必须包含 exp 和 iat
    )
except jwt.ExpiredSignatureError:
    raise HTTPException(401, "Token has expired")
except jwt.InvalidTokenError:
    raise HTTPException(401, "Invalid token")

陷阱三:API Key 明文存储

问题描述

# 数据库中明文存储
api_keys = {
    "alice": "sk_live_abc123...",  # 泄露即失效
}

解决方案:存储哈希值

import hashlib

def hash_api_key(key: str) -> str:
    return hashlib.sha256(key.encode()).hexdigest()

# 存储
hashed = hash_api_key(api_key)
db.save(user_id, hashed)

# 验证
def verify_api_key(key: str) -> User:
    hashed = hash_api_key(key)
    return db.find_by_hashed_key(hashed)

# 注意:原始 Key 只在创建时显示一次

陷阱四:权限检查遗漏

问题描述

# 有的接口忘记检查权限
@app.post("/tools/execute")
async def execute_tool(request: Request):
    # 直接执行,未检查权限
    return await tool_executor.run(request.tool, request.params)

解决方案:使用中间件统一处理

@app.middleware("http")
async def auth_middleware(request: Request, call_next):
    # 排除公开路径
    if request.url.path in ["/health", "/login"]:
        return await call_next(request)
    
    # 统一认证授权
    user = await authenticate(request)
    if not await authorize(user, request.url.path, request.method):
        return JSONResponse({"error": "Forbidden"}, 403)
    
    request.state.user = user
    return await call_next(request)

陷阱五:敏感信息记录在日志中

问题描述

# 日志中记录了敏感信息
logger.info(f"User login: {username}, password: {password}")
logger.debug(f"API Key: {api_key}")

解决方案:脱敏处理

def mask_sensitive(data: dict) -> dict:
    """脱敏敏感字段"""
    sensitive_fields = {"password", "token", "api_key", "secret"}
    return {
        k: "***" if k in sensitive_fields else v
        for k, v in data.items()
    }

logger.info("User login", extra=mask_sensitive({
    "username": username,
    "password": password,  # 会被脱敏为 ***
}))

认证架构

┌─────────────────────────────────────────────────────────────────────┐
│                     Authentication Architecture                      │
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                      Request                                  │  │
│  │  Authorization: Bearer <JWT>  或  X-API-Key: <API Key>       │  │
│  └───────────────────────────┬───────────────────────────────────┘  │
│                              │                                       │
│                              ▼                                       │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                   Auth Middleware                             │  │
│  │  ┌─────────────┐     ┌─────────────┐                         │  │
│  │  │ JWT验证     │ 或  │ API Key验证 │                         │  │
│  │  │ • 解析Token │     │ • 查询DB    │                         │  │
│  │  │ • 验证签名  │     │ • 验证状态  │                         │  │
│  │  │ • 提取Claims│     │ • 提取角色  │                         │  │
│  │  └──────┬──────┘     └──────┬──────┘                         │  │
│  │         │                   │                                 │  │
│  │         └─────────┬─────────┘                                │  │
│  │                   ▼                                          │  │
│  │  ┌─────────────────────────────────────┐                    │  │
│  │  │          构建用户上下文              │                    │  │
│  │  │  {user_id, roles, permissions, ...} │                    │  │
│  │  └─────────────────┬───────────────────┘                    │  │
│  └────────────────────┼────────────────────────────────────────┘  │
│                       ▼                                              │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    RBAC 检查                                  │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐          │  │
│  │  │ 资源权限    │  │ 操作权限    │  │ 条件权限    │          │  │
│  │  │ tool:xxx    │  │ tool:execute│  │ owner=user  │          │  │
│  │  └─────────────┘  └─────────────┘  └─────────────┘          │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

JWT Token 管理

JWT Manager 实现

# auth/jwt.py

from __future__ import annotations
import logging
from datetime import datetime, timedelta, timezone
from typing import Any

import jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from ..config import get_config

logger = logging.getLogger(__name__)

security = HTTPBearer(auto_error=False)


class JWTManager:
    """JWT Token 管理器
    
    功能:
    - Access Token 生成
    - Refresh Token 生成
    - Token 验证
    - Token 刷新
    
    Example:
        manager = JWTManager(secret_key="my-secret")
        token = manager.create_token(user_id="user123", roles=["admin"])
        payload = manager.verify_token(token)
    """
    
    def __init__(
        self,
        secret_key: str,
        algorithm: str = "HS256",
        access_token_expire_minutes: int = 30,
        refresh_token_expire_days: int = 7,
    ):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire_minutes = access_token_expire_minutes
        self.refresh_token_expire_days = refresh_token_expire_days
    
    def create_access_token(
        self,
        user_id: str,
        roles: list[str],
        permissions: list[str] | None = None,
        additional_claims: dict[str, Any] | None = None,
    ) -> str:
        """创建 Access Token
        
        Args:
            user_id: 用户标识
            roles: 用户角色列表
            permissions: 特定权限列表
            additional_claims: 额外 JWT Claims
            
        Returns:
            编码后的 JWT Access Token
        """
        now = datetime.now(timezone.utc)
        expire = now + timedelta(minutes=self.access_token_expire_minutes)
        
        payload = {
            "sub": user_id,
            "roles": roles,
            "type": "access",
            "exp": expire,
            "iat": now,
        }
        
        if permissions:
            payload["permissions"] = permissions
        
        if additional_claims:
            payload.update(additional_claims)
        
        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        logger.debug(f"Created access token for user: {user_id}")
        return token
    
    def create_refresh_token(self, user_id: str) -> str:
        """创建 Refresh Token
        
        Args:
            user_id: 用户标识
            
        Returns:
            编码后的 JWT Refresh Token
        """
        now = datetime.now(timezone.utc)
        expire = now + timedelta(days=self.refresh_token_expire_days)
        
        payload = {
            "sub": user_id,
            "type": "refresh",
            "exp": expire,
            "iat": now,
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        logger.debug(f"Created refresh token for user: {user_id}")
        return token
    
    def verify_token(
        self, 
        token: str, 
        expected_type: str = "access"
    ) -> dict[str, Any]:
        """验证并解码 JWT Token
        
        Args:
            token: JWT Token 字符串
            expected_type: 期望的 Token 类型
            
        Returns:
            解码后的 Token Payload
            
        Raises:
            HTTPException: Token 无效或已过期
        """
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
            )
            
            if payload.get("type") != expected_type:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail=f"Invalid token type. Expected {expected_type}",
                    headers={"WWW-Authenticate": "Bearer"},
                )
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token has expired",
                headers={"WWW-Authenticate": "Bearer"},
            )
        except jwt.InvalidTokenError as e:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail=f"Invalid token: {str(e)}",
                headers={"WWW-Authenticate": "Bearer"},
            )
    
    def refresh_access_token(
        self,
        refresh_token: str,
        roles: list[str],
        permissions: list[str] | None = None,
    ) -> str:
        """使用 Refresh Token 刷新 Access Token
        
        Args:
            refresh_token: 有效的 Refresh Token
            roles: 用户角色(应从 DB 获取)
            permissions: 用户权限
            
        Returns:
            新的 Access Token
        """
        payload = self.verify_token(refresh_token, expected_type="refresh")
        user_id = payload.get("sub")
        
        return self.create_access_token(
            user_id=user_id,
            roles=roles,
            permissions=permissions,
        )

FastAPI 依赖注入

async def get_current_user(
    credentials: HTTPAuthorizationCredentials | None = Depends(security),
) -> dict[str, Any]:
    """FastAPI 依赖:获取当前认证用户
    
    使用方式:
        @app.get("/protected")
        async def protected_route(user: dict = Depends(get_current_user)):
            return {"user_id": user["sub"]}
    """
    if credentials is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Not authenticated",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    config = get_config()
    manager = JWTManager(secret_key=config.auth.secret_key)
    return manager.verify_token(credentials.credentials)

API Key 管理

API Key Manager 实现

# auth/api_key.py

from __future__ import annotations
import hashlib
import logging
import secrets
from dataclasses import dataclass
from datetime import datetime
from typing import Any

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader

from ..config import get_config
from ..storage import APIKeyRepository

logger = logging.getLogger(__name__)

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


@dataclass
class APIKeyInfo:
    """API Key 信息"""
    id: str
    name: str
    prefix: str                    # Key 前缀(用于识别)
    key_hash: str                  # Key 哈希(存储)
    roles: list[str]
    permissions: list[str]
    is_active: bool = True
    expires_at: datetime | None = None
    created_at: datetime | None = None
    last_used_at: datetime | None = None
    metadata: dict[str, Any] | None = None


class APIKeyManager:
    """API Key 管理器
    
    功能:
    - API Key 生成
    - API Key 验证
    - API Key 撤销
    
    安全设计:
    - 使用前缀识别 Key
    - 只存储 Key 哈希
    - 原始 Key 只在生成时显示一次
    """
    
    KEY_PREFIX = "mcp_"           # Key 前缀
    KEY_LENGTH = 32               # Key 随机部分长度
    
    def __init__(self, repository: APIKeyRepository | None = None):
        self.repository = repository
    
    def generate_key(
        self,
        name: str,
        roles: list[str],
        permissions: list[str] | None = None,
        expires_days: int | None = None,
    ) -> tuple[str, APIKeyInfo]:
        """生成新的 API Key
        
        Args:
            name: Key 名称
            roles: 角色
            permissions: 权限
            expires_days: 过期天数
            
        Returns:
            (原始 Key, API Key 信息)
        """
        # 生成随机 Key
        raw_key = secrets.token_urlsafe(self.KEY_LENGTH)
        full_key = f"{self.KEY_PREFIX}{raw_key}"
        
        # 计算哈希
        key_hash = self._hash_key(full_key)
        
        # 提取前缀(前 8 个字符用于识别)
        prefix = full_key[:8]
        
        # 构建 Key 信息
        now = datetime.now()
        expires_at = None
        if expires_days:
            expires_at = now + timedelta(days=expires_days)
        
        api_key_info = APIKeyInfo(
            id=secrets.token_urlsafe(8),
            name=name,
            prefix=prefix,
            key_hash=key_hash,
            roles=roles,
            permissions=permissions or [],
            is_active=True,
            expires_at=expires_at,
            created_at=now,
        )
        
        logger.info(f"Generated API Key: {name} (prefix: {prefix})")
        
        return full_key, api_key_info
    
    def _hash_key(self, key: str) -> str:
        """计算 Key 哈希"""
        return hashlib.sha256(key.encode()).hexdigest()
    
    async def verify_key(
        self, 
        raw_key: str
    ) -> APIKeyInfo | None:
        """验证 API Key
        
        Args:
            raw_key: 原始 API Key
            
        Returns:
            验证成功返回 API Key 信息,失败返回 None
        """
        if not raw_key or not raw_key.startswith(self.KEY_PREFIX):
            return None
        
        # 计算哈希
        key_hash = self._hash_key(raw_key)
        
        # 从存储查询
        if self.repository:
            api_key = await self.repository.find_by_hash(key_hash)
            
            if api_key and api_key.is_active:
                # 检查过期
                if api_key.expires_at and api_key.expires_at < datetime.now():
                    return None
                
                # 更新最后使用时间
                await self.repository.update_last_used(api_key.id)
                
                return api_key
        
        return None
    
    async def revoke_key(self, key_id: str) -> bool:
        """撤销 API Key"""
        if self.repository:
            return await self.repository.deactivate(key_id)
        return False

FastAPI 依赖注入

async def get_current_user_from_api_key(
    api_key: str | None = Depends(api_key_header),
) -> dict[str, Any] | None:
    """FastAPI 依赖:从 API Key 获取用户信息"""
    if not api_key:
        return None
    
    manager = APIKeyManager()
    key_info = await manager.verify_key(api_key)
    
    if not key_info:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API Key",
        )
    
    return {
        "sub": key_info.id,
        "name": key_info.name,
        "roles": key_info.roles,
        "permissions": key_info.permissions,
    }

RBAC 权限模型

权限模型设计

# auth/rbac.py

from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable

logger = logging.getLogger(__name__)


class ActionType(str, Enum):
    """操作类型"""
    EXECUTE = "execute"      # 执行工具
    READ = "read"            # 读取资源
    WRITE = "write"          # 写入资源
    ADMIN = "admin"          # 管理操作


@dataclass
class Permission:
    """权限定义"""
    resource: str              # 资源标识,如 "tool:*" 或 "tool:echo"
    action: ActionType         # 操作类型
    conditions: dict[str, Any] = field(default_factory=dict)  # 条件


@dataclass
class Role:
    """角色定义"""
    name: str
    description: str = ""
    permissions: list[Permission] = field(default_factory=list)
    inherits: list[str] = field(default_factory=list)  # 继承的角色


# 内置角色定义
BUILTIN_ROLES: dict[str, Role] = {
    "admin": Role(
        name="admin",
        description="Full access to all resources",
        permissions=[
            Permission(resource="*", action=ActionType.ADMIN),
        ],
    ),
    "developer": Role(
        name="developer",
        description="Execute and read tools/resources",
        permissions=[
            Permission(resource="tool:*", action=ActionType.EXECUTE),
            Permission(resource="resource:*", action=ActionType.READ),
            Permission(resource="prompt:*", action=ActionType.READ),
        ],
    ),
    "viewer": Role(
        name="viewer",
        description="Read-only access",
        permissions=[
            Permission(resource="tool:*", action=ActionType.READ),
            Permission(resource="resource:*", action=ActionType.READ),
        ],
    ),
    "agent": Role(
        name="agent",
        description="Execute tools only",
        permissions=[
            Permission(resource="tool:*", action=ActionType.EXECUTE),
        ],
    ),
}

RBAC 检查器实现

class RBACChecker:
    """RBAC 权限检查器
    
    功能:
    - 权限检查
    - 角色继承
    - 条件权限
    
    检查流程:
    1. 检查 Token 中的直接权限
    2. 检查用户角色的权限
    3. 检查条件权限
    4. 返回结果
    """
    
    def __init__(self, roles: dict[str, Role] | None = None):
        self.roles = roles or BUILTIN_ROLES
    
    def check_permission(
        self,
        user_context: dict[str, Any],
        resource: str,
        action: ActionType,
        resource_context: dict[str, Any] | None = None,
    ) -> bool:
        """检查权限
        
        Args:
            user_context: 用户上下文(包含 roles 和 permissions)
            resource: 资源标识
            action: 操作类型
            resource_context: 资源上下文(用于条件检查)
            
        Returns:
            是否有权限
        """
        # 1. 检查直接权限
        direct_permissions = user_context.get("permissions", [])
        for perm in direct_permissions:
            if self._match_permission(perm, resource, action):
                return True
        
        # 2. 检查角色权限
        user_roles = user_context.get("roles", [])
        for role_name in user_roles:
            if self._check_role_permission(role_name, resource, action, resource_context):
                return True
        
        logger.warning(
            f"Permission denied: user={user_context.get('sub')}, "
            f"resource={resource}, action={action}"
        )
        return False
    
    def _check_role_permission(
        self,
        role_name: str,
        resource: str,
        action: ActionType,
        resource_context: dict[str, Any] | None = None,
    ) -> bool:
        """检查角色权限"""
        role = self.roles.get(role_name)
        if not role:
            return False
        
        # 检查直接权限
        for perm in role.permissions:
            if self._match_permission(perm, resource, action):
                # 检查条件
                if perm.conditions:
                    if self._check_conditions(perm.conditions, resource_context):
                        return True
                else:
                    return True
        
        # 检查继承的角色
        for inherited_role in role.inherits:
            if self._check_role_permission(inherited_role, resource, action, resource_context):
                return True
        
        return False
    
    def _match_permission(
        self,
        permission: Permission | str,
        resource: str,
        action: ActionType,
    ) -> bool:
        """匹配权限"""
        if isinstance(permission, str):
            # 简单字符串权限格式:resource:action
            parts = permission.split(":")
            if len(parts) == 2:
                perm_resource, perm_action = parts
                return self._match_resource(perm_resource, resource) and perm_action == action.value
            return False
        
        return (
            self._match_resource(permission.resource, resource) and
            permission.action == action
        )
    
    def _match_resource(self, pattern: str, resource: str) -> bool:
        """匹配资源模式
        
        支持通配符:
        - * 匹配所有
        - tool:* 匹配所有工具
        - tool:echo 匹配特定工具
        """
        if pattern == "*":
            return True
        
        if "*" in pattern:
            # 前缀匹配
            prefix = pattern.rstrip("*")
            return resource.startswith(prefix.rstrip(":"))
        
        return pattern == resource
    
    def _check_conditions(
        self,
        conditions: dict[str, Any],
        resource_context: dict[str, Any] | None,
    ) -> bool:
        """检查条件权限"""
        if not resource_context:
            return False
        
        for key, value in conditions.items():
            if resource_context.get(key) != value:
                return False
        
        return True

权限检查装饰器

def require_permission(
    resource: str,
    action: ActionType,
):
    """权限检查装饰器
    
    使用方式:
        @require_permission("tool:admin", ActionType.ADMIN)
        async def admin_endpoint():
            ...
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, _context: dict[str, Any] | None = None, **kwargs):
            if not _context:
                raise PermissionError("No user context provided")
            
            checker = RBACChecker()
            if not checker.check_permission(_context, resource, action):
                raise PermissionError(
                    f"Permission denied: {resource}:{action.value}"
                )
            
            return await func(*args, **kwargs)
        
        return wrapper
    return decorator

认证组合器

# auth/__init__.py

from __future__ import annotations
import logging
from typing import Any

from fastapi import Depends, HTTPException, status

from .jwt import JWTManager, get_current_user
from .api_key import APIKeyManager, get_current_user_from_api_key
from .rbac import RBACChecker, ActionType
from ..config import get_config

logger = logging.getLogger(__name__)


async def get_authenticated_user(
    jwt_user: dict[str, Any] | None = Depends(get_current_user),
    api_key_user: dict[str, Any] | None = Depends(get_current_user_from_api_key),
) -> dict[str, Any]:
    """获取已认证用户(支持 JWT 和 API Key 双认证)
    
    优先级:JWT > API Key
    """
    config = get_config()
    
    if not config.auth.enabled:
        # 认证禁用,返回默认用户
        return {
            "sub": "anonymous",
            "roles": ["agent"],
            "permissions": [],
        }
    
    # 优先使用 JWT
    if jwt_user:
        return jwt_user
    
    # 其次使用 API Key
    if api_key_user:
        return api_key_user
    
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Not authenticated",
        headers={"WWW-Authenticate": "Bearer"},
    )


class AuthComposer:
    """认证组合器
    
    整合 JWT、API Key 和 RBAC 的统一接口
    """
    
    def __init__(self):
        config = get_config()
        self.jwt_manager = JWTManager(
            secret_key=config.auth.secret_key,
            algorithm=config.auth.algorithm,
            access_token_expire_minutes=config.auth.access_token_expire_minutes,
        )
        self.api_key_manager = APIKeyManager()
        self.rbac_checker = RBACChecker()
    
    def create_token(
        self,
        user_id: str,
        roles: list[str],
        permissions: list[str] | None = None,
    ) -> str:
        """创建 Access Token"""
        return self.jwt_manager.create_access_token(
            user_id=user_id,
            roles=roles,
            permissions=permissions,
        )
    
    def verify_token(self, token: str) -> dict[str, Any]:
        """验证 Token"""
        return self.jwt_manager.verify_token(token)
    
    def generate_api_key(
        self,
        name: str,
        roles: list[str],
        permissions: list[str] | None = None,
    ) -> tuple[str, dict[str, Any]]:
        """生成 API Key"""
        raw_key, key_info = self.api_key_manager.generate_key(
            name=name,
            roles=roles,
            permissions=permissions,
        )
        return raw_key, {
            "id": key_info.id,
            "name": key_info.name,
            "prefix": key_info.prefix,
            "roles": key_info.roles,
        }
    
    def check_permission(
        self,
        user_context: dict[str, Any],
        resource: str,
        action: ActionType,
    ) -> bool:
        """检查权限"""
        return self.rbac_checker.check_permission(
            user_context=user_context,
            resource=resource,
            action=action,
        )
    
    def require_permission(
        self,
        user_context: dict[str, Any],
        resource: str,
        action: ActionType,
    ) -> None:
        """要求权限(无权限抛出异常)"""
        if not self.check_permission(user_context, resource, action):
            raise PermissionError(
                f"Permission denied: {resource}:{action.value}"
            )

使用示例

生成 Token

from mcp_gateway_core.auth import AuthComposer

auth = AuthComposer()

# 为用户创建 Token
token = auth.create_token(
    user_id="user123",
    roles=["developer"],
    permissions=["tool:custom_tool"],
)

# 为 Agent 生成 API Key
raw_key, key_info = auth.generate_api_key(
    name="my-agent",
    roles=["agent"],
)

print(f"API Key: {raw_key}")  # 只显示一次
print(f"Key Info: {key_info}")

在请求中使用

# 使用 JWT Token
curl -H "Authorization: Bearer eyJ..." http://localhost:8000/mcp

# 使用 API Key
curl -H "X-API-Key: mcp_xxx..." http://localhost:8000/mcp

权限检查

from mcp_gateway_core.auth import RBACChecker, ActionType

checker = RBACChecker()

user = {
    "sub": "user123",
    "roles": ["developer"],
    "permissions": ["tool:admin_tool"],
}

# 检查权限
if checker.check_permission(user, "tool:echo", ActionType.EXECUTE):
    print("允许执行 echo 工具")

if checker.check_permission(user, "tool:admin_tool", ActionType.EXECUTE):
    print("允许执行 admin_tool 工具")

角色权限矩阵

角色tool:executetool:readresource:readresource:writeadmin
admin
developer
viewer
agent

设计亮点

特性说明面试价值
双认证支持同时支持 JWT 和 API Key灵活的认证设计
API Key 安全只存哈希,前缀识别安全最佳实践
RBAC 模型角色-权限-条件三维模型权限系统设计
条件权限支持资源级别条件判断细粒度权限控制

小结

本章实现了完整的认证授权体系,包括 JWT Token、API Key 双认证机制和 RBAC 权限模型。

关键要点

  1. JWT Manager 提供 Token 的生成、验证和刷新功能
  2. API Key Manager 使用安全的哈希存储机制
  3. RBAC 支持角色继承和条件权限
  4. AuthComposer 提供统一的认证接口

下一章我们将实现中间件层,包括限流和请求日志。

参考资料

分享: