import json import time from pathlib import Path from networkx.drawing.nx_agraph import write_dot import streamlit as st import sys import os current_file = os.path.realpath(__file__) parent_dir = os.path.abspath(os.path.join(os.path.dirname(current_file), '..')) if parent_dir not in sys.path: sys.path.insert(0, parent_dir) from utils.translations import _ BATCH_DIR = Path(__file__).resolve().parent JOBS_DIR = BATCH_DIR / "jobs" STATUS_FILE = BATCH_DIR / "status.json" ANALYSE = " - analyse.md" RAPPORT = " - rapport.md" def charger_status(): if STATUS_FILE.exists(): return json.loads(STATUS_FILE.read_text()) return {} def sauvegarder_status(data): STATUS_FILE.write_text(json.dumps(data, indent=2)) def statut_utilisateur(login): status = charger_status() entry = status.get(login) if not entry: return {"statut": None, "position": None, "telechargement": None, "message": f"{str(_('batch.no_task'))}."} if entry["status"] == "en attente": return {"statut": "en attente", "position": entry.get("position"), "telechargement": None, "message": f"{str(_('batch.in_queue'))} (position {entry.get('position', '?')})."} if entry["status"] == "en cours": if "step" not in st.session_state: st.session_state["step"] = 1 return {"statut": "en cours", "position": 0, "telechargement": None, "message": f"{str(_('batch.in_progress'))} ({str(_('batch.step'))} {st.session_state["step"]}/5)."} if entry["status"] == "terminé": result_file = JOBS_DIR / f"{login}.zip" if result_file.exists(): return {"statut": "terminé", "position": None, "telechargement": result_file.read_bytes(), "message": f"{str(_('batch.complete'))}."} if entry["status"] == "échoué": return {"statut": "échoué", "position": None, "telechargement": None, "message": f"{str(_('batch.failure'))} : {entry.get('error', {str(_('batch.unknown_error'))})}"} def soumettre_batch(login, G): if statut_utilisateur(login)["statut"]: raise RuntimeError("Un batch est déjà en cours.") write_dot(G, JOBS_DIR / f"{login}.dot") status = charger_status() status[login] = {"status": "en attente", "timestamp": time.time()} sauvegarder_status(status) def nettoyage_post_telechargement(login): (JOBS_DIR / f"{login}.dot").unlink(missing_ok=True) (JOBS_DIR / f"{login}.zip").unlink(missing_ok=True) status = charger_status() status.pop(login, None) sauvegarder_status(status)