- crypto.py: 8个_imp_*方法改为_ModuleCache类缓存导入 - crypto.py: outer/inner加解密合并为_layer_encrypt/decrypt - crypto.py: 提取公共摘要计算方法,拆分长方法 - compiler.py: 删除_obfuscate_code中未使用的死代码 - loader.py: 3次ZIP扫描合并为1次缓存读取 - format.py: 更新为使用_ModuleCache - 合计减少205行代码(1707→1502)
690 lines
25 KiB
Python
690 lines
25 KiB
Python
"""CLI 入口"""
|
||
import click
|
||
import signal
|
||
import os
|
||
import sys
|
||
import random
|
||
from pathlib import Path
|
||
|
||
from oss import __version__
|
||
from oss.logger.logger import Log
|
||
from oss.plugin.manager import PluginManager
|
||
from oss.config import init_config, get_config
|
||
|
||
|
||
# Optional achievements subsystem. The CLI must keep working when the module
# cannot be imported, so the outcome of the import is recorded in
# _ACHIEVEMENTS_ENABLED and checked before any of these names is used.
try:
    from oss.core.achievements import (
        init_achievements,
        get_validator,
        _cmd_echo,
        _cmd_help_internal,
        _cmd_list_all,
        _cmd_stats,
        _cmd_reset_progress,
        _cmd_export,
        _cmd_import,
        _cmd_verify,
        _cmd_debug,
        _cmd_info,
    )
except ImportError:
    _ACHIEVEMENTS_ENABLED = False
else:
    _ACHIEVEMENTS_ENABLED = True
|
||
|
||
|
||
def _handle_hidden_command():
    """Dispatch a hidden ``!!``-prefixed command taken from ``sys.argv[1]``.

    Returns:
        False when there is no first argument or it does not start with
        ``!!`` (nothing is done). True in every other case: the achievements
        system is unavailable, the command is unknown, or the command ran.
    """
    argv = sys.argv
    if not (len(argv) > 1 and argv[1].startswith("!!")):
        return False

    if not _ACHIEVEMENTS_ENABLED:
        print("成就系统未启用")
        return True

    name = argv[1][2:]      # command name without the "!!" prefix
    extra_args = argv[2:]   # everything after the command is passed through

    handlers = {
        "echo": _cmd_echo,
        "help": _cmd_help_internal,
        "list": _cmd_list_all,
        "stats": _cmd_stats,
        "reset": _cmd_reset_progress,
        "export": _cmd_export,
        "import": _cmd_import,
        "verify": _cmd_verify,
        "debug": _cmd_debug,
        "info": _cmd_info,
    }

    if name not in handlers:
        print(f"未知命令:!!{name}")
        return True

    # Record the usage with the validator before running the handler.
    get_validator().use_hidden_command(name)
    handlers[name](extra_args)
    return True
|
||
|
||
|
||
# Root command group. The docstring below is the help text click prints, so it
# is user-facing and kept as-is.
@click.group()
@click.option('--config', '-c', type=str, help='配置文件路径')
@click.pass_context
def cli(ctx, config):
    """NebulaShell - 一切皆为插件"""
    # Make sure ctx.obj is a dict, then store the initialised configuration
    # where sub-commands read it (ctx.obj['config']).
    ctx.ensure_object(dict)
    ctx.obj['config'] = init_config(config)

    if not _ACHIEVEMENTS_ENABLED:
        return

    # Achievements are optional: a failure here is reported, never fatal.
    try:
        init_achievements()
    except Exception as exc:
        print(f"[CLI] 错误: {exc}")
|
||
|
||
|
||
def _wait_for_shutdown_signal():
    """Block the calling thread until a signal handler ends the process.

    ``signal.pause()`` only exists on Unix. On other platforms fall back to a
    sleep loop, which the installed signal handlers can still interrupt.
    """
    if hasattr(signal, "pause"):
        signal.pause()
        return

    import time

    while True:
        time.sleep(1)


