import streamlit as st from dateutil import parser from collections import defaultdict import os import csv import requests import base64 import re import json import html # Configuration Gitea GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1") GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") ORGANISATION = os.getenv("ORGANISATION", "fabnum") DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches") ENV = os.getenv("ENV") def charger_fiches_et_labels(): chemin_csv = os.path.join("assets", "fiches_labels.csv") dictionnaire_fiches = {} try: with open(chemin_csv, mode="r", encoding="utf-8") as fichier_csv: lecteur = csv.DictReader(fichier_csv) for ligne in lecteur: fiche = ligne.get("Fiche") operations = ligne.get("Label opération") item = ligne.get("Label item") if fiche and operations and item: dictionnaire_fiches[fiche.strip()] = { "operations": [op.strip() for op in operations.split("/")], "item": item.strip() } except FileNotFoundError: st.error(f"❌ Le fichier {chemin_csv} est introuvable.") except Exception as e: st.error(f"❌ Erreur lors du chargement des fiches : {str(e)}") return dictionnaire_fiches def rechercher_tickets_gitea(fiche_selectionnee): headers = {"Authorization": f"token " + GITEA_TOKEN} params = {"state": "open"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" try: reponse = requests.get(url, headers=headers, params=params, timeout=10) reponse.raise_for_status() issues = reponse.json() correspondances = charger_fiches_et_labels() cible = correspondances.get(fiche_selectionnee) if not cible: return [] labels_cibles = set(cible["operations"] + [cible["item"]]) tickets_associes = [] for issue in issues: if issue.get("ref") != f"refs/heads/{ENV}": continue issue_labels = set() for label in issue.get("labels", []): if isinstance(label, dict) and "name" in label: issue_labels.add(label["name"]) if labels_cibles.issubset(issue_labels): tickets_associes.append(issue) return tickets_associes except requests.RequestException as e: st.error(f"Erreur lors de la récupération des tickets : {e}") return [] def extraire_statut_par_label(ticket): labels = [label.get('name', '') for label in ticket.get('labels', [])] for statut in ["Backlog", "En attente de traitement", "En cours", "Terminés", "Non retenus"]: if statut in labels: return statut return "Autres" def afficher_tickets_par_fiche(tickets): if not tickets: st.info("Aucun ticket lié à cette fiche.") return st.markdown("📝 **Tickets associés à cette fiche**") tickets_groupes = defaultdict(list) for ticket in tickets: statut = extraire_statut_par_label(ticket) tickets_groupes[statut].append(ticket) nb_backlogs = len(tickets_groupes["Backlog"]) if nb_backlogs == 1: st.info(f" ⤇ {nb_backlogs} ticket en attente de modération n'est pas affiché.") else : st.info(f" ⤇ {nb_backlogs} ticket(s) en attente de modération ne sont pas affichés.") ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"] for statut in ordre_statuts: if tickets_groupes[statut]: with st.expander(f"{statut} ({len(tickets_groupes[statut])})", expanded=(statut == "En cours")): for ticket in tickets_groupes[statut]: afficher_carte_ticket(ticket) def recuperer_commentaires_ticket(issue_index): headers = { "Authorization": f"token {GITEA_TOKEN}" } url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues/{issue_index}/comments" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() commentaires = response.json() return commentaires except Exception as e: st.error(f"Erreur lors de la récupération des commentaires pour le ticket {issue_index} : {e}") return [] def afficher_carte_ticket(ticket): titre = ticket.get("title", "Sans titre") url = ticket.get("html_url", "") user = ticket.get("user", {}).get("login", "inconnu") created = ticket.get("created_at", "") updated = ticket.get("updated_at", "") body = ticket.get("body", "") labels = [l["name"] for l in ticket.get("labels", []) if "name" in l] sujet = "" match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL) if match: sujet = match.group(1).strip() if created: try: dt_created = parser.isoparse(created) date_created_str = dt_created.strftime("%d/%m/%Y") except Exception: date_created_str = "Date inconnue" else: date_created_str = "Date inconnue" if updated and updated != created: try: dt_updated = parser.isoparse(updated) date_updated_str = dt_updated.strftime("%d/%m/%Y") maj_info = f"(MAJ {date_updated_str})" except Exception: maj_info = "" else: maj_info = "" commentaires = recuperer_commentaires_ticket(ticket.get("number")) commentaires_html = '' if commentaires: for commentaire in commentaires: auteur = html.escape(commentaire.get('user', {}).get('login', 'inconnu')) contenu = html.escape(commentaire.get('body', '')) date_commentaire = commentaire.get('created_at', '') if date_commentaire: try: dt_comment = parser.isoparse(date_commentaire) date_commentaire_str = dt_comment.strftime("%d/%m/%Y") except Exception: date_commentaire_str = "" else: date_commentaire_str = "" commentaires_html += f"""

{auteur} ({date_commentaire_str})

{contenu}

""" else: commentaires_html = '

Aucun commentaire.

' with st.container(): st.markdown(f"""

🎫 {titre}

Ouvert par {html.escape(user)} le {date_created_str} {maj_info}

Sujet de la proposition : {html.escape(sujet)}

{' • '.join(html.escape(label) for label in labels) if labels else 'aucun'}


