#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Script d'injection automatique de documents pour PrivateGPT Ce script parcourt un répertoire spécifié et injecte tous les fichiers compatibles dans PrivateGPT via son API REST. """ import os import sys import time import argparse import logging import requests from pathlib import Path from typing import List, Dict, Tuple, Set from concurrent.futures import ThreadPoolExecutor, as_completed # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler("pgpt_auto_ingest.log") ] ) logger = logging.getLogger(__name__) # Extensions de fichiers couramment supportées par PrivateGPT SUPPORTED_EXTENSIONS = { '.pdf', '.txt', '.md', '.doc', '.docx', '.ppt', '.pptx', '.xls', '.xlsx', '.csv', '.epub', '.html', '.htm' } def parse_arguments(): """Parse les arguments de ligne de commande.""" parser = argparse.ArgumentParser( description="Injecte automatiquement tous les fichiers d'un répertoire dans PrivateGPT" ) parser.add_argument( "-d", "--directory", type=str, required=True, help="Chemin du répertoire contenant les fichiers à injecter" ) parser.add_argument( "-u", "--url", type=str, default="http://localhost:8001", help="URL de l'API PrivateGPT (défaut: http://localhost:8001)" ) parser.add_argument( "-r", "--recursive", action="store_true", help="Parcourir récursivement les sous-répertoires" ) parser.add_argument( "-t", "--threads", type=int, default=5, help="Nombre de threads pour les injections parallèles (défaut: 5)" ) parser.add_argument( "--retry", type=int, default=3, help="Nombre de tentatives en cas d'échec (défaut: 3)" ) parser.add_argument( "--retry-delay", type=int, default=5, help="Délai entre les tentatives en secondes (défaut: 5)" ) parser.add_argument( "--timeout", type=int, default=300, help="Délai d'attente pour chaque requête en secondes (défaut: 300)" ) parser.add_argument( "--extensions", nargs="+", help="Liste d'extensions spécifiques à injecter (ex: .pdf .txt)" ) return parser.parse_args() def find_files(directory: str, recursive: bool = False, extensions: Set[str] = SUPPORTED_EXTENSIONS) -> List[Path]: """ Trouve tous les fichiers avec les extensions spécifiées dans le répertoire. Args: directory: Répertoire à scanner recursive: Si True, parcourt aussi les sous-répertoires extensions: Ensemble d'extensions de fichier à inclure Returns: Liste des chemins de fichiers trouvés """ directory_path = Path(directory) if not directory_path.exists() or not directory_path.is_dir(): logger.error(f"Le répertoire {directory} n'existe pas ou n'est pas un répertoire.") return [] files = [] if recursive: # Parcours récursif for root, _, filenames in os.walk(directory): for filename in filenames: file_path = Path(root) / filename if file_path.suffix.lower() in extensions: files.append(file_path) else: # Parcours non récursif for file_path in directory_path.iterdir(): if file_path.is_file() and file_path.suffix.lower() in extensions: files.append(file_path) return files def ingest_file(file_path: Path, pgpt_url: str, timeout: int, retry_count: int, retry_delay: int) -> Tuple[Path, bool, str]: """ Injecte un fichier dans PrivateGPT. Args: file_path: Chemin du fichier à injecter pgpt_url: URL de base de l'API PrivateGPT timeout: Délai d'attente pour la requête retry_count: Nombre de tentatives en cas d'échec retry_delay: Délai entre les tentatives en secondes Returns: Tuple contenant (chemin_fichier, succès, message) """ ingest_url = f"{pgpt_url}/v1/ingest/file" for attempt in range(retry_count): try: logger.info(f"Injection de {file_path} (tentative {attempt + 1}/{retry_count})") with open(file_path, 'rb') as file: files = {'file': (file_path.name, file, 'application/octet-stream')} response = requests.post(ingest_url, files=files, timeout=timeout) if response.status_code == 200: result = response.json() doc_ids = result.get('document_ids', []) logger.info(f"Succès! {file_path} -> {len(doc_ids)} documents créés") return file_path, True, f"{len(doc_ids)} documents créés" else: error_msg = f"Erreur HTTP {response.status_code}: {response.text}" logger.warning(error_msg) if attempt < retry_count - 1: logger.info(f"Nouvelle tentative dans {retry_delay} secondes...") time.sleep(retry_delay) else: return file_path, False, error_msg except Exception as e: error_msg = f"Exception: {str(e)}" logger.warning(error_msg) if attempt < retry_count - 1: logger.info(f"Nouvelle tentative dans {retry_delay} secondes...") time.sleep(retry_delay) else: return file_path, False, error_msg return file_path, False, "Nombre maximum de tentatives atteint" def main(): """Fonction principale.""" args = parse_arguments() # Préparation des extensions si spécifiées extensions = set(args.extensions) if args.extensions else SUPPORTED_EXTENSIONS # Assurer que les extensions commencent par un point extensions = {ext if ext.startswith('.') else f'.{ext}' for ext in extensions} logger.info(f"Démarrage de l'injection automatique depuis {args.directory}") logger.info(f"URL PrivateGPT: {args.url}") logger.info(f"Mode récursif: {args.recursive}") logger.info(f"Extensions: {', '.join(extensions)}") # Trouver les fichiers files = find_files(args.directory, args.recursive, extensions) total_files = len(files) if total_files == 0: logger.warning(f"Aucun fichier trouvé avec les extensions {', '.join(extensions)} dans {args.directory}") return logger.info(f"Trouvé {total_files} fichiers à injecter") # Statistiques successful = 0 failed = 0 failed_files = [] # Injection des fichiers en parallèle with ThreadPoolExecutor(max_workers=args.threads) as executor: futures = { executor.submit( ingest_file, file_path, args.url, args.timeout, args.retry, args.retry_delay ): file_path for file_path in files } for future in as_completed(futures): file_path, success, message = future.result() if success: successful += 1 else: failed += 1 failed_files.append((file_path, message)) # Afficher la progression progress = (successful + failed) / total_files * 100 logger.info(f"Progression: {progress:.1f}% ({successful + failed}/{total_files})") # Rapport final logger.info("="*50) logger.info("RAPPORT D'INJECTION") logger.info("="*50) logger.info(f"Total des fichiers: {total_files}") logger.info(f"Succès: {successful}") logger.info(f"Échecs: {failed}") if failed > 0: logger.info("\nDétails des échecs:") for file_path, message in failed_files: logger.info(f"- {file_path}: {message}") logger.info("="*50) if __name__ == "__main__": try: main() except KeyboardInterrupt: logger.info("\nInterruption par l'utilisateur. Arrêt du processus.") sys.exit(1) except Exception as e: logger.error(f"Erreur non gérée: {str(e)}", exc_info=True) sys.exit(1)