Files
NebulaShell/oss/core/nbpf/loader.py
Starlight-apk e67d2d8ef6
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
refactor: 优化 NBPF 模块 - 缓存导入/合并重复方法/减少I/O
- crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入
- crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt
- crypto.py: 提取公共摘要计算方法,拆分长方法
- compiler.py: 删除_obfuscate_code中未使用的死代码
- loader.py: 3次ZIP扫描合并为1次缓存读取
- format.py: 更新为使用_ModuleCache
- 合计减少205行代码(1707→1502)
2026-05-17 15:36:45 +08:00

394 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
""".nbpf 加载器 — 加载 .nbpf 文件到运行时环境
加载流程:
1. 打开 .nbpf (ZIP) 文件
2. 外层验签:用 Ed25519 公钥验证包签名
3. 外层解密:用 RSA 私钥解密密钥1解密 META-INF/
4. 中层验签:用 RSA-4096 公钥验证 NIR 签名
5. 中层解密:用 RSA 私钥解密密钥2解密 NIR 数据
6. 内层验签:用 HMAC 验证每个模块签名
7. 反序列化 NIR 为 code object
8. 在受限沙箱中执行
9. 内存擦除所有密钥
"""
import json
import zipfile
import sys
import types
import hashlib
import base64
from pathlib import Path
from typing import Any, Optional
from oss.logger.logger import Log
from .crypto import NBPCrypto, NBPCryptoError
from .compiler import NIRCompiler, NIRCompileError
from .format import NBPFFormatter, NBPFFormatError
class NBPFLoadError(Exception):
""".nbpf 加载错误"""
pass
class NBPFLoader:
""".nbpf 加载器"""
def __init__(
self,
crypto: NBPCrypto = None,
compiler: NIRCompiler = None,
trusted_ed25519_keys: dict[str, bytes] = None,
trusted_rsa_keys: dict[str, bytes] = None,
rsa_private_key: bytes = None,
):
"""
Args:
crypto: 加密工具实例
compiler: 编译器实例
trusted_ed25519_keys: {signer_name: ed25519_public_key_bytes}
trusted_rsa_keys: {signer_name: rsa_public_key_pem}
rsa_private_key: RSA 私钥 PEM用于解密 AES 密钥)
"""
self.crypto = crypto or NBPCrypto()
self.compiler = compiler or NIRCompiler()
self.trusted_ed25519_keys = trusted_ed25519_keys or {}
self.trusted_rsa_keys = trusted_rsa_keys or {}
self.rsa_private_key = rsa_private_key
self._current_allowed_imports: list[str] = []
def load(
self,
nbpf_path: Path,
plugin_name: str = None,
) -> tuple[Any, dict]:
"""加载 .nbpf 插件
Args:
nbpf_path: .nbpf 文件路径
plugin_name: 插件名称(用于日志,默认从 manifest 读取)
Returns:
(plugin_instance, plugin_info_dict)
Raises:
NBPFLoadError: 加载失败
"""
if not nbpf_path.exists():
raise NBPFLoadError(f".nbpf 文件不存在: {nbpf_path}")
try:
with zipfile.ZipFile(nbpf_path, 'r') as zf:
# 0. 一次扫描,缓存 NIR 模块数据
nir_modules = self._read_nir_modules(zf)
# 1. 外层验签(先用包内公钥验签,再查信任状态)
signer_pub_key, is_trusted, trusted_name = self._verify_outer_signature(zf, nir_modules)
status = "已信任" if is_trusted else "未信任"
Log.info("NBPF", f"外层签名验证通过 (signer: {trusted_name or 'unknown'}, {status})")
# 2. 外层解密
key1, meta_inf = self._decrypt_outer(zf)
key1_buf = bytearray(key1)
# 3. 中层验签(传入外层签名者名称,确保内外签名者一致)
rsa_signer = self._verify_inner_signature(nir_modules, meta_inf, trusted_name)
Log.info("NBPF", f"中层签名验证通过 (signer: {rsa_signer})")
# 4. 中层解密
key2 = self._decrypt_inner(meta_inf)
key2_buf = bytearray(key2)
# 5. 派生 HMAC 密钥
hmac_key = self.crypto.derive_hmac_key(key1, key2)
self.crypto._secure_wipe(key1_buf)
self.crypto._secure_wipe(key2_buf)
# 6. 解密 NIR 数据
nir_data = self._decrypt_nir_data_raw(nir_modules, key2)
# 7. 内层验签
self._verify_module_signatures(nir_data, meta_inf, hmac_key)
Log.info("NBPF", "内层模块签名验证通过")
# 8. 获取插件名称
manifest = meta_inf.get("manifest", {})
meta = manifest.get("metadata", {})
name = plugin_name or meta.get("name", nbpf_path.stem)
# 9. 反序列化并执行(传入 imports 白名单)
perms = manifest.get("permissions", {})
if isinstance(perms, dict):
self._current_allowed_imports = perms.get("imports", [])
else:
self._current_allowed_imports = []
instance, module = self._deserialize_and_exec(nir_data, name)
# 10. 构建插件信息
author_name = meta.get("author", trusted_name or "<unknown>")
info = {
"name": name,
"version": meta.get("version", ""),
"author": author_name,
"description": meta.get("description", ""),
"manifest": manifest,
"nbpf_path": str(nbpf_path),
"signer": trusted_name or author_name,
"signer_public_key": base64.b64encode(signer_pub_key).decode(),
"trusted": is_trusted,
}
Log.ok("NBPF", f"插件 '{name}' 加载成功")
return instance, info
except (NBPFFormatError, NBPCryptoError, NIRCompileError) as e:
raise NBPFLoadError(str(e)) from e
except zipfile.BadZipFile as e:
raise NBPFLoadError(f".nbpf 文件损坏: {e}") from e
except Exception as e:
raise NBPFLoadError(f"加载失败: {type(e).__name__}: {e}") from e
@staticmethod
def _read_nir_modules(zf: zipfile.ZipFile) -> dict[str, dict]:
"""一次扫描 ZIP 中所有 NIR 模块,缓存结果"""
modules = {}
for info in zf.infolist():
if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"):
mod_name = info.filename[len(NBPFFormatter.NIR_DIR):]
modules[mod_name] = json.loads(zf.read(info.filename).decode("utf-8"))
return modules
# ── 外层验签 ──
def _verify_outer_signature(self, zf: zipfile.ZipFile, nir_modules: dict[str, dict]) -> tuple[bytes, bool, str | None]:
"""外层 Ed25519 签名验证
先用包内公钥验签(不依赖外部信任列表),验签通过后再检查信任状态。
签名计算方式与 full_encrypt_package 一致:
SHA256(outer_encryption_json + sorted_module_names_and_ciphertexts)
Returns:
(signer_pub_key_bytes, is_trusted, trusted_name)
"""
if NBPFFormatter.SIGNATURE not in zf.namelist():
raise NBPFLoadError("缺少外层签名文件")
if NBPFFormatter.SIGNER_PEM not in zf.namelist():
raise NBPFLoadError("缺少签名者公钥文件")
signature_b64 = zf.read(NBPFFormatter.SIGNATURE).decode().strip()
signer_pub_key = zf.read(NBPFFormatter.SIGNER_PEM)
# 计算包摘要(使用缓存的 nir_modules
encryption_data = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8"))
digest = NBPCrypto._build_package_digest(encryption_data["data"], nir_modules)
# 直接用包内公钥验签
signature = base64.b64decode(signature_b64)
if not self.crypto.outer_verify(digest, signature, signer_pub_key):
raise NBPFLoadError("外层签名验证失败,包可能被篡改")
# 检查公钥是否在本地信任列表中
is_trusted = False
trusted_name = None
for name, trusted_key in self.trusted_ed25519_keys.items():
if trusted_key == signer_pub_key:
is_trusted = True
trusted_name = name
break
return signer_pub_key, is_trusted, trusted_name
# ── 外层解密 ──
def _decrypt_outer(self, zf: zipfile.ZipFile) -> tuple[bytes, dict]:
"""外层解密,返回 (key1, meta_inf_dict)"""
if NBPFFormatter.ENCRYPTION not in zf.namelist():
raise NBPFLoadError("缺少外层加密信息")
encryption_info = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8"))
# 用 RSA 私钥解密 key1
if self.rsa_private_key is None:
raise NBPFLoadError("未配置 RSA 私钥,无法解密")
key1 = self.crypto.decrypt_key(encryption_info["encrypted_key"], self.rsa_private_key)
# 解密 META-INF 数据
meta_inf_bytes = self.crypto.outer_decrypt(encryption_info["data"], key1)
meta_inf = json.loads(meta_inf_bytes.decode("utf-8"))
return key1, meta_inf
# ── 中层验签 ──
def _verify_inner_signature(self, nir_modules: dict[str, dict], meta_inf: dict, ed25519_signer: str = None) -> str:
"""中层 RSA-4096 签名验证,返回签名者名称
签名计算方式与 full_encrypt_package 一致。
如果传入了 ed25519_signer优先使用同名 RSA 密钥验签;
否则遍历所有信任的 RSA 密钥。
Args:
nir_modules: 缓存的 NIR 模块数据
meta_inf: 解密后的 META-INF 数据
ed25519_signer: 外层 Ed25519 签名者名称
Returns:
RSA 签名者名称
Raises:
NBPFLoadError: 所有信任密钥均无法验证签名时抛出
"""
inner_sig_b64 = meta_inf.get("inner_signature")
if not inner_sig_b64:
raise NBPFLoadError("缺少中层签名")
# 使用缓存的 nir_modules 计算摘要
nir_digest = NBPCrypto._build_nir_digest(nir_modules)
inner_sig = base64.b64decode(inner_sig_b64)
# 优先使用与外层签名者同名的 RSA 密钥
candidates: list[tuple[str, bytes]] = []
if ed25519_signer and ed25519_signer in self.trusted_rsa_keys:
candidates.append((ed25519_signer, self.trusted_rsa_keys[ed25519_signer]))
else:
candidates = list(self.trusted_rsa_keys.items())
for name, rsa_pub_key in candidates:
if self.crypto.inner_verify(nir_digest, inner_sig, rsa_pub_key):
return name
raise NBPFLoadError("中层签名验证失败,无法匹配任何信任的 RSA 公钥")
# ── 中层解密 ──
def _decrypt_inner(self, meta_inf: dict) -> bytes:
"""中层解密,返回 key2"""
inner_enc = meta_inf.get("inner_encryption", {})
encrypted_key = inner_enc.get("encrypted_key")
if not encrypted_key:
raise NBPFLoadError("缺少中层加密密钥")
if self.rsa_private_key is None:
raise NBPFLoadError("未配置 RSA 私钥,无法解密")
return self.crypto.decrypt_key(encrypted_key, self.rsa_private_key)
# ── 解密 NIR 数据 ──
def _decrypt_nir_data_raw(self, nir_modules: dict[str, dict], key2: bytes) -> dict[str, bytes]:
"""解密 NIR 数据,使用缓存的模块数据"""
return {
mod_name: self.crypto.inner_decrypt(enc_info, key2)
for mod_name, enc_info in nir_modules.items()
}
# ── 内层验签 ──
def _verify_module_signatures(self, nir_data: dict, meta_inf: dict, hmac_key: bytes):
"""内层 HMAC 模块签名验证"""
module_sigs = meta_inf.get("module_signatures", {})
if not module_sigs:
Log.warn("NBPF", "未找到模块签名,跳过内层验签")
return
for mod_name, mod_data in nir_data.items():
expected_sig = module_sigs.get(mod_name)
if expected_sig:
if not self.crypto.module_verify(mod_data, expected_sig, hmac_key):
raise NBPFLoadError(f"模块 '{mod_name}' HMAC 签名验证失败")
# ── 反序列化并执行 ──
def _deserialize_and_exec(
self,
nir_data: dict[str, bytes],
plugin_name: str,
) -> tuple[Any, types.ModuleType]:
"""反序列化 NIR 并执行,返回 (instance, module)"""
# 构建安全的全局命名空间
safe_globals = self._build_safe_globals(plugin_name, self._current_allowed_imports)
# 按依赖顺序执行模块
main_module = None
for mod_name in sorted(nir_data.keys()):
nir_bytes = nir_data[mod_name]
code = self.compiler.deserialize_nir(nir_bytes)
# 创建模块
module_name = f"nbpf.{plugin_name}.{mod_name}"
if mod_name == NBPFFormatter.ENTRY_POINT:
module_name = f"nbpf.{plugin_name}"
module = types.ModuleType(module_name)
module.__package__ = f"nbpf.{plugin_name}"
module.__path__ = []
module.__file__ = f"<nbpf:{plugin_name}/{mod_name}>"
sys.modules[module_name] = module
# 执行 code object
exec(code, module.__dict__)
if mod_name == NBPFFormatter.ENTRY_POINT:
main_module = module
# 调用 New() 创建实例
if main_module is None:
raise NBPFLoadError(f"缺少入口模块 '{NBPFFormatter.ENTRY_POINT}'")
if not hasattr(main_module, "New"):
raise NBPFLoadError("插件缺少 New() 函数")
try:
instance = main_module.New()
except Exception as e:
raise NBPFLoadError(f"创建插件实例失败: {e}") from e
return instance, main_module
def _build_safe_globals(self, plugin_name: str, allowed_imports: list[str] = None) -> dict:
"""构建安全的全局命名空间
如果插件在 manifest 中声明了 imports 权限,将 `__import__` 加回内置函数,
并用白名单包装器限制只能导入声明的模块。
注意Python 沙箱无法完全阻止通过 ()__class__.__bases__[0].__subclasses__()
等反射方式逃逸。本沙箱仅用于防止意外访问危险模块,真正的安全隔离
需要 OS 级容器化。
"""
safe_builtins = {
'True': True, 'False': False, 'None': None,
'dict': dict, 'list': list, 'str': str, 'int': int,
'float': float, 'bool': bool, 'tuple': tuple, 'set': set,
'len': len, 'range': range, 'enumerate': enumerate,
'zip': zip, 'map': map, 'filter': filter,
'sorted': sorted, 'reversed': reversed,
'min': min, 'max': max, 'sum': sum, 'abs': abs,
'round': round, 'isinstance': isinstance, 'issubclass': issubclass,
'id': id, 'hash': hash, 'repr': repr,
'print': print, 'property': property,
'staticmethod': staticmethod, 'classmethod': classmethod,
'super': super, 'iter': iter, 'next': next,
'any': any, 'all': all, 'callable': callable,
'ValueError': ValueError, 'TypeError': TypeError,
'KeyError': KeyError, 'IndexError': IndexError,
'Exception': Exception, 'BaseException': BaseException,
}
# 如果插件声明了 imports 权限,添加白名单 __import__
if allowed_imports:
_allowed_set = set(allowed_imports)
def _safe_import(name, *args, **kwargs):
base = name.split(".")[0]
if base not in _allowed_set:
raise ImportError(
f"模块 '{name}' 不在权限白名单中。"
f"请在 manifest.json 的 permissions.imports 中声明: {sorted(_allowed_set)}"
)
return __import__(name, *args, **kwargs)
safe_builtins['__import__'] = _safe_import
return {
'__builtins__': safe_builtins,
'__name__': f'nbpf.{plugin_name}',
}