Files
NebulaShell/store/@{FutureOSS}/plugin-loader/main.py
Falck 395cda2e8b chore: add website directory to gitignore and update VSCode config
- Add `website/` to .gitignore to exclude website build artifacts
- Add Node.js debug configurations for FutureOSS website in launch.json
- Update VSCode color theme to "Default Dark Modern"
- Refactor plugin loader to simplify dependency and lifecycle plugin loading logic
2026-04-25 06:43:45 +08:00

642 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""插件加载器插件 - 支持能力扫描和扩展 + PL 注入机制"""
import sys
import json
import re
import types
import traceback
import importlib.util
from pathlib import Path
from typing import Any, Optional, Callable
from oss.plugin.types import Plugin, register_plugin_type
from oss.plugin.capabilities import scan_capabilities
class Log:
"""智能彩色日志"""
_TTY = sys.stdout.isatty()
_C = {"reset": "\033[0m", "white": "\033[0;37m", "yellow": "\033[1;33m", "blue": "\033[1;34m", "red": "\033[1;31m"}
@classmethod
def c(cls, text: str, color: str) -> str:
if not cls._TTY: return text
return f"{cls._C.get(color, '')}{text}{cls._C['reset']}"
@classmethod
def info(cls, tag: str, msg: str): print(f"{cls.c(f'[{tag}]', 'white')} {cls.c(msg, 'white')}")
@classmethod
def warn(cls, tag: str, msg: str): print(f"{cls.c(f'[{tag}]', 'yellow')} {cls.c('', 'yellow')} {cls.c(msg, 'yellow')}")
@classmethod
def tip(cls, tag: str, msg: str): print(f"{cls.c(f'[{tag}]', 'blue')} {cls.c('', 'blue')} {cls.c(msg, 'blue')}")
@classmethod
def error(cls, tag: str, msg: str): print(f"{cls.c(f'[{tag}]', 'red')} {cls.c('', 'red')} {cls.c(msg, 'red')}")
@classmethod
def ok(cls, tag: str, msg: str): print(f"{cls.c(f'[{tag}]', 'white')} {cls.c(msg, 'white')}")
class PluginInfo:
"""插件信息"""
def __init__(self):
self.name: str = ""
self.version: str = ""
self.author: str = ""
self.description: str = ""
self.readme: str = ""
self.config: dict[str, Any] = {}
self.extensions: dict[str, Any] = {}
self.lifecycle: Any = None
self.capabilities: set[str] = set()
self.dependencies: list[str] = []
self.pl_injected: bool = False
class PermissionError(Exception):
"""权限错误"""
pass
class PluginProxy:
"""插件代理 - 防止越级访问"""
def __init__(self, plugin_name: str, plugin_instance: Any, allowed_plugins: list[str], all_plugins: dict):
self._plugin_name = plugin_name
self._plugin_instance = plugin_instance
self._allowed_plugins = set(allowed_plugins)
self._all_plugins = all_plugins
def get_plugin(self, name: str) -> Any:
if name not in self._allowed_plugins and "*" not in self._allowed_plugins:
raise PermissionError(f"插件 '{self._plugin_name}' 无权访问插件 '{name}'")
if name not in self._all_plugins: return None
return self._all_plugins[name]["instance"]
def list_plugins(self) -> list[str]:
if "*" in self._allowed_plugins: return list(self._all_plugins.keys())
return [n for n in self._allowed_plugins if n in self._all_plugins]
def get_capability(self, capability: str) -> Any: return None
def __getattr__(self, name: str): return getattr(self._plugin_instance, name)
class CapabilityRegistry:
"""能力注册表"""
def __init__(self, permission_check: bool = True):
self.providers: dict = {}
self.consumers: dict = {}
self.permission_check = permission_check
def register_provider(self, capability: str, plugin_name: str, instance: Any):
self.providers[capability] = {"plugin": plugin_name, "instance": instance}
if capability not in self.consumers: self.consumers[capability] = []
def register_consumer(self, capability: str, plugin_name: str):
if capability not in self.consumers: self.consumers[capability] = []
if plugin_name not in self.consumers[capability]: self.consumers[capability].append(plugin_name)
def get_provider(self, capability: str, requester: str = "", allowed_plugins: list = None) -> Optional[Any]:
if capability not in self.providers: return None
if self.permission_check and allowed_plugins is not None:
pn = self.providers[capability]["plugin"]
if pn != requester and pn not in allowed_plugins and "*" not in allowed_plugins:
raise PermissionError(f"插件 '{requester}' 无权使用能力 '{capability}'")
return self.providers[capability]["instance"]
def has_capability(self, capability: str) -> bool: return capability in self.providers
def get_consumers(self, capability: str) -> list: return self.consumers.get(capability, [])
class PLValidationError(Exception):
"""PL 校验错误"""
pass
class PLInjector:
"""PL 注入管理器 - 带完整安全限制"""
MAX_FUNCTIONS_PER_PLUGIN = 50
MAX_REGISTRATIONS_PER_NAME = 10
MAX_NAME_LENGTH = 128
MAX_DESCRIPTION_LENGTH = 256
_FUNCTION_NAME_RE = re.compile(r'^[a-zA-Z0-9_:/\-.]+$')
_EVENT_NAME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$')
_ROUTE_PATH_RE = re.compile(r'^/[a-zA-Z0-9_\-/.]+$')
_FORBIDDEN_ROUTE_PATTERNS = [r'\.\.', r'//', r'/\.', r'~', r'\%']
def __init__(self, plugin_manager: 'PluginManager'):
self._plugin_manager = plugin_manager
self._injections: dict = {}
self._injection_registry: dict = {}
self._plugin_function_count: dict = {}
def check_and_load_pl(self, plugin_dir: Path, plugin_name: str) -> bool:
"""检查并加载 PL 文件夹,返回 True 表示成功"""
pl_dir = plugin_dir / "PL"
if not pl_dir.exists() or not pl_dir.is_dir():
Log.warn("plugin-loader", f"插件 '{plugin_name}' 声明了 pl_injection但缺少 PL/ 文件夹,拒绝加载")
return False
pl_main = pl_dir / "main.py"
if not pl_main.exists():
Log.warn("plugin-loader", f"插件 '{plugin_name}' 的 PL/ 文件夹中缺少 main.py拒绝加载")
return False
# 禁止危险文件类型
forbidden_ext = {'.sh', '.bat', '.exe', '.dll', '.so', '.dylib', '.bin'}
for f in pl_dir.rglob('*'):
if f.suffix.lower() in forbidden_ext:
Log.error("plugin-loader", f"插件 '{plugin_name}' 的 PL/ 文件夹包含危险文件: {f.name},拒绝加载")
return False
try:
# 受限沙箱
safe_builtins = {
'True': True, 'False': False, 'None': None,
'dict': dict, 'list': list, 'str': str, 'int': int,
'float': float, 'bool': bool, 'tuple': tuple, 'set': set,
'len': len, 'range': range, 'enumerate': enumerate,
'zip': zip, 'map': map, 'filter': filter,
'sorted': sorted, 'reversed': reversed,
'min': min, 'max': max, 'sum': sum, 'abs': abs,
'round': round, 'isinstance': isinstance, 'issubclass': issubclass,
'type': type, 'id': id, 'hash': hash, 'repr': repr,
'print': print, 'object': object, 'property': property,
'staticmethod': staticmethod, 'classmethod': classmethod,
'super': super, 'iter': iter, 'next': next,
'any': any, 'all': all, 'callable': callable,
'hasattr': hasattr, 'getattr': getattr, 'setattr': setattr,
'ValueError': ValueError, 'TypeError': TypeError,
'KeyError': KeyError, 'IndexError': IndexError,
'Exception': Exception, 'BaseException': BaseException,
}
safe_globals = {
'__builtins__': safe_builtins,
'__name__': f'plugin.{plugin_name}.PL',
'__package__': f'plugin.{plugin_name}.PL',
'__file__': str(pl_main),
}
with open(pl_main, 'r', encoding='utf-8') as f:
source = f.read()
# 静态源码安全检查
self._static_source_check(source, str(pl_main))
code = compile(source, str(pl_main), 'exec')
exec(code, safe_globals)
register_func = safe_globals.get('register')
if register_func and callable(register_func):
register_func(self)
Log.ok("plugin-loader", f"插件 '{plugin_name}' PL 注入成功")
else:
Log.warn("plugin-loader", f"插件 '{plugin_name}' 的 PL/main.py 缺少 register() 函数,但仍允许加载")
self._injections[plugin_name] = {"dir": str(pl_dir)}
return True
except PLValidationError as e:
Log.error("plugin-loader", f"插件 '{plugin_name}' PL 安全检查失败: {e}")
return False
except SyntaxError as e:
Log.error("plugin-loader", f"插件 '{plugin_name}' PL/main.py 语法错误: {e}")
return False
except Exception as e:
Log.error("plugin-loader", f"加载插件 '{plugin_name}' 的 PL 失败: {e}")
return False
def _static_source_check(self, source: str, file_path: str):
"""静态源码安全检查"""
forbidden = [
(r'^\s*import\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)', '禁止导入系统级模块'),
(r'^\s*from\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)\s+import', '禁止导入系统级模块'),
(r'__import__\s*\(', '禁止使用 __import__'),
(r'exec\s*\(', '禁止使用 exec'),
(r'eval\s*\(', '禁止使用 eval'),
(r'compile\s*\(', '禁止使用 compile'),
(r'open\s*\(', '禁止直接操作文件'),
(r'__builtins__', '禁止访问 __builtins__'),
]
for line_num, line in enumerate(source.split('\n'), 1):
stripped = line.strip()
if not stripped or stripped.startswith('#'): continue
for pattern, msg in forbidden:
if re.search(pattern, stripped):
raise PLValidationError(f"{file_path}:{line_num} - {msg}: '{stripped}'")
def _validate_function_name(self, name: str) -> bool:
if not name or not isinstance(name, str): return False
if len(name) > self.MAX_NAME_LENGTH: return False
return bool(self._FUNCTION_NAME_RE.match(name))
def _validate_route_path(self, path: str) -> bool:
if not path or not isinstance(path, str): return False
if len(path) > 256: return False
if not self._ROUTE_PATH_RE.match(path): return False
for p in self._FORBIDDEN_ROUTE_PATTERNS:
if re.search(p, path): return False
return True
def _validate_event_name(self, event_name: str) -> bool:
if not event_name or not isinstance(event_name, str): return False
if len(event_name) > self.MAX_NAME_LENGTH: return False
return bool(self._EVENT_NAME_RE.match(event_name))
def _check_plugin_limit(self, plugin_name: str) -> bool:
count = self._plugin_function_count.get(plugin_name, 0)
if count >= self.MAX_FUNCTIONS_PER_PLUGIN:
Log.warn("plugin-loader", f"插件 '{plugin_name}' 注册功能数已达上限 ({self.MAX_FUNCTIONS_PER_PLUGIN})")
return False
return True
def _check_name_limit(self, name: str) -> bool:
registrations = self._injection_registry.get(name, [])
if len(registrations) >= self.MAX_REGISTRATIONS_PER_NAME:
Log.warn("plugin-loader", f"功能名称 '{name}' 注册次数已达上限 ({self.MAX_REGISTRATIONS_PER_NAME})")
return False
return True
def _wrap_function(self, func: Callable, plugin_name: str, name: str) -> Callable:
"""包装函数,异常安全"""
def _safe_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
Log.error("plugin-loader", f"PL 注入功能 '{name}' (来自 {plugin_name}) 执行异常: {e}")
return None
return _safe_wrapper
def _get_caller_plugin_name(self) -> Optional[str]:
"""通过栈帧回溯获取调用者插件名"""
stack = traceback.extract_stack()
for frame in stack:
filename = frame.filename
if '/PL/' in filename and 'main.py' in filename:
parts = Path(filename).parts
for i, part in enumerate(parts):
if part == 'PL':
return parts[i - 1] if i > 0 else None
return None
def register_function(self, name: str, func: Callable, description: str = ""):
"""注册注入功能 - 带参数校验和权限限制"""
if not self._validate_function_name(name):
Log.error("plugin-loader", f"PL 注入功能名称非法: '{name}'")
return
if not callable(func):
Log.error("plugin-loader", f"PL 注入功能 '{name}' 不是可调用对象")
return
if description and len(description) > self.MAX_DESCRIPTION_LENGTH:
description = description[:self.MAX_DESCRIPTION_LENGTH]
plugin_name = self._get_caller_plugin_name() or "unknown"
if not self._check_plugin_limit(plugin_name): return
if not self._check_name_limit(name): return
wrapped_func = self._wrap_function(func, plugin_name, name)
if name not in self._injection_registry:
self._injection_registry[name] = []
self._injection_registry[name].append({
"func": wrapped_func, "plugin": plugin_name, "description": description,
})
self._plugin_function_count[plugin_name] = self._plugin_function_count.get(plugin_name, 0) + 1
Log.tip("plugin-loader", f"PL 注入功能已注册: '{name}' (来自 {plugin_name})")
def register_route(self, method: str, path: str, handler: Callable):
"""注册 HTTP 路由 - 带路径安全校验"""
valid_methods = {'GET', 'POST', 'PUT', 'DELETE', 'PATCH', 'HEAD', 'OPTIONS'}
method_upper = method.upper()
if method_upper not in valid_methods:
Log.error("plugin-loader", f"PL 注入路由方法非法: '{method}'")
return
if not self._validate_route_path(path):
Log.error("plugin-loader", f"PL 注入路由路径非法: '{path}'")
return
self.register_function(f"{method_upper}:{path}", handler, f"路由 {method_upper} {path}")
def register_event_handler(self, event_name: str, handler: Callable):
"""注册事件处理器 - 带名称校验"""
if not self._validate_event_name(event_name):
Log.error("plugin-loader", f"PL 注入事件名称非法: '{event_name}'")
return
self.register_function(f"event:{event_name}", handler, f"事件 {event_name}")
def get_injected_functions(self, name: str = None) -> list[Callable]:
if name: return [e["func"] for e in self._injection_registry.get(name, [])]
return [f for es in self._injection_registry.values() for f in [e["func"] for e in es]]
def get_injection_info(self, plugin_name: str = None) -> dict:
if plugin_name: return self._injections.get(plugin_name, {})
return dict(self._injections)
def has_injection(self, plugin_name: str) -> bool:
return plugin_name in self._injections
def get_registry_info(self) -> dict:
info = {}
for name, entries in self._injection_registry.items():
info[name] = {
"count": len(entries),
"plugins": [e["plugin"] for e in entries],
"descriptions": [e["description"] for e in entries],
}
return info
class PluginManager:
"""插件管理器"""
def __init__(self, permission_check: bool = True):
self.plugins: dict = {}
self.lifecycle_plugin = None
self._dependency_plugin = None
self._signature_verifier = None
self.capability_registry = CapabilityRegistry(permission_check=permission_check)
self.permission_check = permission_check
self.enforce_signature = True
self.pl_injector = PLInjector(self)
def set_signature_verifier(self, verifier): self._signature_verifier = verifier
def set_lifecycle(self, lifecycle_plugin): self.lifecycle_plugin = lifecycle_plugin
def _load_manifest(self, plugin_dir: Path) -> dict:
mf = plugin_dir / "manifest.json"
if not mf.exists(): return {}
with open(mf, "r", encoding="utf-8") as f: return json.load(f)
def _load_readme(self, plugin_dir: Path) -> str:
rf = plugin_dir / "README.md"
if not rf.exists(): return ""
with open(rf, "r", encoding="utf-8") as f: return f.read()
def _load_config(self, plugin_dir: Path) -> dict:
cf = plugin_dir / "config.py"
if not cf.exists(): return {}
with open(cf, "r", encoding="utf-8") as f: content = f.read()
for p in ['import ', 'open(', 'exec(', 'eval(', 'os.', 'sys.', 'subprocess']:
if p in content: Log.warn("plugin-loader", f"{cf} 包含危险代码: {p}"); return {}
sg = {"__builtins__": {"True": True, "False": False, "None": None, "dict": dict, "list": list, "str": str, "int": int, "float": float, "bool": bool}}
lv = {}
try: code = compile(content, str(cf), "exec"); exec(code, sg, lv)
except Exception as e: Log.error("plugin-loader", f"配置文件解析失败: {e}"); return {}
return {k: v for k, v in lv.items() if not k.startswith("_") and not callable(v)}
def _load_extensions(self, plugin_dir: Path) -> dict:
ef = plugin_dir / "extensions.py"
if not ef.exists(): return {}
with open(ef, "r", encoding="utf-8") as f: content = f.read()
for p in ['import ', 'open(', 'exec(', 'eval(', 'os.', 'sys.', 'subprocess']:
if p in content: Log.warn("plugin-loader", f"{ef} 包含危险代码: {p}"); return {}
sg = {"__builtins__": {"True": True, "False": False, "None": None, "dict": dict, "list": list, "str": str, "int": int, "float": float, "bool": bool}}
lv = {}
try: code = compile(content, str(ef), "exec"); exec(code, sg, lv)
except Exception as e: Log.error("plugin-loader", f"扩展文件解析失败: {e}"); return {}
return {k: v for k, v in lv.items() if not k.startswith("_") and not callable(v)}
def load(self, plugin_dir: Path, use_sandbox: bool = True) -> Optional[Any]:
"""加载单个插件"""
main_file = plugin_dir / "main.py"
if not main_file.exists(): return None
manifest = self._load_manifest(plugin_dir)
readme = self._load_readme(plugin_dir)
config = self._load_config(plugin_dir)
extensions = self._load_extensions(plugin_dir)
capabilities = scan_capabilities(plugin_dir)
plugin_name = plugin_dir.name.rstrip("}")
# PL 注入检查
pl_injection = manifest.get("config", {}).get("args", {}).get("pl_injection", False)
if pl_injection:
Log.tip("plugin-loader", f"插件 '{plugin_name}' 声明了 pl_injection正在检查 PL/ 文件夹...")
if not self.pl_injector.check_and_load_pl(plugin_dir, plugin_name):
Log.error("plugin-loader", f"插件 '{plugin_name}' 因 PL 注入检查失败被拒绝加载")
return None
Log.ok("plugin-loader", f"插件 '{plugin_name}' PL 注入检查通过")
permissions = manifest.get("permissions", [])
if use_sandbox:
from oss.plugin.loader import PluginLoader as FrameworkLoader
fl = FrameworkLoader(enable_sandbox=True)
result = fl.load_sandbox_plugin(plugin_dir)
if not result: return None
module, instance = result["module"], result["instance"]
else:
spec = importlib.util.spec_from_file_location(f"plugin.{plugin_name}", str(main_file))
module = importlib.util.module_from_spec(spec)
module.__package__ = f"plugin.{plugin_name}"
module.__path__ = [str(plugin_dir)]
sys.modules[spec.name] = module
spec.loader.exec_module(module)
if not hasattr(module, "New"): return None
instance = module.New()
if self.permission_check and permissions:
instance = PluginProxy(plugin_name, instance, permissions, self.plugins)
info = PluginInfo()
meta = manifest.get("metadata", {})
info.name = meta.get("name", plugin_name)
info.version = meta.get("version", "")
info.author = meta.get("author", "")
info.description = meta.get("description", "")
info.readme = readme
info.config = manifest.get("config", {}).get("args", config)
info.extensions = extensions
info.capabilities = capabilities
info.dependencies = manifest.get("dependencies", [])
info.pl_injected = pl_injection
for cap in capabilities:
self.capability_registry.register_provider(cap, plugin_name, instance)
if self.lifecycle_plugin and plugin_name != "lifecycle":
info.lifecycle = self.lifecycle_plugin.create(plugin_name)
self.plugins[plugin_name] = {"instance": instance, "module": module, "info": info, "permissions": permissions}
return instance
def load_all(self, store_dir: str = "store"):
if 'plugin' not in sys.modules:
pkg = types.ModuleType('plugin')
pkg.__path__ = []; pkg.__package__ = 'plugin'
sys.modules['plugin'] = pkg
Log.tip("plugin-loader", "已创建 plugin 命名空间包")
if not self._check_any_plugins(store_dir):
Log.warn("plugin-loader", "未检测到任何插件,自动引导安装...")
self._bootstrap_installation()
lifecycle_plugin = None
lc_dir = Path(store_dir) / "@{FutureOSS}" / "lifecycle"
if lc_dir.exists() and (lc_dir / "main.py").exists():
try:
inst = self.load(lc_dir)
if inst: lifecycle_plugin = inst; self.plugins.pop("lifecycle", None)
except Exception: pass
dep_plugin = None
dep_dir = Path(store_dir) / "@{FutureOSS}" / "dependency"
if dep_dir.exists() and (dep_dir / "main.py").exists():
try:
inst = self.load(dep_dir)
if inst: dep_plugin = inst; self._dependency_plugin = inst; self.plugins.pop("dependency", None)
except Exception: pass
sig_dir = Path(store_dir) / "@{FutureOSS}" / "signature-verifier"
if sig_dir.exists() and (sig_dir / "main.py").exists():
try:
inst = self.load(sig_dir)
if inst: self.set_signature_verifier(inst.verifier); Log.ok("plugin-loader", "签名验证服务已加载")
except Exception as e: Log.warn("plugin-loader", f"signature-verifier 加载失败: {e}")
if lifecycle_plugin: self.set_lifecycle(lifecycle_plugin)
self._load_plugins_from_dir(Path(store_dir))
if dep_plugin: self._sort_by_dependencies(dep_plugin)
def _load_plugins_from_dir(self, store_dir: Path):
if not store_dir.exists(): return
core_plugins = {"webui", "dashboard", "pkg-manager"}
skip = {"plugin-loader", "lifecycle", "dependency", "signature-verifier"}
for ad in store_dir.iterdir():
if ad.is_dir():
for pd in ad.iterdir():
if pd.is_dir() and pd.name not in skip and (pd / "main.py").exists():
self.load(pd, use_sandbox=pd.name not in core_plugins)
self._link_capabilities()
def _check_any_plugins(self, store_dir: str) -> bool:
sp = Path(store_dir)
if sp.exists():
for ad in sp.iterdir():
if ad.is_dir():
for pd in ad.iterdir():
if pd.is_dir() and (pd / "main.py").exists(): return True
return False
def _bootstrap_installation(self): Log.info("plugin-loader", "跳过引导安装pkg 插件已移除)")
def _sort_by_dependencies(self, dep_plugin):
if not dep_plugin: return
for n, i in self.plugins.items(): dep_plugin.add_plugin(n, i["info"].dependencies)
try:
order = dep_plugin.resolve()
sp = {}
for n in order:
if n in self.plugins: sp[n] = self.plugins[n]
for n in set(self.plugins.keys()) - set(sp.keys()): sp[n] = self.plugins[n]
self.plugins = sp
except Exception as e: Log.error("plugin-loader", f"依赖解析失败: {e}")
def _link_capabilities(self):
for pn, info in self.plugins.items():
for cap in info["info"].capabilities:
if self.capability_registry.has_capability(cap):
for cn in self.capability_registry.get_consumers(cap):
if cn in self.plugins:
ci = self.plugins[cn]["info"]
ca = self.plugins[cn].get("permissions", [])
try:
p = self.capability_registry.get_provider(cap, requester=cn, allowed_plugins=ca)
if p and hasattr(ci, "extensions"): ci.extensions[f"_{cap}_provider"] = p
except PermissionError as e: Log.error("plugin-loader", f"权限拒绝: {e}")
def start_all(self):
self._inject_dependencies()
for n, i in self.plugins.items():
try: i["instance"].start()
except Exception as e: Log.error("plugin-loader", f"启动失败 {n}: {e}")
def init_and_start_all(self):
Log.info("plugin-loader", f"init_and_start_all 被调用plugins={len(self.plugins)}")
self._inject_dependencies()
ordered = self._get_ordered_plugins()
Log.tip("plugin-loader", f"插件启动顺序: {' -> '.join(ordered)}")
for name in ordered:
if "plugin-loader" in name: continue
try:
Log.info("plugin-loader", f"初始化: {name}")
self.plugins[name]["instance"].init()
except Exception as e: Log.error("plugin-loader", f"初始化失败 {name}: {e}")
for name in ordered:
if "plugin-loader" in name: continue
try:
Log.info("plugin-loader", f"启动: {name}")
self.plugins[name]["instance"].start()
except Exception as e: Log.error("plugin-loader", f"启动失败 {name}: {e}")
def _get_ordered_plugins(self) -> list[str]:
if not self._dependency_plugin: return list(self.plugins.keys())
try: return [n for n in self._dependency_plugin.resolve() if n in self.plugins]
except Exception as e: Log.warn("plugin-loader", f"依赖解析失败,使用原始顺序: {e}"); return list(self.plugins.keys())
def _inject_dependencies(self):
Log.info("plugin-loader", f"开始注入依赖,共 {len(self.plugins)} 个插件")
nm = {}
for n in self.plugins:
c = n.rstrip("}"); nm[c] = n; nm[c + "}"] = n
for n, i in self.plugins.items():
inst = i["instance"]; io = i.get("info")
if not io or not io.dependencies: continue
for dn in io.dependencies:
ad = nm.get(dn) or nm.get(dn + "}")
if ad and ad in self.plugins:
sn = f"set_{dn.replace('-', '_')}"
if hasattr(inst, sn):
try: getattr(inst, sn)(self.plugins[ad]["instance"]); Log.ok("plugin-loader", f"注入成功: {n} <- {ad}")
except Exception as e: Log.error("plugin-loader", f"注入依赖失败 {n}.{sn}: {e}")
else: Log.warn("plugin-loader", f"{n} 没有 {sn} 方法")
def stop_all(self):
for n, i in reversed(list(self.plugins.items())):
try: i["instance"].stop()
except Exception: pass
if self.lifecycle_plugin: self.lifecycle_plugin.stop_all()
def get_info(self, name: str) -> Optional[PluginInfo]:
if name in self.plugins: return self.plugins[name]["info"]
return None
def has_capability(self, capability: str) -> bool: return self.capability_registry.has_capability(capability)
def get_capability_provider(self, capability: str) -> Optional[Any]: return self.capability_registry.get_provider(capability)
class PluginLoaderPlugin(Plugin):
"""插件加载器插件"""
def __init__(self):
self.manager = PluginManager()
self._loaded = False
self._started = False
self._ensure_plugin_package()
def _ensure_plugin_package(self):
if 'plugin' not in sys.modules:
pkg = types.ModuleType('plugin'); pkg.__path__ = []; sys.modules['plugin'] = pkg
def init(self, deps: dict = None):
if self._loaded: return
self._loaded = True
self._ensure_plugin_package()
Log.info("plugin-loader", "开始加载插件...")
self.manager.load_all()
def start(self):
if self._started: return
self._started = True
Log.info("plugin-loader", "启动插件...")
self.manager.init_and_start_all()
def stop(self):
Log.info("plugin-loader", "停止插件...")
self.manager.stop_all()
register_plugin_type("PluginManager", PluginManager)
register_plugin_type("PluginInfo", PluginInfo)
register_plugin_type("CapabilityRegistry", CapabilityRegistry)
register_plugin_type("PLInjector", PLInjector)
def New():
return PluginLoaderPlugin()