172 lines
5.6 KiB
Python
172 lines
5.6 KiB
Python
from datetime import datetime
|
|
from collections import defaultdict, deque
|
|
import os
|
|
import sys
|
|
from datetime import timezone
|
|
|
|
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
|
|
|
|
# À adapter dans ton environnement
|
|
from config import GITEA_URL, ORGANISATION, DEPOT_FICHES, ENV
|
|
from utils.gitea import recuperer_date_dernier_commit
|
|
from IA.make_config import MAKE # MAKE doit être importé depuis un fichier de config
|
|
|
|
def get_mtime(path):
|
|
try:
|
|
return datetime.fromtimestamp(os.path.getmtime(path), tz=timezone.utc)
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
def get_commit_time(path_relative):
|
|
commits_url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path={path_relative.replace("Fiches", "Documents")}&sha={ENV}"
|
|
return recuperer_date_dernier_commit(commits_url)
|
|
|
|
def resolve_path_from_where(where_str):
|
|
parts = where_str.split(".")
|
|
current = MAKE
|
|
path_stack = []
|
|
|
|
for part in parts:
|
|
if isinstance(current, dict) and part in current:
|
|
path_stack.append((part, current))
|
|
current = current[part]
|
|
else:
|
|
return None
|
|
|
|
if not isinstance(current, str):
|
|
return None
|
|
|
|
for i in range(len(path_stack) - 1, -1, -1):
|
|
key, context = path_stack[i]
|
|
if "directory" in context:
|
|
directory = context["directory"]
|
|
if "fiches" in where_str:
|
|
return os.path.join("Fiches", directory, current)
|
|
else:
|
|
return os.path.join(directory, current)
|
|
|
|
return None
|
|
|
|
def identifier_type_fiche(path):
|
|
for type_fiche, data in MAKE["fiches"].items():
|
|
if not isinstance(data, dict):
|
|
continue
|
|
directory = data.get("directory", "")
|
|
prefix = data.get("prefix", "")
|
|
base = os.path.join("Fiches", directory, prefix)
|
|
if path.startswith(base):
|
|
return type_fiche, data
|
|
raise ValueError("Type de fiche non reconnu")
|
|
|
|
def doit_regenerer(fichier, doc_deps, fiche_data=None):
|
|
mtime_fichier = get_mtime(fichier)
|
|
|
|
if fiche_data:
|
|
gitea_dep = fiche_data.get("depends_on", {}).get("gitea", {})
|
|
if gitea_dep.get("compare") == "file2commit":
|
|
commit_time = get_commit_time(fichier)
|
|
if commit_time and mtime_fichier and commit_time > mtime_fichier:
|
|
return True
|
|
|
|
for _, dep in doc_deps.items():
|
|
if isinstance(dep, dict) and "where" in dep:
|
|
source_path = resolve_path_from_where(dep["where"])
|
|
if source_path:
|
|
if dep["compare"] == "file2file":
|
|
mtime_source = get_mtime(source_path)
|
|
elif dep["compare"] == "file2commit":
|
|
mtime_source = get_commit_time(source_path)
|
|
else:
|
|
continue
|
|
if mtime_source and mtime_fichier and mtime_source > mtime_fichier:
|
|
return True
|
|
return False
|
|
|
|
def get_regeneration_plan(fiche_path):
|
|
def build_dependency_graph_complete(path, graph=None, visited=None):
|
|
if graph is None:
|
|
graph = defaultdict(set)
|
|
if visited is None:
|
|
visited = set()
|
|
if path in visited:
|
|
return graph
|
|
visited.add(path)
|
|
|
|
try:
|
|
_, fiche_data = identifier_type_fiche(path)
|
|
except ValueError:
|
|
return graph
|
|
|
|
depends = fiche_data.get("depends_on", {})
|
|
doc_deps = depends.get("document", {})
|
|
|
|
for _, dep_info in doc_deps.items():
|
|
if isinstance(dep_info, dict) and "where" in dep_info:
|
|
dep_path = resolve_path_from_where(dep_info["where"])
|
|
if dep_path:
|
|
graph[path].add(dep_path)
|
|
build_dependency_graph_complete(dep_path, graph, visited)
|
|
|
|
if "file" in fiche_data:
|
|
for fichier in fiche_data["file"].values():
|
|
dir_fiche = fiche_data.get("directory", "")
|
|
fichier_path = os.path.join("Fiches", dir_fiche, fichier)
|
|
if fichier_path not in graph:
|
|
graph[fichier_path] = set()
|
|
|
|
return graph
|
|
|
|
def topological_sort(graph):
|
|
in_degree = defaultdict(int)
|
|
for node in graph:
|
|
for dep in graph[node]:
|
|
in_degree[dep] += 1
|
|
queue = deque([node for node in graph if in_degree[node] == 0])
|
|
result = []
|
|
|
|
while queue:
|
|
node = queue.popleft()
|
|
result.append(node)
|
|
for dep in graph[node]:
|
|
in_degree[dep] -= 1
|
|
if in_degree[dep] == 0:
|
|
queue.append(dep)
|
|
|
|
all_nodes = set(graph.keys()).union(*graph.values())
|
|
for node in all_nodes:
|
|
if node not in result:
|
|
result.append(node)
|
|
|
|
return result[::-1]
|
|
|
|
graph = build_dependency_graph_complete(fiche_path)
|
|
sorted_fiches = topological_sort(graph)
|
|
if fiche_path not in sorted_fiches:
|
|
sorted_fiches.append(fiche_path)
|
|
|
|
|
|
to_regen = []
|
|
regen_flags = {}
|
|
|
|
for fiche in sorted_fiches:
|
|
print(f"=> {fiche}")
|
|
try:
|
|
_, fiche_data = identifier_type_fiche(fiche)
|
|
except ValueError:
|
|
fiche_data = None
|
|
depends = fiche_data.get("depends_on", {}) if fiche_data else {}
|
|
doc_deps = depends.get("document", {}) if depends else {}
|
|
|
|
doit = doit_regenerer(fiche, doc_deps, fiche_data)
|
|
if any(regen_flags.get(dep, False) for dep in graph.get(fiche, [])):
|
|
doit = True
|
|
|
|
regen_flags[fiche] = doit
|
|
if doit:
|
|
to_regen.append(fiche)
|
|
|
|
return to_regen
|
|
|
|
plan = get_regeneration_plan("Fiches/Minerai/Fiche minerai antimoine.md")
|
|
print(plan)
|