#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour générer un rapport factorisé des vulnérabilités critiques suivant la structure définie dans Remarques.md. """ import streamlit as st import networkx as nx import uuid from utils.translations import _ from utils.widgets import html_expander from networkx.drawing.nx_agraph import write_dot from batch_ia import ( load_config, write_report, parse_graphs, extract_data_from_graph, calculate_vulnerabilities, generate_report, ) from .plan_d_actions import initialiser_interface from utils.graph_utils import ( extraire_chemins_depuis, extraire_chemins_vers ) from pathlib import Path # Répertoire courant du script CURRENT_DIR = Path(__file__).resolve().parent # Répertoire "jobs" dans app/plan_d_actions JOBS = CURRENT_DIR / "jobs" JOBS.mkdir(parents=True, exist_ok=True) niveau_labels = { 0: "Produit final", 1: "Composant", 2: "Minerai", 10: "Opération", 11: "Pays d'opération", 12: "Acteur d'opération", 99: "Pays géographique" } inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} def preparer_graphe(G): """Nettoie et prépare le graphe pour l'analyse.""" niveaux_temp = { node: int(str(attrs.get("niveau")).strip('"')) for node, attrs in G.nodes(data=True) if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() } G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp]) G.remove_nodes_from( [n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) return G, niveaux_temp def selectionner_minerais(G, noeuds_depart): """Interface pour sélectionner les minerais si nécessaire.""" minerais_selection = None st.markdown(f"## {str(_('pages.plan_d_actions.select_minerals'))}") # Étape 1 : récupérer tous les nœuds descendants depuis les produits finaux descendants = set() for start in noeuds_depart: descendants.update(nx.descendants(G, start)) # tous les successeurs (récursifs) # Étape 2 : ne garder que les nœuds de niveau 2 parmi les descendants minerais_nodes = sorted([ n for n in descendants if G.nodes[n].get("niveau") and int(str(G.nodes[n].get("niveau")).strip('"')) == 2 ]) minerais_selection = st.multiselect( str(_("pages.plan_d_actions.filter_by_minerals")), minerais_nodes, key="analyse_minerais" ) return minerais_selection def selectionner_noeuds(G, niveaux_temp, niveau_depart): """Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.""" st.markdown("---") st.markdown(f"## {str(_('pages.plan_d_actions.fine_selection'))}") depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart] noeuds_arrivee = [n for n in G.nodes() if niveaux_temp.get(n) == 99] noeuds_depart = st.multiselect(str(_("pages.plan_d_actions.filter_start_nodes")), sorted(depart_nodes), key="analyse_noeuds_depart") noeuds_depart = noeuds_depart if noeuds_depart else None return noeuds_depart, noeuds_arrivee def extraire_niveaux(G): """Extrait les niveaux des nœuds du graphe""" niveaux = {} for node, attrs in G.nodes(data=True): niveau_str = attrs.get("niveau") if niveau_str: niveaux[node] = int(str(niveau_str).strip('"')) return niveaux def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais): """Extrait les chemins selon les critères spécifiés""" chemins = [] if noeuds_depart and noeuds_arrivee: for nd in noeuds_depart: for na in noeuds_arrivee: tous_chemins = extraire_chemins_depuis(G, nd) chemins.extend([chemin for chemin in tous_chemins if na in chemin]) elif noeuds_depart: for nd in noeuds_depart: chemins.extend(extraire_chemins_depuis(G, nd)) elif noeuds_arrivee: for na in noeuds_arrivee: chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) else: sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart] for nd in sources_depart: chemins.extend(extraire_chemins_depuis(G, nd)) if minerais: chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)] return chemins def exporter_graphe_filtre(G, liens_chemins): """Gère l'export du graphe filtré au format DOT""" G_export = nx.DiGraph() for u, v in liens_chemins: G_export.add_node(u, **G.nodes[u]) G_export.add_node(v, **G.nodes[v]) data = G.get_edge_data(u, v) if isinstance(data, dict) and all(isinstance(k, int) for k in data): G_export.add_edge(u, v, **data[0]) elif isinstance(data, dict): G_export.add_edge(u, v, **data) else: G_export.add_edge(u, v) return(G_export) def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux): """Extrait les liens des chemins en respectant les niveaux""" liens = set() for chemin in chemins: for i in range(len(chemin) - 1): u, v = chemin[i], chemin[i + 1] niveau_u = niveaux.get(u, 999) niveau_v = niveaux.get(v, 999) if ( (niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux) and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux) ): liens.add((u, v)) return liens def interface_plan_d_actions(G_temp): st.markdown(f"# {str(_('pages.plan_d_actions.title'))}") if "plan_d_actions" not in st.session_state: st.session_state["plan_d_actions"] = 0 if "g_md_done" not in st.session_state: st.session_state["g_md_done"] = False if "uuid" not in st.session_state: st.session_state["uuid"] = str(uuid.uuid4())[:8] st.session_state["G_dot"] = f"{JOBS}/{st.session_state["uuid"]}.dot" st.session_state["G_md"] = f"{JOBS}/{st.session_state["uuid"]}.md" if st.session_state["plan_d_actions"] == 0: html_expander(f"{str(_('pages.plan_d_actions.help'))}", content="\n".join(_("pages.plan_d_actions.help_content")), open_by_default=False, details_class="details_introduction") # Préparation du graphe G_temp, niveaux_temp = preparer_graphe(G_temp) # Sélection des niveaux niveau_depart = 0 niveau_arrivee = 99 # Sélection fine des noeuds noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart) # Sélection des minerais si nécessaire if noeuds_depart: minerais = selectionner_minerais(G_temp, noeuds_depart) # Étape 1 : Extraction des niveaux des nœuds niveaux = extraire_niveaux(G_temp) # Étape 2 : Extraction des chemins selon les critères chemins = extraire_chemins_selon_criteres(G_temp, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais) niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012] # Extraction des liens sans filtrage liens_chemins = extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux) else: liens_chemins = None if liens_chemins: G_final = exporter_graphe_filtre(G_temp, liens_chemins) st.session_state["G_final"] = G_final # formulaire ou sélection if st.button(str(_("pages.plan_d_actions.submit_request"))): # On déclenche la suite — mais on NE traite rien maintenant st.session_state["plan_d_actions"] = 1 st.rerun() # force la réexécution immédiatement avec état mis à jour elif st.session_state["plan_d_actions"] == 1: # Traitement lourd une seule fois if not st.session_state["g_md_done"]: write_dot(st.session_state["G_final"], st.session_state["G_dot"]) config = load_config() graph, ref_graph = parse_graphs(st.session_state["G_dot"]) data = extract_data_from_graph(graph, ref_graph) results = calculate_vulnerabilities(data, config) report, file_names = generate_report(data, results, config) write_report(report, st.session_state["G_md"]) st.session_state["g_md_done"] = True # pour ne pas re-traiter à chaque affichage # Affichage de l’interface Streamlit initialiser_interface(st.session_state["G_md"]) if (st.button("Réinitialiser")): st.session_state["plan_d_actions"] = 0 st.session_state["g_md_done"] = False for f in JOBS.glob(f"*{st.session_state["uuid"]}*"): if f.is_file(): f.unlink() st.rerun()