181 lines
5.5 KiB
Python
181 lines
5.5 KiB
Python
import os
|
||
import yaml
|
||
import networkx as nx
|
||
from pathlib import Path
|
||
from networkx.drawing.nx_agraph import read_dot
|
||
|
||
# Path constants, all resolved relative to the directory containing this file.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
ASSETS_DIR = os.path.join(BASE_DIR, "assets")
CONFIG_PATH = os.path.join(ASSETS_DIR, "config.yaml")
MAPPING_PATH = os.path.join(ASSETS_DIR, "mapping.yaml")
CORPUS_DIR = os.path.join(BASE_DIR, "Corpus")
|
||
|
||
|
||
def load_config(config_path=CONFIG_PATH):
    """
    Load the IHH thresholds from the YAML configuration file.

    :param config_path: path of the YAML file to read (defaults to CONFIG_PATH)
    :return: the value stored under ``seuils`` -> ``IHH`` in that file
             (presumably a dict of per-colour thresholds; verify against config.yaml)
    :raises KeyError: if the ``seuils`` or ``IHH`` key is missing
    """
    with open(config_path, encoding='utf-8') as stream:
        document = yaml.safe_load(stream)
    seuils = document['seuils']
    return seuils['IHH']
|
||
|
||
|
||
def load_mapping(mapping_path=MAPPING_PATH):
    """
    Load the operation <-> fiche mapping from the YAML mapping file.

    :param mapping_path: path of the YAML file to read (defaults to MAPPING_PATH)
    :return: the value stored under the top-level ``operations`` key
    :raises KeyError: if the ``operations`` key is missing
    """
    with open(mapping_path, encoding='utf-8') as stream:
        document = yaml.safe_load(stream)
    operations = document['operations']
    return operations
|
||
|
||
|
||
def parse_graph(dot_path):
    """
    Read a DOT graph and collect the IHH values carried by its nodes.

    Only nodes whose name contains an underscore are considered; the name is
    split on the first underscore into ``operation`` and ``resource``. A node
    is kept when it has an ``ihh_pays`` and/or ``ihh_acteurs`` attribute; a
    missing one of the two defaults to 0. Nodes whose values cannot be
    converted to float are skipped.

    :param dot_path: path of the DOT file, passed to ``read_dot``
    :return: list of dicts with keys ``operation``, ``resource``,
             ``ihh_pays`` and ``ihh_acteurs``
    """
    graph = read_dot(dot_path)
    found = []
    for node, attrs in graph.nodes(data=True):
        if '_' not in node:
            continue
        operation, _, resource = node.partition('_')

        # De Morgan form of the original test: skip nodes carrying neither index.
        if 'ihh_pays' not in attrs and 'ihh_acteurs' not in attrs:
            continue

        try:
            pays = float(attrs.get('ihh_pays', 0))
            acteurs = float(attrs.get('ihh_acteurs', 0))
        except ValueError:
            # Non-numeric attribute value: ignore this node.
            continue

        found.append({
            'operation': operation,
            'resource': resource,
            'ihh_pays': pays,
            'ihh_acteurs': acteurs,
        })
    return found
|
||
|
||
|
||
def classify(value, thresholds):
    """
    Classify a value as 'vert', 'orange' or 'rouge' according to thresholds.

    Rules are tried in this order, the first match wins:

    1. ``value < thresholds['vert']['max']``                -> 'vert'
    2. ``orange.min <= value <= orange.max`` (both bounds)  -> 'orange'
    3. ``value >= thresholds['rouge']['min']``              -> 'rouge'

    A bound that is absent (or None) disables its rule. A value matching no
    rule (for instance one falling in a gap between two bands) is reported
    as 'vert'.

    :param value: number to classify
    :param thresholds: dict with 'vert', 'orange' and 'rouge' entries, each a
                       dict that may hold 'min' and/or 'max'
    :return: 'vert', 'orange' or 'rouge'
    """
    green_max = thresholds['vert'].get('max')
    orange_band = thresholds['orange']
    orange_min = orange_band.get('min')
    orange_max = orange_band.get('max')
    red_min = thresholds['rouge'].get('min')

    if green_max is not None and value < green_max:
        label = 'vert'
    elif not (orange_min is None or orange_max is None) and orange_min <= value <= orange_max:
        label = 'orange'
    elif red_min is not None and value >= red_min:
        label = 'rouge'
    else:
        # No rule matched: fall back to the lowest level.
        label = 'vert'
    return label
|
||
|
||
|
||
def filter_alerts(records, thresholds):
    """
    Keep only the records whose country or actor IHH is 'orange' or 'rouge'.

    Each returned record is a copy of the input record extended with the
    keys ``color_pays`` and ``color_acteurs``. The input records are left
    untouched, so the caller's list is not modified as a side effect of
    filtering.

    :param records: iterable of dicts holding ``ihh_pays`` and ``ihh_acteurs``
    :param thresholds: thresholds passed through to ``classify``
    :return: list of annotated copies of the alerting records, in input order
    """
    alert_levels = ('orange', 'rouge')
    alerts = []
    for rec in records:
        color_pays = classify(rec['ihh_pays'], thresholds)
        color_acteurs = classify(rec['ihh_acteurs'], thresholds)
        if color_pays in alert_levels or color_acteurs in alert_levels:
            # Build a new dict instead of writing into the caller's record.
            alerts.append({
                **rec,
                'color_pays': color_pays,
                'color_acteurs': color_acteurs,
            })
    return alerts
