Mise à jour schema, et ajustements en conséquence

This commit is contained in:
Fabrication du Numérique 2025-05-23 13:35:27 +02:00
parent c5482c3033
commit ec00ec3a9b
7 changed files with 39639 additions and 10462 deletions

View File

@ -68,7 +68,8 @@ def process_markdown_file(md_path, rel_output_dir):
f_out.write("\n".join(sec_lines).strip()) f_out.write("\n".join(sec_lines).strip())
def build_corpus_structure(): def build_corpus_structure():
BASE_DIR = Path(__file__).resolve().parent BASE_DIR = Path(__file__).resolve().parent.parent.parent
print(BASE_DIR)
SOURCE_DIR = BASE_DIR / "Fiches" SOURCE_DIR = BASE_DIR / "Fiches"
DEST_DIR = BASE_DIR / "Corpus" DEST_DIR = BASE_DIR / "Corpus"

View File

@ -3,9 +3,9 @@ date: 2025-05-06
seuils: seuils:
IVC: # Indice de vulnérabilité concurrentielle IVC: # Indice de vulnérabilité concurrentielle
vert: { max: 5 } vert: { max: 15 }
orange: { min: 5, max: 15 } orange: { min: 15, max: 60 }
rouge: { min: 15 } rouge: { min: 60 }
IHH: # Index Herfindahl-Hirschman IHH: # Index Herfindahl-Hirschman
vert: { max: 15 } vert: { max: 15 }

17387
schema.txt

File diff suppressed because it is too large Load Diff

View File

