""".nbpf 文件格式定义和打包/解包工具 .nbpf 文件结构(ZIP 格式): ``` .nbpf (ZIP) ├── META-INF/ │ ├── MANIFEST.MF # 插件元数据(明文) │ ├── SIGNATURE # 外层 Ed25519 签名(明文) │ ├── SIGNER.PEM # 外层签名者公钥(明文) │ ├── ENCRYPTION # 外层加密信息(RSA-OAEP 加密的 AES 密钥1) │ ├── INNER_SIGNATURE # 中层 RSA-4096 签名(加密存储) │ ├── INNER_ENCRYPTION # 中层加密信息(RSA-OAEP 加密的 AES 密钥2) │ └── MODULE_SIGS # 内层 HMAC 签名列表(加密存储) ├── NIR/ │ ├── main # 主模块 NIR(双重加密) │ ├── sub_module # 子模块 NIR(双重加密) │ └── ... └── RES/ ├── manifest.json # 原始 manifest(明文) ├── config.py # 配置文件(可选,明文) ├── extensions.py # 扩展配置(可选,明文) └── ... # 其他资源文件(明文) ``` """ import json import zipfile import io import os import hashlib import base64 from pathlib import Path from typing import Optional from oss.logger.logger import Log from .crypto import NBPCrypto, NBPCryptoError from .compiler import NIRCompiler, NIRCompileError class NBPFFormatError(Exception): """.nbpf 格式错误""" pass class NBPFFormatter: """.nbpf 文件格式常量""" MAGIC = b"NBPF" VERSION = 1 ENTRY_POINT = "main" # ZIP 内部路径 META_INF = "META-INF/" NIR_DIR = "NIR/" RES_DIR = "RES/" # META-INF 文件(RSA 私钥持有者可解密读取) MANIFEST = META_INF + "MANIFEST.MF" SIGNATURE = META_INF + "SIGNATURE" SIGNER_PEM = META_INF + "SIGNER.PEM" ENCRYPTION = META_INF + "ENCRYPTION" INNER_SIGNATURE = META_INF + "INNER_SIGNATURE" INNER_ENCRYPTION = META_INF + "INNER_ENCRYPTION" MODULE_SIGS = META_INF + "MODULE_SIGS" # META-INF 公开元数据(明文,仅含 name/version/author/description) PLUGIN_MF = META_INF + "PLUGIN.MF" # 跳过列表(打包时排除的文件) SKIP_FILES = {"__pycache__", "SIGNATURE", ".DS_Store", "Thumbs.db"} class NBPFPacker: """.nbpf 打包工具 — 将插件目录打包为 .nbpf 文件""" def __init__(self, crypto: NBPCrypto = None, compiler: NIRCompiler = None): self.crypto = crypto or NBPCrypto() self.compiler = compiler or NIRCompiler() def pack( self, plugin_dir: Path, output_path: Path, ed25519_private_key: bytes, rsa_private_key_pem: bytes, rsa_public_key_pem: bytes, ed25519_public_key: bytes = None, signer_name: str = "unknown", ) -> Path: """将插件目录打包为 .nbpf 文件 Args: plugin_dir: 插件目录路径 output_path: 输出 .nbpf 文件路径 ed25519_private_key: Ed25519 私钥(外层签名) rsa_private_key_pem: RSA 私钥 PEM(中层签名) rsa_public_key_pem: RSA 公钥 PEM(用于加密 AES 密钥) ed25519_public_key: Ed25519 公钥(存入包内,None 则自动派生) signer_name: 签名者名称 Returns: 输出文件路径 Raises: NBPFFormatError: 打包失败 """ if not plugin_dir.exists(): raise NBPFFormatError(f"插件目录不存在: {plugin_dir}") # 确保输出目录存在 output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) try: # 1. 读取 manifest manifest = self._read_manifest(plugin_dir) # 2. 编译所有 .py 文件为 NIR Log.info("NBPF", f"编译插件: {plugin_dir.name}") nir_data = self.compiler.compile_plugin(plugin_dir) # 3. 收集资源文件 res_files = self._collect_resources(plugin_dir) # 4. 完整加密打包 Log.info("NBPF", "加密打包中...") package_info = self.crypto.full_encrypt_package( nir_data=nir_data, manifest=manifest, ed25519_private_key=ed25519_private_key, rsa_private_key_pem=rsa_private_key_pem, rsa_public_key_pem=rsa_public_key_pem, ) # 5. 构建 ZIP 包 with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: # META-INF/SIGNATURE zf.writestr(NBPFFormatter.SIGNATURE, package_info["outer_signature"]) # META-INF/SIGNER.PEM if ed25519_public_key: zf.writestr(NBPFFormatter.SIGNER_PEM, ed25519_public_key) else: # 从私钥派生公钥 ed25519_mod = NBPCrypto._imp_ed25519() key = ed25519_mod.Ed25519PrivateKey.from_private_bytes(ed25519_private_key) pub_bytes = key.public_key().public_bytes( NBPCrypto._imp_serialization().Encoding.Raw, NBPCrypto._imp_serialization().PublicFormat.Raw ) zf.writestr(NBPFFormatter.SIGNER_PEM, pub_bytes) # META-INF/ENCRYPTION zf.writestr(NBPFFormatter.ENCRYPTION, json.dumps(package_info["outer_encryption"])) # META-INF/INNER_SIGNATURE zf.writestr(NBPFFormatter.INNER_SIGNATURE, package_info["inner_signature"]) # META-INF/INNER_ENCRYPTION zf.writestr(NBPFFormatter.INNER_ENCRYPTION, json.dumps(package_info["inner_encryption"])) # META-INF/MODULE_SIGS zf.writestr(NBPFFormatter.MODULE_SIGS, json.dumps(package_info["module_signatures"])) # NIR/ 目录 for mod_name, enc_info in package_info["inner_encrypted"].items(): nir_path = NBPFFormatter.NIR_DIR + mod_name zf.writestr(nir_path, json.dumps(enc_info)) # RES/ 目录(资源文件不加密) for res_path, res_data in res_files.items(): zf.writestr(NBPFFormatter.RES_DIR + res_path, res_data) # META-INF/PLUGIN.MF(仅公开元数据,明文存储便于发现) meta = manifest.get("metadata", {}) plugin_mf = { "name": meta.get("name", plugin_dir.name), "version": meta.get("version", "1.0.0"), "author": meta.get("author", "unknown"), "description": meta.get("description", ""), } zf.writestr(NBPFFormatter.PLUGIN_MF, json.dumps(plugin_mf, indent=2)) Log.ok("NBPF", f"打包完成: {output_path}") return output_path except NIRCompileError as e: raise NBPFFormatError(f"编译失败: {e}") from e except NBPCryptoError as e: raise NBPFFormatError(f"加密失败: {e}") from e except Exception as e: raise NBPFFormatError(f"打包失败: {type(e).__name__}: {e}") from e def _read_manifest(self, plugin_dir: Path) -> dict: """读取插件 manifest.json""" manifest_file = plugin_dir / "manifest.json" if not manifest_file.exists(): # 生成默认 manifest return { "metadata": { "name": plugin_dir.name, "version": "1.0.0", "author": "unknown", "description": "", }, "config": {"enabled": True, "args": {}}, "dependencies": [], "permissions": [], } try: return json.loads(manifest_file.read_text(encoding="utf-8")) except json.JSONDecodeError as e: raise NBPFFormatError(f"manifest.json 格式错误: {e}") from e def _collect_resources(self, plugin_dir: Path) -> dict[str, bytes]: """收集资源文件(非 .py 文件)""" resources = {} for file_path in sorted(plugin_dir.rglob("*")): if not file_path.is_file(): continue rel_path = str(file_path.relative_to(plugin_dir)) # 跳过 skip = False for skip_name in NBPFFormatter.SKIP_FILES: if skip_name in file_path.parts: skip = True break if skip: continue # 跳过 .py 文件(已编译为 NIR) if file_path.suffix == ".py": continue # 跳过 manifest.json(已单独处理) if file_path.name == "manifest.json": continue try: resources[rel_path] = file_path.read_bytes() except Exception as e: Log.warn("NBPF", f"跳过资源文件 {rel_path}: {e}") return resources class NBPFUnpacker: """.nbpf 解包工具 — 解包 .nbpf 文件到目录""" def __init__(self, crypto: NBPCrypto = None): self.crypto = crypto or NBPCrypto() def unpack(self, nbpf_path: Path, output_dir: Path) -> Path: """解包 .nbpf 到目录(用于调试/开发) Args: nbpf_path: .nbpf 文件路径 output_dir: 输出目录 Returns: 输出目录路径 """ if not nbpf_path.exists(): raise NBPFFormatError(f".nbpf 文件不存在: {nbpf_path}") output_dir = Path(output_dir) output_dir.mkdir(parents=True, exist_ok=True) with zipfile.ZipFile(nbpf_path, 'r') as zf: # 提取所有文件 for info in zf.infolist(): # 跳过目录 if info.filename.endswith("/"): continue # 计算输出路径 out_path = output_dir / info.filename # 创建父目录 out_path.parent.mkdir(parents=True, exist_ok=True) # 写入文件 out_path.write_bytes(zf.read(info.filename)) Log.ok("NBPF", f"解包完成: {output_dir}") return output_dir def extract_manifest(self, nbpf_path: Path) -> dict: """提取公开元数据(不解密,读取 PLUGIN.MF) 包含 name / version / author / description 公开字段, 完整 manifest(含依赖和权限声明)仅在加密的 META-INF 中。 Raises: NBPFFormatError: 如果 .nbpf 文件中缺少 PLUGIN.MF """ with zipfile.ZipFile(nbpf_path, 'r') as zf: if NBPFFormatter.PLUGIN_MF not in zf.namelist(): raise NBPFFormatError(".nbpf 文件中缺少 PLUGIN.MF") return json.loads(zf.read(NBPFFormatter.PLUGIN_MF).decode("utf-8")) def verify_signature( self, nbpf_path: Path, trusted_keys: dict[str, bytes], ) -> tuple[bool, str]: """验证 .nbpf 文件的外层 Ed25519 签名 签名计算方式与 full_encrypt_package 一致。 Args: nbpf_path: .nbpf 文件路径 trusted_keys: {signer_name: ed25519_public_key_bytes} 信任的公钥字典 Returns: (是否通过, 消息) """ try: with zipfile.ZipFile(nbpf_path, 'r') as zf: # 读取签名和签名者公钥 if NBPFFormatter.SIGNATURE not in zf.namelist(): return False, "缺少 SIGNATURE 文件" if NBPFFormatter.SIGNER_PEM not in zf.namelist(): return False, "缺少 SIGNER.PEM 文件" signature_b64 = zf.read(NBPFFormatter.SIGNATURE).decode().strip() signer_pub_key = zf.read(NBPFFormatter.SIGNER_PEM) # 查找匹配的信任公钥 matched = False matched_name = None for name, trusted_key in trusted_keys.items(): if trusted_key == signer_pub_key: matched = True matched_name = name break if not matched: return False, "签名者公钥不在信任列表中" # 计算包摘要(与 full_encrypt_package 一致) encryption_data = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8")) digest = hashlib.sha256() digest.update(json.dumps(encryption_data["data"]).encode()) # 按模块名排序,添加模块名和密文 nir_modules = {} for info in zf.infolist(): if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"): mod_name = info.filename[len(NBPFFormatter.NIR_DIR):] mod_data = json.loads(zf.read(info.filename).decode("utf-8")) nir_modules[mod_name] = mod_data for mod_name in sorted(nir_modules.keys()): digest.update(mod_name.encode()) digest.update(nir_modules[mod_name]["ciphertext"].encode()) # 验签 signature = base64.b64decode(signature_b64) if self.crypto.outer_verify(digest.digest(), signature, signer_pub_key): return True, f"签名验证通过 (signer: {matched_name})" else: return False, "签名验证失败,包可能被篡改" except Exception as e: return False, f"签名验证异常: {type(e).__name__}: {e}"