Files
NebulaShell/store/@{FutureOSS}/plugin-loader/main.py
2026-04-18 00:27:33 +08:00

695 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""插件加载器插件 - 支持能力扫描和扩展"""
import sys
import json
import importlib.util
from pathlib import Path
from typing import Any, Optional
from oss.plugin.types import Plugin, register_plugin_type
from oss.plugin.capabilities import scan_capabilities
# ===== 彩色日志 =====
class Log:
    """Tiny colourised console logger used by the plugin loader.

    ANSI colour codes are only emitted when ``sys.stdout`` was a TTY at the
    time this class body was executed; otherwise text is printed unchanged.
    """

    # Evaluated once, when the class is defined.
    _TTY = sys.stdout.isatty()

    # Colour name -> ANSI escape sequence.
    _C = {
        "reset": "\033[0m",
        "white": "\033[0;37m",
        "yellow": "\033[1;33m",
        "blue": "\033[1;34m",
        "red": "\033[1;31m",
    }

    @classmethod
    def c(cls, text: str, color: str) -> str:
        """Return *text* wrapped in the escape code for *color*.

        An unknown colour name contributes no prefix, but the reset suffix
        is still appended. When stdout is not a TTY, *text* is returned as is.
        """
        if cls._TTY:
            prefix = cls._C.get(color, '')
            suffix = cls._C['reset']
            return f"{prefix}{text}{suffix}"
        return text

    @classmethod
    def info(cls, tag: str, msg: str):
        """Print ``[tag] msg`` in white."""
        print(" ".join((cls.c(f"[{tag}]", "white"), cls.c(msg, "white"))))

    @classmethod
    def warn(cls, tag: str, msg: str):
        """Print ``[tag]``, an empty coloured segment and *msg* in yellow."""
        print(" ".join(cls.c(part, "yellow") for part in (f"[{tag}]", "", msg)))

    @classmethod
    def tip(cls, tag: str, msg: str):
        """Print ``[tag]``, an empty coloured segment and *msg* in blue."""
        print(" ".join(cls.c(part, "blue") for part in (f"[{tag}]", "", msg)))

    @classmethod
    def error(cls, tag: str, msg: str):
        """Print ``[tag]``, an empty coloured segment and *msg* in red."""
        print(" ".join(cls.c(part, "red") for part in (f"[{tag}]", "", msg)))

    @classmethod
    def ok(cls, tag: str, msg: str):
        """Print ``[tag] msg`` in white (same output format as ``info``)."""
        print(" ".join((cls.c(f"[{tag}]", "white"), cls.c(msg, "white"))))
class PluginInfo:
    """Mutable record holding what the loader knows about one plugin.

    Every field starts out empty; ``PluginManager.load`` fills them in.
    """

    def __init__(self):
        # Descriptive metadata.
        self.name: str = ""
        self.version: str = ""
        self.author: str = ""
        self.description: str = ""
        self.readme: str = ""

        # Per-plugin settings and extension values (fresh containers per instance).
        self.config: dict[str, Any] = dict()
        self.extensions: dict[str, Any] = dict()

        # Lifecycle handle; stays None until one is attached.
        self.lifecycle: Any = None

        # Capability names and declared dependency names.
        self.capabilities: set[str] = set()
        self.dependencies: list[str] = list()
class PermissionError(Exception):
    """Raised when a plugin asks for a plugin or capability it may not use.

    NOTE: within this module the name shadows the built-in
    ``PermissionError``; this class derives directly from ``Exception``.
    """
class PluginProxy:
    """Wrapper around a plugin instance that gates access to other plugins.

    Attribute access that the proxy does not define itself is forwarded to
    the wrapped plugin instance, so the proxy can stand in for the plugin.
    """

    def __init__(self, plugin_name: str, plugin_instance: Any, allowed_plugins: list[str], all_plugins: dict[str, dict[str, Any]]):
        """Create a proxy.

        Args:
            plugin_name: Name of the wrapped plugin (used in error messages).
            plugin_instance: The real plugin object to delegate to.
            allowed_plugins: Plugin names this plugin may access; ``"*"``
                grants access to every plugin.
            all_plugins: The manager's live plugin table; each value is a
                dict with at least an ``"instance"`` key.
        """
        self._plugin_name = plugin_name
        self._plugin_instance = plugin_instance
        self._allowed_plugins = set(allowed_plugins)
        self._all_plugins = all_plugins

    def get_plugin(self, name: str) -> Any:
        """Return another plugin's instance after a permission check.

        Returns:
            The instance, or ``None`` if access is permitted but no plugin of
            that name is loaded.

        Raises:
            PermissionError: If *name* is not in the allow-list and the
                allow-list does not contain ``"*"``.
        """
        if name not in self._allowed_plugins and "*" not in self._allowed_plugins:
            raise PermissionError(f"插件 '{self._plugin_name}' 无权访问插件 '{name}'")
        if name not in self._all_plugins:
            return None
        return self._all_plugins[name]["instance"]

    def list_plugins(self) -> list[str]:
        """Return the names of loaded plugins this plugin may access."""
        if "*" in self._allowed_plugins:
            return list(self._all_plugins.keys())
        return [name for name in self._allowed_plugins if name in self._all_plugins]

    def get_capability(self, capability: str) -> Any:
        """Placeholder capability lookup; currently always returns ``None``."""
        return None

    def __getattr__(self, name: str):
        """Forward unknown attribute lookups to the wrapped plugin instance.

        ``__getattr__`` only runs when normal lookup fails. The wrapped
        instance is read straight from ``__dict__`` so that a proxy whose
        ``__init__`` has not run (``copy.copy``, unpickling, ``__new__``)
        raises ``AttributeError`` instead of recursing into this method
        until ``RecursionError``.
        """
        try:
            target = self.__dict__["_plugin_instance"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(target, name)
class CapabilityRegistry:
    """Keeps track of which plugin provides, and which plugins consume, each capability."""

    def __init__(self, permission_check: bool = True):
        """Create an empty registry.

        Args:
            permission_check: When false, ``get_provider`` never raises a
                permission error.
        """
        # capability -> {"plugin": provider name, "instance": provider object}
        self.providers: dict[str, dict[str, Any]] = {}
        # capability -> consumer plugin names, in registration order
        self.consumers: dict[str, list[str]] = {}
        self.permission_check = permission_check

    def register_provider(self, capability: str, plugin_name: str, instance: Any):
        """Record *plugin_name* as the provider of *capability*.

        A later registration for the same capability replaces the earlier one.
        """
        entry = {"plugin": plugin_name, "instance": instance}
        self.providers[capability] = entry
        # Make sure a consumer list exists for every provided capability.
        self.consumers.setdefault(capability, [])

    def register_consumer(self, capability: str, plugin_name: str):
        """Record *plugin_name* as a consumer of *capability* (at most once)."""
        names = self.consumers.setdefault(capability, [])
        if plugin_name in names:
            return
        names.append(plugin_name)

    def get_provider(self, capability: str, requester: str = "", allowed_plugins: list[str] = None) -> Optional[Any]:
        """Return the provider instance for *capability*.

        The permission check only runs when it is enabled on the registry
        and *allowed_plugins* is given. Access is granted when the requester
        is the provider itself, the provider's name is in *allowed_plugins*,
        or *allowed_plugins* contains ``"*"``.

        Returns:
            The provider instance, or ``None`` if nobody provides it.

        Raises:
            PermissionError: If the check runs and access is not granted.
        """
        try:
            entry = self.providers[capability]
        except KeyError:
            return None

        must_check = self.permission_check and allowed_plugins is not None
        if must_check:
            provider_name = entry["plugin"]
            permitted = (
                provider_name == requester
                or provider_name in allowed_plugins
                or "*" in allowed_plugins
            )
            if not permitted:
                raise PermissionError(f"插件 '{requester}' 无权使用能力 '{capability}'")
        return self.providers[capability]["instance"]

    def has_capability(self, capability: str) -> bool:
        """Tell whether some plugin has registered as provider of *capability*."""
        return capability in self.providers

    def get_consumers(self, capability: str) -> list[str]:
        """Return the consumer names for *capability* (empty list if unknown)."""
        return self.consumers.get(capability, [])
class PluginManager:
    """Loads plugins from a store directory, wires them together and runs them.

    ``self.plugins`` maps a plugin name to a dict with the keys
    ``"instance"``, ``"module"``, ``"info"`` (a ``PluginInfo``) and
    ``"permissions"``.
    """

    # Substrings that make a config/extension file be rejected before it runs.
    _DANGEROUS_PATTERNS = ('import ', 'open(', 'exec(', 'eval(', 'os.', 'sys.', 'subprocess')

    def __init__(self, permission_check: bool = True):
        """Create an empty manager.

        Args:
            permission_check: Enables permission checks in the capability
                registry and the wrapping of plugins in ``PluginProxy``.
        """
        self.plugins: dict[str, dict[str, Any]] = {}
        self.lifecycle_plugin: Optional[Any] = None
        self._dependency_plugin: Optional[Any] = None  # reference to the dependency plugin
        self._signature_verifier: Optional[Any] = None  # reference to the signature verifier
        self.capability_registry = CapabilityRegistry(permission_check=permission_check)
        self.permission_check = permission_check
        # Flag for enforcing signature verification; not read within this class.
        self.enforce_signature = True

    def set_signature_verifier(self, verifier: Any):
        """Store the signature verifier object."""
        self._signature_verifier = verifier

    def set_lifecycle(self, lifecycle_plugin: Any):
        """Store the lifecycle plugin used to create per-plugin lifecycles."""
        self.lifecycle_plugin = lifecycle_plugin

    def _load_manifest(self, plugin_dir: Path) -> dict[str, Any]:
        """Parse ``manifest.json`` in *plugin_dir*; return ``{}`` if it is absent."""
        manifest_file = plugin_dir / "manifest.json"
        if not manifest_file.exists():
            return {}
        with open(manifest_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def _load_readme(self, plugin_dir: Path) -> str:
        """Return the text of ``README.md`` in *plugin_dir*; ``""`` if absent."""
        readme_file = plugin_dir / "README.md"
        if not readme_file.exists():
            return ""
        with open(readme_file, "r", encoding="utf-8") as f:
            return f.read()

    def _exec_restricted(self, source_file: Path, failure_label: str) -> dict[str, Any]:
        """Execute a small Python file with minimal builtins and collect its values.

        The file is rejected (``{}`` is returned) if it contains any of
        ``_DANGEROUS_PATTERNS`` or if compiling/executing it raises.

        NOTE(security): a substring blacklist plus a reduced ``__builtins__``
        is not a real sandbox; ``exec`` on untrusted files remains unsafe.
        The mechanism is kept as is here and only flagged.

        Args:
            source_file: Existing file to execute.
            failure_label: Prefix of the error logged when execution fails.

        Returns:
            The top-level names the file defined, excluding names starting
            with ``_`` and callable values.
        """
        with open(source_file, "r", encoding="utf-8") as f:
            content = f.read()

        for pattern in self._DANGEROUS_PATTERNS:
            if pattern in content:
                Log.warn("plugin-loader", f"{source_file} 包含危险代码: {pattern}")
                return {}

        safe_globals = {
            "__builtins__": {
                "True": True,
                "False": False,
                "None": None,
                "dict": dict,
                "list": list,
                "str": str,
                "int": int,
                "float": float,
                "bool": bool,
            }
        }
        local_vars: dict[str, Any] = {}
        try:
            code = compile(content, str(source_file), "exec")
            exec(code, safe_globals, local_vars)
        except Exception as e:
            Log.error("plugin-loader", f"{failure_label}: {e}")
            return {}

        return {
            k: v for k, v in local_vars.items()
            if not k.startswith("_") and not callable(v)
        }

    def _load_config(self, plugin_dir: Path) -> dict[str, Any]:
        """Load values from ``config.py`` in *plugin_dir*; ``{}`` if absent or rejected."""
        config_file = plugin_dir / "config.py"
        if not config_file.exists():
            return {}
        return self._exec_restricted(config_file, "配置文件解析失败")

    def _load_extensions(self, plugin_dir: Path) -> dict[str, Any]:
        """Load values from ``extensions.py`` in *plugin_dir*; ``{}`` if absent or rejected."""
        ext_file = plugin_dir / "extensions.py"
        if not ext_file.exists():
            return {}
        return self._exec_restricted(ext_file, "扩展文件解析失败")

    def load(self, plugin_dir: Path, use_sandbox: bool = True) -> Optional[Any]:
        """Load a single plugin from *plugin_dir* and register it.

        Args:
            plugin_dir: Directory containing the plugin's ``main.py``.
            use_sandbox: Load through the framework's sandbox loader when
                true; otherwise import ``main.py`` directly as
                ``plugin.<name>`` and call its ``New()`` factory.

        Returns:
            The plugin instance (wrapped in a ``PluginProxy`` when permission
            checks are on and the manifest lists permissions), or ``None``
            if there is no ``main.py``, the sandbox loader returned nothing,
            or the module has no ``New``.
        """
        main_file = plugin_dir / "main.py"
        if not main_file.exists():
            return None

        manifest = self._load_manifest(plugin_dir)
        readme = self._load_readme(plugin_dir)
        config = self._load_config(plugin_dir)
        extensions = self._load_extensions(plugin_dir)

        # Capabilities are discovered by scanning the plugin directory.
        capabilities = scan_capabilities(plugin_dir)

        # Directory names may carry a trailing "}" which is not part of the name.
        plugin_name = plugin_dir.name.rstrip("}")

        permissions = manifest.get("permissions", [])

        if use_sandbox:
            from oss.plugin.loader import PluginLoader as FrameworkLoader
            framework_loader = FrameworkLoader(enable_sandbox=True)
            result = framework_loader.load_sandbox_plugin(plugin_dir)
            if not result:
                return None
            module = result["module"]
            instance = result["instance"]
        else:
            spec = importlib.util.spec_from_file_location(
                f"plugin.{plugin_name}", str(main_file)
            )
            module = importlib.util.module_from_spec(spec)
            module.__package__ = f"plugin.{plugin_name}"
            module.__path__ = [str(plugin_dir)]  # enables relative imports of submodules
            sys.modules[spec.name] = module
            try:
                spec.loader.exec_module(module)
            except BaseException:
                # Do not leave a half-initialised module importable.
                sys.modules.pop(spec.name, None)
                raise
            if not hasattr(module, "New"):
                return None
            instance = module.New()

        # Wrap the instance so its access to other plugins is checked.
        if self.permission_check and permissions:
            instance = PluginProxy(plugin_name, instance, permissions, self.plugins)

        info = PluginInfo()
        meta = manifest.get("metadata", {})
        info.name = meta.get("name", plugin_name)
        info.version = meta.get("version", "")
        info.author = meta.get("author", "")
        info.description = meta.get("description", "")
        info.readme = readme
        # Manifest "config.args" wins; config.py values are the fallback.
        info.config = manifest.get("config", {}).get("args", config)
        info.extensions = extensions
        info.capabilities = capabilities
        info.dependencies = manifest.get("dependencies", [])

        for cap in capabilities:
            self.capability_registry.register_provider(cap, plugin_name, instance)

        # The lifecycle plugin does not get a lifecycle of its own.
        if self.lifecycle_plugin and plugin_name != "lifecycle":
            info.lifecycle = self.lifecycle_plugin.create(plugin_name)

        self.plugins[plugin_name] = {
            "instance": instance,
            "module": module,
            "info": info,
            "permissions": permissions,
        }
        return instance

    def load_all(self, store_dir: str = "store"):
        """Load every plugin under *store_dir* (the loader itself is skipped).

        The ``lifecycle``, ``dependency`` and ``signature-verifier`` plugins
        under ``@{FutureOSS}`` are loaded first; a failure in any of them is
        logged and does not stop the remaining plugins from loading.
        """
        # The "plugin" namespace package must exist before any plugin is imported.
        import types
        if 'plugin' not in sys.modules:
            plugin_pkg = types.ModuleType('plugin')
            plugin_pkg.__path__ = []
            plugin_pkg.__package__ = 'plugin'
            sys.modules['plugin'] = plugin_pkg
            Log.tip("plugin-loader", "已创建 plugin 命名空间包")

        if not self._check_any_plugins(store_dir):
            Log.warn("plugin-loader", "未检测到任何插件,自动引导安装...")
            self._bootstrap_installation()

        official_dir = Path(store_dir) / "@{FutureOSS}"

        # lifecycle: kept out of self.plugins once loaded.
        lifecycle_plugin = None
        lifecycle_dir = official_dir / "lifecycle"
        if lifecycle_dir.exists() and (lifecycle_dir / "main.py").exists():
            try:
                instance = self.load(lifecycle_dir)
                if instance:
                    lifecycle_plugin = instance
                    self.plugins.pop("lifecycle", None)
            except Exception as e:
                Log.warn("plugin-loader", f"lifecycle 加载失败: {e}")

        # dependency: kept out of self.plugins once loaded.
        dependency_plugin = None
        dependency_dir = official_dir / "dependency"
        if dependency_dir.exists() and (dependency_dir / "main.py").exists():
            try:
                instance = self.load(dependency_dir)
                if instance:
                    dependency_plugin = instance
                    self._dependency_plugin = instance
                    self.plugins.pop("dependency", None)
            except Exception as e:
                Log.warn("plugin-loader", f"dependency 加载失败: {e}")

        # signature-verifier: stays registered in self.plugins.
        sig_verifier_dir = official_dir / "signature-verifier"
        if sig_verifier_dir.exists() and (sig_verifier_dir / "main.py").exists():
            try:
                instance = self.load(sig_verifier_dir)
                if instance:
                    self.set_signature_verifier(instance.verifier)
                    Log.ok("plugin-loader", "签名验证服务已加载")
            except Exception as e:
                Log.warn("plugin-loader", f"signature-verifier 加载失败: {e}")

        # Install the lifecycle plugin before the bulk load so that load()
        # can create a lifecycle for each remaining plugin.
        if lifecycle_plugin:
            self.set_lifecycle(lifecycle_plugin)

        self._load_plugins_from_dir(Path(store_dir))

        if dependency_plugin:
            self._sort_by_dependencies(dependency_plugin)

    def _load_plugins_from_dir(self, store_dir: Path):
        """Load every ``<author>/<plugin>/main.py`` under *store_dir*.

        The loader itself and the plugins handled explicitly by ``load_all``
        are skipped. A plugin that raises while loading is logged and
        skipped so that it cannot prevent the others from loading.
        """
        if not store_dir.exists():
            return

        # Core plugins are loaded without the sandbox.
        core_plugins = {"webui", "dashboard", "pkg-manager"}
        handled_elsewhere = ("plugin-loader", "lifecycle", "dependency", "signature-verifier")

        # First pass: load everything.
        for author_dir in store_dir.iterdir():
            if not author_dir.is_dir():
                continue
            for plugin_dir in author_dir.iterdir():
                if not plugin_dir.is_dir() or plugin_dir.name in handled_elsewhere:
                    continue
                if not (plugin_dir / "main.py").exists():
                    continue
                use_sandbox = plugin_dir.name not in core_plugins
                try:
                    self.load(plugin_dir, use_sandbox=use_sandbox)
                except Exception as e:
                    Log.error("plugin-loader", f"加载失败 {plugin_dir.name}: {e}")

        # Second pass: connect capability providers to consumers.
        self._link_capabilities()

    def _check_any_plugins(self, store_dir: str) -> bool:
        """Tell whether any ``<author>/<plugin>/main.py`` exists under *store_dir*."""
        store = Path(store_dir)
        if not store.exists():
            return False
        for author_dir in store.iterdir():
            if not author_dir.is_dir():
                continue
            for plugin_dir in author_dir.iterdir():
                if plugin_dir.is_dir() and (plugin_dir / "main.py").exists():
                    return True
        return False

    def _bootstrap_installation(self):
        """Bootstrap hook for installing plugins; currently only logs that it is skipped."""
        Log.info("plugin-loader", "跳过引导安装pkg 插件已移除)")

    def _sort_by_dependencies(self, dep_plugin):
        """Reorder ``self.plugins`` according to *dep_plugin*'s resolution.

        Plugins the resolver does not mention are appended at the end. If
        resolution raises, the error is logged and the order is unchanged.
        """
        if not dep_plugin:
            return

        for name, info in self.plugins.items():
            dep_plugin.add_plugin(name, info["info"].dependencies)

        try:
            order = dep_plugin.resolve()
            sorted_plugins = {
                name: self.plugins[name] for name in order if name in self.plugins
            }
            # Keep plugins that the resolver left out.
            for name, entry in self.plugins.items():
                if name not in sorted_plugins:
                    sorted_plugins[name] = entry
            self.plugins = sorted_plugins
        except Exception as e:
            Log.error("plugin-loader", f"依赖解析失败: {e}")

    def _link_capabilities(self):
        """Hand capability providers to their registered consumers.

        For every provided capability, each registered consumer that is
        loaded gets the provider stored in its ``info.extensions`` under
        ``_<capability>_provider``, subject to the registry's permission
        check. Denials are logged.
        """
        for info in self.plugins.values():
            for cap in info["info"].capabilities:
                if not self.capability_registry.has_capability(cap):
                    continue
                for consumer_name in self.capability_registry.get_consumers(cap):
                    if consumer_name not in self.plugins:
                        continue
                    consumer = self.plugins[consumer_name]
                    consumer_info = consumer["info"]
                    consumer_allowed = consumer.get("permissions", [])
                    try:
                        provider = self.capability_registry.get_provider(
                            cap,
                            requester=consumer_name,
                            allowed_plugins=consumer_allowed,
                        )
                        if provider and hasattr(consumer_info, "extensions"):
                            consumer_info.extensions[f"_{cap}_provider"] = provider
                    except PermissionError as e:
                        Log.error("plugin-loader", f"权限拒绝: {e}")

    def start_all(self):
        """Inject dependencies, then call ``start()`` on every plugin.

        Assumes plugins were already initialised. Start failures are logged.
        """
        self._inject_dependencies()
        for name, info in self.plugins.items():
            try:
                info["instance"].start()
            except Exception as e:
                Log.error("plugin-loader", f"启动失败 {name}: {e}")

    def init_and_start_all(self):
        """Initialise and then start every plugin.

        Order of operations:
        1. inject dependency instances;
        2. call ``init()`` on all plugins in dependency order;
        3. call ``start()`` on all plugins in dependency order.

        Plugins whose name contains ``plugin-loader`` are skipped. Failures
        are logged per plugin and do not stop the remaining plugins.
        """
        Log.info("plugin-loader", f"init_and_start_all 被调用plugins={len(self.plugins)}")

        self._inject_dependencies()

        ordered_plugins = self._get_ordered_plugins()
        Log.tip("plugin-loader", f"插件启动顺序: {' -> '.join(ordered_plugins)}")

        Log.info("plugin-loader", "开始初始化所有插件...")
        for name in ordered_plugins:
            if "plugin-loader" in name:
                continue
            info = self.plugins[name]
            try:
                Log.info("plugin-loader", f"初始化: {name}")
                info["instance"].init()
            except Exception as e:
                Log.error("plugin-loader", f"初始化失败 {name}: {e}")

        Log.info("plugin-loader", "开始启动所有插件...")
        for name in ordered_plugins:
            if "plugin-loader" in name:
                continue
            info = self.plugins[name]
            try:
                Log.info("plugin-loader", f"启动: {name}")
                info["instance"].start()
            except Exception as e:
                Log.error("plugin-loader", f"启动失败 {name}: {e}")

    def _get_ordered_plugins(self) -> list[str]:
        """Return the loaded plugin names in dependency order.

        Without a dependency plugin, or if resolution raises, the current
        order of ``self.plugins`` is returned. Names the resolver returns
        that are not loaded are dropped; loaded plugins the resolver does
        not mention are appended, so that no loaded plugin is skipped.
        """
        resolver = getattr(self, "_dependency_plugin", None)
        if not resolver:
            return list(self.plugins.keys())

        try:
            order = resolver.resolve()
            ordered = [name for name in order if name in self.plugins]
        except Exception as e:
            Log.warn("plugin-loader", f"依赖解析失败,使用原始顺序: {e}")
            return list(self.plugins.keys())

        seen = set(ordered)
        ordered.extend(name for name in self.plugins if name not in seen)
        return ordered

    def _inject_dependencies(self):
        """Pass each plugin the instances of the plugins it depends on.

        For a dependency ``foo-bar`` the target plugin's ``set_foo_bar``
        method is called with the dependency's instance, if the dependency
        is loaded and the method exists. Problems are logged.
        """
        Log.info("plugin-loader", f"开始注入依赖,共 {len(self.plugins)} 个插件")

        # Accept names with or without the trailing "}" seen on directory names.
        name_map = {}
        for name in self.plugins:
            clean = name.rstrip("}")
            name_map[clean] = name
            name_map[clean + "}"] = name

        for name, info in self.plugins.items():
            instance = info["instance"]
            info_obj = info.get("info")
            if not info_obj:
                continue
            deps = info_obj.dependencies
            if not deps:
                continue

            Log.tip("plugin-loader", f"{name} 依赖: {deps}")
            for dep_name in deps:
                actual_dep = name_map.get(dep_name) or name_map.get(dep_name + "}")
                if not (actual_dep and actual_dep in self.plugins):
                    continue
                dep_instance = self.plugins[actual_dep]["instance"]
                setter_name = f"set_{dep_name.replace('-', '_')}"
                Log.tip("plugin-loader", f"尝试注入: {name} <- {actual_dep} ({setter_name})")
                if not hasattr(instance, setter_name):
                    Log.warn("plugin-loader", f"{name} 没有 {setter_name} 方法")
                    continue
                try:
                    getattr(instance, setter_name)(dep_instance)
                    Log.ok("plugin-loader", f"注入成功: {name} <- {actual_dep}")
                except Exception as e:
                    Log.error("plugin-loader", f"注入依赖失败 {name}.{setter_name}: {e}")

    def stop_all(self):
        """Call ``stop()`` on every plugin in reverse order, then stop lifecycles.

        Stopping is best-effort: a plugin that raises is logged and the
        remaining plugins are still stopped.
        """
        for name, info in reversed(list(self.plugins.items())):
            try:
                info["instance"].stop()
            except Exception as e:
                Log.error("plugin-loader", f"停止失败 {name}: {e}")
        if self.lifecycle_plugin:
            self.lifecycle_plugin.stop_all()

    def get_info(self, name: str) -> Optional["PluginInfo"]:
        """Return the ``PluginInfo`` of plugin *name*, or ``None`` if not loaded."""
        if name in self.plugins:
            return self.plugins[name]["info"]
        return None

    def has_capability(self, capability: str) -> bool:
        """Tell whether any loaded plugin provides *capability*."""
        return self.capability_registry.has_capability(capability)

    def get_capability_provider(self, capability: str) -> Optional[Any]:
        """Return the provider instance for *capability* (no permission check)."""
        return self.capability_registry.get_provider(capability)
class PluginLoaderPlugin(Plugin):
    """Plugin that owns a ``PluginManager`` and drives the other plugins."""

    def __init__(self):
        self.manager = PluginManager()
        # Guards that make init() and start() run their work only once.
        self._loaded = False
        self._started = False
        self._ensure_plugin_package()

    def _ensure_plugin_package(self):
        """Register an empty ``plugin`` package in ``sys.modules`` if missing."""
        import types
        if 'plugin' in sys.modules:
            return
        namespace_pkg = types.ModuleType('plugin')
        namespace_pkg.__path__ = []
        sys.modules['plugin'] = namespace_pkg

    def init(self, deps: dict = None):
        """Load all plugins through the manager (first call only).

        Args:
            deps: Accepted for interface compatibility; not used here.
        """
        if not self._loaded:
            self._loaded = True
            # The namespace package must exist before any plugin is loaded.
            self._ensure_plugin_package()
            Log.info("plugin-loader", "开始加载插件...")
            self.manager.load_all()

    def start(self):
        """Initialise and start all plugins through the manager (first call only)."""
        if not self._started:
            self._started = True
            Log.info("plugin-loader", "启动插件...")
            self.manager.init_and_start_all()

    def stop(self):
        """Stop all plugins through the manager."""
        Log.info("plugin-loader", "停止插件...")
        self.manager.stop_all()
# Register this module's classes under string names via register_plugin_type
# (imported from oss.plugin.types). Presumably this lets other code look the
# types up by name -- confirm against oss.plugin.types.
register_plugin_type("PluginManager", PluginManager)
register_plugin_type("PluginInfo", PluginInfo)
register_plugin_type("CapabilityRegistry", CapabilityRegistry)
def New():
    """Factory entry point: build and return a ``PluginLoaderPlugin``."""
    loader_plugin = PluginLoaderPlugin()
    return loader_plugin