""", unsafe_allow_html=True) def recuperer_date_dernier_commit_schema(): headers = {"Authorization": f"token " + GITEA_TOKEN} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_CODE}/commits?path={DOT_FILE}&sha={ENV_CODE}" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() commits = response.json() if commits: latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"]) return latest_commit_date else: return None except Exception as e: st.error(f"Erreur lors de la récupération du dernier commit de {DOT_FILE} : {e}") return None def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"): headers = {"Authorization": f"token " + GITEA_TOKEN} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_CODE}/contents/{DOT_FILE}?ref={ENV_CODE}" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() data = response.json() remote_last_modified = recuperer_date_dernier_commit_schema() local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None if not local_last_modified or remote_last_modified > local_last_modified: dot_text = base64.b64decode(data["content"]).decode("utf-8") with open(fichier_local, "w", encoding="utf-8") as f: f.write(dot_text) return "OK" except Exception as e: st.error(f"Erreur lors du chargement de {DOT_FILE} depuis Gitea : {e}") return None @st.cache_data(ttl=600) def charger_arborescence_fiches(): headers = {"Authorization": f"token {GITEA_TOKEN}"} branche = "dev" if ENV == "dev" else "public" url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={branche}" try: response = requests.get(url_base, headers=headers) response.raise_for_status() dossiers = response.json() arbo = {} for dossier in sorted(dossiers, key=lambda d: d['name'].lower()): if dossier['type'] == 'dir': dossier_name = dossier['name'] url_dossier = dossier['url'] response_dossier = requests.get(url_dossier, headers=headers) response_dossier.raise_for_status() fichiers = response_dossier.json() fiches = sorted( [ {"nom": f["name"], "download_url": f["download_url"]} for f in fichiers if f["name"].endswith(".md") ], key=lambda x: x['nom'].lower() ) arbo[dossier_name] = fiches return arbo except Exception as e: st.error(f"Erreur lors du chargement des fiches : {e}") return {} def couleur_noeud(n, niveaux, G): niveau = niveaux.get(n, 99) attrs = G.nodes[n] # Niveau 99 : pays géographique avec isg if niveau == 99: isg = int(attrs.get("isg", -1)) if isg >= 60: return "darkred" elif isg >= 31: return "orange" elif isg >= 0: return "darkgreen" else: return "gray" # Niveau 11 ou 12 connecté à un pays géographique if niveau in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99: isg = int(G.nodes[succ].get("isg", -1)) if isg >= 60: return "darkred" elif isg >= 31: return "orange" elif isg >= 0: return "darkgreen" else: return "gray" # Logique existante pour IHH / IVC if niveau == 10 and attrs.get("ihh_pays"): ihh = int(attrs["ihh_pays"]) if ihh <= 15: return "darkgreen" elif ihh <= 25: return "orange" else: return "darkred" elif niveau == 2 and attrs.get("ivc"): ivc = int(attrs["ivc"]) if ivc <= 15: return "darkgreen" elif ivc <= 30: return "orange" else: return "darkred" return "lightblue" def recuperer_donnees(graph, noeuds): donnees = [] criticite = {} for noeud in noeuds: try: operation, minerai = noeud.split('_', 1) except ValueError: logging.warning(f"Nom de nœud inattendu : {noeud}") continue if operation == "Traitement": try: fabrications = list(graph.predecessors(minerai)) valeurs = [ int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100) for f in fabrications if graph.get_edge_data(f, minerai) ] if valeurs: criticite[minerai] = round(sum(valeurs) / len(valeurs)) except Exception as e: logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}") criticite[minerai] = 50 for noeud in noeuds: try: operation, minerai = noeud.split('_', 1) ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0)) ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0)) criticite_val = criticite.get(minerai, 50) criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3) donnees.append({ 'categorie': operation, 'nom': minerai, 'ihh_pays': ihh_pays, 'ihh_acteurs': ihh_acteurs, 'criticite_minerai': criticite_val, 'criticite_cat': criticite_cat }) except Exception as e: logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True) return pd.DataFrame(donnees) def afficher_graphique_altair(df): ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction'] categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()] for cat in categories: st.markdown(f"### {cat}") df_cat = df[df['categorie'] == cat].copy() # Appliquer un jitter contrôlé pour disperser les nœuds proches from collections import Counter coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1))) counts = Counter(coord_pairs) offset_x = [] offset_y = {} seen = Counter() for pair in coord_pairs: rank = seen[pair] seen[pair] += 1 if counts[pair] > 1: angle = rank * 1.5 # écart angulaire radius = 0.8 + 0.4 * rank # spiral growing radius offset_x.append(radius * np.cos(angle)) offset_y[pair] = radius * np.sin(angle) else: offset_x.append(0) offset_y[pair] = 0 df_cat['ihh_pays'] += offset_x df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs] # Décalage des étiquettes pour les relier df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5 df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5 base = alt.Chart(df_cat).encode( x=alt.X('ihh_pays:Q', title='IHH Pays (%)'), y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'), size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])) ) points = base.mark_circle(opacity=0.6) lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode( x='ihh_pays:Q', x2='ihh_pays_text:Q', y='ihh_acteurs:Q', y2='ihh_acteurs_text:Q' ) label_chart = alt.Chart(df_cat).mark_text( align='left', dx=3, dy=-3, fontSize=8, font='Arial', angle=335 ).encode( x='ihh_pays_text:Q', y='ihh_acteurs_text:Q', text='nom:N' ) hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties( width=500, height=400, title=f"Concentration et criticité – {cat}" ).interactive() st.altair_chart(chart, use_container_width=True) def lancer_visualisation_ihh_criticite(graph): niveaux = nx.get_node_attributes(graph, "niveau") noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n] noeuds.sort() df = recuperer_donnees(graph, noeuds) if df.empty: st.warning("Aucune donnée à visualiser.") else: afficher_graphique_altair(df) def extraire_chemins_depuis(G, source): chemins = [] stack = [(source, [source])] while stack: (node, path) = stack.pop() voisins = list(G.successors(node)) if not voisins: chemins.append(path) else: for voisin in voisins: if voisin not in path: stack.append((voisin, path + [voisin])) return chemins def extraire_chemins_vers(G, target, niveau_demande): chemins = [] reverse_G = G.reverse() niveaux = nx.get_node_attributes(G, "niveau") stack = [(target, [target])] while stack: (node, path) = stack.pop() voisins = list(reverse_G.successors(node)) if not voisins: chemin_inverse = list(reversed(path)) contient_niveau = any( int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse ) if contient_niveau: chemins.append(chemin_inverse) else: for voisin in voisins: if voisin not in path: stack.append((voisin, path + [voisin])) return chemins def afficher_sankey( G, niveau_depart, niveau_arrivee, noeuds_depart=None, noeuds_arrivee=None, minerais=None, filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False, filtrer_isg=False, logique_filtrage="OU"): niveaux = {} for node, attrs in G.nodes(data=True): niveau_str = attrs.get("niveau") try: if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) except ValueError: logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}") chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend([chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) if minerais: chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)] def extraire_criticite(u, v): data = G.get_edge_data(u, v) if not data: return 0 if isinstance(data, dict) and all(isinstance(k, int) for k in data): return float(data[0].get("criticite", 0)) return float(data.get("criticite", 0)) liens_chemins = set() chemins_filtres = set() niveaux_speciaux = [1001] for chemin in chemins: has_ihh = has_ivc = has_criticite = has_isg_critique = False for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u) niveau_v = niveaux.get(v) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens_chemins.add((u, v)) if filtrer_ihh and ihh_type: ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs" if niveau_u == 10 and int(G.nodes[u].get(ihh_field, 0)) > 25: has_ihh = True if niveau_v == 10 and int(G.nodes[v].get(ihh_field, 0)) > 25: has_ihh = True if filtrer_ivc and niveau_u == 2 and int(G.nodes[u].get("ivc", 0)) > 30: has_ivc = True if filtrer_criticite and niveau_u == 1 and niveau_v == 2 and extraire_criticite(u, v) > 0.66: has_criticite = True for n in (u, v): if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60: has_isg_critique = True elif niveaux.get(n) in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60: has_isg_critique = True if logique_filtrage == "ET": keep = True if filtrer_ihh: keep = keep and has_ihh if filtrer_ivc: keep = keep and has_ivc if filtrer_criticite: keep = keep and has_criticite if filtrer_isg: keep = keep and has_isg_critique if keep: chemins_filtres.add(tuple(chemin)) elif logique_filtrage == "OU": if (filtrer_ihh and has_ihh) or (filtrer_ivc and has_ivc) or (filtrer_criticite and has_criticite) or (filtrer_isg and has_isg_critique): chemins_filtres.add(tuple(chemin)) if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]): chemins = list(chemins_filtres) liens_chemins = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u, 999) niveau_v = niveaux.get(v, 999) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens_chemins.add((u, v)) if not liens_chemins: st.warning("Aucun chemin ne correspond aux critères.") return df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"]) df_liens = df_liens.groupby( ["source", "target"]).size().reset_index(name="value") df_liens["criticite"] = df_liens.apply( lambda row: extraire_criticite(row["source"], row["target"]), axis=1) df_liens["value"] = 0.1 # Ne garder que les nœuds effectivement connectés noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"]) sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises] def couleur_criticite(p): if p <= 0.33: return "darkgreen" elif p <= 0.66: return "orange" else: return "darkred" df_liens["color"] = df_liens.apply( lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray", axis=1 ) all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel()) sorted_nodes = sorted( all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True) node_indices = {name: i for i, name in enumerate(sorted_nodes)} sources = df_liens["source"].map(node_indices).tolist() targets = df_liens["target"].map(node_indices).tolist() values = df_liens["value"].tolist() customdata = [] for n in sorted_nodes: info = [f"{k}: {v}" for k, v in G.nodes[n].items()] niveau = niveaux.get(n, 99) # Ajout d’un ISG hérité si applicable if niveau in (11, 12): for succ in G.successors(n): if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]: isg_val = G.nodes[succ]["isg"] info.append(f"isg (géographique): {isg_val}") break customdata.append("
".join(info)) def edge_info(u, v): data = G.get_edge_data(u, v) if not data: return f"Relation : {u} → {v}" if isinstance(data, dict) and all(isinstance(k, int) for k in data): data = data[0] base = [f"{k}: {v}" for k, v in data.items()] return f"Relation : {u} → {v}
" + "
".join(base) link_customdata = [ edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows() ] fig = go.Figure(go.Sankey( arrangement="snap", node=dict( pad=10, thickness=8, label=sorted_nodes, x=[niveaux.get(n, 99) / 100 for n in sorted_nodes], color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes], customdata=customdata, hovertemplate="%{customdata}" ), link=dict( source=sources, target=targets, value=values, color=df_liens["color"].tolist(), customdata=link_customdata, hovertemplate="%{customdata}" ) )) fig.update_layout( title_text="Hiérarchie filtrée par niveaux et noeuds", paper_bgcolor="white", plot_bgcolor="white" ) st.plotly_chart(fig) if st.session_state.get("logged_in", False): if liens_chemins: G_export = nx.DiGraph() for u, v in liens_chemins: G_export.add_node(u, **G.nodes[u]) G_export.add_node(v, **G.nodes[v]) data = G.get_edge_data(u, v) if isinstance(data, dict) and all(isinstance(k, int) for k in data): G_export.add_edge(u, v, **data[0]) elif isinstance(data, dict): G_export.add_edge(u, v, **data) else: G_export.add_edge(u, v) with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f: write_dot(G_export, f.name) dot_path = f.name with open(dot_path, encoding="utf-8") as f: st.download_button( label="Télécharger le fichier DOT filtré", data=f.read(), file_name="graphe_filtré.dot", mime="text/plain" ) def creer_graphes(donnees): if not donnees: st.warning("Aucune donnée à afficher.") return try: df = pd.DataFrame(donnees) df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3)) # Jitter contrôlé pour dispersion from collections import Counter coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1))) counts = Counter(coord_pairs) offset_x, offset_y = [], {} seen = Counter() for pair in coord_pairs: rank = seen[pair] seen[pair] += 1 if counts[pair] > 1: angle = rank * 1.5 radius = 0.8 + 0.4 * rank offset_x.append(radius * np.cos(angle)) offset_y[pair] = radius * np.sin(angle) else: offset_x.append(0) offset_y[pair] = 0 df['ihh_extraction'] += offset_x df['ihh_reserves'] += [offset_y[p] for p in coord_pairs] # Position des étiquettes df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5 df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5 base = alt.Chart(df).encode( x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'), y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'), size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])), tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q'] ) points = base.mark_circle(opacity=0.6) lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode( x='ihh_extraction:Q', x2='ihh_extraction_text:Q', y='ihh_reserves:Q', y2='ihh_reserves_text:Q' ) labels = alt.Chart(df).mark_text( align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335 ).encode( x='ihh_extraction_text:Q', y='ihh_reserves_text:Q', text='nom:N' ) # Lignes seuils hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties( width=600, height=500, title="Concentration des ressources critiques vs vulnérabilité IVC" ).interactive() st.altair_chart(chart, use_container_width=True) except Exception as e: st.error(f"Erreur lors de la création du graphique : {e}") def recuperer_donnees_2(graph, noeuds_2): donnees = [] for minerai in noeuds_2: try: missing = [] if not graph.has_node(minerai): missing.append(minerai) if not graph.has_node(f"Extraction_{minerai}"): missing.append(f"Extraction_{minerai}") if not graph.has_node(f"Reserves_{minerai}"): missing.append(f"Reserves_{minerai}") if missing: print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.") continue ivc = int(graph.nodes[minerai].get('ivc', 0)) ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0)) ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0)) donnees.append({ 'nom': minerai, 'ivc': ivc, 'ihh_extraction': ihh_extraction_pays, 'ihh_reserves': ihh_reserves_pays }) except Exception as e: print(f"Erreur avec le nœud {minerai} : {e}") return donnees def lancer_visualisation_ihh_ivc(graph): try: noeuds_niveau_2 = [ n for n, data in graph.nodes(data=True) if data.get("niveau") == "2" and "ivc" in data ] if not noeuds_niveau_2: # print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.") return data = recuperer_donnees_2(graph, noeuds_niveau_2) creer_graphes(data) except Exception as e: print(f"Erreur lors du traitement du fichier DOT : {e}") def afficher_fiches(): if "fiches_arbo" not in st.session_state: st.session_state["fiches_arbo"] = charger_arborescence_fiches() arbo = st.session_state.get("fiches_arbo", {}) if not arbo: st.warning("Aucune fiche disponible pour le moment.") return dossiers = sorted(arbo.keys(), key=lambda x: x.lower()) dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers) if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --": fiches = arbo.get(dossier_choisi, []) noms_fiches = [f['nom'] for f in fiches] fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches) if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --": fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None) if fiche_info: try: headers = {"Authorization": f"token {GITEA_TOKEN}"} reponse_fiche = requests.get(fiche_info["download_url"], headers=headers) reponse_fiche.raise_for_status() contenu_md = reponse_fiche.content.decode("utf-8") # Nouveau traitement hiérarchique du markdown lignes = contenu_md.split('\n') sections_n1 = [] section_n1_actuelle = {"titre": None, "intro": [], "sections_n2": {}} dans_section_n1 = False section_n2_actuelle = None for ligne in lignes: if re.match(r'^#[^#]', ligne): # Nouveau titre de niveau 1 if section_n1_actuelle["titre"] or section_n1_actuelle["intro"] or section_n1_actuelle["sections_n2"]: sections_n1.append(section_n1_actuelle) section_n1_actuelle = { "titre": ligne.strip('# ').strip(), "intro": [], "sections_n2": {} } section_n2_actuelle = None dans_section_n1 = True elif re.match(r'^##[^#]', ligne): # Nouveau titre de niveau 2 section_n2_actuelle = ligne.strip('# ').strip() section_n1_actuelle["sections_n2"][section_n2_actuelle] = [] elif section_n2_actuelle: section_n1_actuelle["sections_n2"][section_n2_actuelle].append(ligne) elif dans_section_n1: section_n1_actuelle["intro"].append(ligne) # Ajouter la dernière section si présente if section_n1_actuelle["titre"] or section_n1_actuelle["intro"] or section_n1_actuelle["sections_n2"]: sections_n1.append(section_n1_actuelle) # Affichage for bloc in sections_n1: if bloc["titre"]: st.markdown(f"# {bloc['titre']}") if bloc["intro"]: st.markdown("\n".join(bloc["intro"]), unsafe_allow_html=True) for sous_titre, contenu in bloc["sections_n2"].items(): with st.expander(sous_titre): st.markdown("\n".join(contenu), unsafe_allow_html=True) gerer_tickets_fiche(fiche_choisie) except Exception as e: st.error(f"Erreur lors du chargement de la fiche : {e}") def lancer_personnalisation(G): """ Affiche et modifie uniquement les produits finaux personnalisables (ceux ajoutés) et permet d'ajouter de nouveaux produits finaux. Permet aussi d'importer et d'exporter la configuration personnalisée. Retour: G: le graphe modifié """ st.header("Personnalisation des produits finaux") st.markdown(""" --- Dans cette section, vous pouvez ajouter des produits finaux qui ne sont pas présents dans la liste, par exemple des produits que vous concevez vous même. Pour chacun de ces produits, vous allez lui associer les composants qui le constituent, et si cela vous convient, lui associer une opération d'assemblage existante. Les modifications que vous faites ne sont pas stockées dans l'application. Vous pouvez toutefois les enregistrer dans un fichier que vous pourrez recharger ultérieurement. --- """) # --- 1. Ajouter un nouveau produit final st.subheader("Ajouter un nouveau produit final") new_prod = st.text_input("Nom du nouveau produit (unique)", key="new_prod") if new_prod: # Opérations d'assemblage disponibles (niveau 10) ops_dispo = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") == "10" and any( G.has_edge(p, n) and G.nodes[p].get("niveau") == "0" for p in G.predecessors(n) ) ]) sel_new_op = st.selectbox( "Opération d'assemblage (optionnelle)", options=["-- Aucune --"] + ops_dispo, index=0, key="new_op" ) # Composants de niveau 1 niveau1 = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") == "1" ]) sel_comps = st.multiselect( "Composants à lier", options=niveau1, key="new_links" ) if st.button("Créer le produit", key="btn_new"): G.add_node(new_prod, niveau="0", personnalisation="oui", label=new_prod) if sel_new_op != "-- Aucune --": G.add_edge(new_prod, sel_new_op) for comp in sel_comps: G.add_edge(new_prod, comp) st.success( f"{new_prod} ajouté : {len(sel_comps)} composant(s)" + (f", opération {sel_new_op}" if sel_new_op != "-- Aucune --" else "") ) st.markdown("---") # --- 2. Modifier un produit final ajouté st.subheader("Modifier un produit final ajouté") produits0 = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") == "0" and d.get("personnalisation") == "oui" ]) sel_display = st.multiselect( "Sélectionnez un produit final ajouté à modifier", options=produits0, key="prod_sel" ) if sel_display: prod = sel_display[0] # Bouton de suppression if st.button(f"Supprimer le produit {prod}", key=f"del_{prod}"): G.remove_node(prod) st.success(f"Produit « {prod} » supprimé.") st.session_state.pop("prod_sel", None) return G # Opérations d'assemblage disponibles ops_dispo = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") == "10" and any( G.has_edge(p, n) and G.nodes[p].get("niveau") == "0" for p in G.predecessors(n) ) ]) # Opération actuelle curr_ops = [ succ for succ in G.successors(prod) if G.nodes[succ].get("niveau") == "10" ] default_idx = 0 if curr_ops and curr_ops[0] in ops_dispo: default_idx = ops_dispo.index(curr_ops[0]) + 1 sel_op = st.selectbox( f"Opération d'assemblage liée à {prod} (optionnelle)", options=["-- Aucune --"] + ops_dispo, index=default_idx, key=f"op_{prod}" ) # Composants liés niveau1 = sorted([ n for n, d in G.nodes(data=True) if d.get("niveau") == "1" ]) linked = [ succ for succ in G.successors(prod) if G.nodes[succ].get("niveau") == "1" ] nouveaux = st.multiselect( f"Composants liés à {prod}", options=niveau1, default=linked, key=f"links_{prod}" ) # Mise à jour if st.button(f"Mettre à jour {prod}", key=f"btn_{prod}"): # Mettre à jour l'opération for op in curr_ops: if sel_op == "-- Aucune --" or op != sel_op: G.remove_edge(prod, op) if sel_op != "-- Aucune --" and (not curr_ops or sel_op not in curr_ops): G.add_edge(prod, sel_op) # Mettre à jour les composants for comp in set(linked) - set(nouveaux): G.remove_edge(prod, comp) for comp in set(nouveaux) - set(linked): G.add_edge(prod, comp) st.success( f"{prod} mis à jour : {len(nouveaux)} composant(s)" + (f", opération {sel_op}" if sel_op != "-- Aucune --" else "") ) st.markdown("---") # --- 3. Sauvegarder ou restaurer la configuration st.subheader("Sauvegarder ou restaurer la configuration") # Export if st.button("Exporter configuration", key="export_config"): nodes = [n for n, d in G.nodes(data=True) if d.get("personnalisation")=="oui"] edges = [(u, v) for u, v in G.edges() if u in nodes] conf = {"nodes": nodes, "edges": edges} json_str = json.dumps(conf, ensure_ascii=False) st.download_button( label="Télécharger la config (JSON)", data=json_str, file_name="config_personnalisation.json", mime="application/json" ) # Import uploaded = st.file_uploader( "Importer une configuration (JSON) (max 100 Ko)", type=["json"], key="import_config" ) if uploaded: if uploaded.size > 100 * 1024: st.error("Fichier trop volumineux (max 100 Ko).") else: try: conf = json.load(uploaded) for node in conf.get("nodes", []): if not G.has_node(node): G.add_node(node, niveau="0", personnalisation="oui", label=node) for u, v in conf.get("edges", []): if G.has_node(u) and G.has_node(v) and not G.has_edge(u, v): G.add_edge(u, v) st.success("Configuration importée avec succès.") except Exception as e: st.error(f"Erreur d'import: {e}") return G dot_file_path = None if st.session_state.onglet == "Instructions": with open("Instructions.md", "r", encoding="utf-8") as f: markdown_content = f.read() st.markdown(markdown_content) elif st.session_state.onglet == "Fiches": st.markdown("---") st.markdown("**Affichage des fiches**") st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.") st.markdown("---") afficher_fiches() else: # Charger le graphe une seule fois if "G_temp" not in st.session_state: try: if charger_schema_depuis_gitea(DOT_FILE): st.session_state["G_temp"] = read_dot(DOT_FILE) st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy() dot_file_path = True else: dot_file_path = False except Exception as e: st.error(f"Erreur de lecture du fichier DOT : {e}") dot_file_path = False else: dot_file_path = True if dot_file_path: G_temp = st.session_state["G_temp"] G_temp_ivc = st.session_state["G_temp_ivc"] else: st.error("Impossible de charger le graphe pour cet onglet.") if dot_file_path and st.session_state.onglet == "Analyse": try: niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G_temp.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp]) G_temp.remove_nodes_from( [n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) st.markdown("---") st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**") st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche") niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values()) niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart") if niveau_depart_label != "-- Sélectionner un niveau --": niveau_depart = inverse_niveau_labels[niveau_depart_label] niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart] st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite") niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee") if niveau_arrivee_label != "-- Sélectionner un niveau --": niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label] minerais_selection = None if niveau_depart < 2 < niveau_arrivee: # Tous les nœuds de niveau 2 (minerai) minerais_nodes = sorted([ n for n, d in G_temp.nodes(data=True) if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( "Filtrer par minerais (optionnel)", options=minerais_nodes, key="analyse_minerais" ) st.markdown("---") depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart] arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee] st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**") st.markdown("Sélectionner un ou plusieurs items du niveau de départ") noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart") st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée") noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee") st.markdown("---") noeuds_depart = noeuds_depart if noeuds_depart else None noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None st.markdown("**Sélection des filtres pour identifier les vulnérabilités**") filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite") filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc") filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh") ihh_type = None if filtrer_ihh: ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type") filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg") logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage") st.markdown("---") if st.button("Lancer l’analyse", type="primary", key="analyse_lancer"): afficher_sankey( G_temp, niveau_depart=niveau_depart, niveau_arrivee=niveau_arrivee, noeuds_depart=noeuds_depart, noeuds_arrivee=noeuds_arrivee, minerais=minerais_selection, filtrer_criticite=filtrer_criticite, filtrer_ivc=filtrer_ivc, filtrer_ihh=filtrer_ihh, filtrer_isg=filtrer_isg, logique_filtrage=logique_filtrage ) except Exception as e: st.error(f"Erreur de prévisualisation du graphe : {e}") elif dot_file_path and st.session_state.onglet == "Visualisations": st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité** Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité substituabilité du minerai """) if st.button("Lancer", key="btn_ihh_criticite"): try: lancer_visualisation_ihh_criticite(G_temp) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}") st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC** Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. Taille des points = criticité concurrentielle du minerai """) if st.button("Lancer", key="btn_ihh_ivc"): try: lancer_visualisation_ihh_ivc(G_temp_ivc) except Exception as e: st.error(f"Erreur dans la visualisation IHH vs IVC : {e}") elif dot_file_path and st.session_state.onglet == "Personnalisation": G_temp = lancer_personnalisation(G_temp) st.markdown("", unsafe_allow_html=True) st.markdown("""""", unsafe_allow_html=True) st.markdown("