Files
NebulaShell/oss/cli.py
Starlight-apk e67d2d8ef6
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
refactor: 优化 NBPF 模块 - 缓存导入/合并重复方法/减少I/O
- crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入
- crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt
- crypto.py: 提取公共摘要计算方法,拆分长方法
- compiler.py: 删除_obfuscate_code中未使用的死代码
- loader.py: 3次ZIP扫描合并为1次缓存读取
- format.py: 更新为使用_ModuleCache
- 合计减少205行代码(1707→1502)
2026-05-17 15:36:45 +08:00

690 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CLI 入口"""
import click
import signal
import os
import sys
import random
from pathlib import Path
from oss import __version__
from oss.logger.logger import Log
from oss.plugin.manager import PluginManager
from oss.config import init_config, get_config
# 深度隐藏的成就系统导入
try:
from oss.core.achievements import init_achievements, get_validator, _cmd_echo, _cmd_help_internal, _cmd_list_all, _cmd_stats, _cmd_reset_progress, _cmd_export, _cmd_import, _cmd_verify, _cmd_debug, _cmd_info
_ACHIEVEMENTS_ENABLED = True
except ImportError:
_ACHIEVEMENTS_ENABLED = False
def _handle_hidden_command():
    """Dispatch a hidden ``!!``-prefixed command taken from ``sys.argv``.

    Returns:
        ``False`` when there is no first argument or it does not start with
        ``!!`` (nothing was handled). ``True`` in every other case: the
        achievements subsystem is unavailable, the command is unknown, or
        the command was dispatched to its handler.
    """
    argv = sys.argv
    has_hidden_prefix = len(argv) > 1 and argv[1].startswith("!!")
    if not has_hidden_prefix:
        return False

    if not _ACHIEVEMENTS_ENABLED:
        print("成就系统未启用")
        return True

    # Strip the leading "!!"; everything after the command is passed through.
    cmd = argv[1][2:]
    args = argv[2:]

    handlers = {
        "echo": _cmd_echo,
        "help": _cmd_help_internal,
        "list": _cmd_list_all,
        "stats": _cmd_stats,
        "reset": _cmd_reset_progress,
        "export": _cmd_export,
        "import": _cmd_import,
        "verify": _cmd_verify,
        "debug": _cmd_debug,
        "info": _cmd_info,
    }
    handler = handlers.get(cmd)
    if handler is None:
        print(f"未知命令:!!{cmd}")
        return True

    # Record the usage with the validator before running the handler.
    validator = get_validator()
    validator.use_hidden_command(cmd)
    handler(args)
    return True
# Root command group. The docstring below is shown by click as the --help
# text, so it is user-facing and kept verbatim.
@click.group()
@click.option('--config', '-c', type=str, help='配置文件路径')
@click.pass_context
def cli(ctx, config):
    """NebulaShell - 一切皆为插件"""
    # Make ctx.obj a dict and store the initialised config for subcommands.
    ctx.ensure_object(dict)
    ctx.obj['config'] = init_config(config)

    if not _ACHIEVEMENTS_ENABLED:
        return

    # A failure while initialising achievements is reported but never
    # prevents the CLI from running.
    try:
        init_achievements()
    except Exception as e:
        print(f"[CLI] 错误: {e}")
def _wait_for_signal():
    """Block the calling thread until a signal handler interrupts it.

    ``signal.pause`` is only available on Unix. Where it is missing, fall
    back to a sleep loop, which a signal handler that raises (such as the
    ``shutdown`` handler installed by ``serve``) still interrupts.
    """
    pause = getattr(signal, "pause", None)
    if pause is not None:
        pause()
        return
    import time
    while True:
        time.sleep(1)


# The docstring of ``serve`` is shown by click as the --help text, so it is
# user-facing and kept verbatim.
@cli.command()
@click.option('--host', type=str, default=None, help='监听地址')
@click.option('--port', type=int, default=None, help='HTTP API 端口')
@click.option('--tcp-port', type=int, default=None, help='HTTP TCP 端口')
@click.pass_context
def serve(ctx, host, port, tcp_port):
    """启动 NebulaShell 服务端"""
    # Command-line values, when given, override the loaded configuration.
    config = ctx.obj.get('config', get_config())
    if host:
        config.set('HOST', host)
    if port:
        config.set('HTTP_API_PORT', port)
    if tcp_port:
        config.set('HTTP_TCP_PORT', tcp_port)

    Log.info("NebulaShell", f"NebulaShell {__version__} 启动")
    Log.info("NebulaShell", f"监听地址:{config.host}:{config.http_api_port}")
    Log.info("NebulaShell", f"数据目录:{config.data_dir.absolute()}")
    Log.info("NebulaShell", f"模组仓库:{config.mods_dir.absolute()}")

    plugin_mgr = PluginManager()
    plugin_mgr.load()
    plugin_mgr.start()
    Log.info("NebulaShell", "就绪")

    def shutdown(sig, frame):
        # Stop plugins, then unwind the main thread via SystemExit.
        Log.info("NebulaShell", "停止中...")
        plugin_mgr.stop()
        Log.info("NebulaShell", "已停止")
        raise SystemExit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    # Start the interactive REPL, which is provided by the Core plugin.
    # Without a usable REPL, keep the process alive until a signal arrives.
    try:
        core = getattr(plugin_mgr, 'core', None)
        if core:
            core.start_repl()
        else:
            Log.error("NebulaShell", "Core 未加载,无法启动 REPL")
            _wait_for_signal()
    except Exception as e:
        Log.error("NebulaShell", f"REPL 启动失败: {e}")
        _wait_for_signal()
# The docstring of ``version`` is shown by click as the --help text, so it
# is user-facing and kept verbatim.
@cli.command()
def version():
    """显示版本"""
    banner = "NebulaShell {}".format(__version__)
    click.echo(banner)
# The docstring of ``info`` is shown by click as the --help text, so it is
# user-facing and kept verbatim.
@cli.command()
@click.pass_context
def info(ctx):
    """显示系统信息"""
    config = ctx.obj.get('config', get_config())
    say = click.echo

    say(f"NebulaShell {__version__}")
    say(f"配置文件:{config._config_file or ''}")
    say(f"HTTP API 端口:{config.http_api_port}")
    say(f"HTTP TCP 端口:{config.http_tcp_port}")
    say(f"主机地址:{config.host}")
    say(f"数据目录:{config.data_dir.absolute()}")
    say(f"模组仓库:{config.mods_dir.absolute()}")
    say(f"日志级别:{config.log_level}")
    permission_label = '启用' if config.permission_check else '禁用'
    say(f"权限检查:{permission_label}")

    # Easter-egg hint: a first random draw may print one hint; only when it
    # does not, a second independent draw may print the other.
    say("")
    if random.random() < 0.1:
        say("✨ 奇怪的提示:试试在命令前加两个感叹号会怎样?比如 !!help")
        return
    if random.random() < 0.05:
        say("🤔 听说有人用 !! 开头的命令发现了不得了的东西...")
# Placeholder for the TUI front end: it only echoes the target backend.
# The docstring of ``cli_command`` is shown by click as the --help text, so
# it is user-facing and kept verbatim. The option help strings previously
# had an unclosed parenthesis, which is fixed here.
@cli.command(name="cli")
@click.option('--connect-host', default='127.0.0.1', help='后端地址(默认 127.0.0.1)')
@click.option('--connect-port', default=10086, help='后端端口(默认 10086)')
def cli_command(connect_host, connect_port):
    """启动 TUI 前端(前后端分离,连接已有后端)"""
    click.echo("NebulaShell TUI 客户端(待实现)")
    click.echo(f"目标后端:{connect_host}:{connect_port}")
# ═══════════════════════════════════════════════════════════════
# NBPF 命令组
# ═══════════════════════════════════════════════════════════════
# Command group for the .nbpf subcommands registered below it. The
# docstring is shown by click as the --help text and is kept verbatim; it
# also serves as the function body.
@cli.group()
def nbpf():
    """管理 .nbpf 插件包(打包/解包/签名/验证/密钥生成)"""
# The docstring of ``pack`` is shown by click as the --help text, so it is
# user-facing and kept verbatim.
@nbpf.command()
@click.argument('plugin_dir', type=click.Path(exists=True, file_okay=False, dir_okay=True))
@click.argument('output', type=click.Path(), default=None, required=False)
@click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径')
@click.option('--rsa-key', type=click.Path(exists=True), help='RSA 私钥路径')
@click.option('--rsa-pub', type=click.Path(exists=True), help='RSA 公钥路径')
@click.option('--signer', default='unknown', help='签名者名称')
@click.pass_context
def pack(ctx, plugin_dir, output, ed25519_key, rsa_key, rsa_pub, signer):
    """打包插件目录为 .nbpf 文件"""
    from oss.core.nbpf import NBPFPacker

    def _read_optional(key_path):
        # Return the file's bytes, or None when the option was not supplied.
        return Path(key_path).read_bytes() if key_path else None

    plugin_path = Path(plugin_dir)
    # Default output name: "<plugin directory name>.nbpf".
    if not output:
        output = f"{plugin_path.name}.nbpf"

    # Read all key material first, then validate in a fixed order.
    ed25519_private = _read_optional(ed25519_key)
    rsa_private_pem = _read_optional(rsa_key)
    rsa_public_pem = _read_optional(rsa_pub)

    required_material = (
        (ed25519_private, "错误: 需要 Ed25519 私钥 (--ed25519-key)"),
        (rsa_private_pem, "错误: 需要 RSA 私钥 (--rsa-key)"),
        (rsa_public_pem, "错误: 需要 RSA 公钥 (--rsa-pub)"),
    )
    for material, complaint in required_material:
        if not material:
            click.echo(complaint, err=True)
            raise click.Abort()

    click.echo(f"打包插件: {plugin_path}")
    click.echo(f"输出文件: {output}")
    click.echo(f"签名者: {signer}")

    try:
        result = NBPFPacker().pack(
            plugin_dir=plugin_path,
            output_path=Path(output),
            ed25519_private_key=ed25519_private,
            rsa_private_key_pem=rsa_private_pem,
            rsa_public_key_pem=rsa_public_pem,
            signer_name=signer,
        )
        click.echo(f"打包成功: {result}")
    except Exception as e:
        click.echo(f"打包失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
# The docstring of ``unpack`` is shown by click as the --help text, so it
# is user-facing and kept verbatim.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('output_dir', type=click.Path(), default=None, required=False)
def unpack(nbpf_file, output_dir):
    """解包 .nbpf 文件到目录"""
    from oss.core.nbpf import NBPFUnpacker

    nbpf_path = Path(nbpf_file)
    # Default target: the package file name without its extension.
    target = output_dir or nbpf_path.stem

    click.echo(f"解包: {nbpf_path}")
    click.echo(f"输出目录: {target}")

    try:
        result = NBPFUnpacker().unpack(nbpf_path, Path(target))
        click.echo(f"解包成功: {result}")
    except Exception as e:
        click.echo(f"解包失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
# The docstring of ``verify`` is shown by click as the --help text, so it
# is user-facing and kept verbatim.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--trusted-keys-dir', type=click.Path(exists=True), help='信任的 Ed25519 公钥目录')
def verify(nbpf_file, trusted_keys_dir):
    """验证 .nbpf 文件签名"""
    from oss.core.nbpf import NBPFUnpacker

    nbpf_path = Path(nbpf_file)

    # Load trusted keys (file stem -> file bytes) from the directory given
    # on the command line, or from the default directory when it exists.
    keys_path = Path(trusted_keys_dir) if trusted_keys_dir else Path("./data/nbpf-keys/trusted")
    trusted_keys = {}
    if trusted_keys_dir or keys_path.exists():
        trusted_keys = {kf.stem: kf.read_bytes() for kf in keys_path.glob("*.pem")}

    if not trusted_keys:
        click.echo("警告: 未加载任何信任密钥,将尝试提取 manifest 信息", err=True)

    click.echo(f"验证: {nbpf_path}")
    click.echo(f"信任密钥: {len(trusted_keys)}")

    try:
        unpacker = NBPFUnpacker()
        manifest = unpacker.extract_manifest(nbpf_path)
        metadata = manifest.get('metadata', {})
        click.echo(f"插件名称: {metadata.get('name', '未知')}")
        click.echo(f"版本: {metadata.get('version', '未知')}")
        click.echo(f"作者: {metadata.get('author', '未知')}")

        # The signature is only checked when at least one key was loaded.
        if trusted_keys:
            valid, msg = unpacker.verify_signature(nbpf_path, trusted_keys)
            if valid:
                click.echo(f"签名验证: 通过 ({msg})")
            else:
                click.echo(f"签名验证: 失败 ({msg})", err=True)
                raise click.Abort()
    except click.Abort:
        # click.Abort derives from RuntimeError, so without this clause the
        # generic handler below would catch it and print a second,
        # misleading "验证失败: Abort" message.
        raise
    except Exception as e:
        click.echo(f"验证失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
# The docstring of ``sign`` is shown by click as the --help text, so it is
# user-facing and kept verbatim.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径')
@click.option('--signer', default=None, help='签名者名称')
def sign(nbpf_file, ed25519_key, signer):
    """为 .nbpf 文件重新签名"""
    import shutil
    from oss.core.nbpf import NBPFPacker, NBPFUnpacker

    nbpf_path = Path(nbpf_file)
    if not ed25519_key:
        click.echo("错误: 需要 Ed25519 私钥 (--ed25519-key)", err=True)
        raise click.Abort()
    ed25519_private = Path(ed25519_key).read_bytes()

    click.echo(f"重新签名: {nbpf_path}")

    # Working directory next to the package: ".<stem>_tmp".
    temp_dir = nbpf_path.parent / f".{nbpf_path.stem}_tmp"
    try:
        # Remove leftovers from an earlier run, then unpack.
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
        NBPFUnpacker().unpack(nbpf_path, temp_dir)

        # Re-pack over the original file with the new signing key.
        # NOTE(review): no RSA keys are passed here, while `pack` requires
        # them — confirm NBPFPacker.pack accepts None for both.
        result = NBPFPacker().pack(
            plugin_dir=temp_dir,
            output_path=nbpf_path,
            ed25519_private_key=ed25519_private,
            rsa_private_key_pem=None,
            rsa_public_key_pem=None,
            signer_name=signer or "resign",
        )
        click.echo(f"重新签名成功: {result}")
    except Exception as e:
        click.echo(f"重新签名失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
    finally:
        # Always remove the unpacked copy, including on failure. A cleanup
        # problem is reported as a warning and does not change the outcome.
        if temp_dir.exists():
            try:
                shutil.rmtree(temp_dir)
            except OSError as cleanup_err:
                click.echo(f"警告: 无法清理临时目录 {temp_dir}: {cleanup_err}", err=True)
# The docstring of ``keygen`` is shown by click as the --help text, so it
# is user-facing and kept verbatim.
@nbpf.command(name="keygen")
@click.option('--output-dir', type=click.Path(), default='./data/nbpf-keys', help='密钥输出目录')
@click.option('--name', default='default', help='密钥名称')
def keygen(output_dir, name):
    """生成 Ed25519 + RSA 密钥对"""
    from oss.core.nbpf import NBPCrypto

    def _write_private(path, data):
        # Write a private key and restrict it to the owner (0600). A
        # filesystem that rejects chmod only produces a warning.
        path.write_bytes(data)
        try:
            path.chmod(0o600)
        except OSError:
            click.echo(f"警告: 无法限制私钥文件权限: {path}", err=True)

    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)

    # Layout: trusted/ (Ed25519 public), rsa/ (RSA public), private/ (both
    # private keys).
    trusted_dir = output_path / "trusted"
    rsa_dir = output_path / "rsa"
    private_dir = output_path / "private"
    for sub_dir in (trusted_dir, rsa_dir, private_dir):
        sub_dir.mkdir(exist_ok=True)

    ed_public_path = trusted_dir / f"{name}.pem"
    ed_private_path = private_dir / f"{name}_ed25519.pem"
    rsa_public_path = rsa_dir / f"{name}.pem"
    rsa_private_path = private_dir / f"{name}_rsa.pem"

    click.echo(f"生成密钥对到: {output_path}")

    # Ed25519 key pair.
    click.echo("生成 Ed25519 密钥对...")
    ed25519_private, ed25519_public = NBPCrypto.generate_ed25519_keypair()
    ed_public_path.write_bytes(ed25519_public)
    _write_private(ed_private_path, ed25519_private)
    click.echo(f" Ed25519 公钥: {ed_public_path}")
    click.echo(f" Ed25519 私钥: {ed_private_path}")

    # RSA key pair.
    click.echo("生成 RSA-4096 密钥对(可能需要几秒钟)...")
    rsa_private, rsa_public = NBPCrypto.generate_rsa_keypair(key_size=4096)
    rsa_public_path.write_bytes(rsa_public)
    _write_private(rsa_private_path, rsa_private)
    click.echo(f" RSA 公钥: {rsa_public_path}")
    click.echo(f" RSA 私钥: {rsa_private_path}")

    click.echo("密钥生成完成!")
    click.echo("")
    click.echo("使用示例:")
    click.echo(f" nebula nbpf pack ./my-plugin --ed25519-key {ed_private_path} --rsa-key {rsa_private_path} --rsa-pub {rsa_public_path} --signer {name}")
# ═══════════════════════════════════════════════════════════════
# create 命令 — 模组脚手架
# ═══════════════════════════════════════════════════════════════
# Command group for the scaffolding subcommands registered below it. The
# docstring is shown by click as the --help text and is kept verbatim; it
# also serves as the function body.
@cli.group()
def create():
    """创建模组/密钥等资源"""
# The docstring of ``create_mod`` is shown by click as the --help text, so
# it is user-facing and kept verbatim.
@create.command("mod")
@click.argument("name", type=str, required=False, default=None)
@click.option("--author", "-a", type=str, default=None, help="作者名")
@click.option("--description", "-d", type=str, default=None, help="模组描述")
@click.option("--type", "-t", "mod_type", type=click.Choice(["example", "adapter", "service", "security", "tool"]), default="example", help="模组类型")
@click.option("--with-keys", is_flag=True, default=False, help="同时生成签名密钥")
@click.option("--output", "-o", type=str, default=None, help="输出目录")
@click.pass_context
def create_mod(ctx, name, author, description, mod_type, with_keys, output):
    """创建新模组脚手架"""
    import string as _string
    from pathlib import Path as _Path

    # Prompt interactively for anything not supplied on the command line.
    if not name:
        name = click.prompt("📛 模组名称", type=str)
    if not author:
        author = click.prompt("👤 作者", type=str, default="anonymous")
    if not description:
        description = click.prompt("📝 描述", type=str, default="")

    # Normalise the name: lower-case, spaces become hyphens, and anything
    # outside [a-z0-9_-] is dropped.
    valid_chars = set(_string.ascii_lowercase + _string.digits + "-_")
    safe_name = "".join(c for c in name.lower().replace(" ", "-") if c in valid_chars)
    if not safe_name:
        click.echo("❌ 模组名称无效,请使用字母、数字、连字符")
        raise click.Abort()

    # Refuse to overwrite an existing directory.
    output_dir = _Path(output or safe_name)
    if output_dir.exists():
        click.echo(f"❌ 目录 '{output_dir}' 已存在")
        raise click.Abort()

    templates_dir = _Path(__file__).parent / "templates" / "mod"
    if not templates_dir.exists():
        click.echo("❌ 模板目录不存在,请检查安装")
        raise click.Abort()

    # Placeholder -> value substitutions applied to every template file.
    replacements = {
        "{{ mod_name }}": safe_name,
        "{{ author }}": author,
        "{{ description }}": description,
        "{{ mod_type }}": mod_type,
    }

    output_dir.mkdir(parents=True, exist_ok=True)
    for tmpl_file in templates_dir.iterdir():
        if not tmpl_file.is_file():
            continue
        content_tmpl = tmpl_file.read_text(encoding="utf-8")
        for old, new in replacements.items():
            content_tmpl = content_tmpl.replace(old, new)
        out_path = output_dir / tmpl_file.name
        out_path.write_text(content_tmpl, encoding="utf-8")
        click.echo(f" ✅ 创建: {out_path.name}")

    # Path shown in the hints below. It must follow --output when given,
    # otherwise the printed commands point at a directory that was never
    # created.
    mod_ref = str(output_dir) if output else f"./{safe_name}"

    click.echo("")
    click.echo(f"🎉 模组 '{safe_name}' 创建成功!")
    click.echo(f"📂 位置: {output_dir.resolve()}")
    click.echo("")
    click.echo("下一步:")
    click.echo(f" cd {output_dir}")
    click.echo(" # 编辑 main.py 实现功能")
    click.echo(" # 然后打包:")
    # `nbpf pack` takes the output file as its second positional argument;
    # it defines no -o option, so the hint must not use one.
    click.echo(f' nebula nbpf pack {mod_ref} mods/{safe_name}.nbpf --ed25519-key <key> --rsa-key <key> --rsa-pub <key> --signer "{author}"')

    # Optionally generate signing keys alongside the scaffold.
    if with_keys:
        click.echo("")
        click.echo("🔑 正在生成签名密钥...")
        try:
            from oss.core.nbpf.crypto import NBPCrypto

            def _write_private(path, data):
                # Write a private key and restrict it to the owner (0600).
                # A filesystem that rejects chmod only produces a warning.
                path.write_bytes(data)
                try:
                    path.chmod(0o600)
                except OSError:
                    click.echo(f" ⚠ 无法限制私钥文件权限: {path}")

            # NOTE(review): keys/ lives inside the mod directory that is
            # later handed to `nbpf pack` — confirm the packer does not
            # bundle the private keys into the package.
            keys_dir = output_dir / "keys"
            keys_dir.mkdir(exist_ok=True)
            ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair()
            _write_private(keys_dir / "ed25519.pem", ed_priv)
            (keys_dir / "ed25519.pub.pem").write_bytes(ed_pub)
            rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048)
            _write_private(keys_dir / "rsa.pem", rsa_priv)
            (keys_dir / "rsa.pub.pem").write_bytes(rsa_pub)
            click.echo(f" ✅ Ed25519 密钥: {keys_dir}/ed25519.pem")
            click.echo(f" ✅ RSA 密钥: {keys_dir}/rsa.pem")
            click.echo("")
            click.echo("打包命令:")
            click.echo(f" nebula nbpf pack {mod_ref} mods/{safe_name}.nbpf")
            click.echo(f" --ed25519-key {keys_dir}/ed25519.pem")
            click.echo(f" --rsa-key {keys_dir}/rsa.pem")
            click.echo(f" --rsa-pub {keys_dir}/rsa.pub.pem")
        except Exception as e:
            # Key generation is optional: report the failure, keep the mod.
            click.echo(f" ⚠ 密钥生成失败: {e}")
# The docstring of ``create_key`` is shown by click as the --help text, so
# it is user-facing and kept verbatim.
@create.command("key")
@click.option("--output", "-o", type=str, default="./keys", help="密钥输出目录")
@click.option("--name", type=str, default="default", help="密钥名称")
def create_key(output, name):
    """生成 Ed25519 + RSA 签名密钥对"""
    from oss.core.nbpf.crypto import NBPCrypto
    from pathlib import Path as _Path

    def _write_private(path, data):
        # Write a private key and restrict it to the owner (0600). A
        # filesystem that rejects chmod only produces a warning.
        path.write_bytes(data)
        try:
            path.chmod(0o600)
        except OSError:
            click.echo(f"警告: 无法限制私钥文件权限: {path}", err=True)

    output_path = _Path(output)
    output_path.mkdir(parents=True, exist_ok=True)
    click.echo(f"🔑 生成密钥对到: {output_path.resolve()}")

    # Ed25519 pair: "<name>_ed25519.pem" (private) and ".pub.pem" (public).
    ed_private_path = output_path / f"{name}_ed25519.pem"
    ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair()
    _write_private(ed_private_path, ed_priv)
    (output_path / f"{name}_ed25519.pub.pem").write_bytes(ed_pub)
    click.echo(f" ✅ Ed25519: {ed_private_path}")

    # RSA-2048 pair: "<name>_rsa.pem" (private) and ".pub.pem" (public).
    rsa_private_path = output_path / f"{name}_rsa.pem"
    rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048)
    _write_private(rsa_private_path, rsa_priv)
    (output_path / f"{name}_rsa.pub.pem").write_bytes(rsa_pub)
    click.echo(f" ✅ RSA: {rsa_private_path}")

    click.echo("")
    click.echo("密钥生成完成!")
# The docstring of ``list_templates`` is shown by click as the --help text,
# so it is user-facing and kept verbatim.
@create.command("list-templates")
def list_templates():
    """列出可用的模板"""
    from pathlib import Path as _Path

    templates_root = _Path(__file__).parent / "templates"
    if not templates_root.exists():
        click.echo("没有可用的模板")
        return

    # One heading per template directory, followed by its direct files.
    for template_dir in templates_root.iterdir():
        if not template_dir.is_dir():
            continue
        file_names = [entry.name for entry in template_dir.iterdir() if entry.is_file()]
        click.echo(f" 📦 {template_dir.name}/")
        for file_name in file_names:
            click.echo(f" ├── {file_name}")
# ═══════════════════════════════════════════════════════════════
# dev 命令 — 开发模式热重载
# ═══════════════════════════════════════════════════════════════
# The docstring of ``dev`` is shown by click as the --help text, so it is
# user-facing and kept verbatim.
@cli.command()
@click.argument("mod_dir", type=str, required=False, default=None)
@click.option("--port", "-p", type=int, default=None, help="HTTP API 端口")
@click.option("--host", type=str, default=None, help="监听地址")
@click.option("--skip-sign", is_flag=True, default=False, help="跳过签名验证(调试用)")
@click.pass_context
def dev(ctx, mod_dir, port, host, skip_sign):
    """开发模式 — 监听模组文件变化并自动热重载"""
    import time as _time
    import hashlib as _hashlib
    from pathlib import Path as _Path
    # NOTE(review): FileWatcher is imported but never used in this function;
    # change detection below is a once-per-second hash poll.
    from oss.core.watcher import FileWatcher
    from oss.logger.logger import Log as _Log

    config = ctx.obj.get("config")
    # Without --port, dev mode forces the HTTP API port to 10086.
    if port:
        config.set("HTTP_API_PORT", port)
    else:
        config.set("HTTP_API_PORT", 10086)
    if host:
        config.set("HOST", host)

    # Directory to watch: the given mod directory, otherwise the current
    # working directory.
    watch_dirs = []
    if mod_dir:
        mod_path = _Path(mod_dir).resolve()
        if not mod_path.exists():
            click.echo(f"❌ 目录不存在: {mod_dir}")
            raise click.Abort()
        watch_dirs.append(mod_path)
        click.echo(f"📁 监听目录: {mod_path}")
    else:
        watch_dirs.append(_Path.cwd())
        click.echo(f"📁 监听目录: {_Path.cwd()}")
    click.echo("")

    # Start the NebulaShell services.
    from oss.core.manager import PluginManager as _PluginManager
    plugin_mgr = _PluginManager()

    def _load_packaged_mods():
        """Load every ``.nbpf`` package found directly under ``mods/``.

        Does nothing when ``mods/`` does not exist. Both the initial load
        and the hot reload use this helper, so a missing directory can no
        longer abort a reload half-way with the plugins left stopped.
        """
        mods_path = _Path("mods")
        if not mods_path.exists():
            return
        for package in sorted(mods_path.iterdir()):
            if package.suffix == ".nbpf":
                plugin_mgr.load(package)

    plugin_mgr.load_all()
    _load_packaged_mods()
    plugin_mgr.start_all()

    # Start the HTTP server; a failure is logged and dev mode carries on.
    try:
        plugin_mgr.start_http_server()
        _Log.ok("Dev", f"HTTP API: http://{config.host}:{config.http_api_port}")
    except Exception as e:
        _Log.warn("Dev", f"HTTP 服务启动失败: {e}")

    click.echo("")
    click.echo("🔧 NebulaShell 开发模式已启动")
    click.echo("=" * 50)
    click.echo(f" HTTP: http://{config.host}:{config.http_api_port}")
    click.echo(f" 监听: {', '.join(str(d) for d in watch_dirs)}")
    # NOTE(review): skip_sign is only displayed here; nothing in this
    # function passes it on to the plugin manager.
    click.echo(f" 签名验证: {'跳过' if skip_sign else '开启'}")
    click.echo(f" 模组数: {len(plugin_mgr.plugins)}")
    click.echo("=" * 50)
    click.echo(" 按 Ctrl+C 停止")
    click.echo("")

    # Last known content hash per watched file path.
    _file_hashes: dict[str, str] = {}

    def _get_file_hash(path: _Path) -> str:
        """Return the SHA-256 hex digest of a file, or "" if it is unreadable."""
        try:
            return _hashlib.sha256(path.read_bytes()).hexdigest()
        except Exception:
            return ""

    def _get_dir_hash(directory: _Path) -> dict[str, str]:
        """Map each file under *directory* to its content hash.

        Files with ``.nbpf`` in their suffix, files under ``__pycache__``
        and files that cannot be read are skipped.
        """
        result = {}
        for f in sorted(directory.rglob("*")):
            if f.is_file() and ".nbpf" not in f.suffix and "__pycache__" not in str(f):
                h = _get_file_hash(f)
                if h:
                    result[str(f)] = h
        return result

    # Baseline snapshot.
    for wd in watch_dirs:
        if wd.is_dir():
            _file_hashes.update(_get_dir_hash(wd))

    # Poll once per second until Ctrl+C.
    try:
        while True:
            _time.sleep(1)
            changed = False
            for wd in watch_dirs:
                if not wd.exists():
                    continue
                current = _get_dir_hash(wd)
                # New or modified files.
                for fpath, h in current.items():
                    old_h = _file_hashes.get(fpath)
                    if old_h is None:
                        _Log.info("Dev", f"🆕 新增文件: {_Path(fpath).name}")
                        changed = True
                    elif old_h != h:
                        _Log.info("Dev", f"📝 文件变更: {_Path(fpath).name}")
                        changed = True
                    _file_hashes[fpath] = h
                # Deleted files (iterate over a copy because entries are
                # removed inside the loop).
                for fpath in list(_file_hashes.keys()):
                    if fpath not in current:
                        _Log.info("Dev", f"🗑 文件删除: {_Path(fpath).name}")
                        _file_hashes.pop(fpath)
                        changed = True
            if changed:
                _Log.info("Dev", "检测到变更,尝试热重载...")
                try:
                    # Full reload: stop everything, clear the manager's
                    # state, load again and restart.
                    plugin_mgr.stop_all()
                    plugin_mgr.plugins.clear()
                    plugin_mgr._plugin_dirs.clear()
                    plugin_mgr.load_all()
                    _load_packaged_mods()
                    plugin_mgr.start_all()
                    _Log.ok("Dev", f"热重载完成!当前模组数: {len(plugin_mgr.plugins)}")
                except Exception as e:
                    _Log.error("Dev", f"热重载失败: {e}")
    except KeyboardInterrupt:
        click.echo("")
        _Log.info("Dev", "正在停止开发模式...")
        plugin_mgr.stop_all()
        _Log.info("Dev", "开发模式已停止")
def main():
    """Console entry point.

    Exits with status 1 when invoked under the deprecated ``oss`` name.
    Otherwise gives the hidden ``!!`` command handler the first chance at
    the arguments and runs the click CLI only if nothing was handled.
    """
    invoked_as = os.path.basename(sys.argv[0])
    if invoked_as in ("oss", "oss.exe"):
        Log.warn("NebulaShell", "oss 命令已弃用,请使用 nebula 替代")
        sys.exit(1)

    if not _handle_hidden_command():
        cli()


if __name__ == "__main__":
    main()