|
||
|
||
|
||
def map_to_fiche(operation, resource, mapping):
    """
    Build the path of the fiche directory for an operation/resource pair.

    The path is ``CORPUS_DIR / <resource_dir> / "<fiche_prefix> <resource>"``
    where ``resource_dir`` and ``fiche_prefix`` come from the mapping entry
    of the operation and the resource name is lower-cased.

    :param operation: key of the operation in ``mapping``
    :param resource: resource name (lower-cased in the folder name)
    :param mapping: dict of per-operation settings
    :return: path of the fiche directory (not checked for existence)
    :raises KeyError: if the operation or one of its settings is missing
    """
    op_cfg = mapping[operation]
    parent_dir = op_cfg['resource_dir']
    leaf_name = f"{op_cfg['fiche_prefix']} {resource.lower()}"
    return os.path.join(CORPUS_DIR, parent_dir, leaf_name)
|
||
|
||
|
||
def extract_section(fiche_dir, slug_parent, slug_fille):
    """
    Return the raw text of one section file of a fiche, or '' if not found.

    The section directory is the first sub-directory of ``fiche_dir`` (in
    sorted name order) whose name ends with ``slug_parent``. Inside it, the
    first regular file (in sorted name order) whose name ends with
    ``-{slug_fille}.md`` is read, whatever its prefix.

    :param fiche_dir: path of the fiche directory
    :param slug_parent: suffix identifying the parent section directory
    :param slug_fille: slug of the child section file
    :return: content of the file with surrounding whitespace stripped, or ''
             when the fiche directory, the section directory or the file
             does not exist
    """
    # A missing fiche directory is treated like any other "not found" case
    # instead of letting os.listdir raise FileNotFoundError.
    if not os.path.isdir(fiche_dir):
        return ''

    # os.listdir returns entries in arbitrary order; sort so that the choice
    # is deterministic when several entries match.
    sec_dirs = [
        d for d in sorted(os.listdir(fiche_dir))
        if d.endswith(slug_parent) and os.path.isdir(os.path.join(fiche_dir, d))
    ]
    if not sec_dirs:
        return ''
    sec_dir = os.path.join(fiche_dir, sec_dirs[0])

    suffix = f"-{slug_fille}.md"
    files = [
        f for f in sorted(os.listdir(sec_dir))
        if f.endswith(suffix) and os.path.isfile(os.path.join(sec_dir, f))
    ]
    if not files:
        return ''

    with open(os.path.join(sec_dir, files[0]), 'r', encoding='utf-8') as f:
        return f.read().strip()
|
||
|
||
|
||
def build_markdown(intro, entries):
    """
    Assemble the final Markdown text, ready to be pasted into a prompt.

    The output starts with ``intro`` and a blank line. Each entry then adds a
    ``###`` heading, two bullet lines with the IHH values and their colours,
    the lines of ``section_full`` copied as they are, and a blank line.
    All lines are joined with '\\n'.

    :param intro: introductory text placed at the top
    :param entries: iterable of dicts with keys ``operation``, ``resource``,
                    ``ihh_pays``, ``color_pays``, ``ihh_acteurs``,
                    ``color_acteurs`` and ``section_full``
    :return: the Markdown document as a single string
    """
    chunks = [intro, '']
    for entry in entries:
        heading = f"### {entry['operation']}_{entry['resource']}"
        pays = f"- **IHH pays** : {entry['ihh_pays']} ({entry['color_pays']})"
        acteurs = f"- **IHH acteurs** : {entry['ihh_acteurs']} ({entry['color_acteurs']})"
        chunks += [heading, pays, acteurs]
        # Section body copied line by line, without any rewriting.
        chunks.extend(entry['section_full'].splitlines())
        chunks.append('')
    return '\n'.join(chunks)
|
||
|
||
|
||
def main(dot_path, output_path='prompt.md'):
    """
    Generate the Markdown prompt for the IHH alerts found in a DOT graph.

    Loads thresholds and mapping, parses the graph, keeps the alerting
    records, attaches the matching fiche section to each of them, then writes
    the resulting Markdown to ``output_path`` and prints a confirmation.

    :param dot_path: path of the DOT graph to analyse
    :param output_path: path of the Markdown file to write
    """
    thresholds = load_config()
    mapping = load_mapping()
    alerts = filter_alerts(parse_graph(dot_path), thresholds)

    enriched = []
    for alert in alerts:
        operation = alert['operation']
        fiche_dir = map_to_fiche(operation, alert['resource'], mapping)
        op_cfg = mapping[operation]
        section = extract_section(
            fiche_dir,
            op_cfg['slug_parent'],
            op_cfg['indices']['IHH']['slug_fille'],
        )
        enriched.append({**alert, 'section_full': section})

    intro = (
        "Ce rapport, destiné au COMEX et aux responsables risques, a pour objectif "
        "d’identifier et de détailler les opérations dont l’Indice de Herfindahl-Hirschmann (IHH) "
        "présente une vulnérabilité élevée."
    )

    document = build_markdown(intro, enriched)
    with open(output_path, 'w', encoding='utf-8') as out:
        out.write(document)
    print(f"Prompt généré : {output_path}")
|
||
|
||
|
||
if __name__ == '__main__':
    # Command-line entry point: --dot is mandatory, --output is optional.
    import argparse

    cli = argparse.ArgumentParser()
    cli.add_argument('--dot', required=True)
    cli.add_argument('--output', default='prompt.md')
    options = cli.parse_args()
    main(options.dot, options.output)
|