Mise à jour et nettoyage

This commit is contained in:
Fabrication du Numérique 2025-05-25 21:18:38 +02:00
parent 81f5bb3b66
commit 5839098db6
19 changed files with 12441 additions and 42741 deletions

0
IA/02 - injection_fiches/auto_ingest.py Executable file → Normal file
View File

View File

@ -1,79 +0,0 @@
import requests
MODEL = "llama3-8b-fast:latest"
OLLAMA_URL = "http://localhost:11434/api/generate"
TEMP = 0.1
with open("Corpus/rapport_final.md", "r", encoding="utf-8") as f:
contenu = f.read()
prompt = f"""
Tu es un assistant stratégique expert chargé danalyser les vulnérabilités systémiques dans des chaînes de valeur numériques. Tu t'exprimes en français.
Tu travailles uniquement à partir du fichier Markdown `rapport_final.md`. Ce fichier est complet : najoute aucune connaissance externe.
== Objectif ==
Produire un rapport stratégique complet destiné à un COMEX ou à une direction des risques industrielles. Ce rapport doit permettre didentifier les vulnérabilités critiques qui menacent la résilience des produits numériques.
== Structure attendue ==
Le rapport que tu dois produire contient 4 sections :
1. Synthèse de lanalyse (style narratif pour décideurs, suivie dun encadré synthétique)
2. Analyse détaillée (explication structurée par niveau de vulnérabilité, encadré de données)
3. Points de vigilance (indicateurs clés à surveiller, horizon temporel)
4. Conclusion (scénario dimpact en cas de choc, encadré des conséquences sur les produits numériques)
== Données à analyser ==
Les données se trouvent uniquement dans les sections :
- `## Éléments factuels` : données brutes à exploiter
- `## Annexes` : fiches techniques détaillées pour comprendre les indices et les valeurs des éléments factuels
== Indices à utiliser ==
**IHH (Herfindahl-Hirschmann)** : mesure la concentration géographique ou industrielle.
- Interprétation : >25 = concentration élevée (rouge), 1525 = modérée (orange), <15 = faible (vert)
**ICS (Indice de Criticité de Substituabilité)** : évalue la difficulté à substituer un matériau.
- Calculé à partir de la faisabilité technique, des délais dimplémentation et du coût économique.
- Interprétation : >0.6 = critique, 0.30.6 = modéré, <0.3 = faible
**ISG (Indice de Stabilité Géopolitique)** : reflète la vulnérabilité politique, sociale ou climatique dun pays producteur.
- Interprétation : >70 = instabilité forte, 4070 = instabilité modérée, <40 = stable
**IVC (Indice de Vulnérabilité Concurrentielle)** : mesure la pression dautres secteurs sur laccès aux ressources du numérique.
- Interprétation : >15 = forte, 515 = modérée, <5 = faible
== Logique danalyse attendue ==
1. **Croise les indices** pour identifier les vulnérabilités critiques (ex. IHH élevé + ISG élevé + ICS élevé = vulnérabilité systémique)
2. **Hiérarchise clairement** : vulnérabilité critique, élevée, modérée
3. **Distingue les horizons temporels** (court terme = <2 ans, moyen terme = 25 ans, long terme >5 ans)
4. **Détaille les effets en cascade** sur les produits numériques (ex : minerai composant infrastructure)
5. **Évite toute recommandation industrielle ou politique**
== Exemples dencadrés synthétiques à inclure ==
POINTS CLÉS - [NOM DU MINÉRAI] :
Concentration critique : IHH 89 (Chine 94%)
Substituabilité : ICS 0.64 (difficile), délai 28 ans
Instabilité géopolitique : ISG 54 (modérée)
Vulnérabilité concurrentielle : IVC 1 (faible)
Horizon : court à moyen terme
Impact : semi-conducteurs, détecteurs IR, fibres optiques
== Important ==
Tu dois raisonner comme un analyste stratégique, pas comme un chatbot.
Tu rédiges un rapport professionnel prêt à être diffusé à la direction générale.
== Contenu à analyser ==
{contenu}
"""
response = requests.post(OLLAMA_URL, json={
"model": MODEL,
"prompt": prompt,
"stream": False,
"temperature": TEMP
})
print(response.json()["response"])

12
batch_ia/analyse_ia.py Normal file
View File

@ -0,0 +1,12 @@
import sys
from pathlib import Path
dot_path = Path(sys.argv[1])
output_path = Path(sys.argv[2])
with dot_path.open() as f:
contenu = f.read()
with output_path.open("w") as f:
f.write("Analyse IA du graphe suivant :\n\n")
f.write(contenu)

7
batch_ia/batch.service Normal file
View File

@ -0,0 +1,7 @@
[Unit]
Description=Batch IA Processing Service
[Service]
ExecStart=/usr/bin/systemd-run --scope --property=CPUQuota=87.5% --property=MemoryMax=12G /usr/bin/python3 /chemin/complet/vers/batch_ia/batch_runner.py
WorkingDirectory=/chemin/complet/vers/batch_ia
Restart=always

30
batch_ia/batch_runner.py Normal file
View File

@ -0,0 +1,30 @@
import time
import subprocess
from batch_utils import *
while True:
status = charger_status()
jobs = [(login, data) for login, data in status.items() if data["status"] == "en attente"]
jobs = sorted(jobs, key=lambda x: x[1].get("timestamp", 0))
for i, (login, data) in enumerate(jobs):
status[login]["position"] = i
if jobs:
login, _ = jobs[0]
dot_file = JOBS_DIR / f"{login}.dot"
result_file = JOBS_DIR / f"{login}.result.txt"
status[login]["status"] = "en cours"
sauvegarder_status(status)
try:
subprocess.run(["python3", "analyse_ia.py", str(dot_file), str(result_file)], check=True)
status[login]["status"] = "terminé"
except Exception as e:
status[login]["status"] = "échoué"
status[login]["error"] = str(e)
sauvegarder_status(status)
time.sleep(10)

58
batch_ia/batch_utils.py Normal file
View File

@ -0,0 +1,58 @@
import json
import time
from pathlib import Path
from networkx.drawing.nx_agraph import write_dot
BATCH_DIR = Path(__file__).resolve().parent
JOBS_DIR = BATCH_DIR / "jobs"
STATUS_FILE = BATCH_DIR / "status.json"
def charger_status():
if STATUS_FILE.exists():
return json.loads(STATUS_FILE.read_text())
return {}
def sauvegarder_status(data):
STATUS_FILE.write_text(json.dumps(data, indent=2))
def statut_utilisateur(login):
status = charger_status()
entry = status.get(login)
if not entry:
return {"statut": None, "position": None, "telechargement": None,
"message": "Aucune tâche en cours."}
if entry["status"] == "en attente":
return {"statut": "en attente", "position": entry.get("position"),
"telechargement": None,
"message": f"En attente (position {entry.get('position', '?')})."}
if entry["status"] == "en cours":
return {"statut": "en cours", "position": 0,
"telechargement": None, "message": "Analyse en cours."}
if entry["status"] == "terminé":
result_file = JOBS_DIR / f"{login}.result.txt"
if result_file.exists():
return {"statut": "terminé", "position": None,
"telechargement": result_file.read_text(),
"message": "Analyse terminée. Télécharger le résultat."}
if entry["status"] == "échoué":
return {"statut": "échoué", "position": None, "telechargement": None,
"message": f"Échec : {entry.get('error', 'erreur inconnue')}"}
def soumettre_batch(login, G):
if statut_utilisateur(login)["statut"]:
raise RuntimeError("Un batch est déjà en cours.")
write_dot(G, JOBS_DIR / f"{login}.dot")
status = charger_status()
status[login] = {"status": "en attente", "timestamp": time.time()}
sauvegarder_status(status)
def nettoyage_post_telechargement(login):
(JOBS_DIR / f"{login}.dot").unlink(missing_ok=True)
(JOBS_DIR / f"{login}.result.txt").unlink(missing_ok=True)
status = charger_status()
status.pop(login, None)
sauvegarder_status(status)

128
index.py
View File

