Files
NebulaShell/oss/core/nbpf/crypto.py
Starlight-apk e67d2d8ef6
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
refactor: 优化 NBPF 模块 - 缓存导入/合并重复方法/减少I/O
- crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入
- crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt
- crypto.py: 提取公共摘要计算方法,拆分长方法
- compiler.py: 删除_obfuscate_code中未使用的死代码
- loader.py: 3次ZIP扫描合并为1次缓存读取
- format.py: 更新为使用_ModuleCache
- 合计减少205行代码(1707→1502)
2026-05-17 15:36:45 +08:00

448 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""多重签名 + 多重加密工具
加密层级(从外到内):
1. Ed25519 外层签名 — 验证包完整性
2. AES-256-GCM 外层加密 — 加密 META-INF/ 和 NIR/
3. RSA-4096-PSS 中层签名 — 验证插件作者身份
4. AES-256-GCM 中层加密 — 加密 NIR 数据
5. HMAC-SHA256 内层签名 — 验证每个模块
代码隐藏策略:
- 关键常量运行时计算
- 导入路径动态拼接
- 解密函数分散
- 反调试检测
- 内存擦除
"""
import os
import sys
import json
import hmac
import hashlib
import base64
from typing import Optional, Tuple
class NBPCryptoError(Exception):
"""NBPF 加密/解密错误"""
pass
class _ModuleCache:
"""混淆导入缓存 — 动态导入的 cryptography 模块只加载一次"""
_cache: dict[str, object] = {}
@classmethod
def _path(cls, *parts: str) -> str:
return ".".join(parts)
@classmethod
def _imp(cls, key: str, module_path: str, fromlist: list[str] = None):
if key not in cls._cache:
cls._cache[key] = __import__(module_path, fromlist=fromlist or [])
return cls._cache[key]
@classmethod
def aesgcm(cls):
return cls._imp("aesgcm", cls._path("cryptography", "hazmat", "primitives", "ciphers", "aead"), ["AESGCM"])
@classmethod
def ed25519(cls):
return cls._imp("ed25519", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "ed25519"), ["Ed25519PrivateKey"])
@classmethod
def rsa(cls):
return cls._imp("rsa", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "rsa"), ["generate_private_key"])
@classmethod
def serialization(cls):
return cls._imp("serialization", cls._path("cryptography", "hazmat", "primitives", "serialization"), ["Encoding"])
@classmethod
def hashes(cls):
return cls._imp("hashes", cls._path("cryptography", "hazmat", "primitives", "hashes"), ["SHA256"])
@classmethod
def padding(cls):
return cls._imp("padding", cls._path("cryptography", "hazmat", "primitives", "asymmetric", "padding"), ["OAEP"])
@classmethod
def backends(cls):
return cls._imp("backends", cls._path("cryptography", "hazmat", "backends"), ["default_backend"])
@classmethod
def hkdf(cls):
return cls._imp("hkdf", cls._path("cryptography", "hazmat", "primitives", "kdf", "hkdf"), ["HKDF"])
class NBPCrypto:
"""多重签名 + 多重加密工具"""
# ── 关键常量(运行时计算) ──
AES_KEY_LEN = 32
AES_NONCE_LEN = 12
AES_TAG_LEN = 16
HMAC_KEY_LEN = 32
RSA_KEY_SIZE = 4096
# ── 反调试检测 ──
@staticmethod
def _anti_debug_check() -> bool:
"""检测是否被调试"""
try:
if sys.gettrace() is not None:
return True
for env in ("PYTHONDEBUG", "PYTHONVERBOSE", "NEBULA_DEBUG"):
if os.environ.get(env, "").lower() in ("1", "true", "yes"):
return True
except Exception:
pass
return False
# ── 安全内存擦除 ──
@staticmethod
def _secure_wipe(data: bytearray):
"""三次覆写安全擦除"""
try:
length = len(data)
for _ in range(3):
for i in range(length):
data[i] = 0
except Exception:
pass
# ── 密钥生成 ──
@staticmethod
def generate_aes_key() -> bytes:
return os.urandom(NBPCrypto.AES_KEY_LEN)
@staticmethod
def generate_ed25519_keypair() -> Tuple[bytes, bytes]:
m = _ModuleCache.ed25519()
s = _ModuleCache.serialization()
private_key = m.Ed25519PrivateKey.generate()
private_bytes = private_key.private_bytes(
s.Encoding.Raw, s.PrivateFormat.Raw, s.NoEncryption()
)
public_bytes = private_key.public_key().public_bytes(
s.Encoding.Raw, s.PublicFormat.Raw
)
return private_bytes, public_bytes
@staticmethod
def generate_rsa_keypair(key_size: int = None) -> Tuple[bytes, bytes]:
if key_size is None:
key_size = NBPCrypto.RSA_KEY_SIZE
m = _ModuleCache.rsa()
s = _ModuleCache.serialization()
b = _ModuleCache.backends()
private_key = m.generate_private_key(65537, key_size, b.default_backend())
private_pem = private_key.private_bytes(
s.Encoding.PEM, s.PrivateFormat.PKCS8, s.NoEncryption()
)
public_pem = private_key.public_key().public_bytes(
s.Encoding.PEM, s.PublicFormat.SubjectPublicKeyInfo
)
return private_pem, public_pem
# ── 密钥派生 ──
@staticmethod
def derive_hmac_key(key1: bytes, key2: bytes) -> bytes:
m = _ModuleCache.hkdf()
h = _ModuleCache.hashes()
b = _ModuleCache.backends()
hkdf = m.HKDF(
algorithm=h.SHA256(), length=32, salt=None,
info=b"NebulaShell:NBPF:HMAC:v1", backend=b.default_backend(),
)
return hkdf.derive(key1 + key2)
# ── AES-256-GCM 加密/解密(通用) ──
@staticmethod
def _aes_encrypt(data: bytes, key: bytes) -> Tuple[bytes, bytes, bytes]:
"""AES-256-GCM 加密,返回 (nonce, ciphertext, tag)"""
m = _ModuleCache.aesgcm()
aesgcm = m.AESGCM(key)
nonce = os.urandom(NBPCrypto.AES_NONCE_LEN)
combined = aesgcm.encrypt(nonce, data, None)
return nonce, combined[:-NBPCrypto.AES_TAG_LEN], combined[-NBPCrypto.AES_TAG_LEN:]
@staticmethod
def _aes_decrypt(ciphertext: bytes, key: bytes, nonce: bytes, tag: bytes) -> bytes:
m = _ModuleCache.aesgcm()
return m.AESGCM(key).decrypt(nonce, ciphertext + tag, None)
# ── 通用分层加密/解密outer 和 inner 共用) ──
@staticmethod
def _layer_encrypt(data: bytes, key: bytes) -> dict:
nonce, ct, tag = NBPCrypto._aes_encrypt(data, key)
return {
"nonce": base64.b64encode(nonce).decode(),
"ciphertext": base64.b64encode(ct).decode(),
"tag": base64.b64encode(tag).decode(),
}
@staticmethod
def _layer_decrypt(enc_info: dict, key: bytes) -> bytes:
return NBPCrypto._aes_decrypt(
base64.b64decode(enc_info["ciphertext"]),
key,
base64.b64decode(enc_info["nonce"]),
base64.b64decode(enc_info["tag"]),
)
# ── 别名:对外接口保持兼容 ──
outer_encrypt = _layer_encrypt
outer_decrypt = _layer_decrypt
inner_encrypt = _layer_encrypt
inner_decrypt = _layer_decrypt
# ── Ed25519 签名/验签 ──
@staticmethod
def outer_sign(data: bytes, private_key: bytes) -> bytes:
m = _ModuleCache.ed25519()
return m.Ed25519PrivateKey.from_private_bytes(private_key).sign(data)
@staticmethod
def outer_verify(data: bytes, signature: bytes, public_key: bytes) -> bool:
try:
m = _ModuleCache.ed25519()
m.Ed25519PublicKey.from_public_bytes(public_key).verify(signature, data)
return True
except Exception:
return False
# ── RSA-4096-PSS 签名/验签 ──
@staticmethod
def inner_sign(data: bytes, private_key_pem: bytes) -> bytes:
s = _ModuleCache.serialization()
h = _ModuleCache.hashes()
p = _ModuleCache.padding()
b = _ModuleCache.backends()
priv = s.load_pem_private_key(private_key_pem, password=None, backend=b.default_backend())
return priv.sign(data, p.PSS(mgf=p.MGF1(h.SHA256()), salt_length=p.PSS.MAX_LENGTH), h.SHA256())
@staticmethod
def inner_verify(data: bytes, signature: bytes, public_key_pem: bytes) -> bool:
try:
s = _ModuleCache.serialization()
h = _ModuleCache.hashes()
p = _ModuleCache.padding()
b = _ModuleCache.backends()
pub = s.load_pem_public_key(public_key_pem, backend=b.default_backend())
pub.verify(signature, data, p.PSS(mgf=p.MGF1(h.SHA256()), salt_length=p.PSS.MAX_LENGTH), h.SHA256())
return True
except Exception:
return False
# ── HMAC-SHA256 模块签名 ──
@staticmethod
def module_sign(data: bytes, hmac_key: bytes) -> str:
return base64.b64encode(hmac.new(hmac_key, data, hashlib.sha256).digest()).decode()
@staticmethod
def module_verify(data: bytes, signature: str, hmac_key: bytes) -> bool:
return hmac.compare_digest(NBPCrypto.module_sign(data, hmac_key), signature)
# ── RSA-OAEP 密钥封装 ──
@staticmethod
def encrypt_key(aes_key: bytes, rsa_public_key_pem: bytes) -> str:
s = _ModuleCache.serialization()
h = _ModuleCache.hashes()
p = _ModuleCache.padding()
b = _ModuleCache.backends()
pub = s.load_pem_public_key(rsa_public_key_pem, backend=b.default_backend())
encrypted = pub.encrypt(aes_key, p.OAEP(mgf=p.MGF1(algorithm=h.SHA256()), algorithm=h.SHA256(), label=None))
return base64.b64encode(encrypted).decode()
@staticmethod
def decrypt_key(encrypted_key: str, rsa_private_key_pem: bytes) -> bytes:
s = _ModuleCache.serialization()
h = _ModuleCache.hashes()
p = _ModuleCache.padding()
b = _ModuleCache.backends()
priv = s.load_pem_private_key(rsa_private_key_pem, password=None, backend=b.default_backend())
return priv.decrypt(base64.b64decode(encrypted_key), p.OAEP(mgf=p.MGF1(algorithm=h.SHA256()), algorithm=h.SHA256(), label=None))
# ── 密钥文件读写 ──
@staticmethod
def save_key_to_pem(key_bytes: bytes, path: str):
dir_path = os.path.dirname(path)
if dir_path:
os.makedirs(dir_path, exist_ok=True)
with open(path, "wb") as f:
f.write(key_bytes)
@staticmethod
def load_key_from_pem(path: str) -> bytes:
with open(path, "rb") as f:
return f.read()
# ── 完整加密流程(打包时使用) ──
@staticmethod
def full_encrypt_package(
nir_data: dict[str, bytes],
manifest: dict,
ed25519_private_key: bytes,
rsa_private_key_pem: bytes,
rsa_public_key_pem: bytes,
) -> dict:
"""完整加密打包流程"""
# 1. 生成两个 AES 密钥
key1 = NBPCrypto.generate_aes_key()
key2 = NBPCrypto.generate_aes_key()
# 2. 派生 HMAC 密钥
hmac_key = NBPCrypto.derive_hmac_key(key1, key2)
# 3. 中层加密:用 key2 加密每个 NIR 模块
inner_encrypted = {
mod_name: NBPCrypto.inner_encrypt(mod_data, key2)
for mod_name, mod_data in nir_data.items()
}
# 4. 中层签名:用 RSA 签名 NIR 数据摘要
inner_sig = NBPCrypto._build_nir_digest(inner_encrypted)
inner_signature = NBPCrypto.inner_sign(inner_sig, rsa_private_key_pem)
# 5. 内层签名:用 HMAC 签名每个模块
module_sigs = {
mod_name: NBPCrypto.module_sign(mod_data, hmac_key)
for mod_name, mod_data in nir_data.items()
}
# 6. 构建 META-INF 数据,外层加密
meta_inf = {
"manifest": manifest,
"inner_signature": base64.b64encode(inner_signature).decode(),
"inner_encryption": {
"algorithm": "AES-256-GCM",
"encrypted_key": NBPCrypto.encrypt_key(key2, rsa_public_key_pem),
},
"module_signatures": module_sigs,
}
outer_encrypted = NBPCrypto.outer_encrypt(
json.dumps(meta_inf).encode("utf-8"), key1
)
# 7. 外层签名:用 Ed25519 签名包摘要
outer_sig = NBPCrypto._build_package_digest(outer_encrypted, inner_encrypted)
outer_signature = NBPCrypto.outer_sign(outer_sig, ed25519_private_key)
# 8. 内存擦除
NBPCrypto._secure_wipe(bytearray(key1))
NBPCrypto._secure_wipe(bytearray(key2))
return {
"outer_encryption": {
"algorithm": "AES-256-GCM",
"encrypted_key": NBPCrypto.encrypt_key(key1, rsa_public_key_pem),
"data": outer_encrypted,
},
"outer_signature": base64.b64encode(outer_signature).decode(),
"inner_encrypted": inner_encrypted,
"inner_signature": base64.b64encode(inner_signature).decode(),
"inner_encryption": meta_inf["inner_encryption"],
"module_signatures": module_sigs,
"hmac_key_derivation": "HKDF-SHA256(ikm=key1+key2, info=NebulaShell:NBPF:HMAC:v1)",
}
# ── 完整解密流程(加载时使用) ──
@staticmethod
def full_decrypt_package(
package_info: dict,
ed25519_public_key: bytes,
rsa_private_key_pem: bytes,
rsa_public_key_pem: bytes = None,
) -> dict[str, bytes]:
"""完整解密流程,返回 {module_name: nir_bytes}"""
if NBPCrypto._anti_debug_check():
raise NBPCryptoError("调试器检测到,拒绝解密")
# 1. 外层验签
NBPCrypto._verify_outer_sig(package_info, ed25519_public_key)
# 2. 外层解密key1 → META-INF
key1 = NBPCrypto.decrypt_key(
package_info["outer_encryption"]["encrypted_key"], rsa_private_key_pem
)
meta_inf = json.loads(
NBPCrypto.outer_decrypt(package_info["outer_encryption"]["data"], key1).decode("utf-8")
)
NBPCrypto._secure_wipe(bytearray(key1))
# 3. 中层验签
if rsa_public_key_pem:
NBPCrypto._verify_inner_sig(package_info, meta_inf, rsa_public_key_pem)
# 4. 中层解密key2
key2 = NBPCrypto.decrypt_key(
meta_inf["inner_encryption"]["encrypted_key"], rsa_private_key_pem
)
# 5. 派生 HMAC 密钥 + 解密 NIR
hmac_key = NBPCrypto.derive_hmac_key(key1, key2)
NBPCrypto._secure_wipe(bytearray(key2))
nir_result = {
mod_name: NBPCrypto.inner_decrypt(enc_info, key2)
for mod_name, enc_info in package_info["inner_encrypted"].items()
}
# 6. 内层验签
module_sigs = meta_inf.get("module_signatures", {})
for mod_name, mod_data in nir_result.items():
expected = module_sigs.get(mod_name)
if expected and not NBPCrypto.module_verify(mod_data, expected, hmac_key):
raise NBPCryptoError(f"模块 '{mod_name}' HMAC 签名验证失败")
return nir_result
# ── 内部工具方法 ──
@staticmethod
def _build_nir_digest(inner_encrypted: dict) -> bytes:
d = hashlib.sha256()
for mod_name in sorted(inner_encrypted):
d.update(mod_name.encode())
d.update(inner_encrypted[mod_name]["ciphertext"].encode())
return d.digest()
@staticmethod
def _build_package_digest(outer_encrypted: dict, inner_encrypted: dict) -> bytes:
d = hashlib.sha256()
d.update(json.dumps(outer_encrypted).encode())
for mod_name in sorted(inner_encrypted):
d.update(mod_name.encode())
d.update(inner_encrypted[mod_name]["ciphertext"].encode())
return d.digest()
@staticmethod
def _verify_outer_sig(package_info: dict, ed25519_public_key: bytes):
sig = base64.b64decode(package_info["outer_signature"])
digest = NBPCrypto._build_package_digest(
package_info["outer_encryption"]["data"], package_info["inner_encrypted"]
)
if not NBPCrypto.outer_verify(digest, sig, ed25519_public_key):
raise NBPCryptoError("外层签名验证失败")
@staticmethod
def _verify_inner_sig(package_info: dict, meta_inf: dict, rsa_public_key_pem: bytes):
sig = base64.b64decode(meta_inf["inner_signature"])
digest = NBPCrypto._build_nir_digest(package_info["inner_encrypted"])
if not NBPCrypto.inner_verify(digest, sig, rsa_public_key_pem):
raise NBPCryptoError("中层 RSA 签名验证失败")