""", unsafe_allow_html=True) st.markdown("**Contenu du ticket :**") st.markdown(body, unsafe_allow_html=False) st.markdown("---") st.markdown("**Commentaires :**") st.markdown(commentaires_html, unsafe_allow_html=True) def get_labels_existants(): headers = {"Authorization": f"token {GITEA_TOKEN}"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/labels" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() labels_data = response.json() return {label['name']: label['id'] for label in labels_data} except Exception as e: st.error(f"Erreur lors de la récupération des labels existants : {e}") return {} def creer_ticket_gitea(titre, corps, labels): headers = { "Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json" } url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues" data = { "title": titre, "body": corps, "labels": labels, "ref": f"refs/heads/{ENV}" } try: response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10) response.raise_for_status() issue = response.json() issue_url = issue.get("html_url", "") if issue_url: st.success(f"✅ Ticket créé avec succès ! [Voir le ticket]({issue_url})") else: st.success("✅ Ticket créé avec succès !") except Exception as e: st.error(f"❌ Erreur lors de la création du ticket : {e}") def charger_modele_ticket(): headers = {"Authorization": f"token {GITEA_TOKEN}"} url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/.gitea/ISSUE_TEMPLATE/Contenu.md" try: response = requests.get(url, headers=headers, timeout=10) response.raise_for_status() contenu_base64 = response.json().get("content", "") contenu = base64.b64decode(contenu_base64).decode("utf-8") return contenu except Exception as e: st.error(f"Erreur lors du chargement du modèle de ticket : {e}") return "" def gerer_tickets_fiche(fiche_selectionnee): st.markdown("""
""", unsafe_allow_html=True) st.markdown("### 🧾 Gestion des tickets pour cette fiche") tickets = rechercher_tickets_gitea(fiche_selectionnee) afficher_tickets_par_fiche(tickets) formulaire_creation_ticket_dynamique(fiche_selectionnee) # Modification de formulaire_creation_ticket_dynamique pour ajouter Annuler def formulaire_creation_ticket_dynamique(fiche_selectionnee): with st.expander("➕ Créer un nouveau ticket lié à cette fiche", expanded=False): contenu_modele = charger_modele_ticket() if not contenu_modele: st.error("Impossible de charger le modèle de ticket.") return sections = {} lignes = contenu_modele.splitlines() titre_courant = None contenu_section = [] for ligne in lignes: if ligne.startswith("## ") and titre_courant: sections[titre_courant] = "\n".join(contenu_section).strip() titre_courant = ligne[3:].strip() contenu_section = [] elif ligne.startswith("## "): titre_courant = ligne[3:].strip() contenu_section = [] elif titre_courant: contenu_section.append(ligne) if titre_courant and contenu_section: sections[titre_courant] = "\n".join(contenu_section).strip() reponses = {} labels = [] selected_operations = [] correspondances = charger_fiches_et_labels() cible = correspondances.get(fiche_selectionnee) if cible: if len(cible["operations"]) == 1: labels.append(cible["operations"][0]) elif len(cible["operations"]) > 1: selected_operations = st.multiselect("Labels opération à associer", cible["operations"], default=cible["operations"]) for section, aide in sections.items(): if "Type de contribution" in section: options = re.findall(r"- \[.\] (.+)", aide) clean_options = [] for opt in options: base = opt.split(":")[0].strip() if base not in clean_options: clean_options.append(base) if "Autre" not in clean_options: clean_options.append("Autre") type_contribution = st.radio("Type de contribution", clean_options) if type_contribution == "Autre": autre = st.text_input("Précisez le type de contribution") reponses[section] = autre else: reponses[section] = type_contribution elif "Fiche concernée" in section: base_url = f"https://fabnum-git.peccini.fr/FabNum/Fiches/src/branch/{ENV}/Documents/" url_fiche = f"{base_url}{fiche_selectionnee.replace(' ', '%20')}" reponses[section] = url_fiche st.text_input("Fiche concernée", value=url_fiche, disabled=True) elif "Sujet de la proposition" in section: reponses[section] = st.text_input(section, help=aide) else: reponses[section] = st.text_area(section, help=aide) col1, col2 = st.columns(2) with col1: if st.button("Prévisualiser le ticket"): st.session_state.previsualiser = True with col2: if st.button("Annuler"): st.session_state.previsualiser = False st.rerun() if st.session_state.get("previsualiser", False): st.subheader("Prévisualisation du ticket") for section, texte in reponses.items(): st.markdown(f"#### {section}") st.markdown(texte) if st.button("Confirmer la création du ticket"): if cible: labels.append(cible["item"]) if selected_operations: labels.extend(selected_operations) labels = list(set([l.strip() for l in labels if l and l.strip()])) titre_ticket = reponses.get("Sujet de la proposition", "").strip() or "Ticket FabNum" labels_existants = get_labels_existants() labels_ids = [labels_existants[l] for l in labels if l in labels_existants] if "Backlog" in labels_existants: labels_ids.append(labels_existants["Backlog"]) corps = "" for section, texte in reponses.items(): corps += f"## {section}\n{texte}\n\n" creer_ticket_gitea(titre=titre_ticket, corps=corps, labels=labels_ids) st.success("Formulaire vidé après création du ticket.")