Files
NebulaShell/store/@{FutureOSS}/plugin-loader/main.py
Falck 76147bae94 初始提交 - FutureOSS v1.0 插件化运行时框架
一切皆为插件的开发者工具运行时框架

🧩 核心特性:
  - 插件热插拔 (importlib 动态加载)
  - 依赖自动解析 (拓扑排序 + 循环检测)
  - 企业级稳定 (熔断/降级/重试/隔离)
  - 事件驱动 (发布/订阅事件总线)
  - 完整配置 (YAML 配置 + 热重载)
2026-04-06 09:57:10 +08:00

610 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""插件加载器插件 - 支持能力扫描和扩展"""
import sys
import json
import importlib.util
from pathlib import Path
from typing import Any, Optional
from oss.plugin.types import Plugin, register_plugin_type
from oss.plugin.capabilities import scan_capabilities
class PluginInfo:
    """Metadata record describing a single plugin.

    Every field starts out empty; whoever creates the record is expected to
    fill the fields in afterwards. Mutable fields are created per instance,
    so two records never share a dict, list or set.
    """

    def __init__(self):
        # Descriptive fields.
        self.name: str = ""
        self.version: str = ""
        self.author: str = ""
        self.description: str = ""
        self.readme: str = ""

        # Relations to other plugins.
        self.dependencies: list[str] = []
        self.capabilities: set[str] = set()

        # Free-form settings and extension values.
        self.config: dict[str, Any] = {}
        self.extensions: dict[str, Any] = {}

        # Lifecycle handle; stays None until one is attached.
        self.lifecycle: Any = None
class PermissionError(Exception):
    """Raised when a plugin reaches for something it has not been granted.

    NOTE(review): this class shadows the built-in ``PermissionError`` for the
    rest of the module. Unlike the built-in it derives directly from
    ``Exception`` and is not an ``OSError``.
    """
class PluginProxy:
    """Wrapper around a plugin instance that restricts cross-plugin access.

    The proxy forwards unknown attribute lookups to the wrapped instance, and
    only hands out other plugins whose names appear in ``allowed_plugins``.
    The special entry ``"*"`` grants access to every plugin.
    """

    def __init__(self, plugin_name: str, plugin_instance: Any, allowed_plugins: list[str], all_plugins: dict[str, dict[str, Any]]):
        """Create a proxy.

        Args:
            plugin_name: Name of the wrapped plugin (used in error messages).
            plugin_instance: The real plugin object being wrapped.
            allowed_plugins: Names of plugins the wrapped plugin may access.
            all_plugins: The shared plugin registry, name -> entry dict with
                an ``"instance"`` key. It is kept by reference, not copied.
        """
        self._plugin_name = plugin_name
        self._plugin_instance = plugin_instance
        self._allowed_plugins = set(allowed_plugins)
        self._all_plugins = all_plugins

    def get_plugin(self, name: str) -> Any:
        """Return another plugin's instance, or None if it is not registered.

        Raises:
            PermissionError: If ``name`` is not in the allow-list and the
                allow-list does not contain ``"*"``.
        """
        if name not in self._allowed_plugins and "*" not in self._allowed_plugins:
            raise PermissionError(f"插件 '{self._plugin_name}' 无权访问插件 '{name}'")
        if name not in self._all_plugins:
            return None
        return self._all_plugins[name]["instance"]

    def list_plugins(self) -> list[str]:
        """Return the names of registered plugins this plugin may access.

        Names are returned in registry order so the result is deterministic
        (iterating the allow-list set would yield an arbitrary order).
        """
        if "*" in self._allowed_plugins:
            return list(self._all_plugins)
        return [name for name in self._all_plugins if name in self._allowed_plugins]

    def get_capability(self, capability: str) -> Any:
        """Placeholder for capability lookup; currently always returns None."""
        return None

    def __getattr__(self, name: str):
        """Forward attributes not found on the proxy to the wrapped instance."""
        # __getattr__ only runs when normal lookup fails. If the proxy has not
        # been initialised yet (e.g. while copy/pickle rebuilds it), looking up
        # self._plugin_instance would re-enter this method forever.
        if name == "_plugin_instance":
            raise AttributeError(name)
        return getattr(self._plugin_instance, name)
class CapabilityRegistry:
    """Registry mapping capability names to their provider and consumers.

    ``providers`` holds one entry per capability (the most recent
    registration wins); ``consumers`` holds the list of plugin names that
    asked for each capability.
    """

    def __init__(self, permission_check: bool = True):
        self.permission_check = permission_check
        self.consumers: dict[str, list[str]] = {}
        self.providers: dict[str, dict[str, Any]] = {}

    def register_provider(self, capability: str, plugin_name: str, instance: Any):
        """Record ``plugin_name``/``instance`` as the provider of ``capability``."""
        self.providers[capability] = {
            "plugin": plugin_name,
            "instance": instance,
        }
        # Make sure a consumer list exists without touching an existing one.
        self.consumers.setdefault(capability, [])

    def register_consumer(self, capability: str, plugin_name: str):
        """Record ``plugin_name`` as a consumer of ``capability`` (once)."""
        subscribers = self.consumers.setdefault(capability, [])
        if plugin_name not in subscribers:
            subscribers.append(plugin_name)

    def get_provider(self, capability: str, requester: str = "", allowed_plugins: list[str] = None) -> Optional[Any]:
        """Return the provider instance for ``capability``, or None if unknown.

        The permission check only runs when it is enabled on the registry and
        ``allowed_plugins`` is given. It passes when the requester is the
        provider itself, when the provider is listed in ``allowed_plugins``,
        or when ``allowed_plugins`` contains ``"*"``.

        Raises:
            PermissionError: If the check runs and does not pass.
        """
        if capability not in self.providers:
            return None
        entry = self.providers[capability]
        if self.permission_check and allowed_plugins is not None:
            owner = entry["plugin"]
            permitted = owner == requester or owner in allowed_plugins or "*" in allowed_plugins
            if not permitted:
                raise PermissionError(f"插件 '{requester}' 无权使用能力 '{capability}'")
        return entry["instance"]

    def has_capability(self, capability: str) -> bool:
        """Tell whether a provider is registered for ``capability``."""
        return capability in self.providers

    def get_consumers(self, capability: str) -> list[str]:
        """Return the consumer names for ``capability`` (empty list if none)."""
        if capability in self.consumers:
            return self.consumers[capability]
        return []
class PluginManager:
    """Loads plugins from disk, wires them together and drives their lifecycle.

    Loaded plugins live in ``self.plugins`` keyed by plugin name. Each entry is
    a dict with the keys ``instance``, ``module``, ``info`` and
    ``permissions``. The same dict object is handed to every ``PluginProxy``
    created by :meth:`load`, so it is only mutated in place and never rebound.
    """

    def __init__(self, permission_check: bool = True):
        self.plugins: dict[str, dict[str, Any]] = {}
        self.lifecycle_plugin: Optional[Any] = None
        # Reference to the dependency plugin, used for topological ordering.
        self._dependency_plugin: Optional[Any] = None
        self.capability_registry = CapabilityRegistry(permission_check=permission_check)
        self.permission_check = permission_check

    def set_lifecycle(self, lifecycle_plugin: Any):
        """Set the plugin used to create per-plugin lifecycle objects."""
        self.lifecycle_plugin = lifecycle_plugin

    def _load_manifest(self, plugin_dir: Path) -> dict[str, Any]:
        """Return the parsed ``manifest.json`` of a plugin, or {} if absent."""
        manifest_file = plugin_dir / "manifest.json"
        if not manifest_file.exists():
            return {}
        with open(manifest_file, "r", encoding="utf-8") as f:
            return json.load(f)

    def _load_readme(self, plugin_dir: Path) -> str:
        """Return the text of the plugin's ``README.md``, or "" if absent."""
        readme_file = plugin_dir / "README.md"
        if not readme_file.exists():
            return ""
        with open(readme_file, "r", encoding="utf-8") as f:
            return f.read()

    def _run_restricted_script(self, script_file: Path) -> dict[str, Any]:
        """Execute a settings-style Python file and return its public values.

        The script runs with a tiny ``__builtins__`` table (constants and the
        basic container/scalar types only). Names starting with ``_`` and
        callable values are dropped from the result. A missing file yields {}.

        SECURITY NOTE: a trimmed ``__builtins__`` is not a real sandbox; code
        can still reach other objects through attribute access on the allowed
        types. Only run files from plugin directories you trust.
        """
        if not script_file.exists():
            return {}
        restricted_globals = {
            "__builtins__": {
                "True": True,
                "False": False,
                "None": None,
                "dict": dict,
                "list": list,
                "str": str,
                "int": int,
                "float": float,
                "bool": bool,
            }
        }
        local_vars: dict[str, Any] = {}
        with open(script_file, "r", encoding="utf-8") as f:
            code = compile(f.read(), str(script_file), "exec")
        exec(code, restricted_globals, local_vars)
        return {
            k: v for k, v in local_vars.items()
            if not k.startswith("_") and not callable(v)
        }

    def _load_config(self, plugin_dir: Path) -> dict[str, Any]:
        """Load the plugin's ``config.py`` (restricted execution)."""
        return self._run_restricted_script(plugin_dir / "config.py")

    def _load_extensions(self, plugin_dir: Path) -> dict[str, Any]:
        """Load the plugin's ``extensions.py`` (restricted execution)."""
        return self._run_restricted_script(plugin_dir / "extensions.py")

    def _import_plugin_module(self, plugin_name: str, main_file: Path) -> Optional[Any]:
        """Import ``main_file`` as ``plugin.<name>``; None if it has no ``New``.

        The module is removed from ``sys.modules`` again when executing it
        fails or when it turns out not to be a plugin, so a broken load does
        not leave a half-initialised module behind.
        """
        spec = importlib.util.spec_from_file_location(
            f"plugin.{plugin_name}", str(main_file)
        )
        if spec is None or spec.loader is None:
            return None
        module = importlib.util.module_from_spec(spec)
        sys.modules[spec.name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            sys.modules.pop(spec.name, None)
            raise
        if not hasattr(module, "New"):
            sys.modules.pop(spec.name, None)
            return None
        return module

    def load(self, plugin_dir: Path, use_sandbox: bool = True) -> Optional[Any]:
        """Load a single plugin from ``plugin_dir`` and register it.

        Args:
            plugin_dir: Directory containing the plugin's ``main.py``.
            use_sandbox: Load through the framework's sandbox loader when
                True; otherwise import ``main.py`` directly and call ``New()``.

        Returns:
            The plugin instance (wrapped in a ``PluginProxy`` when permission
            checks are on and the manifest lists permissions), or None when
            there is no ``main.py`` or the plugin could not be instantiated.
        """
        main_file = plugin_dir / "main.py"
        if not main_file.exists():
            return None
        manifest = self._load_manifest(plugin_dir)
        readme = self._load_readme(plugin_dir)
        config = self._load_config(plugin_dir)
        extensions = self._load_extensions(plugin_dir)
        # Capabilities are discovered by scanning the plugin directory.
        capabilities = scan_capabilities(plugin_dir)
        # Directory names may carry a stray "}" suffix; strip it.
        plugin_name = plugin_dir.name.rstrip("}")
        permissions = manifest.get("permissions", [])

        if use_sandbox:
            from oss.plugin.loader import PluginLoader as FrameworkLoader
            framework_loader = FrameworkLoader(enable_sandbox=True)
            result = framework_loader.load_sandbox_plugin(plugin_dir)
            if not result:
                return None
            module = result["module"]
            instance = result["instance"]
        else:
            module = self._import_plugin_module(plugin_name, main_file)
            if module is None:
                return None
            instance = module.New()

        # Wrap in a proxy so the plugin can only reach what it was granted.
        if self.permission_check and permissions:
            instance = PluginProxy(plugin_name, instance, permissions, self.plugins)

        info = PluginInfo()
        meta = manifest.get("metadata", {})
        info.name = meta.get("name", plugin_name)
        info.version = meta.get("version", "")
        info.author = meta.get("author", "")
        info.description = meta.get("description", "")
        info.readme = readme
        # Manifest "config.args" wins; config.py is the fallback.
        info.config = manifest.get("config", {}).get("args", config)
        info.extensions = extensions
        info.capabilities = capabilities
        info.dependencies = manifest.get("dependencies", [])

        for cap in capabilities:
            self.capability_registry.register_provider(cap, plugin_name, instance)

        if self.lifecycle_plugin and plugin_name != "lifecycle":
            info.lifecycle = self.lifecycle_plugin.create(plugin_name)

        self.plugins[plugin_name] = {
            "instance": instance,
            "module": module,
            "info": info,
            "permissions": permissions,
        }
        return instance

    def _load_core_plugin(self, store_dir: str, name: str) -> Optional[Any]:
        """Load an optional ``@{FutureOSS}`` core plugin.

        The plugin is loaded and then removed from ``self.plugins`` again, so
        it is not initialised or started together with ordinary plugins.
        Returns None when the plugin is absent or fails to load.
        """
        core_dir = Path(store_dir) / "@{FutureOSS}" / name
        if not (core_dir.exists() and (core_dir / "main.py").exists()):
            return None
        try:
            instance = self.load(core_dir)
        except Exception as e:
            # Core plugins are optional, but a failure should not be invisible.
            print(f"[plugin-loader] 加载失败 {name}: {e}")
            return None
        if not instance:
            return None
        self.plugins.pop(name, None)
        return instance

    def load_all(self, store_dir: str = "store"):
        """Load every plugin under ``store_dir`` and ``./data/pkg``.

        Order: optional bootstrap install when nothing is found, then the
        optional lifecycle and dependency plugins, then all other plugins, and
        finally a dependency sort when a dependency plugin is available.
        """
        if not self._check_any_plugins(store_dir):
            print("[plugin-loader] 未检测到任何插件,自动引导安装...")
            self._bootstrap_installation()

        lifecycle_plugin = self._load_core_plugin(store_dir, "lifecycle")
        dependency_plugin = self._load_core_plugin(store_dir, "dependency")
        if dependency_plugin:
            # Kept for the topological ordering in _get_ordered_plugins.
            self._dependency_plugin = dependency_plugin

        if lifecycle_plugin:
            self.set_lifecycle(lifecycle_plugin)

        self._load_plugins_from_dir(Path(store_dir))
        self._load_plugins_from_dir(Path("./data/pkg"))

        if dependency_plugin:
            self._sort_by_dependencies(dependency_plugin)

    def _load_plugins_from_dir(self, store_dir: Path):
        """Load all plugins found at ``<store_dir>/<author>/<plugin>/main.py``.

        The plugin-loader, lifecycle and dependency directories are skipped.
        A plugin that raises while loading is reported and skipped so that it
        cannot prevent the remaining plugins from loading.
        """
        if not store_dir.exists():
            return
        skipped = ("plugin-loader", "lifecycle", "dependency")
        # First pass: load every plugin.
        for author_dir in store_dir.iterdir():
            if not author_dir.is_dir():
                continue
            for plugin_dir in author_dir.iterdir():
                if not plugin_dir.is_dir() or plugin_dir.name in skipped:
                    continue
                if not (plugin_dir / "main.py").exists():
                    continue
                try:
                    self.load(plugin_dir)
                except Exception as e:
                    print(f"[plugin-loader] 加载失败 {plugin_dir.name}: {e}")
        # Second pass: connect capability consumers to providers.
        self._link_capabilities()

    def _check_any_plugins(self, store_dir: str) -> bool:
        """Tell whether any plugin exists in ``store_dir`` or ``./data/pkg``.

        ``./data/pkg`` is probed both as ``<plugin>/main.py`` and as
        ``<author>/<plugin>/main.py``; the latter is the layout that
        ``_load_plugins_from_dir`` actually scans.

        NOTE(review): every directory with a ``main.py`` counts, including the
        loader's own directory if it lives inside ``store_dir``. In that case
        the bootstrap install never triggers. Confirm that this is intended.
        """
        store = Path(store_dir)
        if store.exists():
            for author_dir in store.iterdir():
                if not author_dir.is_dir():
                    continue
                for plugin_dir in author_dir.iterdir():
                    if plugin_dir.is_dir() and (plugin_dir / "main.py").exists():
                        return True
        pkg_dir = Path("./data/pkg")
        if pkg_dir.exists():
            for d in pkg_dir.iterdir():
                if not d.is_dir():
                    continue
                if (d / "main.py").exists():
                    return True
                if any(p.is_dir() and (p / "main.py").exists() for p in d.iterdir()):
                    return True
        return False

    def _bootstrap_installation(self):
        """Install plugins through the ``pkg`` plugin.

        Loads ``store/@{FutureOSS}/pkg`` without the sandbox, asks its
        ``manager`` to search for plugins and installs every result. Any
        failure is reported and otherwise ignored.
        """
        pkg_dir = Path("store/@{FutureOSS}/pkg")
        if not (pkg_dir.exists() and (pkg_dir / "main.py").exists()):
            print("[plugin-loader] pkg 插件不存在,跳过引导安装")
            return
        try:
            pkg_instance = self.load(pkg_dir, use_sandbox=False)
            if not pkg_instance:
                return
            pkg_mgr = pkg_instance.manager
            print("[plugin-loader] 正在搜索可用插件...")
            results = pkg_mgr.search()
            if not results:
                print("[plugin-loader] 未找到远程插件")
                return
            print(f"[plugin-loader] 发现 {len(results)} 个插件,开始安装...")
            installed_count = 0
            for pkg_info in results:
                print(f"[plugin-loader] 安装: {pkg_info.name}")
                if pkg_mgr.install(pkg_info.name):
                    installed_count += 1
            if installed_count > 0:
                # pkg stays loaded; the other plugins are picked up by load_all.
                print(f"[plugin-loader] 已安装 {installed_count} 个插件,重新扫描加载...")
        except Exception as e:
            print(f"[plugin-loader] 引导安装失败: {e}")

    def _sort_by_dependencies(self, dep_plugin):
        """Reorder ``self.plugins`` to follow the dependency plugin's order.

        Plugins the resolver does not mention keep their relative order and
        are placed after the resolved ones. The dict is reordered in place
        because proxies created by :meth:`load` hold a reference to it.
        """
        if not dep_plugin:
            return
        for name, info in self.plugins.items():
            dep_plugin.add_plugin(name, info["info"].dependencies)
        try:
            order = dep_plugin.resolve()
            reordered = {
                name: self.plugins[name] for name in order if name in self.plugins
            }
            for name, entry in self.plugins.items():
                reordered.setdefault(name, entry)
        except Exception as e:
            print(f"[plugin-loader] 依赖解析失败: {e}")
            return
        self.plugins.clear()
        self.plugins.update(reordered)

    def _link_capabilities(self):
        """Hand capability providers to their registered consumers.

        For each capability of each loaded plugin, every loaded consumer gets
        the provider stored in its ``extensions`` under ``_<cap>_provider``,
        subject to the registry's permission check.
        """
        registry = self.capability_registry
        for info in self.plugins.values():
            for cap in info["info"].capabilities:
                if not registry.has_capability(cap):
                    continue
                for consumer_name in registry.get_consumers(cap):
                    if consumer_name not in self.plugins:
                        continue
                    consumer = self.plugins[consumer_name]
                    consumer_info = consumer["info"]
                    consumer_allowed = consumer.get("permissions", [])
                    try:
                        provider = registry.get_provider(
                            cap,
                            requester=consumer_name,
                            allowed_plugins=consumer_allowed,
                        )
                        if provider and hasattr(consumer_info, "extensions"):
                            consumer_info.extensions[f"_{cap}_provider"] = provider
                    except PermissionError as e:
                        print(f"[plugin-loader] 权限拒绝: {e}")

    def start_all(self):
        """Inject dependencies and start every plugin (no ``init()`` call)."""
        self._inject_dependencies()
        for name, info in self.plugins.items():
            try:
                info["instance"].start()
            except Exception as e:
                print(f"[plugin-loader] 启动失败 {name}: {e}")

    def init_and_start_all(self):
        """Initialise and then start every plugin.

        Steps: inject dependency instances, compute the dependency order,
        call ``init()`` on every plugin in that order, then call ``start()``
        on every plugin in that order. A failing plugin is reported and does
        not stop the others. Names containing "plugin-loader" are skipped.
        """
        print(f"[plugin-loader] init_and_start_all 被调用plugins={len(self.plugins)}")
        self._inject_dependencies()
        ordered_plugins = self._get_ordered_plugins()
        print(f"[plugin-loader] 插件启动顺序: {' -> '.join(ordered_plugins)}")

        print("[plugin-loader] 开始初始化所有插件...")
        for name in ordered_plugins:
            if "plugin-loader" in name:
                continue
            info = self.plugins[name]
            try:
                print(f"[plugin-loader] 初始化: {name}")
                info["instance"].init()
            except Exception as e:
                print(f"[plugin-loader] 初始化失败 {name}: {e}")

        print("[plugin-loader] 开始启动所有插件...")
        for name in ordered_plugins:
            if "plugin-loader" in name:
                continue
            info = self.plugins[name]
            try:
                print(f"[plugin-loader] 启动: {name}")
                info["instance"].start()
            except Exception as e:
                print(f"[plugin-loader] 启动失败 {name}: {e}")

    def _get_ordered_plugins(self) -> list[str]:
        """Return all loaded plugin names in dependency order.

        Without a dependency plugin, or when resolving fails, the current
        order of ``self.plugins`` is returned. Loaded plugins the resolver
        does not mention are appended after the resolved ones so that no
        plugin is silently left out of init/start.
        """
        dep_plugin = getattr(self, "_dependency_plugin", None)
        if not dep_plugin:
            return list(self.plugins)
        try:
            order = dep_plugin.resolve()
            # Keep only loaded plugins; dict.fromkeys drops repeated names.
            ordered = list(dict.fromkeys(
                name for name in order if name in self.plugins
            ))
        except Exception as e:
            print(f"[plugin-loader] 依赖解析失败,使用原始顺序: {e}")
            return list(self.plugins)
        resolved = set(ordered)
        ordered.extend(name for name in self.plugins if name not in resolved)
        return ordered

    def _inject_dependencies(self):
        """Pass each plugin's declared dependencies to its ``set_<dep>`` method.

        For a dependency named ``foo-bar`` the method ``set_foo_bar`` is
        called with the dependency's instance if the plugin defines it.
        Dependency names are matched with or without a trailing "}".
        """
        print(f"[plugin-loader] 开始注入依赖,共 {len(self.plugins)} 个插件")
        # Map both spellings (with and without "}") to the registered name.
        name_map = {}
        for name in self.plugins:
            clean = name.rstrip("}")
            name_map[clean] = name
            name_map[clean + "}"] = name

        for name, info in self.plugins.items():
            instance = info["instance"]
            info_obj = info.get("info")
            if not info_obj:
                continue
            deps = info_obj.dependencies
            if not deps:
                continue
            print(f"[plugin-loader] {name} 依赖: {deps}")
            for dep_name in deps:
                actual_dep = name_map.get(dep_name) or name_map.get(dep_name + "}")
                if not (actual_dep and actual_dep in self.plugins):
                    continue
                dep_instance = self.plugins[actual_dep]["instance"]
                # Build the setter from the cleaned name; a "}" suffix can
                # never be part of a method name.
                clean_dep = dep_name.rstrip("}")
                setter_name = f"set_{clean_dep.replace('-', '_')}"
                print(f"[plugin-loader] 尝试注入: {name} <- {actual_dep} ({setter_name})")
                if not hasattr(instance, setter_name):
                    print(f"[plugin-loader] 警告: {name} 没有 {setter_name} 方法")
                    continue
                try:
                    getattr(instance, setter_name)(dep_instance)
                    print(f"[plugin-loader] 注入成功: {name} <- {actual_dep}")
                except Exception as e:
                    print(f"[plugin-loader] 注入依赖失败 {name}.{setter_name}: {e}")

    def stop_all(self):
        """Stop every plugin in reverse order, then the lifecycle plugin.

        A plugin that fails to stop is reported and does not prevent the
        remaining plugins from being stopped.
        """
        for name, info in reversed(list(self.plugins.items())):
            try:
                info["instance"].stop()
            except Exception as e:
                print(f"[plugin-loader] 停止失败 {name}: {e}")
        if self.lifecycle_plugin:
            self.lifecycle_plugin.stop_all()

    def get_info(self, name: str) -> Optional["PluginInfo"]:
        """Return the ``PluginInfo`` of a loaded plugin, or None if unknown."""
        if name in self.plugins:
            return self.plugins[name]["info"]
        return None

    def has_capability(self, capability: str) -> bool:
        """Tell whether any loaded plugin provides ``capability``."""
        return self.capability_registry.has_capability(capability)

    def get_capability_provider(self, capability: str) -> Optional[Any]:
        """Return the provider instance for ``capability`` (no permission check)."""
        return self.capability_registry.get_provider(capability)
class PluginLoaderPlugin(Plugin):
    """Plugin that loads and runs all other plugins via a ``PluginManager``.

    ``init`` and ``start`` each do their work only on the first call; later
    calls return immediately.
    """

    def __init__(self):
        self._loaded = False
        self._started = False
        self.manager = PluginManager()

    def init(self, deps: dict = None):
        """Load all plugins through the manager (first call only)."""
        if not self._loaded:
            # The flag is set before loading, so a second call is a no-op
            # even if loading is still running or has failed.
            self._loaded = True
            print("[plugin-loader] 开始加载插件...")
            self.manager.load_all()

    def start(self):
        """Initialise and start all loaded plugins (first call only)."""
        if not self._started:
            self._started = True
            print("[plugin-loader] 启动插件...")
            self.manager.init_and_start_all()

    def stop(self):
        """Stop all plugins through the manager."""
        print("[plugin-loader] 停止插件...")
        self.manager.stop_all()
# Register this module's classes with the framework under their class names
# (presumably so other code can look the types up by name; see
# oss.plugin.types.register_plugin_type to confirm).
register_plugin_type("PluginManager", PluginManager)
register_plugin_type("PluginInfo", PluginInfo)
register_plugin_type("CapabilityRegistry", CapabilityRegistry)
def New():
    """Create and return a fresh ``PluginLoaderPlugin`` instance."""
    loader_plugin = PluginLoaderPlugin()
    return loader_plugin