新增 oss/core/security/ 模块(852行): - jwt_auth.py: JWT签发/验证(HMAC-SHA256,零外部依赖) - csrf.py: CSRF Token生成与校验 - input_validator.py: JSON Schema校验+类型强制 - tls.py: 自签名证书生成+SSL上下文 新增 oss/core/ops/ 模块: - health.py: 增强版/health端点(CPU/内存/磁盘/运行时间) - metrics.py: Prometheus兼容/metrics端点 对接改造: - engine.py: 导出新模块 - manager.py: 注册/api/login /health /metrics路由 - middleware.py: CSRF+InputValidation中间件 - config.py: JWT_SECRET/CSRF_SECRET等配置项 - security.py→security/__init__.py: 合并插件沙箱与HTTP安全
99 lines
3.5 KiB
Python
99 lines
3.5 KiB
Python
"""Prometheus 兼容的 /metrics 端点"""
|
|
import time
|
|
import json
|
|
from collections import defaultdict
|
|
from typing import Optional
|
|
|
|
|
|
class MetricsCollector:
|
|
"""轻量级指标收集器,输出 Prometheus 兼容格式"""
|
|
|
|
def __init__(self):
|
|
self._counters: dict[str, int] = defaultdict(int)
|
|
self._gauges: dict[str, float] = {}
|
|
self._histograms: dict[str, list[float]] = defaultdict(list)
|
|
self._start_time = time.time()
|
|
|
|
def inc(self, name: str, labels: dict = None, value: int = 1):
|
|
"""增加计数器"""
|
|
key = self._label_key(name, labels)
|
|
self._counters[key] += value
|
|
|
|
def set_gauge(self, name: str, value: float, labels: dict = None):
|
|
"""设置 gauge 值"""
|
|
key = self._label_key(name, labels)
|
|
self._gauges[key] = value
|
|
|
|
def observe(self, name: str, value: float, labels: dict = None):
|
|
"""记录直方图观测值"""
|
|
key = self._label_key(name, labels)
|
|
self._histograms[key].append(value)
|
|
|
|
def render(self) -> str:
|
|
"""渲染为 Prometheus 文本格式"""
|
|
lines = []
|
|
now = time.time()
|
|
|
|
# HELP / TYPE 注释
|
|
seen = set()
|
|
for key in self._counters:
|
|
metric_name = key.split("{")[0] if "{" in key else key
|
|
if metric_name not in seen:
|
|
lines.append(f"# HELP {metric_name} Counter metric")
|
|
lines.append(f"# TYPE {metric_name} counter")
|
|
seen.add(metric_name)
|
|
for key in self._gauges:
|
|
metric_name = key.split("{")[0] if "{" in key else key
|
|
if metric_name not in seen:
|
|
lines.append(f"# HELP {metric_name} Gauge metric")
|
|
lines.append(f"# TYPE {metric_name} gauge")
|
|
seen.add(metric_name)
|
|
for key in self._histograms:
|
|
metric_name = key.split("{")[0] if "{" in key else key
|
|
if metric_name not in seen:
|
|
lines.append(f"# HELP {metric_name} Histogram metric")
|
|
lines.append(f"# TYPE {metric_name} histogram")
|
|
seen.add(metric_name)
|
|
|
|
# 计数器
|
|
for key, val in sorted(self._counters.items()):
|
|
lines.append(f"{key} {val}")
|
|
|
|
# Gauges
|
|
for key, val in sorted(self._gauges.items()):
|
|
lines.append(f"{key} {val}")
|
|
|
|
# 直方图
|
|
buckets = [0.01, 0.05, 0.1, 0.5, 1.0, 5.0]
|
|
for key, vals in sorted(self._histograms.items()):
|
|
metric_name = key.split("{")[0]
|
|
total = len(vals)
|
|
for b in buckets:
|
|
le = sum(1 for v in vals if v <= b)
|
|
lines.append(f'{metric_name}_bucket{{{key.split("{", 1)[1] if "{" in key else ""},le="{b}"}} {le}')
|
|
lines.append(f'{metric_name}_bucket{{le="+Inf"}} {total}')
|
|
lines.append(f"{metric_name}_count {total}")
|
|
if total > 0:
|
|
lines.append(f"{metric_name}_sum {sum(vals)}")
|
|
|
|
lines.append(f"nebula_uptime_seconds {now - self._start_time}")
|
|
return "\n".join(lines) + "\n"
|
|
|
|
@staticmethod
|
|
def _label_key(name: str, labels: dict = None) -> str:
|
|
if not labels:
|
|
return name
|
|
parts = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
|
|
return f'{name}{{{parts}}}'
|
|
|
|
|
|
# 全局单例
|
|
_collector: Optional[MetricsCollector] = None
|
|
|
|
|
|
def get_metrics() -> MetricsCollector:
|
|
global _collector
|
|
if _collector is None:
|
|
_collector = MetricsCollector()
|
|
return _collector
|