Code/IA/generate_template.py
2025-05-19 13:38:30 +02:00

1268 lines
60 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de génération de template de rapport IHH/ISG.
Ce script lit un fichier DOT, identifie les éléments Orange ou Rouge,
et génère un template avec les chemins d'accès aux sections à récupérer.
"""
import os
import re
import glob
import yaml
import argparse
import unicodedata
import networkx as nx
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
# Chemins de base
BASE_DIR = Path(__file__).resolve().parent
BASE_DIR = BASE_DIR / ".."
CORPUS_DIR = BASE_DIR / "Corpus"
GRAPH_PATH = BASE_DIR / "graphe.dot"
CONFIG_PATH = BASE_DIR / "assets/config.yaml"
OUTPUT_PATH = CORPUS_DIR / "rapport_template.md"
# Chemins pour les indices et documents de référence
# Chemins utilisés pour les références vers les fiches techniques
IHH_ROOT_PATH = 'Criticités/Fiche technique IHH'
ISG_ROOT_PATH = 'Criticités/Fiche technique ISG'
IVC_ROOT_PATH = 'Criticités/Fiche technique IVC'
ICS_ROOT_PATH = 'Criticités/Fiche technique ICS'
# Utiliser simplement le chemin relatif pour accéder aux fichiers
CRIT_PATH = 'Criticités'
def load_config():
"""Charge les seuils depuis le fichier de configuration"""
try:
with open(CONFIG_PATH, 'r') as file:
config = yaml.safe_load(file)
return config.get('seuils', {})
except Exception as e:
print(f"Erreur lors du chargement de la configuration: {e}")
# Valeurs par défaut si le fichier est inaccessible
return {
'ISG': {
'vert': {'max': 40},
'orange': {'min': 40, 'max': 70},
'rouge': {'min': 70}
},
'IVC': {
'vert': {'max': 5},
'orange': {'min': 5, 'max': 15},
'rouge': {'min': 15}
},
'IHH': {
'vert': {'max': 15},
'orange': {'min': 15, 'max': 25},
'rouge': {'min': 25}
},
'ICS': {
'vert': {'max': 0.30},
'orange': {'min': 0.30, 'max': 0.60},
'rouge': {'min': 0.60}
}
}
def determine_threshold_color(value, index_type):
"""Détermine la couleur du seuil en fonction de la valeur et du type d'indice"""
seuils = load_config().get(index_type, {})
if not seuils:
return None
# Vérifier d'abord le seuil rouge (priorité la plus haute)
if 'rouge' in seuils and 'min' in seuils['rouge'] and value >= seuils['rouge']['min']:
return 'rouge'
# Ensuite le seuil orange
if 'orange' in seuils and 'min' in seuils['orange'] and 'max' in seuils['orange']:
if value >= seuils['orange']['min'] and value < seuils['orange']['max']:
return 'orange'
# Par défaut, c'est vert
return 'vert'
# Définition des types de ressources et leurs opérations associées
RESOURCE_TYPES = {
"produit": {"folder": "Assemblage", "operation": "assemblage"},
"composant": {"folder": "Fabrication", "operation": "fabrication"},
"minerai": {"folder": "Minerai", "operations": ["extraction", "traitement"]}
}
def parse_graph(dot_path, filter_level='orange'):
"""Analyse le fichier DOT et extrait les éléments avec criticité Orange ou Rouge.
Args:
dot_path: Chemin vers le fichier graphe.dot
filter_level: Niveau de seuil à utiliser ('orange' ou 'rouge')
"""
G = read_dot(dot_path)
elements = []
# Charger les seuils depuis la configuration
seuils = load_config()
# Définir les seuils pour IHH, ISG, et IVC (Orange et Rouge)
ihh_seuil_orange = seuils.get('IHH', {}).get('orange', {}).get('min', 15)
ihh_seuil_rouge = seuils.get('IHH', {}).get('rouge', {}).get('min', 25)
isg_seuil_orange = seuils.get('ISG', {}).get('orange', {}).get('min', 40)
isg_seuil_rouge = seuils.get('ISG', {}).get('rouge', {}).get('min', 70)
ivc_seuil_orange = seuils.get('IVC', {}).get('orange', {}).get('min', 5)
ivc_seuil_rouge = seuils.get('IVC', {}).get('rouge', {}).get('min', 15)
# Sélectionner les seuils en fonction du niveau de filtrage
if filter_level == 'rouge':
ihh_seuil = ihh_seuil_rouge
isg_seuil = isg_seuil_rouge
ivc_seuil = ivc_seuil_rouge
else: # 'orange' par défaut
ihh_seuil = ihh_seuil_orange
isg_seuil = isg_seuil_orange
ivc_seuil = ivc_seuil_orange
# Traiter les nœuds
for node, attrs in G.nodes(data=True):
if '_' not in node:
continue
op, res = node.split('_', 1)
# Récupérer les valeurs IHH si disponibles
ihh_pays = attrs.get('ihh_pays', 0)
ihh_acteurs = attrs.get('ihh_acteurs', 0)
try:
ihh_pays = float(ihh_pays)
ihh_acteurs = float(ihh_acteurs)
except (ValueError, TypeError):
ihh_pays = 0
ihh_acteurs = 0
# Récupérer la valeur ISG si disponible
isg = attrs.get('isg', 0)
try:
isg = float(isg)
except (ValueError, TypeError):
isg = 0
# Récupérer la valeur IVC si disponible
ivc = attrs.get('ivc', 0)
try:
ivc = float(ivc)
except (ValueError, TypeError):
ivc = 0
# Déterminer si l'élément est critique selon le seuil configuré
ihh_critique = ihh_pays > ihh_seuil or ihh_acteurs > ihh_seuil
isg_critique = isg > isg_seuil
ivc_critique = ivc > ivc_seuil
if ihh_critique or isg_critique or ivc_critique:
elements.append({
'operation': op,
'resource': res,
'ihh_pays': ihh_pays,
'ihh_acteurs': ihh_acteurs,
'isg': isg,
'ivc': ivc,
'ihh_critique': ihh_critique,
'isg_critique': isg_critique,
'ivc_critique': ivc_critique,
'node_id': node
})
# Traiter les arêtes pour détecter les ICS critiques
for u, v, attrs in G.edges(data=True):
if 'ics' in attrs:
ics_value = attrs.get('ics', 0)
try:
ics_value = float(ics_value)
except (ValueError, TypeError):
ics_value = 0
ics_seuil_orange = seuils.get('ICS', {}).get('orange', {}).get('min', 0.30)
ics_seuil_rouge = seuils.get('ICS', {}).get('rouge', {}).get('min', 0.60)
# Sélectionner le seuil ICS en fonction du niveau de filtrage
ics_seuil = ics_seuil_rouge if filter_level == 'rouge' else ics_seuil_orange
ics_critique = ics_value >= ics_seuil
if ics_critique:
# Ajouter l'ICS critique au nœud de destination (minerai)
# Trouver d'abord si le nœud de destination existe déjà dans elements
dest_node = v
dest_element = next((e for e in elements if e['node_id'] == dest_node), None)
if dest_element:
# Mettre à jour l'élément existant
dest_element['ics_critique'] = True
dest_element['ics_value'] = ics_value
else:
# Si le nœud destination n'est pas déjà dans elements (pas critique pour d'autres raisons)
# Créer une entrée s'il s'agit d'un format operation_resource
if '_' in dest_node:
op, res = dest_node.split('_', 1)
elements.append({
'operation': op,
'resource': res,
'node_id': dest_node,
'ics_critique': True,
'ics_value': ics_value
})
return elements
def analyze_geopolitical_stability(dot_path, filter_levels=None):
"""Analyse la stabilité géopolitique à partir du graphe.
Args:
dot_path: Chemin vers le fichier graphe.dot
filter_levels: Liste de niveaux de criticité à considérer ('orange', 'rouge')
Si None, prend les deux niveaux par défaut
"""
G = read_dot(dot_path)
# Si aucun niveau n'est spécifié, on considère orange et rouge
if filter_levels is None:
filter_levels = ['orange', 'rouge']
elif isinstance(filter_levels, str):
filter_levels = [filter_levels]
print(f"Analyse ISG du fichier {dot_path} avec filtres {', '.join(filter_levels)}")
# Charger les seuils depuis la configuration
seuils = load_config().get('ISG', {})
# Déterminer le seuil en fonction des niveaux demandés
isg_threshold = None
if 'orange' in filter_levels and 'rouge' not in filter_levels:
isg_threshold = seuils.get('orange', {}).get('min', 40)
print(f"Niveau orange: ISG >= {isg_threshold}")
elif 'rouge' in filter_levels and 'orange' not in filter_levels:
isg_threshold = seuils.get('rouge', {}).get('min', 70)
print(f"Niveau rouge: ISG >= {isg_threshold}")
else: # Les deux niveaux ou cas par défaut
isg_threshold = seuils.get('orange', {}).get('min', 40)
print(f"Niveaux orange et rouge: ISG >= {isg_threshold}")
print(f"Seuil ISG critique: {isg_threshold}")
results = {}
# Trouver d'abord tous les nœuds géographiques avec ISG
geo_isg = {}
for node, attrs in G.nodes(data=True):
if 'isg' in attrs:
pays_name = node.split('_')[0] if '_' in node else node
isg_value = float(attrs['isg'])
geo_isg[pays_name] = isg_value
print(f"Nœud géographique avec ISG trouvé: {node}, ISG: {isg_value}")
# Pour chaque nœud d'opération
operations_count = 0
for node, attrs in G.nodes(data=True):
if '_' in node and ('ihh_pays' in attrs or 'ihh_acteurs' in attrs):
operations_count += 1
parts = node.split('_')
op, res = parts[0], '_'.join(parts[1:]) if len(parts) > 1 else parts[0]
operation_key = f"{op}_{res}"
print(f"Analyse de l'opération: {operation_key}")
results[operation_key] = {
'pays_critiques': [],
'acteurs_critiques': []
}
# Chercher les pays liés directement à cette opération
for pays_node in G.successors(node):
# Vérifier que c'est un nœud de pays valide (contient "_" et son premier segment est dans geo_isg)
# et n'est pas un nœud géographique
pays_prefix = pays_node.split('_')[0] if '_' in pays_node else pays_node
if '_' in pays_node and 'geographique' not in pays_node and pays_prefix in geo_isg:
pays_name = pays_prefix
# Extraire le pourcentage de l'attribut label de l'arête
edge_label = G.edges[node, pays_node].get('label', '0%')
pays_part = int(edge_label.strip('%')) if edge_label.strip('%').isdigit() else 0
# Chercher le nœud géographique correspondant
geo_node = f"{pays_name}_geographique"
isg_value = geo_isg.get(pays_name, 0)
print(f" - Pays trouvé: {pays_name}, ISG: {isg_value}, Part: {pays_part}%, seuil: {isg_threshold}")
if isg_value >= isg_threshold:
results[operation_key]['pays_critiques'].append({
'nom': pays_name,
'isg': isg_value,
'part': pays_part
})
print(f" => Pays critique ajouté: {pays_name}, ISG: {isg_value}")
# Chercher les acteurs liés à ce pays
for acteur_node in G.successors(pays_node):
# Vérifier que c'est bien un nœud d'acteur (format: NomActeur_Pays_Operation)
if '_' in acteur_node:
acteur_parts = acteur_node.split("_")
if len(acteur_parts) >= 3 and acteur_parts[1] == pays_name:
acteur_name = acteur_parts[0]
# Ignorer si l'acteur a le même nom que le pays
if acteur_name.lower() != pays_name.lower():
edge_label = G.edges[pays_node, acteur_node].get('label', '0%')
acteur_part = int(edge_label.strip('%')) if edge_label.strip('%').isdigit() else 0
# Utiliser l'ISG du pays pour l'acteur
if isg_value >= isg_threshold:
results[operation_key]['acteurs_critiques'].append({
'nom': acteur_name,
'pays': pays_name,
'isg': isg_value,
'part': acteur_part
})
print(f" => Acteur critique ajouté: {acteur_name}, ISG: {isg_value}")
# Résumé de l'analyse
total_critical_countries = sum(len(data['pays_critiques']) for data in results.values())
total_critical_actors = sum(len(data['acteurs_critiques']) for data in results.values())
print(f"Analyse ISG terminée: {operations_count} opérations analysées")
print(f"Résultats: {total_critical_countries} pays critiques, {total_critical_actors} acteurs critiques")
return results
# Function removed as no longer needed with the new approach
def find_real_path(base_dir, resource_type, resource_name, file_pattern, G=None):
"""Recherche le chemin réel d'un fichier/dossier."""
# Obtenir le label du graphe si disponible
resource_label = resource_name
if G:
for node, attrs in G.nodes(data=True):
if node == resource_name and 'label' in attrs:
resource_label = attrs['label']
break
resource_name_lower = resource_name.lower()
resource_label_lower = resource_label.lower()
# Créer des variantes du label pour les cas spéciaux comme "SSD 2.5 pouces" -> "SSD 2.5"
label_variants = [resource_label, resource_label_lower]
# Ajouter une variante sans "pouces" ou autre suffixe
if " " in resource_label:
# Pour "SSD 2.5 pouces", ajouter "SSD 2.5"
# Pour "Carte mère", ajouter "Carte"
parts = resource_label.split(" ")
if len(parts) >= 2:
short_label = " ".join(parts[:2]) # Prendre les deux premiers mots
label_variants.append(short_label)
label_variants.append(short_label.lower())
# Version avec première lettre majuscule
capitalized_label = resource_label_lower.title()
label_variants.append(capitalized_label)
short_capitalized = short_label.title()
label_variants.append(short_capitalized)
# Cas spécial pour les principaux assembleurs/fabricants
is_principaux_pattern = "principaux" in file_pattern.lower()
if is_principaux_pattern:
# Rechercher les fichiers des principaux assembleurs/fabricants
if resource_type == "produit":
patterns = []
for variant in label_variants:
patterns.extend([
f"Assemblage/Fiche assemblage {variant}/??-principaux-assembleurs.md",
f"Assemblage/Fiche*{variant}*/??-principaux-assembleurs.md",
f"**/*{variant}*/*principaux*assembleurs*.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
f"Assemblage/Fiche assemblage {resource_name}/??-principaux-assembleurs.md",
f"Assemblage/Fiche assemblage {resource_name_lower}/??-principaux-assembleurs.md",
f"Assemblage/Fiche*{resource_name}*/??-principaux-assembleurs.md",
f"Assemblage/Fiche*{resource_name_lower}*/??-principaux-assembleurs.md",
f"**/*{resource_name}*/*principaux*assembleurs*.md",
f"**/*{resource_name_lower}*/*principaux*assembleurs*.md"
])
elif resource_type == "composant":
patterns = []
for variant in label_variants:
patterns.extend([
f"Fabrication/Fiche fabrication {variant}/??-principaux-fabricants.md",
f"Fabrication/Fiche*{variant}*/??-principaux-fabricants.md",
f"**/*{variant}*/*principaux*fabricants*.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
f"Fabrication/Fiche fabrication {resource_name}/??-principaux-fabricants.md",
f"Fabrication/Fiche fabrication {resource_name_lower}/??-principaux-fabricants.md",
f"Fabrication/Fiche*{resource_name}*/??-principaux-fabricants.md",
f"Fabrication/Fiche*{resource_name_lower}*/??-principaux-fabricants.md",
f"**/*{resource_name}*/*principaux*fabricants*.md",
f"**/*{resource_name_lower}*/*principaux*fabricants*.md"
])
elif resource_type == "minerai":
operation = file_pattern.split("-")[-1] if "-" in file_pattern else ""
patterns = []
for variant in label_variants:
# S'assurer qu'il n'y a pas de tiret à la fin du nom du minerai
clean_variant = variant.rstrip(" -")
patterns.extend([
f"Minerai/Fiche minerai {clean_variant}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche {clean_variant}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche*minerai*{clean_variant}*/??-principaux-producteurs*.md",
f"Minerai/Fiche*{clean_variant}*/??-principaux-producteurs*.md",
f"**/*minerai*{clean_variant}*/*principaux*producteurs*.md",
f"**/*{clean_variant}*/*principaux*producteurs*.md"
])
# Ajouter les patterns avec le nom du nœud aussi
clean_resource = resource_name.rstrip(" -")
clean_resource_lower = resource_name_lower.rstrip(" -")
patterns.extend([
f"Minerai/Fiche minerai {clean_resource}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche {clean_resource}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche minerai {clean_resource_lower}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche {clean_resource_lower}/??-principaux-producteurs-{operation}.md",
f"Minerai/Fiche*minerai*{clean_resource}*/??-principaux-producteurs*.md",
f"Minerai/Fiche*{clean_resource}*/??-principaux-producteurs*.md",
f"Minerai/Fiche*minerai*{clean_resource_lower}*/??-principaux-producteurs*.md",
f"Minerai/Fiche*{clean_resource_lower}*/??-principaux-producteurs*.md",
f"**/*minerai*{clean_resource}*/*principaux*producteurs*.md",
f"**/*{clean_resource}*/*principaux*producteurs*.md",
f"**/*minerai*{clean_resource_lower}*/*principaux*producteurs*.md",
f"**/*{clean_resource_lower}*/*principaux*producteurs*.md"
])
else:
# Pour les IHH, privilégier la recherche dans le répertoire centralisé
if resource_type == "produit":
patterns = []
for variant in label_variants:
patterns.extend([
f"{IHH_ROOT_PATH}/*-assemblage-{variant}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{variant}/*-indice-de-herfindahl-hirschmann.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
f"{IHH_ROOT_PATH}/*-assemblage-{resource_name}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-assemblage-{resource_name_lower}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{resource_name}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{resource_name_lower}/*-indice-de-herfindahl-hirschmann.md"
])
elif resource_type == "composant":
patterns = []
for variant in label_variants:
patterns.extend([
f"{IHH_ROOT_PATH}/*-fabrication-{variant}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{variant}/*-indice-de-herfindahl-hirschmann.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
f"{IHH_ROOT_PATH}/*-fabrication-{resource_name}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-fabrication-{resource_name_lower}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{resource_name}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{resource_name_lower}/*-indice-de-herfindahl-hirschmann.md"
])
elif resource_type == "minerai":
operation = file_pattern.split("-")[-1] if "-" in file_pattern else ""
if operation:
patterns = []
for variant in label_variants:
patterns.extend([
# Nouvelle structure
f"{IHH_ROOT_PATH}/*-opérations-{variant}/*-indice-de-herfindahl-hirschmann-{operation}.md",
# Ancienne structure
f"{IHH_ROOT_PATH}/*-{operation}-{variant}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{variant}*/*-{operation}*.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
# Nouvelle structure
f"{IHH_ROOT_PATH}/*-opérations-{resource_name}/*-indice-de-herfindahl-hirschmann-{operation}.md",
f"{IHH_ROOT_PATH}/*-opérations-{resource_name_lower}/*-indice-de-herfindahl-hirschmann-{operation}.md",
# Ancienne structure
f"{IHH_ROOT_PATH}/*-{operation}-{resource_name}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{operation}-{resource_name_lower}/*-indice-de-herfindahl-hirschmann.md",
f"{IHH_ROOT_PATH}/*-{resource_name}*/*-{operation}*.md",
f"{IHH_ROOT_PATH}/*-{resource_name_lower}*/*-{operation}*.md"
])
else:
patterns = []
for variant in label_variants:
patterns.extend([
f"{IHH_ROOT_PATH}/*-opérations-{variant}/*.md",
f"{IHH_ROOT_PATH}/*-{variant}*/*.md"
])
# Ajouter les patterns avec le nom du nœud aussi
patterns.extend([
f"{IHH_ROOT_PATH}/*-opérations-{resource_name}/*.md",
f"{IHH_ROOT_PATH}/*-opérations-{resource_name_lower}/*.md",
f"{IHH_ROOT_PATH}/*-{resource_name}*/*.md",
f"{IHH_ROOT_PATH}/*-{resource_name_lower}*/*.md"
])
# Chercher dans les patterns
for pattern in patterns:
matches = glob.glob(os.path.join(base_dir, pattern), recursive=True)
if matches:
return os.path.relpath(matches[0], base_dir)
# Si aucun chemin réel n'est trouvé, construire un chemin générique dans la structure appropriée
# Utiliser le label pour le chemin par défaut, et choisir la meilleure variante
best_label = resource_label
# Si le label contient "pouces" ou d'autres termes descriptifs longs, utiliser une version plus courte
if " " in resource_label and len(resource_label.split(" ")) > 2:
parts = resource_label.split(" ")
best_label = " ".join(parts[:2]) # Prendre les deux premiers mots
if resource_type == "produit":
if is_principaux_pattern:
return f"Assemblage/Fiche assemblage {best_label}/02-principaux-assembleurs.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/10-assemblage-{best_label}/00-indice-de-herfindahl-hirschmann.md"
elif resource_type == "composant":
if is_principaux_pattern:
return f"Fabrication/Fiche fabrication {best_label}/02-principaux-fabricants.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/20-fabrication-{best_label}/00-indice-de-herfindahl-hirschmann.md"
elif resource_type == "minerai":
operation = file_pattern.split("-")[-1] if "-" in file_pattern else ""
if is_principaux_pattern:
# Choisir le numéro du préfixe en fonction de l'opération
prefix_num = "03" if operation == "extraction" else "05" if operation == "traitement" else "02"
# S'assurer que le nom du minerai est correctement formaté (sans tiret à la fin et en minuscules)
clean_label = best_label.rstrip(" -").lower()
return f"Minerai/Fiche minerai {clean_label}/{prefix_num}-principaux-producteurs-{operation}.md"
else:
# Numéros arbitraires mais dans le format correct
return f"{IHH_ROOT_PATH}/30-{operation}-{best_label}/00-indice-de-herfindahl-hirschmann.md"
return ""
def categorize_elements(elements):
"""Catégorise les éléments en produits finaux, composants et minerais."""
produits = []
composants = []
minerais = []
for element in elements:
resource = element['resource'].lower()
if resource in ['serveur', 'smartphone', 'ordinateur', 'véhicule électrique']:
produits.append(element)
elif resource in ['boitier', 'cartemere', 'ssd25', 'ssdm2', 'batterie', 'puce', 'cpu', 'mémoire', 'écran', 'disquedur', 'connecteurs', 'connectivite']:
composants.append(element)
else:
minerais.append(element)
# On regroupe d'abord les minerais par ressource
minerais_par_ressource = {}
for m in minerais:
resource = m['resource']
if resource not in minerais_par_ressource:
minerais_par_ressource[resource] = []
minerais_par_ressource[resource].append(m)
# Reconstruire la liste des minerais, groupés par ressource et triés alphabétiquement
minerais = []
for resource in sorted(minerais_par_ressource.keys(), key=str.lower):
minerais.extend(minerais_par_ressource[resource])
# Tri alphabétique des produits et composants
produits.sort(key=lambda x: x['resource'].lower())
composants.sort(key=lambda x: x['resource'].lower())
return produits, composants, minerais
def generate_template(elements, isg_data=None, G=None):
"""Génère le template de rapport avec les chemins d'accès aux sections."""
template = []
# Corps principal
template.append("# Évaluation des vulnérabilités critiques")
template.append("")
template.append("## Introduction")
template.append("Ce rapport vise à évaluer les vulnérabilités systémiques associées à la chaîne de valeur représentée dans le graphe fourni.")
template.append("")
# Éléments factuels
template.append("## Éléments factuels")
template.append("Cette section présente les données clés collectées automatiquement à partir du graphe et du corpus documentaire.")
template.append("")
# Catégoriser les éléments
produits, composants, minerais = categorize_elements(elements)
# Obtenir les labels des éléments à partir du graphe
resource_labels = {}
if G:
for node, attrs in G.nodes(data=True):
if 'label' in attrs:
resource_labels[node] = attrs['label']
# Obtenir les labels des éléments à partir du graphe
resource_labels = {}
if G:
for node, attrs in G.nodes(data=True):
if 'label' in attrs:
resource_labels[node] = attrs['label']
# Traiter les produits finaux
for produit in produits:
resource = produit['resource']
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
if produit['ihh_critique']:
template.append(f"#### Assemblage {display_name}")
template.append("")
# Déterminer la couleur du seuil IHH
ihh_pays_color = determine_threshold_color(produit['ihh_pays'], 'IHH')
ihh_acteurs_color = determine_threshold_color(produit['ihh_acteurs'], 'IHH')
ihh_pays_color_text = f" - seuil {ihh_pays_color.capitalize()}" if ihh_pays_color else ""
ihh_acteurs_color_text = f" - seuil {ihh_acteurs_color.capitalize()}" if ihh_acteurs_color else ""
template.append("##### Concentration")
template.append(f"- IHH pays : {produit['ihh_pays']}{ihh_pays_color_text}")
template.append(f"- IHH acteurs : {produit['ihh_acteurs']}{ihh_acteurs_color_text}")
template.append("- Principaux pays producteurs : ...")
template.append("- Principaux acteurs : ...")
template.append("")
# Vérifier si nous avons des données ISG pour cette opération
if isg_data:
operation_key = f"{produit['operation']}_{produit['resource']}"
print(f"Vérification ISG pour produit: {operation_key}")
if operation_key in isg_data:
print(f" - Trouvé dans isg_data: pays={len(isg_data[operation_key]['pays_critiques'])}, acteurs={len(isg_data[operation_key]['acteurs_critiques'])}")
if operation_key in isg_data and (isg_data[operation_key]['pays_critiques'] or isg_data[operation_key]['acteurs_critiques']):
template.append("#### Stabilité Géopolitique")
print(f" => Section ISG ajoutée pour {operation_key}")
if isg_data[operation_key]['pays_critiques']:
template.append("**Pays à risque critique** :")
for pays in isg_data[operation_key]['pays_critiques']:
isg_color = determine_threshold_color(pays['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {pays['nom']} (ISG: {pays['isg']}{isg_color_text}) - Production de {pays['part']}% des ressources")
template.append("")
if isg_data[operation_key]['acteurs_critiques']:
template.append("**Acteurs à risque critique** :")
for acteur in isg_data[operation_key]['acteurs_critiques']:
isg_color = determine_threshold_color(acteur['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {acteur['nom']} ({acteur['pays']}, ISG: {acteur['isg']}{isg_color_text}) - Contrôle de {acteur['part']}% du marché")
template.append("")
elif produit['isg_critique']:
template.append("#### Stabilité Géopolitique")
template.append("- Analyse des risques géopolitiques non disponible")
template.append("")
# Traiter les composants
for composant in composants:
resource = composant['resource']
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
if composant['ihh_critique']:
template.append(f"#### Fabrication {display_name}")
template.append("")
# Déterminer la couleur du seuil IHH
ihh_pays_color = determine_threshold_color(composant['ihh_pays'], 'IHH')
ihh_acteurs_color = determine_threshold_color(composant['ihh_acteurs'], 'IHH')
ihh_pays_color_text = f" - seuil {ihh_pays_color.capitalize()}" if ihh_pays_color else ""
ihh_acteurs_color_text = f" - seuil {ihh_acteurs_color.capitalize()}" if ihh_acteurs_color else ""
template.append("##### Concentration")
template.append(f"- IHH pays : {composant['ihh_pays']}{ihh_pays_color_text}")
template.append(f"- IHH acteurs : {composant['ihh_acteurs']}{ihh_acteurs_color_text}")
template.append("- Principaux pays producteurs : ...")
template.append("- Principaux fabricants : ...")
template.append("")
# Vérifier si nous avons des données ISG pour cette opération
if isg_data:
operation_key = f"{composant['operation']}_{composant['resource']}"
print(f"Vérification ISG pour composant: {operation_key}")
if operation_key in isg_data:
print(f" - Trouvé dans isg_data: pays={len(isg_data[operation_key]['pays_critiques'])}, acteurs={len(isg_data[operation_key]['acteurs_critiques'])}")
if operation_key in isg_data and (isg_data[operation_key]['pays_critiques'] or isg_data[operation_key]['acteurs_critiques']):
template.append("#### Stabilité Géopolitique")
print(f" => Section ISG ajoutée pour {operation_key}")
if isg_data[operation_key]['pays_critiques']:
template.append("**Pays à risque critique** :")
for pays in isg_data[operation_key]['pays_critiques']:
isg_color = determine_threshold_color(pays['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {pays['nom']} (ISG: {pays['isg']}{isg_color_text}) - Production de {pays['part']}% des ressources")
template.append("")
if isg_data[operation_key]['acteurs_critiques']:
template.append("**Acteurs à risque critique** :")
for acteur in isg_data[operation_key]['acteurs_critiques']:
isg_color = determine_threshold_color(acteur['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {acteur['nom']} ({acteur['pays']}, ISG: {acteur['isg']}{isg_color_text}) - Contrôle de {acteur['part']}% du marché")
template.append("")
elif composant['isg_critique']:
template.append("#### Stabilité Géopolitique")
template.append("- Analyse des risques géopolitiques non disponible")
template.append("")
# Regrouper les minerais par nom de ressource pour le tri alphabétique
minerais_par_ressource = {}
for minerai in minerais:
resource = minerai['resource']
if resource not in minerais_par_ressource:
minerais_par_ressource[resource] = []
minerais_par_ressource[resource].append(minerai)
# Traiter les ressources dans l'ordre alphabétique
for resource in sorted(minerais_par_ressource.keys(), key=str.lower):
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
# Pour chaque ressource, traiter ses opérations
minerais_resource = minerais_par_ressource[resource]
# Identifier les types d'opérations disponibles pour cette ressource
operations_disponibles = {'Extraction': None, 'Traitement': None}
for m in minerais_resource:
if m['operation'] in operations_disponibles:
operations_disponibles[m['operation']] = m
# Traiter les opérations dans un ordre spécifique (Extraction puis Traitement)
operations_ordre = ['Extraction', 'Traitement']
for operation in operations_ordre:
m = operations_disponibles.get(operation)
if m and m['ihh_critique']:
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"#### {operation} {display_name}")
template.append("")
# Déterminer la couleur du seuil IHH
ihh_pays_color = determine_threshold_color(m['ihh_pays'], 'IHH')
ihh_acteurs_color = determine_threshold_color(m['ihh_acteurs'], 'IHH')
ihh_pays_color_text = f" - seuil {ihh_pays_color.capitalize()}" if ihh_pays_color else ""
ihh_acteurs_color_text = f" - seuil {ihh_acteurs_color.capitalize()}" if ihh_acteurs_color else ""
template.append("##### Concentration")
template.append(f"- IHH pays : {m['ihh_pays']}{ihh_pays_color_text}")
template.append(f"- IHH acteurs : {m['ihh_acteurs']}{ihh_acteurs_color_text}")
template.append("- Principaux pays producteurs : ...")
template.append("- Principaux acteurs : ...")
template.append("")
# Vérifier si nous avons des données ISG pour cette opération
if isg_data:
operation_key = f"{m['operation']}_{resource}"
print(f"Vérification ISG pour minerai: {operation_key}")
if operation_key in isg_data:
print(f" - Trouvé dans isg_data: pays={len(isg_data[operation_key]['pays_critiques'])}, acteurs={len(isg_data[operation_key]['acteurs_critiques'])}")
if operation_key in isg_data and (isg_data[operation_key]['pays_critiques'] or isg_data[operation_key]['acteurs_critiques']):
template.append("#### Stabilité Géopolitique")
print(f" => Section ISG ajoutée pour {operation_key}")
if isg_data[operation_key]['pays_critiques']:
template.append("**Pays à risque critique** :")
for pays in isg_data[operation_key]['pays_critiques']:
isg_color = determine_threshold_color(pays['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {pays['nom']} (ISG: {pays['isg']}{isg_color_text}) - Production de {pays['part']}% des ressources")
template.append("")
if isg_data[operation_key]['acteurs_critiques']:
template.append("**Acteurs à risque critique** :")
for acteur in isg_data[operation_key]['acteurs_critiques']:
isg_color = determine_threshold_color(acteur['isg'], 'ISG')
isg_color_text = f" - seuil {isg_color.capitalize()}" if isg_color else ""
template.append(f"- {acteur['nom']} ({acteur['pays']}, ISG: {acteur['isg']}{isg_color_text}) - Contrôle de {acteur['part']}% du marché")
template.append("")
elif m['isg_critique']:
template.append("#### Stabilité Géopolitique")
template.append("- Analyse des risques géopolitiques non disponible")
template.append("")
if minerai['ihh_critique']:
# Déterminer la couleur du seuil IHH
ihh_color = determine_threshold_color(minerai['ihh_pays'], 'IHH')
ihh_color_text = f" - seuil {ihh_color.capitalize()}" if ihh_color else ""
template.append("#### Substituabilité")
# Ajouter la section ICS pour les minerais
ics_path = find_ics_path(resource)
if ics_path:
template.append(f"Corpus/{ics_path}")
else:
template.append("[résumé]")
template.append("")
template.append("#### Concurrence")
# Rechercher l'IVC pour ce minerai
ivc_info = find_ivc_path(resource)
if ivc_info:
template.append(f"IVC: {ivc_info['ivc_value']} - seuil {ivc_info['threshold'].capitalize()}")
template.append("")
# Ajouter le lien vers les secteurs concurrents
secteurs_path = f"{ivc_info['path']}/01-secteurs-concurrents.md"
if os.path.exists(os.path.join(CORPUS_DIR, secteurs_path)):
template.append(f"Corpus/{secteurs_path}")
else:
template.append("[Information sur les secteurs concurrents non disponible]")
else:
template.append("[résumé]")
template.append("")
# Annexes
template.append("## Annexes")
template.append("Contenus documentaires complets :")
template.append("")
# Indices documentaires
try:
has_ihh = any(element['ihh_critique'] for element in elements)
except:
has_ihh = False
try:
has_isg = any(element['isg_critique'] for element in elements)
except:
has_isg = False
try:
has_ivc = any(element['ivc_critique'] for element in elements)
except:
has_ivc = False
try:
has_ics = any(element['ics_critique'] for element in elements)
except:
has_ics = False
if has_ihh:
template.append("### Indice Herfindahl-Hirschmann (IHH)")
template.append("")
template.append("#### Contexte et objectif")
template.append(f"Corpus/{IHH_ROOT_PATH}/00-contexte-et-objectif.md")
template.append("")
template.append("#### Mode de calcul")
template.append(f"Corpus/{IHH_ROOT_PATH}/01-mode-de-calcul/_intro.md")
template.append("")
template.append("#### Seuils")
template.append(f"Corpus/{IHH_ROOT_PATH}/01-mode-de-calcul/00-seuils-d-interprétation.md")
template.append("")
if has_isg:
template.append("### Indice de Stabilité Géopolitique (ISG)")
template.append("")
template.append("#### Contexte et objectif")
template.append(f"Corpus/{ISG_ROOT_PATH}/00-contexte-et-objectif.md")
template.append("")
template.append("#### Mode de calcul")
template.append(f"Corpus/{ISG_ROOT_PATH}/01-mode-de-calcul/_intro.md")
template.append("")
template.append("#### Seuils")
template.append(f"Corpus/{ISG_ROOT_PATH}/01-mode-de-calcul/04-seuils-d-interprétation.md")
template.append("")
if has_ivc:
template.append("### Indice de Vulnérabilité Concurrentielle (IVC)")
template.append("")
template.append("#### Contexte et objectif")
template.append(f"Corpus/{IVC_ROOT_PATH}/00-contexte-et-objectif.md")
template.append("")
template.append("#### Mode de calcul")
template.append(f"Corpus/{IVC_ROOT_PATH}/01-mode-de-calcul/_intro.md")
template.append("")
template.append("#### Paramètres")
template.append(f"Corpus/{IVC_ROOT_PATH}/01-mode-de-calcul/00-paramètres.md")
template.append("")
template.append("#### Seuils")
template.append(f"Corpus/{IVC_ROOT_PATH}/01-mode-de-calcul/01-seuils-d-interprétation.md")
template.append("")
if has_ics:
template.append("### Indice de Criticité de Substituabilité (ICS)")
template.append("")
template.append("#### Contexte et objectif")
template.append(f"Corpus/{ICS_ROOT_PATH}/00-contexte-et-objectif.md")
template.append("")
template.append("#### Mode de calcul")
template.append(f"Corpus/{ICS_ROOT_PATH}/01-mode-de-calcul/_intro.md")
template.append("")
template.append("#### Paramètres")
template.append(f"Corpus/{ICS_ROOT_PATH}/01-mode-de-calcul/00-paramètres.md")
template.append("")
template.append("#### Seuils")
template.append(f"Corpus/{ICS_ROOT_PATH}/01-mode-de-calcul/01-seuils-d-interprétation.md")
template.append("")
# Détails des éléments critiques
for produit in produits:
resource = produit['resource']
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
if produit['ihh_critique']:
template.append("#### Indice de Herfindahl-Hirschmann")
ihh_path = find_real_path(CORPUS_DIR, "produit", resource, "indice-de-herfindahl-hirschmann", G)
template.append(f"Corpus/{ihh_path}")
template.append("")
template.append("#### Principaux assembleurs")
assembleurs_path = find_real_path(CORPUS_DIR, "produit", resource, "principaux-assembleurs", G)
template.append(f"Corpus/{assembleurs_path}")
template.append("")
for composant in composants:
resource = composant['resource']
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
if composant['ihh_critique']:
template.append("#### Indice de Herfindahl-Hirschmann")
ihh_path = find_real_path(CORPUS_DIR, "composant", resource, "indice-de-herfindahl-hirschmann", G)
template.append(f"Corpus/{ihh_path}")
template.append("")
template.append("#### Principaux fabricants")
fabricants_path = find_real_path(CORPUS_DIR, "composant", resource, "principaux-fabricants", G)
template.append(f"Corpus/{fabricants_path}")
template.append("")
# Détails des minerais - on traite Extraction puis Traitement
# Créer un dictionnaire pour regrouper les entrées par ressource
minerais_par_ressource = {}
for minerai in minerais:
resource = minerai['resource']
if resource not in minerais_par_ressource:
minerais_par_ressource[resource] = []
minerais_par_ressource[resource].append(minerai)
# Parcourir les ressources dans l'ordre alphabétique
for resource in sorted(minerais_par_ressource.keys(), key=str.lower):
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"### {display_name}")
template.append("")
# Identifier les types d'opérations disponibles pour cette ressource
operations_disponibles = {'Extraction': None, 'Traitement': None}
for m in minerais_par_ressource[resource]:
if m['operation'] in operations_disponibles:
operations_disponibles[m['operation']] = m
# Traiter les opérations dans un ordre spécifique (Extraction puis Traitement)
operations_ordre = ['Extraction', 'Traitement']
for operation in operations_ordre:
m = operations_disponibles.get(operation)
if m and m['ihh_critique']:
# Utiliser le label du graphe si disponible
display_name = resource_labels.get(resource, resource)
template.append(f"#### Indice de Herfindahl-Hirschmann - {operation}")
ihh_path = find_real_path(CORPUS_DIR, "minerai", resource, f"indice-de-herfindahl-{operation.lower()}", G)
if ihh_path:
template.append(f"Corpus/{ihh_path}")
else:
# Chercher d'abord avec la nouvelle structure
new_pattern = f"{IHH_ROOT_PATH}/*-opérations-{resource.lower()}/*-indice-de-herfindahl-hirschmann-{operation.lower()}.md"
matches = glob.glob(os.path.join(CORPUS_DIR, new_pattern))
if matches:
template.append(f"Corpus/{os.path.relpath(matches[0], CORPUS_DIR)}")
else:
# Utiliser l'ancienne structure
template.append(f"Corpus/{IHH_ROOT_PATH}/30-{operation.lower()}-{resource}/00-indice-de-herfindahl-hirschmann.md")
template.append("")
template.append(f"##### Principaux producteurs - {operation}")
producteurs_path = find_real_path(CORPUS_DIR, "minerai", resource, f"principaux-producteurs-{operation.lower()}", G)
if producteurs_path:
template.append(f"Corpus/{producteurs_path}")
else:
template.append(f"Corpus/Minerai/Fiche {resource}/02-principaux-producteurs-{operation.lower()}.md")
template.append("")
# Ajouter la section IVC pour les minerais
ivc_info = find_ivc_path(resource)
if ivc_info:
template.append("#### Indice de Vulnérabilité de Concurrence (IVC)")
# Ajouter les liens vers les différents fichiers de la fiche IVC
ivc_sections = [
"03-répartition-des-usages.md",
"04-tendance.md",
"05-procédés-alternatifs.md",
"06-réserves.md"
]
for section in ivc_sections:
section_path = f"{ivc_info['path']}/{section}"
if os.path.exists(os.path.join(CORPUS_DIR, section_path)):
template.append(f"Corpus/{section_path}")
template.append("")
# Ajouter la section ICS pour les minerais
if G:
ics_components = find_ics_component_paths(resource, G)
if ics_components:
template.append("#### Indice de Criticité de Substituabilité (ICS)")
template.append("")
for component_name, info in ics_components.items():
template.append(f"##### {info['label']}")
template.append("")
for file_path in info['files']:
template.append(f"Corpus/{file_path}")
template.append("")
return "\n".join(template)
def find_ivc_path(resource):
"""Trouve le chemin de la fiche IVC pour un minerai donné.
Args:
resource: Le nom du minerai (par exemple 'Cuivre', 'Antimoine')
Returns:
Un dictionnaire contenant les informations IVC ou None si non trouvé
"""
# Normaliser le nom de la ressource en minuscules
resource_lower = resource.lower()
# Chercher dans le répertoire Criticités avec caractères accentués
ivc_base_dir = "Criticités"
ivc_dir = CORPUS_DIR / ivc_base_dir / "Fiche technique IVC"
if not os.path.exists(ivc_dir):
print(f"Répertoire IVC introuvable: {ivc_dir}")
return None
# Utiliser glob pattern pour chercher les dossiers IVC contenant le nom du minerai
# Format attendu dans les dossiers: "XX-minerai-ivc-VALEUR-vulnérabilité-SEUIL"
search_pattern = os.path.join(ivc_dir, f"*{resource_lower}*ivc*")
matching_dirs = []
# Vérifier d'abord avec le nom exact du minerai
for item_path in glob.glob(search_pattern, recursive=False):
if os.path.isdir(item_path):
item = os.path.basename(item_path)
matching_dirs.append((item, item_path))
# Si aucune correspondance exacte, essayer avec une recherche plus générale
if not matching_dirs:
for item_path in glob.glob(os.path.join(ivc_dir, "*ivc*"), recursive=False):
if os.path.isdir(item_path):
item = os.path.basename(item_path)
if resource_lower in item.lower():
matching_dirs.append((item, item_path))
# Traiter les correspondances trouvées
for item, item_path in matching_dirs:
try:
# Format attendu: "XX-minerai-ivc-VALEUR-vulnérabilité-SEUIL"
parts = item.split('-')
ivc_index = parts.index('ivc')
ivc_value = int(parts[ivc_index + 1])
# Déterminer le seuil
threshold = determine_threshold_color(ivc_value, 'IVC')
relative_path = os.path.join(ivc_base_dir, "Fiche technique IVC", item)
return {
'path': relative_path,
'ivc_value': ivc_value,
'threshold': threshold
}
except (ValueError, IndexError) as e:
print(f"Erreur lors de l'analyse du chemin IVC {item}: {e}")
continue
print(f"Aucune fiche IVC trouvée pour {resource}")
return None
def write_template(template, output_path):
"""Écrit le template dans un fichier."""
with open(output_path, 'w', encoding='utf-8') as f:
f.write(template)
print(f"✅ Template généré : {output_path}")
def find_ics_path(resource):
"""Trouve le chemin de la fiche ICS pour un minerai donné."""
resource_lower = resource.lower()
# Utiliser le chemin relatif pour éviter les problèmes d'encodage
ics_dir = os.path.join(CORPUS_DIR, CRIT_PATH, "Fiche technique ICS")
if not os.path.exists(ics_dir):
print(f"Répertoire ICS introuvable: {ics_dir}")
return None
# Rechercher un fichier qui contient le nom du minerai
for item in os.listdir(ics_dir):
item_path = os.path.join(ics_dir, item)
# Recherche à la fois par nom exact et par inclusion dans le nom du fichier
if os.path.isfile(item_path) and (resource_lower in item.lower() or
item.lower().endswith(f"-{resource_lower}.md")):
return os.path.join(ICS_ROOT_PATH, item)
print(f"Aucune fiche ICS trouvée pour {resource}")
return None
def find_ics_component_paths(resource, G):
"""Trouve les sous-répertoires ICS pour les relations entre composants et un minerai."""
resource_lower = resource.lower()
ics_dir = os.path.join(CORPUS_DIR, CRIT_PATH, "Fiche technique ICS")
component_info = {}
if not os.path.exists(ics_dir):
print(f"Répertoire ICS introuvable: {ics_dir}")
return component_info
# Rechercher des répertoires correspondant au format attendu
for item in os.listdir(ics_dir):
item_path = os.path.join(ics_dir, item)
if os.path.isdir(item_path):
# Format attendu: XX-composant-minerai-coefficient-Y-YY
parts = item.lower().split('-')
if len(parts) >= 4 and resource_lower in parts:
# Trouver le composant en cherchant dans les parties avant 'coefficient'
component_idx = -1
for i, part in enumerate(parts):
if part == "coefficient":
component_idx = i - 2 # Le composant est 2 segments avant "coefficient"
break
if component_idx >= 0 and component_idx < len(parts):
component_name = parts[component_idx]
# Rechercher le label du composant dans le graphe et vérifier qu'il existe
label, exists_in_graph = get_component_label(G, component_name)
# Ne continuer que si le composant existe dans le graphe
if exists_in_graph:
# Préparer la liste des fichiers
files = []
# D'abord vérifier si _intro.md existe et l'ajouter en premier
intro_file = os.path.join(item_path, "_intro.md")
if os.path.exists(intro_file):
files.append(os.path.join(ICS_ROOT_PATH, item, "_intro.md"))
# Ensuite ajouter les fichiers importants dans l'ordre
important_files = ['faisabilité-technique', 'délai-d-implémentation', 'impact-coût']
for prefix in important_files:
for file in sorted(os.listdir(item_path)):
if file.endswith('.md') and prefix in file.lower():
files.append(os.path.join(ICS_ROOT_PATH, item, file))
# Ne pas ajouter les fichiers de sources ou autres non nécessaires
excluded_files = ["sources.md", "03-sources.md"]
for file in sorted(os.listdir(item_path)):
file_path = os.path.join(ICS_ROOT_PATH, item, file)
if (file.endswith('.md') and file_path not in files and
not any(excluded in file.lower() for excluded in excluded_files) and
file != "_intro.md"): # Ne pas ajouter _intro.md à nouveau
files.append(file_path)
component_info[component_name] = {
'label': label,
'files': files
}
return component_info
def get_component_label(G, component_name):
"""Récupère le label d'un composant à partir du graphe.
Retourne:
Un tuple (label, exists_in_graph) où:
- label est le nom formaté du composant
- exists_in_graph est un booléen indiquant si le composant existe dans le graphe
"""
# D'abord, essayer de trouver une correspondance exacte
for node, attrs in G.nodes(data=True):
if node.lower() == component_name.lower():
if 'label' in attrs:
return attrs['label'], True
else:
return component_name.capitalize(), True
# Si pas de correspondance exacte, chercher un nœud qui contient le nom du composant
for node, attrs in G.nodes(data=True):
if component_name.lower() in node.lower():
if 'label' in attrs:
return attrs['label'], True
else:
return component_name.capitalize(), True
# Si toujours pas trouvé, le composant n'existe pas dans le graphe
return component_name.capitalize(), False
def main():
"""Fonction principale."""
# Analyser les arguments de la ligne de commande
parser = argparse.ArgumentParser(description='Génère un rapport de vulnérabilités')
parser.add_argument('--filter', choices=['all', 'orange', 'rouge'], default='all',
help='Niveau de criticité pour l\'ISG: "all" (orange+rouge), "orange" ou "rouge"')
args = parser.parse_args()
# Déterminer les niveaux de filtrage
filter_levels = None
if args.filter == 'orange':
filter_levels = ['orange']
elif args.filter == 'rouge':
filter_levels = ['rouge']
else: # 'all'
filter_levels = ['orange', 'rouge']
print("=== Début de la génération du template ===")
elements = parse_graph(GRAPH_PATH, filter_level=args.filter)
print(f"Éléments extraits du graphe: {len(elements)}")
# Analyser la stabilité géopolitique
print("\n=== Analyse de la stabilité géopolitique ===")
isg_data = analyze_geopolitical_stability(GRAPH_PATH, filter_levels=filter_levels)
# Vérifier les données ISG
print("\n=== Résumé des données ISG ===")
found_isg_data = False
for op_key, data in isg_data.items():
if data['pays_critiques'] or data['acteurs_critiques']:
found_isg_data = True
print(f"Opération {op_key}:")
if data['pays_critiques']:
print(f" - {len(data['pays_critiques'])} pays critiques")
for pays in data['pays_critiques']:
print(f" * {pays['nom']} (ISG: {pays['isg']}, part: {pays['part']}%)")
if data['acteurs_critiques']:
print(f" - {len(data['acteurs_critiques'])} acteurs critiques")
for acteur in data['acteurs_critiques']:
print(f" * {acteur['nom']} ({acteur['pays']}, ISG: {acteur['isg']}, part: {acteur['part']}%)")
if not found_isg_data:
print("Aucune donnée ISG critique trouvée.")
# Charger le graphe pour obtenir les labels des composants
G = read_dot(GRAPH_PATH)
# Générer le template avec les données ISG et le graphe
print("\n=== Génération du template ===")
template = generate_template(elements, isg_data, G)
write_template(template, OUTPUT_PATH)
print("=== Fin de la génération du template ===")
if __name__ == "__main__":
main()