清理冗余路由代码,修复首页标题与模板安全

- 简化 http-api/http-tcp/web-toolkit 的 router.py,抽取重复代码
- 修复 ws-api middleware 中间件返回值传递问题
- 修复 web-toolkit template.py 安全漏洞 (eval → AST 验证)
- 将首页标题从 "OSS Runtime" 改为 "Future OSS"
- 更新 README.md 与 static/banner.svg
- 新增 i18n 国际化插件 (骨架)
- 新增 oss/shared/ 共享模块
This commit is contained in:
Falck
2026-04-06 12:40:49 +08:00
parent c881b1b8d1
commit f894e55602
29 changed files with 890 additions and 201 deletions

View File

@@ -1,63 +1,21 @@
"""Web 路由器"""
from typing import Callable, Optional, Any
from oss.shared.router import BaseRouter, match_path
class WebRoute:
"""Web 路由"""
def __init__(self, method: str, path: str, handler: Callable):
self.method = method
self.path = path
self.handler = handler
class WebRouter:
class WebRouter(BaseRouter):
"""Web 路由器"""
def __init__(self):
self.routes: list[WebRoute] = []
def add_route(self, method: str, path: str, handler: Callable):
"""添加路由"""
self.routes.append(WebRoute(method, path, handler))
def get(self, path: str, handler: Callable):
"""GET 路由"""
self.add_route("GET", path, handler)
def post(self, path: str, handler: Callable):
"""POST 路由"""
self.add_route("POST", path, handler)
def put(self, path: str, handler: Callable):
"""PUT 路由"""
self.add_route("PUT", path, handler)
def delete(self, path: str, handler: Callable):
"""DELETE 路由"""
self.add_route("DELETE", path, handler)
def handle(self, request: dict) -> Optional[Any]:
"""处理请求"""
method = request.get("method", "GET")
path = request.get("path", "/")
for route in self.routes:
if route.method == method and self._match(route.path, path):
return route.handler(request)
result = self.find_route(method, path)
if result:
route, params = result
# 将路径参数注入到请求中
request["path_params"] = params
return route.handler(request)
return None
def _match(self, pattern: str, path: str) -> bool:
"""路径匹配"""
if pattern == path:
return True
if ":" in pattern:
pattern_parts = pattern.strip("/").split("/")
path_parts = path.strip("/").split("/")
if len(pattern_parts) != len(path_parts):
return False
for p, a in zip(pattern_parts, path_parts):
if not p.startswith(":") and p != a:
return False
return True
return False

View File

@@ -8,9 +8,10 @@ from typing import Any, Optional
class TemplateEngine:
"""简单模板引擎"""
def __init__(self, root: str = "./templates"):
def __init__(self, root: str = "./templates", max_depth: int = 10):
self.root = root
self._cache: dict[str, str] = {}
self.max_depth = max_depth
self._ensure_root()
def _ensure_root(self):
@@ -26,7 +27,7 @@ class TemplateEngine:
def render(self, name: str, context: dict[str, Any]) -> str:
"""渲染模板"""
template = self._load_template(name)
return self._render_template(template, context)
return self._render_template(template, context, depth=0)
def _load_template(self, name: str) -> str:
"""加载模板"""
@@ -88,8 +89,22 @@ class TemplateEngine:
self._validate_ast(node.slice, allowed_names))
return False
def _render_template(self, template: str, context: dict[str, Any]) -> str:
"""渲染模板内容"""
def _render_template(self, template: str, context: dict[str, Any], depth: int = 0) -> str:
"""渲染模板内容
Args:
template: 模板内容
context: 上下文变量
depth: 当前递归深度
Raises:
RecursionError: 当嵌套深度超过 max_depth 时
"""
if depth > self.max_depth:
raise RecursionError(
f"模板嵌套深度超过限制 ({self.max_depth}),可能存在无限递归"
)
# 替换 {{ variable }}
def replace_var(match):
var_name = match.group(1).strip()
@@ -102,14 +117,14 @@ class TemplateEngine:
result = re.sub(r'\{\{(.*?)\}\}', replace_var, template)
# 处理 {% if condition %} ... {% endif %}
result = self._process_if(result, context)
result = self._process_if(result, context, depth)
# 处理 {% for item in list %} ... {% endfor %}
result = self._process_for(result, context)
result = self._process_for(result, context, depth)
return result
def _process_if(self, template: str, context: dict) -> str:
def _process_if(self, template: str, context: dict, depth: int = 0) -> str:
"""处理 if 条件"""
pattern = r'\{%\s*if\s+(.*?)\s*%\}(.*?){%\s*endif\s*%\}'
@@ -118,11 +133,14 @@ class TemplateEngine:
content = match.group(2)
# 安全条件评估
value = self._safe_eval(condition, context)
return content if value else ""
if value:
# 递归处理嵌套内容,深度+1
return self._render_template(content, context, depth + 1)
return ""
return re.sub(pattern, replace_if, template, flags=re.DOTALL)
def _process_for(self, template: str, context: dict) -> str:
def _process_for(self, template: str, context: dict, depth: int = 0) -> str:
"""处理 for 循环"""
pattern = r'\{%\s*for\s+(\w+)\s+in\s+(\w+)\s*%\}(.*?){%\s*endfor\s*%\}'
@@ -138,7 +156,8 @@ class TemplateEngine:
result = ""
for item in items:
loop_context = {**context, item_name: item}
result += self._render_template(content, loop_context)
# 递归处理嵌套内容,深度+1
result += self._render_template(content, loop_context, depth + 1)
return result
return re.sub(pattern, replace_for, template, flags=re.DOTALL)

