Files
NebulaShell/oss/core/nbpf/crypto.py
Falck bce27db4ac 重大重构:引擎模块拆分 + P0插件实现 + 55个Bug修复
核心变更:
- engine.py(1781行)拆分为8个独立模块: lifecycle/security/deps/
  datastore/pl_injector/watcher/signature/manager
- 新增plugin-bridge: 事件总线 + 服务注册 + RPC通信
- 新增i18n: 国际化/多语言翻译支持
- 新增plugin-storage: 插件键值/文件存储
- 新增ws-api: WebSocket实时通信(pub/sub + 自定义处理器)
- nodejs-adapter统一为Plugin ABC模式

Bug修复:
- 修复load_all()中store_dir未定义崩溃
- 修复DependencyResolver入度计算(拓扑排序)
- 修复PermissionError隐藏内置异常
- 修复CORS中间件头部未附加到响应
- 修复IntegrityChecker跳过__pycache__目录
- 修复版本号不一致(v2.0.0→v1.2.0)
- 修复测试文件的Logger导入/路径/私有方法调用
- 修复context.py缺少typing导入
- 修复config.py STORE_DIR默认路径(./mods→./store)

测试覆盖: 14→91个测试, 全部通过
2026-05-12 11:40:06 +08:00

624 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""多重签名 + 多重加密工具
加密层级(从外到内):
1. Ed25519 外层签名 — 验证包完整性
2. AES-256-GCM 外层加密 — 加密 META-INF/ 和 NIR/
3. RSA-4096-PSS 中层签名 — 验证插件作者身份
4. AES-256-GCM 中层加密 — 加密 NIR 数据
5. HMAC-SHA256 内层签名 — 验证每个模块
代码隐藏策略:
- 关键常量运行时计算
- 导入路径动态拼接
- 解密函数分散
- 反调试检测
- 内存擦除
"""
import os
import sys
import json
import hmac
import hashlib
import base64
import threading
from typing import Optional, Tuple
class NBPCryptoError(Exception):
    """Raised when an NBPF signing, verification or decryption step fails."""
class NBPCrypto:
    """Multi-layer signing and encryption toolkit for NBPF packages.

    Every member is a static method; the class only serves as a namespace.
    Layers, from the outside in:

    1. Ed25519 outer signature over the package digest.
    2. AES-256-GCM outer encryption of the META-INF payload (``key1``).
    3. RSA-PSS middle signature over the encrypted NIR modules.
    4. AES-256-GCM middle encryption of each NIR module (``key2``).
    5. HMAC-SHA256 inner signature over each plaintext module.

    The ``cryptography`` package is imported lazily inside the ``_imp_*``
    helpers, so it is only needed when a method that uses it is called.
    """

    # ── Size constants (exposed as functions, not module-level literals) ──

    @staticmethod
    def _aes_key_len() -> int:
        """Return the AES-256 key length in bytes."""
        return 32  # 256 bits

    @staticmethod
    def _aes_nonce_len() -> int:
        """Return the AES-GCM nonce length in bytes."""
        return 12  # 96 bits

    @staticmethod
    def _aes_tag_len() -> int:
        """Return the AES-GCM authentication tag length in bytes."""
        return 16  # 128 bits

    @staticmethod
    def _hmac_key_len() -> int:
        """Return the length in bytes of the derived HMAC key."""
        return 32

    @staticmethod
    def _rsa_key_size() -> int:
        """Return the default RSA modulus size in bits."""
        return 4096

    # ── Lazy / dynamically assembled imports ──

    @staticmethod
    def _imp_crypto() -> object:
        """Return the ``cryptography`` AEAD module (provides ``AESGCM``)."""
        # The dotted path is assembled at run time instead of being written
        # as a literal import statement.
        path = ".".join(("cryptography", "hazmat", "primitives", "ciphers", "aead"))
        return __import__(path, fromlist=["AESGCM"])

    @staticmethod
    def _imp_ed25519() -> object:
        """Return the ``cryptography`` Ed25519 module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "asymmetric", "ed25519"))
        return __import__(path, fromlist=["Ed25519PrivateKey"])

    @staticmethod
    def _imp_rsa() -> object:
        """Return the ``cryptography`` RSA module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "asymmetric", "rsa"))
        return __import__(path, fromlist=["generate_private_key"])

    @staticmethod
    def _imp_serialization() -> object:
        """Return the ``cryptography`` serialization module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "serialization"))
        return __import__(path, fromlist=["Encoding"])

    @staticmethod
    def _imp_hashes() -> object:
        """Return the ``cryptography`` hashes module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "hashes"))
        return __import__(path, fromlist=["SHA256"])

    @staticmethod
    def _imp_padding() -> object:
        """Return the ``cryptography`` asymmetric padding module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "asymmetric", "padding"))
        return __import__(path, fromlist=["OAEP"])

    @staticmethod
    def _imp_backends() -> object:
        """Return the ``cryptography`` backends module."""
        path = ".".join(("cryptography", "hazmat", "backends"))
        return __import__(path, fromlist=["default_backend"])

    @staticmethod
    def _imp_hkdf() -> object:
        """Return the ``cryptography`` HKDF module."""
        path = ".".join(("cryptography", "hazmat", "primitives", "kdf", "hkdf"))
        return __import__(path, fromlist=["HKDF"])

    # ── Anti-debugging ──

    @staticmethod
    def _anti_debug_check() -> bool:
        """Return True when a tracer or a debug environment flag is detected.

        A debugger is assumed when ``sys.gettrace()`` is set or when any of
        ``PYTHONDEBUG``, ``PYTHONVERBOSE`` or ``NEBULA_DEBUG`` is ``1``,
        ``true`` or ``yes`` (case-insensitive). Any error during the probe is
        treated as "not debugged".
        """
        try:
            if sys.gettrace() is not None:
                return True
            return any(
                os.environ.get(name, "").lower() in ("1", "true", "yes")
                for name in ("PYTHONDEBUG", "PYTHONVERBOSE", "NEBULA_DEBUG")
            )
        except Exception:
            # Deliberately best-effort: the probe must never break callers.
            return False

    # ── Memory wiping ──

    @staticmethod
    def _secure_wipe(data: bytearray):
        """Overwrite a mutable buffer in place, leaving it zero-filled.

        Three passes are written (0x00, 0xFF, 0x00). Only the buffer that is
        passed in is affected; other copies of the same secret (for example
        immutable ``bytes`` objects) cannot be wiped from Python. Errors such
        as an immutable argument are swallowed because wiping is best-effort.
        """
        try:
            length = len(data)
            for fill in (0x00, 0xFF, 0x00):
                for i in range(length):
                    data[i] = fill
        except Exception:
            pass

    # ── Key generation ──

    @staticmethod
    def generate_aes_key() -> bytes:
        """Return a fresh random 256-bit AES key."""
        return os.urandom(NBPCrypto._aes_key_len())

    @staticmethod
    def generate_ed25519_keypair() -> Tuple[bytes, bytes]:
        """Generate an Ed25519 key pair.

        Returns:
            ``(private_key_bytes, public_key_bytes)``, both in raw encoding.
        """
        ed25519 = NBPCrypto._imp_ed25519()
        serialization = NBPCrypto._imp_serialization()
        private_key = ed25519.Ed25519PrivateKey.generate()
        private_bytes = private_key.private_bytes(
            serialization.Encoding.Raw,
            serialization.PrivateFormat.Raw,
            serialization.NoEncryption(),
        )
        public_bytes = private_key.public_key().public_bytes(
            serialization.Encoding.Raw,
            serialization.PublicFormat.Raw,
        )
        return private_bytes, public_bytes

    @staticmethod
    def generate_rsa_keypair(key_size: Optional[int] = None) -> Tuple[bytes, bytes]:
        """Generate an RSA key pair.

        Args:
            key_size: Modulus size in bits; defaults to ``_rsa_key_size()``.

        Returns:
            ``(private_key_pem, public_key_pem)``. The private key is an
            unencrypted PKCS#8 PEM, the public key a SubjectPublicKeyInfo PEM.
        """
        if key_size is None:
            key_size = NBPCrypto._rsa_key_size()
        rsa = NBPCrypto._imp_rsa()
        serialization = NBPCrypto._imp_serialization()
        backends = NBPCrypto._imp_backends()
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=backends.default_backend(),
        )
        private_pem = private_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        )
        public_pem = private_key.public_key().public_bytes(
            serialization.Encoding.PEM,
            serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return private_pem, public_pem

    # ── Key derivation ──

    @staticmethod
    def derive_hmac_key(key1: bytes, key2: bytes) -> bytes:
        """Derive the module HMAC key from the two AES keys with HKDF-SHA256.

        The input key material is ``key1 || key2``; no salt is used and the
        info string is ``NebulaShell:NBPF:HMAC:v1``.
        """
        hkdf_mod = NBPCrypto._imp_hkdf()
        hashes_mod = NBPCrypto._imp_hashes()
        backends = NBPCrypto._imp_backends()
        # bytes() lets callers pass either bytes or bytearray keys.
        ikm = bytes(key1) + bytes(key2)
        hkdf = hkdf_mod.HKDF(
            algorithm=hashes_mod.SHA256(),
            length=NBPCrypto._hmac_key_len(),
            salt=None,
            info=b"NebulaShell:NBPF:HMAC:v1",
            backend=backends.default_backend(),
        )
        return hkdf.derive(ikm)

    # ── AES-256-GCM primitives ──

    @staticmethod
    def _aes_encrypt(data: bytes, key: bytes) -> Tuple[bytes, bytes, bytes]:
        """Encrypt ``data`` with AES-GCM under a fresh random nonce.

        Returns:
            ``(nonce, ciphertext, tag)``. ``AESGCM.encrypt`` returns
            ``ciphertext || tag`` without the nonce, so the result is split
            here and the nonce is returned for the caller to store.
        """
        aead_mod = NBPCrypto._imp_crypto()
        aesgcm = aead_mod.AESGCM(key)
        nonce = os.urandom(NBPCrypto._aes_nonce_len())
        combined = aesgcm.encrypt(nonce, data, None)
        tag_len = NBPCrypto._aes_tag_len()
        return nonce, combined[:-tag_len], combined[-tag_len:]

    @staticmethod
    def _aes_decrypt(ciphertext: bytes, key: bytes, nonce: bytes, tag: bytes) -> bytes:
        """Decrypt and authenticate AES-GCM data produced by ``_aes_encrypt``.

        ``key`` may be any bytes-like object. Errors raised by the underlying
        ``AESGCM.decrypt`` call propagate unchanged.
        """
        aead_mod = NBPCrypto._imp_crypto()
        aesgcm = aead_mod.AESGCM(key)
        # AESGCM.decrypt expects (nonce, ciphertext || tag, aad).
        return aesgcm.decrypt(nonce, ciphertext + tag, None)

    @staticmethod
    def _encrypt_to_dict(data: bytes, key: bytes) -> dict:
        """Encrypt ``data`` and return base64 ``nonce``/``ciphertext``/``tag``."""
        nonce, ct, tag = NBPCrypto._aes_encrypt(data, key)
        return {
            "nonce": base64.b64encode(nonce).decode(),
            "ciphertext": base64.b64encode(ct).decode(),
            "tag": base64.b64encode(tag).decode(),
        }

    @staticmethod
    def _decrypt_from_dict(enc_info: dict, key: bytes) -> bytes:
        """Decrypt a dictionary produced by ``_encrypt_to_dict``."""
        nonce = base64.b64decode(enc_info["nonce"])
        ct = base64.b64decode(enc_info["ciphertext"])
        tag = base64.b64decode(enc_info["tag"])
        return NBPCrypto._aes_decrypt(ct, key, nonce, tag)

    # ── Outer encryption ──

    @staticmethod
    def outer_encrypt(data: bytes, key: bytes) -> dict:
        """Outer-layer AES-256-GCM encryption.

        Returns:
            A dict with base64 strings under ``nonce``, ``ciphertext``, ``tag``.
        """
        return NBPCrypto._encrypt_to_dict(data, key)

    @staticmethod
    def outer_decrypt(enc_info: dict, key: bytes) -> bytes:
        """Outer-layer AES-256-GCM decryption of an ``outer_encrypt`` result."""
        return NBPCrypto._decrypt_from_dict(enc_info, key)

    # ── Middle encryption ──

    @staticmethod
    def inner_encrypt(data: bytes, key: bytes) -> dict:
        """Middle-layer AES-256-GCM encryption.

        Returns:
            A dict with base64 strings under ``nonce``, ``ciphertext``, ``tag``.
        """
        return NBPCrypto._encrypt_to_dict(data, key)

    @staticmethod
    def inner_decrypt(enc_info: dict, key: bytes) -> bytes:
        """Middle-layer AES-256-GCM decryption of an ``inner_encrypt`` result."""
        return NBPCrypto._decrypt_from_dict(enc_info, key)

    # ── Ed25519 outer signature ──

    @staticmethod
    def outer_sign(data: bytes, private_key: bytes) -> bytes:
        """Sign ``data`` with a raw Ed25519 private key."""
        ed25519 = NBPCrypto._imp_ed25519()
        key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key)
        return key.sign(data)

    @staticmethod
    def outer_verify(data: bytes, signature: bytes, public_key: bytes) -> bool:
        """Verify an Ed25519 signature.

        Returns:
            True when the signature is valid; False on any failure, including
            a malformed key or signature.
        """
        try:
            ed25519 = NBPCrypto._imp_ed25519()
            key = ed25519.Ed25519PublicKey.from_public_bytes(public_key)
            key.verify(signature, data)
            return True
        except Exception:
            return False

    # ── RSA-PSS middle signature ──

    @staticmethod
    def _pss_padding():
        """Build the PSS padding (MGF1-SHA256, maximum salt length)."""
        hashes_mod = NBPCrypto._imp_hashes()
        padding_mod = NBPCrypto._imp_padding()
        return padding_mod.PSS(
            mgf=padding_mod.MGF1(hashes_mod.SHA256()),
            salt_length=padding_mod.PSS.MAX_LENGTH,
        )

    @staticmethod
    def inner_sign(data: bytes, private_key_pem: bytes) -> bytes:
        """Sign ``data`` with RSA-PSS/SHA-256 using an unencrypted PEM key."""
        serialization = NBPCrypto._imp_serialization()
        hashes_mod = NBPCrypto._imp_hashes()
        backends = NBPCrypto._imp_backends()
        private_key = serialization.load_pem_private_key(
            private_key_pem, password=None, backend=backends.default_backend()
        )
        return private_key.sign(data, NBPCrypto._pss_padding(), hashes_mod.SHA256())

    @staticmethod
    def inner_verify(data: bytes, signature: bytes, public_key_pem: bytes) -> bool:
        """Verify an RSA-PSS/SHA-256 signature.

        Returns:
            True when the signature is valid; False on any failure, including
            an unparsable public key.
        """
        try:
            serialization = NBPCrypto._imp_serialization()
            hashes_mod = NBPCrypto._imp_hashes()
            backends = NBPCrypto._imp_backends()
            public_key = serialization.load_pem_public_key(
                public_key_pem, backend=backends.default_backend()
            )
            public_key.verify(
                signature, data, NBPCrypto._pss_padding(), hashes_mod.SHA256()
            )
            return True
        except Exception:
            return False

    # ── HMAC-SHA256 per-module signature ──

    @staticmethod
    def module_sign(data: bytes, hmac_key: bytes) -> str:
        """Return the base64-encoded HMAC-SHA256 of ``data``."""
        mac = hmac.new(hmac_key, data, hashlib.sha256)
        return base64.b64encode(mac.digest()).decode()

    @staticmethod
    def module_verify(data: bytes, signature: str, hmac_key: bytes) -> bool:
        """Check a base64 HMAC-SHA256 signature in constant time.

        Returns:
            True when ``signature`` matches; False when it does not match or
            is malformed (not a string, or not encodable).
        """
        expected = NBPCrypto.module_sign(data, hmac_key)
        try:
            # Compare bytes, not str: compare_digest() raises TypeError for
            # str arguments that contain non-ASCII characters, and the
            # signature comes from package metadata we do not control.
            return hmac.compare_digest(
                expected.encode("ascii"), signature.encode("utf-8")
            )
        except (AttributeError, TypeError, UnicodeError):
            return False

    # ── RSA-OAEP key wrapping ──

    @staticmethod
    def _oaep_padding():
        """Build the OAEP padding (MGF1-SHA256, SHA-256, no label)."""
        hashes_mod = NBPCrypto._imp_hashes()
        padding_mod = NBPCrypto._imp_padding()
        return padding_mod.OAEP(
            mgf=padding_mod.MGF1(algorithm=hashes_mod.SHA256()),
            algorithm=hashes_mod.SHA256(),
            label=None,
        )

    @staticmethod
    def encrypt_key(aes_key: bytes, rsa_public_key_pem: bytes) -> str:
        """Wrap an AES key with RSA-OAEP and return it base64-encoded."""
        serialization = NBPCrypto._imp_serialization()
        backends = NBPCrypto._imp_backends()
        public_key = serialization.load_pem_public_key(
            rsa_public_key_pem, backend=backends.default_backend()
        )
        encrypted = public_key.encrypt(aes_key, NBPCrypto._oaep_padding())
        return base64.b64encode(encrypted).decode()

    @staticmethod
    def decrypt_key(encrypted_key: str, rsa_private_key_pem: bytes) -> bytes:
        """Unwrap a base64 RSA-OAEP blob produced by ``encrypt_key``."""
        serialization = NBPCrypto._imp_serialization()
        backends = NBPCrypto._imp_backends()
        private_key = serialization.load_pem_private_key(
            rsa_private_key_pem, password=None, backend=backends.default_backend()
        )
        encrypted = base64.b64decode(encrypted_key)
        return private_key.decrypt(encrypted, NBPCrypto._oaep_padding())

    # ── Key file I/O ──

    @staticmethod
    def save_key_to_pem(key_bytes: bytes, path: str, is_private: bool = False):
        """Write key material to ``path``, creating parent directories.

        Args:
            key_bytes: Bytes to write verbatim.
            path: Destination file path; an existing file is overwritten.
            is_private: When True the file is created with, and forced to,
                mode ``0o600`` so that only the owner can read it.
        """
        dir_path = os.path.dirname(path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        if not is_private:
            with open(path, "wb") as f:
                f.write(key_bytes)
            return
        # Create with restrictive permissions from the start so the secret is
        # never readable by other users. O_BINARY only exists on Windows.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            # The mode given to os.open() is ignored for pre-existing files.
            os.chmod(path, 0o600)
            f.write(key_bytes)

    @staticmethod
    def load_key_from_pem(path: str) -> bytes:
        """Return the raw contents of the key file at ``path``."""
        with open(path, "rb") as f:
            return f.read()

    # ── Package digest helper ──

    @staticmethod
    def _feed_module_ciphertexts(digest, inner_encrypted: dict) -> None:
        """Feed each module name and its base64 ciphertext into ``digest``.

        Modules are visited in sorted-name order so that signing and
        verification hash exactly the same byte stream.
        """
        for mod_name in sorted(inner_encrypted):
            digest.update(mod_name.encode())
            digest.update(inner_encrypted[mod_name]["ciphertext"].encode())

    # ── Full encryption flow (used when packing) ──

    @staticmethod
    def full_encrypt_package(
        nir_data: dict[str, bytes],
        manifest: dict,
        ed25519_private_key: bytes,
        rsa_private_key_pem: bytes,
        rsa_public_key_pem: bytes,
    ) -> dict:
        """Encrypt and sign a set of NIR modules.

        Args:
            nir_data: Mapping of module name to plaintext NIR bytes.
            manifest: JSON-serialisable manifest stored inside META-INF.
            ed25519_private_key: Raw Ed25519 private key for the outer signature.
            rsa_private_key_pem: RSA private key PEM for the middle signature.
            rsa_public_key_pem: RSA public key PEM used to wrap both AES keys.

        Returns:
            A dict with the keys ``outer_encryption``, ``outer_signature``,
            ``inner_encrypted``, ``inner_signature``, ``inner_encryption``,
            ``module_signatures`` and ``hmac_key_derivation``.
        """
        # 1. Two independent AES keys: key1 (outer) and key2 (middle).
        key1 = NBPCrypto.generate_aes_key()
        key2 = NBPCrypto.generate_aes_key()

        # 2. HMAC key derived from both.
        hmac_key = NBPCrypto.derive_hmac_key(key1, key2)

        # 3. Middle encryption: every NIR module under key2.
        inner_encrypted = {
            mod_name: NBPCrypto.inner_encrypt(mod_data, key2)
            for mod_name, mod_data in nir_data.items()
        }

        # 4. Middle signature: RSA-PSS over the digest of the encrypted modules.
        nir_digest = hashlib.sha256()
        NBPCrypto._feed_module_ciphertexts(nir_digest, inner_encrypted)
        inner_signature = NBPCrypto.inner_sign(nir_digest.digest(), rsa_private_key_pem)
        inner_signature_b64 = base64.b64encode(inner_signature).decode()

        # 5. Inner signature: HMAC over every plaintext module.
        module_sigs = {
            mod_name: NBPCrypto.module_sign(mod_data, hmac_key)
            for mod_name, mod_data in nir_data.items()
        }

        # 6. META-INF payload that the outer layer encrypts.
        meta_inf = {
            "manifest": manifest,
            "inner_signature": inner_signature_b64,
            "inner_encryption": {
                "algorithm": "AES-256-GCM",
                "encrypted_key": NBPCrypto.encrypt_key(key2, rsa_public_key_pem),
            },
            "module_signatures": module_sigs,
        }

        # 7. Outer encryption of META-INF under key1.
        meta_inf_bytes = json.dumps(meta_inf).encode("utf-8")
        outer_encrypted = NBPCrypto.outer_encrypt(meta_inf_bytes, key1)

        # 8. Outer signature over the whole package digest. The digest hashes
        #    json.dumps() of the dict as-is, so the verifier must see the same
        #    key order.
        package_digest = hashlib.sha256()
        package_digest.update(json.dumps(outer_encrypted).encode())
        NBPCrypto._feed_module_ciphertexts(package_digest, inner_encrypted)
        outer_signature = NBPCrypto.outer_sign(
            package_digest.digest(), ed25519_private_key
        )

        # 9. Result consumed by the packer.
        return {
            "outer_encryption": {
                "algorithm": "AES-256-GCM",
                "encrypted_key": NBPCrypto.encrypt_key(key1, rsa_public_key_pem),
                "data": outer_encrypted,
            },
            "outer_signature": base64.b64encode(outer_signature).decode(),
            "inner_encrypted": inner_encrypted,
            "inner_signature": inner_signature_b64,
            "inner_encryption": meta_inf["inner_encryption"],
            "module_signatures": module_sigs,
            "hmac_key_derivation": "HKDF-SHA256(ikm=key1+key2, info=NebulaShell:NBPF:HMAC:v1)",
        }

    # ── Full decryption flow (used when loading) ──

    @staticmethod
    def full_decrypt_package(
        package_info: dict,
        ed25519_public_key: bytes,
        rsa_private_key_pem: bytes,
        rsa_public_key_pem: bytes = None,
    ) -> dict[str, bytes]:
        """Verify and decrypt a package produced by ``full_encrypt_package``.

        Args:
            package_info: Package dictionary (output of ``full_encrypt_package``).
            ed25519_public_key: Raw Ed25519 public key for the outer signature.
            rsa_private_key_pem: RSA private key PEM used to unwrap the AES keys.
            rsa_public_key_pem: RSA public key PEM for the middle signature;
                when falsy the middle signature check is skipped.

        Returns:
            Mapping of module name to decrypted NIR bytes.

        Raises:
            NBPCryptoError: A debugger was detected, a signature check failed,
                or a module has no HMAC signature.

        Other exceptions (for example from AES-GCM authentication, RSA
        unwrapping, base64/JSON decoding or missing dictionary keys) are not
        wrapped and propagate unchanged.
        """
        if NBPCrypto._anti_debug_check():
            raise NBPCryptoError("调试器检测到,拒绝解密")

        outer_info = package_info["outer_encryption"]
        inner_encrypted = package_info["inner_encrypted"]

        # 1. Outer signature over the same byte stream the packer hashed.
        outer_sig = base64.b64decode(package_info["outer_signature"])
        package_digest = hashlib.sha256()
        package_digest.update(json.dumps(outer_info["data"]).encode())
        NBPCrypto._feed_module_ciphertexts(package_digest, inner_encrypted)
        if not NBPCrypto.outer_verify(
            package_digest.digest(), outer_sig, ed25519_public_key
        ):
            raise NBPCryptoError("外层签名验证失败,包可能被篡改")

        # 2. Unwrap key1. Keys are kept in bytearrays so the working copies
        #    can actually be zeroed when we are done. The immutable bytes
        #    object returned by decrypt_key() cannot be wiped from Python.
        key1 = bytearray(
            NBPCrypto.decrypt_key(outer_info["encrypted_key"], rsa_private_key_pem)
        )
        key2 = bytearray()
        try:
            # 3. Decrypt META-INF.
            meta_inf_bytes = NBPCrypto.outer_decrypt(outer_info["data"], key1)
            meta_inf = json.loads(meta_inf_bytes.decode("utf-8"))

            # 4. Middle signature (only when a public key was supplied).
            if rsa_public_key_pem:
                inner_sig = base64.b64decode(meta_inf["inner_signature"])
                nir_digest = hashlib.sha256()
                NBPCrypto._feed_module_ciphertexts(nir_digest, inner_encrypted)
                if not NBPCrypto.inner_verify(
                    nir_digest.digest(), inner_sig, rsa_public_key_pem
                ):
                    raise NBPCryptoError("中层 RSA 签名验证失败,插件作者身份无法确认")

            # 5. Unwrap key2.
            key2 = bytearray(
                NBPCrypto.decrypt_key(
                    meta_inf["inner_encryption"]["encrypted_key"],
                    rsa_private_key_pem,
                )
            )

            # 6. Derive the HMAC key while both AES keys are still intact.
            hmac_key = NBPCrypto.derive_hmac_key(key1, key2)

            # 7. Decrypt every NIR module.
            nir_result = {
                mod_name: NBPCrypto.inner_decrypt(enc_info, key2)
                for mod_name, enc_info in inner_encrypted.items()
            }
        finally:
            # Wipe the working key buffers on success and on failure alike.
            NBPCrypto._secure_wipe(key1)
            NBPCrypto._secure_wipe(key2)

        # 8. Inner signatures. Fail closed: a module without a signature is
        #    rejected instead of being accepted unverified.
        module_sigs = meta_inf.get("module_signatures", {})
        for mod_name, mod_data in nir_result.items():
            expected_sig = module_sigs.get(mod_name)
            if not expected_sig:
                raise NBPCryptoError(f"模块 '{mod_name}' 缺少 HMAC 签名")
            if not NBPCrypto.module_verify(mod_data, expected_sig, hmac_key):
                raise NBPCryptoError(f"模块 '{mod_name}' HMAC 签名验证失败")
        return nir_result