On continue avec l'IA

This commit is contained in:
Stéphan Peccini 2025-05-19 13:38:30 +02:00
parent 747c56f252
commit f8baf851ae
5 changed files with 1848 additions and 0 deletions

209
IA/analyze_graph.py Normal file
View File

@ -0,0 +1,209 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script d'analyse de la structure du graphe DOT pour comprendre
comment intégrer l'ISG dans le générateur de template.
"""
import os
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
# Chemins
BASE_DIR = Path(__file__).resolve().parent
GRAPH_PATH = BASE_DIR / "graphe.dot"
def analyze_graph_structure(dot_path):
"""Analyse la structure du graphe et affiche ses caractéristiques."""
print(f"Analyse du fichier: {dot_path}")
# Lire le graphe
G = read_dot(dot_path)
# Informations de base
print(f"Nombre total de nœuds: {len(G.nodes())}")
print(f"Nombre total d'arêtes: {len(G.edges())}")
# Analyse des attributs des nœuds
node_attrs = {}
for node, attrs in G.nodes(data=True):
for key in attrs:
if key not in node_attrs:
node_attrs[key] = set()
node_attrs[key].add(attrs[key])
print("\nAttributs des nœuds:")
for attr, values in node_attrs.items():
print(f"- {attr}: {len(values)} valeurs différentes")
if len(values) < 20: # Afficher seulement si le nombre de valeurs est raisonnable
print(f" Valeurs: {', '.join(sorted(values))}")
# Analyse des niveaux (si l'attribut existe)
if 'level' in node_attrs:
print("\nAnalyse par niveau:")
levels = {}
for node, attrs in G.nodes(data=True):
if 'level' in attrs:
level = attrs['level']
if level not in levels:
levels[level] = []
levels[level].append(node)
for level, nodes in sorted(levels.items()):
print(f"- Niveau {level}: {len(nodes)} nœuds")
# Afficher quelques exemples
if len(nodes) < 5:
print(f" Exemples: {', '.join(nodes)}")
else:
print(f" Exemples: {', '.join(nodes[:3])}... (et {len(nodes)-3} autres)")
# Analyse des attributs ISG
print("\nRecherche des attributs ISG:")
isg_nodes = []
for node, attrs in G.nodes(data=True):
if 'isg' in attrs:
isg_nodes.append((node, attrs['isg']))
if isg_nodes:
print(f"- {len(isg_nodes)} nœuds avec attribut ISG")
print(" Exemples:")
for node, isg in isg_nodes[:5]:
print(f" - {node}: ISG = {isg}")
else:
print("- Aucun nœud avec attribut ISG trouvé")
# Analyse des connexions pour les nœuds critiques (IHH)
print("\nAnalyse des nœuds avec IHH:")
ihh_nodes = []
for node, attrs in G.nodes(data=True):
if 'ihh_pays' in attrs or 'ihh_acteurs' in attrs:
ihh_value_pays = attrs.get('ihh_pays', 'N/A')
ihh_value_acteurs = attrs.get('ihh_acteurs', 'N/A')
ihh_nodes.append((node, ihh_value_pays, ihh_value_acteurs))
if ihh_nodes:
print(f"- {len(ihh_nodes)} nœuds avec attributs IHH")
print(" Exemples:")
for node, ihh_pays, ihh_acteurs in ihh_nodes[:5]:
print(f" - {node}: IHH pays = {ihh_pays}, IHH acteurs = {ihh_acteurs}")
# Analyser les connexions de ce nœud
print(f" Connexions sortantes:")
out_edges = list(G.out_edges(node))
if out_edges:
for i, (_, target) in enumerate(out_edges[:3]):
print(f" - Vers {target}")
if len(out_edges) > 3:
print(f" - ... et {len(out_edges)-3} autres")
else:
print(" - Aucune connexion sortante")
print(f" Connexions entrantes:")
in_edges = list(G.in_edges(node))
if in_edges:
for i, (source, _) in enumerate(in_edges[:3]):
print(f" - Depuis {source}")
if len(in_edges) > 3:
print(f" - ... et {len(in_edges)-3} autres")
else:
print(" - Aucune connexion entrante")
else:
print("- Aucun nœud avec attributs IHH trouvé")
# Vérifier si un nœud a un attribut de niveau 99 (ISG supposé)
print("\nRecherche des nœuds de niveau 99 (ISG):")
level_99_nodes = []
for node, attrs in G.nodes(data=True):
if attrs.get('level') == '99':
level_99_nodes.append(node)
if level_99_nodes:
print(f"- {len(level_99_nodes)} nœuds de niveau 99")
print(" Exemples:")
for node in level_99_nodes[:5]:
print(f" - {node}")
# Analyser les connexions de ce nœud
print(f" Connexions entrantes:")
in_edges = list(G.in_edges(node))
if in_edges:
for i, (source, _) in enumerate(in_edges[:3]):
print(f" - Depuis {source}")
if len(in_edges) > 3:
print(f" - ... et {len(in_edges)-3} autres")
else:
print(" - Aucune connexion entrante")
else:
print("- Aucun nœud de niveau 99 trouvé")
def check_isg_paths(dot_path):
"""Vérifie les chemins entre les nœuds critiques (IHH) et les nœuds ISG."""
print("\nAnalyse des chemins entre nœuds IHH et nœuds ISG:")
# Lire le graphe
G = read_dot(dot_path)
# Identifier les nœuds avec IHH
ihh_nodes = []
for node, attrs in G.nodes(data=True):
ihh_pays = attrs.get('ihh_pays', 0)
ihh_acteurs = attrs.get('ihh_acteurs', 0)
try:
ihh_pays = float(ihh_pays)
ihh_acteurs = float(ihh_acteurs)
if ihh_pays > 25 or ihh_acteurs > 25: # Seuil critique
ihh_nodes.append(node)
except (ValueError, TypeError):
pass
if not ihh_nodes:
print("- Aucun nœud IHH critique trouvé")
return
print(f"- {len(ihh_nodes)} nœuds IHH critiques identifiés")
# Pour chaque nœud IHH critique, chercher des chemins vers des nœuds ISG
for node in ihh_nodes[:5]: # Limiter à 5 exemples
print(f"\n Analyse des chemins pour {node}:")
# Analyser les voisins directs
successors = list(G.successors(node))
print(f" - {len(successors)} successeurs directs")
if successors:
for succ in successors[:3]:
print(f" - Vers {succ}")
# Vérifier les attributs de ce successeur
succ_attrs = G.nodes[succ]
print(f" Attributs: {', '.join(f'{k}={v}' for k, v in succ_attrs.items() if k in ['level', 'isg'])}")
# Chercher les successeurs de niveau 2
succ2 = list(G.successors(succ))
print(f" {len(succ2)} successeurs de niveau 2")
if succ2:
for s2 in succ2[:2]:
print(f" - Vers {s2}")
s2_attrs = G.nodes[s2]
print(f" Attributs: {', '.join(f'{k}={v}' for k, v in s2_attrs.items() if k in ['level', 'isg'])}")
# Chercher encore plus loin si nécessaire
succ3 = list(G.successors(s2))
print(f" {len(succ3)} successeurs de niveau 3")
if succ3:
for s3 in succ3[:2]:
print(f" - Vers {s3}")
s3_attrs = G.nodes[s3]
print(f" Attributs: {', '.join(f'{k}={v}' for k, v in s3_attrs.items() if k in ['level', 'isg'])}")
def main():
"""Fonction principale."""
print("=== Analyse de la structure du graphe ===")
analyze_graph_structure(GRAPH_PATH)
print("\n=== Analyse des chemins entre nœuds critiques et ISG ===")
check_isg_paths(GRAPH_PATH)
if __name__ == "__main__":
main()

146
IA/check_paths.py Normal file
View File

@ -0,0 +1,146 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import sys
from collections import defaultdict
def extract_paths(file_path):
"""Extrait tous les chemins du fichier rapport_template.md"""
paths = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
# Extraire les lignes qui commencent par "Corpus/"
if line.strip().startswith("Corpus/"):
paths.append(line.strip())
except Exception as e:
print(f"Erreur lors de la lecture du fichier {file_path}: {e}")
sys.exit(1)
return paths
def check_paths(paths, base_dir):
"""Vérifie si les chemins existent dans le système de fichiers"""
results = {
"existing": [],
"missing": [],
"problematic": [] # Chemins qui pourraient nécessiter des corrections
}
for path in paths:
# Vérifier si le chemin est absolu ou relatif
abs_path = os.path.join(base_dir, path)
if os.path.exists(abs_path):
results["existing"].append(path)
else:
# Essayer de détecter des problèmes potentiels
problem_detected = False
# Vérifier les chemins avec "Fiche minerai" ou "Fiche fabrication"
if "Fiche minerai" in path or "Fiche fabrication" in path:
# Problème courant: mauvaise casse ou absence du mot "minerai"
path_lower = path.lower()
if "minerai" not in path_lower and "/minerai/" in path_lower:
corrected_path = path.replace("/Fiche ", "/Fiche minerai ")
if os.path.exists(os.path.join(base_dir, corrected_path)):
results["problematic"].append((path, corrected_path, "Mot 'minerai' manquant"))
problem_detected = True
# Vérifier les chemins SSD
if "SSD25" in path:
corrected_path = path.replace("SSD25", "SSD 2.5")
if os.path.exists(os.path.join(base_dir, corrected_path)):
results["problematic"].append((path, corrected_path, "Format 'SSD25' au lieu de 'SSD 2.5'"))
problem_detected = True
# Si aucun problème spécifique n'a été détecté, marquer comme manquant
if not problem_detected:
results["missing"].append(path)
return results
def find_similar_paths(missing_path, base_dir):
"""Essaie de trouver des chemins similaires pour aider à diagnostiquer le problème"""
missing_parts = missing_path.split('/')
similar_paths = []
# Rechercher dans les sous-répertoires correspondants
search_dir = os.path.join(base_dir, *missing_parts[:-1])
if os.path.exists(search_dir):
for file in os.listdir(search_dir):
if file.endswith('.md'):
similar_path = os.path.join(search_dir, file).replace(base_dir + '/', '')
similar_paths.append(similar_path)
# Si aucun chemin similaire n'est trouvé, remonter d'un niveau
if not similar_paths and len(missing_parts) > 2:
parent_dir = os.path.join(base_dir, *missing_parts[:-2])
if os.path.exists(parent_dir):
for dir_name in os.listdir(parent_dir):
if dir_name.lower() in missing_parts[-2].lower():
dir_path = os.path.join(parent_dir, dir_name)
if os.path.isdir(dir_path):
for file in os.listdir(dir_path):
if file.endswith('.md'):
similar_path = os.path.join(dir_path, file).replace(base_dir + '/', '')
similar_paths.append(similar_path)
return similar_paths
def main():
# Vérifier que nous sommes dans le bon répertoire
script_dir = os.path.dirname(os.path.abspath(__file__))
base_dir = script_dir
# Chemin vers le rapport_template.md
template_path = os.path.join(base_dir, "Corpus", "rapport_template.md")
if not os.path.exists(template_path):
print(f"Erreur: Le fichier {template_path} n'existe pas.")
sys.exit(1)
print("=== Vérification des chemins dans rapport_template.md ===")
# Extraire les chemins
paths = extract_paths(template_path)
print(f"Nombre total de chemins trouvés: {len(paths)}")
# Vérifier les chemins
results = check_paths(paths, base_dir)
# Afficher les résultats
print("\n=== Résultats ===")
print(f"Chemins existants: {len(results['existing'])}")
print(f"Chemins manquants: {len(results['missing'])}")
print(f"Chemins problématiques: {len(results['problematic'])}")
# Afficher les chemins manquants
if results["missing"]:
print("\n=== Chemins manquants ===")
for path in results["missing"]:
print(f"- {path}")
similar = find_similar_paths(path, base_dir)
if similar:
print(" Chemins similaires trouvés:")
for sim_path in similar[:3]: # Limiter à 3 suggestions
print(f" * {sim_path}")
# Afficher les chemins problématiques avec suggestions
if results["problematic"]:
print("\n=== Chemins problématiques ===")
for orig, corrected, reason in results["problematic"]:
print(f"- {orig}")
print(f" Suggestion: {corrected}")
print(f" Raison: {reason}")
# Résumé
if not results["missing"] and not results["problematic"]:
print("\nTous les chemins dans le rapport sont valides !")
else:
print("\nDes chemins problématiques ont été détectés. Veuillez corriger les erreurs.")
if __name__ == "__main__":
main()

1267
IA/generate_template.py Normal file

File diff suppressed because it is too large Load Diff

147
IA/replace_paths.py Normal file
View File

@ -0,0 +1,147 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour remplacer les références de chemins dans le rapport par le contenu des fichiers.
Ajuste automatiquement les niveaux de titres pour maintenir la hiérarchie.
"""
import os
import re
from pathlib import Path
# Chemins de base
BASE_DIR = Path(__file__).resolve().parent
BASE_DIR = BASE_DIR / ".."
CORPUS_DIR = BASE_DIR / "Corpus"
INPUT_PATH = CORPUS_DIR / "rapport_template.md"
OUTPUT_PATH = CORPUS_DIR / "rapport_final.md"
def determine_heading_level(line):
"""Détermine le niveau de titre d'une ligne."""
match = re.match(r'^(#+)\s+', line)
if match:
return len(match.group(1))
return 0
def determine_parent_level(lines, current_index):
"""Détermine le niveau de titre parent pour une ligne donnée."""
# Remonter dans les lignes précédentes pour trouver le titre parent
for i in range(current_index - 1, -1, -1):
level = determine_heading_level(lines[i])
if level > 0:
return level
return 0
def adjust_heading_levels(content, parent_level, is_intro_file=False, is_ivc_section=False):
"""Ajuste les niveaux de titres dans le contenu pour s'adapter à la hiérarchie."""
lines = content.split('\n')
# Si le contenu est vide, retourner une chaîne vide
if not lines:
return ""
# Déterminer le niveau minimum de titre dans le contenu original
min_level = 10
for line in lines:
level = determine_heading_level(line)
if level > 0 and level < min_level:
min_level = level
# Si aucun titre trouvé, simplement supprimer la première ligne si nécessaire
if min_level == 10:
if not is_intro_file and not is_ivc_section and lines:
return '\n'.join(lines[1:])
return content
# Traitement spécial pour les fichiers IVC et intro
if is_ivc_section or is_intro_file:
adjusted_lines = []
# Pour les fichiers IVC ou intro, on garde toutes les lignes mais on ajuste les niveaux des titres
for line in lines:
level = determine_heading_level(line)
if level > 0:
# Nouveau niveau = niveau parent + 1 + (niveau actuel - min_level)
new_level = parent_level + 1 + (level - min_level)
# S'assurer que le niveau ne dépasse pas 6 (limite en markdown)
new_level = min(new_level, 6)
line = re.sub(r'^#+\s+', '#' * new_level + ' ', line)
adjusted_lines.append(line)
else:
# Pour les fichiers standards, on supprime la première ligne
lines = lines[1:]
adjusted_lines = []
# Ajuster les niveaux de titres pour les lignes restantes
for line in lines:
level = determine_heading_level(line)
if level > 0:
# Nouveau niveau = niveau parent + 1 + (niveau actuel - min_level)
new_level = parent_level + 1 + (level - min_level)
# S'assurer que le niveau ne dépasse pas 6 (limite en markdown)
new_level = min(new_level, 6)
line = re.sub(r'^#+\s+', '#' * new_level + ' ', line)
adjusted_lines.append(line)
return '\n'.join(adjusted_lines)
def process_report():
"""Traite le rapport pour remplacer les chemins par le contenu."""
if not os.path.exists(INPUT_PATH):
print(f"Fichier d'entrée introuvable: {INPUT_PATH}")
return
# Lire le rapport
with open(INPUT_PATH, 'r', encoding='utf-8') as f:
lines = f.readlines()
output_lines = []
i = 0
while i < len(lines):
line = lines[i].strip()
# Vérifier si la ligne est un chemin
if line.startswith('Corpus/'):
path = line
full_path = BASE_DIR / path
# Déterminer le niveau de titre parent
parent_level = determine_parent_level(lines, i)
try:
# Lire le contenu du fichier
if os.path.exists(full_path):
# Vérifier si c'est un fichier _intro.md
is_intro_file = os.path.basename(full_path) == "_intro.md"
# Vérifier si c'est une section d'Indice de Vulnérabilité de Concurrence
is_ivc_section = "Vulnérabilité de Concurrence" in line or "/ivc-" in path.lower() or "/fiche technique ivc/" in path.lower()
with open(full_path, 'r', encoding='utf-8') as f:
content = f.read()
# Ajuster les niveaux de titres
adjusted_content = adjust_heading_levels(content, parent_level, is_intro_file, is_ivc_section)
# Ajouter le contenu ajusté
output_lines.append(f"<!-- Contenu du fichier {path} -->")
output_lines.append(adjusted_content)
output_lines.append(f"<!-- Fin du contenu de {path} -->")
else:
output_lines.append(f"<!-- Fichier non trouvé: {path} -->")
output_lines.append(line)
except Exception as e:
output_lines.append(f"<!-- Erreur lors de la lecture du fichier {path}: {str(e)} -->")
output_lines.append(line)
else:
# Conserver la ligne telle quelle
output_lines.append(line)
i += 1
# Écrire le rapport final
with open(OUTPUT_PATH, 'w', encoding='utf-8') as f:
f.write('\n'.join(output_lines))
print(f"Rapport final généré: {OUTPUT_PATH}")
if __name__ == "__main__":
process_report()

