Évolution schéma

This commit is contained in:
Fabrication du Numérique 2025-04-30 21:29:54 +02:00
parent 4cf33d74de
commit fbe196e166
8 changed files with 759 additions and 316 deletions

1
.gitignore vendored
View File

@ -12,6 +12,7 @@ __pycache__/
.cache/
*.log
*.tmp
*.old
# Ignorer config locale
.ropeproject/

View File

@ -37,7 +37,6 @@ Pour l'environnement de pré-production, (https://fabnum-dev.peccini.fr)[https:/
ENV=dev
PORT=8502
GITEA_URL = "https://fabnum-git.peccini.fr/api/v1"
>
GITEA_TOKEN = "LE_TOKEN_POUR_ACCEDER_A_GITEA"
ORGANISATION = "fabnum"
DEPOT_FICHES = "fiches"

14
assets/impact_co2.js Normal file
View File

@ -0,0 +1,14 @@
import tgwf from "https://cdn.skypack.dev/@tgwf/co2";
export function calculerImpactCO2(totalBytes) {
const emissions = new tgwf.co2();
const greenHost = true;
let estimatedCO2 = emissions.perByte(totalBytes, greenHost).toFixed(1);
let totalMB = (totalBytes / (1024 * 1024)).toFixed(1);
const target = document.getElementById("network-usage");
if (target) {
target.innerHTML = `Transfert : ${totalMB} Mo<br>CO₂eq estimé : ${estimatedCO2} g`;
}
}

View File