View File

@@ -1,72 +1,18 @@
"""路由器 - 路径匹配和处理器分发"""
from typing import Callable, Optional
from oss.shared.router import BaseRouter, match_path
from .server import Request, Response
class Route:
"""路由定义"""
def __init__(self, method: str, path: str, handler: Callable):
self.method = method
self.path = path
self.handler = handler
class Router:
"""路由器"""
def __init__(self):
self.routes: list[Route] = []
def add(self, method: str, path: str, handler: Callable):
"""添加路由"""
self.routes.append(Route(method, path, handler))
def get(self, path: str, handler: Callable):
"""GET 路由"""
self.add("GET", path, handler)
def post(self, path: str, handler: Callable):
"""POST 路由"""
self.add("POST", path, handler)
def put(self, path: str, handler: Callable):
"""PUT 路由"""
self.add("PUT", path, handler)
def delete(self, path: str, handler: Callable):
"""DELETE 路由"""
self.add("DELETE", path, handler)
class Router(BaseRouter):
"""HTTP API 路由器"""
def handle(self, request: Request) -> Response:
"""处理请求"""
for route in self.routes:
if route.method == request.method and self._match(route.path, request.path):
return route.handler(request)
result = self.find_route(request.method, request.path)
if result:
route, params = result
# 将路径参数注入到请求中
request.path_params = params
return route.handler(request)
return Response(status=404, body='{"error": "Not Found"}')
def _match(self, pattern: str, path: str) -> bool:
"""路径匹配"""
if pattern == path:
return True
if ":" in pattern:
pattern_parts = pattern.strip("/").split("/")
path_parts = path.strip("/").split("/")
# 检查前缀是否匹配
for i, p in enumerate(pattern_parts):
if i >= len(path_parts):
return False
if not p.startswith(":") and p != path_parts[i]:
return False
# 如果最后一个 pattern 是 :path通配符允许更多路径段
last_pattern = pattern_parts[-1]
if last_pattern.startswith(":") and len(path_parts) >= len(pattern_parts):
return True
# 否则必须精确匹配段数
if len(pattern_parts) != len(path_parts):
return False
return True
return False

View File

