263 lines
9.5 KiB
Python
Executable File
263 lines
9.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Script de surveillance de répertoire pour l'injection automatique dans PrivateGPT
|
|
|
|
Ce script surveille un répertoire et injecte automatiquement les nouveaux fichiers
|
|
dans PrivateGPT dès qu'ils sont ajoutés ou modifiés.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Set, Dict, List
|
|
from datetime import datetime
|
|
|
|
try:
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
|
|
except ImportError:
|
|
print("Bibliothèque 'watchdog' non installée. Installation en cours...")
|
|
subprocess.run([sys.executable, "-m", "pip", "install", "watchdog"])
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler, FileCreatedEvent, FileModifiedEvent
|
|
|
|
# Configuration du logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
handlers=[
|
|
logging.StreamHandler(),
|
|
logging.FileHandler("pgpt_watch_directory.log")
|
|
]
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Extensions de fichiers couramment supportées par PrivateGPT
|
|
SUPPORTED_EXTENSIONS = {
|
|
'.pdf', '.txt', '.md', '.doc', '.docx', '.ppt', '.pptx',
|
|
'.xls', '.xlsx', '.csv', '.epub', '.html', '.htm'
|
|
}
|
|
|
|
class DocumentHandler(FileSystemEventHandler):
|
|
"""Gestionnaire d'événements pour les fichiers de documents."""
|
|
|
|
def __init__(self, watch_dir: str, ingest_script: str, pgpt_url: str,
|
|
extensions: Set[str], delay: int = 5):
|
|
"""
|
|
Initialise le gestionnaire d'événements.
|
|
|
|
Args:
|
|
watch_dir: Répertoire à surveiller
|
|
ingest_script: Chemin vers le script d'injection
|
|
pgpt_url: URL de l'API PrivateGPT
|
|
extensions: Extensions de fichiers à traiter
|
|
delay: Délai en secondes à attendre avant le traitement (évite de traiter des fichiers partiellement écrits)
|
|
"""
|
|
self.watch_dir = os.path.abspath(watch_dir)
|
|
self.ingest_script = os.path.abspath(ingest_script)
|
|
self.pgpt_url = pgpt_url
|
|
self.extensions = extensions
|
|
self.delay = delay
|
|
|
|
# Queue pour les fichiers en attente de traitement
|
|
self.pending_files: Dict[str, float] = {}
|
|
|
|
# Vérifier que le script d'injection existe
|
|
if not os.path.exists(self.ingest_script):
|
|
logger.error(f"Le script d'injection {self.ingest_script} n'existe pas!")
|
|
raise FileNotFoundError(f"Script d'injection introuvable: {self.ingest_script}")
|
|
|
|
def on_created(self, event):
|
|
"""Appelé lorsqu'un fichier est créé."""
|
|
if not event.is_directory:
|
|
self._handle_file_event(event)
|
|
|
|
def on_modified(self, event):
|
|
"""Appelé lorsqu'un fichier est modifié."""
|
|
if not event.is_directory:
|
|
self._handle_file_event(event)
|
|
|
|
def _handle_file_event(self, event):
|
|
"""Traite un événement de fichier (création ou modification)."""
|
|
file_path = event.src_path
|
|
file_ext = os.path.splitext(file_path)[1].lower()
|
|
|
|
# Ignorer les fichiers non supportés
|
|
if file_ext not in self.extensions:
|
|
return
|
|
|
|
# Ignorer les fichiers temporaires et cachés
|
|
file_name = os.path.basename(file_path)
|
|
if file_name.startswith('.') or file_name.startswith('~') or file_name.endswith('.tmp'):
|
|
return
|
|
|
|
# Ajouter à la queue avec l'horodatage actuel
|
|
self.pending_files[file_path] = time.time()
|
|
logger.info(f"Fichier détecté: {file_path} (en attente pendant {self.delay} secondes)")
|
|
|
|
def process_pending_files(self):
|
|
"""Traite les fichiers en attente qui ont dépassé le délai d'attente."""
|
|
current_time = time.time()
|
|
files_to_process: List[str] = []
|
|
|
|
# Identifier les fichiers prêts à être traités
|
|
for file_path, timestamp in list(self.pending_files.items()):
|
|
if current_time - timestamp >= self.delay:
|
|
if os.path.exists(file_path): # Vérifier que le fichier existe toujours
|
|
files_to_process.append(file_path)
|
|
self.pending_files.pop(file_path)
|
|
|
|
# Traiter les fichiers par lot
|
|
if files_to_process:
|
|
self._ingest_files(files_to_process)
|
|
|
|
def _ingest_files(self, files: List[str]):
|
|
"""
|
|
Injecte une liste de fichiers en utilisant le script d'injection.
|
|
|
|
Args:
|
|
files: Liste des chemins de fichiers à injecter
|
|
"""
|
|
try:
|
|
# Créer un répertoire temporaire pour stocker la liste des fichiers
|
|
temp_dir = os.path.join(os.path.dirname(self.ingest_script), "temp")
|
|
os.makedirs(temp_dir, exist_ok=True)
|
|
|
|
# Créer un fichier de liste
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
list_file = os.path.join(temp_dir, f"files_to_ingest_{timestamp}.txt")
|
|
|
|
with open(list_file, "w") as f:
|
|
for file_path in files:
|
|
f.write(f"{file_path}\n")
|
|
|
|
# Construire la commande pour le script d'injection
|
|
for file_path in files:
|
|
file_dir = os.path.dirname(file_path)
|
|
logger.info(f"Injection de {file_path}...")
|
|
|
|
cmd = [
|
|
sys.executable,
|
|
self.ingest_script,
|
|
"-d", file_dir,
|
|
"-u", self.pgpt_url,
|
|
"--extensions", os.path.splitext(file_path)[1][1:] # Extension sans le point
|
|
]
|
|
|
|
# Exécuter la commande
|
|
process = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if process.returncode == 0:
|
|
logger.info(f"Injection réussie de {file_path}")
|
|
else:
|
|
logger.error(f"Échec de l'injection de {file_path}: {process.stderr}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'injection des fichiers: {str(e)}")
|
|
|
|
def parse_arguments():
|
|
"""Parse les arguments de ligne de commande."""
|
|
parser = argparse.ArgumentParser(
|
|
description="Surveille un répertoire et injecte automatiquement les nouveaux fichiers dans PrivateGPT"
|
|
)
|
|
parser.add_argument(
|
|
"-d", "--directory",
|
|
type=str,
|
|
required=True,
|
|
help="Chemin du répertoire à surveiller"
|
|
)
|
|
parser.add_argument(
|
|
"-s", "--script",
|
|
type=str,
|
|
default=None,
|
|
help="Chemin vers le script auto_ingest.py (par défaut: détection automatique)"
|
|
)
|
|
parser.add_argument(
|
|
"-u", "--url",
|
|
type=str,
|
|
default="http://localhost:8001",
|
|
help="URL de l'API PrivateGPT (défaut: http://localhost:8001)"
|
|
)
|
|
parser.add_argument(
|
|
"--delay",
|
|
type=int,
|
|
default=5,
|
|
help="Délai en secondes avant de traiter un nouveau fichier (défaut: 5)"
|
|
)
|
|
parser.add_argument(
|
|
"--extensions",
|
|
nargs="+",
|
|
help="Liste d'extensions spécifiques à surveiller (ex: pdf txt)"
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
def main():
|
|
"""Fonction principale."""
|
|
args = parse_arguments()
|
|
|
|
# Préparation des extensions si spécifiées
|
|
extensions = set(args.extensions) if args.extensions else SUPPORTED_EXTENSIONS
|
|
# Assurer que les extensions commencent par un point
|
|
extensions = {ext if ext.startswith('.') else f'.{ext}' for ext in extensions}
|
|
|
|
# Déterminer le chemin du script d'injection
|
|
if args.script:
|
|
ingest_script = args.script
|
|
else:
|
|
# Utiliser le script auto_ingest.py dans le même répertoire que ce script
|
|
ingest_script = os.path.join(os.path.dirname(os.path.abspath(__file__)), "auto_ingest.py")
|
|
|
|
# Créer le répertoire de surveillance s'il n'existe pas
|
|
watch_dir = os.path.abspath(args.directory)
|
|
if not os.path.exists(watch_dir):
|
|
logger.info(f"Création du répertoire de surveillance: {watch_dir}")
|
|
os.makedirs(watch_dir, exist_ok=True)
|
|
|
|
logger.info(f"Démarrage de la surveillance de {watch_dir}")
|
|
logger.info(f"URL PrivateGPT: {args.url}")
|
|
logger.info(f"Extensions surveillées: {', '.join(extensions)}")
|
|
logger.info(f"Délai de traitement: {args.delay} secondes")
|
|
|
|
# Initialiser le gestionnaire et l'observateur
|
|
event_handler = DocumentHandler(
|
|
watch_dir=watch_dir,
|
|
ingest_script=ingest_script,
|
|
pgpt_url=args.url,
|
|
extensions=extensions,
|
|
delay=args.delay
|
|
)
|
|
|
|
observer = Observer()
|
|
observer.schedule(event_handler, path=watch_dir, recursive=True)
|
|
observer.start()
|
|
|
|
try:
|
|
logger.info("Surveillance en cours... (Ctrl+C pour quitter)")
|
|
|
|
while True:
|
|
# Traiter les fichiers en attente
|
|
event_handler.process_pending_files()
|
|
time.sleep(1)
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("\nInterruption par l'utilisateur. Arrêt de la surveillance.")
|
|
observer.stop()
|
|
|
|
observer.join()
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except Exception as e:
|
|
logger.error(f"Erreur non gérée: {str(e)}", exc_info=True)
|
|
sys.exit(1) |