Code/fabnum.py
Fabrication du Numérique 8710014345 Modifications du jour
2025-05-08 21:35:37 +02:00

581 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from networkx.drawing.nx_agraph import read_dot, write_dot
import pandas as pd
import plotly.graph_objects as go
import networkx as nx
import logging
import re
import tempfile
# Configuration Gitea
from config import DOT_FILE, INSTRUCTIONS
from utils.gitea import (
charger_instructions_depuis_gitea,
charger_schema_depuis_gitea
)
from utils.graph_utils import (
extraire_chemins_depuis,
extraire_chemins_vers,
lancer_personnalisation
)
from utils.visualisation import (
lancer_visualisation_ihh_criticite,
lancer_visualisation_ihh_ivc
)
from components.sidebar import (
afficher_menu,
afficher_impact
)
from components.header import afficher_entete
from components.footer import afficher_pied_de_page
from components.fiches import afficher_fiches
st.set_page_config(
page_title="Fabnum Analyse de chaîne",
page_icon="assets/weakness.png", # ajout
layout="centered",
initial_sidebar_state="expanded"
)
session_id = st.context.headers.get("x-session-id")
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
# Une seule lecture du fichier, mais injection à chaque run
if "base_css_content" not in st.session_state:
with open("assets/styles/base.css") as f:
st.session_state["base_css_content"] = f.read()
st.markdown(f"<style>{st.session_state['base_css_content']}</style>", unsafe_allow_html=True)
# Chargement initial des thèmes (variables CSS uniquement)
if "theme_css_content_clair" not in st.session_state:
with open("assets/styles/theme-light.css") as f:
st.session_state["theme_css_content_clair"] = f.read()
if "theme_css_content_sombre" not in st.session_state:
with open("assets/styles/theme-dark.css") as f:
st.session_state["theme_css_content_sombre"] = f.read()
# Thème en cours
current_theme = st.session_state.get("theme_mode", "Clair").lower()
theme_css = st.session_state[f"theme_css_content_{current_theme}"]
# Injection des variables du thème
st.markdown(f"<style>{theme_css}</style>", unsafe_allow_html=True)
# Chargement unique du CSS principal (base.css)
if "base_css_content" not in st.session_state:
with open("assets/styles/base.css") as f:
st.session_state["base_css_content"] = f.read()
# Injection du style principal basé sur les variables
st.markdown(f"<style>{st.session_state['base_css_content']}</style>", unsafe_allow_html=True)
def get_total_bytes_for_session(session_id):
total_bytes = 0
try:
with open("/var/log/nginx/fabnum-dev.access.log", "r") as f:
for line in f:
if session_id in line:
match = re.search(r'"GET.*?" \d+ (\d+)', line)
if match:
bytes_sent = int(match.group(1))
total_bytes += bytes_sent
except Exception as e:
st.error(f"Erreur lecture log: {e}")
return total_bytes
# Intégration du fichier CSS externe
# with open("assets/styles.css") as f:
# st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
afficher_entete()
afficher_menu()
st.markdown("""
<main role="main">
""", unsafe_allow_html=True)
def couleur_noeud(n, niveaux, G):
niveau = niveaux.get(n, 99)
attrs = G.nodes[n]
# Niveau 99 : pays géographique avec isg
if niveau == 99:
isg = int(attrs.get("isg", -1))
return (
"darkred" if isg >= 60 else
"orange" if isg >= 31 else
"darkgreen" if isg >= 0 else
"gray"
)
# Niveau 11 ou 12 connecté à un pays géographique
if niveau in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99:
isg = int(G.nodes[succ].get("isg", -1))
return (
"darkred" if isg >= 60 else
"orange" if isg >= 31 else
"darkgreen" if isg >= 0 else
"gray"
)
# Logique existante pour IHH / IVC
if niveau in (10, 1010) and attrs.get("ihh_pays"):
ihh = int(attrs["ihh_pays"])
return (
"darkgreen" if ihh <= 15 else
"orange" if ihh <= 25 else
"darkred"
)
elif niveau == 2 and attrs.get("ivc"):
ivc = int(attrs["ivc"])
return (
"darkgreen" if ivc <= 15 else
"orange" if ivc <= 30 else
"darkred"
)
return "lightblue"
def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
minerais=None,
filtrer_ics=False, filtrer_ivc=False,
filtrer_ihh=False, filtrer_isg=False,
logique_filtrage="OU"):
niveaux = {}
for node, attrs in G.nodes(data=True):
niveau_str = attrs.get("niveau")
try:
if niveau_str:
niveaux[node] = int(str(niveau_str).strip('"'))
except ValueError:
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend([chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
elif noeuds_arrivee:
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
if minerais:
chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)]
def extraire_criticite(u, v):
data = G.get_edge_data(u, v)
if not data:
return 0
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
return float(data[0].get("criticite", 0))
return float(data.get("criticite", 0))
liens_chemins = set()
chemins_filtres = set()
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
for chemin in chemins:
has_ihh = has_ivc = has_criticite = has_isg_critique = False
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u)
niveau_v = niveaux.get(v)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveau_u in (10, 1010) and int(G.nodes[u].get(ihh_field, 0)) > 25:
has_ihh = True
if niveau_v in (10, 1010) and int(G.nodes[v].get(ihh_field, 0)) > 25:
has_ihh = True
if filtrer_ivc and niveau_u in (2, 1002) and int(G.nodes[u].get("ivc", 0)) > 30:
has_ivc = True
if filtrer_ics and ((niveau_u == 1 and niveau_v == 2) or (niveau_u == 1001 and niveau_v == 1002) or (niveau_u == 10 and niveau_v in (1000, 1001))) and extraire_criticite(u, v) > 0.66:
has_criticite = True
for n in (u, v):
if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60:
has_isg_critique = True
elif niveaux.get(n) in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
has_isg_critique = True
if logique_filtrage == "ET":
keep = True
if filtrer_ihh:
keep = keep and has_ihh
if filtrer_ivc:
keep = keep and has_ivc
if filtrer_ics:
keep = keep and has_criticite
if filtrer_isg:
keep = keep and has_isg_critique
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
if (filtrer_ihh and has_ihh) or (filtrer_ivc and has_ivc) or (filtrer_ics and has_criticite) or (filtrer_isg and has_isg_critique):
chemins_filtres.add(tuple(chemin))
if any([filtrer_ics, filtrer_ivc, filtrer_ihh, filtrer_isg]):
chemins = list(chemins_filtres)
liens_chemins = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u, 999)
niveau_v = niveaux.get(v, 999)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if not liens_chemins:
st.warning("Aucun chemin ne correspond aux critères.")
return
df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"])
df_liens = df_liens.groupby(
["source", "target"]).size().reset_index(name="value")
df_liens["criticite"] = df_liens.apply(
lambda row: extraire_criticite(row["source"], row["target"]), axis=1)
df_liens["value"] = 0.1
# Ne garder que les nœuds effectivement connectés
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
# Inclure les nœuds connectés + tous les nœuds 10xx traversés dans les chemins
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
for chemin in chemins:
for n in chemin:
if niveaux.get(n) in niveaux_speciaux:
noeuds_utilises.add(n)
sorted_nodes = [
n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True)
if n in noeuds_utilises
]
def couleur_criticite(p):
if p <= 0.33:
return "darkgreen"
elif p <= 0.66:
return "orange"
else:
return "darkred"
df_liens["color"] = df_liens.apply(
lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray",
axis=1
)
all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel())
sorted_nodes = sorted(
all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True)
node_indices = {name: i for i, name in enumerate(sorted_nodes)}
sources = df_liens["source"].map(node_indices).tolist()
targets = df_liens["target"].map(node_indices).tolist()
values = df_liens["value"].tolist()
customdata = []
for n in sorted_nodes:
info = [f"{k}: {v}" for k, v in G.nodes[n].items()]
niveau = niveaux.get(n, 99)
# Ajout dun ISG hérité si applicable
if niveau in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]:
isg_val = G.nodes[succ]["isg"]
info.append(f"isg (géographique): {isg_val}")
break
customdata.append("<br>".join(info))
def edge_info(u, v):
data = G.get_edge_data(u, v)
if not data:
return f"Relation : {u}{v}"
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
data = data[0]
base = [f"{k}: {v}" for k, v in data.items()]
return f"Relation : {u}{v}<br>" + "<br>".join(base)
link_customdata = [
edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows()
]
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=10,
thickness=8,
label=sorted_nodes,
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
customdata=customdata,
hovertemplate="%{customdata}<extra></extra>"
),
link=dict(
source=sources,
target=targets,
value=values,
color=df_liens["color"].tolist(),
customdata=link_customdata,
hovertemplate="%{customdata}<extra></extra>"
)
))
fig.update_layout(
title_text="Hiérarchie filtrée par niveaux et noeuds",
paper_bgcolor="white",
plot_bgcolor="white"
)
st.plotly_chart(fig)
if st.session_state.get("logged_in", False):
if liens_chemins:
G_export = nx.DiGraph()
for u, v in liens_chemins:
G_export.add_node(u, **G.nodes[u])
G_export.add_node(v, **G.nodes[v])
data = G.get_edge_data(u, v)
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
G_export.add_edge(u, v, **data[0])
elif isinstance(data, dict):
G_export.add_edge(u, v, **data)
else:
G_export.add_edge(u, v)
with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f:
write_dot(G_export, f.name)
dot_path = f.name
with open(dot_path, encoding="utf-8") as f:
st.download_button(
label="Télécharger le fichier DOT filtré",
data=f.read(),
file_name="graphe_filtré.dot",
mime="text/plain"
)
dot_file_path = None
if st.session_state.onglet == "Instructions":
markdown_content = charger_instructions_depuis_gitea(INSTRUCTIONS)
if markdown_content:
st.markdown(markdown_content)
elif st.session_state.onglet == "Fiches":
st.markdown("# Affichage des fiches")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
else:
# Charger le graphe une seule fois
if "G_temp" not in st.session_state:
try:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
dot_file_path = True
else:
dot_file_path = False
except Exception as e:
st.error(f"Erreur de lecture du fichier DOT : {e}")
dot_file_path = False
else:
dot_file_path = True
if dot_file_path:
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
else:
st.error("Impossible de charger le graphe pour cet onglet.")
if dot_file_path and st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
st.markdown("# Analyse")
st.markdown("## Sélection des nœuds de départ et d'arrivée")
valeur_defaut = "-- Sélectionner un niveau --"
niveau_choix = [valeur_defaut] + list(niveau_labels.values())
niveau_depart = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
if niveau_depart != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
niveaux_arrivee_choix = [valeur_defaut] + niveaux_arrivee_possibles
analyse_niveau_arrivee = st.selectbox("Niveau d'arrivée", niveau_choix, key="analyse_niveau_arrivee")
if analyse_niveau_arrivee != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[analyse_niveau_arrivee]
minerais_selection = None
if niveau_depart < 2 < niveau_arrivee:
st.markdown("### Sélectionner un ou plusieurs minerais")
# Tous les nœuds de niveau 2 (minerai)
minerais_nodes = sorted([
n for n, d in G_temp.nodes(data=True)
if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2
])
minerais_selection = st.multiselect(
"Filtrer par minerais (optionnel)",
minerais_nodes,
key="analyse_minerais"
)
st.markdown("---")
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
st.markdown("## Sélection fine des items")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
st.markdown("---")
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
st.markdown("## Sélection des filtres pour identifier les vulnérabilités")
filtrer_ics = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_ics")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
st.markdown("---")
if st.button("Lancer lanalyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
minerais=minerais_selection,
filtrer_ics=filtrer_ics,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
elif dot_file_path and st.session_state.onglet == "Visualisations":
st.markdown("# Visualisations")
st.markdown("""## Indice de Herfindahl-Hirschmann - IHH vs Criticité
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité substituabilité du minerai
""")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
st.markdown("""## Indice de Herfindahl-Hirschmann - IHH vs IVC
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité concurrentielle du minerai
""")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
elif dot_file_path and st.session_state.onglet == "Personnalisation":
G_temp = lancer_personnalisation(G_temp)
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("""</section>""", unsafe_allow_html=True)
st.markdown("</main>", unsafe_allow_html=True)
total_bytes = get_total_bytes_for_session(session_id)
afficher_pied_de_page()
afficher_impact(total_bytes)