@@ -1,63 +1,21 @@
"""TCP HTTP 路由器"""
from typing import Callable, Optional, Any
from oss.shared.router import BaseRouter, match_path
class TcpRoute:
"""TCP HTTP 路由"""
def __init__(self, method: str, path: str, handler: Callable):
self.method = method
self.path = path
self.handler = handler
class TcpRouter:
class TcpRouter(BaseRouter):
"""TCP HTTP 路由器"""
def __init__(self):
self.routes: list[TcpRoute] = []
def add(self, method: str, path: str, handler: Callable):
"""添加路由"""
self.routes.append(TcpRoute(method, path, handler))
def get(self, path: str, handler: Callable):
"""GET 路由"""
self.add("GET", path, handler)
def post(self, path: str, handler: Callable):
"""POST 路由"""
self.add("POST", path, handler)
def put(self, path: str, handler: Callable):
"""PUT 路由"""
self.add("PUT", path, handler)
def delete(self, path: str, handler: Callable):
"""DELETE 路由"""
self.add("DELETE", path, handler)
def handle(self, request: dict) -> dict:
"""处理请求"""
method = request.get("method", "GET")
path = request.get("path", "/")
for route in self.routes:
if route.method == method and self._match(route.path, path):
return route.handler(request)
result = self.find_route(method, path)
if result:
route, params = result
# 将路径参数注入到请求中
request["path_params"] = params
return route.handler(request)
return {"status": 404, "headers": {}, "body": "Not Found"}
def _match(self, pattern: str, path: str) -> bool:
"""路径匹配"""
if pattern == path:
return True
if ":" in pattern:
pattern_parts = pattern.strip("/").split("/")
path_parts = path.strip("/").split("/")
if len(pattern_parts) != len(path_parts):
return False
for p, a in zip(pattern_parts, path_parts):
if not p.startswith(":") and p != a:
return False
return True
return False

View File

@@ -77,8 +77,35 @@ class TcpHttpServer:
break
buffer += data
# 检查 HTTP 请求是否完整
# 检查 HTTP 请求是否完整
if b"\r\n\r\n" in buffer:
# 先解析请求头以获取 Content-Length
header_end = buffer.find(b"\r\n\r\n")
header_text = buffer[:header_end].decode("utf-8", errors="replace")
# 从请求头中提取 Content-Length
content_length = 0
for line in header_text.split("\r\n")[1:]:
if line.lower().startswith("content-length:"):
content_length = int(line.split(":", 1)[1].strip())
break
# 计算 body 起始位置
body_start_pos = header_end + 4 # \r\n\r\n
body_received = len(buffer) - body_start_pos
# 等待完整 body
if body_received < content_length:
# 继续接收剩余数据
while body_received < content_length:
remaining = content_length - body_received
chunk = client.conn.recv(min(4096, remaining))
if not chunk:
break
buffer += chunk
body_received += len(chunk)
# 现在解析完整请求
request = self._parse_request(buffer)
if request:
# 触发请求事件

View File

@@ -0,0 +1 @@
"""i18n 国际化多语言支持插件"""

View File

