commit e7ad23e390434ab61e063e24749aa4d8fe48790f Author: Fabrication du Numérique Date: Sun Apr 27 11:27:56 2025 +0200 Initialisation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e381d37 --- /dev/null +++ b/.gitignore @@ -0,0 +1,28 @@ +# Ignorer fichiers sensibles +.env +*.env + +# Ignorer fichiers utilisateurs +*.pyc +__pycache__/ +*.pyo +*.pyd + +# Ignorer cache et temporaire +.cache/ +*.log +*.tmp + +# Ignorer config locale +.ropeproject/ +.streamlit/ +venv + +# Ignorer données Fiches (adapté à ton projet) +schema.txt +Instructions.md +Fiches/ + +# Autres spécifiques si besoin +.DS_Store + diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/assets/fiches_labels.csv b/assets/fiches_labels.csv new file mode 100644 index 0000000..5917b8c --- /dev/null +++ b/assets/fiches_labels.csv @@ -0,0 +1,86 @@ +Fiche,Label opération,Label item +Fiche assemblage casques VR.md,Assemblage,CasquesVR +Fiche assemblage imprimante.md,Assemblage,Imprimante +Fiche assemblage IoT_Wearables.md,Assemblage,IoTWearables +Fiche assemblage matériel dédié IA.md,Assemblage,MaterielIA +Fiche assemblage matériel réseau.md,Assemblage,MaterielReseau +Fiche assemblage ordinateur de bureau.md,Assemblage,OrdiBureau +Fiche assemblage ordinateur portable.md,Assemblage,OrdiPortable +Fiche assemblage serveur.md,Assemblage,Serveur +Fiche assemblage smartphone.md,Assemblage,Smartphone +Fiche assemblage stockage.md,Assemblage,Stockage +Fiche assemblage tablette.md,Assemblage,Tablette +Fiche assemblage télévision - écran.md,Assemblage,Television +Fiche composant audio.md,Fabrication,Audio +Fiche composant batterie.md,Fabrication,Batterie +Fiche composant boîtier.md,Fabrication,Boitier +Fiche composant caméra.md,Fabrication,Camera +Fiche composant capteurs.md,Fabrication,Capteurs +Fiche composant carte mère.md,Fabrication,CarteMere +Fiche composant connecteurs.md,Fabrication,Connecteurs +Fiche composant connectivité.md,Fabrication,Connectivite +Fiche composant disque dur (HDD).md,Fabrication,DisqueDur +Fiche composant écran LCD-TFT.md,Fabrication,EcranLCD +Fiche composant écran mini et micro LED.md,Fabrication,EcranMiniLED +Fiche composant écran OLED.md,Fabrication,EcranOLED +Fiche composant écran specifique.md,Fabrication,EcranSpecifique +Fiche composant mémoire RAM.md,Fabrication,MemoireRAM +Fiche composant processeur ARM.md,Fabrication,ProcesseurARM +Fiche composant processeur ASIC - spécialisé.md,Fabrication,ProcesseurASIC +Fiche composant processeur x86.md,Fabrication,ProcesseurX86 +Fiche composant SSD 2.5.md,Fabrication,SSD25 +Fiche composant SSD M.2.md,Fabrication,SSDM2 +Fiche composant stockage eMMC_UFS.md,Fabrication,StockageEMMC +Fiche minerai acier.md,Extraction/Traitement/Réserves,Acier +Fiche minerai aluminium.md,Extraction/Traitement/Réserves,Aluminium +Fiche minerai antimoine.md,Extraction/Traitement/Réserves,Antimoine +Fiche minerai argent.md,Extraction/Traitement/Réserves,Argent +Fiche minerai arsenic.md,Extraction/Traitement/Réserves,Arsenic +Fiche minerai beryllium.md,Extraction/Traitement/Réserves,Beryllium +Fiche minerai ceramiques.md,Extraction/Traitement/Réserves,Ceramiques +Fiche minerai cerium.md,Extraction/Traitement/Réserves,Cerium +Fiche minerai chrome.md,Extraction/Traitement/Réserves,Chrome +Fiche minerai cobalt.md,Extraction/Traitement/Réserves,Cobalt +Fiche minerai cuivre.md,Extraction/Traitement/Réserves,Cuivre +Fiche minerai dysprosium.md,Extraction/Traitement/Réserves,Dysprosium +Fiche minerai erbium.md,Extraction/Traitement/Réserves,Erbium +Fiche minerai etain.md,Extraction/Traitement/Réserves,Etain +Fiche minerai europium.md,Extraction/Traitement/Réserves,Europium +Fiche minerai fluorite.md,Extraction/Traitement/Réserves,Fluorite +Fiche minerai gadolinium.md,Extraction/Traitement/Réserves,Gadolinium +Fiche minerai gallium.md,Extraction/Traitement/Réserves,Gallium +Fiche minerai germanium.md,Extraction/Traitement/Réserves,Germanium +Fiche minerai graphite.md,Extraction/Traitement/Réserves,Graphite +Fiche minerai hafnium.md,Extraction/Traitement/Réserves,Hafnium +Fiche minerai holmium.md,Extraction/Traitement/Réserves,Holmium +Fiche minerai indiumetain.md,Extraction/Traitement/Réserves,IndiumEtain +Fiche minerai lanthane.md,Extraction/Traitement/Réserves,Lanthane +Fiche minerai lithium.md,Extraction/Traitement/Réserves,Lithium +Fiche minerai magnesium.md,Extraction/Traitement/Réserves,Magnesium +Fiche minerai manganese.md,Extraction/Traitement/Réserves,Manganese +Fiche minerai neodyme.md,Extraction/Traitement/Réserves,Neodyme +Fiche minerai nickel.md,Extraction/Traitement/Réserves,Nickel +Fiche minerai or.md,Extraction/Traitement/Réserves,Or +Fiche minerai palladium.md,Extraction/Traitement/Réserves,Palladium +Fiche minerai pet.md,Extraction/Traitement/Réserves,PET +Fiche minerai phosphore.md,Extraction/Traitement/Réserves,Phosphore +Fiche minerai plastiques.md,Extraction/Traitement/Réserves,Plastiques +Fiche minerai platine.md,Extraction/Traitement/Réserves,Platine +Fiche minerai plomb.md,Extraction/Traitement/Réserves,Plomb +Fiche minerai polystyrene.md,Extraction/Traitement/Réserves,Polystyrene +Fiche minerai praseodyme.md,Extraction/Traitement/Réserves,Praseodyme +Fiche minerai pvc.md,Extraction/Traitement/Réserves,PVC +Fiche minerai quartz.md,Extraction/Traitement/Réserves,Quartz +Fiche minerai samarium.md,Extraction/Traitement/Réserves,Samarium +Fiche minerai scandium.md,Extraction/Traitement/Réserves,Scandium +Fiche minerai silicium.md,Extraction/Traitement/Réserves,Silicium +Fiche minerai tantale.md,Extraction/Traitement/Réserves,Tantale +Fiche minerai terbium.md,Extraction/Traitement/Réserves,Terbium +Fiche minerai titane.md,Extraction/Traitement/Réserves,Titane +Fiche minerai tungstene.md,Extraction/Traitement/Réserves,Tungstene +Fiche minerai verre.md,Extraction/Traitement/Réserves,Verre +Fiche minerai yttrium.md,Extraction/Traitement/Réserves,Yttrium +Fiche minerai zinc.md,Extraction/Traitement/Réserves,Zinc +Fiche technique ICS.md,Criticité,ICS +Fiche technique IHH.md,Criticité,IHH +Fiche technique IVC.md,Criticité,IVC diff --git a/assets/styles.css b/assets/styles.css new file mode 100644 index 0000000..bdcab37 --- /dev/null +++ b/assets/styles.css @@ -0,0 +1,127 @@ +/* styles.css */ + +body, +html { + font-family: + -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, Helvetica, Arial, + sans-serif; +} + +.stAppHeader { + visibility: hidden; +} + +/* Conteneur principal */ +.block-container { + max-width: 1024px !important; + padding-left: 2rem; + padding-right: 2rem; + padding: 0rem 1rem 10rem; +} + +.stVerticalBlock { + gap: 0.5rem !important; +} + +/* Lien normal (non visité) */ +a { + color: #1b5e20; /* vert foncé */ + text-decoration: none; +} + +/* Lien visité */ +a:visited { + color: #388e3c; /* vert moyen */ +} + +/* Lien au survol */ +a:hover { + color: #145a1a; /* vert encore plus foncé */ + text-decoration: underline; +} + +/* Lien actif */ +a:active { + color: #2e7d32; /* action en cours - nuance */ +} + +/* Couleur des boutons primaires et sliders */ +.stButton > button, +.stSlider > div > div { + background-color: darkgreen !important; + color: white !important; + border: 1px solid grey; +} + +/* Style pour impression */ +@media print { + body { + font-size: 12pt; + color: black; + background: white; + } + nav, + footer, + .stSidebar { + display: none !important; + } +} + +/* En-tête large */ +.wide-header { + width: 100vw; + margin-left: calc(-50vw + 50%); + background-color: #f9f9f9; + box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); + border-bottom: 1px solid #ddd; + text-align: center; + padding-top: 1rem; +} + +.titre-header { + font-size: 2rem !important; + font-weight: bolder !important; + color: #555; +} + +/* Accessibilité RGAA pour les onglets */ +div[role="radiogroup"] > label { + background-color: #eee; + color: #333; + padding: 0.5em 1em; + border-radius: 0.4em; + margin-right: 0.5em; + cursor: pointer; + border: 1px solid #ccc; +} +div[role="radiogroup"] > label[data-selected="true"] { + background-color: #1b5e20 !important; + color: white !important; + font-weight: bold; + border: 2px solid #145a1a; +} + +/* Style du graphique Plotly */ +.stPlotlyChart text { + font-family: Verdana !important; + fill: black !important; + font-size: 14px !important; +} + +/* Pied de page */ +.wide-footer { + width: 100vw; + margin-left: calc(-50vw + 50%); + margin-top: 2rem; + background-color: #f9f9f9; + box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); + border-bottom: 1px solid #ddd; + text-align: center; + padding-top: 1rem; +} + +.info-footer { + font-size: 1rem !important; + color: #555; + font-weight: 800; +} diff --git a/assets/weakness.png b/assets/weakness.png new file mode 100644 index 0000000..92b3412 Binary files /dev/null and b/assets/weakness.png differ diff --git a/fabnum.py b/fabnum.py new file mode 100644 index 0000000..5663b50 --- /dev/null +++ b/fabnum.py @@ -0,0 +1,974 @@ +import streamlit as st +from networkx.drawing.nx_agraph import read_dot +import pandas as pd +import plotly.graph_objects as go +import networkx as nx +import logging +import altair as alt +import numpy as np +from collections import OrderedDict +from dotenv import load_dotenv +import os +import requests +import re +from tickets_fiche import gerer_tickets_fiche +import base64 +from dateutil import parser +from datetime import datetime, timezone +import copy + +# Configuration Gitea +load_dotenv() + +GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1") +GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") +ORGANISATION = os.getenv("ORGANISATION", "fabnum") +DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches") +ENV = os.getenv("ENV") + +st.set_page_config( + page_title="Fabnum – Analyse de chaîne", + page_icon="assets/weakness.png" +) + +# Intégration du fichier CSS externe +with open("assets/styles.css") as f: + st.markdown(f"", unsafe_allow_html=True) + +header =""" +
+

