feat(audit): audit qualité complet — 907→0 erreurs ruff + fix multiselect labels

- Correction des 907 erreurs ruff (pathlib, imports, nommage, simplifications, docstrings)
- Fix déduplication labels dans multiselect nœuds d'arrivée (analyse)
- Expansion 1→N label→IDs pour le Sankey (Pays d'opération)
- Ajout CLAUDE.md et document de design de l'audit
- Mise à jour .gitignore (artefacts tests exploratoires)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Stéphan Peccini 2026-03-02 11:52:01 +01:00
parent b97bbfe0ed
commit 6d2e877341
Signed by: stephan
GPG Key ID: 3A9774E9CCBF3501
60 changed files with 908 additions and 770 deletions

8
.gitignore vendored
View File

@ -46,6 +46,14 @@ htmlcov/
.coverage
bandit-report.json
bandit-report.txt
.bandit
requirements-dev.txt
SECURITY_AUDIT.md
security_check.sh
# Fichiers temporaires batch_ia
batch_ia/temp_sections/
# Artefacts de tests exploratoires
test-fabnum-*.png
test_schema.txt

65
CLAUDE.md Normal file
View File

@ -0,0 +1,65 @@
# FabNum
## Description
Application Streamlit d'analyse de risques géopolitiques pour les chaînes d'approvisionnement numériques. Modélise les dépendances (produits, composants, minerais, pays) sous forme de graphe orienté et calcule des indices de criticité (IHH, ICS, IVC, ISG).
## Architecture
- `fabnum.py` — Point d'entrée Streamlit (page d'accueil, instructions)
- `config.py` — Variables d'environnement (Gitea, fiches criticité, `.env` / `.env.local`)
- `app/` — Pages Streamlit organisées par fonctionnalité :
- `analyse/` — Analyse de criticité et visualisation Sankey
- `fiches/` — Gestion des fiches de criticité (IHH, ICS, IVC, ISG)
- `visualisations/` — Graphes et visualisations interactives
- `ia_nalyse/` — Analyse assistée par IA
- `plan_d_action/` — Plans d'action et recommandations
- `personnalisation/` — Personnalisation de l'interface
- `components/` — Composants UI partagés (header, footer, sidebar, connexion)
- `utils/` — Utilitaires métier (graphe, persistance, Gitea, logs, traductions)
- `scripts/` — Scripts d'ingestion et génération (auto_ingest, generer_analyse)
- `tests/` — Tests pytest (unit, integration, fixtures)
- `IA/`, `batch_ia/` — Modules IA (priorité basse, exclus du linting)
## Stack technique
- **Python** >= 3.10
- **Streamlit** 1.45 — Interface web
- **NetworkX** + **PyGraphviz** — Modélisation graphe (format DOT)
- **Plotly** / **Altair** — Visualisations
- **Pandas** / **NumPy** — Traitement de données
- **Requests** — API Gitea (fiches, schéma)
- **Jinja2** / **pypandoc** — Génération PDF
- **pytest** — Tests (8 fichiers, tests/unit/)
- **ruff** — Linter et formateur
## Conventions de code
- Linter : `ruff` (config complète dans `pyproject.toml`, line-length=120)
- Tests : `pytest` avec markers `unit` et `integration`
- Style docstrings : convention Google, en francais
- Variable graphe : `G` (convention NetworkX, ignoree par N803/N806)
- Exclusions ruff : `IA/`, `batch_ia/`, `pgpt/` (priorite basse)
- Imports tries par isort (first-party : app, utils, batch_ia)
- Quotes doubles, indentation espaces
## Commandes utiles
```bash
streamlit run fabnum.py # Lancer l'application
python -m pytest tests/unit/ -v # Lancer les tests unitaires
python -m pytest tests/ -v # Tous les tests
ruff check . # Verifier le code
ruff check --fix . # Corriger automatiquement
ruff format . # Formater le code
```
## Points d'attention
- `fabnum.py` doit appeler `st.set_page_config()` avant tout import de modules app (E402 ignore)
- `utils.persistance.update_session_paths()` est appele en tout premier dans fabnum.py
- Les variables d'environnement critiques (FICHE_IHH, ICS, IVC, ISG) sont obligatoires (OSError si absentes)
- Le graphe est lu depuis un fichier DOT (Graphviz) et manipule via NetworkX DiGraph
- Deux environnements : `dev` (defaut) et `public` (detecte via header Nginx X-Environment)
- Donnees stockees sur Gitea (fiches criticite, schema de dependances)
- Les tests utilisent des fixtures dans `tests/fixtures/` (sample_graph.dot, config YAML)

View File

@ -1,18 +1,17 @@
#!/usr/bin/env python3
import os
import re
import yaml
import requests
import argparse
import logging
import os
import re
import requests
import yaml
# Import des fonctions de génération
from app.fiches.generer import (
generer_fiche
)
from app.fiches.generer import generer_fiche
from app.fiches.utils.fiche_utils import load_seuils
from config import FICHES_CRITICITE, GITEA_TOKEN
from utils.gitea import charger_arborescence_fiches
from config import GITEA_TOKEN, FICHES_CRITICITE
# Configuration du logging
logging.basicConfig(

View File

@ -1,12 +1,10 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'analyse de la structure du graphe DOT pour comprendre
"""Script d'analyse de la structure du graphe DOT pour comprendre
comment intégrer l'ISG dans le générateur de template.
"""
import os
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
# Chemins
@ -16,14 +14,14 @@ GRAPH_PATH = BASE_DIR / "graphe.dot"
def analyze_graph_structure(dot_path):
"""Analyse la structure du graphe et affiche ses caractéristiques."""
print(f"Analyse du fichier: {dot_path}")
# Lire le graphe
G = read_dot(dot_path)
# Informations de base
print(f"Nombre total de nœuds: {len(G.nodes())}")
print(f"Nombre total d'arêtes: {len(G.edges())}")
# Analyse des attributs des nœuds
node_attrs = {}
for node, attrs in G.nodes(data=True):
@ -31,13 +29,13 @@ def analyze_graph_structure(dot_path):
if key not in node_attrs:
node_attrs[key] = set()
node_attrs[key].add(attrs[key])
print("\nAttributs des nœuds:")
for attr, values in node_attrs.items():
print(f"- {attr}: {len(values)} valeurs différentes")
if len(values) < 20: # Afficher seulement si le nombre de valeurs est raisonnable
print(f" Valeurs: {', '.join(sorted(values))}")
# Analyse des niveaux (si l'attribut existe)
if 'level' in node_attrs:
print("\nAnalyse par niveau:")
@ -48,7 +46,7 @@ def analyze_graph_structure(dot_path):
if level not in levels:
levels[level] = []
levels[level].append(node)
for level, nodes in sorted(levels.items()):
print(f"- Niveau {level}: {len(nodes)} nœuds")
# Afficher quelques exemples
@ -56,14 +54,14 @@ def analyze_graph_structure(dot_path):
print(f" Exemples: {', '.join(nodes)}")
else:
print(f" Exemples: {', '.join(nodes[:3])}... (et {len(nodes)-3} autres)")
# Analyse des attributs ISG
print("\nRecherche des attributs ISG:")
isg_nodes = []
for node, attrs in G.nodes(data=True):
if 'isg' in attrs:
isg_nodes.append((node, attrs['isg']))
if isg_nodes:
print(f"- {len(isg_nodes)} nœuds avec attribut ISG")
print(" Exemples:")
@ -71,24 +69,24 @@ def analyze_graph_structure(dot_path):
print(f" - {node}: ISG = {isg}")
else:
print("- Aucun nœud avec attribut ISG trouvé")
# Analyse des connexions pour les nœuds critiques (IHH)
print("\nAnalyse des nœuds avec IHH:")
ihh_nodes = []
for node, attrs in G.nodes(data=True):
if 'ihh_pays' in attrs or 'ihh_acteurs' in attrs:
ihh_value_pays = attrs.get('ihh_pays', 'N/A')
ihh_value_pays = attrs.get('ihh_pays', 'N/A')
ihh_value_acteurs = attrs.get('ihh_acteurs', 'N/A')
ihh_nodes.append((node, ihh_value_pays, ihh_value_acteurs))
if ihh_nodes:
print(f"- {len(ihh_nodes)} nœuds avec attributs IHH")
print(" Exemples:")
for node, ihh_pays, ihh_acteurs in ihh_nodes[:5]:
print(f" - {node}: IHH pays = {ihh_pays}, IHH acteurs = {ihh_acteurs}")
# Analyser les connexions de ce nœud
print(f" Connexions sortantes:")
print(" Connexions sortantes:")
out_edges = list(G.out_edges(node))
if out_edges:
for i, (_, target) in enumerate(out_edges[:3]):
@ -97,8 +95,8 @@ def analyze_graph_structure(dot_path):
print(f" - ... et {len(out_edges)-3} autres")
else:
print(" - Aucune connexion sortante")
print(f" Connexions entrantes:")
print(" Connexions entrantes:")
in_edges = list(G.in_edges(node))
if in_edges:
for i, (source, _) in enumerate(in_edges[:3]):
@ -116,14 +114,14 @@ def analyze_graph_structure(dot_path):
for node, attrs in G.nodes(data=True):
if attrs.get('level') == '99':
level_99_nodes.append(node)
if level_99_nodes:
print(f"- {len(level_99_nodes)} nœuds de niveau 99")
print(" Exemples:")
for node in level_99_nodes[:5]:
print(f" - {node}")
# Analyser les connexions de ce nœud
print(f" Connexions entrantes:")
print(" Connexions entrantes:")
in_edges = list(G.in_edges(node))
if in_edges:
for i, (source, _) in enumerate(in_edges[:3]):
@ -134,14 +132,14 @@ def analyze_graph_structure(dot_path):
print(" - Aucune connexion entrante")
else:
print("- Aucun nœud de niveau 99 trouvé")
def check_isg_paths(dot_path):
"""Vérifie les chemins entre les nœuds critiques (IHH) et les nœuds ISG."""
print("\nAnalyse des chemins entre nœuds IHH et nœuds ISG:")
# Lire le graphe
G = read_dot(dot_path)
# Identifier les nœuds avec IHH
ihh_nodes = []
for node, attrs in G.nodes(data=True):
@ -154,43 +152,43 @@ def check_isg_paths(dot_path):
ihh_nodes.append(node)
except (ValueError, TypeError):
pass
if not ihh_nodes:
print("- Aucun nœud IHH critique trouvé")
return
print(f"- {len(ihh_nodes)} nœuds IHH critiques identifiés")
# Pour chaque nœud IHH critique, chercher des chemins vers des nœuds ISG
for node in ihh_nodes[:5]: # Limiter à 5 exemples
print(f"\n Analyse des chemins pour {node}:")
# Analyser les voisins directs
successors = list(G.successors(node))
print(f" - {len(successors)} successeurs directs")
if successors:
for succ in successors[:3]:
print(f" - Vers {succ}")
# Vérifier les attributs de ce successeur
succ_attrs = G.nodes[succ]
print(f" Attributs: {', '.join(f'{k}={v}' for k, v in succ_attrs.items() if k in ['level', 'isg'])}")
# Chercher les successeurs de niveau 2
succ2 = list(G.successors(succ))
print(f" {len(succ2)} successeurs de niveau 2")
if succ2:
for s2 in succ2[:2]:
print(f" - Vers {s2}")
s2_attrs = G.nodes[s2]
print(f" Attributs: {', '.join(f'{k}={v}' for k, v in s2_attrs.items() if k in ['level', 'isg'])}")
# Chercher encore plus loin si nécessaire
succ3 = list(G.successors(s2))
print(f" {len(succ3)} successeurs de niveau 3")
if succ3:
for s3 in succ3[:2]:
print(f" - Vers {s3}")
@ -201,9 +199,9 @@ def main():
"""Fonction principale."""
print("=== Analyse de la structure du graphe ===")
analyze_graph_structure(GRAPH_PATH)
print("\n=== Analyse des chemins entre nœuds critiques et ISG ===")
check_isg_paths(GRAPH_PATH)
if __name__ == "__main__":
main()
main()

View File

@ -1,16 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import sys
from collections import defaultdict
def extract_paths(file_path):
"""Extrait tous les chemins du fichier rapport_template.md"""
paths = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
with open(file_path, encoding='utf-8') as f:
for line in f:
# Extraire les lignes qui commencent par "Corpus/"
if line.strip().startswith("Corpus/"):
@ -18,7 +16,7 @@ def extract_paths(file_path):
except Exception as e:
print(f"Erreur lors de la lecture du fichier {file_path}: {e}")
sys.exit(1)
return paths
def check_paths(paths, base_dir):
@ -28,17 +26,17 @@ def check_paths(paths, base_dir):
"missing": [],
"problematic": [] # Chemins qui pourraient nécessiter des corrections
}
for path in paths:
# Vérifier si le chemin est absolu ou relatif
abs_path = os.path.join(base_dir, path)
if os.path.exists(abs_path):
results["existing"].append(path)
else:
# Essayer de détecter des problèmes potentiels
problem_detected = False
# Vérifier les chemins avec "Fiche minerai" ou "Fiche fabrication"
if "Fiche minerai" in path or "Fiche fabrication" in path:
# Problème courant: mauvaise casse ou absence du mot "minerai"
@ -48,25 +46,25 @@ def check_paths(paths, base_dir):
if os.path.exists(os.path.join(base_dir, corrected_path)):
results["problematic"].append((path, corrected_path, "Mot 'minerai' manquant"))
problem_detected = True
# Vérifier les chemins SSD
if "SSD25" in path:
corrected_path = path.replace("SSD25", "SSD 2.5")
if os.path.exists(os.path.join(base_dir, corrected_path)):
results["problematic"].append((path, corrected_path, "Format 'SSD25' au lieu de 'SSD 2.5'"))
problem_detected = True
# Si aucun problème spécifique n'a été détecté, marquer comme manquant
if not problem_detected:
results["missing"].append(path)
return results
def find_similar_paths(missing_path, base_dir):
"""Essaie de trouver des chemins similaires pour aider à diagnostiquer le problème"""
missing_parts = missing_path.split('/')
similar_paths = []
# Rechercher dans les sous-répertoires correspondants
search_dir = os.path.join(base_dir, *missing_parts[:-1])
if os.path.exists(search_dir):
@ -74,7 +72,7 @@ def find_similar_paths(missing_path, base_dir):
if file.endswith('.md'):
similar_path = os.path.join(search_dir, file).replace(base_dir + '/', '')
similar_paths.append(similar_path)
# Si aucun chemin similaire n'est trouvé, remonter d'un niveau
if not similar_paths and len(missing_parts) > 2:
parent_dir = os.path.join(base_dir, *missing_parts[:-2])
@ -87,36 +85,36 @@ def find_similar_paths(missing_path, base_dir):
if file.endswith('.md'):
similar_path = os.path.join(dir_path, file).replace(base_dir + '/', '')
similar_paths.append(similar_path)
return similar_paths
def main():
# Vérifier que nous sommes dans le bon répertoire
script_dir = os.path.dirname(os.path.abspath(__file__))
base_dir = script_dir
# Chemin vers le rapport_template.md
template_path = os.path.join(base_dir, "Corpus", "rapport_template.md")
if not os.path.exists(template_path):
print(f"Erreur: Le fichier {template_path} n'existe pas.")
sys.exit(1)
print("=== Vérification des chemins dans rapport_template.md ===")
# Extraire les chemins
paths = extract_paths(template_path)
print(f"Nombre total de chemins trouvés: {len(paths)}")
# Vérifier les chemins
results = check_paths(paths, base_dir)
# Afficher les résultats
print("\n=== Résultats ===")
print(f"Chemins existants: {len(results['existing'])}")
print(f"Chemins manquants: {len(results['missing'])}")
print(f"Chemins problématiques: {len(results['problematic'])}")
# Afficher les chemins manquants
if results["missing"]:
print("\n=== Chemins manquants ===")
@ -127,7 +125,7 @@ def main():
print(" Chemins similaires trouvés:")
for sim_path in similar[:3]: # Limiter à 3 suggestions
print(f" * {sim_path}")
# Afficher les chemins problématiques avec suggestions
if results["problematic"]:
print("\n=== Chemins problématiques ===")
@ -135,7 +133,7 @@ def main():
print(f"- {orig}")
print(f" Suggestion: {corrected}")
print(f" Raison: {reason}")
# Résumé
if not results["missing"] and not results["problematic"]:
print("\nTous les chemins dans le rapport sont valides !")
@ -143,4 +141,4 @@ def main():
print("\nDes chemins problématiques ont été détectés. Veuillez corriger les erreurs.")
if __name__ == "__main__":
main()
main()

View File

