Code/IA/get_regeneration_plan.py
2025-05-27 17:21:49 +02:00

172 lines
5.6 KiB
Python

from datetime import datetime
from collections import defaultdict, deque
import os
import sys
from datetime import timezone
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# À adapter dans ton environnement
from config import GITEA_URL, ORGANISATION, DEPOT_FICHES, ENV
from utils.gitea import recuperer_date_dernier_commit
from IA.make_config import MAKE # MAKE doit être importé depuis un fichier de config
def get_mtime(path):
    """Return the last-modification time of ``path`` as a UTC-aware datetime.

    Args:
        path: Filesystem path to inspect.

    Returns:
        A timezone-aware ``datetime`` (UTC), or ``None`` when the path does
        not exist.
    """
    try:
        timestamp = os.path.getmtime(path)
    except FileNotFoundError:
        # A missing file is reported as "no modification time".
        return None
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)
def get_commit_time(path_relative):
    """Return the date of the last commit touching a file on Gitea.

    The commits URL is built from ``GITEA_URL``, ``ORGANISATION``,
    ``DEPOT_FICHES`` and ``ENV`` (used as the ``sha`` query parameter). Every
    occurrence of ``"Fiches"`` in ``path_relative`` is replaced by
    ``"Documents"`` to form the ``path`` query parameter.

    Args:
        path_relative: Local relative path of the file.

    Returns:
        Whatever ``recuperer_date_dernier_commit`` returns for the built URL
        (callers in this file test it for truthiness and compare it with an
        aware ``datetime`` — presumably a datetime or ``None``; confirm
        against ``utils.gitea``).
    """
    # Computed outside the f-string: reusing double quotes inside a
    # double-quoted f-string is only valid syntax from Python 3.12 (PEP 701).
    repo_path = path_relative.replace("Fiches", "Documents")
    commits_url = (
        f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits"
        f"?path={repo_path}&sha={ENV}"
    )
    return recuperer_date_dernier_commit(commits_url)
def resolve_path_from_where(where_str):
    """Turn a dotted ``where`` reference into a file path using ``MAKE``.

    ``where_str`` is split on ``"."`` and each segment is used as a key to
    descend into the nested ``MAKE`` dictionary. The value reached must be a
    string (the file name). The directory is taken from the innermost
    traversed dictionary that has a ``"directory"`` key.

    Args:
        where_str: Dotted key path into ``MAKE``.

    Returns:
        ``Fiches/<directory>/<file>`` when ``where_str`` contains
        ``"fiches"``, ``<directory>/<file>`` otherwise, or ``None`` when the
        key path is invalid, does not end on a string, or no traversed
        dictionary declares a ``"directory"``.
    """
    segments = where_str.split(".")
    node = MAKE
    traversed = []
    for segment in segments:
        if not (isinstance(node, dict) and segment in node):
            return None
        traversed.append(node)
        node = node[segment]

    if not isinstance(node, str):
        return None

    # Walk back from the innermost dictionary to find the closest "directory".
    enclosing = next(
        (scope for scope in reversed(traversed) if "directory" in scope),
        None,
    )
    if enclosing is None:
        return None

    directory = enclosing["directory"]
    if "fiches" in where_str:
        return os.path.join("Fiches", directory, node)
    return os.path.join(directory, node)
def identifier_type_fiche(path):
    """Find which fiche type declared in ``MAKE["fiches"]`` owns ``path``.

    A type matches when ``path`` starts with
    ``Fiches/<directory>/<prefix>``, both values being read from the type's
    configuration (empty string when absent). Non-dict entries are ignored
    and the first matching type wins.

    Args:
        path: Path of the fiche to classify.

    Returns:
        A ``(type_name, type_config)`` tuple.

    Raises:
        ValueError: If no configured type matches ``path``.
    """
    for type_name, spec in MAKE["fiches"].items():
        if isinstance(spec, dict):
            expected_start = os.path.join(
                "Fiches",
                spec.get("directory", ""),
                spec.get("prefix", ""),
            )
            if path.startswith(expected_start):
                return type_name, spec
    raise ValueError("Type de fiche non reconnu")
