- crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入 - crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt - crypto.py: 提取公共摘要计算方法,拆分长方法 - compiler.py: 删除_obfuscate_code中未使用的死代码 - loader.py: 3次ZIP扫描合并为1次缓存读取 - format.py: 更新为使用_ModuleCache - 合计减少205行代码(1707→1502)
273 lines
8.9 KiB
Python
273 lines
8.9 KiB
Python
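"""NIR (Nebula Intermediate Representation) compiler.

Compiles Python plugin source code into serialized code objects, so that a
plugin can be "compiled once and run anywhere".

NIR is built on native Python code objects serialized with marshal:
- any platform running Python 3.10+ can execute it
- it does not depend on a specific CPU architecture or operating system
- C extensions are rejected at compile time to keep plugins pure-Python and portable
"""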
|
||
import ast
|
||
import marshal
|
||
import types
|
||
import sys
|
||
import random
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
|
||
class NIRCompileError(Exception):
    """Error raised when NIR compilation or deserialization fails."""
|
||
|
||
|
||
class NIRCompiler:
    """NIR compiler: Python source <-> marshalled code object.

    The compiler statically vets plugin source (forbidden imports and
    forbidden builtin calls), compiles it with the builtin ``compile``,
    optionally appends a junk constant to every code object, and serializes
    the result with ``marshal``.

    NOTE(review): the ``marshal`` format is specific to the Python version
    that produced it, so NIR produced by one interpreter version is not
    guaranteed to load on another -- confirm that the packaging layer pins
    or records the version.
    """

    # Inclusive range of Python versions accepted by check_python_version().
    MIN_PY_VERSION = (3, 10)
    MAX_PY_VERSION = (3, 13)

    # File-name suffixes treated as C extensions; a plugin directory that
    # contains any matching path is refused.
    FORBIDDEN_C_EXTENSIONS = {
        ".so", ".pyd", ".dll", ".dylib",
    }

    # Top-level modules a plugin may not import unless the caller lists them
    # in ``allowed_imports``.
    FORBIDDEN_MODULES = {
        "os", "sys", "subprocess", "shutil", "socket",
        "ctypes", "cffi", "multiprocessing", "threading",
        "signal", "fcntl", "termios", "pty", "grp", "pwd",
        "resource", "syslog", "crypt",
    }

    # Builtins that plugin source may not call by bare name.
    _FORBIDDEN_CALLS = ("__import__", "exec", "eval", "compile")

    def __init__(self, obfuscate: bool = True):
        """Create a compiler.

        Args:
            obfuscate: When true, compiled code objects get a junk constant
                appended (see ``_obfuscate_code``).
        """
        self.obfuscate = obfuscate

    # -- Compilation --

    def compile_source(self, source: str, filename: str = "<nbpf>",
                       allowed_imports: Optional[list[str]] = None) -> bytes:
        """Compile Python source into a serialized code object.

        Args:
            source: Python source code.
            filename: File name used in error reports and stored in the
                code object.
            allowed_imports: Forbidden modules that this source is
                nevertheless allowed to import.

        Returns:
            The marshalled code object.

        Raises:
            NIRCompileError: The source has a syntax error, fails the static
                safety check, or cannot be compiled or serialized.
        """
        try:
            # Static safety check runs before anything is compiled.
            self._static_check(source, filename, allowed_imports or [])

            code = compile(source, filename, 'exec')

            if self.obfuscate:
                code = self._obfuscate_code(code)

            return marshal.dumps(code)
        except SyntaxError as e:
            raise NIRCompileError(f"语法错误: {e}") from e
        except NIRCompileError:
            # Already in its final form; do not wrap a second time.
            raise
        except Exception as e:
            raise NIRCompileError(f"编译失败: {type(e).__name__}: {e}") from e

    def compile_plugin(self, plugin_dir: Path,
                       allowed_imports: Optional[list[str]] = None) -> dict[str, bytes]:
        """Compile every ``.py`` file of a plugin directory to NIR.

        Args:
            plugin_dir: Plugin directory (a ``str`` path is accepted too).
            allowed_imports: Whitelist of otherwise forbidden modules
                (from the manifest's ``permissions.imports``).

        Returns:
            A ``{module_name: nir_bytes}`` dict. A package's ``__init__.py``
            is keyed by the package name.

        Raises:
            NIRCompileError: The directory is missing, contains a C
                extension, contains no ``.py`` file, or a file fails to
                read or compile.
        """
        plugin_dir = Path(plugin_dir)
        if not plugin_dir.exists():
            raise NIRCompileError(f"插件目录不存在: {plugin_dir}")

        self._reject_c_extensions(plugin_dir)

        sources = self._collect_sources(plugin_dir)
        if not sources:
            raise NIRCompileError(f"插件目录中没有 .py 文件: {plugin_dir}")

        return {
            self._module_name(rel_path): self.compile_source(
                source, str(plugin_dir / rel_path), allowed_imports
            )
            for rel_path, source in sources.items()
        }

    @staticmethod
    def _module_name(rel_path: str) -> str:
        """Turn a relative ``.py`` path into a dotted module name."""
        # Only the trailing ".py" is removed, and both path separators are
        # honoured so the result is the same on every operating system.
        parts = rel_path.replace("\\", "/").removesuffix(".py").split("/")
        # "pkg/__init__.py" names the package itself; a top-level
        # "__init__.py" keeps the name "__init__".
        if len(parts) > 1 and parts[-1] == "__init__":
            parts.pop()
        return ".".join(parts)

    def _collect_sources(self, plugin_dir: Path) -> dict[str, str]:
        """Read the source of every ``.py`` file below ``plugin_dir``.

        Anything located inside a ``__pycache__`` directory is skipped.

        Returns:
            A ``{relative_path: source}`` dict, filled in sorted path order.

        Raises:
            NIRCompileError: A file cannot be read or is not valid UTF-8.
        """
        sources = {}
        for file_path in sorted(plugin_dir.rglob("*.py")):
            if "__pycache__" in file_path.parts:
                continue
            rel_path = str(file_path.relative_to(plugin_dir))
            try:
                source = file_path.read_text(encoding="utf-8")
            except (OSError, UnicodeError) as e:
                raise NIRCompileError(f"读取文件失败 {rel_path}: {e}") from e
            sources[rel_path] = source
        return sources

    # -- Deserialization --

    @staticmethod
    def deserialize_nir(nir_data: bytes) -> types.CodeType:
        """Deserialize NIR bytes back into a code object.

        Args:
            nir_data: Marshalled code object.

        Returns:
            The code object.

        Raises:
            NIRCompileError: The data cannot be unmarshalled or does not
                contain a code object.
        """
        # NOTE(review): the marshal documentation warns that the module is
        # not secure against erroneous or maliciously constructed data; the
        # caller must make sure nir_data comes from a trusted source.
        try:
            code = marshal.loads(nir_data)
            if not isinstance(code, types.CodeType):
                raise NIRCompileError("反序列化结果不是 code object")
            return code
        except Exception as e:
            # The type-check failure raised above is caught here as well, so
            # every failure carries the same message prefix.
            raise NIRCompileError(f"NIR 反序列化失败: {e}") from e

    @staticmethod
    def create_function(code: types.CodeType, globals_dict: dict) -> types.FunctionType:
        """Wrap a code object in a callable function.

        Args:
            code: Code object to wrap.
            globals_dict: Global namespace the function runs in.

        Returns:
            The function object.
        """
        return types.FunctionType(code, globals_dict)

    # -- Static safety check --

    def _static_check(self, source: str, filename: str,
                      allowed_imports: Optional[list[str]] = None):
        """Reject source that imports forbidden modules or calls forbidden builtins.

        Args:
            source: Python source code.
            filename: File name used in error messages.
            allowed_imports: Forbidden modules exempted for this source.

        Raises:
            SyntaxError: The source does not parse.
            NIRCompileError: A forbidden import, or a call by bare name to
                ``__import__``, ``exec``, ``eval`` or ``compile``, was found.
        """
        tree = ast.parse(source, filename=filename)

        for node in ast.walk(tree):
            if isinstance(node, ast.Import):
                for alias in node.names:
                    self._check_module(alias.name, node.lineno, allowed_imports)

            elif isinstance(node, ast.ImportFrom):
                # node.module is None for "from . import x".
                if node.module:
                    self._check_module(node.module, node.lineno, allowed_imports)

            elif isinstance(node, ast.Call) and isinstance(node.func, ast.Name):
                # All forbidden builtins are handled in ONE branch: a second
                # "elif isinstance(node, ast.Call)" could never be reached.
                # Only bare names are matched, so re.compile(...) is allowed.
                if node.func.id in self._FORBIDDEN_CALLS:
                    raise NIRCompileError(
                        f"{filename}:{node.lineno} - 禁止使用 {node.func.id}()"
                    )

    def _check_module(self, module_name: str, lineno: int,
                      allowed_imports: Optional[list[str]] = None):
        """Raise if ``module_name`` is forbidden and not whitelisted.

        Only the top-level package (the text before the first dot) is
        compared against ``FORBIDDEN_MODULES`` and ``allowed_imports``.

        Raises:
            NIRCompileError: The module is forbidden and not whitelisted.
        """
        base = module_name.split(".")[0]
        if base not in self.FORBIDDEN_MODULES:
            return
        if allowed_imports and base in allowed_imports:
            return  # explicitly whitelisted
        raise NIRCompileError(
            f"第 {lineno} 行 - 禁止导入系统模块: '{module_name}'"
            "(如需使用请在 manifest.json 的 permissions.imports 中声明)"
        )

    def _reject_c_extensions(self, plugin_dir: Path):
        """Refuse plugin directories that contain a C extension.

        Raises:
            NIRCompileError: A path whose name ends with one of
                ``FORBIDDEN_C_EXTENSIONS`` exists below ``plugin_dir``.
        """
        # Sorted so the reported offender does not depend on set ordering.
        for ext in sorted(self.FORBIDDEN_C_EXTENSIONS):
            offenders = sorted(plugin_dir.rglob(f"*{ext}"))
            if offenders:
                raise NIRCompileError(
                    f"插件包含 C 扩展,拒绝编译: {offenders[0].relative_to(plugin_dir)}"
                )

    # -- Junk-constant obfuscation --

    def _obfuscate_code(self, code: types.CodeType) -> types.CodeType:
        """Append one harmless junk constant to a code object, recursively.

        The constant goes at the END of ``co_consts``, so the positions of
        the existing constants are unchanged. Nested code objects found in
        ``co_consts`` are processed the same way. Code objects with empty
        bytecode or no constants are returned untouched.
        """
        if not code.co_code or not code.co_consts:
            return code

        junk = random.choice([
            None,
            42,
            "NebulaShell",
            True,
            False,
        ])

        new_consts = [
            self._obfuscate_code(const) if isinstance(const, types.CodeType) else const
            for const in code.co_consts
        ]

        try:
            return code.replace(co_consts=tuple(new_consts + [junk]))
        except AttributeError:
            # Interpreters without CodeType.replace (Python 3.7 and older).
            return code

    # -- Utilities --

    @staticmethod
    def check_python_version() -> bool:
        """Return whether the running Python version is supported by NIR."""
        ver = sys.version_info[:2]
        return NIRCompiler.MIN_PY_VERSION <= ver <= NIRCompiler.MAX_PY_VERSION
|