""", unsafe_allow_html=True) def couleur_noeud(n, niveaux, G): niveau = niveaux.get(n, 99) attrs = G.nodes[n] # Niveau 99 : pays géographique avec isg if niveau == 99: isg = int(attrs.get("isg", -1)) return ( "darkred" if isg >= 60 else "orange" if isg >= 31 else "darkgreen" if isg >= 0 else "gray" ) # Niveau 11 ou 12 connecté à un pays géographique if niveau in (11, 12, 1011, 1012): for succ in G.successors(n): if niveaux.get(succ) == 99: isg = int(G.nodes[succ].get("isg", -1)) return ( "darkred" if isg >= 60 else "orange" if isg >= 31 else "darkgreen" if isg >= 0 else "gray" ) # Logique existante pour IHH / IVC if niveau in (10, 1010) and attrs.get("ihh_pays"): ihh = int(attrs["ihh_pays"]) return ( "darkgreen" if ihh <= 15 else "orange" if ihh <= 25 else "darkred" ) elif niveau == 2 and attrs.get("ivc"): ivc = int(attrs["ivc"]) return ( "darkgreen" if ivc <= 15 else "orange" if ivc <= 30 else "darkred" ) return "lightblue" def afficher_sankey( G, niveau_depart, niveau_arrivee, noeuds_depart=None, noeuds_arrivee=None, minerais=None, filtrer_ics=False, filtrer_ivc=False, filtrer_ihh=False, filtrer_isg=False, logique_filtrage="OU"): niveaux = {} for node, attrs in G.nodes(data=True): niveau_str = attrs.get("niveau") try: if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) except ValueError: logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}") chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend([chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) if minerais: chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)] def extraire_criticite(u, v): data = G.get_edge_data(u, v) if not data: return 0 if isinstance(data, dict) and all(isinstance(k, int) for k in data): return float(data[0].get("criticite", 0)) return float(data.get("criticite", 0)) liens_chemins = set() chemins_filtres = set() niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012] for chemin in chemins: has_ihh = has_ivc = has_criticite = has_isg_critique = False for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u) niveau_v = niveaux.get(v) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens_chemins.add((u, v)) if filtrer_ihh and ihh_type: ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs" if niveau_u in (10, 1010) and int(G.nodes[u].get(ihh_field, 0)) > 25: has_ihh = True if niveau_v in (10, 1010) and int(G.nodes[v].get(ihh_field, 0)) > 25: has_ihh = True if filtrer_ivc and niveau_u in (2, 1002) and int(G.nodes[u].get("ivc", 0)) > 30: has_ivc = True if filtrer_ics and ((niveau_u == 1 and niveau_v == 2) or (niveau_u == 1001 and niveau_v == 1002) or (niveau_u == 10 and niveau_v in (1000, 1001))) and extraire_criticite(u, v) > 0.66: has_criticite = True for n in (u, v): if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60: has_isg_critique = True elif niveaux.get(n) in (11, 12, 1011, 1012): for succ in G.successors(n): if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60: has_isg_critique = True if logique_filtrage == "ET": keep = True if filtrer_ihh: keep = keep and has_ihh if filtrer_ivc: keep = keep and has_ivc if filtrer_ics: keep = keep and has_criticite if filtrer_isg: keep = keep and has_isg_critique if keep: chemins_filtres.add(tuple(chemin)) elif logique_filtrage == "OU": if (filtrer_ihh and has_ihh) or (filtrer_ivc and has_ivc) or (filtrer_ics and has_criticite) or (filtrer_isg and has_isg_critique): chemins_filtres.add(tuple(chemin)) if any([filtrer_ics, filtrer_ivc, filtrer_ihh, filtrer_isg]): chemins = list(chemins_filtres) liens_chemins = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u, 999) niveau_v = niveaux.get(v, 999) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens_chemins.add((u, v)) if not liens_chemins: st.warning("Aucun chemin ne correspond aux critères.") return df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"]) df_liens = df_liens.groupby( ["source", "target"]).size().reset_index(name="value") df_liens["criticite"] = df_liens.apply( lambda row: extraire_criticite(row["source"], row["target"]), axis=1) df_liens["value"] = 0.1 # Ne garder que les nœuds effectivement connectés niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012] # Inclure les nœuds connectés + tous les nœuds 10xx traversés dans les chemins noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"]) for chemin in chemins: for n in chemin: if niveaux.get(n) in niveaux_speciaux: noeuds_utilises.add(n) sorted_nodes = [ n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises ] def couleur_criticite(p): if p <= 0.33: return "darkgreen" elif p <= 0.66: return "orange" else: return "darkred" df_liens["color"] = df_liens.apply( lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray", axis=1 ) all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel()) sorted_nodes = sorted( all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True) node_indices = {name: i for i, name in enumerate(sorted_nodes)} sources = df_liens["source"].map(node_indices).tolist() targets = df_liens["target"].map(node_indices).tolist() values = df_liens["value"].tolist() customdata = [] for n in sorted_nodes: info = [f"{k}: {v}" for k, v in G.nodes[n].items()] niveau = niveaux.get(n, 99) # Ajout d’un ISG hérité si applicable if niveau in (11, 12, 1011, 1012): for succ in G.successors(n): if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]: isg_val = G.nodes[succ]["isg"] info.append(f"isg (géographique): {isg_val}") break customdata.append("
".join(info)) def edge_info(u, v): data = G.get_edge_data(u, v) if not data: return f"Relation : {u} → {v}" if isinstance(data, dict) and all(isinstance(k, int) for k in data): data = data[0] base = [f"{k}: {v}" for k, v in data.items()] return f"Relation : {u} → {v}
" + "
".join(base) link_customdata = [ edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows() ] fig = go.Figure(go.Sankey( arrangement="snap", node=dict( pad=10, thickness=8, label=sorted_nodes, x=[niveaux.get(n, 99) / 100 for n in sorted_nodes], color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes], customdata=customdata, hovertemplate="%{customdata}" ), link=dict( source=sources, target=targets, value=values, color=df_liens["color"].tolist(), customdata=link_customdata, hovertemplate="%{customdata}" ) )) fig.update_layout( title_text="Hiérarchie filtrée par niveaux et noeuds", paper_bgcolor="white", plot_bgcolor="white" ) st.plotly_chart(fig) if st.session_state.get("logged_in", False): if liens_chemins: G_export = nx.DiGraph() for u, v in liens_chemins: G_export.add_node(u, **G.nodes[u]) G_export.add_node(v, **G.nodes[v]) data = G.get_edge_data(u, v) if isinstance(data, dict) and all(isinstance(k, int) for k in data): G_export.add_edge(u, v, **data[0]) elif isinstance(data, dict): G_export.add_edge(u, v, **data) else: G_export.add_edge(u, v) with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f: write_dot(G_export, f.name) dot_path = f.name with open(dot_path, encoding="utf-8") as f: st.download_button( label="Télécharger le fichier DOT filtré", data=f.read(), file_name="graphe_filtré.dot", mime="text/plain" ) dot_file_path = None if st.session_state.onglet == "Instructions": markdown_content = charger_instructions_depuis_gitea(INSTRUCTIONS) if markdown_content: st.markdown(markdown_content) elif st.session_state.onglet == "Fiches": st.markdown("# Affichage des fiches") st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.") st.markdown("---") afficher_fiches() else: # Charger le graphe une seule fois if "G_temp" not in st.session_state: try: if charger_schema_depuis_gitea(DOT_FILE): st.session_state["G_temp"] = read_dot(DOT_FILE) st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy() dot_file_path = True else: dot_file_path = False except Exception as e: st.error(f"Erreur de lecture du fichier DOT : {e}") dot_file_path = False else: dot_file_path = True if dot_file_path: G_temp = st.session_state["G_temp"] G_temp_ivc = st.session_state["G_temp_ivc"] else: st.error("Impossible de charger le graphe pour cet onglet.") if dot_file_path and st.session_state.onglet == "Analyse": try: niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G_temp.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp]) G_temp.remove_nodes_from( [n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) st.markdown("# Analyse") st.markdown("## Sélection des nœuds de départ et d'arrivée") valeur_defaut = "-- Sélectionner un niveau --" niveau_choix = [valeur_defaut] + list(niveau_labels.values()) niveau_depart = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart") if niveau_depart != "-- Sélectionner un niveau --": niveau_depart = inverse_niveau_labels[niveau_depart] niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart] niveaux_arrivee_choix = [valeur_defaut] + niveaux_arrivee_possibles analyse_niveau_arrivee = st.selectbox("Niveau d'arrivée", niveau_choix, key="analyse_niveau_arrivee") if analyse_niveau_arrivee != "-- Sélectionner un niveau --": niveau_arrivee = inverse_niveau_labels[analyse_niveau_arrivee] minerais_selection = None if niveau_depart < 2 < niveau_arrivee: st.markdown("### Sélectionner un ou plusieurs minerais") # Tous les nœuds de niveau 2 (minerai) minerais_nodes = sorted([ n for n, d in G_temp.nodes(data=True) if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( "Filtrer par minerais (optionnel)", minerais_nodes, key="analyse_minerais" ) st.markdown("---") depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart] arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee] st.markdown("## Sélection fine des items") noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart") noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee") st.markdown("---") noeuds_depart = noeuds_depart if noeuds_depart else None noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None st.markdown("## Sélection des filtres pour identifier les vulnérabilités") filtrer_ics = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_ics") filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc") filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh") ihh_type = None if filtrer_ihh: ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type") filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg") logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage") st.markdown("---") if st.button("Lancer l’analyse", type="primary", key="analyse_lancer"): afficher_sankey( G_temp, niveau_depart=niveau_depart, niveau_arrivee=niveau_arrivee, noeuds_depart=noeuds_depart, noeuds_arrivee=noeuds_arrivee, minerais=minerais_selection, filtrer_ics=filtrer_ics, filtrer_ivc=filtrer_ivc, filtrer_ihh=filtrer_ihh, filtrer_isg=filtrer_isg, logique_filtrage=logique_filtrage ) except Exception as e: st.error(f"Erreur de prévisualisation du graphe : {e}") elif dot_file_path and st.session_state.onglet == "Visualisations": st.markdown("# Visualisations") st.markdown("""## Indice de Herfindahl-Hirschmann - IHH vs Criticité Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité substituabilité du minerai """) if st.button("Lancer", key="btn_ihh_criticite"): try: lancer_visualisation_ihh_criticite(G_temp) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}") st.markdown("""## Indice de Herfindahl-Hirschmann - IHH vs IVC Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité concurrentielle du minerai """) if st.button("Lancer", key="btn_ihh_ivc"): try: lancer_visualisation_ihh_ivc(G_temp_ivc) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs IVC : {e}") elif dot_file_path and st.session_state.onglet == "Personnalisation": G_temp = lancer_personnalisation(G_temp) st.markdown("", unsafe_allow_html=True) st.markdown("""""", unsafe_allow_html=True) st.markdown("