Découpage de analyse_ia.py pour faciliter la maintenance

This commit is contained in:
Fabrication du Numérique 2025-05-28 14:36:30 +02:00
parent 95ede9c6f1
commit c5d854b165
14 changed files with 2025 additions and 1904 deletions

0
IA/02 - injection_fiches/auto_ingest.py Normal file → Executable file
View File

View File

@ -180,9 +180,12 @@ def interface_ia_nalyse(G_temp):
st.info(str(_("pages.ia_nalyse.empty_graph")))
elif resultat["statut"] == "terminé" and resultat["telechargement"]:
st.download_button(str(_("buttons.download")), resultat["telechargement"], file_name="analyse.zip")
if st.button(str(_("pages.ia_nalyse.confirm_download"))):
nettoyage_post_telechargement(st.session_state.username)
if not st.session_state.get("telechargement_confirme"):
st.download_button(str(_("buttons.download")), resultat["telechargement"], file_name="analyse.zip")
if st.button(str(_("pages.ia_nalyse.confirm_download"))):
nettoyage_post_telechargement(st.session_state.username)
st.session_state["telechargement_confirme"] = True
else:
st.success("Résultat supprimé. Vous pouvez relancer une nouvelle analyse.")
if st.button(str(_("buttons.refresh"))):
st.rerun()

View File

@ -136,6 +136,7 @@
"The graph covers all levels, from end products to geographic countries.\n",
"1. You can select minerals through which the paths go.",
"2. You can choose end products to perform an analysis tailored to your context.\n",
"These two selections are optional, but strongly recommended for a more relevant analysis.",
"The analysis is carried out using a private AI on a minimalist server. The result is therefore not immediate (approximately 30 minutes) and you will be notified of the progress."
],
"select_minerals": "Select one or more minerals",
@ -303,6 +304,7 @@
"failure": "Error",
"unknwon_error": "unknown error",
"no_task": "No task wainting or in progress",
"complete": "Analysis complete. Download the result in zip format, which contains the detailed report and analysis."
"complete": "Analysis complete. Download the result in zip format, which contains the detailed report and analysis.",
"step": "Step"
}
}

View File

@ -136,6 +136,7 @@
"Le graphe intègre l'ensemble des niveaux, des produits finaux aux pays géographiques.\n",
"1. Vous pouvez sélectionner des minerais par lesquels passent les chemins.",
"2. Vous pouvez choisir des produits finaux pour faire une analyse adaptée à votre contexte.\n",
" Ces deux sélections sont optionnelles, mais fortement recommandées pour avoir une meilleure pertinence de l'analyse.",
"L'analyse se réalise à l'aide d'une IA privée, sur un serveur minimaliste. Le résultat n'est donc pas immédiat (ordre de grandeur : 30 minutes) et vous serez informé de l'avancement."
],
"select_minerals": "Sélectionner un ou plusieurs minerais",
@ -303,6 +304,7 @@
"failure": "Échec",
"unknwon_error": "erreur inconnue",
"no_task": "Aucune tâche en attente ou en cours",
"complete": "Analyse terminée. Télécharger le résultat au format zip, qui contient le rapport détaillé et l'analyse."
"complete": "Analyse terminée. Télécharger le résultat au format zip, qui contient le rapport détaillé et l'analyse.",
"step": "Étape"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -32,8 +32,10 @@ def statut_utilisateur(login):
"message": f"{str(_('batch.in_queue'))} (position {entry.get('position', '?')})."}
if entry["status"] == "en cours":
if "step" not in st.session_state:
st.session_state["step"] = 1
return {"statut": "en cours", "position": 0,
"telechargement": None, "message": f"{str(_('batch.in_progress'))}."}
"telechargement": None, "message": f"{str(_('batch.in_progress'))} ({str(_('batch.step'))} {st.session_state["step"]}/5)."}
if entry["status"] == "terminé":
result_file = JOBS_DIR / f"{login}.zip"

93
batch_ia/utils/config.py Normal file
View File

@ -0,0 +1,93 @@
import os
import yaml
from pathlib import Path
import uuid
def init_uuid():
if not TEMP_SECTIONS.exists():
TEMP_SECTIONS.mkdir(parents=True)
session_uuid = str(uuid.uuid4())[:8] # Utiliser les 8 premiers caractères pour plus de concision
print(f"🔑 UUID de session généré: {session_uuid}")
return session_uuid
BASE_DIR = Path(__file__).resolve().parent.parent
CORPUS_DIR = BASE_DIR.parent / "Corpus"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt"
GRAPH_PATH = BASE_DIR.parent / "graphe.dot"
TEMP_SECTIONS = BASE_DIR / "temp_sections"
session_uuid = init_uuid()
TEMPLATE_PATH = TEMP_SECTIONS / f"rapport_final - {session_uuid}.md"
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
PROMPT_METHODOLOGIE = """
Le rapport à examiner a été établi à partir de la méthodologie suivante.
Le dispositif dévaluation des risques proposé repose sur quatre indices clairement définis, chacun analysant un aspect spécifique des risques dans la chaîne dapprovisionnement numérique. Lindice IHH mesure la concentration géographique ou industrielle, permettant dévaluer la dépendance vis-à-vis de certains acteurs ou régions. Lindice ISG indique la stabilité géopolitique des pays impliqués dans la chaîne de production, en intégrant des critères politiques, sociaux et climatiques. Lindice ICS quantifie la facilité ou la difficulté à remplacer ou substituer un élément spécifique dans la chaîne, évaluant ainsi les risques liés à la dépendance technologique et économique. Enfin, lindice IVC examine la pression concurrentielle sur les ressources utilisées par le numérique, révélant ainsi le risque potentiel que ces ressources soient détournées vers dautres secteurs industriels.
Ces indices se combinent judicieusement par paires pour une évaluation approfondie et pertinente des risques. La combinaison IHH-ISG permet dassocier la gravité d'un impact potentiel (IHH) à la probabilité de survenance dun événement perturbateur (ISG), créant ainsi une matrice de vulnérabilité combinée utile pour identifier rapidement les points critiques dans la chaîne de production. La combinaison ICS-IVC fonctionne selon la même logique, mais se concentre spécifiquement sur les ressources minérales : lICS indique la gravité potentielle d'une rupture d'approvisionnement due à une faible substituabilité, tandis que lIVC évalue la probabilité que les ressources soient captées par d'autres secteurs industriels concurrents. Ces combinaisons permettent dobtenir une analyse précise et opérationnelle du niveau de risque global.
Les avantages de cette méthodologie résident dans son approche à la fois systématique et granulaire, adaptée à l'échelle décisionnelle d'un COMEX. Elle permet didentifier avec précision les vulnérabilités majeures et leurs origines spécifiques, facilitant ainsi la prise de décision stratégique éclairée et proactive. En combinant des facteurs géopolitiques, industriels, technologiques et concurrentiels, ces indices offrent un suivi efficace de la chaîne de fabrication numérique, garantissant ainsi une gestion optimale des risques et la continuité opérationnelle à long terme.
"""
DICTIONNAIRE_CRITICITES = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
POIDS_COULEURS = {
"Vert": 1,
"Orange": 2,
"Rouge": 3
}
def load_config(thresholds_path=THRESHOLDS_PATH):
"""Charge la configuration depuis les fichiers YAML."""
config = {}
# Charger les seuils
if os.path.exists(thresholds_path):
with open(thresholds_path, 'r', encoding='utf-8') as f:
thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {})
return config
def determine_threshold_color(value, index_type, thresholds):
"""
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles.
"""
# Récupérer les seuils pour cet indice
if index_type in thresholds:
index_thresholds = thresholds[index_type]
# Déterminer la couleur
if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
suffix = get_suffix_for_index(index_type, "vert")
return "Vert", suffix
elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \
index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]:
suffix = get_suffix_for_index(index_type, "orange")
return "Orange", suffix
elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]:
suffix = get_suffix_for_index(index_type, "rouge")
return "Rouge", suffix
return "Non déterminé", ""
def get_suffix_for_index(index_type, color):
"""Retourne le suffixe approprié pour chaque indice et couleur."""
suffixes = DICTIONNAIRE_CRITICITES
if index_type in suffixes and color in suffixes[index_type]:
return suffixes[index_type][color]
return ""
def get_weight_for_color(color):
"""Retourne le poids correspondant à une couleur."""
weights = POIDS_COULEURS
return weights.get(color, 0)

