Résolue

APScheduler et multi threading.

# Librairie standard

Bonjour,

J'ai des soucis avec la librairie APScheduler. Voila le contexte :
J'ai un programme qui fait tourner 4 threads :
Un main thread et 3 fils :

  • ScheduledThread

  • SyncreplThread

  • RetryThread

Mon application via le Thread syncrepl écoute un serveur LDAP. LOrsque je perds la connexion au LDAP dans le cas d'un crash serveur je souhaite eeefctué les opérations suivantes :

  • kill les deux threads Syncrepl et Scheduled

  • Utiliser le thread retry pour check la disponibilité du serveur

  • Si le serveur est disponible, relancer les deux threads syncrepl et Scheduled.

J'arrive à faire toutes ces étapes mon soucis se situe au niveau de Scheduled qui est une classe qui va manipuler avec une variable d'instance un objet BlockingScheduler de la lib APScheduler.

APScheduler est une lib pour effectué des taches en décalé un peu comme des cron sous linux. Après chaque redémarrage de mon thread Scheduled ce dernier créer une nouvelle instance de BlockingScheduler qui démarre. Et à chaque fois qu'un traitement ordonancé s'effectue j'ai l'erreur suivante :

> Error submitting job "ScheduledTreatment._token_expiration (trigger: cron[month='', day='', day_of_week='', hour='', minute='*'], next run at: 2023-09-29 14:04:00 UTC)" to executor "default"
Traceback (most recent call last):
File "/srv/environment/lib/python3.9/site-packages/apscheduler/schedulers/base.py", line 978, in _process_jobs
executor.submit_job(job, run_times)
File "/srv/environment/lib/python3.9/site-packages/apscheduler/executors/base.py", line 71, in submit_job
self._do_submit_job(job, run_times)
File "/srv/environment/lib/python3.9/site-packages/apscheduler/executors/pool.py", line 28, in _do_submit_job
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
File "/usr/lib/python3.9/concurrent/futures/thread.py", line 163, in submit
raise RuntimeError('cannot schedule new futures after '

Voia les codes qui gérent cette partie :
Les threads :

class SyncreplThread(threading.Thread):
    def __init__(self, client: Syncrepl) -> None:
        super().__init__(name="SyncreplThread")
        self._client = client

    def run(self):
        self._client.run()


class ScheduledThread(threading.Thread):
    def __init__(
        self,
        scheduled_treatment: ScheduledTreatment,
        concrete_subject: ConcreteSubject,
    ) -> None:
        super().__init__(name="SchedulerThread")
        self._scheduled_treatment = scheduled_treatment
        self.concrete_subject = concrete_subject

    def run(self):
        while True:
            self._scheduled_treatment.run()
            self.concrete_subject.status_event.wait()
            if self.concrete_subject.status == "DOWN":
                self.scheduled_treatment.update(self.concrete_subject)
                self.concrete_subject.status_event.clear()


class RetryThread(threading.Thread):
    def __init__(
        self,
        client,
        scheduled_treatment: ScheduledObserver,
        concrete_subject: ConcreteSubject,
    ) -> None:
        super().__init__(name="RetryThread")
        self.client = client
        self.concrete_subject = concrete_subject
        self.scheduled_treatment = scheduled_treatment

    def run(self):
        while True:
            self.concrete_subject.status_event.wait()
            if self.concrete_subject.status == "DOWN":
                self.client.update(self.concrete_subject)
                # Réinitialisez l'événement après avoir traité DOWN
                self.concrete_subject.status_event.clear()

La fonction qui gèrent l'arret des threads :

def process_ldap_down(self):
        self._stop_all_threads()
        self._logger.info("Arret du thread de réplication")
        self._treatments.check_ldap_status()

   def _stop_all_threads(self) -> None:
        self._logger.debug("Un thread a terminé, arrêt de tous les threads")
        self._client.please_stop()
        if self._scheduled_treatment:
            self._scheduled_treatment.please_stop()

    def please_stop(self):
        self.scheduler.shutdown()

Self.scheduler est un objet BlockingScheduler appartenant à la classe Scheduled.

Le processus de redémarrage :

def process_ldap_up(self):
        self._scheduled_treatment = ScheduledTreatment(
                on_terminate=self._stop_all_threads,
                treatments=self._treatments,
                play_replay=self._play_replay,
                configuration=self._configuration,
                annuaire=self._annuaire,
            )
        self._client: Syncrepl = self._init_syncrepl_client()
        self._start_threaded(retry=False)

def _start_threaded(self, only_syncrepl: bool = False, retry = True) -> None:
        self._syncrepl_thread = SyncreplThread(client=self._client)

        if self._scheduled_treatment and not only_syncrepl:
            self._cron_thread = ScheduledThread(
                scheduled_treatment=self._scheduled_treatment,
                scheduled_observer=self.scheduled_observer,
                concrete_subject=self.subject,
            )
        if retry:
            self._retry_thread = RetryThread(
            client=self.client_observer,
            scheduled_treatment=self.scheduled_observer,
            concrete_subject=self.subject,
        )

        self._syncrepl_thread.start()
        if self._scheduled_treatment and not only_syncrepl:
            self._cron_thread.start()
        if retry:
            self._retry_thread.start()
        self._syncrepl_thread.join()

La méthode run de Scheduled :

def run(self):
        self.scheduler = BlockingScheduler()
        self._setup_all()
        self.scheduler.start()

Thibault houdon

Mentor

Salut Flavien !

Alors il y a quelques éléments que j'ai du mal à saisir car il ne me semble pas avoir tout le code, mais je pense comprendre la logique générale.

L'erreur semble être incomplète aussi mais d'après ce qui est indiqué il me semble que c'est un problème avec l'arrêt des threads.

Est-ce que les déconnexions sont fréquentes et les tâches assez longues ? Je pense que le problème vient du fait que tu essaies de relancer des threads alors que les anciens ne sont pas totalement arrêtés (l'erreur indique une impossibilité de lancer de nouveaux jobs, cannot schedule new futures after).

En allant voir dans la doc de APScheduler, tu as aussi la possibilité de mettre en pause les thread et les résumer, ça me semble une approche plus safe que de faire un shutdown pour tout relancer. C'est plus clean et en plus si le problème vient vraiment d'un processus qui n'est pas totalement arrêté, ça devrait régler le problème.

Dans la doc tu peux voir les méthodes disponibles pour mettre en pause et redémarrer :
https://apscheduler.readthedocs.io/en/3.x/modules/schedulers/base.html?highlight=Scheduled#apscheduler.schedulers.base.BaseScheduler.pause

Voilà la piste que j'explorerais perso :)

Salut !

Pour le coup l"erreur est bien complète. Je me suis posé la question pour savoir si j'ai bien arreté les threads comme il faut mais je ne vois pas comment faire.

J'ai testé ce bout de code dans le debugueur de VS code :

import threading

threading.active_count()

Ce qui me donne 1 qui est le résultat attendu. Je ne comprends pas pourquoi ils ne s'arreteraient pas correctement surtout que j'utilise des méthodes des librairies, ce ne sont pas des méthodes customs.

Néanmoins je vais explorer ta piste et voir ce que cela donne.

Malheuresement cela ne résoud pas le problème. J'ai l'impression que l'arret d'un seul thread peu importe lequel provoque un dysfonctionnement d'APScheduler.

Thibault houdon

Mentor

Juste pour être sûr, tu mettais en pause le scheduler lui-même et non pas seulement une tâche spécifique, n'est-ce pas ?

D'après ce que j'ai compris, l'erreur "RuntimeError: cannot schedule new futures after" est levée quand tu essayes d'envoyer un job à un executor qui a été arrêté, du coup je suis surpris qu'avec la mise en pause tu aies le même problème, je m'attendais au minimum à une autre erreur qui nous aurait permis d'avancer 😅

Autre piste : le problème vient peut-être de la façon dont le BlockingScheduler est recréé.

Essaie de vérifier s'il ne récupère / relance pas un ancien scheduler qui est en cours d'arrêt, je pense que tu dois pouvoir vérifier l'ID du scheduler avec l'API pour vérifier qu'il est différent (https://apscheduler.readthedocs.io/en/3.x/modules/job.html?highlight=identifier#apscheduler.job.Job).

Si ce n'est pas le cas, ça pourrait expliquer pourquoi le nouveau ne peut pas être lancé (si le système pense que l'ancien est toujours en cours d'exécution).

Inscris-toi

(c'est gratuit !)

Inscris-toi

Tu dois créer un compte pour participer aux discussions.

Créer un compte

Rechercher sur le site

Formulaire de contact

Inscris-toi à Docstring

Pour commencer ton apprentissage.

Tu as déjà un compte ? Connecte-toi.