import streamlit as st import networkx as nx from utils.translations import _ from utils.widgets import html_expander from utils.graph_utils import ( extraire_chemins_depuis, extraire_chemins_vers ) from batch_ia.batch_utils import soumettre_batch, statut_utilisateur, nettoyage_post_telechargement niveau_labels = { 0: "Produit final", 1: "Composant", 2: "Minerai", 10: "Opération", 11: "Pays d'opération", 12: "Acteur d'opération", 99: "Pays géographique" } inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} def preparer_graphe(G): """Nettoie et prépare le graphe pour l'analyse.""" niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp]) G.remove_nodes_from( [n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) return G, niveaux_temp def selectionner_minerais(G): """Interface pour sélectionner les minerais si nécessaire.""" minerais_selection = None st.markdown(f"## {str(_('pages.ia_nalyse.select_minerals'))}") # Tous les nœuds de niveau 2 (minerai) minerais_nodes = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( str(_("pages.ia_nalyse.filter_by_minerals")), minerais_nodes, key="analyse_minerais" ) return minerais_selection def selectionner_noeuds(G, niveaux_temp, niveau_depart): """Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.""" st.markdown("---") st.markdown(f"## {str(_('pages.ia_nalyse.fine_selection'))}") depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart] noeuds_arrivee = [n for n in G.nodes() if niveaux_temp.get(n) == 99] noeuds_depart = st.multiselect(str(_("pages.ia_nalyse.filter_start_nodes")), sorted(depart_nodes), key="analyse_noeuds_depart") noeuds_depart = noeuds_depart if noeuds_depart else None return noeuds_depart, noeuds_arrivee def extraire_niveaux(G): """Extrait les niveaux des nœuds du graphe""" niveaux = {} for node, attrs in G.nodes(data=True): niveau_str = attrs.get("niveau") if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) return niveaux def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais): """Extrait les chemins selon les critères spécifiés""" chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend([chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) if minerais: chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)] return chemins def exporter_graphe_filtre(G, liens_chemins): """Gère l'export du graphe filtré au format DOT""" if not st.session_state.get("logged_in", False) or not liens_chemins: return G_export = nx.DiGraph() for u, v in liens_chemins: G_export.add_node(u, **G.nodes[u]) G_export.add_node(v, **G.nodes[v]) data = G.get_edge_data(u, v) if isinstance(data, dict) and all(isinstance(k, int) for k in data): G_export.add_edge(u, v, **data[0]) elif isinstance(data, dict): G_export.add_edge(u, v, **data) else: G_export.add_edge(u, v) return(G_export) def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux): """Extrait les liens des chemins en respectant les niveaux""" liens = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u, 999) niveau_v = niveaux.get(v, 999) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens.add((u, v)) return liens def interface_ia_nalyse(G_temp): st.markdown(f"# {str(_('pages.ia_nalyse.title'))}") html_expander(f"{str(_('pages.ia_nalyse.help'))}", content="\n".join(_("pages.ia_nalyse.help_content")), open_by_default=False, details_class="details_introduction") st.markdown("---") resultat = statut_utilisateur(st.session_state.username) st.info(resultat["message"]) if resultat["statut"] is None: # Préparation du graphe G_temp, niveaux_temp = preparer_graphe(G_temp) # Sélection des niveaux niveau_depart = 0 niveau_arrivee = 99 # Sélection fine des noeuds noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart) # Sélection des minerais si nécessaire minerais = selectionner_minerais(G_temp) # Étape 1 : Extraction des niveaux des nœuds niveaux = extraire_niveaux(G_temp) # Étape 2 : Extraction des chemins selon les critères chemins = extraire_chemins_selon_criteres(G_temp, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais) niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012] # Extraction des liens sans filtrage liens_chemins = extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux) if liens_chemins: G_final = exporter_graphe_filtre(G_temp, liens_chemins) if st.button(str(_("pages.ia_nalyse.submit_request"))): soumettre_batch(st.session_state.username, G_final) st.rerun() else: st.info(str(_("pages.ia_nalyse.empty_graph"))) elif resultat["statut"] == "terminé" and resultat["telechargement"]: if not st.session_state.get("telechargement_confirme"): st.download_button(str(_("buttons.download")), resultat["telechargement"], file_name="analyse.zip") if st.button(str(_("pages.ia_nalyse.confirm_download"))): nettoyage_post_telechargement(st.session_state.username) st.session_state["telechargement_confirme"] = True else: st.success("Résultat supprimé. Vous pouvez relancer une nouvelle analyse.") if st.button(str(_("buttons.refresh"))): st.rerun() else: if st.button(str(_("buttons.refresh"))): st.rerun()