import streamlit as st from networkx.drawing.nx_agraph import read_dot import pandas as pd import plotly.graph_objects as go import networkx as nx import logging import altair as alt import numpy as np from collections import OrderedDict from dotenv import load_dotenv import os import requests import re from tickets_fiche import gerer_tickets_fiche import base64 from dateutil import parser from datetime import datetime, timezone import copy # Configuration Gitea load_dotenv() GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1") GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") ORGANISATION = os.getenv("ORGANISATION", "fabnum") DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches") ENV = os.getenv("ENV") st.set_page_config( page_title="Fabnum – Analyse de chaîne", page_icon="assets/weakness.png" ) # Intégration du fichier CSS externe with open("assets/styles.css") as f: st.markdown(f"", unsafe_allow_html=True) header ="""

FabNum - Chaîne de fabrication du numérique

""" if ENV == "dev": header+="

🔧 Vous êtes dans l'environnement de développement.

" else: header+="

Parcours de l'écosystème et identification des vulnérabilités.

" header+="""
""" st.markdown(header, unsafe_allow_html=True) def recuperer_date_dernier_commit_schema(): headers = {"Authorization": f"token " + GITEA_TOKEN} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path=schema.txt&sha={ENV}" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() commits = response.json() if commits: latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"]) return latest_commit_date else: return None except Exception as e: st.error(f"Erreur lors de la récupération du dernier commit de schema.txt : {e}") return None def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"): headers = {"Authorization": f"token " + GITEA_TOKEN} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/schema.txt?ref={ENV}" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() data = response.json() remote_last_modified = recuperer_date_dernier_commit_schema() local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None if not local_last_modified or remote_last_modified > local_last_modified: dot_text = base64.b64decode(data["content"]).decode("utf-8") with open(fichier_local, "w", encoding="utf-8") as f: f.write(dot_text) return "OK" except Exception as e: st.error(f"Erreur lors du chargement de schema.txt depuis Gitea : {e}") return None @st.cache_data(ttl=600) def charger_arborescence_fiches(): headers = {"Authorization": f"token {GITEA_TOKEN}"} branche = "dev" if ENV == "dev" else "public" url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={branche}" try: response = requests.get(url_base, headers=headers) response.raise_for_status() dossiers = response.json() arbo = {} for dossier in sorted(dossiers, key=lambda d: d['name'].lower()): if dossier['type'] == 'dir': dossier_name = dossier['name'] url_dossier = dossier['url'] response_dossier = requests.get(url_dossier, headers=headers) response_dossier.raise_for_status() fichiers = response_dossier.json() fiches = sorted( [ {"nom": f["name"], "download_url": f["download_url"]} for f in fichiers if f["name"].endswith(".md") ], key=lambda x: x['nom'].lower() ) arbo[dossier_name] = fiches return arbo except Exception as e: st.error(f"Erreur lors du chargement des fiches : {e}") return {} def couleur_noeud(n, niveaux, G): niveau = niveaux.get(n, 99) attrs = G.nodes[n] # Niveau 99 : pays géographique avec isg if niveau == 99: isg = int(attrs.get("isg", -1)) if isg >= 60: return "darkred" elif isg >= 31: return "orange" elif isg >= 0: return "darkgreen" else: return "gray" # Niveau 11 ou 12 connecté à un pays géographique if niveau in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99: isg = int(G.nodes[succ].get("isg", -1)) if isg >= 60: return "darkred" elif isg >= 31: return "orange" elif isg >= 0: return "darkgreen" else: return "gray" # Logique existante pour IHH / IVC if niveau == 10 and attrs.get("ihh_pays"): ihh = int(attrs["ihh_pays"]) if ihh <= 15: return "darkgreen" elif ihh <= 25: return "orange" else: return "darkred" elif niveau == 2 and attrs.get("ivc"): ivc = int(attrs["ivc"]) if ivc <= 15: return "darkgreen" elif ivc <= 30: return "orange" else: return "darkred" return "lightblue" def recuperer_donnees(graph, noeuds): donnees = [] criticite = {} for noeud in noeuds: try: operation, minerai = noeud.split('_', 1) except ValueError: logging.warning(f"Nom de nœud inattendu : {noeud}") continue if operation == "Traitement": try: fabrications = list(graph.predecessors(minerai)) valeurs = [ int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100) for f in fabrications if graph.get_edge_data(f, minerai) ] if valeurs: criticite[minerai] = round(sum(valeurs) / len(valeurs)) except Exception as e: logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}") criticite[minerai] = 50 for noeud in noeuds: try: operation, minerai = noeud.split('_', 1) ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0)) ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0)) criticite_val = criticite.get(minerai, 50) criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3) donnees.append({ 'categorie': operation, 'nom': minerai, 'ihh_pays': ihh_pays, 'ihh_acteurs': ihh_acteurs, 'criticite_minerai': criticite_val, 'criticite_cat': criticite_cat }) except Exception as e: logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True) return pd.DataFrame(donnees) def afficher_graphique_altair(df): ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction'] categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()] for cat in categories: st.markdown(f"### {cat}") df_cat = df[df['categorie'] == cat].copy() # Appliquer un jitter contrôlé pour disperser les nœuds proches from collections import Counter coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1))) counts = Counter(coord_pairs) offset_x = [] offset_y = {} seen = Counter() for pair in coord_pairs: rank = seen[pair] seen[pair] += 1 if counts[pair] > 1: angle = rank * 1.5 # écart angulaire radius = 0.8 + 0.4 * rank # spiral growing radius offset_x.append(radius * np.cos(angle)) offset_y[pair] = radius * np.sin(angle) else: offset_x.append(0) offset_y[pair] = 0 df_cat['ihh_pays'] += offset_x df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs] # Décalage des étiquettes pour les relier df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5 df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5 base = alt.Chart(df_cat).encode( x=alt.X('ihh_pays:Q', title='IHH Pays (%)'), y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'), size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])) ) points = base.mark_circle(opacity=0.6) lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode( x='ihh_pays:Q', x2='ihh_pays_text:Q', y='ihh_acteurs:Q', y2='ihh_acteurs_text:Q' ) label_chart = alt.Chart(df_cat).mark_text( align='left', dx=3, dy=-3, fontSize=8, font='Arial', angle=335 ).encode( x='ihh_pays_text:Q', y='ihh_acteurs_text:Q', text='nom:N' ) hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties( width=500, height=400, title=f"Concentration et criticité – {cat}" ).interactive() st.altair_chart(chart, use_container_width=True) def lancer_visualisation_ihh_criticite(graph): niveaux = nx.get_node_attributes(graph, "niveau") noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n] noeuds.sort() df = recuperer_donnees(graph, noeuds) if df.empty: st.warning("Aucune donnée à visualiser.") else: afficher_graphique_altair(df) def extraire_chemins_depuis(G, source): chemins = [] stack = [(source, [source])] while stack: (node, path) = stack.pop() voisins = list(G.successors(node)) if not voisins: chemins.append(path) else: for voisin in voisins: if voisin not in path: stack.append((voisin, path + [voisin])) return chemins def extraire_chemins_vers(G, target, niveau_demande): chemins = [] reverse_G = G.reverse() niveaux = nx.get_node_attributes(G, "niveau") stack = [(target, [target])] while stack: (node, path) = stack.pop() voisins = list(reverse_G.successors(node)) if not voisins: chemin_inverse = list(reversed(path)) contient_niveau = any( int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse ) if contient_niveau: chemins.append(chemin_inverse) else: for voisin in voisins: if voisin not in path: stack.append((voisin, path + [voisin])) return chemins def afficher_sankey( G, niveau_depart, niveau_arrivee, noeuds_depart=None, noeuds_arrivee=None, filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False, filtrer_isg=False, logique_filtrage="OU" ): niveaux = {} for node, attrs in G.nodes(data=True): # Conversion du niveau niveau_str = attrs.get("niveau") try: if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) except ValueError: logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}") # Suppression des attributs indésirables ATTRIBUTS_SUPPRIMES = {"fillcolor", "fontcolor", "style", "fontsize"} for attr in ATTRIBUTS_SUPPRIMES: attrs.pop(attr, None) # Réordonner : label d'abord if "label" in attrs: reordered = OrderedDict() reordered["label"] = attrs["label"] for k, v in attrs.items(): if k != "label": reordered[k] = v G.nodes[node].clear() G.nodes[node].update(reordered) chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend( [chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) def extraire_criticite(u, v): data = G.get_edge_data(u, v) if not data: return 0 if isinstance(data, dict) and all(isinstance(k, int) for k in data): try: return float(data[0].get("criticite", 0)) except: return 0 return float(data.get("criticite", 0)) liens_chemins = set() chemins_filtres = set() for chemin in chemins: has_ihh = False has_ivc = False has_criticite = False has_isg_critique = False for i in range(len(chemin)-1): u, v = chemin[i], chemin[i+1] if niveaux.get(u) is not None and niveaux.get(v) is not None: if niveau_depart <= niveaux.get(u) <= niveau_arrivee and niveau_depart <= niveaux.get(v) <= niveau_arrivee: liens_chemins.add((u, v)) # vérification des conditions critiques if filtrer_ihh: if filtrer_ihh and ihh_type: ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs" if niveaux.get(u) == 10 and G.nodes[u].get(ihh_field) and int(G.nodes[u][ihh_field]) > 25: has_ihh = True elif niveaux.get(v) == 10 and G.nodes[v].get(ihh_field) and int(G.nodes[v][ihh_field]) > 25: has_ihh = True if filtrer_ivc and niveaux.get(u) == 2 and G.nodes[u].get("ivc") and int(G.nodes[u]["ivc"]) > 30: has_ivc = True if filtrer_criticite and niveaux.get(u) == 1 and niveaux.get(v) == 2 and extraire_criticite(u, v) > 0.66: has_criticite = True # Vérifie présence d'un isg >= 60 for n in (u, v): if niveaux.get(n) == 99: isg = int(G.nodes[n].get("isg", 0)) if isg >= 60: has_isg_critique = True elif niveaux.get(n) in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60: has_isg_critique = True if logique_filtrage == "ET": keep = True if filtrer_ihh: keep = keep and has_ihh if filtrer_ivc: keep = keep and has_ivc if filtrer_criticite: keep = keep and has_criticite if filtrer_isg: keep = keep and has_isg_critique if keep: chemins_filtres.add(tuple(chemin)) elif logique_filtrage == "OU": if (filtrer_ihh and has_ihh) or \ (filtrer_ivc and has_ivc) or \ (filtrer_criticite and has_criticite) or \ (filtrer_isg and has_isg_critique): chemins_filtres.add(tuple(chemin)) if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]): chemins = list(chemins_filtres) liens_chemins = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] if niveau_depart <= niveaux.get(u, 999) <= niveau_arrivee and niveau_depart <= niveaux.get(v, 999) <= niveau_arrivee: liens_chemins.add((u, v)) if not liens_chemins: st.warning("Aucun chemin ne correspond aux critères.") return df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"]) df_liens = df_liens.groupby( ["source", "target"]).size().reset_index(name="value") df_liens["criticite"] = df_liens.apply( lambda row: extraire_criticite(row["source"], row["target"]), axis=1) df_liens["value"] = 0.1 # Ne garder que les nœuds effectivement connectés noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"]) sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises] def couleur_criticite(p): if p <= 0.33: return "darkgreen" elif p <= 0.66: return "orange" else: return "darkred" df_liens["color"] = df_liens.apply( lambda row: couleur_criticite(row["criticite"]) if niveaux.get( row["source"]) == 1 and niveaux.get(row["target"]) == 2 else "gray", axis=1 ) all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel()) sorted_nodes = sorted( all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True) node_indices = {name: i for i, name in enumerate(sorted_nodes)} sources = df_liens["source"].map(node_indices).tolist() targets = df_liens["target"].map(node_indices).tolist() values = df_liens["value"].tolist() customdata = [] for n in sorted_nodes: info = [f"{k}: {v}" for k, v in G.nodes[n].items()] niveau = niveaux.get(n, 99) # Ajout d’un ISG hérité si applicable if niveau in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]: isg_val = G.nodes[succ]["isg"] info.append(f"isg (géographique): {isg_val}") break customdata.append("
".join(info)) def edge_info(u, v): data = G.get_edge_data(u, v) if not data: return f"Relation : {u} → {v}" if isinstance(data, dict) and all(isinstance(k, int) for k in data): data = data[0] base = [f"{k}: {v}" for k, v in data.items()] return f"Relation : {u} → {v}
" + "
".join(base) link_customdata = [ edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows() ] fig = go.Figure(go.Sankey( arrangement="snap", node=dict( pad=10, thickness=8, label=sorted_nodes, x=[niveaux.get(n, 99) / 100 for n in sorted_nodes], color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes], customdata=customdata, hovertemplate="%{customdata}" ), link=dict( source=sources, target=targets, value=values, color=df_liens["color"].tolist(), customdata=link_customdata, hovertemplate="%{customdata}" ) )) fig.update_layout( title_text="Hiérarchie filtrée par niveaux et noeuds", paper_bgcolor="white", plot_bgcolor="white" ) st.plotly_chart(fig) def creer_graphes(donnees): if not donnees: st.warning("Aucune donnée à afficher.") return try: df = pd.DataFrame(donnees) df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3)) # Jitter contrôlé pour dispersion from collections import Counter coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1))) counts = Counter(coord_pairs) offset_x, offset_y = [], {} seen = Counter() for pair in coord_pairs: rank = seen[pair] seen[pair] += 1 if counts[pair] > 1: angle = rank * 1.5 radius = 0.8 + 0.4 * rank offset_x.append(radius * np.cos(angle)) offset_y[pair] = radius * np.sin(angle) else: offset_x.append(0) offset_y[pair] = 0 df['ihh_extraction'] += offset_x df['ihh_reserves'] += [offset_y[p] for p in coord_pairs] # Position des étiquettes df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5 df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5 base = alt.Chart(df).encode( x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'), y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'), size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])), tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q'] ) points = base.mark_circle(opacity=0.6) lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode( x='ihh_extraction:Q', x2='ihh_extraction_text:Q', y='ihh_reserves:Q', y2='ihh_reserves_text:Q' ) labels = alt.Chart(df).mark_text( align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335 ).encode( x='ihh_extraction_text:Q', y='ihh_reserves_text:Q', text='nom:N' ) # Lignes seuils hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties( width=600, height=500, title="Concentration des ressources critiques vs vulnérabilité IVC" ).interactive() st.altair_chart(chart, use_container_width=True) except Exception as e: st.error(f"Erreur lors de la création du graphique : {e}") def recuperer_donnees_2(graph, noeuds_2): donnees = [] for minerai in noeuds_2: try: missing = [] if not graph.has_node(minerai): missing.append(minerai) if not graph.has_node(f"Extraction_{minerai}"): missing.append(f"Extraction_{minerai}") if not graph.has_node(f"Reserves_{minerai}"): missing.append(f"Reserves_{minerai}") if missing: print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.") continue ivc = int(graph.nodes[minerai].get('ivc', 0)) ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0)) ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0)) donnees.append({ 'nom': minerai, 'ivc': ivc, 'ihh_extraction': ihh_extraction_pays, 'ihh_reserves': ihh_reserves_pays }) except Exception as e: print(f"Erreur avec le nœud {minerai} : {e}") return donnees def lancer_visualisation_ihh_ivc(graph): try: noeuds_niveau_2 = [ n for n, data in graph.nodes(data=True) if data.get("niveau") == "2" and "ivc" in data ] if not noeuds_niveau_2: # print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.") return data = recuperer_donnees_2(graph, noeuds_niveau_2) creer_graphes(data) except Exception as e: print(f"Erreur lors du traitement du fichier DOT : {e}") def afficher_fiches(): if "fiches_arbo" not in st.session_state: st.session_state["fiches_arbo"] = charger_arborescence_fiches() arbo = st.session_state.get("fiches_arbo", {}) if not arbo: st.warning("Aucune fiche disponible pour le moment.") return dossiers = sorted(arbo.keys(), key=lambda x: x.lower()) dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers) if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --": fiches = arbo.get(dossier_choisi, []) noms_fiches = [f['nom'] for f in fiches] fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches) if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --": fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None) if fiche_info: try: headers = {"Authorization": f"token {GITEA_TOKEN}"} reponse_fiche = requests.get(fiche_info["download_url"], headers=headers) reponse_fiche.raise_for_status() contenu_md = reponse_fiche.content.decode("utf-8") # Traitement du markdown lignes = contenu_md.split('\n') contenu_niveau_1 = [] sections_niveau_2 = {} section_actuelle = None for ligne in lignes: if re.match(r'^##[^#]', ligne): section_actuelle = ligne.strip('# ').strip() sections_niveau_2[section_actuelle] = [] elif section_actuelle: sections_niveau_2[section_actuelle].append(ligne) else: contenu_niveau_1.append(ligne) if contenu_niveau_1: st.markdown("\n".join(contenu_niveau_1), unsafe_allow_html=True) for titre, contenu in sections_niveau_2.items(): with st.expander(titre): st.markdown("\n".join(contenu), unsafe_allow_html=True) gerer_tickets_fiche(fiche_choisie) except Exception as e: st.error(f"Erreur lors du chargement de la fiche : {e}") def afficher_fiches_old(): import streamlit as st from pathlib import Path base_path = Path("Fiches") if not base_path.exists(): st.warning("Le dossier 'Fiches' est introuvable.") return dossiers = sorted([p for p in base_path.iterdir() if p.is_dir() and 'Criticités' not in p.name]) criticite_path = next((p for p in base_path.iterdir() if p.is_dir() and 'Criticités' in p.name), None) if criticite_path: dossiers.append(criticite_path) noms_dossiers = [d.name for d in dossiers] dossier_choisi = st.selectbox("📁 Dossiers disponibles", noms_dossiers) chemin_dossier = base_path / dossier_choisi fichiers_md = sorted(chemin_dossier.glob("*.md")) noms_fichiers = [] fichiers_dict = {} for f in fichiers_md: try: with f.open(encoding="utf-8") as md: titre = md.readline().strip() if ':' in titre: titre = titre.split(':', 1)[1].strip() else: titre = f.stem fichiers_dict[titre] = f.name noms_fichiers.append(titre) except Exception as e: st.error(f"Erreur lecture fichier {f.name} : {e}") noms_fichiers.sort() fiche_label = st.selectbox("🗂️ Fiches Markdown", noms_fichiers) fichier_choisi = fichiers_dict[fiche_label] chemin_fichier = chemin_dossier / fichier_choisi with chemin_fichier.open(encoding="utf-8") as f: contenu = f.read() st.markdown(contenu, unsafe_allow_html=True) niveau_labels = { 0: "Produit final", 1: "Composant", 2: "Minerai", 10: "Opération", 11: "Pays d'opération", 12: "Acteur d'opération", 99: "Pays géographique" } inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} DOT_FILE = "schema.txt" charger_schema_depuis_gitea(DOT_FILE) # Charger le graphe une seule fois try: dot_file_path = True if "G_temp" not in st.session_state: if charger_schema_depuis_gitea(DOT_FILE): st.session_state["G_temp"] = read_dot(DOT_FILE) st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy() else: dot_file_path = False G_temp = st.session_state["G_temp"] G_temp_ivc = st.session_state["G_temp_ivc"] except: st.error("Erreur de lecture du fichier DOT") dot_file_path = False if dot_file_path: st.markdown("""
""", unsafe_allow_html=True) with st.sidebar: st.markdown("---") st.header("Navigation") st.markdown("---") if "onglet" not in st.session_state: st.session_state.onglet = "Instructions" if st.button("📄 Instructions"): st.session_state.onglet = "Instructions" if st.button("🔍 Analyse"): st.session_state.onglet = "Analyse" if st.button("📊 Visualisations"): st.session_state.onglet = "Visualisations" if st.button("📚 Fiches"): st.session_state.onglet = "Fiches" st.markdown("---") if st.session_state.onglet == "Instructions": with open("Instructions.md", "r", encoding="utf-8") as f: markdown_content = f.read() st.markdown(markdown_content) elif st.session_state.onglet == "Analyse": try: niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G_temp.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp]) G_temp.remove_nodes_from( [n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) st.markdown("---") st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**") st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche") niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values()) niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart") if niveau_depart_label != "-- Sélectionner un niveau --": niveau_depart = inverse_niveau_labels[niveau_depart_label] niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart] st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite") niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee") st.markdown("---") if niveau_arrivee_label != "-- Sélectionner un niveau --": niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label] depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart] arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee] st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**") st.markdown("Sélectionner un ou plusieurs items du niveau de départ") noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart") st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée") noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee") st.markdown("---") noeuds_depart = noeuds_depart if noeuds_depart else None noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None st.markdown("**Sélection des filtres pour identifier les vulnérabilités**") filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite") filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc") filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh") ihh_type = None if filtrer_ihh: ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type") filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg") logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage") st.markdown("---") if st.button("Lancer l’analyse", type="primary", key="analyse_lancer"): afficher_sankey( G_temp, niveau_depart=niveau_depart, niveau_arrivee=niveau_arrivee, noeuds_depart=noeuds_depart, noeuds_arrivee=noeuds_arrivee, filtrer_criticite=filtrer_criticite, filtrer_ivc=filtrer_ivc, filtrer_ihh=filtrer_ihh, filtrer_isg=filtrer_isg, logique_filtrage=logique_filtrage ) except Exception as e: st.error(f"Erreur de prévisualisation du graphe : {e}") elif st.session_state.onglet == "Visualisations": st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité** Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité substituabilité du minerai """) if st.button("Lancer", key="btn_ihh_criticite"): try: lancer_visualisation_ihh_criticite(G_temp) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}") st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC** Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité concurrentielle du minerai """) if st.button("Lancer", key="btn_ihh_ivc"): try: lancer_visualisation_ihh_ivc(G_temp_ivc) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs IVC : {e}") elif st.session_state.onglet == "Fiches": st.markdown("---") st.markdown("**Affichage des fiches**") st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.") st.markdown("---") afficher_fiches() st.markdown("
", unsafe_allow_html=True) st.markdown(""" """, unsafe_allow_html=True )