update branch

This commit is contained in:
qwen.ai[bot]
2026-04-25 06:07:33 +00:00
parent fe71635899
commit 9322dc857f
4 changed files with 408 additions and 54 deletions

78
.gitignore vendored
View File

@@ -1,72 +1,42 @@
``` ```
# Compiled and build artifacts # Python
*.pyc
__pycache__/ __pycache__/
*.o *.pyc
*.obj *.pyo
*.so *.pyd
*.dll *.py~
*.exe
*.class
*.out
# Dependencies # Dependencies
.venv/ .venv/
venv/ venv/
node_modules/
dist/
build/
target/
.gradle/
.mypy_cache/
.pytest_cache/
# Logs and temp files
*.log
*.tmp
*.swp
*.swo
*.out
# Environment
.env .env
.env.local .env.local
*.env.* .env.*
# Editors # Logs
.vscode/ *.log
.idea/
# System files # OS
.DS_Store .DS_Store
Thumbs.db Thumbs.db
# Coverage # Backup files
*~
*.bak
*.swp
*.swo
# Coverage reports
coverage/ coverage/
htmlcov/ htmlcov/
.coverage .coverage
# Compressed files # Testing
*.zip .pytest_cache/
*.gz .mypy_cache/
*.tar
*.tgz # Distribution / packaging
*.bz2 dist/
*.xz build/
*.7z *.egg-info/
*.rar
*.zst
*.lz4
*.lzh
*.cab
*.arj
*.rpm
*.deb
*.Z
*.lz
*.lzo
*.tar.gz
*.tar.bz2
*.tar.xz
*.tar.zst
``` ```

View File