# The docstring below is the help text click prints; it is user-facing and
# kept as-is.
@cli.command()
@click.option('--host', type=str, default=None, help='监听地址')
@click.option('--port', type=int, default=None, help='HTTP API 端口')
@click.option('--tcp-port', type=int, default=None, help='HTTP TCP 端口')
@click.pass_context
def serve(ctx, host, port, tcp_port):
    """启动 NebulaShell 服务端"""
    # Start the server: apply CLI overrides to the configuration, load and
    # start the plugins, install SIGINT/SIGTERM handlers, then hand control to
    # the REPL provided by the plugin manager's core. If the core is missing
    # or the REPL fails to start, stay alive until a shutdown signal arrives.
    config = ctx.obj.get('config', get_config())

    # Command-line values take precedence over the loaded configuration.
    if host:
        config.set('HOST', host)
    if port:
        config.set('HTTP_API_PORT', port)
    if tcp_port:
        config.set('HTTP_TCP_PORT', tcp_port)

    Log.info("NebulaShell", f"NebulaShell {__version__} 启动")
    Log.info("NebulaShell", f"监听地址:{config.host}:{config.http_api_port}")
    Log.info("NebulaShell", f"数据目录:{config.data_dir.absolute()}")
    Log.info("NebulaShell", f"模组仓库:{config.mods_dir.absolute()}")

    plugin_mgr = PluginManager()
    plugin_mgr.load()
    plugin_mgr.start()

    Log.info("NebulaShell", "就绪")

    def shutdown(sig, frame):
        # Signal handler: stop the plugins, then exit with status 0.
        Log.info("NebulaShell", "停止中...")
        plugin_mgr.stop()
        Log.info("NebulaShell", "已停止")
        raise SystemExit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    # Start the interactive REPL (provided by the core).
    try:
        if hasattr(plugin_mgr, 'core') and plugin_mgr.core:
            plugin_mgr.core.start_repl()
        else:
            Log.error("NebulaShell", "Core 未加载,无法启动 REPL")
            _wait_for_shutdown_signal()
    except Exception as e:
        Log.error("NebulaShell", f"REPL 启动失败: {e}")
        _wait_for_shutdown_signal()
|
||
|
||
|
||
# Prints the product name followed by the package version. The docstring is
# click help text and is kept as-is.
@cli.command()
def version():
    """显示版本"""
    banner = f"NebulaShell {__version__}"
    click.echo(banner)
|
||
|
||
|
||
# Prints the version and the effective configuration values, then
# occasionally one of two hints about the hidden "!!" commands. The docstring
# is click help text and is kept as-is.
@cli.command()
@click.pass_context
def info(ctx):
    """显示系统信息"""
    cfg = ctx.obj.get('config', get_config())
    say = click.echo

    say(f"NebulaShell {__version__}")
    say(f"配置文件:{cfg._config_file or '无'}")
    say(f"HTTP API 端口:{cfg.http_api_port}")
    say(f"HTTP TCP 端口:{cfg.http_tcp_port}")
    say(f"主机地址:{cfg.host}")
    say(f"数据目录:{cfg.data_dir.absolute()}")
    say(f"模组仓库:{cfg.mods_dir.absolute()}")
    say(f"日志级别:{cfg.log_level}")
    permission_state = '启用' if cfg.permission_check else '禁用'
    say(f"权限检查:{permission_state}")

    # Easter-egg hint: a blank line is always printed. The second random draw
    # only happens when the first one did not trigger.
    say("")
    if random.random() < 0.1:
        say("✨ 奇怪的提示:试试在命令前加两个感叹号会怎样?比如 !!help")
        return
    if random.random() < 0.05:
        say("🤔 听说有人用 !! 开头的命令发现了不得了的东西...")
|
||
|
||
|
||
# Placeholder for the TUI client: it only prints a notice and the backend
# address it was given. The docstring is click help text and is kept as-is.
@cli.command(name="cli")
@click.option('--connect-host', default='127.0.0.1', help='后端地址(默认 127.0.0.1)')
@click.option('--connect-port', default=10086, help='后端端口(默认 10086)')
def cli_command(connect_host, connect_port):
    """启动 TUI 前端(前后端分离,连接已有后端)"""
    say = click.echo
    say("NebulaShell TUI 客户端(待实现)")
    target = f"{connect_host}:{connect_port}"
    say(f"目标后端:{target}")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# NBPF 命令组
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
# Command group for the .nbpf sub-commands (pack, unpack, verify, sign,
# keygen). It has no behaviour of its own; the docstring is the group's help
# text and is kept as-is.
@cli.group()
def nbpf():
    """管理 .nbpf 插件包(打包/解包/签名/验证/密钥生成)"""
