Files
NebulaShell/oss/core/signature.py
Falck bce27db4ac 重大重构:引擎模块拆分 + P0插件实现 + 55个Bug修复
核心变更:
- engine.py(1781行)拆分为8个独立模块: lifecycle/security/deps/
  datastore/pl_injector/watcher/signature/manager
- 新增plugin-bridge: 事件总线 + 服务注册 + RPC通信
- 新增i18n: 国际化/多语言翻译支持
- 新增plugin-storage: 插件键值/文件存储
- 新增ws-api: WebSocket实时通信(pub/sub + 自定义处理器)
- nodejs-adapter统一为Plugin ABC模式

Bug修复:
- 修复load_all()中store_dir未定义崩溃
- 修复DependencyResolver入度计算(拓扑排序)
- 修复PermissionError隐藏内置异常
- 修复CORS中间件头部未附加到响应
- 修复IntegrityChecker跳过__pycache__目录
- 修复版本号不一致(v2.0.0→v1.2.0)
- 修复测试文件的Logger导入/路径/私有方法调用
- 修复context.py缺少typing导入
- 修复config.py STORE_DIR默认路径(./mods→./store)

测试覆盖: 14→91个测试, 全部通过
2026-05-12 11:40:06 +08:00

140 lines
5.7 KiB
Python

import hashlib
import json
import time
import base64
from pathlib import Path
from typing import Optional
from oss.config import get_config
from oss.logger.logger import Log
class SignatureError(Exception):
pass
class SignatureVerifier:
def __init__(self, key_dir: str = None):
config = get_config()
self.key_dir = Path(key_dir or str(config.get("SIGNATURE_KEYS_DIR", "./data/signature-verifier/keys")))
self.key_dir.mkdir(parents=True, exist_ok=True)
self.public_keys: dict[str, bytes] = {}
self._load_builtin_keys()
def _load_builtin_keys(self):
pub_dir = self.key_dir / "public"
if not pub_dir.exists():
return
for key_file in pub_dir.glob("*.pem"):
author_name = key_file.stem
self.public_keys[author_name] = key_file.read_bytes()
def _compute_plugin_hash(self, plugin_dir: Path) -> str:
hasher = hashlib.sha256()
files_to_hash = []
for file_path in sorted(plugin_dir.rglob("*")):
if file_path.is_file() and file_path.name != "SIGNATURE":
rel_path = file_path.relative_to(plugin_dir)
files_to_hash.append((str(rel_path), file_path))
for rel_path, file_path in files_to_hash:
hasher.update(rel_path.encode("utf-8"))
hasher.update(file_path.read_bytes())
return hasher.hexdigest()
def verify_plugin(self, plugin_dir: Path, author: str = "Falck") -> tuple[bool, str]:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
from cryptography.exceptions import InvalidSignature
signature_file = plugin_dir / "SIGNATURE"
if not signature_file.exists():
return False, f"Plugin missing signature file: {plugin_dir}"
try:
sig_data = json.loads(signature_file.read_text())
except json.JSONDecodeError as e:
return False, f"Signature file format error: {e}"
required_fields = ["signature", "signer", "algorithm", "timestamp"]
for field in required_fields:
if field not in sig_data:
return False, f"Signature missing required field: {field}"
signer = sig_data["signer"]
signature = base64.b64decode(sig_data["signature"])
if signer not in self.public_keys:
return False, f"Unknown signer: {signer}"
try:
public_key = serialization.load_pem_public_key(
self.public_keys[signer], backend=default_backend()
)
except Exception as e:
return False, f"Public key load failed: {e}"
current_hash = self._compute_plugin_hash(plugin_dir)
try:
signed_data = f"{author}:{current_hash}".encode("utf-8")
public_key.verify(
signature, signed_data,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
return True, f"Signature verified (signer: {signer})"
except InvalidSignature:
return False, f"Signature mismatch! Plugin may have been tampered with (signer: {signer})"
except Exception as e:
return False, f"Signature verification error: {e}"
def is_official_plugin(self, plugin_dir: Path) -> bool:
"""检查是否为官方插件(使用内置公钥验证)"""
result, _ = self.verify_plugin(plugin_dir, author="NebulaShell")
return result
class PluginSigner:
def __init__(self, private_key_path: str = None):
self.private_key = None
if private_key_path:
self.load_private_key(private_key_path)
def load_private_key(self, key_path: str):
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
with open(key_path, "rb") as f:
pem_data = f.read()
self.private_key = serialization.load_pem_private_key(
pem_data, password=None, backend=default_backend()
)
def sign_plugin(self, plugin_dir: Path, signer_name: str, author: str = "Falck") -> str:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.backends import default_backend
if not self.private_key:
raise ValueError("Private key not loaded")
hasher = hashlib.sha256()
files_to_hash = []
for file_path in sorted(plugin_dir.rglob("*")):
if file_path.is_file() and file_path.name not in ("SIGNATURE",):
rel_path = file_path.relative_to(plugin_dir)
files_to_hash.append((str(rel_path), file_path))
for rel_path, file_path in files_to_hash:
hasher.update(rel_path.encode("utf-8"))
hasher.update(file_path.read_bytes())
plugin_hash = hasher.hexdigest()
signed_data = f"{author}:{plugin_hash}".encode("utf-8")
signature = self.private_key.sign(
signed_data,
padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
sig_data = {
"signature": base64.b64encode(signature).decode(),
"signer": signer_name,
"algorithm": "RSA-SHA256",
"timestamp": time.time(),
"plugin_hash": plugin_hash,
"author": author
}
signature_file = plugin_dir / "SIGNATURE"
signature_file.write_text(json.dumps(sig_data, indent=2))
return str(signature_file)