@ -2,126 +2,217 @@
body,
html {
font-family:
-apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial,
sans-serif;
font-family:
-apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial,
sans-serif;
}
.stAppHeader {
visibility: hidden;
visibility: hidden;
}
/* Conteneur principal */
.block-container {
max-width: 1024px !important;
padding-left: 2rem;
padding-right: 2rem;
padding: 0rem 1rem 10rem;
max-width: 1024px !important;
padding-left: 2rem;
padding-right: 2rem;
padding: 0rem 1rem 10rem;
}
.stVerticalBlock {
gap: 0.5rem !important;
gap: 0.5rem !important;
}
/* Lien normal (non visité) */
a {
color: #1b5e20; /* vert foncé */
text-decoration: none;
color: #1b5e20; /* vert foncé */
text-decoration: none;
}
/* Lien visité */
a:visited {
color: #388e3c; /* vert moyen */
color: #388e3c; /* vert moyen */
}
/* Lien au survol */
a:hover {
color: #145a1a; /* vert encore plus foncé */
text-decoration: underline;
color: #145a1a; /* vert encore plus foncé */
text-decoration: underline;
}
/* Lien actif */
a:active {
color: #2e7d32; /* action en cours - nuance */
color: #2e7d32; /* action en cours - nuance */
}
/* Couleur des boutons primaires et sliders */
.stButton > button,
.stSlider > div > div {
background-color: darkgreen !important;
color: white !important;
border: 1px solid grey;
background-color: darkgreen !important;
color: white !important;
border: 1px solid grey;
}
/* Style pour impression */
@media print {
body {
font-size: 12pt;
color: black;
background: white;
}
nav,
footer,
.stSidebar {
display: none !important;
}
body {
font-size: 12pt;
color: black;
background: white;
}
nav,
footer,
.stSidebar {
display: none !important;
}
}
/* En-tête large */
.wide-header {
width: 100vw;
margin-left: calc(-50vw + 50%);
background-color: #f9f9f9;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
border-bottom: 1px solid #ddd;
text-align: center;
padding-top: 1rem;
width: 100vw;
margin-left: calc(-50vw + 50%);
background-color: #f9f9f9;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
border-bottom: 1px solid #ddd;
text-align: center;
padding-top: 1rem;
}
.titre-header {
font-size: 2rem !important;
font-weight: bolder !important;
color: #555;
font-size: 2rem !important;
font-weight: bolder !important;
color: #555;
}
/* Accessibilité RGAA pour les onglets */
div[role="radiogroup"] > label {
background-color: #eee;
color: #333;
padding: 0.5em 1em;
border-radius: 0.4em;
margin-right: 0.5em;
cursor: pointer;
border: 1px solid #ccc;
background-color: #eee;
color: #333;
padding: 0.5em 1em;
border-radius: 0.4em;
margin-right: 0.5em;
cursor: pointer;
border: 1px solid #ccc;
}
div[role="radiogroup"] > label[data-selected="true"] {
background-color: #1b5e20 !important;
color: white !important;
font-weight: bold;
border: 2px solid #145a1a;
background-color: #1b5e20 !important;
color: white !important;
font-weight: bold;
border: 2px solid #145a1a;
}
/* Style du graphique Plotly */
.stPlotlyChart text {
font-family: Verdana !important;
fill: black !important;
font-size: 14px !important;
font-family: Verdana !important;
fill: black !important;
font-size: 14px !important;
}
/* Pied de page */
/* Footer général */
.wide-footer {
width: 100vw;
margin-left: calc(-50vw + 50%);
margin-top: 2rem;
background-color: #f9f9f9;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
border-bottom: 1px solid #ddd;
text-align: center;
padding-top: 1rem;
width: 100vw;
margin-left: calc(-50vw + 50%);
margin-top: 3rem; /* changé pour matcher */
background-color: #f9f9f9;
box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
border-top: 1px solid #ddd;
text-align: center;
padding-top: 1rem;
}
/* Texte à l'intérieur du footer */
.info-footer {
font-size: 1rem !important;
color: #555;
font-weight: 800;
font-size: 1rem !important;
color: #333; /* au lieu de #555 */
font-weight: 800;
}
/* Petit paragraphe sous le footer */
.footer-note {
margin-top: 0.5rem;
font-size: small;
}
/* Bloc impact environnemental dans sidebar */
.impact-environnement {
margin-top: 1rem;
font-size: medium;
}
/* Div réseau pour impact CO₂ */
#network-usage {
font-size: small;
margin-top: 1rem;
}
.decorative-heading {
font-size: 1.25rem;
font-weight: bold;
margin-bottom: 0.5rem;
color: #145a1a; /* même couleur que hover, bon contraste */
}
/* Override Streamlit file uploader limit text to 100 Ko */
div[data-testid="stFileUploaderDropzoneInstructions"] small {
visibility: hidden;
}
div[data-testid="stFileUploaderDropzoneInstructions"] small::after {
content: "Limite 100 Ko par fichier • JSON";
visibility: visible;
display: block;
font-size: inherit;
color: inherit;
margin-top: 0.25em;
}
/* Override Streamlit file uploader limit text to 100 Ko */
div[data-testid="stFileUploaderDropzoneInstructions"] small {
visibility: hidden;
}
div[data-testid="stFileUploaderDropzoneInstructions"] small::after {
content: "Limite 100 Ko par fichier • JSON";
visibility: visible;
display: block;
font-size: inherit;
color: inherit;
margin-top: 0.25em;
}
/* Translate Drag and drop and Browse files */
/* Hide original "Drag and drop file here" text */
div[data-testid="stFileUploaderDropzoneInstructions"]
.st-emotion-cache-j7qwjs
> span:nth-of-type(1) {
visibility: hidden;
}
/* Insert French translation */
div[data-testid="stFileUploaderDropzoneInstructions"]
.st-emotion-cache-j7qwjs
> span:nth-of-type(1)::after {
content: "Glissez-déposez votre fichier ici";
visibility: visible;
display: block;
font-size: inherit;
color: inherit;
}
/* Hide original "Browse files" button text */
/* Target the button within the dropzone container for uploader */
div[data-testid="stFileUploaderDropzone"]
button[data-testid="stBaseButton-secondary"] {
color: transparent !important;
position: relative;
}
/* Insert French translation for button */
div[data-testid="stFileUploaderDropzone"]
button[data-testid="stBaseButton-secondary"]::after {
content: "Parcourir les fichiers";
visibility: visible !important;
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
display: block;
font-size: inherit;
color: inherit !important;
}

86
connexion.py Normal file
View File