|
||
|
||
|
||
# Packs a plugin directory into a .nbpf file using NBPFPacker. All three key
# files are mandatory; when OUTPUT is omitted the file is named after the
# plugin directory. The docstring is click help text and is kept as-is.
@nbpf.command()
@click.argument('plugin_dir', type=click.Path(exists=True, file_okay=False, dir_okay=True))
@click.argument('output', type=click.Path(), default=None, required=False)
@click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径')
@click.option('--rsa-key', type=click.Path(exists=True), help='RSA 私钥路径')
@click.option('--rsa-pub', type=click.Path(exists=True), help='RSA 公钥路径')
@click.option('--signer', default='unknown', help='签名者名称')
@click.pass_context
def pack(ctx, plugin_dir, output, ed25519_key, rsa_key, rsa_pub, signer):
    """打包插件目录为 .nbpf 文件"""
    from oss.core.nbpf import NBPFPacker

    def _read_optional(key_path):
        # Return the file's bytes, or None when the option was not given.
        return Path(key_path).read_bytes() if key_path else None

    source_dir = Path(plugin_dir)
    target = output or f"{source_dir.name}.nbpf"

    # Read every supplied key first, then validate in a fixed order.
    ed_key_bytes = _read_optional(ed25519_key)
    rsa_private_bytes = _read_optional(rsa_key)
    rsa_public_bytes = _read_optional(rsa_pub)

    requirements = (
        (ed_key_bytes, "错误: 需要 Ed25519 私钥 (--ed25519-key)"),
        (rsa_private_bytes, "错误: 需要 RSA 私钥 (--rsa-key)"),
        (rsa_public_bytes, "错误: 需要 RSA 公钥 (--rsa-pub)"),
    )
    for material, complaint in requirements:
        if not material:
            click.echo(complaint, err=True)
            raise click.Abort()

    click.echo(f"打包插件: {source_dir}")
    click.echo(f"输出文件: {target}")
    click.echo(f"签名者: {signer}")

    try:
        packed = NBPFPacker().pack(
            plugin_dir=source_dir,
            output_path=Path(target),
            ed25519_private_key=ed_key_bytes,
            rsa_private_key_pem=rsa_private_bytes,
            rsa_public_key_pem=rsa_public_bytes,
            signer_name=signer,
        )
        click.echo(f"打包成功: {packed}")
    except Exception as exc:
        # Any packer failure is reported on stderr and aborts the command.
        click.echo(f"打包失败: {type(exc).__name__}: {exc}", err=True)
        raise click.Abort()
|
||
|
||
|
||
# Unpacks a .nbpf file with NBPFUnpacker. When OUTPUT_DIR is omitted the
# archive's stem (file name without extension) is used as the directory name.
# The docstring is click help text and is kept as-is.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('output_dir', type=click.Path(), default=None, required=False)
def unpack(nbpf_file, output_dir):
    """解包 .nbpf 文件到目录"""
    from oss.core.nbpf import NBPFUnpacker

    archive = Path(nbpf_file)
    destination = output_dir or archive.stem

    click.echo(f"解包: {archive}")
    click.echo(f"输出目录: {destination}")

    try:
        extracted = NBPFUnpacker().unpack(archive, Path(destination))
        click.echo(f"解包成功: {extracted}")
    except Exception as exc:
        # Any unpacker failure is reported on stderr and aborts the command.
        click.echo(f"解包失败: {type(exc).__name__}: {exc}", err=True)
        raise click.Abort()
