From 81f5bb3b666ff193e02f71cf9be2de508d8e57e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabrication=20du=20Num=C3=A9rique?= Date: Fri, 23 May 2025 21:57:27 +0200 Subject: [PATCH] Corrections diverses --- schema.txt | 18 +- scripts/generate_factorized_report.py | 140 +++-- scripts/generate_structured_template.py | 797 ------------------------ scripts/generation_analyse.py | 0 4 files changed, 99 insertions(+), 856 deletions(-) delete mode 100644 scripts/generate_structured_template.py create mode 100644 scripts/generation_analyse.py diff --git a/schema.txt b/schema.txt index 2a0fc5c..b8f8168 100644 --- a/schema.txt +++ b/schema.txt @@ -1421,7 +1421,7 @@ digraph Hierarchie_Composants_Electroniques_Simplifiee { subgraph cluster_ProcedeDUV { label="ProcedeDUV"; fillcolor="#ffd699"; - ProcedeDUV [fillcolor="#ffd699", label="Procédé DUV (Deep Ultraviolet - 248/193 nm", niveau="1000"]; + ProcedeDUV [fillcolor="#ffd699", label="Photolitographie DUV", niveau="1000"]; // Relations sortantes ProcedeDUV -> Assemblage_ProcedeDUV []; @@ -1464,11 +1464,11 @@ digraph Hierarchie_Composants_Electroniques_Simplifiee { // Relations sortantes du nœud de niveau 11 PaysBas_Assemblage_ProcedeDUV -> PaysBas_geographique [color="darkgreen"]; - PaysBas_Assemblage_ProcedeDUV -> AMSL_PaysBas_Assemblage_ProcedeDUV [color="purple", fontcolor="purple", label="84%", poids="2"]; - AMSL_PaysBas_Assemblage_ProcedeDUV [fillcolor="#d1e0ff", label="ASML", niveau="1012"]; + PaysBas_Assemblage_ProcedeDUV -> ASML_PaysBas_Assemblage_ProcedeDUV [color="purple", fontcolor="purple", label="84%", poids="2"]; + ASML_PaysBas_Assemblage_ProcedeDUV [fillcolor="#d1e0ff", label="ASML", niveau="1012"]; // Relations des nœuds destination - AMSL_PaysBas_Assemblage_ProcedeDUV -> PaysBas_geographique [color="darkgreen"]; + ASML_PaysBas_Assemblage_ProcedeDUV -> PaysBas_geographique [color="darkgreen"]; } } } @@ -1476,7 +1476,7 @@ digraph Hierarchie_Composants_Electroniques_Simplifiee { subgraph cluster_ProcedeEUV { label="ProcedeEUV"; fillcolor="#ffd699"; - ProcedeEUV [fillcolor="#ffd699", label="Procédé EUV (Extreme Ultraviolet - 13.5 nm", niveau="1000"]; + ProcedeEUV [fillcolor="#ffd699", label="Photolitographie EUV", niveau="1000"]; // Relations sortantes ProcedeEUV -> Fluorite []; @@ -1500,12 +1500,12 @@ digraph Hierarchie_Composants_Electroniques_Simplifiee { PaysBas_Assemblage_ProcedeEUV [fillcolor="#e6f2ff", label="Pays-Bas", niveau="1011"]; // Relations sortantes du nœud de niveau 11 - PaysBas_Assemblage_ProcedeEUV -> AMSL_PaysBas_Assemblage_ProcedeEUV [color="purple", fontcolor="purple", label="100%", poids="2"]; + PaysBas_Assemblage_ProcedeEUV -> ASML_PaysBas_Assemblage_ProcedeEUV [color="purple", fontcolor="purple", label="100%", poids="2"]; PaysBas_Assemblage_ProcedeEUV -> PaysBas_geographique [color="darkgreen"]; - AMSL_PaysBas_Assemblage_ProcedeEUV [fillcolor="#d1e0ff", label="ASML", niveau="1012"]; + ASML_PaysBas_Assemblage_ProcedeEUV [fillcolor="#d1e0ff", label="ASML", niveau="1012"]; // Relations des nœuds destination - AMSL_PaysBas_Assemblage_ProcedeEUV -> PaysBas_geographique [color="darkgreen"]; + ASML_PaysBas_Assemblage_ProcedeEUV -> PaysBas_geographique [color="darkgreen"]; } } } @@ -16012,7 +16012,7 @@ digraph Hierarchie_Composants_Electroniques_Simplifiee { { rank=same; CreusetGraphite; CreusetQuartz; } { rank=same; Assemblage_ProcedeDUV; Assemblage_ProcedeEUV; } { rank=same; PaysBas_Assemblage_ProcedeDUV; Japon_Assemblage_ProcedeDUV; PaysBas_Assemblage_ProcedeEUV; } - { rank=same; AMSL_PaysBas_Assemblage_ProcedeDUV; Nikon_Japon_Assemblage_ProcedeDUV; Canon_Japon_Assemblage_ProcedeDUV; AMSL_PaysBas_Assemblage_ProcedeEUV; } + { rank=same; ASML_PaysBas_Assemblage_ProcedeDUV; Nikon_Japon_Assemblage_ProcedeDUV; Canon_Japon_Assemblage_ProcedeDUV; ASML_PaysBas_Assemblage_ProcedeEUV; } // Légende subgraph cluster_legende { diff --git a/scripts/generate_factorized_report.py b/scripts/generate_factorized_report.py index dbf3e7f..f3f2051 100644 --- a/scripts/generate_factorized_report.py +++ b/scripts/generate_factorized_report.py @@ -36,7 +36,7 @@ def load_config(config_path, thresholds_path=THRESHOLDS_PATH): """Charge la configuration depuis les fichiers YAML.""" # Charger la configuration principale if not os.path.exists(config_path): - print(f"Fichier de configuration introuvable: {config_path}") + # print(f"Fichier de configuration introuvable: {config_path}") sys.exit(1) with open(config_path, 'r', encoding='utf-8') as f: @@ -46,7 +46,7 @@ def load_config(config_path, thresholds_path=THRESHOLDS_PATH): required_paths = ['graphe_path', 'template_path', 'corpus_path'] for path in required_paths: if path not in config: - print(f"Configuration incomplète: {path} manquant") + # print(f"Configuration incomplète: {path} manquant") sys.exit(1) # Convertir les chemins relatifs en chemins absolus @@ -120,7 +120,7 @@ def find_prefixed_directory(pattern, base_path=None): search_path = CORPUS_DIR if not os.path.exists(search_path): - print(f"Chemin inexistant: {search_path}") + # print(f"Chemin inexistant: {search_path}") return None for d in os.listdir(search_path): @@ -128,7 +128,7 @@ def find_prefixed_directory(pattern, base_path=None): if os.path.isdir(dir_path) and strip_prefix(d) == pattern.lower(): return os.path.relpath(dir_path, CORPUS_DIR) - print(f"Aucun répertoire correspondant à: '{pattern}' trouvé dans {search_path}") + # print(f"Aucun répertoire correspondant à: '{pattern}' trouvé dans {search_path}") return None def find_corpus_file(pattern, base_path=None): @@ -148,12 +148,12 @@ def find_corpus_file(pattern, base_path=None): else: search_path = CORPUS_DIR - # print(f"Recherche de: '{pattern}' dans {search_path}") + # # print(f"Recherche de: '{pattern}' dans {search_path}") if not os.path.exists(search_path): - print(pattern) - print(base_path) - print(f"Chemin inexistant: {search_path}") + # print(pattern) + # print(base_path) + # print(f"Chemin inexistant: {search_path}") return None if '/' not in pattern: @@ -163,7 +163,7 @@ def find_corpus_file(pattern, base_path=None): continue if strip_prefix(os.path.splitext(file)[0]) == pattern.lower(): rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR) - # print(f"Fichier trouvé: {rel_path}") + # # print(f"Fichier trouvé: {rel_path}") return rel_path else: # Séparation du chemin en dossier/fichier @@ -172,7 +172,7 @@ def find_corpus_file(pattern, base_path=None): if matched_dir: return find_corpus_file(rest, matched_dir) - print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.") + # print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.") return None @@ -191,16 +191,16 @@ def read_corpus_file(file_path, remove_first_title=False, shift_titles=0): full_path = os.path.join(CORPUS_DIR, file_path) if not os.path.exists(full_path): - print(f"Fichier non trouvé: {full_path}") + # print(f"Fichier non trouvé: {full_path}") return f"Fichier non trouvé: {file_path}" - # print(f"Lecture du fichier: {full_path}") + # # print(f"Lecture du fichier: {full_path}") with open(full_path, 'r', encoding='utf-8') as f: lines = f.readlines() # Supprimer la première ligne si c'est un titre et si demandé if remove_first_title and lines and lines[0].startswith('#'): - # print(f"Suppression du titre: {lines[0].strip()}") + # # print(f"Suppression du titre: {lines[0].strip()}") lines = lines[1:] # Décaler les niveaux de titre si demandé @@ -223,13 +223,13 @@ def parse_graphs(config): # Charger le graphe à analyser graphe_path = GRAPH_PATH if not os.path.exists(graphe_path): - print(f"Fichier de graphe à analyser introuvable: {graphe_path}") + # print(f"Fichier de graphe à analyser introuvable: {graphe_path}") sys.exit(1) # Charger le graphe de référence reference_path = REFERENCE_GRAPH_PATH if not os.path.exists(reference_path): - print(f"Fichier de graphe de référence introuvable: {reference_path}") + # print(f"Fichier de graphe de référence introuvable: {reference_path}") sys.exit(1) try: @@ -274,7 +274,7 @@ def parse_graphs(config): return graph, ref_graph except Exception as e: - print(f"Erreur lors de l'analyse des graphes: {str(e)}") + # print(f"Erreur lors de l'analyse des graphes: {str(e)}") sys.exit(1) def extract_data_from_graph(graph, ref_graph): @@ -307,13 +307,14 @@ def extract_data_from_graph(graph, ref_graph): level = attrs.get('niveau', -1) label = attrs.get('label', node) - if level == 0: # Produit final + if level == 0 or level == 1000: # Produit final data["products"][node] = { "label": label, "components": [], - "assembly": None + "assembly": None, + "level": level } - elif level == 1: # Composant + elif level == 1 or level == 1001: # Composant data["components"][node] = { "label": label, "minerals": [], @@ -327,7 +328,7 @@ def extract_data_from_graph(graph, ref_graph): "treatment": None, "ics_values": {} } - elif level == 10: # Opération + elif level == 10 or level == 1010: # Opération op_type = label.lower() data["operations"][node] = { "label": label, @@ -336,14 +337,14 @@ def extract_data_from_graph(graph, ref_graph): "ihh_pays": attrs.get('ihh_pays', 0), "countries": {} } - elif level == 11: # Pays + elif level == 11 or level == 1011: # Pays data["countries"][node] = { "label": label, "actors": {}, "geo_country": None, "market_share": 0 } - elif level == 12: # Acteur + elif level == 12 or level == 1012: # Acteur data["actors"][node] = { "label": label, "country": None, @@ -369,17 +370,17 @@ def extract_data_from_graph(graph, ref_graph): pass # Relations produit → composant - if source_level == 0 and target_level == 1: + if (source_level == 0 and target_level == 1) or (source_level == 1000 and target_level == 1001): if target not in data["products"][source]["components"]: data["products"][source]["components"].append(target) # Relations produit → opération (assemblage) - elif source_level == 0 and target_level == 10: + elif (source_level == 0 and target_level == 10) or (source_level == 1000 and target_level == 1010): if graph.nodes[target].get('label', '').lower() == 'assemblage': data["products"][source]["assembly"] = target # Relations composant → minerai avec ICS - elif source_level == 1 and target_level == 2: + elif (source_level == 1 or source_level == 1001) and target_level == 2: if target not in data["components"][source]["minerals"]: data["components"][source]["minerals"].append(target) @@ -389,7 +390,7 @@ def extract_data_from_graph(graph, ref_graph): data["minerals"][target]["ics_values"][source] = ics_value # Relations composant → opération (fabrication) - elif source_level == 1 and target_level == 10: + elif (source_level == 1 or source_level == 1001) and target_level == 10: if graph.nodes[target].get('label', '').lower() == 'fabrication': data["components"][source]["manufacturing"] = target @@ -402,18 +403,18 @@ def extract_data_from_graph(graph, ref_graph): data["minerals"][source]["treatment"] = target # Relations opération → pays avec part de marché - elif source_level == 10 and target_level == 11: + elif (source_level == 10 and target_level == 11) or (source_level == 1010 and target_level == 1011): data["operations"][source]["countries"][target] = market_share data["countries"][target]["market_share"] = market_share # Relations pays → acteur avec part de marché - elif source_level == 11 and target_level == 12: + elif (source_level == 11 and target_level == 12) or (source_level == 1011 and target_level == 1012): data["countries"][source]["actors"][target] = market_share data["actors"][target]["market_share"] = market_share data["actors"][target]["country"] = source # Relations pays → pays géographique - elif source_level == 11 and target_level == 99: + elif (source_level == 11 or source_level == 1011) and target_level == 99: country_name = graph.nodes[target].get('label', '') data["countries"][source]["geo_country"] = country_name @@ -426,8 +427,10 @@ def extract_data_from_graph(graph, ref_graph): # Chercher l'opération d'assemblage dans le graphe de référence for source, target, edge_attrs in ref_graph.edges(data=True): if (source == product_id and - ref_graph.nodes[source].get('niveau') == 0 and - ref_graph.nodes[target].get('niveau') == 10 and + ((ref_graph.nodes[source].get('niveau') == 0 and + ref_graph.nodes[target].get('niveau') == 10) or + (ref_graph.nodes[source].get('niveau') == 1000 and + ref_graph.nodes[target].get('niveau') == 1010)) and ref_graph.nodes[target].get('label', '').lower() == 'assemblage'): # L'opération existe dans le graphe de référence @@ -447,7 +450,7 @@ def extract_data_from_graph(graph, ref_graph): # Extraire les relations de l'opération vers les pays for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True): if (op_source == assembly_id and - ref_graph.nodes[op_target].get('niveau') == 11): + (ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)): country_id = op_target @@ -478,7 +481,7 @@ def extract_data_from_graph(graph, ref_graph): # Extraire les relations du pays vers les acteurs for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True): if (country_source == country_id and - ref_graph.nodes[country_target].get('niveau') == 12): + (ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)): actor_id = country_target @@ -522,8 +525,10 @@ def extract_data_from_graph(graph, ref_graph): # Chercher l'opération de fabrication dans le graphe de référence for source, target, edge_attrs in ref_graph.edges(data=True): if (source == component_id and - ref_graph.nodes[source].get('niveau') == 1 and - ref_graph.nodes[target].get('niveau') == 10 and + ((ref_graph.nodes[source].get('niveau') == 1 and + ref_graph.nodes[target].get('niveau') == 10) or + (ref_graph.nodes[source].get('niveau') == 1001 and + ref_graph.nodes[target].get('niveau') == 1010)) and ref_graph.nodes[target].get('label', '').lower() == 'fabrication'): # L'opération existe dans le graphe de référence @@ -543,7 +548,7 @@ def extract_data_from_graph(graph, ref_graph): # Extraire les relations de l'opération vers les pays for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True): if (op_source == manufacturing_id and - ref_graph.nodes[op_target].get('niveau') == 11): + (ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)): country_id = op_target @@ -574,7 +579,7 @@ def extract_data_from_graph(graph, ref_graph): # Extraire les relations du pays vers les acteurs for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True): if (country_source == country_id and - ref_graph.nodes[country_target].get('niveau') == 12): + (ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)): actor_id = country_target @@ -818,6 +823,32 @@ def generate_methodology_section(): """ template = [] template.append("## Méthodologie d'analyse des risques\n") + template.append("### Synthèse de la méthodologie\n") + template.append(""" + Le dispositif d’évaluation des risques proposé repose sur quatre indices clairement définis, chacun analysant un aspect + spécifique des risques dans la chaîne d’approvisionnement numérique. L’indice IHH mesure la concentration géographique ou + industrielle, permettant d’évaluer la dépendance vis-à-vis de certains acteurs ou régions. L’indice ISG indique la + stabilité géopolitique des pays impliqués dans la chaîne de production, en intégrant des critères politiques, sociaux + et climatiques. L’indice ICS quantifie la facilité ou la difficulté à remplacer ou substituer un élément spécifique dans + la chaîne, évaluant ainsi les risques liés à la dépendance technologique et économique. Enfin, l’indice IVC examine la + pression concurrentielle sur les ressources utilisées par le numérique, révélant ainsi le risque potentiel que ces + ressources soient détournées vers d’autres secteurs industriels. + + Ces indices se combinent judicieusement par paires pour une évaluation approfondie et pertinente des risques. La + combinaison IHH-ISG permet d’associer la gravité d'un impact potentiel (IHH) à la probabilité de survenance d’un + événement perturbateur (ISG), créant ainsi une matrice de vulnérabilité combinée utile pour identifier rapidement les + points critiques dans la chaîne de production. La combinaison ICS-IVC fonctionne selon la même logique, mais se concentre + spécifiquement sur les ressources minérales : l’ICS indique la gravité potentielle d'une rupture d'approvisionnement due + à une faible substituabilité, tandis que l’IVC évalue la probabilité que les ressources soient captées par d'autres + secteurs industriels concurrents. Ces combinaisons permettent d’obtenir une analyse précise et opérationnelle du niveau + de risque global. + + Les avantages de cette méthodologie résident dans son approche à la fois systématique et granulaire. Elle permet d’identifier + avec précision les vulnérabilités majeures et leurs origines spécifiques, facilitant ainsi la prise de décision stratégique + éclairée et proactive. En combinant des facteurs géopolitiques, industriels, technologiques et concurrentiels, ces indices + offrent un suivi efficace de la chaîne de fabrication numérique, garantissant ainsi une gestion optimale des risques et la + continuité opérationnelle à long terme. + """) template.append("### Indices et seuils\n") template.append("La méthode d'évaluation intègre 4 indices et leurs combinaisons pour identifier les chemins critiques.\n") @@ -988,10 +1019,14 @@ def trouver_dossier_composant(nom_composant, base_path, prefixe): Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant. """ search_path = os.path.join(CORPUS_DIR, base_path) + print(nom_composant) + print(base_path) + print(search_path) if not os.path.exists(search_path): return None for d in os.listdir(search_path): + print(d) if os.path.isdir(os.path.join(search_path, d)): if composant_match(f"{prefixe}{nom_composant}", d): return os.path.join(base_path, d) @@ -1001,32 +1036,37 @@ def generate_operations_section(data, results, config): """ Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement). """ - # print("DEBUG: Génération de la section des opérations") - # print(f"DEBUG: Nombre de produits: {len(data['products'])}") - # print(f"DEBUG: Nombre de composants: {len(data['components'])}") - # print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}") + # # print("DEBUG: Génération de la section des opérations") + # # print(f"DEBUG: Nombre de produits: {len(data['products'])}") + # # print(f"DEBUG: Nombre de composants: {len(data['components'])}") + # # print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}") template = [] template.append("## Détails des opérations\n") # 1. Traiter les produits finaux (assemblage) for product_id, product in data["products"].items(): - # print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}") + # # print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}") if product["assembly"]: template.append(f"### {product['label']} et Assemblage\n") # Récupérer la présentation synthétique # product_slug = product['label'].lower().replace(' ', '-') sous_repertoire = f"{product['label']}" - sous_repertoire = trouver_dossier_composant(sous_repertoire, "Assemblage", "Fiche assemblage ") + print(product) + if product["level"] == 0: + type = "Assemblage" + else: + type = "Connexe" + sous_repertoire = trouver_dossier_composant(sous_repertoire, type, "Fiche assemblage ") product_slug = sous_repertoire.split(' ', 2)[2] - presentation_file = find_corpus_file("présentation-synthétique", f"Assemblage/Fiche assemblage {product_slug}") + presentation_file = find_corpus_file("présentation-synthétique", f"{type}/Fiche assemblage {product_slug}") if presentation_file: template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append("") # Récupérer les principaux assembleurs - assembleurs_file = find_corpus_file("principaux-assembleurs", f"Assemblage/Fiche assemblage {product_slug}") + assembleurs_file = find_corpus_file("principaux-assembleurs", f"{type}/Fiche assemblage {product_slug}") if assembleurs_file: template.append(read_corpus_file(assembleurs_file, shift_titles=2)) template.append("") @@ -1061,7 +1101,7 @@ def generate_operations_section(data, results, config): template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**") # IHH - ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"Assemblage/Fiche assemblage {product_slug}") + ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"{type}/Fiche assemblage {product_slug}") if ihh_file: template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append("\n") @@ -1077,7 +1117,7 @@ def generate_operations_section(data, results, config): # 2. Traiter les composants (fabrication) for component_id, component in data["components"].items(): - # print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}") + # # print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}") if component["manufacturing"]: template.append(f"### {component['label']} et Fabrication\n") @@ -1144,9 +1184,9 @@ def generate_operations_section(data, results, config): # 3. Traiter les minerais (détaillés dans une section séparée) result = "\n".join(template) - # print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères") + # # print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères") if len(result) <= 30: # Juste le titre de section - # print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!") + # # print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!") # Ajout d'une section de débogage dans le rapport template.append("### DÉBOGAGE - Opérations manquantes\n") template.append("Aucune opération d'assemblage ou de fabrication n'a été trouvée dans les données.\n") @@ -1543,7 +1583,7 @@ def write_report(report, config): """Écrit le rapport généré dans le fichier spécifié.""" with open(TEMPLATE_PATH, 'w', encoding='utf-8') as f: f.write(report) - print(f"Rapport généré avec succès: {TEMPLATE_PATH}") + # print(f"Rapport généré avec succès: {TEMPLATE_PATH}") def main(): """Fonction principale du script.""" diff --git a/scripts/generate_structured_template.py b/scripts/generate_structured_template.py deleted file mode 100644 index 88900b8..0000000 --- a/scripts/generate_structured_template.py +++ /dev/null @@ -1,797 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Script pour générer un rapport structuré d'analyse des vulnérabilités critiques -à partir d'un graphe DOT et d'un corpus documentaire. - -Ce script remplace generate_template.py avec une structure optimisée -pour l'analyse des risques selon la méthodologie définie. -""" - -import os -import sys -import json -from networkx.drawing.nx_agraph import read_dot -import yaml -from pathlib import Path - -# Chemins de base -BASE_DIR = Path(__file__).resolve().parent -CORPUS_DIR = BASE_DIR.parent / "Corpus" -CONFIG_PATH = BASE_DIR.parent / "scripts/config.yml" -THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml" - -def load_config(config_path, thresholds_path=THRESHOLDS_PATH): - """Charge la configuration depuis les fichiers YAML.""" - # Charger la configuration principale - if not os.path.exists(config_path): - print(f"Fichier de configuration introuvable: {config_path}") - sys.exit(1) - - with open(config_path, 'r', encoding='utf-8') as f: - config = yaml.safe_load(f) - - # Vérifier les chemins essentiels - required_paths = ['graphe_path', 'template_path', 'corpus_path'] - for path in required_paths: - if path not in config: - print(f"Configuration incomplète: {path} manquant") - sys.exit(1) - - # Convertir les chemins relatifs en chemins absolus - for path in required_paths: - config[path] = os.path.join(os.path.dirname(config_path), config[path]) - - # Charger les seuils - if os.path.exists(thresholds_path): - with open(thresholds_path, 'r', encoding='utf-8') as f: - thresholds = yaml.safe_load(f) - config['thresholds'] = thresholds.get('seuils', {}) - else: - print(f"Fichier de seuils introuvable: {thresholds_path}") - # Valeurs par défaut si le fichier n'existe pas - config['thresholds'] = { - "IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}}, - "ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}}, - "ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}}, - "IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}} - } - - return config - -def determine_threshold_color(value, index_type, thresholds=None): - """ - Détermine la couleur du seuil en fonction du type d'indice et de sa valeur. - Utilise les seuils de config.yaml si disponibles. - """ - # Valeurs par défaut si les seuils ne sont pas fournis - default_thresholds = { - "IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}}, - "ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}}, - "ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}}, - "IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}} - } - - # Utiliser les seuils fournis ou les valeurs par défaut - thresholds = thresholds or default_thresholds - - # Récupérer les seuils pour cet indice - if index_type in thresholds: - index_thresholds = thresholds[index_type] - - # Déterminer la couleur - if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \ - index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]: - suffix = get_suffix_for_index(index_type, "vert") - return f"VERT ({suffix})" - elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \ - index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \ - index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]: - suffix = get_suffix_for_index(index_type, "orange") - return f"ORANGE ({suffix})" - elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \ - index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]: - suffix = get_suffix_for_index(index_type, "rouge") - return f"ROUGE ({suffix})" - - # Fallback à l'ancienne méthode si les seuils ne sont pas bien définis - if index_type == "IHH": - if value < 15: - return "VERT (Faible)" - elif value < 25: - return "ORANGE (Modérée)" - else: - return "ROUGE (Élevée)" - elif index_type == "ISG": - if value < 40: - return "VERT (Stable)" - elif value < 60: - return "ORANGE (Intermédiaire)" - else: - return "ROUGE (Instable)" - elif index_type == "ICS": - if value < 0.3: - return "VERT (Facile)" - elif value < 0.6: - return "ORANGE (Moyenne)" - else: - return "ROUGE (Difficile)" - elif index_type == "IVC": - if value < 5: - return "VERT (Faible)" - elif value < 15: - return "ORANGE (Modérée)" - else: - return "ROUGE (Forte)" - - return "Non déterminé" - -def get_suffix_for_index(index_type, color): - """Retourne le suffixe approprié pour chaque indice et couleur.""" - suffixes = { - "IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"}, - "ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"}, - "ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"}, - "IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"} - } - - if index_type in suffixes and color in suffixes[index_type]: - return suffixes[index_type][color] - return "Non déterminé" - -def parse_graph(config): - """ - Charge et analyse le graphe DOT. - Extrait les nœuds, leurs attributs et leurs relations. - """ - graphe_path = config['graphe_path'] - if not os.path.exists(graphe_path): - print(f"Fichier de graphe introuvable: {graphe_path}") - sys.exit(1) - - try: - # Charger le graphe avec NetworkX - graph = read_dot(graphe_path) - - # Convertir les attributs en types appropriés - for node, attrs in graph.nodes(data=True): - for key, value in list(attrs.items()): - # Convertir les valeurs numériques - if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']: - try: - if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']: - attrs[key] = int(value.strip('"')) - else: - attrs[key] = float(value.strip('"')) - except (ValueError, TypeError): - # Garder la valeur originale si la conversion échoue - pass - elif key == 'label': - # Nettoyer les guillemets des étiquettes - attrs[key] = value.strip('"') - - # Convertir les attributs des arêtes - for u, v, attrs in graph.edges(data=True): - for key, value in list(attrs.items()): - if key in ['ics', 'cout', 'delai', 'technique']: - try: - attrs[key] = float(value.strip('"')) - except (ValueError, TypeError): - pass - elif key == 'label' and '%' in value: - # Extraire le pourcentage - try: - percentage = value.strip('"').replace('%', '') - attrs['percentage'] = float(percentage) - except (ValueError, TypeError): - pass - - return graph - - except Exception as e: - print(f"Erreur lors de l'analyse du graphe: {str(e)}") - sys.exit(1) - -def analyze_geopolitical_stability(graph): - """ - Analyse la stabilité géopolitique des pays dans le graphe. - Identifie les pays et leurs indices ISG. - """ - geo_countries = {} - - # Identifier les nœuds de pays géographiques (niveau 99) - for node, attrs in graph.nodes(data=True): - if attrs.get('niveau') == 99: - country_name = attrs.get('label', node) - isg_value = attrs.get('isg', 0) - - geo_countries[node] = { - 'name': country_name, - 'isg': isg_value, - 'color': determine_threshold_color(isg_value, "ISG") - } - - return geo_countries - -def find_real_path(element, file_type, config, graph=None): - """ - Recherche le chemin réel d'un fichier dans le corpus. - Adapté de l'ancien generate_template.py. - """ - corpus_path = config['corpus_path'] - - # Définir les motifs de recherche selon le type de fichier - if file_type == "introduction": - patterns = ["_intro.md"] - elif file_type == "extraction": - patterns = ["extraction", "Extraction"] - elif file_type == "traitement": - patterns = ["traitement", "Traitement"] - elif file_type == "isg": - patterns = ["isg", "ISG", "stabilité", "Stabilité"] - elif file_type == "ihh_extraction": - patterns = ["ihh.*extraction", "IHH.*Extraction", "Herfindahl.*extraction"] - elif file_type == "ihh_traitement": - patterns = ["ihh.*traitement", "IHH.*Traitement", "Herfindahl.*traitement"] - elif file_type == "ics": - patterns = ["ics", "ICS", "Substituabilité", "substituabilité"] - elif file_type == "ivc": - patterns = ["ivc", "IVC", "Concurrence", "concurrence"] - else: - patterns = [] - - # Préparer le nom de l'élément pour la recherche - element_name = element - if graph and element in graph.nodes and "label" in graph.nodes[element]: - element_name = graph.nodes[element]["label"] - - # Éliminer les caractères spéciaux et normaliser - element_name = element_name.replace("_", " ").lower() - - # Parcourir le corpus pour trouver les fichiers correspondants - matches = [] - for root, dirs, files in os.walk(corpus_path): - for file in files: - if file.endswith(".md"): - file_path = os.path.join(root, file) - relative_path = os.path.relpath(file_path, corpus_path) - - # Vérifier si le fichier correspond aux motifs et à l'élément - file_matches = True - for pattern in patterns: - if pattern.lower() not in relative_path.lower(): - file_matches = False - break - - if file_matches and element_name in relative_path.lower(): - matches.append(relative_path) - - # Retourner le premier fichier correspondant trouvé - if matches: - return matches[0] - return None - -def find_ics_path(element, config, graph=None): - """ - Recherche le chemin d'un fichier ICS pour un élément. - Adapté de l'ancien generate_template.py. - """ - return find_real_path(element, "ics", config, graph) - -def find_ivc_path(element, config, graph=None): - """ - Recherche le chemin d'un fichier IVC pour un élément. - Adapté de l'ancien generate_template.py. - """ - return find_real_path(element, "ivc", config, graph) - -def extract_all_data(graph, geo_countries): - """ - Extrait toutes les données pertinentes du graphe structuré. - """ - data = { - "products": {}, # Produits finaux (N0) - "components": {}, # Composants (N1) - "minerals": {}, # Minerais (N2) - "operations": {}, # Opérations (N10) - "countries": {}, # Pays (N11) - "actors": {} # Acteurs (N12) - } - - # Parcourir tous les nœuds - for node, attrs in graph.nodes(data=True): - level = attrs.get('niveau', -1) - - # Classifier par niveau - if level == 0: # Produit - data["products"][node] = { - "label": attrs.get('label', node), - "components": [] - } - elif level == 1: # Composant - data["components"][node] = { - "label": attrs.get('label', node), - "minerals": [], - "ics_values": {} - } - elif level == 2: # Minerai - data["minerals"][node] = { - "label": attrs.get('label', node), - "ivc": attrs.get('ivc', 0), - "extraction": None, - "treatment": None, - "ics_values": {} - } - elif level == 10: # Opération - data["operations"][node] = { - "label": attrs.get('label', node), - "ihh_acteurs": attrs.get('ihh_acteurs', 0), - "ihh_pays": attrs.get('ihh_pays', 0), - "countries": {} - } - elif level == 11: # Pays - data["countries"][node] = { - "label": attrs.get('label', node), - "actors": {}, - "geo_country": None, - "market_share": 0 - } - elif level == 12: # Acteur - data["actors"][node] = { - "label": attrs.get('label', node), - "market_share": 0 - } - - # Parcourir les arêtes pour établir les relations et parts de marché - for source, target, edge_attrs in graph.edges(data=True): - source_level = graph.nodes[source].get('niveau', -1) - target_level = graph.nodes[target].get('niveau', -1) - - # Extraire part de marché - market_share = 0 - if 'percentage' in edge_attrs: - market_share = edge_attrs['percentage'] - elif 'label' in edge_attrs and '%' in edge_attrs['label']: - try: - market_share = float(edge_attrs['label'].strip('"').replace('%', '')) - except (ValueError, TypeError): - pass - - # Relations produit → composant - if source_level == 0 and target_level == 1: - data["products"][source]["components"].append(target) - - # Relations composant → minerai avec ICS - elif source_level == 1 and target_level == 2: - data["components"][source]["minerals"].append(target) - # Stocker l'ICS s'il est présent - if 'ics' in edge_attrs: - ics_value = edge_attrs['ics'] - data["components"][source]["ics_values"][target] = ics_value - data["minerals"][target]["ics_values"][source] = ics_value - - # Relations minerai → opération - elif source_level == 2 and target_level == 10: - op_label = graph.nodes[target].get('label', '').lower() - if 'extraction' in op_label: - data["minerals"][source]["extraction"] = target - elif 'traitement' in op_label: - data["minerals"][source]["treatment"] = target - - # Relations opération → pays avec part de marché - elif source_level == 10 and target_level == 11: - data["operations"][source]["countries"][target] = market_share - data["countries"][target]["market_share"] = market_share - - # Relations pays → acteur avec part de marché - elif source_level == 11 and target_level == 12: - data["countries"][source]["actors"][target] = market_share - data["actors"][target]["market_share"] = market_share - - # Relations pays → pays géographique - elif source_level == 11 and target_level == 99: - data["countries"][source]["geo_country"] = target - - return data - -def calculate_combined_vulnerabilities(data, geo_countries, config): - """ - Calcule les vulnérabilités combinées selon la méthodologie définie. - Utilise les seuils définis dans la configuration. - """ - results = { - "ihh_isg_combined": {}, # Pour chaque opération - "ics_ivc_combined": {}, # Pour chaque minerai - "chains_classification": { - "critical": [], - "major": [], - "medium": [] - } - } - - # 1. Calculer ISG_combiné pour chaque opération - for op_id, operation in data["operations"].items(): - isg_weighted_sum = 0 - total_share = 0 - - # Parcourir chaque pays impliqué dans l'opération - for country_id, share in operation["countries"].items(): - country = data["countries"][country_id] - geo_country_id = country.get("geo_country") - - if geo_country_id and geo_country_id in geo_countries: - isg_value = geo_countries[geo_country_id]["isg"] - isg_weighted_sum += isg_value * share - total_share += share - - # Calculer la moyenne pondérée - isg_combined = 0 - if total_share > 0: - isg_combined = isg_weighted_sum / total_share - - # Déterminer couleurs et poids - ihh_value = operation["ihh_pays"] - ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds')) - isg_color = determine_threshold_color(isg_combined, "ISG", config.get('thresholds')) - - # Mapper les couleurs aux poids - weight_map = { - "VERT (Faible)": 1, "VERT (Stable)": 1, "VERT (Facile)": 1, - "ORANGE (Modérée)": 2, "ORANGE (Intermédiaire)": 2, "ORANGE (Moyenne)": 2, - "ROUGE (Élevée)": 3, "ROUGE (Instable)": 3, "ROUGE (Difficile)": 3, "ROUGE (Forte)": 3 - } - - ihh_weight = weight_map.get(ihh_color, 1) - isg_weight = weight_map.get(isg_color, 1) - combined_weight = ihh_weight * isg_weight - - # Déterminer vulnérabilité combinée - if combined_weight in [6, 9]: - vulnerability = "ÉLEVÉE à CRITIQUE" - elif combined_weight in [3, 4]: - vulnerability = "MOYENNE" - else: # 1, 2 - vulnerability = "FAIBLE" - - # Stocker résultats - results["ihh_isg_combined"][op_id] = { - "ihh_value": ihh_value, - "ihh_color": ihh_color, - "isg_combined": isg_combined, - "isg_color": isg_color, - "combined_weight": combined_weight, - "vulnerability": vulnerability - } - - # 2. Calculer ICS_moyen pour chaque minerai - for mineral_id, mineral in data["minerals"].items(): - ics_values = list(mineral["ics_values"].values()) - ics_average = 0 - - if ics_values: - ics_average = sum(ics_values) / len(ics_values) - - ivc_value = mineral.get("ivc", 0) - - # Déterminer couleurs et poids - ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds')) - ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds')) - - ics_weight = weight_map.get(ics_color, 1) - ivc_weight = weight_map.get(ivc_color, 1) - combined_weight = ics_weight * ivc_weight - - # Déterminer vulnérabilité combinée - if combined_weight in [6, 9]: - vulnerability = "ÉLEVÉE à CRITIQUE" - elif combined_weight in [3, 4]: - vulnerability = "MOYENNE" - else: # 1, 2 - vulnerability = "FAIBLE" - - # Stocker résultats - results["ics_ivc_combined"][mineral_id] = { - "ics_average": ics_average, - "ics_color": ics_color, - "ivc_value": ivc_value, - "ivc_color": ivc_color, - "combined_weight": combined_weight, - "vulnerability": vulnerability - } - - # 3. Classifier les chaînes - for product_id, product in data["products"].items(): - for component_id in product["components"]: - component = data["components"][component_id] - - for mineral_id in component["minerals"]: - mineral = data["minerals"][mineral_id] - - # Collecter toutes les vulnérabilités dans cette chaîne - chain_vulnerabilities = [] - - # Vulnérabilité ICS+IVC du minerai - if mineral_id in results["ics_ivc_combined"]: - chain_vulnerabilities.append(results["ics_ivc_combined"][mineral_id]["vulnerability"]) - - # Vulnérabilité IHH+ISG extraction - if mineral["extraction"] and mineral["extraction"] in results["ihh_isg_combined"]: - chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["extraction"]]["vulnerability"]) - - # Vulnérabilité IHH+ISG traitement - if mineral["treatment"] and mineral["treatment"] in results["ihh_isg_combined"]: - chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["treatment"]]["vulnerability"]) - - # Classifier la chaîne - chain_info = { - "product": product_id, - "component": component_id, - "mineral": mineral_id, - "vulnerabilities": chain_vulnerabilities - } - - if "ÉLEVÉE à CRITIQUE" in chain_vulnerabilities: - results["chains_classification"]["critical"].append(chain_info) - elif chain_vulnerabilities.count("MOYENNE") >= 3: - results["chains_classification"]["major"].append(chain_info) - elif "MOYENNE" in chain_vulnerabilities: - results["chains_classification"]["medium"].append(chain_info) - - return results - -def generate_structured_template(data, vulnerabilities, geo_countries, config, graph=None): - """ - Génère un template structuré pour le rapport d'analyse des risques. - """ - template = [] - - # 1. Titre principal - template.append("# Évaluation des vulnérabilités critiques\n") - - # 2. Introduction - template.append("## Introduction\n") - # Ajouter référence à l'introduction du corpus - intro_path = find_real_path("introduction", "introduction", config, graph) - if intro_path: - template.append(f"Corpus/{intro_path}\n") - - # 3. Méthodologie - template.append("## Méthodologie d'analyse des risques\n") - template.append("### Indices et seuils\n") - template.append("* IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n") - template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n") - template.append("* ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément dans le composant ou le procédé\n") - template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n") - template.append("* ISG (Stabilité Géopolitique) : stabilité des pays\n") - template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n") - template.append("* IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs que le numérique\n") - template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n") - - template.append("### Matrices de vulnérabilité combinée\n") - template.append("#### IHH et ISG\n") - template.append("| ISG_combiné / IHH | Vert | Orange | Rouge |\n") - template.append("| :-- | :-- | :-- | :-- |\n") - template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n") - template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n") - template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n") - - template.append("#### ICS et IVC\n") - template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |\n") - template.append("| :-- | :-- | :-- | :-- |\n") - template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n") - template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n") - template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n") - - # 4. Éléments factuels - template.append("## Éléments factuels\n") - - # 4.1 Analyse par produit/composant/minerai - for product_id, product in data["products"].items(): - template.append(f"### {product['label']}\n") - - for component_id in product["components"]: - component = data["components"][component_id] - template.append(f"#### {component['label']}\n") - - for mineral_id in component["minerals"]: - mineral = data["minerals"][mineral_id] - template.append(f"##### {mineral['label']}\n") - - # 4.1.1 Extraction - if mineral["extraction"]: - extraction = data["operations"][mineral["extraction"]] - template.append("##### Extraction\n") - template.append(f"###### Concentration\n") - - # IHH Extraction - ihh_value = extraction["ihh_pays"] - ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds')) - template.append(f"IHH pays: {ihh_value} - {ihh_color}\n") - - # Référence au fichier IHH extraction - ihh_extraction_path = find_real_path(mineral_id, "ihh_extraction", config, graph) - if ihh_extraction_path: - template.append(f"Corpus/{ihh_extraction_path}\n") - - # Liste des pays et acteurs - template.append("**Principaux pays:**\n") - for country_id, share in extraction["countries"].items(): - country = data["countries"][country_id] - template.append(f"- {country['label']}: {share}%\n") - - # 4.1.2 Stabilité Géopolitique (après Extraction) - template.append("#### Stabilité Géopolitique\n") - - # Référence au fichier ISG - isg_path = find_real_path(mineral_id, "isg", config, graph) - if isg_path: - template.append(f"Corpus/{isg_path}\n") - - # 4.1.3 Traitement - if mineral["treatment"]: - treatment = data["operations"][mineral["treatment"]] - template.append("#### Traitement\n") - template.append("##### Concentration\n") - - # IHH Traitement - ihh_value = treatment["ihh_pays"] - ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds')) - template.append(f"IHH pays: {ihh_value} - {ihh_color}\n") - - # Référence au fichier IHH traitement - ihh_treatment_path = find_real_path(mineral_id, "ihh_traitement", config, graph) - if ihh_treatment_path: - template.append(f"Corpus/{ihh_treatment_path}\n") - - # Liste des pays et acteurs - template.append("**Principaux pays:**\n") - for country_id, share in treatment["countries"].items(): - country = data["countries"][country_id] - template.append(f"- {country['label']}: {share}%\n") - - # 4.1.4 Stabilité Géopolitique (après Traitement) - template.append("#### Stabilité Géopolitique\n") - - # Même référence au fichier ISG - if isg_path: - template.append(f"Corpus/{isg_path}\n") - - # 4.1.5 Substituabilité - template.append("#### Substituabilité\n") - - # Calcul de l'ICS moyen pour ce minerai - ics_values = list(mineral["ics_values"].values()) - if ics_values: - ics_average = sum(ics_values) / len(ics_values) - ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds')) - template.append(f"ICS moyen: {ics_average:.2f} - {ics_color}\n") - - # Liste des composants avec leur ICS - for comp_id, ics_value in mineral["ics_values"].items(): - comp_label = data["components"][comp_id]["label"] - ics_color = determine_threshold_color(ics_value, "ICS", config.get('thresholds')) - template.append(f"###### {comp_label} -> {mineral['label']} - Coefficient: {ics_value:.2f}\n") - - # Référence au fichier ICS spécifique - ics_path = find_ics_path(mineral_id, config, graph) - if ics_path: - template.append(f"Corpus/{ics_path}\n") - - # 4.1.6 Concurrence - template.append("#### Concurrence\n") - - # IVC - ivc_value = mineral.get("ivc", 0) - ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds')) - template.append(f"IVC: {ivc_value} - {ivc_color}\n") - - # Référence au fichier IVC - ivc_path = find_ivc_path(mineral_id, config, graph) - if ivc_path: - template.append(f"Corpus/{ivc_path}\n") - - template.append("##### Secteurs concurrents\n") - - # 5. Analyse des vulnérabilités combinées - template.append("## Analyse des vulnérabilités combinées\n") - - # 5.1 IHH+ISG par opération - template.append("### Vulnérabilités IHH+ISG par opération\n") - template.append("| Opération | IHH | ISG combiné | Vulnérabilité combinée |\n") - template.append("|-----------|-----|------------|------------------------|\n") - - for op_id, combined in vulnerabilities["ihh_isg_combined"].items(): - operation_name = data["operations"][op_id]["label"] - ihh_value = combined["ihh_value"] - isg_combined = combined["isg_combined"] - vulnerability = combined["vulnerability"] - template.append(f"| {operation_name} | {ihh_value} ({combined['ihh_color']}) | {isg_combined:.2f} ({combined['isg_color']}) | {vulnerability} |\n") - - # 5.2 ICS+IVC par minerai - template.append("\n### Vulnérabilités ICS+IVC par minerai\n") - template.append("| Minerai | ICS moyen | IVC | Vulnérabilité combinée |\n") - template.append("|---------|-----------|-----|------------------------|\n") - - for mineral_id, combined in vulnerabilities["ics_ivc_combined"].items(): - mineral_name = data["minerals"][mineral_id]["label"] - ics_average = combined["ics_average"] - ivc_value = combined["ivc_value"] - vulnerability = combined["vulnerability"] - template.append(f"| {mineral_name} | {ics_average:.2f} ({combined['ics_color']}) | {ivc_value} ({combined['ivc_color']}) | {vulnerability} |\n") - - # 6. Classification des chaînes - template.append("\n## Classification des chaînes par niveau de risque\n") - - # 6.1 Chaînes critiques - template.append("### Chaînes à risque critique\n") - if vulnerabilities["chains_classification"]["critical"]: - for chain in vulnerabilities["chains_classification"]["critical"]: - product_name = data["products"][chain["product"]]["label"] - component_name = data["components"][chain["component"]]["label"] - mineral_name = data["minerals"][chain["mineral"]]["label"] - - template.append(f"* {product_name} → {component_name} → {mineral_name}\n") - template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n") - else: - template.append("Aucune chaîne à risque critique identifiée.\n\n") - - # 6.2 Chaînes majeures - template.append("### Chaînes à risque majeur\n") - if vulnerabilities["chains_classification"]["major"]: - for chain in vulnerabilities["chains_classification"]["major"]: - product_name = data["products"][chain["product"]]["label"] - component_name = data["components"][chain["component"]]["label"] - mineral_name = data["minerals"][chain["mineral"]]["label"] - - template.append(f"* {product_name} → {component_name} → {mineral_name}\n") - template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n") - else: - template.append("Aucune chaîne à risque majeur identifiée.\n\n") - - # 6.3 Chaînes moyennes - template.append("### Chaînes à risque moyen\n") - if vulnerabilities["chains_classification"]["medium"]: - for chain in vulnerabilities["chains_classification"]["medium"]: - product_name = data["products"][chain["product"]]["label"] - component_name = data["components"][chain["component"]]["label"] - mineral_name = data["minerals"][chain["mineral"]]["label"] - - template.append(f"* {product_name} → {component_name} → {mineral_name}\n") - template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n") - else: - template.append("Aucune chaîne à risque moyen identifiée.\n\n") - - return "\n".join(template) - -def write_template(template, config): - """Écrit le template généré dans le fichier spécifié.""" - template_path = config['template_path'] - with open(template_path, 'w', encoding='utf-8') as f: - f.write(template) - print(f"Template généré avec succès: {template_path}") - -def main(): - """Fonction principale du script.""" - # Charger la configuration - config = load_config(CONFIG_PATH) - - # Analyser le graphe - graph = parse_graph(config) - - # Analyser la stabilité géopolitique - geo_countries = analyze_geopolitical_stability(graph) - - # Extraire toutes les données - data = extract_all_data(graph, geo_countries) - - # Calculer les vulnérabilités combinées - vulnerabilities = calculate_combined_vulnerabilities(data, geo_countries, config) - - # Générer le template structuré - template = generate_structured_template(data, vulnerabilities, geo_countries, config, graph) - - # Écrire le template dans le fichier - write_template(template, config) - -if __name__ == "__main__": - main() diff --git a/scripts/generation_analyse.py b/scripts/generation_analyse.py new file mode 100644 index 0000000..e69de29