@ -0,0 +1,86 @@
import streamlit as st
import requests
import logging
import os
def initialiser_logger():
LOG_FILE_PATH = "/var/log/fabnum-auth.log"
if not os.path.exists(os.path.dirname(LOG_FILE_PATH)):
os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True)
logger = logging.getLogger("auth_logger")
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
fh = logging.FileHandler(LOG_FILE_PATH)
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
return logger
def connexion():
if not st.session_state.get("logged_in", False):
st.title("Authentification")
GITEA_URL = "https://fabnum-git.peccini.fr/api/v1"
ORGANISATION = "FabNum"
EQUIPE_CIBLE = "Administrateurs"
logger = initialiser_logger()
if "logged_in" not in st.session_state:
st.session_state.logged_in = False
st.session_state.username = ""
st.session_state.token = ""
if not st.session_state.logged_in:
with st.form("auth_form"):
token = st.text_input("Token d'accès personnel Gitea", type="password")
submitted = st.form_submit_button("Se connecter")
if submitted and token:
erreur = True
headers = {"Authorization": f"token {token}"}
ip = os.environ.get("REMOTE_ADDR", "inconnu")
username = "inconnu"
try:
user_response = requests.get(f"{GITEA_URL}/user", headers=headers, timeout=5)
user_response.raise_for_status()
utilisateur = user_response.json()
username = utilisateur.get("login", "inconnu")
logger.info(f"Tentative par {username} depuis IP {ip}")
teams_url = f"{GITEA_URL}/orgs/{ORGANISATION}/teams"
teams_response = requests.get(teams_url, headers=headers, timeout=5)
teams_response.raise_for_status()
equipes = teams_response.json()
equipe_admin = next((e for e in equipes if e["name"] == EQUIPE_CIBLE), None)
if equipe_admin:
team_id = equipe_admin["id"]
check_url = f"{GITEA_URL}/teams/{team_id}/members/{username}"
check_response = requests.get(check_url, headers=headers, timeout=5)
if check_response.status_code == 200:
st.session_state.logged_in = True
st.session_state.username = username
st.session_state.token = token
erreur = False
logger.info(f"Connexion réussie pour {username} depuis IP {ip}")
st.rerun()
except requests.RequestException:
st.error("❌ Impossible de vérifier l'utilisateur auprès de Gitea.")
if erreur:
logger.warning(f"Accès refusé pour tentative avec token depuis IP {ip}")
st.error("❌ Accès refusé.")
def bouton_deconnexion():
if st.session_state.get("logged_in", False):
st.sidebar.markdown(f"Connecté en tant que `{st.session_state.username}`")
if st.sidebar.button("Se déconnecter"):
st.session_state.logged_in = False
st.session_state.username = ""
st.session_state.token = ""
st.success("Déconnecté avec succès.")
st.rerun()

748
fabnum.py
View File