@ -12,12 +12,25 @@ import yaml
from networkx.drawing.nx_agraph import read_dot from networkx.drawing.nx_agraph import read_dot
from pathlib import Path from pathlib import Path
# Chemins de base
BASE_DIR = Path(__file__).resolve().parent BASE_DIR = Path(__file__).resolve().parent
CORPUS_DIR = BASE_DIR.parent / "Corpus" CORPUS_DIR = BASE_DIR.parent / "Corpus"
CONFIG_PATH = BASE_DIR / "config.yml" CONFIG_PATH = BASE_DIR / "config.yml"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml" THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt" REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt"
GRAPH_PATH = BASE_DIR.parent / "graphe.dot"
TEMPLATE_PATH = BASE_DIR / "rapport_final.md"
DICTIONNAIRE_CRITICITES = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
POIDS_COULEURS = {
"Vert": 1,
"Orange": 2,
"Rouge": 3
}
def load_config(config_path, thresholds_path=THRESHOLDS_PATH): def load_config(config_path, thresholds_path=THRESHOLDS_PATH):
"""Charge la configuration depuis les fichiers YAML.""" """Charge la configuration depuis les fichiers YAML."""
@ -45,38 +58,17 @@ def load_config(config_path, thresholds_path=THRESHOLDS_PATH):
with open(thresholds_path, 'r', encoding='utf-8') as f: with open(thresholds_path, 'r', encoding='utf-8') as f:
thresholds = yaml.safe_load(f) thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {}) config['thresholds'] = thresholds.get('seuils', {})
else:
print(f"Fichier de seuils introuvable: {thresholds_path}")
# Valeurs par défaut si le fichier n'existe pas
config['thresholds'] = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 60}, "rouge": {"min": 60}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
return config return config
def determine_threshold_color(value, index_type, thresholds=None): def determine_threshold_color(value, index_type, thresholds):
""" """
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur. Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles. Utilise les seuils de config.yaml si disponibles.
""" """
# Valeurs par défaut si les seuils ne sont pas fournis
default_thresholds = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 60}, "rouge": {"min": 60}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
# Utiliser les seuils fournis ou les valeurs par défaut
thresholds = thresholds or default_thresholds
# Récupérer les seuils pour cet indice # Récupérer les seuils pour cet indice
if index_type in thresholds: if index_type in thresholds:
index_thresholds = thresholds[index_type] index_thresholds = thresholds[index_type]
# Déterminer la couleur # Déterminer la couleur
if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \ if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]: index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
@ -92,46 +84,11 @@ def determine_threshold_color(value, index_type, thresholds=None):
suffix = get_suffix_for_index(index_type, "rouge") suffix = get_suffix_for_index(index_type, "rouge")
return "Rouge", suffix return "Rouge", suffix
# Fallback à l'ancienne méthode
if index_type == "IHH":
if value < 15:
return "Vert", "Faible"
elif value < 25:
return "Orange", "Modérée"
else:
return "Rouge", "Élevée"
elif index_type == "ISG":
if value < 40:
return "Vert", "Stable"
elif value < 60:
return "Orange", "Intermédiaire"
else:
return "Rouge", "Instable"
elif index_type == "ICS":
if value < 0.3:
return "Vert", "Facile"
elif value < 0.6:
return "Orange", "Moyenne"
else:
return "Rouge", "Difficile"
elif index_type == "IVC":
if value < 5:
return "Vert", "Faible"
elif value < 15:
return "Orange", "Modérée"
else:
return "Rouge", "Forte"
return "Non déterminé", "" return "Non déterminé", ""
def get_suffix_for_index(index_type, color): def get_suffix_for_index(index_type, color):
"""Retourne le suffixe approprié pour chaque indice et couleur.""" """Retourne le suffixe approprié pour chaque indice et couleur."""
suffixes = { suffixes = DICTIONNAIRE_CRITICITES
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
if index_type in suffixes and color in suffixes[index_type]: if index_type in suffixes and color in suffixes[index_type]:
return suffixes[index_type][color] return suffixes[index_type][color]
@ -139,66 +96,9 @@ def get_suffix_for_index(index_type, color):
def get_weight_for_color(color): def get_weight_for_color(color):
"""Retourne le poids correspondant à une couleur.""" """Retourne le poids correspondant à une couleur."""
weights = { weights = POIDS_COULEURS
"Vert": 1,
"Orange": 2,
"Rouge": 3
}
return weights.get(color, 0) return weights.get(color, 0)
def old_find_corpus_file(pattern, base_path=None):
"""
Trouve un fichier dans le corpus qui correspond au motif, indépendamment des préfixes numériques.
Args:
pattern: Le motif de recherche (slug)
base_path: Le chemin de base pour la recherche (optionnel)
Returns:
Le chemin relatif du fichier trouvé ou None si aucun fichier n'est trouvé
"""
if base_path:
search_path = os.path.join(CORPUS_DIR, base_path)
else:
search_path = CORPUS_DIR
print(f"Recherche: '{pattern}' dans {search_path}")
# Vérifier d'abord si le chemin exact existe (pour les chemins complets spécifiés)
exact_path = os.path.join(CORPUS_DIR, pattern) if not pattern.startswith(os.path.sep) else pattern
if os.path.exists(exact_path) and exact_path.endswith('.md'):
print(f"Trouvé chemin exact: {exact_path}")
return os.path.relpath(exact_path, CORPUS_DIR)
# Sinon, transformer le motif en expression régulière
# Remplacer les tirets et les espaces par des caractères génériques
regex_pattern = pattern.replace('-', '[-_ ]').replace(' ', '[-_ ]')
# Chercher récursivement dans le répertoire
for root, dirs, files in os.walk(search_path):
for file in files:
if file.endswith('.md'):
full_path = os.path.join(root, file)
# Ignorer les préfixes numériques
file_without_prefix = re.sub(r'^\d+[-_ ]*', '', file)
# Rechercher le motif dans le nom du fichier
if re.search(regex_pattern, file_without_prefix, re.IGNORECASE):
rel_path = os.path.relpath(full_path, CORPUS_DIR)
print(f"Trouvé via regex: {rel_path}")
return rel_path
# Si toujours pas trouvé, essayer une recherche plus approfondie
print(f"Fichier non trouvé: '{pattern}' dans {search_path}")
# Essayer de lister les fichiers disponibles pour aider au débogage
if base_path and os.path.exists(search_path):
print(f"Fichiers disponibles dans {search_path}:")
for file in os.listdir(search_path):
if file.endswith('.md'):
print(f" - {file}")
return None
def strip_prefix(name): def strip_prefix(name):
"""Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier.""" """Supprime le préfixe numérique éventuel d'un nom de fichier ou de dossier."""
return re.sub(r'^\d+[-_ ]*', '', name).lower() return re.sub(r'^\d+[-_ ]*', '', name).lower()
@ -242,14 +142,17 @@ def find_corpus_file(pattern, base_path=None):
Returns: Returns:
Chemin relatif du fichier trouvé ou None Chemin relatif du fichier trouvé ou None
""" """
if base_path: if base_path:
search_path = os.path.join(CORPUS_DIR, base_path) search_path = os.path.join(CORPUS_DIR, base_path)
else: else:
search_path = CORPUS_DIR search_path = CORPUS_DIR
print(f"Recherche de: '{pattern}' dans {search_path}") # print(f"Recherche de: '{pattern}' dans {search_path}")
if not os.path.exists(search_path): if not os.path.exists(search_path):
print(pattern)
print(base_path)
print(f"Chemin inexistant: {search_path}") print(f"Chemin inexistant: {search_path}")
return None return None
@ -260,7 +163,7 @@ def find_corpus_file(pattern, base_path=None):
continue continue
if strip_prefix(os.path.splitext(file)[0]) == pattern.lower(): if strip_prefix(os.path.splitext(file)[0]) == pattern.lower():
rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR) rel_path = os.path.relpath(os.path.join(search_path, file), CORPUS_DIR)
print(f"Fichier trouvé: {rel_path}") # print(f"Fichier trouvé: {rel_path}")
return rel_path return rel_path
else: else:
# Séparation du chemin en dossier/fichier # Séparation du chemin en dossier/fichier
@ -269,7 +172,7 @@ def find_corpus_file(pattern, base_path=None):
if matched_dir: if matched_dir:
return find_corpus_file(rest, matched_dir) return find_corpus_file(rest, matched_dir)
print(f"Aucun fichier correspondant à: '{pattern}' trouvé.") print(f"Aucun fichier correspondant à: '{pattern}' trouvé dans {base_path}.")
return None return None
@ -308,8 +211,6 @@ def read_corpus_file(file_path, remove_first_title=False, shift_titles=0):
# Nettoyer les retours à la ligne superflus # Nettoyer les retours à la ligne superflus
content = ''.join(lines) content = ''.join(lines)
# Remplacer les triples retours à la ligne par des doubles
content = re.sub(r'\n\n\n+', '\n\n', content)
# Supprimer les retours à la ligne en fin de contenu # Supprimer les retours à la ligne en fin de contenu
content = content.rstrip('\n') + '\n' content = content.rstrip('\n') + '\n'
@ -320,7 +221,7 @@ def parse_graphs(config):
Charge et analyse les graphes DOT (analyse et référence). Charge et analyse les graphes DOT (analyse et référence).
""" """
# Charger le graphe à analyser # Charger le graphe à analyser
graphe_path = config['graphe_path'] graphe_path = GRAPH_PATH
if not os.path.exists(graphe_path): if not os.path.exists(graphe_path):
print(f"Fichier de graphe à analyser introuvable: {graphe_path}") print(f"Fichier de graphe à analyser introuvable: {graphe_path}")
sys.exit(1) sys.exit(1)
@ -516,6 +417,201 @@ def extract_data_from_graph(graph, ref_graph):
country_name = graph.nodes[target].get('label', '') country_name = graph.nodes[target].get('label', '')
data["countries"][source]["geo_country"] = country_name data["countries"][source]["geo_country"] = country_name
# Compléter les opérations manquantes pour les produits et composants
# en les récupérant du graphe de référence si elles existent
# Pour les produits finaux (N0)
for product_id, product_data in data["products"].items():
if product_data["assembly"] is None:
# Chercher l'opération d'assemblage dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == product_id and
ref_graph.nodes[source].get('niveau') == 0 and
ref_graph.nodes[target].get('niveau') == 10 and
ref_graph.nodes[target].get('label', '').lower() == 'assemblage'):
# L'opération existe dans le graphe de référence
assembly_id = target
product_data["assembly"] = assembly_id
# Ajouter l'opération si elle n'existe pas déjà
if assembly_id not in data["operations"]:
data["operations"][assembly_id] = {
"label": ref_graph.nodes[assembly_id].get('label', assembly_id),
"type": "assemblage",
"ihh_acteurs": ref_graph.nodes[assembly_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[assembly_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == assembly_id and
ref_graph.nodes[op_target].get('niveau') == 11):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][assembly_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
ref_graph.nodes[country_target].get('niveau') == 12):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération d'assemblage par produit
# Pour les composants (N1)
for component_id, component_data in data["components"].items():
if component_data["manufacturing"] is None:
# Chercher l'opération de fabrication dans le graphe de référence
for source, target, edge_attrs in ref_graph.edges(data=True):
if (source == component_id and
ref_graph.nodes[source].get('niveau') == 1 and
ref_graph.nodes[target].get('niveau') == 10 and
ref_graph.nodes[target].get('label', '').lower() == 'fabrication'):
# L'opération existe dans le graphe de référence
manufacturing_id = target
component_data["manufacturing"] = manufacturing_id
# Ajouter l'opération si elle n'existe pas déjà
if manufacturing_id not in data["operations"]:
data["operations"][manufacturing_id] = {
"label": ref_graph.nodes[manufacturing_id].get('label', manufacturing_id),
"type": "fabrication",
"ihh_acteurs": ref_graph.nodes[manufacturing_id].get('ihh_acteurs', 0),
"ihh_pays": ref_graph.nodes[manufacturing_id].get('ihh_pays', 0),
"countries": {}
}
# Extraire les relations de l'opération vers les pays
for op_source, op_target, op_edge_attrs in ref_graph.edges(data=True):
if (op_source == manufacturing_id and
ref_graph.nodes[op_target].get('niveau') == 11):
country_id = op_target
# Extraire part de marché
market_share = 0
if 'percentage' in op_edge_attrs:
market_share = op_edge_attrs['percentage']
elif 'label' in op_edge_attrs and '%' in op_edge_attrs['label']:
try:
market_share = float(op_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter le pays à l'opération
data["operations"][manufacturing_id]["countries"][country_id] = market_share
# Ajouter le pays s'il n'existe pas déjà
if country_id not in data["countries"]:
data["countries"][country_id] = {
"label": ref_graph.nodes[country_id].get('label', country_id),
"actors": {},
"geo_country": None,
"market_share": market_share
}
else:
data["countries"][country_id]["market_share"] = market_share
# Extraire les relations du pays vers les acteurs
for country_source, country_target, country_edge_attrs in ref_graph.edges(data=True):
if (country_source == country_id and
ref_graph.nodes[country_target].get('niveau') == 12):
actor_id = country_target
# Extraire part de marché
actor_market_share = 0
if 'percentage' in country_edge_attrs:
actor_market_share = country_edge_attrs['percentage']
elif 'label' in country_edge_attrs and '%' in country_edge_attrs['label']:
try:
actor_market_share = float(country_edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Ajouter l'acteur au pays
data["countries"][country_id]["actors"][actor_id] = actor_market_share
# Ajouter l'acteur s'il n'existe pas déjà
if actor_id not in data["actors"]:
data["actors"][actor_id] = {
"label": ref_graph.nodes[actor_id].get('label', actor_id),
"country": country_id,
"market_share": actor_market_share
}
else:
data["actors"][actor_id]["market_share"] = actor_market_share
data["actors"][actor_id]["country"] = country_id
# Extraire la relation du pays vers le pays géographique
for geo_source, geo_target, geo_edge_attrs in ref_graph.edges(data=True):
if (geo_source == country_id and
ref_graph.nodes[geo_target].get('niveau') == 99):
geo_country_name = ref_graph.nodes[geo_target].get('label', '')
data["countries"][country_id]["geo_country"] = geo_country_name
break # Une seule opération de fabrication par composant
return data return data
def calculate_vulnerabilities(data, config): def calculate_vulnerabilities(data, config):
@ -873,20 +969,57 @@ def generate_methodology_section():
return "\n".join(template) return "\n".join(template)
def composant_match(nom_composant, nom_dossier):
"""
Vérifie si le nom du composant correspond approximativement à un nom de dossier (lettres et chiffres dans le même ordre).
"""
def clean(s):
return ''.join(c.lower() for c in s if c.isalnum())
cleaned_comp = clean(nom_composant)
cleaned_dir = clean(nom_dossier)
# Vérifie que chaque caractère de cleaned_comp est présent dans cleaned_dir dans le bon ordre
it = iter(cleaned_dir)
return all(c in it for c in cleaned_comp)
def trouver_dossier_composant(nom_composant, base_path, prefixe):
"""
Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""
search_path = os.path.join(CORPUS_DIR, base_path)
if not os.path.exists(search_path):
return None
for d in os.listdir(search_path):
if os.path.isdir(os.path.join(search_path, d)):
if composant_match(f"{prefixe}{nom_composant}", d):
return os.path.join(base_path, d)
return None
def generate_operations_section(data, results, config): def generate_operations_section(data, results, config):
""" """
Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement). Génère la section détaillant les opérations (assemblage, fabrication, extraction, traitement).
""" """
# print("DEBUG: Génération de la section des opérations")
# print(f"DEBUG: Nombre de produits: {len(data['products'])}")
# print(f"DEBUG: Nombre de composants: {len(data['components'])}")
# print(f"DEBUG: Nombre d'opérations: {len(data['operations'])}")
template = [] template = []
template.append("## Détails des opérations\n") template.append("## Détails des opérations\n")
# 1. Traiter les produits finaux (assemblage) # 1. Traiter les produits finaux (assemblage)
for product_id, product in data["products"].items(): for product_id, product in data["products"].items():
# print(f"DEBUG: Produit {product_id} ({product['label']}), assembly = {product['assembly']}")
if product["assembly"]: if product["assembly"]:
template.append(f"### {product['label']} et Assemblage\n") template.append(f"### {product['label']} et Assemblage\n")
# Récupérer la présentation synthétique # Récupérer la présentation synthétique
product_slug = product['label'].lower().replace(' ', '-') # product_slug = product['label'].lower().replace(' ', '-')
sous_repertoire = f"{product['label']}"
sous_repertoire = trouver_dossier_composant(sous_repertoire, "Assemblage", "Fiche assemblage ")
product_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"Assemblage/Fiche assemblage {product_slug}") presentation_file = find_corpus_file("présentation-synthétique", f"Assemblage/Fiche assemblage {product_slug}")
if presentation_file: if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append(read_corpus_file(presentation_file, remove_first_title=True))
@ -928,7 +1061,7 @@ def generate_operations_section(data, results, config):
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**") template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**")
# IHH # IHH
ihh_file = find_corpus_file("indice-de-herfindahl-hirschmann", f"Assemblage/Fiche assemblage {product_slug}/matrice-des-risques") ihh_file = find_corpus_file("matrice-des-risques-liés-à-l-assemblage/indice-de-herfindahl-hirschmann", f"Assemblage/Fiche assemblage {product_slug}")
if ihh_file: if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n") template.append("\n")
@ -936,7 +1069,7 @@ def generate_operations_section(data, results, config):
# Vulnérabilité combinée # Vulnérabilité combinée
if assembly_id in results["ihh_isg_combined"]: if assembly_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][assembly_id] combined = results["ihh_isg_combined"][assembly_id]
template.append(f"#### Vulnérabilité combinée IHH-ISG\n") template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Poids combiné: {combined['combined_weight']}")
@ -944,11 +1077,15 @@ def generate_operations_section(data, results, config):
# 2. Traiter les composants (fabrication) # 2. Traiter les composants (fabrication)
for component_id, component in data["components"].items(): for component_id, component in data["components"].items():
# print(f"DEBUG: Composant {component_id} ({component['label']}), manufacturing = {component['manufacturing']}")
if component["manufacturing"]: if component["manufacturing"]:
template.append(f"### {component['label']} et Fabrication\n") template.append(f"### {component['label']} et Fabrication\n")
# Récupérer la présentation synthétique # Récupérer la présentation synthétique
component_slug = component['label'].lower().replace(' ', '-') # component_slug = component['label'].lower().replace(' ', '-')
sous_repertoire = f"{component['label']}"
sous_repertoire = trouver_dossier_composant(sous_repertoire, "Fabrication", "Fiche fabrication ")
component_slug = sous_repertoire.split(' ', 2)[2]
presentation_file = find_corpus_file("présentation-synthétique", f"Fabrication/Fiche fabrication {component_slug}") presentation_file = find_corpus_file("présentation-synthétique", f"Fabrication/Fiche fabrication {component_slug}")
if presentation_file: if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append(read_corpus_file(presentation_file, remove_first_title=True))
@ -965,8 +1102,8 @@ def generate_operations_section(data, results, config):
operation = data["operations"][manufacturing_id] operation = data["operations"][manufacturing_id]
template.append("#### ISG des pays impliqués\n") template.append("#### ISG des pays impliqués\n")
template.append("| Pays | Part de marché | ISG | Criticité |\n") template.append("| Pays | Part de marché | ISG | Criticité |")
template.append("| :-- | :-- | :-- | :-- |\n") template.append("| :-- | :-- | :-- | :-- |")
isg_weighted_sum = 0 isg_weighted_sum = 0
total_share = 0 total_share = 0
@ -990,7 +1127,7 @@ def generate_operations_section(data, results, config):
template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n") template.append(f"\n**ISG combiné: {isg_combined:.0f} - {color} ({suffix})**\n")
# IHH # IHH
ihh_file = find_corpus_file("indice-de-herfindahl-hirschmann", f"Fabrication/Fiche fabrication {component_slug}/matrice-des-risques") ihh_file = find_corpus_file("matrice-des-risques-liés-à-la-fabrication/indice-de-herfindahl-hirschmann", f"Fabrication/Fiche fabrication {component_slug}")
if ihh_file: if ihh_file:
template.append(read_corpus_file(ihh_file, shift_titles=1)) template.append(read_corpus_file(ihh_file, shift_titles=1))
template.append("\n") template.append("\n")
@ -998,7 +1135,7 @@ def generate_operations_section(data, results, config):
# Vulnérabilité combinée # Vulnérabilité combinée
if manufacturing_id in results["ihh_isg_combined"]: if manufacturing_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][manufacturing_id] combined = results["ihh_isg_combined"][manufacturing_id]
template.append(f"#### Vulnérabilité combinée IHH-ISG\n") template.append("#### Vulnérabilité combinée IHH-ISG\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Poids combiné: {combined['combined_weight']}")
@ -1006,7 +1143,26 @@ def generate_operations_section(data, results, config):
# 3. Traiter les minerais (détaillés dans une section séparée) # 3. Traiter les minerais (détaillés dans une section séparée)
return "\n".join(template) result = "\n".join(template)
# print(f"DEBUG: Fin de génération de la section des opérations. Taille: {len(result)} caractères")
if len(result) <= 30: # Juste le titre de section
# print("DEBUG: ALERTE - La section des opérations est vide ou presque vide!")
# Ajout d'une section de débogage dans le rapport
template.append("### DÉBOGAGE - Opérations manquantes\n")
template.append("Aucune opération d'assemblage ou de fabrication n'a été trouvée dans les données.\n")
template.append("Informations disponibles:\n")
template.append(f"* Nombre de produits: {len(data['products'])}\n")
template.append(f"* Nombre de composants: {len(data['components'])}\n")
template.append(f"* Nombre d'opérations: {len(data['operations'])}\n")
template.append("\nDétail des produits et de leurs opérations d'assemblage:\n")
for pid, p in data["products"].items():
template.append(f"* {p['label']}: {'Assemblage: ' + str(p['assembly']) if p['assembly'] else 'Pas d\'assemblage'}\n")
template.append("\nDétail des composants et de leurs opérations de fabrication:\n")
for cid, c in data["components"].items():
template.append(f"* {c['label']}: {'Fabrication: ' + str(c['manufacturing']) if c['manufacturing'] else 'Pas de fabrication'}\n")
result = "\n".join(template)
return result
def generate_minerals_section(data, results, config): def generate_minerals_section(data, results, config):
""" """
@ -1016,10 +1172,14 @@ def generate_minerals_section(data, results, config):
template.append("## Détails des minerais\n") template.append("## Détails des minerais\n")
for mineral_id, mineral in data["minerals"].items(): for mineral_id, mineral in data["minerals"].items():
template.append(f"### {mineral['label']}\n") mineral_slug = mineral['label'].lower().replace(' ', '-')
fiche_dir = f"{CORPUS_DIR}/Minerai/Fiche minerai {mineral_slug}"
if not os.path.exists(fiche_dir):
continue
template.append(f"---\n\n### {mineral['label']}\n")
# Récupérer la présentation synthétique # Récupérer la présentation synthétique
mineral_slug = mineral['label'].lower().replace(' ', '-')
presentation_file = find_corpus_file("présentation-synthétique", f"Minerai/Fiche minerai {mineral_slug}") presentation_file = find_corpus_file("présentation-synthétique", f"Minerai/Fiche minerai {mineral_slug}")
if presentation_file: if presentation_file:
template.append(read_corpus_file(presentation_file, remove_first_title=True)) template.append(read_corpus_file(presentation_file, remove_first_title=True))
@ -1051,7 +1211,7 @@ def generate_minerals_section(data, results, config):
template.append(f"\n**ICS moyen : {ics_average:.2f} - {color} ({suffix})**\n") template.append(f"\n**ICS moyen : {ics_average:.2f} - {color} ({suffix})**\n")
# IVC # IVC
template.append("#### IVC\n") template.append("#### IVC\n\n")
# Valeur IVC # Valeur IVC
ivc_value = mineral.get("ivc", 0) ivc_value = mineral.get("ivc", 0)
@ -1071,7 +1231,6 @@ def generate_minerals_section(data, results, config):
for section_file in ivc_sections: for section_file in ivc_sections:
content = read_corpus_file(section_file, remove_first_title=False) content = read_corpus_file(section_file, remove_first_title=False)
# Nettoyer les balises des fichiers IVC # Nettoyer les balises des fichiers IVC
content = re.sub(r'<!----.*?-->', '', content)
content = re.sub(r'```.*?```', '', content, flags=re.DOTALL) content = re.sub(r'```.*?```', '', content, flags=re.DOTALL)
# Mettre le titre en italique s'il commence par un # (format Markdown pour titre) # Mettre le titre en italique s'il commence par un # (format Markdown pour titre)
@ -1080,7 +1239,7 @@ def generate_minerals_section(data, results, config):
if first_line.strip().startswith('#'): if first_line.strip().startswith('#'):
# Extraire le texte du titre sans les # et les espaces # Extraire le texte du titre sans les # et les espaces
title_text = first_line.strip().lstrip('#').strip() title_text = first_line.strip().lstrip('#').strip()
content = f"*{title_text}*\n{rest.strip()}" content = f"\n*{title_text}*\n{rest.strip()}"
# Ne pas ajouter de contenu vide # Ne pas ajouter de contenu vide
if content.strip(): if content.strip():
@ -1197,7 +1356,7 @@ def generate_minerals_section(data, results, config):
# Vulnérabilité combinée # Vulnérabilité combinée
if treatment_id in results["ihh_isg_combined"]: if treatment_id in results["ihh_isg_combined"]:
combined = results["ihh_isg_combined"][treatment_id] combined = results["ihh_isg_combined"][treatment_id]
template.append(f"##### Vulnérabilité combinée IHH-ISG pour le traitement\n") template.append("##### Vulnérabilité combinée IHH-ISG pour le traitement\n")
template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})") template.append(f"* IHH: {combined['ihh_value']} - {combined['ihh_color']} ({combined['ihh_suffix']})")
template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})") template.append(f"* ISG combiné: {combined['isg_combined']:.0f} - {combined['isg_color']} ({combined['isg_suffix']})")
template.append(f"* Poids combiné: {combined['combined_weight']}") template.append(f"* Poids combiné: {combined['combined_weight']}")
@ -1346,31 +1505,45 @@ def generate_report(data, results, config):
Génère le rapport complet structuré selon les spécifications. Génère le rapport complet structuré selon les spécifications.
""" """
# Titre principal # Titre principal
report = ["# Évaluation des vulnérabilités critiques\n"] report_titre = ["# Évaluation des vulnérabilités critiques\n"]
# Section d'introduction # Section d'introduction
report.append(generate_introduction_section(data)) report_introduction = generate_introduction_section(data)
# report.append(generate_introduction_section(data))
# Section méthodologie # Section méthodologie
report.append(generate_methodology_section()) report_methodologie = generate_methodology_section()
# report.append(generate_methodology_section())
# Section détails des opérations # Section détails des opérations
report.append(generate_operations_section(data, results, config)) report_operations = generate_operations_section(data, results, config)
# report.append(generate_operations_section(data, results, config))
# Section détails des minerais # Section détails des minerais
report.append(generate_minerals_section(data, results, config)) report_minerals = generate_minerals_section(data, results, config)
# report.append(generate_minerals_section(data, results, config))
# Section chemins critiques # Section chemins critiques
report.append(generate_critical_paths_section(data, results)) report_critical_paths = generate_critical_paths_section(data, results)
# report.append(generate_critical_paths_section(data, results))
# Ordre de composition final
report = (
report_titre +
[report_introduction] +
[report_critical_paths] +
[report_operations] +
[report_minerals] +
[report_methodologie]
)
return "\n".join(report) return "\n".join(report)
def write_report(report, config): def write_report(report, config):
"""Écrit le rapport généré dans le fichier spécifié.""" """Écrit le rapport généré dans le fichier spécifié."""
template_path = config['template_path'] with open(TEMPLATE_PATH, 'w', encoding='utf-8') as f:
with open(template_path, 'w', encoding='utf-8') as f:
f.write(report) f.write(report)
print(f"Rapport généré avec succès: {template_path}") print(f"Rapport généré avec succès: {TEMPLATE_PATH}")
def main(): def main():
"""Fonction principale du script.""" """Fonction principale du script."""

428
scripts/gestion/beautify.py Normal file
View File

@ -0,0 +1,428 @@
"""
Version 1.0
Date 27/03/2025
Auteur : Stéphan Peccini
Ce script permet de récupérer un graphe DOT syntaxiquement correct, pour
le formater selon la structure hiérarchique souhaitée.
Exemple d'utilisation :
-----------------------
Le script ihh.py met à jour les données ihh en fonction des nouvelles informations.
Pour faciliter le travail, tout est fait avec un graphe NetworkX et ensuite sauvé
au format DOT sans aucune structure hiérarchique.
Il suffit alors de passer le fichier résultat de ihh.py pour avoir un fichier DOT
conforme aux attentes.
"""
import sys
import networkx as nx
from networkx.drawing.nx_agraph import read_dot
import logging
logging.basicConfig(
filename="beautify_debug.log",
filemode="w",
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s"
)
def formater_noeuds_par_niveau(schema, niveau, indentation=4):
"""
Génère la syntaxe DOT avec une hiérarchie complète de sous-graphes imbriqués:
- Sous-graphes pour les nœuds du niveau spécifié (0, 1, 2)
- Sous-graphes pour les nœuds de niveau 10 liés à ces nœuds
- Sous-graphes pour les nœuds de niveau 11 liés aux nœuds de niveau 10
- Pour les nœuds de niveau 11, inclut leurs nœuds destination (sauf niveau 99)
"""
# Convertir le niveau en chaîne si nécessaire
niveau_str = str(niveau)
# Identifier les nœuds du niveau spécifié
noeuds_niveau = [noeud for noeud, attr in schema.nodes(data=True)
if attr.get('niveau') == niveau_str]
# Récupérer les attributs de ces nœuds
attributs_niveau = {noeud: schema.nodes[noeud] for noeud in noeuds_niveau}
# Définir les couleurs selon la légende
couleurs_operations = {
"Assemblage": "#a0d6ff",
"Niveau Composant": "#b3ffe0",
"Fabrication": "#ffe0cc",
"Extraction": "#ffd699",
"Reserves": "#ffd699",
"Pays": "#e6f2ff",
"Zone": "#e6f2ff",
"Entreprise": "#f0f0f0"
}
# Fonction pour déterminer l'opération et la couleur
def determiner_operation_et_couleur(nom_noeud):
operation = nom_noeud.split('_')[0] if '_' in nom_noeud else nom_noeud
couleur = "#ffffff" # Blanc par défaut
for op_cle, op_couleur in couleurs_operations.items():
if op_cle in operation:
couleur = op_couleur
break
return operation, couleur
# Fonction pour formater une arête avec ses attributs
def formater_arete(source, dest, edge_attrs, indent_level):
edge_str = f"{indent_level}{source} -> {dest} ["
edge_attrs_formatted = []
for edge_attr, edge_val in edge_attrs.items():
if edge_val:
edge_attrs_formatted.append(f'{edge_attr}="{edge_val}"')
edge_str += ", ".join(edge_attrs_formatted)
edge_str += "];\n"
return edge_str
# Fonction pour formater un nœud avec ses attributs
def formater_noeud(nom_noeud, attributs_noeud, indent_level):
node_str = f"{indent_level}{nom_noeud} ["
attrs_formattés = []
for attr_nom, attr_val in attributs_noeud.items():
if attr_val and attr_nom in ordre_attributs:
attrs_formattés.append(f'{attr_nom}="{attr_val}"')
node_str += ", ".join(attrs_formattés)
node_str += "];\n"
return node_str
# Fonction pour trouver toutes les relations sortantes d'un nœud
def trouver_relations_sortantes(noeud_source):
relations = []
for s, d, attrs in schema.edges(data=True):
if s == noeud_source:
relations.append((s, d, attrs))
# Suppression des doublons en convertissant le dictionnaire en tuple trié
relations_sans_doublons = set((a, b, tuple(sorted(c.items()))) for a, b, c in relations)
# Reconversion en dictionnaire
relations_finales = [(a, b, dict(c)) for a, b, c in relations_sans_doublons]
return relations_finales
# Définir les niveaux d'indentation
indent = " " * (indentation + 4)
indent_inner = " " * (indentation + 8)
indent_inner2 = " " * (indentation + 12)
indent_inner3 = " " * (indentation + 16)
# Préparer la chaîne de sortie
sortie = f"\n{indent}// Sous-graphes pour les nœuds de niveau {niveau} avec leurs relations\n"
# Définir l'ordre des attributs
ordre_attributs = ["ihh_pays", "ihh_acteurs", "ivc", "label", "niveau", "fillcolor", "orphelin", "shape", "style", "fontname"]
# Formater chaque nœud comme un subgraph avec ses relations sortantes
for nom_noeud, attributs in attributs_niveau.items():
# Débuter le sous-graphe principal
sortie += f"{indent}subgraph cluster_{nom_noeud} {{\n"
sortie += f"{indent_inner}label=\"{nom_noeud}\";\n"
# Ajouter les autres attributs du sous-graphe
for attr_nom, attr_valeur in attributs.items():
if attr_valeur and attr_nom in ["style", "fillcolor", "color", "fontname"]:
sortie += f"{indent_inner}{attr_nom}=\"{attr_valeur}\";\n"
# Ajouter le nœud lui-même dans le sous-graphe
sortie += formater_noeud(nom_noeud, attributs, indent_inner)
# Trouver toutes les relations dont ce nœud est la source
relations_sortantes = []
sous_graphes_niveau_10 = {} # Dictionnaire pour stocker les sous-graphes de niveau 10
for source, dest, edge_attrs in trouver_relations_sortantes(nom_noeud):
# Vérifier si la destination est un nœud de niveau 10
dest_attrs = schema.nodes[dest]
# est_niveau_10 = dest_attrs.get('niveau') == "10"
est_niveau_10 = str(dest_attrs.get('niveau')) in {"10", "1010"}
# Formater l'arête pour le sous-graphe parent
edge_str = formater_arete(source, dest, edge_attrs, indent_inner)
relations_sortantes.append(edge_str)
# Traiter les nœuds de niveau 10
if est_niveau_10 and dest not in sous_graphes_niveau_10:
# Début du sous-graphe de niveau 10
operation, couleur = determiner_operation_et_couleur(dest)
sg_str = f"\n{indent_inner}subgraph cluster_{dest} {{\n"
sg_str += f"{indent_inner2}label=\"{dest}\";\n"
# Couleur de remplissage
if 'fillcolor' in dest_attrs and dest_attrs['fillcolor']:
sg_str += f"{indent_inner2}fillcolor=\"{dest_attrs['fillcolor']}\";\n"
else:
sg_str += f"{indent_inner2}fillcolor=\"{couleur}\";\n"
sg_str += f"{indent_inner2}style=\"filled\";\n"
# Ajouter le nœud de niveau 10 lui-même
sg_str += formater_noeud(dest, dest_attrs, indent_inner2)
# Collecter les relations sortantes du nœud de niveau 10
relations_niveau_10 = trouver_relations_sortantes(dest)
sous_graphes_niveau_11 = {} # Pour stocker les sous-graphes de niveau 11
if relations_niveau_10:
sg_str += f"\n{indent_inner2}// Relations sortantes du nœud de niveau 10\n"
for src, dst, rel_attrs in relations_niveau_10:
# Ajouter la relation
sg_str += formater_arete(src, dst, rel_attrs, indent_inner2)
# Vérifier si la destination est un nœud de niveau 11
if dst in schema.nodes:
dst_attrs = schema.nodes[dst]
dst_niveau = dst_attrs.get('niveau', '')
# Créer un sous-graphe pour les nœuds de niveau 11
if (dst_niveau == "11" or dst_niveau == "1011") and dst not in sous_graphes_niveau_11:
# Début du sous-graphe de niveau 11
dst_operation, dst_couleur = determiner_operation_et_couleur(dst)
dst_sg = f"\n{indent_inner2}subgraph cluster_{dst} {{\n"
dst_sg += f"{indent_inner3}label=\"{dst}\";\n"
# Couleur de remplissage
if 'fillcolor' in dst_attrs and dst_attrs['fillcolor']:
dst_sg += f"{indent_inner3}fillcolor=\"{dst_attrs['fillcolor']}\";\n"
else:
dst_sg += f"{indent_inner3}fillcolor=\"{dst_couleur}\";\n"
dst_sg += f"{indent_inner3}style=\"filled\";\n"
# Ajouter le nœud de niveau 11 lui-même
dst_sg += formater_noeud(dst, dst_attrs, indent_inner3)
# Collecter les relations sortantes du nœud de niveau 11
relations_niveau_11 = trouver_relations_sortantes(dst)
noeuds_destination = {} # Pour stocker les nœuds destination
relations_additionnelles = [] # Pour stocker les relations des nœuds destination
if relations_niveau_11:
dst_sg += f"\n{indent_inner3}// Relations sortantes du nœud de niveau 11\n"
for n11_src, n11_dst, n11_attrs in relations_niveau_11:
# Ajouter la relation
dst_sg += formater_arete(n11_src, n11_dst, n11_attrs, indent_inner3)
# Ajouter le nœud destination sauf s'il est de niveau 99
if n11_dst in schema.nodes:
n11_dst_attrs = schema.nodes[n11_dst]
n11_dst_niveau = n11_dst_attrs.get('niveau', '')
if n11_dst_niveau != "99" and n11_dst not in noeuds_destination and n11_dst != dst:
noeuds_destination[n11_dst] = n11_dst_attrs
# Collecter les relations sortantes du nœud destination
for dest_src, dest_dst, dest_attrs in trouver_relations_sortantes(n11_dst):
relations_additionnelles.append((dest_src, dest_dst, dest_attrs))
# Ajouter les nœuds destination
for dest_nom, dest_attrs in noeuds_destination.items():
dst_sg += formater_noeud(dest_nom, dest_attrs, indent_inner3)
# Ajouter les relations additionnelles
if relations_additionnelles:
dst_sg += f"\n{indent_inner3}// Relations des nœuds destination\n"
for rel_src, rel_dst, rel_attrs in relations_additionnelles:
dst_sg += formater_arete(rel_src, rel_dst, rel_attrs, indent_inner3)
# Fermer le sous-graphe de niveau 11
dst_sg += f"{indent_inner2}}}\n"
# Ajouter à la collection
sous_graphes_niveau_11[dst] = dst_sg
# Ajouter les sous-graphes de niveau 11 au sous-graphe de niveau 10
if sous_graphes_niveau_11:
sg_str += "".join(sous_graphes_niveau_11.values())
# Fermer le sous-graphe de niveau 10
sg_str += f"{indent_inner}}}\n"
# Ajouter à la collection
sous_graphes_niveau_10[dest] = sg_str
# Ajouter les relations au sous-graphe principal
if relations_sortantes:
sortie += f"\n{indent_inner}// Relations sortantes\n"
sortie += "".join(relations_sortantes)
# Ajouter les sous-graphes de niveau 10
if sous_graphes_niveau_10:
sortie += "\n"
sortie += "".join(sous_graphes_niveau_10.values())
# Terminer le sous-graphe principal
sortie += f"{indent}}}\n\n"
return sortie
def generer_rank_same(schema, indentation=4):
"""
Génère des instructions rank=same pour les nœuds de même niveau dans un graphe DOT.
Args:
schema: Le graphe NetworkX chargé depuis un fichier DOT
indentation: Nombre d'espaces pour l'indentation (par défaut 4)
Returns:
Une chaîne contenant les instructions rank=same formatées pour le fichier DOT
"""
# Dictionnaire pour stocker les nœuds par niveau
noeuds_par_niveau = {}
# Parcourir tous les nœuds du graphe
for noeud, attrs in schema.nodes(data=True):
niveau = attrs.get('niveau')
if niveau: # Si le nœud a un attribut niveau
if niveau not in noeuds_par_niveau:
noeuds_par_niveau[niveau] = []
noeuds_par_niveau[niveau].append(noeud)
# Préparer l'indentation
indent = " " * indentation
# Générer les instructions rank=same
sortie = "\n" + indent + "// Alignement des nœuds par niveau\n"
# Trier les niveaux numériquement
for niveau, noeuds in sorted(noeuds_par_niveau.items(), key=lambda x: int(x[0]) if x[0].isdigit() else 99):
if noeuds: # S'il y a des nœuds pour ce niveau
sortie += indent + "{ rank=same; "
sortie += "; ".join(noeuds)
sortie += "; }\n"
return sortie
def formater_attributs(attributs):
# Filtrer les attributs non vides et les formater
formatted_attrs = []
for attr, value in attributs.items():
if value: # Ne pas inclure les attributs vides
# Ajouter des guillemets pour les valeurs textuelles (sauf pour certains attributs)
if attr in ('shape', 'style') or value.isdigit():
formatted_attrs.append(f"{attr}={value}")
else:
formatted_attrs.append(f'{attr}="{value}"')
# Gérer le cas spécial pour style=filled si nécessaire
if 'style' not in attributs or not attributs['style']:
formatted_attrs.append("style=filled")
return formatted_attrs
def main(fichier_entree, fichier_sortie):
# Charger le graphe DOT avec NetworkX
try:
schema = read_dot(fichier_entree)
except Exception as e:
logging.error(f"Erreur de lecture DOT : {e}", exc_info=True)
print(f"Erreur à l'ouverture du fichier DOT : {e}")
return
sortie = """digraph Hierarchie_Composants_Electroniques_Simplifiee {
// Configuration globale
graph [compound="True", rankdir="TB", ranksep="10.0"]
node [fontname="Arial", shape=box, style=filled];
edge [fontname="Arial", fontsize=10, style=filled];
"""
# Subgraph ASSEMBLAGE
sortie += """
// Niveau Assemblage
subgraph cluster_assemblage {
label="ASSEMBLAGE";
bgcolor="#f0f0f0";
node [fillcolor="#a0d6ff"];
"""
# Ajout des informations pour tous les nœudss de niveau 0
sortie += formater_noeuds_par_niveau(schema, "0")
sortie += formater_noeuds_par_niveau(schema, "1000")
sortie += """ }"""
# Subgraph COMPOSANTS
sortie += """
// Niveau Composants
subgraph cluster_composants {
label="Composants";
bgcolor="#f0f0f0";
node [fillcolor="#a0d6ff"];"""
# Ajout des informations pour tous les nœudss de niveau 1
sortie += formater_noeuds_par_niveau(schema, "1")
sortie += formater_noeuds_par_niveau(schema, "1001")
sortie += """ }"""
# Subgraph MATÉRIAUX
sortie += """
// Niveau Matériaux
subgraph cluster_materiaux {
label="Matériaux";
bgcolor="#f0f0f0";
node [fillcolor="#a0d6ff"];"""
# Ajout des informations pour tous les nœudss de niveau 2
sortie += formater_noeuds_par_niveau(schema, "2")
sortie += """ }"""
# Subgraph PAYS GÉOGRAPHIQUES
sortie += """
// Niveau Pays géographiques
subgraph cluster_pays_geographiques {
label="Pays géographiques";
bgcolor="#f0f0f0";
node [fillcolor="#a0d6ff"];"""
# Ajout des informations pour tous les nœudss de niveau 99
sortie += formater_noeuds_par_niveau(schema, "99")
sortie += """ }"""
# Ajouter les instructions rank=same pour l'alignement horizontal
sortie += generer_rank_same(schema)
sortie += """
// Légende
subgraph cluster_legende {
label="LÉGENDE";
bgcolor="white";
node [shape=box, style=filled, width=0.2, height=0.2];
L1 [label="Fabrication des wafers", fillcolor="#f0fff0"];
L2 [label="Assemblage", fillcolor="#a0d6ff"];
L3 [label="Niveau Composant", fillcolor="#b3ffe0"];
L4 [label="Fabrication N3 (Matériaux)", fillcolor="#ffe0cc"];
L5 [label="Fabrication N3 (Terres Rares)", fillcolor="#ffd699"];
L6 [label="Extraction/Réserves", fillcolor="#ffd699"];
L7 [label="Pays/Zone geographique", fillcolor="#e6f2ff"];
L8 [label="Entreprise", fillcolor="#f0f0f0"];
L9 [label="Liens réserves", color="red", fontcolor="white"];
L10 [label="Liens opération(traitement, fabrication, assemblage)", color="purple", fontcolor="white"];
L11 [label="Liens extraction", color="orange", fontcolor="white"];
L12 [label="Liens d'origine géographique", color="darkgreen", fontcolor="white"];
L13 [label="Liens origine des minerais", color="darkblue", fontcolor="white"];
}
}
"""
try:
with open(f"{fichier_sortie}", "w", encoding="utf-8") as f:
print(f"{sortie}", file=f)
except FileNotFoundError:
print(f"Erreur : Le chemin vers '{fichier_sortie}' n'existe pas")
except PermissionError:
print(f"Erreur : Permissions insuffisantes pour écrire dans '{fichier_sortie}'")
except IOError as e:
print(f"Erreur d'E/S lors de l'écriture dans le fichier : {e}")
except Exception as e:
print(f"Erreur inattendue : {e}")
else:
print(f"Écriture dans '{fichier_sortie}' réussie")
if len(sys.argv) != 3:
print("Usage: python script.py fichier_entree.dot fichier_sortie.dot")
else:
fichier_entree = sys.argv[1]
fichier_sortie = sys.argv[2]
main(fichier_entree, fichier_sortie)

133
scripts/gestion/ihh.py Normal file
View File

@ -0,0 +1,133 @@
"""
Version 1.0
Date 27/03/2025
Auteur : Stéphan Peccini
Ce script met à jour les valeurs ihh en fonction des données
de part de marché des pays des acteurs.
https://fr.wikipedia.org/wiki/Indice_de_Herfindahl-Hirschmann
Important :
-----------------------
Pour faciliter le travail, tout est fait avec un graphe NetworkX et ensuite sauvé
au format DOT sans aucune structure hiérarchique.
Il suffit alors de passer le fichier résultat dans beautify.py pour avoir un fichier
DOT conforme aux attentes.
"""
import networkx as nx
from networkx.drawing.nx_pydot import write_dot
import sys
import re
def calcul_ihh(graphe, depart, arrivee):
ihh = 0
for noeud in arrivee:
if arrivee not in list(graphe.successors(depart)):
depart = list(graphe.predecessors(noeud))[0]
relation = graphe.get_edge_data(depart, noeud)
ihh += int(int(relation['label'].strip("%"))**2)
ihh = int(round(ihh/100))
return ihh
def mettre_a_jour_ihh(graph, noeuds):
for noeud in noeuds:
sous_graphe = nx.DiGraph()
try:
for chemin in nx.edge_bfs(graph, source=noeud):
sous_graphe.add_edge(*chemin)
# Ajouter les attributs des nœuds
for node in sous_graphe.nodes():
if node in graph.nodes():
sous_graphe.nodes[node].update(graph.nodes[node])
# Ajouter les attributs des arêtes
for edge in sous_graphe.edges():
if edge in graph.edges():
sous_graphe.edges[edge].update(graph.edges[edge])
niveaux_ihh = nx.get_node_attributes(sous_graphe, "niveau")
operation = noeud.split('_')[0]
# La hiérarchie Traitement est particulière. En effet, les acteurs du traitement se fournissent
# auprès des acteurs de l'extraction. Il y a une relations entre chaque acteur de traitement
# et les pays auprès desquels il se fournit en minerai.
if "Traitement" in operation:
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11" and operation in n]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if v == "12" and operation in list(sous_graphe.predecessors(n))[0]]
else:
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11"]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if v == "12"]
# Le calcul de l'indice de Herfindahl-Hirschmann se fait normalement au niveau d'une entreprise.
# Toutefois, il se fait au niveau des pays et au niveau des acteurs pour l'opération qui est menée.
ihh_pays = calcul_ihh(sous_graphe, noeud, noeuds_pays_ihh)
nx.set_node_attributes(graph, {noeud: {"ihh_pays": f"{ihh_pays}"}})
ihh_acteurs = calcul_ihh(sous_graphe, noeud, noeuds_acteurs_ihh)
nx.set_node_attributes(graph, {noeud: {"ihh_acteurs": f"{ihh_acteurs}"}})
except nx.NetworkXNoPath:
pass # Ignore les chemins inexistants
return graph
def mettre_a_jour_ihh_reserves(graph, noeuds):
for noeud in noeuds:
sous_graphe = nx.DiGraph()
try:
for chemin in nx.edge_bfs(graph, source=noeud):
sous_graphe.add_edge(*chemin)
# Ajouter les attributs des nœuds
for node in sous_graphe.nodes():
if node in graph.nodes():
sous_graphe.nodes[node].update(graph.nodes[node])
# Ajouter les attributs des arêtes
for edge in sous_graphe.edges():
if edge in graph.edges():
sous_graphe.edges[edge].update(graph.edges[edge])
niveaux_ihh = nx.get_node_attributes(sous_graphe, "niveau")
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11"]
# Le calcul de l'indice de Herfindahl-Hirschmann se fait normalement au niveau d'une entreprise.
# Toutefois, il se fait au niveau des pays et au niveau des acteurs pour l'opération qui est menée.
ihh_pays = calcul_ihh(sous_graphe, noeud, noeuds_pays_ihh)
nx.set_node_attributes(graph, {noeud: {"ihh_pays": f"{ihh_pays}"}})
except nx.NetworkXNoPath:
pass # Ignore les chemins inexistants
return graph
def main():
""" Fonction principale pour interagir avec l'utilisateur. """
if len(sys.argv) != 3:
print("Usage: python ihh.py fichier_en_entree.dot fichier_en_sortie.dot")
return
fichier_en_entree = sys.argv[1]
fichier_en_sortie = sys.argv[2]
graph = nx.DiGraph(nx.nx_agraph.read_dot(fichier_en_entree))
niveaux = nx.get_node_attributes(graph, "niveau")
noeuds_niveau_10 = [n for n, v in niveaux.items() if v == "10" and not re.search(r'Reserves', n)]
noeuds_niveau_10.sort()
graphe = mettre_a_jour_ihh(graph, noeuds_niveau_10)
noeuds_niveau_10 = [n for n, v in niveaux.items() if v == "10" and re.search(r'Reserves', n)]
noeuds_niveau_10.sort()
graphe = mettre_a_jour_ihh_reserves(graphe, noeuds_niveau_10)
write_dot(graphe, fichier_en_sortie)
if __name__ == "__main__":
main()

File diff suppressed because it is too large Load Diff