@ -1,128 +0,0 @@
#!/usr/bin/env python3
"""
index.py indexation hybride des minifiches
===========================================
1 fichier = 1 passage **si** le fichier WORD_LIMIT mots (par défaut : 600).
Audelà (rare : fiche ICS, ISG, etc.), on découpe en blocs ~CHUNK mots
avec chevauchement OVERLAP pour isoler les tableaux et valeurs numériques.
Incrémental : encode uniquement les fichiers nouveaux ou modifiés.
Embeddings : BGEM3 (FlagEmbedding) en CPU, normalisés L2.
Usage :
python index.py --root Corpus # première construction
python index.py # relance rapide (0 s si rien)
Arguments :
--root dossier des fiches (déf. Corpus)
--index nom du fichier FAISS (déf. corpus.idx)
--meta fichier méta JSON (déf. corpus.meta.json)
--word WORD_LIMIT (déf. 600)
--chunk CHUNK (déf. 350)
"""
import argparse, json, re, sys
from pathlib import Path
import faiss, numpy as np
from FlagEmbedding import BGEM3FlagModel
from rich import print
# --------------------- CLI --------------------------------------------------
parser = argparse.ArgumentParser(description="Indexation hybride : 1 passage par fiche courte, découpe douce pour les longues.")
parser.add_argument("--root", default="Corpus", help="Répertoire racine des fiches")
parser.add_argument("--index", default="corpus.idx", help="Nom du fichier FAISS")
parser.add_argument("--meta", default="corpus.meta.json", help="Nom du méta JSON")
parser.add_argument("--word", type=int, default=600, help="WORD_LIMIT : audelà on découpe (mots)")
parser.add_argument("--chunk", type=int, default=350, help="Taille des chunks quand on découpe (mots)")
args = parser.parse_args()
ROOT = Path(args.root)
INDEX_F = Path(args.index)
META_F = Path(args.meta)
WORD_LIMIT= args.word
CHUNK = args.chunk
OVERLAP = 50
EXTS = {".md", ".markdown", ".txt"}
print(f"[dim]Racine : {ROOT} | Index : {INDEX_F}[/]")
# ---------------- split helper --------------------------------------------
def split_long(text: str):
"""Découpe douce : blocs ~CHUNK mots, préserve tableaux."""
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks, buf = [], []
for s in sentences:
if "|" in s or re.fullmatch(r"\s*-{3,}\s*", s):
if buf:
chunks.append(" ".join(buf))
buf = []
chunks.append(s)
continue
buf.append(s)
if len(" ".join(buf).split()) >= CHUNK:
chunks.append(" ".join(buf))
buf = buf[-OVERLAP:]
if buf:
chunks.append(" ".join(buf))
return chunks
# ------------------------ lire méta existant ------------------------------
old_meta = {}
if INDEX_F.exists() and META_F.exists():
try:
for m in json.load(META_F.open()):
old_meta[m["path"]] = m
except Exception as e:
print(f"[yellow]Avertissement : méta illisible ({e}), reconstruction complète.[/]")
old_meta = {}
# ------------------------ scanner les fichiers ----------------------------
files = [fp for fp in ROOT.rglob("*") if fp.suffix.lower() in EXTS]
files.sort()
new_docs, new_meta, kept_meta = [], [], []
for fp in files:
rel = str(fp.relative_to(ROOT))
mtime = int(fp.stat().st_mtime)
prev = old_meta.get(rel)
if prev and prev["mtime"] == mtime:
kept_meta.append(prev)
continue
txt = fp.read_text(encoding="utf-8")
words = len(txt.split())
if words <= WORD_LIMIT:
new_docs.append(txt)
new_meta.append({"path": rel, "part": 0, "mtime": mtime})
else:
for i, chunk in enumerate(split_long(txt)):
new_docs.append(chunk)
new_meta.append({"path": rel, "part": i, "mtime": mtime})
print(f"Nouveaux/Modifiés : {len(new_meta)} | Conservés : {len(kept_meta)}")
if not new_meta and INDEX_F.exists():
print("Index déjà à jour ✔︎")
sys.exit(0)
# ------------------------ embeddings --------------------------------------
model = BGEM3FlagModel("BAAI/bge-m3", device="cpu")
emb = model.encode(new_docs)
if isinstance(emb, dict):
emb = next(v for v in emb.values() if isinstance(v, np.ndarray))
emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)
emb = emb.astype("float32")
# ------------------------ FAISS update ------------------------------------
if INDEX_F.exists():
idx = faiss.read_index(str(INDEX_F))
else:
idx = faiss.IndexFlatIP(emb.shape[1])
idx.add(emb)
faiss.write_index(idx, str(INDEX_F))
# ------------------------ save meta ---------------------------------------
all_meta = kept_meta + new_meta
json.dump(all_meta, META_F.open("w"), ensure_ascii=False, indent=2)
print(f"Index mis à jour ✔︎ | Total passages : {idx.ntotal}")

View File

@ -1,24 +0,0 @@
#!/bin/bash
# Lancer fabnum avec Streamlit, selon l'environnement défini dans .env
# Aller dans le dossier du script
cd "$(dirname "$0")"
# Charger l'environnement Python
source venv/bin/activate
# Charger les variables d'environnement définies dans .env
if [ -f .env ]; then
export $(grep -v '^#' .env | xargs)
else
echo "⚠️ Fichier .env manquant !"
exit 1
fi
# Valeur par défaut si PORT non défini
PORT=${PORT:-8501}
echo "🔄 Lancement de Fabnum ($ENV) sur le port $PORT..."
# Exécuter streamlit via l'interpréteur du venv
exec venv/bin/streamlit run fabnum.py --server.address=127.0.0.1 --server.port=$PORT

128
rag.py
View File

@ -1,128 +0,0 @@
#!/usr/bin/env python3
"""
rag.py recherche + génération (version robuste, chapitres)
============================================================
Charge **un ou plusieurs** couples index/meta (FAISS + JSON). Par défaut :
rapport.idx / rapport.meta.json
Reconstitue les textes à partir des fichiers `path` indiqués dans la méta.
Les passages sont déjà prêts (1 par fichier court, ou découpés par index.py).
Recherche : embeddings BGEM3 (CPU) + FAISS (cosinus IP) sur tous les index.
topk configurable (déf. 20 pour index détaillé, 5 pour index chapitres).
trie ensuite les hits mettant en avant ceux contenant un motclé fourni
(ex. « seuil » pour ICS).
Génération : appelle llama3-8b-fast (Ollama) avec temperature 0.1 et consigne :
« Réponds uniquement à partir du contexte. Si linfo manque : Je ne sais pas. »
Usage :
python rag.py [--k 25] [--kw seuil] [--model llama3-8b-fast]
"""
from __future__ import annotations
import argparse, json, re, sys
from pathlib import Path
import faiss, numpy as np, requests
from FlagEmbedding import BGEM3FlagModel
from rich import print
ROOT = Path("Rapport")
# ------------------------- CLI -------------------------------------------
p = argparse.ArgumentParser()
p.add_argument("--index", nargs="*", default=["rapport.idx"],
help="Liste des fichiers FAISS à charger (déf. rapport.idx)")
p.add_argument("--meta", nargs="*", default=["rapport.meta.json"],
help="Liste des méta JSON assortis (même ordre que --index)")
p.add_argument("--k", type=int, default=15, help="topk cumulés (déf. 15)")
p.add_argument("--kw", default="seuil", help="motclé boosté (déf. seuil)")
p.add_argument("--model", default="llama3-8b-fast", help="modèle Ollama")
args = p.parse_args()
if len(args.index) != len(args.meta):
print("[red]Erreur : --index et --meta doivent avoir la même longueur.")
sys.exit(1)
# ------------------------- charger indexes -------------------------------
indexes, metas, start_offset = [], [], []
offset = 0
for idx_f, meta_f in zip(args.index, args.meta):
idx = faiss.read_index(str(idx_f))
meta = json.load(open(meta_f))
if idx.ntotal != len(meta):
print(f"[yellow]Avertissement : {idx_f} contient {idx.ntotal} vecteurs, meta {len(meta)} lignes.[/]")
indexes.append(idx)
metas.append(meta)
start_offset.append(offset)
offset += idx.ntotal
total_passages = offset
print(f"Passages chargés : {total_passages} (agrégat de {len(indexes)} index)")
# ------------------------- cache texte -----------------------------------
DOCS: dict[int,str] = {}
for base_offset, meta in zip(start_offset, metas):
for i, m in enumerate(meta):
rel_path = m.get("path") or m.get("file")
full_path = ROOT / rel_path
DOCS[base_offset + i] = full_path.read_text(encoding="utf-8")
print("[dim]Cache texte préchargé.[/]")
# ------------------------- modèle embeddings -----------------------------
embedder = BGEM3FlagModel("BAAI/bge-m3", device="cpu")
# ------------------------- helpers ---------------------------------------
def encode_query(q: str):
emb = embedder.encode([q])
if isinstance(emb, dict):
emb = next(v for v in emb.values() if isinstance(v, np.ndarray))
v = emb[0]
return (v / np.linalg.norm(v)).astype("float32").reshape(1, -1)
def search_all(vec):
hits = []
for idx, off in zip(indexes, start_offset):
D, I = idx.search(vec, min(args.k, idx.ntotal))
hits.extend([off + int(i) for i in I[0]])
return hits
# ------------------------- boucle interactive ----------------------------
print("RAG prêt ! (CtrlD pour quitter)")
while True:
try:
q = input("❓ > ").strip()
except (EOFError, KeyboardInterrupt):
print("\nBye."); break
if not q: continue
# correction rapide de typos courantes (substituabilité…)
q_norm = re.sub(r"susbtitu[a-z]+", "substituabilité", q, flags=re.I)
vec = encode_query(q_norm)
hits = search_all(vec)
# Boost lexical : passages contenant le motclé args.kw dabord
kw_lower = args.kw.lower()
hits.sort(key=lambda i: kw_lower not in DOCS[i].lower())
hits = hits[:args.k]
context = "\n\n".join(DOCS[i] for i in hits)
prompt = (
"<system>Réponds en français, de façon précise et uniquement à partir du contexte. "
"Si l'information n'est pas dans le contexte, réponds : 'Je ne sais pas'.</system>\n"
f"<context>{context}</context>\n"
f"<user>{q}</user>"
)
r = requests.post("http://127.0.0.1:11434/api/generate", json={
"model": args.model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.1, "num_predict": 512}
}, timeout=300)
answer = r.json().get("response", "(erreur API)")
print("\n[bold]Réponse :[/]\n", answer)
print("\n[dim]--- contexte utilisé (top " + str(len(hits)) + ") ---[/]")
for rank, idx_id in enumerate(hits, 1):
m = metas[0] # non utilisé ici, on affiche juste le nom
path = DOCS[idx_id].splitlines()[0][:250]
print(f"[{rank}] … {path}")

23407
schema.txt

File diff suppressed because one or more lines are too long

280
scripts/auto_ingest.py Normal file
View File

