feat: id自动校验

This commit is contained in:
秋云
2025-06-16 00:58:51 +08:00
parent edd6dc38d5
commit 71b94d8fad
359 changed files with 3776 additions and 1304 deletions

View File

@@ -60,21 +60,15 @@ def get_original_file(file_path):
"""从上游仓库获取原始文件内容,如果失败则尝试从本地获取"""
# 返回值增加一个来源标识: "upstream", "pr_submitted", None
# 首先尝试从上游仓库获取
try:
print(f"尝试从upstream/main获取文件: {file_path}")
result = subprocess.run(['git', 'show', f'upstream/main:{file_path}'],
capture_output=True, text=True, encoding='utf-8')
if result.returncode == 0:
print("从上游仓库成功获取原始文件")
return json.loads(result.stdout), "upstream"
else:
print(f"文件在上游仓库中不存在,可能是新文件")
except Exception as e:
print(f"从上游仓库获取原始文件失败: {str(e)}")
try:
print("尝试使用当前文件作为PR提交文件")
with open(file_path, 'r', encoding='utf-8') as f:
current_data = json.load(f)
# 创建一个副本,避免引用相同的对象
@@ -82,7 +76,6 @@ def get_original_file(file_path):
except Exception as e:
print(f"读取当前文件失败: {str(e)}")
print("无法获取任何形式的原始文件")
return None, None
def load_json_file(file_path):
@@ -318,6 +311,107 @@ def check_bgi_version_compatibility(bgi_version, auto_fix=False):
return bgi_version, corrections
def check_position_ids(positions):
"""检查并修复位置 ID 编编号的连续性
自动修复功能:
1. 缺少 id 字段时,自动按顺序添加
2. id 编号不连续时,自动重新排序
3. id 不是从 1 开始时,自动调整
4. id 值无效(非数字)时,自动修正
"""
corrections = []
validation_issues = []
if not positions:
return validation_issues, corrections
# 检查是否所有位置都有 id 字段,并收集现有 id 值
current_ids = []
missing_ids = []
invalid_ids = []
for idx, pos in enumerate(positions):
if "id" not in pos:
missing_ids.append(idx)
current_ids.append(None)
else:
try:
id_val = int(pos["id"])
current_ids.append(id_val)
except (ValueError, TypeError):
# 如果 id 不是数字,记录为无效
invalid_ids.append(idx)
current_ids.append(None)
# 如果有缺少 id 的位置,记录
if missing_ids:
corrections.append(f"{len(missing_ids)} 个位置自动添加了 id 字段")
# 如果有无效 id记录
if invalid_ids:
corrections.append(f"修正了 {len(invalid_ids)} 个无效的 id 值")
# 生成期望的 id 序列(从 1 开始)
expected_ids = list(range(1, len(positions) + 1))
# 检查当前 id 是否符合期望
needs_reorder = False
# 过滤掉 None 值来检查现有的有效 id
valid_current_ids = [id_val for id_val in current_ids if id_val is not None]
if len(valid_current_ids) != len(positions):
needs_reorder = True
elif valid_current_ids != expected_ids:
needs_reorder = True
else:
# 检查是否有重复的 id
if len(set(valid_current_ids)) != len(valid_current_ids):
needs_reorder = True
duplicates = [id_val for id_val in set(valid_current_ids) if valid_current_ids.count(id_val) > 1]
corrections.append(f"检测到重复的 id: {duplicates}")
# 如果需要重新排序,自动修复
if needs_reorder:
id_issues = []
# 分析具体问题
if missing_ids or invalid_ids:
if missing_ids:
id_issues.append("存在缺少id的位置")
if invalid_ids:
id_issues.append("存在无效id值")
if valid_current_ids:
if min(valid_current_ids) != 1:
id_issues.append("id不是从1开始")
# 检查连续性
sorted_valid_ids = sorted(valid_current_ids)
expected_sorted = list(range(1, len(valid_current_ids) + 1))
if sorted_valid_ids != expected_sorted:
id_issues.append("id编号不连续")
# 重新按顺序分配 id并将 id 字段放在第一个位置
for idx, pos in enumerate(positions):
new_id = idx + 1
# 创建新的有序字典id 放在第一个
new_pos = {"id": new_id}
# 添加其他字段
for key, value in pos.items():
if key != "id":
new_pos[key] = value
# 更新原位置
pos.clear()
pos.update(new_pos)
if id_issues:
corrections.append(f"id编号已重新排序并置于首位 (问题: {', '.join(id_issues)})")
else:
corrections.append("id编号已按顺序重新分配并置于首位")
return validation_issues, corrections
# ==================== 主验证逻辑 ====================
def initialize_data(data, file_path):
@@ -356,9 +450,6 @@ def initialize_data(data, file_path):
data["positions"] = []
messages.append(f"⚠️ 文件缺少 positions 字段,已添加空数组")
for message in messages:
print(message)
return data
def check_actions_compatibility(positions, bgi_version):
@@ -399,7 +490,7 @@ def update_bgi_version_for_compatibility(info, compatibility_issues, auto_fix):
corrections.append(f"bgi_version {info['bgi_version']} 自动更新为 {max_required} 以兼容所有功能")
return [], corrections
except ValueError as e:
print(f"警告: 版本号解析失败 - {e}")
# print(f"警告: 版本号解析失败 - {e}")
info["bgi_version"] = DEFAULT_BGI_VERSION
corrections.append(f"bgi_version 自动更新为 {DEFAULT_BGI_VERSION} (版本解析失败)")
return [], corrections
@@ -439,13 +530,18 @@ def validate_file(file_path, auto_fix=False):
bgi_version, corrections = check_bgi_version_compatibility(info["bgi_version"], auto_fix)
if corrections:
info["bgi_version"] = bgi_version
all_corrections.extend(corrections)
# 检查位置字段 - 修改为接收三个返回值
all_corrections.extend(corrections) # 检查位置字段 - 修改为接收三个返回值
position_issues, notices, pos_corrections = check_position_fields(data["positions"])
if auto_fix and pos_corrections:
all_corrections.extend(pos_corrections)
# 检查位置 ID 编号
if auto_fix:
id_validation_issues, id_corrections = check_position_ids(data["positions"])
if id_corrections:
all_corrections.extend(id_corrections)
position_issues.extend(id_validation_issues)
# 检查 action 兼容性
compatibility_issues, action_validation_issues = check_actions_compatibility(data["positions"], info["bgi_version"])
position_issues.extend(action_validation_issues)
@@ -493,15 +589,6 @@ def validate_file(file_path, auto_fix=False):
# 保存修正
if auto_fix:
# 无论是否有问题,都打印所有自动修正项
if all_corrections:
print("🔧 自动修正:")
for correction in all_corrections:
print(f" - {correction}")
else:
print("✅ 没有需要自动修正的项目")
# 只有在有修正或问题时才保存文件
if all_corrections or position_issues:
if save_json_file(file_path, data):
print("✅ 文件已保存")
@@ -523,7 +610,7 @@ def main():
all_notices = [] # 初始化 all_notices 变量
if os.path.isfile(path) and path.endswith('.json'):
print(f"\n🔍 校验文件: {path}")
# print(f"\n🔍 校验文件: {path}")
notices = validate_file(path, auto_fix)
if notices:
all_notices.extend([f"{path}: {n}" for n in notices]) # 添加到 all_notices