"""REPL 交互终端 - 基于 Python cmd 模块""" import cmd import shlex import sys import readline import os from pathlib import Path from oss import __version__ HISTORY_FILE = str(Path.home() / ".nebula_repl_history") class NebulaShell(cmd.Cmd): """NebulaShell REPL 交互终端""" def __init__(self, plugin_mgr): super().__init__() self.plugin_mgr = plugin_mgr self.prompt = "\033[1;36mNebula>\033[0m " # 青色提示符 self.intro = ( f"\033[1;33mNebulaShell Core v{__version__}\033[0m\n" "输入 \033[1;32mhelp\033[0m 查看命令列表 | 输入 \033[1;31mexit\033[0m 退出" ) # 加载历史记录 self._load_history() def _load_history(self): """加载命令历史记录""" try: readline.read_history_file(HISTORY_FILE) except (FileNotFoundError, OSError) as e: print(f"[REPL] 文件操作失败: {e}") readline.set_history_length(500) def _save_history(self): """保存命令历史记录""" try: readline.write_history_file(HISTORY_FILE) except OSError as e: print(f"[REPL] 系统错误: {e}") def _get_plugins(self): """获取所有已加载的插件列表""" if not self.plugin_mgr: return [] return list(self.plugin_mgr.plugins.keys()) def _get_injected_functions(self): """获取所有 PL 注入的功能""" if not self.plugin_mgr: return {} return self.plugin_mgr.pl_injector.get_registry_info() # ── 命令:plugins ── def do_plugins(self, arg): """列出所有已加载的插件""" plugins = self._get_plugins() if not plugins: print("\033[1;33m没有已加载的插件\033[0m") return print(f"\033[1;36m已加载插件 ({len(plugins)}):\033[0m") for name in plugins: info = self.plugin_mgr.get_info(name) if info: status = "" if self.plugin_mgr.fallback_manager.is_degraded(name): status = " \033[1;31m[降级]\033[0m" print(f" \033[1;32m{name}\033[0m v{info.version} - {info.description}{status}") else: print(f" \033[1;32m{name}\033[0m") # ── 命令:pl ── def do_pl(self, arg): """列出所有 PL 注入的功能""" registry = self._get_injected_functions() if not registry: print("\033[1;33m没有 PL 注入功能\033[0m") return print(f"\033[1;36mPL 注入功能 ({len(registry)}):\033[0m") for name, info in registry.items(): descs = [d for d in info["descriptions"] if d] desc_str = f" - {descs[0]}" if descs else "" print(f" \033[1;32m{name}\033[0m (来自 {', '.join(info['plugins'])}){desc_str}") # ── 命令:call ── def do_call(self, arg): """调用 PL 注入功能: call [args...]""" if not arg: print("\033[1;33m用法: call [args...]\033[0m") return parts = shlex.split(arg) name = parts[0] args = parts[1:] funcs = self.plugin_mgr.pl_injector.get_injected_functions(name) if not funcs: print(f"\033[1;31m未找到功能: {name}\033[0m") return for func in funcs: try: result = func(*args) if result is not None: print(result) except Exception as e: print(f"\033[1;31m执行失败: {e}\033[0m") # ── 命令:status ── def do_status(self, arg): """显示 Core 状态""" if not self.plugin_mgr: print("\033[1;31mCore 未就绪\033[0m") return status = self.plugin_mgr.get_status() print(f"\033[1;36mCore 状态:\033[0m") print(f" 插件总数: {status['plugins']['total']}") if status['plugins']['degraded']: print(f" 降级插件: \033[1;31m{', '.join(status['plugins']['degraded'])}\033[0m") print(f" HTTP 服务: {'\033[1;32m运行中\033[0m' if status['http_server'] else '\033[1;31m未启动\033[0m'}") print(f" 防篡改监控: {'\033[1;32m运行中\033[0m' if status['tamper_monitor'] else '\033[1;31m未启动\033[0m'}") print(f" 审计日志: {status['audit_logs']} 条") print(f" 篡改告警: {status['tamper_alerts']} 条") print(f" 数据目录: {status['data_store']}") # ── 命令:audit ── def do_audit(self, arg): """查看审计日志: audit [plugin_name]""" if not self.plugin_mgr: return logs = self.plugin_mgr.get_audit_logs(plugin_name=arg if arg else None, limit=20) if not logs: print("\033[1;33m无审计日志\033[0m") return print(f"\033[1;36m审计日志 ({len(logs)} 条):\033[0m") for log in reversed(logs): t = log["time"] print(f" [{log['plugin']}] {log['action']} - {log['detail']}") # ── 命令:alerts ── def do_alerts(self, arg): """查看防篡改告警""" if not self.plugin_mgr: return alerts = self.plugin_mgr.get_tamper_alerts() if not alerts: print("\033[1;32m无防篡改告警\033[0m") return print(f"\033[1;31m防篡改告警 ({len(alerts)}):\033[0m") for alert in alerts: print(f" [{alert['plugin']}] {alert['message']}") # ── 命令:recover ── def do_recover(self, arg): """恢复降级插件: recover """ if not arg: print("\033[1;33m用法: recover \033[0m") return if self.plugin_mgr.recover_plugin(arg): print(f"\033[1;32m插件 '{arg}' 已恢复\033[0m") else: print(f"\033[1;31m插件 '{arg}' 恢复失败(可能未处于降级状态)\033[0m") # ── 命令:exit ── def do_exit(self, arg): """退出 REPL""" self._save_history() print("\033[1;33m再见!\033[0m") return True def do_EOF(self, arg): """Ctrl+D 退出""" return self.do_exit(arg) def default(self, line): """未知命令""" print(f"\033[1;31m未知命令: {line}\033[0m 输入 \033[1;32mhelp\033[0m 查看命令列表") def emptyline(self): """空行不重复执行上一条命令""" pass