import os import yaml import networkx as nx from pathlib import Path from networkx.drawing.nx_agraph import read_dot # Constantes de chemins BASE_DIR = os.path.abspath(os.path.dirname(__file__)) ASSETS_DIR = os.path.join(BASE_DIR, 'assets') CONFIG_PATH = os.path.join(ASSETS_DIR, 'config.yaml') MAPPING_PATH = os.path.join(ASSETS_DIR, 'mapping.yaml') CORPUS_DIR = os.path.join(BASE_DIR, 'Corpus') def load_config(config_path=CONFIG_PATH): """ Charge les seuils depuis config.yaml :return: dict des seuils pour 'IHH' """ with open(config_path, 'r', encoding='utf-8') as f: cfg = yaml.safe_load(f) return cfg['seuils']['IHH'] def load_mapping(mapping_path=MAPPING_PATH): """ Charge le mapping opérations ↔ fiches depuis mapping.yaml :return: dict """ with open(mapping_path, 'r', encoding='utf-8') as f: mapping = yaml.safe_load(f) return mapping['operations'] def parse_graph(dot_path): """ Parse le graphe DOT pour extraire les IHH :return: list of dicts """ G = read_dot(dot_path) records = [] for node, attrs in G.nodes(data=True): name = node if '_' not in name: continue op, res = name.split('_', 1) if 'ihh_pays' in attrs or 'ihh_acteurs' in attrs: try: ihh_p = float(attrs.get('ihh_pays', 0)) ihh_a = float(attrs.get('ihh_acteurs', 0)) except ValueError: continue records.append({ 'operation': op, 'resource': res, 'ihh_pays': ihh_p, 'ihh_acteurs': ihh_a, }) return records def classify(value, thresholds): """ Classifie une valeur selon thresholds """ v_max = thresholds['vert'].get('max') o_min = thresholds['orange'].get('min') o_max = thresholds['orange'].get('max') r_min = thresholds['rouge'].get('min') if v_max is not None and value < v_max: return 'vert' if o_min is not None and o_max is not None and o_min <= value <= o_max: return 'orange' if r_min is not None and value >= r_min: return 'rouge' return 'vert' def filter_alerts(records, thresholds): """ Filtre pour orange/rouge """ alerts = [] for rec in records: cp = classify(rec['ihh_pays'], thresholds) ca = classify(rec['ihh_acteurs'], thresholds) if cp in ('orange','rouge') or ca in ('orange','rouge'): rec['color_pays'] = cp rec['color_acteurs'] = ca alerts.append(rec) return alerts def map_to_fiche(operation, resource, mapping): """ Retourne le répertoire de la fiche correspondant à l'opération et ressource """ cfg = mapping[operation] res_dir = cfg['resource_dir'] prefix = cfg['fiche_prefix'] folder = f"{prefix} {resource.lower()}" return os.path.join(CORPUS_DIR, res_dir, folder) def extract_section(fiche_dir, slug_parent, slug_fille): """ Extrait le contenu complet de la section sans traitement """ # Recherche du répertoire *-slug_parent sec_dirs = [d for d in os.listdir(fiche_dir) if d.endswith(slug_parent) and os.path.isdir(os.path.join(fiche_dir, d))] if not sec_dirs: return '' sec_dir = os.path.join(fiche_dir, sec_dirs[0]) # Recherche du fichier *-slug_fille.md quel que soit le préfixe files = [f for f in os.listdir(sec_dir) if f.endswith(f"-{slug_fille}.md")] if not files: return '' file_path = os.path.join(sec_dir, files[0]) with open(file_path, 'r', encoding='utf-8') as f: return f.read().strip() def build_markdown(intro, entries): """ Construit le Markdown final prêt à coller dans un prompt Recopie intégralement la section sans transformation """ lines = [intro, ''] for e in entries: key = f"{e['operation']}_{e['resource']}" lines.append(f"### {key}") lines.append(f"- **IHH pays** : {e['ihh_pays']} ({e['color_pays']})") lines.append(f"- **IHH acteurs** : {e['ihh_acteurs']} ({e['color_acteurs']})") # Recopie brute de la section for line in e['section_full'].splitlines(): lines.append(line) lines.append('') return '\n'.join(lines) def main(dot_path, output_path='prompt.md'): thresholds = load_config() mapping = load_mapping() records = parse_graph(dot_path) alerts = filter_alerts(records, thresholds) enriched = [] for r in alerts: fiche_dir = map_to_fiche(r['operation'], r['resource'], mapping) full_section = extract_section( fiche_dir, mapping[r['operation']]['slug_parent'], mapping[r['operation']]['indices']['IHH']['slug_fille'] ) enriched.append({ **r, 'section_full': full_section }) intro = ( "Ce rapport, destiné au COMEX et aux responsables risques, a pour objectif " "d’identifier et de détailler les opérations dont l’Indice de Herfindahl-Hirschmann (IHH) " "présente une vulnérabilité élevée." ) md = build_markdown(intro, enriched) with open(output_path, 'w', encoding='utf-8') as f: f.write(md) print(f"Prompt généré : {output_path}") if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('--dot', required=True) parser.add_argument('--output', default='prompt.md') args = parser.parse_args() main(args.dot, args.output)