|
||
|
||
|
||
# Shows the manifest metadata of a .nbpf file and, when trusted public keys
# are available, verifies its signature. Keys are read from *.pem files in
# --trusted-keys-dir, or from ./data/nbpf-keys/trusted when the option is
# omitted; each key is registered under its file stem. The docstring is click
# help text and is kept as-is.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--trusted-keys-dir', type=click.Path(exists=True), help='信任的 Ed25519 公钥目录')
def verify(nbpf_file, trusted_keys_dir):
    """验证 .nbpf 文件签名"""
    from oss.core.nbpf import NBPFUnpacker

    nbpf_path = Path(nbpf_file)

    # Load trusted keys from the explicit directory or the default location.
    keys_dir = Path(trusted_keys_dir) if trusted_keys_dir else Path("./data/nbpf-keys/trusted")
    trusted_keys = {}
    if keys_dir.is_dir():
        trusted_keys = {kf.stem: kf.read_bytes() for kf in keys_dir.glob("*.pem")}

    if not trusted_keys:
        click.echo("警告: 未加载任何信任密钥,将尝试提取 manifest 信息", err=True)

    click.echo(f"验证: {nbpf_path}")
    click.echo(f"信任密钥: {len(trusted_keys)} 个")

    try:
        unpacker = NBPFUnpacker()
        manifest = unpacker.extract_manifest(nbpf_path)
        metadata = manifest.get('metadata', {})
        click.echo(f"插件名称: {metadata.get('name', '未知')}")
        click.echo(f"版本: {metadata.get('version', '未知')}")
        click.echo(f"作者: {metadata.get('author', '未知')}")

        if trusted_keys:
            valid, msg = unpacker.verify_signature(nbpf_path, trusted_keys)
            if valid:
                click.echo(f"签名验证: 通过 ({msg})")
            else:
                click.echo(f"签名验证: 失败 ({msg})", err=True)
                raise click.Abort()
    except click.Abort:
        # click.Abort is an Exception subclass. Without this clause the abort
        # raised above for a bad signature would fall into the generic handler
        # and print a second, misleading "Abort" failure line.
        raise
    except Exception as e:
        click.echo(f"验证失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
|
||
|
||
|
||
# Re-signs a .nbpf file in place: unpack it into a hidden temporary directory
# next to the file, pack that directory back over the original path with the
# given Ed25519 key, then remove the temporary directory. The docstring is
# click help text and is kept as-is.
@nbpf.command()
@click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径')
@click.option('--signer', default=None, help='签名者名称')
def sign(nbpf_file, ed25519_key, signer):
    """为 .nbpf 文件重新签名"""
    import shutil

    from oss.core.nbpf import NBPFPacker, NBPFUnpacker

    nbpf_path = Path(nbpf_file)

    if not ed25519_key:
        click.echo("错误: 需要 Ed25519 私钥 (--ed25519-key)", err=True)
        raise click.Abort()

    ed25519_private = Path(ed25519_key).read_bytes()

    click.echo(f"重新签名: {nbpf_path}")

    temp_dir = nbpf_path.parent / f".{nbpf_path.stem}_tmp"
    try:
        # Start from a clean directory in case an earlier run left one behind.
        if temp_dir.exists():
            shutil.rmtree(temp_dir)
        NBPFUnpacker().unpack(nbpf_path, temp_dir)

        # Re-pack over the original file.
        # NOTE(review): the RSA keys are passed as None here although the
        # `pack` command treats them as mandatory; confirm NBPFPacker.pack
        # accepts None for them.
        result = NBPFPacker().pack(
            plugin_dir=temp_dir,
            output_path=nbpf_path,
            ed25519_private_key=ed25519_private,
            rsa_private_key_pem=None,
            rsa_public_key_pem=None,
            signer_name=signer or "resign",
        )
        click.echo(f"重新签名成功: {result}")
    except Exception as e:
        click.echo(f"重新签名失败: {type(e).__name__}: {e}", err=True)
        raise click.Abort()
    finally:
        # Always remove the unpacked copy, including on failure, so extracted
        # plugin contents are not left beside the package. Cleanup problems
        # must not turn a successful re-sign into a reported failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
|
||
|
||
|
||
# Generates an Ed25519 key pair and an RSA-4096 key pair with NBPCrypto and
# writes them below the output directory: Ed25519 public key in trusted/,
# RSA public key in rsa/, both private keys in private/. It finishes by
# printing an example `pack` invocation. The docstring is click help text and
# is kept as-is.
# NOTE(review): private keys are written with the process's default file
# permissions; consider restricting them.
@nbpf.command(name="keygen")
@click.option('--output-dir', type=click.Path(), default='./data/nbpf-keys', help='密钥输出目录')
@click.option('--name', default='default', help='密钥名称')
def keygen(output_dir, name):
    """生成 Ed25519 + RSA 密钥对"""
    from oss.core.nbpf import NBPCrypto

    base_dir = Path(output_dir)
    base_dir.mkdir(parents=True, exist_ok=True)

    # Sub-directories, created in the same order as they are listed here.
    trusted_dir = base_dir / "trusted"
    rsa_dir = base_dir / "rsa"
    private_dir = base_dir / "private"
    for sub_dir in (trusted_dir, rsa_dir, private_dir):
        sub_dir.mkdir(exist_ok=True)

    ed_public_file = trusted_dir / f"{name}.pem"
    ed_private_file = private_dir / f"{name}_ed25519.pem"
    rsa_public_file = rsa_dir / f"{name}.pem"
    rsa_private_file = private_dir / f"{name}_rsa.pem"

    click.echo(f"生成密钥对到: {base_dir}")

    # Ed25519 key pair.
    click.echo("生成 Ed25519 密钥对...")
    ed_private_bytes, ed_public_bytes = NBPCrypto.generate_ed25519_keypair()
    ed_public_file.write_bytes(ed_public_bytes)
    ed_private_file.write_bytes(ed_private_bytes)
    click.echo(f" Ed25519 公钥: {ed_public_file}")
    click.echo(f" Ed25519 私钥: {ed_private_file}")

    # RSA key pair.
    click.echo("生成 RSA-4096 密钥对(可能需要几秒钟)...")
    rsa_private_bytes, rsa_public_bytes = NBPCrypto.generate_rsa_keypair(key_size=4096)
    rsa_public_file.write_bytes(rsa_public_bytes)
    rsa_private_file.write_bytes(rsa_private_bytes)
    click.echo(f" RSA 公钥: {rsa_public_file}")
    click.echo(f" RSA 私钥: {rsa_private_file}")

    click.echo("密钥生成完成!")
    click.echo("")
    click.echo("使用示例:")
    click.echo(
        f" nebula nbpf pack ./my-plugin --ed25519-key {ed_private_file}"
        f" --rsa-key {rsa_private_file} --rsa-pub {rsa_public_file} --signer {name}"
    )
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# create 命令 — 模组脚手架
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
# Command group for the scaffolding sub-commands (mod, key, list-templates).
# It has no behaviour of its own; the docstring is the group's help text and
# is kept as-is.
@cli.group()
def create():
    """创建模组/密钥等资源"""
|
||
|
||
|
||
# Scaffolds a new mod. Missing values are prompted for, the name is reduced to
# lowercase letters, digits, "-" and "_", and every file directly inside
# templates/mod is copied to the output directory with the "{{ ... }}"
# placeholders substituted. With --with-keys a signing key pair is also
# generated into <output>/keys. The docstring is click help text and is kept
# as-is.
@create.command("mod")
@click.argument("name", type=str, required=False, default=None)
@click.option("--author", "-a", type=str, default=None, help="作者名")
@click.option("--description", "-d", type=str, default=None, help="模组描述")
@click.option("--type", "-t", "mod_type", type=click.Choice(["example", "adapter", "service", "security", "tool"]), default="example", help="模组类型")
@click.option("--with-keys", is_flag=True, default=False, help="同时生成签名密钥")
@click.option("--output", "-o", type=str, default=None, help="输出目录")
@click.pass_context
def create_mod(ctx, name, author, description, mod_type, with_keys, output):
    """创建新模组脚手架"""
    import string as _string
    from pathlib import Path as _Path

    # Prompt interactively for anything not given on the command line.
    if not name:
        name = click.prompt("📛 模组名称", type=str)
    if not author:
        author = click.prompt("👤 作者", type=str, default="anonymous")
    if not description:
        description = click.prompt("📝 描述", type=str, default="")

    # Sanitise the mod name: spaces become "-", other characters are dropped.
    valid_chars = set(_string.ascii_lowercase + _string.digits + "-_")
    safe_name = "".join(c for c in name.lower().replace(" ", "-") if c in valid_chars)
    if not safe_name:
        click.echo("❌ 模组名称无效,请使用字母、数字、连字符")
        raise click.Abort()

    # Refuse to write into an existing path.
    output_dir = _Path(output or safe_name)
    if output_dir.exists():
        click.echo(f"❌ 目录 '{output_dir}' 已存在")
        raise click.Abort()

    # Templates ship next to this module.
    templates_dir = _Path(__file__).parent / "templates" / "mod"
    if not templates_dir.exists():
        click.echo("❌ 模板目录不存在,请检查安装")
        raise click.Abort()

    # Placeholder -> value substitutions applied to every template file.
    replacements = {
        "{{ mod_name }}": safe_name,
        "{{ author }}": author,
        "{{ description }}": description,
        "{{ mod_type }}": mod_type,
    }

    output_dir.mkdir(parents=True, exist_ok=True)

    for tmpl_file in templates_dir.iterdir():
        if tmpl_file.is_file():
            content_tmpl = tmpl_file.read_text(encoding="utf-8")
            for old, new in replacements.items():
                content_tmpl = content_tmpl.replace(old, new)
            out_path = output_dir / tmpl_file.name
            out_path.write_text(content_tmpl, encoding="utf-8")
            click.echo(f" ✅ 创建: {out_path.name}")

    click.echo("")
    click.echo(f"🎉 模组 '{safe_name}' 创建成功!")
    click.echo(f"📂 位置: {output_dir.resolve()}")
    click.echo("")
    click.echo("下一步:")
    # The hints point at the directory that was actually created (it differs
    # from safe_name when --output is used). They pass the package path
    # positionally because `nbpf pack` takes OUTPUT as an argument and has no
    # -o option. Pack paths are relative to the directory where this command
    # was run.
    click.echo(f" cd {output_dir}")
    click.echo(" # 编辑 main.py 实现功能")
    click.echo(" # 然后打包:")
    click.echo(f' nebula nbpf pack {output_dir} mods/{safe_name}.nbpf --ed25519-key <key> --rsa-key <key> --rsa-pub <key> --signer "{author}"')

    # Optional key generation. This is best-effort: a failure only prints a
    # warning because the scaffold itself has already been created.
    if with_keys:
        click.echo("")
        click.echo("🔑 正在生成签名密钥...")
        try:
            from oss.core.nbpf.crypto import NBPCrypto
            keys_dir = output_dir / "keys"
            keys_dir.mkdir(exist_ok=True)

            ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair()
            (keys_dir / "ed25519.pem").write_bytes(ed_priv)
            (keys_dir / "ed25519.pub.pem").write_bytes(ed_pub)

            rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048)
            (keys_dir / "rsa.pem").write_bytes(rsa_priv)
            (keys_dir / "rsa.pub.pem").write_bytes(rsa_pub)

            click.echo(f" ✅ Ed25519 密钥: {keys_dir}/ed25519.pem")
            click.echo(f" ✅ RSA 密钥: {keys_dir}/rsa.pem")
            click.echo("")
            click.echo("打包命令:")
            click.echo(f" nebula nbpf pack {output_dir} mods/{safe_name}.nbpf")
            click.echo(f" --ed25519-key {keys_dir}/ed25519.pem")
            click.echo(f" --rsa-key {keys_dir}/rsa.pem")
            click.echo(f" --rsa-pub {keys_dir}/rsa.pub.pem")
        except Exception as e:
            click.echo(f" ⚠ 密钥生成失败: {e}")
