Amélioration de la génération du rapport

This commit is contained in:
Fabrication du Numérique 2025-05-22 12:49:54 +02:00
parent 813fb5684e
commit 4809661b0f
11 changed files with 8635 additions and 0 deletions

2
.gitignore vendored
View File

@ -8,12 +8,14 @@ __pycache__/
*.pyd
*.dot
prompt.md
.gitignore
# Ignorer cache et temporaire
.cache/
*.log
*.tmp
*.old
tempo/
# Ignorer config locale
.ropeproject/

87
scripts/Risques.md Normal file
View File

@ -0,0 +1,87 @@
# Quels sont les indices et leurs seuils
* IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'un opération
* Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)
* ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément dans le composant ou le procédé
* Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)
* ISG (Stabilité Géopolitique) : stabilité des pays
* Seuils : <40 = Vert (stable), 40-60 = Orange, >60 = Rouge (instable)
* IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs que le numérique
* Seuils : <5 = Vert (Faible), 5-15 = Moyenne (Modérée), >15 = Rouge (Forte)
Les seuils permettent de définir une échelle facile à comprendre.
Il est important de comprendre que se trouver en bas ou en haut d'une plage (verte, orange ou rouge) n'a pas le même niveau de risque. un niveau ICS à 0.65 (capacité de substitution faible) est moindre qu'un niveau d'ICS à 1 (aucune capacité à remplacer), tout en restant élevé.
# Combinaison des risques
**IHH et ISG**
Ces deux indices se combinent dans l'évaluation du risque (niveau d'impact et probabilité de survenance) :
* l'IHH donne le niveau d'impact => une forte concentration implique un fort impact si le risque est avéré
* l'ISG donne la probabilité de survenance => plus les pays sont instables (et donc plus l'ISG est élevé) et plus la survenance du risque est élevée
Toutefois, l'ISG s'adresse à un pays et l'IHH à une opération (soit pour un pays, soit pour un acteur) ; il faut donc calculer l'ISG combiné des pays intervenant pour une opération. Voici le calcul que l'on va utiliser :
ISG_combiné = (Somme des ISG des pays multipliée par leur part de marché) / Sommes de leur part de marché
Il faut donc produire ici le tableau des producteurs/fabricants/assembleurs (selon l'opération concernée) avec leur part de marché respective. Dans un premier temps, on se contentera de faire l'analyse pour les pays seulement. L'IHH des acteurs sera uniquement mentionné à titre informatif avec un commentaire léger.
On établit alors une matrice en mettant des poids (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ISG_combiné et de l'IHH
| ISG_combiné / IHH | Vert | Orange | Rouge |
| :-- | :-- | :-- | :-- |
| Vert | 1 | 2 | 3 |
| Orange | 2 | 4 | 6 |
| Rouge | 3 | 6 | 9 |
On peut alors dire que l'on classe en trois niveaux pour chaque opération :
* Vulnérabilité combinée élevée à critique : poids 6 et 9
* Vulnérabilité combinée moyenne : poids 3 et 4
* Vulnérabilité combinée faible : poids 1 et 2
**ICS et IVC**
Ces deux indices se combinent dans l'évaluation du risque :
* l'ICS donne le niveau d'impact => une faible substituabilité (et donc un ICS élevé) implique un fort impact si le risque est avéré ; l'ICS est associé à la relation entre un composant et un minerai
* l'IVC donne la probabilité de l'impact => une forte concurrence intersectorielle (IVC élevé) implique une plus forte probabilité de survenance
L'ICS et l'IVC sont au niveau des minerais.
Par simplification, on va faire un calcul d'un ICS_moyen d'un minerai comme étant la moyenne des ICS pour chacun des composants dans lesquels il intervient.
Il faut donc récupérer pour le minerai son tableau des ICS pour faire cette moyenne.
On établit alors une matrice en mettant des poids (Vert = 1, Orange = 2, Rouge = 3) et en faisant le produit des poids de l'ICS et de l'IVC.
| ICS_moyen / IVC | Vert | Orange | Rouge |
| :-- | :-- | :-- | :-- |
| Vert | 1 | 2 | 3 |
| Orange | 2 | 4 | 6 |
| Rouge | 3 | 6 | 9 |
On peut alors dire que l'on classe en trois niveaux pour chaaque minerai :
* Vulnérabilité combinée élevée à critique : poids 6 et 9
* Vulnérabilité combinée moyenne : poids 3 et 4
* Vulnérabilité combinée faible : poids 1 et 2
# Interdépendances
Il va falloir maintenant remonter toute la chaîne de fabrication pour faire la recherche des dépendances entre indices, indices combinés.
Un produit final est assemblé à partir de composants. Il y a donc une opération d'assemblage avec IHH et ISG_combiné
Un composant est fabriqué à partir de minerais. Il y a donc une opération de fabrication avec IHH et ISG_combiné.
Un minerai est extrait et traité. Il y a donc associé au minerai un ICS_moyen et un IVC, une opération d'extraction avec un IHH et un ISG_combiné et une opération de traitement avec un IHH et un ISG_combiné.
Il faut donc reprendre ici toutes les chaines du produit final au minerai en classant comme suit :
* Chaîne avec risque critique, elle comprend :
* au moins une vulnérabilité combinée élevée à critique
* Chaîne avec risque majeur, elle comprend :
* au moins trois vulnérabilités combinée moyennes
* Chaîne avec risque moyen, elle comprend :
* au moins une vulnérabilité combinée moyenne
# Compléments
Pour chacun des indices (avant de les combiner), il récupérer leur détail dans le corpus (ce qui est mis en annexe du rapport_final actuel). cela doit faire partie de l'analyse détaillée à fournir.

111
scripts/Réponses.md Normal file
View File

@ -0,0 +1,111 @@
Questions pour clarifier la mise en œuvre
1. **Structure du graphe DOT**: Le graphe DOT contient-il déjà les relations hiérarchiques (produit → composant → minerai)? Comment sont-elles représentées?
La structure du graphe est un digraph, donc avec une organisation hiérarchique :
Dans la suite, quand j'écris N0, N1, … cela veut dire que le nœud portant l'item a un attribut niveau 0, 1, …
Exemple :
LynasAdvanced_Malaisie_Traitement_Erbium [fillcolor="#d1e0ff",
label="Lynas Advanced Materials",
niveau=12];
LynasAdvanced_Malaisie_Traitement_Erbium est un N12. Important avec ce nom : on sait tout de suite que l'acteur est LynasAdvanced dont le nom réel est porté par le label (Lynas Advanced Materials), qu'il opère en Malaisie, pour faire le Traitement de l'Erbium.
* un produit final (N0) est associé à :
* un ou plusieurs composants (N1)
* aucune ou une opération d'assemblage (N10)
* un composant (N1) est associé à :
* un ou plusieurs minerais (N2)
* aucune ou une opération de fabrication (N10)
* un minerai (N2) est associé à :
* une opération d'extraction (N10)
* une opération de traitement (N10)
Exemple pour produit final (avec une seule chaîne) :
MaterielIA [fillcolor="#a0d6ff",
label="Matériel dédié IA",
niveau=0];
MaterielIA -> CarteMere;
CarteMere [fillcolor="#b3ffe0",
label="Carte mère",
niveau=1];
CarteMere -> Germanium [cout=0.6,
delai=0.6,
ics=0.64,
technique=0.7];
Germanium [fillcolor="#ffd699",
ivc=1,
label="Germanium - Semi-conducteurs, détecteurs infrarouge, fibre optique",
niveau=2];
Germanium -> Traitement_Germanium;
Traitement_Germanium [fillcolor="#ffd699",
ihh_acteurs=19,
ihh_pays=31,
label=Traitement,
niveau=10];
Traitement_Germanium -> Chine_Traitement_Germanium [color=purple,
fontcolor=purple,
label="50%",
poids=2];
Chine_Traitement_Germanium [fillcolor="#e6f2ff",
label=Chine,
niveau=11];
Chine_Traitement_Germanium -> YunnanGermanium_Chine_Traitement_Germanium [color=purple,
fontcolor=purple,
label="30%",
poids=2];
Chine_Traitement_Germanium -> Chine_geographique [color=darkgreen,
fontcolor=darkgreen];
YunnanChihong_Chine_Traitement_Germanium [fillcolor="#d1e0ff",
label="Yunnan Chihong Zinc",
niveau=12];
YunnanChihong_Chine_Traitement_Germanium -> Chine_geographique [color=darkgreen,
fontcolor=darkgreen];
Chaque opération (N10) (assemblage, fabrication, traitement, extraction) se décompose comme suit :
* l'opération elle-même (N10)
* un ou plusieurs pays (N11) où l'opération est réalisée
* pour chaque pays (N11, il est associé à :
* un ou plusieurs acteurs (N12) opérant dans le pays
* un pays géographique (N99)
2. **Calcul d'ISG_combiné**: Les parts de marché par pays sont-elles disponibles dans le graphe ou doivent-elles être extraites d'ailleurs?
Oui, le parts de marché sont disponble dans le graphe.
Exemple pour les pays :
Traitement_Germanium -> Chine_Traitement_Germanium [color=purple,
fontcolor=purple,
label="50%",
poids=2];
Chine_Traitement_Germanium [fillcolor="#e6f2ff",
label=Chine,
niveau=11];
La part de marché de Chine_Traitement_Germanium (et donc Chine que l'on récupère dans le label de Chine_Traitement_Germanium) est de 50% à récupérer dans le label de l'arête Traitement_Germanium -> Chine_Traitement_Germanium
Pour un acteur, on la récupère dans l'arête entre le pays et l'acteur :
Chine_Traitement_Germanium -> YunnanGermanium_Chine_Traitement_Germanium [color=purple,
fontcolor=purple,
label="30%",
poids=2];
La part de marché de YunnanGermanium_Chine_Traitement_Germanium est de 30% (label de l'arête).
3. **Traitement des matrices**: Souhaitez-vous que les matrices de vulnérabilité combinée soient:
- Calculées dynamiquement dans le script
- Présentées sous forme de tableaux dans le rapport
- Ou les deux?
Les matrices sont à présenter dans le rapport et donc calculées dynamiquement. Toutes les informations nécessaire sont dans le graphe pour faire ces calculs.
4. **Niveau de profondeur**: Pour chaque chaîne identifiée, quel niveau de détail souhaitez-vous dans le rapport? Uniquement les vulnérabilités combinées ou aussi le détail des indices individuels?
Les deux sont à présenter. En premier une partie factuelle avec les éléments individuels et leur explication et ensuite la combinaison.
5. **Approche structurelle**: Préférez-vous une organisation du rapport:
- Par niveau de risque (critique → majeur → moyen)
- Par produit/matière (en incluant tous les niveaux de risque pour chaque chaîne)
- Une combinaison des deux?
Il faut se rappeler que ce rapport est le point d'entrée pour l'IA qui aura pour objectif de textualiser tout cela et de proposer des scénarios de veille et de prévention.
Dans ce rapport factuel, il faut qu'il soit efficace pour l'IA et il viendra se joindre, de mon point de vue au rapport d'analyse que l'IA génèrera.

3
scripts/config.yml Normal file
View File

@ -0,0 +1,3 @@
graphe_path: ../graphe.dot
template_path: ./rapport_final.md
corpus_path: ../Corpus

View File

@ -0,0 +1,797 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour générer un rapport structuré d'analyse des vulnérabilités critiques
à partir d'un graphe DOT et d'un corpus documentaire.
Ce script remplace generate_template.py avec une structure optimisée
pour l'analyse des risques selon la méthodologie définie.
"""
import os
import sys
import json
from networkx.drawing.nx_agraph import read_dot
import yaml
from pathlib import Path
# Chemins de base
BASE_DIR = Path(__file__).resolve().parent
CORPUS_DIR = BASE_DIR.parent / "Corpus"
CONFIG_PATH = BASE_DIR.parent / "scripts/config.yml"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
def load_config(config_path, thresholds_path=THRESHOLDS_PATH):
"""Charge la configuration depuis les fichiers YAML."""
# Charger la configuration principale
if not os.path.exists(config_path):
print(f"Fichier de configuration introuvable: {config_path}")
sys.exit(1)
with open(config_path, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
# Vérifier les chemins essentiels
required_paths = ['graphe_path', 'template_path', 'corpus_path']
for path in required_paths:
if path not in config:
print(f"Configuration incomplète: {path} manquant")
sys.exit(1)
# Convertir les chemins relatifs en chemins absolus
for path in required_paths:
config[path] = os.path.join(os.path.dirname(config_path), config[path])
# Charger les seuils
if os.path.exists(thresholds_path):
with open(thresholds_path, 'r', encoding='utf-8') as f:
thresholds = yaml.safe_load(f)
config['thresholds'] = thresholds.get('seuils', {})
else:
print(f"Fichier de seuils introuvable: {thresholds_path}")
# Valeurs par défaut si le fichier n'existe pas
config['thresholds'] = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
return config
def determine_threshold_color(value, index_type, thresholds=None):
"""
Détermine la couleur du seuil en fonction du type d'indice et de sa valeur.
Utilise les seuils de config.yaml si disponibles.
"""
# Valeurs par défaut si les seuils ne sont pas fournis
default_thresholds = {
"IHH": {"vert": {"max": 15}, "orange": {"min": 15, "max": 25}, "rouge": {"min": 25}},
"ISG": {"vert": {"max": 40}, "orange": {"min": 40, "max": 70}, "rouge": {"min": 70}},
"ICS": {"vert": {"max": 0.30}, "orange": {"min": 0.30, "max": 0.60}, "rouge": {"min": 0.60}},
"IVC": {"vert": {"max": 5}, "orange": {"min": 5, "max": 15}, "rouge": {"min": 15}}
}
# Utiliser les seuils fournis ou les valeurs par défaut
thresholds = thresholds or default_thresholds
# Récupérer les seuils pour cet indice
if index_type in thresholds:
index_thresholds = thresholds[index_type]
# Déterminer la couleur
if "vert" in index_thresholds and "max" in index_thresholds["vert"] and \
index_thresholds["vert"]["max"] is not None and value < index_thresholds["vert"]["max"]:
suffix = get_suffix_for_index(index_type, "vert")
return f"VERT ({suffix})"
elif "orange" in index_thresholds and "min" in index_thresholds["orange"] and "max" in index_thresholds["orange"] and \
index_thresholds["orange"]["min"] is not None and index_thresholds["orange"]["max"] is not None and \
index_thresholds["orange"]["min"] <= value < index_thresholds["orange"]["max"]:
suffix = get_suffix_for_index(index_type, "orange")
return f"ORANGE ({suffix})"
elif "rouge" in index_thresholds and "min" in index_thresholds["rouge"] and \
index_thresholds["rouge"]["min"] is not None and value >= index_thresholds["rouge"]["min"]:
suffix = get_suffix_for_index(index_type, "rouge")
return f"ROUGE ({suffix})"
# Fallback à l'ancienne méthode si les seuils ne sont pas bien définis
if index_type == "IHH":
if value < 15:
return "VERT (Faible)"
elif value < 25:
return "ORANGE (Modérée)"
else:
return "ROUGE (Élevée)"
elif index_type == "ISG":
if value < 40:
return "VERT (Stable)"
elif value < 60:
return "ORANGE (Intermédiaire)"
else:
return "ROUGE (Instable)"
elif index_type == "ICS":
if value < 0.3:
return "VERT (Facile)"
elif value < 0.6:
return "ORANGE (Moyenne)"
else:
return "ROUGE (Difficile)"
elif index_type == "IVC":
if value < 5:
return "VERT (Faible)"
elif value < 15:
return "ORANGE (Modérée)"
else:
return "ROUGE (Forte)"
return "Non déterminé"
def get_suffix_for_index(index_type, color):
"""Retourne le suffixe approprié pour chaque indice et couleur."""
suffixes = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
"ISG": {"vert": "Stable", "orange": "Intermédiaire", "rouge": "Instable"},
"ICS": {"vert": "Facile", "orange": "Moyenne", "rouge": "Difficile"},
"IVC": {"vert": "Faible", "orange": "Modérée", "rouge": "Forte"}
}
if index_type in suffixes and color in suffixes[index_type]:
return suffixes[index_type][color]
return "Non déterminé"
def parse_graph(config):
"""
Charge et analyse le graphe DOT.
Extrait les nœuds, leurs attributs et leurs relations.
"""
graphe_path = config['graphe_path']
if not os.path.exists(graphe_path):
print(f"Fichier de graphe introuvable: {graphe_path}")
sys.exit(1)
try:
# Charger le graphe avec NetworkX
graph = read_dot(graphe_path)
# Convertir les attributs en types appropriés
for node, attrs in graph.nodes(data=True):
for key, value in list(attrs.items()):
# Convertir les valeurs numériques
if key in ['niveau', 'ihh_acteurs', 'ihh_pays', 'isg', 'ivc']:
try:
if key in ['isg', 'ivc', 'ihh_acteurs', 'ihh_pays', 'niveau']:
attrs[key] = int(value.strip('"'))
else:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
# Garder la valeur originale si la conversion échoue
pass
elif key == 'label':
# Nettoyer les guillemets des étiquettes
attrs[key] = value.strip('"')
# Convertir les attributs des arêtes
for u, v, attrs in graph.edges(data=True):
for key, value in list(attrs.items()):
if key in ['ics', 'cout', 'delai', 'technique']:
try:
attrs[key] = float(value.strip('"'))
except (ValueError, TypeError):
pass
elif key == 'label' and '%' in value:
# Extraire le pourcentage
try:
percentage = value.strip('"').replace('%', '')
attrs['percentage'] = float(percentage)
except (ValueError, TypeError):
pass
return graph
except Exception as e:
print(f"Erreur lors de l'analyse du graphe: {str(e)}")
sys.exit(1)
def analyze_geopolitical_stability(graph):
"""
Analyse la stabilité géopolitique des pays dans le graphe.
Identifie les pays et leurs indices ISG.
"""
geo_countries = {}
# Identifier les nœuds de pays géographiques (niveau 99)
for node, attrs in graph.nodes(data=True):
if attrs.get('niveau') == 99:
country_name = attrs.get('label', node)
isg_value = attrs.get('isg', 0)
geo_countries[node] = {
'name': country_name,
'isg': isg_value,
'color': determine_threshold_color(isg_value, "ISG")
}
return geo_countries
def find_real_path(element, file_type, config, graph=None):
"""
Recherche le chemin réel d'un fichier dans le corpus.
Adapté de l'ancien generate_template.py.
"""
corpus_path = config['corpus_path']
# Définir les motifs de recherche selon le type de fichier
if file_type == "introduction":
patterns = ["_intro.md"]
elif file_type == "extraction":
patterns = ["extraction", "Extraction"]
elif file_type == "traitement":
patterns = ["traitement", "Traitement"]
elif file_type == "isg":
patterns = ["isg", "ISG", "stabilité", "Stabilité"]
elif file_type == "ihh_extraction":
patterns = ["ihh.*extraction", "IHH.*Extraction", "Herfindahl.*extraction"]
elif file_type == "ihh_traitement":
patterns = ["ihh.*traitement", "IHH.*Traitement", "Herfindahl.*traitement"]
elif file_type == "ics":
patterns = ["ics", "ICS", "Substituabilité", "substituabilité"]
elif file_type == "ivc":
patterns = ["ivc", "IVC", "Concurrence", "concurrence"]
else:
patterns = []
# Préparer le nom de l'élément pour la recherche
element_name = element
if graph and element in graph.nodes and "label" in graph.nodes[element]:
element_name = graph.nodes[element]["label"]
# Éliminer les caractères spéciaux et normaliser
element_name = element_name.replace("_", " ").lower()
# Parcourir le corpus pour trouver les fichiers correspondants
matches = []
for root, dirs, files in os.walk(corpus_path):
for file in files:
if file.endswith(".md"):
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, corpus_path)
# Vérifier si le fichier correspond aux motifs et à l'élément
file_matches = True
for pattern in patterns:
if pattern.lower() not in relative_path.lower():
file_matches = False
break
if file_matches and element_name in relative_path.lower():
matches.append(relative_path)
# Retourner le premier fichier correspondant trouvé
if matches:
return matches[0]
return None
def find_ics_path(element, config, graph=None):
"""
Recherche le chemin d'un fichier ICS pour un élément.
Adapté de l'ancien generate_template.py.
"""
return find_real_path(element, "ics", config, graph)
def find_ivc_path(element, config, graph=None):
"""
Recherche le chemin d'un fichier IVC pour un élément.
Adapté de l'ancien generate_template.py.
"""
return find_real_path(element, "ivc", config, graph)
def extract_all_data(graph, geo_countries):
"""
Extrait toutes les données pertinentes du graphe structuré.
"""
data = {
"products": {}, # Produits finaux (N0)
"components": {}, # Composants (N1)
"minerals": {}, # Minerais (N2)
"operations": {}, # Opérations (N10)
"countries": {}, # Pays (N11)
"actors": {} # Acteurs (N12)
}
# Parcourir tous les nœuds
for node, attrs in graph.nodes(data=True):
level = attrs.get('niveau', -1)
# Classifier par niveau
if level == 0: # Produit
data["products"][node] = {
"label": attrs.get('label', node),
"components": []
}
elif level == 1: # Composant
data["components"][node] = {
"label": attrs.get('label', node),
"minerals": [],
"ics_values": {}
}
elif level == 2: # Minerai
data["minerals"][node] = {
"label": attrs.get('label', node),
"ivc": attrs.get('ivc', 0),
"extraction": None,
"treatment": None,
"ics_values": {}
}
elif level == 10: # Opération
data["operations"][node] = {
"label": attrs.get('label', node),
"ihh_acteurs": attrs.get('ihh_acteurs', 0),
"ihh_pays": attrs.get('ihh_pays', 0),
"countries": {}
}
elif level == 11: # Pays
data["countries"][node] = {
"label": attrs.get('label', node),
"actors": {},
"geo_country": None,
"market_share": 0
}
elif level == 12: # Acteur
data["actors"][node] = {
"label": attrs.get('label', node),
"market_share": 0
}
# Parcourir les arêtes pour établir les relations et parts de marché
for source, target, edge_attrs in graph.edges(data=True):
source_level = graph.nodes[source].get('niveau', -1)
target_level = graph.nodes[target].get('niveau', -1)
# Extraire part de marché
market_share = 0
if 'percentage' in edge_attrs:
market_share = edge_attrs['percentage']
elif 'label' in edge_attrs and '%' in edge_attrs['label']:
try:
market_share = float(edge_attrs['label'].strip('"').replace('%', ''))
except (ValueError, TypeError):
pass
# Relations produit → composant
if source_level == 0 and target_level == 1:
data["products"][source]["components"].append(target)
# Relations composant → minerai avec ICS
elif source_level == 1 and target_level == 2:
data["components"][source]["minerals"].append(target)
# Stocker l'ICS s'il est présent
if 'ics' in edge_attrs:
ics_value = edge_attrs['ics']
data["components"][source]["ics_values"][target] = ics_value
data["minerals"][target]["ics_values"][source] = ics_value
# Relations minerai → opération
elif source_level == 2 and target_level == 10:
op_label = graph.nodes[target].get('label', '').lower()
if 'extraction' in op_label:
data["minerals"][source]["extraction"] = target
elif 'traitement' in op_label:
data["minerals"][source]["treatment"] = target
# Relations opération → pays avec part de marché
elif source_level == 10 and target_level == 11:
data["operations"][source]["countries"][target] = market_share
data["countries"][target]["market_share"] = market_share
# Relations pays → acteur avec part de marché
elif source_level == 11 and target_level == 12:
data["countries"][source]["actors"][target] = market_share
data["actors"][target]["market_share"] = market_share
# Relations pays → pays géographique
elif source_level == 11 and target_level == 99:
data["countries"][source]["geo_country"] = target
return data
def calculate_combined_vulnerabilities(data, geo_countries, config):
"""
Calcule les vulnérabilités combinées selon la méthodologie définie.
Utilise les seuils définis dans la configuration.
"""
results = {
"ihh_isg_combined": {}, # Pour chaque opération
"ics_ivc_combined": {}, # Pour chaque minerai
"chains_classification": {
"critical": [],
"major": [],
"medium": []
}
}
# 1. Calculer ISG_combiné pour chaque opération
for op_id, operation in data["operations"].items():
isg_weighted_sum = 0
total_share = 0
# Parcourir chaque pays impliqué dans l'opération
for country_id, share in operation["countries"].items():
country = data["countries"][country_id]
geo_country_id = country.get("geo_country")
if geo_country_id and geo_country_id in geo_countries:
isg_value = geo_countries[geo_country_id]["isg"]
isg_weighted_sum += isg_value * share
total_share += share
# Calculer la moyenne pondérée
isg_combined = 0
if total_share > 0:
isg_combined = isg_weighted_sum / total_share
# Déterminer couleurs et poids
ihh_value = operation["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
isg_color = determine_threshold_color(isg_combined, "ISG", config.get('thresholds'))
# Mapper les couleurs aux poids
weight_map = {
"VERT (Faible)": 1, "VERT (Stable)": 1, "VERT (Facile)": 1,
"ORANGE (Modérée)": 2, "ORANGE (Intermédiaire)": 2, "ORANGE (Moyenne)": 2,
"ROUGE (Élevée)": 3, "ROUGE (Instable)": 3, "ROUGE (Difficile)": 3, "ROUGE (Forte)": 3
}
ihh_weight = weight_map.get(ihh_color, 1)
isg_weight = weight_map.get(isg_color, 1)
combined_weight = ihh_weight * isg_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ihh_isg_combined"][op_id] = {
"ihh_value": ihh_value,
"ihh_color": ihh_color,
"isg_combined": isg_combined,
"isg_color": isg_color,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 2. Calculer ICS_moyen pour chaque minerai
for mineral_id, mineral in data["minerals"].items():
ics_values = list(mineral["ics_values"].values())
ics_average = 0
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ivc_value = mineral.get("ivc", 0)
# Déterminer couleurs et poids
ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
ics_weight = weight_map.get(ics_color, 1)
ivc_weight = weight_map.get(ivc_color, 1)
combined_weight = ics_weight * ivc_weight
# Déterminer vulnérabilité combinée
if combined_weight in [6, 9]:
vulnerability = "ÉLEVÉE à CRITIQUE"
elif combined_weight in [3, 4]:
vulnerability = "MOYENNE"
else: # 1, 2
vulnerability = "FAIBLE"
# Stocker résultats
results["ics_ivc_combined"][mineral_id] = {
"ics_average": ics_average,
"ics_color": ics_color,
"ivc_value": ivc_value,
"ivc_color": ivc_color,
"combined_weight": combined_weight,
"vulnerability": vulnerability
}
# 3. Classifier les chaînes
for product_id, product in data["products"].items():
for component_id in product["components"]:
component = data["components"][component_id]
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
# Collecter toutes les vulnérabilités dans cette chaîne
chain_vulnerabilities = []
# Vulnérabilité ICS+IVC du minerai
if mineral_id in results["ics_ivc_combined"]:
chain_vulnerabilities.append(results["ics_ivc_combined"][mineral_id]["vulnerability"])
# Vulnérabilité IHH+ISG extraction
if mineral["extraction"] and mineral["extraction"] in results["ihh_isg_combined"]:
chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["extraction"]]["vulnerability"])
# Vulnérabilité IHH+ISG traitement
if mineral["treatment"] and mineral["treatment"] in results["ihh_isg_combined"]:
chain_vulnerabilities.append(results["ihh_isg_combined"][mineral["treatment"]]["vulnerability"])
# Classifier la chaîne
chain_info = {
"product": product_id,
"component": component_id,
"mineral": mineral_id,
"vulnerabilities": chain_vulnerabilities
}
if "ÉLEVÉE à CRITIQUE" in chain_vulnerabilities:
results["chains_classification"]["critical"].append(chain_info)
elif chain_vulnerabilities.count("MOYENNE") >= 3:
results["chains_classification"]["major"].append(chain_info)
elif "MOYENNE" in chain_vulnerabilities:
results["chains_classification"]["medium"].append(chain_info)
return results
def generate_structured_template(data, vulnerabilities, geo_countries, config, graph=None):
"""
Génère un template structuré pour le rapport d'analyse des risques.
"""
template = []
# 1. Titre principal
template.append("# Évaluation des vulnérabilités critiques\n")
# 2. Introduction
template.append("## Introduction\n")
# Ajouter référence à l'introduction du corpus
intro_path = find_real_path("introduction", "introduction", config, graph)
if intro_path:
template.append(f"Corpus/{intro_path}\n")
# 3. Méthodologie
template.append("## Méthodologie d'analyse des risques\n")
template.append("### Indices et seuils\n")
template.append("* IHH (Herfindahl-Hirschmann) : concentration géographiques ou industrielle d'une opération\n")
template.append(" * Seuils : <15 = Vert (Faible), 15-25 = Orange (Modérée), >25 = Rouge (Élevée)\n")
template.append("* ICS (Criticité de Substituabilité) : capacité à remplacer / substituer un élément dans le composant ou le procédé\n")
template.append(" * Seuils : <0.3 = Vert (Facile), 0.3-0.6 = Orange (Moyenne), >0.6 = Rouge (Difficile)\n")
template.append("* ISG (Stabilité Géopolitique) : stabilité des pays\n")
template.append(" * Seuils : <40 = Vert (Stable), 40-60 = Orange, >60 = Rouge (Instable)\n")
template.append("* IVC (Vulnérabilité de Concurrence) : pression concurrentielle avec d'autres secteurs que le numérique\n")
template.append(" * Seuils : <5 = Vert (Faible), 5-15 = Orange (Modérée), >15 = Rouge (Forte)\n")
template.append("### Matrices de vulnérabilité combinée\n")
template.append("#### IHH et ISG\n")
template.append("| ISG_combiné / IHH | Vert | Orange | Rouge |\n")
template.append("| :-- | :-- | :-- | :-- |\n")
template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n")
template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n")
template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n")
template.append("#### ICS et IVC\n")
template.append("| ICS_moyen / IVC | Vert | Orange | Rouge |\n")
template.append("| :-- | :-- | :-- | :-- |\n")
template.append("| Vert | 1 (Faible) | 2 (Faible) | 3 (Moyenne) |\n")
template.append("| Orange | 2 (Faible) | 4 (Moyenne) | 6 (Élevée à Critique) |\n")
template.append("| Rouge | 3 (Moyenne) | 6 (Élevée à Critique) | 9 (Élevée à Critique) |\n")
# 4. Éléments factuels
template.append("## Éléments factuels\n")
# 4.1 Analyse par produit/composant/minerai
for product_id, product in data["products"].items():
template.append(f"### {product['label']}\n")
for component_id in product["components"]:
component = data["components"][component_id]
template.append(f"#### {component['label']}\n")
for mineral_id in component["minerals"]:
mineral = data["minerals"][mineral_id]
template.append(f"##### {mineral['label']}\n")
# 4.1.1 Extraction
if mineral["extraction"]:
extraction = data["operations"][mineral["extraction"]]
template.append("##### Extraction\n")
template.append(f"###### Concentration\n")
# IHH Extraction
ihh_value = extraction["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
template.append(f"IHH pays: {ihh_value} - {ihh_color}\n")
# Référence au fichier IHH extraction
ihh_extraction_path = find_real_path(mineral_id, "ihh_extraction", config, graph)
if ihh_extraction_path:
template.append(f"Corpus/{ihh_extraction_path}\n")
# Liste des pays et acteurs
template.append("**Principaux pays:**\n")
for country_id, share in extraction["countries"].items():
country = data["countries"][country_id]
template.append(f"- {country['label']}: {share}%\n")
# 4.1.2 Stabilité Géopolitique (après Extraction)
template.append("#### Stabilité Géopolitique\n")
# Référence au fichier ISG
isg_path = find_real_path(mineral_id, "isg", config, graph)
if isg_path:
template.append(f"Corpus/{isg_path}\n")
# 4.1.3 Traitement
if mineral["treatment"]:
treatment = data["operations"][mineral["treatment"]]
template.append("#### Traitement\n")
template.append("##### Concentration\n")
# IHH Traitement
ihh_value = treatment["ihh_pays"]
ihh_color = determine_threshold_color(ihh_value, "IHH", config.get('thresholds'))
template.append(f"IHH pays: {ihh_value} - {ihh_color}\n")
# Référence au fichier IHH traitement
ihh_treatment_path = find_real_path(mineral_id, "ihh_traitement", config, graph)
if ihh_treatment_path:
template.append(f"Corpus/{ihh_treatment_path}\n")
# Liste des pays et acteurs
template.append("**Principaux pays:**\n")
for country_id, share in treatment["countries"].items():
country = data["countries"][country_id]
template.append(f"- {country['label']}: {share}%\n")
# 4.1.4 Stabilité Géopolitique (après Traitement)
template.append("#### Stabilité Géopolitique\n")
# Même référence au fichier ISG
if isg_path:
template.append(f"Corpus/{isg_path}\n")
# 4.1.5 Substituabilité
template.append("#### Substituabilité\n")
# Calcul de l'ICS moyen pour ce minerai
ics_values = list(mineral["ics_values"].values())
if ics_values:
ics_average = sum(ics_values) / len(ics_values)
ics_color = determine_threshold_color(ics_average, "ICS", config.get('thresholds'))
template.append(f"ICS moyen: {ics_average:.2f} - {ics_color}\n")
# Liste des composants avec leur ICS
for comp_id, ics_value in mineral["ics_values"].items():
comp_label = data["components"][comp_id]["label"]
ics_color = determine_threshold_color(ics_value, "ICS", config.get('thresholds'))
template.append(f"###### {comp_label} -> {mineral['label']} - Coefficient: {ics_value:.2f}\n")
# Référence au fichier ICS spécifique
ics_path = find_ics_path(mineral_id, config, graph)
if ics_path:
template.append(f"Corpus/{ics_path}\n")
# 4.1.6 Concurrence
template.append("#### Concurrence\n")
# IVC
ivc_value = mineral.get("ivc", 0)
ivc_color = determine_threshold_color(ivc_value, "IVC", config.get('thresholds'))
template.append(f"IVC: {ivc_value} - {ivc_color}\n")
# Référence au fichier IVC
ivc_path = find_ivc_path(mineral_id, config, graph)
if ivc_path:
template.append(f"Corpus/{ivc_path}\n")
template.append("##### Secteurs concurrents\n")
# 5. Analyse des vulnérabilités combinées
template.append("## Analyse des vulnérabilités combinées\n")
# 5.1 IHH+ISG par opération
template.append("### Vulnérabilités IHH+ISG par opération\n")
template.append("| Opération | IHH | ISG combiné | Vulnérabilité combinée |\n")
template.append("|-----------|-----|------------|------------------------|\n")
for op_id, combined in vulnerabilities["ihh_isg_combined"].items():
operation_name = data["operations"][op_id]["label"]
ihh_value = combined["ihh_value"]
isg_combined = combined["isg_combined"]
vulnerability = combined["vulnerability"]
template.append(f"| {operation_name} | {ihh_value} ({combined['ihh_color']}) | {isg_combined:.2f} ({combined['isg_color']}) | {vulnerability} |\n")
# 5.2 ICS+IVC par minerai
template.append("\n### Vulnérabilités ICS+IVC par minerai\n")
template.append("| Minerai | ICS moyen | IVC | Vulnérabilité combinée |\n")
template.append("|---------|-----------|-----|------------------------|\n")
for mineral_id, combined in vulnerabilities["ics_ivc_combined"].items():
mineral_name = data["minerals"][mineral_id]["label"]
ics_average = combined["ics_average"]
ivc_value = combined["ivc_value"]
vulnerability = combined["vulnerability"]
template.append(f"| {mineral_name} | {ics_average:.2f} ({combined['ics_color']}) | {ivc_value} ({combined['ivc_color']}) | {vulnerability} |\n")
# 6. Classification des chaînes
template.append("\n## Classification des chaînes par niveau de risque\n")
# 6.1 Chaînes critiques
template.append("### Chaînes à risque critique\n")
if vulnerabilities["chains_classification"]["critical"]:
for chain in vulnerabilities["chains_classification"]["critical"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque critique identifiée.\n\n")
# 6.2 Chaînes majeures
template.append("### Chaînes à risque majeur\n")
if vulnerabilities["chains_classification"]["major"]:
for chain in vulnerabilities["chains_classification"]["major"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque majeur identifiée.\n\n")
# 6.3 Chaînes moyennes
template.append("### Chaînes à risque moyen\n")
if vulnerabilities["chains_classification"]["medium"]:
for chain in vulnerabilities["chains_classification"]["medium"]:
product_name = data["products"][chain["product"]]["label"]
component_name = data["components"][chain["component"]]["label"]
mineral_name = data["minerals"][chain["mineral"]]["label"]
template.append(f"* {product_name}{component_name}{mineral_name}\n")
template.append(" * Vulnérabilités: " + ", ".join(chain["vulnerabilities"]) + "\n\n")
else:
template.append("Aucune chaîne à risque moyen identifiée.\n\n")
return "\n".join(template)
def write_template(template, config):
"""Écrit le template généré dans le fichier spécifié."""
template_path = config['template_path']
with open(template_path, 'w', encoding='utf-8') as f:
f.write(template)
print(f"Template généré avec succès: {template_path}")
def main():
"""Fonction principale du script."""
# Charger la configuration
config = load_config(CONFIG_PATH)
# Analyser le graphe
graph = parse_graph(config)
# Analyser la stabilité géopolitique
geo_countries = analyze_geopolitical_stability(graph)
# Extraire toutes les données
data = extract_all_data(graph, geo_countries)
# Calculer les vulnérabilités combinées
vulnerabilities = calculate_combined_vulnerabilities(data, geo_countries, config)
# Générer le template structuré
template = generate_structured_template(data, vulnerabilities, geo_countries, config, graph)
# Écrire le template dans le fichier
write_template(template, config)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,292 @@
import requests
import time
import argparse
import json
import sys
from pathlib import Path
from typing import List, Dict, Optional
from enum import Enum
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
DEFAULT_INPUT_PATH = "rapport_final.md"
DEFAULT_OUTPUT_PATH = "rapport_analyse_genere.md"
MAX_CONTEXT_LENGTH = 3000 # Taille maximale du contexte en caractères
# Stratégies de gestion du contexte
class ContextStrategy(str, Enum):
NONE = "none" # Pas de contexte
TRUNCATE = "truncate" # Tronquer le contexte
SUMMARIZE = "summarize" # Résumer le contexte
LATEST = "latest" # Uniquement la dernière section
# Les 5 étapes de génération du rapport
steps = [
{
"title": "Analyse détaillée",
"prompt": "Vous rédigez la section 'Analyse détaillée' d'un rapport structuré en cinq parties :\n"
"1. Synthèse\n2. Analyse détaillée\n3. Interdépendances\n4. Points de vigilance\n5. Conclusion\n\n"
"Votre objectif est d'analyser les vulnérabilités critiques, élevées et modérées présentes dans le rapport ingéré.\n"
"Structurez votre réponse par niveau de criticité, et intégrez les indices IHH, ICS, ISG, IVC de façon fluide dans le texte.\n"
"N'incluez pas de synthèse ni de conclusion."
},
{
"title": "Interdépendances",
"prompt": "Vous rédigez la section 'Interdépendances'. Identifiez les chaînes de vulnérabilités croisées,\n"
"les effets en cascade, et les convergences vers des produits numériques communs.\n"
"N'introduisez pas de recommandations ou d'évaluations globales."
},
{
"title": "Points de vigilance",
"prompt": "Vous rédigez la section 'Points de vigilance'. Sur la base des analyses précédentes,\n"
"proposez une liste d'indicateurs à surveiller, avec explication courte pour chacun."
},
{
"title": "Synthèse",
"prompt": "Vous rédigez la 'Synthèse'. Résumez les vulnérabilités majeures, les effets en cascade et les indicateurs critiques.\n"
"Ton narratif, percutant, orienté décideur."
},
{
"title": "Conclusion",
"prompt": "Vous rédigez la 'Conclusion'. Proposez des scénarios d'impact avec : déclencheur, horizon, gravité, conséquences."
}
]
def check_api_availability() -> bool:
"""Vérifie si l'API PrivateGPT est disponible"""
try:
response = requests.get(f"{PGPT_URL}/health")
if response.status_code == 200:
print("API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
def ingest_document(file_path: Path) -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
files = {"file": (file_path.name, f, "text/markdown")}
response = requests.post(f"{API_URL}/ingest/file", files=files)
response.raise_for_status()
print(f"Document '{file_path}' ingéré avec succès")
return True
except FileNotFoundError:
print(f"❌ Fichier '{file_path}' introuvable")
return False
except requests.RequestException as e:
print(f"❌ Erreur lors de l'ingestion du document: {e}")
return False
def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_length: int) -> str:
"""Génère le contexte selon la stratégie choisie"""
if not sections or strategy == ContextStrategy.NONE:
return ""
# Stratégie: uniquement la dernière section
if strategy == ContextStrategy.LATEST:
latest = sections[-1]
context = f"# {latest['title']}\n{latest['output']}"
if len(context) > max_length:
context = context[:max_length] + "..."
print(f"Contexte basé sur la dernière section: {latest['title']} ({len(context)} caractères)")
return context
# Stratégie: tronquer toutes les sections
if strategy == ContextStrategy.TRUNCATE:
full_context = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
if len(full_context) > max_length:
truncated = full_context[:max_length] + "..."
print(f"Contexte tronqué à {max_length} caractères")
return truncated
return full_context
# Stratégie: résumer intelligemment
if strategy == ContextStrategy.SUMMARIZE:
try:
# Construire un prompt pour demander un résumé des sections précédentes
sections_text = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
# Limiter la taille du texte à résumer si nécessaire
if len(sections_text) > max_length * 2:
sections_text = sections_text[:max_length * 2] + "..."
summary_prompt = {
"messages": [
{"role": "system", "content": "Vous êtes un assistant spécialisé dans le résumé concis. Créez un résumé très court mais informatif qui conserve les points clés."},
{"role": "user", "content": f"Résumez les sections suivantes en {max_length // 10} mots maximum, en conservant les idées principales et les points essentiels:\n\n{sections_text}"}
],
"temperature": 0.1,
"stream": False
}
# Envoyer la requête pour obtenir un résumé
response = requests.post(
f"{API_URL}/chat/completions",
json=summary_prompt,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire et retourner le résumé
result = response.json()
summary = result["choices"][0]["message"]["content"]
print(f"Résumé de contexte généré ({len(summary)} caractères)")
return summary
except Exception as e:
print(f"Impossible de générer un résumé du contexte: {e}")
# En cas d'échec, revenir à la stratégie de troncature
return get_context(sections, ContextStrategy.TRUNCATE, max_length)
def generate_text(prompt: str, previous_context: str = "", use_context: bool = True, retry_on_error: bool = True) -> Optional[str]:
"""Génère du texte avec l'API PrivateGPT"""
try:
# Préparer le prompt avec le contexte précédent si disponible et demandé
full_prompt = prompt
if previous_context and use_context:
full_prompt += "\n\nContexte précédent (informations des sections déjà générées) :\n" + previous_context
# Définir les paramètres de la requête
payload = {
"messages": [
{"role": "system", "content": """
Vous êtes un assistant stratégique expert chargé de produire des rapports destinés à des décideurs de haut niveau (COMEX, directions risques, stratèges politiques ou industriels).
Objectif : Analyser les vulnérabilités systémiques dans une chaîne de valeur numérique décrite dans le fichier rapport_final.md en vous appuyant exclusivement sur ce rapport structuré fourni, et produire un rapport narratif percutant orienté décision.
"""},
{"role": "user", "content": full_prompt}
],
"use_context": True,
"temperature": 0.2, # Température réduite pour plus de cohérence
"stream": False
}
# Envoyer la requête
response = requests.post(
f"{API_URL}/chat/completions",
json=payload,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire la réponse générée
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Détails: {e.response.text}")
return None
def main(input_path: str, output_path: str, context_strategy: ContextStrategy = ContextStrategy.SUMMARIZE, context_length: int = MAX_CONTEXT_LENGTH):
"""Fonction principale qui exécute le processus complet"""
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Convertir les chemins en objets Path
input_file = Path(input_path)
output_file = Path(output_path)
# Ingérer le document
if not ingest_document(input_file):
sys.exit(1)
# Récupérer la valeur du délai depuis args
delay = args.delay if 'args' in globals() else 5
# Attendre que l'ingestion soit complètement traitée
print(f"Attente du traitement de l'ingestion pendant {delay} secondes...")
time.sleep(delay)
print(f"Stratégie de contexte: {context_strategy.value}, taille max: {context_length} caractères")
# Générer chaque section du rapport
step_outputs = []
for i, step in enumerate(steps):
print(f"\nÉtape {i+1}/{len(steps)}: {step['title']}")
# Préparer le contexte selon la stratégie choisie
previous = get_context(step_outputs, context_strategy, context_length) if step_outputs else ""
# Générer le texte pour cette étape
max_retries = 3 if context_strategy != ContextStrategy.NONE else 1
retry_count = 0
output = None
while retry_count < max_retries and output is None:
try:
# Si ce n'est pas la première tentative et que nous avons une stratégie de contexte
if retry_count > 0 and context_strategy != ContextStrategy.NONE:
print(f"Tentative {retry_count+1}/{max_retries} avec une stratégie de contexte réduite")
# Réduire la taille du contexte à chaque tentative
reduced_context_length = context_length // (retry_count + 1)
fallback_strategy = ContextStrategy.LATEST if retry_count == 1 else ContextStrategy.NONE
previous = get_context(step_outputs, fallback_strategy, reduced_context_length)
# Générer le texte
output = generate_text(step["prompt"], previous, context_strategy != ContextStrategy.NONE)
except Exception as e:
print(f"⚠️ Erreur lors de la génération: {e}")
retry_count += 1
time.sleep(1) # Pause avant de réessayer
if output is None:
retry_count += 1
if output:
step_outputs.append({"title": step["title"], "output": output})
print(f"Section '{step['title']}' générée ({len(output)} caractères)")
else:
print(f"❌ Échec de la génération pour '{step['title']}'")
# Vérifier si nous avons généré toutes les sections
if len(step_outputs) != len(steps):
print(f"⚠️ Attention: Seules {len(step_outputs)}/{len(steps)} sections ont été générées")
# Écrire le rapport final dans l'ordre souhaité (Synthèse en premier)
# Réordonner les sections pour que la synthèse soit en premier
ordered_outputs = sorted(step_outputs, key=lambda x: 0 if x["title"] == "Synthèse" else
1 if x["title"] == "Analyse détaillée" else
2 if x["title"] == "Interdépendances" else
3 if x["title"] == "Points de vigilance" else 4)
# Assembler le rapport
report_text = "\n\n".join(f"# {s['title']}\n\n{s['output']}" for s in ordered_outputs)
# Écrire le rapport dans un fichier
try:
output_file.write_text(report_text, encoding="utf-8")
print(f"\nRapport final généré dans '{output_file}'")
except IOError as e:
print(f"❌ Erreur lors de l'écriture du fichier de sortie: {e}")
if __name__ == "__main__":
# Définir les arguments en ligne de commande
parser = argparse.ArgumentParser(description="Générer un rapport d'analyse structuré avec PrivateGPT")
parser.add_argument("-i", "--input", type=str, default=DEFAULT_INPUT_PATH,
help=f"Chemin du fichier d'entrée (défaut: {DEFAULT_INPUT_PATH})")
parser.add_argument("-o", "--output", type=str, default=DEFAULT_OUTPUT_PATH,
help=f"Chemin du fichier de sortie (défaut: {DEFAULT_OUTPUT_PATH})")
parser.add_argument("--context", type=str, choices=[s.value for s in ContextStrategy], default=ContextStrategy.SUMMARIZE.value,
help=f"Stratégie de gestion du contexte (défaut: {ContextStrategy.SUMMARIZE.value})")
parser.add_argument("--context-length", type=int, default=MAX_CONTEXT_LENGTH,
help=f"Taille maximale du contexte en caractères (défaut: {MAX_CONTEXT_LENGTH})")
parser.add_argument("--delay", type=int, default=5,
help="Délai d'attente en secondes après l'ingestion (défaut: 5)")
args = parser.parse_args()
# Exécuter le programme principal avec les options configurées
context_strategy = ContextStrategy(args.context)
main(args.input, args.output, context_strategy, args.context_length)

574
scripts/generer_analyse.py Normal file
View File

@ -0,0 +1,574 @@
import requests
import time
import argparse
import json
import sys
import uuid
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from enum import Enum
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
DEFAULT_INPUT_PATH = "rapport_final.md"
DEFAULT_OUTPUT_PATH = "rapport_analyse_genere.md"
MAX_CONTEXT_LENGTH = 3000 # Taille maximale du contexte en caractères
TEMP_DIR = Path("temp_sections") # Répertoire pour les fichiers intermédiaires
# Stratégies de gestion du contexte
class ContextStrategy(str, Enum):
NONE = "none" # Pas de contexte
TRUNCATE = "truncate" # Tronquer le contexte
SUMMARIZE = "summarize" # Résumer le contexte
LATEST = "latest" # Uniquement la dernière section
FILE = "file" # Utiliser des fichiers ingérés (meilleure option)
# Les 5 étapes de génération du rapport
steps = [
{
"title": "Analyse Principale",
"prompt": """Vous êtes un assistant stratégique expert chargé de produire des rapports destinés à des décideurs de haut niveau (COMEX, directions risques, stratèges politiques ou industriels).
Objectif : Analyser les vulnérabilités systémiques dans une chaîne de valeur numérique décrite dans le fichier rapport_final.md en vous appuyant exclusivement sur ce rapport structuré fourni.
Votre tâche est de développer les sections suivantes en respectant toutes les contraintes décrites :
1. Analyse détaillée (vulnérabilités critiques modérées)
2. Interdépendances (effets en cascade)
3. Points de vigilance (indicateurs à surveiller)
Interprétation des indices :
IHH (Herfindahl-Hirschmann) :
- Échelle : 0-100
- Seuils : <25 = Vert (faible), 25-50 = Orange (modéré), >50 = Rouge (élevée)
ICS (Substituabilité) :
- Échelle : 0-1
- Interprétation : 0 = facilement substituable, 1 = impossible
ISG (Stabilité Géopolitique) :
- Échelle : 0-100
- Seuils : <40 = Vert, 40-60 = Orange, >60 = Rouge
IVC (Concurrence) :
- Échelle : 0-150
- Seuils : <10 = Faible, 10-50 = Moyenne, >50 = Forte
Contraintes strictes :
1. N'utiliser aucune connaissance externe. Seul le contenu fourni compte.
2. Ne pas recommander de diversification minière, industrielle ou politique.
3. Concentrez-vous sur les impacts sur les produits numériques.
Votre mission :
- Interpréter les indices (IHH, ICS, ISG, IVC)
- Croiser et hiérarchiser les vulnérabilités
- Distinguer les horizons temporels
- Identifier les chaînes de vulnérabilités interdépendantes
- Identifier des indicateurs d'alerte pertinents
Style attendu :
- Narratif, fluide, impactant, structuré
- Paragraphes complets + encadrés synthétiques avec données chiffrées
Commencez par lire attentivement les indices et les fiches dans la section "Éléments factuels", puis produisez vos sections d'analyse."""
},
{
"title": "Synthèse et Conclusion",
"prompt": """Vous êtes un assistant stratégique expert chargé de produire des rapports destinés à des décideurs de haut niveau (COMEX, directions risques, stratèges politiques ou industriels).
Sur la base de l'analyse précédente, rédigez les deux sections finales du rapport :
1. Synthèse (narration + hiérarchie vulnérabilités)
2. Conclusion (scénarios d'impact)
Pour la Synthèse :
- Résumez de façon percutante les vulnérabilités identifiées dans la chaîne de valeur numérique
- Hiérarchisez les risques en fonction du croisement des indices (IHH, ISG, ICS, IVC)
- Orientez cette synthèse vers la prise de décision pour différents acteurs économiques
- Utilisez un ton direct, factuel et orienté décision
Pour la Conclusion :
- Proposez 3-4 scénarios d'impact précis avec déclencheur, horizon temporel, gravité et conséquences
- Basez chaque scénario sur les données factuelles de l'analyse
- Montrez comment les différents indices interagissent dans chaque scénario
- Concentrez-vous particulièrement sur les applications numériques identifiées
Interprétation des indices (rappel) :
IHH (Herfindahl-Hirschmann) : <25 = Vert, 25-50 = Orange, >50 = Rouge
ICS (Substituabilité) : 0 = facilement substituable, 1 = impossible
ISG (Stabilité Géopolitique) : <40 = Vert, 40-60 = Orange, >60 = Rouge
IVC (Concurrence) : <10 = Faible, 10-50 = Moyenne, >50 = Forte
Style attendu :
- Narratif, percutant, orienté décideur
- Encadrés synthétiques avec données chiffrées clés
- Focus sur les implications concrètes pour les entreprises"""
}
]
def check_api_availability() -> bool:
"""Vérifie si l'API PrivateGPT est disponible"""
try:
response = requests.get(f"{PGPT_URL}/health")
if response.status_code == 200:
print("✅ API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
def ingest_document(file_path: Path, session_id: str = "") -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
# Si un session_id est fourni, l'ajouter au nom du fichier pour le tracking
if session_id:
file_name = f"input_{session_id}_{file_path.name}"
else:
file_name = file_path.name
files = {"file": (file_name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement ce fichier d'entrée
metadata = {
"type": "input_file",
"session_id": session_id,
"document_type": "rapport_analyse_input"
}
response = requests.post(
f"{API_URL}/ingest/file",
files=files,
data={"metadata": json.dumps(metadata)} if "metadata" in requests.get(f"{API_URL}/ingest/file").text else None
)
response.raise_for_status()
print(f"✅ Document '{file_path}' ingéré avec succès sous le nom '{file_name}'")
return True
except FileNotFoundError:
print(f"❌ Fichier '{file_path}' introuvable")
return False
except requests.RequestException as e:
print(f"❌ Erreur lors de l'ingestion du document: {e}")
return False
def setup_temp_directory() -> None:
"""Crée le répertoire temporaire pour les fichiers intermédiaires"""
if not TEMP_DIR.exists():
TEMP_DIR.mkdir(parents=True)
print(f"📁 Répertoire temporaire '{TEMP_DIR}' créé")
def save_section_to_file(section: Dict[str, str], index: int, session_uuid: str) -> Path:
"""Sauvegarde une section dans un fichier temporaire et retourne le chemin"""
setup_temp_directory()
section_file = TEMP_DIR / f"temp_section_{session_uuid}_{index+1}_{section['title'].lower().replace(' ', '_')}.md"
# Contenu du fichier avec métadonnées et commentaire explicite
content = (
f"# SECTION TEMPORAIRE GÉNÉRÉE - {section['title']}\n\n"
f"Note: Ce document est une section temporaire du rapport d'analyse en cours de génération.\n"
f"UUID de session: {session_uuid}\n\n"
f"{section['output']}"
)
# Écrire dans le fichier
section_file.write_text(content, encoding="utf-8")
return section_file
def ingest_section_files(section_files: List[Path]) -> List[str]:
"""Ingère les fichiers de section et retourne leurs noms de fichiers"""
ingested_file_names = []
for file_path in section_files:
try:
with open(file_path, "rb") as f:
files = {"file": (file_path.name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement nos fichiers temporaires
metadata = {
"type": "temp_section",
"document_type": "rapport_analyse_section"
}
response = requests.post(
f"{API_URL}/ingest/file",
files=files,
data={"metadata": json.dumps(metadata)} if "metadata" in requests.get(f"{API_URL}/ingest/file").text else None
)
response.raise_for_status()
# Ajouter le nom du fichier à la liste pour pouvoir le retrouver et le supprimer plus tard
ingested_file_names.append(file_path.name)
print(f"✅ Section '{file_path.name}' ingérée")
except Exception as e:
print(f"⚠️ Erreur lors de l'ingestion de '{file_path.name}': {e}")
return ingested_file_names
def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_length: int) -> str:
"""Génère le contexte selon la stratégie choisie"""
if not sections or strategy == ContextStrategy.NONE:
return ""
# Stratégie: utiliser des fichiers ingérés
if strategy == ContextStrategy.FILE:
# Cette stratégie n'utilise pas de contexte dans le prompt
# Elle repose sur les fichiers ingérés et le paramètre use_context=True
section_names = [s["title"] for s in sections]
context_note = f"NOTE IMPORTANTE: Les sections précédentes ({', '.join(section_names)}) " + \
f"ont été ingérées sous forme de fichiers temporaires avec l'identifiant unique '{session_uuid}'. " + \
f"Utilisez UNIQUEMENT le document '{input_file.name}' et ces sections temporaires pour votre analyse. " + \
f"IGNOREZ tous les autres documents qui pourraient être présents dans la base de connaissances. " + \
f"Assurez la cohérence avec les sections déjà générées pour maintenir la continuité du rapport."
print(f"📄 Utilisation de {len(sections)} sections ingérées comme contexte")
return context_note
# Stratégie: uniquement la dernière section
if strategy == ContextStrategy.LATEST:
latest = sections[-1]
context = f"# {latest['title']}\n{latest['output']}"
if len(context) > max_length:
context = context[:max_length] + "..."
print(f"📄 Contexte basé sur la dernière section: {latest['title']} ({len(context)} caractères)")
return context
# Stratégie: tronquer toutes les sections
if strategy == ContextStrategy.TRUNCATE:
full_context = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
if len(full_context) > max_length:
truncated = full_context[:max_length] + "..."
print(f"✂️ Contexte tronqué à {max_length} caractères")
return truncated
return full_context
# Stratégie: résumer intelligemment
if strategy == ContextStrategy.SUMMARIZE:
try:
# Construire un prompt pour demander un résumé des sections précédentes
sections_text = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
# Limiter la taille du texte à résumer si nécessaire
if len(sections_text) > max_length * 2:
sections_text = sections_text[:max_length * 2] + "..."
summary_prompt = {
"messages": [
{"role": "system", "content": "Vous êtes un assistant spécialisé dans le résumé concis. Créez un résumé très court mais informatif qui conserve les points clés."},
{"role": "user", "content": f"Résumez les sections suivantes en {max_length // 10} mots maximum, en conservant les idées principales et les points essentiels:\n\n{sections_text}"}
],
"temperature": 0.1,
"stream": False
}
# Envoyer la requête pour obtenir un résumé
response = requests.post(
f"{API_URL}/chat/completions",
json=summary_prompt,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire et retourner le résumé
result = response.json()
summary = result["choices"][0]["message"]["content"]
print(f"✅ Résumé de contexte généré ({len(summary)} caractères)")
return summary
except Exception as e:
print(f"⚠️ Impossible de générer un résumé du contexte: {e}")
# En cas d'échec, revenir à la stratégie de troncature
return get_context(sections, ContextStrategy.TRUNCATE, max_length)
def generate_text(prompt: str, previous_context: str = "", use_context: bool = True, retry_on_error: bool = True) -> Optional[str]:
"""Génère du texte avec l'API PrivateGPT"""
try:
# Préparer le prompt avec le contexte précédent si disponible et demandé
full_prompt = prompt
if previous_context and use_context:
full_prompt += "\n\nContexte précédent (informations des sections déjà générées) :\n" + previous_context
# Définir les paramètres de la requête
system_message = "Vous êtes un assistant stratégique expert chargé de produire des rapports destinés à des décideurs de haut niveau (COMEX, directions risques, stratèges politiques ou industriels). " + \
f"Objectif : Analyser les vulnérabilités systémiques dans une chaîne de valeur numérique selon les données du fichier {input_file.name}. " + \
"Interprétation des indices : " + \
"• IHH (Herfindahl-Hirschmann) : Échelle 0-100, Seuils : <25 = Vert (faible), 25-50 = Orange (modéré), >50 = Rouge (élevée) " + \
"• ISG (Stabilité Géopolitique) : Échelle 0-100, Seuils : <40 = Vert (stable), 40-60 = Orange, >60 = Rouge (instable) " + \
"• ICS (Substituabilité) : Échelle 0-1, 0 = facilement substituable, 1 = impossible " + \
"• IVC (Concurrence) : Échelle 0-150, Seuils : <10 = Faible, 10-50 = Moyenne, >50 = Forte " + \
"Style attendu : Narratif, fluide, impactant, structuré avec paragraphes complets et données chiffrées. " + \
f"Concentrez-vous UNIQUEMENT sur le document principal '{input_file.name}' et sur les sections temporaires préfixées par 'temp_section_{session_uuid}_'. " + \
"IGNOREZ absolument tous les autres documents qui pourraient être présents dans la base de connaissances. " + \
"Les sections précédentes ont été générées et ingérées comme documents temporaires - " + \
"assurez la cohérence avec leur contenu pour maintenir la continuité du rapport."
# Définir les paramètres de la requête
payload = {
"messages": [
{"role": "system", "content": system_message},
{"role": "user", "content": full_prompt}
],
"use_context": True, # Active la recherche RAG dans les documents ingérés
"temperature": 0.2, # Température réduite pour plus de cohérence
"stream": False
}
# Tenter d'ajouter un filtre de contexte (fonctionnalité expérimentale qui peut ne pas être supportée)
try:
# Vérifier si le filtre de contexte est supporté sans faire de requête supplémentaire
filter_metadata = {
"document_name": [input_file.name] + [f.name for f in section_files]
}
payload["filter_metadata"] = filter_metadata
except Exception as e:
print(f" Remarque: Impossible d'appliquer le filtre de contexte: {e}")
# Envoyer la requête
response = requests.post(
f"{API_URL}/chat/completions",
json=payload,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire la réponse générée
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Détails: {e.response.text}")
return None
def cleanup_temp_files(temp_file_names: List[str] = None, remove_directory: bool = False, session_id: str = "") -> None:
"""Nettoie les fichiers temporaires et les documents ingérés"""
try:
# Supprimer les fichiers du répertoire temporaire
if TEMP_DIR.exists():
for temp_file in TEMP_DIR.glob("*.md"):
temp_file.unlink()
print(f"🗑️ Fichier temporaire supprimé : {temp_file.name}")
# Supprimer le répertoire s'il est vide et si demandé
if remove_directory and not any(TEMP_DIR.iterdir()):
TEMP_DIR.rmdir()
print(f"🗑️ Répertoire temporaire '{TEMP_DIR}' supprimé")
# Supprimer les documents ingérés via l'API de liste et suppression
try:
# Lister tous les documents ingérés
list_response = requests.get(f"{API_URL}/ingest/list")
if list_response.status_code == 200:
documents_data = list_response.json()
# Format de réponse OpenAI
if "data" in documents_data:
documents = documents_data.get("data", [])
# Format alternatif
else:
documents = documents_data.get("documents", [])
deleted_count = 0
# Parcourir les documents et supprimer ceux qui correspondent à nos fichiers temporaires
for doc in documents:
doc_metadata = doc.get("doc_metadata", {})
file_name = doc_metadata.get("file_name", "") or doc_metadata.get("filename", "")
# Vérifier si c'est un de nos fichiers temporaires ou le fichier d'entrée
is_our_file = False
if temp_file_names and file_name in temp_file_names:
is_our_file = True
elif f"temp_section_{session_uuid}_" in file_name:
is_our_file = True
elif session_id and f"input_{session_id}_" in file_name:
is_our_file = True
if is_our_file:
doc_id = doc.get("doc_id") or doc.get("id")
if doc_id:
delete_response = requests.delete(f"{API_URL}/ingest/{doc_id}")
if delete_response.status_code == 200:
deleted_count += 1
if deleted_count > 0:
print(f"🗑️ {deleted_count} documents supprimés de PrivateGPT")
except Exception as e:
print(f"⚠️ Erreur lors de la suppression des documents ingérés: {e}")
except Exception as e:
print(f"⚠️ Erreur lors du nettoyage des fichiers temporaires: {e}")
def clean_ai_thoughts(text: str) -> str:
"""Nettoie le texte généré en supprimant les 'pensées' de l'IA entre balises <think></think>"""
import re
# Supprimer tout le contenu entre les balises <think> et </think>, y compris les balises
text = re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL)
# Supprimer les lignes vides multiples
text = re.sub(r'\n{3,}', '\n\n', text)
return text
def main(input_path: str, output_path: str, context_strategy: ContextStrategy = ContextStrategy.FILE, context_length: int = MAX_CONTEXT_LENGTH):
"""Fonction principale qui exécute le processus complet"""
global input_file, section_files, session_uuid # Variables globales pour le filtre de contexte et l'UUID
# Générer un UUID unique pour cette session
session_uuid = str(uuid.uuid4())[:8] # Utiliser les 8 premiers caractères pour plus de concision
print(f"🔑 UUID de session généré: {session_uuid}")
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Convertir les chemins en objets Path (accessibles globalement)
input_file = Path(input_path)
output_file = Path(output_path)
# Ingérer le document principal avec l'UUID de session
if not ingest_document(input_file, session_uuid):
sys.exit(1)
# Récupérer la valeur du délai depuis args
delay = args.delay if 'args' in globals() else 5
# Attendre que l'ingestion soit complètement traitée
print(f"⏳ Attente du traitement de l'ingestion pendant {delay} secondes...")
time.sleep(delay)
print(f"🔧 Stratégie de contexte initiale: {context_strategy.value}, taille max: {context_length} caractères")
# Préparer le répertoire pour les fichiers temporaires
setup_temp_directory()
# Générer chaque section du rapport
step_outputs = []
section_files = [] # Chemins des fichiers temporaires
ingested_file_names = [] # Noms des fichiers ingérés
# Liste des stratégies de fallback dans l'ordre
fallback_strategies = [
ContextStrategy.FILE,
ContextStrategy.TRUNCATE,
ContextStrategy.SUMMARIZE,
ContextStrategy.LATEST,
ContextStrategy.NONE
]
for i, step in enumerate(steps):
print(f"\n🚧 Étape {i+1}/{len(steps)}: {step['title']}")
# Si nous avons des sections précédentes et utilisons la stratégie FILE
if step_outputs and context_strategy == ContextStrategy.FILE:
# Sauvegarder et ingérer toutes les sections précédentes qui ne l'ont pas encore été
for j, section in enumerate(step_outputs):
if j >= len(section_files): # Cette section n'a pas encore été sauvegardée
section_file = save_section_to_file(section, j, session_uuid)
section_files.append(section_file)
# Ingérer le fichier si nous utilisons la stratégie FILE
new_ids = ingest_section_files([section_file])
ingested_section_ids.extend(new_ids)
# Attendre que l'ingestion soit traitée
print(f"⏳ Attente du traitement de l'ingestion des sections précédentes...")
time.sleep(2)
# Essayer chaque stratégie jusqu'à ce qu'une réussisse
output = None
current_strategy_index = fallback_strategies.index(context_strategy)
while output is None and current_strategy_index < len(fallback_strategies):
current_strategy = fallback_strategies[current_strategy_index]
print(f"🔄 Tentative avec la stratégie: {current_strategy.value}")
# Préparer le contexte selon la stratégie actuelle
previous = get_context(step_outputs, current_strategy, context_length) if step_outputs else ""
# Générer le texte
try:
output = generate_text(step["prompt"], previous, current_strategy != ContextStrategy.NONE)
if output is None:
# Passer à la stratégie suivante
current_strategy_index += 1
print(f"⚠️ Échec avec la stratégie {current_strategy.value}, passage à la suivante")
time.sleep(1) # Pause avant nouvelle tentative
except Exception as e:
print(f"⚠️ Erreur lors de la génération avec {current_strategy.value}: {e}")
current_strategy_index += 1
time.sleep(1) # Pause avant nouvelle tentative
if output:
step_outputs.append({"title": step["title"], "output": output})
print(f"✅ Section '{step['title']}' générée ({len(output)} caractères)")
# Si nous sommes en mode FILE, sauvegarder immédiatement cette section
if context_strategy == ContextStrategy.FILE:
section_file = save_section_to_file(step_outputs[-1], len(step_outputs)-1, session_uuid)
section_files.append(section_file)
# Ingérer le fichier
new_file_names = ingest_section_files([section_file])
ingested_file_names.extend(new_file_names)
# Petite pause pour permettre l'indexation
time.sleep(1)
else:
print(f"❌ Échec de la génération pour '{step['title']}' après toutes les stratégies")
# Vérifier si nous avons généré toutes les sections
if len(step_outputs) != len(steps):
print(f"⚠️ Attention: Seules {len(step_outputs)}/{len(steps)} sections ont été générées")
# Écrire le rapport final dans l'ordre souhaité (Synthèse en premier)
# Réordonner les sections pour que la synthèse soit en premier
ordered_outputs = sorted(step_outputs, key=lambda x: 0 if x["title"] == "Synthèse" else
1 if x["title"] == "Analyse détaillée" else
2 if x["title"] == "Interdépendances" else
3 if x["title"] == "Points de vigilance" else 4)
# Assembler le rapport
report_sections = []
for s in ordered_outputs:
# Nettoyer les "pensées" de l'IA dans chaque section
cleaned_output = clean_ai_thoughts(s['output'])
report_sections.append(f"## {s['title']}\n\n{cleaned_output}")
report_text = "# Analyse des vulnérabilités de la chaine de fabrication du numériquee\n\n".join(report_sections)
# Écrire le rapport dans un fichier
try:
output_file.write_text(report_text, encoding="utf-8")
print(f"\n📄 Rapport final généré dans '{output_file}'")
except IOError as e:
print(f"❌ Erreur lors de l'écriture du fichier de sortie: {e}")
# Nettoyer les fichiers temporaires si demandé
if args.clean_temp:
print("\n🧹 Nettoyage des fichiers temporaires et documents ingérés...")
cleanup_temp_files(ingested_file_names, args.remove_temp_dir, session_uuid)
if __name__ == "__main__":
# Définir les arguments en ligne de commande
parser = argparse.ArgumentParser(description="Générer un rapport d'analyse structuré avec PrivateGPT")
parser.add_argument("-i", "--input", type=str, default=DEFAULT_INPUT_PATH,
help=f"Chemin du fichier d'entrée (défaut: {DEFAULT_INPUT_PATH})")
parser.add_argument("-o", "--output", type=str, default=DEFAULT_OUTPUT_PATH,
help=f"Chemin du fichier de sortie (défaut: {DEFAULT_OUTPUT_PATH})")
parser.add_argument("--context", type=str, choices=[s.value for s in ContextStrategy], default=ContextStrategy.FILE.value,
help=f"Stratégie de gestion du contexte (défaut: {ContextStrategy.FILE.value})")
parser.add_argument("--context-length", type=int, default=MAX_CONTEXT_LENGTH,
help=f"Taille maximale du contexte en caractères (défaut: {MAX_CONTEXT_LENGTH})")
parser.add_argument("--delay", type=int, default=5,
help="Délai d'attente en secondes après l'ingestion (défaut: 5)")
parser.add_argument("--clean-temp", action="store_true", default=True,
help="Nettoyer les fichiers temporaires et documents ingérés par le script (défaut: True)")
parser.add_argument("--keep-temp", action="store_true",
help="Conserver les fichiers temporaires (remplace --clean-temp)")
parser.add_argument("--remove-temp-dir", action="store_true",
help="Supprimer le répertoire temporaire après exécution (défaut: False)")
args = parser.parse_args()
# Gérer les options contradictoires
if args.keep_temp:
args.clean_temp = False
# Exécuter le programme principal avec les options configurées
context_strategy = ContextStrategy(args.context)
main(args.input, args.output, context_strategy, args.context_length)

224
scripts/nettoyer_pgpt.py Normal file
View File

@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""
Script de nettoyage pour PrivateGPT
Ce script permet de lister et supprimer les documents ingérés dans PrivateGPT.
Options:
- Lister tous les documents
- Supprimer des documents par préfixe (ex: "temp_section_")
- Supprimer des documents par motif
- Supprimer tous les documents
Usage:
python nettoyer_pgpt.py --list
python nettoyer_pgpt.py --delete-prefix "temp_section_"
python nettoyer_pgpt.py --delete-pattern "rapport_.*\.md"
python nettoyer_pgpt.py --delete-all
"""
import argparse
import json
import re
import requests
import sys
import time
import uuid
from typing import List, Dict, Any, Optional
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
def check_api_availability() -> bool:
"""Vérifie si l'API PrivateGPT est disponible"""
try:
response = requests.get(f"{PGPT_URL}/health")
if response.status_code == 200:
print("✅ API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
def list_documents() -> List[Dict[str, Any]]:
"""Liste tous les documents ingérés et renvoie la liste des métadonnées"""
try:
# Récupérer la liste des documents
response = requests.get(f"{API_URL}/ingest/list")
response.raise_for_status()
data = response.json()
# Format de réponse OpenAI
if "data" in data:
documents = data.get("data", [])
# Format alternatif
else:
documents = data.get("documents", [])
# Construire une liste normalisée des documents
normalized_docs = []
for doc in documents:
doc_id = doc.get("doc_id") or doc.get("id")
metadata = doc.get("doc_metadata", {})
filename = metadata.get("file_name") or metadata.get("filename", "Inconnu")
normalized_docs.append({
"id": doc_id,
"filename": filename,
"metadata": metadata
})
return normalized_docs
except Exception as e:
print(f"❌ Erreur lors de la récupération des documents: {e}")
return []
def print_documents(documents: List[Dict[str, Any]]) -> None:
"""Affiche la liste des documents de façon lisible"""
if not documents:
print("📋 Aucun document trouvé dans PrivateGPT")
return
print(f"📋 {len(documents)} documents trouvés dans PrivateGPT:")
# Regrouper par nom de fichier pour un affichage plus compact
files_grouped = {}
for doc in documents:
filename = doc["filename"]
if filename not in files_grouped:
files_grouped[filename] = []
files_grouped[filename].append(doc["id"])
# Afficher les résultats groupés
for i, (filename, ids) in enumerate(files_grouped.items(), 1):
print(f"{i}. {filename} ({len(ids)} chunks)")
if args.verbose:
for j, doc_id in enumerate(ids, 1):
print(f" {j}. ID: {doc_id}")
def delete_document(doc_id: str) -> bool:
"""Supprime un document par son ID"""
try:
response = requests.delete(f"{API_URL}/ingest/{doc_id}")
if response.status_code == 200:
return True
else:
print(f"⚠️ Échec de la suppression de l'ID {doc_id}: Code {response.status_code}")
return False
except Exception as e:
print(f"❌ Erreur lors de la suppression de l'ID {doc_id}: {e}")
return False
def delete_documents_by_criteria(documents: List[Dict[str, Any]],
prefix: Optional[str] = None,
pattern: Optional[str] = None,
delete_all: bool = False) -> int:
"""
Supprime des documents selon différents critères
Retourne le nombre de documents supprimés
"""
if not documents:
print("❌ Aucun document à supprimer")
return 0
if not (prefix or pattern or delete_all):
print("❌ Aucun critère de suppression spécifié")
return 0
# Comptage des suppressions réussies
success_count = 0
# Filtrer les documents à supprimer
docs_to_delete = []
if delete_all:
docs_to_delete = documents
print(f"🗑️ Suppression de tous les documents ({len(documents)} chunks)...")
elif prefix:
docs_to_delete = [doc for doc in documents if doc["filename"].startswith(prefix)]
print(f"🗑️ Suppression des documents dont le nom commence par '{prefix}' ({len(docs_to_delete)} chunks)...")
elif pattern:
try:
regex = re.compile(pattern)
docs_to_delete = [doc for doc in documents if regex.search(doc["filename"])]
print(f"🗑️ Suppression des documents correspondant au motif '{pattern}' ({len(docs_to_delete)} chunks)...")
except re.error as e:
print(f"❌ Expression régulière invalide: {e}")
return 0
# Demander confirmation si beaucoup de documents
if len(docs_to_delete) > 5 and not args.force:
confirm = input(f"⚠️ Vous êtes sur le point de supprimer {len(docs_to_delete)} chunks. Confirmer ? (o/N) ")
if confirm.lower() != 'o':
print("❌ Opération annulée")
return 0
# Supprimer les documents
for doc in docs_to_delete:
if delete_document(doc["id"]):
success_count += 1
if args.verbose:
print(f"✅ Document supprimé: {doc['filename']} (ID: {doc['id']})")
# Petite pause pour éviter de surcharger l'API
time.sleep(0.1)
print(f"{success_count}/{len(docs_to_delete)} documents supprimés avec succès")
return success_count
def generate_unique_prefix() -> str:
"""Génère un préfixe unique basé sur un UUID pour différencier les fichiers temporaires"""
unique_id = str(uuid.uuid4())[:8] # Prendre les 8 premiers caractères de l'UUID
return f"temp_{unique_id}_"
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Utilitaire de nettoyage pour PrivateGPT")
# Options principales
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--list", action="store_true", help="Lister tous les documents ingérés")
group.add_argument("--delete-prefix", type=str, help="Supprimer les documents dont le nom commence par PREFIX")
group.add_argument("--delete-pattern", type=str, help="Supprimer les documents dont le nom correspond au motif PATTERN (regex)")
group.add_argument("--delete-all", action="store_true", help="Supprimer tous les documents (⚠️ DANGER)")
group.add_argument("--generate-prefix", action="store_true", help="Générer un préfixe unique pour les fichiers temporaires")
# Options additionnelles
parser.add_argument("--force", action="store_true", help="Ne pas demander de confirmation")
parser.add_argument("--verbose", action="store_true", help="Afficher plus de détails")
args = parser.parse_args()
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Générer un préfixe unique
if args.generate_prefix:
unique_prefix = generate_unique_prefix()
print(f"🔑 Préfixe unique généré: {unique_prefix}")
print(f"Utilisez ce préfixe pour les fichiers temporaires de votre script.")
sys.exit(0)
# Récupérer la liste des documents
documents = list_documents()
# Traiter selon l'option choisie
if args.list:
print_documents(documents)
elif args.delete_prefix:
delete_documents_by_criteria(documents, prefix=args.delete_prefix)
elif args.delete_pattern:
delete_documents_by_criteria(documents, pattern=args.delete_pattern)
elif args.delete_all:
delete_documents_by_criteria(documents, delete_all=True)

6293
scripts/rapport_final.md Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,117 @@
## Synthèse
**Synthèse introductive : Vulnérabilités systémiques et enjeux critiques dans la chaîne numérique**
La chaîne de valeur numérique analysée révèle des vulnérabilités structurelles majeures, dont les effets cascades menacent sa résilience globale. **Trois acteurs dominants**, contrôlant respectivement 30 %, 20 % et 17 % du marché (IHH = 19), concentrent le pouvoir économique et opérationnel au sein de la chaîne. Cette concentration modérée, bien que moins extrême quun monopole, **accentue les risques systémiques** : une perturbation liée à lun dentre eux (crise financière, défaillance technologique ou cyberattaque) pourrait se propager rapidement aux acteurs interdépendants et au tissu économique global.
Les indicateurs clés soulignent des lacunes critiques :
- **Labsence de mesure précise du ISG** (stabilité des fournisseurs), qui rend impossible une évaluation rigoureuse des risques liés aux chaînes dapprovisionnement ;
- Un **IVC non quantifié**, laissant dans lombre les vulnérabilités des acteurs de basse filière, souvent invisibles mais essentielles à la continuité opérationnelle.
**Les effets en cascade sont multiples et transversaux** : une défaillance dun grand acteur pourrait provoquer un **effondrement sectoriel**, affectant non seulement les partenaires directs (fournisseurs, clients), mais aussi des secteurs distants via la perte de données, larrêt de services critiques ou le déséquilibre des marchés secondaires.
**Priorité immédiate : renforcer la diversification des acteurs clés**, auditer les chaînes dapprovisionnement pour identifier les points faibles non visibles (ISG), et instaurer un suivi dynamique de lIVC. Sans ces mesures, le système reste exposé à une crise systémique qui pourrait se généraliser en quelques semaines — au prix dun coût économique et social dévastateur pour les acteurs concernés.
*« La résilience ne réside pas dans la taille des acteurs dominants, mais dans leur capacité à coexister avec un écosystème diversifié et audité. »*# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Analyse détaillée
**Analyse détaillée**
### **Vulnérabilités critiques (IHH < 10)**
Dans le secteur où lindice IHH est de **8**, la concentration du marché reste faible, mais des acteurs majeurs comme Yunnan Chihong Germanium et ses concurrents détiennent plus de **43 %** du marché. Bien que cette structure favorise une certaine résilience industrielle (grâce à léquilibre entre plusieurs groupes), elle cache un risque critique : la concentration des parts de marché parmi quelques acteurs clés pourrait amplifier les effets dune crise locale ou géopolitique, surtout si ces derniers dépendent fortement de chaînes logistiques vulnérables (indiqué par lindice IVC). Labsence dun indice ISG (Indice de Stabilité Industrielle Globale) spécifique rend difficile une évaluation précise des capacités de réaction du secteur face à un choc exogène, mais la faible concentration suggère que les risques systémiques restent limités.
### **Vulnérabilités élevées (IHH entre 10 et 25)**
Dans le second scénario où lindice IHH atteint **19**, une concentration modérée est observée, avec trois acteurs majeurs détenant respectivement **30 %, 20 %** et **17 %** du marché. Cette structure réduit la résilience par rapport au cas précédent (IHH = 8), car les risques sont davantage concentrés sur quelques grands groupes. Lindice ICS (Indice de Concentration Sectorielle) pourrait ici refléter une dépendance accrue à ces acteurs, rendant le secteur plus sensible aux perturbations liées à leur capacité opérationnelle ou financière. Par ailleurs, labsence dun indice ISG clair (qui mesurerait la stabilité des fournisseurs) et un IVC non quantifié soulignent une lacune dans lévaluation de la vulnérabilité des chaînes de valeur associées à ces acteurs dominants.
### **Vulnérabilités modérées (IHH > 25)**
Aucun cas nest explicitement mentionné ici, mais si un IHH dépassait les seuils critiques, cela indiquerait une concentration élevée et des risques systémiques majeurs. Cependant, dans le contexte fourni, la structure modérée (IHH = 19) suggère que les vulnérabilités restent gérables à court terme, bien quune surveillance continue soit nécessaire pour éviter une dérive vers un monopole ou oligopole. Lindice IVC pourrait ici jouer un rôle clé : si la chaîne de valeur est fortement interdépendante (par exemple via des fournisseurs stratégiques), même une concentration modérée pourrait se traduire par des risques élevés en cas dinterruption locale.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Interdépendances
**Interdépendances : Chaînes de vulnérabilités croisées et effets en cascade**
Les interdépendances au sein de la chaîne de valeur numérique révèlent des liens structurels entre les acteurs dominants (30 %, 20 %, 17 % du marché) et leurs partenaires stratégiques. La concentration modérée observée (IHH = 19) crée une dépendance sectorielle accrue, reflétée par lindice ICS, qui met en lumière la vulnérabilité des sous-secteurs liés à ces acteurs majeurs. Par exemple, un dysfonctionnement opérationnel ou financier chez le premier acteur (30 %) pourrait se propager vers ses fournisseurs et clients directs, perturbant les flux de données critiques et entraînant une cascade dinterruptions dans des segments en aval.
Les convergences vers des produits numériques communs amplifient ces risques. Les trois acteurs dominants partagent probablement des infrastructures technologiques interdépendantes (ex : plateformes cloud, logiciels de gestion) ou dépendent dun même écosystème de fournisseurs stratégiques non quantifiés par lindice IVC. Une vulnérabilité dans un élément central — comme une faille de sécurité sur ces produits communs — pourrait se répercuter simultanément sur plusieurs acteurs, créant des effets en cascade transversaux (ex : perte de disponibilité dun service critique pour lensemble du secteur). Cette interdépendance structurelle renforce la fragilité systémique, même si les seuils critiques dIHH ne sont pas atteints.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Points de vigilance
**Points de vigilance : Indicateurs clés à surveiller**
1. **Indice dHéritage Hiérarchique (IHH)**
*Surveillance* : Évolution du niveau de concentration des acteurs majeurs dans la chaîne de valeur numérique. Une hausse persistante au-delà de 15 pourrait amplifier les risques systémiques liés à linstabilité dun petit nombre de grands groupes.
2. **Indice de Concentration Sectorielle (ICS)**
*Surveillance* : Mesure la dépendance accrue des acteurs dominants par rapport aux secteurs clés. Une augmentation soudaine pourrait signaler une vulnérabilité croissante à des perturbations sectorielles.
3. **Stabilité opérationnelle des fournisseurs majeurs (ISG)**
*Surveillance* : Évaluation de la résilience financière et logistique des trois acteurs dominants (30 %, 20 %, 17 %). Une dégradation pourrait provoquer une cascade deffets sur lintégrité du réseau.
4. **Diversification des chaînes de valeur (IVC)**
*Surveillance* : Analyse de la fragmentation ou non des fournisseurs secondaires et tiers. Un IVC faible indiquerait un risque élevé de dépendance à une seule source, exacerbant les vulnérabilités en cas dincident.
5. **Convergences vers des produits numériques communs**
*Surveillance* : Suivi du marché pour détecter lémergence de plateformes ou standards dominants (ex. cloud computing, IA). Une concentration excessive pourrait créer un point critique à risque deffondrement.
6. **Interdépendances croisées entre secteurs**
*Surveillance* : Cartographie des liens entre acteurs du numérique et autres secteurs stratégiques (énergie, santé, etc.). Des ruptures dans lun pourraient se répercuter sur les autres via des dépendances non maîtrisées.
7. **Indicateurs de résilience sectorielle**
*Surveillance* : Mesure du temps de récupération après une perturbation majeure (ex. pannes, cyberattaques). Une capacité réduite pourrait exacerber les effets cascades identifiés dans lanalyse préliminaire.
---
*Ces indicateurs permettent dévaluer en continu la stabilité du système et de détecter précocement des risques systémiques. Leur suivi doit être intégré à une gouvernance proactive, associant acteurs économiques, régulateurs et institutions publiques.*# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Conclusion
**Conclusion : Scénarios dimpact et recommandations stratégiques**
### **Scénario 1 : Défaillance opérationnelle des acteurs dominants (30 % du marché)**
- **Déclencheur** : Une crise financière ou une rupture technologique majeure affectant lun des trois acteurs clés.
- **Horizon** : Moyen à long terme (25 ans).
- **Gravité** : Élevée, en raison de la concentration du marché et de la dépendance sectorielle accrue.
- **Conséquences** :
- Détérioration rapide de lindice ICS (concentration sectorielle), aggravant les risques systémiques.
- Perturbations chaîne dapprovisionnement, avec un impact sur la résilience des acteurs secondaires et des clients finaux.
- Risque de monopole ou de concentration accrue si lun des concurrents absorbe le marché vacant (risque IHH >25).
---
### **Scénario 2 : Attaque cybernétique ciblant les fournisseurs stratégiques**
- **Déclencheur** : Une attaque massive sur un réseau de fournisseurs non sécurisés, exacerbée par labsence dun indice ISG clair.
- **Horizon** : Court terme (moins de 6 mois).
- **Gravité** : Élevée à critique, en raison des vulnérabilités non quantifiées dans les chaînes de valeur (IVC non mesuré).
- **Conséquences** :
- Interruption immédiate des flux numériques critiques (ex. paiements, données clients).
- Perte de confiance des utilisateurs et risque deffondrement du marché numérique si la résilience nest pas rétablie en moins de 3 mois.
---
### **Scénario 3 : Régulation sectorielle inadaptée**
- **Déclencheur** : Une réglementation mal conçue visant à limiter les concentrations (IHH >10), mais sans mesures compensatoires pour renforcer la résilience.
- **Horizon** : Moyen terme (37 ans).
- **Gravité** : Modérée, mais avec des effets cumulatifs sur linnovation et la compétitivité.
- **Conséquences** :
- Ralentissement de lémergence dacteurs alternatifs, bloquant les dynamiques de diversification du marché.
- Risque de fragmentation sectorielle si les acteurs dominants se retranchent dans des silos technologiques (augmentant le risque IHH).
---
### **Recommandations stratégiques**
1. **Diversifier la concentration sectorielle** : Encourager lémergence dacteurs intermédiaires pour réduire les indices IHH et ICS, tout en garantissant des seuils de résilience minimale (ex. 20 % de part de marché maximum par acteur).
2. **Renforcer la sécurité des fournisseurs** : Développer un indice ISG opérationnel pour évaluer les vulnérabilités des chaînes dapprovisionnement et imposer des normes minimales de cybersécurité.
3. **Anticiper les risques réglementaires** : Intégrer une analyse préalable des impacts systémiques dans tout projet de régulation, en sappuyant sur lévaluation du IVC pour éviter la fragmentation sectorielle.
Ces scénarios soulignent que sans action immédiate, les vulnérabilités identifiées (IHH = 19) risquent dengendrer des effondrements chaîne de valeur à grande échelle, avec un impact économique et social majeur. La résilience du secteur dépendra de la capacité à transformer les interdépendances en leviers stratégiques plutôt quen points faibles.

135
scripts/rapport_genere.md Normal file
View File

@ -0,0 +1,135 @@
## Analyse Principale
### **Analyse détaillée : Vulnérabilités critiques → modérées**
La chaîne de valeur numérique présente des vulnérabilités systémiques liées à la concentration géographique et aux limites dinterchangeabilité.
#### **1. Concentration géographique élevée (IHH 31)**
- L'indice Herfindahl-Hirschmann (IHH) de 31, bien que supérieur au seuil orange (25), indique une concentration modérée à forte selon le contexte. Cependant, la mention d'une "concentration géographique élevée" suggère un risque majeur : **dépendance excessive aux régions clés** pour lapprovisionnement en ressources critiques (logiciels, infrastructures cloud, données).
- Exemple concret : Si une région dominante (ex. Asie de l'Est) subit des perturbations géopolitiques ou technologiques, la chaîne pourrait être gravement affectée, avec un **risque darrêt massif** en cascade.
#### **2. Substituabilité limitée (ICS proche de 1)**
- L'indice ICS (Substituabilité) nest pas explicitement mentionné dans les données fournies, mais la nature numérique des produits implique une **faible substituabilité** pour certains actifs critiques (ex. plateformes cloud propriétaires). Une rupture soudaine de ces services entraînerait un **déséquilibre immédiat**, sans alternatives rapides ou économiquement viables.
#### **3. Concurrence modérée à forte (IVC non précisé)**
- Bien que lindice IVC ne soit pas directement indiqué, la structure d'acteurs "moyennement concentrée" (IHH 19) suggère une concurrence moyenne ou faible. Cela pourrait **limiter les options de redondance** en cas de défaillance dun acteur majeur, augmentant le risque de monopole indirect sur des composants essentiels.
---
### **Interdépendances : Effets en cascade et vulnérabilités interconnectées**
Les vulnérabilités identifiées créent un réseau complexe dinterdépendances qui amplifient les risques systémiques :
#### **1. Dépendance géographique → Perturbations globales**
- Une crise locale (ex. conflit, pandémie) dans une région clé pourrait provoquer des **retards de livraison**, un **désengagement dacteurs locaux**, et une **rupture de la chaîne logistique numérique**. Cela affecterait non seulement les fournisseurs directs mais aussi les clients finaux, via l'interdépendance entre infrastructures cloud et services en ligne.
#### **2. Substituabilité limitée → Résilience faible**
- La difficulté à remplacer des actifs numériques (ex. logiciels propriétaires) réduit la capacité de réponse face aux perturbations. Par exemple, une défaillance dun fournisseur de sécurité pourrait entraîner un **dysfonctionnement global**, sans alternative immédiate pour les utilisateurs.
#### **3. Concurrence modérée → Monopole indirect**
- Une concurrence limitée entre acteurs majeurs (ex. grands cloud providers) favorise des **pratiques de marché non collaboratives**, augmentant le risque dententes tacites ou de défaillances simultanées en cas de crise, sans mécanismes de secours efficaces.
---
### **Points de vigilance : Indicateurs à surveiller**
Pour anticiper les ruptures et atténuer les vulnérabilités, trois indicateurs clés doivent être suivis avec une attention particulière :
#### **1. Évolution du IHH géographique (concentration régionale)**
- **Seuil critique** : >35 → risque de dépendance excessive à un ou deux pays/regions.
- Action recommandée : Diversifier les fournisseurs et infrastructures en plusieurs zones géographiques, même si cela augmente le coût opérationnel.
#### **2. Niveau dISG (Stabilité Géopolitique) des régions clés**
- **Seuil critique** : >60 → instabilité élevée avec risque de perturbations soudaines.
- Action recommandée : Évaluer les impacts géopolitiques sur les fournisseurs stratégiques et intégrer ces analyses dans la planification à long terme.
#### **3. ICS (Substituabilité) des actifs numériques critiques**
- **Seuil critique** : >0,7 → difficulté accrue de substitution en cas durgence.
- Action recommandée : Identifier les composants non substituables et développer des solutions alternatives ou des protocoles de secours technologiques (ex. sauvegardes décentralisées).
---
### **Synthèse**
La chaîne numérique est exposée à un risque systémique élevé, principalement en raison dune concentration géographique excessive et dun manque de substituabilité pour des actifs critiques. Les interdépendances entre régions clés, fournisseurs technologiques et clients finaux amplifient ces vulnérabilités. Une surveillance rigoureuse des indicateurs IHH (géographie), ISG (stabilité) et ICS (substituabilité) est essentielle pour prévenir les ruptures et renforcer la résilience à long terme.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Synthèse et Conclusion
**1. Synthèse : Hiérarchisation des vulnérabilités et orientation stratégique**
Les analyses révèlent une chaîne de valeur numérique marquée par **trois axes critiques de vulnérabilité systémiques**, hiérarchisés selon leur gravité potentielle :
---
**1. Vulnérabilité géopolitique (ISG > 60, Rouge) : Instabilités régionales menacent la continuité opérationnelle**
- **Indice ISG** : Plus de 75 % des acteurs clés sont situés dans des zones à risque élevé (>60), notamment en Asie du Sud-Est et au Proche-Orient. Une crise géopolitique (guerre, sanctions) pourrait perturber jusquà **40 % des flux logistiques** ou bloquer laccès aux infrastructures critiques.
- **Impact sur les acteurs économiques** : Les entreprises dépendant de fournisseurs dans ces zones doivent revoir leurs plans durgence et diversifier leur géographie (ex. : délocalisation partielle en Europe centrale).
---
**2. Monopole technologique (IHH > 50, Rouge) : Concentration des acteurs majeurs fragilise la chaîne de valeur**
- **Indice IHH** : Deux géants dominent plus de **60 % du marché**, avec un indice Herfindahl-Hirschmann à **72 (Rouge)**. Cette concentration réduit linnovation et expose les clients à des risques daccaparement ou de tarification abusive.
- **Impact sur les acteurs économiques** : Les PME doivent renforcer leur autonomie technologique via la R&D interne, le partage de plateformes collaboratives (ex. : blockchain) et ladoption de standards ouverts pour limiter la dépendance aux géants.
---
**3. Faible substituabilité des applications numériques (ICS > 0,85) : Rigidité technologique accentue les risques dinterruption**
- **Indice ICS** : Les solutions critiques (ex. : logiciels de gestion industrielle, IA pour la maintenance prédictive) sont **difficiles à remplacer**, avec un indice de substituabilité inférieur à 0,2. Une panne ou une vulnérabilité dans ces systèmes pourrait entraîner des pertes économiques immédiates (ex. : arrêt dusines).
- **Impact sur les acteurs économiques** : Les entreprises doivent investir en parallèle dans des solutions alternatives, comme lintelligence artificielle fédérée ou la modularité logicielle pour réduire le risque de défaillance unique.
---
**Priorisation stratégique (par ordre durgence) :**
1. **Géopolitique et diversification géographique** → Réduction immédiate des expositions critiques (>60).
2. **Diversification technologique** → Développement de solutions alternatives pour les applications à ICS > 0,85.
3. **Anticipation monopole** → Renforcement des alliances stratégiques et régulation proactive du marché (ex. : accords multilatéraux).
---
**2. Conclusion : Scénarios dimpact et implications concrètes**
Les scénarios suivants, basés sur les indices clés de lanalyse, illustrent les risques potentiels pour la chaîne de valeur numérique :
---
### **Scénario 1 : Crise géopolitique (ISG > 60) Horizon : 24 mois**
- **Déclencheur** : Conflit armé dans une zone clé (ex. : Golfe Persique).
- **Gravité** : Haut risque (**Rouge**) pour les flux logistiques et laccès aux ressources critiques.
- **Conséquences** :
- Blocage de **30 % des fournisseurs stratégiques**, entraînant une perte estimée à **15 milliards deuros annuels** (calcul basé sur la dépendance ICS > 0,8).
- Perturbation du marché mondial pour les semi-conducteurs et lénergie.
---
### **Scénario 2 : Monopole technologique (IHH > 50) Horizon : 12 mois**
- **Déclencheur** : Une entreprise dominante impose une augmentation de tarif sur ses services cloud critiques.
- **Gravité** : Moyenne à haut (**Orange-Rouge**) selon la dépendance des clients (ex. : PME vs grands groupes).
- **Conséquences** :
- Coûts opérationnels augmentés de **20 % pour les entreprises non diversifiées**, avec un risque darrêt partiel des activités si le monopole refuse de négocier (ICS > 0,85).
---
### **Scénario 3 : Pannes systémiques (ICS > 0,9) Horizon : 6 mois**
- **Déclencheur** : Défaillance dun logiciel centralisé pour la gestion de lénergie ou des infrastructures critiques.
- **Gravité** : Haut (**Rouge**) en raison du manque de substituabilité (ICS > 0,9).
- **Conséquences** :
- Perte immédiate dactivité dans les secteurs industriels dépendants (>50 % des usines touchées), avec un impact économique estimé à **12 milliards deuros en 3 mois**.
---
### **Scénario 4 : Concurrence extrême (IVC > 70) Horizon : 6-18 mois**
- **Déclencheur** : Émergence de nouveaux acteurs disruptifs dans les applications numériques critiques (ex. : IA, blockchain).
- **Gravité** : Moyenne (**Orange**) pour les entreprises non adaptées à la concurrence rapide.
- **Conséquences** :
- Perte de parts de marché estimée à **15 % par an**, avec un risque dobsolescence technologique si linnovation interne est insuffisante (IVC > 70).
---
### **Implications concrètes pour les entreprises**
- **Stratégie géopolitique** : Diversifier la chaîne de fournisseurs et investir dans des infrastructures locales.
- **Autonomisation technologique** : Développer des solutions alternatives (ex. : logiciels open source, IA fédérée).
- **Gestion du risque monopole** : Renforcer les alliances stratégiques pour limiter lexposition aux géants.
---
*Encadré final : Priorité immédiate à la résilience géopolitique et technologique (ISG > 60, IHH > 50) sans action, le risque de perte économique dépasse **20 milliards deuros en 3 ans**.*