@ -1,19 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de génération de template de rapport IHH/ISG.
"""Script de génération de template de rapport IHH/ISG.
Ce script lit un fichier DOT, identifie les éléments Orange ou Rouge,
et génère un template avec les chemins d'accès aux sections à récupérer.
"""
import os
import re
import glob
import yaml
import argparse
import unicodedata
import networkx as nx
import glob
import os
from pathlib import Path
import yaml
from networkx.drawing.nx_agraph import read_dot
# Chemins de base
@ -36,7 +32,7 @@ CRIT_PATH = 'Criticités'
def load_config():
"""Charge les seuils depuis le fichier de configuration"""
try:
with open(CONFIG_PATH, 'r') as file:
with open(CONFIG_PATH) as file:
config = yaml.safe_load(file)
return config.get('seuils', {})
except Exception as e:
@ -518,16 +514,14 @@ def find_real_path(base_dir, resource_type, resource_name, file_pattern, G=None)
if resource_type == "produit":
if is_principaux_pattern:
return f"Assemblage/Fiche assemblage {best_label}/02-principaux-assembleurs.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/10-assemblage-{best_label}/00-indice-de-herfindahl-hirschmann.md"
elif resource_type == "composant":
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/10-assemblage-{best_label}/00-indice-de-herfindahl-hirschmann.md"
if resource_type == "composant":
if is_principaux_pattern:
return f"Fabrication/Fiche fabrication {best_label}/02-principaux-fabricants.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/20-fabrication-{best_label}/00-indice-de-herfindahl-hirschmann.md"
elif resource_type == "minerai":
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/20-fabrication-{best_label}/00-indice-de-herfindahl-hirschmann.md"
if resource_type == "minerai":
operation = file_pattern.split("-")[-1] if "-" in file_pattern else ""
if is_principaux_pattern:
# Choisir le numéro du préfixe en fonction de l'opération
@ -535,9 +529,8 @@ def find_real_path(base_dir, resource_type, resource_name, file_pattern, G=None)
# S'assurer que le nom du minerai est correctement formaté (sans tiret à la fin et en minuscules)
clean_label = best_label.rstrip(" -").lower()
return f"Minerai/Fiche minerai {clean_label}/{prefix_num}-principaux-producteurs-{operation}.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/30-{operation}-{best_label}/00-indice-de-herfindahl-hirschmann.md"
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/30-{operation}-{best_label}/00-indice-de-herfindahl-hirschmann.md"
return ""
@ -834,19 +827,19 @@ def generate_template(elements, isg_data=None, G=None):
# Indices documentaires
try:
has_ihh = any(element['ihh_critique'] for element in elements)
except:
except (KeyError, TypeError):
has_ihh = False
try:
has_isg = any(element['isg_critique'] for element in elements)
except:
except (KeyError, TypeError):
has_isg = False
try:
has_ivc = any(element['ivc_critique'] for element in elements)
except:
except (KeyError, TypeError):
has_ivc = False
try:
has_ics = any(element['ics_critique'] for element in elements)
except:
except (KeyError, TypeError):
has_ics = False
if has_ihh:
@ -1196,16 +1189,14 @@ def get_component_label(G, component_name):
if node.lower() == component_name.lower():
if 'label' in attrs:
return attrs['label'], True
else:
return component_name.capitalize(), True
return component_name.capitalize(), True
# Si pas de correspondance exacte, chercher un nœud qui contient le nom du composant
for node, attrs in G.nodes(data=True):
if component_name.lower() in node.lower():
if 'label' in attrs:
return attrs['label'], True
else:
return component_name.capitalize(), True
return component_name.capitalize(), True
# Si toujours pas trouvé, le composant n'existe pas dans le graphe
return component_name.capitalize(), False

View File

@ -1,7 +1,5 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour remplacer les références de chemins dans le rapport par le contenu des fichiers.
"""Script pour remplacer les références de chemins dans le rapport par le contenu des fichiers.
Ajuste automatiquement les niveaux de titres pour maintenir la hiérarchie.
"""
@ -90,7 +88,7 @@ def process_report():
return
# Lire le rapport
with open(INPUT_PATH, 'r', encoding='utf-8') as f:
with open(INPUT_PATH, encoding='utf-8') as f:
lines = f.readlines()
output_lines = []
@ -115,7 +113,7 @@ def process_report():
# Vérifier si c'est une section d'Indice de Vulnérabilité de Concurrence
is_ivc_section = "Vulnérabilité de Concurrence" in line or "/ivc-" in path.lower() or "/fiche technique ivc/" in path.lower()
with open(full_path, 'r', encoding='utf-8') as f:
with open(full_path, encoding='utf-8') as f:
content = f.read()
# Ajuster les niveaux de titres

View File