@ -0,0 +1,280 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script pour l'injection automatique de fichiers dans Private GPT.
Ce script scanne un répertoire source et injecte les nouveaux fichiers via l'API de Private GPT.
"""
import os
import sys
import time
import json
import argparse
import logging
import requests
from pathlib import Path
from typing import List, Set, Dict, Any, Optional
from datetime import datetime
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('auto_ingest.log')
]
)
logger = logging.getLogger("auto_ingest")
# Extensions de fichiers supportées par défaut
DEFAULT_SUPPORTED_EXTENSIONS = {
'.txt', '.pdf', '.csv', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
'.md', '.html', '.htm', '.json', '.xml', '.rtf', '.odt', '.ods', '.odp'
}
class PrivateGPTIngestor:
"""Classe pour gérer l'ingestion de fichiers dans Private GPT."""
def __init__(self, api_url: str = "http://localhost:8001",
processed_file: str = "processed_files.json"):
"""
Initialise l'ingesteur.
Args:
api_url: URL de l'API Private GPT
processed_file: Fichier pour stocker les fichiers déjà traités
"""
self.api_url = api_url
self.processed_file = processed_file
self.processed_files = self._load_processed_files()
def _load_processed_files(self) -> Set[str]:
"""Charge la liste des fichiers déjà traités."""
try:
if os.path.exists(self.processed_file):
with open(self.processed_file, 'r', encoding='utf-8') as f:
return set(json.load(f))
return set()
except Exception as e:
logger.error(f"Erreur lors du chargement des fichiers traités: {e}")
return set()
def _save_processed_files(self) -> None:
"""Sauvegarde la liste des fichiers déjà traités."""
try:
with open(self.processed_file, 'w', encoding='utf-8') as f:
json.dump(list(self.processed_files), f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des fichiers traités: {e}")
def scan_directory(self, directory: str, extensions: Set[str] = None,
recursive: bool = True) -> List[str]:
"""
Scanne un répertoire pour trouver des fichiers à injecter.
Args:
directory: Le répertoire à scanner
extensions: Extensions de fichiers à prendre en compte
recursive: Si True, scanne les sous-répertoires
Returns:
Liste des chemins des fichiers à injecter
"""
if extensions is None:
extensions = DEFAULT_SUPPORTED_EXTENSIONS
files_to_ingest = []
directory_path = Path(directory)
if not directory_path.exists():
logger.error(f"Le répertoire {directory} n'existe pas")
return []
logger.info(f"Scan du répertoire {directory}")
# Fonction de scan
def scan_dir(path: Path):
for item in path.iterdir():
if item.is_file() and any(item.name.lower().endswith(ext) for ext in extensions):
abs_path = str(item.absolute())
if abs_path not in self.processed_files:
files_to_ingest.append(abs_path)
elif item.is_dir() and recursive:
scan_dir(item)
scan_dir(directory_path)
logger.info(f"Trouvé {len(files_to_ingest)} fichiers à injecter")
return files_to_ingest
def ingest_file(self, file_path: str) -> bool:
"""
Injecte un fichier dans Private GPT via l'API.
Args:
file_path: Chemin du fichier à injecter
Returns:
True si l'injection a réussi, False sinon
"""
logger.info(f"Injection du fichier: {file_path}")
try:
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f)}
response = requests.post(f"{self.api_url}/v1/ingest/file", files=files)
if response.status_code == 200:
logger.info(f"Injection réussie pour {file_path}")
self.processed_files.add(file_path)
self._save_processed_files()
return True
else:
logger.error(f"Échec de l'injection pour {file_path}: {response.status_code} - {response.text}")
return False
except Exception as e:
logger.error(f"Erreur lors de l'injection de {file_path}: {e}")
return False
def list_documents(self) -> List[Dict[str, Any]]:
"""
Liste les documents déjà injectés dans Private GPT.
Returns:
Liste des documents injectés
"""
try:
response = requests.get(f"{self.api_url}/v1/ingest/list")
if response.status_code == 200:
return response.json()
else:
logger.error(f"Échec de la récupération des documents: {response.status_code} - {response.text}")
return []
except Exception as e:
logger.error(f"Erreur lors de la récupération des documents: {e}")
return []
def run_ingestion(self, directory: str, extensions: Set[str] = None,
recursive: bool = True, batch_size: int = 5,
delay: float = 2.0) -> None:
"""
Exécute l'ingestion des fichiers d'un répertoire.
Args:
directory: Répertoire à scanner
extensions: Extensions à prendre en compte
recursive: Si True, scanne les sous-répertoires
batch_size: Nombre de fichiers à injecter par lot
delay: Délai entre chaque lot (en secondes)
"""
files_to_ingest = self.scan_directory(directory, extensions, recursive)
if not files_to_ingest:
logger.info("Aucun nouveau fichier à injecter")
return
total_files = len(files_to_ingest)
successful = 0
failed = 0
for i, file_path in enumerate(files_to_ingest):
logger.info(f"Progression: {i+1}/{total_files}")
if self.ingest_file(file_path):
successful += 1
else:
failed += 1
# Pause après chaque lot
if (i + 1) % batch_size == 0 and i < total_files - 1:
logger.info(f"Pause de {delay} secondes après le lot de {batch_size} fichiers")
time.sleep(delay)
logger.info(f"Ingestion terminée: {successful} réussis, {failed} échoués sur {total_files} fichiers")
def parse_args():
"""Parse les arguments de ligne de commande."""
parser = argparse.ArgumentParser(description="Outil d'injection automatique pour Private GPT")
parser.add_argument("--directory", "-d", type=str, required=True,
help="Répertoire contenant les fichiers à injecter")
parser.add_argument("--api-url", type=str, default="http://localhost:8001",
help="URL de l'API Private GPT (défaut: http://localhost:8001)")
parser.add_argument("--recursive", "-r", action="store_true", default=True,
help="Scanner récursivement les sous-répertoires")
parser.add_argument("--extensions", "-e", type=str, nargs="+",
help="Extensions de fichiers à prendre en compte (ex: .pdf .txt)")
parser.add_argument("--batch-size", "-b", type=int, default=5,
help="Nombre de fichiers à injecter par lot (défaut: 5)")
parser.add_argument("--delay", type=float, default=2.0,
help="Délai entre les lots en secondes (défaut: 2.0)")
parser.add_argument("--list", "-l", action="store_true",
help="Lister les documents déjà injectés et quitter")
parser.add_argument("--watch", "-w", action="store_true",
help="Mode surveillance: vérifier périodiquement les nouveaux fichiers")
parser.add_argument("--watch-interval", type=int, default=300,
help="Intervalle de surveillance en secondes (défaut: 300)")
return parser.parse_args()
def main():
"""Fonction principale."""
args = parse_args()
ingestor = PrivateGPTIngestor(api_url=args.api_url)
# Option pour lister les documents
if args.list:
documents = ingestor.list_documents()
print(f"Documents déjà injectés ({len(documents)}):")
for doc in documents:
print(f"- {doc.get('doc_id')}: {doc.get('doc_metadata', {}).get('file_name', 'Inconnu')}")
return
# Conversion des extensions
extensions = None
if args.extensions:
extensions = set()
for ext in args.extensions:
if not ext.startswith('.'):
ext = '.' + ext
extensions.add(ext.lower())
# Mode surveillance
if args.watch:
logger.info(f"Mode surveillance activé. Vérification toutes les {args.watch_interval} secondes")
try:
while True:
start_time = datetime.now()
logger.info(f"Démarrage d'un scan à {start_time.strftime('%H:%M:%S')}")
ingestor.run_ingestion(
directory=args.directory,
extensions=extensions,
recursive=args.recursive,
batch_size=args.batch_size,
delay=args.delay
)
# Calcul du temps à attendre
elapsed = (datetime.now() - start_time).total_seconds()
wait_time = max(0, args.watch_interval - elapsed)
if wait_time > 0:
logger.info(f"En attente pendant {wait_time:.1f} secondes jusqu'au prochain scan")
time.sleep(wait_time)
except KeyboardInterrupt:
logger.info("Arrêt du mode surveillance")
else:
# Exécution unique
ingestor.run_ingestion(
directory=args.directory,
extensions=extensions,
recursive=args.recursive,
batch_size=args.batch_size,
delay=args.delay
)
if __name__ == "__main__":
main()

View File

