Mise à jour et ajout de la génération des rapports (temporaire)

This commit is contained in:
Stéphan Peccini 2025-05-17 08:54:29 +02:00
parent 427c7d26f5
commit 33695092af
4 changed files with 314 additions and 1 deletions

3
.gitignore vendored
View File

@ -6,7 +6,8 @@
__pycache__/
*.pyo
*.pyd
.dot
*.dot
prompt.md
# Ignorer cache et temporaire
.cache/

37
assets/mapping.yaml Normal file
View File

@ -0,0 +1,37 @@
operations:
  Extraction:
    fiche_prefix: "Fiche minerai"
    resource_dir: "Minerai"
    section_parent: "Matrice des risques"
    slug_parent: "matrices-des-risques"
    indices:
      IHH:
        section_fille: "Indice de Herfindahl-Hirschmann"
        slug_fille: "indice-de-herfindahl-hirschmann-extraction"
  Traitement:
    fiche_prefix: "Fiche minerai"
    resource_dir: "Minerai"
    section_parent: "Matrice des risques"
    slug_parent: "matrices-des-risques"
    indices:
      IHH:
        section_fille: "Indice de Herfindahl-Hirschmann"
        slug_fille: "indice-de-herfindahl-hirschmann-traitement"
  Assemblage:
    fiche_prefix: "Fiche assemblage"
    resource_dir: "Assemblage"
    section_parent: "Matrice des risques"
    slug_parent: "matrices-des-risques"
    indices:
      IHH:
        section_fille: "Indice de Herfindahl-Hirschmann"
        slug_fille: "indice-de-herfindahl-hirschmann-assemblage"
  Fabrication:
    fiche_prefix: "Fiche fabrication"
    resource_dir: "Fabrication"
    section_parent: "Matrice des risques"
    slug_parent: "matrices-des-risques"
    indices:
      IHH:
        section_fille: "Indice de Herfindahl-Hirschmann"
        slug_fille: "indice-de-herfindahl-hirschmann-fabrication"

95
generate_corpus.py Normal file
View File

@ -0,0 +1,95 @@
import os
import re
import shutil
from pathlib import Path
# Directories under "Fiches" that are skipped entirely when building the corpus.
EXCLUDE_DIRS = {"Local"}
# Unused here: splitting is driven by the presence of '###' headings, not length.
MAX_SECTION_LENGTH = 1200
def slugify(text):
    """Convert *text* to a lowercase, dash-separated slug."""
    collapsed = re.sub(r'\W+', '-', text.strip())
    return collapsed.strip('-').lower()
def split_markdown_sections_refined(content):
    """Split markdown into level-2 sections with optional level-3 subsections.

    Returns a list of (section_title, intro_lines, subsections) tuples, where
    subsections is a list of (subsection_title, body_lines) tuples.  Text
    appearing before the first '## ' heading is discarded.
    """
    sections = []
    title = None        # current '## ' heading text
    intro = []          # section lines seen before any '### '
    subs = []           # completed (sub_title, sub_lines) pairs
    sub = None          # subsection currently being filled
    in_section = False

    def flush_sub():
        # Move the in-progress subsection (if any) into the completed list.
        nonlocal sub
        if sub:
            subs.append(sub)
        sub = None

    for raw in content.splitlines():
        if raw.startswith("## "):
            if title:
                flush_sub()
                sections.append((title, intro, subs))
                intro, subs = [], []
            title = raw[3:].strip()
            in_section = True
        elif raw.startswith("### ") and in_section:
            flush_sub()
            sub = (raw[4:].strip(), [])
        elif in_section:
            # Body lines go to the open subsection, else to the section intro.
            (sub[1] if sub else intro).append(raw)

    if title:
        flush_sub()
        sections.append((title, intro, subs))
    return sections
def process_markdown_file(md_path, rel_output_dir):
    """Split one markdown file into per-section files under *rel_output_dir*.

    A section with '### ' subsections becomes a directory holding an
    '_intro.md' plus one numbered file per subsection; a section without
    subsections becomes a single numbered markdown file.
    """
    content = Path(md_path).read_text(encoding="utf-8")
    for idx, (title, intro_lines, subsections) in enumerate(
            split_markdown_sections_refined(content)):
        stem = f"{idx:02d}-{slugify(title)}"
        if subsections:
            section_dir = rel_output_dir / stem
            section_dir.mkdir(parents=True, exist_ok=True)
            intro_text = f"## {title}\n" + "\n".join(intro_lines).strip()
            (section_dir / "_intro.md").write_text(intro_text, encoding="utf-8")
            for sub_idx, (sub_title, sub_lines) in enumerate(subsections):
                sub_text = f"### {sub_title}\n" + "\n".join(sub_lines).strip()
                sub_file = section_dir / f"{sub_idx:02d}-{slugify(sub_title)}.md"
                sub_file.write_text(sub_text, encoding="utf-8")
        else:
            body = f"## {title}\n" + "\n".join(intro_lines).strip()
            (rel_output_dir / f"{stem}.md").write_text(body, encoding="utf-8")
