Files
NebulaShell/oss/core/security/__init__.py
Starlight-apk 5e957096fa
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
feat: Phase 1 - 安全中间件 + 运维工具箱
新增 oss/core/security/ 模块(852行):
- jwt_auth.py: JWT签发/验证(HMAC-SHA256,零外部依赖)
- csrf.py: CSRF Token生成与校验
- input_validator.py: JSON Schema校验+类型强制
- tls.py: 自签名证书生成+SSL上下文

新增 oss/core/ops/ 模块:
- health.py: 增强版/health端点(CPU/内存/磁盘/运行时间)
- metrics.py: Prometheus兼容/metrics端点

对接改造:
- engine.py: 导出新模块
- manager.py: 注册/api/login /health /metrics路由
- middleware.py: CSRF+InputValidation中间件
- config.py: JWT_SECRET/CSRF_SECRET等配置项
- security.py→security/__init__.py: 合并插件沙箱与HTTP安全
2026-05-17 15:42:40 +08:00

273 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""安全模块 — 插件沙箱 + HTTP 安全中间件
Plugin sandbox: PluginProxy, IntegrityChecker, MemoryGuard, AuditLogger, TamperMonitor, FallbackManager
HTTP security: JWT, CSRF, InputValidator, TLS
"""
# ── Plugin sandbox (from oss/core/security.py) ──
from __future__ import annotations
import threading
import hashlib
import time
import json
import functools
from pathlib import Path
from typing import Any, Optional, Callable, TYPE_CHECKING
from collections import deque
from oss.logger.logger import Log
if TYPE_CHECKING:
from oss.core.manager import PluginManager
class PluginPermissionError(Exception):
    """Signal that a plugin asked for access it has not been granted."""
class PluginProxy:
    """Proxy around a plugin instance that restricts which other plugins it can reach.

    The proxy holds the wrapped plugin instance, the set of plugin names it is
    allowed to access (``"*"`` acts as a wildcard) and the registry of all
    plugins. Any attribute not defined on the proxy itself is forwarded to the
    wrapped plugin instance.
    """

    def __init__(self, plugin_name: str, plugin_instance: Any, allowed_plugins: list[str], all_plugins: dict):
        """Store the wrapped plugin and its access list.

        Args:
            plugin_name: Name of the plugin being wrapped (used in error messages).
            plugin_instance: The plugin object attribute access is forwarded to.
            allowed_plugins: Names this plugin may access; ``"*"`` allows all.
                ``None`` is treated as an empty list.
            all_plugins: Registry mapping plugin name to an entry that has an
                ``"instance"`` key.
        """
        self._plugin_name = plugin_name
        self._plugin_instance = plugin_instance
        # ``set(None)`` would raise TypeError; treat a missing list as "no access".
        self._allowed_plugins = set(allowed_plugins or ())
        self._all_plugins = all_plugins

    def get_plugin(self, name: str) -> Any:
        """Return the instance of plugin ``name``, or ``None`` if it is not registered.

        Raises:
            PluginPermissionError: If ``name`` is not in the allowed set and no
                wildcard is present.
        """
        if "*" not in self._allowed_plugins and name not in self._allowed_plugins:
            raise PluginPermissionError(f"插件 '{self._plugin_name}' 无权访问插件 '{name}'")
        entry = self._all_plugins.get(name)
        if entry is None:
            return None
        return entry["instance"]

    def list_plugins(self) -> list[str]:
        """Return the names of registered plugins this plugin may access.

        With the wildcard, every registered name is returned in registry order.
        Otherwise the allowed names that are actually registered are returned
        sorted, so the result does not depend on set iteration order.
        """
        if "*" in self._allowed_plugins:
            return list(self._all_plugins.keys())
        return sorted(n for n in self._allowed_plugins if n in self._all_plugins)

    def get_capability(self, capability: str) -> Any:
        """Placeholder capability lookup; currently always returns ``None``."""
        return None

    def __getattr__(self, name: str):
        """Forward unknown attribute lookups to the wrapped plugin instance."""
        # __getattr__ only runs after normal lookup failed. If the proxy's own
        # ``_plugin_instance`` is missing (instance created without __init__),
        # reading it below would re-enter __getattr__ and recurse without bound.
        if name == "_plugin_instance":
            raise AttributeError(name)
        return getattr(self._plugin_instance, name)
class IntegrityChecker:
    """File integrity checker based on a SHA-256 digest of a plugin directory.

    Digests are kept in memory, keyed by plugin name.
    """

    # Read size used when streaming file contents into the hasher.
    _CHUNK_SIZE = 1024 * 1024

    def __init__(self):
        self._hashes: dict[str, str] = {}

    def compute_hash(self, plugin_dir: Path) -> str:
        """Return the SHA-256 hex digest of all files under ``plugin_dir``.

        Files are visited in sorted path order. For each file the relative
        path (UTF-8 encoded) and then the file contents are fed to the hasher.
        Anything with ``__pycache__`` among its path parts and any file named
        ``SIGNATURE`` is skipped.

        Contents are streamed in chunks rather than read whole, so large files
        do not have to fit in memory; the resulting digest is unchanged.
        """
        hasher = hashlib.sha256()
        for file_path in sorted(plugin_dir.rglob("*")):
            if file_path.name == "SIGNATURE" or "__pycache__" in file_path.parts:
                continue
            if not file_path.is_file():
                continue
            # NOTE(review): str() of a relative path uses the OS separator, so
            # digests of nested files differ between platforms — confirm whether
            # digests are ever compared across operating systems.
            rel_path = str(file_path.relative_to(plugin_dir))
            hasher.update(rel_path.encode("utf-8"))
            with file_path.open("rb") as handle:
                for chunk in iter(lambda: handle.read(self._CHUNK_SIZE), b""):
                    hasher.update(chunk)
        return hasher.hexdigest()

    def register(self, plugin_name: str, plugin_dir: Path):
        """Compute and remember the current digest of ``plugin_dir`` for ``plugin_name``."""
        self._hashes[plugin_name] = self.compute_hash(plugin_dir)

    def verify(self, plugin_name: str, plugin_dir: Path) -> tuple[bool, str]:
        """Compare the current digest of ``plugin_dir`` with the registered one.

        Returns:
            ``(True, message)`` when the digests match; ``(False, message)``
            when the plugin was never registered or the digests differ.
        """
        if plugin_name not in self._hashes:
            return False, f"插件 '{plugin_name}' 未注册完整性检查"
        current = self.compute_hash(plugin_dir)
        if current == self._hashes[plugin_name]:
            return True, "完整性验证通过"
        return False, "文件 hash 不匹配,插件可能被篡改"

    def get_hash(self, plugin_name: str) -> Optional[str]:
        """Return the registered digest for ``plugin_name``, or ``None`` if absent."""
        return self._hashes.get(plugin_name)
class MemoryGuard:
    """Runtime guard that vetoes changes to selected attributes of the manager.

    ``check_setattr`` is a pure decision function: it reports whether an
    assignment should be allowed; it does not perform or undo the assignment.
    """

    # Attribute names of the manager that must not be reassigned while the
    # guard is enabled. A frozenset so the protected list itself cannot be
    # edited in place at runtime.
    FROZEN_ATTRS = frozenset({
        "plugins", "capability_registry", "lifecycle_manager",
        "dependency_resolver", "signature_verifier", "pl_injector",
        "integrity_checker", "audit_logger", "tamper_monitor",
        "fallback_manager", "http_server", "repl_shell",
    })

    def __init__(self, manager: PluginManager):
        """Guard ``manager``; protection starts enabled."""
        self._manager = manager
        self._protected = True

    def enable(self):
        """Turn protection on."""
        self._protected = True

    def disable(self):
        """Turn protection off; every assignment is then allowed."""
        self._protected = False

    def check_setattr(self, obj: Any, name: str, value: Any) -> bool:
        """Return whether assigning ``name`` on ``obj`` is permitted.

        The assignment is refused (and a warning logged) only when protection
        is enabled, ``obj`` is the guarded manager itself, and ``name`` is one
        of ``FROZEN_ATTRS``. ``value`` is not inspected.
        """
        if not self._protected:
            return True
        blocked = obj is self._manager and name in self.FROZEN_ATTRS
        if blocked:
            Log.warn("Core", f"内存防护: 阻止了对 Core 内部属性 '{name}' 的修改")
        return not blocked
class AuditLogger:
    """In-memory audit trail of plugin actions.

    Entries are kept in a bounded deque, so once ``max_logs`` entries exist the
    oldest one is dropped for each new entry.
    """

    def __init__(self, max_logs: int = 1000):
        """Create an enabled logger retaining at most ``max_logs`` entries."""
        self._logs: deque = deque(maxlen=max_logs)
        self._enabled = True

    def enable(self):
        """Resume recording entries."""
        self._enabled = True

    def disable(self):
        """Stop recording; calls to ``log`` become no-ops."""
        self._enabled = False

    def log(self, plugin_name: str, action: str, detail: str = ""):
        """Record one entry stamped with the current ``time.time()``."""
        if not self._enabled:
            return
        self._logs.append({"time": time.time(), "plugin": plugin_name, "action": action, "detail": detail})

    def get_logs(self, plugin_name: Optional[str] = None, limit: Optional[int] = 50) -> list[dict]:
        """Return the most recent entries, oldest first.

        Args:
            plugin_name: When truthy, only entries of that plugin are returned.
            limit: Maximum number of entries. ``None`` means no limit; zero or
                a negative value yields an empty list.
        """
        # Work on a snapshot so entries appended while filtering do not
        # disturb the iteration.
        snapshot = list(self._logs)
        if plugin_name:
            snapshot = [entry for entry in snapshot if entry["plugin"] == plugin_name]
        if limit is None:
            return snapshot
        # ``seq[-0:]`` is the whole sequence, so a non-positive limit has to be
        # handled before slicing.
        if limit <= 0:
            return []
        return snapshot[-limit:]

    def get_stats(self) -> dict:
        """Return a mapping of plugin name to its number of retained entries."""
        stats: dict[str, int] = {}
        for entry in list(self._logs):
            stats[entry["plugin"]] = stats.get(entry["plugin"], 0) + 1
        return stats
class TamperMonitor:
    """Background monitor that periodically re-verifies plugin integrity.

    A daemon thread asks the manager's integrity checker to verify every
    plugin. A plugin that fails verification is recorded as an alert (the last
    100 alerts are kept), its instance is stopped and its lifecycle, when one
    exists, is marked as crashed.
    """

    def __init__(self, manager: PluginManager, interval: int = 30):
        """Create a stopped monitor that will scan every ``interval`` seconds."""
        self._manager = manager
        self._interval = interval
        self._running = False
        self._thread = None
        # Set by stop(); lets the worker wake up from its wait immediately.
        self._stop_event = threading.Event()
        self._alerts: deque = deque(maxlen=100)

    def start(self):
        """Start the monitor thread; does nothing if it is already running."""
        if self._running and self._thread is not None and self._thread.is_alive():
            return
        # A fresh event per run: a previous worker that is still finishing a
        # scan keeps its own (already set) event and exits instead of resuming.
        self._stop_event = threading.Event()
        self._running = True
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        Log.info("Core", f"防篡改监控已启动 (间隔: {self._interval}s)")

    def stop(self):
        """Ask the monitor thread to stop and wait up to 5 seconds for it."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)

    def _monitor_loop(self):
        """Scan all plugins, then wait ``interval`` seconds, until stopped."""
        stop_event = self._stop_event
        while self._running and not stop_event.is_set():
            try:
                self._scan_plugins()
            except Exception as e:
                Log.error("Core", f"防篡改监控异常: {e}")
            # Interruptible replacement for time.sleep(): returns as soon as
            # stop() sets the event instead of sleeping the full interval.
            stop_event.wait(self._interval)

    def _scan_plugins(self):
        """Verify every plugin once and react to those that fail."""
        # Iterate over a snapshot in case the manager changes its plugin dict
        # while the scan is in progress.
        for plugin_name, info in list(self._manager.plugins.items()):
            plugin_dir = self._manager._get_plugin_dir(plugin_name)
            if not plugin_dir:
                continue
            valid, msg = self._manager.integrity_checker.verify(plugin_name, plugin_dir)
            if valid:
                continue
            self._alerts.append({"time": time.time(), "plugin": plugin_name, "message": msg})
            Log.error("Core", f"防篡改告警: 插件 '{plugin_name}' 可能被篡改!")
            self._stop_tampered_plugin(plugin_name, info)

    def _stop_tampered_plugin(self, plugin_name: str, info: dict):
        """Stop a plugin that failed verification and mark its lifecycle crashed."""
        try:
            info["instance"].stop()
            lifecycle = self._manager.lifecycle_manager.get(plugin_name)
            if lifecycle:
                lifecycle.mark_crashed()
        except Exception as e:
            # Best effort: a plugin that cannot be stopped must not end the scan.
            Log.error("Core", f"停止被篡改插件 '{plugin_name}' 失败: {e}")

    def get_alerts(self) -> list[dict]:
        """Return a copy of the retained alerts, oldest first."""
        return list(self._alerts)
