← 返回文章列表

从零到一实现生产级 MCP Gateway(七):可观测性

2025-04-07·5 分钟阅读

从零到一实现生产级 MCP Gateway(七):可观测性实现

前言

可观测性是生产级系统的重要保障,包括日志(Logging)、指标(Metrics)和追踪(Tracing)三大支柱。本章将深入实现结构化日志、Prometheus 指标和 OpenTelemetry 分布式追踪,为 MCP Gateway 构建完整的可观测性体系。

可观测性架构

┌─────────────────────────────────────────────────────────────────────┐
│                    Observability Architecture                        │
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                     MCP Gateway                                │  │
│  │                                                                │  │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐           │  │
│  │  │  Logging    │  │  Metrics    │  │  Tracing    │           │  │
│  │  │  (JSON)     │  │ (Prometheus)│  │ (OpenTel)   │           │  │
│  │  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘           │  │
│  │         │                │                │                   │  │
│  └─────────┼────────────────┼────────────────┼───────────────────┘  │
│            │                │                │                       │
│            ▼                ▼                ▼                       │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                 │
│  │    ELK      │  │ Prometheus  │  │   Jaeger    │                 │
│  │   Stack     │  │ + Grafana   │  │   Tempo     │                 │
│  └─────────────┘  └─────────────┘  └─────────────┘                 │
│                                                                      │
│  ┌───────────────────────────────────────────────────────────────┐  │
│  │                    Unified Dashboard                           │  │
│  │  ┌─────────────────────────────────────────────────────────┐  │  │
│  │  │  Grafana Dashboard (Logs + Metrics + Traces)            │  │  │
│  │  └─────────────────────────────────────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────────┘  │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

结构化日志

JSON 日志实现

# observability/logging.py

from __future__ import annotations
import json
import logging
import sys
from datetime import datetime, timezone
from typing import Any

from ..config import get_config


class JSONFormatter(logging.Formatter):
    """Format log records as JSON strings for log aggregation.

    Each record becomes one JSON object with ``timestamp``, ``level``,
    ``logger``, ``message`` and ``location`` keys.  ``request_id``,
    ``user_id``, ``trace_id`` and ``span_id`` are added as top-level keys when
    the record carries them, exception details go under ``exception``, and any
    other non-standard record attribute is collected under ``extra``.
    """

    # Context identifiers promoted to top-level keys when present on a record.
    _CONTEXT_FIELDS = ("request_id", "user_id", "trace_id", "span_id")

    # Record attributes that must not be echoed under "extra": the attributes
    # the logging module sets on every LogRecord, plus the context fields above.
    # "thread", "threadName" and "taskName" are included so that a plain record
    # does not produce an "extra" object made only of bookkeeping values.
    _RESERVED_ATTRS = frozenset({
        "name", "msg", "args", "created", "filename", "funcName",
        "levelname", "levelno", "lineno", "module", "msecs",
        "pathname", "process", "processName", "relativeCreated",
        "stack_info", "exc_info", "exc_text", "message",
        "thread", "threadName", "taskName",
        *_CONTEXT_FIELDS,
    })

    def format(self, record: logging.LogRecord) -> str:
        """Serialize ``record`` to a JSON string.

        Args:
            record: The log record to serialize.

        Returns:
            The JSON document describing the record.
        """
        # Use the time the record was created, not the time it is formatted,
        # so the timestamp stays correct if formatting happens later.
        event_time = datetime.fromtimestamp(record.created, tz=timezone.utc)

        log_data = {
            "timestamp": event_time.isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "location": {
                "file": record.filename,
                "line": record.lineno,
                "function": record.funcName,
            },
        }

        for field_name in self._CONTEXT_FIELDS:
            if hasattr(record, field_name):
                log_data[field_name] = getattr(record, field_name)

        if record.exc_info:
            exc_type = record.exc_info[0]
            exc_value = record.exc_info[1]
            log_data["exception"] = {
                "type": exc_type.__name__ if exc_type else None,
                "message": str(exc_value) if exc_value else None,
                "traceback": self.formatException(record.exc_info),
            }

        extra_attrs = {}
        for key, value in record.__dict__.items():
            if key in self._RESERVED_ATTRS:
                continue
            try:
                json.dumps(value)  # probe: is the value JSON-serializable as-is?
            except (TypeError, ValueError):
                value = str(value)
            extra_attrs[key] = value

        if extra_attrs:
            log_data["extra"] = extra_attrs

        # default=str is a deliberate fallback: a context field that is not
        # JSON-native (for example a UUID request id) is written as its string
        # form instead of making the log call fail.
        return json.dumps(log_data, default=str)


