import streamlit as st
from dateutil import parser
from collections import defaultdict
import os
import csv
import requests
import base64
import re
import json
import html
# Configuration Gitea
GITEA_URL = os.getenv("GITEA_URL", "https://fabnum-git.peccini.fr/api/v1")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORGANISATION = os.getenv("ORGANISATION", "fabnum")
DEPOT_FICHES = os.getenv("DEPOT_FICHES", "fiches")
ENV = os.getenv("ENV")
def charger_fiches_et_labels():
chemin_csv = os.path.join("assets", "fiches_labels.csv")
dictionnaire_fiches = {}
try:
with open(chemin_csv, mode="r", encoding="utf-8") as fichier_csv:
lecteur = csv.DictReader(fichier_csv)
for ligne in lecteur:
fiche = ligne.get("Fiche")
operations = ligne.get("Label opération")
item = ligne.get("Label item")
if fiche and operations and item:
dictionnaire_fiches[fiche.strip()] = {
"operations": [op.strip() for op in operations.split("/")],
"item": item.strip()
}
except FileNotFoundError:
st.error(f"❌ Le fichier {chemin_csv} est introuvable.")
except Exception as e:
st.error(f"❌ Erreur lors du chargement des fiches : {str(e)}")
return dictionnaire_fiches
def rechercher_tickets_gitea(fiche_selectionnee):
headers = {"Authorization": f"token " + GITEA_TOKEN}
params = {"state": "open"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues"
try:
reponse = requests.get(url, headers=headers, params=params, timeout=10)
reponse.raise_for_status()
issues = reponse.json()
correspondances = charger_fiches_et_labels()
cible = correspondances.get(fiche_selectionnee)
if not cible:
return []
labels_cibles = set(cible["operations"] + [cible["item"]])
tickets_associes = []
for issue in issues:
if issue.get("ref") != f"refs/heads/{ENV}":
continue
issue_labels = set()
for label in issue.get("labels", []):
if isinstance(label, dict) and "name" in label:
issue_labels.add(label["name"])
if labels_cibles.issubset(issue_labels):
tickets_associes.append(issue)
return tickets_associes
except requests.RequestException as e:
st.error(f"Erreur lors de la récupération des tickets : {e}")
return []
def extraire_statut_par_label(ticket):
labels = [label.get('name', '') for label in ticket.get('labels', [])]
for statut in ["Backlog", "En attente de traitement", "En cours", "Terminés", "Non retenus"]:
if statut in labels:
return statut
return "Autres"
def afficher_tickets_par_fiche(tickets):
if not tickets:
st.info("Aucun ticket lié à cette fiche.")
return
st.markdown("📝 **Tickets associés à cette fiche**")
tickets_groupes = defaultdict(list)
for ticket in tickets:
statut = extraire_statut_par_label(ticket)
tickets_groupes[statut].append(ticket)
nb_backlogs = len(tickets_groupes["Backlog"])
if nb_backlogs == 1:
st.info(f" ⤇ {nb_backlogs} ticket en attente de modération n'est pas affiché.")
else :
st.info(f" ⤇ {nb_backlogs} ticket(s) en attente de modération ne sont pas affichés.")
ordre_statuts = ["En attente de traitement", "En cours", "Terminés", "Non retenus", "Autres"]
for statut in ordre_statuts:
if tickets_groupes[statut]:
with st.expander(f"{statut} ({len(tickets_groupes[statut])})", expanded=(statut == "En cours")):
for ticket in tickets_groupes[statut]:
afficher_carte_ticket(ticket)
def recuperer_commentaires_ticket(issue_index):
headers = {
"Authorization": f"token {GITEA_TOKEN}"
}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues/{issue_index}/comments"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
commentaires = response.json()
return commentaires
except Exception as e:
st.error(f"Erreur lors de la récupération des commentaires pour le ticket {issue_index} : {e}")
return []
def afficher_carte_ticket(ticket):
titre = ticket.get("title", "Sans titre")
url = ticket.get("html_url", "")
user = ticket.get("user", {}).get("login", "inconnu")
created = ticket.get("created_at", "")
updated = ticket.get("updated_at", "")
body = ticket.get("body", "")
labels = [l["name"] for l in ticket.get("labels", []) if "name" in l]
sujet = ""
match = re.search(r"## Sujet de la proposition\s+(.+?)(\n|$)", body, re.DOTALL)
if match:
sujet = match.group(1).strip()
if created:
try:
dt_created = parser.isoparse(created)
date_created_str = dt_created.strftime("%d/%m/%Y")
except Exception:
date_created_str = "Date inconnue"
else:
date_created_str = "Date inconnue"
if updated and updated != created:
try:
dt_updated = parser.isoparse(updated)
date_updated_str = dt_updated.strftime("%d/%m/%Y")
maj_info = f"(MAJ {date_updated_str})"
except Exception:
maj_info = ""
else:
maj_info = ""
commentaires = recuperer_commentaires_ticket(ticket.get("number"))
commentaires_html = ''
if commentaires:
for commentaire in commentaires:
auteur = html.escape(commentaire.get('user', {}).get('login', 'inconnu'))
contenu = html.escape(commentaire.get('body', ''))
date_commentaire = commentaire.get('created_at', '')
if date_commentaire:
try:
dt_comment = parser.isoparse(date_commentaire)
date_commentaire_str = dt_comment.strftime("%d/%m/%Y")
except Exception:
date_commentaire_str = ""
else:
date_commentaire_str = ""
commentaires_html += f"""
{auteur} ({date_commentaire_str})
{contenu}
"""
else:
commentaires_html = 'Aucun commentaire.
'
with st.container():
st.markdown(f"""
Ouvert par {html.escape(user)} le {date_created_str} {maj_info}
Sujet de la proposition : {html.escape(sujet)}
{' • '.join(html.escape(label) for label in labels) if labels else 'aucun'}
""", unsafe_allow_html=True)
st.markdown("**Contenu du ticket :**")
st.markdown(body, unsafe_allow_html=False)
st.markdown("---")
st.markdown("**Commentaires :**")
st.markdown(commentaires_html, unsafe_allow_html=True)
def get_labels_existants():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/labels"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
labels_data = response.json()
return {label['name']: label['id'] for label in labels_data}
except Exception as e:
st.error(f"Erreur lors de la récupération des labels existants : {e}")
return {}
def creer_ticket_gitea(titre, corps, labels):
headers = {
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json"
}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/issues"
data = {
"title": titre,
"body": corps,
"labels": labels,
"ref": f"refs/heads/{ENV}"
}
try:
response = requests.post(url, headers=headers, data=json.dumps(data), timeout=10)
response.raise_for_status()
issue = response.json()
issue_url = issue.get("html_url", "")
if issue_url:
st.success(f"✅ Ticket créé avec succès ! [Voir le ticket]({issue_url})")
else:
st.success("✅ Ticket créé avec succès !")
except Exception as e:
st.error(f"❌ Erreur lors de la création du ticket : {e}")
def charger_modele_ticket():
headers = {"Authorization": f"token {GITEA_TOKEN}"}
url = f"{GITEA_URL}/repos/{ORGANISATION}/{DEPOT_FICHES}/contents/.gitea/ISSUE_TEMPLATE/Contenu.md"
try:
response = requests.get(url, headers=headers, timeout=10)
response.raise_for_status()
contenu_base64 = response.json().get("content", "")
contenu = base64.b64decode(contenu_base64).decode("utf-8")
return contenu
except Exception as e:
st.error(f"Erreur lors du chargement du modèle de ticket : {e}")
return ""
def gerer_tickets_fiche(fiche_selectionnee):
st.markdown("""
""", unsafe_allow_html=True)
st.markdown("### 🧾 Gestion des tickets pour cette fiche")
tickets = rechercher_tickets_gitea(fiche_selectionnee)
afficher_tickets_par_fiche(tickets)
formulaire_creation_ticket_dynamique(fiche_selectionnee)
# Modification de formulaire_creation_ticket_dynamique pour ajouter Annuler
def formulaire_creation_ticket_dynamique(fiche_selectionnee):
with st.expander("➕ Créer un nouveau ticket lié à cette fiche", expanded=False):
contenu_modele = charger_modele_ticket()
if not contenu_modele:
st.error("Impossible de charger le modèle de ticket.")
return
sections = {}
lignes = contenu_modele.splitlines()
titre_courant = None
contenu_section = []
for ligne in lignes:
if ligne.startswith("## ") and titre_courant:
sections[titre_courant] = "\n".join(contenu_section).strip()
titre_courant = ligne[3:].strip()
contenu_section = []
elif ligne.startswith("## "):
titre_courant = ligne[3:].strip()
contenu_section = []
elif titre_courant:
contenu_section.append(ligne)
if titre_courant and contenu_section:
sections[titre_courant] = "\n".join(contenu_section).strip()
reponses = {}
labels = []
selected_operations = []
correspondances = charger_fiches_et_labels()
cible = correspondances.get(fiche_selectionnee)
if cible:
if len(cible["operations"]) == 1:
labels.append(cible["operations"][0])
elif len(cible["operations"]) > 1:
selected_operations = st.multiselect("Labels opération à associer", cible["operations"], default=cible["operations"])
for section, aide in sections.items():
if "Type de contribution" in section:
options = re.findall(r"- \[.\] (.+)", aide)
clean_options = []
for opt in options:
base = opt.split(":")[0].strip()
if base not in clean_options:
clean_options.append(base)
if "Autre" not in clean_options:
clean_options.append("Autre")
type_contribution = st.radio("Type de contribution", clean_options)
if type_contribution == "Autre":
autre = st.text_input("Précisez le type de contribution")
reponses[section] = autre
else:
reponses[section] = type_contribution
elif "Fiche concernée" in section:
base_url = f"https://fabnum-git.peccini.fr/FabNum/Fiches/src/branch/{ENV}/Documents/"
url_fiche = f"{base_url}{fiche_selectionnee.replace(' ', '%20')}"
reponses[section] = url_fiche
st.text_input("Fiche concernée", value=url_fiche, disabled=True)
elif "Sujet de la proposition" in section:
reponses[section] = st.text_input(section, help=aide)
else:
reponses[section] = st.text_area(section, help=aide)
col1, col2 = st.columns(2)
with col1:
if st.button("Prévisualiser le ticket"):
st.session_state.previsualiser = True
with col2:
if st.button("Annuler"):
st.session_state.previsualiser = False
st.rerun()
if st.session_state.get("previsualiser", False):
st.subheader("Prévisualisation du ticket")
for section, texte in reponses.items():
st.markdown(f"#### {section}")
st.markdown(texte)
if st.button("Confirmer la création du ticket"):
if cible:
labels.append(cible["item"])
if selected_operations:
labels.extend(selected_operations)
labels = list(set([l.strip() for l in labels if l and l.strip()]))
titre_ticket = reponses.get("Sujet de la proposition", "").strip() or "Ticket FabNum"
labels_existants = get_labels_existants()
labels_ids = [labels_existants[l] for l in labels if l in labels_existants]
if "Backlog" in labels_existants:
labels_ids.append(labels_existants["Backlog"])
corps = ""
for section, texte in reponses.items():
corps += f"## {section}\n{texte}\n\n"
creer_ticket_gitea(titre=titre_ticket, corps=corps, labels=labels_ids)
st.success("Formulaire vidé après création du ticket.")