import streamlit as st
from networkx.drawing.nx_agraph import read_dot
import pandas as pd
import plotly.graph_objects as go
import networkx as nx
import logging
import altair as alt
import numpy as np
from collections import OrderedDict
from dotenv import load_dotenv
import os
import requests
import re
from tickets_fiche import gerer_tickets_fiche
import base64
from dateutil import parser
from datetime import datetime, timezone
import copy
# Configuration Gitea
load_dotenv()
GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
ENV = os.getenv("ENV")
st.set_page_config(
page_title="Fabnum – Analyse de chaîne",
page_icon="assets/weakness.png"
)
# Intégration du fichier CSS externe
with open("assets/styles.css") as f:
st.markdown(f"", unsafe_allow_html=True)
header ="""
"""
st.markdown(header, unsafe_allow_html=True)
def recuperer_date_dernier_commit_schema():
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path=schema.txt&sha={ENV}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
commits = response.json()
if commits:
latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"])
return latest_commit_date
else:
return None
except Exception as e:
st.error(f"Erreur lors de la récupération du dernier commit de schema.txt : {e}")
return None
def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"):
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/schema.txt?ref={ENV}"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()
remote_last_modified = recuperer_date_dernier_commit_schema()
local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None
if not local_last_modified or remote_last_modified > local_last_modified:
dot_text = base64.b64decode(data["content"]).decode("utf-8")
with open(fichier_local, "w", encoding="utf-8") as f:
f.write(dot_text)
return "OK"
except Exception as e:
st.error(f"Erreur lors du chargement de schema.txt depuis Gitea : {e}")
return None
@st.cache_data(ttl=600)
def charger_arborescence_fiches():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
branche = "dev" if ENV == "dev" else "public"
url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={branche}"
try:
response = requests.get(url_base, headers=headers)
response.raise_for_status()
dossiers = response.json()
arbo = {}
for dossier in sorted(dossiers, key=lambda d: d['name'].lower()):
if dossier['type'] == 'dir':
dossier_name = dossier['name']
url_dossier = dossier['url']
response_dossier = requests.get(url_dossier, headers=headers)
response_dossier.raise_for_status()
fichiers = response_dossier.json()
fiches = sorted(
[
{"nom": f["name"], "download_url": f["download_url"]}
for f in fichiers if f["name"].endswith(".md")
],
key=lambda x: x['nom'].lower()
)
arbo[dossier_name] = fiches
return arbo
except Exception as e:
st.error(f"Erreur lors du chargement des fiches : {e}")
return {}
def couleur_noeud(n, niveaux, G):
niveau = niveaux.get(n, 99)
attrs = G.nodes[n]
# Niveau 99 : pays géographique avec isg
if niveau == 99:
isg = int(attrs.get("isg", -1))
if isg >= 60:
return "darkred"
elif isg >= 31:
return "orange"
elif isg >= 0:
return "darkgreen"
else:
return "gray"
# Niveau 11 ou 12 connecté à un pays géographique
if niveau in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99:
isg = int(G.nodes[succ].get("isg", -1))
if isg >= 60:
return "darkred"
elif isg >= 31:
return "orange"
elif isg >= 0:
return "darkgreen"
else:
return "gray"
# Logique existante pour IHH / IVC
if niveau == 10 and attrs.get("ihh_pays"):
ihh = int(attrs["ihh_pays"])
if ihh <= 15:
return "darkgreen"
elif ihh <= 25:
return "orange"
else:
return "darkred"
elif niveau == 2 and attrs.get("ivc"):
ivc = int(attrs["ivc"])
if ivc <= 15:
return "darkgreen"
elif ivc <= 30:
return "orange"
else:
return "darkred"
return "lightblue"
def recuperer_donnees(graph, noeuds):
donnees = []
criticite = {}
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
except ValueError:
logging.warning(f"Nom de nœud inattendu : {noeud}")
continue
if operation == "Traitement":
try:
fabrications = list(graph.predecessors(minerai))
valeurs = [
int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100)
for f in fabrications
if graph.get_edge_data(f, minerai)
]
if valeurs:
criticite[minerai] = round(sum(valeurs) / len(valeurs))
except Exception as e:
logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}")
criticite[minerai] = 50
for noeud in noeuds:
try:
operation, minerai = noeud.split('_', 1)
ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0))
ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0))
criticite_val = criticite.get(minerai, 50)
criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3)
donnees.append({
'categorie': operation,
'nom': minerai,
'ihh_pays': ihh_pays,
'ihh_acteurs': ihh_acteurs,
'criticite_minerai': criticite_val,
'criticite_cat': criticite_cat
})
except Exception as e:
logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True)
return pd.DataFrame(donnees)
def afficher_graphique_altair(df):
ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction']
categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()]
for cat in categories:
st.markdown(f"### {cat}")
df_cat = df[df['categorie'] == cat].copy()
# Appliquer un jitter contrôlé pour disperser les nœuds proches
from collections import Counter
coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1)))
counts = Counter(coord_pairs)
offset_x = []
offset_y = {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5 # écart angulaire
radius = 0.8 + 0.4 * rank # spiral growing radius
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df_cat['ihh_pays'] += offset_x
df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs]
# Décalage des étiquettes pour les relier
df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5
df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5
base = alt.Chart(df_cat).encode(
x=alt.X('ihh_pays:Q', title='IHH Pays (%)'),
y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'),
size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred']))
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_pays:Q',
x2='ihh_pays_text:Q',
y='ihh_acteurs:Q',
y2='ihh_acteurs_text:Q'
)
label_chart = alt.Chart(df_cat).mark_text(
align='left', dx=3, dy=-3,
fontSize=8, font='Arial', angle=335
).encode(
x='ihh_pays_text:Q',
y='ihh_acteurs_text:Q',
text='nom:N'
)
hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=500,
height=400,
title=f"Concentration et criticité – {cat}"
).interactive()
st.altair_chart(chart, use_container_width=True)
def lancer_visualisation_ihh_criticite(graph):
niveaux = nx.get_node_attributes(graph, "niveau")
noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n]
noeuds.sort()
df = recuperer_donnees(graph, noeuds)
if df.empty:
st.warning("Aucune donnée à visualiser.")
else:
afficher_graphique_altair(df)
def extraire_chemins_depuis(G, source):
chemins = []
stack = [(source, [source])]
while stack:
(node, path) = stack.pop()
voisins = list(G.successors(node))
if not voisins:
chemins.append(path)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def extraire_chemins_vers(G, target, niveau_demande):
chemins = []
reverse_G = G.reverse()
niveaux = nx.get_node_attributes(G, "niveau")
stack = [(target, [target])]
while stack:
(node, path) = stack.pop()
voisins = list(reverse_G.successors(node))
if not voisins:
chemin_inverse = list(reversed(path))
contient_niveau = any(
int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse
)
if contient_niveau:
chemins.append(chemin_inverse)
else:
for voisin in voisins:
if voisin not in path:
stack.append((voisin, path + [voisin]))
return chemins
def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False,
filtrer_isg=False,
logique_filtrage="OU"
):
niveaux = {}
for node, attrs in G.nodes(data=True):
# Conversion du niveau
niveau_str = attrs.get("niveau")
try:
if niveau_str:
niveaux[node] = int(str(niveau_str).strip('"'))
except ValueError:
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
# Suppression des attributs indésirables
ATTRIBUTS_SUPPRIMES = {"fillcolor", "fontcolor", "style", "fontsize"}
for attr in ATTRIBUTS_SUPPRIMES:
attrs.pop(attr, None)
# Réordonner : label d'abord
if "label" in attrs:
reordered = OrderedDict()
reordered["label"] = attrs["label"]
for k, v in attrs.items():
if k != "label":
reordered[k] = v
G.nodes[node].clear()
G.nodes[node].update(reordered)
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend(
[chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
elif noeuds_arrivee:
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n)
== niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
def extraire_criticite(u, v):
data = G.get_edge_data(u, v)
if not data:
return 0
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
try:
return float(data[0].get("criticite", 0))
except:
return 0
return float(data.get("criticite", 0))
liens_chemins = set()
chemins_filtres = set()
for chemin in chemins:
has_ihh = False
has_ivc = False
has_criticite = False
has_isg_critique = False
for i in range(len(chemin)-1):
u, v = chemin[i], chemin[i+1]
if niveaux.get(u) is not None and niveaux.get(v) is not None:
if niveau_depart <= niveaux.get(u) <= niveau_arrivee and niveau_depart <= niveaux.get(v) <= niveau_arrivee:
liens_chemins.add((u, v))
# vérification des conditions critiques
if filtrer_ihh:
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveaux.get(u) == 10 and G.nodes[u].get(ihh_field) and int(G.nodes[u][ihh_field]) > 25:
has_ihh = True
elif niveaux.get(v) == 10 and G.nodes[v].get(ihh_field) and int(G.nodes[v][ihh_field]) > 25:
has_ihh = True
if filtrer_ivc and niveaux.get(u) == 2 and G.nodes[u].get("ivc") and int(G.nodes[u]["ivc"]) > 30:
has_ivc = True
if filtrer_criticite and niveaux.get(u) == 1 and niveaux.get(v) == 2 and extraire_criticite(u, v) > 0.66:
has_criticite = True
# Vérifie présence d'un isg >= 60
for n in (u, v):
if niveaux.get(n) == 99:
isg = int(G.nodes[n].get("isg", 0))
if isg >= 60:
has_isg_critique = True
elif niveaux.get(n) in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
has_isg_critique = True
if logique_filtrage == "ET":
keep = True
if filtrer_ihh:
keep = keep and has_ihh
if filtrer_ivc:
keep = keep and has_ivc
if filtrer_criticite:
keep = keep and has_criticite
if filtrer_isg:
keep = keep and has_isg_critique
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
if (filtrer_ihh and has_ihh) or \
(filtrer_ivc and has_ivc) or \
(filtrer_criticite and has_criticite) or \
(filtrer_isg and has_isg_critique):
chemins_filtres.add(tuple(chemin))
if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]):
chemins = list(chemins_filtres)
liens_chemins = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
if niveau_depart <= niveaux.get(u, 999) <= niveau_arrivee and niveau_depart <= niveaux.get(v, 999) <= niveau_arrivee:
liens_chemins.add((u, v))
if not liens_chemins:
st.warning("Aucun chemin ne correspond aux critères.")
return
df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"])
df_liens = df_liens.groupby(
["source", "target"]).size().reset_index(name="value")
df_liens["criticite"] = df_liens.apply(
lambda row: extraire_criticite(row["source"], row["target"]), axis=1)
df_liens["value"] = 0.1
# Ne garder que les nœuds effectivement connectés
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises]
def couleur_criticite(p):
if p <= 0.33:
return "darkgreen"
elif p <= 0.66:
return "orange"
else:
return "darkred"
df_liens["color"] = df_liens.apply(
lambda row: couleur_criticite(row["criticite"]) if niveaux.get(
row["source"]) == 1 and niveaux.get(row["target"]) == 2 else "gray",
axis=1
)
all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel())
sorted_nodes = sorted(
all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True)
node_indices = {name: i for i, name in enumerate(sorted_nodes)}
sources = df_liens["source"].map(node_indices).tolist()
targets = df_liens["target"].map(node_indices).tolist()
values = df_liens["value"].tolist()
customdata = []
for n in sorted_nodes:
info = [f"{k}: {v}" for k, v in G.nodes[n].items()]
niveau = niveaux.get(n, 99)
# Ajout d’un ISG hérité si applicable
if niveau in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]:
isg_val = G.nodes[succ]["isg"]
info.append(f"isg (géographique): {isg_val}")
break
customdata.append("
".join(info))
def edge_info(u, v):
data = G.get_edge_data(u, v)
if not data:
return f"Relation : {u} → {v}"
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
data = data[0]
base = [f"{k}: {v}" for k, v in data.items()]
return f"Relation : {u} → {v}
" + "
".join(base)
link_customdata = [
edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows()
]
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=10,
thickness=8,
label=sorted_nodes,
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
customdata=customdata,
hovertemplate="%{customdata}"
),
link=dict(
source=sources,
target=targets,
value=values,
color=df_liens["color"].tolist(),
customdata=link_customdata,
hovertemplate="%{customdata}"
)
))
fig.update_layout(
title_text="Hiérarchie filtrée par niveaux et noeuds",
paper_bgcolor="white",
plot_bgcolor="white"
)
st.plotly_chart(fig)
def creer_graphes(donnees):
if not donnees:
st.warning("Aucune donnée à afficher.")
return
try:
df = pd.DataFrame(donnees)
df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3))
# Jitter contrôlé pour dispersion
from collections import Counter
coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1)))
counts = Counter(coord_pairs)
offset_x, offset_y = [], {}
seen = Counter()
for pair in coord_pairs:
rank = seen[pair]
seen[pair] += 1
if counts[pair] > 1:
angle = rank * 1.5
radius = 0.8 + 0.4 * rank
offset_x.append(radius * np.cos(angle))
offset_y[pair] = radius * np.sin(angle)
else:
offset_x.append(0)
offset_y[pair] = 0
df['ihh_extraction'] += offset_x
df['ihh_reserves'] += [offset_y[p] for p in coord_pairs]
# Position des étiquettes
df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5
df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5
base = alt.Chart(df).encode(
x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'),
y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'),
size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None),
color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])),
tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q']
)
points = base.mark_circle(opacity=0.6)
lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode(
x='ihh_extraction:Q', x2='ihh_extraction_text:Q',
y='ihh_reserves:Q', y2='ihh_reserves_text:Q'
)
labels = alt.Chart(df).mark_text(
align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335
).encode(
x='ihh_extraction_text:Q',
y='ihh_reserves_text:Q',
text='nom:N'
)
# Lignes seuils
hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15))
hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25))
vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15))
vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25))
chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties(
width=600,
height=500,
title="Concentration des ressources critiques vs vulnérabilité IVC"
).interactive()
st.altair_chart(chart, use_container_width=True)
except Exception as e:
st.error(f"Erreur lors de la création du graphique : {e}")
def recuperer_donnees_2(graph, noeuds_2):
donnees = []
for minerai in noeuds_2:
try:
missing = []
if not graph.has_node(minerai):
missing.append(minerai)
if not graph.has_node(f"Extraction_{minerai}"):
missing.append(f"Extraction_{minerai}")
if not graph.has_node(f"Reserves_{minerai}"):
missing.append(f"Reserves_{minerai}")
if missing:
print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.")
continue
ivc = int(graph.nodes[minerai].get('ivc', 0))
ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0))
ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0))
donnees.append({
'nom': minerai,
'ivc': ivc,
'ihh_extraction': ihh_extraction_pays,
'ihh_reserves': ihh_reserves_pays
})
except Exception as e:
print(f"Erreur avec le nœud {minerai} : {e}")
return donnees
def lancer_visualisation_ihh_ivc(graph):
try:
noeuds_niveau_2 = [
n for n, data in graph.nodes(data=True)
if data.get("niveau") == "2" and "ivc" in data
]
if not noeuds_niveau_2:
# print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.")
return
data = recuperer_donnees_2(graph, noeuds_niveau_2)
creer_graphes(data)
except Exception as e:
print(f"Erreur lors du traitement du fichier DOT : {e}")
def afficher_fiches():
if "fiches_arbo" not in st.session_state:
st.session_state["fiches_arbo"] = charger_arborescence_fiches()
arbo = st.session_state.get("fiches_arbo", {})
if not arbo:
st.warning("Aucune fiche disponible pour le moment.")
return
dossiers = sorted(arbo.keys(), key=lambda x: x.lower())
dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers)
if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --":
fiches = arbo.get(dossier_choisi, [])
noms_fiches = [f['nom'] for f in fiches]
fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches)
if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --":
fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None)
if fiche_info:
try:
headers = {"Authorization": f"token {GITEA_TOKEN}"}
reponse_fiche = requests.get(fiche_info["download_url"], headers=headers)
reponse_fiche.raise_for_status()
contenu_md = reponse_fiche.content.decode("utf-8")
# Traitement du markdown
lignes = contenu_md.split('\n')
contenu_niveau_1 = []
sections_niveau_2 = {}
section_actuelle = None
for ligne in lignes:
if re.match(r'^##[^#]', ligne):
section_actuelle = ligne.strip('# ').strip()
sections_niveau_2[section_actuelle] = []
elif section_actuelle:
sections_niveau_2[section_actuelle].append(ligne)
else:
contenu_niveau_1.append(ligne)
if contenu_niveau_1:
st.markdown("\n".join(contenu_niveau_1), unsafe_allow_html=True)
for titre, contenu in sections_niveau_2.items():
with st.expander(titre):
st.markdown("\n".join(contenu), unsafe_allow_html=True)
gerer_tickets_fiche(fiche_choisie)
except Exception as e:
st.error(f"Erreur lors du chargement de la fiche : {e}")
def afficher_fiches_old():
import streamlit as st
from pathlib import Path
base_path = Path("Fiches")
if not base_path.exists():
st.warning("Le dossier 'Fiches' est introuvable.")
return
dossiers = sorted([p for p in base_path.iterdir() if p.is_dir() and 'Criticités' not in p.name])
criticite_path = next((p for p in base_path.iterdir() if p.is_dir() and 'Criticités' in p.name), None)
if criticite_path:
dossiers.append(criticite_path)
noms_dossiers = [d.name for d in dossiers]
dossier_choisi = st.selectbox("📁 Dossiers disponibles", noms_dossiers)
chemin_dossier = base_path / dossier_choisi
fichiers_md = sorted(chemin_dossier.glob("*.md"))
noms_fichiers = []
fichiers_dict = {}
for f in fichiers_md:
try:
with f.open(encoding="utf-8") as md:
titre = md.readline().strip()
if ':' in titre:
titre = titre.split(':', 1)[1].strip()
else:
titre = f.stem
fichiers_dict[titre] = f.name
noms_fichiers.append(titre)
except Exception as e:
st.error(f"Erreur lecture fichier {f.name} : {e}")
noms_fichiers.sort()
fiche_label = st.selectbox("🗂️ Fiches Markdown", noms_fichiers)
fichier_choisi = fichiers_dict[fiche_label]
chemin_fichier = chemin_dossier / fichier_choisi
with chemin_fichier.open(encoding="utf-8") as f:
contenu = f.read()
st.markdown(contenu, unsafe_allow_html=True)
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
DOT_FILE = "schema.txt"
charger_schema_depuis_gitea(DOT_FILE)
# Charger le graphe une seule fois
try:
dot_file_path = True
if "G_temp" not in st.session_state:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
else:
dot_file_path = False
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
except:
st.error("Erreur de lecture du fichier DOT")
dot_file_path = False
if dot_file_path:
st.markdown("""
""", unsafe_allow_html=True)
with st.sidebar:
st.markdown("---")
st.header("Navigation")
st.markdown("---")
if "onglet" not in st.session_state:
st.session_state.onglet = "Instructions"
if st.button("📄 Instructions"):
st.session_state.onglet = "Instructions"
if st.button("🔍 Analyse"):
st.session_state.onglet = "Analyse"
if st.button("📊 Visualisations"):
st.session_state.onglet = "Visualisations"
if st.button("📚 Fiches"):
st.session_state.onglet = "Fiches"
st.markdown("---")
if st.session_state.onglet == "Instructions":
with open("Instructions.md", "r", encoding="utf-8") as f:
markdown_content = f.read()
st.markdown(markdown_content)
elif st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
st.markdown("---")
st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**")
st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche")
niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values())
niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
if niveau_depart_label != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart_label]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite")
niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles
niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee")
st.markdown("---")
if niveau_arrivee_label != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label]
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**")
st.markdown("Sélectionner un ou plusieurs items du niveau de départ")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
st.markdown("---")
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
st.markdown("**Sélection des filtres pour identifier les vulnérabilités**")
filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
st.markdown("---")
if st.button("Lancer l’analyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
filtrer_criticite=filtrer_criticite,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
elif st.session_state.onglet == "Visualisations":
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité substituabilité du minerai
""")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité concurrentielle du minerai
""")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
elif st.session_state.onglet == "Fiches":
st.markdown("---")
st.markdown("**Affichage des fiches**")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
st.markdown("
", unsafe_allow_html=True)
st.markdown("""
""", unsafe_allow_html=True
)