"""输入验证 — JSON Schema 校验 + 参数白名单 + 类型强制""" import json import re from typing import Any, Optional from oss.logger.logger import Log class ValidationError(Exception): def __init__(self, message: str, field: str = None): self.field = field super().__init__(message) class InputValidator: """输入验证器""" # ── 内置类型校验器 ── @staticmethod def is_string(val: Any, min_len: int = 0, max_len: int = None) -> bool: if not isinstance(val, str): return False if len(val) < min_len: return False if max_len and len(val) > max_len: return False return True @staticmethod def is_integer(val: Any, min_val: int = None, max_val: int = None) -> bool: if not isinstance(val, int) or isinstance(val, bool): return False if min_val is not None and val < min_val: return False if max_val is not None and val > max_val: return False return True @staticmethod def is_float(val: Any, min_val: float = None, max_val: float = None) -> bool: if not isinstance(val, (int, float)) or isinstance(val, bool): return False if min_val is not None and val < min_val: return False if max_val is not None and val > max_val: return False return True @staticmethod def is_boolean(val: Any) -> bool: return isinstance(val, bool) @staticmethod def is_list(val: Any, item_type: type = None) -> bool: if not isinstance(val, list): return False if item_type: return all(isinstance(v, item_type) for v in val) return True @staticmethod def is_dict(val: Any) -> bool: return isinstance(val, dict) @staticmethod def is_email(val: str) -> bool: return bool(re.match(r"^[^\s@]+@[^\s@]+\.[^\s@]+$", val)) @staticmethod def is_ip_address(val: str) -> bool: return bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", val)) @staticmethod def is_alphanumeric(val: str) -> bool: return bool(re.match(r"^[a-zA-Z0-9_\-]+$", val)) # ── JSON Schema 校验 ── def validate_schema(self, data: dict, schema: dict) -> list[str]: """根据 JSON Schema 校验数据,返回错误列表 Schema 格式: { "field_name": { "type": "string|int|float|bool|list|dict|email|ip", "required": True/False, "min_len": 1, # 仅 string "max_len": 100, # 仅 string "min_val": 0, # 仅 int/float "max_val": 100, # 仅 int/float "pattern": "^...$", # 正则(仅 string) "default": "val", # 默认值(可选字段) "items": "string", # list 元素类型 "fields": {...}, # 嵌套 dict schema } } """ errors = [] type_map = { "string": lambda v, r: self.is_string(v, r.get("min_len", 0), r.get("max_len")), "int": lambda v, r: self.is_integer(v, r.get("min_val"), r.get("max_val")), "float": lambda v, r: self.is_float(v, r.get("min_val"), r.get("max_val")), "bool": lambda v, _: self.is_boolean(v), "list": lambda v, r: self.is_list(v), "dict": lambda v, _: self.is_dict(v), "email": lambda v, _: self.is_email(v), "ip": lambda v, _: self.is_ip_address(v), } for field, rules in schema.items(): value = data.get(field) # 必填检查 if rules.get("required", False): if value is None: errors.append(f"缺少必填字段: {field}") continue elif value is None: continue # 类型检查 expected_type = rules.get("type", "string") checker = type_map.get(expected_type) if checker: if not checker(value, rules): errors.append(f"字段 '{field}' 类型错误,期望 {expected_type}") # 正则匹配(string 类型) pattern = rules.get("pattern") if pattern and isinstance(value, str): if not re.match(pattern, value): errors.append(f"字段 '{field}' 格式不匹配: {pattern}") # 嵌套 dict 校验 nested = rules.get("fields") if nested and isinstance(value, dict): errors.extend(self.validate_schema(value, nested)) return errors # ── 快捷校验 ── def validate_or_raise(self, data: dict, schema: dict): """校验失败抛出 ValidationError""" errors = self.validate_schema(data, schema) if errors: raise ValidationError(errors[0]) _validator_instance: Optional[InputValidator] = None def get_validator() -> InputValidator: global _validator_instance if _validator_instance is None: _validator_instance = InputValidator() return _validator_instance