Code/fabnum.py
Fabrication du Numérique e7ad23e390 Initialisation
2025-04-27 11:27:56 +02:00

975 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from networkx.drawing.nx_agraph import read_dot
import pandas as pd
import plotly.graph_objects as go
import networkx as nx
import logging
import altair as alt
import numpy as np
from collections import OrderedDict
from dotenv import load_dotenv
import os
import requests
import re
from tickets_fiche import gerer_tickets_fiche
import base64
from dateutil import parser
from datetime import datetime, timezone
import copy
# Configuration Gitea
load_dotenv()
GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
ENV = os.getenv("ENV")
st.set_page_config(
page_title="Fabnum Analyse de chaîne",
page_icon="assets/weakness.png"
)
# Intégration du fichier CSS externe
with open("assets/styles.css") as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
header ="""
<div role='region' aria-labelledby='entete-header' class='wide-header'>
<p id='entete-header' class='titre-header'>FabNum - Chaîne de fabrication du numérique</p>"""
if ENV == "dev":
header+="<p>🔧 Vous êtes dans l'environnement de développement.</p>"
else:
header+="<p>Parcours de l'écosystème et identification des vulnérabilités.</p>"
header+="""
</div>
"""
st.markdown(header, unsafe_allow_html=True)
def recuperer_date_dernier_commit_schema():
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path=schema.txt&sha={ENV}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
commits = response.json()
if commits:
latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"])
return latest_commit_date
else:
return None
except Exception as e:
st.error(f"Erreur lors de la récupération du dernier commit de schema.txt : {e}")
return None
def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"):
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/schema.txt?ref={ENV}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
remote_last_modified = recuperer_date_dernier_commit_schema()
local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None
if not local_last_modified or remote_last_modified > local_last_modified:
dot_text = base64.b64decode(data["content"]).decode("utf-8")
with open(fichier_local, "w", encoding="utf-8") as f:
f.write(dot_text)
return "OK"
except Exception as e:
st.error(f"Erreur lors du chargement de schema.txt depuis Gitea : {e}")
return None
@st.cache_data(ttl=600)
def charger_arborescence_fiches():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
branche = "dev" if ENV == "dev" else "public"
url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={branche}"
try:
response = requests.get(url_base, headers=headers)
response.raise_for_status()
dossiers = response.json()
arbo = {}
for dossier in sorted(dossiers, key=lambda d: d['name'].lower()):
if dossier['type'] == 'dir':
dossier_name = dossier['name']
url_dossier = dossier['url']
response_dossier = requests.get(url_dossier, headers=headers)
response_dossier.raise_for_status()
fichiers = response_dossier.json()
fiches = sorted(
[
{"nom": f["name"], "download_url": f["download_url"]}
for f in fichiers if f["name"].endswith(".md")
],
key=lambda x: x['nom'].lower()
)
arbo[dossier_name] = fiches
return arbo
except Exception as e:
st.error(f"Erreur lors du chargement des fiches : {e}")
return {}
def couleur_noeud(n, niveaux, G):
niveau = niveaux.get(n, 99)
attrs = G.nodes[n]
# Niveau 99 : pays géographique avec isg
if niveau == 99:
isg = int(attrs.get("isg", -1))
if isg >= 60:
return "darkred"
elif isg >= 31:
return "orange"
elif isg >= 0:
return "darkgreen"
else:
return "gray"
# Niveau 11 ou 12 connecté à un pays géographique
if niveau in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99:
isg = int(G.nodes[succ].get("isg", -1))
if isg >= 60:
return "darkred"
elif isg >= 31:
return "orange"
elif isg >= 0:
return "darkgreen"
else:
return "gray"
# Logique existante pour IHH / IVC
if niveau == 10 and attrs.get("ihh_pays"):
ihh = int(attrs["ihh_pays"])
if ihh <= 15:
return "darkgreen"
elif ihh <= 25:
return "orange"
else:
return "darkred"
elif niveau == 2 and attrs.get("ivc"):
ivc = int(attrs["ivc"])
if ivc <= 15:
return "darkgreen"
elif ivc <= 30:
return "orange"
else:
return "darkred"
return "lightblue"
def recuperer_donnees(graph, noeuds):
donnees = []
criticite = {}
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
except ValueError:
logging.warning(f"Nom de nœud inattendu : {noeud}")
continue
if operation == "Traitement":
try:
fabrications = list(graph.predecessors(minerai))
valeurs = [
int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100)
for f in fabrications
if graph.get_edge_data(f, minerai)
]
if valeurs:
criticite[minerai] = round(sum(valeurs) / len(valeurs))
except Exception as e:
logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}")
criticite[minerai] = 50
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0))
ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0))
criticite_val = criticite.get(minerai, 50)
criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3)
donnees.append({
'categorie': operation,
'nom': minerai,
'ihh_pays': ihh_pays,
'ihh_acteurs': ihh_acteurs,
'criticite_minerai': criticite_val,
'criticite_cat': criticite_cat
})
except Exception as e:
logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True)
return pd.DataFrame(donnees)
def afficher_graphique_altair(df):
ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction']
categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()]
for cat in categories:
st.markdown(f"### {cat}")
df_cat = df[df['categorie'] == cat].copy()
# Appliquer un jitter contrôlé pour disperser les nœuds proches
from collections import Counter
coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1)))
counts = Counter(coord_pairs)
offset_x = []
offset_y = {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5 # écart angulaire
radius = 0.8 + 0.4 * rank # spiral growing radius
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df_cat['ihh_pays'] += offset_x
df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs]
# Décalage des étiquettes pour les relier
df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5
df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5
base = alt.Chart(df_cat).encode(
x=alt.X('ihh_pays:Q', title='IHH Pays (%)'),
y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'),
size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred']))
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_pays:Q',
x2='ihh_pays_text:Q',
y='ihh_acteurs:Q',
y2='ihh_acteurs_text:Q'
)
label_chart = alt.Chart(df_cat).mark_text(
align='left', dx=3, dy=-3,
fontSize=8, font='Arial', angle=335
).encode(
x='ihh_pays_text:Q',
y='ihh_acteurs_text:Q',
text='nom:N'
)
hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=500,
height=400,
title=f"Concentration et criticité {cat}"
).interactive()
st.altair_chart(chart, use_container_width=True)
def lancer_visualisation_ihh_criticite(graph):
niveaux = nx.get_node_attributes(graph, "niveau")
noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n]
noeuds.sort()
df = recuperer_donnees(graph, noeuds)
if df.empty:
st.warning("Aucune donnée à visualiser.")
else:
afficher_graphique_altair(df)
def extraire_chemins_depuis(G, source):
chemins = []
stack = [(source, [source])]
while stack:
(node, path) = stack.pop()
voisins = list(G.successors(node))
if not voisins:
chemins.append(path)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def extraire_chemins_vers(G, target, niveau_demande):
chemins = []
reverse_G = G.reverse()
niveaux = nx.get_node_attributes(G, "niveau")
stack = [(target, [target])]
while stack:
(node, path) = stack.pop()
voisins = list(reverse_G.successors(node))
if not voisins:
chemin_inverse = list(reversed(path))
contient_niveau = any(
int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse
)
if contient_niveau:
chemins.append(chemin_inverse)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False,
filtrer_isg=False,
logique_filtrage="OU"
):
niveaux = {}
for node, attrs in G.nodes(data=True):
# Conversion du niveau
niveau_str = attrs.get("niveau")
try:
if niveau_str:
niveaux[node] = int(str(niveau_str).strip('"'))
except ValueError:
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
# Suppression des attributs indésirables
ATTRIBUTS_SUPPRIMES = {"fillcolor", "fontcolor", "style", "fontsize"}
for attr in ATTRIBUTS_SUPPRIMES:
attrs.pop(attr, None)
# Réordonner : label d'abord
if "label" in attrs:
reordered = OrderedDict()
reordered["label"] = attrs["label"]
for k, v in attrs.items():
if k != "label":
reordered[k] = v
G.nodes[node].clear()
G.nodes[node].update(reordered)
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend(
[chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
elif noeuds_arrivee:
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n)
== niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
def extraire_criticite(u, v):
data = G.get_edge_data(u, v)
if not data:
return 0
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
try:
return float(data[0].get("criticite", 0))
except:
return 0
return float(data.get("criticite", 0))
liens_chemins = set()
chemins_filtres = set()
for chemin in chemins:
has_ihh = False
has_ivc = False
has_criticite = False
has_isg_critique = False
for i in range(len(chemin)-1):
u, v = chemin[i], chemin[i+1]
if niveaux.get(u) is not None and niveaux.get(v) is not None:
if niveau_depart <= niveaux.get(u) <= niveau_arrivee and niveau_depart <= niveaux.get(v) <= niveau_arrivee:
liens_chemins.add((u, v))
# vérification des conditions critiques
if filtrer_ihh:
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveaux.get(u) == 10 and G.nodes[u].get(ihh_field) and int(G.nodes[u][ihh_field]) > 25:
has_ihh = True
elif niveaux.get(v) == 10 and G.nodes[v].get(ihh_field) and int(G.nodes[v][ihh_field]) > 25:
has_ihh = True
if filtrer_ivc and niveaux.get(u) == 2 and G.nodes[u].get("ivc") and int(G.nodes[u]["ivc"]) > 30:
has_ivc = True
if filtrer_criticite and niveaux.get(u) == 1 and niveaux.get(v) == 2 and extraire_criticite(u, v) > 0.66:
has_criticite = True
# Vérifie présence d'un isg >= 60
for n in (u, v):
if niveaux.get(n) == 99:
isg = int(G.nodes[n].get("isg", 0))
if isg >= 60:
has_isg_critique = True
elif niveaux.get(n) in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
has_isg_critique = True
if logique_filtrage == "ET":
keep = True
if filtrer_ihh:
keep = keep and has_ihh
if filtrer_ivc:
keep = keep and has_ivc
if filtrer_criticite:
keep = keep and has_criticite
if filtrer_isg:
keep = keep and has_isg_critique
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
if (filtrer_ihh and has_ihh) or \
(filtrer_ivc and has_ivc) or \
(filtrer_criticite and has_criticite) or \
(filtrer_isg and has_isg_critique):
chemins_filtres.add(tuple(chemin))
if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]):
chemins = list(chemins_filtres)
liens_chemins = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
if niveau_depart <= niveaux.get(u, 999) <= niveau_arrivee and niveau_depart <= niveaux.get(v, 999) <= niveau_arrivee:
liens_chemins.add((u, v))
if not liens_chemins:
st.warning("Aucun chemin ne correspond aux critères.")
return
df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"])
df_liens = df_liens.groupby(
["source", "target"]).size().reset_index(name="value")
df_liens["criticite"] = df_liens.apply(
lambda row: extraire_criticite(row["source"], row["target"]), axis=1)
df_liens["value"] = 0.1
# Ne garder que les nœuds effectivement connectés
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises]
def couleur_criticite(p):
if p <= 0.33:
return "darkgreen"
elif p <= 0.66:
return "orange"
else:
return "darkred"
df_liens["color"] = df_liens.apply(
lambda row: couleur_criticite(row["criticite"]) if niveaux.get(
row["source"]) == 1 and niveaux.get(row["target"]) == 2 else "gray",
axis=1
)
all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel())
sorted_nodes = sorted(
all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True)
node_indices = {name: i for i, name in enumerate(sorted_nodes)}
sources = df_liens["source"].map(node_indices).tolist()
targets = df_liens["target"].map(node_indices).tolist()
values = df_liens["value"].tolist()
customdata = []
for n in sorted_nodes:
info = [f"{k}: {v}" for k, v in G.nodes[n].items()]
niveau = niveaux.get(n, 99)
# Ajout dun ISG hérité si applicable
if niveau in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]:
isg_val = G.nodes[succ]["isg"]
info.append(f"isg (géographique): {isg_val}")
break
customdata.append("<br>".join(info))
def edge_info(u, v):
data = G.get_edge_data(u, v)
if not data:
return f"Relation : {u}{v}"
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
data = data[0]
base = [f"{k}: {v}" for k, v in data.items()]
return f"Relation : {u}{v}<br>" + "<br>".join(base)
link_customdata = [
edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows()
]
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=10,
thickness=8,
label=sorted_nodes,
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
customdata=customdata,
hovertemplate="%{customdata}<extra></extra>"
),
link=dict(
source=sources,
target=targets,
value=values,
color=df_liens["color"].tolist(),
customdata=link_customdata,
hovertemplate="%{customdata}<extra></extra>"
)
))
fig.update_layout(
title_text="Hiérarchie filtrée par niveaux et noeuds",
paper_bgcolor="white",
plot_bgcolor="white"
)
st.plotly_chart(fig)
def creer_graphes(donnees):
if not donnees:
st.warning("Aucune donnée à afficher.")
return
try:
df = pd.DataFrame(donnees)
df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3))
# Jitter contrôlé pour dispersion
from collections import Counter
coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1)))
counts = Counter(coord_pairs)
offset_x, offset_y = [], {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5
radius = 0.8 + 0.4 * rank
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df['ihh_extraction'] += offset_x
df['ihh_reserves'] += [offset_y[p] for p in coord_pairs]
# Position des étiquettes
df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5
df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5
base = alt.Chart(df).encode(
x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'),
y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'),
size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])),
tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q']
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_extraction:Q', x2='ihh_extraction_text:Q',
y='ihh_reserves:Q', y2='ihh_reserves_text:Q'
)
labels = alt.Chart(df).mark_text(
align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335
).encode(
x='ihh_extraction_text:Q',
y='ihh_reserves_text:Q',
text='nom:N'
)
# Lignes seuils
hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=600,
height=500,
title="Concentration des ressources critiques vs vulnérabilité IVC"
).interactive()
st.altair_chart(chart, use_container_width=True)
except Exception as e:
st.error(f"Erreur lors de la création du graphique : {e}")
def recuperer_donnees_2(graph, noeuds_2):
donnees = []
for minerai in noeuds_2:
try:
missing = []
if not graph.has_node(minerai):
missing.append(minerai)
if not graph.has_node(f"Extraction_{minerai}"):
missing.append(f"Extraction_{minerai}")
if not graph.has_node(f"Reserves_{minerai}"):
missing.append(f"Reserves_{minerai}")
if missing:
print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.")
continue
ivc = int(graph.nodes[minerai].get('ivc', 0))
ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0))
ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0))
donnees.append({
'nom': minerai,
'ivc': ivc,
'ihh_extraction': ihh_extraction_pays,
'ihh_reserves': ihh_reserves_pays
})
except Exception as e:
print(f"Erreur avec le nœud {minerai} : {e}")
return donnees
def lancer_visualisation_ihh_ivc(graph):
try:
noeuds_niveau_2 = [
n for n, data in graph.nodes(data=True)
if data.get("niveau") == "2" and "ivc" in data
]
if not noeuds_niveau_2:
# print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.")
return
data = recuperer_donnees_2(graph, noeuds_niveau_2)
creer_graphes(data)
except Exception as e:
print(f"Erreur lors du traitement du fichier DOT : {e}")
def afficher_fiches():
if "fiches_arbo" not in st.session_state:
st.session_state["fiches_arbo"] = charger_arborescence_fiches()
arbo = st.session_state.get("fiches_arbo", {})
if not arbo:
st.warning("Aucune fiche disponible pour le moment.")
return
dossiers = sorted(arbo.keys(), key=lambda x: x.lower())
dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers)
if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --":
fiches = arbo.get(dossier_choisi, [])
noms_fiches = [f['nom'] for f in fiches]
fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches)
if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --":
fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None)
if fiche_info:
try:
headers = {"Authorization": f"token {GITEA_TOKEN}"}
reponse_fiche = requests.get(fiche_info["download_url"], headers=headers)
reponse_fiche.raise_for_status()
contenu_md = reponse_fiche.content.decode("utf-8")
# Traitement du markdown
lignes = contenu_md.split('\n')
contenu_niveau_1 = []
sections_niveau_2 = {}
section_actuelle = None
for ligne in lignes:
if re.match(r'^##[^#]', ligne):
section_actuelle = ligne.strip('# ').strip()
sections_niveau_2[section_actuelle] = []
elif section_actuelle:
sections_niveau_2[section_actuelle].append(ligne)
else:
contenu_niveau_1.append(ligne)
if contenu_niveau_1:
st.markdown("\n".join(contenu_niveau_1), unsafe_allow_html=True)
for titre, contenu in sections_niveau_2.items():
with st.expander(titre):
st.markdown("\n".join(contenu), unsafe_allow_html=True)
gerer_tickets_fiche(fiche_choisie)
except Exception as e:
st.error(f"Erreur lors du chargement de la fiche : {e}")
def afficher_fiches_old():
import streamlit as st
from pathlib import Path
base_path = Path("Fiches")
if not base_path.exists():
st.warning("Le dossier 'Fiches' est introuvable.")
return
dossiers = sorted([p for p in base_path.iterdir() if p.is_dir() and 'Criticités' not in p.name])
criticite_path = next((p for p in base_path.iterdir() if p.is_dir() and 'Criticités' in p.name), None)
if criticite_path:
dossiers.append(criticite_path)
noms_dossiers = [d.name for d in dossiers]
dossier_choisi = st.selectbox("📁 Dossiers disponibles", noms_dossiers)
chemin_dossier = base_path / dossier_choisi
fichiers_md = sorted(chemin_dossier.glob("*.md"))
noms_fichiers = []
fichiers_dict = {}
for f in fichiers_md:
try:
with f.open(encoding="utf-8") as md:
titre = md.readline().strip()
if ':' in titre:
titre = titre.split(':', 1)[1].strip()
else:
titre = f.stem
fichiers_dict[titre] = f.name
noms_fichiers.append(titre)
except Exception as e:
st.error(f"Erreur lecture fichier {f.name} : {e}")
noms_fichiers.sort()
fiche_label = st.selectbox("🗂️ Fiches Markdown", noms_fichiers)
fichier_choisi = fichiers_dict[fiche_label]
chemin_fichier = chemin_dossier / fichier_choisi
with chemin_fichier.open(encoding="utf-8") as f:
contenu = f.read()
st.markdown(contenu, unsafe_allow_html=True)
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
DOT_FILE = "schema.txt"
charger_schema_depuis_gitea(DOT_FILE)
# Charger le graphe une seule fois
try:
dot_file_path = True
if "G_temp" not in st.session_state:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
else:
dot_file_path = False
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
except:
st.error("Erreur de lecture du fichier DOT")
dot_file_path = False
if dot_file_path:
st.markdown("""
<div role="form" aria-label="Navigation des onglets" class="onglets-accessibles">
""", unsafe_allow_html=True)
with st.sidebar:
st.markdown("---")
st.header("Navigation")
st.markdown("---")
if "onglet" not in st.session_state:
st.session_state.onglet = "Instructions"
if st.button("📄 Instructions"):
st.session_state.onglet = "Instructions"
if st.button("🔍 Analyse"):
st.session_state.onglet = "Analyse"
if st.button("📊 Visualisations"):
st.session_state.onglet = "Visualisations"
if st.button("📚 Fiches"):
st.session_state.onglet = "Fiches"
st.markdown("---")
if st.session_state.onglet == "Instructions":
with open("Instructions.md", "r", encoding="utf-8") as f:
markdown_content = f.read()
st.markdown(markdown_content)
elif st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
st.markdown("---")
st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**")
st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche")
niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values())
niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
if niveau_depart_label != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart_label]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite")
niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles
niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee")
st.markdown("---")
if niveau_arrivee_label != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label]
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**")
st.markdown("Sélectionner un ou plusieurs items du niveau de départ")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
st.markdown("---")
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
st.markdown("**Sélection des filtres pour identifier les vulnérabilités**")
filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
st.markdown("---")
if st.button("Lancer lanalyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
filtrer_criticite=filtrer_criticite,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
elif st.session_state.onglet == "Visualisations":
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité substituabilité du minerai
""")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité concurrentielle du minerai
""")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
elif st.session_state.onglet == "Fiches":
st.markdown("---")
st.markdown("**Affichage des fiches**")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("""
<div role='contentinfo' aria-labelledby='footer-appli' class='wide-footer'>
<div class='info-footer'>
<p id='footer-appli' class='info-footer'>Fabnum © 2025 <a href='mailto:stephan-pro@peccini.fr'>Contact</a> Licence <a href='https://creativecommons.org/licenses/by-nc-sa/4.0/'>CC BY-NC-SA </a></p>
</div>
</div>
""", unsafe_allow_html=True
)