class ContextFilter(logging.Filter):
    """Logging filter that copies request and trace context onto each record.

    The filter never rejects a record; it only adds attributes:

    * ``request_id`` / ``user_id`` from the module-level ``request_id_var`` and
      ``user_id_var`` context variables, when they hold a value;
    * ``trace_id`` / ``span_id`` from the span returned by
      ``get_current_span()``, when there is one.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Attach the available context values to ``record``.

        Args:
            record: The log record being processed; modified in place.

        Returns:
            Always ``True`` so the record is emitted.
        """
        # Imported at call time rather than at module import time
        # (presumably to avoid a circular import with the tracing module).
        from .tracing import get_current_span

        # A value passed explicitly on the log call (extra={...}) is already an
        # attribute of the record and takes precedence over the ambient context.
        for field_name, context_var in (
            ("request_id", request_id_var),
            ("user_id", user_id_var),
        ):
            value = context_var.get()
            if value is not None and not hasattr(record, field_name):
                setattr(record, field_name, value)

        span = get_current_span()
        if span:
            record.trace_id = span.trace_id
            record.span_id = span.span_id

        return True


def setup_logging(
    level: str = "INFO",
    json_format: bool = True,
) -> None:
    """Configure the root logger to write to standard output.

    Existing root handlers are removed and replaced by a single stream
    handler that carries a ``ContextFilter``.

    Args:
        level: Name of the logging level, case-insensitive (e.g. ``"debug"``).
        json_format: Emit JSON via ``JSONFormatter`` when true, otherwise a
            plain text line.
    """
    root = logging.getLogger()
    root.setLevel(getattr(logging, level.upper()))

    # Walk a copy: removeHandler mutates root.handlers.
    for existing in list(root.handlers):
        root.removeHandler(existing)

    stream_handler = logging.StreamHandler(sys.stdout)

    formatter = (
        JSONFormatter()
        if json_format
        else logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
        )
    )
    stream_handler.setFormatter(formatter)

    # The filter is attached to the handler, so it runs for every record the
    # handler receives.
    stream_handler.addFilter(ContextFilter())

    root.addHandler(stream_handler)


def get_logger(name: str) -> logging.Logger:
    """Return the standard-library logger registered under ``name``."""
    named_logger = logging.getLogger(name)
    return named_logger

日志上下文管理

import contextvars
from contextvars import ContextVar

# Context variables holding the identifiers of the request being processed.
request_id_var: ContextVar[str | None] = ContextVar("request_id", default=None)
user_id_var: ContextVar[str | None] = ContextVar("user_id", default=None)


