import streamlit as st from dateutil import parser from collections import defaultdict import os import csv import requests import base64 import re import json import html # Configuration Gitea GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1") GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") ORGANISATION = os.getenv("ORGANISATION", "fabnum") DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches") ENV = os.getenv("ENV") def charger_fiches_et_labels(): chemin_csv = os.path.join("assets", "fiches_labels.csv") dictionnaire_fiches = {} try: with open(chemin_csv, mode="r", encoding="utf-8") as fichier_csv: lecteur = csv.DictReader(fichier_csv) for ligne in lecteur: fiche = ligne.get("Fiche") operations = ligne.get("Label opération") item = ligne.get("Label item") if fiche and operations and item: dictionnaire_fiches[fiche.strip()] = { "operations": [op.strip() for op in operations.split("/")], "item": item.strip() } except FileNotFoundError: st.error(f"❌ Le fichier {chemin_csv} est introuvable.") except Exception as e: st.error(f"❌ Erreur lors du chargement des fiches : {str(e)}") return dictionnaire_fiches def rechercher_tickets_gitea(fiche_selectionnee): headers = {"Authorization": f"token {GITEA_TOKEN}"} params = {"state": "open"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" try: reponse = requests.get(url, headers=headers, params=params, timeout=10) reponse.raise_for_status() issues = reponse.json() correspondances = charger_fiches_et_labels() cible = correspondances.get(fiche_selectionnee) if not cible: return [] labels_cibles = set(cible["operations"] + [cible["item"]]) tickets_associes = [] for issue in issues: if issue.get("ref") != f"refs/heads/{ENV}": continue issue_labels = set(label.get("name", "") for label in issue.get("labels", [])) if labels_cibles.issubset(issue_labels): tickets_associes.append(issue) return tickets_associes except requests.RequestException as e: st.error(f"Erreur lors de la récupération des tickets : {e}") return [] def extraire_statut_par_label(ticket): labels = [label.get('name', '') for label in ticket.get('labels', [])] for statut in ["Backlog", "En attente de traitement", "En cours", "Terminés", "Non retenus"]: if statut in labels: return statut return "Autres" def afficher_tickets_par_fiche(tickets): if not tickets: st.info("Aucun ticket lié à cette fiche.") return st.markdown("**Tickets associés à cette fiche**") tickets_groupes = defaultdict(list) for ticket in tickets: statut = extraire_statut_par_label(ticket) tickets_groupes[statut].append(ticket) nb_backlogs = len(tickets_groupes["Backlog"]) if nb_backlogs: st.info(f"⤇ {nb_backlogs} ticket(s) en attente de modération ne sont pas affichés.") ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"] for statut in ordre_statuts: if tickets_groupes[statut]: with st.expander(f"{statut} ({len(tickets_groupes[statut])})", expanded=(statut == "En cours")): for ticket in tickets_groupes[statut]: afficher_carte_ticket(ticket) def recuperer_commentaires_ticket(issue_index): headers = {"Authorization": f"token {GITEA_TOKEN}"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues/{issue_index}/comments" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() return response.json() except Exception as e: st.error(f"Erreur lors de la récupération des commentaires : {e}") return [] def get_labels_existants(): headers = {"Authorization": f"token {GITEA_TOKEN}"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/labels" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() return {label['name']: label['id'] for label in response.json()} except Exception as e: st.error(f"Erreur lors de la récupération des labels : {e}") return {} def nettoyer_labels(labels): return sorted(set(l.strip() for l in labels if isinstance(l, str) and l.strip())) def construire_corps_ticket_markdown(reponses): return "\n\n".join(f"## {section}\n{texte}" for section, texte in reponses.items()) def creer_ticket_gitea(titre, corps, labels): headers = { "Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json" } data = { "title": titre, "body": corps, "labels": labels, "ref": f"refs/heads/{ENV}" } url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" try: response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10) response.raise_for_status() issue_url = response.json().get("html_url", "") if issue_url: st.success(f"Ticket créé ! [Voir le ticket]({issue_url})") else: st.success("Ticket créé avec succès.") except Exception as e: st.error(f"❌ Erreur création ticket : {e}") def afficher_carte_ticket(ticket): titre = ticket.get("title", "Sans titre") url = ticket.get("html_url", "") user = ticket.get("user", {}).get("login", "inconnu") created = ticket.get("created_at", "") updated = ticket.get("updated_at", "") body = ticket.get("body", "") labels = [l["name"] for l in ticket.get("labels", []) if "name" in l] sujet = "" match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL) if match: sujet = match.group(1).strip() def format_date(iso): try: return parser.isoparse(iso).strftime("%d/%m/%Y") except: return "?" date_created_str = format_date(created) maj_info = f"(MAJ {format_date(updated)})" if updated and updated != created else "" commentaires = recuperer_commentaires_ticket(ticket.get("number")) commentaires_html = "" for commentaire in commentaires: auteur = html.escape(commentaire.get('user', {}).get('login', 'inconnu')) contenu = html.escape(commentaire.get('body', '')) date = format_date(commentaire.get('created_at', '')) commentaires_html += f"""
Ouvert par {html.escape(user)} le {date_created_str} {maj_info}
Sujet : {html.escape(sujet)}
Labels : {' • '.join(labels) if labels else 'aucun'}
{auteur} ({date})
{contenu}