352 lines
13 KiB
Python
352 lines
13 KiB
Python
import streamlit as st
|
|
from networkx.drawing.nx_agraph import write_dot
|
|
import pandas as pd
|
|
import plotly.graph_objects as go
|
|
import networkx as nx
|
|
import logging
|
|
import tempfile
|
|
from utils.translations import _
|
|
|
|
from utils.graph_utils import (
|
|
extraire_chemins_depuis,
|
|
extraire_chemins_vers,
|
|
couleur_noeud
|
|
)
|
|
|
|
niveau_labels = {
|
|
0: "Produit final",
|
|
1: "Composant",
|
|
2: "Minerai",
|
|
10: "Opération",
|
|
11: "Pays d'opération",
|
|
12: "Acteur d'opération",
|
|
99: "Pays géographique"
|
|
}
|
|
|
|
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
|
|
|
|
def extraire_niveaux(G):
|
|
"""Extrait les niveaux des nœuds du graphe"""
|
|
niveaux = {}
|
|
for node, attrs in G.nodes(data=True):
|
|
niveau_str = attrs.get("niveau")
|
|
try:
|
|
if niveau_str:
|
|
niveaux[node] = int(str(niveau_str).strip('"'))
|
|
except ValueError:
|
|
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
|
|
return niveaux
|
|
|
|
def extraire_criticite(G, u, v):
|
|
"""Extrait la criticité d'un lien entre deux nœuds"""
|
|
data = G.get_edge_data(u, v)
|
|
if not data:
|
|
return 0
|
|
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
|
|
return float(data[0].get("criticite", 0))
|
|
return float(data.get("criticite", 0))
|
|
|
|
def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais):
|
|
"""Extrait les chemins selon les critères spécifiés"""
|
|
chemins = []
|
|
if noeuds_depart and noeuds_arrivee:
|
|
for nd in noeuds_depart:
|
|
for na in noeuds_arrivee:
|
|
tous_chemins = extraire_chemins_depuis(G, nd)
|
|
chemins.extend([chemin for chemin in tous_chemins if na in chemin])
|
|
elif noeuds_depart:
|
|
for nd in noeuds_depart:
|
|
chemins.extend(extraire_chemins_depuis(G, nd))
|
|
elif noeuds_arrivee:
|
|
for na in noeuds_arrivee:
|
|
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
|
|
else:
|
|
sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart]
|
|
for nd in sources_depart:
|
|
chemins.extend(extraire_chemins_depuis(G, nd))
|
|
|
|
if minerais:
|
|
chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)]
|
|
|
|
return chemins
|
|
|
|
def verifier_critere_ihh(G, chemin, niveaux, ihh_type):
|
|
"""Vérifie si un chemin respecte le critère IHH (concentration géographique ou industrielle)"""
|
|
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
|
|
for i in range(len(chemin) - 1):
|
|
u, v = chemin[i], chemin[i + 1]
|
|
niveau_u = niveaux.get(u)
|
|
niveau_v = niveaux.get(v)
|
|
|
|
if niveau_u in (10, 1010) and int(G.nodes[u].get(ihh_field, 0)) > 25:
|
|
return True
|
|
if niveau_v in (10, 1010) and int(G.nodes[v].get(ihh_field, 0)) > 25:
|
|
return True
|
|
return False
|
|
|
|
def verifier_critere_ivc(G, chemin, niveaux):
|
|
"""Vérifie si un chemin respecte le critère IVC (criticité par rapport à la concurrence sectorielle)"""
|
|
for i in range(len(chemin) - 1):
|
|
u = chemin[i]
|
|
niveau_u = niveaux.get(u)
|
|
if niveau_u in (2, 1002) and int(G.nodes[u].get("ivc", 0)) > 30:
|
|
return True
|
|
return False
|
|
|
|
def verifier_critere_ics(G, chemin, niveaux):
|
|
"""Vérifie si un chemin respecte le critère ICS (criticité d'un minerai pour un composant)"""
|
|
for i in range(len(chemin) - 1):
|
|
u, v = chemin[i], chemin[i + 1]
|
|
niveau_u = niveaux.get(u)
|
|
niveau_v = niveaux.get(v)
|
|
|
|
if ((niveau_u == 1 and niveau_v == 2) or
|
|
(niveau_u == 1001 and niveau_v == 1002) or
|
|
(niveau_u == 10 and niveau_v in (1000, 1001))) and extraire_criticite(G, u, v) > 0.66:
|
|
return True
|
|
return False
|
|
|
|
def verifier_critere_isg(G, chemin, niveaux):
|
|
"""Vérifie si un chemin contient un pays instable (ISG ≥ 60)"""
|
|
for i in range(len(chemin) - 1):
|
|
u, v = chemin[i], chemin[i + 1]
|
|
|
|
for n in (u, v):
|
|
if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60:
|
|
return True
|
|
elif niveaux.get(n) in (11, 12, 1011, 1012):
|
|
for succ in G.successors(n):
|
|
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
|
|
return True
|
|
return False
|
|
|
|
def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux):
|
|
"""Extrait les liens des chemins en respectant les niveaux"""
|
|
liens = set()
|
|
for chemin in chemins:
|
|
for i in range(len(chemin) - 1):
|
|
u, v = chemin[i], chemin[i + 1]
|
|
niveau_u = niveaux.get(u, 999)
|
|
niveau_v = niveaux.get(v, 999)
|
|
if (
|
|
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
|
|
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
|
|
):
|
|
liens.add((u, v))
|
|
return liens
|
|
|
|
def filtrer_chemins_par_criteres(G, chemins, niveaux, niveau_depart, niveau_arrivee,
|
|
filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage):
|
|
"""Filtre les chemins selon les critères de vulnérabilité"""
|
|
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
|
|
|
|
# Extraction des liens sans filtrage
|
|
liens_chemins = extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux)
|
|
|
|
# Si aucun filtre n'est appliqué, retourner tous les chemins
|
|
if not any([filtrer_ics, filtrer_ivc, filtrer_ihh, filtrer_isg]):
|
|
return liens_chemins, set()
|
|
|
|
# Application des filtres sur les chemins
|
|
chemins_filtres = set()
|
|
for chemin in chemins:
|
|
# Vérification des critères pour ce chemin
|
|
has_ihh = filtrer_ihh and verifier_critere_ihh(G, chemin, niveaux, ihh_type)
|
|
has_ivc = filtrer_ivc and verifier_critere_ivc(G, chemin, niveaux)
|
|
has_criticite = filtrer_ics and verifier_critere_ics(G, chemin, niveaux)
|
|
has_isg_critique = filtrer_isg and verifier_critere_isg(G, chemin, niveaux)
|
|
|
|
# Appliquer la logique de filtrage
|
|
if logique_filtrage == "ET":
|
|
keep = True
|
|
if filtrer_ihh: keep = keep and has_ihh
|
|
if filtrer_ivc: keep = keep and has_ivc
|
|
if filtrer_ics: keep = keep and has_criticite
|
|
if filtrer_isg: keep = keep and has_isg_critique
|
|
if keep:
|
|
chemins_filtres.add(tuple(chemin))
|
|
elif logique_filtrage == "OU":
|
|
if has_ihh or has_ivc or has_criticite or has_isg_critique:
|
|
chemins_filtres.add(tuple(chemin))
|
|
|
|
# Extraction des liens après filtrage
|
|
liens_filtres = extraire_liens_filtres(
|
|
chemins_filtres, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux
|
|
)
|
|
|
|
return liens_filtres, chemins_filtres
|
|
|
|
def couleur_criticite(p):
|
|
"""Retourne la couleur en fonction du niveau de criticité"""
|
|
if p <= 0.33:
|
|
return "darkgreen"
|
|
elif p <= 0.66:
|
|
return "orange"
|
|
else:
|
|
return "darkred"
|
|
|
|
def edge_info(G, u, v):
|
|
"""Génère l'info-bulle pour un lien"""
|
|
data = G.get_edge_data(u, v)
|
|
if not data:
|
|
return f"{str(_('pages.analyse.sankey.relation', 'Relation'))} : {u} → {v}"
|
|
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
|
|
data = data[0]
|
|
base = [f"{k}: {v}" for k, v in data.items()]
|
|
return f"{str(_('pages.analyse.sankey.relation', 'Relation'))} : {u} → {v}<br>" + "<br>".join(base)
|
|
|
|
def preparer_donnees_sankey(G, liens_chemins, niveaux, chemins):
|
|
"""Prépare les données pour le graphique Sankey"""
|
|
df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"])
|
|
df_liens = df_liens.groupby(["source", "target"]).size().reset_index(name="value")
|
|
|
|
df_liens["criticite"] = df_liens.apply(
|
|
lambda row: extraire_criticite(G, row["source"], row["target"]), axis=1)
|
|
df_liens["value"] = 0.1
|
|
|
|
# Ne garder que les nœuds effectivement connectés
|
|
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
|
|
|
|
# Inclure les nœuds connectés + tous les nœuds 10xx traversés dans les chemins
|
|
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
|
|
for chemin in chemins:
|
|
for n in chemin:
|
|
if niveaux.get(n) in niveaux_speciaux:
|
|
noeuds_utilises.add(n)
|
|
|
|
df_liens["color"] = df_liens.apply(
|
|
lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray",
|
|
axis=1
|
|
)
|
|
|
|
all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel())
|
|
sorted_nodes = sorted(
|
|
all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True)
|
|
node_indices = {name: i for i, name in enumerate(sorted_nodes)}
|
|
|
|
customdata = []
|
|
for n in sorted_nodes:
|
|
info = [f"{k}: {v}" for k, v in G.nodes[n].items()]
|
|
niveau = niveaux.get(n, 99)
|
|
|
|
# Ajout d'un ISG hérité si applicable
|
|
if niveau in (11, 12, 1011, 1012):
|
|
for succ in G.successors(n):
|
|
if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]:
|
|
isg_val = G.nodes[succ]["isg"]
|
|
info.append(f"isg (géographique): {isg_val}")
|
|
break
|
|
|
|
customdata.append("<br>".join(info))
|
|
|
|
link_customdata = [
|
|
edge_info(G, row["source"], row["target"]) for _, row in df_liens.iterrows()
|
|
]
|
|
|
|
return df_liens, sorted_nodes, customdata, link_customdata, node_indices
|
|
|
|
def creer_graphique_sankey(G, niveaux, df_liens, sorted_nodes, customdata, link_customdata, node_indices):
|
|
"""Crée et retourne le graphique Sankey"""
|
|
sources = df_liens["source"].map(node_indices).tolist()
|
|
targets = df_liens["target"].map(node_indices).tolist()
|
|
values = df_liens["value"].tolist()
|
|
|
|
fig = go.Figure(go.Sankey(
|
|
arrangement="snap",
|
|
node=dict(
|
|
pad=10,
|
|
thickness=8,
|
|
label=sorted_nodes,
|
|
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
|
|
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
|
|
customdata=customdata,
|
|
hovertemplate="%{customdata}<extra></extra>"
|
|
),
|
|
link=dict(
|
|
source=sources,
|
|
target=targets,
|
|
value=values,
|
|
color=df_liens["color"].tolist(),
|
|
customdata=link_customdata,
|
|
hovertemplate="%{customdata}<extra></extra>"
|
|
)
|
|
))
|
|
|
|
fig.update_layout(
|
|
title_text=str(_("pages.analyse.sankey.filtered_hierarchy", "Hiérarchie filtrée par niveaux et noeuds")),
|
|
paper_bgcolor="white",
|
|
plot_bgcolor="white"
|
|
)
|
|
|
|
return fig
|
|
|
|
def exporter_graphe_filtre(G, liens_chemins):
|
|
"""Gère l'export du graphe filtré au format DOT"""
|
|
if not st.session_state.get("logged_in", False) or not liens_chemins:
|
|
return
|
|
|
|
G_export = nx.DiGraph()
|
|
for u, v in liens_chemins:
|
|
G_export.add_node(u, **G.nodes[u])
|
|
G_export.add_node(v, **G.nodes[v])
|
|
data = G.get_edge_data(u, v)
|
|
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
|
|
G_export.add_edge(u, v, **data[0])
|
|
elif isinstance(data, dict):
|
|
G_export.add_edge(u, v, **data)
|
|
else:
|
|
G_export.add_edge(u, v)
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f:
|
|
write_dot(G_export, f.name)
|
|
dot_path = f.name
|
|
|
|
with open(dot_path, encoding="utf-8") as f:
|
|
st.download_button(
|
|
label=str(_("pages.analyse.sankey.download_dot", "Télécharger le fichier DOT filtré")),
|
|
data=f.read(),
|
|
file_name="graphe_filtré.dot",
|
|
mime="text/plain"
|
|
)
|
|
|
|
def afficher_sankey(
|
|
G,
|
|
niveau_depart, niveau_arrivee,
|
|
noeuds_depart=None, noeuds_arrivee=None,
|
|
minerais=None,
|
|
filtrer_ics=False, filtrer_ivc=False,
|
|
filtrer_ihh=False, ihh_type="Pays", filtrer_isg=False,
|
|
logique_filtrage="OU"):
|
|
|
|
# Étape 1 : Extraction des niveaux des nœuds
|
|
niveaux = extraire_niveaux(G)
|
|
|
|
# Étape 2 : Extraction des chemins selon les critères
|
|
chemins = extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais)
|
|
|
|
if not chemins:
|
|
st.warning(str(_("pages.analyse.sankey.no_paths", "Aucun chemin trouvé pour les critères spécifiés.")))
|
|
return
|
|
|
|
# Étape 3 : Filtrage des chemins selon les critères de vulnérabilité
|
|
liens_chemins, chemins_filtres = filtrer_chemins_par_criteres(
|
|
G, chemins, niveaux, niveau_depart, niveau_arrivee,
|
|
filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage
|
|
)
|
|
|
|
if not liens_chemins:
|
|
st.warning(str(_("pages.analyse.sankey.no_matching_paths", "Aucun chemin ne correspond aux critères.")))
|
|
return
|
|
|
|
# Étape 4 : Préparation des données pour le graphique Sankey
|
|
df_liens, sorted_nodes, customdata, link_customdata, node_indices = preparer_donnees_sankey(
|
|
G, liens_chemins, niveaux, chemins_filtres if any([filtrer_ics, filtrer_ivc, filtrer_ihh, filtrer_isg]) else chemins
|
|
)
|
|
|
|
# Étape 5 : Création et affichage du graphique Sankey
|
|
fig = creer_graphique_sankey(G, niveaux, df_liens, sorted_nodes, customdata, link_customdata, node_indices)
|
|
st.plotly_chart(fig)
|
|
|
|
# Étape 6 : Export optionnel du graphe filtré
|
|
exporter_graphe_filtre(G, liens_chemins)
|