class LogContext:
    """Context manager that sets the request/user context variables.

    On entry, ``request_id_var`` and ``user_id_var`` are set to the values
    given to the constructor (falsy values are skipped).  On exit each
    variable is restored to the value it had before entry, so contexts can be
    nested and the same instance can be reused.

    Args:
        request_id: Value for ``request_id_var`` while the context is active.
        user_id: Value for ``user_id_var`` while the context is active.
        **extra: Additional key/value pairs; stored on ``self.extra`` and not
            otherwise used by this class.
    """

    def __init__(
        self,
        request_id: str | None = None,
        user_id: str | None = None,
        **extra: Any,
    ):
        self.request_id = request_id
        self.user_id = user_id
        self.extra = extra
        # Stack of (variable, token) pairs.  Keeping the variable next to its
        # token means __exit__ knows exactly which variable to reset.
        self._tokens: list = []

    def __enter__(self):
        """Set the context variables and return this instance."""
        for var, value in (
            (request_id_var, self.request_id),
            (user_id_var, self.user_id),
        ):
            if value:
                self._tokens.append((var, var.set(value)))
        return self

    def __exit__(self, *args):
        """Restore the context variables in reverse order of assignment."""
        # Popping empties the stack, so a reused instance starts clean.
        while self._tokens:
            var, token = self._tokens.pop()
            try:
                var.reset(token)
            except (ValueError, RuntimeError):
                # ContextVar.reset raises ValueError for a token created in a
                # different Context and RuntimeError for a token that was
                # already used.  Restoring is best-effort: failing here would
                # mask whatever exception is leaving the with-block.
                pass


# Usage example
def log_with_context():
    """Emit one log line while a ``LogContext`` is active."""
    request_scope = LogContext(request_id="abc123", user_id="user1")
    with request_scope:
        # Logged while the request/user identifiers are set by LogContext.
        get_logger("mcp").info("Processing request")

Prometheus 指标

指标定义

# observability/metrics.py

from __future__ import annotations
import logging
from typing import Any

from prometheus_client import (
    Counter,
    Gauge,
    Histogram,
    Info,
    CollectorRegistry,
    generate_latest,
    CONTENT_TYPE_LATEST,
)

logger = logging.getLogger(__name__)