def build_corpus_structure():
    """Rebuild the 'Corpus' tree from every markdown fiche under 'Fiches'."""
    base = Path(__file__).resolve().parent
    source = base / "Fiches"
    dest = base / "Corpus"
    # Start from a clean slate so stale fragments never survive a rebuild.
    if dest.exists():
        shutil.rmtree(dest)
    dest.mkdir(parents=True, exist_ok=True)
    for root, _, files in os.walk(source):
        rel = Path(root).relative_to(source)
        # Skip anything below an excluded directory (e.g. "Local").
        if EXCLUDE_DIRS.intersection(rel.parts):
            continue
        for name in files:
            # Only plain '.md' files; names containing '.md.' are ignored.
            if not name.endswith(".md") or ".md." in name:
                continue
            target = dest / rel / Path(name).stem
            target.mkdir(parents=True, exist_ok=True)
            process_markdown_file(Path(root) / name, target)
# Script entry point: regenerate the corpus tree from the source fiches.
if __name__ == "__main__":
    build_corpus_structure()
    print("✅ Corpus généré avec succès dans le dossier 'Corpus/'")

180
rapports_IA.py Normal file
View File

@ -0,0 +1,180 @@
import os
import yaml
import networkx as nx
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
# Path constants: everything is resolved relative to this script's directory.
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, 'assets')  # YAML config and mapping files
CONFIG_PATH = os.path.join(ASSETS_DIR, 'config.yaml')  # IHH thresholds
MAPPING_PATH = os.path.join(ASSETS_DIR, 'mapping.yaml')  # operation -> fiche mapping
CORPUS_DIR = os.path.join(BASE_DIR, 'Corpus')  # corpus tree built by generate_corpus.py
def load_config(config_path=CONFIG_PATH):
    """Load the IHH colour thresholds from config.yaml.

    :return: dict of thresholds found under cfg['seuils']['IHH']
    """
    with open(config_path, encoding='utf-8') as fh:
        data = yaml.safe_load(fh)
    return data['seuils']['IHH']
def load_mapping(mapping_path=MAPPING_PATH):
    """Load the operation -> fiche mapping from mapping.yaml.

    :return: dict keyed by operation name (the 'operations' section)
    """
    with open(mapping_path, encoding='utf-8') as fh:
        data = yaml.safe_load(fh)
    return data['operations']
def parse_graph(dot_path):
    """Extract IHH figures from a DOT graph.

    Nodes are expected to be named '<operation>_<resource>' and may carry
    'ihh_pays' / 'ihh_acteurs' attributes.

    :return: list of dicts with operation, resource and both IHH values
    """
    graph = read_dot(dot_path)
    records = []
    for node_name, attrs in graph.nodes(data=True):
        # Only '<operation>_<resource>' nodes are of interest.
        if '_' not in node_name:
            continue
        operation, resource = node_name.split('_', 1)
        if 'ihh_pays' not in attrs and 'ihh_acteurs' not in attrs:
            continue
        try:
            ihh_pays = float(attrs.get('ihh_pays', 0))
            ihh_acteurs = float(attrs.get('ihh_acteurs', 0))
        except ValueError:
            # Malformed attribute value: skip the node entirely.
            continue
        records.append({
            'operation': operation,
            'resource': resource,
            'ihh_pays': ihh_pays,
            'ihh_acteurs': ihh_acteurs,
        })
    return records
def classify(value, thresholds):
    """Map *value* to a colour band ('vert', 'orange' or 'rouge').

    Bands are checked in order (green, orange, red); when no band matches
    (missing bounds or gaps between bands) the value defaults to green.
    """
    green_max = thresholds['vert'].get('max')
    orange_min = thresholds['orange'].get('min')
    orange_max = thresholds['orange'].get('max')
    red_min = thresholds['rouge'].get('min')
    if green_max is not None and value < green_max:
        return 'vert'
    if orange_min is not None and orange_max is not None \
            and orange_min <= value <= orange_max:
        return 'orange'
    if red_min is not None and value >= red_min:
        return 'rouge'
    # Conservative fallback when no band matched.
    return 'vert'
