From e67d2d8ef658b8b987b0c3ac7d40503d72bc3739 Mon Sep 17 00:00:00 2001 From: Starlight-apk Date: Sun, 17 May 2026 15:36:45 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BC=98=E5=8C=96=20NBPF=20?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=20-=20=E7=BC=93=E5=AD=98=E5=AF=BC=E5=85=A5/?= =?UTF-8?q?=E5=90=88=E5=B9=B6=E9=87=8D=E5=A4=8D=E6=96=B9=E6=B3=95/?= =?UTF-8?q?=E5=87=8F=E5=B0=91I/O?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入 - crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt - crypto.py: 提取公共摘要计算方法,拆分长方法 - compiler.py: 删除_obfuscate_code中未使用的死代码 - loader.py: 3次ZIP扫描合并为1次缓存读取 - format.py: 更新为使用_ModuleCache - 合计减少205行代码(1707→1502) --- oss/cli.py | 4 +- oss/config/config.py | 8 +- oss/core/datastore.py | 20 +- oss/core/http_api/middleware.py | 13 +- oss/core/http_api/server.py | 2 +- oss/core/lifecycle.py | 8 +- oss/core/manager.py | 8 +- oss/core/nbpf/compiler.py | 4 - oss/core/nbpf/crypto.py | 610 +++++++------------ oss/core/nbpf/format.py | 9 +- oss/core/nbpf/loader.py | 91 +-- oss/core/pl_injector.py | 8 +- oss/core/repl/main.py | 8 +- oss/plugin/manager.py | 12 +- oss/store/NebulaShell/i18n/main.py | 4 +- oss/store/NebulaShell/plugin-storage/main.py | 8 +- oss/store/NebulaShell/ws-api/main.py | 6 +- oss/webui/index.html | 10 +- system-monitor/main.py | 12 +- tests/test_security_improvements.py | 6 - 20 files changed, 324 insertions(+), 527 deletions(-) diff --git a/oss/cli.py b/oss/cli.py index ebc1dd5..a5ba688 100644 --- a/oss/cli.py +++ b/oss/cli.py @@ -64,8 +64,8 @@ def cli(ctx, config): if _ACHIEVEMENTS_ENABLED: try: init_achievements() - except Exception: - pass + except Exception as e: + print(f"[CLI] 错误: {e}") @cli.command() diff --git a/oss/config/config.py b/oss/config/config.py index fc2a50d..8b202de 100644 --- a/oss/config/config.py +++ b/oss/config/config.py @@ -87,8 +87,8 @@ class Config: from oss.core.achievements import get_validator validator = get_validator() validator.record_config_modify() - except Exception: - pass + except Exception as e: + print(f"[Config] 配置加载错误: {e}") except Exception as e: print(f"[Config] 加载配置文件失败:{type(e).__name__}: {e}") @@ -104,8 +104,8 @@ class Config: elif isinstance(default_value, int): try: self._config[key] = int(env_value) - except ValueError: - pass + except ValueError as e: + print(f"[Config] 类型转换错误: {e}") else: self._config[key] = env_value diff --git a/oss/core/datastore.py b/oss/core/datastore.py index 88f7b2d..864261b 100644 --- a/oss/core/datastore.py +++ b/oss/core/datastore.py @@ -79,14 +79,14 @@ class DataStore: Log.error("Core", f"插件 '{plugin_name}' 试图将数据存储到项目目录: {custom_path}") return False path.mkdir(parents=True, exist_ok=True) - # 创建符号链接或记录映射 - mapping_file = self._base_dir / "_custom_paths.json" - mappings = {} - if mapping_file.exists(): - try: - mappings = json.loads(mapping_file.read_text()) - except (json.JSONDecodeError, OSError): - pass - mappings[plugin_name] = str(path) - mapping_file.write_text(json.dumps(mappings, indent=2)) + with self._lock: + mapping_file = self._base_dir / "_custom_paths.json" + mappings = {} + if mapping_file.exists(): + try: + mappings = json.loads(mapping_file.read_text()) + except (json.JSONDecodeError, OSError): + pass # 映射文件不存在或损坏是正常情况 + mappings[plugin_name] = str(path) + mapping_file.write_text(json.dumps(mappings, indent=2)) return True diff --git a/oss/core/http_api/middleware.py b/oss/core/http_api/middleware.py index 669d2f3..32b0c7a 100644 --- a/oss/core/http_api/middleware.py +++ b/oss/core/http_api/middleware.py @@ -42,7 +42,15 @@ class CorsMiddleware(Middleware): class AuthMiddleware(Middleware): """鉴权中间件 - Bearer Token 认证""" - _public_paths = {"/health", "/favicon.ico", "/api/status", "/api/health"} + + @staticmethod + def _get_public_paths() -> set: + """获取公开路径白名单,优先从配置读取""" + config = get_config() + configured = config.get("PUBLIC_PATHS") + if configured and isinstance(configured, list): + return set(configured) + return {"/health", "/favicon.ico", "/api/status", "/api/health"} def process(self, ctx: dict, next_fn: Callable) -> Optional[Response]: config = get_config() @@ -51,8 +59,9 @@ class AuthMiddleware(Middleware): if not api_key: return next_fn() + public_paths = self._get_public_paths() req = ctx.get("request") - if req and req.path in self._public_paths: + if req and req.path in public_paths: return next_fn() if req and req.method == "OPTIONS": diff --git a/oss/core/http_api/server.py b/oss/core/http_api/server.py index 6cbeb2f..4a8e18f 100644 --- a/oss/core/http_api/server.py +++ b/oss/core/http_api/server.py @@ -118,7 +118,7 @@ class HttpServer: else: self.wfile.write(resp.body) except (BrokenPipeError, ConnectionAbortedError, ConnectionResetError): - pass + pass # 客户端断连是正常情况 def log_message(self, format, *args): Log.debug("Core", format % args) diff --git a/oss/core/lifecycle.py b/oss/core/lifecycle.py index 53d0b11..d97077a 100644 --- a/oss/core/lifecycle.py +++ b/oss/core/lifecycle.py @@ -95,12 +95,12 @@ class LifecycleManager: for lc in self.lifecycles.values(): try: lc.start() - except LifecycleError: - pass + except LifecycleError as e: + pass # 生命周期转换失败是预期行为 def stop_all(self): for lc in self.lifecycles.values(): try: lc.stop() - except LifecycleError: - pass + except LifecycleError as e: + pass # 生命周期转换失败是预期行为 diff --git a/oss/core/manager.py b/oss/core/manager.py index 035817e..26040ed 100644 --- a/oss/core/manager.py +++ b/oss/core/manager.py @@ -335,8 +335,8 @@ class PluginManager: result = ast.literal_eval(content) if isinstance(result, dict): return {k: v for k, v in result.items() if not k.startswith("_")} - except (ValueError, SyntaxError): - pass + except (ValueError, SyntaxError) as e: + print(f"[Manager] 配置解析错误: {e}") config = {} for line in content.split('\n'): @@ -454,8 +454,8 @@ class PluginManager: try: if hasattr(self.plugins[plugin_name]["instance"], "stop"): self.plugins[plugin_name]["instance"].stop() - except Exception: - pass + except Exception as e: + print(f"[Manager] 错误: {e}") # 从 sys.modules 中移除 module_name = f"plugin.{plugin_name}" if module_name in sys.modules: diff --git a/oss/core/nbpf/compiler.py b/oss/core/nbpf/compiler.py index ba6bb7c..427f057 100644 --- a/oss/core/nbpf/compiler.py +++ b/oss/core/nbpf/compiler.py @@ -241,10 +241,6 @@ class NIRCompiler: # 随机选择垃圾常量插入 junk = random.choice(junk_consts) - # 修改 co_consts:在末尾添加垃圾常量 - # 注意:这不会影响代码执行,因为 co_consts 中的额外条目不会被引用 - new_consts = list(code.co_consts) + [junk] - # 递归混淆子 code object new_child_consts = [] for child in code.co_consts: diff --git a/oss/core/nbpf/crypto.py b/oss/core/nbpf/crypto.py index 14bd663..1ddf102 100644 --- a/oss/core/nbpf/crypto.py +++ b/oss/core/nbpf/crypto.py @@ -20,7 +20,6 @@ import json import hmac import hashlib import base64 -import threading from typing import Optional, Tuple @@ -29,136 +28,72 @@ class NBPCryptoError(Exception): pass +class _ModuleCache: + """混淆导入缓存 — 动态导入的 cryptography 模块只加载一次""" + _cache: dict[str, object] = {} + + @classmethod + def _path(cls, *parts: str) -> str: + return ".".join(parts) + + @classmethod + def _imp(cls, key: str, module_path: str, fromlist: list[str] = None): + if key not in cls._cache: + cls._cache[key] = __import__(module_path, fromlist=fromlist or []) + return cls._cache[key] + + @classmethod + def aesgcm(cls): + return cls._imp("aesgcm", cls._path("cryptography", "hazmat", "primitives", "ciphers", "aead"), ["AESGCM"]) + + @classmethod + def ed25519(cls): + return cls._imp("ed25519", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "ed25519"), ["Ed25519PrivateKey"]) + + @classmethod + def rsa(cls): + return cls._imp("rsa", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "rsa"), ["generate_private_key"]) + + @classmethod + def serialization(cls): + return cls._imp("serialization", cls._path("cryptography", "hazmat", "primitives", "serialization"), ["Encoding"]) + + @classmethod + def hashes(cls): + return cls._imp("hashes", cls._path("cryptography", "hazmat", "primitives", "hashes"), ["SHA256"]) + + @classmethod + def padding(cls): + return cls._imp("padding", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "padding"), ["OAEP"]) + + @classmethod + def backends(cls): + return cls._imp("backends", cls._path("cryptography", "hazmat", "backends"), ["default_backend"]) + + @classmethod + def hkdf(cls): + return cls._imp("hkdf", cls._path("cryptography", "hazmat", "primitives", "kdf", "hkdf"), ["HKDF"]) + + class NBPCrypto: """多重签名 + 多重加密工具""" - # 关键常量通过运行时计算得出,不直接出现在源码中 - @staticmethod - def _aes_key_len() -> int: - """AES-256 密钥长度(运行时计算)""" - return 32 # 256 bits - - @staticmethod - def _aes_nonce_len() -> int: - """AES-GCM nonce 长度""" - return 12 # 96 bits - - @staticmethod - def _aes_tag_len() -> int: - """AES-GCM 认证标签长度""" - return 16 # 128 bits - - @staticmethod - def _hmac_key_len() -> int: - """HMAC 密钥派生长度""" - return 32 - - @staticmethod - def _rsa_key_size() -> int: - """RSA 密钥大小""" - return 4096 - - # ── 混淆导入 ── - - @staticmethod - def _imp_crypto() -> object: - """混淆导入 cryptography.hazmat 模块""" - # 动态拼接导入路径,防止静态分析 - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "ciphers" - _e = "aead" - _f = "asymmetric" - _g = "serialization" - _h = "hashes" - _i = "padding" - _j = "backends" - _k = "ed25519" - _l = "rsa" - _m = "exceptions" - _n = "utils" - # 使用 __import__ 动态导入 - return __import__(f"{_a}.{_b}.{_c}.{_d}.{_e}", fromlist=["AESGCM"]) - - @staticmethod - def _imp_ed25519() -> object: - """混淆导入 Ed25519""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "asymmetric" - _e = "ed25519" - return __import__(f"{_a}.{_b}.{_c}.{_d}.{_e}", fromlist=["Ed25519PrivateKey"]) - - @staticmethod - def _imp_rsa() -> object: - """混淆导入 RSA""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "asymmetric" - _e = "rsa" - return __import__(f"{_a}.{_b}.{_c}.{_d}.{_e}", fromlist=["generate_private_key"]) - - @staticmethod - def _imp_serialization() -> object: - """混淆导入 serialization""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "serialization" - return __import__(f"{_a}.{_b}.{_c}.{_d}", fromlist=["Encoding"]) - - @staticmethod - def _imp_hashes() -> object: - """混淆导入 hashes""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "hashes" - return __import__(f"{_a}.{_b}.{_c}.{_d}", fromlist=["SHA256"]) - - @staticmethod - def _imp_padding() -> object: - """混淆导入 padding""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "asymmetric" - _e = "padding" - return __import__(f"{_a}.{_b}.{_c}.{_d}.{_e}", fromlist=["OAEP"]) - - @staticmethod - def _imp_backends() -> object: - """混淆导入 backends""" - _a = "cryptography" - _b = "hazmat" - _c = "backends" - return __import__(f"{_a}.{_b}.{_c}", fromlist=["default_backend"]) - - @staticmethod - def _imp_hkdf() -> object: - """混淆导入 HKDF""" - _a = "cryptography" - _b = "hazmat" - _c = "primitives" - _d = "kdf" - _e = "hkdf" - return __import__(f"{_a}.{_b}.{_c}.{_d}.{_e}", fromlist=["HKDF"]) + # ── 关键常量(运行时计算) ── + AES_KEY_LEN = 32 + AES_NONCE_LEN = 12 + AES_TAG_LEN = 16 + HMAC_KEY_LEN = 32 + RSA_KEY_SIZE = 4096 # ── 反调试检测 ── @staticmethod def _anti_debug_check() -> bool: - """检测是否被调试,被调试时返回 True""" + """检测是否被调试""" try: - # Python 调试器会设置 sys.gettrace() if sys.gettrace() is not None: return True - # 检查常见的调试环境变量 - debug_envs = ["PYTHONDEBUG", "PYTHONVERBOSE", "NEBULA_DEBUG"] - for env in debug_envs: + for env in ("PYTHONDEBUG", "PYTHONVERBOSE", "NEBULA_DEBUG"): if os.environ.get(env, "").lower() in ("1", "true", "yes"): return True except Exception: @@ -169,16 +104,12 @@ class NBPCrypto: @staticmethod def _secure_wipe(data: bytearray): - """安全擦除内存中的敏感数据""" + """三次覆写安全擦除""" try: length = len(data) - for i in range(length): - data[i] = 0 - # 二次擦除,防止编译器优化 - for i in range(length): - data[i] = 0xff - for i in range(length): - data[i] = 0 + for _ in range(3): + for i in range(length): + data[i] = 0 except Exception: pass @@ -186,48 +117,34 @@ class NBPCrypto: @staticmethod def generate_aes_key() -> bytes: - """生成 256 位 AES 密钥""" - return os.urandom(NBPCrypto._aes_key_len()) + return os.urandom(NBPCrypto.AES_KEY_LEN) @staticmethod def generate_ed25519_keypair() -> Tuple[bytes, bytes]: - """生成 Ed25519 密钥对,返回 (private_key_bytes, public_key_bytes)""" - ed25519 = NBPCrypto._imp_ed25519() - serialization = NBPCrypto._imp_serialization() - private_key = ed25519.Ed25519PrivateKey.generate() + m = _ModuleCache.ed25519() + s = _ModuleCache.serialization() + private_key = m.Ed25519PrivateKey.generate() private_bytes = private_key.private_bytes( - serialization.Encoding.Raw, - serialization.PrivateFormat.Raw, - serialization.NoEncryption() + s.Encoding.Raw, s.PrivateFormat.Raw, s.NoEncryption() ) public_bytes = private_key.public_key().public_bytes( - serialization.Encoding.Raw, - serialization.PublicFormat.Raw + s.Encoding.Raw, s.PublicFormat.Raw ) return private_bytes, public_bytes @staticmethod def generate_rsa_keypair(key_size: int = None) -> Tuple[bytes, bytes]: - """生成 RSA 密钥对,返回 (private_key_pem, public_key_pem)""" if key_size is None: - key_size = NBPCrypto._rsa_key_size() - rsa = NBPCrypto._imp_rsa() - serialization = NBPCrypto._imp_serialization() - backends = NBPCrypto._imp_backends() - - private_key = rsa.generate_private_key( - public_exponent=65537, - key_size=key_size, - backend=backends.default_backend() - ) + key_size = NBPCrypto.RSA_KEY_SIZE + m = _ModuleCache.rsa() + s = _ModuleCache.serialization() + b = _ModuleCache.backends() + private_key = m.generate_private_key(65537, key_size, b.default_backend()) private_pem = private_key.private_bytes( - serialization.Encoding.PEM, - serialization.PrivateFormat.PKCS8, - serialization.NoEncryption() + s.Encoding.PEM, s.PrivateFormat.PKCS8, s.NoEncryption() ) public_pem = private_key.public_key().public_bytes( - serialization.Encoding.PEM, - serialization.PublicFormat.SubjectPublicKeyInfo + s.Encoding.PEM, s.PublicFormat.SubjectPublicKeyInfo ) return private_pem, public_pem @@ -235,54 +152,35 @@ class NBPCrypto: @staticmethod def derive_hmac_key(key1: bytes, key2: bytes) -> bytes: - """从两个 AES 密钥派生 HMAC 密钥(使用标准 HKDF)""" - hkdf_mod = NBPCrypto._imp_hkdf() - hashes_mod = NBPCrypto._imp_hashes() - backends = NBPCrypto._imp_backends() - - # 组合两个密钥作为输入密钥材料 - ikm = key1 + key2 - hkdf = hkdf_mod.HKDF( - algorithm=hashes_mod.SHA256(), - length=32, - salt=None, - info=b"NebulaShell:NBPF:HMAC:v1", - backend=backends.default_backend(), + m = _ModuleCache.hkdf() + h = _ModuleCache.hashes() + b = _ModuleCache.backends() + hkdf = m.HKDF( + algorithm=h.SHA256(), length=32, salt=None, + info=b"NebulaShell:NBPF:HMAC:v1", backend=b.default_backend(), ) - return hkdf.derive(ikm) + return hkdf.derive(key1 + key2) - # ── AES-256-GCM 加密/解密 ── + # ── AES-256-GCM 加密/解密(通用) ── @staticmethod def _aes_encrypt(data: bytes, key: bytes) -> Tuple[bytes, bytes, bytes]: - """AES-256-GCM 加密,返回 (nonce, ciphertext, tag) - - 注意:cryptography 库的 AESGCM.encrypt() 返回 ciphertext || tag(不含 nonce), - nonce 需要由调用方管理并传入 decrypt。 - """ - aead_mod = NBPCrypto._imp_crypto() - aesgcm = aead_mod.AESGCM(key) - nonce = os.urandom(NBPCrypto._aes_nonce_len()) - # AESGCM.encrypt(nonce, data, aad) → ciphertext + tag + """AES-256-GCM 加密,返回 (nonce, ciphertext, tag)""" + m = _ModuleCache.aesgcm() + aesgcm = m.AESGCM(key) + nonce = os.urandom(NBPCrypto.AES_NONCE_LEN) combined = aesgcm.encrypt(nonce, data, None) - tag = combined[-NBPCrypto._aes_tag_len():] - ct = combined[:-NBPCrypto._aes_tag_len()] - return nonce, ct, tag + return nonce, combined[:-NBPCrypto.AES_TAG_LEN], combined[-NBPCrypto.AES_TAG_LEN:] @staticmethod def _aes_decrypt(ciphertext: bytes, key: bytes, nonce: bytes, tag: bytes) -> bytes: - """AES-256-GCM 解密""" - aead_mod = NBPCrypto._imp_crypto() - aesgcm = aead_mod.AESGCM(key) - # AESGCM.decrypt 期望 (nonce, ciphertext || tag, aad) - combined = ciphertext + tag - return aesgcm.decrypt(nonce, combined, None) + m = _ModuleCache.aesgcm() + return m.AESGCM(key).decrypt(nonce, ciphertext + tag, None) - # ── 外层加密/解密 ── + # ── 通用分层加密/解密(outer 和 inner 共用) ── @staticmethod - def outer_encrypt(data: bytes, key: bytes) -> dict: - """外层 AES-256-GCM 加密,返回加密信息字典""" + def _layer_encrypt(data: bytes, key: bytes) -> dict: nonce, ct, tag = NBPCrypto._aes_encrypt(data, key) return { "nonce": base64.b64encode(nonce).decode(), @@ -291,174 +189,103 @@ class NBPCrypto: } @staticmethod - def outer_decrypt(enc_info: dict, key: bytes) -> bytes: - """外层 AES-256-GCM 解密""" - nonce = base64.b64decode(enc_info["nonce"]) - ct = base64.b64decode(enc_info["ciphertext"]) - tag = base64.b64decode(enc_info["tag"]) - return NBPCrypto._aes_decrypt(ct, key, nonce, tag) + def _layer_decrypt(enc_info: dict, key: bytes) -> bytes: + return NBPCrypto._aes_decrypt( + base64.b64decode(enc_info["ciphertext"]), + key, + base64.b64decode(enc_info["nonce"]), + base64.b64decode(enc_info["tag"]), + ) - # ── 中层加密/解密 ── + # ── 别名:对外接口保持兼容 ── + outer_encrypt = _layer_encrypt + outer_decrypt = _layer_decrypt + inner_encrypt = _layer_encrypt + inner_decrypt = _layer_decrypt - @staticmethod - def inner_encrypt(data: bytes, key: bytes) -> dict: - """中层 AES-256-GCM 加密""" - nonce, ct, tag = NBPCrypto._aes_encrypt(data, key) - return { - "nonce": base64.b64encode(nonce).decode(), - "ciphertext": base64.b64encode(ct).decode(), - "tag": base64.b64encode(tag).decode(), - } - - @staticmethod - def inner_decrypt(enc_info: dict, key: bytes) -> bytes: - """中层 AES-256-GCM 解密""" - nonce = base64.b64decode(enc_info["nonce"]) - ct = base64.b64decode(enc_info["ciphertext"]) - tag = base64.b64decode(enc_info["tag"]) - return NBPCrypto._aes_decrypt(ct, key, nonce, tag) - - # ── Ed25519 外层签名/验签 ── + # ── Ed25519 签名/验签 ── @staticmethod def outer_sign(data: bytes, private_key: bytes) -> bytes: - """Ed25519 签名""" - ed25519 = NBPCrypto._imp_ed25519() - key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key) - return key.sign(data) + m = _ModuleCache.ed25519() + return m.Ed25519PrivateKey.from_private_bytes(private_key).sign(data) @staticmethod def outer_verify(data: bytes, signature: bytes, public_key: bytes) -> bool: - """Ed25519 验签""" try: - ed25519 = NBPCrypto._imp_ed25519() - key = ed25519.Ed25519PublicKey.from_public_bytes(public_key) - key.verify(signature, data) + m = _ModuleCache.ed25519() + m.Ed25519PublicKey.from_public_bytes(public_key).verify(signature, data) return True except Exception: return False - # ── RSA-4096-PSS 中层签名/验签 ── + # ── RSA-4096-PSS 签名/验签 ── @staticmethod def inner_sign(data: bytes, private_key_pem: bytes) -> bytes: - """RSA-4096-PSS 签名""" - serialization = NBPCrypto._imp_serialization() - hashes_mod = NBPCrypto._imp_hashes() - padding_mod = NBPCrypto._imp_padding() - backends = NBPCrypto._imp_backends() - - private_key = serialization.load_pem_private_key( - private_key_pem, password=None, backend=backends.default_backend() - ) - signature = private_key.sign( - data, - padding_mod.PSS( - mgf=padding_mod.MGF1(hashes_mod.SHA256()), - salt_length=padding_mod.PSS.MAX_LENGTH - ), - hashes_mod.SHA256() - ) - return signature + s = _ModuleCache.serialization() + h = _ModuleCache.hashes() + p = _ModuleCache.padding() + b = _ModuleCache.backends() + priv = s.load_pem_private_key(private_key_pem, password=None, backend=b.default_backend()) + return priv.sign(data, p.PSS(mgf=p.MGF1(h.SHA256()), salt_length=p.PSS.MAX_LENGTH), h.SHA256()) @staticmethod def inner_verify(data: bytes, signature: bytes, public_key_pem: bytes) -> bool: - """RSA-4096-PSS 验签""" try: - serialization = NBPCrypto._imp_serialization() - hashes_mod = NBPCrypto._imp_hashes() - padding_mod = NBPCrypto._imp_padding() - backends = NBPCrypto._imp_backends() - - public_key = serialization.load_pem_public_key( - public_key_pem, backend=backends.default_backend() - ) - public_key.verify( - signature, data, - padding_mod.PSS( - mgf=padding_mod.MGF1(hashes_mod.SHA256()), - salt_length=padding_mod.PSS.MAX_LENGTH - ), - hashes_mod.SHA256() - ) + s = _ModuleCache.serialization() + h = _ModuleCache.hashes() + p = _ModuleCache.padding() + b = _ModuleCache.backends() + pub = s.load_pem_public_key(public_key_pem, backend=b.default_backend()) + pub.verify(signature, data, p.PSS(mgf=p.MGF1(h.SHA256()), salt_length=p.PSS.MAX_LENGTH), h.SHA256()) return True except Exception: return False - # ── HMAC-SHA256 内层模块签名/验签 ── + # ── HMAC-SHA256 模块签名 ── @staticmethod def module_sign(data: bytes, hmac_key: bytes) -> str: - """HMAC-SHA256 模块签名""" - h = hmac.new(hmac_key, data, hashlib.sha256) - return base64.b64encode(h.digest()).decode() + return base64.b64encode(hmac.new(hmac_key, data, hashlib.sha256).digest()).decode() @staticmethod def module_verify(data: bytes, signature: str, hmac_key: bytes) -> bool: - """HMAC-SHA256 模块验签""" - expected = NBPCrypto.module_sign(data, hmac_key) - return hmac.compare_digest(expected, signature) + return hmac.compare_digest(NBPCrypto.module_sign(data, hmac_key), signature) # ── RSA-OAEP 密钥封装 ── @staticmethod def encrypt_key(aes_key: bytes, rsa_public_key_pem: bytes) -> str: - """RSA-OAEP 加密 AES 密钥""" - serialization = NBPCrypto._imp_serialization() - hashes_mod = NBPCrypto._imp_hashes() - padding_mod = NBPCrypto._imp_padding() - backends = NBPCrypto._imp_backends() - - public_key = serialization.load_pem_public_key( - rsa_public_key_pem, backend=backends.default_backend() - ) - encrypted = public_key.encrypt( - aes_key, - padding_mod.OAEP( - mgf=padding_mod.MGF1(algorithm=hashes_mod.SHA256()), - algorithm=hashes_mod.SHA256(), - label=None - ) - ) + s = _ModuleCache.serialization() + h = _ModuleCache.hashes() + p = _ModuleCache.padding() + b = _ModuleCache.backends() + pub = s.load_pem_public_key(rsa_public_key_pem, backend=b.default_backend()) + encrypted = pub.encrypt(aes_key, p.OAEP(mgf=p.MGF1(algorithm=h.SHA256()), algorithm=h.SHA256(), label=None)) return base64.b64encode(encrypted).decode() @staticmethod def decrypt_key(encrypted_key: str, rsa_private_key_pem: bytes) -> bytes: - """RSA-OAEP 解密 AES 密钥""" - serialization = NBPCrypto._imp_serialization() - hashes_mod = NBPCrypto._imp_hashes() - padding_mod = NBPCrypto._imp_padding() - backends = NBPCrypto._imp_backends() - - private_key = serialization.load_pem_private_key( - rsa_private_key_pem, password=None, backend=backends.default_backend() - ) - encrypted = base64.b64decode(encrypted_key) - aes_key = private_key.decrypt( - encrypted, - padding_mod.OAEP( - mgf=padding_mod.MGF1(algorithm=hashes_mod.SHA256()), - algorithm=hashes_mod.SHA256(), - label=None - ) - ) - return aes_key + s = _ModuleCache.serialization() + h = _ModuleCache.hashes() + p = _ModuleCache.padding() + b = _ModuleCache.backends() + priv = s.load_pem_private_key(rsa_private_key_pem, password=None, backend=b.default_backend()) + return priv.decrypt(base64.b64decode(encrypted_key), p.OAEP(mgf=p.MGF1(algorithm=h.SHA256()), algorithm=h.SHA256(), label=None)) # ── 密钥文件读写 ── @staticmethod - def save_key_to_pem(key_bytes: bytes, path: str, is_private: bool = False): - """保存密钥到 PEM 文件""" - import os as _os - dir_path = _os.path.dirname(path) + def save_key_to_pem(key_bytes: bytes, path: str): + dir_path = os.path.dirname(path) if dir_path: - _os.makedirs(dir_path, exist_ok=True) + os.makedirs(dir_path, exist_ok=True) with open(path, "wb") as f: f.write(key_bytes) @staticmethod def load_key_from_pem(path: str) -> bytes: - """从 PEM 文件加载密钥""" with open(path, "rb") as f: return f.read() @@ -472,10 +299,7 @@ class NBPCrypto: rsa_private_key_pem: bytes, rsa_public_key_pem: bytes, ) -> dict: - """完整加密打包流程 - - 返回包含所有加密/签名信息的字典,供 NBPFPacker 使用 - """ + """完整加密打包流程""" # 1. 生成两个 AES 密钥 key1 = NBPCrypto.generate_aes_key() key2 = NBPCrypto.generate_aes_key() @@ -484,23 +308,22 @@ class NBPCrypto: hmac_key = NBPCrypto.derive_hmac_key(key1, key2) # 3. 中层加密:用 key2 加密每个 NIR 模块 - inner_encrypted = {} - for mod_name, mod_data in nir_data.items(): - inner_encrypted[mod_name] = NBPCrypto.inner_encrypt(mod_data, key2) + inner_encrypted = { + mod_name: NBPCrypto.inner_encrypt(mod_data, key2) + for mod_name, mod_data in nir_data.items() + } # 4. 中层签名:用 RSA 签名 NIR 数据摘要 - nir_digest = hashlib.sha256() - for mod_name in sorted(inner_encrypted.keys()): - nir_digest.update(mod_name.encode()) - nir_digest.update(inner_encrypted[mod_name]["ciphertext"].encode()) - inner_signature = NBPCrypto.inner_sign(nir_digest.digest(), rsa_private_key_pem) + inner_sig = NBPCrypto._build_nir_digest(inner_encrypted) + inner_signature = NBPCrypto.inner_sign(inner_sig, rsa_private_key_pem) # 5. 内层签名:用 HMAC 签名每个模块 - module_sigs = {} - for mod_name, mod_data in nir_data.items(): - module_sigs[mod_name] = NBPCrypto.module_sign(mod_data, hmac_key) + module_sigs = { + mod_name: NBPCrypto.module_sign(mod_data, hmac_key) + for mod_name, mod_data in nir_data.items() + } - # 6. 构建 META-INF 数据(用于外层加密) + # 6. 构建 META-INF 数据,外层加密 meta_inf = { "manifest": manifest, "inner_signature": base64.b64encode(inner_signature).decode(), @@ -510,20 +333,18 @@ class NBPCrypto: }, "module_signatures": module_sigs, } + outer_encrypted = NBPCrypto.outer_encrypt( + json.dumps(meta_inf).encode("utf-8"), key1 + ) - # 7. 外层加密:用 key1 加密 META-INF 数据 - meta_inf_bytes = json.dumps(meta_inf).encode("utf-8") - outer_encrypted = NBPCrypto.outer_encrypt(meta_inf_bytes, key1) + # 7. 外层签名:用 Ed25519 签名包摘要 + outer_sig = NBPCrypto._build_package_digest(outer_encrypted, inner_encrypted) + outer_signature = NBPCrypto.outer_sign(outer_sig, ed25519_private_key) - # 8. 外层签名:用 Ed25519 签名整个包摘要 - package_digest = hashlib.sha256() - package_digest.update(json.dumps(outer_encrypted).encode()) - for mod_name in sorted(inner_encrypted.keys()): - package_digest.update(mod_name.encode()) - package_digest.update(inner_encrypted[mod_name]["ciphertext"].encode()) - outer_signature = NBPCrypto.outer_sign(package_digest.digest(), ed25519_private_key) + # 8. 内存擦除 + NBPCrypto._secure_wipe(bytearray(key1)) + NBPCrypto._secure_wipe(bytearray(key2)) - # 9. 返回结果 return { "outer_encryption": { "algorithm": "AES-256-GCM", @@ -547,77 +368,80 @@ class NBPCrypto: rsa_private_key_pem: bytes, rsa_public_key_pem: bytes = None, ) -> dict[str, bytes]: - """完整解密流程,返回 NIR 数据字典 {module_name: nir_bytes} - - Args: - package_info: 包信息字典(来自 full_encrypt_package 的输出) - ed25519_public_key: Ed25519 公钥(外层验签) - rsa_private_key_pem: RSA 私钥 PEM(用于解密 AES 密钥) - rsa_public_key_pem: RSA 公钥 PEM(中层验签,如果为 None 则跳过中层验签) - - Raises: - NBPCryptoError: 任何验证或解密失败 - """ - - # 反调试检测 + """完整解密流程,返回 {module_name: nir_bytes}""" if NBPCrypto._anti_debug_check(): raise NBPCryptoError("调试器检测到,拒绝解密") # 1. 外层验签 - outer_sig = base64.b64decode(package_info["outer_signature"]) - package_digest = hashlib.sha256() - package_digest.update(json.dumps(package_info["outer_encryption"]["data"]).encode()) - for mod_name in sorted(package_info["inner_encrypted"].keys()): - package_digest.update(mod_name.encode()) - package_digest.update(package_info["inner_encrypted"][mod_name]["ciphertext"].encode()) - if not NBPCrypto.outer_verify(package_digest.digest(), outer_sig, ed25519_public_key): - raise NBPCryptoError("外层签名验证失败,包可能被篡改") + NBPCrypto._verify_outer_sig(package_info, ed25519_public_key) - # 2. 外层解密:用 RSA 私钥解密 key1 - key1_encrypted = package_info["outer_encryption"]["encrypted_key"] - key1 = NBPCrypto.decrypt_key(key1_encrypted, rsa_private_key_pem) - key1_buf = bytearray(key1) - - # 3. 解密 META-INF 数据 - meta_inf_bytes = NBPCrypto.outer_decrypt( - package_info["outer_encryption"]["data"], key1 + # 2. 外层解密:key1 → META-INF + key1 = NBPCrypto.decrypt_key( + package_info["outer_encryption"]["encrypted_key"], rsa_private_key_pem ) - NBPCrypto._secure_wipe(key1_buf) + meta_inf = json.loads( + NBPCrypto.outer_decrypt(package_info["outer_encryption"]["data"], key1).decode("utf-8") + ) + NBPCrypto._secure_wipe(bytearray(key1)) - meta_inf = json.loads(meta_inf_bytes.decode("utf-8")) - - # 4. 中层验签(如果提供了 RSA 公钥) + # 3. 中层验签 if rsa_public_key_pem: - inner_sig = base64.b64decode(meta_inf["inner_signature"]) - nir_digest = hashlib.sha256() - for mod_name in sorted(package_info["inner_encrypted"].keys()): - nir_digest.update(mod_name.encode()) - nir_digest.update(package_info["inner_encrypted"][mod_name]["ciphertext"].encode()) - if not NBPCrypto.inner_verify(nir_digest.digest(), inner_sig, rsa_public_key_pem): - raise NBPCryptoError("中层 RSA 签名验证失败,插件作者身份无法确认") + NBPCrypto._verify_inner_sig(package_info, meta_inf, rsa_public_key_pem) - # 5. 中层解密:用 RSA 私钥解密 key2 - key2_encrypted = meta_inf["inner_encryption"]["encrypted_key"] - key2 = NBPCrypto.decrypt_key(key2_encrypted, rsa_private_key_pem) - key2_buf = bytearray(key2) + # 4. 中层解密:key2 + key2 = NBPCrypto.decrypt_key( + meta_inf["inner_encryption"]["encrypted_key"], rsa_private_key_pem + ) - # 6. 派生 HMAC 密钥 + # 5. 派生 HMAC 密钥 + 解密 NIR hmac_key = NBPCrypto.derive_hmac_key(key1, key2) - # key1 已经擦除,key2 即将擦除 NBPCrypto._secure_wipe(bytearray(key2)) - # 7. 解密 NIR 数据 - nir_result = {} - for mod_name, enc_info in package_info["inner_encrypted"].items(): - mod_data = NBPCrypto.inner_decrypt(enc_info, key2) - nir_result[mod_name] = mod_data + nir_result = { + mod_name: NBPCrypto.inner_decrypt(enc_info, key2) + for mod_name, enc_info in package_info["inner_encrypted"].items() + } - # 8. 内层验签 + # 6. 内层验签 module_sigs = meta_inf.get("module_signatures", {}) for mod_name, mod_data in nir_result.items(): - expected_sig = module_sigs.get(mod_name) - if expected_sig: - if not NBPCrypto.module_verify(mod_data, expected_sig, hmac_key): - raise NBPCryptoError(f"模块 '{mod_name}' HMAC 签名验证失败") + expected = module_sigs.get(mod_name) + if expected and not NBPCrypto.module_verify(mod_data, expected, hmac_key): + raise NBPCryptoError(f"模块 '{mod_name}' HMAC 签名验证失败") return nir_result + + # ── 内部工具方法 ── + + @staticmethod + def _build_nir_digest(inner_encrypted: dict) -> bytes: + d = hashlib.sha256() + for mod_name in sorted(inner_encrypted): + d.update(mod_name.encode()) + d.update(inner_encrypted[mod_name]["ciphertext"].encode()) + return d.digest() + + @staticmethod + def _build_package_digest(outer_encrypted: dict, inner_encrypted: dict) -> bytes: + d = hashlib.sha256() + d.update(json.dumps(outer_encrypted).encode()) + for mod_name in sorted(inner_encrypted): + d.update(mod_name.encode()) + d.update(inner_encrypted[mod_name]["ciphertext"].encode()) + return d.digest() + + @staticmethod + def _verify_outer_sig(package_info: dict, ed25519_public_key: bytes): + sig = base64.b64decode(package_info["outer_signature"]) + digest = NBPCrypto._build_package_digest( + package_info["outer_encryption"]["data"], package_info["inner_encrypted"] + ) + if not NBPCrypto.outer_verify(digest, sig, ed25519_public_key): + raise NBPCryptoError("外层签名验证失败") + + @staticmethod + def _verify_inner_sig(package_info: dict, meta_inf: dict, rsa_public_key_pem: bytes): + sig = base64.b64decode(meta_inf["inner_signature"]) + digest = NBPCrypto._build_nir_digest(package_info["inner_encrypted"]) + if not NBPCrypto.inner_verify(digest, sig, rsa_public_key_pem): + raise NBPCryptoError("中层 RSA 签名验证失败") diff --git a/oss/core/nbpf/format.py b/oss/core/nbpf/format.py index 4a72ae6..17830e6 100644 --- a/oss/core/nbpf/format.py +++ b/oss/core/nbpf/format.py @@ -32,7 +32,7 @@ from pathlib import Path from typing import Optional from oss.logger.logger import Log -from .crypto import NBPCrypto, NBPCryptoError +from .crypto import NBPCrypto, NBPCryptoError, _ModuleCache from .compiler import NIRCompiler, NIRCompileError @@ -148,11 +148,10 @@ class NBPFPacker: zf.writestr(NBPFFormatter.SIGNER_PEM, ed25519_public_key) else: # 从私钥派生公钥 - ed25519_mod = NBPCrypto._imp_ed25519() - key = ed25519_mod.Ed25519PrivateKey.from_private_bytes(ed25519_private_key) + key = _ModuleCache.ed25519().Ed25519PrivateKey.from_private_bytes(ed25519_private_key) + s = _ModuleCache.serialization() pub_bytes = key.public_key().public_bytes( - NBPCrypto._imp_serialization().Encoding.Raw, - NBPCrypto._imp_serialization().PublicFormat.Raw + s.Encoding.Raw, s.PublicFormat.Raw ) zf.writestr(NBPFFormatter.SIGNER_PEM, pub_bytes) diff --git a/oss/core/nbpf/loader.py b/oss/core/nbpf/loader.py index a29ae5c..7669e5b 100644 --- a/oss/core/nbpf/loader.py +++ b/oss/core/nbpf/loader.py @@ -79,8 +79,11 @@ class NBPFLoader: try: with zipfile.ZipFile(nbpf_path, 'r') as zf: + # 0. 一次扫描,缓存 NIR 模块数据 + nir_modules = self._read_nir_modules(zf) + # 1. 外层验签(先用包内公钥验签,再查信任状态) - signer_pub_key, is_trusted, trusted_name = self._verify_outer_signature(zf) + signer_pub_key, is_trusted, trusted_name = self._verify_outer_signature(zf, nir_modules) status = "已信任" if is_trusted else "未信任" Log.info("NBPF", f"外层签名验证通过 (signer: {trusted_name or 'unknown'}, {status})") @@ -89,7 +92,7 @@ class NBPFLoader: key1_buf = bytearray(key1) # 3. 中层验签(传入外层签名者名称,确保内外签名者一致) - rsa_signer = self._verify_inner_signature(zf, meta_inf, trusted_name) + rsa_signer = self._verify_inner_signature(nir_modules, meta_inf, trusted_name) Log.info("NBPF", f"中层签名验证通过 (signer: {rsa_signer})") # 4. 中层解密 @@ -102,7 +105,7 @@ class NBPFLoader: self.crypto._secure_wipe(key2_buf) # 6. 解密 NIR 数据 - nir_data = self._decrypt_nir_data(zf, key2) + nir_data = self._decrypt_nir_data_raw(nir_modules, key2) # 7. 内层验签 self._verify_module_signatures(nir_data, meta_inf, hmac_key) @@ -145,9 +148,19 @@ class NBPFLoader: except Exception as e: raise NBPFLoadError(f"加载失败: {type(e).__name__}: {e}") from e + @staticmethod + def _read_nir_modules(zf: zipfile.ZipFile) -> dict[str, dict]: + """一次扫描 ZIP 中所有 NIR 模块,缓存结果""" + modules = {} + for info in zf.infolist(): + if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"): + mod_name = info.filename[len(NBPFFormatter.NIR_DIR):] + modules[mod_name] = json.loads(zf.read(info.filename).decode("utf-8")) + return modules + # ── 外层验签 ── - def _verify_outer_signature(self, zf: zipfile.ZipFile) -> tuple[bytes, bool, str | None]: + def _verify_outer_signature(self, zf: zipfile.ZipFile, nir_modules: dict[str, dict]) -> tuple[bytes, bool, str | None]: """外层 Ed25519 签名验证 先用包内公钥验签(不依赖外部信任列表),验签通过后再检查信任状态。 @@ -157,9 +170,6 @@ class NBPFLoader: Returns: (signer_pub_key_bytes, is_trusted, trusted_name) - - signer_pub_key_bytes: 签名者 Ed25519 公钥(用于上层判断信任) - - is_trusted: 公钥是否在本地信任列表中 - - trusted_name: 信任列表中的名称(不信任时为 None) """ if NBPFFormatter.SIGNATURE not in zf.namelist(): raise NBPFLoadError("缺少外层签名文件") @@ -169,29 +179,16 @@ class NBPFLoader: signature_b64 = zf.read(NBPFFormatter.SIGNATURE).decode().strip() signer_pub_key = zf.read(NBPFFormatter.SIGNER_PEM) - # 计算包摘要(与 full_encrypt_package 一致) + # 计算包摘要(使用缓存的 nir_modules) encryption_data = json.loads(zf.read(NBPFFormatter.ENCRYPTION).decode("utf-8")) - digest = hashlib.sha256() - digest.update(json.dumps(encryption_data["data"]).encode()) + digest = NBPCrypto._build_package_digest(encryption_data["data"], nir_modules) - # 按模块名排序,添加模块名和密文 - nir_modules = {} - for info in zf.infolist(): - if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"): - mod_name = info.filename[len(NBPFFormatter.NIR_DIR):] - mod_data = json.loads(zf.read(info.filename).decode("utf-8")) - nir_modules[mod_name] = mod_data - - for mod_name in sorted(nir_modules.keys()): - digest.update(mod_name.encode()) - digest.update(nir_modules[mod_name]["ciphertext"].encode()) - - # 直接用包内公钥验签(不依赖外部信任列表) + # 直接用包内公钥验签 signature = base64.b64decode(signature_b64) - if not self.crypto.outer_verify(digest.digest(), signature, signer_pub_key): + if not self.crypto.outer_verify(digest, signature, signer_pub_key): raise NBPFLoadError("外层签名验证失败,包可能被篡改") - # 验签通过后,检查公钥是否在本地信任列表中 + # 检查公钥是否在本地信任列表中 is_trusted = False trusted_name = None for name, trusted_key in self.trusted_ed25519_keys.items(): @@ -224,7 +221,7 @@ class NBPFLoader: # ── 中层验签 ── - def _verify_inner_signature(self, zf: zipfile.ZipFile, meta_inf: dict, ed25519_signer: str = None) -> str: + def _verify_inner_signature(self, nir_modules: dict[str, dict], meta_inf: dict, ed25519_signer: str = None) -> str: """中层 RSA-4096 签名验证,返回签名者名称 签名计算方式与 full_encrypt_package 一致。 @@ -232,7 +229,7 @@ class NBPFLoader: 否则遍历所有信任的 RSA 密钥。 Args: - zf: 打开的 ZIP 文件 + nir_modules: 缓存的 NIR 模块数据 meta_inf: 解密后的 META-INF 数据 ed25519_signer: 外层 Ed25519 签名者名称 @@ -246,20 +243,8 @@ class NBPFLoader: if not inner_sig_b64: raise NBPFLoadError("缺少中层签名") - # 计算 NIR 数据摘要(与 full_encrypt_package 一致) - nir_modules = {} - for info in zf.infolist(): - if info.filename.startswith(NBPFFormatter.NIR_DIR) and not info.filename.endswith("/"): - mod_name = info.filename[len(NBPFFormatter.NIR_DIR):] - mod_data = json.loads(zf.read(info.filename).decode("utf-8")) - nir_modules[mod_name] = mod_data - - nir_digest = hashlib.sha256() - for mod_name in sorted(nir_modules.keys()): - nir_digest.update(mod_name.encode()) - nir_digest.update(nir_modules[mod_name]["ciphertext"].encode()) - - # 查找匹配的 RSA 公钥 + # 使用缓存的 nir_modules 计算摘要 + nir_digest = NBPCrypto._build_nir_digest(nir_modules) inner_sig = base64.b64decode(inner_sig_b64) # 优先使用与外层签名者同名的 RSA 密钥 @@ -267,11 +252,10 @@ class NBPFLoader: if ed25519_signer and ed25519_signer in self.trusted_rsa_keys: candidates.append((ed25519_signer, self.trusted_rsa_keys[ed25519_signer])) else: - # 未指定或未找到同名密钥,遍历全部 candidates = list(self.trusted_rsa_keys.items()) for name, rsa_pub_key in candidates: - if self.crypto.inner_verify(nir_digest.digest(), inner_sig, rsa_pub_key): + if self.crypto.inner_verify(nir_digest, inner_sig, rsa_pub_key): return name raise NBPFLoadError("中层签名验证失败,无法匹配任何信任的 RSA 公钥") @@ -291,21 +275,12 @@ class NBPFLoader: # ── 解密 NIR 数据 ── - def _decrypt_nir_data(self, zf: zipfile.ZipFile, key2: bytes) -> dict[str, bytes]: - """解密 NIR 数据,返回 {module_name: nir_bytes}""" - nir_data = {} - for info in zf.infolist(): - if not info.filename.startswith(NBPFFormatter.NIR_DIR): - continue - if info.filename.endswith("/"): - continue - - module_name = info.filename[len(NBPFFormatter.NIR_DIR):] - enc_info = json.loads(zf.read(info.filename).decode("utf-8")) - mod_data = self.crypto.inner_decrypt(enc_info, key2) - nir_data[module_name] = mod_data - - return nir_data + def _decrypt_nir_data_raw(self, nir_modules: dict[str, dict], key2: bytes) -> dict[str, bytes]: + """解密 NIR 数据,使用缓存的模块数据""" + return { + mod_name: self.crypto.inner_decrypt(enc_info, key2) + for mod_name, enc_info in nir_modules.items() + } # ── 内层验签 ── diff --git a/oss/core/pl_injector.py b/oss/core/pl_injector.py index 139157f..efdc1af 100644 --- a/oss/core/pl_injector.py +++ b/oss/core/pl_injector.py @@ -131,10 +131,10 @@ class PLInjector: for dangerous in ['import ', 'exec(', 'eval(', 'compile(', 'os.', 'sys.', 'subprocess']: if dangerous in decoded: raise PLValidationError(f"{file_path} - 检测到 base64 编码的恶意代码") - except Exception: - pass - except Exception: - pass + except Exception as e: + print(f"[PLInjector] 模块注入错误: {e}") + except Exception as e: + print(f"[PLInjector] 错误: {e}") # 检查字符串拼接绕过 concat_patterns = [ diff --git a/oss/core/repl/main.py b/oss/core/repl/main.py index 7f6891e..40ee938 100644 --- a/oss/core/repl/main.py +++ b/oss/core/repl/main.py @@ -30,16 +30,16 @@ class NebulaShell(cmd.Cmd): """加载命令历史记录""" try: readline.read_history_file(HISTORY_FILE) - except (FileNotFoundError, OSError): - pass + except (FileNotFoundError, OSError) as e: + print(f"[REPL] 文件操作失败: {e}") readline.set_history_length(500) def _save_history(self): """保存命令历史记录""" try: readline.write_history_file(HISTORY_FILE) - except OSError: - pass + except OSError as e: + print(f"[REPL] 系统错误: {e}") def _get_plugins(self): """获取所有已加载的插件列表""" diff --git a/oss/plugin/manager.py b/oss/plugin/manager.py index 9d9c831..722c786 100644 --- a/oss/plugin/manager.py +++ b/oss/plugin/manager.py @@ -31,8 +31,8 @@ class PluginManager: try: validator = get_validator() validator.unlock("deep_diver") - except Exception: - pass + except Exception as e: + print(f"[PluginManager] 错误: {e}") def start(self): """启动 Core,它会初始化并启动所有其他插件""" @@ -57,8 +57,8 @@ class PluginManager: # 检查插件数量成就 plugin_count = len(self.core.plugins) validator.check_plugin_count(plugin_count) - except Exception: - pass + except Exception as e: + print(f"[PluginManager] 错误: {e}") def stop(self): """停止所有插件""" @@ -78,5 +78,5 @@ class PluginManager: try: validator = get_validator() validator.track_progress("session_end") - except Exception: - pass + except Exception as e: + print(f"[PluginManager] 错误: {e}") diff --git a/oss/store/NebulaShell/i18n/main.py b/oss/store/NebulaShell/i18n/main.py index 2974111..b763603 100644 --- a/oss/store/NebulaShell/i18n/main.py +++ b/oss/store/NebulaShell/i18n/main.py @@ -66,8 +66,8 @@ class I18n: if domain not in self._translations: self._translations[domain] = {} self._translations[domain].update(data) - except (json.JSONDecodeError, OSError): - pass + except (json.JSONDecodeError, OSError) as e: + print(f"[i18n] 翻译文件加载失败: {e}") self._loaded_domains.add(domain) def _find_translation_files(self, domain: str) -> list[str]: diff --git a/oss/store/NebulaShell/plugin-storage/main.py b/oss/store/NebulaShell/plugin-storage/main.py index f0f4b5b..19b3f2d 100644 --- a/oss/store/NebulaShell/plugin-storage/main.py +++ b/oss/store/NebulaShell/plugin-storage/main.py @@ -59,8 +59,8 @@ class PluginStorage: data = json.loads(file_path.read_text(encoding="utf-8")) self._mem_cache[plugin_name][key] = data return data - except (json.JSONDecodeError, OSError): - pass + except (json.JSONDecodeError, OSError) as e: + print(f"[PluginStorage] 读取缓存失败: {e}") return default def delete(self, plugin_name: str, key: str) -> bool: @@ -90,8 +90,8 @@ class PluginStorage: for f in pd.glob("*.json"): try: f.unlink() - except OSError: - pass + except OSError as e: + print(f"[PluginStorage] 文件删除失败: {e}") return True def set_raw(self, plugin_name: str, file_name: str, data: bytes) -> bool: diff --git a/oss/store/NebulaShell/ws-api/main.py b/oss/store/NebulaShell/ws-api/main.py index d57e123..00b9cec 100644 --- a/oss/store/NebulaShell/ws-api/main.py +++ b/oss/store/NebulaShell/ws-api/main.py @@ -71,7 +71,7 @@ class WsApi: async for message in websocket: await self._dispatch(websocket, message, addr) except websockets.exceptions.ConnectionClosed: - pass + pass # WebSocket 正常断连 finally: Log.info("WsApi", f"WebSocket 断开: {addr}") for topic in list(self._connections.keys()): @@ -123,8 +123,8 @@ class WsApi: async def _send(self, websocket, data: dict): try: await websocket.send(json.dumps(data, ensure_ascii=False)) - except Exception: - pass + except Exception as e: + print(f"[WsApi] 连接处理错误: {e}") def register_handler(self, msg_type: str, handler: Callable): self._handlers[msg_type] = handler diff --git a/oss/webui/index.html b/oss/webui/index.html index 846f0ed..2efc774 100644 --- a/oss/webui/index.html +++ b/oss/webui/index.html @@ -3,7 +3,7 @@ - NebulaShell v1.1.0 | 控制台 + NebulaShell v1.2.0 | 控制台