Code/utils/tickets_fiche.py
Fabrication du Numérique 967ca4bcf2 Trop de modifications
2025-05-08 00:26:02 +02:00

299 lines
12 KiB
Python

import streamlit as st
from dateutil import parser
from collections import defaultdict
import os
import csv
import requests
import base64
import re
import json
import html
# Configuration Gitea
GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
ENV = os.getenv("ENV")
def charger_fiches_et_labels():
chemin_csv = os.path.join("assets", "fiches_labels.csv")
dictionnaire_fiches = {}
try:
with open(chemin_csv, mode="r", encoding="utf-8") as fichier_csv:
lecteur = csv.DictReader(fichier_csv)
for ligne in lecteur:
fiche = ligne.get("Fiche")
operations = ligne.get("Label opération")
item = ligne.get("Label item")
if fiche and operations and item:
dictionnaire_fiches[fiche.strip()] = {
"operations": [op.strip() for op in operations.split("/")],
"item": item.strip()
}
except FileNotFoundError:
st.error(f"❌ Le fichier {chemin_csv} est introuvable.")
except Exception as e:
st.error(f"❌ Erreur lors du chargement des fiches : {str(e)}")
return dictionnaire_fiches
def rechercher_tickets_gitea(fiche_selectionnee):
headers = {"Authorization": f"token {GITEA_TOKEN}"}
params = {"state": "open"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues"
try:
reponse = requests.get(url, headers=headers, params=params, timeout=10)
reponse.raise_for_status()
issues = reponse.json()
correspondances = charger_fiches_et_labels()
cible = correspondances.get(fiche_selectionnee)
if not cible:
return []
labels_cibles = set(cible["operations"] + [cible["item"]])
tickets_associes = []
for issue in issues:
if issue.get("ref") != f"refs/heads/{ENV}":
continue
issue_labels = set(label.get("name", "") for label in issue.get("labels", []))
if labels_cibles.issubset(issue_labels):
tickets_associes.append(issue)
return tickets_associes
except requests.RequestException as e:
st.error(f"Erreur lors de la récupération des tickets : {e}")
return []
def extraire_statut_par_label(ticket):
labels = [label.get('name', '') for label in ticket.get('labels', [])]
for statut in ["Backlog", "En attente de traitement", "En cours", "Terminés", "Non retenus"]:
if statut in labels:
return statut
return "Autres"
def afficher_tickets_par_fiche(tickets):
if not tickets:
st.info("Aucun ticket lié à cette fiche.")
return
st.markdown("**Tickets associés à cette fiche**")
tickets_groupes = defaultdict(list)
for ticket in tickets:
statut = extraire_statut_par_label(ticket)
tickets_groupes[statut].append(ticket)
nb_backlogs = len(tickets_groupes["Backlog"])
if nb_backlogs:
st.info(f"{nb_backlogs} ticket(s) en attente de modération ne sont pas affichés.")
ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"]
for statut in ordre_statuts:
if tickets_groupes[statut]:
with st.expander(f"{statut} ({len(tickets_groupes[statut])})", expanded=(statut == "En cours")):
for ticket in tickets_groupes[statut]:
afficher_carte_ticket(ticket)
def recuperer_commentaires_ticket(issue_index):
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues/{issue_index}/comments"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return response.json()
except Exception as e:
st.error(f"Erreur lors de la récupération des commentaires : {e}")
return []
def get_labels_existants():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/labels"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
return {label['name']: label['id'] for label in response.json()}
except Exception as e:
st.error(f"Erreur lors de la récupération des labels : {e}")
return {}
def nettoyer_labels(labels):
return sorted(set(l.strip() for l in labels if isinstance(l, str) and l.strip()))
def construire_corps_ticket_markdown(reponses):
return "\n\n".join(f"## {section}\n{texte}" for section, texte in reponses.items())
def creer_ticket_gitea(titre, corps, labels):
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json"
}
data = {
"title": titre,
"body": corps,
"labels": labels,
"ref": f"refs/heads/{ENV}"
}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues"
try:
response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
response.raise_for_status()
issue_url = response.json().get("html_url", "")
if issue_url:
st.success(f"Ticket créé ! [Voir le ticket]({issue_url})")
else:
st.success("Ticket créé avec succès.")
except Exception as e:
st.error(f"❌ Erreur création ticket : {e}")
def afficher_carte_ticket(ticket):
titre = ticket.get("title", "Sans titre")
url = ticket.get("html_url", "")
user = ticket.get("user", {}).get("login", "inconnu")
created = ticket.get("created_at", "")
updated = ticket.get("updated_at", "")
body = ticket.get("body", "")
labels = [l["name"] for l in ticket.get("labels", []) if "name" in l]
sujet = ""
match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL)
if match:
sujet = match.group(1).strip()
def format_date(iso):
try:
return parser.isoparse(iso).strftime("%d/%m/%Y")
except:
return "?"
date_created_str = format_date(created)
maj_info = f"(MAJ {format_date(updated)})" if updated and updated != created else ""
commentaires = recuperer_commentaires_ticket(ticket.get("number"))
commentaires_html = ""
for commentaire in commentaires:
auteur = html.escape(commentaire.get('user', {}).get('login', 'inconnu'))
contenu = html.escape(commentaire.get('body', ''))
date = format_date(commentaire.get('created_at', ''))
commentaires_html += f"""
<div class="conteneur_commentaire">
<p class="commentaire_auteur"><strong>{auteur}</strong> <small>({date})</small></p>
<p class="commentaire_contenu">{contenu}</p>
</div>
"""
with st.container():
st.markdown(f"""
<div class="conteneur_ticket">
<h4><a href='{url}' target='_blank'>{titre}</a></h4>
<p>Ouvert par <strong>{html.escape(user)}</strong> le {date_created_str} {maj_info}</p>
<p>Sujet : <strong>{html.escape(sujet)}</strong></p>
<p>Labels : {''.join(labels) if labels else 'aucun'}</p>
</div>
""", unsafe_allow_html=True)
st.markdown(body, unsafe_allow_html=False)
st.markdown("---")
st.markdown("**Commentaire(s) :**")
st.markdown(commentaires_html or "Aucun commentaire.", unsafe_allow_html=True)
def charger_modele_ticket():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/.gitea/ISSUE_TEMPLATE/Contenu.md"
try:
r = requests.get(url, headers=headers, timeout=10)
r.raise_for_status()
return base64.b64decode(r.json().get("content", "")).decode("utf-8")
except Exception as e:
st.error(f"Erreur chargement modèle : {e}")
return ""
def gerer_tickets_fiche(fiche_selectionnee):
st.markdown("<hr style='border: 1px solid #ccc; margin: 2rem 0;' />", unsafe_allow_html=True)
st.markdown("## Gestion des tickets pour cette fiche")
afficher_tickets_par_fiche(rechercher_tickets_gitea(fiche_selectionnee))
formulaire_creation_ticket_dynamique(fiche_selectionnee)
def formulaire_creation_ticket_dynamique(fiche_selectionnee):
with st.expander("Créer un nouveau ticket lié à cette fiche", expanded=False):
contenu_modele = charger_modele_ticket()
if not contenu_modele:
st.error("Impossible de charger le modèle de ticket.")
return
# Découpe le modèle en sections
sections, reponses = {}, {}
lignes, titre_courant, contenu = contenu_modele.splitlines(), None, []
for ligne in lignes:
if ligne.startswith("## "):
if titre_courant:
sections[titre_courant] = "\n".join(contenu).strip()
titre_courant, contenu = ligne[3:].strip(), []
elif titre_courant:
contenu.append(ligne)
if titre_courant:
sections[titre_courant] = "\n".join(contenu).strip()
# Labels prédéfinis selon la fiche
labels, selected_ops = [], []
correspondances = charger_fiches_et_labels()
cible = correspondances.get(fiche_selectionnee)
if cible:
if len(cible["operations"]) == 1:
labels.append(cible["operations"][0])
elif len(cible["operations"]) > 1:
selected_ops = st.multiselect("Labels opération à associer", cible["operations"], default=cible["operations"])
# Génération des champs
for section, aide in sections.items():
if "Type de contribution" in section:
options = sorted(set(re.findall(r"- \[.\] (.+)", aide)))
if "Autre" not in options:
options.append("Autre")
choix = st.radio("Type de contribution", options)
reponses[section] = st.text_input("Précisez", "") if choix == "Autre" else choix
elif "Fiche concernée" in section:
url_fiche = f"https://fabnum-git.peccini.fr/FabNum/Fiches/src/branch/{ENV}/Documents/{fiche_selectionnee.replace(' ', '%20')}"
reponses[section] = url_fiche
st.text_input("Fiche concernée", value=url_fiche, disabled=True)
elif "Sujet de la proposition" in section:
reponses[section] = st.text_input(section, help=aide)
else:
reponses[section] = st.text_area(section, help=aide)
col1, col2 = st.columns(2)
if col1.button("Prévisualiser le ticket"):
st.session_state.previsualiser = True
if col2.button("Annuler"):
st.session_state.previsualiser = False
st.rerun()
if st.session_state.get("previsualiser", False):
st.subheader("Prévisualisation du ticket")
for section, texte in reponses.items():
st.markdown(f"#### {section}")
st.code(texte, language="markdown")
titre_ticket = reponses.get("Sujet de la proposition", "").strip() or "Ticket FabNum"
final_labels = nettoyer_labels(labels + selected_ops + ([cible["item"]] if cible else []))
st.markdown(f"**Résumé :**\n- **Titre** : `{titre_ticket}`\n- **Labels** : `{', '.join(final_labels)}`")
if st.button("Confirmer la création du ticket"):
labels_existants = get_labels_existants()
labels_ids = [labels_existants[l] for l in final_labels if l in labels_existants]
if "Backlog" in labels_existants:
labels_ids.append(labels_existants["Backlog"])
corps = construire_corps_ticket_markdown(reponses)
creer_ticket_gitea(titre_ticket, corps, labels_ids)
st.session_state.previsualiser = False
st.success("Ticket créé et formulaire vidé.")