Files
NebulaShell/oss/core/security/input_validator.py
Starlight-apk 5e957096fa
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
feat: Phase 1 - 安全中间件 + 运维工具箱
新增 oss/core/security/ 模块(852行):
- jwt_auth.py: JWT签发/验证(HMAC-SHA256,零外部依赖)
- csrf.py: CSRF Token生成与校验
- input_validator.py: JSON Schema校验+类型强制
- tls.py: 自签名证书生成+SSL上下文

新增 oss/core/ops/ 模块:
- health.py: 增强版/health端点(CPU/内存/磁盘/运行时间)
- metrics.py: Prometheus兼容/metrics端点

对接改造:
- engine.py: 导出新模块
- manager.py: 注册/api/login /health /metrics路由
- middleware.py: CSRF+InputValidation中间件
- config.py: JWT_SECRET/CSRF_SECRET等配置项
- security.py→security/__init__.py: 合并插件沙箱与HTTP安全
2026-05-17 15:42:40 +08:00

159 lines
5.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""输入验证 — JSON Schema 校验 + 参数白名单 + 类型强制"""
import json
import re
from typing import Any, Optional
from oss.logger.logger import Log
class ValidationError(Exception):
def __init__(self, message: str, field: str = None):
self.field = field
super().__init__(message)
class InputValidator:
"""输入验证器"""
# ── 内置类型校验器 ──
@staticmethod
def is_string(val: Any, min_len: int = 0, max_len: int = None) -> bool:
if not isinstance(val, str):
return False
if len(val) < min_len:
return False
if max_len and len(val) > max_len:
return False
return True
@staticmethod
def is_integer(val: Any, min_val: int = None, max_val: int = None) -> bool:
if not isinstance(val, int) or isinstance(val, bool):
return False
if min_val is not None and val < min_val:
return False
if max_val is not None and val > max_val:
return False
return True
@staticmethod
def is_float(val: Any, min_val: float = None, max_val: float = None) -> bool:
if not isinstance(val, (int, float)) or isinstance(val, bool):
return False
if min_val is not None and val < min_val:
return False
if max_val is not None and val > max_val:
return False
return True
@staticmethod
def is_boolean(val: Any) -> bool:
return isinstance(val, bool)
@staticmethod
def is_list(val: Any, item_type: type = None) -> bool:
if not isinstance(val, list):
return False
if item_type:
return all(isinstance(v, item_type) for v in val)
return True
@staticmethod
def is_dict(val: Any) -> bool:
return isinstance(val, dict)
@staticmethod
def is_email(val: str) -> bool:
return bool(re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", val))
@staticmethod
def is_ip_address(val: str) -> bool:
return bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", val))
@staticmethod
def is_alphanumeric(val: str) -> bool:
return bool(re.match(r"^[a-zA-Z0-9_\-]+$", val))
# ── JSON Schema 校验 ──
def validate_schema(self, data: dict, schema: dict) -> list[str]:
"""根据 JSON Schema 校验数据,返回错误列表
Schema 格式:
{
"field_name": {
"type": "string|int|float|bool|list|dict|email|ip",
"required": True/False,
"min_len": 1, # 仅 string
"max_len": 100, # 仅 string
"min_val": 0, # 仅 int/float
"max_val": 100, # 仅 int/float
"pattern": "^...$", # 正则(仅 string
"default": "val", # 默认值(可选字段)
"items": "string", # list 元素类型
"fields": {...}, # 嵌套 dict schema
}
}
"""
errors = []
type_map = {
"string": lambda v, r: self.is_string(v, r.get("min_len", 0), r.get("max_len")),
"int": lambda v, r: self.is_integer(v, r.get("min_val"), r.get("max_val")),
"float": lambda v, r: self.is_float(v, r.get("min_val"), r.get("max_val")),
"bool": lambda v, _: self.is_boolean(v),
"list": lambda v, r: self.is_list(v),
"dict": lambda v, _: self.is_dict(v),
"email": lambda v, _: self.is_email(v),
"ip": lambda v, _: self.is_ip_address(v),
}
for field, rules in schema.items():
value = data.get(field)
# 必填检查
if rules.get("required", False):
if value is None:
errors.append(f"缺少必填字段: {field}")
continue
elif value is None:
continue
# 类型检查
expected_type = rules.get("type", "string")
checker = type_map.get(expected_type)
if checker:
if not checker(value, rules):
errors.append(f"字段 '{field}' 类型错误,期望 {expected_type}")
# 正则匹配string 类型)
pattern = rules.get("pattern")
if pattern and isinstance(value, str):
if not re.match(pattern, value):
errors.append(f"字段 '{field}' 格式不匹配: {pattern}")
# 嵌套 dict 校验
nested = rules.get("fields")
if nested and isinstance(value, dict):
errors.extend(self.validate_schema(value, nested))
return errors
# ── 快捷校验 ──
def validate_or_raise(self, data: dict, schema: dict):
"""校验失败抛出 ValidationError"""
errors = self.validate_schema(data, schema)
if errors:
raise ValidationError(errors[0])
_validator_instance: Optional[InputValidator] = None
def get_validator() -> InputValidator:
global _validator_instance
if _validator_instance is None:
_validator_instance = InputValidator()
return _validator_instance