Continuation dans le typage et la documentation pour les autres app's

This commit is contained in:
Fabrication du Numérique 2025-06-05 09:28:49 +02:00
parent 35aa7d12fa
commit 4d511cbe23
23 changed files with 1014 additions and 183 deletions

View File

@ -1,3 +1,5 @@
from typing import List, Tuple, Dict, Optional
import networkx as nx
import streamlit as st
from utils.translations import _
from utils.widgets import html_expander
@ -17,8 +19,20 @@ niveau_labels = {
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def preparer_graphe(G):
"""Nettoie et prépare le graphe pour l'analyse."""
def preparer_graphe(
G: nx.DiGraph,
) -> Tuple[nx.DiGraph, Dict[str, int]]:
"""
Nettoie et prépare le graphe pour l'analyse.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Tuple[nx.DiGraph, Dict[str, int]]: Un tuple contenant :
- Le graphe NetworkX proprement configuré
- Un dictionnaire des niveaux associés aux nœuds
"""
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G.nodes(data=True)
@ -30,8 +44,15 @@ def preparer_graphe(G):
return G, niveaux_temp
def selectionner_niveaux():
"""Interface pour sélectionner les niveaux de départ et d'arrivée."""
def selectionner_niveaux(
) -> Tuple[int|None, int|None]:
"""
Interface pour sélectionner les niveaux de départ et d'arrivée.
Returns:
Tuple[int, int]: Un tuple contenant deux nombres si des nœuds ont été sélectionnés,
- None sinon
"""
st.markdown(f"## {str(_('pages.analyse.selection_nodes'))}")
valeur_defaut = str(_("pages.analyse.select_level"))
niveau_choix = [valeur_defaut] + list(niveau_labels.values())
@ -52,8 +73,23 @@ def selectionner_niveaux():
return niveau_depart_int, niveau_arrivee_int
def selectionner_minerais(G, niveau_depart, niveau_arrivee):
"""Interface pour sélectionner les minerais si nécessaire."""
def selectionner_minerais(
G: nx.DiGraph,
niveau_depart: int,
niveau_arrivee: int
) -> Optional[List[str]]:
"""
Interface pour sélectionner les minerais si nécessaire.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
Returns:
Optional[List[str]]: La liste des minerais si une sélection a été effectuée,
- None sinon
"""
minerais_selection = None
if niveau_depart < 2 < niveau_arrivee:
st.markdown(f"### {str(_('pages.analyse.select_minerals'))}")
@ -72,8 +108,25 @@ def selectionner_minerais(G, niveau_depart, niveau_arrivee):
return minerais_selection
def selectionner_noeuds(G, niveaux_temp, niveau_depart, niveau_arrivee):
"""Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée."""
def selectionner_noeuds(
G: nx.DiGraph,
niveaux_temp: Dict[str, int],
niveau_depart: int,
niveau_arrivee: int
) -> Tuple[List[str]|None, List[str]|None]:
"""
Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux_temp (Dict[str, int]): Dictionnaire contenant les niveaux associés aux nœuds.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
Returns:
Optional[Tuple[List[str], List[str]]]: Un tuple contenant les listes des nœuds
- None sinon
"""
st.markdown("---")
st.markdown(f"## {str(_('pages.analyse.fine_selection'))}")
@ -93,8 +146,20 @@ def selectionner_noeuds(G, niveaux_temp, niveau_depart, niveau_arrivee):
return noeuds_depart, noeuds_arrivee
def configurer_filtres_vulnerabilite():
"""Interface pour configurer les filtres de vulnérabilité."""
def configurer_filtres_vulnerabilite(
) -> Tuple[bool, bool, bool, str, bool, str]:
"""
Interface pour configurer les filtres de vulnérabilité.
Returns:
Tuple[bool, bool, bool, str, bool, str]: Un tuple contenant :
- La possibilité de filtrer les ICS
- La possibilité de filtrer les IV C
- La possibilité de filtrer les IH H
- Le type d'application pour les IH H (pays ou acteur)
- La possibilité de filtrer les IS G
- La logique de filtrage (ou ou and)
"""
st.markdown("---")
st.markdown(f"## {str(_('pages.analyse.vulnerability_filters'))}")
@ -122,7 +187,15 @@ def configurer_filtres_vulnerabilite():
return filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage
def interface_analyse(G_temp):
def interface_analyse(
G_temp: nx.DiGraph,
) -> None:
"""
Interface utilisateur pour l'analyse des graphes.
Args:
G_temp (nx.DiGraph): Le graphe NetworkX à analyser.
"""
st.markdown(f"# {str(_('pages.analyse.title'))}")
html_expander(f"{str(_('pages.analyse.help'))}", content="\n".join(_("pages.analyse.help_content")), open_by_default=False, details_class="details_introduction")
st.markdown("---")

View File

@ -1,3 +1,4 @@
from typing import Dict, List, Tuple, Optional, Set
import streamlit as st
from networkx.drawing.nx_agraph import write_dot
import pandas as pd
@ -25,8 +26,18 @@ niveau_labels = {
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def extraire_niveaux(G):
"""Extrait les niveaux des nœuds du graphe"""
def extraire_niveaux(
G: nx.DiGraph,
) -> Dict[str, int]:
"""
Extrait les niveaux des nœuds du graphe.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Dict[str, int]: Un dictionnaire associant chaque nœud à son niveau.
"""
niveaux = {}
for node, attrs in G.nodes(data=True):
niveau_str = attrs.get("niveau")
@ -37,8 +48,22 @@ def extraire_niveaux(G):
logging.warning(f"Niveau non entier pour le noeud {node}: {niveau_str}")
return niveaux
def extraire_ics(G, u, v):
"""Extrait la criticité d'un lien entre deux nœuds"""
def extraire_ics(
G: nx.DiGraph,
u: str,
v: str,
) -> float:
"""
Extrait la criticité d'un lien entre deux nœuds.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
u (str): L'ID du premier nœud.
v (str): L'ID du second nœud.
Returns:
float: La valeur de criticité entre les deux nœuds, ou 0 si impossible à extraire.
"""
data = G.get_edge_data(u, v)
if not data:
return 0
@ -46,8 +71,29 @@ def extraire_ics(G, u, v):
return float(data[0].get("ics", 0))
return float(data.get("ics", 0))
def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais):
"""Extrait les chemins selon les critères spécifiés"""
def extraire_chemins_selon_criteres(
G: nx.DiGraph,
niveaux: Dict[str, int],
niveau_depart: int,
noeuds_depart: Optional[List[str]],
noeuds_arrivee: Optional[List[str]],
minerais: Optional[List[str]],
) -> List[Tuple[str, ...]]:
"""
Extrait les chemins selon les critères spécifiés.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
noeuds_depart (Optional[List[str]]): Les nœuds de départ si sélectionnés.
noeuds_arrivee (Optional[List[str]]): Les nœuds d'arrivée si sélectionnés.
minerais (Optional[List[str]]): Les minerais si sélectionnés.
Returns:
List[Tuple[str, ...]]: Liste des chemins valides selon les critères spécifiés.
"""
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
@ -70,8 +116,24 @@ def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, no
return chemins
def verifier_critere_ihh(G, chemin, niveaux, ihh_type):
"""Vérifie si un chemin respecte le critère IHH (concentration géographique ou industrielle)"""
def verifier_critere_ihh(
G: nx.DiGraph,
chemin: Tuple[str, ...],
niveaux: Dict[str, int],
ihh_type: str,
) -> bool:
"""
Vérifie si un chemin respecte le critère IHH (concentration géographique ou industrielle).
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
chemin (Tuple[str, ...]): Un chemin validé selon les critères antérieurs.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
ihh_type (str): Le type d'application pour les IHH (pays ou acteur).
Returns:
bool: True si le chemin respecte le critère IHH, False sinon.
"""
ihh_field = "ihh_pays" if ihh_type == "Pays" else "ihh_acteurs"
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
@ -84,8 +146,22 @@ def verifier_critere_ihh(G, chemin, niveaux, ihh_type):
return True
return False
def verifier_critere_ivc(G, chemin, niveaux):
"""Vérifie si un chemin respecte le critère IVC (criticité par rapport à la concurrence sectorielle)"""
def verifier_critere_ivc(
G: nx.DiGraph,
chemin: Tuple[str, ...],
niveaux: Dict[str, int],
) -> bool:
"""
Vérifie si un chemin respecte le critère IVC (criticité par rapport à la concurrence sectorielle).
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
chemin (Tuple[str, ...]): Un chemin validé selon les critères antérieurs.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
Returns:
bool: True si le chemin respecte le critère IVC, False sinon.
"""
for i in range(len(chemin) - 1):
u = chemin[i]
niveau_u = niveaux.get(u)
@ -93,8 +169,22 @@ def verifier_critere_ivc(G, chemin, niveaux):
return True
return False
def verifier_critere_ics(G, chemin, niveaux):
"""Vérifie si un chemin respecte le critère ICS (criticité d'un minerai pour un composant)"""
def verifier_critere_ics(
G: nx.DiGraph,
chemin: Tuple[str, ...],
niveaux: Dict[str, int],
) -> bool:
"""
Vérifie si un chemin respecte le critère ICS (criticité d'un minerai pour un composant).
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
chemin (Tuple[str, ...]): Un chemin validé selon les critères antérieurs.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
Returns:
bool: True si le chemin respecte le critère ICS, False sinon.
"""
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
niveau_u = niveaux.get(u)
@ -106,8 +196,22 @@ def verifier_critere_ics(G, chemin, niveaux):
return True
return False
def verifier_critere_isg(G, chemin, niveaux):
"""Vérifie si un chemin contient un pays instable (ISG ≥ 60)"""
def verifier_critere_isg(
G: nx.DiGraph,
chemin: Tuple[str, ...],
niveaux: Dict[str, int],
) -> bool:
"""
Vérifie si un chemin contient un pays instable (ISG 60).
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
chemin (Tuple[str, ...]): Un chemin validé selon les critères antérieurs.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
Returns:
bool: True si le chemin contient un pays instable, False sinon.
"""
for i in range(len(chemin) - 1):
u, v = chemin[i], chemin[i + 1]
@ -120,8 +224,26 @@ def verifier_critere_isg(G, chemin, niveaux):
return True
return False
def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux):
"""Extrait les liens des chemins en respectant les niveaux"""
def extraire_liens_filtres(
chemins: List[Tuple[str, ...]],
niveaux: Dict[str, int],
niveau_depart: int,
niveau_arrivee: int,
niveaux_speciaux: List[int]
) -> Set[Tuple[str, str]]:
"""
Extrait les liens des chemins en respectant les niveaux.
Args:
chemins (List[Tuple[str, ...]]): Liste initiale des chemins validés.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
niveaux_speciaux (List[int]): Les niveaux spéciaux à inclure dans l'extraction.
Returns:
Set[Tuple[str, str]]: Ensemble des paires de nœuds liés après filtrage.
"""
liens = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
@ -135,9 +257,40 @@ def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, nive
liens.add((u, v))
return liens
def filtrer_chemins_par_criteres(G, chemins, niveaux, niveau_depart, niveau_arrivee,
filtrer_ics, filtrer_ivc, filtrer_ihh, ihh_type, filtrer_isg, logique_filtrage):
"""Filtre les chemins selon les critères de vulnérabilité"""
def filtrer_chemins_par_criteres(
G: nx.DiGraph,
chemins: List[Tuple[str, ...]],
niveaux: Dict[str, int],
niveau_depart: int,
niveau_arrivee: int,
filtrer_ics: bool,
filtrer_ivc: bool,
filtrer_ihh: bool,
ihh_type: str,
filtrer_isg: bool,
logique_filtrage: str,
) -> Tuple[Set[Tuple[str, str]], Set[Tuple[str, ...]]]:
"""
Filtre les chemins selon les critères de vulnérabilité.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
chemins (List[Tuple[str, ...]]): Liste initiale des chemins validés.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
filtrer_ics (bool): Si le filtre ICS doit être appliqué.
filtrer_ivc (bool): Si le filtre IVC doit être appliqué.
filtrer_ihh (bool): Si le filtre IHH doit être appliqué.
ihh_type (str): Le type d'application pour les IHH (pays ou acteur).
filtrer_isg (bool): Si le filtre ISG doit être appliqué.
logique_filtrage (str): La logique de filtrage (ET OU).
Returns:
Tuple[Set[Tuple[str, str]], Set[Tuple[str, ...]]]: Un tuple contenant :
- Les liens validés
- Les chemins filtrés finaux
"""
niveaux_speciaux = [1000, 1001, 1002, 1010, 1011, 1012]
# Extraction des liens sans filtrage
@ -176,8 +329,18 @@ def filtrer_chemins_par_criteres(G, chemins, niveaux, niveau_depart, niveau_arri
return liens_filtres, chemins_filtres
def couleur_ics(p):
"""Retourne la couleur en fonction du niveau de criticité"""
def couleur_ics(
p: float
) -> str:
"""
Retourne la couleur en fonction du niveau de criticité.
Args:
p (float): Valeur de criticité (entre 0 et 1).
Returns:
str: Couleur représentative du niveau de criticité.
"""
if p <= 0.33:
return "darkgreen"
elif p <= 0.66:
@ -185,8 +348,22 @@ def couleur_ics(p):
else:
return "darkred"
def edge_info(G, u, v):
"""Génère l'info-bulle pour un lien"""
def edge_info(
G: nx.DiGraph,
u: str,
v: str
) -> str:
"""
Génère l'info-bulle pour un lien.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
u (str): L'ID du premier nœud.
v (str): L'ID du second nœud.
Returns:
str: Texte à afficher dans l'info-bulle pour le lien spécifié.
"""
# Liste d'attributs à exclure des infobulles des liens
attributs_exclus = ["poids", "color", "fontcolor"]
@ -198,8 +375,30 @@ def edge_info(G, u, v):
base = [f"{k}: {v}" for k, v in data.items() if k not in attributs_exclus]
return f"{str(_('pages.analyse.sankey.relation'))} : {u}{v}<br>" + "<br>".join(base)
def preparer_donnees_sankey(G, liens_chemins, niveaux, chemins):
"""Prépare les données pour le graphique Sankey"""
def preparer_donnees_sankey(
G: nx.DiGraph,
liens_chemins: Set[Tuple[str, str]],
niveaux: Dict[str, int],
chemins: List[Tuple[str, ...]]
) -> Tuple[pd.DataFrame, List[str], List[List[str]], List[str], Dict[str, int]]:
"""
Prépare les données pour le graphique Sankey.
Args:
G (Any): Le graphe NetworkX contenant les données des produits.
liens_chemins (Set[Tuple[str, str]]): Ensemble des paires de nœuds liés.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
chemins (List[Tuple[str, ...]]): Liste initiale des chemins validés.
Returns:
Tuple[pd.DataFrame, List[str], List[List[str]], List[str], Dict[str, int]]:
Un tuple contenant :
- Le DataFrame lié aux chemins filtrés
- La liste triée et ordonnée des nœuds
- Les listes de données personnalisées pour les nœuds
- Les donnnées personnalisées pour les liens
- Le dictionnaire associant chaque nœud à son index
"""
# Liste d'attributs à exclure des infobulles des nœuds
node_attributs_exclus = ["fillcolor", "niveau"]
@ -251,8 +450,30 @@ def preparer_donnees_sankey(G, liens_chemins, niveaux, chemins):
return df_liens, sorted_nodes, customdata, link_customdata, node_indices
def creer_graphique_sankey(G, niveaux, df_liens, sorted_nodes, customdata, link_customdata, node_indices):
"""Crée et retourne le graphique Sankey"""
def creer_graphique_sankey(
G: nx.DiGraph,
niveaux: Dict[str, int],
df_liens: pd.DataFrame,
sorted_nodes: List[str],
customdata: List[str],
link_customdata: List[str],
node_indices: Dict[str, int],
) -> go.Figure:
"""
Crée et retourne le graphique Sankey.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
df_liens (pd.DataFrame): Données du DataFrame lié aux chemins filtrés.
sorted_nodes (List[str]): Liste triée et ordonnée des nœuds.
customdata (List[str]): Listes de données personnalisées pour les nœuds.
link_customdata (List[str]): Données personnalisées pour les liens.
node_indices (Dict[str, int]): Dictionnaire associant chaque nœud à son index.
Returns:
go.Figure: Le graphique Sankey final.
"""
sources = df_liens["source"].map(node_indices).tolist()
targets = df_liens["target"].map(node_indices).tolist()
values = df_liens["value"].tolist()
@ -291,8 +512,20 @@ def creer_graphique_sankey(G, niveaux, df_liens, sorted_nodes, customdata, link_
return fig
def exporter_graphe_filtre(G, liens_chemins):
"""Gère l'export du graphe filtré au format DOT"""
def exporter_graphe_filtre(
G: nx.DiGraph,
liens_chemins: Set[Tuple[str, str]]
) -> None:
"""
Gère l'export du graphe filtré au format DOT.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
liens_chemins (Set[Tuple[str, str]]): Ensemble des paires de nœuds liés.
Returns:
None
"""
if not st.session_state.get("logged_in", False) or not liens_chemins:
return
@ -321,13 +554,30 @@ def exporter_graphe_filtre(G, liens_chemins):
)
def afficher_sankey(
G,
niveau_depart, niveau_arrivee,
noeuds_depart=None, noeuds_arrivee=None,
G: nx.DiGraph,
niveau_depart: int, niveau_arrivee: int,
noeuds_depart: Optional[List[str]] = None, noeuds_arrivee: Optional[List[str]] = None,
minerais=None,
filtrer_ics=False, filtrer_ivc=False,
filtrer_ihh=False, ihh_type="Pays", filtrer_isg=False,
logique_filtrage="OU"):
filtrer_ics: bool = False, filtrer_ivc: bool = False,
filtrer_ihh: bool = False, ihh_type: str = "Pays", filtrer_isg: bool = False,
logique_filtrage: str = "OU") -> None:
"""
Fonction principale qui s'occupe de la création et de l'affichage du graphique Sankey.
Args:
G: Le graphe NetworkX contenant les données des produits.
niveau_depart, niveau_arrivee: Les niveaux initiaux pour le filtrage.
noeuds_depart, noeuds_arrivee: Les nœuds initiaux pour le filtrage.
minerais: La liste des minerais à inclure dans le filtrage.
filtrer_ics, filtrer_ivc: Les booléens pour le filtrage ICS et IVC.
filtrer_ihh: Le booléen pour le filtrage IHH.
ihh_type: Le type d'application pour les IHH (Pays ou Acteur).
filtrer_isg: Le booléen pour le filtrage ISG.
logique_filtrage: La logique de filtrage à appliquer (ET OU).
Returns:
go.Figure
"""
# Étape 1 : Extraction des niveaux des nœuds
niveaux = extraire_niveaux(G)