|
||
|
||
|
||
# Generates an Ed25519 key pair and an RSA-2048 key pair with NBPCrypto and
# writes all four files, prefixed with the key name, directly into the output
# directory. The docstring is click help text and is kept as-is.
# NOTE(review): private keys are written with the process's default file
# permissions; consider restricting them.
@create.command("key")
@click.option("--output", "-o", type=str, default="./keys", help="密钥输出目录")
@click.option("--name", type=str, default="default", help="密钥名称")
def create_key(output, name):
    """生成 Ed25519 + RSA 签名密钥对"""
    from oss.core.nbpf.crypto import NBPCrypto
    from pathlib import Path as _Path

    key_dir = _Path(output)
    key_dir.mkdir(parents=True, exist_ok=True)

    click.echo(f"🔑 生成密钥对到: {key_dir.resolve()}")

    # Ed25519: private key first, then the public key.
    ed_private_file = key_dir / f"{name}_ed25519.pem"
    ed_private_bytes, ed_public_bytes = NBPCrypto.generate_ed25519_keypair()
    ed_private_file.write_bytes(ed_private_bytes)
    (key_dir / f"{name}_ed25519.pub.pem").write_bytes(ed_public_bytes)
    click.echo(f" ✅ Ed25519: {ed_private_file}")

    # RSA: private key first, then the public key.
    rsa_private_file = key_dir / f"{name}_rsa.pem"
    rsa_private_bytes, rsa_public_bytes = NBPCrypto.generate_rsa_keypair(key_size=2048)
    rsa_private_file.write_bytes(rsa_private_bytes)
    (key_dir / f"{name}_rsa.pub.pem").write_bytes(rsa_public_bytes)
    click.echo(f" ✅ RSA: {rsa_private_file}")

    click.echo("")
    click.echo("密钥生成完成!")