class FallbackManager:
    """Crash handling for plugins: bounded automatic restarts, then degradation.

    Each crash reported through a wrapped method triggers a restart until
    ``max_retries`` restarts have been attempted for that plugin; after that
    the plugin is marked as degraded until ``recover`` succeeds.
    """

    def __init__(self, manager: PluginManager, max_retries: int = 3):
        """Attach to ``manager`` and allow ``max_retries`` restarts per plugin."""
        self._manager = manager
        self._max_retries = max_retries
        self._retry_counts: dict[str, int] = {}
        self._degraded: set[str] = set()

    def wrap_plugin_method(self, plugin_name: str, method: Callable) -> Callable:
        """Return ``method`` wrapped so that exceptions trigger crash handling.

        The wrapper returns whatever ``method`` returns. If ``method`` raises
        an ``Exception``, the error is logged, crash handling runs for
        ``plugin_name`` and the wrapper returns ``None``.
        """
        # Not every callable has __name__ (e.g. functools.partial); resolve a
        # label up front so the error handler itself cannot fail on it.
        method_name = getattr(method, "__name__", repr(method))

        @functools.wraps(method)
        def safe_method(*args, **kwargs):
            try:
                return method(*args, **kwargs)
            except Exception as e:
                Log.error("Core", f"插件 '{plugin_name}' 方法 '{method_name}' 异常: {e}")
                self._handle_crash(plugin_name)
                return None

        return safe_method

    def _handle_crash(self, plugin_name: str):
        """Announce the crash, then restart the plugin or mark it degraded."""
        retry_count = self._retry_counts.get(plugin_name, 0)
        lifecycle = self._manager.lifecycle_manager.get(plugin_name)
        bridge = self._manager._get_bridge()
        # The bridge plugin's own crash is not emitted through the bridge.
        if bridge and plugin_name != "plugin-bridge":
            bridge.emit("plugin.crashed", name=plugin_name, retry=retry_count)

        if retry_count >= self._max_retries:
            Log.error("Core", f"插件 '{plugin_name}' 超过最大重试次数,标记为降级")
            self._degraded.add(plugin_name)
            if lifecycle:
                lifecycle.mark_degraded()
            return

        self._retry_counts[plugin_name] = retry_count + 1
        Log.warn("Core", f"插件 '{plugin_name}' 崩溃,正在重启 (第 {retry_count + 1}/{self._max_retries} 次)")
        try:
            if lifecycle:
                lifecycle.mark_crashed()
            self._manager._restart_plugin(plugin_name)
            if lifecycle:
                lifecycle.start()
            Log.ok("Core", f"插件 '{plugin_name}' 重启成功")
        except Exception as e:
            Log.error("Core", f"插件 '{plugin_name}' 重启失败: {e}")

    def recover(self, plugin_name: str) -> bool:
        """Manually restart a degraded plugin.

        Returns:
            ``True`` if the plugin was degraded and the restart succeeded.
            ``False`` if it was not degraded or the restart failed; after a
            failed restart the plugin stays degraded with its previous retry
            count, so ``recover`` can be called again.
        """
        if plugin_name not in self._degraded:
            return False
        previous_retries = self._retry_counts.get(plugin_name, 0)
        # Reset before restarting so crashes reported during the restart get
        # the normal retry budget.
        self._retry_counts[plugin_name] = 0
        self._degraded.discard(plugin_name)
        try:
            self._manager._restart_plugin(plugin_name)
            lifecycle = self._manager.lifecycle_manager.get(plugin_name)
            if lifecycle:
                lifecycle.start()
        except Exception as e:
            # Roll back first: the plugin is still not running, so it must
            # remain reported as degraded and remain recoverable.
            self._retry_counts[plugin_name] = previous_retries
            self._degraded.add(plugin_name)
            Log.error("Core", f"恢复插件 '{plugin_name}' 失败: {e}")
            return False
        Log.ok("Core", f"插件 '{plugin_name}' 已手动恢复")
        return True

    def is_degraded(self, plugin_name: str) -> bool:
        """Return whether ``plugin_name`` is currently marked as degraded."""
        return plugin_name in self._degraded

    def get_degraded_plugins(self) -> list[str]:
        """Return the names of all degraded plugins."""
        return list(self._degraded)
# ── HTTP 安全中间件 ──
from .jwt_auth import JWTAuth, JWTError, issue_token, verify_token, get_jwt_auth
from .csrf import CSRFProtection
from .input_validator import InputValidator, ValidationError, get_validator
from .tls import TLSManager
__all__ = [
# Plugin sandbox
"PluginPermissionError", "PluginProxy", "IntegrityChecker",
"MemoryGuard", "AuditLogger", "TamperMonitor", "FallbackManager",
# HTTP security
"JWTAuth", "JWTError", "issue_token", "verify_token", "get_jwt_auth",
"CSRFProtection",
"InputValidator", "ValidationError", "get_validator",
"TLSManager",
]