View File

@ -1,2 +1,4 @@
# __init__.py app/fiches
from .interface import interface_fiches
__all__ = ["interface_fiches"]

View File

@ -1,24 +1,47 @@
"""
Module de génération des fiches pour l'application.
Fonctions principales :
1. `remplacer_latex_par_mathml`
2. `markdown_to_html_rgaa`
3. `rendu_html`
4. `generer_fiche`
Toutes ces fonctions gèrent la conversion et le rendu de contenu Markdown
vers du HTML structuré avec des mathématiques, respectant les règles RGAA.
"""
import re
import os
import yaml
import markdown
from bs4 import BeautifulSoup
from latex2mathml.converter import convert as latex_to_mathml
from .utils.fiche_utils import render_fiche_markdown
import pypandoc
import streamlit as st
from .utils.dynamic import (
from app.fiches.utils import (
build_dynamic_sections,
build_ivc_sections,
build_ihh_sections,
build_isg_sections,
build_production_sections,
build_minerai_sections
build_minerai_sections,
render_fiche_markdown
)
# === Fonctions de transformation ===
def remplacer_latex_par_mathml(markdown_text):
def remplacer_latex_par_mathml(markdown_text: str) -> str:
"""
Remplace les formules LaTeX par des blocs MathML.
Args:
markdown_text (str): Texte Markdown contenant du LaTeX.
Returns:
str: Le même texte avec les formules LaTeX converties en MathML.
"""
def remplacer_bloc_display(match):
formule_latex = match.group(1).strip()
try:
@ -39,7 +62,17 @@ def remplacer_latex_par_mathml(markdown_text):
markdown_text = re.sub(r"(?<!\$)\$(.+?)\$(?!\$)", remplacer_bloc_inline, markdown_text, flags=re.DOTALL)
return markdown_text
def markdown_to_html_rgaa(markdown_text, caption_text=None):
def markdown_to_html_rgaa(markdown_text: str, caption_text: str|None) -> str:
"""
Convertit un texte Markdown en HTML structuré accessible.
Args:
markdown_text (str): Texte Markdown à convertir.
caption_text (str, optional): Titre du tableau si applicable.
Returns:
str: Le HTML structuré avec des attributs de contraintes ARIA.
"""
html = markdown.markdown(markdown_text, extensions=['tables'])
soup = BeautifulSoup(html, "html.parser")
for i, table in enumerate(soup.find_all("table"), start=1):
@ -53,7 +86,16 @@ def markdown_to_html_rgaa(markdown_text, caption_text=None):
th["scope"] = "col"
return str(soup)
def rendu_html(contenu_md):
def rendu_html(contenu_md: str) -> list[str]:
"""
Rend le contenu Markdown en HTML avec une structure spécifique.
Args:
contenu_md (str): Texte Markdown à formater.
Returns:
list[str]: Liste d'étapes de construction du HTML final.
"""
lignes = contenu_md.split('\n')
sections_n1 = []
section_n1_actuelle = {"titre": None, "intro": [], "sections_n2": {}}
@ -84,7 +126,7 @@ def rendu_html(contenu_md):
html_output.append(f"<h2>{bloc['titre']}</h2>")
if bloc["intro"]:
intro_md = remplacer_latex_par_mathml("\n".join(bloc["intro"]))
html_intro = markdown_to_html_rgaa(intro_md)
html_intro = markdown_to_html_rgaa(intro_md, None)
html_output.append(html_intro)
for sous_titre, contenu in bloc["sections_n2"].items():
contenu_md = remplacer_latex_par_mathml("\n".join(contenu))
@ -95,7 +137,25 @@ def rendu_html(contenu_md):
return html_output
def generer_fiche(md_source, dossier, nom_fichier, seuils):
def generer_fiche(md_source: str, dossier: str, nom_fichier: str, seuils: dict) -> str:
"""
Génère un document PDF et son HTML correspondant pour une fiche.
Args:
md_source (str): Texte Markdown source contenant la fiche.
dossier (str): Dossier/rubrique de destination.
nom_fichier (str): Nom du fichier (sans extension).
seuils (dict): Valeurs de seuils pour l'analyse.
Returns:
str: Chemin absolu vers le fichier HTML généré.
Notes:
Cette fonction :
- Convertit et formate les données Markdown.
- Génère un document PDF sous format XeLaTeX.
- Crée un document HTML accessible avec des mathématiques.
"""
front_match = re.match(r"(?s)^---\n(.*?)\n---\n", md_source)
context = yaml.safe_load(front_match.group(1)) if front_match else {}

View File

@ -4,20 +4,36 @@ import requests
import os
from utils.translations import _
from .utils.tickets.display import afficher_tickets_par_fiche
from .utils.tickets.creation import formulaire_creation_ticket_dynamique
from .utils.tickets.core import rechercher_tickets_gitea
from config import GITEA_TOKEN, GITEA_URL, ORGANISATION, DEPOT_FICHES, ENV, FICHES_CRITICITE
from utils.gitea import charger_arborescence_fiches
from .utils.fiche_utils import load_seuils, doit_regenerer_fiche
from .generer import generer_fiche
from utils.widgets import html_expander
def interface_fiches():
from app.fiches.utils import (
afficher_tickets_par_fiche,
formulaire_creation_ticket_dynamique,
rechercher_tickets_gitea,
load_seuils,
doit_regenerer_fiche
)
from app.fiches.generer import generer_fiche
def interface_fiches() -> None:
"""
Affiche l'interface utilisateur des fiches.
Parameters
----------
Aucun
Notes
-----
Cette fonction initialise l'interface utilisateur qui permet aux utilisateurs d'afficher,
visualiser et interagir avec les fiches. Elle gère également :
- Le chargement de l'arborescence des fiches depuis Gitea.
- La navigation entre différentes catégories de fiches.
- L'affichage des tickets associés aux fiches.
- Le téléchargement des PDF (si disponible).
"""
st.markdown(f"# {str(_('pages.fiches.title'))}")
html_expander(f"{str(_('pages.fiches.help'))}", content="\n".join(_("pages.fiches.help_content")), open_by_default=False, details_class="details_introduction")
st.markdown("---")

View File

@ -0,0 +1,31 @@
from .tickets.display import afficher_tickets_par_fiche
from .tickets.creation import formulaire_creation_ticket_dynamique
from .tickets.core import rechercher_tickets_gitea
from .fiche_utils import (
load_seuils,
doit_regenerer_fiche
)
from .dynamic import (
build_dynamic_sections,
build_ivc_sections,
build_ihh_sections,
build_isg_sections,
build_production_sections,
build_minerai_sections
)
from .fiche_utils import render_fiche_markdown
__all__ = [
"afficher_tickets_par_fiche",
"formulaire_creation_ticket_dynamique",
"rechercher_tickets_gitea",
"load_seuils",
"doit_regenerer_fiche",
"build_dynamic_sections",
"build_ivc_sections",
"build_ihh_sections",
"build_isg_sections",
"build_production_sections",
"build_minerai_sections",
"render_fiche_markdown"
]

View File

@ -7,7 +7,22 @@ import streamlit as st
from config import FICHES_CRITICITE
def build_production_sections(md: str) -> str:
"""
Procédure pour construire et remplacer les sections des fiches de production.
Cette fonction permet d'extraire les données du markdown, organiser
les informations sur les pays d'implantation et acteurs, puis générer
un tableau structuré dans l'intervention. Le code prend en charge
deux types de fiches : fabrication et assemblage.
Args:
md (str): Fichier Markdown à traiter contenant la structure YAML des sections.
Returns:
str: Markdown modifié avec les tableaux construits selon le type de fiche.
"""
schema = None
type_fiche = None
front_match = re.match(r"(?s)^---\n(.*?)\n---\n", md)
if front_match:
try:
@ -23,10 +38,10 @@ def build_production_sections(md: str) -> str:
yaml_block = re.search(r"```yaml\n(.+?)\n```", md, re.DOTALL)
if not yaml_block:
return md
# Capture le bloc YAML complet pour le supprimer plus tard
yaml_block_full = yaml_block.group(0)
try:
yaml_data = yaml.safe_load(yaml_block.group(1))
except Exception as e:
@ -133,7 +148,7 @@ def build_production_sections(md: str) -> str:
st.warning(f"Aucune section IHH trouvée pour le schéma {schema} dans la fiche technique IHH.")
except Exception as e:
st.error(f"Erreur lors de la lecture/traitement de la fiche IHH: {e}")
# Supprimer le bloc YAML du markdown final
md_modifie = md_modifie.replace(yaml_block_full, "")

View File

@ -1,14 +1,33 @@
# pastille.py
from typing import Any
PASTILLE_ICONS = {
"vert": "",
"orange": "🔶",
"rouge": "🔴"
}
def pastille(indice: str, valeur: Any, seuils: dict = None) -> str:
def pastille(indice: str, valeur: str, seuils: dict) -> str:
"""Renvoie l'icône Unicode correspondante à la pastille en fonction de sa valeur et des seuils.
La pastille prend une couleur (vert, orange ou rouge) selon la valeur
de l'indicateur par rapport aux seuils définis. Les icônes sont définies
comme des emojis Unicode pour faciliter leur affichage dans les interfaces
interactives comme Streamlit.
Args:
indice (str): Clé permettant d'accéder au seuil spécifique.
Exemples de valeurs possibles : "criticite", "confort".
valeur (Any): Valeur numérique à comparer aux seuils.
Généralement une float, mais peut être convertie automatiquement
en nombre si possible.
seuils (dict, optional): Dictionnaire des seuils pour chaque indicateur.
Si None, les valeurs par défaut sont utilisées selon l'état de session
Stocké dans Streamlit.
Returns:
str: Une icône Unicode correspondante à la pastille. Si aucune icône n'est définie,
une chaîne vide est renvoyée.
"""
try:
import streamlit as st
seuils = seuils or st.session_state.get("seuils", {})

View File

@ -12,22 +12,35 @@ Usage :
"""
from __future__ import annotations
import os
import frontmatter, yaml, jinja2, re, pathlib
from typing import Dict
from datetime import datetime, timezone
import os
from utils.gitea import recuperer_date_dernier_commit
def load_seuils(path: str | pathlib.Path = "config/indices_seuils.yaml") -> Dict:
"""Charge le fichier YAML des seuils et renvoie le dict 'seuils'."""
"""Charge le fichier YAML des seuils et renvoie le dict 'seuils'.
Args:
path (str | pathlib.Path, optional): Chemin vers le fichier des seuils. Defaults to "config/indices_seuils.yaml".
Returns:
Dict: Dictionnaire contenant les seuils.
"""
data = yaml.safe_load(pathlib.Path(path).read_text(encoding="utf-8"))
return data.get("seuils", {})
def _migrate_metadata(meta: Dict) -> Dict:
"""Normalise les clés YAML (ex : sheet_type → type_fiche)."""
"""Normalise les clés YAML (ex : sheet_type → type_fiche).
Args:
meta (Dict): Métadonnées à normaliser.
Returns:
Dict: Métadonnées normalisées.
"""
keymap = {
"sheet_type": "type_fiche",
"indice_code": "indice_court", # si besoin
@ -37,12 +50,24 @@ def _migrate_metadata(meta: Dict) -> Dict:
meta[new] = meta.pop(old)
return meta
def render_fiche_markdown(
md_text: str,
seuils: Dict,
license_path: str = "assets/licence.md"
) -> str:
"""Renvoie la fiche rendue (Markdown) avec les placeholders remplacés et le tableau de version.
def render_fiche_markdown(md_text: str, seuils: Dict, license_path: str = "assets/licence.md") -> str:
"""Renvoie la fiche rendue (Markdown) :
placeholders Jinja2 remplacés ({{ }})
table seuils injectée via dict 'seuils'.
- licence ajoutée après le tableau de version et avant le premier titre de niveau 2
Args:
md_text (str): Contenu Markdown brut.
seuils (Dict): Tableau des versions.
license_path (str, optional): Chemin vers le fichier de licence. Defaults to "assets/licence.md".
Returns:
str: Fiche Markdown rendue avec les placeholders remplacés et la table de version.
Note:
- Les licences sont ajoutées après le tableau de version.
- Les titres de niveau 2 doivent être présents pour l'insertion automatique de licence.
"""
post = frontmatter.loads(md_text)
meta = _migrate_metadata(dict(post.metadata))
@ -68,11 +93,11 @@ def render_fiche_markdown(md_text: str, seuils: Dict, license_path: str = "asset
# Charger le contenu de la licence
try:
license_content = pathlib.Path(license_path).read_text(encoding="utf-8")
# Insérer la licence après le tableau de version et avant le premier titre h2
# Trouver la position du premier titre h2
h2_match = re.search(r"^## ", rendered_body, flags=re.M)
if h2_match:
h2_position = h2_match.start()
rendered_body = f"{rendered_body[:h2_position]}\n\n{license_content}\n\n{rendered_body[h2_position:]}"
@ -81,19 +106,51 @@ def render_fiche_markdown(md_text: str, seuils: Dict, license_path: str = "asset
rendered_body = f"{rendered_body}\n\n{license_content}"
except Exception as e:
# En cas d'erreur lors de la lecture du fichier de licence, continuer sans l'ajouter
import streamlit as st
st.error(e)
pass
return rendered_body
def fichier_plus_recent(chemin_fichier, reference):
def fichier_plus_recent(
chemin_fichier: str|None,
reference: datetime
) -> bool:
"""Vérifie si un fichier est plus récent que la référence donnée.
Args:
chemin_fichier (str): Chemin vers le fichier à vérifier.
reference (datetime): Référence temporelle de comparaison.
Returns:
bool: True si le fichier est plus récent, False sinon.
"""
try:
modif = datetime.fromtimestamp(os.path.getmtime(chemin_fichier), tz=timezone.utc)
return modif > reference
except Exception:
return False
def doit_regenerer_fiche(html_path, fiche_type, fiche_choisie, commit_url, fichiers_criticite):
def doit_regenerer_fiche(
html_path: str,
fiche_type: str,
fiche_choisie: str,
commit_url: str,
fichiers_criticite: Dict[str, str]
) -> bool:
"""Détermine si une fiche doit être regénérée.
Args:
html_path (str): Chemin vers le fichier HTML.
fiche_type (str): Type de la fiche.
fiche_choisie (str): Nom choisi pour la fiche.
commit_url (str): URL du dernier commit.
fichiers_criticite (Dict[str, str]): Dictionnaire des fichiers critiques.
Returns:
bool: True si la fiche doit être regénérée, False sinon.
"""
if not os.path.exists(html_path):
return True

View File

@ -1,3 +1,4 @@
from typing import List, Optional, Tuple, Dict, Set
import streamlit as st
import networkx as nx
from utils.translations import _
@ -23,8 +24,20 @@ niveau_labels = {
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def preparer_graphe(G):
"""Nettoie et prépare le graphe pour l'analyse."""
def preparer_graphe(
G: nx.DiGraph,
) -> Tuple[nx.DiGraph, Dict[str, int]]:
"""
Nettoie et prépare le graphe pour l'analyse.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Tuple[nx.DiGraph, Dict[str, int]]: Un tuple contenant :
- Le graphe NetworkX proprement configuré
- Un dictionnaire des niveaux associés aux nœuds
"""
niveaux_temp = {
node: int(str(attrs.get("niveau")).strip('"'))
for node, attrs in G.nodes(data=True)
@ -36,8 +49,19 @@ def preparer_graphe(G):
return G, niveaux_temp
def selectionner_minerais(G):
"""Interface pour sélectionner les minerais si nécessaire."""
def selectionner_minerais(
G: nx.DiGraph,
) -> Optional[List[str]]:
"""
Interface pour sélectionner les minerais si nécessaire.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Optional[List[str]]: La liste des minerais si une sélection a été effectuée,
- None sinon
"""
minerais_selection = None
st.markdown(f"## {str(_('pages.ia_nalyse.select_minerals'))}")
@ -56,8 +80,25 @@ def selectionner_minerais(G):
return minerais_selection
def selectionner_noeuds(G, niveaux_temp, niveau_depart):
"""Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée."""
def selectionner_noeuds(
G: nx.DiGraph,
niveaux_temp: Dict[str, int],
niveau_depart: int,
) -> Tuple[Optional[List[str]], List[str]]:
"""
Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux_temp (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
Returns:
Tuple[Optional[List[str]], List[str]]: Un tuple contenant :
- La liste des nœuds de départ si une sélection a été effectuée,
- None sinon
- La liste des nœuds d'arrivée
"""
st.markdown("---")
st.markdown(f"## {str(_('pages.ia_nalyse.fine_selection'))}")
@ -72,8 +113,18 @@ def selectionner_noeuds(G, niveaux_temp, niveau_depart):
return noeuds_depart, noeuds_arrivee
def extraire_niveaux(G):
"""Extrait les niveaux des nœuds du graphe"""
def extraire_niveaux(
G: nx.DiGraph,
) -> Dict[str, int]:
"""
Extrait les niveaux des nœuds du graphe.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
Dict[str, int]: Un dictionnaire associant chaque nœud à son niveau.
"""
niveaux = {}
for node, attrs in G.nodes(data=True):
niveau_str = attrs.get("niveau")
@ -81,8 +132,28 @@ def extraire_niveaux(G):
niveaux[node] = int(str(niveau_str).strip('"'))
return niveaux
def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, noeuds_arrivee, minerais):
"""Extrait les chemins selon les critères spécifiés"""
def extraire_chemins_selon_criteres(
G: nx.DiGraph,
niveaux: Dict[str, int],
niveau_depart: int,
noeuds_depart: Optional[List[str]],
noeuds_arrivee: Optional[List[str]],
minerais: Optional[List[str]],
) -> List[Tuple[str, ...]]:
"""
Extrait les chemins selon les critères spécifiés.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
noeuds_depart (Optional[List[str]]): Les nœuds de départ si sélectionnés.
noeuds_arrivee (Optional[List[str]]): Les nœuds d'arrivée si sélectionnés.
minerais (Optional[List[str]]): Les minerais si sélectionnés.
Returns:
List[Tuple[str, ...]]: Liste des chemins valides selon les critères spécifiés.
"""
chemins = []
if noeuds_depart and noeuds_arrivee:
for nd in noeuds_depart:
@ -105,8 +176,21 @@ def extraire_chemins_selon_criteres(G, niveaux, niveau_depart, noeuds_depart, no
return chemins
def exporter_graphe_filtre(G, liens_chemins):
"""Gère l'export du graphe filtré au format DOT"""
def exporter_graphe_filtre(
G: nx.DiGraph,
liens_chemins: Set[Tuple[str, str]],
) -> nx.DiGraph|None:
"""
Gère l'export du graphe filtré au format DOT.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
liens_chemins (Set[Tuple[str, str]]): Ensemble des paires de nœuds liés.
Returns:
nx.DiGraph: le graphe exporté
- Sinon aucun résultat (None)
"""
if not st.session_state.get("logged_in", False) or not liens_chemins:
return
@ -124,8 +208,26 @@ def exporter_graphe_filtre(G, liens_chemins):
return(G_export)
def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, niveaux_speciaux):
"""Extrait les liens des chemins en respectant les niveaux"""
def extraire_liens_filtres(
chemins: List[Tuple[str, ...]],
niveaux: Dict[str, int],
niveau_depart: int,
niveau_arrivee: int,
niveaux_speciaux: List[int]
) -> Set[Tuple[str, str]]:
"""
Extrait les liens des chemins en respectant les niveaux.
Args:
chemins (List[Tuple[str, ...]]): Liste initiale des chemins validés.
niveaux (Dict[str, int]): Dictionnaire associant chaque nœud à son niveau.
niveau_depart (int): Le niveau de départ sélectionné.
niveau_arrivee (int): Le niveau d'arrivée sélectionné.
niveaux_speciaux (List[int]): Les niveaux spéciaux à inclure dans l'extraction.
Returns:
Set[Tuple[str, str]]: Ensemble des paires de nœuds liés après filtrage.
"""
liens = set()
for chemin in chemins:
for i in range(len(chemin) - 1):
@ -139,15 +241,27 @@ def extraire_liens_filtres(chemins, niveaux, niveau_depart, niveau_arrivee, nive
liens.add((u, v))
return liens
def interface_ia_nalyse(G_temp):
def interface_ia_nalyse(
G_temp: nx.DiGraph,
) -> None:
"""
Fonction principale qui s'occupe de la création du graphe pour analyse.
Args:
G_temp (nx.DiGraph): Le graphe NetworkX contenant les données des produits.
Returns:
None
"""
st.markdown(f"# {str(_('pages.ia_nalyse.title'))}")
html_expander(f"{str(_('pages.ia_nalyse.help'))}", content="\n".join(_("pages.ia_nalyse.help_content")), open_by_default=False, details_class="details_introduction")
st.markdown("---")
resultat = statut_utilisateur(st.session_state.username)
st.info(resultat["message"])
if resultat:
st.info(resultat["message"])
if resultat["statut"] is None:
if resultat and resultat["statut"] is None:
# Préparation du graphe
G_temp, niveaux_temp = preparer_graphe(G_temp)
@ -179,7 +293,7 @@ def interface_ia_nalyse(G_temp):
else:
st.info(str(_("pages.ia_nalyse.empty_graph")))
elif resultat["statut"] == "terminé" and resultat["telechargement"]:
elif resultat and resultat["statut"] == "terminé" and resultat["telechargement"]:
if not st.session_state.get("telechargement_confirme"):
st.download_button(str(_("buttons.download")), resultat["telechargement"], file_name="analyse.zip", icon=":material/download:")
if st.button(str(_("pages.ia_nalyse.confirm_download")), icon=":material/task_alt:"):

View File

@ -1,7 +1,21 @@
import streamlit as st
import networkx as nx
from utils.translations import _
def ajouter_produit(G):
def ajouter_produit(G: nx.DiGraph) -> nx.DiGraph:
"""
Ajoute un produit personnalisé dans le graphe en cours.
Args:
G (nx.DiGraph): graphe en cours.
Returns:
nx.DiGraph: le graphe avec le nouveau produit
Notes:
Cette fonction ajoute un nouveau produit final temporaire
au graphe de référence.
"""
st.markdown(f"## {str(_('pages.personnalisation.add_new_product'))}")
new_prod = st.text_input(str(_("pages.personnalisation.new_product_name")), key="new_prod")
if new_prod:

View File

@ -1,8 +1,9 @@
import streamlit as st
import json
from utils.translations import get_translation as _
import networkx as nx
def importer_exporter_graph(G):
def importer_exporter_graph(G: nx.DiGraph) -> nx.DiGraph:
st.markdown(f"## {_('pages.personnalisation.save_restore_config')}")
if st.button(str(_("pages.personnalisation.export_config")), icon=":material/save:"):
nodes = [n for n, d in G.nodes(data=True) if d.get("personnalisation") == "oui"]

View File

@ -1,39 +1,122 @@
from typing import List
import streamlit as st
from utils.translations import _
import networkx as nx
def get_produits_personnalises(G):
"""Récupère la liste des produits personnalisés du niveau 0."""
def get_produits_personnalises(
G: nx.DiGraph
) -> List[str]:
"""
Récupère la liste des produits personnalisés du niveau 0.
Args:
G (Any): Le graphe NetworkX contenant les données des produits.
Returns:
List[str]: Liste triée des noms de produits.
"""
return sorted([n for n, d in G.nodes(data=True) if d.get("niveau") == "0" and d.get("personnalisation") == "oui"])
def supprimer_produit(G, prod):
"""Supprime un produit du graphe."""
def supprimer_produit(
G: nx.DiGraph,
prod: str
) -> nx.DiGraph:
"""
Supprime un produit du graphe et affiche le message de succès.
Args:
G (Any): Le graphe NetworkX sur lequel supprimer le produit.
prod (str): Le nom du produit à supprimer.
"""
G.remove_node(prod)
st.success(f"{prod} {str(_('pages.personnalisation.deleted'))}")
st.session_state.pop("prod_sel", None)
return G
def get_operations_disponibles(G):
"""Récupère la liste des opérations d'assemblage disponibles."""
def get_operations_disponibles(
G: nx.DiGraph
) -> List[str]:
"""
Récupère la liste des opérations d'assemblage disponibles.
Args:
G (Any): Le graphe NetworkX contenant les données des produits et des opérations.
Returns:
List[str]: Liste triée des noms des opérations.
"""
return sorted([
n for n, d in G.nodes(data=True)
if d.get("niveau") == "10"
and any(G.has_edge(p, n) and G.nodes[p].get("niveau") == "0" for p in G.predecessors(n))
])
def get_operations_actuelles(G, prod):
"""Récupère les opérations actuellement liées au produit."""
def get_operations_actuelles(
G: nx.DiGraph,
prod: str
) -> List[str]:
"""
Récupère les opérations actuellement liées au produit.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits et des opérations.
prod (str): Le nom du produit dont récupérer les opérations.
Returns:
List[str]: Liste des noms des opérations actuelles.
"""
return [succ for succ in G.successors(prod) if G.nodes[succ].get("niveau") == "10"]
def get_composants_niveau1(G):
"""Récupère la liste des composants de niveau 1."""
def get_composants_niveau1(
G: nx.DiGraph
) -> List[str]:
"""
Récupère la liste des composants de niveau 1.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits et des composants.
Returns:
List[str]: Liste triée des noms des composants.
"""
return sorted([n for n, d in G.nodes(data=True) if d.get("niveau") == "1"])
def get_composants_lies(G, prod):
"""Récupère les composants actuellement liés au produit."""
def get_composants_lies(
G: nx.DiGraph,
prod: str
) -> List[str]:
"""
Récupère les composants actuellement liés au produit.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits et des composants.
prod (str): Le nom du produit dont récupérer les composants.
Returns:
List[str]: Liste des noms des composants liés.
"""
return [succ for succ in G.successors(prod) if G.nodes[succ].get("niveau") == "1"]
def mettre_a_jour_operations(G, prod, curr_ops, sel_op):
"""Met à jour les opérations liées au produit."""
def mettre_a_jour_operations(
G: nx.DiGraph,
prod: str,
curr_ops: List[str],
sel_op: str
) -> nx.DiGraph:
"""
Met à jour les opérations liées au produit.
Args:
G (Any): Le graphe NetworkX contenant les données des produits et des opérations.
prod (str): Le nom du produit dont mettre à jour les opérations.
curr_ops (List[str]): Liste actuelle des opérations liées.
sel_op (str): L'opération sélectionnée pour mise à jour.
Notes:
Cette fonction crée ou supprime les liens entre le produit et les opérations
selon la sélection effectuée par l'utilisateur.
"""
none_option = str(_("pages.personnalisation.none", "-- Aucune --"))
for op in curr_ops:
if sel_op == none_option or op != sel_op:
@ -42,7 +125,25 @@ def mettre_a_jour_operations(G, prod, curr_ops, sel_op):
G.add_edge(prod, sel_op)
return G
def mettre_a_jour_composants(G, prod, linked, nouveaux):
def mettre_a_jour_composants(
G: nx.DiGraph,
prod: str,
linked: List[str],
nouveaux: List[str]
) -> nx.DiGraph:
"""
Met à jour les composants liés au produit.
Args:
G (nx.DiGraph): Le graphe NetworkX contenant les données des produits et des composants.
prod (str): Le nom du produit dont mettre à jour les composants.
linked (List[str]): Liste actuelle des composants liés.
nouveaux (List[str]): Nouvelle liste de composants à lier.
Notes:
Cette fonction crée ou supprime les liens entre le produit et les composants
selon la sélection effectuée par l'utilisateur.
"""
"""Met à jour les composants liés au produit."""
for comp in set(linked) - set(nouveaux):
G.remove_edge(prod, comp)
@ -50,7 +151,23 @@ def mettre_a_jour_composants(G, prod, linked, nouveaux):
G.add_edge(prod, comp)
return G
def modifier_produit(G):
def modifier_produit(
G: nx.DiGraph
) -> nx.DiGraph:
"""
Méthode de personnalisation qui permet à l'utilisateur d'ajuster un produit.
Args:
G (Any): Le graphe NetworkX sur lequel modifier les produits et leurs composants.
Contient des données concernant la personalisation des produits,
leur niveau, et les opérations liées.
Notes:
Cette fonction fournit une interface utilisateur pour sélectionner
un produit à personnaliser, gérer ses composants, et définir ses opérations
d'assemblage. Elle implémente la logique de mise à jour des connexions entre
les différents éléments du graphe.
"""
st.markdown(f"## {str(_('pages.personnalisation.modify_product'))}")
# Sélection du produit à modifier

View File

@ -40,11 +40,11 @@ from app.plan_d_action import (
inverse_niveau_labels = {v: k for k, v in niveau_labels.items()}
def interface_plan_d_action(G_temp: nx.Graph) -> None:
def interface_plan_d_action(G_temp: nx.DiGraph) -> None:
"""Interface pour planifier l'action de sélection des minerais.
Args:
G_temp (nx.Graph): Le graphe temporaire à analyser.
G_temp (nx.DiGraph): Le graphe temporaire à analyser.
Returns:
None: Modifie le state du Streamlit avec les données nécessaires

View File

@ -1,14 +1,14 @@
from typing import Dict, Tuple, Union, List, Set
from typing import Dict, Tuple, Union, List
import networkx as nx
def exporter_graphe_filtre(
G: nx.Graph,
G: nx.DiGraph,
liens_chemins: List[Tuple[Union[str, int], Union[str, int]]]
) -> nx.Graph:
) -> nx.DiGraph:
"""Gère l'export du graphe filtré au format DOT.
Args:
G (nx.Graph): Le graphe d'origine à exporter.
G (nx.DiGraph): Le graphe d'origine à exporter.
liens_chemins (list): Liste des tuples contenant les paires de nœuds reliées
par un chemin dans le graphe, avec leurs attributs associés.

View File

@ -1,11 +1,11 @@
from typing import Dict
import networkx as nx
def extraire_niveaux(G: nx.Graph) -> Dict[str | int, int]:
def extraire_niveaux(G: nx.DiGraph) -> Dict[str | int, int]:
"""Extrait les niveaux des nœuds du graphe.
Args:
G (nx.Graph): Le graphe d'origine à analyser.
G (nx.DiGraph): Le graphe d'origine à analyser.
Returns:
dict: Un dictionnaire associant chaque nœud au niveau correspondant.

View File

@ -1,11 +1,11 @@
from typing import Dict, Tuple, Union
import networkx as nx
def preparer_graphe(G: nx.Graph) -> Tuple[nx.Graph, Dict[Union[str, int], int]]:
def preparer_graphe(G: nx.DiGraph) -> Tuple[nx.DiGraph, Dict[Union[str, int], int]]:
"""Nettoie et prépare le graphe pour l'analyse.
Args:
G (nx.Graph): Le graphe d'origine à nettoyer.
G (nx.DiGraph): Le graphe d'origine à nettoyer.
Returns:
tuple: Un tuple contenant le graphe nettoyé et les niveaux temporels associés

View File

@ -8,11 +8,11 @@ from utils.graph_utils import (
extraire_chemins_vers
)
def selectionner_minerais(G: nx.Graph, noeuds_depart: List[Any]) -> List[Union[str, int]]:
def selectionner_minerais(G: nx.DiGraph, noeuds_depart: List[Any]) -> List[Union[str, int]]:
"""Interface pour sélectionner les minerais si nécessaire.
Args:
G (nx.Graph): Le graphe des relations entre les nœuds.
G (nx.DiGraph): Le graphe des relations entre les nœuds.
noeuds_depart (list): Les nœuds de départ qui doivent être considérés.
Returns:
@ -42,14 +42,14 @@ def selectionner_minerais(G: nx.Graph, noeuds_depart: List[Any]) -> List[Union[s
return minerais_selection
def selectionner_noeuds(
G: nx.Graph,
G: nx.DiGraph,
niveaux_temp: Dict[Union[str, int], int],
niveau_depart: int
) -> Tuple[Optional[List[Union[str, int]]], List[Union[str, int]]]:
"""Interface pour sélectionner les nœuds spécifiques de départ et d'arrivée.
Args:
G (nx.Graph): Le graphe des relations entre les nœuds.
G (nx.DiGraph): Le graphe des relations entre les nœuds.
niveaux_temp (dict): Dictionnaire contenant les niveaux des nœuds.
niveau_depart (int): Niveau à partir duquel commencer la sélection.
@ -71,7 +71,7 @@ def selectionner_noeuds(
return noeuds_depart, noeuds_arrivee
def extraire_chemins_selon_criteres(
G: nx.Graph,
G: nx.DiGraph,
niveaux: Dict[str | int, int],
niveau_depart: int,
noeuds_depart: Optional[List[Union[str, int]]],
@ -81,7 +81,7 @@ def extraire_chemins_selon_criteres(
"""Extrait les chemins selon les critères spécifiés.
Args:
G (nx.Graph): Le graphe des relations entre les nœuds.
G (nx.DiGraph): Le graphe des relations entre les nœuds.
niveaux (dict): Dictionnaire contenant les niveaux des nœuds.
niveau_depart (int): Niveau à partir duquel commencer la sélection.
noeuds_depart (list, optional): Les nœuds de départ qui doivent être considérés.

View File

@ -1,3 +1,5 @@
from typing import List, Dict, Optional, Any
import networkx as nx
import streamlit as st
import altair as alt
import numpy as np
@ -6,7 +8,17 @@ import pandas as pd
from utils.translations import _
def afficher_graphique_altair(df):
def afficher_graphique_altair(df: pd.DataFrame) -> None:
"""
Affiche un graphique Altair pour les données d'IHH.
Args:
df (pd.DataFrame): DataFrame contenant les données de IHH.
Notes:
Cette fonction crée un graphique interactif pour visualiser les
données d'IHH selon différentes catégories et niveaux de gravité.
"""
# Définir les catégories originales (en français) et leur ordre
categories_fr = ["Assemblage", "Fabrication", "Traitement", "Extraction"]
@ -89,7 +101,20 @@ def afficher_graphique_altair(df):
st.altair_chart(chart, use_container_width=True)
def creer_graphes(donnees):
def creer_graphes(donnees: Optional[List[Dict[str, Any]]]) -> None:
"""
Crée un graphique Altair pour les données d'IVC.
Args:
donnees (Optional[List[Dict[str, Any]]]): Liste des données d'IVC.
Returns:
None.
Notes:
Cette fonction traite les données d'IVC et crée un graphique
interactif pour visualiser la concentration des ressources.
"""
if not donnees:
st.warning(str(_("pages.visualisations.no_data")))
return
@ -162,7 +187,17 @@ def creer_graphes(donnees):
st.error(f"{str(_('errors.graph_creation_error'))} {e}")
def lancer_visualisation_ihh_ics(graph):
def lancer_visualisation_ihh_ics(graph: nx.DiGraph) -> None:
"""
Lance une visualisation Altair pour les données d'IHH critique.
Args:
graph (nx.DiGraph): Le graphe NetworkX contenant les données de IHH.
Notes:
Cette fonction traite le graphe et crée un graphique Altair
pour visualiser les données d'IHH critique.
"""
try:
import networkx as nx
from utils.graph_utils import recuperer_donnees
@ -180,7 +215,17 @@ def lancer_visualisation_ihh_ics(graph):
st.error(f"{str(_('errors.ihh_criticality_error'))} {e}")
def lancer_visualisation_ihh_ivc(graph):
def lancer_visualisation_ihh_ivc(graph: nx.DiGraph) -> None:
"""
Lance une visualisation Altair pour les données d'IVC.
Args:
graph (Annx.Graphy): Le graphe NetworkX contenant les données de IV C.
Notes:
Cette fonction traite le graphe et crée un graphique Altair
pour visualiser les données d'IV C.
"""
try:
from utils.graph_utils import recuperer_donnees_2
noeuds_niveau_2 = [

View File

@ -1,6 +1,7 @@
import streamlit as st
from utils.widgets import html_expander
from utils.translations import _
import networkx as nx
from .graphes import (
lancer_visualisation_ihh_ics,
@ -8,7 +9,23 @@ from .graphes import (
)
def interface_visualisations(G_temp, G_temp_ivc):
def interface_visualisations(G_temp: nx.DiGraph, G_temp_ivc: nx.DiGraph) -> None:
"""
Affiche l'interface utilisateur des visualisations.
Parameters
----------
G_temp : object
Graphique temporel contenant les données de IHH.
G_temp_ivc : object
Graphique temporel contenant les données d'IVC.
Notes
-----
Cette fonction initialise l'interface utilisateur qui permet aux utilisateurs de visualiser
différentes données relatives à la gravité et au risque d'infections.
Elle gère également le traitement des erreurs liées aux graphiques temporels IHH et IV C.
"""
st.markdown(f"# {str(_('pages.visualisations.title'))}")
html_expander(f"{str(_('pages.visualisations.help'))}", content="\n".join(_("pages.visualisations.help_content")), open_by_default=False, details_class="details_introduction")
st.markdown("---")

View File

@ -163,7 +163,7 @@
"select_minerals": "Select one or more minerals",
"filter_by_minerals": "Filter by minerals (optional, but highly recommended)",
"fine_selection": "End product selection",
"filter_start_nodes": "Filter by start nodes (optional, but recommended)",
"filter_start_nodes": "Filter by start nodes",
"run_analysis": "Run analysis",
"confirm_download": "Confirm download",
"submit_request": "Submit your request",

View File

@ -163,7 +163,7 @@
"select_minerals": "Sélectionner un ou plusieurs minerais",
"filter_by_minerals": "Filtrer par minerais (optionnel, mais recommandé)",
"fine_selection": "Sélection des produits finaux",
"filter_start_nodes": "Filtrer par noeuds de départ (optionnel, mais recommandé)",
"filter_start_nodes": "Filtrer par noeuds de départ",
"run_analysis": "Lancer l'analyse",
"confirm_download": "Confirmer le téléchargement",
"submit_request": "Soumettre votre demande",

View File

@ -14,7 +14,7 @@ import argparse
import logging
import requests
from pathlib import Path
from typing import List, Set, Dict, Any, Optional
from typing import List, Set, Dict, Any
from datetime import datetime
# Configuration du logging
@ -36,12 +36,12 @@ DEFAULT_SUPPORTED_EXTENSIONS = {
class PrivateGPTIngestor:
"""Classe pour gérer l'ingestion de fichiers dans Private GPT."""
def __init__(self, api_url: str = "http://localhost:8001",
def __init__(self, api_url: str = "http://localhost:8001",
processed_file: str = "processed_files.json"):
"""
Initialise l'ingesteur.
Args:
api_url: URL de l'API Private GPT
processed_file: Fichier pour stocker les fichiers déjà traités
@ -60,7 +60,7 @@ class PrivateGPTIngestor:
except Exception as e:
logger.error(f"Erreur lors du chargement des fichiers traités: {e}")
return set()
def _save_processed_files(self) -> None:
"""Sauvegarde la liste des fichiers déjà traités."""
try:
@ -68,32 +68,32 @@ class PrivateGPTIngestor:
json.dump(list(self.processed_files), f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des fichiers traités: {e}")
def scan_directory(self, directory: str, extensions: Set[str] = None,
def scan_directory(self, directory: str, extensions: Set[str] = None,
recursive: bool = True) -> List[str]:
"""
Scanne un répertoire pour trouver des fichiers à injecter.
Args:
directory: Le répertoire à scanner
extensions: Extensions de fichiers à prendre en compte
recursive: Si True, scanne les sous-répertoires
Returns:
Liste des chemins des fichiers à injecter
"""
if extensions is None:
extensions = DEFAULT_SUPPORTED_EXTENSIONS
files_to_ingest = []
directory_path = Path(directory)
if not directory_path.exists():
logger.error(f"Le répertoire {directory} n'existe pas")
return []
logger.info(f"Scan du répertoire {directory}")
# Fonction de scan
def scan_dir(path: Path):
for item in path.iterdir():
@ -103,28 +103,28 @@ class PrivateGPTIngestor:
files_to_ingest.append(abs_path)
elif item.is_dir() and recursive:
scan_dir(item)
scan_dir(directory_path)
logger.info(f"Trouvé {len(files_to_ingest)} fichiers à injecter")
return files_to_ingest
def ingest_file(self, file_path: str) -> bool:
"""
Injecte un fichier dans Private GPT via l'API.
Args:
file_path: Chemin du fichier à injecter
Returns:
True si l'injection a réussi, False sinon
"""
logger.info(f"Injection du fichier: {file_path}")
try:
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
response = requests.post(f"{self.api_url}/v1/ingest/file", files=files)
if response.status_code == 200:
logger.info(f"Injection réussie pour {file_path}")
self.processed_files.add(file_path)
@ -136,11 +136,11 @@ class PrivateGPTIngestor:
except Exception as e:
logger.error(f"Erreur lors de l'injection de {file_path}: {e}")
return False
def list_documents(self) -> List[Dict[str, Any]]:
"""
Liste les documents déjà injectés dans Private GPT.
Returns:
Liste des documents injectés
"""
@ -154,13 +154,13 @@ class PrivateGPTIngestor:
except Exception as e:
logger.error(f"Erreur lors de la récupération des documents: {e}")
return []
def run_ingestion(self, directory: str, extensions: Set[str] = None,
recursive: bool = True, batch_size: int = 5,
def run_ingestion(self, directory: str, extensions: Set[str] = None,
recursive: bool = True, batch_size: int = 5,
delay: float = 2.0) -> None:
"""
Exécute l'ingestion des fichiers d'un répertoire.
Args:
directory: Répertoire à scanner
extensions: Extensions à prendre en compte
@ -169,34 +169,34 @@ class PrivateGPTIngestor:
delay: Délai entre chaque lot (en secondes)
"""
files_to_ingest = self.scan_directory(directory, extensions, recursive)
if not files_to_ingest:
logger.info("Aucun nouveau fichier à injecter")
return
total_files = len(files_to_ingest)
successful = 0
failed = 0
for i, file_path in enumerate(files_to_ingest):
logger.info(f"Progression: {i+1}/{total_files}")
if self.ingest_file(file_path):
successful += 1
else:
failed += 1
# Pause après chaque lot
if (i + 1) % batch_size == 0 and i < total_files - 1:
logger.info(f"Pause de {delay} secondes après le lot de {batch_size} fichiers")
time.sleep(delay)
logger.info(f"Ingestion terminée: {successful} réussis, {failed} échoués sur {total_files} fichiers")
def parse_args():
"""Parse les arguments de ligne de commande."""
parser = argparse.ArgumentParser(description="Outil d'injection automatique pour Private GPT")
parser.add_argument("--directory", "-d", type=str, required=True,
help="Répertoire contenant les fichiers à injecter")
parser.add_argument("--api-url", type=str, default="http://localhost:8001",
@ -215,15 +215,15 @@ def parse_args():
help="Mode surveillance: vérifier périodiquement les nouveaux fichiers")
parser.add_argument("--watch-interval", type=int, default=300,
help="Intervalle de surveillance en secondes (défaut: 300)")
return parser.parse_args()
def main():
"""Fonction principale."""
args = parse_args()
ingestor = PrivateGPTIngestor(api_url=args.api_url)
# Option pour lister les documents
if args.list:
documents = ingestor.list_documents()
@ -231,7 +231,7 @@ def main():
for doc in documents:
print(f"- {doc.get('doc_id')}: {doc.get('doc_metadata', {}).get('file_name', 'Inconnu')}")
return
# Conversion des extensions
extensions = None
if args.extensions:
@ -240,7 +240,7 @@ def main():
if not ext.startswith('.'):
ext = '.' + ext
extensions.add(ext.lower())
# Mode surveillance
if args.watch:
logger.info(f"Mode surveillance activé. Vérification toutes les {args.watch_interval} secondes")
@ -248,7 +248,7 @@ def main():
while True:
start_time = datetime.now()
logger.info(f"Démarrage d'un scan à {start_time.strftime('%H:%M:%S')}")
ingestor.run_ingestion(
directory=args.directory,
extensions=extensions,
@ -256,11 +256,11 @@ def main():
batch_size=args.batch_size,
delay=args.delay
)
# Calcul du temps à attendre
elapsed = (datetime.now() - start_time).total_seconds()
wait_time = max(0, args.watch_interval - elapsed)
if wait_time > 0:
logger.info(f"En attente pendant {wait_time:.1f} secondes jusqu'au prochain scan")
time.sleep(wait_time)
@ -277,4 +277,4 @@ def main():
)
if __name__ == "__main__":
main()
main()