class MCPMetrics:
    """Prometheus metrics for the MCP Gateway.

    All collectors are registered on a ``CollectorRegistry`` created by and
    owned by the instance (``self.registry``).  Metric names are prefixed
    with ``namespace``.
    """

    def __init__(self, namespace: str = "mcp"):
        """Create the registry and register every collector on it.

        Args:
            namespace: Prefix prepended (with an underscore) to each metric name.
        """
        self.namespace = namespace
        self.registry = CollectorRegistry()

        registry = self.registry
        # Upper bounds, in seconds, shared by both duration histograms.
        latency_buckets = (0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0)

        # --- Tools ---
        self.tools_total = Gauge(
            f"{namespace}_tools_total",
            "Total number of registered tools",
            registry=registry,
        )
        self.tool_calls_total = Counter(
            f"{namespace}_tool_calls_total",
            "Total number of tool calls",
            ["tool_name", "status"],
            registry=registry,
        )
        self.tool_call_duration = Histogram(
            f"{namespace}_tool_call_duration_seconds",
            "Tool call duration in seconds",
            ["tool_name"],
            buckets=latency_buckets,
            registry=registry,
        )

        # --- Resources ---
        self.resources_total = Gauge(
            f"{namespace}_resources_total",
            "Total number of registered resources",
            registry=registry,
        )
        self.resource_reads_total = Counter(
            f"{namespace}_resource_reads_total",
            "Total number of resource reads",
            ["resource_type"],
            registry=registry,
        )

        # --- Prompts ---
        self.prompts_total = Gauge(
            f"{namespace}_prompts_total",
            "Total number of registered prompts",
            registry=registry,
        )

        # --- HTTP requests ---
        self.requests_total = Counter(
            f"{namespace}_requests_total",
            "Total number of HTTP requests",
            ["method", "path", "status"],
            registry=registry,
        )
        self.request_duration = Histogram(
            f"{namespace}_request_duration_seconds",
            "HTTP request duration in seconds",
            ["method", "path"],
            buckets=latency_buckets,
            registry=registry,
        )

        # --- Authentication ---
        self.auth_attempts_total = Counter(
            f"{namespace}_auth_attempts_total",
            "Total number of authentication attempts",
            ["type", "status"],
            registry=registry,
        )

        # --- Rate limiting ---
        self.rate_limit_hits_total = Counter(
            f"{namespace}_rate_limit_hits_total",
            "Total number of rate limit hits",
            ["client_type"],
            registry=registry,
        )

        # --- Errors ---
        self.errors_total = Counter(
            f"{namespace}_errors_total",
            "Total number of errors",
            ["type", "component"],
            registry=registry,
        )

        # --- Service information ---
        self.service_info = Info(
            f"{namespace}_service",
            "MCP Gateway service information",
            registry=registry,
        )

    def set_service_info(self, version: str, **labels: str) -> None:
        """Publish the service version and any additional info labels."""
        payload = {"version": version, **labels}
        self.service_info.info(payload)

    def record_tool_call(
        self,
        tool_name: str,
        status: str,
        duration: float,
    ) -> None:
        """Count one tool call and observe its duration.

        Args:
            tool_name: Value of the ``tool_name`` label.
            status: Value of the ``status`` label on the call counter.
            duration: Duration passed to the ``*_duration_seconds`` histogram.
        """
        call_counter = self.tool_calls_total.labels(
            tool_name=tool_name,
            status=status,
        )
        call_counter.inc()

        latency = self.tool_call_duration.labels(tool_name=tool_name)
        latency.observe(duration)

    def record_request(
        self,
        method: str,
        path: str,
        status: int,
        duration: float,
    ) -> None:
        """Count one HTTP request and observe its duration.

        Args:
            method: Value of the ``method`` label.
            path: Value of the ``path`` label.
            status: Status code; converted to a string for the ``status`` label.
            duration: Duration passed to the ``*_duration_seconds`` histogram.
        """
        request_counter = self.requests_total.labels(
            method=method,
            path=path,
            status=str(status),
        )
        request_counter.inc()

        latency = self.request_duration.labels(method=method, path=path)
        latency.observe(duration)

    def record_auth_attempt(
        self,
        auth_type: str,
        success: bool,
    ) -> None:
        """Count one authentication attempt, labelled success or failure."""
        outcome = "success" if success else "failure"
        self.auth_attempts_total.labels(type=auth_type, status=outcome).inc()

    def record_rate_limit_hit(self, client_type: str = "ip") -> None:
        """Count one rate-limit hit for the given client type."""
        self.rate_limit_hits_total.labels(client_type=client_type).inc()

    def record_error(
        self,
        error_type: str,
        component: str,
    ) -> None:
        """Count one error, labelled by error type and component."""
        self.errors_total.labels(type=error_type, component=component).inc()

    def update_registry_counts(
        self,
        tools: int,
        resources: int,
        prompts: int,
    ) -> None:
        """Set the gauges for registered tools, resources and prompts."""
        for gauge, count in (
            (self.tools_total, tools),
            (self.resources_total, resources),
            (self.prompts_total, prompts),
        ):
            gauge.set(count)

    def get_metrics(self) -> bytes:
        """Return the output of ``generate_latest`` for this instance's registry."""
        exposition = generate_latest(self.registry)
        return exposition

    def get_content_type(self) -> str:
        """Return ``CONTENT_TYPE_LATEST``, the content type for ``get_metrics``."""
        return CONTENT_TYPE_LATEST


# Module-wide metrics instance; None until setup_metrics() assigns one.
_metrics: MCPMetrics | None = None


def setup_metrics(namespace: str = "mcp") -> MCPMetrics:
    """Create a new ``MCPMetrics`` and install it as the module-wide instance.

    Args:
        namespace: Metric name prefix passed to ``MCPMetrics``.

    Returns:
        The newly created instance.
    """
    global _metrics
    instance = MCPMetrics(namespace=namespace)
    _metrics = instance
    return instance


def get_metrics() -> MCPMetrics:
    """Return the module-wide ``MCPMetrics``, creating a default one if needed.

    Returns:
        The instance installed by ``setup_metrics()``, or a new
        ``MCPMetrics()`` with default arguments when none exists yet.
    """
    # Without this declaration the assignment below would make ``_metrics`` a
    # local name and the ``is None`` check would raise UnboundLocalError.
    global _metrics
    if _metrics is None:
        _metrics = MCPMetrics()
    return _metrics

OpenTelemetry 分布式追踪

