"""Prometheus 兼容的 /metrics 端点""" import time import json from collections import defaultdict from typing import Optional class MetricsCollector: """轻量级指标收集器,输出 Prometheus 兼容格式""" def __init__(self): self._counters: dict[str, int] = defaultdict(int) self._gauges: dict[str, float] = {} self._histograms: dict[str, list[float]] = defaultdict(list) self._start_time = time.time() def inc(self, name: str, labels: dict = None, value: int = 1): """增加计数器""" key = self._label_key(name, labels) self._counters[key] += value def set_gauge(self, name: str, value: float, labels: dict = None): """设置 gauge 值""" key = self._label_key(name, labels) self._gauges[key] = value def observe(self, name: str, value: float, labels: dict = None): """记录直方图观测值""" key = self._label_key(name, labels) self._histograms[key].append(value) def render(self) -> str: """渲染为 Prometheus 文本格式""" lines = [] now = time.time() # HELP / TYPE 注释 seen = set() for key in self._counters: metric_name = key.split("{")[0] if "{" in key else key if metric_name not in seen: lines.append(f"# HELP {metric_name} Counter metric") lines.append(f"# TYPE {metric_name} counter") seen.add(metric_name) for key in self._gauges: metric_name = key.split("{")[0] if "{" in key else key if metric_name not in seen: lines.append(f"# HELP {metric_name} Gauge metric") lines.append(f"# TYPE {metric_name} gauge") seen.add(metric_name) for key in self._histograms: metric_name = key.split("{")[0] if "{" in key else key if metric_name not in seen: lines.append(f"# HELP {metric_name} Histogram metric") lines.append(f"# TYPE {metric_name} histogram") seen.add(metric_name) # 计数器 for key, val in sorted(self._counters.items()): lines.append(f"{key} {val}") # Gauges for key, val in sorted(self._gauges.items()): lines.append(f"{key} {val}") # 直方图 buckets = [0.01, 0.05, 0.1, 0.5, 1.0, 5.0] for key, vals in sorted(self._histograms.items()): metric_name = key.split("{")[0] total = len(vals) for b in buckets: le = sum(1 for v in vals if v <= b) lines.append(f'{metric_name}_bucket{{{key.split("{", 1)[1] if "{" in key else ""},le="{b}"}} {le}') lines.append(f'{metric_name}_bucket{{le="+Inf"}} {total}') lines.append(f"{metric_name}_count {total}") if total > 0: lines.append(f"{metric_name}_sum {sum(vals)}") lines.append(f"nebula_uptime_seconds {now - self._start_time}") return "\n".join(lines) + "\n" @staticmethod def _label_key(name: str, labels: dict = None) -> str: if not labels: return name parts = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items())) return f'{name}{{{parts}}}' # 全局单例 _collector: Optional[MetricsCollector] = None def get_metrics() -> MetricsCollector: global _collector if _collector is None: _collector = MetricsCollector() return _collector