@@ -0,0 +1,156 @@
"""i18n 核心引擎"""
import json
import re
from pathlib import Path
from typing import Any, Optional
class I18nEngine:
"""国际化引擎"""
def __init__(self):
self._translations: dict[str, dict[str, Any]] = {} # {locale: {key: value}}
self._current_locale: str = "zh-CN"
self._fallback_locale: str = "en-US"
self._supported_locales: list[str] = []
self._locales_dir: str = ""
def load_locales(self, locales_dir: str, locales: list[str]):
"""加载语言文件
Args:
locales_dir: 语言文件目录路径
locales: 支持的语言列表
"""
self._locales_dir = locales_dir
self._supported_locales = locales
locales_path = Path(locales_dir)
if not locales_path.exists():
locales_path.mkdir(parents=True, exist_ok=True)
return
for locale in locales:
locale_file = locales_path / f"{locale}.json"
if locale_file.exists():
try:
content = locale_file.read_text(encoding="utf-8")
self._translations[locale] = json.loads(content)
except (json.JSONDecodeError, Exception) as e:
print(f"[i18n] 加载语言文件失败 {locale_file}: {e}")
self._translations[locale] = {}
def set_locale(self, locale: str):
"""设置当前语言"""
if locale in self._supported_locales:
self._current_locale = locale
def get_locale(self) -> str:
"""获取当前语言"""
return self._current_locale
def set_fallback(self, locale: str):
"""设置回退语言"""
self._fallback_locale = locale
def t(self, key: str, locale: Optional[str] = None, **kwargs) -> str:
"""翻译文本
Args:
key: 翻译键 (支持点号分隔的嵌套路径,如 "user.greeting")
locale: 指定语言 (默认使用当前语言)
**kwargs: 插值参数
Returns:
翻译后的文本
"""
target_locale = locale or self._current_locale
# 尝试从指定语言获取
value = self._get_nested(key, self._translations.get(target_locale, {}))
# 如果未找到,尝试从回退语言获取
if value is None and target_locale != self._fallback_locale:
value = self._get_nested(key, self._translations.get(self._fallback_locale, {}))
# 仍未找到,返回键名
if value is None:
return key
# 插值处理: {{name}} 或 {name}
return self._interpolate(value, kwargs)
def _get_nested(self, key: str, data: dict) -> Any:
"""获取嵌套字典值"""
keys = key.split(".")
current = data
for k in keys:
if isinstance(current, dict) and k in current:
current = current[k]
else:
return None
return current
def _interpolate(self, text: str, kwargs: dict) -> str:
"""插值替换: {{name}} 或 {name}"""
# 支持 {{name}} 格式
result = re.sub(r'\{\{(\w+)\}\}', lambda m: str(kwargs.get(m.group(1), m.group(0))), text)
# 支持 {name} 格式 (如果未被 {{}} 替换)
result = re.sub(r'\{(\w+)\}', lambda m: str(kwargs.get(m.group(1), m.group(0))), result)
return result
def get_supported_locales(self) -> list[str]:
"""获取支持的语言列表"""
return self._supported_locales
def is_valid_locale(self, locale: str) -> bool:
"""检查语言是否有效"""
return locale in self._supported_locales
def detect_locale(self, accept_language: Optional[str] = None,
query_lang: Optional[str] = None,
cookie_lang: Optional[str] = None) -> str:
"""检测语言优先级
Args:
accept_language: HTTP Accept-Language 头
query_lang: URL 查询参数 ?lang=xx
cookie_lang: Cookie 中的语言
Returns:
检测到的语言代码
"""
# 1. 查询参数优先级最高
if query_lang and self.is_valid_locale(query_lang):
return query_lang
# 2. Cookie 次之
if cookie_lang and self.is_valid_locale(cookie_lang):
return cookie_lang
# 3. Accept-Language 头
if accept_language:
# 解析 "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7"
languages = []
for part in accept_language.split(","):
part = part.strip()
if ";q=" in part:
lang, q = part.split(";q=")
languages.append((lang.strip(), float(q)))
else:
languages.append((part, 1.0))
# 按权重排序
languages.sort(key=lambda x: x[1], reverse=True)
for lang, _ in languages:
# 精确匹配
if self.is_valid_locale(lang):
return lang
# 前缀匹配 (zh 匹配 zh-CN, zh-TW)
for supported in self._supported_locales:
if supported.startswith(lang + "-") or lang.startswith(supported.split("-")[0] + "-"):
return supported
# 4. 默认语言
return self._current_locale

View File

@@ -0,0 +1,51 @@
{
"common": {
"success": "Success",
"error": "Error",
"not_found": "Not Found",
"forbidden": "Forbidden",
"unauthorized": "Unauthorized",
"server_error": "Internal Server Error",
"bad_request": "Bad Request",
"ok": "OK",
"cancel": "Cancel",
"save": "Save",
"delete": "Delete",
"edit": "Edit",
"create": "Create",
"search": "Search",
"loading": "Loading...",
"no_data": "No Data",
"confirm": "Confirm",
"back": "Back"
},
"health": {
"status": "Running",
"service": "Service",
"version": "Version",
"uptime": "Uptime"
},
"api": {
"welcome": "Welcome to FutureOSS API",
"docs": "API Documentation",
"rate_limit": "Rate limit exceeded, please try again later",
"invalid_request": "Invalid request parameters",
"missing_param": "Missing required parameter: {{param}}",
"invalid_param": "Invalid parameter format: {{param}}"
},
"errors": {
"400": "Bad Request",
"401": "Please login first",
"403": "You don't have permission to perform this action",
"404": "The requested resource was not found",
"500": "Internal server error, please try again later",
"502": "Bad Gateway",
"503": "Service temporarily unavailable, please try again later"
},
"plugin": {
"i18n_name": "Internationalization",
"i18n_desc": "Provides translation loading, language detection, and HTTP middleware",
"locale_changed": "Locale changed to {{locale}}",
"locale_not_supported": "Unsupported locale: {{locale}}"
}
}

