#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour générer un rapport factorisé des vulnérabilités critiques suivant la structure définie dans Remarques.md. """ import streamlit as st import networkx as nx import uuid import re from utils.translations import _ from utils.widgets import html_expander from networkx.drawing.nx_agraph import write_dot from batch_ia import ( load_config, write_report, parse_graphs, extract_data_from_graph, calculate_vulnerabilities, generate_report, ) from .plan_d_action import initialiser_interface from utils.graph_utils import ( extraire_chemins_depuis, extraire_chemins_vers ) from pathlib import Path # Répertoire courant du script CURRENT_DIR = Path(__file__).resolve().parent # Répertoire "jobs" dans app/plan_d_action JOBS = CURRENT_DIR / "jobs" JOBS.mkdir(exist_ok=True) niveau_labels = { 0: "Produit final", 1: "Composant", 2: "Minerai", 10: "Opération", 11: "Pays d'opération", 12: "Acteur d'opération", 99: "Pays géographique" } inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} def preparer_graphe(G): """Nettoie et prépare le graphe pour l'analyse.""" niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp]) G.remove_nodes_from( [n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) return G, niveaux_temp def selectionner_minerais(G, noeuds_depart): """Interface pour sélectionner les minerais si nécessaire.""" minerais_selection = None st.markdown(f"## {str(_('pages.plan_d_action.select_minerals'))}") # Étape 1 : récupérer tous les nœuds descendants depuis les produits finaux descendants = set() for start in noeuds_depart: descendants.update(nx.descendants(G, start)) # tous les successeurs (récursifs) # Étape 2 : ne garder que les nœuds de niveau 2 parmi les descendants minerais_nodes = sorted([ n for n in descendants if G.nodes[n].get("niveau") and int(str(G.nodes[n].get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( str(_("pages.plan_d_action.filter_by_minerals")), minerais_nodes, key="analyse_minerais" ) return minerais_selection def selectionner_noeuds(G, niveaux_temp, niveau_depart): """Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.""" st.markdown("---") st.markdown(f"## {str(_('pages.plan_d_action.fine_selection'))}") depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart] noeuds_arrivee = [n for n in G.nodes() if niveaux_temp.get(n) == 99] noeuds_depart = st.multiselect(str(_("pages.plan_d_action.filter_start_nodes")), sorted(depart_nodes), key="analyse_noeuds_depart") noeuds_depart = noeuds_depart if noeuds_depart else None return noeuds_depart, noeuds_arrivee def extraire_niveaux(G): """Extrait les niveaux des nœuds du graphe""" niveaux = {} for node, attrs in G.nodes(data=True): niveau_str = attrs.get("niveau") if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) return niveaux def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais): """Extrait les chemins selon les critères spécifiés""" chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend([chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) if minerais: chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)] return chemins def exporter_graphe_filtre(G, liens_chemins): """Gère l'export du graphe filtré au format DOT""" G_export = nx.DiGraph() for u, v in liens_chemins: G_export.add_node(u, **G.nodes[u]) G_export.add_node(v, **G.nodes[v]) data = G.get_edge_data(u, v) if isinstance(data, dict) and all(isinstance(k, int) for k in data): G_export.add_edge(u, v, **data[0]) elif isinstance(data, dict): G_export.add_edge(u, v, **data) else: G_export.add_edge(u, v) return(G_export) def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux): """Extrait les liens des chemins en respectant les niveaux""" liens = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u, 999) niveau_v = niveaux.get(v, 999) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens.add((u, v)) return liens CORRESPONDANCE_COULEURS = { "Rouge": "red", "Orange": "orange", "Vert": "green", "FAIBLE": "green", "MODÉRÉE": "orange", "ÉLEVÉE à CRITIQUE": "red" } def remplacer_par_badge(markdown_text, correspondance=CORRESPONDANCE_COULEURS): # Échappe les mots à remplacer s'ils contiennent des accents ou espaces for mot, couleur in correspondance.items(): # Utilise des bords de mots (\b) pour éviter les remplacements partiels pattern = r'\b' + re.escape(mot) + r'\b' remplacement = f":{couleur}-badge[{mot}]" markdown_text = re.sub(pattern, remplacement, markdown_text) return markdown_text def interface_plan_d_action(G_temp): st.markdown(f"# {str(_('pages.plan_d_action.title'))}") if "plan_d_action" not in st.session_state: st.session_state["plan_d_action"] = 0 if "g_md_done" not in st.session_state: st.session_state["g_md_done"] = False if "uuid" not in st.session_state: st.session_state["uuid"] = str(uuid.uuid4())[:8] st.session_state["G_dot"] = f"{JOBS}/{st.session_state["uuid"]}.dot" st.session_state["G_md"] = f"{JOBS}/{st.session_state["uuid"]}.md" if st.session_state["plan_d_action"] == 0: html_expander(f"{str(_('pages.plan_d_action.help'))}", content="\n".join(_("pages.plan_d_action.help_content")), open_by_default=False, details_class="details_introduction") # Préparation du graphe G_temp, niveaux_temp = preparer_graphe(G_temp) # Sélection des niveaux niveau_depart = 0 niveau_arrivee = 99 # Sélection fine des noeuds noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart) # Sélection des minerais si nécessaire if noeuds_depart: minerais = selectionner_minerais(G_temp, noeuds_depart) # Étape 1 : Extraction des niveaux des nœuds niveaux = extraire_niveaux(G_temp) # Étape 2 : Extraction des chemins selon les critères chemins = extraire_chemins_selon_criteres(G_temp, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais) niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012] # Extraction des liens sans filtrage liens_chemins = extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux) else: liens_chemins = None if liens_chemins: G_final = exporter_graphe_filtre(G_temp, liens_chemins) st.session_state["G_final"] = G_final # formulaire ou sélection if st.button(str(_("pages.plan_d_action.submit_request")), icon=":material/send:"): # On déclenche la suite — mais on NE traite rien maintenant st.session_state["plan_d_action"] = 1 st.rerun() # force la réexécution immédiatement avec état mis à jour elif st.session_state["plan_d_action"] == 1: # Traitement lourd une seule fois if not st.session_state["g_md_done"]: write_dot(st.session_state["G_final"], st.session_state["G_dot"]) config = load_config() graph, ref_graph = parse_graphs(st.session_state["G_dot"]) data = extract_data_from_graph(graph, ref_graph) results = calculate_vulnerabilities(data, config) report, file_names = generate_report(data, results, config) write_report(remplacer_par_badge(report), st.session_state["G_md"]) st.session_state["g_md_done"] = True # pour ne pas re-traiter à chaque affichage # Affichage de l’interface Streamlit initialiser_interface(st.session_state["G_md"]) if (st.button("Réinitialiser", icon=":material/refresh:")): st.session_state["plan_d_action"] = 0 st.session_state["g_md_done"] = False for f in JOBS.glob(f"*{st.session_state["uuid"]}*"): if f.is_file(): f.unlink() st.rerun()