Code/app/plan_d_action/interface.py
2025-06-02 17:00:08 +02:00

238 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour générer un rapport factorisé des vulnérabilités critiques
suivant la structure définie dans Remarques.md.
"""
import streamlit as st
import networkx as nx
import uuid
from utils.translations import _
from utils.widgets import html_expander
from networkx.drawing.nx_agraph import write_dot
from batch_ia import (
load_config,
write_report,
parse_graphs,
extract_data_from_graph,
calculate_vulnerabilities,
generate_report,
)
from .plan_d_action import initialiser_interface
from utils.graph_utils import (
extraire_chemins_depuis,
extraire_chemins_vers
)
from pathlib import Path
# Répertoire courant du script
CURRENT_DIR = Path(__file__).resolve().parent
# Répertoire "jobs" dans app/plan_d_action
JOBS = CURRENT_DIR / "jobs"
JOBS.mkdir(exist_ok=True)
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def preparer_graphe(G):
"""Nettoie et prépare le graphe pour l'analyse."""
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp])
G.remove_nodes_from(
[n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
return G, niveaux_temp
def selectionner_minerais(G, noeuds_depart):
"""Interface pour sélectionner les minerais si nécessaire."""
minerais_selection = None
st.markdown(f"## {str(_('pages.plan_d_action.select_minerals'))}")
# Étape 1 : récupérer tous les nœuds descendants depuis les produits finaux
descendants = set()
for start in noeuds_depart:
descendants.update(nx.descendants(G, start)) # tous les successeurs (récursifs)
# Étape 2 : ne garder que les nœuds de niveau 2 parmi les descendants
minerais_nodes = sorted([
n for n in descendants
if G.nodes[n].get("niveau") and int(str(G.nodes[n].get("niveau")).strip('"')) == 2
])
minerais_selection = st.multiselect(
str(_("pages.plan_d_action.filter_by_minerals")),
minerais_nodes,
key="analyse_minerais"
)
return minerais_selection
def selectionner_noeuds(G, niveaux_temp, niveau_depart):
"""Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée."""
st.markdown("---")
st.markdown(f"## {str(_('pages.plan_d_action.fine_selection'))}")
depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart]
noeuds_arrivee = [n for n in G.nodes() if niveaux_temp.get(n) == 99]
noeuds_depart = st.multiselect(str(_("pages.plan_d_action.filter_start_nodes")),
sorted(depart_nodes),
key="analyse_noeuds_depart")
noeuds_depart = noeuds_depart if noeuds_depart else None
return noeuds_depart, noeuds_arrivee
def extraire_niveaux(G):
"""Extrait les niveaux des nœuds du graphe"""
niveaux = {}
for node, attrs in G.nodes(data=True):
niveau_str = attrs.get("niveau")
if niveau_str:
niveaux[node] = int(str(niveau_str).strip('"'))
return niveaux
def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais):
"""Extrait les chemins selon les critères spécifiés"""
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend([chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
elif noeuds_arrivee:
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
if minerais:
chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)]
return chemins
def exporter_graphe_filtre(G, liens_chemins):
"""Gère l'export du graphe filtré au format DOT"""
G_export = nx.DiGraph()
for u, v in liens_chemins:
G_export.add_node(u, **G.nodes[u])
G_export.add_node(v, **G.nodes[v])
data = G.get_edge_data(u, v)
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
G_export.add_edge(u, v, **data[0])
elif isinstance(data, dict):
G_export.add_edge(u, v, **data)
else:
G_export.add_edge(u, v)
return(G_export)
def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux):
"""Extrait les liens des chemins en respectant les niveaux"""
liens = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u, 999)
niveau_v = niveaux.get(v, 999)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens.add((u, v))
return liens
def interface_plan_d_action(G_temp):
st.markdown(f"# {str(_('pages.plan_d_action.title'))}")
if "plan_d_action" not in st.session_state:
st.session_state["plan_d_action"] = 0
if "g_md_done" not in st.session_state:
st.session_state["g_md_done"] = False
if "uuid" not in st.session_state:
st.session_state["uuid"] = str(uuid.uuid4())[:8]
st.session_state["G_dot"] = f"{JOBS}/{st.session_state["uuid"]}.dot"
st.session_state["G_md"] = f"{JOBS}/{st.session_state["uuid"]}.md"
if st.session_state["plan_d_action"] == 0:
html_expander(f"{str(_('pages.plan_d_action.help'))}", content="\n".join(_("pages.plan_d_action.help_content")), open_by_default=False, details_class="details_introduction")
# Préparation du graphe
G_temp, niveaux_temp = preparer_graphe(G_temp)
# Sélection des niveaux
niveau_depart = 0
niveau_arrivee = 99
# Sélection fine des noeuds
noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart)
# Sélection des minerais si nécessaire
if noeuds_depart:
minerais = selectionner_minerais(G_temp, noeuds_depart)
# Étape 1 : Extraction des niveaux des nœuds
niveaux = extraire_niveaux(G_temp)
# Étape 2 : Extraction des chemins selon les critères
chemins = extraire_chemins_selon_criteres(G_temp, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais)
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
# Extraction des liens sans filtrage
liens_chemins = extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux)
else:
liens_chemins = None
if liens_chemins:
G_final = exporter_graphe_filtre(G_temp, liens_chemins)
st.session_state["G_final"] = G_final
# formulaire ou sélection
if st.button(str(_("pages.plan_d_action.submit_request")), icon=":material/play_arrow:"):
# On déclenche la suite — mais on NE traite rien maintenant
st.session_state["plan_d_action"] = 1
st.rerun() # force la réexécution immédiatement avec état mis à jour
elif st.session_state["plan_d_action"] == 1:
# Traitement lourd une seule fois
if not st.session_state["g_md_done"]:
write_dot(st.session_state["G_final"], st.session_state["G_dot"])
config = load_config()
graph, ref_graph = parse_graphs(st.session_state["G_dot"])
data = extract_data_from_graph(graph, ref_graph)
results = calculate_vulnerabilities(data, config)
report, file_names = generate_report(data, results, config)
write_report(report, st.session_state["G_md"])
st.session_state["g_md_done"] = True # pour ne pas re-traiter à chaque affichage
# Affichage de linterface Streamlit
initialiser_interface(st.session_state["G_md"])
if (st.button("Réinitialiser", icon=":material/refresh:")):
st.session_state["plan_d_action"] = 0
st.session_state["g_md_done"] = False
for f in JOBS.glob(f"*{st.session_state["uuid"]}*"):
if f.is_file():
f.unlink()
st.rerun()