Code/utils/persistance.py
Stéphan Peccini 6d2e877341
feat(audit): audit qualité complet — 907→0 erreurs ruff + fix multiselect labels
- Correction des 907 erreurs ruff (pathlib, imports, nommage, simplifications, docstrings)
- Fix déduplication labels dans multiselect nœuds d'arrivée (analyse)
- Expansion 1→N label→IDs pour le Sankey (Pays d'opération)
- Ajout CLAUDE.md et document de design de l'audit
- Mise à jour .gitignore (artefacts tests exploratoires)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 11:52:01 +01:00

205 lines
6.6 KiB
Python

import json
import os
from datetime import date
from pathlib import Path
import streamlit as st
from dotenv import load_dotenv
from utils.translations import _
load_dotenv(".env")
def get_session_id() -> str:
"""Recupere l'identifiant de session Streamlit depuis les headers HTTP.
Returns:
str: ID de session ou "anonymous" si non disponible.
"""
return st.context.headers.get("x-session-id", "anonymous")
def update_session_paths():
"""Initialise les chemins de sauvegarde specifiques a la session courante.
Cree le repertoire tmp/sessions/<session_id>/ et definit les variables globales
pour le chemin du fichier de statut de la session.
Note:
Modifie les globales SAVE_STATUT, SAVE_SESSIONS_PATH, SAVE_STATUT_PATH.
"""
global SAVE_STATUT, SAVE_SESSIONS_PATH, SAVE_STATUT_PATH
SAVE_STATUT = os.getenv("SAVE_STATUT", "statut_general.json")
SAVE_SESSIONS_PATH = Path(f"tmp/sessions/{get_session_id()}")
SAVE_SESSIONS_PATH.mkdir(parents=True, exist_ok=True)
SAVE_STATUT_PATH = SAVE_SESSIONS_PATH / SAVE_STATUT
def _maj_champ(fichier, cle: str, contenu: str = "") -> bool:
def serialize(obj):
return obj.isoformat() if isinstance(obj, date) else obj
def inserer_cle_json(structure: dict, cle: str, valeur: any) -> dict:
"""Insère une clé de type 'a.b.c' dans un dictionnaire JSON imbriqué.
Args:
structure: Dictionnaire racine à mettre à jour
cle: Chaîne de clé séparée par des points
valeur: Valeur à insérer
Returns:
Le dictionnaire mis à jour
"""
parties = cle.split(".")
d = structure
for p in parties[:-1]:
d = d.setdefault(p, {})
d[parties[-1]] = valeur
return structure
if fichier.exists():
try:
with fichier.open(encoding="utf-8") as f:
sauvegarde = json.load(f)
sauvegarde = inserer_cle_json(sauvegarde, cle, serialize(contenu))
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_maj_champ", file=fichier, error=e))
return False
else:
sauvegarde = {}
sauvegarde = inserer_cle_json(sauvegarde, cle, serialize(contenu))
try:
with fichier.open("w", encoding="utf-8") as f:
json.dump(sauvegarde, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
st.error(_("persistance.errors.write_file").format(function="_maj_champ", file=fichier, error=e))
return False
def _get_champ(fichier, cle: str) -> str:
def extraire_valeur_par_cle(structure: dict, cle: str):
"""Extrait une valeur depuis un dictionnaire imbriqué avec une clé au format 'a.b.c'.
Args:
structure: Dictionnaire d'origine
cle: Chaîne représentant la clé imbriquée
Returns:
La valeur trouvée, ou `par_defaut` si elle est introuvable
"""
parties = cle.split(".")
d = structure
for p in parties:
if isinstance(d, dict) and p in d:
d = d[p]
else:
return ""
return d
import json
def charger_json_sain(fichier: str) -> dict:
with fichier.open(encoding="utf-8") as f:
contenu = json.load(f)
if isinstance(contenu, str):
try:
contenu = json.loads(contenu) # On essaie de parser une 2e fois si nécessaire
except json.JSONDecodeError as err:
raise ValueError("Le fichier contient une chaîne JSON invalide.") from err
if not isinstance(contenu, dict):
raise ValueError("Le contenu JSON n'est pas un objet/dictionnaire valide.")
return contenu
if fichier.exists():
try:
sauvegarde = charger_json_sain(fichier)
return extraire_valeur_par_cle(sauvegarde, cle)
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_get_champ", file=fichier, error=e))
return ""
def _supprime_champ(fichier: Path, cle: str) -> bool:
def supprimer_cle_profonde(d: dict, chemin: str, separateur="."):
cles = chemin.split(separateur)
sous_dict = d
for cle in cles[:-1]:
sous_dict = sous_dict.get(cle, {})
if not isinstance(sous_dict, dict):
return False # Le chemin est invalide
return sous_dict.pop(cles[-1], None) is not None
if fichier.exists():
try:
with fichier.open(encoding="utf-8") as f:
sauvegarde = json.load(f)
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_maj_champ", file=fichier, error=e))
return False
supprimer_cle_profonde(sauvegarde, cle)
try:
with fichier.open("w", encoding="utf-8") as f:
json.dump(sauvegarde, f, indent=4)
except Exception as e:
st.error(_("persistance.errors.write_file").format(function="_supprime_champ", file=fichier, error=e))
return False
return True
def maj_champ_statut(cle: str, contenu: str = "") -> bool:
"""Met a jour un champ dans le fichier de statut de la session courante.
Args:
cle: Cle hierarchique au format "a.b.c" pour acceder au champ.
contenu: Valeur a enregistrer (sera serialisee si date).
Returns:
bool: True si succes, False sinon.
"""
return _maj_champ(SAVE_STATUT_PATH, cle, contenu)
def get_champ_statut(cle: str) -> str:
"""Recupere un champ depuis le fichier de statut de la session courante.
Args:
cle: Cle hierarchique au format "a.b.c" pour acceder au champ.
Returns:
str: Valeur du champ ou chaine vide si non trouve.
"""
return _get_champ(SAVE_STATUT_PATH, cle)
def supprime_champ_statut(cle: str) -> None:
"""Supprime un champ du fichier de statut de la session courante.
Args:
cle: Cle hierarchique au format "a.b.c" du champ a supprimer.
"""
_supprime_champ(SAVE_STATUT_PATH, cle)
def get_full_structure() -> dict|None:
"""Recupere la structure JSON complete du fichier de statut de la session.
Returns:
dict | None: Structure JSON complete ou None si erreur.
"""
fichier = SAVE_STATUT_PATH
if fichier.exists():
try:
with fichier.open(encoding="utf-8") as f:
return json.load(f)
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_maj_champ", file=fichier, error=e))
return None
else:
return None