Starlight-apk 5e957096fa
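feat: Phase 1 - security middleware + ops toolbox
Add the oss/core/security/ module (852 lines):
- jwt_auth.py: JWT issuing/verification (HMAC-SHA256, zero external dependencies)
- csrf.py: CSRF token generation and verification
- input_validator.py: JSON Schema validation + type coercion
- tls.py: self-signed certificate generation + SSL context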

Add the oss/core/ops/ module:
- health.py: enhanced /health endpoint (CPU/memory/disk/uptime)
- metrics.py: Prometheus-compatible /metrics endpoint

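Integration changes:
- engine.py: export the new modules
- manager.py: register the /api/login, /health, and /metrics routes
- middleware.py: CSRF + InputValidation middleware
- config.py: configuration keys such as JWT_SECRET and CSRF_SECRET
- security.py→security/__init__.py: merge the plugin sandbox with HTTP security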
2026-05-17 15:42:40 +08:00

51 lines
1.7 KiB
Python

"""CSRF protection: token verification middleware."""
import hashlib
import hmac
import secrets
import time
from typing import Optional

from oss.config import get_config
from oss.logger.logger import Log


class CSRFProtection:
    """Generates and verifies CSRF tokens."""

    SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}

    def __init__(self, secret: Optional[str] = None):
        config = get_config()
        self._secret = secret or config.get("CSRF_SECRET", "")
        if not self._secret:
            # Fall back to a key derived from API_KEY. The hard-coded default
            # is predictable, so CSRF_SECRET should be set in production.
            self._secret = hashlib.sha256(
                config.get("API_KEY", "nebula-csrf-default").encode()
            ).hexdigest()
        self._token_ttl = config.get("CSRF_TOKEN_TTL", 3600)  # default: 1 hour

    def generate_token(self, session_id: str) -> str:
        """Generate a CSRF token bound to the given session."""
        salt = secrets.token_hex(16)
        timestamp = int(time.time())
        raw = f"{session_id}:{salt}:{timestamp}:{self._secret}"
        token = hashlib.sha256(raw.encode()).hexdigest()
        return f"{timestamp}:{salt}:{token}"

    def verify_token(self, session_id: str, token: str) -> bool:
        """Verify a CSRF token for the given session."""
        try:
            parts = token.split(":")
            if len(parts) != 3:
                return False
            timestamp, salt, hash_val = parts
            # Reject expired tokens.
            if int(time.time()) - int(timestamp) > self._token_ttl:
                return False
            expected = hashlib.sha256(
                f"{session_id}:{salt}:{timestamp}:{self._secret}".encode()
            ).hexdigest()
            # Constant-time comparison avoids leaking the hash through timing.
            return hmac.compare_digest(hash_val.encode(), expected.encode())
        except (ValueError, IndexError):
            return False

    @staticmethod
    def is_safe_method(method: str) -> bool:
        return method.upper() in CSRFProtection.SAFE_METHODS
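
The following is a minimal, standalone sketch of how the token scheme above might be exercised from a request check. It does not import `oss.config`, so it can run on its own. The function names `generate_token`, `verify_token`, and `is_request_allowed`, and the `now` parameter, are hypothetical and not part of this repository. It also swaps the plain SHA-256 over a concatenated secret for `hmac.new(..., hashlib.sha256)`, which is an assumption about a reasonable hardening step rather than what csrf.py does.

```python
import hashlib
import hmac
import secrets
import time
from typing import Optional

SAFE_METHODS = {"GET", "HEAD", "OPTIONS"}


def _sign(secret: str, session_id: str, salt: str, timestamp: str) -> str:
    """Return an HMAC-SHA256 signature over session, salt, and timestamp."""
    message = f"{session_id}:{salt}:{timestamp}".encode()
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()


def generate_token(secret: str, session_id: str, now: Optional[int] = None) -> str:
    """Build a token of the form "timestamp:salt:signature"."""
    timestamp = str(int(time.time()) if now is None else now)
    salt = secrets.token_hex(16)
    return f"{timestamp}:{salt}:{_sign(secret, session_id, salt, timestamp)}"


def verify_token(secret: str, session_id: str, token: str,
                 ttl: int = 3600, now: Optional[int] = None) -> bool:
    """Check the token's shape, age, and signature."""
    parts = token.split(":")
    if len(parts) != 3:
        return False
    timestamp, salt, signature = parts
    try:
        issued_at = int(timestamp)
    except ValueError:
        return False
    current = int(time.time()) if now is None else now
    if current - issued_at > ttl:
        return False
    expected = _sign(secret, session_id, salt, timestamp)
    # Compare as bytes in constant time.
    return hmac.compare_digest(signature.encode(), expected.encode())


def is_request_allowed(method: str, secret: str, session_id: str,
                       token: Optional[str]) -> bool:
    """Let safe methods through; require a valid token for everything else."""
    if method.upper() in SAFE_METHODS:
        return True
    return token is not None and verify_token(secret, session_id, token)


if __name__ == "__main__":
    demo_token = generate_token("demo-secret", "session-abc")
    print(is_request_allowed("POST", "demo-secret", "session-abc", demo_token))
    print(is_request_allowed("POST", "demo-secret", "session-abc", None))
```

In a middleware, a check like `is_request_allowed` would typically run before the route handler and reject failing requests with a 403 response. How the token reaches the server (header, form field, or cookie) is left open here because the source does not specify it.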