from datetime import datetime from collections import defaultdict, deque import os import sys from datetime import timezone sys.path.append(os.path.dirname(os.path.dirname(__file__))) # À adapter dans ton environnement from config import GITEA_URL, ORGANISATION, DEPOT_FICHES, ENV from utils.gitea import recuperer_date_dernier_commit from IA.make_config import MAKE # MAKE doit être importé depuis un fichier de config def get_mtime(path): try: return datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc) except FileNotFoundError: return None def get_commit_time(path_relative): commits_url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path={path_relative.replace("Fiches", "Documents")}&sha={ENV}" return recuperer_date_dernier_commit(commits_url) def resolve_path_from_where(where_str): parts = where_str.split(".") current = MAKE path_stack = [] for part in parts: if isinstance(current, dict) and part in current: path_stack.append((part, current)) current = current[part] else: return None if not isinstance(current, str): return None for i in range(len(path_stack) - 1, -1, -1): key, context = path_stack[i] if "directory" in context: directory = context["directory"] if "fiches" in where_str: return os.path.join("Fiches", directory, current) else: return os.path.join(directory, current) return None def identifier_type_fiche(path): for type_fiche, data in MAKE["fiches"].items(): if not isinstance(data, dict): continue directory = data.get("directory", "") prefix = data.get("prefix", "") base = os.path.join("Fiches", directory, prefix) if path.startswith(base): return type_fiche, data raise ValueError("Type de fiche non reconnu") def doit_regenerer(fichier, doc_deps, fiche_data=None): mtime_fichier = get_mtime(fichier) if fiche_data: gitea_dep = fiche_data.get("depends_on", {}).get("gitea", {}) if gitea_dep.get("compare") == "file2commit": commit_time = get_commit_time(fichier) if commit_time and mtime_fichier and commit_time > mtime_fichier: return True for _, dep in doc_deps.items(): if isinstance(dep, dict) and "where" in dep: source_path = resolve_path_from_where(dep["where"]) if source_path: if dep["compare"] == "file2file": mtime_source = get_mtime(source_path) elif dep["compare"] == "file2commit": mtime_source = get_commit_time(source_path) else: continue if mtime_source and mtime_fichier and mtime_source > mtime_fichier: return True return False def get_regeneration_plan(fiche_path): def build_dependency_graph_complete(path, graph=None, visited=None): if graph is None: graph = defaultdict(set) if visited is None: visited = set() if path in visited: return graph visited.add(path) try: _, fiche_data = identifier_type_fiche(path) except ValueError: return graph depends = fiche_data.get("depends_on", {}) doc_deps = depends.get("document", {}) for _, dep_info in doc_deps.items(): if isinstance(dep_info, dict) and "where" in dep_info: dep_path = resolve_path_from_where(dep_info["where"]) if dep_path: graph[path].add(dep_path) build_dependency_graph_complete(dep_path, graph, visited) if "file" in fiche_data: for fichier in fiche_data["file"].values(): dir_fiche = fiche_data.get("directory", "") fichier_path = os.path.join("Fiches", dir_fiche, fichier) if fichier_path not in graph: graph[fichier_path] = set() return graph def topological_sort(graph): in_degree = defaultdict(int) for node in graph: for dep in graph[node]: in_degree[dep] += 1 queue = deque([node for node in graph if in_degree[node] == 0]) result = [] while queue: node = queue.popleft() result.append(node) for dep in graph[node]: in_degree[dep] -= 1 if in_degree[dep] == 0: queue.append(dep) all_nodes = set(graph.keys()).union(*graph.values()) for node in all_nodes: if node not in result: result.append(node) return result[::-1] graph = build_dependency_graph_complete(fiche_path) sorted_fiches = topological_sort(graph) if fiche_path not in sorted_fiches: sorted_fiches.append(fiche_path) to_regen = [] regen_flags = {} for fiche in sorted_fiches: print(f"=> {fiche}") try: _, fiche_data = identifier_type_fiche(fiche) except ValueError: fiche_data = None depends = fiche_data.get("depends_on", {}) if fiche_data else {} doc_deps = depends.get("document", {}) if depends else {} doit = doit_regenerer(fiche, doc_deps, fiche_data) if any(regen_flags.get(dep, False) for dep in graph.get(fiche, [])): doit = True regen_flags[fiche] = doit if doit: to_regen.append(fiche) return to_regen plan = get_regeneration_plan("Fiches/Minerai/Fiche minerai antimoine.md") print(plan)