""" FutureOSS v1.1.0 - FTP 服务器插件 功能:文件传输、用户管理、访问控制、日志记录 """ import os import logging import threading from datetime import datetime from typing import Dict, List, Optional from oss.plugin.base import BasePlugin from oss.core.context import Context logger = logging.getLogger("futureoss.ftp") class FTPServerPlugin(BasePlugin): name = "ftp_server" version = "1.1.0" description = "FTP 文件传输服务:安全文件上传下载" def __init__(self): super().__init__() self.root_dir = "./ftp_root" self.users: Dict[str, Dict] = {} self.sessions: Dict[str, Dict] = {} self.server = None self.running = False # 默认管理员账户 self.users["admin"] = { "password": "admin123", # 生产环境应加密存储 "home_dir": self.root_dir, "permissions": ["read", "write", "delete"], "max_connections": 5 } def on_load(self, ctx: Context): logger.info("FTP 服务器插件已加载") os.makedirs(self.root_dir, exist_ok=True) # 注册命令 ctx.register_command("ftp.user.add", self.add_user) ctx.register_command("ftp.user.remove", self.remove_user) ctx.register_command("ftp.user.list", self.list_users) ctx.register_command("ftp.start", self.start_server) ctx.register_command("ftp.stop", self.stop_server) ctx.register_command("ftp.session.list", self.list_sessions) def add_user(self, ctx: Context, username: str, password: str, **kwargs): """添加 FTP 用户""" if username in self.users: return {"status": "error", "message": "用户已存在"} self.users[username] = { "password": password, "home_dir": os.path.join(self.root_dir, username), "permissions": kwargs.get("permissions", ["read"]), "max_connections": kwargs.get("max_connections", 3) } # 创建用户主目录 os.makedirs(self.users[username]["home_dir"], exist_ok=True) logger.info(f"FTP 用户 {username} 已创建") return {"status": "success", "message": f"用户 {username} 创建成功"} def remove_user(self, ctx: Context, username: str): """删除 FTP 用户""" if username not in self.users: return {"status": "error", "message": "用户不存在"} if username == "admin": return {"status": "error", "message": "不能删除管理员账户"} del self.users[username] logger.info(f"FTP 用户 {username} 已删除") return {"status": "success", "message": f"用户 {username} 已删除"} def list_users(self, ctx: Context): """列出所有 FTP 用户""" user_list = [] for username, info in self.users.items(): user_list.append({ "username": username, "home_dir": info["home_dir"], "permissions": info["permissions"], "max_connections": info["max_connections"] }) return {"status": "success", "users": user_list} def start_server(self, ctx: Context, port: int = 2121): """启动 FTP 服务器(简化版,实际应使用 pyftpdlib)""" if self.running: return {"status": "error", "message": "FTP 服务器已在运行"} self.running = True self.port = port # 模拟服务器启动 logger.info(f"FTP 服务器启动在端口 {port}") # 在实际生产中应启动真正的 FTP 服务 # from pyftpdlib.authorizers import DummyAuthorizer # from pyftpdlib.handlers import FTPHandler # from pyftpdlib.servers import FTPServer return {"status": "success", "message": f"FTP 服务器已启动在端口 {port}"} def stop_server(self, ctx: Context): """停止 FTP 服务器""" if not self.running: return {"status": "error", "message": "FTP 服务器未运行"} self.running = False logger.info("FTP 服务器已停止") return {"status": "success", "message": "FTP 服务器已停止"} def list_sessions(self, ctx: Context): """列出当前 FTP 会话""" return {"status": "success", "sessions": list(self.sessions.values())} def on_unload(self, ctx: Context): if self.running: self.stop_server(ctx) logger.info("FTP 服务器插件已卸载")