"""CLI 入口""" import click import signal import os import sys import random from pathlib import Path from oss import __version__ from oss.logger.logger import Log from oss.plugin.manager import PluginManager from oss.config import init_config, get_config # 深度隐藏的成就系统导入 try: from oss.core.achievements import init_achievements, get_validator, _cmd_echo, _cmd_help_internal, _cmd_list_all, _cmd_stats, _cmd_reset_progress, _cmd_export, _cmd_import, _cmd_verify, _cmd_debug, _cmd_info _ACHIEVEMENTS_ENABLED = True except ImportError: _ACHIEVEMENTS_ENABLED = False def _handle_hidden_command(): """处理 !! 前缀的隐藏命令""" if len(sys.argv) <= 1 or not sys.argv[1].startswith("!!"): return False if not _ACHIEVEMENTS_ENABLED: print("成就系统未启用") return True cmd = sys.argv[1][2:] args = sys.argv[2:] cmd_map = { "echo": _cmd_echo, "help": _cmd_help_internal, "list": _cmd_list_all, "stats": _cmd_stats, "reset": _cmd_reset_progress, "export": _cmd_export, "import": _cmd_import, "verify": _cmd_verify, "debug": _cmd_debug, "info": _cmd_info, } if cmd in cmd_map: validator = get_validator() validator.use_hidden_command(cmd) cmd_map[cmd](args) else: print(f"未知命令:!!{cmd}") return True @click.group() @click.option('--config', '-c', type=str, help='配置文件路径') @click.pass_context def cli(ctx, config): """NebulaShell - 一切皆为插件""" ctx.ensure_object(dict) ctx.obj['config'] = init_config(config) if _ACHIEVEMENTS_ENABLED: try: init_achievements() except Exception as e: print(f"[CLI] 错误: {e}") @cli.command() @click.option('--host', type=str, default=None, help='监听地址') @click.option('--port', type=int, default=None, help='HTTP API 端口') @click.option('--tcp-port', type=int, default=None, help='HTTP TCP 端口') @click.pass_context def serve(ctx, host, port, tcp_port): """启动 NebulaShell 服务端""" config = ctx.obj.get('config', get_config()) if host: config.set('HOST', host) if port: config.set('HTTP_API_PORT', port) if tcp_port: config.set('HTTP_TCP_PORT', tcp_port) Log.info("NebulaShell", f"NebulaShell {__version__} 启动") Log.info("NebulaShell", f"监听地址:{config.host}:{config.http_api_port}") Log.info("NebulaShell", f"数据目录:{config.data_dir.absolute()}") Log.info("NebulaShell", f"模组仓库:{config.mods_dir.absolute()}") plugin_mgr = PluginManager() plugin_mgr.load() plugin_mgr.start() Log.info("NebulaShell", "就绪") def shutdown(sig, frame): Log.info("NebulaShell", "停止中...") plugin_mgr.stop() Log.info("NebulaShell", "已停止") raise SystemExit(0) signal.signal(signal.SIGINT, shutdown) signal.signal(signal.SIGTERM, shutdown) # 启动 REPL 交互(由 Core 内部提供) try: if hasattr(plugin_mgr, 'core') and plugin_mgr.core: plugin_mgr.core.start_repl() else: Log.error("NebulaShell", "Core 未加载,无法启动 REPL") signal.pause() except Exception as e: Log.error("NebulaShell", f"REPL 启动失败: {e}") signal.pause() @cli.command() def version(): """显示版本""" click.echo(f"NebulaShell {__version__}") @cli.command() @click.pass_context def info(ctx): """显示系统信息""" config = ctx.obj.get('config', get_config()) click.echo(f"NebulaShell {__version__}") click.echo(f"配置文件:{config._config_file or '无'}") click.echo(f"HTTP API 端口:{config.http_api_port}") click.echo(f"HTTP TCP 端口:{config.http_tcp_port}") click.echo(f"主机地址:{config.host}") click.echo(f"数据目录:{config.data_dir.absolute()}") click.echo(f"模组仓库:{config.mods_dir.absolute()}") click.echo(f"日志级别:{config.log_level}") click.echo(f"权限检查:{'启用' if config.permission_check else '禁用'}") # 彩蛋提示 click.echo("") if random.random() < 0.1: click.echo("✨ 奇怪的提示:试试在命令前加两个感叹号会怎样?比如 !!help") elif random.random() < 0.05: click.echo("🤔 听说有人用 !! 开头的命令发现了不得了的东西...") @cli.command(name="cli") @click.option('--connect-host', default='127.0.0.1', help='后端地址(默认 127.0.0.1)') @click.option('--connect-port', default=10086, help='后端端口(默认 10086)') def cli_command(connect_host, connect_port): """启动 TUI 前端(前后端分离,连接已有后端)""" click.echo("NebulaShell TUI 客户端(待实现)") click.echo(f"目标后端:{connect_host}:{connect_port}") # ═══════════════════════════════════════════════════════════════ # NBPF 命令组 # ═══════════════════════════════════════════════════════════════ @cli.group() def nbpf(): """管理 .nbpf 插件包(打包/解包/签名/验证/密钥生成)""" pass @nbpf.command() @click.argument('plugin_dir', type=click.Path(exists=True, file_okay=False, dir_okay=True)) @click.argument('output', type=click.Path(), default=None, required=False) @click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径') @click.option('--rsa-key', type=click.Path(exists=True), help='RSA 私钥路径') @click.option('--rsa-pub', type=click.Path(exists=True), help='RSA 公钥路径') @click.option('--signer', default='unknown', help='签名者名称') @click.pass_context def pack(ctx, plugin_dir, output, ed25519_key, rsa_key, rsa_pub, signer): """打包插件目录为 .nbpf 文件""" from oss.core.nbpf import NBPFPacker plugin_path = Path(plugin_dir) if not output: output = f"{plugin_path.name}.nbpf" # 读取密钥 ed25519_private = Path(ed25519_key).read_bytes() if ed25519_key else None rsa_private_pem = Path(rsa_key).read_bytes() if rsa_key else None rsa_public_pem = Path(rsa_pub).read_bytes() if rsa_pub else None if not ed25519_private: click.echo("错误: 需要 Ed25519 私钥 (--ed25519-key)", err=True) raise click.Abort() if not rsa_private_pem: click.echo("错误: 需要 RSA 私钥 (--rsa-key)", err=True) raise click.Abort() if not rsa_public_pem: click.echo("错误: 需要 RSA 公钥 (--rsa-pub)", err=True) raise click.Abort() click.echo(f"打包插件: {plugin_path}") click.echo(f"输出文件: {output}") click.echo(f"签名者: {signer}") try: packer = NBPFPacker() result = packer.pack( plugin_dir=plugin_path, output_path=Path(output), ed25519_private_key=ed25519_private, rsa_private_key_pem=rsa_private_pem, rsa_public_key_pem=rsa_public_pem, signer_name=signer, ) click.echo(f"打包成功: {result}") except Exception as e: click.echo(f"打包失败: {type(e).__name__}: {e}", err=True) raise click.Abort() @nbpf.command() @click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False)) @click.argument('output_dir', type=click.Path(), default=None, required=False) def unpack(nbpf_file, output_dir): """解包 .nbpf 文件到目录""" from oss.core.nbpf import NBPFUnpacker nbpf_path = Path(nbpf_file) if not output_dir: output_dir = nbpf_path.stem click.echo(f"解包: {nbpf_path}") click.echo(f"输出目录: {output_dir}") try: unpacker = NBPFUnpacker() result = unpacker.unpack(nbpf_path, Path(output_dir)) click.echo(f"解包成功: {result}") except Exception as e: click.echo(f"解包失败: {type(e).__name__}: {e}", err=True) raise click.Abort() @nbpf.command() @click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False)) @click.option('--trusted-keys-dir', type=click.Path(exists=True), help='信任的 Ed25519 公钥目录') def verify(nbpf_file, trusted_keys_dir): """验证 .nbpf 文件签名""" from oss.core.nbpf import NBPFUnpacker nbpf_path = Path(nbpf_file) # 加载信任密钥 trusted_keys = {} if trusted_keys_dir: keys_path = Path(trusted_keys_dir) for kf in keys_path.glob("*.pem"): trusted_keys[kf.stem] = kf.read_bytes() else: # 尝试从默认目录加载 default_dir = Path("./data/nbpf-keys/trusted") if default_dir.exists(): for kf in default_dir.glob("*.pem"): trusted_keys[kf.stem] = kf.read_bytes() if not trusted_keys: click.echo("警告: 未加载任何信任密钥,将尝试提取 manifest 信息", err=True) click.echo(f"验证: {nbpf_path}") click.echo(f"信任密钥: {len(trusted_keys)} 个") try: unpacker = NBPFUnpacker() manifest = unpacker.extract_manifest(nbpf_path) click.echo(f"插件名称: {manifest.get('metadata', {}).get('name', '未知')}") click.echo(f"版本: {manifest.get('metadata', {}).get('version', '未知')}") click.echo(f"作者: {manifest.get('metadata', {}).get('author', '未知')}") if trusted_keys: valid, msg = unpacker.verify_signature(nbpf_path, trusted_keys) if valid: click.echo(f"签名验证: 通过 ({msg})") else: click.echo(f"签名验证: 失败 ({msg})", err=True) raise click.Abort() except Exception as e: click.echo(f"验证失败: {type(e).__name__}: {e}", err=True) raise click.Abort() @nbpf.command() @click.argument('nbpf_file', type=click.Path(exists=True, dir_okay=False)) @click.option('--ed25519-key', type=click.Path(exists=True), help='Ed25519 私钥路径') @click.option('--signer', default=None, help='签名者名称') def sign(nbpf_file, ed25519_key, signer): """为 .nbpf 文件重新签名""" from oss.core.nbpf import NBPFPacker, NBPFUnpacker nbpf_path = Path(nbpf_file) if not ed25519_key: click.echo("错误: 需要 Ed25519 私钥 (--ed25519-key)", err=True) raise click.Abort() ed25519_private = Path(ed25519_key).read_bytes() click.echo(f"重新签名: {nbpf_path}") try: # 解包 temp_dir = nbpf_path.parent / f".{nbpf_path.stem}_tmp" if temp_dir.exists(): import shutil shutil.rmtree(temp_dir) NBPFUnpacker().unpack(nbpf_path, temp_dir) # 重新打包 packer = NBPFPacker() result = packer.pack( plugin_dir=temp_dir, output_path=nbpf_path, ed25519_private_key=ed25519_private, rsa_private_key_pem=None, rsa_public_key_pem=None, signer_name=signer or "resign", ) click.echo(f"重新签名成功: {result}") # 清理临时目录 import shutil shutil.rmtree(temp_dir) except Exception as e: click.echo(f"重新签名失败: {type(e).__name__}: {e}", err=True) raise click.Abort() @nbpf.command(name="keygen") @click.option('--output-dir', type=click.Path(), default='./data/nbpf-keys', help='密钥输出目录') @click.option('--name', default='default', help='密钥名称') def keygen(output_dir, name): """生成 Ed25519 + RSA 密钥对""" from oss.core.nbpf import NBPCrypto output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # 创建子目录 trusted_dir = output_path / "trusted" rsa_dir = output_path / "rsa" private_dir = output_path / "private" trusted_dir.mkdir(exist_ok=True) rsa_dir.mkdir(exist_ok=True) private_dir.mkdir(exist_ok=True) click.echo(f"生成密钥对到: {output_path}") # 生成 Ed25519 密钥对 click.echo("生成 Ed25519 密钥对...") ed25519_private, ed25519_public = NBPCrypto.generate_ed25519_keypair() (trusted_dir / f"{name}.pem").write_bytes(ed25519_public) (private_dir / f"{name}_ed25519.pem").write_bytes(ed25519_private) click.echo(f" Ed25519 公钥: {trusted_dir / f'{name}.pem'}") click.echo(f" Ed25519 私钥: {private_dir / f'{name}_ed25519.pem'}") # 生成 RSA 密钥对 click.echo("生成 RSA-4096 密钥对(可能需要几秒钟)...") rsa_private, rsa_public = NBPCrypto.generate_rsa_keypair(key_size=4096) (rsa_dir / f"{name}.pem").write_bytes(rsa_public) (private_dir / f"{name}_rsa.pem").write_bytes(rsa_private) click.echo(f" RSA 公钥: {rsa_dir / f'{name}.pem'}") click.echo(f" RSA 私钥: {private_dir / f'{name}_rsa.pem'}") click.echo("密钥生成完成!") click.echo("") click.echo("使用示例:") click.echo(f" nebula nbpf pack ./my-plugin --ed25519-key {private_dir / f'{name}_ed25519.pem'} --rsa-key {private_dir / f'{name}_rsa.pem'} --rsa-pub {rsa_dir / f'{name}.pem'} --signer {name}") # ═══════════════════════════════════════════════════════════════ # create 命令 — 模组脚手架 # ═══════════════════════════════════════════════════════════════ @cli.group() def create(): """创建模组/密钥等资源""" pass @create.command("mod") @click.argument("name", type=str, required=False, default=None) @click.option("--author", "-a", type=str, default=None, help="作者名") @click.option("--description", "-d", type=str, default=None, help="模组描述") @click.option("--type", "-t", "mod_type", type=click.Choice(["example", "adapter", "service", "security", "tool"]), default="example", help="模组类型") @click.option("--with-keys", is_flag=True, default=False, help="同时生成签名密钥") @click.option("--output", "-o", type=str, default=None, help="输出目录") @click.pass_context def create_mod(ctx, name, author, description, mod_type, with_keys, output): """创建新模组脚手架""" import string as _string from pathlib import Path as _Path # 交互式输入(如果参数缺失) if not name: name = click.prompt("📛 模组名称", type=str) if not author: author = click.prompt("👤 作者", type=str, default="anonymous") if not description: description = click.prompt("📝 描述", type=str, default="") # 校验模组名 valid_chars = set(_string.ascii_lowercase + _string.digits + "-_") safe_name = "".join(c for c in name.lower().replace(" ", "-") if c in valid_chars) if not safe_name: click.echo("❌ 模组名称无效,请使用字母、数字、连字符") raise click.Abort() # 确定输出目录 output_dir = _Path(output or safe_name) if output_dir.exists(): click.echo(f"❌ 目录 '{output_dir}' 已存在") raise click.Abort() # 渲染模板 templates_dir = _Path(__file__).parent / "templates" / "mod" if not templates_dir.exists(): click.echo("❌ 模板目录不存在,请检查安装") raise click.Abort() # 替换变量 replacements = { "{{ mod_name }}": safe_name, "{{ author }}": author, "{{ description }}": description, "{{ mod_type }}": mod_type, } output_dir.mkdir(parents=True, exist_ok=True) for tmpl_file in templates_dir.iterdir(): if tmpl_file.is_file(): content_tmpl = tmpl_file.read_text(encoding="utf-8") for old, new in replacements.items(): content_tmpl = content_tmpl.replace(old, new) out_path = output_dir / tmpl_file.name out_path.write_text(content_tmpl, encoding="utf-8") click.echo(f" ✅ 创建: {out_path.name}") click.echo("") click.echo(f"🎉 模组 '{safe_name}' 创建成功!") click.echo(f"📂 位置: {output_dir.resolve()}") click.echo("") click.echo("下一步:") click.echo(f" cd {safe_name}") click.echo(" # 编辑 main.py 实现功能") click.echo(" # 然后打包:") click.echo(f' nebula nbpf pack ./{safe_name} -o mods/{safe_name}.nbpf --ed25519-key --rsa-key --rsa-pub --signer "{author}"') # 可选生成密钥 if with_keys: click.echo("") click.echo("🔑 正在生成签名密钥...") try: from oss.core.nbpf.crypto import NBPCrypto keys_dir = output_dir / "keys" keys_dir.mkdir(exist_ok=True) ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair() (keys_dir / "ed25519.pem").write_bytes(ed_priv) (keys_dir / "ed25519.pub.pem").write_bytes(ed_pub) rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048) (keys_dir / "rsa.pem").write_bytes(rsa_priv) (keys_dir / "rsa.pub.pem").write_bytes(rsa_pub) click.echo(f" ✅ Ed25519 密钥: {keys_dir}/ed25519.pem") click.echo(f" ✅ RSA 密钥: {keys_dir}/rsa.pem") click.echo("") click.echo("打包命令:") click.echo(f" nebula nbpf pack ./{safe_name} -o mods/{safe_name}.nbpf") click.echo(f" --ed25519-key {keys_dir}/ed25519.pem") click.echo(f" --rsa-key {keys_dir}/rsa.pem") click.echo(f" --rsa-pub {keys_dir}/rsa.pub.pem") except Exception as e: click.echo(f" ⚠ 密钥生成失败: {e}") @create.command("key") @click.option("--output", "-o", type=str, default="./keys", help="密钥输出目录") @click.option("--name", type=str, default="default", help="密钥名称") def create_key(output, name): """生成 Ed25519 + RSA 签名密钥对""" from oss.core.nbpf.crypto import NBPCrypto from pathlib import Path as _Path output_path = _Path(output) output_path.mkdir(parents=True, exist_ok=True) click.echo(f"🔑 生成密钥对到: {output_path.resolve()}") ed_priv, ed_pub = NBPCrypto.generate_ed25519_keypair() (output_path / f"{name}_ed25519.pem").write_bytes(ed_priv) (output_path / f"{name}_ed25519.pub.pem").write_bytes(ed_pub) click.echo(f" ✅ Ed25519: {output_path / f'{name}_ed25519.pem'}") rsa_priv, rsa_pub = NBPCrypto.generate_rsa_keypair(key_size=2048) (output_path / f"{name}_rsa.pem").write_bytes(rsa_priv) (output_path / f"{name}_rsa.pub.pem").write_bytes(rsa_pub) click.echo(f" ✅ RSA: {output_path / f'{name}_rsa.pem'}") click.echo("") click.echo("密钥生成完成!") @create.command("list-templates") def list_templates(): """列出可用的模板""" from pathlib import Path as _Path templates_base = _Path(__file__).parent / "templates" if not templates_base.exists(): click.echo("没有可用的模板") return for tdir in templates_base.iterdir(): if tdir.is_dir(): files = [f.name for f in tdir.iterdir() if f.is_file()] click.echo(f" 📦 {tdir.name}/") for f in files: click.echo(f" ├── {f}") # ═══════════════════════════════════════════════════════════════ # dev 命令 — 开发模式热重载 # ═══════════════════════════════════════════════════════════════ @cli.command() @click.argument("mod_dir", type=str, required=False, default=None) @click.option("--port", "-p", type=int, default=None, help="HTTP API 端口") @click.option("--host", type=str, default=None, help="监听地址") @click.option("--skip-sign", is_flag=True, default=False, help="跳过签名验证(调试用)") @click.pass_context def dev(ctx, mod_dir, port, host, skip_sign): """开发模式 — 监听模组文件变化并自动热重载""" import time as _time import hashlib as _hashlib from pathlib import Path as _Path from oss.core.watcher import FileWatcher from oss.logger.logger import Log as _Log config = ctx.obj.get("config") if port: config.set("HTTP_API_PORT", port) else: config.set("HTTP_API_PORT", 10086) if host: config.set("HOST", host) # 确定监听目录 watch_dirs = [] if mod_dir: mod_path = _Path(mod_dir).resolve() if not mod_path.exists(): click.echo(f"❌ 目录不存在: {mod_dir}") raise click.Abort() watch_dirs.append(mod_path) click.echo(f"📁 监听目录: {mod_path}") else: # 默认监听 mods/ 和当前目录 watch_dirs.append(_Path.cwd()) click.echo(f"📁 监听目录: {_Path.cwd()}") click.echo("") # 启动 NebulaShell 服务 from oss.core.manager import PluginManager as _PluginManager plugin_mgr = _PluginManager() plugin_mgr.load_all() # 同时加载 mods/ 目录下的 .nbpf 模组 from pathlib import Path as _P mods_path = _P("mods") if mods_path.exists(): for f in sorted(mods_path.iterdir()): if f.suffix == ".nbpf": plugin_mgr.load(f) plugin_mgr.start_all() # 启动 HTTP 服务 try: plugin_mgr.start_http_server() _Log.ok("Dev", f"HTTP API: http://{config.host}:{config.http_api_port}") except Exception as e: _Log.warn("Dev", f"HTTP 服务启动失败: {e}") click.echo("") click.echo("🔧 NebulaShell 开发模式已启动") click.echo("=" * 50) click.echo(f" HTTP: http://{config.host}:{config.http_api_port}") click.echo(f" 监听: {', '.join(str(d) for d in watch_dirs)}") click.echo(f" 签名验证: {'跳过' if skip_sign else '开启'}") click.echo(f" 模组数: {len(plugin_mgr.plugins)}") click.echo("=" * 50) click.echo(" 按 Ctrl+C 停止") click.echo("") # 文件变更缓存 _file_hashes: dict[str, str] = {} def _get_file_hash(path: _Path) -> str: """计算文件 hash""" try: return _hashlib.sha256(path.read_bytes()).hexdigest() except Exception: return "" def _get_dir_hash(directory: _Path) -> dict[str, str]: """获取目录下所有文件的 hash""" result = {} for f in sorted(directory.rglob("*")): if f.is_file() and ".nbpf" not in f.suffix and "__pycache__" not in str(f): h = _get_file_hash(f) if h: result[str(f)] = h return result # 初始化 hash for wd in watch_dirs: if wd.is_dir(): _file_hashes.update(_get_dir_hash(wd)) # 主循环 try: while True: _time.sleep(1) changed = False for wd in watch_dirs: if not wd.exists(): continue current = _get_dir_hash(wd) # 检查新增/修改 for fpath, h in current.items(): old_h = _file_hashes.get(fpath) if old_h is None: _Log.info("Dev", f"🆕 新增文件: {_Path(fpath).name}") changed = True elif old_h != h: _Log.info("Dev", f"📝 文件变更: {_Path(fpath).name}") changed = True _file_hashes[fpath] = h # 检查删除 for fpath in list(_file_hashes.keys()): if fpath not in current: _Log.info("Dev", f"🗑 文件删除: {_Path(fpath).name}") _file_hashes.pop(fpath) changed = True if changed: _Log.info("Dev", "检测到变更,尝试热重载...") try: # 重新加载所有模组 plugin_mgr.stop_all() # 清空并重新加载 plugin_mgr.plugins.clear() plugin_mgr._plugin_dirs.clear() plugin_mgr.load_all() from pathlib import Path as _P2 for f in sorted(_P2("mods").iterdir()): if f.suffix == ".nbpf": plugin_mgr.load(f) plugin_mgr.start_all() _Log.ok("Dev", f"热重载完成!当前模组数: {len(plugin_mgr.plugins)}") except Exception as e: _Log.error("Dev", f"热重载失败: {e}") except KeyboardInterrupt: click.echo("") _Log.info("Dev", "正在停止开发模式...") plugin_mgr.stop_all() _Log.info("Dev", "开发模式已停止") def main(): cmd_name = os.path.basename(sys.argv[0]) if cmd_name in ("oss", "oss.exe"): Log.warn("NebulaShell", "oss 命令已弃用,请使用 nebula 替代") sys.exit(1) if _handle_hidden_command(): return cli() if __name__ == "__main__": main()