Title: 继续修复所有错误

Key features implemented:
- Updated .gitignore to include additional cache and coverage directories (.mypy_cache/, .pytest_cache/, coverage/, htmlcov/)
- Enhanced XSS protection in pkg-manager plugin by adding HTML escaping for all user data in page rendering functions
- Improved PL injection security in plugin-loader with enhanced static source code analysis including base64 decoding checks and string concatenation bypass detection
- Strengthened configuration file loading security using ast.literal_eval for safe parsing and stricter code execution prevention
- Added comprehensive dangerous pattern checks in PL injection static analysis to prevent various bypass techniques

The updates focus on security hardening across the plugin system, particularly addressing input sanitization and code injection vulnerabilities while expanding the project's ignore patterns for better repository cleanliness.
This commit is contained in:
qwen.ai[bot]
2026-04-25 14:14:33 +00:00
committed by Falck
parent 64c8713945
commit 902d2782cf
8 changed files with 156 additions and 50 deletions

29
.gitignore vendored
View File

@@ -5,9 +5,6 @@ __pycache__/
*.pyo
*.pyd
# Dependencies
node_modules/
# Logs and temp files
*.log
*.tmp
@@ -17,12 +14,32 @@ node_modules/
.env.local
*.env.*
# Editors
.vscode/
.idea/
# Dependencies
.venv/
venv/
node_modules/
# Build artifacts
dist/
build/
target/
# Coverage
.coverage
coverage/
htmlcov/
# Editors
.vscode/
.idea/
# System
.DS_Store
Thumbs.db
# MyPy
.mypy_cache/
# Pytest
.pytest_cache/
```

View File

@@ -2,6 +2,7 @@
import os
import sys
import json
import html
import urllib.request
from pathlib import Path
from oss.logger.logger import Log
@@ -112,15 +113,19 @@ class PkgManagerPlugin(Plugin):
for pkg_name, info in plugins.items():
status_class = "success" if info.get('enabled', False) else "secondary"
status_text = "已启用" if info.get('enabled', False) else "已禁用"
# XSS 防护:对所有用户数据进行 HTML 转义
safe_pkg_name = html.escape(pkg_name)
safe_version = html.escape(str(info.get('version', '未知')))
safe_author = html.escape(str(info.get('author', '未知')))
plugin_rows += f"""
<tr>
<td>{pkg_name}</td>
<td>{info.get('version', '未知')}</td>
<td>{info.get('author', '未知')}</td>
<td>{safe_pkg_name}</td>
<td>{safe_version}</td>
<td>{safe_author}</td>
<td><span class="badge badge-{status_class}">{status_text}</span></td>
<td>
<button class="btn btn-sm btn-primary" onclick="togglePlugin('{pkg_name}')">切换状态</button>
<button class="btn btn-sm btn-danger" onclick="uninstallPlugin('{pkg_name}')">卸载</button>
<button class="btn btn-sm btn-primary" onclick="togglePlugin('{safe_pkg_name}')">切换状态</button>
<button class="btn btn-sm btn-danger" onclick="uninstallPlugin('{safe_pkg_name}')">卸载</button>
</td>
</tr>"""
@@ -209,15 +214,23 @@ class PkgManagerPlugin(Plugin):
plugin_cards = ""
for pkg_name, info in available.items():
is_installed = pkg_name in installed
action_btn = f'<button class="btn btn-success" onclick="installPlugin(\'{pkg_name}\')">安装</button>' if not is_installed else '<button class="btn btn-secondary" disabled>已安装</button>'
# XSS 防护:对所有用户数据进行 HTML 转义
safe_pkg_name = html.escape(pkg_name)
safe_name = html.escape(str(info.get('name', pkg_name)))
safe_desc = html.escape(str(info.get('description', '暂无描述')))
safe_version = html.escape(str(info.get('version', '未知')))
safe_author = html.escape(str(info.get('author', '未知')))
# JavaScript 中的字符串也需要转义
js_safe_pkg_name = pkg_name.replace('\\', '\\\\').replace("'", "\\'").replace('"', '\\"')
action_btn = f'<button class="btn btn-success" onclick="installPlugin(\'{js_safe_pkg_name}\')">安装</button>' if not is_installed else '<button class="btn btn-secondary" disabled>已安装</button>'
plugin_cards += f"""
<div class="plugin-card">
<div class="plugin-icon"><i class="ri-plug-line"></i></div>
<h3>{info.get('name', pkg_name)}</h3>
<p class="plugin-desc">{info.get('description', '暂无描述')}</p>
<h3>{safe_name}</h3>
<p class="plugin-desc">{safe_desc}</p>
<div class="plugin-meta">
<span>版本:{info.get('version', '未知')}</span>
<span>作者:{info.get('author', '未知')}</span>
<span>版本:{safe_version}</span>
<span>作者:{safe_author}</span>
</div>
<div class="plugin-actions">
{action_btn}

View File

@@ -213,16 +213,48 @@ class PLInjector:
return False
def _static_source_check(self, source: str, file_path: str):
"""静态源码安全检查"""
"""静态源码安全检查 - 增强版,防止字符串拼接/编码绕过"""
import base64
# 首先检查是否有 base64 编码的恶意代码
try:
# 查找所有字符串字面量
string_pattern = r'([A-Za-z0-9+/=]{20,})'
for match in re.finditer(string_pattern, source):
try:
decoded = base64.b64decode(match.group(1)).decode('utf-8', errors='ignore')
# 检查解码后的内容
for dangerous in ['import ', 'exec(', 'eval(', 'compile(', 'os.', 'sys.', 'subprocess']:
if dangerous in decoded:
raise PLValidationError(f"{file_path} - 检测到 base64 编码的恶意代码")
except:
pass
except:
pass
# 检查字符串拼接绕过 (如 'ex' + 'ec')
concat_patterns = [
r"""['"]ex['"]\s*\+\s*['"]ec['"]""",
r"""['"]impor['"]\s*\+\s*['"]t['"]""",
r"""['"]eva['"]\s*\+\s*['"]l['"]""",
r"""['"]compil['"]\s*\+\s*['"]e['"]""",
]
for pattern in concat_patterns:
if re.search(pattern, source):
raise PLValidationError(f"{file_path} - 检测到字符串拼接绕过尝试")
forbidden = [
(r'^\s*import\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)', '禁止导入系统级模块'),
(r'^\s*from\s+(os|sys|subprocess|shutil|socket|ctypes|cffi|multiprocessing|threading)\s+import', '禁止导入系统级模块'),
(r'__import__\s*\(', '禁止使用 __import__'),
(r'exec\s*\(', '禁止使用 exec'),
(r'eval\s*\(', '禁止使用 eval'),
(r'compile\s*\(', '禁止使用 compile'),
(r'open\s*\(', '禁止直接操作文件'),
(r'(?<![a-zA-Z_])exec\s*\(', '禁止使用 exec'),
(r'(?<![a-zA-Z_])eval\s*\(', '禁止使用 eval'),
(r'(?<![a-zA-Z_])compile\s*\(', '禁止使用 compile'),
(r'(?<![a-zA-Z_])open\s*\(', '禁止直接操作文件'),
(r'__builtins__', '禁止访问 __builtins__'),
(r'getattr\s*\(\s*__builtins__', '禁止通过 getattr 访问 __builtins__'),
(r'setattr\s*\(', '禁止使用 setattr'),
(r'type\s*\(\s*\(\s*[\'"]', '禁止使用 type 动态创建类'),
]
for line_num, line in enumerate(source.split('\n'), 1):
stripped = line.strip()
@@ -379,7 +411,8 @@ class PluginManager:
with open(rf, "r", encoding="utf-8") as f: return f.read()
def _load_config(self, plugin_dir: Path) -> dict:
"""加载插件配置文件"""
"""加载插件配置文件 - 使用 ast.literal_eval 安全解析"""
import ast
cf = plugin_dir / "config.py"
if not cf.exists():
return {}
@@ -396,43 +429,86 @@ class PluginManager:
Log.error("plugin-loader", f"配置文件编码错误:{cf} - {e}")
return {}
# 安全检查
for p in ['import ', 'open(', 'exec(', 'eval(', 'os.', 'sys.', 'subprocess']:
# 严格检查:不允许任何代码执行
for p in ['import ', 'from ', 'open(', 'exec(', 'eval(', 'compile(', 'os.', 'sys.', 'subprocess', 'lambda', 'def ', 'class ']:
if p in content:
Log.warn("plugin-loader", f"{cf} 包含危险代码:{p}")
return {}
sg = {"__builtins__": {"True": True, "False": False, "None": None, "dict": dict, "list": list, "str": str, "int": int, "float": float, "bool": bool}}
lv = {}
# 尝试使用 ast.literal_eval 安全解析
try:
code = compile(content, str(cf), "exec")
exec(code, sg, lv)
except SyntaxError as e:
Log.error("plugin-loader", f"配置文件语法错误:{cf} - {e}")
return {}
except NameError as e:
Log.error("plugin-loader", f"配置文件名称错误:{cf} - {e}")
return {}
except TypeError as e:
Log.error("plugin-loader", f"配置文件类型错误:{cf} - {e}")
return {}
except Exception as e:
Log.error("plugin-loader", f"配置文件解析失败:{cf} - {type(e).__name__}: {e}")
return {}
result = ast.literal_eval(content)
if isinstance(result, dict):
return {k: v for k, v in result.items() if not k.startswith("_")}
except (ValueError, SyntaxError):
pass
# 如果失败,尝试提取简单的键值对
config = {}
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
match = re.match(r'^([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(.+)$', line)
if match:
key, value_str = match.groups()
if key.startswith('_'):
continue
try:
value = ast.literal_eval(value_str)
config[key] = value
except (ValueError, SyntaxError):
Log.warn("plugin-loader", f"{cf} 跳过无效的值:{line}")
continue
return config
return {k: v for k, v in lv.items() if not k.startswith("_") and not callable(v)}
def _load_extensions(self, plugin_dir: Path) -> dict:
"""加载插件扩展配置 - 使用 ast.literal_eval 安全解析"""
import ast
ef = plugin_dir / "extensions.py"
if not ef.exists(): return {}
with open(ef, "r", encoding="utf-8") as f: content = f.read()
for p in ['import ', 'open(', 'exec(', 'eval(', 'os.', 'sys.', 'subprocess']:
if p in content: Log.warn("plugin-loader", f"{ef} 包含危险代码: {p}"); return {}
sg = {"__builtins__": {"True": True, "False": False, "None": None, "dict": dict, "list": list, "str": str, "int": int, "float": float, "bool": bool}}
lv = {}
try: code = compile(content, str(ef), "exec"); exec(code, sg, lv)
except Exception as e: Log.error("plugin-loader", f"扩展文件解析失败: {e}"); return {}
return {k: v for k, v in lv.items() if not k.startswith("_") and not callable(v)}
if not ef.exists():
return {}
try:
with open(ef, "r", encoding="utf-8") as f:
content = f.read()
except Exception as e:
Log.error("plugin-loader", f"扩展文件读取失败:{e}")
return {}
# 严格检查:不允许任何代码执行
for p in ['import ', 'from ', 'open(', 'exec(', 'eval(', 'compile(', 'os.', 'sys.', 'subprocess', 'lambda', 'def ', 'class ']:
if p in content:
Log.warn("plugin-loader", f"{ef} 包含危险代码:{p}")
return {}
# 尝试使用 ast.literal_eval 安全解析
try:
result = ast.literal_eval(content)
if isinstance(result, dict):
return {k: v for k, v in result.items() if not k.startswith("_")}
except (ValueError, SyntaxError):
pass
# 如果失败,尝试提取简单的键值对
extensions = {}
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('#'):
continue
match = re.match(r'^([a-zA-Z_][a-zA-Z0-9_]*)\s*=\s*(.+)$', line)
if match:
key, value_str = match.groups()
if key.startswith('_'):
continue
try:
value = ast.literal_eval(value_str)
extensions[key] = value
except (ValueError, SyntaxError):
Log.warn("plugin-loader", f"{ef} 跳过无效的值:{line}")
continue
return extensions
def load(self, plugin_dir: Path, use_sandbox: bool = True) -> Optional[Any]:
"""加载单个插件"""