新增 oss/core/security/ 模块(852行): - jwt_auth.py: JWT签发/验证(HMAC-SHA256,零外部依赖) - csrf.py: CSRF Token生成与校验 - input_validator.py: JSON Schema校验+类型强制 - tls.py: 自签名证书生成+SSL上下文 新增 oss/core/ops/ 模块: - health.py: 增强版/health端点(CPU/内存/磁盘/运行时间) - metrics.py: Prometheus兼容/metrics端点 对接改造: - engine.py: 导出新模块 - manager.py: 注册/api/login /health /metrics路由 - middleware.py: CSRF+InputValidation中间件 - config.py: JWT_SECRET/CSRF_SECRET等配置项 - security.py→security/__init__.py: 合并插件沙箱与HTTP安全
96 lines
3.3 KiB
Python
96 lines
3.3 KiB
Python
"""HTTPS 支持 — 自签名证书生成 + TLS 上下文加载"""
|
|
import os
|
|
import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from cryptography import x509
|
|
from cryptography.x509.oid import NameOID
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
from oss.config import get_config
|
|
from oss.logger.logger import Log
|
|
|
|
|
|
class TLSManager:
    """TLS certificate management: self-signed generation and SSL context setup."""

    @staticmethod
    def ensure_cert(cert_dir: Optional[str] = None) -> tuple[str, str]:
        """Make sure a certificate/key pair exists, generating one if needed.

        Args:
            cert_dir: Directory holding ``server.crt`` and ``server.key``.
                When falsy, the ``TLS_CERT_DIR`` config value is used, with
                ``./data/tls`` as the fallback.

        Returns:
            ``(cert_path, key_path)`` as strings.
        """
        config = get_config()
        cert_dir_path = Path(cert_dir or config.get("TLS_CERT_DIR", "./data/tls"))
        cert_path = cert_dir_path / "server.crt"
        key_path = cert_dir_path / "server.key"

        # Reuse an existing pair; both files must be present.
        if cert_path.exists() and key_path.exists():
            return str(cert_path), str(key_path)

        Log.info("TLS", "生成自签名证书...")
        cert_dir_path.mkdir(parents=True, exist_ok=True)

        TLSManager._generate_self_signed(cert_path, key_path)
        Log.ok("TLS", f"自签名证书已生成: {cert_path}")
        return str(cert_path), str(key_path)

    @staticmethod
    def _write_private_file(path: Path, data: bytes) -> None:
        """Write *data* to *path* with owner-only (0600) permissions.

        The file is created with mode 0600 so the secret is never exposed
        through umask-default permissions. A pre-existing file is tightened
        before the new content is written.
        """
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "wb") as fh:
            # The mode passed to os.open only applies on creation; chmod
            # covers the case where the file already existed.
            os.chmod(path, 0o600)
            fh.write(data)

    @staticmethod
    def _generate_self_signed(cert_path: Path, key_path: Path) -> None:
        """Generate an RSA-2048 key and a one-year self-signed certificate.

        The key is written to *key_path* (PEM, unencrypted, owner-only
        permissions) and the certificate to *cert_path* (PEM). The
        certificate is valid for ``localhost`` and ``127.0.0.1``.
        """
        # Local import, matching the local ``import ssl`` in create_ssl_context.
        import ipaddress

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        TLSManager._write_private_file(
            key_path,
            key.private_bytes(
                serialization.Encoding.PEM,
                serialization.PrivateFormat.TraditionalOpenSSL,
                serialization.NoEncryption(),
            ),
        )

        # Self-signed: subject and issuer are the same name.
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "NebulaShell"),
            x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
        ])

        now = datetime.datetime.now(datetime.timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(
                x509.SubjectAlternativeName([
                    x509.DNSName("localhost"),
                    # IP literals must be iPAddress entries; clients do not
                    # match an IP host against dNSName entries.
                    x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
                ]),
                critical=False,
            )
            .sign(key, hashes.SHA256(), default_backend())
        )
        cert_path.write_bytes(cert.public_bytes(serialization.Encoding.PEM))

    @staticmethod
    def create_ssl_context(
        cert_path: Optional[str] = None, key_path: Optional[str] = None
    ) -> Optional[object]:
        """Create a server-side SSL context for the HTTPS server.

        Args:
            cert_path: Certificate file to load. Falls back to the path
                returned by :meth:`ensure_cert` when falsy.
            key_path: Private key file to load. Falls back to the path
                returned by :meth:`ensure_cert` when falsy.

        Returns:
            An ``ssl.SSLContext`` on success, or ``None`` if anything fails
            (the error is logged rather than raised).
        """
        try:
            import ssl

            # Only touch the default self-signed pair when the caller did
            # not supply both paths; otherwise nothing is generated on disk.
            if not (cert_path and key_path):
                default_cert, default_key = TLSManager.ensure_cert()
                cert_path = cert_path or default_cert
                key_path = key_path or default_key

            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            ctx.load_cert_chain(cert_path, key_path)
            return ctx
        except Exception as e:
            # Best-effort by design: callers treat None as "HTTPS unavailable".
            Log.error("TLS", f"创建 SSL 上下文失败: {e}")
            return None