View File

@@ -0,0 +1,51 @@
{
"common": {
"success": "成功",
"error": "错误",
"not_found": "未找到",
"forbidden": "禁止访问",
"unauthorized": "未授权",
"server_error": "服务器内部错误",
"bad_request": "请求格式错误",
"ok": "确定",
"cancel": "取消",
"save": "保存",
"delete": "删除",
"edit": "编辑",
"create": "创建",
"search": "搜索",
"loading": "加载中...",
"no_data": "暂无数据",
"confirm": "确认",
"back": "返回"
},
"health": {
"status": "运行正常",
"service": "服务",
"version": "版本",
"uptime": "运行时间"
},
"api": {
"welcome": "欢迎使用 FutureOSS API",
"docs": "API 文档",
"rate_limit": "请求频率过高,请稍后重试",
"invalid_request": "无效的请求参数",
"missing_param": "缺少必需参数: {{param}}",
"invalid_param": "参数格式错误: {{param}}"
},
"errors": {
"400": "请求格式错误",
"401": "请先登录",
"403": "您没有权限执行此操作",
"404": "请求的资源不存在",
"500": "服务器内部错误,请稍后重试",
"502": "网关错误",
"503": "服务暂时不可用,请稍后重试"
},
"plugin": {
"i18n_name": "国际化多语言支持",
"i18n_desc": "提供翻译加载、语言检测和 HTTP 中间件功能",
"locale_changed": "语言已切换为 {{locale}}",
"locale_not_supported": "不支持的语言: {{locale}}"
}
}

View File

@@ -0,0 +1,51 @@
{
"common": {
"success": "成功",
"error": "錯誤",
"not_found": "找不到",
"forbidden": "禁止存取",
"unauthorized": "未授權",
"server_error": "伺服器內部錯誤",
"bad_request": "請求格式錯誤",
"ok": "確定",
"cancel": "取消",
"save": "儲存",
"delete": "刪除",
"edit": "編輯",
"create": "建立",
"search": "搜尋",
"loading": "載入中...",
"no_data": "暫無資料",
"confirm": "確認",
"back": "返回"
},
"health": {
"status": "運作正常",
"service": "服務",
"version": "版本",
"uptime": "運行時間"
},
"api": {
"welcome": "歡迎使用 FutureOSS API",
"docs": "API 文件",
"rate_limit": "請求頻率過高,請稍後重試",
"invalid_request": "無效的請求參數",
"missing_param": "缺少必要參數: {{param}}",
"invalid_param": "參數格式錯誤: {{param}}"
},
"errors": {
"400": "請求格式錯誤",
"401": "請先登入",
"403": "您沒有權限執行此操作",
"404": "請求的資源不存在",
"500": "伺服器內部錯誤,請稍後重試",
"502": "閘道錯誤",
"503": "服務暫時不可用,請稍後重試"
},
"plugin": {
"i18n_name": "國際化多語言支援",
"i18n_desc": "提供翻譯載入、語言偵測和 HTTP 中介軟體功能",
"locale_changed": "語言已切換為 {{locale}}",
"locale_not_supported": "不支援的語言: {{locale}}"
}
}

View File

