Files
NebulaShell/store/@{FutureOSS}/http-tcp/middleware.py
Falck 76147bae94 初始提交 - FutureOSS v1.0 插件化运行时框架
一切皆为插件的开发者工具运行时框架

🧩 核心特性:
  - 插件热插拔 (importlib 动态加载)
  - 依赖自动解析 (拓扑排序 + 循环检测)
  - 企业级稳定 (熔断/降级/重试/隔离)
  - 事件驱动 (发布/订阅事件总线)
  - 完整配置 (YAML 配置 + 热重载)
2026-04-06 09:57:10 +08:00

54 lines
1.4 KiB
Python

"""TCP HTTP 中间件链"""
from typing import Callable, Optional, Any
class TcpMiddleware:
"""TCP 中间件基类"""
def process(self, request: dict, next_fn: Callable) -> Optional[dict]:
"""处理请求"""
return next_fn()
class TcpLogMiddleware(TcpMiddleware):
"""日志中间件"""
def process(self, request, next_fn):
print(f"[http-tcp] {request.get('method')} {request.get('path')}")
return next_fn()
class TcpCorsMiddleware(TcpMiddleware):
"""CORS 中间件"""
def process(self, request, next_fn):
response = next_fn()
if response:
response.setdefault("headers", {})
response["headers"]["Access-Control-Allow-Origin"] = "*"
return response
class TcpMiddlewareChain:
"""TCP 中间件链"""
def __init__(self):
self.middlewares: list[TcpMiddleware] = []
self.add(TcpLogMiddleware())
self.add(TcpCorsMiddleware())
def add(self, middleware: TcpMiddleware):
"""添加中间件"""
self.middlewares.append(middleware)
def run(self, request: dict) -> Optional[dict]:
"""执行中间件链"""
idx = 0
def next_fn():
nonlocal idx
if idx < len(self.middlewares):
mw = self.middlewares[idx]
idx += 1
return mw.process(request, next_fn)
return None
return next_fn()