Code/scripts/generate_structured_template.py
2025-05-22 12:49:54 +02:00

798 lines
33 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour générer un rapport structuré d'analyse des vulnérabilités critiques
à partir d'un graphe DOT et d'un corpus documentaire.
Ce script remplace generate_template.py avec une structure optimisée
pour l'analyse des risques selon la méthodologie définie.
"""
import os
import sys
import json
from networkx.drawing.nx_agraph import read_dot
import yaml
from pathlib import Path
# Chemins de base
BASE_DIR = Path(__file__).resolve().parent
CORPUS_DIR = BASE_DIR.parent / "Corpus"
CONFIG_PATH = BASE_DIR.parent / "scripts/config.yml"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
def load_config(config_path, thresholds_path=THRESHOLDS_PATH):
"""Charge la configuration depuis les fichiers YAML."""
# Charger la configuration principale
if not os.path.exists(config_path):
print(f"Fichier de configuration introuvable: {config_path}")
sys.exit(1)
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# Vérifier les chemins essentiels
required_paths = ['graphe_path', 'template_path', 'corpus_path']
for path in required_paths:
if path not in config:
print(f"Configuration incomplète: {path} manquant")
sys.exit(1)
# Convertir les chemins relatifs en chemins absolus
for path in required_paths:
config[path] = os.path.join(os.path.dirname(config_path), config[path])
# Charger les seuils
if os.path.exists(thresholds_path):
with open(thresholds_path, 'r', encoding='utf-8') as f:
thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {})
else:
print(f"Fichier de seuils introuvable: {thresholds_path}")
# Valeurs par défaut si le fichier n'existe pas
config['thresholds'] = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
return config
def determine_threshold_color(value, index_type, thresholds=None):
"""
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles.
"""
# Valeurs par défaut si les seuils ne sont pas fournis
default_thresholds = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
# Utiliser les seuils fournis ou les valeurs par défaut
thresholds = thresholds or default_thresholds
# Récupérer les seuils pour cet indice
if index_type in thresholds:
index_thresholds = thresholds[index_type]
# Déterminer la couleur
if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
suffix = get_suffix_for_index(index_type, "vert")
return f"VERT ({suffix})"
elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \
index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]:
suffix = get_suffix_for_index(index_type, "orange")
return f"ORANGE ({suffix})"
elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]:
suffix = get_suffix_for_index(index_type, "rouge")
return f"ROUGE ({suffix})"
# Fallback à l'ancienne méthode si les seuils ne sont pas bien définis
if index_type == "IHH":
if value < 15:
return "VERT (Faible)"
elif value < 25:
return "ORANGE (Modérée)"
else:
return "ROUGE (Élevée)"
elif index_type == "ISG":
if value < 40:
return "VERT (Stable)"
elif value < 60:
return "ORANGE (Intermédiaire)"
else:
return "ROUGE (Instable)"
elif index_type == "ICS":
if value < 0.3:
return "VERT (Facile)"
elif value < 0.6:
return "ORANGE (Moyenne)"
else:
return "ROUGE (Difficile)"
elif index_type == "IVC":
if value < 5:
return "VERT (Faible)"
elif value < 15:
return "ORANGE (Modérée)"
else:
return "ROUGE (Forte)"
return "Non déterminé"
def get_suffix_for_index(index_type, color):
"""Retourne le suffixe approprié pour chaque indice et couleur."""
suffixes = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
if index_type in suffixes and color in suffixes[index_type]:
return suffixes[index_type][color]
return "Non déterminé"
def parse_graph(config):
"""
Charge et analyse le graphe DOT.
Extrait les nœuds, leurs attributs et leurs relations.
"""
graphe_path = config['graphe_path']
if not os.path.exists(graphe_path):
print(f"Fichier de graphe introuvable: {graphe_path}")
sys.exit(1)
try:
# Charger le graphe avec NetworkX
graph = read_dot(graphe_path)
# Convertir les attributs en types appropriés
for node, attrs in graph.nodes(data=True):
for key, value in list(attrs.items()):
# Convertir les valeurs numériques
if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']:
try:
if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']:
attrs[key] = int(value.strip('"'))
else:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
# Garder la valeur originale si la conversion échoue
pass
elif key == 'label':
# Nettoyer les guillemets des étiquettes
attrs[key] = value.strip('"')
# Convertir les attributs des arêtes
for u, v, attrs in graph.edges(data=True):
for key, value in list(attrs.items()):
if key in ['ics', 'cout', 'delai', 'technique']:
try:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
pass
elif key == 'label' and '%' in value:
# Extraire le pourcentage
try:
percentage = value.strip('"').replace('%', '')
attrs['percentage'] = float(percentage)
except (ValueError, TypeError):
pass
return graph
except Exception as e:
print(f"Erreur lors de l'analyse du graphe: {str(e)}")
sys.exit(1)
def analyze_geopolitical_stability(graph):
"""
Analyse la stabilité géopolitique des pays dans le graphe.
Identifie les pays et leurs indices ISG.
"""
geo_countries = {}
# Identifier les nœuds de pays géographiques (niveau 99)
for node, attrs in graph.nodes(data=True):
if attrs.get('niveau') == 99:
country_name = attrs.get('label', node)
isg_value = attrs.get('isg', 0)
geo_countries[node] = {
'name': country_name,
'isg': isg_value,
'color': determine_threshold_color(isg_value, "ISG")
}
return geo_countries
def find_real_path(element, file_type, config, graph=None):
"""
Recherche le chemin réel d'un fichier dans le corpus.
Adapté de l'ancien generate_template.py.
"""
corpus_path = config['corpus_path']
# Définir les motifs de recherche selon le type de fichier
if file_type == "introduction":
patterns = ["_intro.md"]
elif file_type == "extraction":
patterns = ["extraction", "Extraction"]
elif file_type == "traitement":
patterns = ["traitement", "Traitement"]
elif file_type == "isg":
patterns = ["isg", "ISG", "stabilité", "Stabilité"]
elif file_type == "ihh_extraction":
patterns = ["ihh.*extraction", "IHH.*Extraction", "Herfindahl.*extraction"]
elif file_type == "ihh_traitement":
patterns = ["ihh.*traitement", "IHH.*Traitement", "Herfindahl.*traitement"]
elif file_type == "ics":
patterns = ["ics", "ICS", "Substituabilité", "substituabilité"]
elif file_type == "ivc":
patterns = ["ivc", "IVC", "Concurrence", "concurrence"]
else:
patterns = []
# Préparer le nom de l'élément pour la recherche
element_name = element
if graph and element in graph.nodes and "label" in graph.nodes[element]:
element_name = graph.nodes[element]["label"]
# Éliminer les caractères spéciaux et normaliser
element_name = element_name.replace("_", " ").lower()
# Parcourir le corpus pour trouver les fichiers correspondants
matches = []
for root, dirs, files in os.walk(corpus_path):
for file in files:
if file.endswith(".md"):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, corpus_path)
# Vérifier si le fichier correspond aux motifs et à l'élément
file_matches = True
for pattern in patterns:
if pattern.lower() not in relative_path.lower():
file_matches = False
break
if file_matches and element_name in relative_path.lower():
matches.append(relative_path)
# Retourner le premier fichier correspondant trouvé
if matches:
return matches[0]
return None
def find_ics_path(element, config, graph=None):
"""
Recherche le chemin d'un fichier ICS pour un élément.
Adapté de l'ancien generate_template.py.
"""
return find_real_path(element, "ics", config, graph)
def find_ivc_path(element, config, graph=None):
"""
Recherche le chemin d'un fichier IVC pour un élément.
Adapté de l'ancien generate_template.py.
"""
return find_real_path(element, "ivc", config, graph)
def extract_all_data(graph, geo_countries):
"""
Extrait toutes les données pertinentes du graphe structuré.
"""
data = {
"products": {}, # Produits finaux (N0)
"components": {}, # Composants (N1)
"minerals": {}, # Minerais (N2)
"operations": {}, # Opérations (N10)
"countries": {}, # Pays (N11)
"actors": {} # Acteurs (N12)
}
# Parcourir tous les nœuds
for node, attrs in graph.nodes(data=True):
level = attrs.get('niveau', -1)
# Classifier par niveau
if level == 0: # Produit
data["products"][node] = {
"label": attrs.get('label', node),
"components": []
}
elif level == 1: # Composant
data["components"][node] = {
"label": attrs.get('label', node),
"minerals": [],
"ics_values": {}
}
elif level == 2: # Minerai
data["minerals"][node] = {
"label": attrs.get('label', node),
"ivc": attrs.get('ivc', 0),
"extraction": None,
"treatment": None,
"ics_values": {}
}
elif level == 10: # Opération
data["operations"][node] = {
"label": attrs.get('label', node),
"ihh_acteurs": attrs.get('ihh_acteurs', 0),
"ihh_pays": attrs.get('ihh_pays', 0),
"countries": {}
}
elif level == 11: # Pays
data["countries"][node] = {
"label": attrs.get('label', node),
"actors": {},
"geo_country": None,
"market_share": 0
}
elif level == 12: # Acteur
data["actors"][node] = {
"label": attrs.get('label', node),
"market_share": 0
}
# Parcourir les arêtes pour établir les relations et parts de marché
for source, target, edge_attrs in graph.edges(data=True):
source_level = graph.nodes[source].get('niveau', -1)
target_level = graph.nodes[target].get('niveau', -1)
# Extraire part de marché
market_share = 0
if 'percentage' in edge_attrs:
market_share = edge_attrs['percentage']
elif 'label' in edge_attrs and '%' in edge_attrs['label']:
try:
market_share = float(edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Relations produit → composant
if source_level == 0 and target_level == 1:
data["products"][source]["components"].append(target)
# Relations composant → minerai avec ICS
elif source_level == 1 and target_level == 2:
data["components"][source]["minerals"].append(target)
# Stocker l'ICS s'il est présent
if 'ics' in edge_attrs:
ics_value = edge_attrs['ics']
data["components"][source]["ics_values"][target] = ics_value
data["minerals"][target]["ics_values"][source] = ics_value
# Relations minerai → opération
elif source_level == 2 and target_level == 10:
op_label = graph.nodes[target].get('label', '').lower()
if 'extraction' in op_label:
data["minerals"][source]["extraction"] = target
elif 'traitement' in op_label:
data["minerals"][source]["treatment"] = target
# Relations opération → pays avec part de marché
elif source_level == 10 and target_level == 11:
data["operations"][source]["countries"][target] = market_share
data["countries"][target]["market_share"] = market_share
# Relations pays → acteur avec part de marché
elif source_level == 11 and target_level == 12:
data["countries"][source]["actors"][target] = market_share
data["actors"][target]["market_share"] = market_share
# Relations pays → pays géographique
elif source_level == 11 and target_level == 99:
data["countries"][source]["geo_country"] = target
return data
def calculate_combined_vulnerabilities(data, geo_countries, config):
"""
Calcule les vulnérabilités combinées selon la méthodologie définie.
Utilise les seuils définis dans la configuration.
"""
results = {
"ihh_isg_combined": {}, # Pour chaque opération
"ics_ivc_combined": {}, # Pour chaque minerai
"chains_classification": {
"critical": [],
"major": [],
"medium": []
}
}
# 1. Calculer ISG_combiné pour chaque opération
for op_id, operation in data["operations"].items():
isg_weighted_sum = 0
total_share = 0
# Parcourir chaque pays impliqué dans l'opération
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country_id = country.get("geo_country")
if geo_country_id and geo_country_id in geo_countries:
isg_value = geo_countries[geo_country_id]["isg"]
isg_weighted_sum += isg_value * share
total_share += share
# Calculer la moyenne pondérée
isg_combined = 0
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
# Déterminer couleurs et poids
ihh_value = operation["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
isg_color = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
# Mapper les couleurs aux poids
weight_map = {
"VERT (Faible)": 1, "VERT (Stable)": 1, "VERT (Facile)": 1,
"ORANGE (Modérée)": 2, "ORANGE (Intermédiaire)": 2, "ORANGE (Moyenne)": 2,
"ROUGE (Élevée)": 3, "ROUGE (Instable)": 3, "ROUGE (Difficile)": 3, "ROUGE (Forte)": 3
}
ihh_weight = weight_map.get(ihh_color, 1)
isg_weight = weight_map.get(isg_color, 1)
combined_weight = ihh_weight * isg_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ihh_isg_combined"][op_id] = {
"ihh_value": ihh_value,
"ihh_color": ihh_color,
"isg_combined": isg_combined,
"isg_color": isg_color,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 2. Calculer ICS_moyen pour chaque minerai
for mineral_id, mineral in data["minerals"].items():
ics_values = list(mineral["ics_values"].values())
ics_average = 0
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ivc_value = mineral.get("ivc", 0)
# Déterminer couleurs et poids
ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
ics_weight = weight_map.get(ics_color, 1)
ivc_weight = weight_map.get(ivc_color, 1)
combined_weight = ics_weight * ivc_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ics_ivc_combined"][mineral_id] = {
"ics_average": ics_average,
"ics_color": ics_color,
"ivc_value": ivc_value,
"ivc_color": ivc_color,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 3. Classifier les chaînes
for product_id, product in data["products"].items():
for component_id in product["components"]:
component = data["components"][component_id]
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
# Collecter toutes les vulnérabilités dans cette chaîne
chain_vulnerabilities = []
# Vulnérabilité ICS+IVC du minerai
if mineral_id in results["ics_ivc_combined"]:
chain_vulnerabilities.append(results["ics_ivc_combined"][mineral_id]["vulnerability"])
# Vulnérabilité IHH+ISG extraction
if mineral["extraction"] and mineral["extraction"] in results["ihh_isg_combined"]:
chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["extraction"]]["vulnerability"])
# Vulnérabilité IHH+ISG traitement
if mineral["treatment"] and mineral["treatment"] in results["ihh_isg_combined"]:
chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["treatment"]]["vulnerability"])
# Classifier la chaîne
chain_info = {
"product": product_id,
"component": component_id,
"mineral": mineral_id,
"vulnerabilities": chain_vulnerabilities
}
if "ÉLEVÉE à CRITIQUE" in chain_vulnerabilities:
results["chains_classification"]["critical"].append(chain_info)
elif chain_vulnerabilities.count("MOYENNE") >= 3:
results["chains_classification"]["major"].append(chain_info)
elif "MOYENNE" in chain_vulnerabilities:
results["chains_classification"]["medium"].append(chain_info)
return results
def generate_structured_template(data, vulnerabilities, geo_countries, config, graph=None):
"""
Génère un template structuré pour le rapport d'analyse des risques.
"""
template = []
# 1. Titre principal
template.append("# Évaluation des vulnérabilités critiques\n")
# 2. Introduction
template.append("## Introduction\n")
# Ajouter référence à l'introduction du corpus
intro_path = find_real_path("introduction", "introduction", config, graph)
if intro_path:
template.append(f"Corpus/{intro_path}\n")
# 3. Méthodologie
template.append("## Méthodologie d'analyse des risques\n")
template.append("### Indices et seuils\n")
template.append("* IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n")
template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n")
template.append("* ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément dans le composant ou le procédé\n")
template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n")
template.append("* ISG (Stabilité Géopolitique) : stabilité des pays\n")
template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n")
template.append("* IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs que le numérique\n")
template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n")
template.append("### Matrices de vulnérabilité combinée\n")
template.append("#### IHH et ISG\n")
template.append("| ISG_combiné / IHH | Vert | Orange | Rouge |\n")
template.append("| :-- | :-- | :-- | :-- |\n")
template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n")
template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n")
template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n")
template.append("#### ICS et IVC\n")
template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |\n")
template.append("| :-- | :-- | :-- | :-- |\n")
template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n")
template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n")
template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n")
# 4. Éléments factuels
template.append("## Éléments factuels\n")
# 4.1 Analyse par produit/composant/minerai
for product_id, product in data["products"].items():
template.append(f"### {product['label']}\n")
for component_id in product["components"]:
component = data["components"][component_id]
template.append(f"#### {component['label']}\n")
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
template.append(f"##### {mineral['label']}\n")
# 4.1.1 Extraction
if mineral["extraction"]:
extraction = data["operations"][mineral["extraction"]]
template.append("##### Extraction\n")
template.append(f"###### Concentration\n")
# IHH Extraction
ihh_value = extraction["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
template.append(f"IHH pays: {ihh_value} - {ihh_color}\n")
# Référence au fichier IHH extraction
ihh_extraction_path = find_real_path(mineral_id, "ihh_extraction", config, graph)
if ihh_extraction_path:
template.append(f"Corpus/{ihh_extraction_path}\n")
# Liste des pays et acteurs
template.append("**Principaux pays:**\n")
for country_id, share in extraction["countries"].items():
country = data["countries"][country_id]
template.append(f"- {country['label']}: {share}%\n")
# 4.1.2 Stabilité Géopolitique (après Extraction)
template.append("#### Stabilité Géopolitique\n")
# Référence au fichier ISG
isg_path = find_real_path(mineral_id, "isg", config, graph)
if isg_path:
template.append(f"Corpus/{isg_path}\n")
# 4.1.3 Traitement
if mineral["treatment"]:
treatment = data["operations"][mineral["treatment"]]
template.append("#### Traitement\n")
template.append("##### Concentration\n")
# IHH Traitement
ihh_value = treatment["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
template.append(f"IHH pays: {ihh_value} - {ihh_color}\n")
# Référence au fichier IHH traitement
ihh_treatment_path = find_real_path(mineral_id, "ihh_traitement", config, graph)
if ihh_treatment_path:
template.append(f"Corpus/{ihh_treatment_path}\n")
# Liste des pays et acteurs
template.append("**Principaux pays:**\n")
for country_id, share in treatment["countries"].items():
country = data["countries"][country_id]
template.append(f"- {country['label']}: {share}%\n")
# 4.1.4 Stabilité Géopolitique (après Traitement)
template.append("#### Stabilité Géopolitique\n")
# Même référence au fichier ISG
if isg_path:
template.append(f"Corpus/{isg_path}\n")
# 4.1.5 Substituabilité
template.append("#### Substituabilité\n")
# Calcul de l'ICS moyen pour ce minerai
ics_values = list(mineral["ics_values"].values())
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
template.append(f"ICS moyen: {ics_average:.2f} - {ics_color}\n")
# Liste des composants avec leur ICS
for comp_id, ics_value in mineral["ics_values"].items():
comp_label = data["components"][comp_id]["label"]
ics_color = determine_threshold_color(ics_value, "ICS", config.get('thresholds'))
template.append(f"###### {comp_label} -> {mineral['label']} - Coefficient: {ics_value:.2f}\n")
# Référence au fichier ICS spécifique
ics_path = find_ics_path(mineral_id, config, graph)
if ics_path:
template.append(f"Corpus/{ics_path}\n")
# 4.1.6 Concurrence
template.append("#### Concurrence\n")
# IVC
ivc_value = mineral.get("ivc", 0)
ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
template.append(f"IVC: {ivc_value} - {ivc_color}\n")
# Référence au fichier IVC
ivc_path = find_ivc_path(mineral_id, config, graph)
if ivc_path:
template.append(f"Corpus/{ivc_path}\n")
template.append("##### Secteurs concurrents\n")
# 5. Analyse des vulnérabilités combinées
template.append("## Analyse des vulnérabilités combinées\n")
# 5.1 IHH+ISG par opération
template.append("### Vulnérabilités IHH+ISG par opération\n")
template.append("| Opération | IHH | ISG combiné | Vulnérabilité combinée |\n")
template.append("|-----------|-----|------------|------------------------|\n")
for op_id, combined in vulnerabilities["ihh_isg_combined"].items():
operation_name = data["operations"][op_id]["label"]
ihh_value = combined["ihh_value"]
isg_combined = combined["isg_combined"]
vulnerability = combined["vulnerability"]
template.append(f"| {operation_name} | {ihh_value} ({combined['ihh_color']}) | {isg_combined:.2f} ({combined['isg_color']}) | {vulnerability} |\n")
# 5.2 ICS+IVC par minerai
template.append("\n### Vulnérabilités ICS+IVC par minerai\n")
template.append("| Minerai | ICS moyen | IVC | Vulnérabilité combinée |\n")
template.append("|---------|-----------|-----|------------------------|\n")
for mineral_id, combined in vulnerabilities["ics_ivc_combined"].items():
mineral_name = data["minerals"][mineral_id]["label"]
ics_average = combined["ics_average"]
ivc_value = combined["ivc_value"]
vulnerability = combined["vulnerability"]
template.append(f"| {mineral_name} | {ics_average:.2f} ({combined['ics_color']}) | {ivc_value} ({combined['ivc_color']}) | {vulnerability} |\n")
# 6. Classification des chaînes
template.append("\n## Classification des chaînes par niveau de risque\n")
# 6.1 Chaînes critiques
template.append("### Chaînes à risque critique\n")
if vulnerabilities["chains_classification"]["critical"]:
for chain in vulnerabilities["chains_classification"]["critical"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque critique identifiée.\n\n")
# 6.2 Chaînes majeures
template.append("### Chaînes à risque majeur\n")
if vulnerabilities["chains_classification"]["major"]:
for chain in vulnerabilities["chains_classification"]["major"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque majeur identifiée.\n\n")
# 6.3 Chaînes moyennes
template.append("### Chaînes à risque moyen\n")
if vulnerabilities["chains_classification"]["medium"]:
for chain in vulnerabilities["chains_classification"]["medium"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque moyen identifiée.\n\n")
return "\n".join(template)
def write_template(template, config):
"""Écrit le template généré dans le fichier spécifié."""
template_path = config['template_path']
with open(template_path, 'w', encoding='utf-8') as f:
f.write(template)
print(f"Template généré avec succès: {template_path}")
def main():
"""Fonction principale du script."""
# Charger la configuration
config = load_config(CONFIG_PATH)
# Analyser le graphe
graph = parse_graph(config)
# Analyser la stabilité géopolitique
geo_countries = analyze_geopolitical_stability(graph)
# Extraire toutes les données
data = extract_all_data(graph, geo_countries)
# Calculer les vulnérabilités combinées
vulnerabilities = calculate_combined_vulnerabilities(data, geo_countries, config)
# Générer le template structuré
template = generate_structured_template(data, vulnerabilities, geo_countries, config, graph)
# Écrire le template dans le fichier
write_template(template, config)
if __name__ == "__main__":
main()