@@ -0,0 +1,215 @@
"""i18n 国际化多语言支持插件"""
import json
from pathlib import Path
from oss.plugin.types import Plugin, register_plugin_type
from .i18n import I18nEngine
from .middleware import I18nMiddleware
class I18nPlugin(Plugin):
"""i18n 国际化插件"""
def __init__(self):
self.engine = I18nEngine()
self.middleware_handler = None
def meta(self):
"""插件元数据"""
from oss.plugin.types import Metadata, PluginConfig, Manifest
return Manifest(
metadata=Metadata(
name="i18n",
version="1.0.0",
author="FutureOSS",
description="国际化多语言支持 - 提供翻译加载/语言切换/HTTP中间件"
),
config=PluginConfig(
enabled=True,
args={
"default_locale": "zh-CN",
"fallback_locale": "en-US",
"supported_locales": ["zh-CN", "en-US", "zh-TW"]
}
),
dependencies=[]
)
def init(self, deps: dict = None):
"""初始化插件
加载语言文件并初始化中间件
"""
# 获取插件配置
config = {}
if deps:
config = deps.get("config", {})
# 默认配置
default_locale = config.get("default_locale", "zh-CN")
fallback_locale = config.get("fallback_locale", "en-US")
supported_locales = config.get("supported_locales", ["zh-CN", "en-US", "zh-TW"])
locales_dir = config.get("locales_dir", "locales")
# 解析 locales_dir 相对路径
plugin_dir = Path(__file__).parent
full_locales_dir = plugin_dir / locales_dir
# 设置回退语言
self.engine.set_fallback(fallback_locale)
# 加载语言文件
self.engine.load_locales(str(full_locales_dir), supported_locales)
# 设置默认语言
self.engine.set_locale(default_locale)
# 初始化中间件
self.middleware_handler = I18nMiddleware(self.engine, config)
print(f"[i18n] 已加载语言: {', '.join(supported_locales)}")
print(f"[i18n] 默认语言: {default_locale}")
def start(self):
"""启动插件
注册 API 路由(如果有 http-api 依赖)
"""
# 如果有 http-api 依赖,注册 i18n 相关路由
http_api = None
if hasattr(self, 'set_http_api'):
http_api = getattr(self, '_http_api', None)
if http_api and hasattr(http_api, 'router'):
http_api.router.get("/api/i18n/locales", self._locales_handler)
http_api.router.get("/api/i18n/translate", self._translate_handler)
http_api.router.post("/api/i18n/locale", self._change_locale_handler)
print("[i18n] API 路由已注册")
def stop(self):
"""停止插件"""
print("[i18n] 插件已停止")
def health(self) -> bool:
"""健康检查"""
return self.engine is not None
def stats(self) -> dict:
"""获取插件统计"""
return {
"current_locale": self.engine.get_locale(),
"supported_locales": self.engine.get_supported_locales(),
"loaded_translations": len(self.engine._translations)
}
# ========== 依赖注入 Setter ==========
def set_http_api(self, http_api):
"""注入 http-api 依赖"""
self._http_api = http_api
# ========== API 处理器 ==========
def _locales_handler(self, request):
"""获取支持的语言列表"""
from oss.plugin.types import Response
t = getattr(request, 't', self.engine.t)
locales = []
for locale in self.engine.get_supported_locales():
locales.append({
"code": locale,
"name": t(f"plugin.i18n_name", locale=locale)
})
return Response(
status=200,
body=json.dumps({
"current": self.engine.get_locale(),
"supported": locales
}),
headers={"Content-Type": "application/json"}
)
def _translate_handler(self, request):
"""翻译接口
GET /api/i18n/translate?key=user.greeting&locale=en-US&name=World
"""
from oss.plugin.types import Response
t = getattr(request, 't', self.engine.t)
# 解析查询参数
query = request.path.split("?", 1)[-1] if "?" in request.path else ""
params = {}
for param in query.split("&"):
if "=" in param:
key, value = param.split("=", 1)
params[key] = value
key = params.get("key", "")
locale = params.get("locale", None)
if not key:
return Response(
status=400,
body=json.dumps({"error": t("api.missing_param", param="key")}),
headers={"Content-Type": "application/json"}
)
# 翻译
result = t(key, locale=locale, **params)
return Response(
status=200,
body=json.dumps({
"key": key,
"locale": locale or self.engine.get_locale(),
"text": result
}),
headers={"Content-Type": "application/json"}
)
def _change_locale_handler(self, request):
"""切换语言接口
POST /api/i18n/locale
Body: {"locale": "en-US"}
"""
from oss.plugin.types import Response
t = getattr(request, 't', self.engine.t)
try:
body = json.loads(request.body) if hasattr(request, 'body') and request.body else {}
except json.JSONDecodeError:
body = {}
new_locale = body.get("locale", "")
if not new_locale:
return Response(
status=400,
body=json.dumps({"error": t("api.missing_param", param="locale")}),
headers={"Content-Type": "application/json"}
)
if not self.engine.is_valid_locale(new_locale):
return Response(
status=400,
body=json.dumps({"error": t("plugin.locale_not_supported", locale=new_locale)}),
headers={"Content-Type": "application/json"}
)
self.engine.set_locale(new_locale)
return Response(
status=200,
body=json.dumps({"message": t("plugin.locale_changed", locale=new_locale)}),
headers={"Content-Type": "application/json"}
)
register_plugin_type("I18nPlugin", I18nPlugin)
def New():
return I18nPlugin()