def filter_alerts(records, thresholds):
    """Keep only records whose IHH is orange or red on either axis.

    Each returned record is a *copy* of the input dict enriched with
    'color_pays' and 'color_acteurs'.  The input records are left untouched
    (the previous version mutated the caller's dicts in place).

    :param records: list of dicts as produced by parse_graph()
    :param thresholds: threshold bands as returned by load_config()
    :return: new list of enriched record dicts
    """
    alerts = []
    for rec in records:
        color_pays = classify(rec['ihh_pays'], thresholds)
        color_acteurs = classify(rec['ihh_acteurs'], thresholds)
        if color_pays in ('orange', 'rouge') or color_acteurs in ('orange', 'rouge'):
            # Copy instead of mutating the caller's dict.
            alerts.append({
                **rec,
                'color_pays': color_pays,
                'color_acteurs': color_acteurs,
            })
    return alerts
def map_to_fiche(operation, resource, mapping):
    """Return the corpus directory of the fiche for (operation, resource)."""
    op_cfg = mapping[operation]
    # Folder names look like "<fiche_prefix> <resource lowercased>".
    folder_name = f"{op_cfg['fiche_prefix']} {resource.lower()}"
    return os.path.join(CORPUS_DIR, op_cfg['resource_dir'], folder_name)
def extract_section(fiche_dir, slug_parent, slug_fille):
    """Return the raw text of one fiche sub-section, without any processing.

    Looks for a sub-directory of *fiche_dir* whose name ends with
    *slug_parent*, then for a file ending in '-<slug_fille>.md' inside it
    (numeric prefixes are ignored).  Returns '' when either level is missing.
    """
    candidates = [
        entry for entry in os.listdir(fiche_dir)
        if entry.endswith(slug_parent)
        and os.path.isdir(os.path.join(fiche_dir, entry))
    ]
    if not candidates:
        return ''
    section_dir = os.path.join(fiche_dir, candidates[0])
    suffix = f"-{slug_fille}.md"
    matching = [entry for entry in os.listdir(section_dir)
                if entry.endswith(suffix)]
    if not matching:
        return ''
    with open(os.path.join(section_dir, matching[0]), encoding='utf-8') as fh:
        return fh.read().strip()
def build_markdown(intro, entries):
    """Assemble the final Markdown report, ready to paste into a prompt.

    Each entry gets a '### <operation>_<resource>' heading, its two IHH
    figures with their colours, then its fiche section copied verbatim.
    """
    parts = [intro, '']
    for entry in entries:
        parts.append(f"### {entry['operation']}_{entry['resource']}")
        parts.append(f"- **IHH pays** : {entry['ihh_pays']} ({entry['color_pays']})")
        parts.append(f"- **IHH acteurs** : {entry['ihh_acteurs']} ({entry['color_acteurs']})")
        # Verbatim copy of the fiche section, line by line.
        parts.extend(entry['section_full'].splitlines())
        parts.append('')
    return '\n'.join(parts)
def main(dot_path, output_path='prompt.md'):
    """Generate the IHH alert report prompt from a DOT graph.

    Reads thresholds and the operation/fiche mapping, extracts IHH values
    from *dot_path*, keeps the orange/red alerts, enriches each with the
    full text of its fiche section, and writes the Markdown to *output_path*.
    """
    thresholds = load_config()
    mapping = load_mapping()
    records = parse_graph(dot_path)
    alerts = filter_alerts(records, thresholds)
    enriched = []
    for rec in alerts:
        fiche_dir = map_to_fiche(rec['operation'], rec['resource'], mapping)
        op_cfg = mapping[rec['operation']]
        section_text = extract_section(
            fiche_dir,
            op_cfg['slug_parent'],
            op_cfg['indices']['IHH']['slug_fille'],
        )
        enriched.append({**rec, 'section_full': section_text})
    # FIX: the original intro string had lost its apostrophes to a character
    # encoding issue ("didentifier", "lIndice"); restored here.
    intro = (
        "Ce rapport, destiné au COMEX et aux responsables risques, a pour objectif "
        "d'identifier et de détailler les opérations dont l'Indice de Herfindahl-Hirschmann (IHH) "
        "présente une vulnérabilité élevée."
    )
    md = build_markdown(intro, enriched)
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(md)
    print(f"Prompt généré : {output_path}")
# CLI entry point: --dot is the input graph, --output the generated prompt file.
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--dot', required=True)
    parser.add_argument('--output', default='prompt.md')
    args = parser.parse_args()
    main(args.dot, args.output)