Files
NebulaShell/tests/test_nbpf.py
Falck bce27db4ac 重大重构:引擎模块拆分 + P0插件实现 + 55个Bug修复
核心变更:
- engine.py(1781行)拆分为8个独立模块: lifecycle/security/deps/
  datastore/pl_injector/watcher/signature/manager
- 新增plugin-bridge: 事件总线 + 服务注册 + RPC通信
- 新增i18n: 国际化/多语言翻译支持
- 新增plugin-storage: 插件键值/文件存储
- 新增ws-api: WebSocket实时通信(pub/sub + 自定义处理器)
- nodejs-adapter统一为Plugin ABC模式

Bug修复:
- 修复load_all()中store_dir未定义崩溃
- 修复DependencyResolver入度计算(拓扑排序)
- 修复PermissionError隐藏内置异常
- 修复CORS中间件头部未附加到响应
- 修复IntegrityChecker跳过__pycache__目录
- 修复版本号不一致(v2.0.0→v1.2.0)
- 修复测试文件的Logger导入/路径/私有方法调用
- 修复context.py缺少typing导入
- 修复config.py STORE_DIR默认路径(./mods→./store)

测试覆盖: 14→91个测试, 全部通过
2026-05-12 11:40:06 +08:00

488 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""NBPF 模块测试"""
import sys
import os
import json
import tempfile
import shutil
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from oss.core.nbpf import (
NBPCrypto, NBPCryptoError,
NIRCompiler, NIRCompileError,
NBPFPacker, NBPFUnpacker, NBPFFormatter,
NBPFLoader, NBPFLoadError,
)
# ═══════════════════════════════════════════════════════════════
# 辅助函数
# ═══════════════════════════════════════════════════════════════
def _create_test_plugin(tmp_dir: Path) -> Path:
    """Create a minimal plugin directory used as a fixture by the tests.

    Writes ``test-plugin/manifest.json`` and ``test-plugin/main.py`` below
    *tmp_dir*.

    Both files are written explicitly as UTF-8. ``main.py`` contains
    non-ASCII text, and ``Path.write_text`` without ``encoding`` falls back
    to the locale encoding, which on a non-UTF-8 locale either fails or
    produces a file that cannot be read back as UTF-8 Python source.

    Args:
        tmp_dir: Existing (or creatable) directory to place the plugin in.

    Returns:
        Path of the newly created ``test-plugin`` directory.

    Raises:
        FileExistsError: If ``tmp_dir / "test-plugin"`` already exists.
    """
    plugin_dir = tmp_dir / "test-plugin"
    plugin_dir.mkdir(parents=True)

    # manifest.json
    manifest = {
        "metadata": {
            "name": "test-plugin",
            "version": "1.0.0",
            "author": "test",
            "description": "测试插件",
        },
        "dependencies": [],
    }
    (plugin_dir / "manifest.json").write_text(
        json.dumps(manifest, indent=2), encoding="utf-8"
    )

    # main.py -- the literal starts at column 0 so the written file is
    # valid top-level Python source.
    main_py = '''"""测试插件"""
class New:
    def __init__(self):
        self.name = "test-plugin"
        self.started = False
    def start(self):
        self.started = True
        return True
    def stop(self):
        self.started = False
        return True
    def greet(self, name: str) -> str:
        return f"Hello, {name}!"
'''
    (plugin_dir / "main.py").write_text(main_py, encoding="utf-8")
    return plugin_dir
def _generate_keys(tmp_dir: Path) -> dict:
    """Generate one Ed25519 and one RSA key pair for use in tests.

    The returned dict maps ``ed25519_private`` / ``ed25519_public`` /
    ``rsa_private`` / ``rsa_public`` to whatever the corresponding
    ``NBPCrypto`` generator returned. *tmp_dir* is accepted but not used
    by this helper.
    """
    # Ed25519 pair first, then RSA.
    ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair()
    # A 2048-bit RSA key is requested for the tests.
    rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048)
    return {
        "ed25519_private": ed_priv,
        "ed25519_public": ed_pub,
        "rsa_private": rsa_priv,
        "rsa_public": rsa_pub,
    }
# ═══════════════════════════════════════════════════════════════
# 测试NBPCrypto
# ═══════════════════════════════════════════════════════════════
def test_crypto_aes_encrypt_decrypt():
    """Round-trip a payload through AES-256-GCM encryption and decryption."""
    plaintext = b"Hello, NebulaShell!"
    aes_key = NBPCrypto.generate_aes_key()
    assert len(aes_key) == 32  # 32 bytes, i.e. a 256-bit key

    nonce, ciphertext, tag = NBPCrypto._aes_encrypt(plaintext, aes_key)
    assert len(nonce) == 12
    assert len(tag) == 16
    assert isinstance(ciphertext, bytes)

    recovered = NBPCrypto._aes_decrypt(ciphertext, aes_key, nonce, tag)
    assert recovered == plaintext
def test_crypto_outer_encrypt_decrypt():
    """Round-trip a payload through the outer encryption layer."""
    payload = b"Secret plugin data"
    aes_key = NBPCrypto.generate_aes_key()

    envelope = NBPCrypto.outer_encrypt(payload, aes_key)
    assert isinstance(envelope, dict)

    recovered = NBPCrypto.outer_decrypt(envelope, aes_key)
    assert recovered == payload
def test_crypto_inner_encrypt_decrypt():
    """Round-trip a payload through the middle (``inner_*``) encryption layer."""
    payload = b"NIR compiled data"
    aes_key = NBPCrypto.generate_aes_key()

    envelope = NBPCrypto.inner_encrypt(payload, aes_key)
    assert isinstance(envelope, dict)

    recovered = NBPCrypto.inner_decrypt(envelope, aes_key)
    assert recovered == payload
def test_crypto_ed25519_sign_verify():
    """Sign with Ed25519, then verify both genuine and tampered data."""
    message = b"Package content hash"
    private_key, public_key = NBPCrypto.generate_ed25519_keypair()

    sig = NBPCrypto.outer_sign(message, private_key)
    assert isinstance(sig, bytes)

    # The genuine message verifies; any other message must not.
    assert NBPCrypto.outer_verify(message, sig, public_key) is True
    assert NBPCrypto.outer_verify(b"tampered", sig, public_key) is False
def test_crypto_rsa_sign_verify():
    """Sign with RSA-PSS, then verify both genuine and tampered data."""
    message = b"NIR content hash"
    private_key, public_key = NBPCrypto.generate_rsa_keypair(key_size=2048)

    sig = NBPCrypto.inner_sign(message, private_key)
    assert isinstance(sig, bytes)

    # The genuine message verifies; any other message must not.
    assert NBPCrypto.inner_verify(message, sig, public_key) is True
    assert NBPCrypto.inner_verify(b"tampered", sig, public_key) is False
def test_crypto_hmac_sign_verify():
    """Sign a module with HMAC-SHA256 and verify genuine and tampered data."""
    content = b"Module content"
    first_key = NBPCrypto.generate_aes_key()
    second_key = NBPCrypto.generate_aes_key()
    # The HMAC key is derived from the two AES keys.
    mac_key = NBPCrypto.derive_hmac_key(first_key, second_key)

    sig = NBPCrypto.module_sign(content, mac_key)
    assert isinstance(sig, str)

    assert NBPCrypto.module_verify(content, sig, mac_key) is True
    assert NBPCrypto.module_verify(b"tampered", sig, mac_key) is False
def test_crypto_rsa_oaep_key_encapsulation():
    """Wrap an AES key with RSA-OAEP and unwrap it again."""
    session_key = NBPCrypto.generate_aes_key()
    private_pem, public_pem = NBPCrypto.generate_rsa_keypair(key_size=2048)

    wrapped = NBPCrypto.encrypt_key(session_key, public_pem)
    assert isinstance(wrapped, str)

    unwrapped = NBPCrypto.decrypt_key(wrapped, private_pem)
    assert unwrapped == session_key
def test_crypto_anti_debug():
    """Check that the anti-debug hook exists and is callable (never invoked)."""
    hook_name = "_anti_debug_check"
    assert hasattr(NBPCrypto, hook_name)
    hook = NBPCrypto._anti_debug_check
    assert callable(hook)
def test_crypto_secure_wipe():
    """Check that wiping a buffer leaves only zero bytes in it."""
    secret = bytearray(b"secret_key_1234567890")
    NBPCrypto._secure_wipe(secret)
    # Any non-zero byte is truthy, so "no truthy byte" means "all zero".
    assert not any(secret)
# ═══════════════════════════════════════════════════════════════
# 测试NIRCompiler
# ═══════════════════════════════════════════════════════════════
def test_compiler_compile_source():
    """Compile source to NIR, deserialize it and execute the result."""
    nir_compiler = NIRCompiler()
    program = "x = 1\ny = x + 2\nresult = y * 3\n"

    nir_blob = nir_compiler.compile_source(program, "test.py")
    assert isinstance(nir_blob, bytes)

    # Deserialize
    code_obj = NIRCompiler.deserialize_nir(nir_blob)
    assert code_obj is not None

    # Execute
    namespace = {}
    exec(code_obj, namespace)
    assert namespace["result"] == 9
def test_compiler_create_function():
    """Define a function via a deserialized code object and call it."""
    nir_compiler = NIRCompiler()
    program = "def add(a, b): return a + b"

    nir_blob = nir_compiler.compile_source(program, "test.py")
    code_obj = NIRCompiler.deserialize_nir(nir_blob)

    namespace = {}
    exec(code_obj, namespace)
    add = namespace["add"]
    assert add(2, 3) == 5
def test_compiler_forbidden_modules():
    """Verify that importing a dangerous module is rejected at compile time.

    Every snippet below must make ``NIRCompiler.compile_source`` raise
    ``NIRCompileError``; a snippet that compiles cleanly fails the test.

    Raises:
        AssertionError: If any of the snippets is accepted by the compiler.
    """
    compiler = NIRCompiler()
    dangerous_sources = [
        "import os",
        "from os import path",
        "import subprocess",
        "import sys",
        "import socket",
        "import ctypes",
    ]
    for source in dangerous_sources:
        # Only the call under test is guarded, so the failure signal below
        # is never raised inside the ``try`` body.
        try:
            compiler.compile_source(source, "test.py")
        except NIRCompileError:
            continue  # expected behaviour: the import was rejected
        # Raised explicitly (not ``assert False``) so the check is not
        # compiled away when Python runs with ``-O``.
        raise AssertionError(f"应该拒绝: {source}")
def test_compiler_compile_plugin():
    """Compile a whole plugin directory and check the ``main`` module entry."""
    nir_compiler = NIRCompiler()
    with tempfile.TemporaryDirectory() as workdir:
        source_dir = _create_test_plugin(Path(workdir))
        modules = nir_compiler.compile_plugin(source_dir)
        assert "main" in modules
        main_blob = modules["main"]
        assert isinstance(main_blob, bytes)
# ═══════════════════════════════════════════════════════════════
# 测试NBPFPacker + NBPFUnpacker
# ═══════════════════════════════════════════════════════════════
def _create_packer():
    """Build an ``NBPFPacker`` wired to a fresh crypto helper and compiler."""
    crypto = NBPCrypto()
    compiler = NIRCompiler()
    return NBPFPacker(crypto=crypto, compiler=compiler)
def test_pack_unpack():
    """Pack the fixture plugin into a ``.nbpf`` file and unpack it again."""
    packer = _create_packer()
    unpacker = NBPFUnpacker()
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        source_dir = _create_test_plugin(root)
        key_set = _generate_keys(root)

        # Pack
        package_path = root / "test-plugin.nbpf"
        packed = packer.pack(
            plugin_dir=source_dir,
            output_path=package_path,
            ed25519_private_key=key_set["ed25519_private"],
            rsa_private_key_pem=key_set["rsa_private"],
            rsa_public_key_pem=key_set["rsa_public"],
            signer_name="test",
        )
        assert packed.exists()
        assert packed.suffix == ".nbpf"

        # Unpack
        unpacked = unpacker.unpack(package_path, root / "unpacked")
        assert unpacked.exists()

        meta_inf = unpacked / "META-INF"
        # After unpacking, the public metadata lives in META-INF/PLUGIN.MF.
        assert (meta_inf / "PLUGIN.MF").exists()
        # The full manifest is no longer stored in plaintext (it is kept in
        # the encrypted section).
        assert not (meta_inf / "MANIFEST.MF").exists()
def test_extract_manifest():
    """Pack the fixture plugin and read name/version back from the package."""
    packer = _create_packer()
    unpacker = NBPFUnpacker()
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        source_dir = _create_test_plugin(root)
        key_set = _generate_keys(root)

        package_path = root / "test-plugin.nbpf"
        packer.pack(
            plugin_dir=source_dir,
            output_path=package_path,
            ed25519_private_key=key_set["ed25519_private"],
            rsa_private_key_pem=key_set["rsa_private"],
            rsa_public_key_pem=key_set["rsa_public"],
            signer_name="test",
        )

        extracted = unpacker.extract_manifest(package_path)
        assert extracted["name"] == "test-plugin"
        assert extracted["version"] == "1.0.0"
def test_verify_signature():
    """Verify a package signature against a correct and a wrong key set."""
    packer = _create_packer()
    unpacker = NBPFUnpacker()
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        source_dir = _create_test_plugin(root)
        key_set = _generate_keys(root)

        package_path = root / "test-plugin.nbpf"
        packer.pack(
            plugin_dir=source_dir,
            output_path=package_path,
            ed25519_private_key=key_set["ed25519_private"],
            rsa_private_key_pem=key_set["rsa_private"],
            rsa_public_key_pem=key_set["rsa_public"],
            signer_name="test",
        )

        # Verify with the correct public key.
        good_keys = {"test": key_set["ed25519_public"]}
        ok, detail = unpacker.verify_signature(package_path, good_keys)
        assert ok, f"验证失败: {detail}"

        # Verify with an unrelated public key; the private half is unused.
        _, other_public = NBPCrypto.generate_ed25519_keypair()
        bad_keys = {"wrong": other_public}
        ok, detail = unpacker.verify_signature(package_path, bad_keys)
        assert not ok, "应该验证失败"
# ═══════════════════════════════════════════════════════════════
# 测试NBPFLoader
# ═══════════════════════════════════════════════════════════════
def test_loader_full_flow():
    """Pack the fixture plugin, load it, and exercise the loaded instance."""
    packer = _create_packer()
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        source_dir = _create_test_plugin(root)
        key_set = _generate_keys(root)

        # Pack
        package_path = root / "test-plugin.nbpf"
        packer.pack(
            plugin_dir=source_dir,
            output_path=package_path,
            ed25519_private_key=key_set["ed25519_private"],
            rsa_private_key_pem=key_set["rsa_private"],
            rsa_public_key_pem=key_set["rsa_public"],
            signer_name="test",
        )

        # Load, trusting exactly the keys the package was signed with.
        loader = NBPFLoader(
            crypto=NBPCrypto(),
            compiler=NIRCompiler(),
            trusted_ed25519_keys={"test": key_set["ed25519_public"]},
            trusted_rsa_keys={"test": key_set["rsa_public"]},
            rsa_private_key=key_set["rsa_private"],
        )
        plugin, details = loader.load(package_path)

        assert plugin is not None
        assert details["name"] == "test-plugin"
        assert details["version"] == "1.0.0"
        assert details["trusted"] is True
        assert "signer_public_key" in details
        assert isinstance(details["signer_public_key"], str)

        # Plugin functionality
        assert plugin.name == "test-plugin"
        assert plugin.greet("World") == "Hello, World!"

        # Start / stop
        assert plugin.start() is True
        assert plugin.started is True
        assert plugin.stop() is True
        assert plugin.started is False
def test_loader_wrong_signature():
    """Check that a package from an untrusted author loads with trusted=False."""
    packer = _create_packer()
    with tempfile.TemporaryDirectory() as workdir:
        root = Path(workdir)
        source_dir = _create_test_plugin(root)
        key_set = _generate_keys(root)

        package_path = root / "test-plugin.nbpf"
        packer.pack(
            plugin_dir=source_dir,
            output_path=package_path,
            ed25519_private_key=key_set["ed25519_private"],
            rsa_private_key_pem=key_set["rsa_private"],
            rsa_public_key_pem=key_set["rsa_public"],
            signer_name="test",
        )

        # Trust only an unrelated Ed25519 public key, so the real signer is
        # not in the trust list.
        # NOTE(review): unlike test_loader_full_flow, no crypto/compiler is
        # passed here -- presumably NBPFLoader provides defaults; confirm.
        _, other_public = NBPCrypto.generate_ed25519_keypair()
        loader = NBPFLoader(
            trusted_ed25519_keys={"wrong": other_public},
            trusted_rsa_keys={"test": key_set["rsa_public"]},
            rsa_private_key=key_set["rsa_private"],
        )

        # Expected current logic: the signature is first checked with the
        # public key shipped inside the package (passes), then the trust
        # list is consulted (not trusted). No exception should be raised;
        # the result should carry trusted=False instead.
        plugin, details = loader.load(package_path)
        assert plugin is not None, "签名验证应通过(用包内公钥)"
        assert details["trusted"] is False, "应标记为未信任"
        assert details["signer"] != "wrong", "不应使用错误的信任名称"
# ═══════════════════════════════════════════════════════════════
# 测试PluginManager 集成
# ═══════════════════════════════════════════════════════════════
def test_plugin_manager_nbpf_methods():
    """Check that PluginManager exposes its NBPF methods and can initialise."""
    from oss.core.engine import PluginManager

    manager = PluginManager()
    for method_name in ("load_nbpf", "_init_nbpf"):
        assert hasattr(manager, method_name)
    assert manager._nbpf_initialized is False

    # Initialise. Without key files the loader may end up as None, but the
    # method should exist and must not crash.
    manager._init_nbpf()
# ═══════════════════════════════════════════════════════════════
# 运行
# ═══════════════════════════════════════════════════════════════
if __name__ == "__main__":
    # Stand-alone runner: (label, test callable) pairs executed in order.
    cases = [
        ("NBPCrypto AES", test_crypto_aes_encrypt_decrypt),
        ("NBPCrypto 外层加密", test_crypto_outer_encrypt_decrypt),
        ("NBPCrypto 中层加密", test_crypto_inner_encrypt_decrypt),
        ("NBPCrypto Ed25519", test_crypto_ed25519_sign_verify),
        ("NBPCrypto RSA-PSS", test_crypto_rsa_sign_verify),
        ("NBPCrypto HMAC", test_crypto_hmac_sign_verify),
        ("NBPCrypto RSA-OAEP", test_crypto_rsa_oaep_key_encapsulation),
        ("NBPCrypto 反调试", test_crypto_anti_debug),
        ("NBPCrypto 内存擦除", test_crypto_secure_wipe),
        ("NIRCompiler 编译", test_compiler_compile_source),
        ("NIRCompiler 函数创建", test_compiler_create_function),
        ("NIRCompiler 禁止模块", test_compiler_forbidden_modules),
        ("NIRCompiler 编译目录", test_compiler_compile_plugin),
        ("NBPF 打包解包", test_pack_unpack),
        ("NBPF 提取 manifest", test_extract_manifest),
        ("NBPF 签名验证", test_verify_signature),
        ("NBPF 加载器完整流程", test_loader_full_flow),
        ("NBPF 加载器未信任作者", test_loader_wrong_signature),
        ("PluginManager 集成", test_plugin_manager_nbpf_methods),
    ]

    passed = 0
    failed = 0
    for label, run_case in cases:
        # One failing case must not stop the remaining ones, so every
        # exception is reported and counted instead of propagating.
        try:
            run_case()
            print(f"{label}")
            passed += 1
        except Exception as exc:
            print(f"{label}: {type(exc).__name__}: {exc}")
            failed += 1

    print(f"\n结果: {passed} 通过, {failed} 失败, {passed + failed} 总计")
    # Non-zero exit status when at least one case failed.
    sys.exit(1 if failed else 0)