""".nbpf 加载器 — 加载 .nbpf 文件到运行时环境 加载流程: 1. 打开 .nbpf (ZIP) 文件 2. 外层验签:用 Ed25519 公钥验证包签名 3. 外层解密:用 RSA 私钥解密密钥1,解密 META-INF/ 4. 中层验签:用 RSA-4096 公钥验证 NIR 签名 5. 中层解密:用 RSA 私钥解密密钥2,解密 NIR 数据 6. 内层验签:用 HMAC 验证每个模块签名 7. 反序列化 NIR 为 code object 8. 在受限沙箱中执行 9. 内存擦除所有密钥 """ import json import zipfile import sys import types import hashlib import base64 from pathlib import Path from typing import Any, Optional from oss.logger.logger import Log from .crypto import NBPCrypto, NBPCryptoError from .compiler import NIRCompiler, NIRCompileError from .format import NBPFFormatter, NBPFFormatError class NBPFLoadError(Exception): """.nbpf 加载错误""" pass class NBPFLoader: """.nbpf 加载器""" def __init__( self, crypto: NBPCrypto = None, compiler: NIRCompiler = None, trusted_ed25519_keys: dict[str, bytes] = None, trusted_rsa_keys: dict[str, bytes] = None, rsa_private_key: bytes = None, ): """ Args: crypto: 加密工具实例 compiler: 编译器实例 trusted_ed25519_keys: {signer_name: ed25519_public_key_bytes} trusted_rsa_keys: {signer_name: rsa_public_key_pem} rsa_private_key: RSA 私钥 PEM(用于解密 AES 密钥) """ self.crypto = crypto or NBPCrypto() self.compiler = compiler or NIRCompiler() self.trusted_ed25519_keys = trusted_ed25519_keys or {} self.trusted_rsa_keys = trusted_rsa_keys or {} self.rsa_private_key = rsa_private_key self._current_allowed_imports: list[str] = [] def load( self, nbpf_path: Path, plugin_name: str = None, ) -> tuple[Any, dict]: """加载 .nbpf 插件 Args: nbpf_path: .nbpf 文件路径 plugin_name: 插件名称(用于日志,默认从 manifest 读取) Returns: (plugin_instance, plugin_info_dict) Raises: NBPFLoadError: 加载失败 """ if not nbpf_path.exists(): raise NBPFLoadError(f".nbpf 文件不存在: {nbpf_path}") try: with zipfile.ZipFile(nbpf_path, 'r') as zf: # 0. 一次扫描,缓存 NIR 模块数据 nir_modules = self._read_nir_modules(zf) # 1. 外层验签(先用包内公钥验签,再查信任状态) signer_pub_key, is_trusted, trusted_name = self._verify_outer_signature(zf, nir_modules) status = "已信任" if is_trusted else "未信任" Log.info("NBPF", f"外层签名验证通过 (signer: {trusted_name or 'unknown'}, {status})") # 2. 外层解密 key1, meta_inf = self._decrypt_outer(zf) key1_buf = bytearray(key1) # 3. 中层验签(传入外层签名者名称,确保内外签名者一致) rsa_signer = self._verify_inner_signature(nir_modules, meta_inf, trusted_name) Log.info("NBPF", f"中层签名验证通过 (signer: {rsa_signer})") # 4. 中层解密 key2 = self._decrypt_inner(meta_inf) key2_buf = bytearray(key2) # 5. 派生 HMAC 密钥 hmac_key = self.crypto.derive_hmac_key(key1, key2) self.crypto._secure_wipe(key1_buf) self.crypto._secure_wipe(key2_buf) # 6. 解密 NIR 数据 nir_data = self._decrypt_nir_data_raw(nir_modules, key2) # 7. 内层验签 self._verify_module_signatures(nir_data, meta_inf, hmac_key) Log.info("NBPF", "内层模块签名验证通过") # 8. 获取插件名称 manifest = meta_inf.get("manifest", {}) meta = manifest.get("metadata", {}) name = plugin_name or meta.get("name", nbpf_path.stem) # 9. 反序列化并执行(传入 imports 白名单) perms = manifest.get("permissions", {}) if isinstance(perms, dict): self._current_allowed_imports = perms.get("imports", []) else: self._current_allowed_imports = [] instance, module = self._deserialize_and_exec(nir_data, name) # 10. 构建插件信息 author_name = meta.get("author", trusted_name or "") info = { "name": name, "version": meta.get("version", ""), "author": author_name, "description": meta.get("description", ""), "manifest": manifest, "nbpf_path": str(nbpf_path), "signer": trusted_name or author_name, "signer_public_key": base64.b64encode(signer_pub_key).decode(), "trusted": is_trusted, } Log.ok("NBPF", f"插件 '{name}' 加载成功") return instance, info except (NBPFFormatError, NBPCryptoError, NIRCompileError) as e: raise NBPFLoadError(str(e)) from e except zipfile.BadZipFile as e: raise NBPFLoadError(f".nbpf 文件损坏: {e}") from e except Exception as e: raise NBPFLoadError(f"加载失败: {type(e).__name__}: {e}") from e @staticmethod def _read_nir_modules(zf: zipfile.ZipFile) -> dict[str, dict]: """一次扫描 ZIP 中所有 NIR 模块,缓存结果""" modules = {} for info in zf.infolist(): if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"): mod_name = info.filename[len(NBPFFormatter.NIR_DIR):] modules[mod_name] = json.loads(zf.read(info.filename).decode("utf-8")) return modules # ── 外层验签 ── def _verify_outer_signature(self, zf: zipfile.ZipFile, nir_modules: dict[str, dict]) -> tuple[bytes, bool, str | None]: """外层 Ed25519 签名验证 先用包内公钥验签(不依赖外部信任列表),验签通过后再检查信任状态。 签名计算方式与 full_encrypt_package 一致: SHA256(outer_encryption_json + sorted_module_names_and_ciphertexts) Returns: (signer_pub_key_bytes, is_trusted, trusted_name) """ if NBPFFormatter.SIGNATURE not in zf.namelist(): raise NBPFLoadError("缺少外层签名文件") if NBPFFormatter.SIGNER_PEM not in zf.namelist(): raise NBPFLoadError("缺少签名者公钥文件") signature_b64 = zf.read(NBPFFormatter.SIGNATURE).decode().strip() signer_pub_key = zf.read(NBPFFormatter.SIGNER_PEM) # 计算包摘要(使用缓存的 nir_modules) encryption_data = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8")) digest = NBPCrypto._build_package_digest(encryption_data["data"], nir_modules) # 直接用包内公钥验签 signature = base64.b64decode(signature_b64) if not self.crypto.outer_verify(digest, signature, signer_pub_key): raise NBPFLoadError("外层签名验证失败,包可能被篡改") # 检查公钥是否在本地信任列表中 is_trusted = False trusted_name = None for name, trusted_key in self.trusted_ed25519_keys.items(): if trusted_key == signer_pub_key: is_trusted = True trusted_name = name break return signer_pub_key, is_trusted, trusted_name # ── 外层解密 ── def _decrypt_outer(self, zf: zipfile.ZipFile) -> tuple[bytes, dict]: """外层解密,返回 (key1, meta_inf_dict)""" if NBPFFormatter.ENCRYPTION not in zf.namelist(): raise NBPFLoadError("缺少外层加密信息") encryption_info = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8")) # 用 RSA 私钥解密 key1 if self.rsa_private_key is None: raise NBPFLoadError("未配置 RSA 私钥,无法解密") key1 = self.crypto.decrypt_key(encryption_info["encrypted_key"], self.rsa_private_key) # 解密 META-INF 数据 meta_inf_bytes = self.crypto.outer_decrypt(encryption_info["data"], key1) meta_inf = json.loads(meta_inf_bytes.decode("utf-8")) return key1, meta_inf # ── 中层验签 ── def _verify_inner_signature(self, nir_modules: dict[str, dict], meta_inf: dict, ed25519_signer: str = None) -> str: """中层 RSA-4096 签名验证,返回签名者名称 签名计算方式与 full_encrypt_package 一致。 如果传入了 ed25519_signer,优先使用同名 RSA 密钥验签; 否则遍历所有信任的 RSA 密钥。 Args: nir_modules: 缓存的 NIR 模块数据 meta_inf: 解密后的 META-INF 数据 ed25519_signer: 外层 Ed25519 签名者名称 Returns: RSA 签名者名称 Raises: NBPFLoadError: 所有信任密钥均无法验证签名时抛出 """ inner_sig_b64 = meta_inf.get("inner_signature") if not inner_sig_b64: raise NBPFLoadError("缺少中层签名") # 使用缓存的 nir_modules 计算摘要 nir_digest = NBPCrypto._build_nir_digest(nir_modules) inner_sig = base64.b64decode(inner_sig_b64) # 优先使用与外层签名者同名的 RSA 密钥 candidates: list[tuple[str, bytes]] = [] if ed25519_signer and ed25519_signer in self.trusted_rsa_keys: candidates.append((ed25519_signer, self.trusted_rsa_keys[ed25519_signer])) else: candidates = list(self.trusted_rsa_keys.items()) for name, rsa_pub_key in candidates: if self.crypto.inner_verify(nir_digest, inner_sig, rsa_pub_key): return name raise NBPFLoadError("中层签名验证失败,无法匹配任何信任的 RSA 公钥") # ── 中层解密 ── def _decrypt_inner(self, meta_inf: dict) -> bytes: """中层解密,返回 key2""" inner_enc = meta_inf.get("inner_encryption", {}) encrypted_key = inner_enc.get("encrypted_key") if not encrypted_key: raise NBPFLoadError("缺少中层加密密钥") if self.rsa_private_key is None: raise NBPFLoadError("未配置 RSA 私钥,无法解密") return self.crypto.decrypt_key(encrypted_key, self.rsa_private_key) # ── 解密 NIR 数据 ── def _decrypt_nir_data_raw(self, nir_modules: dict[str, dict], key2: bytes) -> dict[str, bytes]: """解密 NIR 数据,使用缓存的模块数据""" return { mod_name: self.crypto.inner_decrypt(enc_info, key2) for mod_name, enc_info in nir_modules.items() } # ── 内层验签 ── def _verify_module_signatures(self, nir_data: dict, meta_inf: dict, hmac_key: bytes): """内层 HMAC 模块签名验证""" module_sigs = meta_inf.get("module_signatures", {}) if not module_sigs: Log.warn("NBPF", "未找到模块签名,跳过内层验签") return for mod_name, mod_data in nir_data.items(): expected_sig = module_sigs.get(mod_name) if expected_sig: if not self.crypto.module_verify(mod_data, expected_sig, hmac_key): raise NBPFLoadError(f"模块 '{mod_name}' HMAC 签名验证失败") # ── 反序列化并执行 ── def _deserialize_and_exec( self, nir_data: dict[str, bytes], plugin_name: str, ) -> tuple[Any, types.ModuleType]: """反序列化 NIR 并执行,返回 (instance, module)""" # 构建安全的全局命名空间 safe_globals = self._build_safe_globals(plugin_name, self._current_allowed_imports) # 按依赖顺序执行模块 main_module = None for mod_name in sorted(nir_data.keys()): nir_bytes = nir_data[mod_name] code = self.compiler.deserialize_nir(nir_bytes) # 创建模块 module_name = f"nbpf.{plugin_name}.{mod_name}" if mod_name == NBPFFormatter.ENTRY_POINT: module_name = f"nbpf.{plugin_name}" module = types.ModuleType(module_name) module.__package__ = f"nbpf.{plugin_name}" module.__path__ = [] module.__file__ = f"" sys.modules[module_name] = module # 执行 code object exec(code, module.__dict__) if mod_name == NBPFFormatter.ENTRY_POINT: main_module = module # 调用 New() 创建实例 if main_module is None: raise NBPFLoadError(f"缺少入口模块 '{NBPFFormatter.ENTRY_POINT}'") if not hasattr(main_module, "New"): raise NBPFLoadError("插件缺少 New() 函数") try: instance = main_module.New() except Exception as e: raise NBPFLoadError(f"创建插件实例失败: {e}") from e return instance, main_module def _build_safe_globals(self, plugin_name: str, allowed_imports: list[str] = None) -> dict: """构建安全的全局命名空间 如果插件在 manifest 中声明了 imports 权限,将 `__import__` 加回内置函数, 并用白名单包装器限制只能导入声明的模块。 注意:Python 沙箱无法完全阻止通过 ()__class__.__bases__[0].__subclasses__() 等反射方式逃逸。本沙箱仅用于防止意外访问危险模块,真正的安全隔离 需要 OS 级容器化。 """ safe_builtins = { 'True': True, 'False': False, 'None': None, 'dict': dict, 'list': list, 'str': str, 'int': int, 'float': float, 'bool': bool, 'tuple': tuple, 'set': set, 'len': len, 'range': range, 'enumerate': enumerate, 'zip': zip, 'map': map, 'filter': filter, 'sorted': sorted, 'reversed': reversed, 'min': min, 'max': max, 'sum': sum, 'abs': abs, 'round': round, 'isinstance': isinstance, 'issubclass': issubclass, 'id': id, 'hash': hash, 'repr': repr, 'print': print, 'property': property, 'staticmethod': staticmethod, 'classmethod': classmethod, 'super': super, 'iter': iter, 'next': next, 'any': any, 'all': all, 'callable': callable, 'ValueError': ValueError, 'TypeError': TypeError, 'KeyError': KeyError, 'IndexError': IndexError, 'Exception': Exception, 'BaseException': BaseException, } # 如果插件声明了 imports 权限,添加白名单 __import__ if allowed_imports: _allowed_set = set(allowed_imports) def _safe_import(name, *args, **kwargs): base = name.split(".")[0] if base not in _allowed_set: raise ImportError( f"模块 '{name}' 不在权限白名单中。" f"请在 manifest.json 的 permissions.imports 中声明: {sorted(_allowed_set)}" ) return __import__(name, *args, **kwargs) safe_builtins['__import__'] = _safe_import return { '__builtins__': safe_builtins, '__name__': f'nbpf.{plugin_name}', }