Update index.py

This commit is contained in:
Stéphan Peccini 2025-05-19 07:10:06 +02:00
parent a3608353a2
commit 70d856c58b

209
index.py
View File

@ -1,151 +1,104 @@
#!/usr/bin/env python3
"""
Indexation incrémentale des fiches avec BGEM3 (FlagEmbedding) + FAISS
---------------------------------------------------------------------
Parcourt récursivement le dossier Fiches (ext .md .MD .markdown .txt)
Découpe chaque fichier en blocs de ~800 tokens (chevauchement 100)
Encode uniquement les passages provenant de fichiers **nouveaux ou modifiés**
depuis la dernière indexation (basé sur l'horodatage mtime).
Les embeddings sont ajoutés à l'index existant sans toucher aux anciens.
Écrit/Met à jour : corpus.idx (vecteurs) + corpus.meta.json (métadonnées)
index.py Indexation « minifiches » SANS découpage
====================================================
 Robustesse améliorée : le script détecte tous les formats de retour possibles
de `BGEM3FlagModel.encode` (ndarray ou dict avec clef `embedding`, etc.).
Objectif : chaque fichier (chapitre) devient **un seul** passage, afin de
préserver lintégrité des tableaux, listes, etc. conformément à votre
organisation manuelle.
Caractéristiques :
Incrémental : seuls les fichiers nouveaux ou modifiés sont encodés.
Paramètres CLI :
--root racine des fiches (défaut : Corpus)
--index nom du fichier idx (défaut : corpus.idx)
--meta nom du fichier méta (défaut : corpus.meta.json)
Extensions prises : .md .markdown .txt
Embeddings : BGEM3 (FlagEmbedding) en CPU, normalisés L2.
Usage :
python index.py # première indexation (tous les fichiers)
python index.py # relance instantanée (rien à faire)
touch Corpus//nouveau.md
python index.py # encode seulement 1 fichier
"""
import argparse, json, os, time
from pathlib import Path
import json
import re
import time
from datetime import datetime
import faiss
import numpy as np
import faiss, numpy as np
from FlagEmbedding import BGEM3FlagModel
from rich import print
# --- Paramètres -------------------------------------------------------------
ROOT = Path("Fiches") # répertoire local des fiches
MODEL_NAME = "BAAI/bge-m3" # embedder multilingue, MIT
CHUNK = 800 # taille cible d'un bloc (≈600 mots)
OVERLAP = 100 # chevauchement pour la cohérence
INDEX_FILE = "corpus.idx"
META_FILE = "corpus.meta.json"
EXTENSIONS = ["*.md", "*.MD", "*.markdown", "*.txt"]
BATCH = 256 # plus grand batch : encode plus vite
# --------------------- CLI --------------------------------------------------
parser = argparse.ArgumentParser(description="Indexation incrémentale des minifiches (1 fichier = 1 passage).")
parser.add_argument("--root", default="Corpus", help="Répertoire racine des fiches")
parser.add_argument("--index", default="corpus.idx", help="Nom du fichier FAISS")
parser.add_argument("--meta", default="corpus.meta.json", help="Nom du méta JSON")
args = parser.parse_args()
# --- Fonctions utilitaires --------------------------------------------------
ROOT = Path(args.root).expanduser()
INDEX_F = Path(args.index)
META_F = Path(args.meta)
EXTS = {".md", ".markdown", ".txt"}
def split(text: str, chunk_size: int = CHUNK, overlap: int = OVERLAP):
"""Découpe *text* en chunks (~chunk_size mots) tout en
préservant entièrement les tableaux Markdown.
print(f"[dim]Racine : {ROOT} | Index : {INDEX_F}[/]")
Si une ligne contient | ou nest constituée que de tirets (---),
on force la coupure avant / après pour ne pas casser le tableau.
Le reste est découpé sur la ponctuation (. ! ?) avec overlap.
"""
sentences = re.split(r"(?<=[\.!?])\s+", text)
chunks, buf = [], []
# ------------------------ lire méta existant -------------------------------
old_meta = []
old_mtime = {}
if INDEX_F.exists() and META_F.exists():
try:
old_meta = json.load(META_F.open())
old_mtime = {m["path"]: m["mtime"] for m in old_meta}
except Exception as e:
print(f"[yellow]Avertissement : impossible de lire l'ancien méta : {e}. On repart de zéro.[/]")
old_meta = []
old_mtime = {}
for s in sentences:
# ---- table Markdown ------------------------------------------------
if "|" in s or re.fullmatch(r"\s*-{3,}\s*", s):
if buf: # vider le buffer courant
chunks.append(" ".join(buf))
buf = []
chunks.append(s) # garder le tableau entier
continue
# ---- traitement normal --------------------------------------------
buf.append(s)
if len(" ".join(buf).split()) >= chunk_size:
chunks.append(" ".join(buf))
buf = buf[-overlap:] # chevauchement
if buf:
chunks.append(" ".join(buf))
return chunks
def gather_files(root: Path):
for pattern in EXTENSIONS:
yield from root.rglob(pattern)
# --- Chargement éventuel de l'index existant -------------------------------
def load_existing():
if not Path(INDEX_FILE).exists():
return None, [], 0 # pas d'index, pas de meta
index = faiss.read_index(INDEX_FILE)
meta = json.load(open(META_FILE, encoding="utf-8"))
return index, meta, len(meta)
# --- Pipeline principal -----------------------------------------------------
def main():
index, meta, existing = load_existing()
# mapping chemin relatif ➜ mtime stocké
meta_mtime = {m["file"]: m.get("mtime", 0) for m in meta}
# ------------------------ scanner les fichiers -----------------------------
files = [fp for fp in ROOT.rglob("*") if fp.suffix.lower() in EXTS]
files.sort()
new_docs, new_meta = [], []
files_scanned, files_updated = 0, 0
kept_meta = [] # meta non modifiés
for fp in gather_files(ROOT):
files_scanned += 1
rel = fp.relative_to(ROOT).as_posix()
for fp in files:
path_str = str(fp.relative_to(ROOT))
mtime = int(fp.stat().st_mtime)
if meta_mtime.get(rel) == mtime:
continue # inchangé
files_updated += 1
text = fp.read_text(encoding="utf-8", errors="ignore")
for i, chunk in enumerate(split(text)):
new_docs.append(chunk)
new_meta.append({"file": rel, "part": i, "mtime": mtime})
if path_str in old_mtime and old_mtime[path_str] == mtime:
# déjà indexé, rien à faire
kept_meta.append(next(m for m in old_meta if m["path"] == path_str))
continue
# fichier nouveau ou modifié
txt = fp.read_text(encoding="utf-8")
new_docs.append(txt)
new_meta.append({"path": path_str, "mtime": mtime})
if not new_docs:
print("Aucun fichier nouveau ou modifié. Index à jour ✔︎")
return
print(
f"Nouveaux passages : {len(new_docs)} issus de {files_updated} fiches ("\
f"{files_scanned} fiches scannées). Génération des embeddings…"
)
model = BGEM3FlagModel(MODEL_NAME, device="cpu")
emb_out = model.encode(new_docs, batch_size=BATCH)
# --- Gestion robuste du format de sortie --------------------------------
if isinstance(emb_out, np.ndarray):
emb = emb_out
elif isinstance(emb_out, dict):
for key in ("embedding", "embeddings", "sentence_embeds", "sentence_embedding"):
if key in emb_out:
emb = np.asarray(emb_out[key])
break
else:
emb = np.asarray(next(iter(emb_out.values())))
else: # liste de tableaux ?
emb = np.asarray(emb_out)
print(f"Nouveaux/Modifiés : {len(new_docs)} | Conservés : {len(kept_meta)}")
if not new_docs and INDEX_F.exists():
print("Index déjà à jour ✔︎")
exit(0)
# ------------------------ embeddings BGEM3 ---------------------------------
model = BGEM3FlagModel("BAAI/bge-m3", device="cpu")
emb = model.encode(new_docs)
if isinstance(emb, dict):
emb = next(v for v in emb.values() if isinstance(v, np.ndarray))
emb = emb / np.linalg.norm(emb, axis=1, keepdims=True)
emb = emb.astype("float32")
emb /= np.linalg.norm(emb, axis=1, keepdims=True) + 1e-12
if index is None:
index = faiss.IndexFlatIP(emb.shape[1])
index.add(emb)
# ------------------------ mise à jour FAISS --------------------------------
if INDEX_F.exists():
idx = faiss.read_index(str(INDEX_F))
else:
idx = faiss.IndexFlatIP(emb.shape[1])
# Mettre à jour les métadonnées (anciens + nouveaux)
meta.extend(new_meta)
faiss.write_index(index, INDEX_FILE)
json.dump(meta, open(META_FILE, "w", encoding="utf-8"), ensure_ascii=False, indent=2)
idx.add(emb)
faiss.write_index(idx, str(INDEX_F))
print(
f"Index mis à jour : {len(meta)} vecteurs au total ("\
f"+{len(new_docs)}). Dernière maj : {datetime.now().isoformat(timespec='seconds')}"
)
# ------------------------ enregistrer le nouveau méta ----------------------
all_meta = kept_meta + new_meta
json.dump(all_meta, META_F.open("w"), ensure_ascii=False, indent=2)
if __name__ == "__main__":
t0 = time.time()
main()
print(f"Terminé en {time.time() - t0:.1f} s")
print(f"Index mis à jour ✔︎ | Total passages : {idx.ntotal}")