|
||
|
||
|
||
# Lists each sub-directory of the bundled templates/ folder together with the
# files directly inside it. The docstring is click help text and is kept
# as-is.
@create.command("list-templates")
def list_templates():
    """列出可用的模板"""
    from pathlib import Path as _Path

    root = _Path(__file__).parent / "templates"
    if not root.exists():
        click.echo("没有可用的模板")
        return

    for entry in root.iterdir():
        if not entry.is_dir():
            continue
        # Collect the file names before printing the directory header.
        file_names = [child.name for child in entry.iterdir() if child.is_file()]
        click.echo(f" 📦 {entry.name}/")
        for file_name in file_names:
            click.echo(f" ├── {file_name}")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# dev 命令 — 开发模式热重载
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
# Development mode: start the plugins and the HTTP server, then poll the
# watched directory once per second and reload every plugin when a file is
# added, modified or deleted. Change detection compares SHA-256 digests of the
# file contents. The docstring is click help text and is kept as-is.
@cli.command()
@click.argument("mod_dir", type=str, required=False, default=None)
@click.option("--port", "-p", type=int, default=None, help="HTTP API 端口")
@click.option("--host", type=str, default=None, help="监听地址")
@click.option("--skip-sign", is_flag=True, default=False, help="跳过签名验证(调试用)")
@click.pass_context
def dev(ctx, mod_dir, port, host, skip_sign):
    """开发模式 — 监听模组文件变化并自动热重载"""
    import time as _time
    import hashlib as _hashlib
    from pathlib import Path as _Path
    # NOTE(review): FileWatcher is imported but never used in this function;
    # the import is kept so that import-time behaviour is unchanged.
    from oss.core.watcher import FileWatcher  # noqa: F401
    from oss.logger.logger import Log as _Log

    config = ctx.obj.get("config")
    # Without --port the API port is forced to 10086.
    if port:
        config.set("HTTP_API_PORT", port)
    else:
        config.set("HTTP_API_PORT", 10086)
    if host:
        config.set("HOST", host)

    # Decide which directory to watch: MOD_DIR if given, otherwise the
    # current working directory.
    watch_dirs = []
    if mod_dir:
        mod_path = _Path(mod_dir).resolve()
        if not mod_path.exists():
            click.echo(f"❌ 目录不存在: {mod_dir}")
            raise click.Abort()
        watch_dirs.append(mod_path)
        click.echo(f"📁 监听目录: {mod_path}")
    else:
        watch_dirs.append(_Path.cwd())
        click.echo(f"📁 监听目录: {_Path.cwd()}")
    click.echo("")

    # Start the NebulaShell services.
    from oss.core.manager import PluginManager as _PluginManager

    plugin_mgr = _PluginManager()

    def _load_and_start_plugins():
        """Load built-in plugins plus every mods/*.nbpf package, then start them.

        Shared by the initial start-up and by hot reload so both apply the
        same check for the optional ``mods`` directory.
        """
        plugin_mgr.load_all()
        mods_path = _Path("mods")
        if mods_path.is_dir():
            for package in sorted(mods_path.iterdir()):
                if package.suffix == ".nbpf":
                    plugin_mgr.load(package)
        plugin_mgr.start_all()

    _load_and_start_plugins()

    # Start the HTTP server; a failure is logged but does not stop dev mode.
    try:
        plugin_mgr.start_http_server()
        _Log.ok("Dev", f"HTTP API: http://{config.host}:{config.http_api_port}")
    except Exception as e:
        _Log.warn("Dev", f"HTTP 服务启动失败: {e}")

    click.echo("")
    click.echo("🔧 NebulaShell 开发模式已启动")
    click.echo("=" * 50)
    click.echo(f" HTTP: http://{config.host}:{config.http_api_port}")
    click.echo(f" 监听: {', '.join(str(d) for d in watch_dirs)}")
    # NOTE(review): skip_sign is only shown in this banner; nothing in this
    # function passes it on to the plugin manager.
    click.echo(f" 签名验证: {'跳过' if skip_sign else '开启'}")
    click.echo(f" 模组数: {len(plugin_mgr.plugins)}")
    click.echo("=" * 50)
    click.echo(" 按 Ctrl+C 停止")
    click.echo("")

    def _get_file_hash(path: _Path) -> str:
        """Return the SHA-256 hex digest of a file, or "" if it cannot be read."""
        try:
            return _hashlib.sha256(path.read_bytes()).hexdigest()
        except Exception:
            return ""

    def _get_dir_hash(directory: _Path) -> dict[str, str]:
        """Map every watched file below *directory* to its content digest.

        Files whose suffix contains ".nbpf", anything under a ``__pycache__``
        path, and unreadable files are left out.
        """
        result = {}
        for f in sorted(directory.rglob("*")):
            if f.is_file() and ".nbpf" not in f.suffix and "__pycache__" not in str(f):
                h = _get_file_hash(f)
                if h:
                    result[str(f)] = h
        return result

    # One snapshot per watched directory. Keeping them separate means files
    # from one directory are never reported as deleted just because they are
    # absent from another directory's listing.
    snapshots: dict[_Path, dict[str, str]] = {
        wd: (_get_dir_hash(wd) if wd.is_dir() else {}) for wd in watch_dirs
    }

    # Main polling loop.
    try:
        while True:
            _time.sleep(1)
            changed = False

            for wd in watch_dirs:
                if not wd.exists():
                    continue
                current = _get_dir_hash(wd)
                previous = snapshots[wd]

                # New and modified files.
                for fpath, digest in current.items():
                    old_digest = previous.get(fpath)
                    if old_digest is None:
                        _Log.info("Dev", f"🆕 新增文件: {_Path(fpath).name}")
                        changed = True
                    elif old_digest != digest:
                        _Log.info("Dev", f"📝 文件变更: {_Path(fpath).name}")
                        changed = True

                # Deleted files.
                for fpath in previous:
                    if fpath not in current:
                        _Log.info("Dev", f"🗑 文件删除: {_Path(fpath).name}")
                        changed = True

                snapshots[wd] = current

            if changed:
                _Log.info("Dev", "检测到变更,尝试热重载...")
                try:
                    # Stop everything, drop the manager's state, then load
                    # and start again.
                    plugin_mgr.stop_all()
                    plugin_mgr.plugins.clear()
                    plugin_mgr._plugin_dirs.clear()
                    _load_and_start_plugins()
                    _Log.ok("Dev", f"热重载完成!当前模组数: {len(plugin_mgr.plugins)}")
                except Exception as e:
                    _Log.error("Dev", f"热重载失败: {e}")
    except KeyboardInterrupt:
        click.echo("")
        _Log.info("Dev", "正在停止开发模式...")
        plugin_mgr.stop_all()
        _Log.info("Dev", "开发模式已停止")
|
||
|
||
|
||
def main():
    """Console entry point.

    Refuses to run under the deprecated ``oss`` / ``oss.exe`` program name
    (logs a warning and exits with status 1). Otherwise it gives the hidden
    ``!!`` command handler the first chance and falls through to the click
    command group only when no hidden command was handled.
    """
    invoked_as = os.path.basename(sys.argv[0])
    if invoked_as in ("oss", "oss.exe"):
        Log.warn("NebulaShell", "oss 命令已弃用,请使用 nebula 替代")
        sys.exit(1)

    if not _handle_hidden_command():
        cli()


if __name__ == "__main__":
    main()
|