FabNum - Chaîne de fabrication du numérique

""" + +if ENV == "dev": + header+="

🔧 Vous êtes dans l'environnement de développement.

" +else: + header+="

Parcours de l'écosystème et identification des vulnérabilités.

" + +header+=""" +
+""" + +st.markdown(header, unsafe_allow_html=True) + +def recuperer_date_dernier_commit_schema(): + headers = {"Authorization": f"token " + GITEA_TOKEN} + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path=schema.txt&sha={ENV}" + + try: + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + commits = response.json() + + if commits: + latest_commit_date = parser.isoparse(commits[0]["commit"]["author"]["date"]) + return latest_commit_date + else: + return None + except Exception as e: + st.error(f"Erreur lors de la récupération du dernier commit de schema.txt : {e}") + return None + +def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"): + headers = {"Authorization": f"token " + GITEA_TOKEN} + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/schema.txt?ref={ENV}" + + try: + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + data = response.json() + + remote_last_modified = recuperer_date_dernier_commit_schema() + local_last_modified = local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None + + if not local_last_modified or remote_last_modified > local_last_modified: + dot_text = base64.b64decode(data["content"]).decode("utf-8") + with open(fichier_local, "w", encoding="utf-8") as f: + f.write(dot_text) + + return "OK" + + except Exception as e: + st.error(f"Erreur lors du chargement de schema.txt depuis Gitea : {e}") + return None + +@st.cache_data(ttl=600) +def charger_arborescence_fiches(): + headers = {"Authorization": f"token {GITEA_TOKEN}"} + branche = "dev" if ENV == "dev" else "public" + url_base = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/Documents?ref={branche}" + + try: + response = requests.get(url_base, headers=headers) + response.raise_for_status() + dossiers = response.json() + + arbo = {} + + for dossier in sorted(dossiers, key=lambda d: d['name'].lower()): + if dossier['type'] == 'dir': + dossier_name = dossier['name'] + url_dossier = dossier['url'] + response_dossier = requests.get(url_dossier, headers=headers) + response_dossier.raise_for_status() + fichiers = response_dossier.json() + + fiches = sorted( + [ + {"nom": f["name"], "download_url": f["download_url"]} + for f in fichiers if f["name"].endswith(".md") + ], + key=lambda x: x['nom'].lower() + ) + arbo[dossier_name] = fiches + + return arbo + except Exception as e: + st.error(f"Erreur lors du chargement des fiches : {e}") + return {} + +def couleur_noeud(n, niveaux, G): + niveau = niveaux.get(n, 99) + attrs = G.nodes[n] + + # Niveau 99 : pays géographique avec isg + if niveau == 99: + isg = int(attrs.get("isg", -1)) + if isg >= 60: + return "darkred" + elif isg >= 31: + return "orange" + elif isg >= 0: + return "darkgreen" + else: + return "gray" + + # Niveau 11 ou 12 connecté à un pays géographique + if niveau in (11, 12): + for succ in G.successors(n): + if niveaux.get(succ) == 99: + isg = int(G.nodes[succ].get("isg", -1)) + if isg >= 60: + return "darkred" + elif isg >= 31: + return "orange" + elif isg >= 0: + return "darkgreen" + else: + return "gray" + + # Logique existante pour IHH / IVC + if niveau == 10 and attrs.get("ihh_pays"): + ihh = int(attrs["ihh_pays"]) + if ihh <= 15: + return "darkgreen" + elif ihh <= 25: + return "orange" + else: + return "darkred" + elif niveau == 2 and attrs.get("ivc"): + ivc = int(attrs["ivc"]) + if ivc <= 15: + return "darkgreen" + elif ivc <= 30: + return "orange" + else: + return "darkred" + + return "lightblue" + +def recuperer_donnees(graph, noeuds): + donnees = [] + criticite = {} + + for noeud in noeuds: + try: + operation, minerai = noeud.split('_', 1) + except ValueError: + logging.warning(f"Nom de nœud inattendu : {noeud}") + continue + + if operation == "Traitement": + try: + fabrications = list(graph.predecessors(minerai)) + valeurs = [ + int(float(graph.get_edge_data(f, minerai)[0].get('criticite', 0)) * 100) + for f in fabrications + if graph.get_edge_data(f, minerai) + ] + if valeurs: + criticite[minerai] = round(sum(valeurs) / len(valeurs)) + except Exception as e: + logging.warning(f"Erreur lors du calcul de criticité pour {noeud} : {e}") + criticite[minerai] = 50 + + for noeud in noeuds: + try: + operation, minerai = noeud.split('_', 1) + ihh_pays = int(graph.nodes[noeud].get('ihh_pays', 0)) + ihh_acteurs = int(graph.nodes[noeud].get('ihh_acteurs', 0)) + criticite_val = criticite.get(minerai, 50) + criticite_cat = 1 if criticite_val <= 33 else (2 if criticite_val <= 66 else 3) + + donnees.append({ + 'categorie': operation, + 'nom': minerai, + 'ihh_pays': ihh_pays, + 'ihh_acteurs': ihh_acteurs, + 'criticite_minerai': criticite_val, + 'criticite_cat': criticite_cat + }) + except Exception as e: + logging.error(f"Erreur sur le nœud {noeud} : {e}", exc_info=True) + + return pd.DataFrame(donnees) + +def afficher_graphique_altair(df): + ordre_personnalise = ['Assemblage', 'Fabrication', 'Traitement', 'Extraction'] + categories = [cat for cat in ordre_personnalise if cat in df['categorie'].unique()] + for cat in categories: + st.markdown(f"### {cat}") + df_cat = df[df['categorie'] == cat].copy() + + # Appliquer un jitter contrôlé pour disperser les nœuds proches + from collections import Counter + coord_pairs = list(zip(df_cat['ihh_pays'].round(1), df_cat['ihh_acteurs'].round(1))) + counts = Counter(coord_pairs) + + offset_x = [] + offset_y = {} + seen = Counter() + for pair in coord_pairs: + rank = seen[pair] + seen[pair] += 1 + if counts[pair] > 1: + angle = rank * 1.5 # écart angulaire + radius = 0.8 + 0.4 * rank # spiral growing radius + offset_x.append(radius * np.cos(angle)) + offset_y[pair] = radius * np.sin(angle) + else: + offset_x.append(0) + offset_y[pair] = 0 + + df_cat['ihh_pays'] += offset_x + df_cat['ihh_acteurs'] += [offset_y[p] for p in coord_pairs] + + # Décalage des étiquettes pour les relier + df_cat['ihh_pays_text'] = df_cat['ihh_pays'] + 0.5 + df_cat['ihh_acteurs_text'] = df_cat['ihh_acteurs'] + 0.5 + + base = alt.Chart(df_cat).encode( + x=alt.X('ihh_pays:Q', title='IHH Pays (%)'), + y=alt.Y('ihh_acteurs:Q', title='IHH Acteurs (%)'), + size=alt.Size('criticite_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), + color=alt.Color('criticite_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])) + ) + + points = base.mark_circle(opacity=0.6) + + lines = alt.Chart(df_cat).mark_rule(strokeWidth=0.5, color='gray').encode( + x='ihh_pays:Q', + x2='ihh_pays_text:Q', + y='ihh_acteurs:Q', + y2='ihh_acteurs_text:Q' + ) + + label_chart = alt.Chart(df_cat).mark_text( + align='left', dx=3, dy=-3, + fontSize=8, font='Arial', angle=335 + ).encode( + x='ihh_pays_text:Q', + y='ihh_acteurs_text:Q', + text='nom:N' + ) + + hline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) + hline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) + vline_15 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) + vline_25 = alt.Chart(df_cat).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) + + chart = (points + lines + label_chart + hline_15 + hline_25 + vline_15 + vline_25).properties( + width=500, + height=400, + title=f"Concentration et criticité – {cat}" + ).interactive() + + st.altair_chart(chart, use_container_width=True) + +def lancer_visualisation_ihh_criticite(graph): + niveaux = nx.get_node_attributes(graph, "niveau") + noeuds = [n for n, v in niveaux.items() if v == "10" and "Reserves" not in n] + noeuds.sort() + + df = recuperer_donnees(graph, noeuds) + if df.empty: + st.warning("Aucune donnée à visualiser.") + else: + afficher_graphique_altair(df) + +def extraire_chemins_depuis(G, source): + chemins = [] + stack = [(source, [source])] + while stack: + (node, path) = stack.pop() + voisins = list(G.successors(node)) + if not voisins: + chemins.append(path) + else: + for voisin in voisins: + if voisin not in path: + stack.append((voisin, path + [voisin])) + return chemins + +def extraire_chemins_vers(G, target, niveau_demande): + chemins = [] + reverse_G = G.reverse() + niveaux = nx.get_node_attributes(G, "niveau") + stack = [(target, [target])] + + while stack: + (node, path) = stack.pop() + voisins = list(reverse_G.successors(node)) + if not voisins: + chemin_inverse = list(reversed(path)) + contient_niveau = any( + int(niveaux.get(n, -1)) == niveau_demande for n in chemin_inverse + ) + if contient_niveau: + chemins.append(chemin_inverse) + else: + for voisin in voisins: + if voisin not in path: + stack.append((voisin, path + [voisin])) + + return chemins + +def afficher_sankey( + G, + niveau_depart, niveau_arrivee, + noeuds_depart=None, noeuds_arrivee=None, + filtrer_criticite=False, filtrer_ivc=False, filtrer_ihh=False, + filtrer_isg=False, + logique_filtrage="OU" +): + + niveaux = {} + + for node, attrs in G.nodes(data=True): + # Conversion du niveau + niveau_str = attrs.get("niveau") + try: + if niveau_str: + niveaux[node] = int(str(niveau_str).strip('"')) + except ValueError: + logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}") + + # Suppression des attributs indésirables + ATTRIBUTS_SUPPRIMES = {"fillcolor", "fontcolor", "style", "fontsize"} + for attr in ATTRIBUTS_SUPPRIMES: + attrs.pop(attr, None) + + # Réordonner : label d'abord + if "label" in attrs: + reordered = OrderedDict() + reordered["label"] = attrs["label"] + for k, v in attrs.items(): + if k != "label": + reordered[k] = v + G.nodes[node].clear() + G.nodes[node].update(reordered) + + chemins = [] + if noeuds_depart and noeuds_arrivee: + for nd in noeuds_depart: + for na in noeuds_arrivee: + tous_chemins = extraire_chemins_depuis(G, nd) + chemins.extend( + [chemin for chemin in tous_chemins if na in chemin]) + elif noeuds_depart: + for nd in noeuds_depart: + chemins.extend(extraire_chemins_depuis(G, nd)) + elif noeuds_arrivee: + for na in noeuds_arrivee: + chemins.extend(extraire_chemins_vers(G, na, niveau_depart)) + else: + sources_depart = [n for n in G.nodes() if niveaux.get(n) + == niveau_depart] + for nd in sources_depart: + chemins.extend(extraire_chemins_depuis(G, nd)) + + def extraire_criticite(u, v): + data = G.get_edge_data(u, v) + if not data: + return 0 + if isinstance(data, dict) and all(isinstance(k, int) for k in data): + try: + return float(data[0].get("criticite", 0)) + except: + return 0 + return float(data.get("criticite", 0)) + + liens_chemins = set() + chemins_filtres = set() + for chemin in chemins: + has_ihh = False + has_ivc = False + has_criticite = False + has_isg_critique = False + + for i in range(len(chemin)-1): + u, v = chemin[i], chemin[i+1] + if niveaux.get(u) is not None and niveaux.get(v) is not None: + if niveau_depart <= niveaux.get(u) <= niveau_arrivee and niveau_depart <= niveaux.get(v) <= niveau_arrivee: + liens_chemins.add((u, v)) + # vérification des conditions critiques + if filtrer_ihh: + if filtrer_ihh and ihh_type: + ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs" + if niveaux.get(u) == 10 and G.nodes[u].get(ihh_field) and int(G.nodes[u][ihh_field]) > 25: + has_ihh = True + elif niveaux.get(v) == 10 and G.nodes[v].get(ihh_field) and int(G.nodes[v][ihh_field]) > 25: + has_ihh = True + if filtrer_ivc and niveaux.get(u) == 2 and G.nodes[u].get("ivc") and int(G.nodes[u]["ivc"]) > 30: + has_ivc = True + if filtrer_criticite and niveaux.get(u) == 1 and niveaux.get(v) == 2 and extraire_criticite(u, v) > 0.66: + has_criticite = True + # Vérifie présence d'un isg >= 60 + for n in (u, v): + if niveaux.get(n) == 99: + isg = int(G.nodes[n].get("isg", 0)) + if isg >= 60: + has_isg_critique = True + elif niveaux.get(n) in (11, 12): + for succ in G.successors(n): + if niveaux.get(succ) == 99 and int(G.nodes[succ].get("isg", 0)) >= 60: + has_isg_critique = True + + + if logique_filtrage == "ET": + keep = True + if filtrer_ihh: + keep = keep and has_ihh + if filtrer_ivc: + keep = keep and has_ivc + if filtrer_criticite: + keep = keep and has_criticite + if filtrer_isg: + keep = keep and has_isg_critique + if keep: + chemins_filtres.add(tuple(chemin)) + elif logique_filtrage == "OU": + if (filtrer_ihh and has_ihh) or \ + (filtrer_ivc and has_ivc) or \ + (filtrer_criticite and has_criticite) or \ + (filtrer_isg and has_isg_critique): + chemins_filtres.add(tuple(chemin)) + + if any([filtrer_criticite, filtrer_ivc, filtrer_ihh, filtrer_isg]): + chemins = list(chemins_filtres) + liens_chemins = set() + for chemin in chemins: + for i in range(len(chemin) - 1): + u, v = chemin[i], chemin[i + 1] + if niveau_depart <= niveaux.get(u, 999) <= niveau_arrivee and niveau_depart <= niveaux.get(v, 999) <= niveau_arrivee: + liens_chemins.add((u, v)) + + if not liens_chemins: + st.warning("Aucun chemin ne correspond aux critères.") + return + + df_liens = pd.DataFrame(list(liens_chemins), columns=["source", "target"]) + df_liens = df_liens.groupby( + ["source", "target"]).size().reset_index(name="value") + + df_liens["criticite"] = df_liens.apply( + lambda row: extraire_criticite(row["source"], row["target"]), axis=1) + df_liens["value"] = 0.1 + + # Ne garder que les nœuds effectivement connectés + noeuds_utilises = set(df_liens["source"]) | set(df_liens["target"]) + sorted_nodes = [n for n in sorted(G.nodes(), key=lambda x: niveaux.get(x, 99), reverse=True) if n in noeuds_utilises] + + + def couleur_criticite(p): + if p <= 0.33: + return "darkgreen" + elif p <= 0.66: + return "orange" + else: + return "darkred" + + df_liens["color"] = df_liens.apply( + lambda row: couleur_criticite(row["criticite"]) if niveaux.get( + row["source"]) == 1 and niveaux.get(row["target"]) == 2 else "gray", + axis=1 + ) + + all_nodes = pd.unique(df_liens[["source", "target"]].values.ravel()) + sorted_nodes = sorted( + all_nodes, key=lambda x: niveaux.get(x, 99), reverse=True) + node_indices = {name: i for i, name in enumerate(sorted_nodes)} + + sources = df_liens["source"].map(node_indices).tolist() + targets = df_liens["target"].map(node_indices).tolist() + values = df_liens["value"].tolist() + + customdata = [] + for n in sorted_nodes: + info = [f"{k}: {v}" for k, v in G.nodes[n].items()] + niveau = niveaux.get(n, 99) + + # Ajout d’un ISG hérité si applicable + if niveau in (11, 12): + for succ in G.successors(n): + if niveaux.get(succ) == 99 and "isg" in G.nodes[succ]: + isg_val = G.nodes[succ]["isg"] + info.append(f"isg (géographique): {isg_val}") + break + + customdata.append("
".join(info)) + + def edge_info(u, v): + data = G.get_edge_data(u, v) + if not data: + return f"Relation : {u} → {v}" + if isinstance(data, dict) and all(isinstance(k, int) for k in data): + data = data[0] + base = [f"{k}: {v}" for k, v in data.items()] + return f"Relation : {u} → {v}
" + "
".join(base) + + link_customdata = [ + edge_info(row["source"], row["target"]) for _, row in df_liens.iterrows() + ] + + fig = go.Figure(go.Sankey( + arrangement="snap", + node=dict( + pad=10, + thickness=8, + label=sorted_nodes, + x=[niveaux.get(n, 99) / 100 for n in sorted_nodes], + color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes], + customdata=customdata, + hovertemplate="%{customdata}" + ), + link=dict( + source=sources, + target=targets, + value=values, + color=df_liens["color"].tolist(), + customdata=link_customdata, + hovertemplate="%{customdata}" + ) + )) + + fig.update_layout( + title_text="Hiérarchie filtrée par niveaux et noeuds", + paper_bgcolor="white", + plot_bgcolor="white" + ) + st.plotly_chart(fig) + +def creer_graphes(donnees): + if not donnees: + st.warning("Aucune donnée à afficher.") + return + + try: + df = pd.DataFrame(donnees) + df['ivc_cat'] = df['ivc'].apply(lambda x: 1 if x <= 15 else (2 if x <= 30 else 3)) + + # Jitter contrôlé pour dispersion + from collections import Counter + coord_pairs = list(zip(df['ihh_extraction'].round(1), df['ihh_reserves'].round(1))) + counts = Counter(coord_pairs) + + offset_x, offset_y = [], {} + seen = Counter() + for pair in coord_pairs: + rank = seen[pair] + seen[pair] += 1 + if counts[pair] > 1: + angle = rank * 1.5 + radius = 0.8 + 0.4 * rank + offset_x.append(radius * np.cos(angle)) + offset_y[pair] = radius * np.sin(angle) + else: + offset_x.append(0) + offset_y[pair] = 0 + + df['ihh_extraction'] += offset_x + df['ihh_reserves'] += [offset_y[p] for p in coord_pairs] + + # Position des étiquettes + df['ihh_extraction_text'] = df['ihh_extraction'] + 0.5 + df['ihh_reserves_text'] = df['ihh_reserves'] + 0.5 + + base = alt.Chart(df).encode( + x=alt.X('ihh_extraction:Q', title='IHH Extraction (%)'), + y=alt.Y('ihh_reserves:Q', title='IHH Réserves (%)'), + size=alt.Size('ivc_cat:Q', scale=alt.Scale(domain=[1, 2, 3], range=[50, 500, 1000]), legend=None), + color=alt.Color('ivc_cat:N', scale=alt.Scale(domain=[1, 2, 3], range=['darkgreen', 'orange', 'darkred'])), + tooltip=['nom:N', 'ivc:Q', 'ihh_extraction:Q', 'ihh_reserves:Q'] + ) + + points = base.mark_circle(opacity=0.6) + + lines = alt.Chart(df).mark_rule(strokeWidth=0.5, color='gray').encode( + x='ihh_extraction:Q', x2='ihh_extraction_text:Q', + y='ihh_reserves:Q', y2='ihh_reserves_text:Q' + ) + + labels = alt.Chart(df).mark_text( + align='left', dx=10, dy=-10, fontSize=10, font='Arial', angle=335 + ).encode( + x='ihh_extraction_text:Q', + y='ihh_reserves_text:Q', + text='nom:N' + ) + + # Lignes seuils + hline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(y=alt.datum(15)) + hline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(y=alt.datum(25)) + vline_15 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='green').encode(x=alt.datum(15)) + vline_25 = alt.Chart(df).mark_rule(strokeDash=[2,2], color='red').encode(x=alt.datum(25)) + + chart = (points + lines + labels + hline_15 + hline_25 + vline_15 + vline_25).properties( + width=600, + height=500, + title="Concentration des ressources critiques vs vulnérabilité IVC" + ).interactive() + + st.altair_chart(chart, use_container_width=True) + + except Exception as e: + st.error(f"Erreur lors de la création du graphique : {e}") + +def recuperer_donnees_2(graph, noeuds_2): + donnees = [] + for minerai in noeuds_2: + try: + missing = [] + if not graph.has_node(minerai): + missing.append(minerai) + if not graph.has_node(f"Extraction_{minerai}"): + missing.append(f"Extraction_{minerai}") + if not graph.has_node(f"Reserves_{minerai}"): + missing.append(f"Reserves_{minerai}") + + if missing: + print(f"⚠️ Nœuds manquants pour {minerai} : {', '.join(missing)} — Ignoré.") + continue + + ivc = int(graph.nodes[minerai].get('ivc', 0)) + ihh_extraction_pays = int(graph.nodes[f"Extraction_{minerai}"].get('ihh_pays', 0)) + ihh_reserves_pays = int(graph.nodes[f"Reserves_{minerai}"].get('ihh_pays', 0)) + + donnees.append({ + 'nom': minerai, + 'ivc': ivc, + 'ihh_extraction': ihh_extraction_pays, + 'ihh_reserves': ihh_reserves_pays + }) + except Exception as e: + print(f"Erreur avec le nœud {minerai} : {e}") + return donnees + +def lancer_visualisation_ihh_ivc(graph): + try: + noeuds_niveau_2 = [ + n for n, data in graph.nodes(data=True) + if data.get("niveau") == "2" and "ivc" in data + ] + + if not noeuds_niveau_2: + # print("⚠️ Aucun nœud de niveau 2 avec un IVC détecté.") + return + + data = recuperer_donnees_2(graph, noeuds_niveau_2) + creer_graphes(data) + + except Exception as e: + print(f"Erreur lors du traitement du fichier DOT : {e}") + +def afficher_fiches(): + if "fiches_arbo" not in st.session_state: + st.session_state["fiches_arbo"] = charger_arborescence_fiches() + + arbo = st.session_state.get("fiches_arbo", {}) + if not arbo: + st.warning("Aucune fiche disponible pour le moment.") + return + + dossiers = sorted(arbo.keys(), key=lambda x: x.lower()) + dossier_choisi = st.selectbox("📁 Choisissez un dossier", ["-- Sélectionner un dossier --"] + dossiers) + + if dossier_choisi and dossier_choisi != "-- Sélectionner un dossier --": + fiches = arbo.get(dossier_choisi, []) + noms_fiches = [f['nom'] for f in fiches] + + fiche_choisie = st.selectbox("🗂️ Choisissez une fiche", ["-- Sélectionner une fiche --"] + noms_fiches) + + if fiche_choisie and fiche_choisie != "-- Sélectionner une fiche --": + fiche_info = next((f for f in fiches if f["nom"] == fiche_choisie), None) + if fiche_info: + try: + headers = {"Authorization": f"token {GITEA_TOKEN}"} + reponse_fiche = requests.get(fiche_info["download_url"], headers=headers) + reponse_fiche.raise_for_status() + contenu_md = reponse_fiche.content.decode("utf-8") + + # Traitement du markdown + lignes = contenu_md.split('\n') + contenu_niveau_1 = [] + sections_niveau_2 = {} + section_actuelle = None + + for ligne in lignes: + if re.match(r'^##[^#]', ligne): + section_actuelle = ligne.strip('# ').strip() + sections_niveau_2[section_actuelle] = [] + elif section_actuelle: + sections_niveau_2[section_actuelle].append(ligne) + else: + contenu_niveau_1.append(ligne) + + if contenu_niveau_1: + st.markdown("\n".join(contenu_niveau_1), unsafe_allow_html=True) + + for titre, contenu in sections_niveau_2.items(): + with st.expander(titre): + st.markdown("\n".join(contenu), unsafe_allow_html=True) + + gerer_tickets_fiche(fiche_choisie) + + except Exception as e: + st.error(f"Erreur lors du chargement de la fiche : {e}") + +def afficher_fiches_old(): + import streamlit as st + from pathlib import Path + + base_path = Path("Fiches") + if not base_path.exists(): + st.warning("Le dossier 'Fiches' est introuvable.") + return + + dossiers = sorted([p for p in base_path.iterdir() if p.is_dir() and 'Criticités' not in p.name]) + criticite_path = next((p for p in base_path.iterdir() if p.is_dir() and 'Criticités' in p.name), None) + if criticite_path: + dossiers.append(criticite_path) + + noms_dossiers = [d.name for d in dossiers] + + dossier_choisi = st.selectbox("📁 Dossiers disponibles", noms_dossiers) + chemin_dossier = base_path / dossier_choisi + fichiers_md = sorted(chemin_dossier.glob("*.md")) + noms_fichiers = [] + fichiers_dict = {} + for f in fichiers_md: + try: + with f.open(encoding="utf-8") as md: + titre = md.readline().strip() + if ':' in titre: + titre = titre.split(':', 1)[1].strip() + else: + titre = f.stem + fichiers_dict[titre] = f.name + noms_fichiers.append(titre) + except Exception as e: + st.error(f"Erreur lecture fichier {f.name} : {e}") + + noms_fichiers.sort() + fiche_label = st.selectbox("🗂️ Fiches Markdown", noms_fichiers) + fichier_choisi = fichiers_dict[fiche_label] + + + chemin_fichier = chemin_dossier / fichier_choisi + with chemin_fichier.open(encoding="utf-8") as f: + contenu = f.read() + st.markdown(contenu, unsafe_allow_html=True) + + +niveau_labels = { + 0: "Produit final", + 1: "Composant", + 2: "Minerai", + 10: "Opération", + 11: "Pays d'opération", + 12: "Acteur d'opération", + 99: "Pays géographique" +} +inverse_niveau_labels = {v: k for k, v in niveau_labels.items()} + +DOT_FILE = "schema.txt" +charger_schema_depuis_gitea(DOT_FILE) + +# Charger le graphe une seule fois +try: + dot_file_path = True + if "G_temp" not in st.session_state: + if charger_schema_depuis_gitea(DOT_FILE): + st.session_state["G_temp"] = read_dot(DOT_FILE) + st.session_state["G_temp_ivc"] = st.session_state["G_temp"].copy() + else: + dot_file_path = False + G_temp = st.session_state["G_temp"] + G_temp_ivc = st.session_state["G_temp_ivc"] +except: + st.error("Erreur de lecture du fichier DOT") + dot_file_path = False + +if dot_file_path: + st.markdown(""" +
+ """, unsafe_allow_html=True) + + with st.sidebar: + st.markdown("---") + st.header("Navigation") + st.markdown("---") + if "onglet" not in st.session_state: + st.session_state.onglet = "Instructions" + + if st.button("📄 Instructions"): + st.session_state.onglet = "Instructions" + if st.button("🔍 Analyse"): + st.session_state.onglet = "Analyse" + if st.button("📊 Visualisations"): + st.session_state.onglet = "Visualisations" + if st.button("📚 Fiches"): + st.session_state.onglet = "Fiches" + + st.markdown("---") + + if st.session_state.onglet == "Instructions": + with open("Instructions.md", "r", encoding="utf-8") as f: + markdown_content = f.read() + st.markdown(markdown_content) + + elif st.session_state.onglet == "Analyse": + try: + niveaux_temp = { + node: int(str(attrs.get("niveau")).strip('"')) + for node, attrs in G_temp.nodes(data=True) + if attrs.get("niveau") and str(attrs.get("niveau")).strip('"').isdigit() + } + G_temp.remove_nodes_from([n for n in G_temp.nodes() if n not in niveaux_temp]) + G_temp.remove_nodes_from( + [n for n in G_temp.nodes() if niveaux_temp.get(n) == 10 and 'Reserves' in n]) + + st.markdown("---") + st.markdown("**Sélection du niveau des nœuds de départ et d'arrivée pour choisir la zone à analyser**") + st.markdown("Sélectionner le niveau de départ qui donnera les nœuds de gauche") + niveau_choix = ["-- Sélectionner un niveau --"] + list(niveau_labels.values()) + + niveau_depart_label = st.selectbox("Niveau de départ", niveau_choix, key="analyse_niveau_depart") + + if niveau_depart_label != "-- Sélectionner un niveau --": + niveau_depart = inverse_niveau_labels[niveau_depart_label] + + niveaux_arrivee_possibles = [v for k, v in niveau_labels.items() if k > niveau_depart] + + st.markdown("Sélectionner le niveau d'arrivée qui donnera les nœuds de droite") + + niveaux_arrivee_choix = ["-- Sélectionner un niveau --"] + niveaux_arrivee_possibles + + niveau_arrivee_label = st.selectbox("Niveau d'arrivée", niveaux_arrivee_choix, key="analyse_niveau_arrivee") + + st.markdown("---") + + if niveau_arrivee_label != "-- Sélectionner un niveau --": + niveau_arrivee = inverse_niveau_labels[niveau_arrivee_label] + + depart_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_depart] + arrivee_nodes = [n for n in G_temp.nodes() if niveaux_temp.get(n) == niveau_arrivee] + + st.markdown("**Sélection fine des items du niveau de départ et d'arrivée**") + st.markdown("Sélectionner un ou plusieurs items du niveau de départ") + + noeuds_depart = st.multiselect("Filtrer par noeuds de départ (optionnel)", sorted(depart_nodes), key="analyse_noeuds_depart") + + st.markdown("Sélectionner un ou plusieurs items du niveau d'arrivée") + + noeuds_arrivee = st.multiselect("Filtrer par noeuds d'arrivée (optionnel)", sorted(arrivee_nodes), key="analyse_noeuds_arrivee") + + st.markdown("---") + + noeuds_depart = noeuds_depart if noeuds_depart else None + noeuds_arrivee = noeuds_arrivee if noeuds_arrivee else None + + st.markdown("**Sélection des filtres pour identifier les vulnérabilités**") + + filtrer_criticite = st.checkbox("Filtrer les chemins contenant au moins minerai critique pour un composant (ICS > 66 %)", key="analyse_filtrer_criticite") + filtrer_ivc = st.checkbox("Filtrer les chemins contenant au moins un minerai critique par rapport à la concurrence sectorielle (IVC > 30)", key="analyse_filtrer_ivc") + filtrer_ihh = st.checkbox("Filtrer les chemins contenant au moins une opération critique par rapport à la concentration géographique ou industrielle (IHH pays ou acteurs > 25)", key="analyse_filtrer_ihh") + + ihh_type = None + if filtrer_ihh: + ihh_type = st.radio("Appliquer le filtre IHH sur :", ["Pays", "Acteurs"], horizontal=True, key="analyse_ihh_type") + + filtrer_isg = st.checkbox("Filtrer les chemins contenant un pays instable (ISG ≥ 60)", key="analyse_filtrer_isg") + logique_filtrage = st.radio("Logique de filtrage", ["OU", "ET"], horizontal=True, key="analyse_logique_filtrage") + + st.markdown("---") + + if st.button("Lancer l’analyse", type="primary", key="analyse_lancer"): + afficher_sankey( + G_temp, + niveau_depart=niveau_depart, + niveau_arrivee=niveau_arrivee, + noeuds_depart=noeuds_depart, + noeuds_arrivee=noeuds_arrivee, + filtrer_criticite=filtrer_criticite, + filtrer_ivc=filtrer_ivc, + filtrer_ihh=filtrer_ihh, + filtrer_isg=filtrer_isg, + logique_filtrage=logique_filtrage + ) + + except Exception as e: + st.error(f"Erreur de prévisualisation du graphe : {e}") + + elif st.session_state.onglet == "Visualisations": + st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs Criticité** + +Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. + +Taille des points = criticité substituabilité du minerai +""") + if st.button("Lancer", key="btn_ihh_criticite"): + try: + lancer_visualisation_ihh_criticite(G_temp) + except Exception as e: + st.error(f"Erreur dans la visualisation IHH vs Criticité : {e}") + + st.markdown("""**Indice de Herfindahl-Hirschmann - IHH vs IVC** + +Entre 0 et 15%, concentration faible, entre 15 et 25%, modérée, au-delà, forte. + +Taille des points = criticité concurrentielle du minerai +""") + + if st.button("Lancer", key="btn_ihh_ivc"): + try: + lancer_visualisation_ihh_ivc(G_temp_ivc) + except Exception as e: + st.error(f"Erreur dans la visualisation IHH vs IVC : {e}") + + elif st.session_state.onglet == "Fiches": + st.markdown("---") + st.markdown("**Affichage des fiches**") + st.markdown("Sélectionner d'abord l'opération que vous souhaitez examiner et ensuite choisisez la fiche à lire.") + st.markdown("---") + afficher_fiches() + + st.markdown("
", unsafe_allow_html=True) + +st.markdown(""" + +""", unsafe_allow_html=True +) diff --git a/launch_fabnum.sh b/launch_fabnum.sh new file mode 100755 index 0000000..5c7c0b8 --- /dev/null +++ b/launch_fabnum.sh @@ -0,0 +1,24 @@ +#!/bin/bash +# Lancer fabnum avec Streamlit, selon l'environnement défini dans .env + +# Aller dans le dossier du script +cd "$(dirname "$0")" + +# Charger l'environnement Python +source venv/bin/activate + +# Charger les variables d'environnement définies dans .env +if [ -f .env ]; then + export $(grep -v '^#' .env | xargs) +else + echo "⚠️ Fichier .env manquant !" + exit 1 +fi + +# Valeur par défaut si PORT non défini +PORT=${PORT:-8501} + +echo "🔄 Lancement de Fabnum ($ENV) sur le port $PORT..." + +# Exécuter streamlit via l'interpréteur du venv +exec venv/bin/streamlit run fabnum.py --server.address=127.0.0.1 --server.port=$PORT diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..c9a67d1 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,7 @@ +streamlit +networkx +pygraphviz +pandas +plotly +requests +kaleido>=0.2.1 diff --git a/tickets_fiche.py b/tickets_fiche.py new file mode 100644 index 0000000..5a39539 --- /dev/null +++ b/tickets_fiche.py @@ -0,0 +1,371 @@ +import streamlit as st +from datetime import datetime +from dateutil import parser +from collections import defaultdict +import os +import csv +import requests +import base64 +import re +import json +import html + +# Configuration Gitea +GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1") +GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") +ORGANISATION = os.getenv("ORGANISATION", "fabnum") +DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches") +ENV = os.getenv("ENV") + +def charger_fiches_et_labels(): + chemin_csv = os.path.join("assets", "fiches_labels.csv") + dictionnaire_fiches = {} + + try: + with open(chemin_csv, mode="r", encoding="utf-8") as fichier_csv: + lecteur = csv.DictReader(fichier_csv) + for ligne in lecteur: + fiche = ligne.get("Fiche") + operations = ligne.get("Label opération") + item = ligne.get("Label item") + + if fiche and operations and item: + dictionnaire_fiches[fiche.strip()] = { + "operations": [op.strip() for op in operations.split("/")], + "item": item.strip() + } + except FileNotFoundError: + st.error(f"❌ Le fichier {chemin_csv} est introuvable.") + except Exception as e: + st.error(f"❌ Erreur lors du chargement des fiches : {str(e)}") + + return dictionnaire_fiches + +def rechercher_tickets_gitea(fiche_selectionnee): + headers = {"Authorization": f"token " + GITEA_TOKEN} + params = {"state": "open"} + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" + + try: + reponse = requests.get(url, headers=headers, params=params, timeout=10) + reponse.raise_for_status() + issues = reponse.json() + + correspondances = charger_fiches_et_labels() + cible = correspondances.get(fiche_selectionnee) + + if not cible: + return [] + + labels_cibles = set(cible["operations"] + [cible["item"]]) + + tickets_associes = [] + for issue in issues: + if issue.get("ref") != f"refs/heads/{ENV}": + continue + issue_labels = set() + for label in issue.get("labels", []): + if isinstance(label, dict) and "name" in label: + issue_labels.add(label["name"]) + + if labels_cibles.issubset(issue_labels): + tickets_associes.append(issue) + + return tickets_associes + + except requests.RequestException as e: + st.error(f"Erreur lors de la récupération des tickets : {e}") + return [] + +def extraire_statut_par_label(ticket): + labels = [label.get('name', '') for label in ticket.get('labels', [])] + for statut in ["Backlog", "En attente de traitement", "En cours", "Terminés", "Non retenus"]: + if statut in labels: + return statut + return "Autres" + +def afficher_tickets_par_fiche(tickets): + if not tickets: + st.info("Aucun ticket lié à cette fiche.") + return + + st.markdown("📝 **Tickets associés à cette fiche**") + + tickets_groupes = defaultdict(list) + for ticket in tickets: + statut = extraire_statut_par_label(ticket) + tickets_groupes[statut].append(ticket) + + st.info(f" ⤇ {len(tickets_groupes["Backlog"])} ticket(s) en attente de modération ne sont pas affichés.") + + ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"] + + for statut in ordre_statuts: + if tickets_groupes[statut]: + with st.expander(f"{statut} ({len(tickets_groupes[statut])})", expanded=(statut == "En cours")): + for ticket in tickets_groupes[statut]: + afficher_carte_ticket(ticket) + +def recuperer_commentaires_ticket(issue_index): + headers = { + "Authorization": f"token {GITEA_TOKEN}" + } + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues/{issue_index}/comments" + try: + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + commentaires = response.json() + return commentaires + except Exception as e: + st.error(f"Erreur lors de la récupération des commentaires pour le ticket {issue_index} : {e}") + return [] + +def afficher_carte_ticket(ticket): + titre = ticket.get("title", "Sans titre") + url = ticket.get("html_url", "") + user = ticket.get("user", {}).get("login", "inconnu") + created = ticket.get("created_at", "") + updated = ticket.get("updated_at", "") + body = ticket.get("body", "") + labels = [l["name"] for l in ticket.get("labels", []) if "name" in l] + + sujet = "" + match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL) + if match: + sujet = match.group(1).strip() + + if created: + try: + dt_created = parser.isoparse(created) + date_created_str = dt_created.strftime("%d/%m/%Y") + except Exception: + date_created_str = "Date inconnue" + else: + date_created_str = "Date inconnue" + + if updated and updated != created: + try: + dt_updated = parser.isoparse(updated) + date_updated_str = dt_updated.strftime("%d/%m/%Y") + maj_info = f"(MAJ {date_updated_str})" + except Exception: + maj_info = "" + else: + maj_info = "" + + commentaires = recuperer_commentaires_ticket(ticket.get("number")) + + commentaires_html = '' + if commentaires: + for commentaire in commentaires: + auteur = html.escape(commentaire.get('user', {}).get('login', 'inconnu')) + contenu = html.escape(commentaire.get('body', '')) + date_commentaire = commentaire.get('created_at', '') + if date_commentaire: + try: + dt_comment = parser.isoparse(date_commentaire) + date_commentaire_str = dt_comment.strftime("%d/%m/%Y") + except Exception: + date_commentaire_str = "" + else: + date_commentaire_str = "" + + commentaires_html += f""" +
+

{auteur} ({date_commentaire_str})

+

{contenu}

+
+ """ + else: + commentaires_html = '

Aucun commentaire.

' + + with st.container(): + st.markdown(f""" +
+

🎫 {titre}

+

Ouvert par {html.escape(user)} le {date_created_str} {maj_info}

+

Sujet de la proposition : {html.escape(sujet)}

+

{' • '.join(html.escape(label) for label in labels) if labels else 'aucun'}

+
+
+ """, unsafe_allow_html=True) + + st.markdown("**Contenu du ticket :**") + st.markdown(body, unsafe_allow_html=False) + + st.markdown("---") + st.markdown("**Commentaires :**") + st.markdown(commentaires_html, unsafe_allow_html=True) + +def get_labels_existants(): + headers = {"Authorization": f"token {GITEA_TOKEN}"} + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/labels" + try: + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + labels_data = response.json() + return {label['name']: label['id'] for label in labels_data} + except Exception as e: + st.error(f"Erreur lors de la récupération des labels existants : {e}") + return {} + + +def creer_ticket_gitea(titre, corps, labels): + headers = { + "Authorization": f"token {GITEA_TOKEN}", + "Content-Type": "application/json" + } + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" + + data = { + "title": titre, + "body": corps, + "labels": labels, + "ref": f"refs/heads/{ENV}" + } + + try: + response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10) + response.raise_for_status() + issue = response.json() + issue_url = issue.get("html_url", "") + if issue_url: + st.success(f"✅ Ticket créé avec succès ! [Voir le ticket]({issue_url})") + else: + st.success("✅ Ticket créé avec succès !") + except Exception as e: + st.error(f"❌ Erreur lors de la création du ticket : {e}") + +def charger_modele_ticket(): + headers = {"Authorization": f"token {GITEA_TOKEN}"} + url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/.gitea/ISSUE_TEMPLATE/Contenu.md" + + try: + response = requests.get(url, headers=headers, timeout=10) + response.raise_for_status() + contenu_base64 = response.json().get("content", "") + contenu = base64.b64decode(contenu_base64).decode("utf-8") + return contenu + except Exception as e: + st.error(f"Erreur lors du chargement du modèle de ticket : {e}") + return "" + +def gerer_tickets_fiche(fiche_selectionnee): + st.markdown(""" +
+ """, unsafe_allow_html=True) + st.markdown("### 🧾 Gestion des tickets pour cette fiche") + + tickets = rechercher_tickets_gitea(fiche_selectionnee) + afficher_tickets_par_fiche(tickets) + formulaire_creation_ticket_dynamique(fiche_selectionnee) + +# Modification de formulaire_creation_ticket_dynamique pour ajouter Annuler +def formulaire_creation_ticket_dynamique(fiche_selectionnee): + with st.expander("➕ Créer un nouveau ticket lié à cette fiche", expanded=False): + contenu_modele = charger_modele_ticket() + + if not contenu_modele: + st.error("Impossible de charger le modèle de ticket.") + return + + sections = {} + lignes = contenu_modele.splitlines() + titre_courant = None + contenu_section = [] + + for ligne in lignes: + if ligne.startswith("## ") and titre_courant: + sections[titre_courant] = "\n".join(contenu_section).strip() + titre_courant = ligne[3:].strip() + contenu_section = [] + elif ligne.startswith("## "): + titre_courant = ligne[3:].strip() + contenu_section = [] + elif titre_courant: + contenu_section.append(ligne) + + if titre_courant and contenu_section: + sections[titre_courant] = "\n".join(contenu_section).strip() + + reponses = {} + labels = [] + selected_operations = [] + + correspondances = charger_fiches_et_labels() + cible = correspondances.get(fiche_selectionnee) + if cible: + if len(cible["operations"]) == 1: + labels.append(cible["operations"][0]) + elif len(cible["operations"]) > 1: + selected_operations = st.multiselect("Labels opération à associer", cible["operations"], default=cible["operations"]) + + for section, aide in sections.items(): + if "Type de contribution" in section: + options = re.findall(r"- \[.\] (.+)", aide) + clean_options = [] + for opt in options: + base = opt.split(":")[0].strip() + if base not in clean_options: + clean_options.append(base) + if "Autre" not in clean_options: + clean_options.append("Autre") + + type_contribution = st.radio("Type de contribution", clean_options) + if type_contribution == "Autre": + autre = st.text_input("Précisez le type de contribution") + reponses[section] = autre + else: + reponses[section] = type_contribution + + elif "Fiche concernée" in section: + base_url = f"https://fabnum-git.peccini.fr/FabNum/Fiches/src/branch/{ENV}/Documents/" + url_fiche = f"{base_url}{fiche_selectionnee.replace(' ', '%20')}" + reponses[section] = url_fiche + st.text_input("Fiche concernée", value=url_fiche, disabled=True) + + elif "Sujet de la proposition" in section: + reponses[section] = st.text_input(section, help=aide) + + else: + reponses[section] = st.text_area(section, help=aide) + + col1, col2 = st.columns(2) + + with col1: + if st.button("Prévisualiser le ticket"): + st.session_state.previsualiser = True + + with col2: + if st.button("Annuler"): + st.session_state.previsualiser = False + st.rerun() + + if st.session_state.get("previsualiser", False): + st.subheader("Prévisualisation du ticket") + for section, texte in reponses.items(): + st.markdown(f"#### {section}") + st.markdown(texte) + + if st.button("Confirmer la création du ticket"): + if cible: + labels.append(cible["item"]) + if selected_operations: + labels.extend(selected_operations) + + labels = list(set([l.strip() for l in labels if l and l.strip()])) + titre_ticket = reponses.get("Sujet de la proposition", "").strip() or "Ticket FabNum" + + labels_existants = get_labels_existants() + labels_ids = [labels_existants[l] for l in labels if l in labels_existants] + + if "Backlog" in labels_existants: + labels_ids.append(labels_existants["Backlog"]) + + corps = "" + for section, texte in reponses.items(): + corps += f"## {section}\n{texte}\n\n" + + creer_ticket_gitea(titre=titre_ticket, corps=corps, labels=labels_ids) + + st.success("Formulaire vidé après création du ticket.")