Code/app/analyse/interface.py
Stéphan Peccini 6d2e877341
feat(audit): audit qualité complet — 907→0 erreurs ruff + fix multiselect labels
- Correction des 907 erreurs ruff (pathlib, imports, nommage, simplifications, docstrings)
- Fix déduplication labels dans multiselect nœuds d'arrivée (analyse)
- Expansion 1→N label→IDs pour le Sankey (Pays d'opération)
- Ajout CLAUDE.md et document de design de l'audit
- Mise à jour .gitignore (artefacts tests exploratoires)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 11:52:01 +01:00

337 lines
13 KiB
Python

import networkx as nx
import streamlit as st
from utils.persistance import get_champ_statut, maj_champ_statut, supprime_champ_statut
from utils.translations import _
from utils.widgets import html_expander
from .sankey import afficher_sankey
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def preparer_graphe(
G: nx.DiGraph,
) -> tuple[nx.DiGraph, dict[str, int]]:
"""Nettoie et prépare le graphe pour l'analyse.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Tuple[nx.DiGraph, Dict[str, int]]: Un tuple contenant :
- Le graphe NetworkX proprement configuré
- Un dictionnaire des niveaux associés aux nœuds
"""
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp])
G.remove_nodes_from(
[n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
return G, niveaux_temp
def selectionner_niveaux(
) -> tuple[int|None, int|None]:
"""Interface pour sélectionner les niveaux de départ et d'arrivée.
Returns:
Tuple[int, int]: Un tuple contenant deux nombres si des nœuds ont été sélectionnés,
- None sinon
"""
st.markdown(f"## {str(_('pages.analyse.selection_nodes'))}")
valeur_defaut = str(_("pages.analyse.select_level"))
niveau_choix = [valeur_defaut] + list(niveau_labels.values())
default_index = next((i for i, opt in enumerate(niveau_choix) if get_champ_statut("pages.analyse.select_level.niveau_depart") in opt), 0)
niveau_depart = st.selectbox(str(_("pages.analyse.start_level")), niveau_choix, index=default_index, key="analyse_niveau_depart")
if niveau_depart == valeur_defaut:
return None, None
maj_champ_statut("pages.analyse.select_level.niveau_depart", niveau_depart)
niveau_depart_int = inverse_niveau_labels[niveau_depart]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart_int]
niveau_choix = [valeur_defaut] + niveaux_arrivee_possibles
default_index = next((i for i, opt in enumerate(niveau_choix) if get_champ_statut("pages.analyse.select_level.niveau_arrivee") in opt), 0)
niveau_arrivee = st.selectbox(str(_("pages.analyse.end_level")), niveau_choix, index=default_index, key="analyse_niveau_arrivee")
if niveau_arrivee == valeur_defaut:
return niveau_depart_int, None
maj_champ_statut("pages.analyse.select_level.niveau_arrivee", niveau_arrivee)
niveau_arrivee_int = inverse_niveau_labels[niveau_arrivee]
return niveau_depart_int, niveau_arrivee_int
def selectionner_minerais(G: nx.DiGraph, niveau_depart: int, niveau_arrivee: int) -> list[str] | None:
"""Affiche l'interface de selection des minerais (niveau 2) si pertinent pour l'analyse."""
if not (niveau_depart < 2 < niveau_arrivee):
return None
st.markdown(f"### {str(_('pages.analyse.select_minerals'))}")
minerais_nodes = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") and int(str(d.get("niveau")).strip('\"')) == 2
])
# Initialiser depuis champ_statut si besoin
if "analyse_minerais" not in st.session_state:
anciens = []
i = 0
while True:
m = get_champ_statut(f"pages.analyse.filter_by_minerals.{i}")
if not m:
break
anciens.append(m)
i += 1
st.session_state["analyse_minerais"] = anciens
# Widget multiselect sans default, seulement key
st.multiselect(
str(_("pages.analyse.filter_by_minerals")),
minerais_nodes,
key="analyse_minerais"
)
selection = st.session_state["analyse_minerais"]
# Toujours purger, puis recréer si nécessaire
supprime_champ_statut("pages.analyse.filter_by_minerals")
if selection:
for i, m in enumerate(selection):
maj_champ_statut(f"pages.analyse.filter_by_minerals.{i}", m)
return selection if selection else None
def selectionner_noeuds(
G: nx.DiGraph,
niveaux_temp: dict[str, int],
niveau_depart: int,
niveau_arrivee: int
) -> tuple[list[str]|None, list[str]|None]:
"""Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux_temp (Dict[str, int]): Dictionnaire contenant les niveaux associés aux nœuds.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
Returns:
Optional[Tuple[List[str], List[str]]]: Un tuple contenant les listes des nœuds
- None sinon
"""
st.markdown("---")
st.markdown(f"## {str(_('pages.analyse.fine_selection'))}")
depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_arrivee]
# Créer un mapping ID -> Label pour l'affichage
depart_labels = {n: G.nodes[n].get("label", n) for n in depart_nodes}
arrivee_labels = {n: G.nodes[n].get("label", n) for n in arrivee_nodes}
# Mapping inverse Label -> tous les IDs (un label peut correspondre à N nœuds)
depart_par_label: dict[str, list[str]] = {}
for node_id, label in depart_labels.items():
depart_par_label.setdefault(label, []).append(node_id)
arrivee_par_label: dict[str, list[str]] = {}
for node_id, label in arrivee_labels.items():
arrivee_par_label.setdefault(label, []).append(node_id)
# DEPARTS -------------------------------------
# Persistance par labels (pas par IDs) pour éviter l'explosion des doublons
if "analyse_labels_depart" not in st.session_state:
anciens = []
i = 0
while True:
val = get_champ_statut(f"pages.analyse.filter_start_nodes.{i}")
if not val:
break
anciens.append(val)
i += 1
st.session_state["analyse_labels_depart"] = anciens
selected_labels_depart = st.multiselect(
str(_("pages.analyse.filter_start_nodes")),
sorted(depart_par_label),
default=[lb for lb in st.session_state["analyse_labels_depart"] if lb in depart_par_label],
key="analyse_labels_depart_widget"
)
st.session_state["analyse_labels_depart"] = selected_labels_depart
supprime_champ_statut("pages.analyse.filter_start_nodes")
for i, val in enumerate(selected_labels_depart):
maj_champ_statut(f"pages.analyse.filter_start_nodes.{i}", val)
# Expansion label → tous les IDs pour le Sankey
departs_selection = [nid for lb in selected_labels_depart for nid in depart_par_label[lb]]
# ARRIVEES -------------------------------------
if "analyse_labels_arrivee" not in st.session_state:
anciens = []
i = 0
while True:
val = get_champ_statut(f"pages.analyse.filter_end_nodes.{i}")
if not val:
break
anciens.append(val)
i += 1
st.session_state["analyse_labels_arrivee"] = anciens
selected_labels_arrivee = st.multiselect(
str(_("pages.analyse.filter_end_nodes")),
sorted(arrivee_par_label),
default=[lb for lb in st.session_state["analyse_labels_arrivee"] if lb in arrivee_par_label],
key="analyse_labels_arrivee_widget"
)
st.session_state["analyse_labels_arrivee"] = selected_labels_arrivee
supprime_champ_statut("pages.analyse.filter_end_nodes")
for i, val in enumerate(selected_labels_arrivee):
maj_champ_statut(f"pages.analyse.filter_end_nodes.{i}", val)
# Expansion label → tous les IDs pour le Sankey
arrivees_selection = [nid for lb in selected_labels_arrivee for nid in arrivee_par_label[lb]]
return departs_selection or None, arrivees_selection or None
def configurer_filtres_vulnerabilite() -> tuple[bool, bool, bool, str, bool, str]:
"""Interface pour configurer les filtres de vulnérabilité.
Returns:
Tuple[bool, bool, bool, str, bool, str]: Un tuple contenant :
- La possibilité de filtrer les ICS
- La possibilité de filtrer les IV C
- La possibilité de filtrer les IH H
- Le type d'application pour les IH H (pays ou acteur)
- La possibilité de filtrer les IS G
- La logique de filtrage (ou ou and)
"""
st.markdown("---")
st.markdown(f"## {str(_('pages.analyse.vulnerability_filters'))}")
def init_checkbox(key, champ):
if key not in st.session_state:
val = get_champ_statut(champ)
st.session_state[key] = val.lower() == "true"
def init_radio(key, champ, options, default):
if key not in st.session_state:
val = get_champ_statut(champ)
st.session_state[key] = val if val in options else default
# Initialiser les valeurs si F5
init_checkbox("analyse_filtrer_ics", "pages.analyse.filter_ics")
init_checkbox("analyse_filtrer_ivc", "pages.analyse.filter_ivc")
init_checkbox("analyse_filtrer_ihh", "pages.analyse.filter_ihh")
init_checkbox("analyse_filtrer_isg", "pages.analyse.filter_isg")
init_radio("analyse_ihh_type", "pages.analyse.apply_ihh_filter", ["Pays", "Acteurs"], "Pays")
init_radio("analyse_logique_filtrage", "pages.analyse.filter_logic", ["OU", "ET"], "OU")
filtrer_ics = st.checkbox(str(_("pages.analyse.filter_ics")), key="analyse_filtrer_ics")
filtrer_ivc = st.checkbox(str(_("pages.analyse.filter_ivc")), key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox(str(_("pages.analyse.filter_ihh")), key="analyse_filtrer_ihh")
ihh_type = "Pays"
if filtrer_ihh:
ihh_type = st.radio(
str(_("pages.analyse.apply_ihh_filter")),
[str(_("pages.analyse.countries")), str(_("pages.analyse.actors"))],
horizontal=True,
key="analyse_ihh_type"
)
filtrer_isg = st.checkbox(str(_("pages.analyse.filter_isg")), key="analyse_filtrer_isg")
logique_options = ["OU", "ET"]
logique_labels = {
"OU": str(_("pages.analyse.or")),
"ET": str(_("pages.analyse.and"))
}
logique_filtrage = st.radio(
str(_("pages.analyse.filter_logic")),
options=logique_options,
format_func=lambda x: logique_labels.get(x, x),
horizontal=True,
key="analyse_logique_filtrage"
)
# Sauvegarde de l'état
maj_champ_statut("pages.analyse.filter_ics", str(filtrer_ics))
maj_champ_statut("pages.analyse.filter_ivc", str(filtrer_ivc))
maj_champ_statut("pages.analyse.filter_ihh", str(filtrer_ihh))
maj_champ_statut("pages.analyse.apply_ihh_filter", ihh_type)
maj_champ_statut("pages.analyse.filter_isg", str(filtrer_isg))
maj_champ_statut("pages.analyse.filter_logic", logique_filtrage)
return filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage
def interface_analyse(
G_temp: nx.DiGraph,
) -> None:
"""Interface utilisateur pour l'analyse des graphes.
Args:
G_temp (nx.DiGraph): Le graphe NetworkX à analyser.
"""
titre = f"# {str(_('pages.analyse.title'))}"
maj_champ_statut("pages.analyse.title", titre)
st.markdown(titre)
html_expander(f"{str(_('pages.analyse.help'))}", content="\n".join(_("pages.analyse.help_content")), open_by_default=False, details_class="details_introduction")
st.markdown("---")
try:
# Préparation du graphe
G_temp, niveaux_temp = preparer_graphe(G_temp)
# Sélection des niveaux
niveau_depart, niveau_arrivee = selectionner_niveaux()
if niveau_depart is None or niveau_arrivee is None:
return
# Sélection des minerais si nécessaire
minerais_selection = selectionner_minerais(G_temp, niveau_depart, niveau_arrivee)
# Sélection fine des noeuds
noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart, niveau_arrivee)
# Configuration des filtres de vulnérabilité
filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage = configurer_filtres_vulnerabilite()
# Lancement de l'analyse
st.markdown("---")
if st.button(str(_("pages.analyse.run_analysis")), type="primary", key="analyse_lancer", icon=":material/graph_4:"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
minerais=minerais_selection,
filtrer_ics=filtrer_ics,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
ihh_type=ihh_type,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"{str(_('errors.graph_preview_error'))} {e}")