@@ -0,0 +1,370 @@
"""
Auto Dependency Plugin - 依赖自动安装插件
该插件允许其他插件在声明文件 (manifest.json) 中声明所需的系统依赖,
然后扫描所有插件的声明文件,检查并安装缺失的系统依赖。
通过插件加载器的 /PL 注入能力接口进行对接。
"""
import json
import os
import subprocess
import sys
from typing import List, Dict, Any, Optional
from pathlib import Path
from oss.plugin.base import BasePlugin
from oss.core.context import Context
class AutoDependencyPlugin(BasePlugin):
"""依赖自动安装插件"""
def __init__(self):
super().__init__()
self.name = "auto_dependency"
self.version = "1.0.0"
self.description = "自动扫描并安装插件声明的系统依赖"
self.plugins_dir: Optional[Path] = None
self.manifest_filename = "manifest.json"
self.logger = None
def init(self, deps: Optional[Dict[str, Any]] = None):
"""初始化插件"""
# 获取插件目录路径
self.plugins_dir = Path(__file__).parent
if deps and 'logger' in deps:
self.logger = deps['logger']
else:
import logging
self.logger = logging.getLogger(self.name)
self.logger.info(f"AutoDependencyPlugin 初始化完成,插件目录:{self.plugins_dir}")
def start(self):
"""启动插件"""
self.logger.info("AutoDependencyPlugin 启动")
def stop(self):
"""停止插件"""
self.logger.info("AutoDependencyPlugin 停止")
def scan(self) -> List[Dict[str, Any]]:
"""
扫描所有插件的声明文件,收集系统依赖信息
Returns:
List[Dict]: 包含所有插件依赖信息的列表
每个元素格式: {
"plugin": str, # 插件名称
"dependencies": List, # 依赖列表
"package_manager": str # 包管理器类型
}
"""
all_dependencies = []
if not self.plugins_dir.exists():
self.logger.warning(f"插件目录不存在: {self.plugins_dir}")
return all_dependencies
# 遍历所有插件文件
for plugin_file in self.plugins_dir.glob("*.py"):
plugin_name = plugin_file.stem
# 跳过自身和__init__等文件
if plugin_name.startswith("_") or plugin_name == self.name:
continue
# 查找对应的 manifest 文件
manifest_path = self._find_manifest_for_plugin(plugin_name)
if manifest_path and manifest_path.exists():
try:
with open(manifest_path, 'r', encoding='utf-8') as f:
manifest = json.load(f)
# 提取系统依赖信息
system_deps = manifest.get("system_dependencies", [])
package_manager = manifest.get("package_manager", "apt-get")
if system_deps:
all_dependencies.append({
"plugin": plugin_name,
"dependencies": system_deps,
"package_manager": package_manager,
"manifest_path": str(manifest_path)
})
self.logger.info(
f"插件 {plugin_name} 声明了 {len(system_deps)} 个系统依赖"
)
except json.JSONDecodeError as e:
self.logger.error(f"解析 {manifest_path} 失败: {e}")
except Exception as e:
self.logger.error(f"处理插件 {plugin_name} 时出错: {e}")
return all_dependencies
def check(self, dependencies: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
检查指定的系统依赖是否已安装
Args:
dependencies: 依赖信息列表,格式同 scan() 返回值
Returns:
Dict: 检查结果
{
"total": int, # 总依赖数
"installed": int, # 已安装数
"missing": List[Dict], # 缺失的依赖详情
"all_installed": bool # 是否全部已安装
}
"""
result = {
"total": 0,
"installed": 0,
"missing": [],
"all_installed": True
}
for dep_info in dependencies:
plugin_name = dep_info["plugin"]
package_manager = dep_info["package_manager"]
for package in dep_info["dependencies"]:
result["total"] += 1
if self._is_package_installed(package, package_manager):
result["installed"] += 1
self.logger.debug(f"{package} 已安装 (插件: {plugin_name})")
else:
result["missing"].append({
"package": package,
"plugin": plugin_name,
"package_manager": package_manager
})
result["all_installed"] = False
self.logger.warning(f"{package} 未安装 (插件: {plugin_name})")
return result
def install(self, missing: List[Dict[str, str]],
auto_confirm: bool = True) -> Dict[str, Any]:
"""
安装缺失的系统依赖
Args:
missing: 缺失的依赖列表,格式为 [{"package": str, "package_manager": str}]
auto_confirm: 是否自动确认安装
Returns:
Dict: 安装结果
{
"success": List[str], # 成功安装的包
"failed": List[Dict], # 安装失败的包及原因
"total": int, # 尝试安装的总数
}
"""
result = {
"success": [],
"failed": [],
"total": len(missing)
}
if not missing:
self.logger.info("没有需要安装的依赖")
return result
# 按包管理器分组
packages_by_pm: Dict[str, List[str]] = {}
for item in missing:
pm = item.get("package_manager", "apt-get")
pkg = item["package"]
if pm not in packages_by_pm:
packages_by_pm[pm] = []
packages_by_pm[pm].append(pkg)
# 执行安装
for pm, packages in packages_by_pm.items():
self.logger.info(f"使用 {pm} 安装包: {', '.join(packages)}")
success, failed = self._install_packages(packages, pm, auto_confirm)
result["success"].extend(success)
for fail_pkg, reason in failed:
result["failed"].append({
"package": fail_pkg,
"reason": reason
})
return result
def info(self) -> Dict[str, Any]:
"""
获取插件信息
Returns:
Dict: 插件详细信息
"""
return {
"name": self.name,
"version": self.version,
"description": self.description,
"supported_package_managers": [
"apt-get", "yum", "dnf", "pacman", "brew", "apk"
],
"api_methods": ["scan", "check", "install", "info"]
}
def _find_manifest_for_plugin(self, plugin_name: str) -> Optional[Path]:
"""查找插件对应的 manifest 文件"""
# 可能的 manifest 文件位置
possible_paths = [
self.plugins_dir / f"{plugin_name}.json",
self.plugins_dir / plugin_name / "manifest.json",
self.plugins_dir / f"{plugin_name}" / f"{plugin_name}.json",
]
for path in possible_paths:
if path.exists():
return path
# 也检查插件文件同目录下的同名 json 文件
plugin_file = self.plugins_dir / f"{plugin_name}.py"
if plugin_file.exists():
json_file = self.plugins_dir / f"{plugin_name}.json"
if json_file.exists():
return json_file
return None
def _is_package_installed(self, package: str, package_manager: str) -> bool:
"""检查包是否已安装"""
try:
if package_manager in ["apt-get", "apt"]:
cmd = ["dpkg", "-l", package]
elif package_manager in ["yum", "dnf"]:
cmd = ["rpm", "-q", package]
elif package_manager == "pacman":
cmd = ["pacman", "-Q", package]
elif package_manager == "brew":
cmd = ["brew", "list", "--versions", package]
elif package_manager == "apk":
cmd = ["apk", "info", "-e", package]
else:
# 默认使用 which/whereis 检查可执行文件
cmd = ["which", package]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
return result.returncode == 0
except subprocess.TimeoutExpired:
self.logger.warning(f"检查包 {package} 超时")
return False
except Exception as e:
self.logger.error(f"检查包 {package} 时出错: {e}")
return False
def _install_packages(self, packages: List[str],
package_manager: str,
auto_confirm: bool = True) -> tuple:
"""
安装包
Returns:
tuple: (success_list, failed_list)
success_list: 成功安装的包名列表
failed_list: [(包名, 失败原因), ...]
"""
success = []
failed = []
try:
if package_manager in ["apt-get", "apt"]:
cmd_prefix = ["apt-get", "install", "-y"] if auto_confirm else ["apt-get", "install"]
elif package_manager == "yum":
cmd_prefix = ["yum", "install", "-y"] if auto_confirm else ["yum", "install"]
elif package_manager == "dnf":
cmd_prefix = ["dnf", "install", "-y"] if auto_confirm else ["dnf", "install"]
elif package_manager == "pacman":
cmd_prefix = ["pacman", "-S", "--noconfirm"] if auto_confirm else ["pacman", "-S"]
elif package_manager == "brew":
cmd_prefix = ["brew", "install"]
elif package_manager == "apk":
cmd_prefix = ["apk", "add"]
else:
self.logger.error(f"不支持的包管理器: {package_manager}")
for pkg in packages:
failed.append((pkg, f"不支持的包管理器: {package_manager}"))
return success, failed
# 合并命令
cmd = cmd_prefix + packages
self.logger.info(f"执行安装命令: {' '.join(cmd)}")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=300 # 5 分钟超时
)
if result.returncode == 0:
success.extend(packages)
self.logger.info(f"成功安装包: {', '.join(packages)}")
else:
error_msg = result.stderr.strip() or result.stdout.strip()
for pkg in packages:
failed.append((pkg, error_msg))
self.logger.error(f"安装包失败: {error_msg}")
except subprocess.TimeoutExpired:
for pkg in packages:
failed.append((pkg, "安装超时"))
self.logger.error("安装包超时")
except PermissionError:
for pkg in packages:
failed.append((pkg, "权限不足,需要 root 权限"))
self.logger.error("安装包需要 root 权限")
except Exception as e:
for pkg in packages:
failed.append((pkg, str(e)))
self.logger.error(f"安装包时发生异常: {e}")
return success, failed
def execute(self, action: str, **kwargs) -> Any:
"""
执行插件动作 (供插件加载器调用)
Args:
action: 动作名称 (scan, check, install, info)
**kwargs: 动作参数
Returns:
动作执行结果
"""
if action == "scan":
return self.scan()
elif action == "check":
dependencies = kwargs.get("dependencies", self.scan())
return self.check(dependencies)
elif action == "install":
missing = kwargs.get("missing", [])
auto_confirm = kwargs.get("auto_confirm", True)
return self.install(missing, auto_confirm)
elif action == "info":
return self.info()
else:
raise ValueError(f"未知的动作: {action}")

View File

@@ -0,0 +1,7 @@
{
"name": "firewall",
"version": "1.0.0",
"description": "防火墙管理插件",
"system_dependencies": ["iptables", "ufw"],
"package_manager": "apt-get"
}

View File

@@ -0,0 +1,7 @@
{
"name": "ftp_server",
"version": "1.0.0",
"description": "FTP 服务器插件",
"system_dependencies": ["vsftpd", "proftpd"],
"package_manager": "apt-get"
}