Code/IA/02 - injection_fiches/watch_directory.py
2025-06-04 08:35:53 +02:00

263 lines
9.5 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script de surveillance de répertoire pour l'injection automatique dans PrivateGPT
Ce script surveille un répertoire et injecte automatiquement les nouveaux fichiers
dans PrivateGPT dès qu'ils sont ajoutés ou modifiés.
"""
import os
import sys
import time
import logging
import argparse
import subprocess
from pathlib import Path
from typing import Set, Dict, List
from datetime import datetime
try:
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
except ImportError:
print("Bibliothèque 'watchdog' non installée. Installation en cours...")
subprocess.run([sys.executable, "-m", "pip", "install", "watchdog"])
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler("pgpt_watch_directory.log")
]
)
logger = logging.getLogger(__name__)
# Extensions de fichiers couramment supportées par PrivateGPT
SUPPORTED_EXTENSIONS = {
'.pdf', '.txt', '.md', '.doc', '.docx', '.ppt', '.pptx',
'.xls', '.xlsx', '.csv', '.epub', '.html', '.htm'
}
class DocumentHandler(FileSystemEventHandler):
"""Gestionnaire d'événements pour les fichiers de documents."""
def __init__(self, watch_dir: str, ingest_script: str, pgpt_url: str,
extensions: Set[str], delay: int = 5):
"""
Initialise le gestionnaire d'événements.
Args:
watch_dir: Répertoire à surveiller
ingest_script: Chemin vers le script d'injection
pgpt_url: URL de l'API PrivateGPT
extensions: Extensions de fichiers à traiter
delay: Délai en secondes à attendre avant le traitement (évite de traiter des fichiers partiellement écrits)
"""
self.watch_dir = os.path.abspath(watch_dir)
self.ingest_script = os.path.abspath(ingest_script)
self.pgpt_url = pgpt_url
self.extensions = extensions
self.delay = delay
# Queue pour les fichiers en attente de traitement
self.pending_files: Dict[str, float] = {}
# Vérifier que le script d'injection existe
if not os.path.exists(self.ingest_script):
logger.error(f"Le script d'injection {self.ingest_script} n'existe pas!")
raise FileNotFoundError(f"Script d'injection introuvable: {self.ingest_script}")
def on_created(self, event):
"""Appelé lorsqu'un fichier est créé."""
if not event.is_directory:
self._handle_file_event(event)
def on_modified(self, event):
"""Appelé lorsqu'un fichier est modifié."""
if not event.is_directory:
self._handle_file_event(event)
def _handle_file_event(self, event):
"""Traite un événement de fichier (création ou modification)."""
file_path = event.src_path
file_ext = os.path.splitext(file_path)[1].lower()
# Ignorer les fichiers non supportés
if file_ext not in self.extensions:
return
# Ignorer les fichiers temporaires et cachés
file_name = os.path.basename(file_path)
if file_name.startswith('.') or file_name.startswith('~') or file_name.endswith('.tmp'):
return
# Ajouter à la queue avec l'horodatage actuel
self.pending_files[file_path] = time.time()
logger.info(f"Fichier détecté: {file_path} (en attente pendant {self.delay} secondes)")
def process_pending_files(self):
"""Traite les fichiers en attente qui ont dépassé le délai d'attente."""
current_time = time.time()
files_to_process: List[str] = []
# Identifier les fichiers prêts à être traités
for file_path, timestamp in list(self.pending_files.items()):
if current_time - timestamp >= self.delay:
if os.path.exists(file_path): # Vérifier que le fichier existe toujours
files_to_process.append(file_path)
self.pending_files.pop(file_path)
# Traiter les fichiers par lot
if files_to_process:
self._ingest_files(files_to_process)
def _ingest_files(self, files: List[str]):
"""
Injecte une liste de fichiers en utilisant le script d'injection.
Args:
files: Liste des chemins de fichiers à injecter
"""
try:
# Créer un répertoire temporaire pour stocker la liste des fichiers
temp_dir = os.path.join(os.path.dirname(self.ingest_script), "temp")
os.makedirs(temp_dir, exist_ok=True)
# Créer un fichier de liste
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
list_file = os.path.join(temp_dir, f"files_to_ingest_{timestamp}.txt")
with open(list_file, "w") as f:
for file_path in files:
f.write(f"{file_path}\n")
# Construire la commande pour le script d'injection
for file_path in files:
file_dir = os.path.dirname(file_path)
logger.info(f"Injection de {file_path}...")
cmd = [
sys.executable,
self.ingest_script,
"-d", file_dir,
"-u", self.pgpt_url,
"--extensions", os.path.splitext(file_path)[1][1:] # Extension sans le point
]
# Exécuter la commande
process = subprocess.run(
cmd,
capture_output=True,
text=True
)
if process.returncode == 0:
logger.info(f"Injection réussie de {file_path}")
else:
logger.error(f"Échec de l'injection de {file_path}: {process.stderr}")
except Exception as e:
logger.error(f"Erreur lors de l'injection des fichiers: {str(e)}")
def parse_arguments():
"""Parse les arguments de ligne de commande."""
parser = argparse.ArgumentParser(
description="Surveille un répertoire et injecte automatiquement les nouveaux fichiers dans PrivateGPT"
)
parser.add_argument(
"-d", "--directory",
type=str,
required=True,
help="Chemin du répertoire à surveiller"
)
parser.add_argument(
"-s", "--script",
type=str,
default=None,
help="Chemin vers le script auto_ingest.py (par défaut: détection automatique)"
)
parser.add_argument(
"-u", "--url",
type=str,
default="http://localhost:8001",
help="URL de l'API PrivateGPT (défaut: http://localhost:8001)"
)
parser.add_argument(
"--delay",
type=int,
default=5,
help="Délai en secondes avant de traiter un nouveau fichier (défaut: 5)"
)
parser.add_argument(
"--extensions",
nargs="+",
help="Liste d'extensions spécifiques à surveiller (ex: pdf txt)"
)
return parser.parse_args()
def main():
"""Fonction principale."""
args = parse_arguments()
# Préparation des extensions si spécifiées
extensions = set(args.extensions) if args.extensions else SUPPORTED_EXTENSIONS
# Assurer que les extensions commencent par un point
extensions = {ext if ext.startswith('.') else f'.{ext}' for ext in extensions}
# Déterminer le chemin du script d'injection
if args.script:
ingest_script = args.script
else:
# Utiliser le script auto_ingest.py dans le même répertoire que ce script
ingest_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "auto_ingest.py")
# Créer le répertoire de surveillance s'il n'existe pas
watch_dir = os.path.abspath(args.directory)
if not os.path.exists(watch_dir):
logger.info(f"Création du répertoire de surveillance: {watch_dir}")
os.makedirs(watch_dir, exist_ok=True)
logger.info(f"Démarrage de la surveillance de {watch_dir}")
logger.info(f"URL PrivateGPT: {args.url}")
logger.info(f"Extensions surveillées: {', '.join(extensions)}")
logger.info(f"Délai de traitement: {args.delay} secondes")
# Initialiser le gestionnaire et l'observateur
event_handler = DocumentHandler(
watch_dir=watch_dir,
ingest_script=ingest_script,
pgpt_url=args.url,
extensions=extensions,
delay=args.delay
)
observer = Observer()
observer.schedule(event_handler, path=watch_dir, recursive=True)
observer.start()
try:
logger.info("Surveillance en cours... (Ctrl+C pour quitter)")
while True:
# Traiter les fichiers en attente
event_handler.process_pending_files()
time.sleep(1)
except KeyboardInterrupt:
logger.info("\nInterruption par l'utilisateur. Arrêt de la surveillance.")
observer.stop()
observer.join()
if __name__ == "__main__":
try:
main()
except Exception as e:
logger.error(f"Erreur non gérée: {str(e)}", exc_info=True)
sys.exit(1)