Files
Starlight-apk 5e957096fa
Some checks failed
CI / test (3.10) (push) Has been cancelled
CI / test (3.11) (push) Has been cancelled
CI / test (3.12) (push) Has been cancelled
CI / test (3.13) (push) Has been cancelled
feat: Phase 1 - 安全中间件 + 运维工具箱
新增 oss/core/security/ 模块(852行):
- jwt_auth.py: JWT签发/验证(HMAC-SHA256,零外部依赖)
- csrf.py: CSRF Token生成与校验
- input_validator.py: JSON Schema校验+类型强制
- tls.py: 自签名证书生成+SSL上下文

新增 oss/core/ops/ 模块:
- health.py: 增强版/health端点(CPU/内存/磁盘/运行时间)
- metrics.py: Prometheus兼容/metrics端点

对接改造:
- engine.py: 导出新模块
- manager.py: 注册/api/login /health /metrics路由
- middleware.py: CSRF+InputValidation中间件
- config.py: JWT_SECRET/CSRF_SECRET等配置项
- security.py→security/__init__.py: 合并插件沙箱与HTTP安全
2026-05-17 15:42:40 +08:00

96 lines
3.3 KiB
Python

"""HTTPS 支持 — 自签名证书生成 + TLS 上下文加载"""
import os
import datetime
from pathlib import Path
from typing import Optional
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from oss.config import get_config
from oss.logger.logger import Log
class TLSManager:
    """TLS certificate management for the HTTPS server.

    Provides helpers to locate (or lazily create) a self-signed certificate
    pair on disk and to build a server-side ``ssl.SSLContext`` from it.
    """

    @staticmethod
    def ensure_cert(cert_dir: Optional[str] = None) -> tuple[str, str]:
        """Make sure a certificate/key pair exists, generating one if needed.

        Args:
            cert_dir: Directory that holds ``server.crt`` and ``server.key``.
                When omitted or empty, the ``TLS_CERT_DIR`` config value is
                used, falling back to ``./data/tls``.

        Returns:
            ``(cert_path, key_path)`` as strings.

        Raises:
            OSError: If the directory or the files cannot be created.
        """
        config = get_config()
        cert_dir = cert_dir or config.get("TLS_CERT_DIR", "./data/tls")
        cert_dir_path = Path(cert_dir)
        cert_path = cert_dir_path / "server.crt"
        key_path = cert_dir_path / "server.key"

        # Reuse the existing pair only when BOTH files are present; a lone
        # key or a lone certificate is useless, so the pair is regenerated.
        if cert_path.exists() and key_path.exists():
            return str(cert_path), str(key_path)

        Log.info("TLS", "生成自签名证书...")
        cert_dir_path.mkdir(parents=True, exist_ok=True)
        TLSManager._generate_self_signed(cert_path, key_path)
        Log.ok("TLS", f"自签名证书已生成: {cert_path}")
        return str(cert_path), str(key_path)

    @staticmethod
    def _generate_self_signed(cert_path: Path, key_path: Path) -> None:
        """Generate a self-signed certificate and its private key.

        The key is RSA-2048 (public exponent 65537), stored unencrypted as
        PEM. The certificate is issued to and by ``CN=localhost``, is valid
        for 365 days from now (UTC), is signed with SHA-256 and carries a
        Subject Alternative Name extension for ``localhost`` and the
        loopback IP addresses.

        Args:
            cert_path: Destination of the PEM-encoded certificate.
            key_path: Destination of the PEM-encoded private key.
        """
        import ipaddress

        key = rsa.generate_private_key(
            public_exponent=65537, key_size=2048, backend=default_backend()
        )
        key_pem = key.private_bytes(
            serialization.Encoding.PEM,
            serialization.PrivateFormat.TraditionalOpenSSL,
            serialization.NoEncryption(),
        )

        # Self-signed: subject and issuer are the same name.
        subject = issuer = x509.Name([
            x509.NameAttribute(NameOID.COUNTRY_NAME, "CN"),
            x509.NameAttribute(NameOID.ORGANIZATION_NAME, "NebulaShell"),
            x509.NameAttribute(NameOID.COMMON_NAME, "localhost"),
        ])
        now = datetime.datetime.now(datetime.timezone.utc)
        cert = (
            x509.CertificateBuilder()
            .subject_name(subject)
            .issuer_name(issuer)
            .public_key(key.public_key())
            .serial_number(x509.random_serial_number())
            .not_valid_before(now)
            .not_valid_after(now + datetime.timedelta(days=365))
            .add_extension(
                x509.SubjectAlternativeName([
                    x509.DNSName("localhost"),
                    # IP literals must be iPAddress SAN entries; clients do
                    # not match an IP against a dNSName entry.
                    x509.IPAddress(ipaddress.ip_address("127.0.0.1")),
                    x509.IPAddress(ipaddress.ip_address("::1")),
                ]),
                critical=False,
            )
            .sign(key, hashes.SHA256(), default_backend())
        )
        cert_pem = cert.public_bytes(serialization.Encoding.PEM)

        # Nothing is written until both artifacts exist in memory, so a
        # failure above cannot leave an orphaned key on disk.
        # The key is unencrypted: drop any stale file (whose permissions may
        # be too loose) and create a fresh one readable by the owner only.
        key_path.unlink(missing_ok=True)
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
        fd = os.open(key_path, flags, 0o600)
        with os.fdopen(fd, "wb") as key_file:
            key_file.write(key_pem)
        cert_path.write_bytes(cert_pem)

    @staticmethod
    def create_ssl_context(
        cert_path: Optional[str] = None, key_path: Optional[str] = None
    ) -> Optional[object]:
        """Create the server-side SSL context used by the HTTPS server.

        Args:
            cert_path: PEM certificate to load. Defaults to the managed
                certificate returned by ``ensure_cert``.
            key_path: PEM private key to load. Defaults to the managed key
                returned by ``ensure_cert``.

        Returns:
            An ``ssl.SSLContext`` on success, or ``None`` when anything goes
            wrong (the error is logged, never raised).
        """
        try:
            import ssl

            # Only touch (and possibly generate) the managed pair when the
            # caller did not supply both paths explicitly.
            if not (cert_path and key_path):
                default_cert, default_key = TLSManager.ensure_cert()
                cert_path = cert_path or default_cert
                key_path = key_path or default_key

            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
            ctx.load_cert_chain(cert_path, key_path)
            return ctx
        except Exception as e:
            # Deliberate best-effort boundary: callers expect None instead
            # of an exception when HTTPS cannot be set up.
            Log.error("TLS", f"创建 SSL 上下文失败: {e}")
            return None