133
batch_ia/utils/files.py Normal file
View File

@ -0,0 +1,133 @@
import os
import re
from utils.config import (
CORPUS_DIR
)
def strip_prefix(name):
"""Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier."""
return re.sub(r'^\d+[-_ ]*', '', name).lower()
def find_prefixed_directory(pattern, base_path=None):
"""
Recherche un sous-répertoire dont le nom (sans préfixe) correspond au pattern.
Args:
pattern: Nom du répertoire sans préfixe
base_path: Répertoire de base chercher
Returns:
Le chemin relatif du répertoire trouvé (avec préfixe) ou None
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
search_path = CORPUS_DIR
if not os.path.exists(search_path):
# print(f"Chemin inexistant: {search_path}")
return None
for d in os.listdir(search_path):
dir_path = os.path.join(search_path, d)
if os.path.isdir(dir_path) and strip_prefix(d) == pattern.lower():
return os.path.relpath(dir_path, CORPUS_DIR)
# print(f"Aucun répertoire correspondant à: '{pattern}' trouvé dans {search_path}")
return None
def find_corpus_file(pattern, base_path=None):
"""
Recherche récursive dans le corpus d'un fichier en ignorant les préfixes numériques dans les dossiers et fichiers.
Args:
pattern: Chemin relatif type "sous-dossier/nom-fichier"
base_path: Dossier de base à partir duquel chercher
Returns:
Chemin relatif du fichier trouvé ou None
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
search_path = CORPUS_DIR
# # print(f"Recherche de: '{pattern}' dans {search_path}")
if not os.path.exists(search_path):
# print(pattern)
# print(base_path)
# print(f"Chemin inexistant: {search_path}")
return None
if '/' not in pattern:
# Recherche directe d'un fichier
for file in os.listdir(search_path):
if not file.endswith('.md'):
continue
if strip_prefix(os.path.splitext(file)[0]) == pattern.lower():
rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR)
# # print(f"Fichier trouvé: {rel_path}")
return rel_path
else:
# Séparation du chemin en dossier/fichier
first, rest = pattern.split('/', 1)
matched_dir = find_prefixed_directory(first, base_path)
if matched_dir:
return find_corpus_file(rest, matched_dir)
# print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.")
return None
def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
"""
Lit un fichier du corpus et applique les transformations demandées.
Args:
file_path: Chemin relatif du fichier dans le corpus
remove_first_title: Si True, supprime la première ligne de titre
shift_titles: Nombre de niveaux à ajouter aux titres
Returns:
Le contenu du fichier avec les transformations appliquées
"""
full_path = os.path.join(CORPUS_DIR, file_path)
if not os.path.exists(full_path):
# print(f"Fichier non trouvé: {full_path}")
return f"Fichier non trouvé: {file_path}"
# # print(f"Lecture du fichier: {full_path}")
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Supprimer la première ligne si c'est un titre et si demandé
if remove_first_title and lines and lines[0].startswith('#'):
# # print(f"Suppression du titre: {lines[0].strip()}")
lines = lines[1:]
# Décaler les niveaux de titre si demandé
if shift_titles > 0:
for i in range(len(lines)):
if lines[i].startswith('#'):
lines[i] = '#' * shift_titles + lines[i]
# Nettoyer les retours à la ligne superflus
content = ''.join(lines)
# Supprimer les retours à la ligne en fin de contenu
content = content.rstrip('\n') + '\n'
return content
def write_report(report, fichier):
"""Écrit le rapport généré dans le fichier spécifié."""
report = re.sub(r'<!----.*?-->', '', report)
report = re.sub(r'\n\n\n+', '\n\n', report)
with open(fichier, 'w', encoding='utf-8') as f:
f.write(report)
# print(f"Rapport généré avec succès: {TEMPLATE_PATH}")

591
batch_ia/utils/graphs.py Normal file
View File