79
appel_IA.py Normal file
View File

@ -0,0 +1,79 @@
import requests
MODEL = "llama3-8b-fast:latest"
OLLAMA_URL = "http://localhost:11434/api/generate"
TEMP = 0.1
with open("Corpus/rapport_final.md", "r", encoding="utf-8") as f:
contenu = f.read()
prompt = f"""
Tu es un assistant stratégique expert chargé danalyser les vulnérabilités systémiques dans des chaînes de valeur numériques. Tu t'exprimes en français.
Tu travailles uniquement à partir du fichier Markdown `rapport_final.md`. Ce fichier est complet : najoute aucune connaissance externe.
== Objectif ==
Produire un rapport stratégique complet destiné à un COMEX ou à une direction des risques industrielles. Ce rapport doit permettre didentifier les vulnérabilités critiques qui menacent la résilience des produits numériques.
== Structure attendue ==
Le rapport que tu dois produire contient 4 sections :
1. Synthèse de lanalyse (style narratif pour décideurs, suivie dun encadré synthétique)
2. Analyse détaillée (explication structurée par niveau de vulnérabilité, encadré de données)
3. Points de vigilance (indicateurs clés à surveiller, horizon temporel)
4. Conclusion (scénario dimpact en cas de choc, encadré des conséquences sur les produits numériques)
== Données à analyser ==
Les données se trouvent uniquement dans les sections :
- `## Éléments factuels` : données brutes à exploiter
- `## Annexes` : fiches techniques détaillées pour comprendre les indices et les valeurs des éléments factuels
== Indices à utiliser ==
**IHH (Herfindahl-Hirschmann)** : mesure la concentration géographique ou industrielle.
- Interprétation : >25 = concentration élevée (rouge), 1525 = modérée (orange), <15 = faible (vert)
**ICS (Indice de Criticité de Substituabilité)** : évalue la difficulté à substituer un matériau.
- Calculé à partir de la faisabilité technique, des délais dimplémentation et du coût économique.
- Interprétation : >0.6 = critique, 0.30.6 = modéré, <0.3 = faible
**ISG (Indice de Stabilité Géopolitique)** : reflète la vulnérabilité politique, sociale ou climatique dun pays producteur.
- Interprétation : >70 = instabilité forte, 4070 = instabilité modérée, <40 = stable
**IVC (Indice de Vulnérabilité Concurrentielle)** : mesure la pression dautres secteurs sur laccès aux ressources du numérique.
- Interprétation : >15 = forte, 515 = modérée, <5 = faible
== Logique danalyse attendue ==
1. **Croise les indices** pour identifier les vulnérabilités critiques (ex. IHH élevé + ISG élevé + ICS élevé = vulnérabilité systémique)
2. **Hiérarchise clairement** : vulnérabilité critique, élevée, modérée
3. **Distingue les horizons temporels** (court terme = <2 ans, moyen terme = 25 ans, long terme >5 ans)
4. **Détaille les effets en cascade** sur les produits numériques (ex : minerai composant infrastructure)
5. **Évite toute recommandation industrielle ou politique**
== Exemples dencadrés synthétiques à inclure ==
POINTS CLÉS - [NOM DU MINÉRAI] :
Concentration critique : IHH 89 (Chine 94%)
Substituabilité : ICS 0.64 (difficile), délai 28 ans
Instabilité géopolitique : ISG 54 (modérée)
Vulnérabilité concurrentielle : IVC 1 (faible)
Horizon : court à moyen terme
Impact : semi-conducteurs, détecteurs IR, fibres optiques
== Important ==
Tu dois raisonner comme un analyste stratégique, pas comme un chatbot.
Tu rédiges un rapport professionnel prêt à être diffusé à la direction générale.
== Contenu à analyser ==
{contenu}
"""
response = requests.post(OLLAMA_URL, json={
"model": MODEL,
"prompt": prompt,
"stream": False,
"temperature": TEMP
})
print(response.json()["response"])