def doit_regenerer(fichier, doc_deps, fiche_data=None):
    """Tell whether ``fichier`` is older than one of its sources.

    Two kinds of sources are checked, in this order:

    1. If ``fiche_data["depends_on"]["gitea"]["compare"]`` is
       ``"file2commit"``, the last commit date of ``fichier`` itself.
    2. Each entry of ``doc_deps`` that is a dict with a ``"where"`` key: the
       reference is resolved with ``resolve_path_from_where`` and, depending
       on ``"compare"``, the source's local mtime (``"file2file"``) or its
       last commit date (``"file2commit"``) is used.

    Args:
        fichier: Path of the generated file.
        doc_deps: Mapping of document dependencies (values are inspected,
            keys are ignored).
        fiche_data: Optional configuration dict of the fiche type.

    Returns:
        ``True`` as soon as one source is strictly newer than ``fichier``,
        ``False`` otherwise.
    """
    mtime_fichier = get_mtime(fichier)
    if not mtime_fichier:
        # Every comparison below needs the target's mtime, so the answer is
        # already known; returning now avoids useless Gitea requests.
        # NOTE(review): a missing target is therefore never flagged for
        # regeneration by this function — confirm this is intended.
        return False

    if fiche_data:
        gitea_dep = fiche_data.get("depends_on", {}).get("gitea", {})
        if gitea_dep.get("compare") == "file2commit":
            commit_time = get_commit_time(fichier)
            if commit_time and commit_time > mtime_fichier:
                return True

    for dep in doc_deps.values():
        if not (isinstance(dep, dict) and "where" in dep):
            continue
        source_path = resolve_path_from_where(dep["where"])
        if not source_path:
            continue
        # .get() keeps a dependency without "compare" from raising KeyError;
        # it is skipped like any other unknown comparison mode.
        compare = dep.get("compare")
        if compare == "file2file":
            mtime_source = get_mtime(source_path)
        elif compare == "file2commit":
            mtime_source = get_commit_time(source_path)
        else:
            continue
        if mtime_source and mtime_source > mtime_fichier:
            return True
    return False
def get_regeneration_plan(fiche_path):
    """Return the files to regenerate for ``fiche_path``, dependencies first.

    The function builds the dependency graph reachable from ``fiche_path``,
    orders it so that dependencies come before the files that use them, then
    flags a file when ``doit_regenerer`` says so or when one of its direct
    dependencies has already been flagged. Each examined path is printed
    prefixed by ``"=> "``.

    Args:
        fiche_path: Path of the fiche whose regeneration plan is wanted.

    Returns:
        The list of paths to regenerate, in processing order.
    """

    def build_dependency_graph_complete(path, graph=None, visited=None):
        """Recursively map each recognised fiche to the paths it depends on."""
        if graph is None:
            graph = defaultdict(set)
        if visited is None:
            visited = set()
        if path in visited:
            return graph
        visited.add(path)

        try:
            _, fiche_data = identifier_type_fiche(path)
        except ValueError:
            # Unknown fiche type: nothing to expand from this path.
            return graph

        doc_deps = fiche_data.get("depends_on", {}).get("document", {})
        for dep_info in doc_deps.values():
            if not (isinstance(dep_info, dict) and "where" in dep_info):
                continue
            dep_path = resolve_path_from_where(dep_info["where"])
            if dep_path:
                graph[path].add(dep_path)
                build_dependency_graph_complete(dep_path, graph, visited)

        # Make sure every file declared for this fiche type is a graph node,
        # even when it has no dependency edge.
        dir_fiche = fiche_data.get("directory", "")
        for fichier in fiche_data.get("file", {}).values():
            graph.setdefault(os.path.join("Fiches", dir_fiche, fichier), set())
        return graph

    def topological_sort(graph):
        """Order nodes so that dependencies precede their dependents."""
        in_degree = defaultdict(int)
        for deps in graph.values():
            for dep in deps:
                in_degree[dep] += 1

        queue = deque(node for node in graph if in_degree[node] == 0)
        ordered = []
        emitted = set()  # O(1) membership instead of scanning `ordered`
        while queue:
            node = queue.popleft()
            ordered.append(node)
            emitted.add(node)
            # .get() avoids inserting keys into the defaultdict for nodes
            # that only appear as dependencies.
            for dep in graph.get(node, ()):
                in_degree[dep] -= 1
                if in_degree[dep] == 0:
                    queue.append(dep)

        # Nodes never reaching in-degree 0 (cycles) are still included;
        # sorting them keeps the plan reproducible from one run to the next.
        leftovers = set(graph).union(*graph.values()) - emitted
        ordered.extend(sorted(leftovers))
        return ordered[::-1]

    graph = build_dependency_graph_complete(fiche_path)
    sorted_fiches = topological_sort(graph)
    if fiche_path not in sorted_fiches:
        sorted_fiches.append(fiche_path)

    to_regen = []
    regen_flags = {}
    for fiche in sorted_fiches:
        print(f"=> {fiche}")
        try:
            _, fiche_data = identifier_type_fiche(fiche)
        except ValueError:
            fiche_data = None
        depends = fiche_data.get("depends_on", {}) if fiche_data else {}
        doc_deps = depends.get("document", {}) if depends else {}

        # A flagged dependency forces regeneration, so the timestamp checks
        # (and their Gitea requests) are only run when no dependency is flagged.
        dependency_flagged = any(
            regen_flags.get(dep, False) for dep in graph.get(fiche, ())
        )
        doit = dependency_flagged or doit_regenerer(fiche, doc_deps, fiche_data)
        regen_flags[fiche] = doit
        if doit:
            to_regen.append(fiche)
    return to_regen
# Ad-hoc invocation: computes and prints the regeneration plan for one fiche.
# NOTE(review): these statements run whenever the module is imported, not only
# when it is executed as a script — consider an `if __name__ == "__main__":`
# guard if importing this module should stay side-effect free.
plan = get_regeneration_plan("Fiches/Minerai/Fiche minerai antimoine.md")
print(plan)