Files
NebulaShell/oss/core/nbpf/crypto.py
Falck 3a096f59a9 重构:核心迁移至 oss/core + NBPF 多重签名加密 + NIR 编译器 + README 全面升级
- 核心功能从 store/ 迁移至 oss/core/ 框架层
- 实现 NBPF 包格式:多重签名(Ed25519+RSA-PSS+HMAC)+ 多重加密(AES-256-GCM)
- 实现 NIR 编译器:基于 compile()+marshal 的跨平台中间表示
- 新增 nebula nbpf CLI 命令组(pack/unpack/verify/sign/keygen)
- 新增 19 个 NBPF 测试用例,覆盖全链路
- 彻底重写 README,大型项目标准框架风格,所有图表使用 SVG
- 更新 LICENSE 版权声明
- 清理旧版 store 插件目录(已迁移至 oss/core)
2026-05-05 07:29:43 +08:00

592 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""多重签名 + 多重加密工具
加密层级(从外到内):
1. Ed25519 外层签名 — 验证包完整性
2. AES-256-GCM 外层加密 — 加密 META-INF/ 和 NIR/
3. RSA-4096-PSS 中层签名 — 验证插件作者身份
4. AES-256-GCM 中层加密 — 加密 NIR 数据
5. HMAC-SHA256 内层签名 — 验证每个模块
代码隐藏策略:
- 关键常量运行时计算
- 导入路径动态拼接
- 解密函数分散
- 反调试检测
- 内存擦除
"""
import os
import sys
import json
import hmac
import hashlib
import base64
import threading
from typing import Optional, Tuple
class NBPCryptoError(Exception):
    """Raised when an NBPF encryption, decryption or verification step fails."""
class NBPCrypto:
    """Multi-layer signing and encryption helpers for NBPF packages.

    Layers, from the outside in:

    1. Ed25519 signature over the package digest.
    2. AES-256-GCM encryption of the META-INF data (``key1``).
    3. RSA-PSS signature over the encrypted NIR modules.
    4. AES-256-GCM encryption of every NIR module (``key2``).
    5. HMAC-SHA256 signature of every plaintext module.

    All members are static; the class is only a namespace. The third-party
    ``cryptography`` package is imported lazily by the ``_imp_*`` helpers, so
    the class can be defined (and its stdlib-only helpers used) without it.
    """

    # ── Size constants (exposed through accessors rather than class attributes) ──

    @staticmethod
    def _aes_key_len() -> int:
        """Return the AES-256 key length in bytes."""
        return 32  # 256 bits

    @staticmethod
    def _aes_nonce_len() -> int:
        """Return the AES-GCM nonce length in bytes."""
        return 12  # 96 bits

    @staticmethod
    def _aes_tag_len() -> int:
        """Return the AES-GCM authentication tag length in bytes."""
        return 16  # 128 bits

    @staticmethod
    def _hmac_key_len() -> int:
        """Return the derived HMAC key length in bytes."""
        return 32

    @staticmethod
    def _rsa_key_size() -> int:
        """Return the default RSA modulus size in bits."""
        return 4096

    # ── Lazy imports of the ``cryptography`` package ──

    @staticmethod
    def _import_dotted(*parts: str, fromlist: Tuple[str, ...]) -> object:
        """Import and return the module whose dotted path is built from *parts*.

        A non-empty ``fromlist`` makes ``__import__`` return the leaf module
        instead of the top-level package.
        """
        return __import__(".".join(parts), fromlist=list(fromlist))

    @staticmethod
    def _imp_crypto() -> object:
        """Return ``cryptography.hazmat.primitives.ciphers.aead``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "ciphers", "aead",
            fromlist=("AESGCM",),
        )

    @staticmethod
    def _imp_ed25519() -> object:
        """Return ``cryptography.hazmat.primitives.asymmetric.ed25519``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "asymmetric", "ed25519",
            fromlist=("Ed25519PrivateKey",),
        )

    @staticmethod
    def _imp_rsa() -> object:
        """Return ``cryptography.hazmat.primitives.asymmetric.rsa``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "asymmetric", "rsa",
            fromlist=("generate_private_key",),
        )

    @staticmethod
    def _imp_serialization() -> object:
        """Return ``cryptography.hazmat.primitives.serialization``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "serialization",
            fromlist=("Encoding",),
        )

    @staticmethod
    def _imp_hashes() -> object:
        """Return ``cryptography.hazmat.primitives.hashes``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "hashes",
            fromlist=("SHA256",),
        )

    @staticmethod
    def _imp_padding() -> object:
        """Return ``cryptography.hazmat.primitives.asymmetric.padding``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "primitives", "asymmetric", "padding",
            fromlist=("OAEP",),
        )

    @staticmethod
    def _imp_backends() -> object:
        """Return ``cryptography.hazmat.backends``."""
        return NBPCrypto._import_dotted(
            "cryptography", "hazmat", "backends",
            fromlist=("default_backend",),
        )

    # ── Anti-debug check ──

    @staticmethod
    def _anti_debug_check() -> bool:
        """Return True when a trace function or a debug environment flag is set.

        A trace function is installed by Python debuggers (and by other
        tracing tools). The environment variables ``PYTHONDEBUG``,
        ``PYTHONVERBOSE`` and ``NEBULA_DEBUG`` count as set when their value
        is ``1``, ``true`` or ``yes`` (case-insensitive).
        """
        # sys.gettrace is not guaranteed to exist on every implementation.
        gettrace = getattr(sys, "gettrace", None)
        if gettrace is not None and gettrace() is not None:
            return True
        truthy = ("1", "true", "yes")
        debug_envs = ("PYTHONDEBUG", "PYTHONVERBOSE", "NEBULA_DEBUG")
        return any(os.environ.get(env, "").lower() in truthy for env in debug_envs)

    # ── Best-effort memory wiping ──

    @staticmethod
    def _secure_wipe(data: bytearray):
        """Overwrite a mutable buffer in place (zeros, 0xff, zeros).

        This is best-effort: only the buffer passed in is overwritten, copies
        of the same secret held elsewhere (for example immutable ``bytes``
        objects) are not affected. Read-only or otherwise unwritable inputs
        are ignored instead of raising.
        """
        try:
            length = len(data)
            for fill in (b"\x00", b"\xff", b"\x00"):
                data[:] = fill * length
        except (TypeError, ValueError, BufferError):
            # Immutable or released buffer: nothing can be wiped.
            pass

    # ── Key generation ──

    @staticmethod
    def generate_aes_key() -> bytes:
        """Return a fresh random 256-bit AES key."""
        return os.urandom(NBPCrypto._aes_key_len())

    @staticmethod
    def generate_ed25519_keypair() -> Tuple[bytes, bytes]:
        """Generate an Ed25519 key pair.

        Returns:
            ``(private_key_bytes, public_key_bytes)``, both in raw encoding.
        """
        ed25519 = NBPCrypto._imp_ed25519()
        serialization = NBPCrypto._imp_serialization()
        private_key = ed25519.Ed25519PrivateKey.generate()
        private_bytes = private_key.private_bytes(
            serialization.Encoding.Raw,
            serialization.PrivateFormat.Raw,
            serialization.NoEncryption(),
        )
        public_bytes = private_key.public_key().public_bytes(
            serialization.Encoding.Raw,
            serialization.PublicFormat.Raw,
        )
        return private_bytes, public_bytes

    @staticmethod
    def generate_rsa_keypair(key_size: Optional[int] = None) -> Tuple[bytes, bytes]:
        """Generate an RSA key pair.

        Args:
            key_size: Modulus size in bits; defaults to ``_rsa_key_size()``.

        Returns:
            ``(private_key_pem, public_key_pem)``. The private key is an
            unencrypted PKCS#8 PEM, the public key a SubjectPublicKeyInfo PEM.
        """
        if key_size is None:
            key_size = NBPCrypto._rsa_key_size()
        rsa = NBPCrypto._imp_rsa()
        serialization = NBPCrypto._imp_serialization()
        backends = NBPCrypto._imp_backends()
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=backends.default_backend(),
        )
        private_pem = private_key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.PKCS8,
            serialization.NoEncryption(),
        )
        public_pem = private_key.public_key().public_bytes(
            serialization.Encoding.PEM,
            serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        return private_pem, public_pem

    # ── Key derivation ──

    @staticmethod
    def derive_hmac_key(key1: bytes, key2: bytes) -> bytes:
        """Derive the HMAC key as ``SHA256(key1 || key2 || b"NebulaHMACv1")``.

        Note: this is a plain hash of the concatenation, not HKDF. It is kept
        as is because changing it would invalidate existing packages.
        """
        dig = hashlib.sha256()
        for chunk in (key1, key2, b"NebulaHMACv1"):
            dig.update(chunk)
        return dig.digest()

    # ── AES-256-GCM primitives ──

    @staticmethod
    def _aes_encrypt(data: bytes, key: bytes) -> Tuple[bytes, bytes, bytes]:
        """Encrypt *data* with AES-GCM under a fresh random nonce.

        Returns:
            ``(nonce, ciphertext, tag)``.
        """
        aesgcm = NBPCrypto._imp_crypto().AESGCM(key)
        nonce = os.urandom(NBPCrypto._aes_nonce_len())
        # AESGCM.encrypt returns ciphertext || tag (the nonce is not included),
        # so the trailing tag is split off here.
        sealed = aesgcm.encrypt(nonce, data, None)
        tag_len = NBPCrypto._aes_tag_len()
        return nonce, sealed[:-tag_len], sealed[-tag_len:]

    @staticmethod
    def _aes_decrypt(ciphertext: bytes, key: bytes, nonce: bytes, tag: bytes) -> bytes:
        """Decrypt and authenticate AES-GCM data.

        Raises whatever ``AESGCM.decrypt`` raises when authentication fails.
        """
        aesgcm = NBPCrypto._imp_crypto().AESGCM(key)
        # AESGCM.decrypt expects (nonce, ciphertext || tag, aad).
        return aesgcm.decrypt(nonce, ciphertext + tag, None)

    @staticmethod
    def _seal_to_b64(data: bytes, key: bytes) -> dict:
        """Encrypt *data* and return the base64 text envelope.

        The key order (nonce, ciphertext, tag) must not change: the outer
        signature hashes ``json.dumps`` of this dictionary.
        """
        nonce, ct, tag = NBPCrypto._aes_encrypt(data, key)
        return {
            "nonce": base64.b64encode(nonce).decode(),
            "ciphertext": base64.b64encode(ct).decode(),
            "tag": base64.b64encode(tag).decode(),
        }

    @staticmethod
    def _open_from_b64(enc_info: dict, key: bytes) -> bytes:
        """Decode a base64 envelope produced by ``_seal_to_b64`` and decrypt it."""
        nonce = base64.b64decode(enc_info["nonce"])
        ct = base64.b64decode(enc_info["ciphertext"])
        tag = base64.b64decode(enc_info["tag"])
        return NBPCrypto._aes_decrypt(ct, key, nonce, tag)

    # ── Outer encryption ──

    @staticmethod
    def outer_encrypt(data: bytes, key: bytes) -> dict:
        """Outer-layer AES-256-GCM encryption.

        Returns:
            Dict with base64 strings under ``nonce``, ``ciphertext``, ``tag``.
        """
        return NBPCrypto._seal_to_b64(data, key)

    @staticmethod
    def outer_decrypt(enc_info: dict, key: bytes) -> bytes:
        """Outer-layer AES-256-GCM decryption of an ``outer_encrypt`` result."""
        return NBPCrypto._open_from_b64(enc_info, key)

    # ── Middle-layer encryption ──

    @staticmethod
    def inner_encrypt(data: bytes, key: bytes) -> dict:
        """Middle-layer AES-256-GCM encryption.

        Returns:
            Dict with base64 strings under ``nonce``, ``ciphertext``, ``tag``.
        """
        return NBPCrypto._seal_to_b64(data, key)

    @staticmethod
    def inner_decrypt(enc_info: dict, key: bytes) -> bytes:
        """Middle-layer AES-256-GCM decryption of an ``inner_encrypt`` result."""
        return NBPCrypto._open_from_b64(enc_info, key)

    # ── Ed25519 outer signature ──

    @staticmethod
    def outer_sign(data: bytes, private_key: bytes) -> bytes:
        """Sign *data* with a raw Ed25519 private key and return the signature."""
        ed25519 = NBPCrypto._imp_ed25519()
        key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key)
        return key.sign(data)

    @staticmethod
    def outer_verify(data: bytes, signature: bytes, public_key: bytes) -> bool:
        """Verify an Ed25519 signature.

        Returns False on any failure (bad signature, malformed key, missing
        ``cryptography`` package, ...) rather than raising.
        """
        try:
            ed25519 = NBPCrypto._imp_ed25519()
            key = ed25519.Ed25519PublicKey.from_public_bytes(public_key)
            key.verify(signature, data)
        except Exception:
            # Deliberate fail-closed: every error means "not verified".
            return False
        return True

    # ── RSA-PSS middle-layer signature ──

    @staticmethod
    def inner_sign(data: bytes, private_key_pem: bytes) -> bytes:
        """Sign *data* with RSA-PSS (SHA-256, MGF1-SHA-256, maximum salt length).

        Args:
            private_key_pem: Unencrypted PEM private key.
        """
        serialization = NBPCrypto._imp_serialization()
        hashes_mod = NBPCrypto._imp_hashes()
        padding_mod = NBPCrypto._imp_padding()
        backends = NBPCrypto._imp_backends()
        private_key = serialization.load_pem_private_key(
            private_key_pem, password=None, backend=backends.default_backend()
        )
        pss = padding_mod.PSS(
            mgf=padding_mod.MGF1(hashes_mod.SHA256()),
            salt_length=padding_mod.PSS.MAX_LENGTH,
        )
        return private_key.sign(data, pss, hashes_mod.SHA256())

    @staticmethod
    def inner_verify(data: bytes, signature: bytes, public_key_pem: bytes) -> bool:
        """Verify an RSA-PSS signature produced by ``inner_sign``.

        Returns False on any failure rather than raising.
        """
        try:
            serialization = NBPCrypto._imp_serialization()
            hashes_mod = NBPCrypto._imp_hashes()
            padding_mod = NBPCrypto._imp_padding()
            backends = NBPCrypto._imp_backends()
            public_key = serialization.load_pem_public_key(
                public_key_pem, backend=backends.default_backend()
            )
            pss = padding_mod.PSS(
                mgf=padding_mod.MGF1(hashes_mod.SHA256()),
                salt_length=padding_mod.PSS.MAX_LENGTH,
            )
            public_key.verify(signature, data, pss, hashes_mod.SHA256())
        except Exception:
            # Deliberate fail-closed: every error means "not verified".
            return False
        return True

    # ── HMAC-SHA256 per-module signature ──

    @staticmethod
    def module_sign(data: bytes, hmac_key: bytes) -> str:
        """Return the base64-encoded HMAC-SHA256 of *data* under *hmac_key*."""
        mac = hmac.new(hmac_key, data, hashlib.sha256).digest()
        return base64.b64encode(mac).decode()

    @staticmethod
    def module_verify(data: bytes, signature: str, hmac_key: bytes) -> bool:
        """Check a ``module_sign`` signature in constant time.

        Returns False (instead of raising) when *signature* is not a string
        or contains non-ASCII characters.
        """
        if not isinstance(signature, str):
            return False
        expected = NBPCrypto.module_sign(data, hmac_key)
        # compare_digest rejects non-ASCII str, so compare the encoded bytes.
        return hmac.compare_digest(
            expected.encode("utf-8"),
            signature.encode("utf-8", "surrogatepass"),
        )

    # ── RSA-OAEP key wrapping ──

    @staticmethod
    def encrypt_key(aes_key: bytes, rsa_public_key_pem: bytes) -> str:
        """Wrap an AES key with RSA-OAEP (SHA-256) and return it base64-encoded."""
        serialization = NBPCrypto._imp_serialization()
        hashes_mod = NBPCrypto._imp_hashes()
        padding_mod = NBPCrypto._imp_padding()
        backends = NBPCrypto._imp_backends()
        public_key = serialization.load_pem_public_key(
            rsa_public_key_pem, backend=backends.default_backend()
        )
        oaep = padding_mod.OAEP(
            mgf=padding_mod.MGF1(algorithm=hashes_mod.SHA256()),
            algorithm=hashes_mod.SHA256(),
            label=None,
        )
        return base64.b64encode(public_key.encrypt(aes_key, oaep)).decode()

    @staticmethod
    def decrypt_key(encrypted_key: str, rsa_private_key_pem: bytes) -> bytes:
        """Unwrap a base64 RSA-OAEP blob produced by ``encrypt_key``."""
        serialization = NBPCrypto._imp_serialization()
        hashes_mod = NBPCrypto._imp_hashes()
        padding_mod = NBPCrypto._imp_padding()
        backends = NBPCrypto._imp_backends()
        private_key = serialization.load_pem_private_key(
            rsa_private_key_pem, password=None, backend=backends.default_backend()
        )
        oaep = padding_mod.OAEP(
            mgf=padding_mod.MGF1(algorithm=hashes_mod.SHA256()),
            algorithm=hashes_mod.SHA256(),
            label=None,
        )
        return private_key.decrypt(base64.b64decode(encrypted_key), oaep)

    # ── Key file I/O ──

    @staticmethod
    def save_key_to_pem(key_bytes: bytes, path: str, is_private: bool = False) -> None:
        """Write key material to *path*, creating parent directories as needed.

        Args:
            key_bytes: Bytes written verbatim.
            path: Destination file; an existing file is overwritten.
            is_private: When True the file is created with (and reset to)
                owner-only permissions ``0o600`` so a private key is not left
                readable by other users.
        """
        dir_path = os.path.dirname(path)
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        if not is_private:
            with open(path, "wb") as f:
                f.write(key_bytes)
            return
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(path, flags, 0o600)
        with os.fdopen(fd, "wb") as f:
            # The mode given to os.open only applies to newly created files;
            # tighten a pre-existing file before the secret is written.
            os.chmod(path, 0o600)
            f.write(key_bytes)

    @staticmethod
    def load_key_from_pem(path: str) -> bytes:
        """Return the raw bytes of the key file at *path*."""
        with open(path, "rb") as f:
            return f.read()

    # ── Shared digest helper ──

    @staticmethod
    def _feed_module_ciphertexts(digest, inner_encrypted: dict) -> None:
        """Feed each module name and base64 ciphertext into *digest*, sorted by name.

        Only the ``ciphertext`` field of each envelope is hashed; nonce and
        tag are not part of the signed digest.
        """
        for mod_name in sorted(inner_encrypted):
            digest.update(mod_name.encode())
            digest.update(inner_encrypted[mod_name]["ciphertext"].encode())

    # ── Full packing flow ──

    @staticmethod
    def full_encrypt_package(
        nir_data: dict[str, bytes],
        manifest: dict,
        ed25519_private_key: bytes,
        rsa_private_key_pem: bytes,
        rsa_public_key_pem: bytes,
    ) -> dict:
        """Encrypt and sign a set of NIR modules.

        Args:
            nir_data: ``{module_name: nir_bytes}`` to protect.
            manifest: JSON-serialisable manifest stored inside META-INF.
            ed25519_private_key: Raw Ed25519 key for the outer signature.
            rsa_private_key_pem: PEM RSA key for the middle-layer signature.
            rsa_public_key_pem: PEM RSA key used to wrap both AES keys.

        Returns:
            Dictionary with every encryption and signature artefact, in the
            shape consumed by ``full_decrypt_package``.
        """
        # 1. Two independent AES keys: key1 (outer) and key2 (modules).
        key1 = NBPCrypto.generate_aes_key()
        key2 = NBPCrypto.generate_aes_key()

        # 2. HMAC key derived from both.
        hmac_key = NBPCrypto.derive_hmac_key(key1, key2)

        # 3. Middle-layer encryption of each module with key2.
        inner_encrypted = {
            mod_name: NBPCrypto.inner_encrypt(mod_data, key2)
            for mod_name, mod_data in nir_data.items()
        }

        # 4. Middle-layer RSA signature over the encrypted modules.
        nir_digest = hashlib.sha256()
        NBPCrypto._feed_module_ciphertexts(nir_digest, inner_encrypted)
        inner_signature = NBPCrypto.inner_sign(nir_digest.digest(), rsa_private_key_pem)
        inner_signature_b64 = base64.b64encode(inner_signature).decode()

        # 5. Per-module HMAC over the plaintext.
        module_sigs = {
            mod_name: NBPCrypto.module_sign(mod_data, hmac_key)
            for mod_name, mod_data in nir_data.items()
        }

        # 6. META-INF payload that the outer layer encrypts.
        meta_inf = {
            "manifest": manifest,
            "inner_signature": inner_signature_b64,
            "inner_encryption": {
                "algorithm": "AES-256-GCM",
                "encrypted_key": NBPCrypto.encrypt_key(key2, rsa_public_key_pem),
            },
            "module_signatures": module_sigs,
        }

        # 7. Outer encryption of META-INF with key1.
        meta_inf_bytes = json.dumps(meta_inf).encode("utf-8")
        outer_encrypted = NBPCrypto.outer_encrypt(meta_inf_bytes, key1)

        # 8. Outer Ed25519 signature over the whole package digest. The digest
        #    depends on the key order json.dumps emits for outer_encrypted.
        package_digest = hashlib.sha256()
        package_digest.update(json.dumps(outer_encrypted).encode())
        NBPCrypto._feed_module_ciphertexts(package_digest, inner_encrypted)
        outer_signature = NBPCrypto.outer_sign(package_digest.digest(), ed25519_private_key)

        # 9. Assemble the result.
        return {
            "outer_encryption": {
                "algorithm": "AES-256-GCM",
                "encrypted_key": NBPCrypto.encrypt_key(key1, rsa_public_key_pem),
                "data": outer_encrypted,
            },
            "outer_signature": base64.b64encode(outer_signature).decode(),
            "inner_encrypted": inner_encrypted,
            "inner_signature": inner_signature_b64,
            "inner_encryption": meta_inf["inner_encryption"],
            "module_signatures": module_sigs,
            "hmac_key_derivation": "SHA256(key1+key2+NebulaHMACv1)",
        }

    # ── Full loading flow ──

    @staticmethod
    def full_decrypt_package(
        package_info: dict,
        ed25519_public_key: bytes,
        rsa_private_key_pem: bytes,
        rsa_public_key_pem: Optional[bytes] = None,
    ) -> dict[str, bytes]:
        """Verify and decrypt a package produced by ``full_encrypt_package``.

        Args:
            package_info: The dictionary returned by ``full_encrypt_package``.
            ed25519_public_key: Raw Ed25519 key for the outer signature.
            rsa_private_key_pem: PEM RSA key that unwraps both AES keys.
            rsa_public_key_pem: PEM RSA key of the package signer. When given,
                the middle-layer RSA-PSS signature is verified; when omitted
                that check is skipped, so callers should pass a key loaded
                from a trusted location.

        Returns:
            ``{module_name: nir_bytes}``.

        Raises:
            NBPCryptoError: A debugger/debug flag was detected, or the outer
                signature, middle-layer signature or a module HMAC is invalid.
            Exception: Errors from the ``cryptography`` package (for example
                failed AES-GCM authentication) propagate unchanged.
        """
        if NBPCrypto._anti_debug_check():
            raise NBPCryptoError("调试器检测到,拒绝解密")

        inner_encrypted = package_info["inner_encrypted"]
        outer_data = package_info["outer_encryption"]["data"]

        # 1. Outer signature check.
        outer_sig = base64.b64decode(package_info["outer_signature"])
        package_digest = hashlib.sha256()
        package_digest.update(json.dumps(outer_data).encode())
        NBPCrypto._feed_module_ciphertexts(package_digest, inner_encrypted)
        if not NBPCrypto.outer_verify(package_digest.digest(), outer_sig, ed25519_public_key):
            raise NBPCryptoError("外层签名验证失败,包可能被篡改")

        # Secrets live in bytearrays so they can be overwritten once decryption
        # is finished, whether it succeeds or fails. The immutable bytes that
        # decrypt_key / derive_hmac_key return cannot be wiped; this is
        # best-effort only.
        key1_buf = bytearray()
        key2_buf = bytearray()
        hmac_key_buf = bytearray()
        try:
            # 2. Unwrap key1 with the RSA private key.
            key1_buf = bytearray(
                NBPCrypto.decrypt_key(
                    package_info["outer_encryption"]["encrypted_key"],
                    rsa_private_key_pem,
                )
            )

            # 3. Decrypt META-INF.
            meta_inf_bytes = NBPCrypto.outer_decrypt(outer_data, key1_buf)
            meta_inf = json.loads(meta_inf_bytes.decode("utf-8"))

            # 4. Middle-layer signature check over the encrypted modules.
            if rsa_public_key_pem is not None:
                inner_sig = base64.b64decode(meta_inf["inner_signature"])
                nir_digest = hashlib.sha256()
                NBPCrypto._feed_module_ciphertexts(nir_digest, inner_encrypted)
                if not NBPCrypto.inner_verify(
                    nir_digest.digest(), inner_sig, rsa_public_key_pem
                ):
                    raise NBPCryptoError("中层签名验证失败,NIR 数据可能被篡改")

            # 5. Unwrap key2 with the RSA private key.
            key2_buf = bytearray(
                NBPCrypto.decrypt_key(
                    meta_inf["inner_encryption"]["encrypted_key"],
                    rsa_private_key_pem,
                )
            )

            # 6. Derive the HMAC key (needs both keys, so nothing is wiped yet).
            hmac_key_buf = bytearray(NBPCrypto.derive_hmac_key(key1_buf, key2_buf))

            # 7. Decrypt every module.
            nir_result = {
                mod_name: NBPCrypto.inner_decrypt(enc_info, key2_buf)
                for mod_name, enc_info in inner_encrypted.items()
            }

            # 8. Per-module HMAC check.
            # NOTE(review): modules without an entry in module_signatures are
            # accepted unchecked, as before; confirm whether this should fail.
            module_sigs = meta_inf.get("module_signatures", {})
            for mod_name, mod_data in nir_result.items():
                expected_sig = module_sigs.get(mod_name)
                if expected_sig and not NBPCrypto.module_verify(
                    mod_data, expected_sig, hmac_key_buf
                ):
                    raise NBPCryptoError(f"模块 '{mod_name}' HMAC 签名验证失败")
            return nir_result
        finally:
            for secret in (key1_buf, key2_buf, hmac_key_buf):
                NBPCrypto._secure_wipe(secret)