Span 数据结构

# observability/tracing.py

from __future__ import annotations
import functools
import logging
import time
import uuid
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass, field
from typing import Any, Callable, TypeVar

logger = logging.getLogger(__name__)

# Context variables
# current_span_var holds the Span that is active in the current context (None when there is none).
current_span_var: ContextVar["Span | None"] = ContextVar("current_span", default=None)
# trace_context_var defaults to None; the code shown in this section does not read or set it.
trace_context_var: ContextVar[dict[str, Any] | None] = ContextVar("trace_context", default=None)

# Optional OpenTelemetry import: if any of these imports fails, OTEL_AVAILABLE is
# False and `trace` is None, so the module still imports without the SDK installed.
try:
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.sdk.resources import Resource
    
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    trace = None

# Type variable for a decorated callable, so a decorator can be typed as returning
# the same callable type it receives.
F = TypeVar("F", bound=Callable[..., Any])


@dataclass
class Span:
    """Record of one traced unit of work.

    Times are readings of ``time.perf_counter()``; they are meaningful only
    relative to each other (for example to compute a duration).
    """
    # Identifier shared by every span of the same trace.
    trace_id: str
    # Identifier of this span.
    span_id: str
    # Identifier of the parent span, or None when there is no parent.
    parent_span_id: str | None = None
    name: str = ""
    # perf_counter reading taken when the span object is created.
    start_time: float = field(default_factory=time.perf_counter)
    # perf_counter reading taken by finish(); None until then.
    end_time: float | None = None
    attributes: dict[str, Any] = field(default_factory=dict)
    events: list[dict[str, Any]] = field(default_factory=list)
    status: str = "OK"

    def set_attribute(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key`` in the span attributes."""
        self.attributes[key] = value

    def add_event(
        self,
        name: str,
        attributes: dict[str, Any] | None = None
    ) -> None:
        """Append a named event stamped with the current perf_counter reading."""
        event = {
            "name": name,
            "timestamp": time.perf_counter(),
            "attributes": attributes if attributes else {},
        }
        self.events.append(event)

    def set_status(
        self,
        status: str,
        description: str | None = None
    ) -> None:
        """Set the status; a non-empty description is kept as an attribute."""
        self.status = status
        if not description:
            return
        self.attributes["status_description"] = description

    def record_exception(self, exc: Exception) -> None:
        """Mark the span as ``ERROR`` and add an ``exception`` event for ``exc``."""
        message = str(exc)
        self.set_status("ERROR", message)
        details = {"type": type(exc).__name__, "message": message}
        self.add_event("exception", details)

    def finish(self) -> None:
        """Record the end time of the span."""
        self.end_time = time.perf_counter()

    @property
    def duration_ms(self) -> float:
        """Elapsed time in milliseconds; ``0.0`` while the span is unfinished."""
        ended = self.end_time
        return 0.0 if ended is None else (ended - self.start_time) * 1000

Tracer 实现

class Tracer:
    """Tracer that records ``Span`` objects in memory and can mirror to OpenTelemetry.

    Every traced block produces a native ``Span`` that is stored on the tracer
    and exposed through ``get_traces()``.  When OpenTelemetry is importable and
    ``use_otel`` is true, an OpenTelemetry span with the same name is opened
    around the block as well.

    Args:
        service_name: Stored on each native span as the ``service`` attribute
            and used as the OpenTelemetry tracer name.
        use_otel: Whether to mirror spans to OpenTelemetry when it is available.
        max_spans: Maximum number of finished spans kept in memory; the oldest
            are dropped first.  ``None`` (the default) keeps every span until
            ``clear()`` is called, which grows without bound in a long-running
            process.
    """

    def __init__(
        self,
        service_name: str = "mcp-gateway",
        use_otel: bool = True,
        max_spans: int | None = None,
    ):
        self.service_name = service_name
        self._max_spans = max_spans
        self._spans: list[Span] = []
        self._otel_tracer = None

        if use_otel and OTEL_AVAILABLE:
            try:
                self._otel_tracer = trace.get_tracer(service_name)
            except Exception as e:
                logger.warning("Failed to initialize OpenTelemetry tracer: %s", e)

    def _generate_id(self) -> str:
        """Return 16 hexadecimal characters taken from a random UUID."""
        return uuid.uuid4().hex[:16]

    def start_span(
        self,
        name: str,
        parent: Span | None = None,
        attributes: dict[str, Any] | None = None,
    ) -> Span:
        """Create a native span; it is neither made current nor stored.

        Args:
            name: Span name.
            parent: Parent span; defaults to the span in ``current_span_var``.
            attributes: Initial attributes, merged over the ``service`` attribute.

        Returns:
            The new span.  It reuses the parent's trace id when there is a
            parent; otherwise a new 32-character trace id is generated.
        """
        if parent is None:
            parent = current_span_var.get()

        span = Span(
            trace_id=parent.trace_id if parent else self._generate_id() + self._generate_id(),
            span_id=self._generate_id(),
            parent_span_id=parent.span_id if parent else None,
            name=name,
            attributes={"service": self.service_name, **(attributes or {})},
        )

        return span

    def _enter_otel_span(self, name: str, attributes: dict[str, Any]):
        """Open an OpenTelemetry span and make it current.

        ``start_as_current_span`` returns a context manager rather than a span,
        so it has to be entered to obtain the span and start it.

        Returns:
            The entered context manager (to be exited by the caller), or
            ``None`` when OpenTelemetry is disabled or the span could not be
            started.
        """
        if not self._otel_tracer:
            return None

        try:
            otel_cm = self._otel_tracer.start_as_current_span(name)
            otel_span = otel_cm.__enter__()
        except Exception as e:
            logger.debug("Failed to start OpenTelemetry span: %s", e)
            return None

        try:
            for key, value in attributes.items():
                otel_span.set_attribute(key, value)
        except Exception as e:
            # The span is already open, so keep it and only report the problem.
            logger.debug("Failed to set OpenTelemetry span attributes: %s", e)

        return otel_cm

    def _store(self, span: Span) -> None:
        """Keep a finished span, dropping the oldest ones beyond ``max_spans``."""
        self._spans.append(span)
        if self._max_spans is not None:
            overflow = len(self._spans) - self._max_spans
            if overflow > 0:
                del self._spans[:overflow]

    @contextmanager
    def trace(self, name: str, **attributes: Any):
        """Trace the enclosed block.

        The new span is the current span (``current_span_var``) for the
        duration of the block.  An exception raised by the block is recorded
        on the span and re-raised.

        Args:
            name: Span name.
            **attributes: Initial span attributes.

        Yields:
            The native ``Span`` for the block.
        """
        span = self.start_span(name, attributes=attributes)
        token = current_span_var.set(span)
        otel_cm = self._enter_otel_span(name, attributes)

        # Exception details handed to the OpenTelemetry context manager on exit
        # so it can see how the block ended.
        exit_args = (None, None, None)
        try:
            yield span
        except Exception as e:
            span.record_exception(e)
            exit_args = (type(e), e, e.__traceback__)
            raise
        finally:
            span.finish()
            self._store(span)
            current_span_var.reset(token)

            if otel_cm is not None:
                try:
                    otel_cm.__exit__(*exit_args)
                except Exception:
                    # Tracing must never replace the outcome of the traced block.
                    logger.debug("Failed to end OpenTelemetry span", exc_info=True)

    def get_traces(self) -> list[list[dict[str, Any]]]:
        """Return the stored spans as dictionaries, grouped by trace id.

        Returns:
            One list per trace, in order of first appearance; each inner list
            holds that trace's spans in the order they finished.
        """
        traces: dict[str, list[dict[str, Any]]] = {}

        for span in self._spans:
            traces.setdefault(span.trace_id, []).append({
                "trace_id": span.trace_id,
                "span_id": span.span_id,
                "parent_span_id": span.parent_span_id,
                "name": span.name,
                "duration_ms": span.duration_ms,
                "attributes": span.attributes,
                "events": span.events,
                "status": span.status,
            })

        return list(traces.values())

    def clear(self) -> None:
        """Discard all stored spans."""
        self._spans.clear()


# Module-wide Tracer; None until setup_tracing() or get_tracer() assigns one.
_tracer: Tracer | None = None


def setup_tracing(
    service_name: str = "mcp-gateway",
    use_otel: bool = True,
    otlp_endpoint: str | None = None,
) -> Tracer:
    """Create the module-wide ``Tracer``, optionally configuring OTLP export.

    Args:
        service_name: Service name for the tracer and the OpenTelemetry resource.
        use_otel: Whether OpenTelemetry should be used when it is available.
        otlp_endpoint: OTLP endpoint; the exporter is configured only when this
            is set, ``use_otel`` is true and OpenTelemetry is importable.

    Returns:
        The new tracer.  A failure to configure the exporter is logged as a
        warning and does not prevent the tracer from being created.
    """
    global _tracer

    export_requested = use_otel and OTEL_AVAILABLE and otlp_endpoint
    if export_requested:
        try:
            from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

            tracer_provider = TracerProvider(
                resource=Resource.create({"service.name": service_name}),
            )
            span_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
            tracer_provider.add_span_processor(BatchSpanProcessor(span_exporter))

            trace.set_tracer_provider(tracer_provider)
            logger.info(f"OpenTelemetry configured with OTLP: {otlp_endpoint}")
        except Exception as e:
            logger.warning(f"Failed to configure OTLP: {e}")

    tracer = Tracer(service_name=service_name, use_otel=use_otel)
    _tracer = tracer
    return tracer


def get_tracer() -> Tracer:
    """Return the module-wide ``Tracer``, creating a default one on first use."""
    global _tracer
    tracer = _tracer
    if tracer is None:
        tracer = Tracer()
        _tracer = tracer
    return tracer


@contextmanager
def trace_span(name: str, **attributes: Any):
    """Trace the enclosed block with the module-wide tracer.

    Args:
        name: Span name.
        **attributes: Initial span attributes.

    Yields:
        The span created by ``Tracer.trace``.
    """
    with get_tracer().trace(name, **attributes) as active_span:
        yield active_span


def traced(
    name: str | None = None,
    **attributes: Any
) -> Callable[[F], F]:
    """Decorator that runs each call of a function inside a span.

    Works for both regular and ``async def`` functions.  The span carries the
    function's name and module as the ``function`` and ``module`` attributes.
    Exceptions propagate unchanged; they are recorded on the span by the
    tracing context manager, so the wrappers do not record them again.

    Args:
        name: Span name; defaults to the decorated function's ``__name__``.
        **attributes: Initial attributes for every span created.

    Returns:
        A decorator preserving the wrapped function's metadata.

    Example:
        @traced("process_request")
        async def handle_request(request):
            ...
    """
    def decorator(func: F) -> F:
        import inspect

        span_name = name or func.__name__

        def describe(span) -> None:
            # Identify the traced callable on the span.
            span.set_attribute("function", func.__name__)
            span.set_attribute("module", func.__module__)

        if inspect.iscoroutinefunction(func):
            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs):
                with trace_span(span_name, **attributes) as span:
                    describe(span)
                    return await func(*args, **kwargs)

            return async_wrapper

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            with trace_span(span_name, **attributes) as span:
                describe(span)
                return func(*args, **kwargs)

        return sync_wrapper

    return decorator


def get_current_span() -> Span | None:
    """Return the span stored in ``current_span_var``, or ``None`` if unset."""
    active_span = current_span_var.get()
    return active_span


def set_span_attribute(key: str, value: Any) -> None:
    """Set an attribute on the current span; do nothing when there is none."""
    active_span = get_current_span()
    if not active_span:
        return
    active_span.set_attribute(key, value)


def add_span_event(name: str, attributes: dict[str, Any] | None = None) -> None:
    """Add an event to the current span; do nothing when there is none."""
    active_span = get_current_span()
    if not active_span:
        return
    active_span.add_event(name, attributes)

指标端点

# api/metrics.py

from fastapi import APIRouter, Response

from ..observability.metrics import get_metrics

router = APIRouter()


@router.get("/metrics")
async def prometheus_metrics() -> Response:
    """Serve the current metrics at ``GET /metrics``."""
    collector = get_metrics()
    payload = collector.get_metrics()
    content_type = collector.get_content_type()
    return Response(content=payload, media_type=content_type)

使用示例

记录工具调用指标

from mcp_gateway_core.observability import get_metrics, trace_span
import time

async def execute_tool(name: str, arguments: dict):
    """Run a tool inside a span and record the call in the metrics.

    Args:
        name: Tool name; used for the span name (``tool.<name>``) and the
            ``tool_name`` metric label.
        arguments: Keyword arguments passed to the tool handler.

    Returns:
        Whatever the tool handler returns.

    Raises:
        Exception: Any exception raised by the handler is re-raised after the
            call has been recorded with status ``"error"``.
    """
    metrics = get_metrics()

    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by adjustments of the system clock.
    started = time.perf_counter()
    status = "success"

    try:
        with trace_span(f"tool.{name}", tool_name=name) as span:
            span.set_attribute("arguments", str(arguments))

            # Run the tool.
            # NOTE(review): tool_handler is not defined in this snippet —
            # presumably the handler looked up for `name`; confirm.
            result = await tool_handler(**arguments)

            span.set_attribute("result_size", len(str(result)))
            return result

    except Exception:
        status = "error"
        raise
    finally:
        # Runs for success and failure alike, so every call is counted once.
        duration = time.perf_counter() - started
        metrics.record_tool_call(name, status, duration)

日志输出示例

{
  "timestamp": "2026-03-06T10:30:00+00:00",
  "level": "INFO",
  "logger": "mcp_gateway.http",
  "message": "POST /mcp - 200 - 15.23ms",
  "request_id": "ccf710cf-0850-4233-8b3f-4e6c5d2a1f9e",
  "user_id": "user123",
  "trace_id": "abc123def456",
  "span_id": "span789"
}

Prometheus 指标示例

# HELP mcp_tools_total Total number of registered tools
# TYPE mcp_tools_total gauge
mcp_tools_total 4

# HELP mcp_tool_calls_total Total number of tool calls
# TYPE mcp_tool_calls_total counter
mcp_tool_calls_total{tool_name="echo",status="success"} 150
mcp_tool_calls_total{tool_name="echo",status="error"} 2

# HELP mcp_tool_call_duration_seconds Tool call duration in seconds
# TYPE mcp_tool_call_duration_seconds histogram
mcp_tool_call_duration_seconds_bucket{tool_name="echo",le="0.01"} 100
mcp_tool_call_duration_seconds_bucket{tool_name="echo",le="0.05"} 145
mcp_tool_call_duration_seconds_bucket{tool_name="echo",le="0.1"} 148
mcp_tool_call_duration_seconds_bucket{tool_name="echo",le="+Inf"} 150

设计亮点

| 特性 | 说明 | 面试价值 |
| --- | --- | --- |
| JSON 结构化日志 | 便于聚合和分析 | 日志最佳实践 |
| Prometheus 指标 | 标准指标格式 | 监控系统集成 |
| OpenTelemetry | 厂商中立的追踪标准 | 分布式追踪 |
| 上下文传播 | 自动关联请求信息 | 全链路追踪 |

小结

本章实现了完整的可观测性体系,包括结构化日志、Prometheus 指标和 OpenTelemetry 分布式追踪。

关键要点

  1. JSON 格式日志便于日志聚合和分析
  2. Prometheus 指标提供实时监控能力
  3. OpenTelemetry 追踪支持分布式系统调试
  4. 三者结合实现全链路可观测性

下一章我们将讨论生产实践,包括部署、配置和安全加固。

参考资料

分享: