#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script pour générer un rapport factorisé des vulnérabilités critiques suivant la structure définie dans Remarques.md. """ import os import sys import re import yaml from networkx.drawing.nx_agraph import read_dot from pathlib import Path from collections import defaultdict import uuid import requests import json import time import zipfile from nettoyer_pgpt import delete_documents_by_criteria session_uuid = str(uuid.uuid4())[:8] # Utiliser les 8 premiers caractères pour plus de concision print(f"🔑 UUID de session généré: {session_uuid}") BASE_DIR = Path(__file__).resolve().parent CORPUS_DIR = BASE_DIR.parent / "Corpus" THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml" REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt" GRAPH_PATH = BASE_DIR.parent / "graphe.dot" TEMP_SECTIONS = BASE_DIR / "temp_sections" TEMPLATE_PATH = TEMP_SECTIONS / f"rapport_final - {session_uuid}.md" if not TEMP_SECTIONS.exists(): TEMP_SECTIONS.mkdir(parents=True) PGPT_URL = "http://127.0.0.1:8001" API_URL = f"{PGPT_URL}/v1" PROMPT_METHODOLOGIE = """ Le rapport à examiner a été établi à partir de la méthodologie suivante. Le dispositif d’évaluation des risques proposé repose sur quatre indices clairement définis, chacun analysant un aspect spécifique des risques dans la chaîne d’approvisionnement numérique. L’indice IHH mesure la concentration géographique ou industrielle, permettant d’évaluer la dépendance vis-à-vis de certains acteurs ou régions. L’indice ISG indique la stabilité géopolitique des pays impliqués dans la chaîne de production, en intégrant des critères politiques, sociaux et climatiques. L’indice ICS quantifie la facilité ou la difficulté à remplacer ou substituer un élément spécifique dans la chaîne, évaluant ainsi les risques liés à la dépendance technologique et économique. Enfin, l’indice IVC examine la pression concurrentielle sur les ressources utilisées par le numérique, révélant ainsi le risque potentiel que ces ressources soient détournées vers d’autres secteurs industriels. Ces indices se combinent judicieusement par paires pour une évaluation approfondie et pertinente des risques. La combinaison IHH-ISG permet d’associer la gravité d'un impact potentiel (IHH) à la probabilité de survenance d’un événement perturbateur (ISG), créant ainsi une matrice de vulnérabilité combinée utile pour identifier rapidement les points critiques dans la chaîne de production. La combinaison ICS-IVC fonctionne selon la même logique, mais se concentre spécifiquement sur les ressources minérales : l’ICS indique la gravité potentielle d'une rupture d'approvisionnement due à une faible substituabilité, tandis que l’IVC évalue la probabilité que les ressources soient captées par d'autres secteurs industriels concurrents. Ces combinaisons permettent d’obtenir une analyse précise et opérationnelle du niveau de risque global. Les avantages de cette méthodologie résident dans son approche à la fois systématique et granulaire, adaptée à l'échelle décisionnelle d'un COMEX. Elle permet d’identifier avec précision les vulnérabilités majeures et leurs origines spécifiques, facilitant ainsi la prise de décision stratégique éclairée et proactive. En combinant des facteurs géopolitiques, industriels, technologiques et concurrentiels, ces indices offrent un suivi efficace de la chaîne de fabrication numérique, garantissant ainsi une gestion optimale des risques et la continuité opérationnelle à long terme. """ DICTIONNAIRE_CRITICITES = { "IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"}, "ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"}, "ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"}, "IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"} } POIDS_COULEURS = { "Vert": 1, "Orange": 2, "Rouge": 3 } def load_config(thresholds_path=THRESHOLDS_PATH): """Charge la configuration depuis les fichiers YAML.""" config = {} # Charger les seuils if os.path.exists(thresholds_path): with open(thresholds_path, 'r', encoding='utf-8') as f: thresholds = yaml.safe_load(f) config['thresholds'] = thresholds.get('seuils', {}) return config def determine_threshold_color(value, index_type, thresholds): """ Détermine la couleur du seuil en fonction du type d'indice et de sa valeur. Utilise les seuils de config.yaml si disponibles. """ # Récupérer les seuils pour cet indice if index_type in thresholds: index_thresholds = thresholds[index_type] # Déterminer la couleur if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \ index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]: suffix = get_suffix_for_index(index_type, "vert") return "Vert", suffix elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \ index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \ index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]: suffix = get_suffix_for_index(index_type, "orange") return "Orange", suffix elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \ index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]: suffix = get_suffix_for_index(index_type, "rouge") return "Rouge", suffix return "Non déterminé", "" def get_suffix_for_index(index_type, color): """Retourne le suffixe approprié pour chaque indice et couleur.""" suffixes = DICTIONNAIRE_CRITICITES if index_type in suffixes and color in suffixes[index_type]: return suffixes[index_type][color] return "" def get_weight_for_color(color): """Retourne le poids correspondant à une couleur.""" weights = POIDS_COULEURS return weights.get(color, 0) def strip_prefix(name): """Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier.""" return re.sub(r'^\d+[-_ ]*', '', name).lower() def find_prefixed_directory(pattern, base_path=None): """ Recherche un sous-répertoire dont le nom (sans préfixe) correspond au pattern. Args: pattern: Nom du répertoire sans préfixe base_path: Répertoire de base où chercher Returns: Le chemin relatif du répertoire trouvé (avec préfixe) ou None """ if base_path: search_path = os.path.join(CORPUS_DIR, base_path) else: search_path = CORPUS_DIR if not os.path.exists(search_path): # print(f"Chemin inexistant: {search_path}") return None for d in os.listdir(search_path): dir_path = os.path.join(search_path, d) if os.path.isdir(dir_path) and strip_prefix(d) == pattern.lower(): return os.path.relpath(dir_path, CORPUS_DIR) # print(f"Aucun répertoire correspondant à: '{pattern}' trouvé dans {search_path}") return None def find_corpus_file(pattern, base_path=None): """ Recherche récursive dans le corpus d'un fichier en ignorant les préfixes numériques dans les dossiers et fichiers. Args: pattern: Chemin relatif type "sous-dossier/nom-fichier" base_path: Dossier de base à partir duquel chercher Returns: Chemin relatif du fichier trouvé ou None """ if base_path: search_path = os.path.join(CORPUS_DIR, base_path) else: search_path = CORPUS_DIR # # print(f"Recherche de: '{pattern}' dans {search_path}") if not os.path.exists(search_path): # print(pattern) # print(base_path) # print(f"Chemin inexistant: {search_path}") return None if '/' not in pattern: # Recherche directe d'un fichier for file in os.listdir(search_path): if not file.endswith('.md'): continue if strip_prefix(os.path.splitext(file)[0]) == pattern.lower(): rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR) # # print(f"Fichier trouvé: {rel_path}") return rel_path else: # Séparation du chemin en dossier/fichier first, rest = pattern.split('/', 1) matched_dir = find_prefixed_directory(first, base_path) if matched_dir: return find_corpus_file(rest, matched_dir) # print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.") return None def read_corpus_file(file_path, remove_first_title=False, shift_titles=0): """ Lit un fichier du corpus et applique les transformations demandées. Args: file_path: Chemin relatif du fichier dans le corpus remove_first_title: Si True, supprime la première ligne de titre shift_titles: Nombre de niveaux à ajouter aux titres Returns: Le contenu du fichier avec les transformations appliquées """ full_path = os.path.join(CORPUS_DIR, file_path) if not os.path.exists(full_path): # print(f"Fichier non trouvé: {full_path}") return f"Fichier non trouvé: {file_path}" # # print(f"Lecture du fichier: {full_path}") with open(full_path, 'r', encoding='utf-8') as f: lines = f.readlines() # Supprimer la première ligne si c'est un titre et si demandé if remove_first_title and lines and lines[0].startswith('#'): # # print(f"Suppression du titre: {lines[0].strip()}") lines = lines[1:] # Décaler les niveaux de titre si demandé if shift_titles > 0: for i in range(len(lines)): if lines[i].startswith('#'): lines[i] = '#' * shift_titles + lines[i] # Nettoyer les retours à la ligne superflus content = ''.join(lines) # Supprimer les retours à la ligne en fin de contenu content = content.rstrip('\n') + '\n' return content def parse_graphs(graphe_path): """ Charge et analyse les graphes DOT (analyse et référence). """ # Charger le graphe à analyser if not os.path.exists(graphe_path): # print(f"Fichier de graphe à analyser introuvable: {graphe_path}") sys.exit(1) # Charger le graphe de référence reference_path = REFERENCE_GRAPH_PATH if not os.path.exists(reference_path): # print(f"Fichier de graphe de référence introuvable: {reference_path}") sys.exit(1) try: # Charger les graphes avec NetworkX graph = read_dot(graphe_path) ref_graph = read_dot(reference_path) # Convertir les attributs en types appropriés pour les deux graphes for g in [graph, ref_graph]: for node, attrs in g.nodes(data=True): for key, value in list(attrs.items()): # Convertir les valeurs numériques if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']: try: if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']: attrs[key] = int(value.strip('"')) else: attrs[key] = float(value.strip('"')) except (ValueError, TypeError): # Garder la valeur originale si la conversion échoue pass elif key == 'label': # Nettoyer les guillemets des étiquettes attrs[key] = value.strip('"') # Convertir les attributs des arêtes for u, v, attrs in g.edges(data=True): for key, value in list(attrs.items()): if key in ['ics', 'cout', 'delai', 'technique']: try: attrs[key] = float(value.strip('"')) except (ValueError, TypeError): pass elif key == 'label' and '%' in value: # Extraire le pourcentage try: percentage = value.strip('"').replace('%', '') attrs['percentage'] = float(percentage) except (ValueError, TypeError): pass return graph, ref_graph except Exception as e: print(f"Erreur lors de l'analyse des graphes: {str(e)}") sys.exit(1) def extract_data_from_graph(graph, ref_graph): """ Extrait toutes les données pertinentes des graphes DOT. """ data = { "products": {}, # Produits finaux (N0) "components": {}, # Composants (N1) "minerals": {}, # Minerais (N2) "operations": {}, # Opérations (N10) "countries": {}, # Pays (N11) "geo_countries": {}, # Pays géographiques (N99) "actors": {} # Acteurs (N12) } # Extraire tous les pays géographiques du graphe de référence for node, attrs in ref_graph.nodes(data=True): if attrs.get('niveau') == 99: country_name = attrs.get('label', node) isg_value = attrs.get('isg', 0) data["geo_countries"][country_name] = { "id": node, "isg": isg_value } # Extraire les nœuds du graphe à analyser for node, attrs in graph.nodes(data=True): level = attrs.get('niveau', -1) label = attrs.get('label', node) if level == 0 or level == 1000: # Produit final data["products"][node] = { "label": label, "components": [], "assembly": None, "level": level } elif level == 1 or level == 1001: # Composant data["components"][node] = { "label": label, "minerals": [], "manufacturing": None } elif level == 2: # Minerai data["minerals"][node] = { "label": label, "ivc": attrs.get('ivc', 0), "extraction": None, "treatment": None, "ics_values": {} } elif level == 10 or level == 1010: # Opération op_type = label.lower() data["operations"][node] = { "label": label, "type": op_type, "ihh_acteurs": attrs.get('ihh_acteurs', 0), "ihh_pays": attrs.get('ihh_pays', 0), "countries": {} } elif level == 11 or level == 1011: # Pays data["countries"][node] = { "label": label, "actors": {}, "geo_country": None, "market_share": 0 } elif level == 12 or level == 1012: # Acteur data["actors"][node] = { "label": label, "country": None, "market_share": 0 } # Extraire les relations et attributs des arêtes for source, target, edge_attrs in graph.edges(data=True): if source not in graph.nodes or target not in graph.nodes: continue source_level = graph.nodes[source].get('niveau', -1) target_level = graph.nodes[target].get('niveau', -1) # Extraire part de marché market_share = 0 if 'percentage' in edge_attrs: market_share = edge_attrs['percentage'] elif 'label' in edge_attrs and '%' in edge_attrs['label']: try: market_share = float(edge_attrs['label'].strip('"').replace('%', '')) except (ValueError, TypeError): pass # Relations produit → composant if (source_level == 0 and target_level == 1) or (source_level == 1000 and target_level == 1001): if target not in data["products"][source]["components"]: data["products"][source]["components"].append(target) # Relations produit → opération (assemblage) elif (source_level == 0 and target_level == 10) or (source_level == 1000 and target_level == 1010): if graph.nodes[target].get('label', '').lower() == 'assemblage': data["products"][source]["assembly"] = target # Relations composant → minerai avec ICS elif (source_level == 1 or source_level == 1001) and target_level == 2: if target not in data["components"][source]["minerals"]: data["components"][source]["minerals"].append(target) # Stocker l'ICS s'il est présent if 'ics' in edge_attrs: ics_value = edge_attrs['ics'] data["minerals"][target]["ics_values"][source] = ics_value # Relations composant → opération (fabrication) elif (source_level == 1 or source_level == 1001) and target_level == 10: if graph.nodes[target].get('label', '').lower() == 'fabrication': data["components"][source]["manufacturing"] = target # Relations minerai → opération (extraction/traitement) elif source_level == 2 and target_level == 10: op_label = graph.nodes[target].get('label', '').lower() if 'extraction' in op_label: data["minerals"][source]["extraction"] = target elif 'traitement' in op_label: data["minerals"][source]["treatment"] = target # Relations opération → pays avec part de marché elif (source_level == 10 and target_level == 11) or (source_level == 1010 and target_level == 1011): data["operations"][source]["countries"][target] = market_share data["countries"][target]["market_share"] = market_share # Relations pays → acteur avec part de marché elif (source_level == 11 and target_level == 12) or (source_level == 1011 and target_level == 1012): data["countries"][source]["actors"][target] = market_share data["actors"][target]["market_share"] = market_share data["actors"][target]["country"] = source # Relations pays → pays géographique elif (source_level == 11 or source_level == 1011) and target_level == 99: country_name = graph.nodes[target].get('label', '') data["countries"][source]["geo_country"] = country_name # Compléter les opérations manquantes pour les produits et composants # en les récupérant du graphe de référence si elles existent # Pour les produits finaux (N0) for product_id, product_data in data["products"].items(): if product_data["assembly"] is None: # Chercher l'opération d'assemblage dans le graphe de référence for source, target, edge_attrs in ref_graph.edges(data=True): if (source == product_id and ((ref_graph.nodes[source].get('niveau') == 0 and ref_graph.nodes[target].get('niveau') == 10) or (ref_graph.nodes[source].get('niveau') == 1000 and ref_graph.nodes[target].get('niveau') == 1010)) and ref_graph.nodes[target].get('label', '').lower() == 'assemblage'): # L'opération existe dans le graphe de référence assembly_id = target product_data["assembly"] = assembly_id # Ajouter l'opération si elle n'existe pas déjà if assembly_id not in data["operations"]: data["operations"][assembly_id] = { "label": ref_graph.nodes[assembly_id].get('label', assembly_id), "type": "assemblage", "ihh_acteurs": ref_graph.nodes[assembly_id].get('ihh_acteurs', 0), "ihh_pays": ref_graph.nodes[assembly_id].get('ihh_pays', 0), "countries": {} } # Extraire les relations de l'opération vers les pays for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True): if (op_source == assembly_id and (ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)): country_id = op_target # Extraire part de marché market_share = 0 if 'percentage' in op_edge_attrs: market_share = op_edge_attrs['percentage'] elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']: try: market_share = float(op_edge_attrs['label'].strip('"').replace('%', '')) except (ValueError, TypeError): pass # Ajouter le pays à l'opération data["operations"][assembly_id]["countries"][country_id] = market_share # Ajouter le pays s'il n'existe pas déjà if country_id not in data["countries"]: data["countries"][country_id] = { "label": ref_graph.nodes[country_id].get('label', country_id), "actors": {}, "geo_country": None, "market_share": market_share } else: data["countries"][country_id]["market_share"] = market_share # Extraire les relations du pays vers les acteurs for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True): if (country_source == country_id and (ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)): actor_id = country_target # Extraire part de marché actor_market_share = 0 if 'percentage' in country_edge_attrs: actor_market_share = country_edge_attrs['percentage'] elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']: try: actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', '')) except (ValueError, TypeError): pass # Ajouter l'acteur au pays data["countries"][country_id]["actors"][actor_id] = actor_market_share # Ajouter l'acteur s'il n'existe pas déjà if actor_id not in data["actors"]: data["actors"][actor_id] = { "label": ref_graph.nodes[actor_id].get('label', actor_id), "country": country_id, "market_share": actor_market_share } else: data["actors"][actor_id]["market_share"] = actor_market_share data["actors"][actor_id]["country"] = country_id # Extraire la relation du pays vers le pays géographique for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True): if (geo_source == country_id and ref_graph.nodes[geo_target].get('niveau') == 99): geo_country_name = ref_graph.nodes[geo_target].get('label', '') data["countries"][country_id]["geo_country"] = geo_country_name break # Une seule opération d'assemblage par produit # Pour les composants (N1) for component_id, component_data in data["components"].items(): if component_data["manufacturing"] is None: # Chercher l'opération de fabrication dans le graphe de référence for source, target, edge_attrs in ref_graph.edges(data=True): if (source == component_id and ((ref_graph.nodes[source].get('niveau') == 1 and ref_graph.nodes[target].get('niveau') == 10) or (ref_graph.nodes[source].get('niveau') == 1001 and ref_graph.nodes[target].get('niveau') == 1010)) and ref_graph.nodes[target].get('label', '').lower() == 'fabrication'): # L'opération existe dans le graphe de référence manufacturing_id = target component_data["manufacturing"] = manufacturing_id # Ajouter l'opération si elle n'existe pas déjà if manufacturing_id not in data["operations"]: data["operations"][manufacturing_id] = { "label": ref_graph.nodes[manufacturing_id].get('label', manufacturing_id), "type": "fabrication", "ihh_acteurs": ref_graph.nodes[manufacturing_id].get('ihh_acteurs', 0), "ihh_pays": ref_graph.nodes[manufacturing_id].get('ihh_pays', 0), "countries": {} } # Extraire les relations de l'opération vers les pays for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True): if (op_source == manufacturing_id and (ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)): country_id = op_target # Extraire part de marché market_share = 0 if 'percentage' in op_edge_attrs: market_share = op_edge_attrs['percentage'] elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']: try: market_share = float(op_edge_attrs['label'].strip('"').replace('%', '')) except (ValueError, TypeError): pass # Ajouter le pays à l'opération data["operations"][manufacturing_id]["countries"][country_id] = market_share # Ajouter le pays s'il n'existe pas déjà if country_id not in data["countries"]: data["countries"][country_id] = { "label": ref_graph.nodes[country_id].get('label', country_id), "actors": {}, "geo_country": None, "market_share": market_share } else: data["countries"][country_id]["market_share"] = market_share # Extraire les relations du pays vers les acteurs for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True): if (country_source == country_id and (ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)): actor_id = country_target # Extraire part de marché actor_market_share = 0 if 'percentage' in country_edge_attrs: actor_market_share = country_edge_attrs['percentage'] elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']: try: actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', '')) except (ValueError, TypeError): pass # Ajouter l'acteur au pays data["countries"][country_id]["actors"][actor_id] = actor_market_share # Ajouter l'acteur s'il n'existe pas déjà if actor_id not in data["actors"]: data["actors"][actor_id] = { "label": ref_graph.nodes[actor_id].get('label', actor_id), "country": country_id, "market_share": actor_market_share } else: data["actors"][actor_id]["market_share"] = actor_market_share data["actors"][actor_id]["country"] = country_id # Extraire la relation du pays vers le pays géographique for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True): if (geo_source == country_id and ref_graph.nodes[geo_target].get('niveau') == 99): geo_country_name = ref_graph.nodes[geo_target].get('label', '') data["countries"][country_id]["geo_country"] = geo_country_name break # Une seule opération de fabrication par composant return data def calculate_vulnerabilities(data, config): """ Calcule les vulnérabilités combinées pour toutes les opérations et minerais. """ thresholds = config.get('thresholds', {}) results = { "ihh_isg_combined": {}, # Pour chaque opération "ics_ivc_combined": {}, # Pour chaque minerai "chains": [] # Pour stocker tous les chemins possibles } # 1. Calculer ISG_combiné pour chaque opération for op_id, operation in data["operations"].items(): isg_weighted_sum = 0 total_share = 0 # Parcourir chaque pays impliqué dans l'opération for country_id, share in operation["countries"].items(): country = data["countries"][country_id] geo_country = country.get("geo_country") if geo_country and geo_country in data["geo_countries"]: isg_value = data["geo_countries"][geo_country]["isg"] isg_weighted_sum += isg_value * share total_share += share # Calculer la moyenne pondérée isg_combined = 0 if total_share > 0: isg_combined = isg_weighted_sum / total_share # Déterminer couleurs et poids ihh_value = operation["ihh_pays"] ihh_color, ihh_suffix = determine_threshold_color(ihh_value, "IHH", thresholds) isg_color, isg_suffix = determine_threshold_color(isg_combined, "ISG", thresholds) # Calculer poids combiné ihh_weight = get_weight_for_color(ihh_color) isg_weight = get_weight_for_color(isg_color) combined_weight = ihh_weight * isg_weight # Déterminer vulnérabilité combinée if combined_weight in [6, 9]: vulnerability = "ÉLEVÉE à CRITIQUE" elif combined_weight in [3, 4]: vulnerability = "MOYENNE" else: # 1, 2 vulnerability = "FAIBLE" # Stocker résultats results["ihh_isg_combined"][op_id] = { "ihh_value": ihh_value, "ihh_color": ihh_color, "ihh_suffix": ihh_suffix, "isg_combined": isg_combined, "isg_color": isg_color, "isg_suffix": isg_suffix, "combined_weight": combined_weight, "vulnerability": vulnerability } # 2. Calculer ICS_moyen pour chaque minerai for mineral_id, mineral in data["minerals"].items(): ics_values = list(mineral["ics_values"].values()) ics_average = 0 if ics_values: ics_average = sum(ics_values) / len(ics_values) ivc_value = mineral.get("ivc", 0) # Déterminer couleurs et poids ics_color, ics_suffix = determine_threshold_color(ics_average, "ICS", thresholds) ivc_color, ivc_suffix = determine_threshold_color(ivc_value, "IVC", thresholds) # Calculer poids combiné ics_weight = get_weight_for_color(ics_color) ivc_weight = get_weight_for_color(ivc_color) combined_weight = ics_weight * ivc_weight # Déterminer vulnérabilité combinée if combined_weight in [6, 9]: vulnerability = "ÉLEVÉE à CRITIQUE" elif combined_weight in [3, 4]: vulnerability = "MOYENNE" else: # 1, 2 vulnerability = "FAIBLE" # Stocker résultats results["ics_ivc_combined"][mineral_id] = { "ics_average": ics_average, "ics_color": ics_color, "ics_suffix": ics_suffix, "ivc_value": ivc_value, "ivc_color": ivc_color, "ivc_suffix": ivc_suffix, "combined_weight": combined_weight, "vulnerability": vulnerability } # 3. Identifier tous les chemins et leurs vulnérabilités for product_id, product in data["products"].items(): for component_id in product["components"]: component = data["components"][component_id] for mineral_id in component["minerals"]: mineral = data["minerals"][mineral_id] # Collecter toutes les vulnérabilités dans ce chemin path_vulnerabilities = [] # Assemblage (si présent) assembly_id = product["assembly"] if assembly_id and assembly_id in results["ihh_isg_combined"]: path_vulnerabilities.append({ "type": "assemblage", "vulnerability": results["ihh_isg_combined"][assembly_id]["vulnerability"], "operation_id": assembly_id }) # Fabrication (si présent) manufacturing_id = component["manufacturing"] if manufacturing_id and manufacturing_id in results["ihh_isg_combined"]: path_vulnerabilities.append({ "type": "fabrication", "vulnerability": results["ihh_isg_combined"][manufacturing_id]["vulnerability"], "operation_id": manufacturing_id }) # Minerai (ICS+IVC) if mineral_id in results["ics_ivc_combined"]: path_vulnerabilities.append({ "type": "minerai", "vulnerability": results["ics_ivc_combined"][mineral_id]["vulnerability"], "mineral_id": mineral_id }) # Extraction (si présent) extraction_id = mineral["extraction"] if extraction_id and extraction_id in results["ihh_isg_combined"]: path_vulnerabilities.append({ "type": "extraction", "vulnerability": results["ihh_isg_combined"][extraction_id]["vulnerability"], "operation_id": extraction_id }) # Traitement (si présent) treatment_id = mineral["treatment"] if treatment_id and treatment_id in results["ihh_isg_combined"]: path_vulnerabilities.append({ "type": "traitement", "vulnerability": results["ihh_isg_combined"][treatment_id]["vulnerability"], "operation_id": treatment_id }) # Classifier le chemin path_info = { "product": product_id, "component": component_id, "mineral": mineral_id, "vulnerabilities": path_vulnerabilities } # Déterminer le niveau de risque du chemin critical_count = path_vulnerabilities.count({"vulnerability": "ÉLEVÉE à CRITIQUE"}) medium_count = path_vulnerabilities.count({"vulnerability": "MOYENNE"}) if any(v["vulnerability"] == "ÉLEVÉE à CRITIQUE" for v in path_vulnerabilities): path_info["risk_level"] = "critique" elif medium_count >= 3: path_info["risk_level"] = "majeur" elif any(v["vulnerability"] == "MOYENNE" for v in path_vulnerabilities): path_info["risk_level"] = "moyen" else: path_info["risk_level"] = "faible" results["chains"].append(path_info) return results def generate_introduction_section(data): """ Génère la section d'introduction du rapport. """ products = [p["label"] for p in data["products"].values()] components = [c["label"] for c in data["components"].values()] minerals = [m["label"] for m in data["minerals"].values()] template = [] template.append("## Introduction\n") template.append("Ce rapport analyse les vulnérabilités de la chaîne de fabrication du numérique pour :\n") template.append("* les produits finaux : " + ", ".join(products)) template.append("* les composants : " + ", ".join(components)) template.append("* les minerais : " + ", ".join(minerals) + "\n") return "\n".join(template) def generate_methodology_section(): """ Génère la section méthodologie du rapport. """ template = [] template.append("## Méthodologie d'analyse des risques\n") template.append("### Indices et seuils\n") template.append("La méthode d'évaluation intègre 4 indices et leurs combinaisons pour identifier les chemins critiques.\n") # IHH template.append("#### IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n") # Essayer d'abord avec le chemin exact ihh_context_file = "Criticités/Fiche technique IHH/00-contexte-et-objectif.md" if os.path.exists(os.path.join(CORPUS_DIR, ihh_context_file)): template.append(read_corpus_file(ihh_context_file, remove_first_title=True)) else: # Fallback à la recherche par motif ihh_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IHH") if ihh_context_file: template.append(read_corpus_file(ihh_context_file, remove_first_title=True)) # Essayer d'abord avec le chemin exact ihh_calc_file = "Criticités/Fiche technique IHH/01-mode-de-calcul/_intro.md" if os.path.exists(os.path.join(CORPUS_DIR, ihh_calc_file)): template.append(read_corpus_file(ihh_calc_file, remove_first_title=True)) else: # Fallback à la recherche par motif ihh_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IHH") if ihh_calc_file: template.append(read_corpus_file(ihh_calc_file, remove_first_title=True)) template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n") # ISG template.append("#### ISG (Stabilité Géopolitique) : stabilité des pays\n") # Essayer d'abord avec le chemin exact isg_context_file = "Criticités/Fiche technique ISG/00-contexte-et-objectif.md" if os.path.exists(os.path.join(CORPUS_DIR, isg_context_file)): template.append(read_corpus_file(isg_context_file, remove_first_title=True)) else: # Fallback à la recherche par motif isg_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ISG") if isg_context_file: template.append(read_corpus_file(isg_context_file, remove_first_title=True)) # Essayer d'abord avec le chemin exact isg_calc_file = "Criticités/Fiche technique ISG/01-mode-de-calcul/_intro.md" if os.path.exists(os.path.join(CORPUS_DIR, isg_calc_file)): template.append(read_corpus_file(isg_calc_file, remove_first_title=True)) else: # Fallback à la recherche par motif isg_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ISG") if isg_calc_file: template.append(read_corpus_file(isg_calc_file, remove_first_title=True)) template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n") # ICS template.append("#### ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément\n") # Essayer d'abord avec le chemin exact ics_context_file = "Criticités/Fiche technique ICS/00-contexte-et-objectif.md" if os.path.exists(os.path.join(CORPUS_DIR, ics_context_file)): template.append(read_corpus_file(ics_context_file, remove_first_title=True)) else: # Fallback à la recherche par motif ics_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ICS") if ics_context_file: template.append(read_corpus_file(ics_context_file, remove_first_title=True)) # Essayer d'abord avec le chemin exact ics_calc_file = "Criticités/Fiche technique ICS/01-mode-de-calcul/_intro.md" if os.path.exists(os.path.join(CORPUS_DIR, ics_calc_file)): template.append(read_corpus_file(ics_calc_file, remove_first_title=True)) else: # Fallback à la recherche par motif ics_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ICS") if ics_calc_file: template.append(read_corpus_file(ics_calc_file, remove_first_title=True)) template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n") # IVC template.append("#### IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs\n") # Essayer d'abord avec le chemin exact ivc_context_file = "Criticités/Fiche technique IVC/00-contexte-et-objectif.md" if os.path.exists(os.path.join(CORPUS_DIR, ivc_context_file)): template.append(read_corpus_file(ivc_context_file, remove_first_title=True)) else: # Fallback à la recherche par motif ivc_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IVC") if ivc_context_file: template.append(read_corpus_file(ivc_context_file, remove_first_title=True)) # Essayer d'abord avec le chemin exact ivc_calc_file = "Criticités/Fiche technique IVC/01-mode-de-calcul/_intro.md" if os.path.exists(os.path.join(CORPUS_DIR, ivc_calc_file)): template.append(read_corpus_file(ivc_calc_file, remove_first_title=True)) else: # Fallback à la recherche par motif ivc_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IVC") if ivc_calc_file: template.append(read_corpus_file(ivc_calc_file, remove_first_title=True)) template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n") # Combinaison des indices template.append("### Combinaison des indices\n") # IHH et ISG template.append("**IHH et ISG**\n") template.append("Ces deux indices s'appliquent à toutes les opérations et se combinent dans l'évaluation du risque (niveau d'impact et probabilité de survenance) :\n") template.append("* l'IHH donne le niveau d'impact => une forte concentration implique un fort impact si le risque est avéré") template.append("* l'ISG donne la probabilité de survenance => plus les pays sont instables (et donc plus l'ISG est élevé) et plus la survenance du risque est élevée\n") template.append("Pour évaluer le risque pour une opération, les ISG des pays sont pondérés par les parts de marché respectives pour donner un ISG combiné dont le calcul est :") template.append("ISG_combiné = (Somme des ISG des pays multipliée par leur part de marché) / Sommes de leur part de marché\n") template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ISG combiné et de l'IHH\n") template.append("| ISG combiné / IHH | Vert | Orange | Rouge |") template.append("| :-- | :-- | :-- | :-- |") template.append("| Vert | 1 | 2 | 3 |") template.append("| Orange | 2 | 4 | 6 |") template.append("| Rouge | 3 | 6 | 9 |\n") template.append("Les vulnérabilités se classent en trois niveaux pour chaque opération :\n") template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9") template.append("* Vulnérabilité combinée moyenne : poids 3 et 4") template.append("* Vulnérabilité combinée faible : poids 1 et 2\n") # ICS et IVC template.append("**ICS et IVC**\n") template.append("Ces deux indices se combinent dans l'évaluation du risque pour un minerai :\n") template.append("* l'ICS donne le niveau d'impact => une faible substituabilité (et donc un ICS élevé) implique un fort impact si le risque est avéré ; l'ICS est associé à la relation entre un composant et un minerai") template.append("* l'IVC donne la probabilité de l'impact => une forte concurrence intersectorielle (IVC élevé) implique une plus forte probabilité de survenance\n") template.append("Par simplification, on intègre un ICS moyen d'un minerai comme étant la moyenne des ICS pour chacun des composants dans lesquels il intervient.\n") template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ICS moyen et de l'IVC.\n") template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |") template.append("| :-- | :-- | :-- | :-- |") template.append("| Vert | 1 | 2 | 3 |") template.append("| Orange | 2 | 4 | 6 |") template.append("| Rouge | 3 | 6 | 9 |\n") template.append("Les vulnérabilités se classent en trois niveaux pour chaque minerai :\n") template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9") template.append("* Vulnérabilité combinée moyenne : poids 3 et 4") template.append("* Vulnérabilité combinée faible : poids 1 et 2\n") return "\n".join(template) def composant_match(nom_composant, nom_dossier): """ Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre). """ def clean(s): return ''.join(c.lower() for c in s if c.isalnum()) cleaned_comp = clean(nom_composant) cleaned_dir = clean(nom_dossier) # Vérifie que chaque caractère de cleaned_comp est présent dans cleaned_dir dans le bon ordre it = iter(cleaned_dir) return all(c in it for c in cleaned_comp) def trouver_dossier_composant(nom_composant, base_path, prefixe): """ Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant. """ search_path = os.path.join(CORPUS_DIR, base_path) if not os.path.exists(search_path): return None for d in os.listdir(search_path): if os.path.isdir(os.path.join(search_path, d)): if composant_match(f"{prefixe}{nom_composant}", d): return os.path.join(base_path, d) return None def generate_operations_section(data, results, config): """ Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement). """ # # print("DEBUG: Génération de la section des opérations") # # print(f"DEBUG: Nombre de produits: {len(data['products'])}") # # print(f"DEBUG: Nombre de composants: {len(data['components'])}") # # print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}") template = [] template.append("## Détails des opérations\n") # 1. Traiter les produits finaux (assemblage) for product_id, product in data["products"].items(): # # print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}") if product["assembly"]: template.append(f"### {product['label']} et Assemblage\n") # Récupérer la présentation synthétique # product_slug = product['label'].lower().replace(' ', '-') sous_repertoire = f"{product['label']}" if product["level"] == 0: type = "Assemblage" else: type = "Connexe" sous_repertoire = trouver_dossier_composant(sous_repertoire, type, "Fiche assemblage ") product_slug = sous_repertoire.split(' ', 2)[2] presentation_file = find_corpus_file("présentation-synthétique", f"{type}/Fiche assemblage {product_slug}") if presentation_file: template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append("") # Récupérer les principaux assembleurs assembleurs_file = find_corpus_file("principaux-assembleurs", f"{type}/Fiche assemblage {product_slug}") if assembleurs_file: template.append(read_corpus_file(assembleurs_file, shift_titles=2)) template.append("") # ISG des pays impliqués assembly_id = product["assembly"] operation = data["operations"][assembly_id] template.append("##### ISG des pays impliqués\n") template.append("| Pays | Part de marché | ISG | Criticité |") template.append("| :-- | :-- | :-- | :-- |") isg_weighted_sum = 0 total_share = 0 for country_id, share in operation["countries"].items(): country = data["countries"][country_id] geo_country = country.get("geo_country") if geo_country and geo_country in data["geo_countries"]: isg_value = data["geo_countries"][geo_country]["isg"] color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds')) template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |") isg_weighted_sum += isg_value * share total_share += share # Calculer ISG combiné if total_share > 0: isg_combined = isg_weighted_sum / total_share color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds')) template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**") # IHH ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"{type}/Fiche assemblage {product_slug}") if ihh_file: template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append("\n") # Vulnérabilité combinée if assembly_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][assembly_id] template.append("#### Vulnérabilité combinée IHH-ISG\n") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n") # 2. Traiter les composants (fabrication) for component_id, component in data["components"].items(): # # print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}") if component["manufacturing"]: template.append(f"### {component['label']} et Fabrication\n") # Récupérer la présentation synthétique # component_slug = component['label'].lower().replace(' ', '-') sous_repertoire = f"{component['label']}" sous_repertoire = trouver_dossier_composant(sous_repertoire, "Fabrication", "Fiche fabrication ") component_slug = sous_repertoire.split(' ', 2)[2] presentation_file = find_corpus_file("présentation-synthétique", f"Fabrication/Fiche fabrication {component_slug}") if presentation_file: template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append("\n") # Récupérer les principaux fabricants fabricants_file = find_corpus_file("principaux-fabricants", f"Fabrication/Fiche fabrication {component_slug}") if fabricants_file: template.append(read_corpus_file(fabricants_file, shift_titles=2)) template.append("\n") # ISG des pays impliqués manufacturing_id = component["manufacturing"] operation = data["operations"][manufacturing_id] template.append("#### ISG des pays impliqués\n") template.append("| Pays | Part de marché | ISG | Criticité |") template.append("| :-- | :-- | :-- | :-- |") isg_weighted_sum = 0 total_share = 0 for country_id, share in operation["countries"].items(): country = data["countries"][country_id] geo_country = country.get("geo_country") if geo_country and geo_country in data["geo_countries"]: isg_value = data["geo_countries"][geo_country]["isg"] color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds')) template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |") isg_weighted_sum += isg_value * share total_share += share # Calculer ISG combiné if total_share > 0: isg_combined = isg_weighted_sum / total_share color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds')) template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n") # IHH ihh_file = find_corpus_file("matrice-des-risques-liés-à-la-fabrication/indice-de-herfindahl-hirschmann", f"Fabrication/Fiche fabrication {component_slug}") if ihh_file: template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append("\n") # Vulnérabilité combinée if manufacturing_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][manufacturing_id] template.append("#### Vulnérabilité combinée IHH-ISG\n") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n") # 3. Traiter les minerais (détaillés dans une section séparée) result = "\n".join(template) # # print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères") if len(result) <= 30: # Juste le titre de section # # print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!") # Ajout d'une section de débogage dans le rapport template.append("### DÉBOGAGE - Opérations manquantes\n") template.append("Aucune opération d'assemblage ou de fabrication n'a été trouvée dans les données.\n") template.append("Informations disponibles:\n") template.append(f"* Nombre de produits: {len(data['products'])}\n") template.append(f"* Nombre de composants: {len(data['components'])}\n") template.append(f"* Nombre d'opérations: {len(data['operations'])}\n") template.append("\nDétail des produits et de leurs opérations d'assemblage:\n") for pid, p in data["products"].items(): template.append(f"* {p['label']}: {'Assemblage: ' + str(p['assembly']) if p['assembly'] else 'Pas d\'assemblage'}\n") template.append("\nDétail des composants et de leurs opérations de fabrication:\n") for cid, c in data["components"].items(): template.append(f"* {c['label']}: {'Fabrication: ' + str(c['manufacturing']) if c['manufacturing'] else 'Pas de fabrication'}\n") result = "\n".join(template) return result def generate_minerals_section(data, results, config): """ Génère la section détaillant les minerais et leurs opérations d'extraction et traitement. """ template = [] template.append("## Détails des minerais\n") for mineral_id, mineral in data["minerals"].items(): mineral_slug = mineral['label'].lower().replace(' ', '-') fiche_dir = f"{CORPUS_DIR}/Minerai/Fiche minerai {mineral_slug}" if not os.path.exists(fiche_dir): continue template.append(f"---\n\n### {mineral['label']}\n") # Récupérer la présentation synthétique presentation_file = find_corpus_file("présentation-synthétique", f"Minerai/Fiche minerai {mineral_slug}") if presentation_file: template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append("\n") # ICS template.append("#### ICS\n") ics_intro_file = find_corpus_file("risque-de-substituabilité/_intro", f"Minerai/Fiche minerai {mineral_slug}") if ics_intro_file: template.append(read_corpus_file(ics_intro_file, remove_first_title=True)) template.append("\n") # Calcul de l'ICS moyen ics_values = list(mineral["ics_values"].values()) if ics_values: ics_average = sum(ics_values) / len(ics_values) color, suffix = determine_threshold_color(ics_average, "ICS", config.get('thresholds')) template.append("##### Valeurs d'ICS par composant\n") template.append("| Composant | ICS | Criticité |") template.append("| :-- | :-- | :-- |") for comp_id, ics_value in mineral["ics_values"].items(): comp_name = data["components"][comp_id]["label"] comp_color, comp_suffix = determine_threshold_color(ics_value, "ICS", config.get('thresholds')) template.append(f"| {comp_name} | {ics_value:.2f} | {comp_color} ({comp_suffix}) |") template.append(f"\n**ICS moyen : {ics_average:.2f} - {color} ({suffix})**\n") # IVC template.append("#### IVC\n\n") # Valeur IVC ivc_value = mineral.get("ivc", 0) color, suffix = determine_threshold_color(ivc_value, "IVC", config.get('thresholds')) template.append(f"**IVC: {ivc_value} - {color} ({suffix})**\n") # Récupérer toutes les sections de vulnérabilité de concurrence ivc_sections = [] ivc_dir = find_prefixed_directory("vulnérabilité-de-concurrence", f"Minerai/Fiche minerai {mineral_slug}") corpus_path = os.path.join(CORPUS_DIR, ivc_dir) if os.path.exists(os.path.join(CORPUS_DIR, ivc_dir)) else None if corpus_path: for file in sorted(os.listdir(corpus_path)): if file.endswith('.md') and "_intro.md" not in file and "sources" not in file: ivc_sections.append(os.path.join(ivc_dir, file)) # Inclure chaque section IVC for section_file in ivc_sections: content = read_corpus_file(section_file, remove_first_title=False) # Nettoyer les balises des fichiers IVC content = re.sub(r'```.*?```', '', content, flags=re.DOTALL) # Mettre le titre en italique s'il commence par un # (format Markdown pour titre) if content and '\n' in content: first_line, rest = content.split('\n', 1) if first_line.strip().startswith('#'): # Extraire le texte du titre sans les # et les espaces title_text = first_line.strip().lstrip('#').strip() content = f"\n*{title_text}*\n{rest.strip()}" # Ne pas ajouter de contenu vide if content.strip(): template.append(content.strip()) # ICS et IVC combinés if mineral_id in results["ics_ivc_combined"]: combined = results["ics_ivc_combined"][mineral_id] template.append("\n#### Vulnérabilité combinée ICS-IVC\n") template.append(f"* ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']} ({combined['ics_suffix']})") template.append(f"* IVC: {combined['ivc_value']} - {combined['ivc_color']} ({combined['ivc_suffix']})") template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n") # Extraction if mineral["extraction"]: template.append("#### Extraction\n") # Récupérer les principaux producteurs producers_file = find_corpus_file("principaux-producteurs-extraction", f"Minerai/Fiche minerai {mineral_slug}") if producers_file: template.append(read_corpus_file(producers_file, remove_first_title=True)) template.append("\n") # ISG des pays impliqués extraction_id = mineral["extraction"] operation = data["operations"][extraction_id] template.append("##### ISG des pays impliqués\n") template.append("| Pays | Part de marché | ISG | Criticité |") template.append("| :-- | :-- | :-- | :-- |") isg_weighted_sum = 0 total_share = 0 for country_id, share in operation["countries"].items(): country = data["countries"][country_id] geo_country = country.get("geo_country") if geo_country and geo_country in data["geo_countries"]: isg_value = data["geo_countries"][geo_country]["isg"] color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds')) template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |") isg_weighted_sum += isg_value * share total_share += share # Calculer ISG combiné if total_share > 0: isg_combined = isg_weighted_sum / total_share color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds')) template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n") # IHH extraction ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-extraction", f"Minerai/Fiche minerai {mineral_slug}") if ihh_file: template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append("\n") # Vulnérabilité combinée if extraction_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][extraction_id] template.append("##### Vulnérabilité combinée IHH-ISG pour l'extraction\n") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n") # Traitement if mineral["treatment"]: template.append("#### Traitement\n") # Récupérer les principaux producteurs producers_file = find_corpus_file("principaux-producteurs-traitement", f"Minerai/Fiche minerai {mineral_slug}") if producers_file: template.append(read_corpus_file(producers_file, remove_first_title=True)) template.append("\n") # ISG des pays impliqués treatment_id = mineral["treatment"] operation = data["operations"][treatment_id] template.append("##### ISG des pays impliqués\n") template.append("| Pays | Part de marché | ISG | Criticité |") template.append("| :-- | :-- | :-- | :-- |") isg_weighted_sum = 0 total_share = 0 for country_id, share in operation["countries"].items(): country = data["countries"][country_id] geo_country = country.get("geo_country") if geo_country and geo_country in data["geo_countries"]: isg_value = data["geo_countries"][geo_country]["isg"] color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds')) template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |") isg_weighted_sum += isg_value * share total_share += share # Calculer ISG combiné if total_share > 0: isg_combined = isg_weighted_sum / total_share color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds')) template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n") # IHH traitement ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-traitement", f"Minerai/Fiche minerai {mineral_slug}") if ihh_file: template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append("\n") # Vulnérabilité combinée if treatment_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][treatment_id] template.append("##### Vulnérabilité combinée IHH-ISG pour le traitement\n") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n") return "\n".join(template) def generate_critical_paths_section(data, results): """ Génère la section des chemins critiques. """ template = [] template.append("## Chemins critiques\n") # Récupérer les chaînes par niveau de risque critical_chains = [] major_chains = [] medium_chains = [] for chain in results["chains"]: if chain["risk_level"] == "critique": critical_chains.append(chain) elif chain["risk_level"] == "majeur": major_chains.append(chain) elif chain["risk_level"] == "moyen": medium_chains.append(chain) # 1. Chaînes critiques template.append("### Chaînes avec risque critique\n") template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée élevée à critique*\n") if critical_chains: for chain in critical_chains: product_name = data["products"][chain["product"]]["label"] component_name = data["components"][chain["component"]]["label"] mineral_name = data["minerals"][chain["mineral"]]["label"] template.append(f"#### {product_name} → {component_name} → {mineral_name}\n") # Vulnérabilités template.append("**Vulnérabilités identifiées:**\n") for vuln in chain["vulnerabilities"]: vuln_type = vuln["type"].capitalize() vuln_level = vuln["vulnerability"] if vuln_type == "Minerai": mineral_id = vuln["mineral_id"] template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}") if mineral_id in results["ics_ivc_combined"]: combined = results["ics_ivc_combined"][mineral_id] template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}") template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}") else: op_id = vuln["operation_id"] op_label = data["operations"][op_id]["label"] template.append(f"* {vuln_type} ({op_label}): {vuln_level}") if op_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][op_id] template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}") template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}") template.append("\n") else: template.append("Aucune chaîne à risque critique identifiée.\n") # 2. Chaînes majeures template.append("### Chaînes avec risque majeur\n") template.append("*Ces chaînes comprennent au moins trois vulnérabilités combinées moyennes*\n") if major_chains: for chain in major_chains: product_name = data["products"][chain["product"]]["label"] component_name = data["components"][chain["component"]]["label"] mineral_name = data["minerals"][chain["mineral"]]["label"] template.append(f"#### {product_name} → {component_name} → {mineral_name}\n") # Vulnérabilités template.append("**Vulnérabilités identifiées:**\n") for vuln in chain["vulnerabilities"]: vuln_type = vuln["type"].capitalize() vuln_level = vuln["vulnerability"] if vuln_type == "Minerai": mineral_id = vuln["mineral_id"] template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n") if mineral_id in results["ics_ivc_combined"]: combined = results["ics_ivc_combined"][mineral_id] template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n") template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n") else: op_id = vuln["operation_id"] op_label = data["operations"][op_id]["label"] template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n") if op_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][op_id] template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n") template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n") template.append("\n") else: template.append("Aucune chaîne à risque majeur identifiée.\n") # 3. Chaînes moyennes template.append("### Chaînes avec risque moyen\n") template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée moyenne*\n") if medium_chains: for chain in medium_chains: product_name = data["products"][chain["product"]]["label"] component_name = data["components"][chain["component"]]["label"] mineral_name = data["minerals"][chain["mineral"]]["label"] template.append(f"#### {product_name} → {component_name} → {mineral_name}\n") # Vulnérabilités template.append("**Vulnérabilités identifiées:**\n") for vuln in chain["vulnerabilities"]: vuln_type = vuln["type"].capitalize() vuln_level = vuln["vulnerability"] if vuln_type == "Minerai": mineral_id = vuln["mineral_id"] template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n") if mineral_id in results["ics_ivc_combined"]: combined = results["ics_ivc_combined"][mineral_id] template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n") template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n") else: op_id = vuln["operation_id"] op_label = data["operations"][op_id]["label"] template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n") if op_id in results["ihh_isg_combined"]: combined = results["ihh_isg_combined"][op_id] template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n") template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n") template.append("\n") else: template.append("Aucune chaîne à risque moyen identifiée.\n") return "\n".join(template) def extraire_sections_par_mot_cle(fichier_markdown: Path) -> dict: """ Extrait les sections de niveau 3 uniquement dans la section '## Chaînes avec risque critique' du fichier Markdown, et les regroupe par mot-clé (ce qui se trouve entre '### ' et ' →'). Réduit chaque titre d’un niveau (#). """ with fichier_markdown.open(encoding="utf-8") as f: contenu = f.read() # Extraire uniquement la section '## Chaînes avec risque critique' match_section = re.search( r"## Chaînes avec risque critique(.*?)(?=\n## |\Z)", contenu, re.DOTALL ) if not match_section: return {} section_critique = match_section.group(1) # Extraire les mots-clés entre '### ' et ' →' mots_cles = set(re.findall(r"^### (.+?) →", section_critique, re.MULTILINE)) # Extraire tous les blocs de niveau 3 dans cette section uniquement blocs_sections = re.findall(r"(### .+?)(?=\n### |\n## |\Z)", section_critique, re.DOTALL) # Regrouper les blocs par mot-clé regroupement = defaultdict(list) for bloc in blocs_sections: match = re.match(r"### (.+?) →", bloc) if match: mot = match.group(1) if mot in mots_cles: # Réduction du niveau des titres bloc_modifie = re.sub(r"^###", "##", bloc, flags=re.MULTILINE) bloc_modifie = re.sub(r"^###", "##", bloc_modifie, flags=re.MULTILINE) regroupement[mot].append(bloc_modifie) return {mot: "\n\n".join(blocs) for mot, blocs in regroupement.items()} def ingest_document(file_path: Path) -> bool: """Ingère un document dans PrivateGPT""" try: with open(file_path, "rb") as f: file_name = file_path.name files = {"file": (file_name, f, "text/markdown")} # Ajouter des métadonnées pour identifier facilement ce fichier d'entrée metadata = { "type": "input_file", "session_id": session_uuid, "document_type": "rapport_analyse_input" } response = requests.post( f"{API_URL}/ingest/file", files=files, data={"metadata": json.dumps(metadata)} if "metadata" in requests.get(f"{API_URL}/ingest/file").text else None ) response.raise_for_status() print(f"✅ Document '{file_path}' ingéré avec succès sous le nom '{file_name}'") return True except FileNotFoundError: print(f"❌ Fichier '{file_path}' introuvable") return False except requests.RequestException as e: print(f"❌ Erreur lors de l'ingestion du document: {e}") return False def generate_report(data, results, config): """ Génère le rapport complet structuré selon les spécifications. """ # Titre principal report_titre = ["# Évaluation des vulnérabilités critiques\n"] # Section d'introduction report_introduction = generate_introduction_section(data) # report.append(generate_introduction_section(data)) # Section méthodologie report_methodologie = generate_methodology_section() # report.append(generate_methodology_section()) # Section détails des opérations report_operations = generate_operations_section(data, results, config) # report.append(generate_operations_section(data, results, config)) # Section détails des minerais report_minerals = generate_minerals_section(data, results, config) # report.append(generate_minerals_section(data, results, config)) # Section chemins critiques report_critical_paths = generate_critical_paths_section(data, results) suffixe = " - chemins critiques" fichier = TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md") fichier_path = TEMPLATE_PATH.parent / fichier # Élever les titres Markdown dans report_critical_paths report_critical_paths = re.sub(r'^(#{2,})', lambda m: '#' * (len(m.group(1)) - 1), report_critical_paths, flags=re.MULTILINE) write_report(report_critical_paths, fichier_path) # Récupérer les sections critiques décomposées par mot-clé chemins_critiques_sections = extraire_sections_par_mot_cle(fichier_path) file_names = [] # Pour chaque mot-clé, écrire un fichier individuel for mot_cle, contenu in chemins_critiques_sections.items(): print(mot_cle) suffixe = f" - chemins critiques {mot_cle}" fichier_personnalise = TEMPLATE_PATH.with_name( TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md") ) # Ajouter du texte au début du contenu introduction = f"# Détail des chemins critiques pour : {mot_cle}\n\n" contenu = introduction + contenu write_report(contenu, fichier_personnalise) file_names.append(fichier_personnalise) # report.append(generate_critical_paths_section(data, results)) # Ordre de composition final report = ( report_titre + [report_introduction] + [report_critical_paths] + [report_operations] + [report_minerals] + [report_methodologie] ) return "\n".join(report), file_names def generate_text(input_file, full_prompt, system_message, temperature = "0.1"): """Génère du texte avec l'API PrivateGPT""" try: # Définir les paramètres de la requête payload = { "messages": [ {"role": "system", "content": system_message}, {"role": "user", "content": full_prompt} ], "use_context": True, # Active la recherche RAG dans les documents ingérés "temperature": temperature, # Température réduite pour plus de cohérence "stream": False } # Tenter d'ajouter un filtre de contexte (fonctionnalité expérimentale qui peut ne pas être supportée) if input_file: try: # Vérifier si le filtre de contexte est supporté sans faire de requête supplémentaire liste_des_fichiers = list(TEMP_SECTIONS.glob(f"*{session_uuid}*.md")) filter_metadata = { "document_name": [input_file.name] + [f.name for f in liste_des_fichiers] } payload["filter_metadata"] = filter_metadata except Exception as e: print(f"ℹ️ Remarque: Impossible d'appliquer le filtre de contexte: {e}") # Envoyer la requête response = requests.post( f"{API_URL}/chat/completions", json=payload, headers={"accept": "application/json"} ) response.raise_for_status() # Extraire la réponse générée result = response.json() if "choices" in result and len(result["choices"]) > 0: return result["choices"][0]["message"]["content"] else: print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2)) return None except requests.RequestException as e: print(f"❌ Erreur lors de la génération de texte: {e}") if hasattr(e, 'response') and e.response is not None: print(f"Détails: {e.response.text}") return None def ia_analyse(file_names): for file in file_names: ingest_document(file) time.sleep(5) reponse = {} for file in file_names: produit_final = re.search(r"chemins critiques (.+)\.md$", file.name).group(1) # Préparer le prompt avec le contexte précédent si disponible et demandé full_prompt = f""" Rédigez une synthèse du fichier {file.name} dédiée au produit final '{produit_final}'. Cette synthèse, destinée spécifiquement au Directeur des Risques, membre du COMEX d'une grande entreprise utilisant ce produit, doit être claire et concise (environ 10 lignes). En utilisant impérativement la méthodologie fournie, expliquez en termes simples mais précis, pourquoi et comment les vulnérabilités identifiées constituent un risque concret pour l'entreprise. Mentionnez clairement : - Les composants spécifiques du produit '{produit_final}' concernés par ces vulnérabilités. - Les minerais précis responsables de ces vulnérabilités et leur rôle dans l’impact sur les composants. - Les points critiques exacts identifiés dans la chaîne d'approvisionnement (par exemple : faible substituabilité, forte concentration géographique, instabilité géopolitique, concurrence élevée entre secteurs industriels). Respectez strictement les consignes suivantes : - N'utilisez aucun acronyme ni valeur numérique ; uniquement leur équivalent textuel (ex : criticité de substituabilité, vulnérabilité élevée ou critique, etc.). - N'incluez à ce stade aucune préconisation ni recommandation. Votre texte doit être parfaitement adapté à une compréhension rapide par des dirigeants d’entreprise. """ # Définir les paramètres de la requête system_message = f""" Vous êtes un assistant stratégique expert chargé de rédiger des synthèses destinées à des décideurs de très haut niveau (Directeurs des Risques, membres du COMEX, stratèges industriels). Vous analysez exclusivement les vulnérabilités systémiques affectant les produits numériques, à partir des données précises fournies dans le fichier {file.name}. Votre analyse doit être rigoureuse, accessible, pertinente pour la prise de décision stratégique, et conforme à la méthodologie définie ci-dessous : {PROMPT_METHODOLOGIE} """ reponse[produit_final] = f"\n**{produit_final}**\n\n" + generate_text(file, full_prompt, system_message).split("")[-1].strip() # print(reponse[produit_final]) corps = "\n\n".join(reponse.values()) print("Corps") full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE system_message = """ Vous êtes un expert en rédaction de rapports stratégiques destinés à un COMEX ou une Direction des Risques. Votre mission est d'écrire une introduction professionnelle, claire et synthétique (maximum 7 lignes) à partir des éléments suivants : 1. Un corps d’analyse décrivant les vulnérabilités identifiées pour un produit numérique. 2. La méthodologie détaillée utilisée pour cette analyse (fourni en deuxième partie). Votre introduction doit : - Présenter brièvement le sujet traité (vulnérabilités du produit final) et quels sont les produits finaux concernés. - Annoncer clairement le contenu et l'objectif de l'analyse présentée dans le corps. - Résumer succinctement les axes méthodologiques principaux (concentration géographique ou industrielle, stabilité géopolitique, criticité de substituabilité, concurrence intersectorielle des minerais). - Être facilement compréhensible par des décideurs de haut niveau (pas d'acronymes, ni chiffres ; uniquement des formulations textuelles). - Être fluide, agréable à lire, avec un ton sobre et professionnel. Répondez uniquement avec l'introduction rédigée. Ne fournissez aucune autre explication complémentaire. """ introduction = generate_text("", full_prompt, system_message).split("")[-1].strip() print("Introduction") full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE system_message = """ Vous êtes un expert stratégique en gestion des risques liés à la chaîne de valeur numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises utilisatrices de produits numériques. Ces entreprises n'ont pour levier d’action que le choix de leurs fournisseurs ou l'allongement de la durée de vie de leur matériel. À partir des vulnérabilités identifiées dans la première partie du prompt (corps d'analyse) et en tenant compte du contexte et de la méthodologie décrite en deuxième partie, rédigez un texte clair, structuré en deux parties distinctes : 1. **Préconisations stratégiques :** Proposez clairement des axes concrets pour limiter les risques identifiés dans l’analyse. Ces préconisations doivent impérativement être réalistes et directement actionnables par les dirigeants compte tenu de leurs leviers limités. 2. **Indicateurs de suivi :** Identifiez précisément les indicateurs pertinents à suivre pour évaluer régulièrement l’évolution de ces risques. Ces indicateurs doivent être inspirés directement des axes méthodologiques fournis (concentration géographique, stabilité géopolitique, substituabilité, concurrence intersectorielle) ou s’appuyer sur des bonnes pratiques reconnues. Votre rédaction doit être fluide, concise, très professionnelle, et directement accessible à un COMEX. Évitez strictement toute explication complémentaire ou ajout superflu. Ne proposez que le texte demandé. """ preconisations = generate_text("", full_prompt, system_message, "0.5").split("")[-1].strip() print("Préconisations") full_prompt = corps + "\n\n" + preconisations system_message = """ Vous êtes un expert stratégique spécialisé dans les risques liés à la chaîne de valeur du numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises dépendantes du numérique, dont les leviers d’action se limitent au choix des fournisseurs et à l’allongement de la durée d’utilisation du matériel. À partir du résultat de l'analyse des vulnérabilités présenté en première partie du prompt (corps) et des préconisations stratégiques formulées en deuxième partie, rédigez une conclusion synthétique et percutante (environ 6 à 8 lignes maximum) afin de : - Résumer clairement les principaux risques identifiés. - Souligner brièvement les axes prioritaires proposés pour agir concrètement. - Inviter de manière dynamique le COMEX à passer immédiatement à l'action. Votre rédaction doit être fluide, professionnelle, claire et immédiatement exploitable par des dirigeants. Ne fournissez aucune explication supplémentaire. Ne répondez que par la conclusion demandée. """ conclusion = generate_text("", full_prompt, system_message, "0.7").split("")[-1].strip() print("Conclusion") analyse = "# Rapport d'analyse\n\n" + \ "\n\n## Introduction\n\n" + \ introduction + \ "\n\n## Analyse des produits finaux\n\n" + \ corps + \ "\n\n## Préconisations\n\n" + \ preconisations + \ "\n\n## Conclusion\n\n" + \ conclusion + \ "\n\n## Méthodologie\n\n" + \ PROMPT_METHODOLOGIE fichier_a_reviser = Path(TEMPLATE_PATH.name.replace(".md", " - analyse à relire.md")) write_report(analyse, TEMP_SECTIONS / fichier_a_reviser) ingest_document(TEMP_SECTIONS / fichier_a_reviser) full_prompt = f""" Le fichier à réviser est {fichier_a_reviser}. Suivre scrupuleusement les consignes. """ system_message = f""" Vous êtes un réviseur professionnel expert en écriture stratégique, maîtrisant parfaitement la langue française et habitué à réviser des textes destinés à des dirigeants de haut niveau (COMEX). Votre unique tâche est d'améliorer la qualité rédactionnelle du texte dans le fichier {fichier_a_reviser}, sans modifier ni sa structure, ni son sens initial, ni ajouter d’informations nouvelles. Cette révision doit : - Éliminer toutes répétitions ou redondances et varier systématiquement les tournures entre les paragraphes. - Rendre chaque phrase claire, directe et concise. Si une phrase est trop longue, scindez-la en plusieurs phrases courtes. - Scinder les paragraphes en 2 ou 3 parties cohérentes et bien enchaînées avec des termes de coordinations, d'implication, … - Remplacer systématiquement les acronymes par les expressions suivantes : - ICS → « capacité à substituer un minerai » - IHH → « concentration géographique ou industrielle » - ISG → « stabilité géopolitique » - IVC → « concurrence intersectorielle pour les minerais » Votre texte final doit être fluide, agréable à lire, parfaitement adapté à un COMEX, avec un ton professionnel et sobre. Répondez uniquement avec le texte révisé, sans autre commentaire. """ corps = generate_text(fichier_a_reviser, full_prompt, system_message, "0.6").split("")[-1].strip() print("Relecture") return analyse def write_report(report, fichier): """Écrit le rapport généré dans le fichier spécifié.""" report = re.sub(r'', '', report) report = re.sub(r'\n\n\n+', '\n\n', report) with open(fichier, 'w', encoding='utf-8') as f: f.write(report) # print(f"Rapport généré avec succès: {TEMPLATE_PATH}") def nettoyer_texte_fr(texte: str) -> str: # Apostrophes droites -> typographiques texte = texte.replace("'", "’") # Guillemets droits -> guillemets français (avec espace fine insécable) texte = re.sub(r'"(.*?)"', r'« \1 »', texte) # Espaces fines insécables avant : ; ! ? texte = re.sub(r' (?=[:;!?])', '\u202F', texte) # Unités : espace insécable entre chiffre et unité texte = re.sub(r'(\d) (?=\w+)', lambda m: f"{m.group(1)}\u202F", texte) # Suppression des doubles espaces texte = re.sub(r' {2,}', ' ', texte) # Remplacement optionnel des tirets simples (optionnel) texte = texte.replace(" - ", " – ") # Nettoyage ponctuation multiple accidentelle texte = re.sub(r'\s+([.,;!?])', r'\1', texte) return texte def supprimer_fichiers(session_uuid): try: delete_documents_by_criteria(session_uuid) for temp_file in TEMP_SECTIONS.glob(f"*{session_uuid}*.md"): temp_file.unlink() return True except: return False def generer_rapport_final(rapport, analyse, resultat): try: rapport = Path(rapport) analyse = Path(analyse) with zipfile.ZipFile(resultat, "w") as zipf: zipf.write(rapport, arcname=rapport.name) zipf.write(analyse, arcname=analyse.name) return True except Exception as e: print(f"Erreur lors du zip : {e}") return False def main(dot_path, output_path): """Fonction principale du script.""" # Charger la configuration config = load_config() # Analyser les graphes graph, ref_graph = parse_graphs(dot_path) # Extraire les données data = extract_data_from_graph(graph, ref_graph) # Calculer les vulnérabilités results = calculate_vulnerabilities(data, config) # Générer le rapport report, file_names = generate_report(data, results, config) # Écrire le rapport write_report(report, TEMPLATE_PATH) ingest_document(TEMPLATE_PATH) # Générer l'analyse par l'IA du rapport compler analyse_finale = nettoyer_texte_fr(ia_analyse(file_names)) analyse_fichier = TEMP_SECTIONS / TEMPLATE_PATH.name.replace(".md", " - analyse.md") write_report(analyse_finale, analyse_fichier) if generer_rapport_final(TEMPLATE_PATH, analyse_fichier, output_path): supprimer_fichiers(session_uuid) else: print("") if __name__ == "__main__": dot_path = Path(sys.argv[1]) output_path = Path(sys.argv[2]) main(dot_path, output_path)