@ -0,0 +1,591 @@
import os
import sys
from networkx.drawing.nx_agraph import read_dot
from utils.config import (
REFERENCE_GRAPH_PATH,
determine_threshold_color, get_weight_for_color
)
def parse_graphs(graphe_path):
"""
Charge et analyse les graphes DOT (analyse et référence).
"""
print(graphe_path)
# Charger le graphe à analyser
if not os.path.exists(graphe_path):
print(f"Fichier de graphe à analyser introuvable: {graphe_path}")
sys.exit(1)
# Charger le graphe de référence
reference_path = REFERENCE_GRAPH_PATH
if not os.path.exists(reference_path):
print(f"Fichier de graphe de référence introuvable: {reference_path}")
sys.exit(1)
try:
# Charger les graphes avec NetworkX
graph = read_dot(graphe_path)
ref_graph = read_dot(reference_path)
# Convertir les attributs en types appropriés pour les deux graphes
for g in [graph, ref_graph]:
for node, attrs in g.nodes(data=True):
for key, value in list(attrs.items()):
# Convertir les valeurs numériques
if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']:
try:
if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']:
attrs[key] = int(value.strip('"'))
else:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
# Garder la valeur originale si la conversion échoue
pass
elif key == 'label':
# Nettoyer les guillemets des étiquettes
attrs[key] = value.strip('"')
# Convertir les attributs des arêtes
for u, v, attrs in g.edges(data=True):
for key, value in list(attrs.items()):
if key in ['ics', 'cout', 'delai', 'technique']:
try:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
pass
elif key == 'label' and '%' in value:
# Extraire le pourcentage
try:
percentage = value.strip('"').replace('%', '')
attrs['percentage'] = float(percentage)
except (ValueError, TypeError):
pass
return graph, ref_graph
except Exception as e:
print(f"Erreur lors de l'analyse des graphes: {str(e)}")
sys.exit(1)
def extract_data_from_graph(graph, ref_graph):
"""
Extrait toutes les données pertinentes des graphes DOT.
"""
data = {
"products": {}, # Produits finaux (N0)
"components": {}, # Composants (N1)
"minerals": {}, # Minerais (N2)
"operations": {}, # Opérations (N10)
"countries": {}, # Pays (N11)
"geo_countries": {}, # Pays géographiques (N99)
"actors": {} # Acteurs (N12)
}
# Extraire tous les pays géographiques du graphe de référence
for node, attrs in ref_graph.nodes(data=True):
if attrs.get('niveau') == 99:
country_name = attrs.get('label', node)
isg_value = attrs.get('isg', 0)
data["geo_countries"][country_name] = {
"id": node,
"isg": isg_value
}
# Extraire les nœuds du graphe à analyser
for node, attrs in graph.nodes(data=True):
level = attrs.get('niveau', -1)
label = attrs.get('label', node)
if level == 0 or level == 1000: # Produit final
data["products"][node] = {
"label": label,
"components": [],
"assembly": None,
"level": level
}
elif level == 1 or level == 1001: # Composant
data["components"][node] = {
"label": label,
"minerals": [],
"manufacturing": None
}
elif level == 2: # Minerai
data["minerals"][node] = {
"label": label,
"ivc": attrs.get('ivc', 0),
"extraction": None,
"treatment": None,
"ics_values": {}
}
elif level == 10 or level == 1010: # Opération
op_type = label.lower()
data["operations"][node] = {
"label": label,
"type": op_type,
"ihh_acteurs": attrs.get('ihh_acteurs', 0),
"ihh_pays": attrs.get('ihh_pays', 0),
"countries": {}
}
elif level == 11 or level == 1011: # Pays
data["countries"][node] = {
"label": label,
"actors": {},
"geo_country": None,
"market_share": 0
}
elif level == 12 or level == 1012: # Acteur
data["actors"][node] = {
"label": label,
"country": None,
"market_share": 0
}
# Extraire les relations et attributs des arêtes
for source, target, edge_attrs in graph.edges(data=True):
if source not in graph.nodes or target not in graph.nodes:
continue
source_level = graph.nodes[source].get('niveau', -1)
target_level = graph.nodes[target].get('niveau', -1)
# Extraire part de marché
market_share = 0
if 'percentage' in edge_attrs:
market_share = edge_attrs['percentage']
elif 'label' in edge_attrs and '%' in edge_attrs['label']:
try:
market_share = float(edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Relations produit → composant
if (source_level == 0 and target_level == 1) or (source_level == 1000 and target_level == 1001):
if target not in data["products"][source]["components"]:
data["products"][source]["components"].append(target)
# Relations produit → opération (assemblage)
elif (source_level == 0 and target_level == 10) or (source_level == 1000 and target_level == 1010):
if graph.nodes[target].get('label', '').lower() == 'assemblage':
data["products"][source]["assembly"] = target
# Relations composant → minerai avec ICS
elif (source_level == 1 or source_level == 1001) and target_level == 2:
if target not in data["components"][source]["minerals"]:
data["components"][source]["minerals"].append(target)
# Stocker l'ICS s'il est présent
if 'ics' in edge_attrs:
ics_value = edge_attrs['ics']
data["minerals"][target]["ics_values"][source] = ics_value
# Relations composant → opération (fabrication)
elif (source_level == 1 or source_level == 1001) and target_level == 10:
if graph.nodes[target].get('label', '').lower() == 'fabrication':
data["components"][source]["manufacturing"] = target
# Relations minerai → opération (extraction/traitement)
elif source_level == 2 and target_level == 10:
op_label = graph.nodes[target].get('label', '').lower()
if 'extraction' in op_label:
data["minerals"][source]["extraction"] = target
elif 'traitement' in op_label:
data["minerals"][source]["treatment"] = target
# Relations opération → pays avec part de marché
elif (source_level == 10 and target_level == 11) or (source_level == 1010 and target_level == 1011):
data["operations"][source]["countries"][target] = market_share
data["countries"][target]["market_share"] = market_share
# Relations pays → acteur avec part de marché
elif (source_level == 11 and target_level == 12) or (source_level == 1011 and target_level == 1012):
data["countries"][source]["actors"][target] = market_share
data["actors"][target]["market_share"] = market_share
data["actors"][target]["country"] = source
# Relations pays → pays géographique
elif (source_level == 11 or source_level == 1011) and target_level == 99:
country_name = graph.nodes[target].get('label', '')
data["countries"][source]["geo_country"] = country_name
# Compléter les opérations manquantes pour les produits et composants
# en les récupérant du graphe de référence si elles existent
# Pour les produits finaux (N0)
for product_id, product_data in data["products"].items():
if product_data["assembly"] is None:
# Chercher l'opération d'assemblage dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == product_id and
((ref_graph.nodes[source].get('niveau') == 0 and
ref_graph.nodes[target].get('niveau') == 10) or
(ref_graph.nodes[source].get('niveau') == 1000 and
ref_graph.nodes[target].get('niveau') == 1010)) and
ref_graph.nodes[target].get('label', '').lower() == 'assemblage'):
# L'opération existe dans le graphe de référence
assembly_id = target
product_data["assembly"] = assembly_id
# Ajouter l'opération si elle n'existe pas déjà
if assembly_id not in data["operations"]:
data["operations"][assembly_id] = {
"label": ref_graph.nodes[assembly_id].get('label', assembly_id),
"type": "assemblage",
"ihh_acteurs": ref_graph.nodes[assembly_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[assembly_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == assembly_id and
(ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][assembly_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
(ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération d'assemblage par produit
# Pour les composants (N1)
for component_id, component_data in data["components"].items():
if component_data["manufacturing"] is None:
# Chercher l'opération de fabrication dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == component_id and
((ref_graph.nodes[source].get('niveau') == 1 and
ref_graph.nodes[target].get('niveau') == 10) or
(ref_graph.nodes[source].get('niveau') == 1001 and
ref_graph.nodes[target].get('niveau') == 1010)) and
ref_graph.nodes[target].get('label', '').lower() == 'fabrication'):
# L'opération existe dans le graphe de référence
manufacturing_id = target
component_data["manufacturing"] = manufacturing_id
# Ajouter l'opération si elle n'existe pas déjà
if manufacturing_id not in data["operations"]:
data["operations"][manufacturing_id] = {
"label": ref_graph.nodes[manufacturing_id].get('label', manufacturing_id),
"type": "fabrication",
"ihh_acteurs": ref_graph.nodes[manufacturing_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[manufacturing_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == manufacturing_id and
(ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][manufacturing_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
(ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération de fabrication par composant
return data
def calculate_vulnerabilities(data, config):
"""
Calcule les vulnérabilités combinées pour toutes les opérations et minerais.
"""
thresholds = config.get('thresholds', {})
results = {
"ihh_isg_combined": {}, # Pour chaque opération
"ics_ivc_combined": {}, # Pour chaque minerai
"chains": [] # Pour stocker tous les chemins possibles
}
# 1. Calculer ISG_combiné pour chaque opération
for op_id, operation in data["operations"].items():
isg_weighted_sum = 0
total_share = 0
# Parcourir chaque pays impliqué dans l'opération
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
isg_weighted_sum += isg_value * share
total_share += share
# Calculer la moyenne pondérée
isg_combined = 0
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
# Déterminer couleurs et poids
ihh_value = operation["ihh_pays"]
ihh_color, ihh_suffix = determine_threshold_color(ihh_value, "IHH", thresholds)
isg_color, isg_suffix = determine_threshold_color(isg_combined, "ISG", thresholds)
# Calculer poids combiné
ihh_weight = get_weight_for_color(ihh_color)
isg_weight = get_weight_for_color(isg_color)
combined_weight = ihh_weight * isg_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ihh_isg_combined"][op_id] = {
"ihh_value": ihh_value,
"ihh_color": ihh_color,
"ihh_suffix": ihh_suffix,
"isg_combined": isg_combined,
"isg_color": isg_color,
"isg_suffix": isg_suffix,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 2. Calculer ICS_moyen pour chaque minerai
for mineral_id, mineral in data["minerals"].items():
ics_values = list(mineral["ics_values"].values())
ics_average = 0
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ivc_value = mineral.get("ivc", 0)
# Déterminer couleurs et poids
ics_color, ics_suffix = determine_threshold_color(ics_average, "ICS", thresholds)
ivc_color, ivc_suffix = determine_threshold_color(ivc_value, "IVC", thresholds)
# Calculer poids combiné
ics_weight = get_weight_for_color(ics_color)
ivc_weight = get_weight_for_color(ivc_color)
combined_weight = ics_weight * ivc_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ics_ivc_combined"][mineral_id] = {
"ics_average": ics_average,
"ics_color": ics_color,
"ics_suffix": ics_suffix,
"ivc_value": ivc_value,
"ivc_color": ivc_color,
"ivc_suffix": ivc_suffix,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 3. Identifier tous les chemins et leurs vulnérabilités
for product_id, product in data["products"].items():
for component_id in product["components"]:
component = data["components"][component_id]
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
# Collecter toutes les vulnérabilités dans ce chemin
path_vulnerabilities = []
# Assemblage (si présent)
assembly_id = product["assembly"]
if assembly_id and assembly_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "assemblage",
"vulnerability": results["ihh_isg_combined"][assembly_id]["vulnerability"],
"operation_id": assembly_id
})
# Fabrication (si présent)
manufacturing_id = component["manufacturing"]
if manufacturing_id and manufacturing_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "fabrication",
"vulnerability": results["ihh_isg_combined"][manufacturing_id]["vulnerability"],
"operation_id": manufacturing_id
})
# Minerai (ICS+IVC)
if mineral_id in results["ics_ivc_combined"]:
path_vulnerabilities.append({
"type": "minerai",
"vulnerability": results["ics_ivc_combined"][mineral_id]["vulnerability"],
"mineral_id": mineral_id
})
# Extraction (si présent)
extraction_id = mineral["extraction"]
if extraction_id and extraction_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "extraction",
"vulnerability": results["ihh_isg_combined"][extraction_id]["vulnerability"],
"operation_id": extraction_id
})
# Traitement (si présent)
treatment_id = mineral["treatment"]
if treatment_id and treatment_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "traitement",
"vulnerability": results["ihh_isg_combined"][treatment_id]["vulnerability"],
"operation_id": treatment_id
})
# Classifier le chemin
path_info = {
"product": product_id,
"component": component_id,
"mineral": mineral_id,
"vulnerabilities": path_vulnerabilities
}
# Déterminer le niveau de risque du chemin
critical_count = path_vulnerabilities.count({"vulnerability": "ÉLEVÉE à CRITIQUE"})
medium_count = path_vulnerabilities.count({"vulnerability": "MOYENNE"})
if any(v["vulnerability"] == "ÉLEVÉE à CRITIQUE" for v in path_vulnerabilities):
path_info["risk_level"] = "critique"
elif medium_count >= 3:
path_info["risk_level"] = "majeur"
elif any(v["vulnerability"] == "MOYENNE" for v in path_vulnerabilities):
path_info["risk_level"] = "moyen"
else:
path_info["risk_level"] = "faible"
results["chains"].append(path_info)
return results

286
batch_ia/utils/ia.py Normal file
View File

@ -0,0 +1,286 @@
import re
from pathlib import Path
import requests
import json
import time
import zipfile
import streamlit as st
from nettoyer_pgpt import (
delete_documents_by_criteria
)
from utils.config import (
TEMP_SECTIONS,
session_uuid,
API_URL, PROMPT_METHODOLOGIE
)
def ingest_document(file_path: Path) -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
file_name = file_path.name
files = {"file": (file_name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement ce fichier d'entrée
metadata = {
"type": "input_file",
"session_id": session_uuid,
"document_type": "rapport_analyse_input"
}
response = requests.post(
f"{API_URL}/ingest/file",
files=files,
data={"metadata": json.dumps(metadata)} if "metadata" in requests.get(f"{API_URL}/ingest/file").text else None
)
response.raise_for_status()
print(f"✅ Document '{file_path}' ingéré avec succès sous le nom '{file_name}'")
return True
except FileNotFoundError:
print(f"❌ Fichier '{file_path}' introuvable")
return False
except requests.RequestException as e:
print(f"❌ Erreur lors de l'ingestion du document: {e}")
return False
def generate_text(input_file, full_prompt, system_message, temperature = "0.3", use_context = True):
"""Génère du texte avec l'API PrivateGPT"""
try:
# Définir les paramètres de la requête
payload = {
"messages": [
{"role": "system", "content": system_message},
{"role": "user", "content": full_prompt}
],
"use_context": use_context, # Active la recherche RAG dans les documents ingérés
"temperature": temperature, # Température réduite pour plus de cohérence
"stream": False
}
# Tenter d'ajouter un filtre de contexte (fonctionnalité expérimentale qui peut ne pas être supportée)
if input_file:
try:
# Vérifier si le filtre de contexte est supporté sans faire de requête supplémentaire
liste_des_fichiers = list(TEMP_SECTIONS.glob(f"*{session_uuid}*.md"))
filter_metadata = {
"document_name": [input_file.name] + [f.name for f in liste_des_fichiers]
}
payload["filter_metadata"] = filter_metadata
except Exception as e:
print(f" Remarque: Impossible d'appliquer le filtre de contexte: {e}")
# Envoyer la requête
response = requests.post(
f"{API_URL}/chat/completions",
json=payload,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire la réponse générée
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Détails: {e.response.text}")
return None
def ia_analyse(file_names):
for file in file_names:
ingest_document(file)
time.sleep(5)
reponse = {}
for file in file_names:
produit_final = re.search(r"chemins critiques (.+)\.md$", file.name).group(1)
# Préparer le prompt avec le contexte précédent si disponible et demandé
full_prompt = f"""
Rédigez une synthèse du fichier {file.name} dédiée au produit final '{produit_final}'.
Cette synthèse, destinée spécifiquement au Directeur des Risques, membre du COMEX d'une grande entreprise utilisant ce produit, doit être claire et concise (environ 10 lignes).
En utilisant impérativement la méthodologie fournie, expliquez en termes simples mais précis, pourquoi et comment les vulnérabilités identifiées constituent un risque concret pour l'entreprise. Mentionnez clairement :
- Les composants spécifiques du produit '{produit_final}' concernés par ces vulnérabilités.
- Les minerais précis responsables de ces vulnérabilités et leur rôle dans limpact sur les composants.
- Les points critiques exacts identifiés dans la chaîne d'approvisionnement (par exemple : faible substituabilité, forte concentration géographique, instabilité géopolitique, concurrence élevée entre secteurs industriels).
- Identifier autant que faire se peut, les pays générant la forte concentration géographiques ou qui sont sujet à instabilité géopolitique, les secteurs en concurrence avec le numérique pour les minerais.
Respectez strictement les consignes suivantes :
- N'utilisez aucun acronyme ni valeur numérique ; uniquement leur équivalent textuel (ex : criticité de substituabilité, vulnérabilité élevée ou critique, etc.).
- N'incluez à ce stade aucune préconisation ni recommandation.
Votre texte doit être parfaitement adapté à une compréhension rapide par des dirigeants dentreprise.
"""
# Définir les paramètres de la requête
system_message = f"""
Vous êtes un assistant stratégique expert chargé de rédiger des synthèses destinées à des décideurs de très haut niveau (Directeurs des Risques, membres du COMEX, stratèges industriels). Vous analysez exclusivement les vulnérabilités systémiques affectant les produits numériques, à partir des données précises fournies dans le fichier {file.name}.
Votre analyse doit être rigoureuse, accessible, pertinente pour la prise de décision stratégique, et conforme à la méthodologie définie ci-dessous :
{PROMPT_METHODOLOGIE}
/no_think
"""
reponse[produit_final] = f"\n**{produit_final}**\n\n" + generate_text(file, full_prompt, system_message).split("</think>")[-1].strip()
# print(reponse[produit_final])
corps = "\n\n".join(reponse.values())
print("Corps")
st.session_state["step"] = 2
full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE
system_message = """
Vous êtes un expert en rédaction de rapports stratégiques destinés à un COMEX ou une Direction des Risques.
Votre mission est d'écrire une introduction professionnelle, claire et synthétique (maximum 7 lignes) à partir des éléments suivants :
1. Un corps danalyse décrivant les vulnérabilités identifiées pour un produit numérique.
2. La méthodologie détaillée utilisée pour cette analyse (fourni en deuxième partie).
Votre introduction doit :
- Présenter brièvement le sujet traité (vulnérabilités du produit final), quels sont les produits finaux et les minerais concernés.
- Annoncer clairement le contenu et l'objectif de l'analyse présentée dans le corps.
- Résumer succinctement les axes méthodologiques principaux (concentration géographique ou industrielle, stabilité géopolitique, criticité de substituabilité, concurrence intersectorielle des minerais).
- Être facilement compréhensible par des décideurs de haut niveau (pas d'acronymes, ni chiffres ; uniquement des formulations textuelles).
- Être fluide, agréable à lire, avec un ton sobre et professionnel.
Répondez uniquement avec l'introduction rédigée. Ne fournissez aucune autre explication complémentaire.
/no_think
"""
introduction = generate_text("", full_prompt, system_message).split("</think>")[-1].strip()
print("Introduction")
st.session_state["step"] = 3
full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE
system_message = """
Vous êtes un expert stratégique en gestion des risques liés à la chaîne de valeur numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises utilisatrices de produits numériques. Ces entreprises n'ont pour levier daction que le choix de leurs fournisseurs ou l'allongement de la durée de vie de leur matériel.
À partir des vulnérabilités identifiées dans la première partie du prompt (corps d'analyse) et en tenant compte du contexte et de la méthodologie décrite en deuxième partie, rédigez un texte clair, structuré en deux parties distinctes :
1. **Préconisations stratégiques :**
Proposez clairement des axes concrets pour limiter les risques identifiés dans lanalyse. Ces préconisations doivent impérativement être réalistes et directement actionnables par les dirigeants compte tenu de leurs leviers limités.
2. **Indicateurs de suivi :**
Identifiez précisément les indicateurs pertinents à suivre pour évaluer régulièrement lévolution de ces risques. Ces indicateurs doivent être inspirés directement des axes méthodologiques fournis (concentration géographique, stabilité géopolitique, substituabilité, concurrence intersectorielle) ou sappuyer sur des bonnes pratiques reconnues.
Votre rédaction doit être fluide, concise, très professionnelle, et directement accessible à un COMEX. Évitez strictement toute explication complémentaire ou ajout superflu. Ne proposez que le texte demandé.
"""
preconisations = generate_text("", full_prompt, system_message, "0.5").split("</think>")[-1].strip()
print("Préconisations")
st.session_state["step"] = 4
full_prompt = corps + "\n\n" + preconisations
system_message = """
Vous êtes un expert stratégique spécialisé dans les risques liés à la chaîne de valeur du numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises dépendantes du numérique, dont les leviers daction se limitent au choix des fournisseurs et à lallongement de la durée dutilisation du matériel.
À partir du résultat de l'analyse des vulnérabilités présenté en première partie du prompt (corps) et des préconisations stratégiques formulées en deuxième partie, rédigez une conclusion synthétique et percutante (environ 6 à 8 lignes maximum) afin de :
- Résumer clairement les principaux risques identifiés.
- Souligner brièvement les axes prioritaires proposés pour agir concrètement.
- Inviter de manière dynamique le COMEX à passer immédiatement à l'action.
Votre rédaction doit être fluide, professionnelle, claire et immédiatement exploitable par des dirigeants. Ne fournissez aucune explication supplémentaire. Ne répondez que par la conclusion demandée.
/no_think
"""
conclusion = generate_text("", full_prompt, system_message, "0.7").split("</think>")[-1].strip()
print("Conclusion")
st.session_state["step"] = 5
analyse = "# Rapport d'analyse\n\n" + \
"\n\n## Introduction\n\n" + \
introduction + \
"\n\n## Analyse des produits finaux\n\n" + \
corps + \
"\n\n## Préconisations\n\n" + \
preconisations + \
"\n\n## Conclusion\n\n" + \
conclusion + \
"\n\n## Méthodologie\n\n" + \
PROMPT_METHODOLOGIE
# fichier_a_reviser = Path(TEMPLATE_PATH.name.replace(".md", " - analyse à relire.md"))
# write_report(analyse, TEMP_SECTIONS / fichier_a_reviser)
# ingest_document(TEMP_SECTIONS / fichier_a_reviser)
full_prompt = """
Suivre scrupuleusement les consignes.
"""
system_message = f"""
Vous êtes un réviseur professionnel expert en écriture stratégique, maîtrisant parfaitement la langue française et habitué à réviser des textes destinés à des dirigeants de haut niveau (COMEX).
Votre tâche unique est d'améliorer strictement la qualité rédactionnelle du texte suivant, sans modifier en aucune manière :
- la structure existante (sections, titres, sous-titres),
- l'ordre des paragraphes et des idées,
- le sens précis du contenu original,
- sans ajouter aucune information nouvelle.
Votre révision doit impérativement respecter les points suivants :
- Éliminer toutes répétitions ou redondances et varier systématiquement les tournures entre les paragraphes.
- Rendre chaque phrase claire, directe et concise. Si une phrase est trop longue, scindez-la clairement en plusieurs phrases courtes.
- Structurer chaque paragraphe en 2 à 3 parties cohérentes, reliées entre elles par des termes logiques (coordination, implication, opposition, etc.) et séparées par des retours à la ligne.
- Remplacer systématiquement les acronymes par ces expressions précises :
- ICS « capacité à substituer un minerai »
- IHH « concentration géographique ou industrielle »
- ISG « stabilité géopolitique »
- IVC « concurrence intersectorielle pour les minerais »
Votre texte final doit être parfaitement fluide, agréable à lire, adapté à un COMEX, avec un ton professionnel et sobre.
**Important : Ne répondez strictement que par le texte révisé ci-dessous, sans aucun commentaire ou explication supplémentaire.**
Voici le texte à réviser précisément :
{analyse}
/no_think
"""
revision = generate_text("", full_prompt, system_message, "0.1", False).split("</think>")[-1].strip()
print("Relecture")
return revision
def supprimer_fichiers(session_uuid):
try:
delete_documents_by_criteria(session_uuid)
for temp_file in TEMP_SECTIONS.glob(f"*{session_uuid}*.md"):
temp_file.unlink()
return True
except:
return False
def generer_rapport_final(rapport, analyse, resultat):
try:
rapport = Path(rapport)
analyse = Path(analyse)
with zipfile.ZipFile(resultat, "w") as zipf:
zipf.write(rapport, arcname=rapport.name)
zipf.write(analyse, arcname=analyse.name)
return True
except Exception as e:
print(f"Erreur lors du zip : {e}")
return False

766
batch_ia/utils/sections.py Normal file
View File

@ -0,0 +1,766 @@
import os
import re
from utils.config import (
CORPUS_DIR,
TEMPLATE_PATH,
determine_threshold_color
)
from utils.files import (
find_prefixed_directory,
find_corpus_file,
write_report,
read_corpus_file
)
from utils.sections_utils import (
trouver_dossier_composant,
extraire_sections_par_mot_cle
)
def generate_introduction_section(data):
"""
Génère la section d'introduction du rapport.
"""
products = [p["label"] for p in data["products"].values()]
components = [c["label"] for c in data["components"].values()]
minerals = [m["label"] for m in data["minerals"].values()]
template = []
template.append("## Introduction\n")
template.append("Ce rapport analyse les vulnérabilités de la chaîne de fabrication du numérique pour :\n")
template.append("* les produits finaux : " + ", ".join(products))
template.append("* les composants : " + ", ".join(components))
template.append("* les minerais : " + ", ".join(minerals) + "\n")
return "\n".join(template)
def generate_methodology_section():
"""
Génère la section méthodologie du rapport.
"""
template = []
template.append("## Méthodologie d'analyse des risques\n")
template.append("### Indices et seuils\n")
template.append("La méthode d'évaluation intègre 4 indices et leurs combinaisons pour identifier les chemins critiques.\n")
# IHH
template.append("#### IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n")
# Essayer d'abord avec le chemin exact
ihh_context_file = "Criticités/Fiche technique IHH/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ihh_context_file)):
template.append(read_corpus_file(ihh_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ihh_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IHH")
if ihh_context_file:
template.append(read_corpus_file(ihh_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ihh_calc_file = "Criticités/Fiche technique IHH/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ihh_calc_file)):
template.append(read_corpus_file(ihh_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ihh_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IHH")
if ihh_calc_file:
template.append(read_corpus_file(ihh_calc_file, remove_first_title=True))
template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n")
# ISG
template.append("#### ISG (Stabilité Géopolitique) : stabilité des pays\n")
# Essayer d'abord avec le chemin exact
isg_context_file = "Criticités/Fiche technique ISG/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, isg_context_file)):
template.append(read_corpus_file(isg_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
isg_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ISG")
if isg_context_file:
template.append(read_corpus_file(isg_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
isg_calc_file = "Criticités/Fiche technique ISG/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, isg_calc_file)):
template.append(read_corpus_file(isg_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
isg_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ISG")
if isg_calc_file:
template.append(read_corpus_file(isg_calc_file, remove_first_title=True))
template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n")
# ICS
template.append("#### ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément\n")
# Essayer d'abord avec le chemin exact
ics_context_file = "Criticités/Fiche technique ICS/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ics_context_file)):
template.append(read_corpus_file(ics_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ics_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ICS")
if ics_context_file:
template.append(read_corpus_file(ics_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ics_calc_file = "Criticités/Fiche technique ICS/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ics_calc_file)):
template.append(read_corpus_file(ics_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ics_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ICS")
if ics_calc_file:
template.append(read_corpus_file(ics_calc_file, remove_first_title=True))
template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n")
# IVC
template.append("#### IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs\n")
# Essayer d'abord avec le chemin exact
ivc_context_file = "Criticités/Fiche technique IVC/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ivc_context_file)):
template.append(read_corpus_file(ivc_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ivc_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IVC")
if ivc_context_file:
template.append(read_corpus_file(ivc_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ivc_calc_file = "Criticités/Fiche technique IVC/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ivc_calc_file)):
template.append(read_corpus_file(ivc_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ivc_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IVC")
if ivc_calc_file:
template.append(read_corpus_file(ivc_calc_file, remove_first_title=True))
template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n")
# Combinaison des indices
template.append("### Combinaison des indices\n")
# IHH et ISG
template.append("**IHH et ISG**\n")
template.append("Ces deux indices s'appliquent à toutes les opérations et se combinent dans l'évaluation du risque (niveau d'impact et probabilité de survenance) :\n")
template.append("* l'IHH donne le niveau d'impact => une forte concentration implique un fort impact si le risque est avéré")
template.append("* l'ISG donne la probabilité de survenance => plus les pays sont instables (et donc plus l'ISG est élevé) et plus la survenance du risque est élevée\n")
template.append("Pour évaluer le risque pour une opération, les ISG des pays sont pondérés par les parts de marché respectives pour donner un ISG combiné dont le calcul est :")
template.append("ISG_combiné = (Somme des ISG des pays multipliée par leur part de marché) / Sommes de leur part de marché\n")
template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ISG combiné et de l'IHH\n")
template.append("| ISG combiné / IHH | Vert | Orange | Rouge |")
template.append("| :-- | :-- | :-- | :-- |")
template.append("| Vert | 1 | 2 | 3 |")
template.append("| Orange | 2 | 4 | 6 |")
template.append("| Rouge | 3 | 6 | 9 |\n")
template.append("Les vulnérabilités se classent en trois niveaux pour chaque opération :\n")
template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9")
template.append("* Vulnérabilité combinée moyenne : poids 3 et 4")
template.append("* Vulnérabilité combinée faible : poids 1 et 2\n")
# ICS et IVC
template.append("**ICS et IVC**\n")
template.append("Ces deux indices se combinent dans l'évaluation du risque pour un minerai :\n")
template.append("* l'ICS donne le niveau d'impact => une faible substituabilité (et donc un ICS élevé) implique un fort impact si le risque est avéré ; l'ICS est associé à la relation entre un composant et un minerai")
template.append("* l'IVC donne la probabilité de l'impact => une forte concurrence intersectorielle (IVC élevé) implique une plus forte probabilité de survenance\n")
template.append("Par simplification, on intègre un ICS moyen d'un minerai comme étant la moyenne des ICS pour chacun des composants dans lesquels il intervient.\n")
template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ICS moyen et de l'IVC.\n")
template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |")
template.append("| :-- | :-- | :-- | :-- |")
template.append("| Vert | 1 | 2 | 3 |")
template.append("| Orange | 2 | 4 | 6 |")
template.append("| Rouge | 3 | 6 | 9 |\n")
template.append("Les vulnérabilités se classent en trois niveaux pour chaque minerai :\n")
template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9")
template.append("* Vulnérabilité combinée moyenne : poids 3 et 4")
template.append("* Vulnérabilité combinée faible : poids 1 et 2\n")
return "\n".join(template)
def generate_operations_section(data, results, config):
"""
Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement).
"""
# # print("DEBUG: Génération de la section des opérations")
# # print(f"DEBUG: Nombre de produits: {len(data['products'])}")
# # print(f"DEBUG: Nombre de composants: {len(data['components'])}")
# # print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}")
template = []
template.append("## Détails des opérations\n")
# 1. Traiter les produits finaux (assemblage)
for product_id, product in data["products"].items():
# # print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}")
if product["assembly"]:
template.append(f"### {product['label']} et Assemblage\n")
# Récupérer la présentation synthétique
# product_slug = product['label'].lower().replace(' ', '-')
sous_repertoire = f"{product['label']}"
if product["level"] == 0:
type = "Assemblage"
else:
type = "Connexe"
sous_repertoire = trouver_dossier_composant(sous_repertoire, type, "Fiche assemblage ")
product_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"{type}/Fiche assemblage {product_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("")
# Récupérer les principaux assembleurs
assembleurs_file = find_corpus_file("principaux-assembleurs", f"{type}/Fiche assemblage {product_slug}")
if assembleurs_file:
template.append(read_corpus_file(assembleurs_file, shift_titles=2))
template.append("")
# ISG des pays impliqués
assembly_id = product["assembly"]
operation = data["operations"][assembly_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**")
# IHH
ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"{type}/Fiche assemblage {product_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if assembly_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][assembly_id]
template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# 2. Traiter les composants (fabrication)
for component_id, component in data["components"].items():
# # print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}")
if component["manufacturing"]:
template.append(f"### {component['label']} et Fabrication\n")
# Récupérer la présentation synthétique
# component_slug = component['label'].lower().replace(' ', '-')
sous_repertoire = f"{component['label']}"
sous_repertoire = trouver_dossier_composant(sous_repertoire, "Fabrication", "Fiche fabrication ")
component_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"Fabrication/Fiche fabrication {component_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("\n")
# Récupérer les principaux fabricants
fabricants_file = find_corpus_file("principaux-fabricants", f"Fabrication/Fiche fabrication {component_slug}")
if fabricants_file:
template.append(read_corpus_file(fabricants_file, shift_titles=2))
template.append("\n")
# ISG des pays impliqués
manufacturing_id = component["manufacturing"]
operation = data["operations"][manufacturing_id]
template.append("#### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n\n")
# IHH
ihh_file = find_corpus_file("matrice-des-risques-liés-à-la-fabrication/indice-de-herfindahl-hirschmann", f"Fabrication/Fiche fabrication {component_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if manufacturing_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][manufacturing_id]
template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# 3. Traiter les minerais (détaillés dans une section séparée)
result = "\n".join(template)
# # print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères")
if len(result) <= 30: # Juste le titre de section
# # print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!")
# Ajout d'une section de débogage dans le rapport
template.append("### DÉBOGAGE - Opérations manquantes\n")
template.append("Aucune opération d'assemblage ou de fabrication n'a été trouvée dans les données.\n")
template.append("Informations disponibles:\n")
template.append(f"* Nombre de produits: {len(data['products'])}\n")
template.append(f"* Nombre de composants: {len(data['components'])}\n")
template.append(f"* Nombre d'opérations: {len(data['operations'])}\n")
template.append("\nDétail des produits et de leurs opérations d'assemblage:\n")
for pid, p in data["products"].items():
template.append(f"* {p['label']}: {'Assemblage: ' + str(p['assembly']) if p['assembly'] else 'Pas d\'assemblage'}\n")
template.append("\nDétail des composants et de leurs opérations de fabrication:\n")
for cid, c in data["components"].items():
template.append(f"* {c['label']}: {'Fabrication: ' + str(c['manufacturing']) if c['manufacturing'] else 'Pas de fabrication'}\n")
result = "\n".join(template)
return result
def generate_minerals_section(data, results, config):
"""
Génère la section détaillant les minerais et leurs opérations d'extraction et traitement.
"""
template = []
template.append("## Détails des minerais\n")
for mineral_id, mineral in data["minerals"].items():
mineral_slug = mineral['label'].lower().replace(' ', '-')
fiche_dir = f"{CORPUS_DIR}/Minerai/Fiche minerai {mineral_slug}"
if not os.path.exists(fiche_dir):
continue
template.append(f"---\n\n### {mineral['label']}\n")
# Récupérer la présentation synthétique
presentation_file = find_corpus_file("présentation-synthétique", f"Minerai/Fiche minerai {mineral_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("\n")
# ICS
template.append("#### ICS\n")
ics_intro_file = find_corpus_file("risque-de-substituabilité/_intro", f"Minerai/Fiche minerai {mineral_slug}")
if ics_intro_file:
template.append(read_corpus_file(ics_intro_file, remove_first_title=True))
template.append("\n")
# Calcul de l'ICS moyen
ics_values = list(mineral["ics_values"].values())
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
color, suffix = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
template.append("##### Valeurs d'ICS par composant\n")
template.append("| Composant | ICS | Criticité |")
template.append("| :-- | :-- | :-- |")
for comp_id, ics_value in mineral["ics_values"].items():
comp_name = data["components"][comp_id]["label"]
comp_color, comp_suffix = determine_threshold_color(ics_value, "ICS", config.get('thresholds'))
template.append(f"| {comp_name} | {ics_value:.2f} | {comp_color} ({comp_suffix}) |")
template.append(f"\n**ICS moyen : {ics_average:.2f} - {color} ({suffix})**\n")
# IVC
template.append("#### IVC\n\n")
# Valeur IVC
ivc_value = mineral.get("ivc", 0)
color, suffix = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
template.append(f"**IVC: {ivc_value} - {color} ({suffix})**\n")
# Récupérer toutes les sections de vulnérabilité de concurrence
ivc_sections = []
ivc_dir = find_prefixed_directory("vulnérabilité-de-concurrence", f"Minerai/Fiche minerai {mineral_slug}")
corpus_path = os.path.join(CORPUS_DIR, ivc_dir) if os.path.exists(os.path.join(CORPUS_DIR, ivc_dir)) else None
if corpus_path:
for file in sorted(os.listdir(corpus_path)):
if file.endswith('.md') and "_intro.md" not in file and "sources" not in file:
ivc_sections.append(os.path.join(ivc_dir, file))
# Inclure chaque section IVC
for section_file in ivc_sections:
content = read_corpus_file(section_file, remove_first_title=False)
# Nettoyer les balises des fichiers IVC
content = re.sub(r'```.*?```', '', content, flags=re.DOTALL)
# Mettre le titre en italique s'il commence par un # (format Markdown pour titre)
if content and '\n' in content:
first_line, rest = content.split('\n', 1)
if first_line.strip().startswith('#'):
# Extraire le texte du titre sans les # et les espaces
title_text = first_line.strip().lstrip('#').strip()
content = f"\n*{title_text}*\n{rest.strip()}"
# Ne pas ajouter de contenu vide
if content.strip():
template.append(content.strip())
# ICS et IVC combinés
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append("\n#### Vulnérabilité combinée ICS-IVC\n")
template.append(f"* ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']} ({combined['ics_suffix']})")
template.append(f"* IVC: {combined['ivc_value']} - {combined['ivc_color']} ({combined['ivc_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# Extraction
if mineral["extraction"]:
template.append("#### Extraction\n")
# Récupérer les principaux producteurs
producers_file = find_corpus_file("principaux-producteurs-extraction", f"Minerai/Fiche minerai {mineral_slug}")
if producers_file:
template.append(read_corpus_file(producers_file, remove_first_title=True))
template.append("\n")
# ISG des pays impliqués
extraction_id = mineral["extraction"]
operation = data["operations"][extraction_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH extraction
ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-extraction", f"Minerai/Fiche minerai {mineral_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if extraction_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][extraction_id]
template.append("##### Vulnérabilité combinée IHH-ISG pour l'extraction\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# Traitement
if mineral["treatment"]:
template.append("#### Traitement\n")
# Récupérer les principaux producteurs
producers_file = find_corpus_file("principaux-producteurs-traitement", f"Minerai/Fiche minerai {mineral_slug}")
if producers_file:
template.append(read_corpus_file(producers_file, remove_first_title=True))
template.append("\n")
# ISG des pays impliqués
treatment_id = mineral["treatment"]
operation = data["operations"][treatment_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH traitement
ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-traitement", f"Minerai/Fiche minerai {mineral_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if treatment_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][treatment_id]
template.append("##### Vulnérabilité combinée IHH-ISG pour le traitement\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
return "\n".join(template)
def generate_critical_paths_section(data, results):
"""
Génère la section des chemins critiques.
"""
template = []
template.append("## Chemins critiques\n")
# Récupérer les chaînes par niveau de risque
critical_chains = []
major_chains = []
medium_chains = []
for chain in results["chains"]:
if chain["risk_level"] == "critique":
critical_chains.append(chain)
elif chain["risk_level"] == "majeur":
major_chains.append(chain)
elif chain["risk_level"] == "moyen":
medium_chains.append(chain)
# 1. Chaînes critiques
template.append("### Chaînes avec risque critique\n")
template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée élevée à critique*\n")
if critical_chains:
for chain in critical_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}")
template.append("\n")
else:
template.append("Aucune chaîne à risque critique identifiée.\n")
# 2. Chaînes majeures
template.append("### Chaînes avec risque majeur\n")
template.append("*Ces chaînes comprennent au moins trois vulnérabilités combinées moyennes*\n")
if major_chains:
for chain in major_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n")
template.append("\n")
else:
template.append("Aucune chaîne à risque majeur identifiée.\n")
# 3. Chaînes moyennes
template.append("### Chaînes avec risque moyen\n")
template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée moyenne*\n")
if medium_chains:
for chain in medium_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n")
template.append("\n")
else:
template.append("Aucune chaîne à risque moyen identifiée.\n")
return "\n".join(template)
def generate_report(data, results, config):
"""
Génère le rapport complet structuré selon les spécifications.
"""
# Titre principal
report_titre = ["# Évaluation des vulnérabilités critiques\n"]
# Section d'introduction
report_introduction = generate_introduction_section(data)
# report.append(generate_introduction_section(data))
# Section méthodologie
report_methodologie = generate_methodology_section()
# report.append(generate_methodology_section())
# Section détails des opérations
report_operations = generate_operations_section(data, results, config)
# report.append(generate_operations_section(data, results, config))
# Section détails des minerais
report_minerals = generate_minerals_section(data, results, config)
# report.append(generate_minerals_section(data, results, config))
# Section chemins critiques
report_critical_paths = generate_critical_paths_section(data, results)
suffixe = " - chemins critiques"
fichier = TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md")
fichier_path = TEMPLATE_PATH.parent / fichier
# Élever les titres Markdown dans report_critical_paths
report_critical_paths = re.sub(r'^(#{2,})', lambda m: '#' * (len(m.group(1)) - 1), report_critical_paths, flags=re.MULTILINE)
write_report(report_critical_paths, fichier_path)
# Récupérer les sections critiques décomposées par mot-clé
chemins_critiques_sections = extraire_sections_par_mot_cle(fichier_path)
file_names = []
# Pour chaque mot-clé, écrire un fichier individuel
for mot_cle, contenu in chemins_critiques_sections.items():
print(mot_cle)
suffixe = f" - chemins critiques {mot_cle}"
fichier_personnalise = TEMPLATE_PATH.with_name(
TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md")
)
# Ajouter du texte au début du contenu
introduction = f"# Détail des chemins critiques pour : {mot_cle}\n\n"
contenu = introduction + contenu
write_report(contenu, fichier_personnalise)
file_names.append(fichier_personnalise)
# report.append(generate_critical_paths_section(data, results))
# Ordre de composition final
report = (
report_titre +
[report_introduction] +
[report_critical_paths] +
[report_operations] +
[report_minerals] +
[report_methodologie]
)
return "\n".join(report), file_names

View File

@ -0,0 +1,92 @@
import os
import re
from pathlib import Path
from collections import defaultdict
from utils.config import (
CORPUS_DIR
)
def composant_match(nom_composant, nom_dossier):
"""
Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre).
"""
def clean(s):
return ''.join(c.lower() for c in s if c.isalnum())
cleaned_comp = clean(nom_composant)
cleaned_dir = clean(nom_dossier)
# Vérifie que chaque caractère de cleaned_comp est présent dans cleaned_dir dans le bon ordre
it = iter(cleaned_dir)
return all(c in it for c in cleaned_comp)
def trouver_dossier_composant(nom_composant, base_path, prefixe):
"""
Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""
search_path = os.path.join(CORPUS_DIR, base_path)
if not os.path.exists(search_path):
return None
for d in os.listdir(search_path):
if os.path.isdir(os.path.join(search_path, d)):
if composant_match(f"{prefixe}{nom_composant}", d):
return os.path.join(base_path, d)
return None
def extraire_sections_par_mot_cle(fichier_markdown: Path) -> dict:
"""
Extrait les sections de niveau 3 uniquement dans la section
'## Chaînes avec risque critique' du fichier Markdown,
et les regroupe par mot-clé (ce qui se trouve entre '### ' et '').
Réduit chaque titre dun niveau (#).
"""
with fichier_markdown.open(encoding="utf-8") as f:
contenu = f.read()
# Extraire uniquement la section '## Chaînes avec risque critique'
match_section = re.search(
r"## Chaînes avec risque critique(.*?)(?=\n## |\Z)", contenu, re.DOTALL
)
if not match_section:
return {}
section_critique = match_section.group(1)
# Extraire les mots-clés entre '### ' et ' →'
mots_cles = set(re.findall(r"^### (.+?) →", section_critique, re.MULTILINE))
# Extraire tous les blocs de niveau 3 dans cette section uniquement
blocs_sections = re.findall(r"(### .+?)(?=\n### |\n## |\Z)", section_critique, re.DOTALL)
# Regrouper les blocs par mot-clé
regroupement = defaultdict(list)
for bloc in blocs_sections:
match = re.match(r"### (.+?) →", bloc)
if match:
mot = match.group(1)
if mot in mots_cles:
# Réduction du niveau des titres
bloc_modifie = re.sub(r"^###", "##", bloc, flags=re.MULTILINE)
bloc_modifie = re.sub(r"^###", "##", bloc_modifie, flags=re.MULTILINE)
regroupement[mot].append(bloc_modifie)
return {mot: "\n\n".join(blocs) for mot, blocs in regroupement.items()}
def nettoyer_texte_fr(texte: str) -> str:
# Apostrophes droites -> typographiques
texte = texte.replace("'", "")
# Guillemets droits -> guillemets français (avec espace fine insécable)
texte = re.sub(r'"(.*?)"', r'« \1»', texte)
# Espaces fines insécables avant : ; ! ?
texte = re.sub(r' (?=[:;!?])', '\u202F', texte)
# Unités : espace insécable entre chiffre et unité
texte = re.sub(r'(\d) (?=\w+)', lambda m: f"{m.group(1)}\u202F", texte)
# Suppression des doubles espaces
texte = re.sub(r' {2,}', ' ', texte)
# Remplacement optionnel des tirets simples (optionnel)
texte = texte.replace(" - ", " ")
# Nettoyage ponctuation multiple accidentelle
texte = re.sub(r'\s+([.,;!?])', r'\1', texte)
return texte

15
fabnum-dev.service Normal file
View File

@ -0,0 +1,15 @@
[Unit]
Description=Fabnum Dev - Streamlit App
After=network.target
[Service]
User=fabnum
WorkingDirectory=/home/fabnum/fabnum-dev
ExecStart=/home/fabnum/fabnum-dev/venv/bin/streamlit run /home/fabnum/fabnum-dev
/fabnum.py --server.port 8502
Restart=always
RestartSec=5
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.targe