更改项目名为NebulaShell
This commit is contained in:
100
store/@{NebulaShell}/code-reviewer/checks/quality.py
Normal file
100
store/@{NebulaShell}/code-reviewer/checks/quality.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""质量检查器"""
|
||||
import ast
|
||||
|
||||
|
||||
class QualityChecker:
|
||||
"""质量检查器"""
|
||||
|
||||
def check(self, filepath: str, content: str) -> list:
|
||||
"""执行质量检查"""
|
||||
issues = []
|
||||
|
||||
# 检查函数长度
|
||||
issues.extend(self._check_function_length(filepath, content))
|
||||
|
||||
# 检查参数数量
|
||||
issues.extend(self._check_parameter_count(filepath, content))
|
||||
|
||||
# 检查复杂度
|
||||
issues.extend(self._check_complexity(filepath, content))
|
||||
|
||||
return issues
|
||||
|
||||
def _check_function_length(self, filepath: str, content: str) -> list:
|
||||
"""检查函数长度"""
|
||||
issues = []
|
||||
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
lines = node.end_lineno - node.lineno
|
||||
if lines > 100:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "warning",
|
||||
"type": "long_function",
|
||||
"message": f"函数 {node.name} 过长 ({lines} 行)"
|
||||
})
|
||||
except:
|
||||
pass
|
||||
|
||||
return issues
|
||||
|
||||
def _check_parameter_count(self, filepath: str, content: str) -> list:
|
||||
"""检查参数数量"""
|
||||
issues = []
|
||||
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
args = node.args
|
||||
count = len(args.args)
|
||||
if count > 5:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "info",
|
||||
"type": "too_many_params",
|
||||
"message": f"函数 {node.name} 参数过多 ({count} 个)"
|
||||
})
|
||||
except:
|
||||
pass
|
||||
|
||||
return issues
|
||||
|
||||
def _check_complexity(self, filepath: str, content: str) -> list:
|
||||
"""检查圈复杂度"""
|
||||
issues = []
|
||||
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
complexity = self._calculate_complexity(node)
|
||||
if complexity > 10:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "warning",
|
||||
"type": "high_complexity",
|
||||
"message": f"函数 {node.name} 复杂度过高 (圈复杂度: {complexity})"
|
||||
})
|
||||
except:
|
||||
pass
|
||||
|
||||
return issues
|
||||
|
||||
def _calculate_complexity(self, node: ast.AST) -> int:
|
||||
"""计算圈复杂度"""
|
||||
complexity = 1
|
||||
|
||||
for child in ast.walk(node):
|
||||
if isinstance(child, (ast.If, ast.While, ast.For, ast.Try, ast.With)):
|
||||
complexity += 1
|
||||
elif isinstance(child, ast.BoolOp):
|
||||
complexity += len(child.values) - 1
|
||||
|
||||
return complexity
|
||||
323
store/@{NebulaShell}/code-reviewer/checks/references.py
Normal file
323
store/@{NebulaShell}/code-reviewer/checks/references.py
Normal file
@@ -0,0 +1,323 @@
|
||||
"""引用检查器 - 检测导入错误、变量错误等"""
|
||||
import ast
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class ReferenceChecker:
|
||||
"""引用检查器"""
|
||||
|
||||
# Python 标准库模块列表
|
||||
STD_MODULES = {
|
||||
'os', 'sys', 'json', 're', 'time', 'datetime', 'pathlib',
|
||||
'typing', 'collections', 'functools', 'itertools', 'io',
|
||||
'string', 'math', 'random', 'hashlib', 'hmac', 'secrets',
|
||||
'urllib', 'http', 'email', 'html', 'xml', 'csv', 'configparser',
|
||||
'logging', 'warnings', 'traceback', 'inspect', 'importlib',
|
||||
'threading', 'multiprocessing', 'subprocess', 'socket',
|
||||
'asyncio', 'concurrent', 'queue', 'contextlib', 'abc',
|
||||
'enum', 'dataclasses', 'copy', 'pprint', 'textwrap',
|
||||
'struct', 'codecs', 'locale', 'gettext', 'argparse',
|
||||
'unittest', 'doctest', 'pdb', 'profile', 'timeit',
|
||||
'tempfile', 'glob', 'fnmatch', 'stat', 'fileinput',
|
||||
'shutil', 'pickle', 'shelve', 'sqlite3', 'dbm',
|
||||
'gzip', 'bz2', 'lzma', 'zipfile', 'tarfile',
|
||||
'base64', 'binascii', 'quopri', 'uu',
|
||||
}
|
||||
|
||||
# Python 内置函数和类型(不应报告为未定义)
|
||||
BUILTINS = {
|
||||
'print', 'len', 'str', 'int', 'float', 'bool', 'list', 'dict',
|
||||
'set', 'tuple', 'range', 'enumerate', 'zip', 'map', 'filter',
|
||||
'sorted', 'reversed', 'min', 'max', 'sum', 'abs', 'round',
|
||||
'isinstance', 'issubclass', 'type', 'id', 'hash', 'repr',
|
||||
'True', 'False', 'None', 'Exception', 'ValueError', 'TypeError',
|
||||
'KeyError', 'AttributeError', 'ImportError', 'FileNotFoundError',
|
||||
'IndexError', 'RuntimeError', 'StopIteration', 'GeneratorExit',
|
||||
'staticmethod', 'classmethod', 'property', 'super',
|
||||
'open', 'input', 'format', 'hex', 'oct', 'bin', 'chr', 'ord',
|
||||
'dir', 'vars', 'locals', 'globals', 'callable', 'getattr',
|
||||
'setattr', 'hasattr', 'delattr', 'exec', 'eval', 'compile',
|
||||
'any', 'all', 'slice', 'frozenset', 'bytearray', 'bytes',
|
||||
'memoryview', 'complex', 'divmod', 'pow', 'object',
|
||||
'dict', 'list', 'str', 'int', 'float', 'bool', 'set',
|
||||
'tuple', 'Exception', 'ValueError', 'TypeError', 'KeyError',
|
||||
'self', 'cls', 'args', 'kwargs',
|
||||
}
|
||||
|
||||
def __init__(self, project_root: str = "."):
|
||||
self.project_root = Path(project_root)
|
||||
self._available_modules = set(self.STD_MODULES)
|
||||
self._scan_project_modules()
|
||||
|
||||
def _scan_project_modules(self):
|
||||
"""扫描项目中的可用模块"""
|
||||
# 扫描 oss 目录(框架核心)
|
||||
oss_dir = self.project_root / "oss"
|
||||
if oss_dir.exists():
|
||||
self._available_modules.add("oss")
|
||||
self._scan_module_dir(oss_dir, "oss")
|
||||
|
||||
# 扫描 store 目录下的所有插件
|
||||
store_dir = self.project_root / "store"
|
||||
if store_dir.exists():
|
||||
for author_dir in store_dir.iterdir():
|
||||
if not author_dir.is_dir():
|
||||
continue
|
||||
for plugin_dir in author_dir.iterdir():
|
||||
if not plugin_dir.is_dir():
|
||||
continue
|
||||
plugin_name = plugin_dir.name
|
||||
# 添加插件名作为可用模块
|
||||
self._available_modules.add(plugin_name)
|
||||
# 扫描插件内部的子模块
|
||||
self._scan_plugin_modules(plugin_dir, plugin_name)
|
||||
|
||||
def _scan_module_dir(self, dir_path: Path, base_name: str):
|
||||
"""扫描模块目录"""
|
||||
if dir_path.exists():
|
||||
for item in dir_path.iterdir():
|
||||
if item.is_file() and item.name.endswith(".py") and item.name != "__init__.py":
|
||||
module_name = item.name[:-3]
|
||||
full_name = f"{base_name}.{module_name}"
|
||||
self._available_modules.add(full_name)
|
||||
elif item.is_dir() and (item / "__init__.py").exists():
|
||||
full_name = f"{base_name}.{item.name}"
|
||||
self._available_modules.add(full_name)
|
||||
self._scan_module_dir(item, full_name)
|
||||
|
||||
def _scan_plugin_modules(self, plugin_dir: Path, base_name: str):
|
||||
"""扫描插件内部的子模块"""
|
||||
for item in plugin_dir.iterdir():
|
||||
if item.is_dir() and (item / "__init__.py").exists():
|
||||
full_name = f"{base_name}.{item.name}"
|
||||
self._available_modules.add(full_name)
|
||||
self._scan_module_dir(item, full_name)
|
||||
elif item.is_file() and item.name.endswith(".py") and item.name != "__init__.py":
|
||||
module_name = item.name[:-3]
|
||||
full_name = f"{base_name}.{module_name}"
|
||||
self._available_modules.add(full_name)
|
||||
|
||||
def _add_module_from_dir(self, dir_path: Path, base_name: str):
|
||||
"""从目录添加模块"""
|
||||
if dir_path.exists():
|
||||
for item in dir_path.iterdir():
|
||||
if item.is_file() and item.name.endswith(".py") and item.name != "__init__.py":
|
||||
module_name = item.name[:-3]
|
||||
self._available_modules.add(f"{base_name}.{module_name}")
|
||||
elif item.is_dir() and (item / "__init__.py").exists():
|
||||
self._add_module_from_dir(item, f"{base_name}.{item.name}")
|
||||
|
||||
def check(self, filepath: str, content: str) -> list:
|
||||
"""执行引用检查"""
|
||||
issues = []
|
||||
|
||||
try:
|
||||
tree = ast.parse(content)
|
||||
except SyntaxError as e:
|
||||
return [{
|
||||
"file": filepath,
|
||||
"line": e.lineno or 0,
|
||||
"severity": "critical",
|
||||
"type": "syntax_error",
|
||||
"message": f"语法错误: {e.msg}"
|
||||
}]
|
||||
|
||||
# 检查导入语句(跳过相对导入)
|
||||
issues.extend(self._check_imports(filepath, tree))
|
||||
|
||||
# 检查属性访问错误
|
||||
issues.extend(self._check_attribute_access(filepath, tree, content))
|
||||
|
||||
# 检查函数调用错误
|
||||
issues.extend(self._check_function_calls(filepath, tree, content))
|
||||
|
||||
return issues
|
||||
|
||||
def _check_imports(self, filepath: str, tree: ast.AST) -> list:
|
||||
"""检查导入语句"""
|
||||
issues = []
|
||||
file_path = Path(filepath)
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
# 跳过 oss 框架模块(运行时可用)
|
||||
if alias.name.startswith('oss.') or alias.name == 'oss':
|
||||
continue
|
||||
# 跳过 websockets 等第三方库
|
||||
if alias.name in ('websockets', 'yaml', 'click'):
|
||||
continue
|
||||
if not self._is_module_available(alias.name, file_path):
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "critical",
|
||||
"type": "import_error",
|
||||
"message": f"无法导入模块: {alias.name}"
|
||||
})
|
||||
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
# 跳过相对导入(以 . 开头)
|
||||
if node.level and node.level > 0:
|
||||
continue
|
||||
|
||||
# 跳过 oss 框架模块
|
||||
if node.module and (node.module.startswith('oss.') or node.module == 'oss'):
|
||||
continue
|
||||
|
||||
# 跳过第三方库
|
||||
if node.module and node.module.split('.')[0] in ('websockets', 'yaml', 'click'):
|
||||
continue
|
||||
|
||||
if node.module:
|
||||
if not self._is_module_available(node.module, file_path):
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "critical",
|
||||
"type": "import_error",
|
||||
"message": f"无法导入模块: {node.module}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_variable_references(self, filepath: str, tree: ast.AST, content: str) -> list:
|
||||
"""检查变量引用"""
|
||||
issues = []
|
||||
lines = content.split('\n')
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Name) and isinstance(node.ctx, ast.Load):
|
||||
# 检查是否引用了未定义的变量
|
||||
if not self._is_name_defined(node.id, tree, node.lineno):
|
||||
if node.id not in ('True', 'False', 'None', 'self', 'cls'):
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "warning",
|
||||
"type": "undefined_variable",
|
||||
"message": f"使用了未定义的变量: {node.id}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_attribute_access(self, filepath: str, tree: ast.AST, content: str) -> list:
|
||||
"""检查属性访问"""
|
||||
issues = []
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Attribute):
|
||||
# 检查可能的属性错误
|
||||
if isinstance(node.value, ast.Name):
|
||||
var_name = node.value.id
|
||||
if var_name in ('None', 'True', 'False'):
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "critical",
|
||||
"type": "attribute_error",
|
||||
"message": f"尝试访问 {var_name} 的属性: {node.attr}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_function_calls(self, filepath: str, tree: ast.AST, content: str) -> list:
|
||||
"""检查函数调用"""
|
||||
issues = []
|
||||
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.Call):
|
||||
# 检查调用不存在的方法
|
||||
if isinstance(node.func, ast.Attribute):
|
||||
if isinstance(node.func.value, ast.Constant) and node.func.value.value is None:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": node.lineno,
|
||||
"severity": "critical",
|
||||
"type": "method_call_on_none",
|
||||
"message": f"在 None 上调用方法: {node.func.attr}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _is_module_available(self, module_name: str, file_path: Path = None) -> bool:
|
||||
"""检查模块是否可用"""
|
||||
# 检查是否在已扫描的模块中
|
||||
if module_name in self._available_modules:
|
||||
return True
|
||||
|
||||
# 检查标准库
|
||||
base_module = module_name.split('.')[0]
|
||||
if base_module in self.STD_MODULES:
|
||||
return True
|
||||
|
||||
# 检查是否是 oss 框架模块
|
||||
if module_name.startswith('oss.') or module_name == 'oss':
|
||||
return True
|
||||
|
||||
# 检查是否是常见第三方库
|
||||
third_party = {'websockets', 'yaml', 'click', 'requests', 'flask', 'django', 'numpy', 'pandas'}
|
||||
if module_name.split('.')[0] in third_party:
|
||||
return True
|
||||
|
||||
# 检查是否是当前文件的同目录模块(相对导入的情况)
|
||||
if file_path:
|
||||
file_dir = file_path.parent
|
||||
# 检查同级 .py 文件
|
||||
sibling_module = file_dir / f"{module_name}.py"
|
||||
if sibling_module.exists():
|
||||
return True
|
||||
# 检查同级包
|
||||
sibling_pkg = file_dir / module_name
|
||||
if sibling_pkg.is_dir() and (sibling_pkg / "__init__.py").exists():
|
||||
return True
|
||||
# 检查 store 目录下的插件
|
||||
store_dir = self.project_root / "store"
|
||||
if store_dir.exists():
|
||||
for author_dir in store_dir.iterdir():
|
||||
if author_dir.is_dir():
|
||||
for plugin_dir in author_dir.iterdir():
|
||||
if plugin_dir.is_dir() and plugin_dir.name == module_name.split('.')[0]:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _is_name_defined(self, name: str, tree: ast.AST, line: int) -> bool:
|
||||
"""检查名称是否已定义"""
|
||||
# 检查是否是内置函数/类型
|
||||
if name in self.BUILTINS:
|
||||
return True
|
||||
|
||||
# 检查是否是函数参数
|
||||
for node in ast.walk(tree):
|
||||
if isinstance(node, ast.FunctionDef):
|
||||
for arg in node.args.args:
|
||||
if arg.arg == name:
|
||||
return True
|
||||
|
||||
# 检查是否是赋值目标
|
||||
elif isinstance(node, ast.Assign):
|
||||
for target in node.targets:
|
||||
if isinstance(target, ast.Name) and target.id == name:
|
||||
return True
|
||||
|
||||
# 检查是否是循环变量
|
||||
elif isinstance(node, ast.For):
|
||||
if isinstance(node.target, ast.Name) and node.target.id == name:
|
||||
return True
|
||||
|
||||
# 检查是否是导入
|
||||
elif isinstance(node, ast.Import):
|
||||
for alias in node.names:
|
||||
if alias.asname == name or alias.name == name:
|
||||
return True
|
||||
|
||||
elif isinstance(node, ast.ImportFrom):
|
||||
if node.module:
|
||||
for alias in node.names:
|
||||
if alias.asname == name or alias.name == name:
|
||||
return True
|
||||
|
||||
return False
|
||||
85
store/@{NebulaShell}/code-reviewer/checks/security.py
Normal file
85
store/@{NebulaShell}/code-reviewer/checks/security.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""安全检查器"""
|
||||
|
||||
|
||||
class SecurityChecker:
|
||||
"""安全检查器"""
|
||||
|
||||
def check(self, filepath: str, content: str) -> list:
|
||||
"""执行安全检查"""
|
||||
issues = []
|
||||
|
||||
# 检查硬编码密钥
|
||||
issues.extend(self._check_secrets(filepath, content))
|
||||
|
||||
# 检查危险函数
|
||||
issues.extend(self._check_dangerous_functions(filepath, content))
|
||||
|
||||
# 检查路径穿越
|
||||
issues.extend(self._check_path_traversal(filepath, content))
|
||||
|
||||
return issues
|
||||
|
||||
def _check_secrets(self, filepath: str, content: str) -> list:
|
||||
"""检查硬编码密钥"""
|
||||
issues = []
|
||||
patterns = ['password', 'secret', 'token', 'api_key', 'access_token']
|
||||
|
||||
for i, line in enumerate(content.split('\n'), 1):
|
||||
stripped = line.strip()
|
||||
# 跳过注释和模式定义行
|
||||
if stripped.startswith('#') or stripped.startswith('patterns') or "'" in stripped[:20]:
|
||||
continue
|
||||
|
||||
for pattern in patterns:
|
||||
if pattern + ' = "' in line.lower() or pattern + " = '" in line.lower():
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": i,
|
||||
"severity": "critical",
|
||||
"type": "hardcoded_secret",
|
||||
"message": f"发现硬编码密钥: {line.strip()[:50]}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_dangerous_functions(self, filepath: str, content: str) -> list:
|
||||
"""检查危险函数"""
|
||||
issues = []
|
||||
dangerous = ['eval(', 'exec(', 'os.system(', 'subprocess.call(', 'subprocess.run(']
|
||||
|
||||
# 跳过检查安全检查器自身
|
||||
if 'code-reviewer/checks/security.py' in filepath:
|
||||
return []
|
||||
|
||||
for i, line in enumerate(content.split('\n'), 1):
|
||||
# 跳过注释和模式定义行
|
||||
stripped = line.strip()
|
||||
if stripped.startswith('#') or 'dangerous' in stripped.lower() or "['" in stripped[:30]:
|
||||
continue
|
||||
|
||||
for func in dangerous:
|
||||
if func in line:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": i,
|
||||
"severity": "warning",
|
||||
"type": "dangerous_function",
|
||||
"message": f"使用危险函数: {func.strip()}"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_path_traversal(self, filepath: str, content: str) -> list:
|
||||
"""检查路径穿越风险"""
|
||||
issues = []
|
||||
|
||||
if '../' in content and 'open(' in content:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": 0,
|
||||
"severity": "warning",
|
||||
"type": "path_traversal_risk",
|
||||
"message": "可能存在路径穿越漏洞"
|
||||
})
|
||||
|
||||
return issues
|
||||
70
store/@{NebulaShell}/code-reviewer/checks/style.py
Normal file
70
store/@{NebulaShell}/code-reviewer/checks/style.py
Normal file
@@ -0,0 +1,70 @@
|
||||
"""风格检查器"""
|
||||
|
||||
|
||||
class StyleChecker:
|
||||
"""风格检查器"""
|
||||
|
||||
def check(self, filepath: str, content: str) -> list:
|
||||
"""执行风格检查"""
|
||||
issues = []
|
||||
|
||||
# 检查行长度
|
||||
issues.extend(self._check_line_length(filepath, content))
|
||||
|
||||
# 检查空行
|
||||
issues.extend(self._check_blank_lines(filepath, content))
|
||||
|
||||
# 检查文件末尾换行
|
||||
issues.extend(self._check_final_newline(filepath, content))
|
||||
|
||||
return issues
|
||||
|
||||
def _check_line_length(self, filepath: str, content: str) -> list:
|
||||
"""检查行长度"""
|
||||
issues = []
|
||||
|
||||
for i, line in enumerate(content.split('\n'), 1):
|
||||
if len(line) > 120:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": i,
|
||||
"severity": "info",
|
||||
"type": "line_too_long",
|
||||
"message": f"行过长 ({len(line)} 字符)"
|
||||
})
|
||||
|
||||
return issues
|
||||
|
||||
def _check_blank_lines(self, filepath: str, content: str) -> list:
|
||||
"""检查连续空行"""
|
||||
issues = []
|
||||
blank_count = 0
|
||||
|
||||
for i, line in enumerate(content.split('\n'), 1):
|
||||
if line.strip() == '':
|
||||
blank_count += 1
|
||||
if blank_count > 2:
|
||||
issues.append({
|
||||
"file": filepath,
|
||||
"line": i,
|
||||
"severity": "info",
|
||||
"type": "too_many_blanks",
|
||||
"message": "连续空行过多"
|
||||
})
|
||||
else:
|
||||
blank_count = 0
|
||||
|
||||
return issues
|
||||
|
||||
def _check_final_newline(self, filepath: str, content: str) -> list:
|
||||
"""检查文件末尾换行"""
|
||||
if content and not content.endswith('\n'):
|
||||
return [{
|
||||
"file": filepath,
|
||||
"line": len(content.split('\n')),
|
||||
"severity": "info",
|
||||
"type": "missing_final_newline",
|
||||
"message": "文件末尾缺少换行符"
|
||||
}]
|
||||
|
||||
return []
|
||||
Reference in New Issue
Block a user