@ -11,6 +11,14 @@ import re
import yaml
from networkx.drawing.nx_agraph import read_dot
from pathlib import Path
from collections import defaultdict
import uuid
import requests
import json
import time
session_uuid = str(uuid.uuid4())[:8] # Utiliser les 8 premiers caractères pour plus de concision
print(f"🔑 UUID de session généré: {session_uuid}")
BASE_DIR = Path(__file__).resolve().parent
CORPUS_DIR = BASE_DIR.parent / "Corpus"
@ -18,7 +26,23 @@ CONFIG_PATH = BASE_DIR / "config.yml"
THRESHOLDS_PATH = BASE_DIR.parent / "assets" / "config.yaml"
REFERENCE_GRAPH_PATH = BASE_DIR.parent / "schema.txt"
GRAPH_PATH = BASE_DIR.parent / "graphe.dot"
TEMPLATE_PATH = BASE_DIR / "rapport_final.md"
TEMP_SECTIONS = BASE_DIR / "temp_sections"
TEMPLATE_PATH = TEMP_SECTIONS / f"rapport_final - {session_uuid}.md"
if not TEMP_SECTIONS.exists():
TEMP_SECTIONS.mkdir(parents=True)
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
PROMPT_METHODOLOGIE = """
Le rapport à examiner a été établi à partir de la méthodologie suivante.
Le dispositif dévaluation des risques proposé repose sur quatre indices clairement définis, chacun analysant un aspect spécifique des risques dans la chaîne dapprovisionnement numérique. Lindice IHH mesure la concentration géographique ou industrielle, permettant dévaluer la dépendance vis-à-vis de certains acteurs ou régions. Lindice ISG indique la stabilité géopolitique des pays impliqués dans la chaîne de production, en intégrant des critères politiques, sociaux et climatiques. Lindice ICS quantifie la facilité ou la difficulté à remplacer ou substituer un élément spécifique dans la chaîne, évaluant ainsi les risques liés à la dépendance technologique et économique. Enfin, lindice IVC examine la pression concurrentielle sur les ressources utilisées par le numérique, révélant ainsi le risque potentiel que ces ressources soient détournées vers dautres secteurs industriels.
Ces indices se combinent judicieusement par paires pour une évaluation approfondie et pertinente des risques. La combinaison IHH-ISG permet dassocier la gravité d'un impact potentiel (IHH) à la probabilité de survenance dun événement perturbateur (ISG), créant ainsi une matrice de vulnérabilité combinée utile pour identifier rapidement les points critiques dans la chaîne de production. La combinaison ICS-IVC fonctionne selon la même logique, mais se concentre spécifiquement sur les ressources minérales : lICS indique la gravité potentielle d'une rupture d'approvisionnement due à une faible substituabilité, tandis que lIVC évalue la probabilité que les ressources soient captées par d'autres secteurs industriels concurrents. Ces combinaisons permettent dobtenir une analyse précise et opérationnelle du niveau de risque global.
Les avantages de cette méthodologie résident dans son approche à la fois systématique et granulaire, adaptée à l'échelle décisionnelle d'un COMEX. Elle permet didentifier avec précision les vulnérabilités majeures et leurs origines spécifiques, facilitant ainsi la prise de décision stratégique éclairée et proactive. En combinant des facteurs géopolitiques, industriels, technologiques et concurrentiels, ces indices offrent un suivi efficace de la chaîne de fabrication numérique, garantissant ainsi une gestion optimale des risques et la continuité opérationnelle à long terme.
"""
DICTIONNAIRE_CRITICITES = {
"IHH": {"vert": "Faible", "orange": "Modérée", "rouge": "Élevée"},
@ -1019,14 +1043,10 @@ def trouver_dossier_composant(nom_composant, base_path, prefixe):
Parcourt les sous-répertoires de base_path et retourne celui qui correspond au composant.
"""
search_path = os.path.join(CORPUS_DIR, base_path)
print(nom_composant)
print(base_path)
print(search_path)
if not os.path.exists(search_path):
return None
for d in os.listdir(search_path):
print(d)
if os.path.isdir(os.path.join(search_path, d)):
if composant_match(f"{prefixe}{nom_composant}", d):
return os.path.join(base_path, d)
@ -1053,7 +1073,6 @@ def generate_operations_section(data, results, config):
# Récupérer la présentation synthétique
# product_slug = product['label'].lower().replace(' ', '-')
sous_repertoire = f"{product['label']}"
print(product)
if product["level"] == 0:
type = "Assemblage"
else:
@ -1540,6 +1559,74 @@ def generate_critical_paths_section(data, results):
return "\n".join(template)
def extraire_sections_par_mot_cle(fichier_markdown: Path) -> dict:
"""
Extrait les sections de niveau 3 uniquement dans la section
'## Chaînes avec risque critique' du fichier Markdown,
et les regroupe par mot-clé (ce qui se trouve entre '### ' et '').
Réduit chaque titre dun niveau (#).
"""
with fichier_markdown.open(encoding="utf-8") as f:
contenu = f.read()
# Extraire uniquement la section '## Chaînes avec risque critique'
match_section = re.search(
r"## Chaînes avec risque critique(.*?)(?=\n## |\Z)", contenu, re.DOTALL
)
if not match_section:
return {}
section_critique = match_section.group(1)
# Extraire les mots-clés entre '### ' et ' →'
mots_cles = set(re.findall(r"^### (.+?) →", section_critique, re.MULTILINE))
# Extraire tous les blocs de niveau 3 dans cette section uniquement
blocs_sections = re.findall(r"(### .+?)(?=\n### |\n## |\Z)", section_critique, re.DOTALL)
# Regrouper les blocs par mot-clé
regroupement = defaultdict(list)
for bloc in blocs_sections:
match = re.match(r"### (.+?) →", bloc)
if match:
mot = match.group(1)
if mot in mots_cles:
# Réduction du niveau des titres
bloc_modifie = re.sub(r"^###", "##", bloc, flags=re.MULTILINE)
bloc_modifie = re.sub(r"^###", "##", bloc_modifie, flags=re.MULTILINE)
regroupement[mot].append(bloc_modifie)
return {mot: "\n\n".join(blocs) for mot, blocs in regroupement.items()}
def ingest_document(file_path: Path) -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
file_name = file_path.name
files = {"file": (file_name, f, "text/markdown")}
# Ajouter des métadonnées pour identifier facilement ce fichier d'entrée
metadata = {
"type": "input_file",
"session_id": session_uuid,
"document_type": "rapport_analyse_input"
}
response = requests.post(
f"{API_URL}/ingest/file",
files=files,
data={"metadata": json.dumps(metadata)} if "metadata" in requests.get(f"{API_URL}/ingest/file").text else None
)
response.raise_for_status()
print(f"✅ Document '{file_path}' ingéré avec succès sous le nom '{file_name}'")
return True
except FileNotFoundError:
print(f"❌ Fichier '{file_path}' introuvable")
return False
except requests.RequestException as e:
print(f"❌ Erreur lors de l'ingestion du document: {e}")
return False
def generate_report(data, results, config):
"""
Génère le rapport complet structuré selon les spécifications.
@ -1565,6 +1652,31 @@ def generate_report(data, results, config):
# Section chemins critiques
report_critical_paths = generate_critical_paths_section(data, results)
suffixe = " - chemins critiques"
fichier = TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md")
fichier_path = TEMPLATE_PATH.parent / fichier
# Élever les titres Markdown dans report_critical_paths
report_critical_paths = re.sub(r'^(#{2,})', lambda m: '#' * (len(m.group(1)) - 1), report_critical_paths, flags=re.MULTILINE)
write_report(report_critical_paths, fichier_path)
# Récupérer les sections critiques décomposées par mot-clé
chemins_critiques_sections = extraire_sections_par_mot_cle(fichier_path)
file_names = []
# Pour chaque mot-clé, écrire un fichier individuel
for mot_cle, contenu in chemins_critiques_sections.items():
print(mot_cle)
suffixe = f" - chemins critiques {mot_cle}"
fichier_personnalise = TEMPLATE_PATH.with_name(
TEMPLATE_PATH.name.replace(".md", f"{suffixe}.md")
)
# Ajouter du texte au début du contenu
introduction = f"# Détail des chemins critiques pour : {mot_cle}\n\n"
contenu = introduction + contenu
write_report(contenu, fichier_personnalise)
file_names.append(fichier_personnalise)
# report.append(generate_critical_paths_section(data, results))
# Ordre de composition final
@ -1577,36 +1689,243 @@ def generate_report(data, results, config):
[report_methodologie]
)
return "\n".join(report)
return "\n".join(report), file_names
def write_report(report, config):
def generate_text(input_file, full_prompt, system_message, temperature = "0.1"):
"""Génère du texte avec l'API PrivateGPT"""
try:
# Définir les paramètres de la requête
payload = {
"messages": [
{"role": "system", "content": system_message},
{"role": "user", "content": full_prompt}
],
"use_context": True, # Active la recherche RAG dans les documents ingérés
"temperature": temperature, # Température réduite pour plus de cohérence
"stream": False
}
# Tenter d'ajouter un filtre de contexte (fonctionnalité expérimentale qui peut ne pas être supportée)
if input_file:
try:
# Vérifier si le filtre de contexte est supporté sans faire de requête supplémentaire
liste_des_fichiers = list(TEMP_SECTIONS.glob(f"*{session_uuid}*.md"))
filter_metadata = {
"document_name": [input_file.name] + [f.name for f in liste_des_fichiers]
}
payload["filter_metadata"] = filter_metadata
except Exception as e:
print(f" Remarque: Impossible d'appliquer le filtre de contexte: {e}")
# Envoyer la requête
response = requests.post(
f"{API_URL}/chat/completions",
json=payload,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire la réponse générée
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Détails: {e.response.text}")
return None
def ia_analyse(file_names):
for file in file_names:
ingest_document(file)
time.sleep(5)
reponse = {}
for file in file_names:
produit_final = re.search(r"chemins critiques (.+)\.md$", file.name).group(1)
# Préparer le prompt avec le contexte précédent si disponible et demandé
full_prompt = f"""
Rédigez une synthèse du fichier {file.name} dédiée au produit final '{produit_final}'.
Cette synthèse, destinée spécifiquement au Directeur des Risques, membre du COMEX d'une grande entreprise utilisant ce produit, doit être claire et concise (environ 10 lignes).
En utilisant impérativement la méthodologie fournie, expliquez en termes simples mais précis, pourquoi et comment les vulnérabilités identifiées constituent un risque concret pour l'entreprise. Mentionnez clairement :
- Les composants spécifiques du produit '{produit_final}' concernés par ces vulnérabilités.
- Les minerais précis responsables de ces vulnérabilités et leur rôle dans limpact sur les composants.
- Les points critiques exacts identifiés dans la chaîne d'approvisionnement (par exemple : faible substituabilité, forte concentration géographique, instabilité géopolitique, concurrence élevée entre secteurs industriels).
Respectez strictement les consignes suivantes :
- N'utilisez aucun acronyme ni valeur numérique ; uniquement leur équivalent textuel (ex : criticité de substituabilité, vulnérabilité élevée ou critique, etc.).
- N'incluez à ce stade aucune préconisation ni recommandation.
Votre texte doit être parfaitement adapté à une compréhension rapide par des dirigeants dentreprise.
"""
# Définir les paramètres de la requête
system_message = f"""
Vous êtes un assistant stratégique expert chargé de rédiger des synthèses destinées à des décideurs de très haut niveau (Directeurs des Risques, membres du COMEX, stratèges industriels). Vous analysez exclusivement les vulnérabilités systémiques affectant les produits numériques, à partir des données précises fournies dans le fichier {file.name}.
Votre analyse doit être rigoureuse, accessible, pertinente pour la prise de décision stratégique, et conforme à la méthodologie définie ci-dessous :
{PROMPT_METHODOLOGIE}
"""
reponse[produit_final] = f"\n**{produit_final}**\n\n" + generate_text(file, full_prompt, system_message).split("</think>")[-1].strip()
# print(reponse[produit_final])
corps = "\n\n".join(reponse.values())
print("Corps")
full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE
system_message = """
Vous êtes un expert en rédaction de rapports stratégiques destinés à un COMEX ou une Direction des Risques.
Votre mission est d'écrire une introduction professionnelle, claire et synthétique (maximum 7 lignes) à partir des éléments suivants :
1. Un corps danalyse décrivant les vulnérabilités identifiées pour un produit numérique.
2. La méthodologie détaillée utilisée pour cette analyse (fourni en deuxième partie).
Votre introduction doit :
- Présenter brièvement le sujet traité (vulnérabilités du produit final).
- Annoncer clairement le contenu et l'objectif de l'analyse présentée dans le corps.
- Résumer succinctement les axes méthodologiques principaux (concentration géographique ou industrielle, stabilité géopolitique, criticité de substituabilité, concurrence intersectorielle des minerais).
- Être facilement compréhensible par des décideurs de haut niveau (pas d'acronymes, ni chiffres ; uniquement des formulations textuelles).
- Être fluide, agréable à lire, avec un ton sobre et professionnel.
Répondez uniquement avec l'introduction rédigée. Ne fournissez aucune autre explication complémentaire.
"""
introduction = generate_text("", full_prompt, system_message).split("</think>")[-1].strip()
print("Introduction")
full_prompt = corps + "\n\n" + PROMPT_METHODOLOGIE
system_message = """
Vous êtes un expert stratégique en gestion des risques liés à la chaîne de valeur numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises utilisatrices de produits numériques. Ces entreprises n'ont pour levier daction que le choix de leurs fournisseurs ou l'allongement de la durée de vie de leur matériel.
À partir des vulnérabilités identifiées dans la première partie du prompt (corps d'analyse) et en tenant compte du contexte et de la méthodologie décrite en deuxième partie, rédigez un texte clair, structuré en deux parties distinctes :
1. **Préconisations stratégiques :**
Proposez clairement des axes concrets pour limiter les risques identifiés dans lanalyse. Ces préconisations doivent impérativement être réalistes et directement actionnables par les dirigeants compte tenu de leurs leviers limités.
2. **Indicateurs de suivi :**
Identifiez précisément les indicateurs pertinents à suivre pour évaluer régulièrement lévolution de ces risques. Ces indicateurs doivent être inspirés directement des axes méthodologiques fournis (concentration géographique, stabilité géopolitique, substituabilité, concurrence intersectorielle) ou sappuyer sur des bonnes pratiques reconnues.
Votre rédaction doit être fluide, concise, très professionnelle, et directement accessible à un COMEX. Évitez strictement toute explication complémentaire ou ajout superflu. Ne proposez que le texte demandé.
"""
preconisations = generate_text("", full_prompt, system_message, "0.5").split("</think>")[-1].strip()
print("Préconisations")
full_prompt = corps + "\n\n" + preconisations
system_message = """
Vous êtes un expert stratégique spécialisé dans les risques liés à la chaîne de valeur du numérique. Vous conseillez directement le COMEX et la Direction des Risques de grandes entreprises dépendantes du numérique, dont les leviers daction se limitent au choix des fournisseurs et à lallongement de la durée dutilisation du matériel.
À partir du résultat de l'analyse des vulnérabilités présenté en première partie du prompt (corps) et des préconisations stratégiques formulées en deuxième partie, rédigez une conclusion synthétique et percutante (environ 6 à 8 lignes maximum) afin de :
- Résumer clairement les principaux risques identifiés.
- Souligner brièvement les axes prioritaires proposés pour agir concrètement.
- Inviter de manière dynamique le COMEX à passer immédiatement à l'action.
Votre rédaction doit être fluide, professionnelle, claire et immédiatement exploitable par des dirigeants. Ne fournissez aucune explication supplémentaire. Ne répondez que par la conclusion demandée.
"""
conclusion = generate_text("", full_prompt, system_message, "0.7").split("</think>")[-1].strip()
print("Conclusion")
analyse = "# Rapport d'analyse\n\n" + \
"\n\n## Introduction\n\n" + \
introduction + \
"\n\n## Analyse des produits finaux\n\n" + \
corps + \
"\n\n## Préconisations\n\n" + \
preconisations + \
"\n\n## Conclusion\n\n" + \
conclusion
fichier_a_reviser = TEMPLATE_PATH.name.replace(".md", " - analyse à relire.md")
write_report(analyse, fichier_a_reviser)
ingest_document(Path(fichier_a_reviser))
full_prompt = f"""
Le fichier à réviser est {fichier_a_reviser}. Suivre scrupuleusement les consignes.
"""
system_message = f"""
Vous êtes un réviseur professionnel expert en écriture stratégique, maîtrisant parfaitement la langue française et habitué à réviser des textes destinés à des dirigeants de haut niveau (COMEX).
Votre unique tâche est d'améliorer la qualité rédactionnelle du texte dans le fichier {fichier_a_reviser}, sans modifier ni sa structure, ni son sens initial, ni ajouter dinformations nouvelles. Cette révision doit :
- Éliminer toutes répétitions ou redondances et varier systématiquement les tournures entre les paragraphes.
- Rendre chaque phrase claire, directe et concise. Si une phrase est trop longue, scindez-la en plusieurs phrases courtes.
- Remplacer systématiquement les acronymes par les expressions suivantes :
- ICS « capacité à substituer un minerai »
- IHH « concentration géographique ou industrielle »
- ISG « stabilité géopolitique »
- IVC « concurrence intersectorielle pour les minerais »
Votre texte final doit être fluide, agréable à lire, parfaitement adapté à un COMEX, avec un ton professionnel et sobre.
Répondez uniquement avec le texte révisé, sans autre commentaire.
"""
corps = generate_text(fichier_a_reviser, full_prompt, system_message, "0.6").split("</think>")[-1].strip()
print("Relecture")
return analyse
def write_report(report, fichier):
"""Écrit le rapport généré dans le fichier spécifié."""
with open(TEMPLATE_PATH, 'w', encoding='utf-8') as f:
report = re.sub(r'<!----.*?-->', '', report)
report = re.sub(r'\n\n\n+', '\n\n', report)
with open(fichier, 'w', encoding='utf-8') as f:
f.write(report)
# print(f"Rapport généré avec succès: {TEMPLATE_PATH}")
def nettoyer_texte_fr(texte: str) -> str:
# Apostrophes droites -> typographiques
texte = texte.replace("'", "")
# Guillemets droits -> guillemets français (avec espace fine insécable)
texte = re.sub(r'"(.*?)"', r'« \1»', texte)
# Espaces fines insécables avant : ; ! ?
texte = re.sub(r' (?=[:;!?])', '\u202F', texte)
# Unités : espace insécable entre chiffre et unité
texte = re.sub(r'(\d) (?=\w+)', lambda m: f"{m.group(1)}\u202F", texte)
# Suppression des doubles espaces
texte = re.sub(r' {2,}', ' ', texte)
# Remplacement optionnel des tirets simples (optionnel)
texte = texte.replace(" - ", " ")
# Nettoyage ponctuation multiple accidentelle
texte = re.sub(r'\s+([.,;!?])', r'\1', texte)
return texte
def main():
"""Fonction principale du script."""
# Charger la configuration
config = load_config(CONFIG_PATH)
# Analyser les graphes
graph, ref_graph = parse_graphs(config)
# Extraire les données
data = extract_data_from_graph(graph, ref_graph)
# Calculer les vulnérabilités
results = calculate_vulnerabilities(data, config)
# Générer le rapport
report = generate_report(data, results, config)
report = re.sub(r'<!----.*?-->', '', report)
report = re.sub(r'\n\n\n+', '\n\n', report)
report, file_names = generate_report(data, results, config)
# Écrire le rapport
write_report(report, config)
write_report(report, TEMPLATE_PATH)
# Générer l'analyse par l'IA du rapport compler
analyse_finale = nettoyer_texte_fr(ia_analyse(file_names))
write_report(analyse_finale, TEMP_SECTIONS / TEMPLATE_PATH.name.replace(".md", " - analyse.md"))
if __name__ == "__main__":
main()

View File

@ -1,292 +0,0 @@
import requests
import time
import argparse
import json
import sys
from pathlib import Path
from typing import List, Dict, Optional
from enum import Enum
# Configuration de l'API PrivateGPT
PGPT_URL = "http://127.0.0.1:8001"
API_URL = f"{PGPT_URL}/v1"
DEFAULT_INPUT_PATH = "rapport_final.md"
DEFAULT_OUTPUT_PATH = "rapport_analyse_genere.md"
MAX_CONTEXT_LENGTH = 3000 # Taille maximale du contexte en caractères
# Stratégies de gestion du contexte
class ContextStrategy(str, Enum):
NONE = "none" # Pas de contexte
TRUNCATE = "truncate" # Tronquer le contexte
SUMMARIZE = "summarize" # Résumer le contexte
LATEST = "latest" # Uniquement la dernière section
# Les 5 étapes de génération du rapport
steps = [
{
"title": "Analyse détaillée",
"prompt": "Vous rédigez la section 'Analyse détaillée' d'un rapport structuré en cinq parties :\n"
"1. Synthèse\n2. Analyse détaillée\n3. Interdépendances\n4. Points de vigilance\n5. Conclusion\n\n"
"Votre objectif est d'analyser les vulnérabilités critiques, élevées et modérées présentes dans le rapport ingéré.\n"
"Structurez votre réponse par niveau de criticité, et intégrez les indices IHH, ICS, ISG, IVC de façon fluide dans le texte.\n"
"N'incluez pas de synthèse ni de conclusion."
},
{
"title": "Interdépendances",
"prompt": "Vous rédigez la section 'Interdépendances'. Identifiez les chaînes de vulnérabilités croisées,\n"
"les effets en cascade, et les convergences vers des produits numériques communs.\n"
"N'introduisez pas de recommandations ou d'évaluations globales."
},
{
"title": "Points de vigilance",
"prompt": "Vous rédigez la section 'Points de vigilance'. Sur la base des analyses précédentes,\n"
"proposez une liste d'indicateurs à surveiller, avec explication courte pour chacun."
},
{
"title": "Synthèse",
"prompt": "Vous rédigez la 'Synthèse'. Résumez les vulnérabilités majeures, les effets en cascade et les indicateurs critiques.\n"
"Ton narratif, percutant, orienté décideur."
},
{
"title": "Conclusion",
"prompt": "Vous rédigez la 'Conclusion'. Proposez des scénarios d'impact avec : déclencheur, horizon, gravité, conséquences."
}
]
def check_api_availability() -> bool:
"""Vérifie si l'API PrivateGPT est disponible"""
try:
response = requests.get(f"{PGPT_URL}/health")
if response.status_code == 200:
print("API PrivateGPT disponible")
return True
else:
print(f"❌ L'API PrivateGPT a retourné le code d'état {response.status_code}")
return False
except requests.RequestException as e:
print(f"❌ Erreur de connexion à l'API PrivateGPT: {e}")
return False
def ingest_document(file_path: Path) -> bool:
"""Ingère un document dans PrivateGPT"""
try:
with open(file_path, "rb") as f:
files = {"file": (file_path.name, f, "text/markdown")}
response = requests.post(f"{API_URL}/ingest/file", files=files)
response.raise_for_status()
print(f"Document '{file_path}' ingéré avec succès")
return True
except FileNotFoundError:
print(f"❌ Fichier '{file_path}' introuvable")
return False
except requests.RequestException as e:
print(f"❌ Erreur lors de l'ingestion du document: {e}")
return False
def get_context(sections: List[Dict[str, str]], strategy: ContextStrategy, max_length: int) -> str:
"""Génère le contexte selon la stratégie choisie"""
if not sections or strategy == ContextStrategy.NONE:
return ""
# Stratégie: uniquement la dernière section
if strategy == ContextStrategy.LATEST:
latest = sections[-1]
context = f"# {latest['title']}\n{latest['output']}"
if len(context) > max_length:
context = context[:max_length] + "..."
print(f"Contexte basé sur la dernière section: {latest['title']} ({len(context)} caractères)")
return context
# Stratégie: tronquer toutes les sections
if strategy == ContextStrategy.TRUNCATE:
full_context = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
if len(full_context) > max_length:
truncated = full_context[:max_length] + "..."
print(f"Contexte tronqué à {max_length} caractères")
return truncated
return full_context
# Stratégie: résumer intelligemment
if strategy == ContextStrategy.SUMMARIZE:
try:
# Construire un prompt pour demander un résumé des sections précédentes
sections_text = "\n\n".join([f"# {s['title']}\n{s['output']}" for s in sections])
# Limiter la taille du texte à résumer si nécessaire
if len(sections_text) > max_length * 2:
sections_text = sections_text[:max_length * 2] + "..."
summary_prompt = {
"messages": [
{"role": "system", "content": "Vous êtes un assistant spécialisé dans le résumé concis. Créez un résumé très court mais informatif qui conserve les points clés."},
{"role": "user", "content": f"Résumez les sections suivantes en {max_length // 10} mots maximum, en conservant les idées principales et les points essentiels:\n\n{sections_text}"}
],
"temperature": 0.1,
"stream": False
}
# Envoyer la requête pour obtenir un résumé
response = requests.post(
f"{API_URL}/chat/completions",
json=summary_prompt,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire et retourner le résumé
result = response.json()
summary = result["choices"][0]["message"]["content"]
print(f"Résumé de contexte généré ({len(summary)} caractères)")
return summary
except Exception as e:
print(f"Impossible de générer un résumé du contexte: {e}")
# En cas d'échec, revenir à la stratégie de troncature
return get_context(sections, ContextStrategy.TRUNCATE, max_length)
def generate_text(prompt: str, previous_context: str = "", use_context: bool = True, retry_on_error: bool = True) -> Optional[str]:
"""Génère du texte avec l'API PrivateGPT"""
try:
# Préparer le prompt avec le contexte précédent si disponible et demandé
full_prompt = prompt
if previous_context and use_context:
full_prompt += "\n\nContexte précédent (informations des sections déjà générées) :\n" + previous_context
# Définir les paramètres de la requête
payload = {
"messages": [
{"role": "system", "content": """
Vous êtes un assistant stratégique expert chargé de produire des rapports destinés à des décideurs de haut niveau (COMEX, directions risques, stratèges politiques ou industriels).
Objectif : Analyser les vulnérabilités systémiques dans une chaîne de valeur numérique décrite dans le fichier rapport_final.md en vous appuyant exclusivement sur ce rapport structuré fourni, et produire un rapport narratif percutant orienté décision.
"""},
{"role": "user", "content": full_prompt}
],
"use_context": True,
"temperature": 0.2, # Température réduite pour plus de cohérence
"stream": False
}
# Envoyer la requête
response = requests.post(
f"{API_URL}/chat/completions",
json=payload,
headers={"accept": "application/json"}
)
response.raise_for_status()
# Extraire la réponse générée
result = response.json()
if "choices" in result and len(result["choices"]) > 0:
return result["choices"][0]["message"]["content"]
else:
print("⚠️ Format de réponse inattendu:", json.dumps(result, indent=2))
return None
except requests.RequestException as e:
print(f"❌ Erreur lors de la génération de texte: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f"Détails: {e.response.text}")
return None
def main(input_path: str, output_path: str, context_strategy: ContextStrategy = ContextStrategy.SUMMARIZE, context_length: int = MAX_CONTEXT_LENGTH):
"""Fonction principale qui exécute le processus complet"""
# Vérifier la disponibilité de l'API
if not check_api_availability():
sys.exit(1)
# Convertir les chemins en objets Path
input_file = Path(input_path)
output_file = Path(output_path)
# Ingérer le document
if not ingest_document(input_file):
sys.exit(1)
# Récupérer la valeur du délai depuis args
delay = args.delay if 'args' in globals() else 5
# Attendre que l'ingestion soit complètement traitée
print(f"Attente du traitement de l'ingestion pendant {delay} secondes...")
time.sleep(delay)
print(f"Stratégie de contexte: {context_strategy.value}, taille max: {context_length} caractères")
# Générer chaque section du rapport
step_outputs = []
for i, step in enumerate(steps):
print(f"\nÉtape {i+1}/{len(steps)}: {step['title']}")
# Préparer le contexte selon la stratégie choisie
previous = get_context(step_outputs, context_strategy, context_length) if step_outputs else ""
# Générer le texte pour cette étape
max_retries = 3 if context_strategy != ContextStrategy.NONE else 1
retry_count = 0
output = None
while retry_count < max_retries and output is None:
try:
# Si ce n'est pas la première tentative et que nous avons une stratégie de contexte
if retry_count > 0 and context_strategy != ContextStrategy.NONE:
print(f"Tentative {retry_count+1}/{max_retries} avec une stratégie de contexte réduite")
# Réduire la taille du contexte à chaque tentative
reduced_context_length = context_length // (retry_count + 1)
fallback_strategy = ContextStrategy.LATEST if retry_count == 1 else ContextStrategy.NONE
previous = get_context(step_outputs, fallback_strategy, reduced_context_length)
# Générer le texte
output = generate_text(step["prompt"], previous, context_strategy != ContextStrategy.NONE)
except Exception as e:
print(f"⚠️ Erreur lors de la génération: {e}")
retry_count += 1
time.sleep(1) # Pause avant de réessayer
if output is None:
retry_count += 1
if output:
step_outputs.append({"title": step["title"], "output": output})
print(f"Section '{step['title']}' générée ({len(output)} caractères)")
else:
print(f"❌ Échec de la génération pour '{step['title']}'")
# Vérifier si nous avons généré toutes les sections
if len(step_outputs) != len(steps):
print(f"⚠️ Attention: Seules {len(step_outputs)}/{len(steps)} sections ont été générées")
# Écrire le rapport final dans l'ordre souhaité (Synthèse en premier)
# Réordonner les sections pour que la synthèse soit en premier
ordered_outputs = sorted(step_outputs, key=lambda x: 0 if x["title"] == "Synthèse" else
1 if x["title"] == "Analyse détaillée" else
2 if x["title"] == "Interdépendances" else
3 if x["title"] == "Points de vigilance" else 4)
# Assembler le rapport
report_text = "\n\n".join(f"# {s['title']}\n\n{s['output']}" for s in ordered_outputs)
# Écrire le rapport dans un fichier
try:
output_file.write_text(report_text, encoding="utf-8")
print(f"\nRapport final généré dans '{output_file}'")
except IOError as e:
print(f"❌ Erreur lors de l'écriture du fichier de sortie: {e}")
if __name__ == "__main__":
# Définir les arguments en ligne de commande
parser = argparse.ArgumentParser(description="Générer un rapport d'analyse structuré avec PrivateGPT")
parser.add_argument("-i", "--input", type=str, default=DEFAULT_INPUT_PATH,
help=f"Chemin du fichier d'entrée (défaut: {DEFAULT_INPUT_PATH})")
parser.add_argument("-o", "--output", type=str, default=DEFAULT_OUTPUT_PATH,
help=f"Chemin du fichier de sortie (défaut: {DEFAULT_OUTPUT_PATH})")
parser.add_argument("--context", type=str, choices=[s.value for s in ContextStrategy], default=ContextStrategy.SUMMARIZE.value,
help=f"Stratégie de gestion du contexte (défaut: {ContextStrategy.SUMMARIZE.value})")
parser.add_argument("--context-length", type=int, default=MAX_CONTEXT_LENGTH,
help=f"Taille maximale du contexte en caractères (défaut: {MAX_CONTEXT_LENGTH})")
parser.add_argument("--delay", type=int, default=5,
help="Délai d'attente en secondes après l'ingestion (défaut: 5)")
args = parser.parse_args()
# Exécuter le programme principal avec les options configurées
context_strategy = ContextStrategy(args.context)
main(args.input, args.output, context_strategy, args.context_length)

View File

@ -25,10 +25,11 @@ import re
def calcul_ihh(graphe, depart, arrivee):
ihh = 0
for noeud in arrivee:
if arrivee not in list(graphe.successors(depart)):
depart = list(graphe.predecessors(noeud))[0]
relation = graphe.get_edge_data(depart, noeud)
ihh += int(int(relation['label'].strip("%"))**2)
if depart in noeud: # Gestion simplifiée de l'intégration des Connexes
if arrivee not in list(graphe.successors(depart)):
depart = list(graphe.predecessors(noeud))[0]
relation = graphe.get_edge_data(depart, noeud)
ihh += int(int(relation['label'].strip("%"))**2)
ihh = int(round(ihh/100))
return ihh
@ -56,11 +57,11 @@ def mettre_a_jour_ihh(graph, noeuds):
# auprès des acteurs de l'extraction. Il y a une relations entre chaque acteur de traitement
# et les pays auprès desquels il se fournit en minerai.
if "Traitement" in operation:
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11" and operation in n]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if v == "12" and operation in list(sous_graphe.predecessors(n))[0]]
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if (v == "11" or v == "1011") and operation in n]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if (v == "12" or v == "1012" and operation in list(sous_graphe.predecessors(n))[0]]
else:
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11"]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if v == "12"]
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if (v == "11" or v == "1011")]
noeuds_acteurs_ihh = [n for n, v in niveaux_ihh.items() if (v == "12" or v == "1012")]
# Le calcul de l'indice de Herfindahl-Hirschmann se fait normalement au niveau d'une entreprise.
# Toutefois, il se fait au niveau des pays et au niveau des acteurs pour l'opération qui est menée.
@ -93,7 +94,7 @@ def mettre_a_jour_ihh_reserves(graph, noeuds):
niveaux_ihh = nx.get_node_attributes(sous_graphe, "niveau")
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if v == "11"]
noeuds_pays_ihh = [n for n, v in niveaux_ihh.items() if (v == "11" or v == "1011")]
# Le calcul de l'indice de Herfindahl-Hirschmann se fait normalement au niveau d'une entreprise.
# Toutefois, il se fait au niveau des pays et au niveau des acteurs pour l'opération qui est menée.
@ -117,12 +118,12 @@ def main():
graph = nx.DiGraph(nx.nx_agraph.read_dot(fichier_en_entree))
niveaux = nx.get_node_attributes(graph, "niveau")
noeuds_niveau_10 = [n for n, v in niveaux.items() if v == "10" and not re.search(r'Reserves', n)]
noeuds_niveau_10 = [n for n, v in niveaux.items() if (v == "10" or v == "1010") and not re.search(r'Reserves', n)]
noeuds_niveau_10.sort()
graphe = mettre_a_jour_ihh(graph, noeuds_niveau_10)
noeuds_niveau_10 = [n for n, v in niveaux.items() if v == "10" and re.search(r'Reserves', n)]
noeuds_niveau_10 = [n for n, v in niveaux.items() if (v == "10" or v == "1010") and re.search(r'Reserves', n)]
noeuds_niveau_10.sort()
graphe = mettre_a_jour_ihh_reserves(graphe, noeuds_niveau_10)

File diff suppressed because it is too large Load Diff

View File

@ -1,117 +0,0 @@
## Synthèse
**Synthèse introductive : Vulnérabilités systémiques et enjeux critiques dans la chaîne numérique**
La chaîne de valeur numérique analysée révèle des vulnérabilités structurelles majeures, dont les effets cascades menacent sa résilience globale. **Trois acteurs dominants**, contrôlant respectivement 30 %, 20 % et 17 % du marché (IHH = 19), concentrent le pouvoir économique et opérationnel au sein de la chaîne. Cette concentration modérée, bien que moins extrême quun monopole, **accentue les risques systémiques** : une perturbation liée à lun dentre eux (crise financière, défaillance technologique ou cyberattaque) pourrait se propager rapidement aux acteurs interdépendants et au tissu économique global.
Les indicateurs clés soulignent des lacunes critiques :
- **Labsence de mesure précise du ISG** (stabilité des fournisseurs), qui rend impossible une évaluation rigoureuse des risques liés aux chaînes dapprovisionnement ;
- Un **IVC non quantifié**, laissant dans lombre les vulnérabilités des acteurs de basse filière, souvent invisibles mais essentielles à la continuité opérationnelle.
**Les effets en cascade sont multiples et transversaux** : une défaillance dun grand acteur pourrait provoquer un **effondrement sectoriel**, affectant non seulement les partenaires directs (fournisseurs, clients), mais aussi des secteurs distants via la perte de données, larrêt de services critiques ou le déséquilibre des marchés secondaires.
**Priorité immédiate : renforcer la diversification des acteurs clés**, auditer les chaînes dapprovisionnement pour identifier les points faibles non visibles (ISG), et instaurer un suivi dynamique de lIVC. Sans ces mesures, le système reste exposé à une crise systémique qui pourrait se généraliser en quelques semaines — au prix dun coût économique et social dévastateur pour les acteurs concernés.
*« La résilience ne réside pas dans la taille des acteurs dominants, mais dans leur capacité à coexister avec un écosystème diversifié et audité. »*# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Analyse détaillée
**Analyse détaillée**
### **Vulnérabilités critiques (IHH < 10)**
Dans le secteur où lindice IHH est de **8**, la concentration du marché reste faible, mais des acteurs majeurs comme Yunnan Chihong Germanium et ses concurrents détiennent plus de **43 %** du marché. Bien que cette structure favorise une certaine résilience industrielle (grâce à léquilibre entre plusieurs groupes), elle cache un risque critique : la concentration des parts de marché parmi quelques acteurs clés pourrait amplifier les effets dune crise locale ou géopolitique, surtout si ces derniers dépendent fortement de chaînes logistiques vulnérables (indiqué par lindice IVC). Labsence dun indice ISG (Indice de Stabilité Industrielle Globale) spécifique rend difficile une évaluation précise des capacités de réaction du secteur face à un choc exogène, mais la faible concentration suggère que les risques systémiques restent limités.
### **Vulnérabilités élevées (IHH entre 10 et 25)**
Dans le second scénario où lindice IHH atteint **19**, une concentration modérée est observée, avec trois acteurs majeurs détenant respectivement **30 %, 20 %** et **17 %** du marché. Cette structure réduit la résilience par rapport au cas précédent (IHH = 8), car les risques sont davantage concentrés sur quelques grands groupes. Lindice ICS (Indice de Concentration Sectorielle) pourrait ici refléter une dépendance accrue à ces acteurs, rendant le secteur plus sensible aux perturbations liées à leur capacité opérationnelle ou financière. Par ailleurs, labsence dun indice ISG clair (qui mesurerait la stabilité des fournisseurs) et un IVC non quantifié soulignent une lacune dans lévaluation de la vulnérabilité des chaînes de valeur associées à ces acteurs dominants.
### **Vulnérabilités modérées (IHH > 25)**
Aucun cas nest explicitement mentionné ici, mais si un IHH dépassait les seuils critiques, cela indiquerait une concentration élevée et des risques systémiques majeurs. Cependant, dans le contexte fourni, la structure modérée (IHH = 19) suggère que les vulnérabilités restent gérables à court terme, bien quune surveillance continue soit nécessaire pour éviter une dérive vers un monopole ou oligopole. Lindice IVC pourrait ici jouer un rôle clé : si la chaîne de valeur est fortement interdépendante (par exemple via des fournisseurs stratégiques), même une concentration modérée pourrait se traduire par des risques élevés en cas dinterruption locale.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Interdépendances
**Interdépendances : Chaînes de vulnérabilités croisées et effets en cascade**
Les interdépendances au sein de la chaîne de valeur numérique révèlent des liens structurels entre les acteurs dominants (30 %, 20 %, 17 % du marché) et leurs partenaires stratégiques. La concentration modérée observée (IHH = 19) crée une dépendance sectorielle accrue, reflétée par lindice ICS, qui met en lumière la vulnérabilité des sous-secteurs liés à ces acteurs majeurs. Par exemple, un dysfonctionnement opérationnel ou financier chez le premier acteur (30 %) pourrait se propager vers ses fournisseurs et clients directs, perturbant les flux de données critiques et entraînant une cascade dinterruptions dans des segments en aval.
Les convergences vers des produits numériques communs amplifient ces risques. Les trois acteurs dominants partagent probablement des infrastructures technologiques interdépendantes (ex : plateformes cloud, logiciels de gestion) ou dépendent dun même écosystème de fournisseurs stratégiques non quantifiés par lindice IVC. Une vulnérabilité dans un élément central — comme une faille de sécurité sur ces produits communs — pourrait se répercuter simultanément sur plusieurs acteurs, créant des effets en cascade transversaux (ex : perte de disponibilité dun service critique pour lensemble du secteur). Cette interdépendance structurelle renforce la fragilité systémique, même si les seuils critiques dIHH ne sont pas atteints.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Points de vigilance
**Points de vigilance : Indicateurs clés à surveiller**
1. **Indice dHéritage Hiérarchique (IHH)**
*Surveillance* : Évolution du niveau de concentration des acteurs majeurs dans la chaîne de valeur numérique. Une hausse persistante au-delà de 15 pourrait amplifier les risques systémiques liés à linstabilité dun petit nombre de grands groupes.
2. **Indice de Concentration Sectorielle (ICS)**
*Surveillance* : Mesure la dépendance accrue des acteurs dominants par rapport aux secteurs clés. Une augmentation soudaine pourrait signaler une vulnérabilité croissante à des perturbations sectorielles.
3. **Stabilité opérationnelle des fournisseurs majeurs (ISG)**
*Surveillance* : Évaluation de la résilience financière et logistique des trois acteurs dominants (30 %, 20 %, 17 %). Une dégradation pourrait provoquer une cascade deffets sur lintégrité du réseau.
4. **Diversification des chaînes de valeur (IVC)**
*Surveillance* : Analyse de la fragmentation ou non des fournisseurs secondaires et tiers. Un IVC faible indiquerait un risque élevé de dépendance à une seule source, exacerbant les vulnérabilités en cas dincident.
5. **Convergences vers des produits numériques communs**
*Surveillance* : Suivi du marché pour détecter lémergence de plateformes ou standards dominants (ex. cloud computing, IA). Une concentration excessive pourrait créer un point critique à risque deffondrement.
6. **Interdépendances croisées entre secteurs**
*Surveillance* : Cartographie des liens entre acteurs du numérique et autres secteurs stratégiques (énergie, santé, etc.). Des ruptures dans lun pourraient se répercuter sur les autres via des dépendances non maîtrisées.
7. **Indicateurs de résilience sectorielle**
*Surveillance* : Mesure du temps de récupération après une perturbation majeure (ex. pannes, cyberattaques). Une capacité réduite pourrait exacerber les effets cascades identifiés dans lanalyse préliminaire.
---
*Ces indicateurs permettent dévaluer en continu la stabilité du système et de détecter précocement des risques systémiques. Leur suivi doit être intégré à une gouvernance proactive, associant acteurs économiques, régulateurs et institutions publiques.*# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Conclusion
**Conclusion : Scénarios dimpact et recommandations stratégiques**
### **Scénario 1 : Défaillance opérationnelle des acteurs dominants (30 % du marché)**
- **Déclencheur** : Une crise financière ou une rupture technologique majeure affectant lun des trois acteurs clés.
- **Horizon** : Moyen à long terme (25 ans).
- **Gravité** : Élevée, en raison de la concentration du marché et de la dépendance sectorielle accrue.
- **Conséquences** :
- Détérioration rapide de lindice ICS (concentration sectorielle), aggravant les risques systémiques.
- Perturbations chaîne dapprovisionnement, avec un impact sur la résilience des acteurs secondaires et des clients finaux.
- Risque de monopole ou de concentration accrue si lun des concurrents absorbe le marché vacant (risque IHH >25).
---
### **Scénario 2 : Attaque cybernétique ciblant les fournisseurs stratégiques**
- **Déclencheur** : Une attaque massive sur un réseau de fournisseurs non sécurisés, exacerbée par labsence dun indice ISG clair.
- **Horizon** : Court terme (moins de 6 mois).
- **Gravité** : Élevée à critique, en raison des vulnérabilités non quantifiées dans les chaînes de valeur (IVC non mesuré).
- **Conséquences** :
- Interruption immédiate des flux numériques critiques (ex. paiements, données clients).
- Perte de confiance des utilisateurs et risque deffondrement du marché numérique si la résilience nest pas rétablie en moins de 3 mois.
---
### **Scénario 3 : Régulation sectorielle inadaptée**
- **Déclencheur** : Une réglementation mal conçue visant à limiter les concentrations (IHH >10), mais sans mesures compensatoires pour renforcer la résilience.
- **Horizon** : Moyen terme (37 ans).
- **Gravité** : Modérée, mais avec des effets cumulatifs sur linnovation et la compétitivité.
- **Conséquences** :
- Ralentissement de lémergence dacteurs alternatifs, bloquant les dynamiques de diversification du marché.
- Risque de fragmentation sectorielle si les acteurs dominants se retranchent dans des silos technologiques (augmentant le risque IHH).
---
### **Recommandations stratégiques**
1. **Diversifier la concentration sectorielle** : Encourager lémergence dacteurs intermédiaires pour réduire les indices IHH et ICS, tout en garantissant des seuils de résilience minimale (ex. 20 % de part de marché maximum par acteur).
2. **Renforcer la sécurité des fournisseurs** : Développer un indice ISG opérationnel pour évaluer les vulnérabilités des chaînes dapprovisionnement et imposer des normes minimales de cybersécurité.
3. **Anticiper les risques réglementaires** : Intégrer une analyse préalable des impacts systémiques dans tout projet de régulation, en sappuyant sur lévaluation du IVC pour éviter la fragmentation sectorielle.
Ces scénarios soulignent que sans action immédiate, les vulnérabilités identifiées (IHH = 19) risquent dengendrer des effondrements chaîne de valeur à grande échelle, avec un impact économique et social majeur. La résilience du secteur dépendra de la capacité à transformer les interdépendances en leviers stratégiques plutôt quen points faibles.

View File

@ -1,97 +0,0 @@
## Analyse Principale
### 1. Analyse détaillée (vulnérabilités critiques → modérées)
La chaîne de fabrication du numérique présente des vulnérabilités systémiques structurelles et géographiques, reflétant une **concentration dacteurs moyennement élevée** (IHH 19) et une **forte concentration géographique** (IHH 31). Ces indices indiquent que :
- **Structure industrielle modérément concentrée** : Un IHH de 19 (<25 = vert, seuil faible) suggère un marché relativement fragmenté avec plusieurs acteurs majeurs. Cela réduit le risque dembargos ou de monopoles directs mais augmente la vulnérabilité face à des perturbations sectorielles (ex : pénurie de semi-conducteurs).
- **Concentration géographique élevée** : Un IHH de 31 (>25 = orange, seuil modéré) traduit une dépendance accrue aux régions stratégiques. Par exemple, la Chine domine les étapes critiques (minerais rares, assemblage), ce qui exacerbe le risque géopolitique et logistique en cas de conflits ou dembargos unilatéraux.
**Vulnérabilités critiques :**
- **Dépendance aux minerais stratégiques** (Germanium, Erbium) : Ces ressources sont rares et leur extraction est concentrée dans des pays à faible stabilité géopolitique (ISG >60 = rouge). Une pénurie ou un blocage de leurs exportations pourrait paralyser la production décrans, de capteurs optiques et de composants réseau.
- **Substituabilité limitée** : Les minerais rares (ICS proche de 1) sont difficiles à remplacer techniquement, augmentant le risque de rupture chaîne en cas de pénurie.
**Vulnérabilités modérées :**
- **Concurrence sectorielle moyenne** : Un IVC de 50 (>50 = fort) indique une concurrence élevée, limitant les risques dembargos unilatéraux mais exigeant des stratégies de résilience proactive (ex : diversification des fournisseurs).
- **Dépendance aux acteurs clés** : Les processeurs x86 et ASIC sont dominés par 3 à 5 géants, ce qui pourrait créer un point de blocage en cas dembargo ou de pénurie.
---
### 2. Interdépendances (effets en cascade)
Les vulnérabilités se propagent le long de la chaîne via des **interdépendances structurelles et géographiques** :
- **Effet domino sur les produits finaux** : Une rupture dans lapprovisionnement de Germanium affecterait directement les écrans (TV, smartphone) et les capteurs optiques. Cela pourrait entraîner une pénurie de composants réseau (ex : fibres optiques), bloquant la production déquipements IoT ou de serveurs.
- **Effet géographique amplifié** : La concentration des usines en Asie du Sud-Est et en Chine crée un risque logistique majeur. Une perturbation dans ces régions (ex : pandémie, conflit) pourrait bloquer la production de 70 % des smartphones et ordinateurs portables à léchelle mondiale.
- **Effet technologique** : La dépendance aux processeurs x86/ARM (dominés par un petit nombre dacteurs) rend les systèmes critiques vulnérables à une pénurie ou à des attaques ciblées sur la chaîne de fabrication.
---
### 3. Points de vigilance (indicateurs à surveiller)
**Indicateurs clés pour anticiper les risques :**
1. **Stabilité géopolitique des pays producteurs de minerais rares** (ISG >60 = rouge). Une détérioration du score pourrait entraîner une pénurie immédiate.
2. **Concentration des fournisseurs de processeurs x86/ARM et ASIC** : Un IHH supérieur à 50 dans ces segments indiquerait un risque accru dembargo ou de monopole technologique.
3. **Substituabilité des composants critiques** (ICS). Une augmentation du score vers 1 signifierait une vulnérabilité accrue face aux ruptures.
4. **Indice IVC dans les segments à forte concentration géographique** : Un IVC <50 pourrait révéler un manque de concurrence, augmentant la dépendance sectorielle.
---
### Synthèse encadrée (synthèse des risques)
- **Risques critiques** : Dépendance aux minerais rares et à lindustrie semi-conductrice (ICS proche de 1).
- **Risques modérés** : Concentration géographique élevée (IHH 31), pénurie potentielle des composants x86/ARM.
- **Indicateurs à surveiller** : ISG >60, IVC <50 dans les segments clés, et ICS proche de 1 pour les minerais rares.# Analyse des vulnérabilités de la chaine de fabrication du numériquee
## Synthèse et Conclusion
### **1. Synthèse : Vulnérabilités systémiques dans la chaîne de valeur numérique**
La chaîne de valeur numérique présente des vulnérabilités structurelles qui méritent une attention immédiate, notamment en raison dun équilibre fragile entre croissance technologique et dépendance à des actifs critiques. Les risques identifiés se hiérarchisent comme suit :
#### **Risque principal (critique) : Dépendance aux composants numériques stratégiques**
- **Indice IVC (Concurrence)** : 1 → *Faible* (concurrence limitée, marché dominé par quelques acteurs).
- **Impact géopolitique** : Les usages de fibres optiques et photodétecteurs sont concentrés dans des secteurs avancés (*80% usage final*, +5.5% croissance annuelle), mais la substitution est limitée (ICS non spécifié, mais contexte suggère une faible substituabilité).
- **Conséquence** : Une rupture de fourniture ou un blocage technologique pourrait provoquer des perturbations massives dans les infrastructures numériques et énergétiques.
#### **Risque secondaire (modéré) : Tensions géopolitiques et dépendance sectorielle**
- **ISG non spécifié**, mais le contexte indique une *tension marché* de 1.5, suggérant des risques dinstabilité régionale ou commerciale.
- Les secteurs concurrents (catalyseurs, optique IR) montrent un potentiel de substitution partiel (*ICS non quantifié*, mais concurrence modérée).
#### **Risque tertiaire : Capacité limitée face à la demande croissante**
- **Ratio capacité/demande** : 0.99 → *Proche dun seuil critique*. Une augmentation de +5% dans lusage final pourrait déclencher des pénuries.
#### **Orientations pour les acteurs économiques**
- **Entreprises technologiques** : Diversifier leurs fournisseurs et investir en R&D pour réduire la dépendance aux composants critiques (ex. fibres optiques).
- **Pouvoirs publics** : Renforcer des politiques de résilience géopolitique, notamment dans les secteurs stratégiques comme loptoélectronique et le solaire concentré.
---
### **2. Conclusion : Scénarios d'impact à horizon court/medium terme**
#### **Scénario 1 : Blocage technologique des fibres optiques (Déclencheur : Sanctions géopolitiques)**
- **Horizon temporel** : 618 mois.
- **Gravité** : Élevée (*Rouge* sur IHH si concentration du marché >50%).
- **Conséquences** : Perturbation des réseaux de communication, perte defficacité dans les systèmes solaires concentrés (dépendance à 10% usage embarqué).
#### **Scénario 2 : Pénurie de photodétecteurs due au déséquilibre capacité/demande**
- **Déclencheur** : Croissance exponentielle des usages numériques (+5.5%/an) sans augmentation proportionnelle de la production (ratio capacité/demande = 0.99).
- **Horizon temporel** : 1236 mois.
- **Gravité** : Modérée (*Orange* sur IHH si concurrence modérée, mais *Rouge* sur ISG en cas de tensions géopolitiques).
- **Conséquences** : Ralentissement des innovations dans loptoélectronique et la recherche.
#### **Scénario 3 : Substitution partielle par les catalyseurs ou optique IR (Déclencheur : Coûts de production exorbitants)**
- **Horizon temporel** : 2460 mois.
- **Gravité** : Modérée (*Orange* sur ICS si substituabilité limitée).
- **Conséquences** : Réduction du marché des fibres optiques de 15% à moyen terme, mais stabilisation grâce aux alternatives (ex. catalyseurs dans les applications énergétiques).
#### **Scénario 4 : Instabilité géopolitique affectant la chaîne dapprovisionnement**
- **Déclencheur** : Conflits régionaux ou sanctions ciblées sur des fournisseurs clés.
- **Horizon temporel** : 624 mois.
- **Gravité** : Élevée (*Rouge* si ISG >60).
- **Conséquences** : Rupture de la chaîne dapprovisionnement pour les composants numériques, impactant léconomie globale (ex. perte de 15% du PIB technologique dans certains pays).
---
### **Encadrés synthétiques clés**
- **IVC = 1 → Faible concurrence**, mais *risque critique* en cas dinterdépendance sectorielle.
- **Ratio capacité/demande = 0.99 → seuil de tension imminent*.
- **Substituabilité limitée (ICS non quantifiée, mais contexte suggère une faible substituabilité).**
*Les décideurs doivent agir immédiatement pour renforcer la résilience technologique et géopolitique.*