@ -1,5 +1,5 @@
import streamlit as st
from networkx.drawing.nx_agraph import read_dot
from networkx.drawing.nx_agraph import read_dot, write_dot
import pandas as pd
import plotly.graph_objects as go
import networkx as nx
@ -15,7 +15,17 @@ from tickets_fiche import gerer_tickets_fiche
import base64
from dateutil import parser
from datetime import datetime, timezone
import copy
import streamlit.components.v1 as components
from connexion import connexion, bouton_deconnexion
import tempfile
import json
st.set_page_config(
page_title="Fabnum Analyse de chaîne",
page_icon="assets/weakness.png"
)
session_id = st.context.headers.get("x-session-id")
# Configuration Gitea
load_dotenv()
@ -26,17 +36,40 @@ ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
ENV = os.getenv("ENV")
st.set_page_config(
page_title="Fabnum Analyse de chaîne",
page_icon="assets/weakness.png"
)
DOT_FILE = "schema.txt"
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def get_total_bytes_for_session(session_id):
total_bytes = 0
try:
with open("/var/log/nginx/fabnum-dev.access.log", "r") as f:
for line in f:
if session_id in line:
match = re.search(r'"GET.*?" \d+ (\d+)', line)
if match:
bytes_sent = int(match.group(1))
total_bytes += bytes_sent
except Exception as e:
st.error(f"Erreur lecture log: {e}")
return total_bytes
# Intégration du fichier CSS externe
with open("assets/styles.css") as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
header ="""
<div role='region' aria-labelledby='entete-header' class='wide-header'>
<header role="banner" aria-labelledby="entete-header">
<div class='wide-header'>
<p id='entete-header' class='titre-header'>FabNum - Chaîne de fabrication du numérique</p>"""
if ENV == "dev":
@ -46,10 +79,51 @@ else:
header+="""
</div>
</header>
"""
st.markdown(header, unsafe_allow_html=True)
def afficher_menu():
with st.sidebar:
st.markdown("""
<nav role="navigation" aria-label="Menu principal">
<div role="region" aria-label="Navigation principale" class="onglets-accessibles">
<hr />
<p class="decorative-heading">Navigation</p>
<hr />
""", unsafe_allow_html=True)
if "onglet" not in st.session_state:
st.session_state.onglet = "Instructions"
if st.button("Instructions"):
st.session_state.onglet = "Instructions"
if st.button("Personnalisation"):
st.session_state.onglet = "Personnalisation"
if st.button("Analyse"):
st.session_state.onglet = "Analyse"
if st.button("Visualisations"):
st.session_state.onglet = "Visualisations"
if st.button("Fiches"):
st.session_state.onglet = "Fiches"
st.markdown("---")
connexion()
# Si l'utilisateur est connecté, afficher le reste
if st.session_state.get("logged_in", False):
bouton_deconnexion()
st.markdown("""
<hr />
</div>
</nav>""", unsafe_allow_html=True)
afficher_menu()
st.markdown("""
<main role="main">
""", unsafe_allow_html=True)
def recuperer_date_dernier_commit_schema():
headers = {"Authorization": f"token " + GITEA_TOKEN}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path=schema.txt&sha={ENV}"
@ -346,15 +420,14 @@ def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False,
filtrer_isg=False,
logique_filtrage="OU"
):
minerais=None,
filtrer_criticite=False, filtrer_ivc=False,
filtrer_ihh=False, filtrer_isg=False,
logique_filtrage="OU"):
niveaux = {}
for node, attrs in G.nodes(data=True):
# Conversion du niveau
niveau_str = attrs.get("niveau")
try:
if niveau_str:
@ -362,28 +435,12 @@ def afficher_sankey(
except ValueError:
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
# Suppression des attributs indésirables
ATTRIBUTS_SUPPRIMES = {"fillcolor", "fontcolor", "style", "fontsize"}
for attr in ATTRIBUTS_SUPPRIMES:
attrs.pop(attr, None)
# Réordonner : label d'abord
if "label" in attrs:
reordered = OrderedDict()
reordered["label"] = attrs["label"]
for k, v in attrs.items():
if k != "label":
reordered[k] = v
G.nodes[node].clear()
G.nodes[node].update(reordered)
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
for na in noeuds_arrivee:
tous_chemins = extraire_chemins_depuis(G, nd)
chemins.extend(
[chemin for chemin in tous_chemins if na in chemin])
chemins.extend([chemin for chemin in tous_chemins if na in chemin])
elif noeuds_depart:
for nd in noeuds_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
@ -391,59 +448,60 @@ def afficher_sankey(
for na in noeuds_arrivee:
chemins.extend(extraire_chemins_vers(G, na, niveau_depart))
else:
sources_depart = [n for n in G.nodes() if niveaux.get(n)
== niveau_depart]
sources_depart = [n for n in G.nodes() if niveaux.get(n) == niveau_depart]
for nd in sources_depart:
chemins.extend(extraire_chemins_depuis(G, nd))
if minerais:
chemins = [chemin for chemin in chemins if any(n in minerais for n in chemin)]
def extraire_criticite(u, v):
data = G.get_edge_data(u, v)
if not data:
return 0
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
try:
return float(data[0].get("criticite", 0))
except:
return 0
return float(data[0].get("criticite", 0))
return float(data.get("criticite", 0))
liens_chemins = set()
chemins_filtres = set()
for chemin in chemins:
has_ihh = False
has_ivc = False
has_criticite = False
has_isg_critique = False
niveaux_speciaux = [1001]
for chemin in chemins:
has_ihh = has_ivc = has_criticite = has_isg_critique = False
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u)
niveau_v = niveaux.get(v)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveau_u == 10 and int(G.nodes[u].get(ihh_field, 0)) > 25:
has_ihh = True
if niveau_v == 10 and int(G.nodes[v].get(ihh_field, 0)) > 25:
has_ihh = True
if filtrer_ivc and niveau_u == 2 and int(G.nodes[u].get("ivc", 0)) > 30:
has_ivc = True
if filtrer_criticite and niveau_u == 1 and niveau_v == 2 and extraire_criticite(u, v) > 0.66:
has_criticite = True
for i in range(len(chemin)-1):
u, v = chemin[i], chemin[i+1]
if niveaux.get(u) is not None and niveaux.get(v) is not None:
if niveau_depart <= niveaux.get(u) <= niveau_arrivee and niveau_depart <= niveaux.get(v) <= niveau_arrivee:
liens_chemins.add((u, v))
# vérification des conditions critiques
if filtrer_ihh:
if filtrer_ihh and ihh_type:
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
if niveaux.get(u) == 10 and G.nodes[u].get(ihh_field) and int(G.nodes[u][ihh_field]) > 25:
has_ihh = True
elif niveaux.get(v) == 10 and G.nodes[v].get(ihh_field) and int(G.nodes[v][ihh_field]) > 25:
has_ihh = True
if filtrer_ivc and niveaux.get(u) == 2 and G.nodes[u].get("ivc") and int(G.nodes[u]["ivc"]) > 30:
has_ivc = True
if filtrer_criticite and niveaux.get(u) == 1 and niveaux.get(v) == 2 and extraire_criticite(u, v) > 0.66:
has_criticite = True
# Vérifie présence d'un isg >= 60
for n in (u, v):
if niveaux.get(n) == 99:
isg = int(G.nodes[n].get("isg", 0))
if isg >= 60:
has_isg_critique = True
if niveaux.get(n) == 99 and int(G.nodes[n].get("isg", 0)) >= 60:
has_isg_critique = True
elif niveaux.get(n) in (11, 12):
for succ in G.successors(n):
if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60:
has_isg_critique = True
if logique_filtrage == "ET":
keep = True
if filtrer_ihh:
@ -457,10 +515,7 @@ def afficher_sankey(
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
if (filtrer_ihh and has_ihh) or \
(filtrer_ivc and has_ivc) or \
(filtrer_criticite and has_criticite) or \
(filtrer_isg and has_isg_critique):
if (filtrer_ihh and has_ihh) or (filtrer_ivc and has_ivc) or (filtrer_criticite and has_criticite) or (filtrer_isg and has_isg_critique):
chemins_filtres.add(tuple(chemin))
if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]):
@ -469,7 +524,12 @@ def afficher_sankey(
for chemin in chemins:
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
if niveau_depart <= niveaux.get(u, 999) <= niveau_arrivee and niveau_depart <= niveaux.get(v, 999) <= niveau_arrivee:
niveau_u = niveaux.get(u, 999)
niveau_v = niveaux.get(v, 999)
if (
(niveau_depart <= niveau_u <= niveau_arrivee or niveau_u in niveaux_speciaux)
and (niveau_depart <= niveau_v <= niveau_arrivee or niveau_v in niveaux_speciaux)
):
liens_chemins.add((u, v))
if not liens_chemins:
@ -488,7 +548,6 @@ def afficher_sankey(
noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"])
sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises]
def couleur_criticite(p):
if p <= 0.33:
return "darkgreen"
@ -498,8 +557,7 @@ def afficher_sankey(
return "darkred"
df_liens["color"] = df_liens.apply(
lambda row: couleur_criticite(row["criticite"]) if niveaux.get(
row["source"]) == 1 and niveaux.get(row["target"]) == 2 else "gray",
lambda row: couleur_criticite(row["criticite"]) if row["criticite"] > 0 else "gray",
axis=1
)
@ -568,6 +626,32 @@ def afficher_sankey(
)
st.plotly_chart(fig)
if st.session_state.get("logged_in", False):
if liens_chemins:
G_export = nx.DiGraph()
for u, v in liens_chemins:
G_export.add_node(u, **G.nodes[u])
G_export.add_node(v, **G.nodes[v])
data = G.get_edge_data(u, v)
if isinstance(data, dict) and all(isinstance(k, int) for k in data):
G_export.add_edge(u, v, **data[0])
elif isinstance(data, dict):
G_export.add_edge(u, v, **data)
else:
G_export.add_edge(u, v)
with tempfile.NamedTemporaryFile(delete=False, suffix=".dot", mode="w", encoding="utf-8") as f:
write_dot(G_export, f.name)
dot_path = f.name
with open(dot_path, encoding="utf-8") as f:
st.download_button(
label="Télécharger le fichier DOT filtré",
data=f.read(),
file_name="graphe_filtré.dot",
mime="text/plain"
)
def creer_graphes(donnees):
if not donnees:
st.warning("Aucune donnée à afficher.")
@ -744,231 +828,395 @@ def afficher_fiches():
except Exception as e:
st.error(f"Erreur lors du chargement de la fiche : {e}")
def afficher_fiches_old():
import streamlit as st
from pathlib import Path
def lancer_personnalisation(G):
"""
Affiche et modifie uniquement les produits finaux personnalisables (ceux ajoutés)
et permet d'ajouter de nouveaux produits finaux.
Permet aussi d'importer et d'exporter la configuration personnalisée.
base_path = Path("Fiches")
if not base_path.exists():
st.warning("Le dossier 'Fiches' est introuvable.")
return
dossiers = sorted([p for p in base_path.iterdir() if p.is_dir() and 'Criticités' not in p.name])
criticite_path = next((p for p in base_path.iterdir() if p.is_dir() and 'Criticités' in p.name), None)
if criticite_path:
dossiers.append(criticite_path)
noms_dossiers = [d.name for d in dossiers]
dossier_choisi = st.selectbox("📁 Dossiers disponibles", noms_dossiers)
chemin_dossier = base_path / dossier_choisi
fichiers_md = sorted(chemin_dossier.glob("*.md"))
noms_fichiers = []
fichiers_dict = {}
for f in fichiers_md:
try:
with f.open(encoding="utf-8") as md:
titre = md.readline().strip()
if ':' in titre:
titre = titre.split(':', 1)[1].strip()
else:
titre = f.stem
fichiers_dict[titre] = f.name
noms_fichiers.append(titre)
except Exception as e:
st.error(f"Erreur lecture fichier {f.name} : {e}")
noms_fichiers.sort()
fiche_label = st.selectbox("🗂️ Fiches Markdown", noms_fichiers)
fichier_choisi = fichiers_dict[fiche_label]
chemin_fichier = chemin_dossier / fichier_choisi
with chemin_fichier.open(encoding="utf-8") as f:
contenu = f.read()
st.markdown(contenu, unsafe_allow_html=True)
niveau_labels = {
0: "Produit final",
1: "Composant",
2: "Minerai",
10: "Opération",
11: "Pays d'opération",
12: "Acteur d'opération",
99: "Pays géographique"
}
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
DOT_FILE = "schema.txt"
charger_schema_depuis_gitea(DOT_FILE)
# Charger le graphe une seule fois
try:
dot_file_path = True
if "G_temp" not in st.session_state:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
else:
dot_file_path = False
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
except:
st.error("Erreur de lecture du fichier DOT")
dot_file_path = False
if dot_file_path:
Retour:
G: le graphe modifié
"""
st.header("Personnalisation des produits finaux")
st.markdown("""
<div role="form" aria-label="Navigation des onglets" class="onglets-accessibles">
""", unsafe_allow_html=True)
---
with st.sidebar:
st.markdown("---")
st.header("Navigation")
st.markdown("---")
if "onglet" not in st.session_state:
st.session_state.onglet = "Instructions"
Dans cette section, vous pouvez ajouter des produits finaux qui ne sont pas présents dans la liste,
par exemple des produits que vous concevez vous même.
if st.button("📄 Instructions"):
st.session_state.onglet = "Instructions"
if st.button("🔍 Analyse"):
st.session_state.onglet = "Analyse"
if st.button("📊 Visualisations"):
st.session_state.onglet = "Visualisations"
if st.button("📚 Fiches"):
st.session_state.onglet = "Fiches"
Pour chacun de ces produits, vous allez lui associer les composants qui le constituent, et si
cela vous convient, lui associer une opération d'assemblage existante.
Les modifications que vous faites ne sont pas stockées dans l'application. Vous pouvez toutefois
les enregistrer dans un fichier que vous pourrez recharger ultérieurement.
---
""")
# --- 1. Ajouter un nouveau produit final
st.subheader("Ajouter un nouveau produit final")
new_prod = st.text_input("Nom du nouveau produit (unique)", key="new_prod")
if new_prod:
# Opérations d'assemblage disponibles (niveau 10)
ops_dispo = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "10"
and any(
G.has_edge(p, n) and G.nodes[p].get("niveau") == "0"
for p in G.predecessors(n)
)
])
sel_new_op = st.selectbox(
"Opération d'assemblage (optionnelle)",
options=["-- Aucune --"] + ops_dispo,
index=0,
key="new_op"
)
# Composants de niveau 1
niveau1 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "1"
])
sel_comps = st.multiselect(
"Composants à lier", options=niveau1, key="new_links"
)
if st.button("Créer le produit", key="btn_new"):
G.add_node(new_prod, niveau="0", personnalisation="oui", label=new_prod)
if sel_new_op != "-- Aucune --":
G.add_edge(new_prod, sel_new_op)
for comp in sel_comps:
G.add_edge(new_prod, comp)
st.success(
f"{new_prod} ajouté : {len(sel_comps)} composant(s)"
+ (f", opération {sel_new_op}" if sel_new_op != "-- Aucune --" else "")
)
st.markdown("---")
# --- 2. Modifier un produit final ajouté
st.subheader("Modifier un produit final ajouté")
produits0 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "0" and d.get("personnalisation") == "oui"
])
sel_display = st.multiselect(
"Sélectionnez un produit final ajouté à modifier",
options=produits0,
key="prod_sel"
)
if sel_display:
prod = sel_display[0]
# Bouton de suppression
if st.button(f"Supprimer le produit {prod}", key=f"del_{prod}"):
G.remove_node(prod)
st.success(f"Produit « {prod} » supprimé.")
st.session_state.pop("prod_sel", None)
return G
# Opérations d'assemblage disponibles
ops_dispo = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "10"
and any(
G.has_edge(p, n) and G.nodes[p].get("niveau") == "0"
for p in G.predecessors(n)
)
])
# Opération actuelle
curr_ops = [
succ for succ in G.successors(prod)
if G.nodes[succ].get("niveau") == "10"
]
default_idx = 0
if curr_ops and curr_ops[0] in ops_dispo:
default_idx = ops_dispo.index(curr_ops[0]) + 1
sel_op = st.selectbox(
f"Opération d'assemblage liée à {prod} (optionnelle)",
options=["-- Aucune --"] + ops_dispo,
index=default_idx,
key=f"op_{prod}"
)
# Composants liés
niveau1 = sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "1"
])
linked = [
succ for succ in G.successors(prod)
if G.nodes[succ].get("niveau") == "1"
]
nouveaux = st.multiselect(
f"Composants liés à {prod}",
options=niveau1,
default=linked,
key=f"links_{prod}"
)
# Mise à jour
if st.button(f"Mettre à jour {prod}", key=f"btn_{prod}"):
# Mettre à jour l'opération
for op in curr_ops:
if sel_op == "-- Aucune --" or op != sel_op:
G.remove_edge(prod, op)
if sel_op != "-- Aucune --" and (not curr_ops or sel_op not in curr_ops):
G.add_edge(prod, sel_op)
# Mettre à jour les composants
for comp in set(linked) - set(nouveaux):
G.remove_edge(prod, comp)
for comp in set(nouveaux) - set(linked):
G.add_edge(prod, comp)
st.success(
f"{prod} mis à jour : {len(nouveaux)} composant(s)"
+ (f", opération {sel_op}" if sel_op != "-- Aucune --" else "")
)
st.markdown("---")
# --- 3. Sauvegarder ou restaurer la configuration
st.subheader("Sauvegarder ou restaurer la configuration")
# Export
if st.button("Exporter configuration", key="export_config"):
nodes = [n for n, d in G.nodes(data=True) if d.get("personnalisation")=="oui"]
edges = [(u, v) for u, v in G.edges() if u in nodes]
conf = {"nodes": nodes, "edges": edges}
json_str = json.dumps(conf, ensure_ascii=False)
st.download_button(
label="Télécharger la config (JSON)",
data=json_str,
file_name="config_personnalisation.json",
mime="application/json"
)
# Import
uploaded = st.file_uploader(
"Importer une configuration (JSON) (max 100 Ko)",
type=["json"], key="import_config"
)
if uploaded:
if uploaded.size > 100 * 1024:
st.error("Fichier trop volumineux (max 100Ko).")
else:
try:
conf = json.load(uploaded)
for node in conf.get("nodes", []):
if not G.has_node(node):
G.add_node(node, niveau="0", personnalisation="oui", label=node)
for u, v in conf.get("edges", []):
if G.has_node(u) and G.has_node(v) and not G.has_edge(u, v):
G.add_edge(u, v)
st.success("Configuration importée avec succès.")
except Exception as e:
st.error(f"Erreur d'import: {e}")
return G
dot_file_path = None
if st.session_state.onglet == "Instructions":
with open("Instructions.md", "r", encoding="utf-8") as f:
markdown_content = f.read()
st.markdown(markdown_content)
elif st.session_state.onglet == "Fiches":
st.markdown("---")
st.markdown("**Affichage des fiches**")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
else:
# Charger le graphe une seule fois
if "G_temp" not in st.session_state:
try:
if charger_schema_depuis_gitea(DOT_FILE):
st.session_state["G_temp"] = read_dot(DOT_FILE)
st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy()
dot_file_path = True
else:
dot_file_path = False
except Exception as e:
st.error(f"Erreur de lecture du fichier DOT : {e}")
dot_file_path = False
else:
dot_file_path = True
if dot_file_path:
G_temp = st.session_state["G_temp"]
G_temp_ivc = st.session_state["G_temp_ivc"]
else:
st.error("Impossible de charger le graphe pour cet onglet.")
if dot_file_path and st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
st.markdown("---")
st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**")
st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche")
niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values())
if st.session_state.onglet == "Instructions":
with open("Instructions.md", "r", encoding="utf-8") as f:
markdown_content = f.read()
st.markdown(markdown_content)
niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
elif st.session_state.onglet == "Analyse":
try:
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G_temp.nodes(data=True)
if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit()
}
G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp])
G_temp.remove_nodes_from(
[n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n])
if niveau_depart_label != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart_label]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
st.markdown("---")
st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**")
st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche")
niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values())
st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite")
niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart")
niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles
niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee")
if niveau_depart_label != "-- Sélectionner un niveau --":
niveau_depart = inverse_niveau_labels[niveau_depart_label]
if niveau_arrivee_label != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label]
niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart]
st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite")
niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles
niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee")
minerais_selection = None
if niveau_depart < 2 < niveau_arrivee:
# Tous les nœuds de niveau 2 (minerai)
minerais_nodes = sorted([
n for n, d in G_temp.nodes(data=True)
if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2
])
minerais_selection = st.multiselect(
"Filtrer par minerais (optionnel)",
options=minerais_nodes,
key="analyse_minerais"
)
st.markdown("---")
if niveau_arrivee_label != "-- Sélectionner un niveau --":
niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label]
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart]
arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee]
st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**")
st.markdown("Sélectionner un ou plusieurs items du niveau de départ")
st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**")
st.markdown("Sélectionner un ou plusieurs items du niveau de départ")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart")
st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée")
st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee")
st.markdown("---")
st.markdown("---")
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
noeuds_depart = noeuds_depart if noeuds_depart else None
noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None
st.markdown("**Sélection des filtres pour identifier les vulnérabilités**")
st.markdown("**Sélection des filtres pour identifier les vulnérabilités**")
filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite")
filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc")
filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
ihh_type = None
if filtrer_ihh:
ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg")
logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage")
st.markdown("---")
st.markdown("---")
if st.button("Lancer lanalyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
minerais=minerais_selection,
filtrer_criticite=filtrer_criticite,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
if st.button("Lancer lanalyse", type="primary", key="analyse_lancer"):
afficher_sankey(
G_temp,
niveau_depart=niveau_depart,
niveau_arrivee=niveau_arrivee,
noeuds_depart=noeuds_depart,
noeuds_arrivee=noeuds_arrivee,
filtrer_criticite=filtrer_criticite,
filtrer_ivc=filtrer_ivc,
filtrer_ihh=filtrer_ihh,
filtrer_isg=filtrer_isg,
logique_filtrage=logique_filtrage
)
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
except Exception as e:
st.error(f"Erreur de prévisualisation du graphe : {e}")
elif st.session_state.onglet == "Visualisations":
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité**
elif dot_file_path and st.session_state.onglet == "Visualisations":
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité substituabilité du minerai
""")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
if st.button("Lancer", key="btn_ihh_criticite"):
try:
lancer_visualisation_ihh_criticite(G_temp)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}")
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC**
st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC**
Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte.
Taille des points = criticité concurrentielle du minerai
""")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
if st.button("Lancer", key="btn_ihh_ivc"):
try:
lancer_visualisation_ihh_ivc(G_temp_ivc)
except Exception as e:
st.error(f"Erreur dans la visualisation IHH vs IVC : {e}")
elif st.session_state.onglet == "Fiches":
st.markdown("---")
st.markdown("**Affichage des fiches**")
st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.")
st.markdown("---")
afficher_fiches()
elif dot_file_path and st.session_state.onglet == "Personnalisation":
G_temp = lancer_personnalisation(G_temp)
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("""</section>""", unsafe_allow_html=True)
st.markdown("</main>", unsafe_allow_html=True)
st.markdown("""<section role="region" aria-label="Contenu principal" id="main-content">""", unsafe_allow_html=True)
st.markdown("""
<div role='contentinfo' aria-labelledby='footer-appli' class='wide-footer'>
<div class='info-footer'>
<p id='footer-appli' class='info-footer'>Fabnum © 2025 <a href='mailto:stephan-pro@peccini.fr'>Contact</a> Licence <a href='https://creativecommons.org/licenses/by-nc-sa/4.0/'>CC BY-NC-SA </a></p>
<p id='footer-appli' class='info-footer'>
Fabnum © 2025 <a href='mailto:stephan-pro@peccini.fr'>Contact</a> Licence <a href='https://creativecommons.org/licenses/by-nc-sa/4.0/' target='_blank'>CC BY-NC-SA</a>
</p>
<p class='footer-note'>
🌱 Calculs CO₂ via <a href='https://www.thegreenwebfoundation.org/' target='_blank'>The Green Web Foundation</a><br>
🚀 Propulsé par <a href='https://streamlit.io/' target='_blank'>Streamlit</a>
</p>
</div>
</div>
""", unsafe_allow_html=True
)
""", unsafe_allow_html=True)
total_bytes = get_total_bytes_for_session(session_id)
with st.sidebar:
components.html(f"""
<html lang="fr">
<head>
<title>Impact environnemental estimé de votre session</title>
</head>
<body>
<div role="region" aria-label="Impact environnemental de la session" class="impact-environnement">
<p class="decorative-heading">Impact environnemental de votre session</p>
<p>
<span id="network-usage">Chargement en cours</span>
</p>
</div>
<script>
document.addEventListener("DOMContentLoaded", async function() {{
try {{
const module = await import("/assets/impact_co2.js");
module.calculerImpactCO2({total_bytes});
}} catch (error) {{
console.error("Erreur lors du chargement du module impact_co2.js", error);
}}
}});
</script>
</body>
</html>
""")

View File

@ -5,3 +5,4 @@ pandas
plotly
requests
kaleido>=0.2.1
streamlit_browser_cookie

View File

@ -1,5 +1,4 @@
import streamlit as st
from datetime import datetime
from dateutil import parser
from collections import defaultdict
import os
@ -96,7 +95,11 @@ def afficher_tickets_par_fiche(tickets):
statut = extraire_statut_par_label(ticket)
tickets_groupes[statut].append(ticket)
st.info(f"{len(tickets_groupes["Backlog"])} ticket(s) en attente de modération ne sont pas affichés.")
nb_backlogs = len(tickets_groupes["Backlog"])
if nb_backlogs == 1:
st.info(f"{nb_backlogs} ticket en attente de modération n'est pas affiché.")
else :
st.info(f"{nb_backlogs} ticket(s) en attente de modération ne sont pas affichés.")
ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"]