diff --git a/.gitignore b/.gitignore
index b9e6fb7..48cef30 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,7 +6,8 @@ __pycache__/
 *.pyo
 *.pyd
 
-.dot
+*.dot
+prompt.md
 
 # Ignorer cache et temporaire
 .cache/
diff --git a/assets/mapping.yaml b/assets/mapping.yaml
new file mode 100644
index 0000000..311aa78
--- /dev/null
+++ b/assets/mapping.yaml
@@ -0,0 +1,37 @@
+operations:
+  Extraction:
+    fiche_prefix: "Fiche minerai"
+    resource_dir: "Minerai"
+    section_parent: "Matrice des risques"
+    slug_parent: "matrices-des-risques"
+    indices:
+      IHH:
+        section_fille: "Indice de Herfindahl-Hirschmann"
+        slug_fille: "indice-de-herfindahl-hirschmann-extraction"
+  Traitement:
+    fiche_prefix: "Fiche minerai"
+    resource_dir: "Minerai"
+    section_parent: "Matrice des risques"
+    slug_parent: "matrices-des-risques"
+    indices:
+      IHH:
+        section_fille: "Indice de Herfindahl-Hirschmann"
+        slug_fille: "indice-de-herfindahl-hirschmann-traitement"
+  Assemblage:
+    fiche_prefix: "Fiche assemblage"
+    resource_dir: "Assemblage"
+    section_parent: "Matrice des risques"
+    slug_parent: "matrices-des-risques"
+    indices:
+      IHH:
+        section_fille: "Indice de Herfindahl-Hirschmann"
+        slug_fille: "indice-de-herfindahl-hirschmann-assemblage"
+  Fabrication:
+    fiche_prefix: "Fiche fabrication"
+    resource_dir: "Fabrication"
+    section_parent: "Matrice des risques"
+    slug_parent: "matrices-des-risques"
+    indices:
+      IHH:
+        section_fille: "Indice de Herfindahl-Hirschmann"
+        slug_fille: "indice-de-herfindahl-hirschmann-fabrication"
diff --git a/generate_corpus.py b/generate_corpus.py
new file mode 100644
index 0000000..e3702a5
--- /dev/null
+++ b/generate_corpus.py
@@ -0,0 +1,95 @@
+import os
+import re
+import shutil
+from pathlib import Path
+
+EXCLUDE_DIRS = {"Local"}
+MAX_SECTION_LENGTH = 1200  # unused here: files are split on "###" headings, not on length
+
+def slugify(text):
+    return re.sub(r'\W+', '-', text.strip()).strip('-').lower()
+
+def split_markdown_sections_refined(content):
+    lines = content.splitlines()
+    sections = []
+    header_level_2 = None
+    section_lines = []
+    subsections = []
+    current_subsection = None
+    inside_section = False
+
+    for line in lines:
+        if line.startswith("## "):
+            if header_level_2:
+                if current_subsection:
+                    subsections.append(current_subsection)
+                sections.append((header_level_2, section_lines, subsections))
+                section_lines, subsections = [], []
+                current_subsection = None
+            header_level_2 = line[3:].strip()
+            inside_section = True
+        elif line.startswith("### ") and inside_section:
+            if current_subsection:
+                subsections.append(current_subsection)
+            current_subsection = (line[4:].strip(), [])
+        elif inside_section:
+            if current_subsection:
+                current_subsection[1].append(line)
+            else:
+                section_lines.append(line)
+
+    if header_level_2:
+        if current_subsection:
+            subsections.append(current_subsection)
+        sections.append((header_level_2, section_lines, subsections))
+    return sections
+
+def process_markdown_file(md_path, rel_output_dir):
+    with open(md_path, encoding="utf-8") as f:
+        content = f.read()
+    sections = split_markdown_sections_refined(content)
+
+    for idx, (sec_title, sec_lines, subsections) in enumerate(sections):
+        base_name = f"{idx:02d}-{slugify(sec_title)}"
+        if subsections:
+            sec_dir = rel_output_dir / base_name
+            sec_dir.mkdir(parents=True, exist_ok=True)
+            with open(sec_dir / "_intro.md", "w", encoding="utf-8") as f_out:
+                f_out.write(f"## {sec_title}\n")
+                f_out.write("\n".join(sec_lines).strip())
+            for sub_idx, (sub_title, sub_lines) in enumerate(subsections):
+                sub_name = f"{sub_idx:02d}-{slugify(sub_title)}.md"
+                with open(sec_dir / sub_name, "w", encoding="utf-8") as f_out:
+                    f_out.write(f"### {sub_title}\n")
+                    f_out.write("\n".join(sub_lines).strip())
+        else:
+            with open(rel_output_dir / f"{base_name}.md", "w", encoding="utf-8") as f_out:
+                f_out.write(f"## {sec_title}\n")
+                f_out.write("\n".join(sec_lines).strip())
+
+def build_corpus_structure():
+    BASE_DIR = Path(__file__).resolve().parent
+    SOURCE_DIR = BASE_DIR / "Fiches"
+    DEST_DIR = BASE_DIR / "Corpus"
+
+    if DEST_DIR.exists():
+        shutil.rmtree(DEST_DIR)
+    DEST_DIR.mkdir(parents=True, exist_ok=True)
+
+    for root, _, files in os.walk(SOURCE_DIR):
+        rel_path = Path(root).relative_to(SOURCE_DIR)
+        if any(part in EXCLUDE_DIRS for part in rel_path.parts):
+            continue
+        for file in files:
+            if not file.endswith(".md") or ".md." in file:
+                continue
+            input_file = Path(root) / file
+            subdir = rel_path
+            filename_no_ext = Path(file).stem
+            output_dir = DEST_DIR / subdir / filename_no_ext
+            output_dir.mkdir(parents=True, exist_ok=True)
+            process_markdown_file(input_file, output_dir)
+
+if __name__ == "__main__":
+    build_corpus_structure()
+    print("✅ Corpus generated successfully in the 'Corpus/' directory")
diff --git a/rapports_IA.py b/rapports_IA.py
new file mode 100644
index 0000000..1559563
--- /dev/null
+++ b/rapports_IA.py
@@ -0,0 +1,178 @@
+import os
+import yaml
+from networkx.drawing.nx_agraph import read_dot
+
+# Path constants
+BASE_DIR = os.path.abspath(os.path.dirname(__file__))
+ASSETS_DIR = os.path.join(BASE_DIR, 'assets')
+CONFIG_PATH = os.path.join(ASSETS_DIR, 'config.yaml')
+MAPPING_PATH = os.path.join(ASSETS_DIR, 'mapping.yaml')
+CORPUS_DIR = os.path.join(BASE_DIR, 'Corpus')
+
+
+def load_config(config_path=CONFIG_PATH):
+    """
+    Load the thresholds from config.yaml
+    :return: dict of thresholds for 'IHH'
+    """
+    with open(config_path, 'r', encoding='utf-8') as f:
+        cfg = yaml.safe_load(f)
+    return cfg['seuils']['IHH']
+
+
+def load_mapping(mapping_path=MAPPING_PATH):
+    """
+    Load the operations ↔ fiches mapping from mapping.yaml
+    :return: dict
+    """
+    with open(mapping_path, 'r', encoding='utf-8') as f:
+        mapping = yaml.safe_load(f)
+    return mapping['operations']
+
+
+def parse_graph(dot_path):
+    """
+    Parse the DOT graph and extract the IHH values
+    :return: list of dicts
+    """
+    G = read_dot(dot_path)
+    records = []
+    for node, attrs in G.nodes(data=True):
+        name = node
+        if '_' not in name:
+            continue
+        op, res = name.split('_', 1)
+        if 'ihh_pays' in attrs or 'ihh_acteurs' in attrs:
+            try:
+                ihh_p = float(attrs.get('ihh_pays', 0))
+                ihh_a = float(attrs.get('ihh_acteurs', 0))
+            except ValueError:
+                continue
+            records.append({
+                'operation': op,
+                'resource': res,
+                'ihh_pays': ihh_p,
+                'ihh_acteurs': ihh_a,
+            })
+    return records
+
+
+def classify(value, thresholds):
+    """
+    Classify a value as 'vert', 'orange' or 'rouge' according to the thresholds
+    """
+    v_max = thresholds['vert'].get('max')
+    o_min = thresholds['orange'].get('min')
+    o_max = thresholds['orange'].get('max')
+    r_min = thresholds['rouge'].get('min')
+    if v_max is not None and value < v_max:
+        return 'vert'
+    if o_min is not None and o_max is not None and o_min <= value <= o_max:
+        return 'orange'
+    if r_min is not None and value >= r_min:
+        return 'rouge'
+    return 'vert'
+
+
+def filter_alerts(records, thresholds):
+    """
+    Keep only the records classified orange or rouge
+    """
+    alerts = []
+    for rec in records:
+        cp = classify(rec['ihh_pays'], thresholds)
+        ca = classify(rec['ihh_acteurs'], thresholds)
+        if cp in ('orange', 'rouge') or ca in ('orange', 'rouge'):
+            rec['color_pays'] = cp
+            rec['color_acteurs'] = ca
+            alerts.append(rec)
+    return alerts
+
+
+def map_to_fiche(operation, resource, mapping):
+    """
+    Return the fiche directory matching the operation and resource
+    """
+    cfg = mapping[operation]
+    res_dir = cfg['resource_dir']
+    prefix = cfg['fiche_prefix']
+    folder = f"{prefix} {resource.lower()}"
+    return os.path.join(CORPUS_DIR, res_dir, folder)
+
+
+def extract_section(fiche_dir, slug_parent, slug_fille):
+    """
+    Extract the full section content, without any processing
+    """
+    # Find the *-slug_parent directory
+    sec_dirs = [d for d in os.listdir(fiche_dir)
+                if d.endswith(slug_parent) and os.path.isdir(os.path.join(fiche_dir, d))]
+    if not sec_dirs:
+        return ''
+    sec_dir = os.path.join(fiche_dir, sec_dirs[0])
+    # Find the *-slug_fille.md file, whatever its numeric prefix
+    files = [f for f in os.listdir(sec_dir)
+             if f.endswith(f"-{slug_fille}.md")]
+    if not files:
+        return ''
+    file_path = os.path.join(sec_dir, files[0])
+    with open(file_path, 'r', encoding='utf-8') as f:
+        return f.read().strip()
+
+
+def build_markdown(intro, entries):
+    """
+    Build the final Markdown, ready to paste into a prompt
+    Each section is copied verbatim, without transformation
+    """
+    lines = [intro, '']
+    for e in entries:
+        key = f"{e['operation']}_{e['resource']}"
+        lines.append(f"### {key}")
+        lines.append(f"- **IHH pays** : {e['ihh_pays']} ({e['color_pays']})")
+        lines.append(f"- **IHH acteurs** : {e['ihh_acteurs']} ({e['color_acteurs']})")
+        # Verbatim copy of the section
+        for line in e['section_full'].splitlines():
+            lines.append(line)
+        lines.append('')
+    return '\n'.join(lines)
+
+
+def main(dot_path, output_path='prompt.md'):
+    thresholds = load_config()
+    mapping = load_mapping()
+    records = parse_graph(dot_path)
+    alerts = filter_alerts(records, thresholds)
+
+    enriched = []
+    for r in alerts:
+        fiche_dir = map_to_fiche(r['operation'], r['resource'], mapping)
+        full_section = extract_section(
+            fiche_dir,
+            mapping[r['operation']]['slug_parent'],
+            mapping[r['operation']]['indices']['IHH']['slug_fille']
+        )
+        enriched.append({
+            **r,
+            'section_full': full_section
+        })
+
+    intro = (
+        "Ce rapport, destiné au COMEX et aux responsables risques, a pour objectif "
+        "d’identifier et de détailler les opérations dont l’Indice de Herfindahl-Hirschmann (IHH) "
+        "présente une vulnérabilité élevée."
+    )
+
+    md = build_markdown(intro, enriched)
+    with open(output_path, 'w', encoding='utf-8') as f:
+        f.write(md)
+    print(f"Prompt generated: {output_path}")
+
+
+if __name__ == '__main__':
+    import argparse
+    parser = argparse.ArgumentParser()
+    parser.add_argument('--dot', required=True)
+    parser.add_argument('--output', default='prompt.md')
+    args = parser.parse_args()
+    main(args.dot, args.output)
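
Note: the patch reads assets/config.yaml (load_config returns cfg['seuils']['IHH']) but does not add that file. From the keys classify() looks up, the file would need roughly the following shape; the numeric values below are only an assumption borrowed from the conventional HHI concentration bands (1500/2500), not something this patch defines:

    seuils:
      IHH:
        vert:
          max: 1500
        orange:
          min: 1500
          max: 2500
        rouge:
          min: 2500

With this shape, classify() returns 'vert' below 1500, 'orange' between 1500 and 2500 inclusive, and 'rouge' above.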
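
Likewise, the DOT input is not part of the patch. parse_graph() expects node names following an "Operation_Ressource" convention (split on the first underscore) carrying ihh_pays / ihh_acteurs attributes; a minimal illustrative graph, with made-up names and values, might look like:

    digraph chaine_de_valeur {
        Extraction_Lithium [ihh_pays="3200", ihh_acteurs="1800"];
        Traitement_Lithium [ihh_pays="1200", ihh_acteurs="2600"];
    }

Under the thresholds sketched above, both nodes would be retained by filter_alerts() (each has at least one value at or above 2500), and map_to_fiche() would resolve both to Corpus/Minerai/Fiche minerai lithium via the Extraction and Traitement entries of mapping.yaml.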
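
Assuming the Fiches/ sources are present, the end-to-end run would then be (graph.dot is a placeholder name for the actual DOT export):

    python generate_corpus.py
    python rapports_IA.py --dot graph.dot --output prompt.md

which is consistent with the .gitignore hunk ignoring *.dot and the generated prompt.md.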