328 lines
12 KiB
Python
328 lines
12 KiB
Python
from typing import List, Tuple, Dict, Optional
|
|
import networkx as nx
|
|
import streamlit as st
|
|
from utils.translations import _
|
|
from utils.widgets import html_expander
|
|
from utils.persistance import maj_champ_statut, get_champ_statut, supprime_champ_statut
|
|
|
|
from .sankey import afficher_sankey
|
|
|
|
niveau_labels = {
|
|
0: "Produit final",
|
|
1: "Composant",
|
|
2: "Minerai",
|
|
10: "Opération",
|
|
11: "Pays d'opération",
|
|
12: "Acteur d'opération",
|
|
99: "Pays géographique"
|
|
}
|
|
|
|
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
|
|
|
|
|
|
def preparer_graphe(
|
|
G: nx.DiGraph,
|
|
) -> Tuple[nx.DiGraph, Dict[str, int]]:
|
|
"""
|
|
Nettoie et prépare le graphe pour l'analyse.
|
|
|
|
Args:
|
|
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
|
|
|
|
Returns:
|
|
Tuple[nx.DiGraph, Dict[str, int]]: Un tuple contenant :
|
|
- Le graphe NetworkX proprement configuré
|
|
- Un dictionnaire des niveaux associés aux nœuds
|
|
"""
|
|
niveaux_temp = {
|
|
node: int(str(attrs.get("niveau")).strip('"'))
|
|
for node, attrs in G.nodes(data=True)
|
|
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
|
|
}
|
|
G.remove_nodes_from([n for n in G.nodes() if n not in niveaux_temp])
|
|
G.remove_nodes_from(
|
|
[n for n in G.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
|
|
return G, niveaux_temp
|
|
|
|
def selectionner_niveaux(
|
|
) -> Tuple[int|None, int|None]:
|
|
"""
|
|
Interface pour sélectionner les niveaux de départ et d'arrivée.
|
|
|
|
Returns:
|
|
Tuple[int, int]: Un tuple contenant deux nombres si des nœuds ont été sélectionnés,
|
|
- None sinon
|
|
"""
|
|
st.markdown(f"## {str(_('pages.analyse.selection_nodes'))}")
|
|
valeur_defaut = str(_("pages.analyse.select_level"))
|
|
niveau_choix = [valeur_defaut] + list(niveau_labels.values())
|
|
default_index = next((i for i, opt in enumerate(niveau_choix) if get_champ_statut("pages.analyse.select_level.niveau_depart") in opt), 0)
|
|
|
|
niveau_depart = st.selectbox(str(_("pages.analyse.start_level")), niveau_choix, index=default_index, key="analyse_niveau_depart")
|
|
if niveau_depart == valeur_defaut:
|
|
return None, None
|
|
else:
|
|
maj_champ_statut("pages.analyse.select_level.niveau_depart", niveau_depart)
|
|
|
|
niveau_depart_int = inverse_niveau_labels[niveau_depart]
|
|
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart_int]
|
|
niveau_choix = [valeur_defaut] + niveaux_arrivee_possibles
|
|
default_index = next((i for i, opt in enumerate(niveau_choix) if get_champ_statut("pages.analyse.select_level.niveau_arrivee") in opt), 0)
|
|
|
|
niveau_arrivee = st.selectbox(str(_("pages.analyse.end_level")), niveau_choix, index=default_index, key="analyse_niveau_arrivee")
|
|
if niveau_arrivee == valeur_defaut:
|
|
return niveau_depart_int, None
|
|
else:
|
|
maj_champ_statut("pages.analyse.select_level.niveau_arrivee", niveau_arrivee)
|
|
|
|
niveau_arrivee_int = inverse_niveau_labels[niveau_arrivee]
|
|
return niveau_depart_int, niveau_arrivee_int
|
|
|
|
def selectionner_minerais(G: nx.DiGraph, niveau_depart: int, niveau_arrivee: int) -> Optional[List[str]]:
|
|
if not (niveau_depart < 2 < niveau_arrivee):
|
|
return None
|
|
|
|
st.markdown(f"### {str(_('pages.analyse.select_minerals'))}")
|
|
|
|
minerais_nodes = sorted([
|
|
n for n, d in G.nodes(data=True)
|
|
if d.get("niveau") and int(str(d.get("niveau")).strip('\"')) == 2
|
|
])
|
|
|
|
# Initialiser depuis champ_statut si besoin
|
|
if "analyse_minerais" not in st.session_state:
|
|
anciens = []
|
|
i = 0
|
|
while True:
|
|
m = get_champ_statut(f"pages.analyse.filter_by_minerals.{i}")
|
|
if not m:
|
|
break
|
|
anciens.append(m)
|
|
i += 1
|
|
st.session_state["analyse_minerais"] = anciens
|
|
|
|
# Widget multiselect sans default, seulement key
|
|
st.multiselect(
|
|
str(_("pages.analyse.filter_by_minerals")),
|
|
minerais_nodes,
|
|
key="analyse_minerais"
|
|
)
|
|
|
|
selection = st.session_state["analyse_minerais"]
|
|
|
|
# Toujours purger, puis recréer si nécessaire
|
|
supprime_champ_statut("pages.analyse.filter_by_minerals")
|
|
if selection:
|
|
for i, m in enumerate(selection):
|
|
maj_champ_statut(f"pages.analyse.filter_by_minerals.{i}", m)
|
|
|
|
return selection if selection else None
|
|
|
|
def selectionner_noeuds(
|
|
G: nx.DiGraph,
|
|
niveaux_temp: Dict[str, int],
|
|
niveau_depart: int,
|
|
niveau_arrivee: int
|
|
) -> Tuple[List[str]|None, List[str]|None]:
|
|
"""
|
|
Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.
|
|
|
|
Args:
|
|
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
|
|
niveaux_temp (Dict[str, int]): Dictionnaire contenant les niveaux associés aux nœuds.
|
|
niveau_depart (int): Le niveau de départ sélectionné.
|
|
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
|
|
|
|
Returns:
|
|
Optional[Tuple[List[str], List[str]]]: Un tuple contenant les listes des nœuds
|
|
- None sinon
|
|
"""
|
|
st.markdown("---")
|
|
st.markdown(f"## {str(_('pages.analyse.fine_selection'))}")
|
|
|
|
depart_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_depart]
|
|
arrivee_nodes = [n for n in G.nodes() if niveaux_temp.get(n) == niveau_arrivee]
|
|
|
|
# DEPARTS -------------------------------------
|
|
if "analyse_noeuds_depart" not in st.session_state:
|
|
anciens_departs = []
|
|
i = 0
|
|
while True:
|
|
val = get_champ_statut(f"pages.analyse.filter_start_nodes.{i}")
|
|
if not val:
|
|
break
|
|
anciens_departs.append(val)
|
|
i += 1
|
|
st.session_state["analyse_noeuds_depart"] = anciens_departs
|
|
|
|
st.multiselect(
|
|
str(_("pages.analyse.filter_start_nodes")),
|
|
sorted(depart_nodes),
|
|
key="analyse_noeuds_depart"
|
|
)
|
|
|
|
departs_selection = st.session_state["analyse_noeuds_depart"]
|
|
|
|
supprime_champ_statut("pages.analyse.filter_start_nodes")
|
|
if departs_selection:
|
|
for i, val in enumerate(departs_selection):
|
|
maj_champ_statut(f"pages.analyse.filter_start_nodes.{i}", val)
|
|
|
|
# ARRIVEES -------------------------------------
|
|
if "analyse_noeuds_arrivee" not in st.session_state:
|
|
anciens_arrivees = []
|
|
i = 0
|
|
while True:
|
|
val = get_champ_statut(f"pages.analyse.filter_end_nodes.{i}")
|
|
if not val:
|
|
break
|
|
anciens_arrivees.append(val)
|
|
i += 1
|
|
st.session_state["analyse_noeuds_arrivee"] = anciens_arrivees
|
|
|
|
st.multiselect(
|
|
str(_("pages.analyse.filter_end_nodes")),
|
|
sorted(arrivee_nodes),
|
|
key="analyse_noeuds_arrivee"
|
|
)
|
|
|
|
arrivees_selection = st.session_state["analyse_noeuds_arrivee"]
|
|
|
|
supprime_champ_statut("pages.analyse.filter_end_nodes")
|
|
if arrivees_selection:
|
|
for i, val in enumerate(arrivees_selection):
|
|
maj_champ_statut(f"pages.analyse.filter_end_nodes.{i}", val)
|
|
|
|
departs_selection = departs_selection if departs_selection else None
|
|
arrivees_selection = arrivees_selection if arrivees_selection else None
|
|
|
|
return departs_selection, arrivees_selection
|
|
|
|
def configurer_filtres_vulnerabilite() -> Tuple[bool, bool, bool, str, bool, str]:
|
|
"""
|
|
Interface pour configurer les filtres de vulnérabilité.
|
|
|
|
Returns:
|
|
Tuple[bool, bool, bool, str, bool, str]: Un tuple contenant :
|
|
- La possibilité de filtrer les ICS
|
|
- La possibilité de filtrer les IV C
|
|
- La possibilité de filtrer les IH H
|
|
- Le type d'application pour les IH H (pays ou acteur)
|
|
- La possibilité de filtrer les IS G
|
|
- La logique de filtrage (ou ou and)
|
|
"""
|
|
st.markdown("---")
|
|
st.markdown(f"## {str(_('pages.analyse.vulnerability_filters'))}")
|
|
|
|
def init_checkbox(key, champ):
|
|
if key not in st.session_state:
|
|
val = get_champ_statut(champ)
|
|
st.session_state[key] = val.lower() == "true"
|
|
|
|
def init_radio(key, champ, options, default):
|
|
if key not in st.session_state:
|
|
val = get_champ_statut(champ)
|
|
st.session_state[key] = val if val in options else default
|
|
|
|
# Initialiser les valeurs si F5
|
|
init_checkbox("analyse_filtrer_ics", "pages.analyse.filter_ics")
|
|
init_checkbox("analyse_filtrer_ivc", "pages.analyse.filter_ivc")
|
|
init_checkbox("analyse_filtrer_ihh", "pages.analyse.filter_ihh")
|
|
init_checkbox("analyse_filtrer_isg", "pages.analyse.filter_isg")
|
|
init_radio("analyse_ihh_type", "pages.analyse.apply_ihh_filter", ["Pays", "Acteurs"], "Pays")
|
|
init_radio("analyse_logique_filtrage", "pages.analyse.filter_logic", ["OU", "ET"], "OU")
|
|
|
|
filtrer_ics = st.checkbox(str(_("pages.analyse.filter_ics")), key="analyse_filtrer_ics")
|
|
filtrer_ivc = st.checkbox(str(_("pages.analyse.filter_ivc")), key="analyse_filtrer_ivc")
|
|
filtrer_ihh = st.checkbox(str(_("pages.analyse.filter_ihh")), key="analyse_filtrer_ihh")
|
|
|
|
ihh_type = "Pays"
|
|
if filtrer_ihh:
|
|
ihh_type = st.radio(
|
|
str(_("pages.analyse.apply_ihh_filter")),
|
|
[str(_("pages.analyse.countries")), str(_("pages.analyse.actors"))],
|
|
horizontal=True,
|
|
key="analyse_ihh_type"
|
|
)
|
|
|
|
filtrer_isg = st.checkbox(str(_("pages.analyse.filter_isg")), key="analyse_filtrer_isg")
|
|
|
|
logique_options = ["OU", "ET"]
|
|
logique_labels = {
|
|
"OU": str(_("pages.analyse.or")),
|
|
"ET": str(_("pages.analyse.and"))
|
|
}
|
|
|
|
logique_filtrage = st.radio(
|
|
str(_("pages.analyse.filter_logic")),
|
|
options=logique_options,
|
|
format_func=lambda x: logique_labels.get(x, x),
|
|
horizontal=True,
|
|
key="analyse_logique_filtrage"
|
|
)
|
|
|
|
# Sauvegarde de l'état
|
|
maj_champ_statut("pages.analyse.filter_ics", str(filtrer_ics))
|
|
maj_champ_statut("pages.analyse.filter_ivc", str(filtrer_ivc))
|
|
maj_champ_statut("pages.analyse.filter_ihh", str(filtrer_ihh))
|
|
maj_champ_statut("pages.analyse.apply_ihh_filter", ihh_type)
|
|
maj_champ_statut("pages.analyse.filter_isg", str(filtrer_isg))
|
|
maj_champ_statut("pages.analyse.filter_logic", logique_filtrage)
|
|
|
|
return filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage
|
|
|
|
def interface_analyse(
|
|
G_temp: nx.DiGraph,
|
|
) -> None:
|
|
"""
|
|
Interface utilisateur pour l'analyse des graphes.
|
|
|
|
Args:
|
|
G_temp (nx.DiGraph): Le graphe NetworkX à analyser.
|
|
"""
|
|
titre = f"# {str(_('pages.analyse.title'))}"
|
|
maj_champ_statut("pages.analyse.title", titre)
|
|
st.markdown(titre)
|
|
html_expander(f"{str(_('pages.analyse.help'))}", content="\n".join(_("pages.analyse.help_content")), open_by_default=False, details_class="details_introduction")
|
|
st.markdown("---")
|
|
|
|
try:
|
|
|
|
# Préparation du graphe
|
|
G_temp, niveaux_temp = preparer_graphe(G_temp)
|
|
|
|
# Sélection des niveaux
|
|
niveau_depart, niveau_arrivee = selectionner_niveaux()
|
|
if niveau_depart is None or niveau_arrivee is None:
|
|
return
|
|
|
|
# Sélection des minerais si nécessaire
|
|
minerais_selection = selectionner_minerais(G_temp, niveau_depart, niveau_arrivee)
|
|
|
|
# Sélection fine des noeuds
|
|
noeuds_depart, noeuds_arrivee = selectionner_noeuds(G_temp, niveaux_temp, niveau_depart, niveau_arrivee)
|
|
|
|
# Configuration des filtres de vulnérabilité
|
|
filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage = configurer_filtres_vulnerabilite()
|
|
|
|
# Lancement de l'analyse
|
|
st.markdown("---")
|
|
if st.button(str(_("pages.analyse.run_analysis")), type="primary", key="analyse_lancer", icon=":material/graph_4:"):
|
|
afficher_sankey(
|
|
G_temp,
|
|
niveau_depart=niveau_depart,
|
|
niveau_arrivee=niveau_arrivee,
|
|
noeuds_depart=noeuds_depart,
|
|
noeuds_arrivee=noeuds_arrivee,
|
|
minerais=minerais_selection,
|
|
filtrer_ics=filtrer_ics,
|
|
filtrer_ivc=filtrer_ivc,
|
|
filtrer_ihh=filtrer_ihh,
|
|
ihh_type=ihh_type,
|
|
filtrer_isg=filtrer_isg,
|
|
logique_filtrage=logique_filtrage
|
|
)
|
|
|
|
except Exception as e:
|
|
st.error(f"{str(_('errors.graph_preview_error'))} {e}")
|