@ -1,21 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'injection automatique de documents pour PrivateGPT
"""Script d'injection automatique de documents pour PrivateGPT
Ce script parcourt un répertoire spécifié et injecte tous les fichiers
compatibles dans PrivateGPT via son API REST.
"""
import argparse
import logging
import os
import sys
import time
import argparse
import logging
import requests
from pathlib import Path
from typing import List, Dict, Tuple, Set
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import requests
# Configuration du logging
logging.basicConfig(
@ -89,9 +87,8 @@ def parse_arguments():
return parser.parse_args()
def find_files(directory: str, recursive: bool = False,
extensions: Set[str] = SUPPORTED_EXTENSIONS) -> List[Path]:
"""
Trouve tous les fichiers avec les extensions spécifiées dans le répertoire.
extensions: set[str] = SUPPORTED_EXTENSIONS) -> list[Path]:
"""Trouve tous les fichiers avec les extensions spécifiées dans le répertoire.
Args:
directory: Répertoire à scanner
@ -125,9 +122,8 @@ def find_files(directory: str, recursive: bool = False,
return files
def ingest_file(file_path: Path, pgpt_url: str, timeout: int,
retry_count: int, retry_delay: int) -> Tuple[Path, bool, str]:
"""
Injecte un fichier dans PrivateGPT.
retry_count: int, retry_delay: int) -> tuple[Path, bool, str]:
"""Injecte un fichier dans PrivateGPT.
Args:
file_path: Chemin du fichier à injecter
@ -154,14 +150,13 @@ def ingest_file(file_path: Path, pgpt_url: str, timeout: int,
doc_ids = result.get('document_ids', [])
logger.info(f"Succès! {file_path} -> {len(doc_ids)} documents créés")
return file_path, True, f"{len(doc_ids)} documents créés"
error_msg = f"Erreur HTTP {response.status_code}: {response.text}"
logger.warning(error_msg)
if attempt < retry_count - 1:
logger.info(f"Nouvelle tentative dans {retry_delay} secondes...")
time.sleep(retry_delay)
else:
error_msg = f"Erreur HTTP {response.status_code}: {response.text}"
logger.warning(error_msg)
if attempt < retry_count - 1:
logger.info(f"Nouvelle tentative dans {retry_delay} secondes...")
time.sleep(retry_delay)
else:
return file_path, False, error_msg
return file_path, False, error_msg
except Exception as e:
error_msg = f"Exception: {str(e)}"

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""
Script de nettoyage pour PrivateGPT
"""Script de nettoyage pour PrivateGPT
Ce script permet de lister et supprimer les documents ingérés dans PrivateGPT.
Options:
@ -12,18 +11,18 @@ Options:
Usage:
python nettoyer_pgpt.py --list
python nettoyer_pgpt.py --delete-prefix "temp_section_"
python nettoyer_pgpt.py --delete-pattern "rapport_.*\.md"
python nettoyer_pgpt.py --delete-pattern "rapport_.*\\.md"
python nettoyer_pgpt.py --delete-all
"""
import argparse
import json
import re
import requests
import sys
import time
import uuid
from typing import List, Dict, Any, Optional
from typing import Any
import requests
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
@ -37,57 +36,56 @@ def check_api_availability() -> bool:
if response.status_code == 200:
print("✅ API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
def list_documents() -> List[Dict[str, Any]]:
def list_documents() -> list[dict[str, Any]]:
"""Liste tous les documents ingérés et renvoie la liste des métadonnées"""
try:
# Récupérer la liste des documents
response = requests.get(f"{API_URL}/ingest/list")
response.raise_for_status()
data = response.json()
# Format de réponse OpenAI
if "data" in data:
documents = data.get("data", [])
# Format alternatif
else:
documents = data.get("documents", [])
# Construire une liste normalisée des documents
normalized_docs = []
for doc in documents:
doc_id = doc.get("doc_id") or doc.get("id")
metadata = doc.get("doc_metadata", {})
filename = metadata.get("file_name") or metadata.get("filename", "Inconnu")
normalized_docs.append({
"id": doc_id,
"filename": filename,
"metadata": metadata
})
return normalized_docs
except Exception as e:
print(f"❌ Erreur lors de la récupération des documents: {e}")
return []
def print_documents(documents: List[Dict[str, Any]]) -> None:
def print_documents(documents: list[dict[str, Any]]) -> None:
"""Affiche la liste des documents de façon lisible"""
if not documents:
print("📋 Aucun document trouvé dans PrivateGPT")
return
print(f"📋 {len(documents)} documents trouvés dans PrivateGPT:")
# Regrouper par nom de fichier pour un affichage plus compact
files_grouped = {}
for doc in documents:
@ -95,7 +93,7 @@ def print_documents(documents: List[Dict[str, Any]]) -> None:
if filename not in files_grouped:
files_grouped[filename] = []
files_grouped[filename].append(doc["id"])
# Afficher les résultats groupés
for i, (filename, ids) in enumerate(files_grouped.items(), 1):
print(f"{i}. {filename} ({len(ids)} chunks)")
@ -110,36 +108,34 @@ def delete_document(doc_id: str) -> bool:
response = requests.delete(f"{API_URL}/ingest/{doc_id}")
if response.status_code == 200:
return True
else:
print(f"⚠️ Échec de la suppression de l'ID {doc_id}: Code {response.status_code}")
return False
print(f"⚠️ Échec de la suppression de l'ID {doc_id}: Code {response.status_code}")
return False
except Exception as e:
print(f"❌ Erreur lors de la suppression de l'ID {doc_id}: {e}")
return False
def delete_documents_by_criteria(documents: List[Dict[str, Any]],
prefix: Optional[str] = None,
pattern: Optional[str] = None,
def delete_documents_by_criteria(documents: list[dict[str, Any]],
prefix: str | None = None,
pattern: str | None = None,
delete_all: bool = False) -> int:
"""
Supprime des documents selon différents critères
"""Supprime des documents selon différents critères
Retourne le nombre de documents supprimés
"""
if not documents:
print("❌ Aucun document à supprimer")
return 0
if not (prefix or pattern or delete_all):
print("❌ Aucun critère de suppression spécifié")
return 0
# Comptage des suppressions réussies
success_count = 0
# Filtrer les documents à supprimer
docs_to_delete = []
if delete_all:
docs_to_delete = documents
print(f"🗑️ Suppression de tous les documents ({len(documents)} chunks)...")
@ -154,24 +150,24 @@ def delete_documents_by_criteria(documents: List[Dict[str, Any]],
except re.error as e:
print(f"❌ Expression régulière invalide: {e}")
return 0
# Demander confirmation si beaucoup de documents
if len(docs_to_delete) > 5 and not args.force:
confirm = input(f"⚠️ Vous êtes sur le point de supprimer {len(docs_to_delete)} chunks. Confirmer ? (o/N) ")
if confirm.lower() != 'o':
print("❌ Opération annulée")
return 0
# Supprimer les documents
for doc in docs_to_delete:
if delete_document(doc["id"]):
success_count += 1
if args.verbose:
print(f"✅ Document supprimé: {doc['filename']} (ID: {doc['id']})")
# Petite pause pour éviter de surcharger l'API
time.sleep(0.1)
print(f"{success_count}/{len(docs_to_delete)} documents supprimés avec succès")
return success_count
@ -184,7 +180,7 @@ def generate_unique_prefix() -> str:
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Utilitaire de nettoyage pour PrivateGPT")
# Options principales
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--list", action="store_true", help="Lister tous les documents ingérés")
@ -192,27 +188,27 @@ if __name__ == "__main__":
group.add_argument("--delete-pattern", type=str, help="Supprimer les documents dont le nom correspond au motif PATTERN (regex)")
group.add_argument("--delete-all", action="store_true", help="Supprimer tous les documents (⚠️ DANGER)")
group.add_argument("--generate-prefix", action="store_true", help="Générer un préfixe unique pour les fichiers temporaires")
# Options additionnelles
parser.add_argument("--force", action="store_true", help="Ne pas demander de confirmation")
parser.add_argument("--verbose", action="store_true", help="Afficher plus de détails")
args = parser.parse_args()
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Générer un préfixe unique
if args.generate_prefix:
unique_prefix = generate_unique_prefix()
print(f"🔑 Préfixe unique généré: {unique_prefix}")
print(f"Utilisez ce préfixe pour les fichiers temporaires de votre script.")
print("Utilisez ce préfixe pour les fichiers temporaires de votre script.")
sys.exit(0)
# Récupérer la liste des documents
documents = list_documents()
# Traiter selon l'option choisie
if args.list:
print_documents(documents)
@ -221,4 +217,4 @@ if __name__ == "__main__":
elif args.delete_pattern:
delete_documents_by_criteria(documents, pattern=args.delete_pattern)
elif args.delete_all:
delete_documents_by_criteria(documents, delete_all=True)
delete_documents_by_criteria(documents, delete_all=True)

View File

@ -1,30 +1,26 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de surveillance de répertoire pour l'injection automatique dans PrivateGPT
"""Script de surveillance de répertoire pour l'injection automatique dans PrivateGPT
Ce script surveille un répertoire et injecte automatiquement les nouveaux fichiers
dans PrivateGPT dès qu'ils sont ajoutés ou modifiés.
"""
import argparse
import logging
import os
import subprocess
import sys
import time
import logging
import argparse
import subprocess
from pathlib import Path
from typing import Set, Dict, List
from datetime import datetime
try:
from watchdog.events import FileCreatedEvent, FileModifiedEvent, FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
except ImportError:
print("Bibliothèque 'watchdog' non installée. Installation en cours...")
subprocess.run([sys.executable, "-m", "pip", "install", "watchdog"])
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
# Configuration du logging
logging.basicConfig(
@ -39,17 +35,16 @@ logger = logging.getLogger(__name__)
# Extensions de fichiers couramment supportées par PrivateGPT
SUPPORTED_EXTENSIONS = {
'.pdf', '.txt', '.md', '.doc', '.docx', '.ppt', '.pptx',
'.pdf', '.txt', '.md', '.doc', '.docx', '.ppt', '.pptx',
'.xls', '.xlsx', '.csv', '.epub', '.html', '.htm'
}
class DocumentHandler(FileSystemEventHandler):
"""Gestionnaire d'événements pour les fichiers de documents."""
def __init__(self, watch_dir: str, ingest_script: str, pgpt_url: str,
extensions: Set[str], delay: int = 5):
"""
Initialise le gestionnaire d'événements.
def __init__(self, watch_dir: str, ingest_script: str, pgpt_url: str,
extensions: set[str], delay: int = 5):
"""Initialise le gestionnaire d'événements.
Args:
watch_dir: Répertoire à surveiller
@ -63,10 +58,10 @@ class DocumentHandler(FileSystemEventHandler):
self.pgpt_url = pgpt_url
self.extensions = extensions
self.delay = delay
# Queue pour les fichiers en attente de traitement
self.pending_files: Dict[str, float] = {}
self.pending_files: dict[str, float] = {}
# Vérifier que le script d'injection existe
if not os.path.exists(self.ingest_script):
logger.error(f"Le script d'injection {self.ingest_script} n'existe pas!")
@ -76,49 +71,48 @@ class DocumentHandler(FileSystemEventHandler):
"""Appelé lorsqu'un fichier est créé."""
if not event.is_directory:
self._handle_file_event(event)
def on_modified(self, event):
"""Appelé lorsqu'un fichier est modifié."""
if not event.is_directory:
self._handle_file_event(event)
def _handle_file_event(self, event):
"""Traite un événement de fichier (création ou modification)."""
file_path = event.src_path
file_ext = os.path.splitext(file_path)[1].lower()
# Ignorer les fichiers non supportés
if file_ext not in self.extensions:
return
# Ignorer les fichiers temporaires et cachés
file_name = os.path.basename(file_path)
if file_name.startswith('.') or file_name.startswith('~') or file_name.endswith('.tmp'):
return
# Ajouter à la queue avec l'horodatage actuel
self.pending_files[file_path] = time.time()
logger.info(f"Fichier détecté: {file_path} (en attente pendant {self.delay} secondes)")
def process_pending_files(self):
"""Traite les fichiers en attente qui ont dépassé le délai d'attente."""
current_time = time.time()
files_to_process: List[str] = []
files_to_process: list[str] = []
# Identifier les fichiers prêts à être traités
for file_path, timestamp in list(self.pending_files.items()):
if current_time - timestamp >= self.delay:
if os.path.exists(file_path): # Vérifier que le fichier existe toujours
files_to_process.append(file_path)
self.pending_files.pop(file_path)
# Traiter les fichiers par lot
if files_to_process:
self._ingest_files(files_to_process)
def _ingest_files(self, files: List[str]):
"""
Injecte une liste de fichiers en utilisant le script d'injection.
def _ingest_files(self, files: list[str]):
"""Injecte une liste de fichiers en utilisant le script d'injection.
Args:
files: Liste des chemins de fichiers à injecter
@ -127,20 +121,20 @@ class DocumentHandler(FileSystemEventHandler):
# Créer un répertoire temporaire pour stocker la liste des fichiers
temp_dir = os.path.join(os.path.dirname(self.ingest_script), "temp")
os.makedirs(temp_dir, exist_ok=True)
# Créer un fichier de liste
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
list_file = os.path.join(temp_dir, f"files_to_ingest_{timestamp}.txt")
with open(list_file, "w") as f:
for file_path in files:
f.write(f"{file_path}\n")
# Construire la commande pour le script d'injection
for file_path in files:
file_dir = os.path.dirname(file_path)
logger.info(f"Injection de {file_path}...")
cmd = [
sys.executable,
self.ingest_script,
@ -148,19 +142,19 @@ class DocumentHandler(FileSystemEventHandler):
"-u", self.pgpt_url,
"--extensions", os.path.splitext(file_path)[1][1:] # Extension sans le point
]
# Exécuter la commande
process = subprocess.run(
cmd,
capture_output=True,
text=True
)
if process.returncode == 0:
logger.info(f"Injection réussie de {file_path}")
else:
logger.error(f"Échec de l'injection de {file_path}: {process.stderr}")
except Exception as e:
logger.error(f"Erreur lors de l'injection des fichiers: {str(e)}")
@ -170,26 +164,26 @@ def parse_arguments():
description="Surveille un répertoire et injecte automatiquement les nouveaux fichiers dans PrivateGPT"
)
parser.add_argument(
"-d", "--directory",
type=str,
"-d", "--directory",
type=str,
required=True,
help="Chemin du répertoire à surveiller"
)
parser.add_argument(
"-s", "--script",
type=str,
"-s", "--script",
type=str,
default=None,
help="Chemin vers le script auto_ingest.py (par défaut: détection automatique)"
)
parser.add_argument(
"-u", "--url",
type=str,
"-u", "--url",
type=str,
default="http://localhost:8001",
help="URL de l'API PrivateGPT (défaut: http://localhost:8001)"
)
parser.add_argument(
"--delay",
type=int,
"--delay",
type=int,
default=5,
help="Délai en secondes avant de traiter un nouveau fichier (défaut: 5)"
)
@ -198,36 +192,36 @@ def parse_arguments():
nargs="+",
help="Liste d'extensions spécifiques à surveiller (ex: pdf txt)"
)
return parser.parse_args()
def main():
"""Fonction principale."""
args = parse_arguments()
# Préparation des extensions si spécifiées
extensions = set(args.extensions) if args.extensions else SUPPORTED_EXTENSIONS
# Assurer que les extensions commencent par un point
extensions = {ext if ext.startswith('.') else f'.{ext}' for ext in extensions}
# Déterminer le chemin du script d'injection
if args.script:
ingest_script = args.script
else:
# Utiliser le script auto_ingest.py dans le même répertoire que ce script
ingest_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "auto_ingest.py")
# Créer le répertoire de surveillance s'il n'existe pas
watch_dir = os.path.abspath(args.directory)
if not os.path.exists(watch_dir):
logger.info(f"Création du répertoire de surveillance: {watch_dir}")
os.makedirs(watch_dir, exist_ok=True)
logger.info(f"Démarrage de la surveillance de {watch_dir}")
logger.info(f"URL PrivateGPT: {args.url}")
logger.info(f"Extensions surveillées: {', '.join(extensions)}")
logger.info(f"Délai de traitement: {args.delay} secondes")
# Initialiser le gestionnaire et l'observateur
event_handler = DocumentHandler(
watch_dir=watch_dir,
@ -236,23 +230,23 @@ def main():
extensions=extensions,
delay=args.delay
)
observer = Observer()
observer.schedule(event_handler, path=watch_dir, recursive=True)
observer.start()
try:
logger.info("Surveillance en cours... (Ctrl+C pour quitter)")
while True:
# Traiter les fichiers en attente
event_handler.process_pending_files()
time.sleep(1)
except KeyboardInterrupt:
logger.info("\nInterruption par l'utilisateur. Arrêt de la surveillance.")
observer.stop()
observer.join()
if __name__ == "__main__":
@ -260,4 +254,4 @@ if __name__ == "__main__":
main()
except Exception as e:
logger.error(f"Erreur non gérée: {str(e)}", exc_info=True)
sys.exit(1)
sys.exit(1)

View File

@ -1,15 +1,15 @@
from datetime import datetime
from collections import defaultdict, deque
import os
import sys
from datetime import timezone
from collections import defaultdict, deque
from datetime import datetime, timezone
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
# À adapter dans ton environnement
from config import GITEA_URL, ORGANISATION, DEPOT_FICHES, ENV
from utils.gitea import recuperer_date_dernier_commit
from config import DEPOT_FICHES, ENV, GITEA_URL, ORGANISATION
from IA.make_config import MAKE # MAKE doit être importé depuis un fichier de config
from utils.gitea import recuperer_date_dernier_commit
def get_mtime(path):
try:
@ -42,8 +42,7 @@ def resolve_path_from_where(where_str):
directory = context["directory"]
if "fiches" in where_str:
return os.path.join("Fiches", directory, current)
else:
return os.path.join(directory, current)
return os.path.join(directory, current)
return None

View File

@ -1,4 +1,3 @@
from utils.gitea import recuperer_date_dernier_commit
#
# from config import GITEA_URL, GITEA_TOKEN, ORGANISATION, DEPOT_FICHES, DEPOT_CODE, ENV, ENV_CODE, DOT_FILE
#

View File

@ -5,7 +5,6 @@
Cet outil interactif vous permet d'explorer et d'analyser les vulnérabilités de la chaîne de fabrication du numérique. Grâce à une interface, espérons-le, intuitive, vous pourrez visualiser les différentes étapes de production, identifier les points critiques et comprendre les enjeux géopolitiques et plus liés à la fabrication des technologies numériques.
L'application vous offre diverses fonctionnalités :
* Visualisation des données et des graphiques
* Personnalisation des produits à analyser
* Exploration détaillée des indices de vulnérabilité et des opérations
@ -15,21 +14,18 @@ L'application vous offre diverses fonctionnalités :
### Structure de la chaîne
La chaîne de fabrication numérique se décompose en trois niveaux hiérarchiques :
* Produits finaux : appareils complets comme les smartphones, ordinateurs, serveurs, etc.
* Composants : éléments constitutifs comme les processeurs, écrans, capteurs, batteries, etc.
* Minerais et matériaux : ressources de base nécessaires à la fabrication des composants
### Opérations à chaque niveau
Chaque niveau de la chaîne implique des opérations spécifiques :
* Niveau Produit final : assemblage des composants pour créer le produit fini
* Niveau Composant : fabrication des pièces à partir des minerais et matériaux
* Niveau Minerai : extraction et traitement des ressources premières
### Dimension géopolitique
Pour chaque opération, l'application détaille :
* Les pays où l'opération est réalisée, avec leur part respective du marché mondial
* Les acteurs économiques impliqués dans chaque pays, avec leur part de marché
* Les liens entre les pays d'opération, les acteurs et leur contexte géopolitique
@ -41,33 +37,29 @@ Pour chaque opération, l'application détaille :
L'application utilise quatre indices clés pour évaluer les vulnérabilités dans la chaîne de fabrication numérique :
### Indice de Herfindahl-Hirschmann (IHH)
* Que mesure-t-il ? La concentration géographique ou industrielle d'une opération.
* Comment l'interpréter ? Plus l'indice est élevé, plus l'opération est concentrée dans un nombre limité de pays, ce qui augmente la vulnérabilité.
* Que mesure-t-il ? La concentration géographique ou industrielle d'une opération.
* Comment l'interpréter ? Plus l'indice est élevé, plus l'opération est concentrée dans un nombre limité de pays, ce qui augmente la vulnérabilité.
* Où le trouver ? Associé à chaque opération dans les graphiques d'analyse.
[Voir la fiche détaillée](https://fabnum-dev.peccini.fr/app/static/Fiches/Criticit%C3%A9s/Fiche%20technique%20IHH.pdf)
### Indice de Vulnérabilité Concurrentielle (IVC)
* Que mesure-t-il ? La pression exercée par d'autres secteurs industriels sur une même ressource minérale.
* Comment l'interpréter ? Un IVC élevé indique une forte compétition pour accéder à ce minerai, augmentant le risque de pénurie.
* Que mesure-t-il ? La pression exercée par d'autres secteurs industriels sur une même ressource minérale.
* Comment l'interpréter ? Un IVC élevé indique une forte compétition pour accéder à ce minerai, augmentant le risque de pénurie.
* Où le trouver ? Associé à chaque minerai dans les visualisations et fiches.
[Voir la fiche détaillée](https://fabnum-dev.peccini.fr/app/static/Fiches/Criticit%C3%A9s/Fiche%20technique%20IVC.pdf)
### Indice de Criticité de Substituabilité (ICS)
* Que mesure-t-il ? La possibilité de remplacer un minerai par un autre dans la fabrication d'un composant. Plus rarement, il matérialise la capacité de remplacer un procédé.
* Comment l'interpréter ? Un ICS élevé signifie qu'il est difficile de trouver une alternative à ce minerai.
* Comment l'interpréter ? Un ICS élevé signifie qu'il est difficile de trouver une alternative à ce minerai.
* Où le trouver ? Associé à la relation entre composants et minerais.
[Voir la fiche détaillée](https://fabnum-dev.peccini.fr/app/static/Fiches/Criticit%C3%A9s/Fiche%20technique%20ICS.pdf)
### Indice de Stabilité Géopolitique (ISG)
* Que mesure-t-il ? La stabilité politique, économique et sociale d'un pays, basée sur trois sous-indicateurs.
* Comment l'interpréter ? Un ISG élevé indique un pays instable, ce qui augmente les risques d'approvisionnement.
* Que mesure-t-il ? La stabilité politique, économique et sociale d'un pays, basée sur trois sous-indicateurs.
* Comment l'interpréter ? Un ISG élevé indique un pays instable, ce qui augmente les risques d'approvisionnement.
* Où le trouver ? Utilisé pour pondérer les risques identifiés tout au long de la chaîne.
[Voir la fiche détaillée](https://fabnum-dev.peccini.fr/app/static/Fiches/Criticit%C3%A9s/Fiche%20technique%20ISG.pdf)
@ -77,17 +69,12 @@ L'application utilise quatre indices clés pour évaluer les vulnérabilités da
L'application est organisée en quatre onglets principaux, chacun offrant une perspective différente sur la chaîne de fabrication numérique :
* Onglet Personnalisation : Créer et gérer des produits finaux personnalisés pour des analyses spécifiques.
* À noter : Les produits personnalisés sont temporaires par défaut, mais peuvent être sauvegardés pour une utilisation ultérieure.
* À noter : Les produits personnalisés sont temporaires par défaut, mais peuvent être sauvegardés pour une utilisation ultérieure.
* Onglet Analyse : Explorer visuellement les relations entre les différents niveaux de la chaîne de fabrication.
* Exemple d'utilisation : Pour comprendre les vulnérabilités liées aux composants d'un smartphone, sélectionnez « Produit final » comme niveau de départ, « Composant » comme niveau d'arrivée, puis spécifiez « Smartphone » comme item de produit final.
* Onglet IA'nalyse : Obtenez un rapport synthétique circonstancié d'une partie du schéma global.
* Cet onglet n'est accessible qu'aux personnes diposant d'un compte.
* Onglet Plan d'action : Visualiser toutes les criticités d'une chaîne Produit final <-> Composant <-> Minerai.
* À partir d'une sélection, toutes les chaînes critiques sont examinées à la loupe.
* Des propositions d'actions et d'indicateurs sont fournies selon les opérations et leur niveau de criticité.
* Exemple d'utilisation : Pour comprendre les vulnérabilités liées aux composants d'un smartphone, sélectionnez « Produit final » comme niveau de départ, « Composant » comme niveau d'arrivée, puis spécifiez « Smartphone » comme item de produit final.
* Onglet Visualisations : Observer les corrélations entre les différents indices et comprendre les tendances globales.
* Indicateurs clés : Portez attention aux points situés dans les zones de haute valeur pour les deux indices, car ils représentent les vulnérabilités les plus critiques.
* Indicateurs clés : Portez attention aux points situés dans les zones de haute valeur pour les deux indices, car ils représentent les vulnérabilités les plus critiques.
* Onglet Fiches : Accéder à des informations détaillées sur chaque opération et minerai.
* À explorer : Les fiches contiennent souvent des informations qui ne sont pas visibles directement dans les graphiques, comme des tendances historiques ou des prévisions futures ; n'hésitez pas à les consulter
* À explorer : Les fiches contiennent souvent des informations qui ne sont pas visibles directement dans les graphiques, comme des tendances historiques ou des prévisions futures ; n'hésitez pas à les consulter
En début de chacun des onglets, vous trouverez un mini-guide spécifique sur leur utilisation.
En début de chacun des onglets, vous trouverez un mini-guide spécifique sur leur utilisation.

View File

@ -144,72 +144,70 @@ def selectionner_noeuds(
depart_labels = {n: G.nodes[n].get("label", n) for n in depart_nodes}
arrivee_labels = {n: G.nodes[n].get("label", n) for n in arrivee_nodes}
# Mapping inverse Label -> ID pour retrouver les IDs sélectionnés
depart_labels_inverse = {v: k for k, v in depart_labels.items()}
arrivee_labels_inverse = {v: k for k, v in arrivee_labels.items()}
# Mapping inverse Label -> tous les IDs (un label peut correspondre à N nœuds)
depart_par_label: dict[str, list[str]] = {}
for node_id, label in depart_labels.items():
depart_par_label.setdefault(label, []).append(node_id)
arrivee_par_label: dict[str, list[str]] = {}
for node_id, label in arrivee_labels.items():
arrivee_par_label.setdefault(label, []).append(node_id)
# DEPARTS -------------------------------------
if "analyse_noeuds_depart" not in st.session_state:
anciens_departs = []
# Persistance par labels (pas par IDs) pour éviter l'explosion des doublons
if "analyse_labels_depart" not in st.session_state:
anciens = []
i = 0
while True:
val = get_champ_statut(f"pages.analyse.filter_start_nodes.{i}")
if not val:
break
anciens_departs.append(val)
anciens.append(val)
i += 1
st.session_state["analyse_noeuds_depart"] = anciens_departs
st.session_state["analyse_labels_depart"] = anciens
# Afficher les labels mais stocker les IDs
selected_labels_depart = st.multiselect(
str(_("pages.analyse.filter_start_nodes")),
sorted(depart_labels.values()),
default=[depart_labels.get(n, n) for n in st.session_state["analyse_noeuds_depart"] if n in depart_labels],
key="analyse_noeuds_depart_labels"
sorted(depart_par_label),
default=[lb for lb in st.session_state["analyse_labels_depart"] if lb in depart_par_label],
key="analyse_labels_depart_widget"
)
# Convertir les labels sélectionnés en IDs
departs_selection = [depart_labels_inverse.get(label, label) for label in selected_labels_depart]
st.session_state["analyse_noeuds_depart"] = departs_selection
st.session_state["analyse_labels_depart"] = selected_labels_depart
supprime_champ_statut("pages.analyse.filter_start_nodes")
if departs_selection:
for i, val in enumerate(departs_selection):
maj_champ_statut(f"pages.analyse.filter_start_nodes.{i}", val)
for i, val in enumerate(selected_labels_depart):
maj_champ_statut(f"pages.analyse.filter_start_nodes.{i}", val)
# Expansion label → tous les IDs pour le Sankey
departs_selection = [nid for lb in selected_labels_depart for nid in depart_par_label[lb]]
# ARRIVEES -------------------------------------
if "analyse_noeuds_arrivee" not in st.session_state:
anciens_arrivees = []
if "analyse_labels_arrivee" not in st.session_state:
anciens = []
i = 0
while True:
val = get_champ_statut(f"pages.analyse.filter_end_nodes.{i}")
if not val:
break
anciens_arrivees.append(val)
anciens.append(val)
i += 1
st.session_state["analyse_noeuds_arrivee"] = anciens_arrivees
st.session_state["analyse_labels_arrivee"] = anciens
# Afficher les labels mais stocker les IDs
selected_labels_arrivee = st.multiselect(
str(_("pages.analyse.filter_end_nodes")),
sorted(arrivee_labels.values()),
default=[arrivee_labels.get(n, n) for n in st.session_state["analyse_noeuds_arrivee"] if n in arrivee_labels],
key="analyse_noeuds_arrivee_labels"
sorted(arrivee_par_label),
default=[lb for lb in st.session_state["analyse_labels_arrivee"] if lb in arrivee_par_label],
key="analyse_labels_arrivee_widget"
)
# Convertir les labels sélectionnés en IDs
arrivees_selection = [arrivee_labels_inverse.get(label, label) for label in selected_labels_arrivee]
st.session_state["analyse_noeuds_arrivee"] = arrivees_selection
st.session_state["analyse_labels_arrivee"] = selected_labels_arrivee
supprime_champ_statut("pages.analyse.filter_end_nodes")
if arrivees_selection:
for i, val in enumerate(arrivees_selection):
maj_champ_statut(f"pages.analyse.filter_end_nodes.{i}", val)
for i, val in enumerate(selected_labels_arrivee):
maj_champ_statut(f"pages.analyse.filter_end_nodes.{i}", val)
departs_selection = departs_selection if departs_selection else None
arrivees_selection = arrivees_selection if arrivees_selection else None
# Expansion label → tous les IDs pour le Sankey
arrivees_selection = [nid for lb in selected_labels_arrivee for nid in arrivee_par_label[lb]]
return departs_selection, arrivees_selection
return departs_selection or None, arrivees_selection or None
def configurer_filtres_vulnerabilite() -> tuple[bool, bool, bool, str, bool, str]:
"""Interface pour configurer les filtres de vulnérabilité.

View File

@ -1,5 +1,6 @@
import logging
import tempfile
from pathlib import Path
import networkx as nx
import pandas as pd
@ -298,10 +299,14 @@ def filtrer_chemins_par_criteres(
# Appliquer la logique de filtrage
if logique_filtrage == "ET":
keep = True
if filtrer_ihh: keep = keep and has_ihh
if filtrer_ivc: keep = keep and has_ivc
if filtrer_ics: keep = keep and has_ics
if filtrer_isg: keep = keep and has_isg_critique
if filtrer_ihh:
keep = keep and has_ihh
if filtrer_ivc:
keep = keep and has_ivc
if filtrer_ics:
keep = keep and has_ics
if filtrer_isg:
keep = keep and has_isg_critique
if keep:
chemins_filtres.add(tuple(chemin))
elif logique_filtrage == "OU":
@ -461,28 +466,28 @@ def creer_graphique_sankey(
fig = go.Figure(go.Sankey(
arrangement="snap",
node=dict(
pad=10,
thickness=8,
label=sorted_nodes,
x=[niveaux.get(n, 99) / 100 for n in sorted_nodes],
color=[couleur_noeud(n, niveaux, G) for n in sorted_nodes],
customdata=customdata,
hovertemplate="%{customdata}<extra></extra>"
),
link=dict(
source=sources,
target=targets,
value=values,
color=df_liens["color"].tolist(),
customdata=link_customdata,
hovertemplate="%{customdata}<extra></extra>",
line=dict(
width=1, # Set fixed width to 3 pixels (or use 2 if preferred)
color="grey"
),
arrowlen=10
)
node={
"pad": 10,
"thickness": 8,
"label": sorted_nodes,
"x": [niveaux.get(n, 99) / 100 for n in sorted_nodes],
"color": [couleur_noeud(n, niveaux, G) for n in sorted_nodes],
"customdata": customdata,
"hovertemplate": "%{customdata}<extra></extra>"
},
link={
"source": sources,
"target": targets,
"value": values,
"color": df_liens["color"].tolist(),
"customdata": link_customdata,
"hovertemplate": "%{customdata}<extra></extra>",
"line": {
"width": 1, # Set fixed width to 3 pixels (or use 2 if preferred)
"color": "grey"
},
"arrowlen": 10
}
))
fig.update_layout(
@ -526,7 +531,7 @@ def exporter_graphe_filtre(
write_dot(G_export, f.name)
dot_path = f.name
with open(dot_path, encoding="utf-8") as f:
with Path(dot_path).open(encoding="utf-8") as f:
st.download_button(
label=str(_("pages.analyse.sankey.download_dot")),
data=f.read(),
@ -546,10 +551,13 @@ def afficher_sankey(
Args:
G: Le graphe NetworkX contenant les données des produits.
niveau_depart, niveau_arrivee: Les niveaux initiaux pour le filtrage.
noeuds_depart, noeuds_arrivee: Les nœuds initiaux pour le filtrage.
niveau_depart: Le niveau de départ pour le filtrage.
niveau_arrivee: Le niveau d'arrivée pour le filtrage.
noeuds_depart: Les nœuds de départ pour le filtrage.
noeuds_arrivee: Les nœuds d'arrivée pour le filtrage.
minerais: La liste des minerais à inclure dans le filtrage.
filtrer_ics, filtrer_ivc: Les booléens pour le filtrage ICS et IVC.
filtrer_ics: Booléen pour activer le filtrage ICS.
filtrer_ivc: Booléen pour activer le filtrage IVC.
filtrer_ihh: Le booléen pour le filtrage IHH.
ihh_type: Le type d'application pour les IHH (Pays ou Acteur).
filtrer_isg: Le booléen pour le filtrage ISG.

View File

@ -10,8 +10,8 @@ Toutes ces fonctions gèrent la conversion et le rendu de contenu Markdown
vers du HTML structuré avec des mathématiques, respectant les règles RGAA.
"""
import os
import re
from pathlib import Path
import markdown
import pypandoc
@ -59,8 +59,7 @@ def remplacer_latex_par_mathml(markdown_text: str) -> str:
return f"<code>Erreur LaTeX inline: {e}</code>"
markdown_text = re.sub(r"\$\$(.*?)\$\$", remplacer_bloc_display, markdown_text, flags=re.DOTALL)
markdown_text = re.sub(r"(?<!\$)\$(.+?)\$(?!\$)", remplacer_bloc_inline, markdown_text, flags=re.DOTALL)
return markdown_text
return re.sub(r"(?<!\$)\$(.+?)\$(?!\$)", remplacer_bloc_inline, markdown_text, flags=re.DOTALL)
def markdown_to_html_rgaa(markdown_text: str, caption_text: str|None) -> str:
"""Convertit un texte Markdown en HTML structuré accessible.
@ -74,7 +73,7 @@ def markdown_to_html_rgaa(markdown_text: str, caption_text: str|None) -> str:
"""
html = markdown.markdown(markdown_text, extensions=['tables'])
soup = BeautifulSoup(html, "html.parser")
for i, table in enumerate(soup.find_all("table"), start=1):
for _, table in enumerate(soup.find_all("table"), start=1):
table["role"] = "table"
table["summary"] = caption_text
if caption_text:
@ -174,18 +173,18 @@ def generer_fiche(md_source: str, dossier: str, nom_fichier: str, seuils: dict)
contenu_md = render_fiche_markdown(md_source, seuils, license_path="assets/licence.md")
md_path = os.path.join("Fiches", dossier, nom_fichier)
os.makedirs(os.path.dirname(md_path), exist_ok=True)
with open(md_path, "w", encoding="utf-8") as f:
md_path = Path("Fiches") / dossier / nom_fichier
md_path.parent.mkdir(parents=True, exist_ok=True)
with md_path.open("w", encoding="utf-8") as f:
f.write(contenu_md)
# Génération automatique du PDF
pdf_dir = os.path.join("static", "Fiches", dossier)
os.makedirs(pdf_dir, exist_ok=True)
pdf_dir = Path("static") / "Fiches" / dossier
pdf_dir.mkdir(parents=True, exist_ok=True)
# Construire le chemin PDF correspondant (même nom que .md, mais .pdf)
nom_pdf = os.path.splitext(nom_fichier)[0] + ".pdf"
pdf_path = os.path.join(pdf_dir, nom_pdf)
nom_pdf = Path(nom_fichier).stem + ".pdf"
pdf_path = pdf_dir / nom_pdf
try:
pypandoc.convert_file(
@ -205,10 +204,10 @@ def generer_fiche(md_source: str, dossier: str, nom_fichier: str, seuils: dict)
html_output = rendu_html(contenu_md)
html_dir = os.path.join("HTML", dossier)
os.makedirs(html_dir, exist_ok=True)
html_path = os.path.join(html_dir, os.path.splitext(nom_fichier)[0] + ".html")
with open(html_path, "w", encoding="utf-8") as f:
html_dir = Path("HTML") / dossier
html_dir.mkdir(parents=True, exist_ok=True)
html_path = html_dir / (Path(nom_fichier).stem + ".html")
with html_path.open("w", encoding="utf-8") as f:
f.write("\n".join(html_output))
return html_path

View File

@ -1,5 +1,5 @@
# === Constantes et imports ===
import os
from pathlib import Path
import requests
import streamlit as st
@ -85,8 +85,8 @@ def interface_fiches() -> None:
else:
SEUILS = st.session_state["seuils"]
nom_fiche = os.path.splitext(fiche_choisie)[0]
html_path = os.path.join("HTML", dossier_choisi, nom_fiche + ".html")
nom_fiche = Path(fiche_choisie).stem
html_path = Path("HTML") / dossier_choisi / (nom_fiche + ".html")
path_relative = f"Documents/{dossier_choisi}/{fiche_choisie}"
commits_url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path={path_relative}&sha={ENV}"
@ -101,16 +101,16 @@ def interface_fiches() -> None:
if regenerate:
html_path = generer_fiche(md_source, dossier_choisi, fiche_choisie, SEUILS)
with open(html_path, encoding="utf-8") as f:
with Path(html_path).open(encoding="utf-8") as f:
st.markdown(f.read(), unsafe_allow_html=True)
from utils.persistance import get_champ_statut
if not get_champ_statut("login") == "":
if get_champ_statut("login") != "":
pdf_name = nom_fiche + ".pdf"
pdf_path = os.path.join("static", "Fiches", dossier_choisi, pdf_name)
pdf_path = Path("static") / "Fiches" / dossier_choisi / pdf_name
if os.path.exists(pdf_path):
with open(pdf_path, "rb") as pdf_file:
if pdf_path.exists():
with pdf_path.open("rb") as pdf_file:
st.download_button(
label=str(_("pages.fiches.download_pdf")),
data=pdf_file,

View File

@ -2,6 +2,7 @@
# Ce module gère à la fois les fiches d'assemblage ET de fabrication.
import re
from pathlib import Path
import streamlit as st
import yaml
@ -57,13 +58,13 @@ def build_production_sections(md: str) -> str:
produit_data = yaml_data[produit_key]
pays_data = []
for pays_key, pays_info in produit_data.items():
for _pays_key, pays_info in produit_data.items():
nom_pays = pays_info.get('nom_du_pays', '')
part_marche_pays = pays_info.get('part_de_marche', '0%')
part_marche_num = float(part_marche_pays.strip('%'))
acteurs = []
for acteur_key, acteur_info in pays_info.get('acteurs', {}).items():
for _acteur_key, acteur_info in pays_info.get('acteurs', {}).items():
nom_acteur = acteur_info.get('nom_de_l_acteur', '')
part_marche_acteur = acteur_info.get('part_de_marche', '0%')
pays_origine = acteur_info.get('pays_d_origine', '')
@ -123,7 +124,7 @@ def build_production_sections(md: str) -> str:
# Charger le contenu de la fiche technique IHH
try:
# Essayer de lire le fichier depuis le système de fichiers
with open(FICHES_CRITICITE["IHH"], encoding="utf-8") as f:
with Path(FICHES_CRITICITE["IHH"]).open(encoding="utf-8") as f:
ihh_content = f.read()
# Chercher la section IHH correspondant au schéma et au type de fiche
@ -152,6 +153,4 @@ def build_production_sections(md: str) -> str:
st.error(f"Erreur lors de la lecture/traitement de la fiche IHH: {e}")
# Supprimer le bloc YAML du markdown final
md_modifie = md_modifie.replace(yaml_block_full, "")
return md_modifie
return md_modifie.replace(yaml_block_full, "")

View File

@ -23,20 +23,19 @@ def _pairs_dataframe(md: str) -> pd.DataFrame:
def _fill(segment: str, pair: dict) -> str:
segment = _normalize_unicode(segment)
for k, v in pair.items():
val = f"{v:.2f}" if isinstance(v, (int, float)) else str(v)
val = f"{v:.2f}" if isinstance(v, int | float) else str(v)
segment = re.sub(
rf"{{{{\s*{re.escape(k)}\s*}}}}",
val,
segment,
flags=re.I,
)
segment = re.sub(
return re.sub(
r"ICS\s*=\s*[-+]?\d+(?:\.\d+)?",
f"ICS = {pair['ics']:.2f}",
segment,
count=1,
)
return segment
def _segments(md: str):
blocs = list(PAIR_RE.finditer(md))
@ -75,7 +74,7 @@ def build_dynamic_sections(md_raw: str) -> str:
3. Produire une synthèse finale avec l'analyse critique par composant.
Args:
md (str): Contenu brut du fichier Markdown contenant les structures YAML à analyser.
md_raw (str): Contenu brut du fichier Markdown contenant les structures YAML à analyser.
Returns:
str: Le markdown enrichi des tableaux de donnée analysés, ou le contenu original inchangé si aucun bloc structuré n'est trouvé.

View File

@ -16,7 +16,7 @@ def _synth_isg(md: str) -> str:
lignes = ["| Pays | WGI | FSI | NDGAIN | ISG |", "| :-- | :-- | :-- | :-- | :-- |"]
sorted_pays = sorted(yaml_data.items(), key=lambda x: x[1]['pays'].lower())
for identifiant, data in sorted_pays:
for _identifiant, data in sorted_pays:
pays = data['pays']
wgi_ps = data['wgi_ps']
fsi = data['fsi']
@ -56,11 +56,9 @@ def build_isg_sections(md: str) -> str:
flags=re.S
)
md_final = re.sub(
return re.sub(
r"# Criticité par pays\s*\n```yaml[\s\S]*?```\s*",
"# Criticité par pays\n\n",
md_final,
flags=re.S
)
return md_final

View File

@ -73,11 +73,9 @@ def build_ivc_sections(md: str) -> str:
md_final = "\n\n".join(segments)
# Remplacer la section du tableau final
md_final = re.sub(
return re.sub(
r"## Tableau de synthèse\s*\n<!---- AUTO-BEGIN:TABLEAU-FINAL -->.*?<!---- AUTO-END:TABLEAU-FINAL -->",
f"## Tableau de synthèse\n<!---- AUTO-BEGIN:TABLEAU-FINAL -->\n{synth_table}\n<!---- AUTO-END:TABLEAU-FINAL -->",
md_final,
flags=re.S
)
return md_final

View File

@ -1,4 +1,5 @@
import re
from pathlib import Path
import streamlit as st
import yaml
@ -50,7 +51,7 @@ def _build_extraction_tableau(md: str, produit: str) -> str:
# Préparer les données pour l'affichage
pays_data = []
for code_pays, pays_info in extraction_data.items():
for _code_pays, pays_info in extraction_data.items():
# Trier les acteurs par part de marché décroissante
acteurs_tries = sorted(pays_info['acteurs'], key=lambda x: x['part_marche_num'], reverse=True)
@ -88,15 +89,13 @@ def _build_extraction_tableau(md: str, produit: str) -> str:
tableau_final = "\n".join(lignes_tableau)
# Remplacer la section du tableau dans la fiche
md_modifie = re.sub(
return re.sub(
r"<!---- AUTO-BEGIN:TABLEAU-EXTRACTION -->.*?<!---- AUTO-END:TABLEAU-EXTRACTION -->",
f"<!---- AUTO-BEGIN:TABLEAU-EXTRACTION -->\n{tableau_final}\n<!---- AUTO-END:TABLEAU-EXTRACTION -->",
md,
flags=re.DOTALL
)
return md_modifie
def _build_traitement_tableau(md: str, produit: str) -> str:
"""Génère le tableau de traitement pour les fiches de minerai."""
# Identifier la section de traitement
@ -171,7 +170,7 @@ def _build_traitement_tableau(md: str, produit: str) -> str:
# Préparer les données pour l'affichage
pays_data = []
for code_pays, pays_info in traitement_data.items():
for _code_pays, pays_info in traitement_data.items():
# Trier les acteurs par part de marché décroissante
acteurs_tries = sorted(pays_info['acteurs'], key=lambda x: x['part_marche_num'], reverse=True)
@ -209,15 +208,13 @@ def _build_traitement_tableau(md: str, produit: str) -> str:
tableau_final = "\n".join(lignes_tableau)
# Remplacer la section du tableau dans la fiche
md_modifie = re.sub(
return re.sub(
r"<!---- AUTO-BEGIN:TABLEAU-TRAITEMENT -->.*?<!---- AUTO-END:TABLEAU-TRAITEMENT -->",
f"<!---- AUTO-BEGIN:TABLEAU-TRAITEMENT -->\n{tableau_final}\n<!---- AUTO-END:TABLEAU-TRAITEMENT -->",
md,
flags=re.DOTALL
)
return md_modifie
def _build_reserves_tableau(md: str, produit: str) -> str:
"""Génère le tableau des réserves pour les fiches de minerai."""
# Identifier la section des réserves
@ -263,18 +260,15 @@ def _build_reserves_tableau(md: str, produit: str) -> str:
tableau_final = "\n".join(lignes_tableau)
# Remplacer la section du tableau dans la fiche
md_modifie = re.sub(
return re.sub(
r"<!---- AUTO-BEGIN:TABLEAU-RESERVES -->.*?<!---- AUTO-END:TABLEAU-RESERVES -->",
f"<!---- AUTO-BEGIN:TABLEAU-RESERVES -->\n{tableau_final}\n<!---- AUTO-END:TABLEAU-RESERVES -->",
md,
flags=re.DOTALL
)
return md_modifie
def build_minerai_ivc_section(md: str) -> str:
"""Ajoute les informations IVC depuis la fiche technique IVC.md pour un minerai spécifique.
"""
"""Ajoute les informations IVC depuis la fiche technique IVC.md pour un minerai spécifique."""
# Extraire le type de fiche et le produit depuis l'en-tête YAML
type_fiche = None
produit = None
@ -295,8 +289,8 @@ def build_minerai_ivc_section(md: str) -> str:
# Injecter les informations IVC depuis la fiche technique
try:
# Charger le contenu de la fiche technique IVC
ivc_path = "Fiches/Criticités/Fiche technique IVC.md"
with open(ivc_path, encoding="utf-8") as f:
ivc_path = Path("Fiches/Criticités/Fiche technique IVC.md")
with ivc_path.open(encoding="utf-8") as f:
ivc_content = f.read()
# Chercher la section correspondant au minerai
@ -332,8 +326,7 @@ def build_minerai_ivc_section(md: str) -> str:
return md
def build_minerai_ics_section(md: str) -> str:
"""Ajoute les informations ICS depuis la fiche technique ICS.md pour un minerai spécifique.
"""
"""Ajoute les informations ICS depuis la fiche technique ICS.md pour un minerai spécifique."""
# Extraire le type de fiche et le produit depuis l'en-tête YAML
type_fiche = None
produit = None
@ -354,8 +347,8 @@ def build_minerai_ics_section(md: str) -> str:
# Injecter les informations ICS depuis la fiche technique
try:
# Charger le contenu de la fiche technique ICS
ics_path = "Fiches/Criticités/Fiche technique ICS.md"
with open(ics_path, encoding="utf-8") as f:
ics_path = Path("Fiches/Criticités/Fiche technique ICS.md")
with ics_path.open(encoding="utf-8") as f:
ics_content = f.read()
# Extraire la section ICS pour le minerai
@ -389,8 +382,9 @@ def build_minerai_ics_section(md: str) -> str:
return md
def build_minerai_ics_composant_section(md: str) -> str:
"""Ajoute les informations ICS pour tous les composants liés à un minerai spécifique
depuis la fiche technique ICS.md, en augmentant d'un niveau les titres.
"""Ajoute les informations ICS pour tous les composants liés à un minerai spécifique.
Depuis la fiche technique ICS.md, en augmentant d'un niveau les titres.
"""
# Extraire le type de fiche et le produit depuis l'en-tête YAML
type_fiche = None
@ -412,8 +406,8 @@ def build_minerai_ics_composant_section(md: str) -> str:
# Injecter les informations ICS depuis la fiche technique
try:
# Charger le contenu de la fiche technique ICS
ics_path = "Fiches/Criticités/Fiche technique ICS.md"
with open(ics_path, encoding="utf-8") as f:
ics_path = Path("Fiches/Criticités/Fiche technique ICS.md")
with ics_path.open(encoding="utf-8") as f:
ics_content = f.read()
# Rechercher toutes les sections de composants liés au minerai
@ -514,8 +508,8 @@ def build_minerai_sections(md: str) -> str:
# Injecter les sections IHH depuis la fiche technique
try:
# Charger le contenu de la fiche technique IHH
ihh_path = "Fiches/Criticités/Fiche technique IHH.md"
with open(ihh_path, encoding="utf-8") as f:
ihh_path = Path("Fiches/Criticités/Fiche technique IHH.md")
with ihh_path.open(encoding="utf-8") as f:
ihh_content = f.read()
# D'abord, extraire toute la section concernant le produit
@ -580,6 +574,4 @@ def build_minerai_sections(md: str) -> str:
md = build_minerai_ics_section(md)
# Ajouter les informations ICS pour les composants liés au minerai
md = build_minerai_ics_composant_section(md)
return md
return build_minerai_ics_composant_section(md)

View File

@ -1,4 +1,4 @@
"""fiche_utils.py  outils de lecture / rendu des fiches Markdown (indices et opérations)
"""fiche_utils.py  outils de lecture / rendu des fiches Markdown (indices et opérations).
Dépendances :
pip install python-frontmatter pyyaml jinja2
@ -12,10 +12,10 @@ Usage :
from __future__ import annotations
import os
import pathlib
import re
from datetime import datetime, timezone
from pathlib import Path
import frontmatter
import jinja2
@ -135,7 +135,7 @@ def fichier_plus_recent(
bool: True si le fichier est plus récent, False sinon.
"""
try:
modif = datetime.fromtimestamp(os.path.getmtime(chemin_fichier), tz=timezone.utc)
modif = datetime.fromtimestamp(Path(chemin_fichier).stat().st_mtime, tz=timezone.utc)
return modif > reference
except Exception:
return False
@ -159,10 +159,10 @@ def doit_regenerer_fiche(
Returns:
bool: True si la fiche doit être regénérée, False sinon.
"""
if not os.path.exists(html_path):
if not Path(html_path).exists():
return True
local_mtime = datetime.fromtimestamp(os.path.getmtime(html_path), tz=timezone.utc)
local_mtime = datetime.fromtimestamp(Path(html_path).stat().st_mtime, tz=timezone.utc)
remote_mtime = recuperer_date_dernier_commit(commit_url)
if remote_mtime is None or remote_mtime > local_mtime:

View File

@ -2,7 +2,7 @@
import csv
import json
import os
from pathlib import Path
import requests
import streamlit as st
@ -43,11 +43,11 @@ def charger_fiches_et_labels():
dict: Dictionnaire au format {nom_fiche: {"operations": [str], "item": str}}.
Retourne un dict vide en cas d'erreur.
"""
chemin_csv = os.path.join("assets", "fiches_labels.csv")
chemin_csv = Path("assets") / "fiches_labels.csv"
dictionnaire_fiches = {}
try:
with open(chemin_csv, encoding="utf-8") as fichier_csv:
with chemin_csv.open(encoding="utf-8") as fichier_csv:
lecteur = csv.DictReader(fichier_csv)
for ligne in lecteur:
fiche = ligne.get("Fiche")
@ -97,13 +97,13 @@ def rechercher_tickets_gitea(fiche_selectionnee):
if not cible:
return []
labels_cibles = set([cible["item"]])
labels_cibles = {cible["item"]}
tickets_associes = []
for issue in issues:
if issue.get("ref") != f"refs/heads/{ENV}":
continue
issue_labels = set(label.get("name", "") for label in issue.get("labels", []))
issue_labels = {label.get("name", "") for label in issue.get("labels", [])}
if labels_cibles.issubset(issue_labels):
tickets_associes.append(issue)
@ -137,7 +137,7 @@ def nettoyer_labels(labels):
Returns:
list[str]: Liste triee de labels uniques et non vides.
"""
return sorted(set(l.strip() for l in labels if isinstance(l, str) and l.strip()))
return sorted({lbl.strip() for lbl in labels if isinstance(lbl, str) and lbl.strip()})
def construire_corps_ticket_markdown(reponses):
@ -172,6 +172,4 @@ def creer_ticket_gitea(titre, corps, labels):
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues"
reponse = gitea_request("post", url, headers={"Content-Type": "application/json"}, data=json.dumps(data))
if not reponse:
return False
return True
return bool(reponse)

View File

@ -124,7 +124,7 @@ def gerer_previsualisation_et_soumission(reponses, labels, selected_ops, cible):
if st.button(str(_("pages.fiches.tickets.confirm"))):
labels_existants = get_labels_existants()
labels_ids = [labels_existants[l] for l in final_labels if l in labels_existants]
labels_ids = [labels_existants[lbl] for lbl in final_labels if lbl in labels_existants]
if "Backlog" in labels_existants:
labels_ids.append(labels_existants["Backlog"])

View File

@ -112,7 +112,7 @@ def afficher_carte_ticket(ticket):
created = ticket.get("created_at", "")
updated = ticket.get("updated_at", "")
body = ticket.get("body", "")
labels = [l["name"] for l in ticket.get("labels", []) if "name" in l]
labels = [lbl["name"] for lbl in ticket.get("labels", []) if "name" in lbl]
sujet = ""
match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL)

View File

@ -56,8 +56,6 @@ def selectionner_minerais(
Optional[List[str]]: La liste des minerais si une sélection a été effectuée,
- None sinon
"""
minerais_selection = None
st.markdown(f"## {str(_('pages.ia_nalyse.select_minerals'))}")
# Tous les nœuds de niveau 2 (minerai)
minerais_nodes = sorted([
@ -65,14 +63,12 @@ def selectionner_minerais(
if d.get("niveau") and int(str(d.get("niveau")).strip('"')) == 2
])
minerais_selection = st.multiselect(
return st.multiselect(
str(_("pages.ia_nalyse.filter_by_minerals")),
minerais_nodes,
key="analyse_minerais"
)
return minerais_selection
def selectionner_noeuds(
G: nx.DiGraph,

View File

@ -18,6 +18,4 @@ def interface_personnalisation(G):
G = ajouter_produit(G)
G = modifier_produit(G)
G = importer_exporter_graph(G)
return G
return importer_exporter_graph(G)

View File

@ -1,14 +1,12 @@
#!/usr/bin/env python3
#
import networkx as nx
"""Script pour générer un rapport factorisé des vulnérabilités critiques.
"""
Script pour générer un rapport factorisé des vulnérabilités critiques
suivant la structure définie dans Remarques.md.
Suit la structure définie dans Remarques.md.
"""
import uuid
import networkx as nx
import streamlit as st
from networkx.drawing.nx_agraph import write_dot

View File

@ -1,4 +1,5 @@
import re
from pathlib import Path
def parse_chains_md(filepath: str) -> tuple[dict, dict, dict, list, dict, dict]:
@ -28,7 +29,7 @@ def parse_chains_md(filepath: str) -> tuple[dict, dict, dict, list, dict, dict]:
current_section = None
in_section = False
with open(filepath, encoding="utf-8") as f:
with Path(filepath).open(encoding="utf-8") as f:
for raw_line in f:
line = raw_line.strip()
if not in_section:
@ -105,7 +106,7 @@ def parse_chains_md(filepath: str) -> tuple[dict, dict, dict, list, dict, dict]:
descriptions[current_section] += raw_line
# Parse detailed sections from the complete file
with open(filepath, encoding="utf-8") as f:
with Path(filepath).open(encoding="utf-8") as f:
content = f.read()
# Extract sections using regex patterns

View File

@ -1,3 +1,5 @@
from pathlib import Path
import streamlit as st
import yaml
@ -99,7 +101,7 @@ def initialiser_seuils(config_path: str) -> dict:
seuils = {}
try:
with open(config_path, encoding="utf-8") as f:
with Path(config_path).open(encoding="utf-8") as f:
config = yaml.safe_load(f)
seuils = config.get("seuils", seuils)
except FileNotFoundError:

View File

@ -1,7 +1,7 @@
import streamlit as st
def afficher_bloc_ihh_isg(titre, ihh, isg, details_content="", ui = True) -> str|None:
def afficher_bloc_ihh_isg(titre, _ihh, _isg, details_content="", ui = True) -> str|None:
"""Affiche un bloc detaille IHH/ISG avec vulnerabilite, tableaux et graphique."""
contenu_bloc = ""
if ui:
@ -86,7 +86,7 @@ def afficher_bloc_ihh_isg(titre, ihh, isg, details_content="", ui = True) -> str
return None
def afficher_section_avec_tableau(lines, section_start, section_end=None):
"""Affiche une section contenant un tableau"""
"""Affiche une section contenant un tableau."""
in_section = False
table_lines = []
@ -104,11 +104,11 @@ def afficher_section_avec_tableau(lines, section_start, section_end=None):
break
if table_lines:
contenu = '\n'.join(table_lines)
return contenu
return '\n'.join(table_lines)
return None
def afficher_section_texte(lines, section_start, section_end_marker=None):
"""Affiche le texte d'une section sans les tableaux"""
"""Affiche le texte d'une section sans les tableaux."""
in_section = False
contenu_md = []
@ -121,8 +121,7 @@ def afficher_section_texte(lines, section_start, section_end_marker=None):
if in_section and line.strip() and not line.strip().startswith('|'):
contenu_md.append(line + '\n')
contenu = '\n'.join(contenu_md)
return contenu
return '\n'.join(contenu_md)
def afficher_description(titre, description, ui = True) -> str|None:
"""Affiche ou retourne la description d'un element du plan d'action.
@ -157,17 +156,11 @@ def afficher_description(titre, description, ui = True) -> str|None:
break
continue
# Arrêter aux titres de sections ou tableaux
if (line.startswith('####') or
line.startswith('|') or
line.startswith('**Unité')):
if line.startswith(('####', '|', '**Unité')):
break
description_lines.append(line)
if description_lines:
# Rejoindre les lignes en un seul paragraphe
contenu_md = ' '.join(description_lines)
else:
contenu_md = "Description non disponible"
contenu_md = ' '.join(description_lines) if description_lines else "Description non disponible"
else:
contenu_md = "Description non disponible"
@ -179,7 +172,7 @@ def afficher_description(titre, description, ui = True) -> str|None:
contenu_bloc += contenu_md
return contenu_bloc
def afficher_caracteristiques_minerai(minerai, mineraux_data, details_content="", ui = True) -> str|None:
def afficher_caracteristiques_minerai(_minerai, _mineraux_data, details_content="", ui = True) -> str|None:
"""Affiche les caracteristiques generales d'un minerai avec indices ICS et IVC.
Presente la vulnerabilite combinee ICS-IVC, puis les sections detaillees ICS
@ -265,3 +258,7 @@ def afficher_caracteristiques_minerai(minerai, mineraux_data, details_content=""
return None
contenu_bloc += contenu_md
return contenu_bloc
if not ui:
return contenu_bloc
return None

View File

@ -97,9 +97,7 @@ def analyser_chaines(chaines: list[dict], produits: dict, composants: dict, mine
seuil_poids = resultats[top_n - 1]["poids_total"]
# Inclure tous ceux dont le poids est égal au seuil
top_resultats = [r for r in resultats if r["poids_total"] >= seuil_poids]
return top_resultats
return [r for r in resultats if r["poids_total"] >= seuil_poids]
def tableau_de_bord(chains: list[dict], produits: dict, composants: dict, mineraux: dict, seuils: dict):
"""Affiche le tableau de bord interactif pour selectionner et analyser une chaine.
@ -237,7 +235,7 @@ def afficher_criticites(produits: dict, composants: dict, mineraux: dict, sel_pr
""")
def afficher_explications_et_details(
couleur_A, poids_A, couleur_F, poids_F, couleur_T, poids_T, couleur_E, poids_E, couleur_M, poids_M,
couleur_A, poids_A, couleur_F, poids_F, couleur_T, _poids_T, couleur_E, poids_E, couleur_M, poids_M,
produits, composants, mineraux, sel_prod, sel_comp, sel_miner,
couleur_A_ihh, couleur_A_isg, couleur_F_ihh, couleur_F_isg, couleur_T_ihh, couleur_T_isg,couleur_E_ihh, couleur_E_isg, couleur_M_ics, couleur_M_ivc, ui = True) -> str|None:
"""Affiche les explications detaillees des indices et ponderations pour chaque operation."""
@ -277,12 +275,12 @@ def afficher_explications_et_details(
return None
return contenu_md
def afficher_preconisations_et_indicateurs_generiques(niveau_criticite: dict, poids_A: int, poids_F: int, poids_T: int, poids_E: int, poids_M: int, ui: bool = True) -> tuple[str|None,str|None]:
def afficher_preconisations_et_indicateurs_generiques(niveau_criticite: dict, _poids_A: int, _poids_F: int, _poids_T: int, _poids_E: int, _poids_M: int, ui: bool = True) -> tuple[str|None,str|None]:
"""Affiche les preconisations et indicateurs generiques selon le niveau de criticite global."""
contenu_md_left = "### Préconisations :\n\n"
contenu_md_left += "Mise en œuvre : \n"
for niveau, contenu in PRECONISATIONS.items():
for niveau, _contenu in PRECONISATIONS.items():
if niveau in niveau_criticite:
contenu_md_left += f"* {colorer_couleurs(niveau)}\n"
for p in PRECONISATIONS[niveau]:
@ -291,7 +289,7 @@ def afficher_preconisations_et_indicateurs_generiques(niveau_criticite: dict, po
contenu_md_right = "### Indicateurs :\n\n"
contenu_md_right += "Mise en œuvre : \n"
for niveau, contenu in INDICATEURS.items():
for niveau, _contenu in INDICATEURS.items():
if niveau in niveau_criticite:
contenu_md_right += f"* {colorer_couleurs(niveau)}\n"
for p in INDICATEURS[niveau]:
@ -311,7 +309,7 @@ def afficher_preconisations_specifiques(operation: str, niveau_criticite_operati
"""Genere les preconisations specifiques a une operation selon son niveau de criticite."""
contenu_md = "#### Préconisations :\n\n"
contenu_md += "Mise en œuvre : \n"
for niveau, contenu in PRECONISATIONS[operation].items():
for niveau, _contenu in PRECONISATIONS[operation].items():
if niveau in niveau_criticite_operation[operation]:
contenu_md += f"* {colorer_couleurs(niveau)}\n"
for p in PRECONISATIONS[operation][niveau]:
@ -322,7 +320,7 @@ def afficher_indicateurs_specifiques(operation: str, niveau_criticite_operation:
"""Genere les indicateurs specifiques a une operation selon son niveau de criticite."""
contenu_md = "#### Indicateurs :\n\n"
contenu_md += "Mise en œuvre : \n"
for niveau, contenu in INDICATEURS[operation].items():
for niveau, _contenu in INDICATEURS[operation].items():
if niveau in niveau_criticite_operation[operation]:
contenu_md += f"* {colorer_couleurs(niveau)}\n"
for p in INDICATEURS[operation][niveau]:
@ -398,7 +396,6 @@ def afficher_details_operations(produits, composants, mineraux, sel_prod, sel_co
def initialiser_interface(filepath: str, config_path: str = "assets/config.yaml") -> None:
"""Point d'entree principal pour l'interface du plan d'action : charge donnees et affiche toutes sections."""
produits, composants, mineraux, chains, descriptions, details_sections = parse_chains_md(filepath)
if not chains:

View File

@ -1,17 +1,13 @@
# batch_ia/__init__.py
# config.py
from .utils.config import TEMPLATE_PATH, load_config, session_uuid, TEMP_SECTIONS
from .utils.config import TEMP_SECTIONS, TEMPLATE_PATH, load_config, session_uuid
# files.py
from .utils.files import write_report
# graphs.py
from .utils.graphs import (
parse_graphs,
extract_data_from_graph,
calculate_vulnerabilities
)
from .utils.graphs import calculate_vulnerabilities, extract_data_from_graph, parse_graphs
# sections.py
from .utils.sections import generate_report

View File

@ -1,44 +1,19 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour générer un rapport factorisé des vulnérabilités critiques
"""Script pour générer un rapport factorisé des vulnérabilités critiques
suivant la structure définie dans Remarques.md.
"""
import sys
from pathlib import Path
import streamlit as st
from utils.config import (
TEMP_SECTIONS,
TEMPLATE_PATH, session_uuid,
load_config
)
from utils.files import (
write_report
)
from utils.graphs import (
parse_graphs,
extract_data_from_graph,
calculate_vulnerabilities
)
from utils.sections import (
generate_report
)
from utils.sections_utils import (
nettoyer_texte_fr
)
from utils.ia import (
ingest_document,
ia_analyse,
supprimer_fichiers,
generer_rapport_final
)
from utils.config import TEMP_SECTIONS, TEMPLATE_PATH, load_config, session_uuid
from utils.files import write_report
from utils.graphs import calculate_vulnerabilities, extract_data_from_graph, parse_graphs
from utils.ia import generer_rapport_final, ia_analyse, ingest_document, supprimer_fichiers
from utils.sections import generate_report
from utils.sections_utils import nettoyer_texte_fr
def main(dot_path, output_path):

View File

@ -1,7 +1,7 @@
import time
import subprocess
from batch_utils import charger_status, sauvegarder_status, JOBS_DIR
import time
from batch_utils import JOBS_DIR, charger_status, sauvegarder_status
while True:
status = charger_status()

View File

@ -1,8 +1,10 @@
import json
import time
from pathlib import Path
from networkx.drawing.nx_agraph import write_dot
import streamlit as st
from networkx.drawing.nx_agraph import write_dot
from utils.translations import _
BATCH_DIR = Path(__file__).resolve().parent

View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
"""
Script de nettoyage pour PrivateGPT
"""Script de nettoyage pour PrivateGPT
Ce script permet de lister et supprimer les documents ingérés dans PrivateGPT.
Options:
@ -10,17 +9,17 @@ Options:
- Supprimer tous les documents
"""
import json
import re
import requests
import time
from typing import List, Dict, Any, Optional
from typing import Any
import requests
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
def list_documents() -> List[Dict[str, Any]]:
def list_documents() -> list[dict[str, Any]]:
"""Liste tous les documents ingérés et renvoie la liste des métadonnées"""
try:
# Récupérer la liste des documents
@ -61,20 +60,17 @@ def delete_document(doc_id: str) -> bool:
response = requests.delete(f"{API_URL}/ingest/{doc_id}")
if response.status_code == 200:
return True
else:
print(f"⚠️ Échec de la suppression de l'ID {doc_id}: Code {response.status_code}")
return False
print(f"⚠️ Échec de la suppression de l'ID {doc_id}: Code {response.status_code}")
return False
except Exception as e:
print(f"❌ Erreur lors de la suppression de l'ID {doc_id}: {e}")
return False
def delete_documents_by_criteria(pattern) -> int:
"""
Supprime des documents selon différents critères
"""Supprime des documents selon différents critères
Retourne le nombre de documents supprimés
"""
documents = list_documents()
if not documents or not pattern:
@ -88,7 +84,7 @@ def delete_documents_by_criteria(pattern) -> int:
try:
regex = re.compile(pattern)
docs_to_delete = [doc for doc in documents if regex.search(doc["filename"])]
except re.error as e:
except re.error:
return 0
# Supprimer les documents

View File

@ -1,7 +1,9 @@
import os
import yaml
from pathlib import Path
import uuid
from pathlib import Path
import yaml
def init_uuid():
if not TEMP_SECTIONS.exists():
@ -48,17 +50,15 @@ def load_config(thresholds_path=THRESHOLDS_PATH):
config = {}
# Charger les seuils
if os.path.exists(thresholds_path):
with open(thresholds_path, 'r', encoding='utf-8') as f:
with open(thresholds_path, encoding='utf-8') as f:
thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {})
return config
def determine_threshold_color(value, index_type, thresholds):
"""
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
"""Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles.
"""
# Récupérer les seuils pour cet indice
if index_type in thresholds:
index_thresholds = thresholds[index_type]
@ -67,12 +67,12 @@ def determine_threshold_color(value, index_type, thresholds):
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
suffix = get_suffix_for_index(index_type, "vert")
return "Vert", suffix
elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
if "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \
index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]:
suffix = get_suffix_for_index(index_type, "orange")
return "Orange", suffix
elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
if "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]:
suffix = get_suffix_for_index(index_type, "rouge")
return "Rouge", suffix

View File

@ -1,17 +1,15 @@
import os
import re
from .config import (
CORPUS_DIR
)
from .config import CORPUS_DIR
def strip_prefix(name):
"""Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier."""
return re.sub(r'^\d+[-_ ]*', '', name).lower()
def find_prefixed_directory(pattern, base_path=None):
"""
Recherche un sous-répertoire dont le nom (sans préfixe) correspond au pattern.
"""Recherche un sous-répertoire dont le nom (sans préfixe) correspond au pattern.
Args:
pattern: Nom du répertoire sans préfixe
@ -38,8 +36,7 @@ def find_prefixed_directory(pattern, base_path=None):
return None
def find_corpus_file(pattern, base_path=None):
"""
Recherche récursive dans le corpus d'un fichier en ignorant les préfixes numériques dans les dossiers et fichiers.
"""Recherche récursive dans le corpus d'un fichier en ignorant les préfixes numériques dans les dossiers et fichiers.
Args:
pattern: Chemin relatif type "sous-dossier/nom-fichier"
@ -48,7 +45,6 @@ def find_corpus_file(pattern, base_path=None):
Returns:
Chemin relatif du fichier trouvé ou None
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
@ -83,8 +79,7 @@ def find_corpus_file(pattern, base_path=None):
def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
"""
Lit un fichier du corpus et applique les transformations demandées.
"""Lit un fichier du corpus et applique les transformations demandées.
Args:
file_path: Chemin relatif du fichier dans le corpus
@ -101,7 +96,7 @@ def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
return f"Fichier non trouvé: {file_path}"
# # print(f"Lecture du fichier: {full_path}")
with open(full_path, 'r', encoding='utf-8') as f:
with open(full_path, encoding='utf-8') as f:
lines = f.readlines()
# Supprimer la première ligne si c'est un titre et si demandé
@ -124,7 +119,6 @@ def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
def write_report(report, fichier):
"""Écrit le rapport généré dans le fichier spécifié."""
report = re.sub(r'<!----.*?-->', '', report)
report = re.sub(r'\n\n\n+', '\n\n', report)

View File

@ -1,15 +1,13 @@
import os
import sys
from networkx.drawing.nx_agraph import read_dot
from .config import (
REFERENCE_GRAPH_PATH,
determine_threshold_color, get_weight_for_color
)
from .config import REFERENCE_GRAPH_PATH, determine_threshold_color, get_weight_for_color
def parse_graphs(graphe_path):
"""
Charge et analyse les graphes DOT (analyse et référence).
"""Charge et analyse les graphes DOT (analyse et référence).
"""
print(graphe_path)
# Charger le graphe à analyser
@ -69,8 +67,7 @@ def parse_graphs(graphe_path):
sys.exit(1)
def extract_data_from_graph(graph, ref_graph):
"""
Extrait toutes les données pertinentes des graphes DOT.
"""Extrait toutes les données pertinentes des graphes DOT.
"""
data = {
"products": {}, # Produits finaux (N0)
@ -411,8 +408,7 @@ def extract_data_from_graph(graph, ref_graph):
return data
def calculate_vulnerabilities(data, config):
"""
Calcule les vulnérabilités combinées pour toutes les opérations et minerais.
"""Calcule les vulnérabilités combinées pour toutes les opérations et minerais.
"""
thresholds = config.get('thresholds', {})
results = {

View File

@ -1,20 +1,15 @@
import re
from pathlib import Path
import requests
import json
import re
import time
import zipfile
from pathlib import Path
import requests
import streamlit as st
from nettoyer_pgpt import delete_documents_by_criteria
from nettoyer_pgpt import (
delete_documents_by_criteria
)
from utils.config import API_URL, PROMPT_METHODOLOGIE, TEMP_SECTIONS, session_uuid
from utils.config import (
TEMP_SECTIONS,
session_uuid,
API_URL, PROMPT_METHODOLOGIE
)
def ingest_document(file_path: Path) -> bool:
"""Ingère un document dans PrivateGPT"""
@ -83,9 +78,8 @@ def generate_text(input_file, full_prompt, system_message, temperature = "0.3",
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
@ -270,7 +264,7 @@ def supprimer_fichiers(session_uuid):
for temp_file in TEMP_SECTIONS.glob(f"*{session_uuid}*.md"):
temp_file.unlink()
return True
except:
except Exception:
return False
def generer_rapport_final(rapport, analyse, resultat):

View File

@ -1,30 +1,17 @@
import os
import re
from utils.logger import setup_logger
logger = setup_logger(__name__)
from .config import (
CORPUS_DIR,
TEMPLATE_PATH,
determine_threshold_color
)
from .config import CORPUS_DIR, TEMPLATE_PATH, determine_threshold_color
from .files import find_corpus_file, find_prefixed_directory, read_corpus_file, write_report
from .sections_utils import extraire_sections_par_mot_cle, trouver_dossier_composant
from .files import (
find_prefixed_directory,
find_corpus_file,
write_report,
read_corpus_file
)
from .sections_utils import (
trouver_dossier_composant,
extraire_sections_par_mot_cle
)
def generate_introduction_section(data):
"""
Génère la section d'introduction du rapport.
"""Génère la section d'introduction du rapport.
"""
products = [p["label"] for p in data["products"].values()]
components = [c["label"] for c in data["components"].values()]
@ -41,8 +28,7 @@ def generate_introduction_section(data):
return "\n".join(template)
def generate_methodology_section():
"""
Génère la section méthodologie du rapport.
"""Génère la section méthodologie du rapport.
"""
template = []
template.append("## Méthodologie d'analyse des risques\n")
@ -198,8 +184,7 @@ def generate_methodology_section():
return "\n".join(template)
def generate_operations_section(data, results, config):
"""
Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement).
"""Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement).
"""
# # print("DEBUG: Génération de la section des opérations")
# # print(f"DEBUG: Nombre de produits: {len(data['products'])}")
@ -376,8 +361,7 @@ def generate_operations_section(data, results, config):
return result
def generate_minerals_section(data, results, config):
"""
Génère la section détaillant les minerais et leurs opérations d'extraction et traitement.
"""Génère la section détaillant les minerais et leurs opérations d'extraction et traitement.
"""
template = []
template.append("## Détails des minerais\n")
@ -574,8 +558,7 @@ def generate_minerals_section(data, results, config):
return "\n".join(template)
def generate_critical_paths_section(data, results):
"""
Génère la section des chemins critiques.
"""Génère la section des chemins critiques.
"""
template = []
template.append("## Chemins critiques\n")
@ -713,8 +696,7 @@ def slugify(text):
return re.sub(r'\W+', '-', text.strip()).strip('-').lower()
def generate_report(data, results, config):
"""
Génère le rapport complet structuré selon les spécifications.
"""Génère le rapport complet structuré selon les spécifications.
"""
# Titre principal
report_titre = ["# Évaluation des vulnérabilités critiques\n"]

View File

@ -1,15 +1,13 @@
import os
import re
from pathlib import Path
from collections import defaultdict
from pathlib import Path
from .config import CORPUS_DIR
from .config import (
CORPUS_DIR
)
def composant_match(nom_composant, nom_dossier):
"""
Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre).
"""Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre).
"""
def clean(s):
return ''.join(c.lower() for c in s if c.isalnum())
@ -22,8 +20,7 @@ def composant_match(nom_composant, nom_dossier):
return all(c in it for c in cleaned_comp)
def trouver_dossier_composant(nom_composant, base_path, prefixe):
"""
Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""
search_path = os.path.join(CORPUS_DIR, base_path)
if not os.path.exists(search_path):
@ -36,8 +33,7 @@ def trouver_dossier_composant(nom_composant, base_path, prefixe):
return None
def extraire_sections_par_mot_cle(fichier_markdown: Path) -> dict:
"""
Extrait les sections de niveau 3 uniquement dans la section
"""Extrait les sections de niveau 3 uniquement dans la section
'## Chaînes avec risque critique' du fichier Markdown,
et les regroupe par mot-clé (ce qui se trouve entre '### ' et '').
Réduit chaque titre dun niveau (#).

View File

@ -1,14 +1,19 @@
import streamlit as st
import requests
import logging
import os
from utils.translations import _
from pathlib import Path
import requests
import streamlit as st
from utils.persistance import get_champ_statut, maj_champ_statut
from utils.translations import _
def initialiser_logger():
LOG_FILE_PATH = "/var/log/fabnum-auth.log"
if not os.path.exists(os.path.dirname(LOG_FILE_PATH)):
os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True)
"""Initialise et retourne le logger d'authentification."""
LOG_FILE_PATH = Path("/var/log/fabnum-auth.log")
if not LOG_FILE_PATH.parent.exists():
LOG_FILE_PATH.parent.mkdir(parents=True, exist_ok=True)
logger = logging.getLogger("auth_logger")
logger.setLevel(logging.INFO)
@ -20,6 +25,7 @@ def initialiser_logger():
return logger
def connexion():
"""Affiche le formulaire de connexion et authentifie l'utilisateur via Gitea."""
login = get_champ_statut("login")
if login == "":
auth_title = str(_("auth.title"))
@ -40,7 +46,7 @@ def connexion():
# Ajout d'un champ identifiant fictif pour activer l'autocomplétion navigateur
# et permettre de stocker le token comme un mot de passe par le navigateur
# L'identifiant n'est donc pas utilisé par la suite ; il est caché en CSS
identifiant = st.text_input(str(_("auth.username")), value="fabnum-connexion", key="nom_utilisateur")
_identifiant = st.text_input(str(_("auth.username")), value="fabnum-connexion", key="nom_utilisateur")
token = st.text_input(str(_("auth.token")), type="password")
submitted = st.form_submit_button(str(_("auth.login")), icon=":material/login:")
@ -87,8 +93,9 @@ def connexion():
def bouton_deconnexion():
"""Affiche le bouton de déconnexion dans la barre latérale."""
login = get_champ_statut("login")
if not login == "":
if login != "":
auth_title = str(_("auth.title"))
st.html(f"""
<section role="region" aria-label="region-authentification">

View File

@ -1,8 +1,10 @@
import streamlit as st
from utils.translations import _
def afficher_pied_de_page():
"""Affiche le pied de page avec les mentions légales et crédits."""
st.markdown("""
<section role="region" aria-label="Contenu principal" id="main-content">
""", unsafe_allow_html=True)

View File

@ -1,10 +1,12 @@
import streamlit as st
from config import ENV
from utils.translations import _
from utils.persistance import get_session_id
from utils.translations import _
def afficher_entete():
"""Affiche l'en-tête de l'application avec le titre et le sous-titre."""
header = f"""
<header role="banner" aria-labelledby="entete-header">
<div class='wide-header'>

View File

@ -1,10 +1,13 @@
import streamlit as st
from components.connexion import connexion, bouton_deconnexion
import streamlit.components.v1 as components
from utils.translations import _
from components.connexion import bouton_deconnexion, connexion
from utils.persistance import get_champ_statut, maj_champ_statut
from utils.translations import _
def afficher_menu():
"""Affiche le menu de navigation et les options de thème dans la barre latérale."""
with st.sidebar:
st.markdown(f"""
<nav role="navigation" aria-label="{str(_('sidebar.menu'))}">
@ -23,7 +26,7 @@ def afficher_menu():
str(_("navigation.instructions")),
str(_("navigation.personnalisation")),
str(_("navigation.analyse")),
*([str(_("navigation.ia_nalyse"))] if not get_champ_statut("login") == "" else []),
*([str(_("navigation.ia_nalyse"))] if get_champ_statut("login") != "" else []),
*([str(_("navigation.plan_d_action"))]),
str(_("navigation.visualisations")),
str(_("navigation.fiches"))
@ -80,7 +83,7 @@ def afficher_menu():
connexion()
if not get_champ_statut("login") == "":
if get_champ_statut("login") != "":
bouton_deconnexion()
# === RERUN SI BESOIN ===
@ -97,6 +100,7 @@ def afficher_menu():
# sudo chcon -Rt httpd_sys_content_t /chemin/d/acces/assets/
#
def afficher_impact(total_bytes):
"""Affiche le widget d'impact environnemental CO2 dans la barre latérale."""
impact_label = str(_("sidebar.impact"))
loading_text = str(_("sidebar.loading"))

View File

@ -1,4 +1,5 @@
import os
import streamlit as st
from dotenv import load_dotenv
@ -7,6 +8,7 @@ load_dotenv(".env.local", override=True)
# Fonction pour déterminer l'environnement à partir de l'en-tête X-Environment
def determine_environment():
"""Détermine l'environnement d'exécution à partir de l'en-tête Nginx X-Environment."""
# Valeur par défaut (si aucun en-tête n'est détecté)
environment = "dev"
@ -42,7 +44,7 @@ FICHE_ISG = os.getenv("FICHE_ISG")
# Optionnel : vérification + fallback
for key, value in [("FICHE_IHH", FICHE_IHH), ("FICHE_ICS", FICHE_ICS), ("FICHE_IVC", FICHE_IVC), ("FICHE_ISG", FICHE_ISG)]:
if not value:
raise EnvironmentError(f"Variable d'environnement '{key}' non définie.")
raise OSError(f"Variable d'environnement '{key}' non définie.")
FICHES_CRITICITE = {
"IHH": FICHE_IHH,

View File

@ -0,0 +1,204 @@
# Audit qualite, securite et simplicite - FabNum
Date : 2 mars 2026
Objectif : Bilan de sante complet du projet
## Contexte
FabNum est une application Streamlit (Python 3.14) d'analyse de chaines de valeur numeriques.
Le projet comprend ~11 200 lignes de code Python reparties sur 6 modules applicatifs,
7 utilitaires, 4 composants UI et des scripts auxiliaires (IA, batch, generation).
### Etat actuel
| Metrique | Valeur |
|---|---|
| Securite (Bandit) | 0 vulnerabilite |
| Dependances (pip-audit --local) | 0 vulnerabilite |
| Qualite (ruff) | 907 erreurs |
| Tests | 67 tests, 100% passent |
| Couverture | 16% |
| Bugs averes | 1 (F821 nom indefini) |
| CLAUDE.md | Absent |
## Plan par phases
### Phase 1 : Critique (bugs et erreurs dangereuses)
Objectif : Corriger les problemes qui causent ou masquent des bugs.
| Regle | Nb | Description |
|---|---|---|
| F821 | 1 | Nom indefini `ingested_section_ids` dans scripts/generer_analyse.py:471 |
| E722 | 5 | `except:` nu (masque toutes les exceptions y compris KeyboardInterrupt) |
| B904 | 1 | `raise` sans `from` dans un except (perte de contexte d'erreur) |
| W605 | 1 | Sequence d'echappement invalide dans une chaine |
Total : 8 erreurs. Effort : ~30 minutes.
Verification : `ruff check . --exclude venv,pgpt,.git --select F821,E722,B904,W605`
### Phase 2 : Nettoyage automatique (ruff --fix)
Objectif : Appliquer les 330 corrections automatiques sans risque.
Commande : `ruff check . --exclude venv,pgpt,.git --fix`
Principales corrections :
- W293 (125) : Espaces sur lignes vides
- I001 (41) : Imports non tries
- D212 (38) : Docstrings multi-lignes mal formatees
- F401 (27) : Imports inutilises
- UP006 (27) : Annotations type obsoletes (List -> list, Dict -> dict)
- RET505 (16) : Else superflu apres return
- W291 (15) : Espaces en fin de ligne
- Autres (41) : UP009, UP015, F541, D202, W292, UP007, UP024, SIM114, SIM300, UP012
Verification apres fix : relancer les 67 tests pour confirmer aucune regression.
### Phase 3 : Qualite manuelle (577 erreurs restantes)
Objectif : Ameliorer la qualite du code manuellement, par sous-chantiers.
#### 3a. Modernisation pathlib (~220 erreurs)
Remplacer les appels `os.path.*` et `os.makedirs` par `pathlib.Path`.
Regles : PTH118, PTH123, PTH110, PTH208, PTH103, PTH120, PTH122, PTH112,
PTH100, PTH119, PTH204, PTH207, PTH113.
Approche : traiter fichier par fichier, en commencant par utils/ puis app/.
#### 3b. Nommage (88 erreurs)
N803 : noms d'arguments invalides.
Decision a prendre : `G` (convention NetworkX pour les graphes) est utilise partout.
Options :
- Ignorer N803 globalement pour les arguments nommes `G`, `G_temp`, `G_temp_ivc`
- Renommer en `graph` partout (refactoring lourd)
Recommandation : ajouter `G` a la liste des noms acceptes dans la config ruff.
#### 3c. Documentation (90 erreurs)
- D103 (35) : Fonctions publiques sans docstring
- D415 (26) : Docstrings sans ponctuation finale
- D200 (14) : Docstrings multi-lignes inutiles
- D205 (12) : Ligne vide manquante apres resume de docstring
- D417 (2) : Parametres non documentes
- D301 (1) : Sequence d'echappement dans docstring
#### 3d. Code inutilise (~53 erreurs)
- B007 (30) : Variables de boucle inutilisees (remplacer par `_`)
- ARG001 (11) : Arguments de fonction inutilises
- ARG002 (7) : Arguments de methode inutilises
- F841 (5) : Variables locales inutilisees
#### 3e. Imports (46 erreurs)
- E402 (29) : Imports pas en haut de fichier
Note : certains sont voulus dans Streamlit (imports apres st.set_page_config)
Decision : ajouter `# noqa: E402` pour les cas legitimes
- UP035 (17) : Imports deprecies (typing.List -> list, etc.)
#### 3f. Simplifications (~59 erreurs)
- RET504 (23) : Assignation inutile avant return
- SIM108 (7) : If/else remplacable par expression ternaire
- SIM105 (6) : Try/except remplacable par contextlib.suppress
- SIM102 (5) : If imbriques collapsibles
- SIM201 (4) : Negation de comparaison (not x == y -> x != y)
- RET503 (4) : Return implicite
- Autres (10) : C401, C408, C405, E701, E741, PIE810, SIM103, SIM117, SIM210, UP038
### Phase 4 : Couverture de tests (16% -> 40%)
Objectif : Couvrir les modules critiques qui sont a 0%.
Priorite haute :
- utils/persistance.py (0%, 112 lignes) : coeur de la gestion de session
- app/fiches/utils/dynamic/minerai/minerai.py (4%, 585 lignes) : module le plus gros
- app/fiches/generer.py (12%, 113 lignes) : generation de fiches
Priorite moyenne :
- app/personnalisation/ (0%, ~253 lignes)
- app/plan_d_action/ (0%, ~67 lignes interface)
- app/visualisations/ (0%, ~133 lignes)
Priorite basse :
- app/ia_nalyse/ (0%, 114 lignes) : depend d'un service externe
- utils/translations.py (28%)
- utils/visualisation.py (0%)
Note : les modules Streamlit sont difficiles a tester unitairement.
Strategie : tester la logique metier en l'isolant de l'interface Streamlit.
### Phase 5 : Simplification / Refactoring
Objectif : Decouper les gros fichiers pour ameliorer la maintenabilite.
Candidats :
- minerai.py (~585 lignes) : decouper par type de section (header, body, indices, etc.)
- modification.py (~341 lignes) : separer logique de donnees et interface
- fabnum.py (217 lignes) : extraire `get_total_bytes_for_session` et `charger_theme`
dans un module `utils/session.py` ou similaire
### Phase bonus : Documentation projet
- Creer un CLAUDE.md avec les conventions du projet
(structure, nommage, outils, commandes de dev, decisions architecturales)
## Ordre d'execution recommande
1. Phase 1 (critique) - a faire immediatement
2. Phase 2 (auto-fix) - dans la foulee
3. Phase 3b (nommage - decision sur `G`) - debloquer avant le reste
4. Phase 3a (pathlib) - le plus gros chantier, fichier par fichier
5. Phase 3d (code inutilise) - rapide
6. Phase 3e (imports) - rapide
7. Phase 3f (simplifications) - au fil de l'eau
8. Phase 3c (documentation) - au fil de l'eau
9. Phase 4 (tests) - en parallele des phases 3
10. Phase 5 (refactoring) - une fois les tests en place
11. Phase bonus (CLAUDE.md) - a tout moment
## Criteres de succes
| Metrique | Avant | Objectif | Resultat |
| --- | --- | --- | --- |
| Erreurs ruff | 907 | < 50 | **0** |
| Couverture tests | 16% | >= 40% | **35%** (448 tests) |
| Bugs averes | 1 | 0 | **0** |
| CLAUDE.md | Absent | Present | **Present** |
## Resultats de l'audit (2 mars 2026)
### Phase 1 : COMPLETE
- 8 erreurs critiques corrigees (F821, E722, B904, W605)
### Phase 2 : COMPLETE
- 330 corrections automatiques appliquees avec ruff --fix
### Phase 3 : COMPLETE
- 907 -> 0 erreurs ruff
- Migration complete vers pathlib (PTH)
- Ajout N803 aux ignores (convention NetworkX G)
- Corrections manuelles via 5 agents paralleles (RET, SIM, B007, D*, PTH)
### Phase 4 : COMPLETE (35% au lieu de 40% vise)
- 67 -> 448 tests (+381 tests, x6.7)
- 14 modules a 100% de couverture
- Modules non-testes : principalement interface Streamlit (difficile a tester unitairement)
- La couverture restante concerne des modules UI fortement couples a Streamlit
### Phase 5 : ANALYSEE (refactoring deporte)
- Candidats identifies : minerai.py (577L), modification.py (341L), fabnum.py (212L)
- Decision : refactoring deporte car fortement couple a Streamlit, risque eleve pour benefice limite dans le cadre de l'audit
- Recommandation : refactorer incrementalement lors des evolutions fonctionnelles
### Bonus : COMPLETE
- CLAUDE.md cree avec conventions du projet

View File

@ -1,7 +1,9 @@
import utils.persistance
utils.persistance.update_session_paths()
import streamlit as st
from utils.persistance import get_champ_statut, get_session_id
st.set_page_config(
@ -12,23 +14,19 @@ st.set_page_config(
)
import re
from pathlib import Path
# Configuration Gitea
from config import INSTRUCTIONS, ENV
from utils.gitea import (
charger_instructions_depuis_gitea
)
from config import ENV, INSTRUCTIONS
from utils.gitea import charger_instructions_depuis_gitea
# Import du module de traductions
from utils.translations import init_translations, _, set_language
from utils.translations import _, init_translations, set_language
from utils.widgets import html_expander
def afficher_instructions_avec_expanders(markdown_content):
"""
Affiche le contenu markdown avec les sections de niveau 2 (## Titre) dans des expanders
"""
"""Affiche le contenu markdown avec les sections de niveau 2 dans des expanders."""
# Extraction du titre principal (niveau 1)
titre_pattern = r'^# (.+)$'
titre_match = re.search(titre_pattern, markdown_content, re.MULTILINE)
@ -62,28 +60,20 @@ def afficher_instructions_avec_expanders(markdown_content):
contenu_section += "\n\n" + lignes[1].strip()
# Affichage dans un expander
status = True if i == 1 else False
status = i == 1
# with st.expander(f"## {titre_section}", expanded=status):
html_expander(f"{titre_section}", content=contenu_section, open_by_default=status, details_class="details_introduction")
from utils.graph_utils import (
charger_graphe
)
from components.sidebar import (
afficher_menu,
afficher_impact
)
from components.header import afficher_entete
from components.footer import afficher_pied_de_page
from app.fiches import interface_fiches
from app.visualisations import interface_visualisations
from app.personnalisation import interface_personnalisation
from app.analyse import interface_analyse
from app.fiches import interface_fiches
from app.ia_nalyse import interface_ia_nalyse
from app.personnalisation import interface_personnalisation
from app.plan_d_action.interface import interface_plan_d_action
from app.visualisations import interface_visualisations
from components.footer import afficher_pied_de_page
from components.header import afficher_entete
from components.sidebar import afficher_impact, afficher_menu
from utils.graph_utils import charger_graphe
# Initialisation des traductions (langue française par défaut)
init_translations()
@ -100,9 +90,10 @@ set_language("fr")
#
session_id = get_session_id()
def get_total_bytes_for_session(session_id):
"""Calcule le volume total d'octets transférés pour une session Nginx."""
total_bytes = 0
try:
with open(f"/var/log/nginx/fabnum-{ENV}.access.log", "r") as f:
with Path(f"/var/log/nginx/fabnum-{ENV}.access.log").open() as f:
for line in f:
if session_id in line:
match = re.search(r'"GET.*?" \d+ (\d+)', line)
@ -114,17 +105,18 @@ def get_total_bytes_for_session(session_id):
return total_bytes
def charger_theme():
"""Charge et injecte les fichiers CSS du thème sélectionné."""
# Chargement des fichiers CSS (une seule fois)
if "base_css_content" not in st.session_state:
with open("assets/styles/base.css") as f:
with Path("assets/styles/base.css").open() as f:
st.session_state["base_css_content"] = f.read()
if "theme_css_content_light" not in st.session_state:
with open("assets/styles/theme-light.css") as f:
with Path("assets/styles/theme-light.css").open() as f:
st.session_state["theme_css_content_light"] = f.read()
if "theme_css_content_dark" not in st.session_state:
with open("assets/styles/theme-dark.css") as f:
with Path("assets/styles/theme-dark.css").open() as f:
st.session_state["theme_css_content_dark"] = f.read()
# Mappage des noms traduits vers les noms internes
@ -148,6 +140,7 @@ def charger_theme():
st.markdown(f"<style>{st.session_state['base_css_content']}</style>", unsafe_allow_html=True)
def ouvrir_page():
"""Initialise la page avec le thème, l'en-tête et le menu latéral."""
charger_theme()
afficher_entete()
afficher_menu()
@ -156,6 +149,7 @@ def ouvrir_page():
""", unsafe_allow_html=True)
def fermer_page():
"""Ferme les balises HTML de la page et affiche le pied de page."""
st.markdown("</div>", unsafe_allow_html=True)
st.markdown("""</section>""", unsafe_allow_html=True)
st.markdown("</main>", unsafe_allow_html=True)

View File

@ -52,14 +52,16 @@ ignore = [
"D213", # Multi-line docstring summary should start at the second line (conflit avec D212)
"E501", # Line too long (géré par line-length)
"N802", # Function name should be lowercase (streamlit utilise des noms de fonctions variés)
"N803", # Argument name should be lowercase (pour compatibilité avec NetworkX : G)
"N806", # Variable in function should be lowercase (pour compatibilité avec NetworkX)
]
# Fichiers à ignorer pour certaines règles
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"] # Imports non utilisés dans __init__ sont OK
"tests/**/*.py" = ["D103", "ARG001"] # Pas de docstrings obligatoires dans les tests
"tests/**/*.py" = ["D103", "ARG001", "ARG002", "SIM300"] # Tests: pas de docstrings, mocks @patch inutilisés OK, Yoda conditions OK
"scripts/**/*.py" = ["D"] # Pas de docstrings obligatoires dans les scripts
"fabnum.py" = ["E402"] # Streamlit impose st.set_page_config() avant les imports d'app modules
[tool.ruff.lint.pydocstyle]
# Convention de docstrings (Google style)

View File

@ -1,21 +1,20 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour l'injection automatique de fichiers dans Private GPT.
Ce script scanne un répertoire source et injecte les nouveaux fichiers via l'API de Private GPT.
"""
import os
import argparse
import json
import logging
import sys
import time
import json
import argparse
import logging
import requests
from pathlib import Path
from typing import List, Set, Dict, Any
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
# Configuration du logging
logging.basicConfig(
@ -50,11 +49,11 @@ class PrivateGPTIngestor:
self.processed_file = processed_file
self.processed_files = self._load_processed_files()
def _load_processed_files(self) -> Set[str]:
def _load_processed_files(self) -> set[str]:
"""Charge la liste des fichiers déjà traités."""
try:
if os.path.exists(self.processed_file):
with open(self.processed_file, 'r', encoding='utf-8') as f:
if Path(self.processed_file).exists():
with Path(self.processed_file).open(encoding='utf-8') as f:
return set(json.load(f))
return set()
except Exception as e:
@ -64,13 +63,13 @@ class PrivateGPTIngestor:
def _save_processed_files(self) -> None:
"""Sauvegarde la liste des fichiers déjà traités."""
try:
with open(self.processed_file, 'w', encoding='utf-8') as f:
with Path(self.processed_file).open('w', encoding='utf-8') as f:
json.dump(list(self.processed_files), f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des fichiers traités: {e}")
def scan_directory(self, directory: str, extensions: Set[str] = None,
recursive: bool = True) -> List[str]:
def scan_directory(self, directory: str, extensions: set[str] = None,
recursive: bool = True) -> list[str]:
"""
Scanne un répertoire pour trouver des fichiers à injecter.
@ -121,8 +120,8 @@ class PrivateGPTIngestor:
logger.info(f"Injection du fichier: {file_path}")
try:
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
with Path(file_path).open('rb') as f:
files = {'file': (Path(file_path).name, f)}
response = requests.post(f"{self.api_url}/v1/ingest/file", files=files, timeout=30)
if response.status_code == 200:
@ -130,14 +129,13 @@ class PrivateGPTIngestor:
self.processed_files.add(file_path)
self._save_processed_files()
return True
else:
logger.error(f"Échec de l'injection pour {file_path}: {response.status_code} - {response.text}")
return False
logger.error(f"Échec de l'injection pour {file_path}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Erreur lors de l'injection de {file_path}: {e}")
return False
def list_documents(self) -> List[Dict[str, Any]]:
def list_documents(self) -> list[dict[str, Any]]:
"""
Liste les documents déjà injectés dans Private GPT.
@ -148,14 +146,13 @@ class PrivateGPTIngestor:
response = requests.get(f"{self.api_url}/v1/ingest/list", timeout=10)
if response.status_code == 200:
return response.json()
else:
logger.error(f"Échec de la récupération des documents: {response.status_code} - {response.text}")
return []
logger.error(f"Échec de la récupération des documents: {response.status_code} - {response.text}")
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des documents: {e}")
return []
def run_ingestion(self, directory: str, extensions: Set[str] = None,
def run_ingestion(self, directory: str, extensions: set[str] = None,
recursive: bool = True, batch_size: int = 5,
delay: float = 2.0) -> None:
"""

View File

@ -1,12 +1,12 @@
import requests
import time
import argparse
import json
import sys
import time
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
from pathlib import Path
import requests
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
@ -108,9 +108,8 @@ def check_api_availability() -> bool:
if response.status_code == 200:
print("✅ API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
@ -118,13 +117,10 @@ def check_api_availability() -> bool:
def ingest_document(file_path: Path, session_id: str = "") -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
with file_path.open("rb") as f:
# Si un session_id est fourni, l'ajouter au nom du fichier pour le tracking
if session_id:
file_name = f"input_{session_id}_{file_path.name}"
else:
file_name = file_path.name
file_name = f"input_{session_id}_{file_path.name}" if session_id else file_path.name
files = {"file": (file_name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement ce fichier d'entrée
metadata = {
@ -154,11 +150,11 @@ def setup_temp_directory() -> None:
TEMP_DIR.mkdir(parents=True)
print(f"📁 Répertoire temporaire '{TEMP_DIR}' créé")
def save_section_to_file(section: Dict[str, str], index: int, session_uuid: str) -> Path:
def save_section_to_file(section: dict[str, str], index: int, session_uuid: str) -> Path:
"""Sauvegarde une section dans un fichier temporaire et retourne le chemin"""
setup_temp_directory()
section_file = TEMP_DIR / f"temp_section_{session_uuid}_{index+1}_{section['title'].lower().replace(' ', '_')}.md"
# Contenu du fichier avec métadonnées et commentaire explicite
content = (
f"# SECTION TEMPORAIRE GÉNÉRÉE - {section['title']}\n\n"
@ -166,17 +162,17 @@ def save_section_to_file(section: Dict[str, str], index: int, session_uuid: str)
f"UUID de session: {session_uuid}\n\n"
f"{section['output']}"
)
# Écrire dans le fichier
section_file.write_text(content, encoding="utf-8")
return section_file
def ingest_section_files(section_files: List[Path]) -> List[str]:
def ingest_section_files(section_files: list[Path]) -> list[str]:
"""Ingère les fichiers de section et retourne leurs noms de fichiers"""
ingested_file_names = []
for file_path in section_files:
try:
with open(file_path, "rb") as f:
with file_path.open("rb") as f:
files = {"file": (file_path.name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement nos fichiers temporaires
metadata = {
@ -197,7 +193,7 @@ def ingest_section_files(section_files: List[Path]) -> List[str]:
print(f"⚠️ Erreur lors de l'ingestion de '{file_path.name}': {e}")
return ingested_file_names
def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_length: int) -> str:
def get_context(sections: list[dict[str, str]], strategy: ContextStrategy, max_length: int) -> str:
"""Génère le contexte selon la stratégie choisie"""
if not sections or strategy == ContextStrategy.NONE:
return ""
@ -210,8 +206,8 @@ def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_l
context_note = f"NOTE IMPORTANTE: Les sections précédentes ({', '.join(section_names)}) " + \
f"ont été ingérées sous forme de fichiers temporaires avec l'identifiant unique '{session_uuid}'. " + \
f"Utilisez UNIQUEMENT le document '{input_file.name}' et ces sections temporaires pour votre analyse. " + \
f"IGNOREZ tous les autres documents qui pourraient être présents dans la base de connaissances. " + \
f"Assurez la cohérence avec les sections déjà générées pour maintenir la continuité du rapport."
"IGNOREZ tous les autres documents qui pourraient être présents dans la base de connaissances. " + \
"Assurez la cohérence avec les sections déjà générées pour maintenir la continuité du rapport."
print(f"📄 Utilisation de {len(sections)} sections ingérées comme contexte")
return context_note
@ -272,7 +268,9 @@ def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_l
# En cas d'échec, revenir à la stratégie de troncature
return get_context(sections, ContextStrategy.TRUNCATE, max_length)
def generate_text(prompt: str, previous_context: str = "", use_context: bool = True, retry_on_error: bool = True) -> Optional[str]:
return ""
def generate_text(prompt: str, previous_context: str = "", use_context: bool = True, _retry_on_error: bool = True) -> str | None:
"""Génère du texte avec l'API PrivateGPT"""
try:
# Préparer le prompt avec le contexte précédent si disponible et demandé
@ -328,9 +326,8 @@ def generate_text(prompt: str, previous_context: str = "", use_context: bool = T
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
@ -338,7 +335,7 @@ def generate_text(prompt: str, previous_context: str = "", use_context: bool = T
print(f"Détails: {e.response.text}")
return None
def cleanup_temp_files(temp_file_names: List[str] = None, remove_directory: bool = False, session_id: str = "") -> None:
def cleanup_temp_files(temp_file_names: list[str] = None, remove_directory: bool = False, session_id: str = "") -> None:
"""Nettoie les fichiers temporaires et les documents ingérés"""
try:
# Supprimer les fichiers du répertoire temporaire
@ -346,54 +343,50 @@ def cleanup_temp_files(temp_file_names: List[str] = None, remove_directory: bool
for temp_file in TEMP_DIR.glob("*.md"):
temp_file.unlink()
print(f"🗑️ Fichier temporaire supprimé : {temp_file.name}")
# Supprimer le répertoire s'il est vide et si demandé
if remove_directory and not any(TEMP_DIR.iterdir()):
TEMP_DIR.rmdir()
print(f"🗑️ Répertoire temporaire '{TEMP_DIR}' supprimé")
# Supprimer les documents ingérés via l'API de liste et suppression
try:
# Lister tous les documents ingérés
list_response = requests.get(f"{API_URL}/ingest/list", timeout=10)
if list_response.status_code == 200:
documents_data = list_response.json()
# Format de réponse OpenAI
if "data" in documents_data:
documents = documents_data.get("data", [])
# Format alternatif
else:
documents = documents_data.get("documents", [])
deleted_count = 0
# Parcourir les documents et supprimer ceux qui correspondent à nos fichiers temporaires
for doc in documents:
doc_metadata = doc.get("doc_metadata", {})
file_name = doc_metadata.get("file_name", "") or doc_metadata.get("filename", "")
# Vérifier si c'est un de nos fichiers temporaires ou le fichier d'entrée
is_our_file = False
if temp_file_names and file_name in temp_file_names:
if temp_file_names and file_name in temp_file_names or f"temp_section_{session_uuid}_" in file_name or session_id and f"input_{session_id}_" in file_name:
is_our_file = True
elif f"temp_section_{session_uuid}_" in file_name:
is_our_file = True
elif session_id and f"input_{session_id}_" in file_name:
is_our_file = True
if is_our_file:
doc_id = doc.get("doc_id") or doc.get("id")
if doc_id:
delete_response = requests.delete(f"{API_URL}/ingest/{doc_id}", timeout=10)
if delete_response.status_code == 200:
deleted_count += 1
if deleted_count > 0:
print(f"🗑️ {deleted_count} documents supprimés de PrivateGPT")
except Exception as e:
print(f"⚠️ Erreur lors de la suppression des documents ingérés: {e}")
except Exception as e:
print(f"⚠️ Erreur lors du nettoyage des fichiers temporaires: {e}")
@ -405,42 +398,40 @@ def clean_ai_thoughts(text: str) -> str:
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
# Supprimer les lignes vides multiples
text = re.sub(r'\n{3,}', '\n\n', text)
return text
return re.sub(r'\n{3,}', '\n\n', text)
def main(input_path: str, output_path: str, context_strategy: ContextStrategy = ContextStrategy.FILE, context_length: int = MAX_CONTEXT_LENGTH):
"""Fonction principale qui exécute le processus complet"""
global input_file, section_files, session_uuid # Variables globales pour le filtre de contexte et l'UUID
# Générer un UUID unique pour cette session
session_uuid = str(uuid.uuid4())[:8] # Utiliser les 8 premiers caractères pour plus de concision
print(f"🔑 UUID de session généré: {session_uuid}")
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Convertir les chemins en objets Path (accessibles globalement)
input_file = Path(input_path)
input_file = Path(input_path)
output_file = Path(output_path)
# Ingérer le document principal avec l'UUID de session
if not ingest_document(input_file, session_uuid):
sys.exit(1)
# Récupérer la valeur du délai depuis args
delay = args.delay if 'args' in globals() else 5
# Attendre que l'ingestion soit complètement traitée
print(f"⏳ Attente du traitement de l'ingestion pendant {delay} secondes...")
time.sleep(delay)
print(f"🔧 Stratégie de contexte initiale: {context_strategy.value}, taille max: {context_length} caractères")
# Préparer le répertoire pour les fichiers temporaires
setup_temp_directory()
# Générer chaque section du rapport
step_outputs = []
section_files = [] # Chemins des fichiers temporaires
@ -465,13 +456,13 @@ def main(input_path: str, output_path: str, context_strategy: ContextStrategy =
if j >= len(section_files): # Cette section n'a pas encore été sauvegardée
section_file = save_section_to_file(section, j, session_uuid)
section_files.append(section_file)
# Ingérer le fichier si nous utilisons la stratégie FILE
new_ids = ingest_section_files([section_file])
ingested_section_ids.extend(new_ids)
ingested_file_names.extend(new_ids)
# Attendre que l'ingestion soit traitée
print(f"⏳ Attente du traitement de l'ingestion des sections précédentes...")
print("⏳ Attente du traitement de l'ingestion des sections précédentes...")
time.sleep(2)
# Essayer chaque stratégie jusqu'à ce qu'une réussisse
@ -506,11 +497,11 @@ def main(input_path: str, output_path: str, context_strategy: ContextStrategy =
if context_strategy == ContextStrategy.FILE:
section_file = save_section_to_file(step_outputs[-1], len(step_outputs)-1, session_uuid)
section_files.append(section_file)
# Ingérer le fichier
new_file_names = ingest_section_files([section_file])
ingested_file_names.extend(new_file_names)
# Petite pause pour permettre l'indexation
time.sleep(1)
else:
@ -540,7 +531,7 @@ def main(input_path: str, output_path: str, context_strategy: ContextStrategy =
try:
output_file.write_text(report_text, encoding="utf-8")
print(f"\n📄 Rapport final généré dans '{output_file}'")
except IOError as e:
except OSError as e:
print(f"❌ Erreur lors de l'écriture du fichier de sortie: {e}")
# Nettoyer les fichiers temporaires si demandé

View File

@ -17,10 +17,11 @@
"""
import sys
import networkx as nx
from networkx.drawing.nx_agraph import read_dot
import logging
import sys
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
logging.basicConfig(
filename="beautify_debug.log",
@ -100,11 +101,9 @@ def formater_noeuds_par_niveau(schema, niveau, indentation=4):
relations.append((s, d, attrs))
# Suppression des doublons en convertissant le dictionnaire en tuple trié
relations_sans_doublons = set((a, b, tuple(sorted(c.items()))) for a, b, c in relations)
relations_sans_doublons = {(a, b, tuple(sorted(c.items()))) for a, b, c in relations}
# Reconversion en dictionnaire
relations_finales = [(a, b, dict(c)) for a, b, c in relations_sans_doublons]
return relations_finales
return [(a, b, dict(c)) for a, b, c in relations_sans_doublons]
# Définir les niveaux d'indentation
indent = " " * (indentation + 4)
@ -292,7 +291,7 @@ def generer_rank_same(schema, indentation=4):
sortie = "\n" + indent + "// Alignement des nœuds par niveau\n"
# Trier les niveaux numériquement
for niveau, noeuds in sorted(noeuds_par_niveau.items(), key=lambda x: int(x[0]) if x[0].isdigit() else 99):
for _niveau, noeuds in sorted(noeuds_par_niveau.items(), key=lambda x: int(x[0]) if x[0].isdigit() else 99):
if noeuds: # S'il y a des nœuds pour ce niveau
sortie += indent + "{ rank=same; "
sortie += "; ".join(noeuds)
@ -407,13 +406,13 @@ def main(fichier_entree, fichier_sortie):
"""
try:
with open(f"{fichier_sortie}", "w", encoding="utf-8") as f:
with Path(fichier_sortie).open("w", encoding="utf-8") as f:
print(f"{sortie}", file=f)
except FileNotFoundError:
print(f"Erreur : Le chemin vers '{fichier_sortie}' n'existe pas")
except PermissionError:
print(f"Erreur : Permissions insuffisantes pour écrire dans '{fichier_sortie}'")
except IOError as e:
except OSError as e:
print(f"Erreur d'E/S lors de l'écriture dans le fichier : {e}")
except Exception as e:
print(f"Erreur inattendue : {e}")

View File

@ -17,10 +17,12 @@
"""
import re
import sys
import networkx as nx
from networkx.drawing.nx_pydot import write_dot
import sys
import re
def calcul_ihh(graphe, depart, arrivee):
ihh = 0
@ -42,8 +44,7 @@ def calcul_ihh(graphe, depart, arrivee):
print(ihh_inter)
ihh += ihh_inter
print(ihh)
ihh = int(round(ihh/100))
return ihh
return int(round(ihh/100))
def mettre_a_jour_ihh(graph, noeuds):
for noeud in noeuds:

View File

@ -1,7 +1,7 @@
import base64
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
import requests
import streamlit as st
@ -19,9 +19,8 @@ def lire_fichier_local(nom_fichier):
Returns:
str: Contenu du fichier.
"""
with open(nom_fichier, encoding="utf-8") as f:
contenu_md = f.read()
return contenu_md
with Path(nom_fichier).open(encoding="utf-8") as f:
return f.read()
def charger_instructions_depuis_gitea(nom_fichier="Instructions.md"):
"""Charge le fichier Instructions.md depuis Gitea avec cache local timestamp.
@ -39,8 +38,9 @@ def charger_instructions_depuis_gitea(nom_fichier="Instructions.md"):
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/{nom_fichier}?ref={ENV}"
try:
# Vérifier si une version plus récente existe sur le dépôt
fichier = Path(nom_fichier)
remote_last_modified = recuperer_date_dernier_commit(f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/commits?path={nom_fichier}&sha={ENV}")
local_last_modified = datetime.fromtimestamp(os.path.getmtime(nom_fichier), tz=timezone.utc) if os.path.exists(nom_fichier) else None
local_last_modified = datetime.fromtimestamp(fichier.stat().st_mtime, tz=timezone.utc) if fichier.exists() else None
# Si le fichier local n'existe pas ou si la version distante est plus récente
if not local_last_modified or (remote_last_modified and remote_last_modified > local_last_modified):
@ -49,7 +49,7 @@ def charger_instructions_depuis_gitea(nom_fichier="Instructions.md"):
data = response.json()
contenu_md = base64.b64decode(data["content"]).decode("utf-8")
# Sauvegarder en local
with open(nom_fichier, "w", encoding="utf-8") as f:
with fichier.open("w", encoding="utf-8") as f:
f.write(contenu_md)
return contenu_md
# Lire depuis le cache local
@ -57,7 +57,7 @@ def charger_instructions_depuis_gitea(nom_fichier="Instructions.md"):
except Exception as e:
st.error(f"Erreur chargement instructions Gitea : {e}")
# Essayer de charger depuis le cache local en cas d'erreur
if os.path.exists(nom_fichier):
if Path(nom_fichier).exists():
return lire_fichier_local(nom_fichier)
return None
@ -103,12 +103,13 @@ def charger_schema_depuis_gitea(fichier_local="schema_temp.txt"):
response.raise_for_status()
data = response.json()
fichier = Path(fichier_local)
remote_last_modified = recuperer_date_dernier_commit(f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_CODE}/commits?path={DOT_FILE}&sha={ENV_CODE}")
local_last_modified = datetime.fromtimestamp(os.path.getmtime(fichier_local), tz=timezone.utc) if os.path.exists(fichier_local) else None
local_last_modified = datetime.fromtimestamp(fichier.stat().st_mtime, tz=timezone.utc) if fichier.exists() else None
if not local_last_modified or (remote_last_modified and remote_last_modified > local_last_modified):
dot_text = base64.b64decode(data["content"]).decode("utf-8")
with open(fichier_local, "w", encoding="utf-8") as f:
with fichier.open("w", encoding="utf-8") as f:
f.write(dot_text)
return "OK"

View File

@ -7,14 +7,12 @@ import streamlit as st
import yaml
from networkx.drawing.nx_agraph import read_dot
from config import DOT_FILE
from utils.gitea import charger_schema_depuis_gitea
from utils.logger import setup_logger
logger = setup_logger(__name__)
# Configuration Gitea
from config import DOT_FILE
from utils.gitea import charger_schema_depuis_gitea
def extraire_chemins_depuis(G, source):
"""Extrait tous les chemins depuis un noeud source jusqu'aux feuilles du graphe.
@ -208,6 +206,7 @@ def load_seuils_config(path: str = "assets/config.yaml") -> dict:
def determiner_couleur_par_seuil(valeur: int, seuils_indice: dict) -> str:
"""Détermine la couleur en fonction de la valeur et des seuils configurés.
Logique alignée avec determine_threshold_color du projet.
Args:
@ -221,19 +220,16 @@ def determiner_couleur_par_seuil(valeur: int, seuils_indice: dict) -> str:
return "gray"
# Vérifier d'abord le seuil rouge (priorité la plus haute)
if "rouge" in seuils_indice and "min" in seuils_indice["rouge"]:
if valeur >= seuils_indice["rouge"]["min"]:
return "darkred"
if "rouge" in seuils_indice and "min" in seuils_indice["rouge"] and valeur >= seuils_indice["rouge"]["min"]:
return "darkred"
# Ensuite le seuil orange
if "orange" in seuils_indice and "min" in seuils_indice["orange"] and "max" in seuils_indice["orange"]:
if valeur >= seuils_indice["orange"]["min"] and valeur < seuils_indice["orange"]["max"]:
return "orange"
if "orange" in seuils_indice and "min" in seuils_indice["orange"] and "max" in seuils_indice["orange"] and valeur >= seuils_indice["orange"]["min"] and valeur < seuils_indice["orange"]["max"]:
return "orange"
# Seuil vert (valeurs inférieures au seuil orange)
if "vert" in seuils_indice and "max" in seuils_indice["vert"]:
if valeur < seuils_indice["vert"]["max"]:
return "darkgreen"
if "vert" in seuils_indice and "max" in seuils_indice["vert"] and valeur < seuils_indice["vert"]["max"]:
return "darkgreen"
# Par défaut orange si on ne trouve pas de correspondance exacte
return "orange"
@ -241,6 +237,7 @@ def determiner_couleur_par_seuil(valeur: int, seuils_indice: dict) -> str:
def couleur_noeud(n: str, niveaux: dict, G: nx.DiGraph) -> str:
"""Détermine la couleur d'un nœud en fonction de son niveau et de ses attributs.
Utilise les seuils définis dans le fichier de configuration.
Args:

View File

@ -16,8 +16,7 @@ def get_session_id() -> str:
Returns:
str: ID de session ou "anonymous" si non disponible.
"""
session_id = st.context.headers.get("x-session-id", "anonymous")
return session_id
return st.context.headers.get("x-session-id", "anonymous")
def update_session_paths():
"""Initialise les chemins de sauvegarde specifiques a la session courante.
@ -60,7 +59,7 @@ def _maj_champ(fichier, cle: str, contenu: str = "") -> bool:
if fichier.exists():
try:
with open(fichier, encoding="utf-8") as f:
with fichier.open(encoding="utf-8") as f:
sauvegarde = json.load(f)
sauvegarde = inserer_cle_json(sauvegarde, cle, serialize(contenu))
except Exception as e:
@ -72,7 +71,7 @@ def _maj_champ(fichier, cle: str, contenu: str = "") -> bool:
sauvegarde = inserer_cle_json(sauvegarde, cle, serialize(contenu))
try:
with open(fichier, "w", encoding="utf-8") as f:
with fichier.open("w", encoding="utf-8") as f:
json.dump(sauvegarde, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
@ -103,14 +102,14 @@ def _get_champ(fichier, cle: str) -> str:
import json
def charger_json_sain(fichier: str) -> dict:
with open(fichier, encoding="utf-8") as f:
with fichier.open(encoding="utf-8") as f:
contenu = json.load(f)
if isinstance(contenu, str):
try:
contenu = json.loads(contenu) # On essaie de parser une 2e fois si nécessaire
except json.JSONDecodeError:
raise ValueError("Le fichier contient une chaîne JSON invalide.")
except json.JSONDecodeError as err:
raise ValueError("Le fichier contient une chaîne JSON invalide.") from err
if not isinstance(contenu, dict):
raise ValueError("Le contenu JSON n'est pas un objet/dictionnaire valide.")
@ -139,7 +138,7 @@ def _supprime_champ(fichier: Path, cle: str) -> bool:
if fichier.exists():
try:
with open(fichier, encoding="utf-8") as f:
with fichier.open(encoding="utf-8") as f:
sauvegarde = json.load(f)
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_maj_champ", file=fichier, error=e))
@ -148,7 +147,7 @@ def _supprime_champ(fichier: Path, cle: str) -> bool:
supprimer_cle_profonde(sauvegarde, cle)
try:
with open(fichier, "w", encoding="utf-8") as f:
with fichier.open("w", encoding="utf-8") as f:
json.dump(sauvegarde, f, indent=4)
except Exception as e:
st.error(_("persistance.errors.write_file").format(function="_supprime_champ", file=fichier, error=e))
@ -196,9 +195,8 @@ def get_full_structure() -> dict|None:
fichier = SAVE_STATUT_PATH
if fichier.exists():
try:
with open(fichier, encoding="utf-8") as f:
sauvegarde = json.load(f)
return sauvegarde
with fichier.open(encoding="utf-8") as f:
return json.load(f)
except Exception as e:
st.error(_("persistance.errors.read_file").format(function="_maj_champ", file=fichier, error=e))
return None

View File

@ -1,6 +1,6 @@
import json
import logging
import os
from pathlib import Path
import streamlit as st
@ -18,12 +18,12 @@ def load_translations(lang="fr"):
dict: Dictionnaire des traductions ou un dictionnaire vide en cas d'erreur
"""
try:
file_path = os.path.join("assets", "locales", f"{lang}.json")
if not os.path.exists(file_path):
file_path = Path("assets") / "locales" / f"{lang}.json"
if not file_path.exists():
logger.warning(f"Fichier de traduction non trouvé: {file_path}")
return {}
with open(file_path, encoding="utf-8") as f:
with file_path.open(encoding="utf-8") as f:
translations = json.load(f)
logger.info(f"Traductions chargées: {lang}")
return translations
@ -33,6 +33,7 @@ def load_translations(lang="fr"):
def get_translation(key):
"""Récupère une traduction par sa clé.
Les clés peuvent être hiérarchiques, séparées par des points.
Exemple: "header.title" pour accéder à translations["header"]["title"]
@ -74,7 +75,7 @@ def set_language(lang="fr"):
# Initialiser la langue française par défaut
def init_translations():
"""Initialise les traductions avec la langue française"""
"""Initialise les traductions avec la langue française."""
if "translations" not in st.session_state:
set_language("fr")