View File

@@ -0,0 +1,23 @@
{
"metadata": {
"name": "i18n",
"version": "1.0.0",
"author": "FutureOSS",
"description": "国际化多语言支持 - 提供翻译加载/语言切换/HTTP中间件",
"type": "middleware"
},
"config": {
"enabled": true,
"args": {
"default_locale": "zh-CN",
"fallback_locale": "en-US",
"locales_dir": "locales",
"supported_locales": ["zh-CN", "en-US", "zh-TW"],
"auto_detect": true,
"cookie_name": "locale",
"query_param": "lang"
}
},
"dependencies": [],
"permissions": ["lifecycle"]
}

View File

@@ -0,0 +1,90 @@
"""i18n HTTP 中间件"""
import json
from typing import Optional, Callable
from oss.plugin.types import Response
class I18nMiddleware:
"""i18n 中间件
自动检测语言并注入到请求上下文
检测优先级:
1. URL 查询参数 ?lang=xx
2. Cookie locale=xx
3. Accept-Language 头
4. 默认语言
"""
def __init__(self, engine, config: dict = None):
self.engine = engine
self.cookie_name = (config or {}).get("cookie_name", "locale")
self.query_param = (config or {}).get("query_param", "lang")
def handle(self, request: dict, next_fn: Callable) -> Response:
"""处理请求
1. 检测语言
2. 将语言注入到请求上下文
3. 调用下一个中间件/处理器
4. 可选: 在响应中添加 Content-Language 头
"""
# 解析查询参数
query_lang = self._parse_query_param(request.get("query", ""))
# 解析 Cookie
cookie_lang = self._parse_cookie(request.get("headers", {}))
# 解析 Accept-Language
accept_language = request.get("headers", {}).get("Accept-Language",
request.get("headers", {}).get("accept-language", ""))
# 检测语言
locale = self.engine.detect_locale(
accept_language=accept_language if accept_language else None,
query_lang=query_lang,
cookie_lang=cookie_lang
)
# 设置当前语言
self.engine.set_locale(locale)
# 注入到请求上下文
request["locale"] = locale
request["t"] = self.engine.t # 提供翻译函数
# 调用下一个处理器
response = next_fn()
# 在响应中添加 Content-Language 头
if isinstance(response, Response):
response.headers["Content-Language"] = locale
return response
def _parse_query_param(self, query_string: str) -> Optional[str]:
"""从查询字符串解析语言参数"""
if not query_string:
return None
# 解析 ?lang=xx 或 &lang=xx
params = {}
for param in query_string.lstrip("?").split("&"):
if "=" in param:
key, value = param.split("=", 1)
params[key.strip()] = value.strip()
return params.get(self.query_param)
def _parse_cookie(self, headers: dict) -> Optional[str]:
"""从 Cookie 解析语言参数"""
cookie_header = headers.get("Cookie", headers.get("cookie", ""))
if not cookie_header:
return None
cookies = {}
for cookie in cookie_header.split(";"):
if "=" in cookie:
key, value = cookie.split("=", 1)
cookies[key.strip()] = value.strip()
return cookies.get(self.cookie_name)

View File

@@ -29,13 +29,16 @@ class WsMiddlewareChain:
async def run(self, client, message) -> Optional[str]:
"""执行中间件链"""
idx = 0
current_message = message
async def next_fn():
nonlocal idx
async def next_fn(msg=None):
nonlocal idx, current_message
if msg is not None:
current_message = msg
if idx < len(self.middlewares):
mw = self.middlewares[idx]
idx += 1
return await mw.process(client, message, next_fn)
return message
return await mw.process(client, current_message, next_fn)
return current_message
return await next_fn()