Code/rapports_IA.py

181 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import yaml
import networkx as nx
from pathlib import Path
from networkx.drawing.nx_agraph import read_dot
# Constantes de chemins
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, 'assets')
CONFIG_PATH = os.path.join(ASSETS_DIR, 'config.yaml')
MAPPING_PATH = os.path.join(ASSETS_DIR, 'mapping.yaml')
CORPUS_DIR = os.path.join(BASE_DIR, 'Corpus')
def load_config(config_path=CONFIG_PATH):
"""
Charge les seuils depuis config.yaml
:return: dict des seuils pour 'IHH'
"""
with open(config_path, 'r', encoding='utf-8') as f:
cfg = yaml.safe_load(f)
return cfg['seuils']['IHH']
def load_mapping(mapping_path=MAPPING_PATH):
"""
Charge le mapping opérations ↔ fiches depuis mapping.yaml
:return: dict
"""
with open(mapping_path, 'r', encoding='utf-8') as f:
mapping = yaml.safe_load(f)
return mapping['operations']
def parse_graph(dot_path):
"""
Parse le graphe DOT pour extraire les IHH
:return: list of dicts
"""
G = read_dot(dot_path)
records = []
for node, attrs in G.nodes(data=True):
name = node
if '_' not in name:
continue
op, res = name.split('_', 1)
if 'ihh_pays' in attrs or 'ihh_acteurs' in attrs:
try:
ihh_p = float(attrs.get('ihh_pays', 0))
ihh_a = float(attrs.get('ihh_acteurs', 0))
except ValueError:
continue
records.append({
'operation': op,
'resource': res,
'ihh_pays': ihh_p,
'ihh_acteurs': ihh_a,
})
return records
def classify(value, thresholds):
"""
Classifie une valeur selon thresholds
"""
v_max = thresholds['vert'].get('max')
o_min = thresholds['orange'].get('min')
o_max = thresholds['orange'].get('max')
r_min = thresholds['rouge'].get('min')
if v_max is not None and value < v_max:
return 'vert'
if o_min is not None and o_max is not None and o_min <= value <= o_max:
return 'orange'
if r_min is not None and value >= r_min:
return 'rouge'
return 'vert'
def filter_alerts(records, thresholds):
"""
Filtre pour orange/rouge
"""
alerts = []
for rec in records:
cp = classify(rec['ihh_pays'], thresholds)
ca = classify(rec['ihh_acteurs'], thresholds)
if cp in ('orange','rouge') or ca in ('orange','rouge'):
rec['color_pays'] = cp
rec['color_acteurs'] = ca
alerts.append(rec)
return alerts
def map_to_fiche(operation, resource, mapping):
"""
Retourne le répertoire de la fiche correspondant à l'opération et ressource
"""
cfg = mapping[operation]
res_dir = cfg['resource_dir']
prefix = cfg['fiche_prefix']
folder = f"{prefix} {resource.lower()}"
return os.path.join(CORPUS_DIR, res_dir, folder)
def extract_section(fiche_dir, slug_parent, slug_fille):
"""
Extrait le contenu complet de la section sans traitement
"""
# Recherche du répertoire *-slug_parent
sec_dirs = [d for d in os.listdir(fiche_dir)
if d.endswith(slug_parent) and os.path.isdir(os.path.join(fiche_dir, d))]
if not sec_dirs:
return ''
sec_dir = os.path.join(fiche_dir, sec_dirs[0])
# Recherche du fichier *-slug_fille.md quel que soit le préfixe
files = [f for f in os.listdir(sec_dir)
if f.endswith(f"-{slug_fille}.md")]
if not files:
return ''
file_path = os.path.join(sec_dir, files[0])
with open(file_path, 'r', encoding='utf-8') as f:
return f.read().strip()
def build_markdown(intro, entries):
"""
Construit le Markdown final prêt à coller dans un prompt
Recopie intégralement la section sans transformation
"""
lines = [intro, '']
for e in entries:
key = f"{e['operation']}_{e['resource']}"
lines.append(f"### {key}")
lines.append(f"- **IHH pays** : {e['ihh_pays']} ({e['color_pays']})")
lines.append(f"- **IHH acteurs** : {e['ihh_acteurs']} ({e['color_acteurs']})")
# Recopie brute de la section
for line in e['section_full'].splitlines():
lines.append(line)
lines.append('')
return '\n'.join(lines)
def main(dot_path, output_path='prompt.md'):
thresholds = load_config()
mapping = load_mapping()
records = parse_graph(dot_path)
alerts = filter_alerts(records, thresholds)
enriched = []
for r in alerts:
fiche_dir = map_to_fiche(r['operation'], r['resource'], mapping)
full_section = extract_section(
fiche_dir,
mapping[r['operation']]['slug_parent'],
mapping[r['operation']]['indices']['IHH']['slug_fille']
)
enriched.append({
**r,
'section_full': full_section
})
intro = (
"Ce rapport, destiné au COMEX et aux responsables risques, a pour objectif "
"didentifier et de détailler les opérations dont lIndice de Herfindahl-Hirschmann (IHH) "
"présente une vulnérabilité élevée."
)
md = build_markdown(intro, enriched)
with open(output_path, 'w', encoding='utf-8') as f:
f.write(md)
print(f"Prompt généré : {output_path}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--dot', required=True)
parser.add_argument('--output', default='prompt.md')
args = parser.parse_args()
main(args.dot, args.output)