Code/scripts/generate_factorized_report.py
Fabrication du Numérique 81f5bb3b66 Corrections diverses
2025-05-23 21:57:27 +02:00

1613 lines
76 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour générer un rapport factorisé des vulnérabilités critiques
suivant la structure définie dans Remarques.md.
"""
import os
import sys
import re
import yaml
from networkx.drawing.nx_agraph import read_dot
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent
CORPUS_DIR = BASE_DIR.parent / "Corpus"
CONFIG_PATH = BASE_DIR / "config.yml"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt"
GRAPH_PATH = BASE_DIR.parent / "graphe.dot"
TEMPLATE_PATH = BASE_DIR / "rapport_final.md"
DICTIONNAIRE_CRITICITES = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
POIDS_COULEURS = {
"Vert": 1,
"Orange": 2,
"Rouge": 3
}
def load_config(config_path, thresholds_path=THRESHOLDS_PATH):
"""Charge la configuration depuis les fichiers YAML."""
# Charger la configuration principale
if not os.path.exists(config_path):
# print(f"Fichier de configuration introuvable: {config_path}")
sys.exit(1)
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# Vérifier les chemins essentiels
required_paths = ['graphe_path', 'template_path', 'corpus_path']
for path in required_paths:
if path not in config:
# print(f"Configuration incomplète: {path} manquant")
sys.exit(1)
# Convertir les chemins relatifs en chemins absolus
for path in required_paths:
config[path] = os.path.join(os.path.dirname(config_path), config[path])
# Charger les seuils
if os.path.exists(thresholds_path):
with open(thresholds_path, 'r', encoding='utf-8') as f:
thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {})
return config
def determine_threshold_color(value, index_type, thresholds):
"""
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles.
"""
# Récupérer les seuils pour cet indice
if index_type in thresholds:
index_thresholds = thresholds[index_type]
# Déterminer la couleur
if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
suffix = get_suffix_for_index(index_type, "vert")
return "Vert", suffix
elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \
index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]:
suffix = get_suffix_for_index(index_type, "orange")
return "Orange", suffix
elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]:
suffix = get_suffix_for_index(index_type, "rouge")
return "Rouge", suffix
return "Non déterminé", ""
def get_suffix_for_index(index_type, color):
"""Retourne le suffixe approprié pour chaque indice et couleur."""
suffixes = DICTIONNAIRE_CRITICITES
if index_type in suffixes and color in suffixes[index_type]:
return suffixes[index_type][color]
return ""
def get_weight_for_color(color):
"""Retourne le poids correspondant à une couleur."""
weights = POIDS_COULEURS
return weights.get(color, 0)
def strip_prefix(name):
"""Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier."""
return re.sub(r'^\d+[-_ ]*', '', name).lower()
def find_prefixed_directory(pattern, base_path=None):
"""
Recherche un sous-répertoire dont le nom (sans préfixe) correspond au pattern.
Args:
pattern: Nom du répertoire sans préfixe
base_path: Répertoire de base où chercher
Returns:
Le chemin relatif du répertoire trouvé (avec préfixe) ou None
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
search_path = CORPUS_DIR
if not os.path.exists(search_path):
# print(f"Chemin inexistant: {search_path}")
return None
for d in os.listdir(search_path):
dir_path = os.path.join(search_path, d)
if os.path.isdir(dir_path) and strip_prefix(d) == pattern.lower():
return os.path.relpath(dir_path, CORPUS_DIR)
# print(f"Aucun répertoire correspondant à: '{pattern}' trouvé dans {search_path}")
return None
def find_corpus_file(pattern, base_path=None):
"""
Recherche récursive dans le corpus d'un fichier en ignorant les préfixes numériques dans les dossiers et fichiers.
Args:
pattern: Chemin relatif type "sous-dossier/nom-fichier"
base_path: Dossier de base à partir duquel chercher
Returns:
Chemin relatif du fichier trouvé ou None
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
search_path = CORPUS_DIR
# # print(f"Recherche de: '{pattern}' dans {search_path}")
if not os.path.exists(search_path):
# print(pattern)
# print(base_path)
# print(f"Chemin inexistant: {search_path}")
return None
if '/' not in pattern:
# Recherche directe d'un fichier
for file in os.listdir(search_path):
if not file.endswith('.md'):
continue
if strip_prefix(os.path.splitext(file)[0]) == pattern.lower():
rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR)
# # print(f"Fichier trouvé: {rel_path}")
return rel_path
else:
# Séparation du chemin en dossier/fichier
first, rest = pattern.split('/', 1)
matched_dir = find_prefixed_directory(first, base_path)
if matched_dir:
return find_corpus_file(rest, matched_dir)
# print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.")
return None
def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
"""
Lit un fichier du corpus et applique les transformations demandées.
Args:
file_path: Chemin relatif du fichier dans le corpus
remove_first_title: Si True, supprime la première ligne de titre
shift_titles: Nombre de niveaux à ajouter aux titres
Returns:
Le contenu du fichier avec les transformations appliquées
"""
full_path = os.path.join(CORPUS_DIR, file_path)
if not os.path.exists(full_path):
# print(f"Fichier non trouvé: {full_path}")
return f"Fichier non trouvé: {file_path}"
# # print(f"Lecture du fichier: {full_path}")
with open(full_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
# Supprimer la première ligne si c'est un titre et si demandé
if remove_first_title and lines and lines[0].startswith('#'):
# # print(f"Suppression du titre: {lines[0].strip()}")
lines = lines[1:]
# Décaler les niveaux de titre si demandé
if shift_titles > 0:
for i in range(len(lines)):
if lines[i].startswith('#'):
lines[i] = '#' * shift_titles + lines[i]
# Nettoyer les retours à la ligne superflus
content = ''.join(lines)
# Supprimer les retours à la ligne en fin de contenu
content = content.rstrip('\n') + '\n'
return content
def parse_graphs(config):
"""
Charge et analyse les graphes DOT (analyse et référence).
"""
# Charger le graphe à analyser
graphe_path = GRAPH_PATH
if not os.path.exists(graphe_path):
# print(f"Fichier de graphe à analyser introuvable: {graphe_path}")
sys.exit(1)
# Charger le graphe de référence
reference_path = REFERENCE_GRAPH_PATH
if not os.path.exists(reference_path):
# print(f"Fichier de graphe de référence introuvable: {reference_path}")
sys.exit(1)
try:
# Charger les graphes avec NetworkX
graph = read_dot(graphe_path)
ref_graph = read_dot(reference_path)
# Convertir les attributs en types appropriés pour les deux graphes
for g in [graph, ref_graph]:
for node, attrs in g.nodes(data=True):
for key, value in list(attrs.items()):
# Convertir les valeurs numériques
if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']:
try:
if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']:
attrs[key] = int(value.strip('"'))
else:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
# Garder la valeur originale si la conversion échoue
pass
elif key == 'label':
# Nettoyer les guillemets des étiquettes
attrs[key] = value.strip('"')
# Convertir les attributs des arêtes
for u, v, attrs in g.edges(data=True):
for key, value in list(attrs.items()):
if key in ['ics', 'cout', 'delai', 'technique']:
try:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
pass
elif key == 'label' and '%' in value:
# Extraire le pourcentage
try:
percentage = value.strip('"').replace('%', '')
attrs['percentage'] = float(percentage)
except (ValueError, TypeError):
pass
return graph, ref_graph
except Exception as e:
# print(f"Erreur lors de l'analyse des graphes: {str(e)}")
sys.exit(1)
def extract_data_from_graph(graph, ref_graph):
"""
Extrait toutes les données pertinentes des graphes DOT.
"""
data = {
"products": {}, # Produits finaux (N0)
"components": {}, # Composants (N1)
"minerals": {}, # Minerais (N2)
"operations": {}, # Opérations (N10)
"countries": {}, # Pays (N11)
"geo_countries": {}, # Pays géographiques (N99)
"actors": {} # Acteurs (N12)
}
# Extraire tous les pays géographiques du graphe de référence
for node, attrs in ref_graph.nodes(data=True):
if attrs.get('niveau') == 99:
country_name = attrs.get('label', node)
isg_value = attrs.get('isg', 0)
data["geo_countries"][country_name] = {
"id": node,
"isg": isg_value
}
# Extraire les nœuds du graphe à analyser
for node, attrs in graph.nodes(data=True):
level = attrs.get('niveau', -1)
label = attrs.get('label', node)
if level == 0 or level == 1000: # Produit final
data["products"][node] = {
"label": label,
"components": [],
"assembly": None,
"level": level
}
elif level == 1 or level == 1001: # Composant
data["components"][node] = {
"label": label,
"minerals": [],
"manufacturing": None
}
elif level == 2: # Minerai
data["minerals"][node] = {
"label": label,
"ivc": attrs.get('ivc', 0),
"extraction": None,
"treatment": None,
"ics_values": {}
}
elif level == 10 or level == 1010: # Opération
op_type = label.lower()
data["operations"][node] = {
"label": label,
"type": op_type,
"ihh_acteurs": attrs.get('ihh_acteurs', 0),
"ihh_pays": attrs.get('ihh_pays', 0),
"countries": {}
}
elif level == 11 or level == 1011: # Pays
data["countries"][node] = {
"label": label,
"actors": {},
"geo_country": None,
"market_share": 0
}
elif level == 12 or level == 1012: # Acteur
data["actors"][node] = {
"label": label,
"country": None,
"market_share": 0
}
# Extraire les relations et attributs des arêtes
for source, target, edge_attrs in graph.edges(data=True):
if source not in graph.nodes or target not in graph.nodes:
continue
source_level = graph.nodes[source].get('niveau', -1)
target_level = graph.nodes[target].get('niveau', -1)
# Extraire part de marché
market_share = 0
if 'percentage' in edge_attrs:
market_share = edge_attrs['percentage']
elif 'label' in edge_attrs and '%' in edge_attrs['label']:
try:
market_share = float(edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Relations produit → composant
if (source_level == 0 and target_level == 1) or (source_level == 1000 and target_level == 1001):
if target not in data["products"][source]["components"]:
data["products"][source]["components"].append(target)
# Relations produit → opération (assemblage)
elif (source_level == 0 and target_level == 10) or (source_level == 1000 and target_level == 1010):
if graph.nodes[target].get('label', '').lower() == 'assemblage':
data["products"][source]["assembly"] = target
# Relations composant → minerai avec ICS
elif (source_level == 1 or source_level == 1001) and target_level == 2:
if target not in data["components"][source]["minerals"]:
data["components"][source]["minerals"].append(target)
# Stocker l'ICS s'il est présent
if 'ics' in edge_attrs:
ics_value = edge_attrs['ics']
data["minerals"][target]["ics_values"][source] = ics_value
# Relations composant → opération (fabrication)
elif (source_level == 1 or source_level == 1001) and target_level == 10:
if graph.nodes[target].get('label', '').lower() == 'fabrication':
data["components"][source]["manufacturing"] = target
# Relations minerai → opération (extraction/traitement)
elif source_level == 2 and target_level == 10:
op_label = graph.nodes[target].get('label', '').lower()
if 'extraction' in op_label:
data["minerals"][source]["extraction"] = target
elif 'traitement' in op_label:
data["minerals"][source]["treatment"] = target
# Relations opération → pays avec part de marché
elif (source_level == 10 and target_level == 11) or (source_level == 1010 and target_level == 1011):
data["operations"][source]["countries"][target] = market_share
data["countries"][target]["market_share"] = market_share
# Relations pays → acteur avec part de marché
elif (source_level == 11 and target_level == 12) or (source_level == 1011 and target_level == 1012):
data["countries"][source]["actors"][target] = market_share
data["actors"][target]["market_share"] = market_share
data["actors"][target]["country"] = source
# Relations pays → pays géographique
elif (source_level == 11 or source_level == 1011) and target_level == 99:
country_name = graph.nodes[target].get('label', '')
data["countries"][source]["geo_country"] = country_name
# Compléter les opérations manquantes pour les produits et composants
# en les récupérant du graphe de référence si elles existent
# Pour les produits finaux (N0)
for product_id, product_data in data["products"].items():
if product_data["assembly"] is None:
# Chercher l'opération d'assemblage dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == product_id and
((ref_graph.nodes[source].get('niveau') == 0 and
ref_graph.nodes[target].get('niveau') == 10) or
(ref_graph.nodes[source].get('niveau') == 1000 and
ref_graph.nodes[target].get('niveau') == 1010)) and
ref_graph.nodes[target].get('label', '').lower() == 'assemblage'):
# L'opération existe dans le graphe de référence
assembly_id = target
product_data["assembly"] = assembly_id
# Ajouter l'opération si elle n'existe pas déjà
if assembly_id not in data["operations"]:
data["operations"][assembly_id] = {
"label": ref_graph.nodes[assembly_id].get('label', assembly_id),
"type": "assemblage",
"ihh_acteurs": ref_graph.nodes[assembly_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[assembly_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == assembly_id and
(ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][assembly_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
(ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération d'assemblage par produit
# Pour les composants (N1)
for component_id, component_data in data["components"].items():
if component_data["manufacturing"] is None:
# Chercher l'opération de fabrication dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == component_id and
((ref_graph.nodes[source].get('niveau') == 1 and
ref_graph.nodes[target].get('niveau') == 10) or
(ref_graph.nodes[source].get('niveau') == 1001 and
ref_graph.nodes[target].get('niveau') == 1010)) and
ref_graph.nodes[target].get('label', '').lower() == 'fabrication'):
# L'opération existe dans le graphe de référence
manufacturing_id = target
component_data["manufacturing"] = manufacturing_id
# Ajouter l'opération si elle n'existe pas déjà
if manufacturing_id not in data["operations"]:
data["operations"][manufacturing_id] = {
"label": ref_graph.nodes[manufacturing_id].get('label', manufacturing_id),
"type": "fabrication",
"ihh_acteurs": ref_graph.nodes[manufacturing_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[manufacturing_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == manufacturing_id and
(ref_graph.nodes[op_target].get('niveau') == 11 or ref_graph.nodes[op_target].get('niveau') == 1011)):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][manufacturing_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
(ref_graph.nodes[country_target].get('niveau') == 12 or ref_graph.nodes[country_target].get('niveau') == 1012)):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération de fabrication par composant
return data
def calculate_vulnerabilities(data, config):
"""
Calcule les vulnérabilités combinées pour toutes les opérations et minerais.
"""
thresholds = config.get('thresholds', {})
results = {
"ihh_isg_combined": {}, # Pour chaque opération
"ics_ivc_combined": {}, # Pour chaque minerai
"chains": [] # Pour stocker tous les chemins possibles
}
# 1. Calculer ISG_combiné pour chaque opération
for op_id, operation in data["operations"].items():
isg_weighted_sum = 0
total_share = 0
# Parcourir chaque pays impliqué dans l'opération
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
isg_weighted_sum += isg_value * share
total_share += share
# Calculer la moyenne pondérée
isg_combined = 0
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
# Déterminer couleurs et poids
ihh_value = operation["ihh_pays"]
ihh_color, ihh_suffix = determine_threshold_color(ihh_value, "IHH", thresholds)
isg_color, isg_suffix = determine_threshold_color(isg_combined, "ISG", thresholds)
# Calculer poids combiné
ihh_weight = get_weight_for_color(ihh_color)
isg_weight = get_weight_for_color(isg_color)
combined_weight = ihh_weight * isg_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ihh_isg_combined"][op_id] = {
"ihh_value": ihh_value,
"ihh_color": ihh_color,
"ihh_suffix": ihh_suffix,
"isg_combined": isg_combined,
"isg_color": isg_color,
"isg_suffix": isg_suffix,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 2. Calculer ICS_moyen pour chaque minerai
for mineral_id, mineral in data["minerals"].items():
ics_values = list(mineral["ics_values"].values())
ics_average = 0
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ivc_value = mineral.get("ivc", 0)
# Déterminer couleurs et poids
ics_color, ics_suffix = determine_threshold_color(ics_average, "ICS", thresholds)
ivc_color, ivc_suffix = determine_threshold_color(ivc_value, "IVC", thresholds)
# Calculer poids combiné
ics_weight = get_weight_for_color(ics_color)
ivc_weight = get_weight_for_color(ivc_color)
combined_weight = ics_weight * ivc_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ics_ivc_combined"][mineral_id] = {
"ics_average": ics_average,
"ics_color": ics_color,
"ics_suffix": ics_suffix,
"ivc_value": ivc_value,
"ivc_color": ivc_color,
"ivc_suffix": ivc_suffix,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 3. Identifier tous les chemins et leurs vulnérabilités
for product_id, product in data["products"].items():
for component_id in product["components"]:
component = data["components"][component_id]
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
# Collecter toutes les vulnérabilités dans ce chemin
path_vulnerabilities = []
# Assemblage (si présent)
assembly_id = product["assembly"]
if assembly_id and assembly_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "assemblage",
"vulnerability": results["ihh_isg_combined"][assembly_id]["vulnerability"],
"operation_id": assembly_id
})
# Fabrication (si présent)
manufacturing_id = component["manufacturing"]
if manufacturing_id and manufacturing_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "fabrication",
"vulnerability": results["ihh_isg_combined"][manufacturing_id]["vulnerability"],
"operation_id": manufacturing_id
})
# Minerai (ICS+IVC)
if mineral_id in results["ics_ivc_combined"]:
path_vulnerabilities.append({
"type": "minerai",
"vulnerability": results["ics_ivc_combined"][mineral_id]["vulnerability"],
"mineral_id": mineral_id
})
# Extraction (si présent)
extraction_id = mineral["extraction"]
if extraction_id and extraction_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "extraction",
"vulnerability": results["ihh_isg_combined"][extraction_id]["vulnerability"],
"operation_id": extraction_id
})
# Traitement (si présent)
treatment_id = mineral["treatment"]
if treatment_id and treatment_id in results["ihh_isg_combined"]:
path_vulnerabilities.append({
"type": "traitement",
"vulnerability": results["ihh_isg_combined"][treatment_id]["vulnerability"],
"operation_id": treatment_id
})
# Classifier le chemin
path_info = {
"product": product_id,
"component": component_id,
"mineral": mineral_id,
"vulnerabilities": path_vulnerabilities
}
# Déterminer le niveau de risque du chemin
critical_count = path_vulnerabilities.count({"vulnerability": "ÉLEVÉE à CRITIQUE"})
medium_count = path_vulnerabilities.count({"vulnerability": "MOYENNE"})
if any(v["vulnerability"] == "ÉLEVÉE à CRITIQUE" for v in path_vulnerabilities):
path_info["risk_level"] = "critique"
elif medium_count >= 3:
path_info["risk_level"] = "majeur"
elif any(v["vulnerability"] == "MOYENNE" for v in path_vulnerabilities):
path_info["risk_level"] = "moyen"
else:
path_info["risk_level"] = "faible"
results["chains"].append(path_info)
return results
def generate_introduction_section(data):
"""
Génère la section d'introduction du rapport.
"""
products = [p["label"] for p in data["products"].values()]
components = [c["label"] for c in data["components"].values()]
minerals = [m["label"] for m in data["minerals"].values()]
template = []
template.append("## Introduction\n")
template.append("Ce rapport analyse les vulnérabilités de la chaîne de fabrication du numérique pour :\n")
template.append("* les produits finaux : " + ", ".join(products))
template.append("* les composants : " + ", ".join(components))
template.append("* les minerais : " + ", ".join(minerals) + "\n")
return "\n".join(template)
def generate_methodology_section():
"""
Génère la section méthodologie du rapport.
"""
template = []
template.append("## Méthodologie d'analyse des risques\n")
template.append("### Synthèse de la méthodologie\n")
template.append("""
Le dispositif dévaluation des risques proposé repose sur quatre indices clairement définis, chacun analysant un aspect
spécifique des risques dans la chaîne dapprovisionnement numérique. Lindice IHH mesure la concentration géographique ou
industrielle, permettant dévaluer la dépendance vis-à-vis de certains acteurs ou régions. Lindice ISG indique la
stabilité géopolitique des pays impliqués dans la chaîne de production, en intégrant des critères politiques, sociaux
et climatiques. Lindice ICS quantifie la facilité ou la difficulté à remplacer ou substituer un élément spécifique dans
la chaîne, évaluant ainsi les risques liés à la dépendance technologique et économique. Enfin, lindice IVC examine la
pression concurrentielle sur les ressources utilisées par le numérique, révélant ainsi le risque potentiel que ces
ressources soient détournées vers dautres secteurs industriels.
Ces indices se combinent judicieusement par paires pour une évaluation approfondie et pertinente des risques. La
combinaison IHH-ISG permet dassocier la gravité d'un impact potentiel (IHH) à la probabilité de survenance dun
événement perturbateur (ISG), créant ainsi une matrice de vulnérabilité combinée utile pour identifier rapidement les
points critiques dans la chaîne de production. La combinaison ICS-IVC fonctionne selon la même logique, mais se concentre
spécifiquement sur les ressources minérales : lICS indique la gravité potentielle d'une rupture d'approvisionnement due
à une faible substituabilité, tandis que lIVC évalue la probabilité que les ressources soient captées par d'autres
secteurs industriels concurrents. Ces combinaisons permettent dobtenir une analyse précise et opérationnelle du niveau
de risque global.
Les avantages de cette méthodologie résident dans son approche à la fois systématique et granulaire. Elle permet didentifier
avec précision les vulnérabilités majeures et leurs origines spécifiques, facilitant ainsi la prise de décision stratégique
éclairée et proactive. En combinant des facteurs géopolitiques, industriels, technologiques et concurrentiels, ces indices
offrent un suivi efficace de la chaîne de fabrication numérique, garantissant ainsi une gestion optimale des risques et la
continuité opérationnelle à long terme.
""")
template.append("### Indices et seuils\n")
template.append("La méthode d'évaluation intègre 4 indices et leurs combinaisons pour identifier les chemins critiques.\n")
# IHH
template.append("#### IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n")
# Essayer d'abord avec le chemin exact
ihh_context_file = "Criticités/Fiche technique IHH/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ihh_context_file)):
template.append(read_corpus_file(ihh_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ihh_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IHH")
if ihh_context_file:
template.append(read_corpus_file(ihh_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ihh_calc_file = "Criticités/Fiche technique IHH/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ihh_calc_file)):
template.append(read_corpus_file(ihh_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ihh_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IHH")
if ihh_calc_file:
template.append(read_corpus_file(ihh_calc_file, remove_first_title=True))
template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n")
# ISG
template.append("#### ISG (Stabilité Géopolitique) : stabilité des pays\n")
# Essayer d'abord avec le chemin exact
isg_context_file = "Criticités/Fiche technique ISG/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, isg_context_file)):
template.append(read_corpus_file(isg_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
isg_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ISG")
if isg_context_file:
template.append(read_corpus_file(isg_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
isg_calc_file = "Criticités/Fiche technique ISG/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, isg_calc_file)):
template.append(read_corpus_file(isg_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
isg_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ISG")
if isg_calc_file:
template.append(read_corpus_file(isg_calc_file, remove_first_title=True))
template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n")
# ICS
template.append("#### ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément\n")
# Essayer d'abord avec le chemin exact
ics_context_file = "Criticités/Fiche technique ICS/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ics_context_file)):
template.append(read_corpus_file(ics_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ics_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique ICS")
if ics_context_file:
template.append(read_corpus_file(ics_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ics_calc_file = "Criticités/Fiche technique ICS/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ics_calc_file)):
template.append(read_corpus_file(ics_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ics_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique ICS")
if ics_calc_file:
template.append(read_corpus_file(ics_calc_file, remove_first_title=True))
template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n")
# IVC
template.append("#### IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs\n")
# Essayer d'abord avec le chemin exact
ivc_context_file = "Criticités/Fiche technique IVC/00-contexte-et-objectif.md"
if os.path.exists(os.path.join(CORPUS_DIR, ivc_context_file)):
template.append(read_corpus_file(ivc_context_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ivc_context_file = find_corpus_file("contexte-et-objectif", "Criticités/Fiche technique IVC")
if ivc_context_file:
template.append(read_corpus_file(ivc_context_file, remove_first_title=True))
# Essayer d'abord avec le chemin exact
ivc_calc_file = "Criticités/Fiche technique IVC/01-mode-de-calcul/_intro.md"
if os.path.exists(os.path.join(CORPUS_DIR, ivc_calc_file)):
template.append(read_corpus_file(ivc_calc_file, remove_first_title=True))
else:
# Fallback à la recherche par motif
ivc_calc_file = find_corpus_file("mode-de-calcul/_intro", "Criticités/Fiche technique IVC")
if ivc_calc_file:
template.append(read_corpus_file(ivc_calc_file, remove_first_title=True))
template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n")
# Combinaison des indices
template.append("### Combinaison des indices\n")
# IHH et ISG
template.append("**IHH et ISG**\n")
template.append("Ces deux indices s'appliquent à toutes les opérations et se combinent dans l'évaluation du risque (niveau d'impact et probabilité de survenance) :\n")
template.append("* l'IHH donne le niveau d'impact => une forte concentration implique un fort impact si le risque est avéré")
template.append("* l'ISG donne la probabilité de survenance => plus les pays sont instables (et donc plus l'ISG est élevé) et plus la survenance du risque est élevée\n")
template.append("Pour évaluer le risque pour une opération, les ISG des pays sont pondérés par les parts de marché respectives pour donner un ISG combiné dont le calcul est :")
template.append("ISG_combiné = (Somme des ISG des pays multipliée par leur part de marché) / Sommes de leur part de marché\n")
template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ISG combiné et de l'IHH\n")
template.append("| ISG combiné / IHH | Vert | Orange | Rouge |")
template.append("| :-- | :-- | :-- | :-- |")
template.append("| Vert | 1 | 2 | 3 |")
template.append("| Orange | 2 | 4 | 6 |")
template.append("| Rouge | 3 | 6 | 9 |\n")
template.append("Les vulnérabilités se classent en trois niveaux pour chaque opération :\n")
template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9")
template.append("* Vulnérabilité combinée moyenne : poids 3 et 4")
template.append("* Vulnérabilité combinée faible : poids 1 et 2\n")
# ICS et IVC
template.append("**ICS et IVC**\n")
template.append("Ces deux indices se combinent dans l'évaluation du risque pour un minerai :\n")
template.append("* l'ICS donne le niveau d'impact => une faible substituabilité (et donc un ICS élevé) implique un fort impact si le risque est avéré ; l'ICS est associé à la relation entre un composant et un minerai")
template.append("* l'IVC donne la probabilité de l'impact => une forte concurrence intersectorielle (IVC élevé) implique une plus forte probabilité de survenance\n")
template.append("Par simplification, on intègre un ICS moyen d'un minerai comme étant la moyenne des ICS pour chacun des composants dans lesquels il intervient.\n")
template.append("On établit alors une matrice (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ICS moyen et de l'IVC.\n")
template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |")
template.append("| :-- | :-- | :-- | :-- |")
template.append("| Vert | 1 | 2 | 3 |")
template.append("| Orange | 2 | 4 | 6 |")
template.append("| Rouge | 3 | 6 | 9 |\n")
template.append("Les vulnérabilités se classent en trois niveaux pour chaque minerai :\n")
template.append("* Vulnérabilité combinée élevée à critique : poids 6 et 9")
template.append("* Vulnérabilité combinée moyenne : poids 3 et 4")
template.append("* Vulnérabilité combinée faible : poids 1 et 2\n")
return "\n".join(template)
def composant_match(nom_composant, nom_dossier):
"""
Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre).
"""
def clean(s):
return ''.join(c.lower() for c in s if c.isalnum())
cleaned_comp = clean(nom_composant)
cleaned_dir = clean(nom_dossier)
# Vérifie que chaque caractère de cleaned_comp est présent dans cleaned_dir dans le bon ordre
it = iter(cleaned_dir)
return all(c in it for c in cleaned_comp)
def trouver_dossier_composant(nom_composant, base_path, prefixe):
"""
Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""
search_path = os.path.join(CORPUS_DIR, base_path)
print(nom_composant)
print(base_path)
print(search_path)
if not os.path.exists(search_path):
return None
for d in os.listdir(search_path):
print(d)
if os.path.isdir(os.path.join(search_path, d)):
if composant_match(f"{prefixe}{nom_composant}", d):
return os.path.join(base_path, d)
return None
def generate_operations_section(data, results, config):
"""
Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement).
"""
# # print("DEBUG: Génération de la section des opérations")
# # print(f"DEBUG: Nombre de produits: {len(data['products'])}")
# # print(f"DEBUG: Nombre de composants: {len(data['components'])}")
# # print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}")
template = []
template.append("## Détails des opérations\n")
# 1. Traiter les produits finaux (assemblage)
for product_id, product in data["products"].items():
# # print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}")
if product["assembly"]:
template.append(f"### {product['label']} et Assemblage\n")
# Récupérer la présentation synthétique
# product_slug = product['label'].lower().replace(' ', '-')
sous_repertoire = f"{product['label']}"
print(product)
if product["level"] == 0:
type = "Assemblage"
else:
type = "Connexe"
sous_repertoire = trouver_dossier_composant(sous_repertoire, type, "Fiche assemblage ")
product_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"{type}/Fiche assemblage {product_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("")
# Récupérer les principaux assembleurs
assembleurs_file = find_corpus_file("principaux-assembleurs", f"{type}/Fiche assemblage {product_slug}")
if assembleurs_file:
template.append(read_corpus_file(assembleurs_file, shift_titles=2))
template.append("")
# ISG des pays impliqués
assembly_id = product["assembly"]
operation = data["operations"][assembly_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**")
# IHH
ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"{type}/Fiche assemblage {product_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if assembly_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][assembly_id]
template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# 2. Traiter les composants (fabrication)
for component_id, component in data["components"].items():
# # print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}")
if component["manufacturing"]:
template.append(f"### {component['label']} et Fabrication\n")
# Récupérer la présentation synthétique
# component_slug = component['label'].lower().replace(' ', '-')
sous_repertoire = f"{component['label']}"
sous_repertoire = trouver_dossier_composant(sous_repertoire, "Fabrication", "Fiche fabrication ")
component_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"Fabrication/Fiche fabrication {component_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("\n")
# Récupérer les principaux fabricants
fabricants_file = find_corpus_file("principaux-fabricants", f"Fabrication/Fiche fabrication {component_slug}")
if fabricants_file:
template.append(read_corpus_file(fabricants_file, shift_titles=2))
template.append("\n")
# ISG des pays impliqués
manufacturing_id = component["manufacturing"]
operation = data["operations"][manufacturing_id]
template.append("#### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH
ihh_file = find_corpus_file("matrice-des-risques-liés-à-la-fabrication/indice-de-herfindahl-hirschmann", f"Fabrication/Fiche fabrication {component_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if manufacturing_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][manufacturing_id]
template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# 3. Traiter les minerais (détaillés dans une section séparée)
result = "\n".join(template)
# # print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères")
if len(result) <= 30: # Juste le titre de section
# # print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!")
# Ajout d'une section de débogage dans le rapport
template.append("### DÉBOGAGE - Opérations manquantes\n")
template.append("Aucune opération d'assemblage ou de fabrication n'a été trouvée dans les données.\n")
template.append("Informations disponibles:\n")
template.append(f"* Nombre de produits: {len(data['products'])}\n")
template.append(f"* Nombre de composants: {len(data['components'])}\n")
template.append(f"* Nombre d'opérations: {len(data['operations'])}\n")
template.append("\nDétail des produits et de leurs opérations d'assemblage:\n")
for pid, p in data["products"].items():
template.append(f"* {p['label']}: {'Assemblage: ' + str(p['assembly']) if p['assembly'] else 'Pas d\'assemblage'}\n")
template.append("\nDétail des composants et de leurs opérations de fabrication:\n")
for cid, c in data["components"].items():
template.append(f"* {c['label']}: {'Fabrication: ' + str(c['manufacturing']) if c['manufacturing'] else 'Pas de fabrication'}\n")
result = "\n".join(template)
return result
def generate_minerals_section(data, results, config):
"""
Génère la section détaillant les minerais et leurs opérations d'extraction et traitement.
"""
template = []
template.append("## Détails des minerais\n")
for mineral_id, mineral in data["minerals"].items():
mineral_slug = mineral['label'].lower().replace(' ', '-')
fiche_dir = f"{CORPUS_DIR}/Minerai/Fiche minerai {mineral_slug}"
if not os.path.exists(fiche_dir):
continue
template.append(f"---\n\n### {mineral['label']}\n")
# Récupérer la présentation synthétique
presentation_file = find_corpus_file("présentation-synthétique", f"Minerai/Fiche minerai {mineral_slug}")
if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True))
template.append("\n")
# ICS
template.append("#### ICS\n")
ics_intro_file = find_corpus_file("risque-de-substituabilité/_intro", f"Minerai/Fiche minerai {mineral_slug}")
if ics_intro_file:
template.append(read_corpus_file(ics_intro_file, remove_first_title=True))
template.append("\n")
# Calcul de l'ICS moyen
ics_values = list(mineral["ics_values"].values())
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
color, suffix = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
template.append("##### Valeurs d'ICS par composant\n")
template.append("| Composant | ICS | Criticité |")
template.append("| :-- | :-- | :-- |")
for comp_id, ics_value in mineral["ics_values"].items():
comp_name = data["components"][comp_id]["label"]
comp_color, comp_suffix = determine_threshold_color(ics_value, "ICS", config.get('thresholds'))
template.append(f"| {comp_name} | {ics_value:.2f} | {comp_color} ({comp_suffix}) |")
template.append(f"\n**ICS moyen : {ics_average:.2f} - {color} ({suffix})**\n")
# IVC
template.append("#### IVC\n\n")
# Valeur IVC
ivc_value = mineral.get("ivc", 0)
color, suffix = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
template.append(f"**IVC: {ivc_value} - {color} ({suffix})**\n")
# Récupérer toutes les sections de vulnérabilité de concurrence
ivc_sections = []
ivc_dir = find_prefixed_directory("vulnérabilité-de-concurrence", f"Minerai/Fiche minerai {mineral_slug}")
corpus_path = os.path.join(CORPUS_DIR, ivc_dir) if os.path.exists(os.path.join(CORPUS_DIR, ivc_dir)) else None
if corpus_path:
for file in sorted(os.listdir(corpus_path)):
if file.endswith('.md') and "_intro.md" not in file and "sources" not in file:
ivc_sections.append(os.path.join(ivc_dir, file))
# Inclure chaque section IVC
for section_file in ivc_sections:
content = read_corpus_file(section_file, remove_first_title=False)
# Nettoyer les balises des fichiers IVC
content = re.sub(r'```.*?```', '', content, flags=re.DOTALL)
# Mettre le titre en italique s'il commence par un # (format Markdown pour titre)
if content and '\n' in content:
first_line, rest = content.split('\n', 1)
if first_line.strip().startswith('#'):
# Extraire le texte du titre sans les # et les espaces
title_text = first_line.strip().lstrip('#').strip()
content = f"\n*{title_text}*\n{rest.strip()}"
# Ne pas ajouter de contenu vide
if content.strip():
template.append(content.strip())
# ICS et IVC combinés
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f"\n#### Vulnérabilité combinée ICS-IVC\n")
template.append(f"* ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']} ({combined['ics_suffix']})")
template.append(f"* IVC: {combined['ivc_value']} - {combined['ivc_color']} ({combined['ivc_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# Extraction
if mineral["extraction"]:
template.append("#### Extraction\n")
# Récupérer les principaux producteurs
producers_file = find_corpus_file("principaux-producteurs-extraction", f"Minerai/Fiche minerai {mineral_slug}")
if producers_file:
template.append(read_corpus_file(producers_file, remove_first_title=True))
template.append("\n")
# ISG des pays impliqués
extraction_id = mineral["extraction"]
operation = data["operations"][extraction_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH extraction
ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-extraction", f"Minerai/Fiche minerai {mineral_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if extraction_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][extraction_id]
template.append(f"##### Vulnérabilité combinée IHH-ISG pour l'extraction\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
# Traitement
if mineral["treatment"]:
template.append("#### Traitement\n")
# Récupérer les principaux producteurs
producers_file = find_corpus_file("principaux-producteurs-traitement", f"Minerai/Fiche minerai {mineral_slug}")
if producers_file:
template.append(read_corpus_file(producers_file, remove_first_title=True))
template.append("\n")
# ISG des pays impliqués
treatment_id = mineral["treatment"]
operation = data["operations"][treatment_id]
template.append("##### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0
total_share = 0
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country = country.get("geo_country")
if geo_country and geo_country in data["geo_countries"]:
isg_value = data["geo_countries"][geo_country]["isg"]
color, suffix = determine_threshold_color(isg_value, "ISG", config.get('thresholds'))
template.append(f"| {country['label']} | {share}% | {isg_value} | {color} ({suffix}) |")
isg_weighted_sum += isg_value * share
total_share += share
# Calculer ISG combiné
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
color, suffix = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH traitement
ihh_file = find_corpus_file("matrice-des-risques/indice-de-herfindahl-hirschmann-traitement", f"Minerai/Fiche minerai {mineral_slug}")
if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n")
# Vulnérabilité combinée
if treatment_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][treatment_id]
template.append("##### Vulnérabilité combinée IHH-ISG pour le traitement\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}")
template.append(f"* Niveau de vulnérabilité: **{combined['vulnerability']}**\n")
return "\n".join(template)
def generate_critical_paths_section(data, results):
"""
Génère la section des chemins critiques.
"""
template = []
template.append("## Chemins critiques\n")
# Récupérer les chaînes par niveau de risque
critical_chains = []
major_chains = []
medium_chains = []
for chain in results["chains"]:
if chain["risk_level"] == "critique":
critical_chains.append(chain)
elif chain["risk_level"] == "majeur":
major_chains.append(chain)
elif chain["risk_level"] == "moyen":
medium_chains.append(chain)
# 1. Chaînes critiques
template.append("### Chaînes avec risque critique\n")
template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée élevée à critique*\n")
if critical_chains:
for chain in critical_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}")
template.append("\n")
else:
template.append("Aucune chaîne à risque critique identifiée.\n")
# 2. Chaînes majeures
template.append("### Chaînes avec risque majeur\n")
template.append("*Ces chaînes comprennent au moins trois vulnérabilités combinées moyennes*\n")
if major_chains:
for chain in major_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n")
template.append("\n")
else:
template.append("Aucune chaîne à risque majeur identifiée.\n")
# 3. Chaînes moyennes
template.append("### Chaînes avec risque moyen\n")
template.append("*Ces chaînes comprennent au moins une vulnérabilité combinée moyenne*\n")
if medium_chains:
for chain in medium_chains:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"#### {product_name}{component_name}{mineral_name}\n")
# Vulnérabilités
template.append("**Vulnérabilités identifiées:**\n")
for vuln in chain["vulnerabilities"]:
vuln_type = vuln["type"].capitalize()
vuln_level = vuln["vulnerability"]
if vuln_type == "Minerai":
mineral_id = vuln["mineral_id"]
template.append(f"* {vuln_type} ({mineral_name}): {vuln_level}\n")
if mineral_id in results["ics_ivc_combined"]:
combined = results["ics_ivc_combined"][mineral_id]
template.append(f" * ICS moyen: {combined['ics_average']:.2f} - {combined['ics_color']}\n")
template.append(f" * IVC: {combined['ivc_value']} - {combined['ivc_color']}\n")
else:
op_id = vuln["operation_id"]
op_label = data["operations"][op_id]["label"]
template.append(f"* {vuln_type} ({op_label}): {vuln_level}\n")
if op_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][op_id]
template.append(f" * IHH: {combined['ihh_value']} - {combined['ihh_color']}\n")
template.append(f" * ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']}\n")
template.append("\n")
else:
template.append("Aucune chaîne à risque moyen identifiée.\n")
return "\n".join(template)
def generate_report(data, results, config):
"""
Génère le rapport complet structuré selon les spécifications.
"""
# Titre principal
report_titre = ["# Évaluation des vulnérabilités critiques\n"]
# Section d'introduction
report_introduction = generate_introduction_section(data)
# report.append(generate_introduction_section(data))
# Section méthodologie
report_methodologie = generate_methodology_section()
# report.append(generate_methodology_section())
# Section détails des opérations
report_operations = generate_operations_section(data, results, config)
# report.append(generate_operations_section(data, results, config))
# Section détails des minerais
report_minerals = generate_minerals_section(data, results, config)
# report.append(generate_minerals_section(data, results, config))
# Section chemins critiques
report_critical_paths = generate_critical_paths_section(data, results)
# report.append(generate_critical_paths_section(data, results))
# Ordre de composition final
report = (
report_titre +
[report_introduction] +
[report_critical_paths] +
[report_operations] +
[report_minerals] +
[report_methodologie]
)
return "\n".join(report)
def write_report(report, config):
"""Écrit le rapport généré dans le fichier spécifié."""
with open(TEMPLATE_PATH, 'w', encoding='utf-8') as f:
f.write(report)
# print(f"Rapport généré avec succès: {TEMPLATE_PATH}")
def main():
"""Fonction principale du script."""
# Charger la configuration
config = load_config(CONFIG_PATH)
# Analyser les graphes
graph, ref_graph = parse_graphs(config)
# Extraire les données
data = extract_data_from_graph(graph, ref_graph)
# Calculer les vulnérabilités
results = calculate_vulnerabilities(data, config)
# Générer le rapport
report = generate_report(data, results, config)
report = re.sub(r'<!----.*?-->', '', report)
report = re.sub(r'\n\n\n+', '\n\n', report)
# Écrire le rapport
write_report(report, config)
if __name__ == "__main__":
main()