import streamlit as st from utils.translations import _ from utils.widgets import html_expander from .sankey import afficher_sankey niveau_labels = { 0: "Produit final", 1: "Composant", 2: "Minerai", 10: "Opération", 11: "Pays d'opération", 12: "Acteur d'opération", 99: "Pays géographique" } inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} def preparer_graphe(G): """Nettoie et prépare le graphe pour l'analyse.""" niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp]) G.remove_nodes_from( [n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) return G, niveaux_temp def selectionner_niveaux(): """Interface pour sélectionner les niveaux de départ et d'arrivée.""" st.markdown(f"## {str(_('pages.analyse.selection_nodes'))}") valeur_defaut = str(_("pages.analyse.select_level")) niveau_choix = [valeur_defaut] + list(niveau_labels.values()) niveau_depart = st.selectbox(str(_("pages.analyse.start_level")), niveau_choix, key="analyse_niveau_depart") if niveau_depart == valeur_defaut: return None, None niveau_depart_int = inverse_niveau_labels[niveau_depart] niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart_int] niveaux_arrivee_choix = [valeur_defaut] + niveaux_arrivee_possibles analyse_niveau_arrivee = st.selectbox(str(_("pages.analyse.end_level")), niveaux_arrivee_choix, key="analyse_niveau_arrivee") if analyse_niveau_arrivee == valeur_defaut: return niveau_depart_int, None niveau_arrivee_int = inverse_niveau_labels[analyse_niveau_arrivee] return niveau_depart_int, niveau_arrivee_int def selectionner_minerais(G, niveau_depart, niveau_arrivee): """Interface pour sélectionner les minerais si nécessaire.""" minerais_selection = None if niveau_depart < 2 < niveau_arrivee: st.markdown(f"### {str(_('pages.analyse.select_minerals'))}") # Tous les nœuds de niveau 2 (minerai) minerais_nodes = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( str(_("pages.analyse.filter_by_minerals")), minerais_nodes, key="analyse_minerais" ) return minerais_selection def selectionner_noeuds(G, niveaux_temp, niveau_depart, niveau_arrivee): """Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.""" st.markdown("---") st.markdown(f"## {str(_('pages.analyse.fine_selection'))}") depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart] arrivee_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_arrivee] noeuds_depart = st.multiselect(str(_("pages.analyse.filter_start_nodes")), sorted(depart_nodes), key="analyse_noeuds_depart") noeuds_arrivee = st.multiselect(str(_("pages.analyse.filter_end_nodes")), sorted(arrivee_nodes), key="analyse_noeuds_arrivee") noeuds_depart = noeuds_depart if noeuds_depart else None noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None return noeuds_depart, noeuds_arrivee def configurer_filtres_vulnerabilite(): """Interface pour configurer les filtres de vulnérabilité.""" st.markdown("---") st.markdown(f"## {str(_('pages.analyse.vulnerability_filters'))}") filtrer_ics = st.checkbox(str(_("pages.analyse.filter_ics")), key="analyse_filtrer_ics") filtrer_ivc = st.checkbox(str(_("pages.analyse.filter_ivc")), key="analyse_filtrer_ivc") filtrer_ihh = st.checkbox(str(_("pages.analyse.filter_ihh")), key="analyse_filtrer_ihh") ihh_type = "Pays" if filtrer_ihh: ihh_type = st.radio(str(_("pages.analyse.apply_ihh_filter")), [str(_("pages.analyse.countries")), str(_("pages.analyse.actors"))], horizontal=True, key="analyse_ihh_type") filtrer_isg = st.checkbox(str(_("pages.analyse.filter_isg")), key="analyse_filtrer_isg") logique_filtrage = st.radio(str(_("pages.analyse.filter_logic")), [str(_("pages.analyse.or")), str(_("pages.analyse.and"))], horizontal=True, key="analyse_logique_filtrage") return filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage def interface_analyse(G_temp): st.markdown(f"# {str(_('pages.analyse.title'))}") html_expander(f"{str(_('pages.analyse.help'))}", content="\n".join(_("pages.analyse.help_content")), open_by_default=False, details_class="details_introduction") st.markdown("---") try: # Préparation du graphe G_temp, niveaux_temp = preparer_graphe(G_temp) # Sélection des niveaux niveau_depart, niveau_arrivee = selectionner_niveaux() if niveau_depart is None or niveau_arrivee is None: return # Sélection des minerais si nécessaire minerais_selection = selectionner_minerais(G_temp, niveau_depart, niveau_arrivee) # Sélection fine des noeuds noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart, niveau_arrivee) # Configuration des filtres de vulnérabilité filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage = configurer_filtres_vulnerabilite() # Lancement de l'analyse st.markdown("---") if st.button(str(_("pages.analyse.run_analysis")), type="primary", key="analyse_lancer"): afficher_sankey( G_temp, niveau_depart=niveau_depart, niveau_arrivee=niveau_arrivee, noeuds_depart=noeuds_depart, noeuds_arrivee=noeuds_arrivee, minerais=minerais_selection, filtrer_ics=filtrer_ics, filtrer_ivc=filtrer_ivc, filtrer_ihh=filtrer_ihh, ihh_type=ihh_type, filtrer_isg=filtrer_isg, logique_filtrage=logique_filtrage ) except Exception as e: st.error(f"{str(_('errors.graph_preview_error'))} {e}")