Code/fabnum.py
2025-05-01 18:40:57 +02:00

1263 lines
48 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from networkx.drawing.nx_agraph import read_dot, write_dot
import pandas as pd
import plotly.graph_objects as go
import networkx as nx
import logging
import altair as alt
import numpy as np
from dotenv import load_dotenv
import os
import requests
import re
from tickets_fiche import gerer_tickets_fiche
import base64
from dateutil import parser
from datetime import datetime, timezone
import streamlit.components.v1 as components
from connexion import connexion, bouton_deconnexion
import tempfile
import json
# Configuration Gitea
load_dotenv()
GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
DEPOT_CODE = os.getenv("DEPOT_CODE", "code")
ENV = os.getenv("ENV")
ENV_CODE = os.getenv("ENV_CODE")
DOT_FILE = os.getenv("DOT_FILE")
INSTRUCTIONS = os.getenv("INSTRUCTIONS", "Instructions.md")
st.set_page_config(
page_title="Fabnum Analyse de chaîne",
page_icon="assets/weakness.png"
)
session_id = st.context.headers.get("x-session-id")
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def get_total_bytes_for_session(session_id):
total_bytes = 0
try:
with open("/var/log/nginx/fabnum-dev.access.log", "r") as f:
for line in f:
if session_id in line:
match = re.search(r'"GET.*?" \d+ (\d+)', line)
if match:
bytes_sent = int(match.group(1))
total_bytes += bytes_sent
except Exception as e:
st.error(f"Erreur lecture log: {e}")
return total_bytes
# Intégration du fichier CSS externe
with open("assets/styles.css") as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
header ="""
<header role="banner" aria-labelledby="entete-header">
<div class='wide-header'>
<p id='entete-header' class='titre-header'>FabNum - Chaîne de fabrication du numérique</p>"""
if ENV == "dev":
header+="<p>🔧 Vous êtes dans l'environnement de développement.</p>"
else:
header+="<p>Parcours de l'écosystème et identification des vulnérabilités.</p>"
header+="""
</div>
</header>
"""
st.markdown(header, unsafe_allow_html=True)
def charger_instructions_depuis_gitea(nom_fichier="Instructions.md"):
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/{nom_fichier}?ref={ENV}"
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
data = response.json()
contenu_md = base64.b64decode(data["content"]).decode("utf-8")
return contenu_md
except Exception as e:
st.error(f"Erreur lors du chargement des instructions depuis Gitea : {e}")
return None
def afficher_menu():
with st.sidebar:
st.markdown("""
<nav role="navigation" aria-label="Menu principal">
<div role="region" aria-label="Navigation principale" class="onglets-accessibles">
<hr />
<p class="decorative-heading">Navigation</p>
<hr />
""", unsafe_allow_html=True)
if "onglet" not in st.session_state:
st.session_state.onglet = "Instructions"
if st.button("Instructions"):
st.session_state.onglet = "Instructions"
if st.button("Personnalisation"):
st.session_state.onglet = "Personnalisation"
if st.button("Analyse"):
st.session_state.onglet = "Analyse"
if st.button("Visualisations"):
st.session_state.onglet = "Visualisations"
if st.button("Fiches"):
st.session_state.onglet = "Fiches"
st.markdown("---")
connexion()
# Si l'utilisateur est connecté, afficher le reste
if st.session_state.get("logged_in", False):
bouton_deconnexion()
st.markdown("""
<hr />
</div>
</nav>""", unsafe_allow_html=True)
afficher_menu()
st.markdown("""
<main role="main">
""", unsafe_allow_html=True)
def recuperer_date_dernier_commit_schema():
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_CODE}/commits?path={DOT_FILE}&sha={ENV_CODE}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
commits = response.json()
if commits:
latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"])
return latest_commit_date
else:
return None
except Exception as e:
st.error(f"Erreur lors de la récupération du dernier commit de {DOT_FILE} : {e}")
return None
def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"):
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_CODE}/contents/{DOT_FILE}?ref={ENV_CODE}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
remote_last_modified = recuperer_date_dernier_commit_schema()
local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None
if not local_last_modified or remote_last_modified > local_last_modified:
dot_text = base64.b64decode(data["content"]).decode("utf-8")
with open(fichier_local, "w", encoding="utf-8") as f:
f.write(dot_text)
return "OK"
except Exception as e:
st.error(f"Erreur lors du chargement de {DOT_FILE} depuis Gitea : {e}")
return None
def charger_arborescence_fiches():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={ENV}"
try:
response = requests.get(url_base, headers=headers)
response.raise_for_status()
dossiers = response.json()
arbo = {}
for dossier in sorted(dossiers, key=lambda d: d['name'].lower()):
if dossier['type'] == 'dir':
dossier_name = dossier['name']
url_dossier = dossier['url']
response_dossier = requests.get(url_dossier, headers=headers)
response_dossier.raise_for_status()
fichiers = response_dossier.json()
fiches = sorted(
[
{"nom": f["name"], "download_url": f["download_url"]}
for f in fichiers if f["name"].endswith(".md")
],
key=lambda x: x['nom'].lower()
)
arbo[dossier_name] = fiches
return arbo
except Exception as e:
st.error(f"Erreur lors du chargement des fiches : {e}")
return {}
def couleur_noeud(n, niveaux, G):
niveau = niveaux.get(n, 99)
attrs = G.nodes[n]
# Niveau 99 : pays géographique avec isg
if niveau == 99:
isg = int(attrs.get("isg", -1))
return (
"darkred" if isg >= 60 else
"orange" if isg >= 31 else
"darkgreen" if isg >= 0 else
"gray"
)
# Niveau 11 ou 12 connecté à un pays géographique
if niveau in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99:
isg = int(G.nodes[succ].get("isg", -1))
return (
"darkred" if isg >= 60 else
"orange" if isg >= 31 else
"darkgreen" if isg >= 0 else
"gray"
)
# Logique existante pour IHH / IVC
if niveau in (10, 1010) and attrs.get("ihh_pays"):
ihh = int(attrs["ihh_pays"])
return (
"darkgreen" if ihh <= 15 else
"orange" if ihh <= 25 else
"darkred"
)
elif niveau == 2 and attrs.get("ivc"):
ivc = int(attrs["ivc"])
return (
"darkgreen" if ivc <= 15 else
"orange" if ivc <= 30 else
"darkred"
)
return "lightblue"
def recuperer_donnees(graph, noeuds):
donnees = []
criticite = {}
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
except ValueError:
logging.warning(f"Nom de nœud inattendu : {noeud}")
continue
if operation == "Traitement":
try:
fabrications = list(graph.predecessors(minerai))
valeurs = [
int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100)
for f in fabrications
if graph.get_edge_data(f, minerai)
]
if valeurs:
criticite[minerai] = round(sum(valeurs) / len(valeurs))
except Exception as e:
logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}")
criticite[minerai] = 50
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0))
ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0))
criticite_val = criticite.get(minerai, 50)
criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3)
donnees.append({
'categorie': operation,
'nom': minerai,
'ihh_pays': ihh_pays,
'ihh_acteurs': ihh_acteurs,
'criticite_minerai': criticite_val,
'criticite_cat': criticite_cat
})
except Exception as e:
logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True)
return pd.DataFrame(donnees)
def afficher_graphique_altair(df):
ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction']
categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()]
for cat in categories:
st.markdown(f"### {cat}")
df_cat = df[df['categorie'] == cat].copy()
# Appliquer un jitter contrôlé pour disperser les nœuds proches
from collections import Counter
coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1)))
counts = Counter(coord_pairs)
offset_x = []
offset_y = {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5 # écart angulaire
radius = 0.8 + 0.4 * rank # spiral growing radius
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df_cat['ihh_pays'] += offset_x
df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs]
# Décalage des étiquettes pour les relier
df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5
df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5
base = alt.Chart(df_cat).encode(
x=alt.X('ihh_pays:Q', title='IHH Pays (%)'),
y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'),
size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred']))
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_pays:Q',
x2='ihh_pays_text:Q',
y='ihh_acteurs:Q',
y2='ihh_acteurs_text:Q'
)
label_chart = alt.Chart(df_cat).mark_text(
align='left', dx=3, dy=-3,
fontSize=8, font='Arial', angle=335
).encode(
x='ihh_pays_text:Q',
y='ihh_acteurs_text:Q',
text='nom:N'
)
hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=500,
height=400,
title=f"Concentration et criticité {cat}"
).interactive()
st.altair_chart(chart, use_container_width=True)
def lancer_visualisation_ihh_criticite(graph):
niveaux = nx.get_node_attributes(graph, "niveau")
noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n]
noeuds.sort()
df = recuperer_donnees(graph, noeuds)
if df.empty:
st.warning("Aucune donnée à visualiser.")
else:
afficher_graphique_altair(df)
def extraire_chemins_depuis(G, source):
chemins = []
stack = [(source, [source])]
while stack:
(node, path) = stack.pop()
voisins = list(G.successors(node))
if not voisins:
chemins.append(path)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def extraire_chemins_vers(G, target, niveau_demande):
chemins = []
reverse_G = G.reverse()
niveaux = nx.get_node_attributes(G, "niveau")
stack = [(target, [target])]
while stack:
(node, path) = stack.pop()
voisins = list(reverse_G.successors(node))
if not voisins:
chemin_inverse = list(reversed(path))
contient_niveau = any(
int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse
)
if contient_niveau:
chemins.append(chemin_inverse)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
minerais=None,
filtrer_criticite=False, filtrer_ivc=False,
filtrer_ihh=False, filtrer_isg=False,
logique_filtrage="OU"):
niveaux = {}
for node, attrs in G.nodes(data=True):
niveau_str = attrs.get("niveau")
try:
if niveau_str:
niveaux[node] = int(str(niveau_str).strip('"'))
except ValueError:
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend([chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
elif noeuds_arrivee:
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
if minerais:
chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)]
def extraire_criticite(u, v):
data = G.get_edge_data(u, v)
if not data:
return 0
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
return float(data[0].get("criticite", 0))
return float(data.get("criticite", 0))
liens_chemins = set()
chemins_filtres = set()
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
for chemin in chemins:
has_ihh = has_ivc = has_criticite = has_isg_critique = False
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u)
niveau_v = niveaux.get(v)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveau_u in (10, 1010) and int(G.nodes[u].get(ihh_field, 0)) > 25:
has_ihh = True
if niveau_v in (10, 1010) and int(G.nodes[v].get(ihh_field, 0)) > 25:
has_ihh = True
if filtrer_ivc and niveau_u in (2, 1002) and int(G.nodes[u].get("ivc", 0)) > 30:
has_ivc = True
if filtrer_criticite and ((niveau_u == 1 and niveau_v == 2) or (niveau_u == 1001 and niveau_v == 1002) or (niveau_u == 10 and niveau_v in (1000, 1001))) and extraire_criticite(u, v) > 0.66:
has_criticite = True
for n in (u, v):
if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60:
has_isg_critique = True
elif niveaux.get(n) in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
has_isg_critique = True
if logique_filtrage == "ET":
keep = True
if filtrer_ihh:
keep = keep and has_ihh
if filtrer_ivc:
keep = keep and has_ivc
if filtrer_criticite:
keep = keep and has_criticite
if filtrer_isg:
keep = keep and has_isg_critique
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
if (filtrer_ihh and has_ihh) or (filtrer_ivc and has_ivc) or (filtrer_criticite and has_criticite) or (filtrer_isg and has_isg_critique):
chemins_filtres.add(tuple(chemin))
if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]):
chemins = list(chemins_filtres)
liens_chemins = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u, 999)
niveau_v = niveaux.get(v, 999)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if not liens_chemins:
st.warning("Aucun chemin ne correspond aux critères.")
return
df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"])
df_liens = df_liens.groupby(
["source", "target"]).size().reset_index(name="value")
df_liens["criticite"] = df_liens.apply(
lambda row: extraire_criticite(row["source"], row["target"]), axis=1)
df_liens["value"] = 0.1
# Ne garder que les nœuds effectivement connectés
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
# Inclure les nœuds connectés + tous les nœuds 10xx traversés dans les chemins
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
for chemin in chemins:
for n in chemin:
if niveaux.get(n) in niveaux_speciaux:
noeuds_utilises.add(n)
sorted_nodes = [
n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True)
if n in noeuds_utilises
]
def couleur_criticite(p):
if p <= 0.33:
return "darkgreen"
elif p <= 0.66:
return "orange"
else:
return "darkred"
df_liens["color"] = df_liens.apply(
lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray",
axis=1
)
all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel())
sorted_nodes = sorted(
all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True)
node_indices = {name: i for i, name in enumerate(sorted_nodes)}
sources = df_liens["source"].map(node_indices).tolist()
targets = df_liens["target"].map(node_indices).tolist()
values = df_liens["value"].tolist()
customdata = []
for n in sorted_nodes:
info = [f"{k}: {v}" for k, v in G.nodes[n].items()]
niveau = niveaux.get(n, 99)
# Ajout dun ISG hérité si applicable
if niveau in (11, 12, 1011, 1012):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]:
isg_val = G.nodes[succ]["isg"]
info.append(f"isg (géographique): {isg_val}")
break
customdata.append("<br>".join(info))
def edge_info(u, v):
data = G.get_edge_data(u, v)
if not data:
return f"Relation : {u}{v}"
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
data = data[0]
base = [f"{k}: {v}" for k, v in data.items()]
return f"Relation : {u}{v}<br>" + "<br>".join(base)
link_customdata = [
edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows()
]
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=10,
thickness=8,
label=sorted_nodes,
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
customdata=customdata,
hovertemplate="%{customdata}<extra></extra>"
),
link=dict(
source=sources,
target=targets,
value=values,
color=df_liens["color"].tolist(),
customdata=link_customdata,
hovertemplate="%{customdata}<extra></extra>"
)
))
fig.update_layout(
title_text="Hiérarchie filtrée par niveaux et noeuds",
paper_bgcolor="white",
plot_bgcolor="white"
)
st.plotly_chart(fig)
if st.session_state.get("logged_in", False):
if liens_chemins:
G_export = nx.DiGraph()
for u, v in liens_chemins:
G_export.add_node(u, **G.nodes[u])
G_export.add_node(v, **G.nodes[v])
data = G.get_edge_data(u, v)
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
G_export.add_edge(u, v, **data[0])
elif isinstance(data, dict):
G_export.add_edge(u, v, **data)
else:
G_export.add_edge(u, v)
with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f:
write_dot(G_export, f.name)
dot_path = f.name
with open(dot_path, encoding="utf-8") as f:
st.download_button(
label="Télécharger le fichier DOT filtré",
data=f.read(),
file_name="graphe_filtré.dot",
mime="text/plain"
)
def creer_graphes(donnees):
if not donnees:
st.warning("Aucune donnée à afficher.")
return
try:
df = pd.DataFrame(donnees)
df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3))
# Jitter contrôlé pour dispersion
from collections import Counter
coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1)))
counts = Counter(coord_pairs)
offset_x, offset_y = [], {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5
radius = 0.8 + 0.4 * rank
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df['ihh_extraction'] += offset_x
df['ihh_reserves'] += [offset_y[p] for p in coord_pairs]
# Position des étiquettes
df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5
df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5
base = alt.Chart(df).encode(
x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'),
y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'),
size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])),
tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q']
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_extraction:Q', x2='ihh_extraction_text:Q',
y='ihh_reserves:Q', y2='ihh_reserves_text:Q'
)
labels = alt.Chart(df).mark_text(
align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335
).encode(
x='ihh_extraction_text:Q',
y='ihh_reserves_text:Q',
text='nom:N'
)
# Lignes seuils
hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=600,
height=500,
title="Concentration des ressources critiques vs vulnérabilité IVC"
).interactive()
st.altair_chart(chart, use_container_width=True)
except Exception as e:
st.error(f"Erreur lors de la création du graphique : {e}")
def recuperer_donnees_2(graph, noeuds_2):
donnees = []
for minerai in noeuds_2:
try:
missing = []
if not graph.has_node(minerai):
missing.append(minerai)
if not graph.has_node(f"Extraction_{minerai}"):
missing.append(f"Extraction_{minerai}")
if not graph.has_node(f"Reserves_{minerai}"):
missing.append(f"Reserves_{minerai}")
if missing:
print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.")
continue
ivc = int(graph.nodes[minerai].get('ivc', 0))
ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0))
ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0))
donnees.append({
'nom': minerai,
'ivc': ivc,
'ihh_extraction': ihh_extraction_pays,
'ihh_reserves': ihh_reserves_pays
})
except Exception as e:
print(f"Erreur avec le nœud {minerai} : {e}")
return donnees
def lancer_visualisation_ihh_ivc(graph):
try:
noeuds_niveau_2 = [
n for n, data in graph.nodes(data=True)
if data.get("niveau") == "2" and "ivc" in data
]
if not noeuds_niveau_2:
# print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.")
return
data = recuperer_donnees_2(graph, noeuds_niveau_2)
creer_graphes(data)
except Exception as e:
print(f"Erreur lors du traitement du fichier DOT : {e}")
def afficher_fiches():
if "fiches_arbo" not in st.session_state:
st.session_state["fiches_arbo"] = charger_arborescence_fiches()
arbo = st.session_state.get("fiches_arbo", {})
if not arbo:
st.warning("Aucune fiche disponible pour le moment.")
return
dossiers = sorted(arbo.keys(), key=lambda x: x.lower())
dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers)
if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --":
fiches = arbo.get(dossier_choisi, [])
noms_fiches = [f['nom'] for f in fiches]
fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches)
if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --":
fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None)
if fiche_info:
try:
headers = {"Authorization": f"token {GITEA_TOKEN}"}
reponse_fiche = requests.get(fiche_info["download_url"], headers=headers)
reponse_fiche.raise_for_status()
contenu_md = reponse_fiche.content.decode("utf-8")
# Nouveau traitement hiérarchique du markdown
lignes = contenu_md.split('\n')
sections_n1 = []
section_n1_actuelle = {"titre": None, "intro": [], "sections_n2": {}}
dans_section_n1 = False
section_n2_actuelle = None
for ligne in lignes:
if re.match(r'^#[^#]', ligne):
# Nouveau titre de niveau 1
if section_n1_actuelle["titre"] or section_n1_actuelle["intro"] or section_n1_actuelle["sections_n2"]:
sections_n1.append(section_n1_actuelle)
section_n1_actuelle = {
"titre": ligne.strip('# ').strip(),
"intro": [],
"sections_n2": {}
}
section_n2_actuelle = None
dans_section_n1 = True
elif re.match(r'^##[^#]', ligne):
# Nouveau titre de niveau 2
section_n2_actuelle = ligne.strip('# ').strip()
section_n1_actuelle["sections_n2"][section_n2_actuelle] = []
elif section_n2_actuelle:
section_n1_actuelle["sections_n2"][section_n2_actuelle].append(ligne)
elif dans_section_n1:
section_n1_actuelle["intro"].append(ligne)
# Ajouter la dernière section si présente
if section_n1_actuelle["titre"] or section_n1_actuelle["intro"] or section_n1_actuelle["sections_n2"]:
sections_n1.append(section_n1_actuelle)
# Affichage
for bloc in sections_n1:
if bloc["titre"]:
st.markdown(f"# {bloc['titre']}")
if bloc["intro"]:
st.markdown("\n".join(bloc["intro"]), unsafe_allow_html=True)
for sous_titre, contenu in bloc["sections_n2"].items():
with st.expander(sous_titre):
st.markdown("\n".join(contenu), unsafe_allow_html=True)
gerer_tickets_fiche(fiche_choisie)
except Exception as e:
st.error(f"Erreur lors du chargement de la fiche : {e}")
def lancer_personnalisation(G):
"""
Affiche et modifie uniquement les produits finaux personnalisables (ceux ajoutés)
et permet d'ajouter de nouveaux produits finaux.
Permet aussi d'importer et d'exporter la configuration personnalisée.
Retour:
G: le graphe modifié
"""
st.header("Personnalisation des produits finaux")
st.markdown("""
---
Dans cette section, vous pouvez ajouter des produits finaux qui ne sont pas présents dans la liste,
par exemple des produits que vous concevez vous même.
Pour chacun de ces produits, vous allez lui associer les composants qui le constituent, et si
cela vous convient, lui associer une opération d'assemblage existante.
Les modifications que vous faites ne sont pas stockées dans l'application. Vous pouvez toutefois
les enregistrer dans un fichier que vous pourrez recharger ultérieurement.
---
""")
# --- 1. Ajouter un nouveau produit final
st.subheader("Ajouter un nouveau produit final")
new_prod = st.text_input("Nom du nouveau produit (unique)", key="new_prod")
if new_prod:
# Opérations d'assemblage disponibles (niveau 10)
ops_dispo = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "10"
and any(
G.has_edge(p, n) and G.nodes[p].get("niveau") == "0"
for p in G.predecessors(n)
)
])
sel_new_op = st.selectbox(
"Opération d'assemblage (optionnelle)",
options=["-- Aucune --"] + ops_dispo,
index=0,
key="new_op"
)
# Composants de niveau 1
niveau1 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "1"
])
sel_comps = st.multiselect(
"Composants à lier", options=niveau1, key="new_links"
)
if st.button("Créer le produit", key="btn_new"):
G.add_node(new_prod, niveau="0", personnalisation="oui", label=new_prod)
if sel_new_op != "-- Aucune --":
G.add_edge(new_prod, sel_new_op)
for comp in sel_comps:
G.add_edge(new_prod, comp)
st.success(
f"{new_prod} ajouté : {len(sel_comps)} composant(s)"
+ (f", opération {sel_new_op}" if sel_new_op != "-- Aucune --" else "")
)
st.markdown("---")
# --- 2. Modifier un produit final ajouté
st.subheader("Modifier un produit final ajouté")
produits0 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "0" and d.get("personnalisation") == "oui"
])
sel_display = st.multiselect(
"Sélectionnez un produit final ajouté à modifier",
options=produits0,
key="prod_sel"
)
if sel_display:
prod = sel_display[0]
# Bouton de suppression
if st.button(f"Supprimer le produit {prod}", key=f"del_{prod}"):
G.remove_node(prod)
st.success(f"Produit « {prod} » supprimé.")
st.session_state.pop("prod_sel", None)
return G
# Opérations d'assemblage disponibles
ops_dispo = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "10"
and any(
G.has_edge(p, n) and G.nodes[p].get("niveau") == "0"
for p in G.predecessors(n)
)
])
# Opération actuelle
curr_ops = [
succ for succ in G.successors(prod)
if G.nodes[succ].get("niveau") == "10"
]
default_idx = 0
if curr_ops and curr_ops[0] in ops_dispo:
default_idx = ops_dispo.index(curr_ops[0]) + 1
sel_op = st.selectbox(
f"Opération d'assemblage liée à {prod} (optionnelle)",
options=["-- Aucune --"] + ops_dispo,
index=default_idx,
key=f"op_{prod}"
)
# Composants liés
niveau1 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "1"
])
linked = [
succ for succ in G.successors(prod)
if G.nodes[succ].get("niveau") == "1"
]
nouveaux = st.multiselect(
f"Composants liés à {prod}",
options=niveau1,
default=linked,
key=f"links_{prod}"
)
# Mise à jour
if st.button(f"Mettre à jour {prod}", key=f"btn_{prod}"):
# Mettre à jour l'opération
for op in curr_ops:
if sel_op == "-- Aucune --" or op != sel_op:
G.remove_edge(prod, op)
if sel_op != "-- Aucune --" and (not curr_ops or sel_op not in curr_ops):
G.add_edge(prod, sel_op)
# Mettre à jour les composants
for comp in set(linked) - set(nouveaux):
G.remove_edge(prod, comp)
for comp in set(nouveaux) - set(linked):
G.add_edge(prod, comp)
st.success(
f"{prod} mis à jour : {len(nouveaux)} composant(s)"
+ (f", opération {sel_op}" if sel_op != "-- Aucune --" else "")
)
st.markdown("---")
# --- 3. Sauvegarder ou restaurer la configuration
st.subheader("Sauvegarder ou restaurer la configuration")
# Export
if st.button("Exporter configuration", key="export_config"):
nodes = [n for n, d in G.nodes(data=True) if d.get("personnalisation")=="oui"]
edges = [(u, v) for u, v in G.edges() if u in nodes]
conf = {"nodes": nodes, "edges": edges}
json_str = json.dumps(conf, ensure_ascii=False)
st.download_button(
label="Télécharger la config (JSON)",
data=json_str,
file_name="config_personnalisation.json",
mime="application/json"
)
# Import
uploaded = st.file_uploader(
"Importer une configuration (JSON) (max 100 Ko)",
type=["json"], key="import_config"
)
if uploaded:
if uploaded.size > 100 * 1024:
st.error("Fichier trop volumineux (max 100Ko).")
else:
try:
conf = json.load(uploaded)
for node in conf.get("nodes", []):
if not G.has_node(node):
G.add_node(node, niveau="0", personnalisation="oui", label=node)
for u, v in conf.get("edges", []):
if G.has_node(u) and G.has_node(v) and not G.has_edge(u, v):
G.add_edge(u, v)
st.success("Configuration importée avec succès.")
except Exception as e:
st.error(f"Erreur d'import: {e}")
return G
dot_file_path = None
if st.session_state.onglet == "Instructions":
markdown_content = charger_instructions_depuis_gitea(INSTRUCTIONS)
if markdown_content:
st.markdown(markdown_content)
elif st.session_state.onglet == "Fiches":
st.markdown("---")
st.markdown("**Affichage des fiches**")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
else:
# Charger le graphe une seule fois
if "G_temp" not in st.session_state:
try:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
dot_file_path = True
else:
dot_file_path = False
except Exception as e:
st.error(f"Erreur de lecture du fichier DOT : {e}")
dot_file_path = False
else:
dot_file_path = True
if dot_file_path:
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
else:
st.error("Impossible de charger le graphe pour cet onglet.")
if dot_file_path and st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
st.markdown("---")
st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**")
st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche")
niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values())
niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
if niveau_depart_label != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart_label]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite")
niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles
niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee")
if niveau_arrivee_label != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label]
minerais_selection = None
if niveau_depart < 2 < niveau_arrivee:
# Tous les nœuds de niveau 2 (minerai)
minerais_nodes = sorted([
n for n, d in G_temp.nodes(data=True)
if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2
])
minerais_selection = st.multiselect(
"Filtrer par minerais (optionnel)",
options=minerais_nodes,
key="analyse_minerais"
)
st.markdown("---")
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**")
st.markdown("Sélectionner un ou plusieurs items du niveau de départ")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
st.markdown("---")
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
st.markdown("**Sélection des filtres pour identifier les vulnérabilités**")
filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
st.markdown("---")
if st.button("Lancer lanalyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
minerais=minerais_selection,
filtrer_criticite=filtrer_criticite,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
elif dot_file_path and st.session_state.onglet == "Visualisations":
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité substituabilité du minerai
""")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité concurrentielle du minerai
""")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
elif dot_file_path and st.session_state.onglet == "Personnalisation":
G_temp = lancer_personnalisation(G_temp)
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("""</section>""", unsafe_allow_html=True)
st.markdown("</main>", unsafe_allow_html=True)
st.markdown("""<section role="region" aria-label="Contenu principal" id="main-content">""", unsafe_allow_html=True)
st.markdown("""
<div role='contentinfo' aria-labelledby='footer-appli' class='wide-footer'>
<div class='info-footer'>
<p id='footer-appli' class='info-footer'>
Fabnum © 2025 <a href='mailto:stephan-pro@peccini.fr'>Contact</a> Licence <a href='https://creativecommons.org/licenses/by-nc-sa/4.0/' target='_blank'>CC BY-NC-SA</a>
</p>
<p class='footer-note'>
🌱 Calculs CO₂ via <a href='https://www.thegreenwebfoundation.org/' target='_blank'>The Green Web Foundation</a><br>
🚀 Propulsé par <a href='https://streamlit.io/' target='_blank'>Streamlit</a>
</p>
</div>
</div>
""", unsafe_allow_html=True)
total_bytes = get_total_bytes_for_session(session_id)
with st.sidebar:
components.html(f"""
<html lang="fr">
<head>
<title>Impact environnemental estimé de votre session</title>
</head>
<body>
<div role="region" aria-label="Impact environnemental de la session" class="impact-environnement">
<p class="decorative-heading">Impact environnemental de votre session</p>
<p>
<span id="network-usage">Chargement en cours…</span>
</p>
</div>
<script>
document.addEventListener("DOMContentLoaded", async function() {{
try {{
const module = await import("/assets/impact_co2.js");
module.calculerImpactCO2({total_bytes});
}} catch (error) {{
console.error("Erreur lors du chargement du module impact_co2.js", error);
}}
}});
</script>
</body>
</html>
""")