from __future__ import annotations import re import traceback from pathlib import Path from typing import Any, Optional, Callable, TYPE_CHECKING from oss.logger.logger import Log if TYPE_CHECKING: from oss.core.manager import PluginManager class PLValidationError(Exception): """PL 校验错误""" pass class PLInjector: """PL 注入管理器 - 带完整安全限制""" MAX_FUNCTIONS_PER_PLUGIN = 50 MAX_REGISTRATIONS_PER_NAME = 10 MAX_NAME_LENGTH = 128 MAX_DESCRIPTION_LENGTH = 256 _FUNCTION_NAME_RE = re.compile(r'^[a-zA-Z0-9_:/\-.]+$') _EVENT_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$') _ROUTE_PATH_RE = re.compile(r'^/[a-zA-Z0-9_\-/.]+$') _FORBIDDEN_ROUTE_PATTERNS = [r'\.\.', r'//', r'/\.', r'~', r'\%'] def __init__(self, plugin_manager: PluginManager): self._plugin_manager = plugin_manager self._injections: dict = {} self._injection_registry: dict = {} self._plugin_function_count: dict = {} def check_and_load_pl(self, plugin_dir: Path, plugin_name: str) -> bool: """检查并加载 PL 文件夹,返回 True 表示成功""" pl_dir = plugin_dir / "PL" if not pl_dir.exists() or not pl_dir.is_dir(): Log.warn("Core", f"插件 '{plugin_name}' 声明了 pl_injection,但缺少 PL/ 文件夹,拒绝加载") return False pl_main = pl_dir / "main.py" if not pl_main.exists(): Log.warn("Core", f"插件 '{plugin_name}' 的 PL/ 文件夹中缺少 main.py,拒绝加载") return False # 禁止危险文件类型 forbidden_ext = {'.sh', '.bat', '.exe', '.dll', '.so', '.dylib', '.bin'} for f in pl_dir.rglob('*'): if f.suffix.lower() in forbidden_ext: Log.error("Core", f"插件 '{plugin_name}' 的 PL/ 文件夹包含危险文件: {f.name},拒绝加载") return False try: # 受限沙箱 safe_builtins = { 'True': True, 'False': False, 'None': None, 'dict': dict, 'list': list, 'str': str, 'int': int, 'float': float, 'bool': bool, 'tuple': tuple, 'set': set, 'len': len, 'range': range, 'enumerate': enumerate, 'zip': zip, 'map': map, 'filter': filter, 'sorted': sorted, 'reversed': reversed, 'min': min, 'max': max, 'sum': sum, 'abs': abs, 'round': round, 'isinstance': isinstance, 'issubclass': issubclass, 'type': type, 'id': id, 'hash': hash, 'repr': repr, 'print': print, 'object': object, 'property': property, 'staticmethod': staticmethod, 'classmethod': classmethod, 'super': super, 'iter': iter, 'next': next, 'any': any, 'all': all, 'callable': callable, 'hasattr': hasattr, 'getattr': getattr, 'setattr': setattr, 'ValueError': ValueError, 'TypeError': TypeError, 'KeyError': KeyError, 'IndexError': IndexError, 'Exception': Exception, 'BaseException': BaseException, } safe_globals = { '__builtins__': safe_builtins, '__name__': f'plugin.{plugin_name}.PL', '__package__': f'plugin.{plugin_name}.PL', '__file__': str(pl_main), } with open(pl_main, 'r', encoding='utf-8') as f: source = f.read() # 静态源码安全检查 self._static_source_check(source, str(pl_main)) code = compile(source, str(pl_main), 'exec') exec(code, safe_globals) register_func = safe_globals.get('register') if register_func and callable(register_func): register_func(self) Log.ok("Core", f"插件 '{plugin_name}' PL 注入成功") else: Log.warn("Core", f"插件 '{plugin_name}' 的 PL/main.py 缺少 register() 函数,但仍允许加载") self._injections[plugin_name] = {"dir": str(pl_dir)} return True except PLValidationError as e: Log.error("Core", f"插件 '{plugin_name}' PL 安全检查失败: {e}") return False except SyntaxError as e: Log.error("Core", f"插件 '{plugin_name}' PL/main.py 语法错误: {e}") return False except FileNotFoundError as e: Log.error("Core", f"插件 '{plugin_name}' PL 文件不存在:{e}") return False except PermissionError as e: Log.error("Core", f"插件 '{plugin_name}' PL 文件权限错误:{e}") return False except Exception as e: Log.error("Core", f"加载插件 '{plugin_name}' 的 PL 失败:{type(e).__name__}: {e}") traceback.print_exc() return False def _static_source_check(self, source: str, file_path: str): """静态源码安全检查 - 增强版,防止字符串拼接/编码绕过""" import base64 # 首先检查是否有 base64 编码的恶意代码 try: string_pattern = r'([A-Za-z0-9+/=]{20,})' for match in re.finditer(string_pattern, source): try: decoded = base64.b64decode(match.group(1)).decode('utf-8', errors='ignore') for dangerous in ['import ', 'exec(', 'eval(', 'compile(', 'os.', 'sys.', 'subprocess']: if dangerous in decoded: raise PLValidationError(f"{file_path} - 检测到 base64 编码的恶意代码") except Exception as e: print(f"[PLInjector] 模块注入错误: {e}") except Exception as e: print(f"[PLInjector] 错误: {e}") # 检查字符串拼接绕过 concat_patterns = [ r"""['"]ex['"]\s*\+\s*['"]ec['"]""", r"""['"]impor['"]\s*\+\s*['"]t['"]""", r"""['"]eva['"]\s*\+\s*['"]l['"]""", r"""['"]compil['"]\s*\+\s*['"]e['"]""", ] for pattern in concat_patterns: if re.search(pattern, source): raise PLValidationError(f"{file_path} - 检测到字符串拼接绕过尝试") forbidden = [ (r'^\s*import\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)', '禁止导入系统级模块'), (r'^\s*from\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)\s+import', '禁止导入系统级模块'), (r'__import__\s*\(', '禁止使用 __import__'), (r'(? bool: if not name or not isinstance(name, str): return False if len(name) > self.MAX_NAME_LENGTH: return False return bool(self._FUNCTION_NAME_RE.match(name)) def _validate_route_path(self, path: str) -> bool: if not path or not isinstance(path, str): return False if len(path) > 256: return False if not self._ROUTE_PATH_RE.match(path): return False for p in self._FORBIDDEN_ROUTE_PATTERNS: if re.search(p, path): return False return True def _validate_event_name(self, event_name: str) -> bool: if not event_name or not isinstance(event_name, str): return False if len(event_name) > self.MAX_NAME_LENGTH: return False return bool(self._EVENT_NAME_RE.match(event_name)) def _check_plugin_limit(self, plugin_name: str) -> bool: count = self._plugin_function_count.get(plugin_name, 0) if count >= self.MAX_FUNCTIONS_PER_PLUGIN: Log.warn("Core", f"插件 '{plugin_name}' 注册功能数已达上限 ({self.MAX_FUNCTIONS_PER_PLUGIN})") return False return True def _check_name_limit(self, name: str) -> bool: registrations = self._injection_registry.get(name, []) if len(registrations) >= self.MAX_REGISTRATIONS_PER_NAME: Log.warn("Core", f"功能名称 '{name}' 注册次数已达上限 ({self.MAX_REGISTRATIONS_PER_NAME})") return False return True def _wrap_function(self, func: Callable, plugin_name: str, name: str) -> Callable: """包装函数,异常安全""" def _safe_wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: Log.error("Core", f"PL 注入功能 '{name}' (来自 {plugin_name}) 执行异常: {e}") return None return _safe_wrapper def _get_caller_plugin_name(self) -> Optional[str]: """通过栈帧回溯获取调用者插件名""" stack = traceback.extract_stack() for frame in stack: filename = frame.filename if '/PL/' in filename and 'main.py' in filename: parts = Path(filename).parts for i, part in enumerate(parts): if part == 'PL': return parts[i - 1] if i > 0 else None return None def register_function(self, name: str, func: Callable, description: str = ""): """注册注入功能 - 带参数校验和权限限制""" if not self._validate_function_name(name): Log.error("Core", f"PL 注入功能名称非法: '{name}'") return if not callable(func): Log.error("Core", f"PL 注入功能 '{name}' 不是可调用对象") return if description and len(description) > self.MAX_DESCRIPTION_LENGTH: description = description[:self.MAX_DESCRIPTION_LENGTH] plugin_name = self._get_caller_plugin_name() or "unknown" if not self._check_plugin_limit(plugin_name): return if not self._check_name_limit(name): return wrapped_func = self._wrap_function(func, plugin_name, name) if name not in self._injection_registry: self._injection_registry[name] = [] self._injection_registry[name].append({ "func": wrapped_func, "plugin": plugin_name, "description": description, }) self._plugin_function_count[plugin_name] = self._plugin_function_count.get(plugin_name, 0) + 1 Log.tip("Core", f"PL 注入功能已注册: '{name}' (来自 {plugin_name})") def register_route(self, method: str, path: str, handler: Callable): """注册 HTTP 路由 - 带路径安全校验""" valid_methods = {'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS'} method_upper = method.upper() if method_upper not in valid_methods: Log.error("Core", f"PL 注入路由方法非法: '{method}'") return if not self._validate_route_path(path): Log.error("Core", f"PL 注入路由路径非法: '{path}'") return self.register_function(f"{method_upper}:{path}", handler, f"路由 {method_upper} {path}") def register_event_handler(self, event_name: str, handler: Callable): """注册事件处理器 - 带名称校验""" if not self._validate_event_name(event_name): Log.error("Core", f"PL 注入事件名称非法: '{event_name}'") return self.register_function(f"event:{event_name}", handler, f"事件 {event_name}") def get_injected_functions(self, name: str = None) -> list[Callable]: if name: return [e["func"] for e in self._injection_registry.get(name, [])] return [f for es in self._injection_registry.values() for f in [e["func"] for e in es]] def get_injection_info(self, plugin_name: str = None) -> dict: if plugin_name: return self._injections.get(plugin_name, {}) return dict(self._injections) def has_injection(self, plugin_name: str) -> bool: return plugin_name in self._injections def get_registry_info(self) -> dict: info = {} for name, entries in self._injection_registry.items(): info[name] = { "count": len(entries), "plugins": [e["plugin"] for e in entries], "descriptions": [e["description"] for e in entries], } return info