Traiter plusieurs tâches en parallèle : Python multiprocessing

Le module multiprocessing permet d'exécuter plusieurs processus en même temps, permettant d'exploiter les processeurs multicoeur. Les processus ont leur propre espace mémoire, contrairement au module threading où les threads partagent le même espace mémoire.

multiprocessing est efficace pour les tâches qui nécessitent beaucoup de calculs (CPU-bound) comme les calculs complexes ou les algorithmes de traitement d'images et vidéos. Le module permet de contourner le GIL et d'exécuter du code en parallèle.

Le programme ne se terminera qu'une fois que les processus sont terminés.

Création des processus

Il y a deux façons principales de créer des processus.

En passant une fonction à la classe Process :

import multiprocessing


def tache_intensive(nom) -> None:
    print(f"Début de {nom}")
    # Simulation d'un calcul intensif
    resultat = 0
    for i in range(5000000):
        resultat += i
    print(f"Fin de {nom}, résultat: {resultat}")


if __name__ == "__main__":

    # Création de 2 processus
    process1 = multiprocessing.Process(target=tache_intensive, args=("Tâche 1",))
    process2 = multiprocessing.Process(target=tache_intensive, args=("Tâche 2",))

Ou en héritant de Process :

import multiprocessing


class MonProcessus(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"Début de {self.name}")
        # Simulation d'un calcul intensif
        resultat = 0
        for i in range(500000):
            resultat += i
        print(f"Fin de {self.name}, résultat: {resultat}")


if __name__ == "__main__":

    # Création de 2 processus
    process1 = MonProcessus("Tâche 1")
    process2 = MonProcessus("Tâche 2")

Exécuter les processus

Pour exécuter les processus, on utilise les méthodes start et join :

import multiprocessing


class MonProcessus(multiprocessing.Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f"Début de {self.name}")
        # Simulation d'un calcul intensif
        resultat = 0
        for i in range(500000):
            resultat += i
        print(f"Fin de {self.name}, résultat: {resultat}")


if __name__ == "__main__":

    # Création de 2 processus
    process1 = MonProcessus("Tâche 1")
    process2 = MonProcessus("Tâche 2")

    # Démarrage des processus
    print("Démarrage des processus")
    process1.start()
    process2.start()

    # Attendre la fin des processus
    process1.join()
    process2.join()
    print("Les processus sont terminés")

Attention

if __name__ == "__main__": est obligatoire lorsqu'on utilise le module multiprocessing pour éviter des boucles infinies, car sur les systèmes qui utilisent le mode spawn, chaque processus enfant réexécute le script en entier lors de son démarrage.

La méthode start permet de lancer l'exécution du processus en parallèle, dans un processus qui lui est propre.

La méthode join permet de synchroniser les processus. Sans cette méthode, le programme principal pourrait se terminer alors que des processus sont en cours d'exécution.

Communication entre processus

Bien que les processus soient isolés les uns des autres, multiprocessing propose des mécanismes pour qu'ils puissent communiquer entre eux.

La classe Queue

La classe Queue fonctionne comme une file d'attente. Elle permet d'échanger des données entre processus.

import multiprocessing

def producteur(queue):
    """Processus qui produit des données et les met dans la queue."""
    for i in range(3):
        message = f"Message {i}"
        queue.put(message)
        print(f"Producteur: envoi de '{message}'")

    # Envoi d'un signal de fin
    queue.put(None)
    print("Producteur: travail terminé")

def consommateur(queue):
    """Processus qui consomme les données de la queue."""
    while True:
        message = queue.get()

        # Vérification du signal de fin
        if message is None:
            print("Consommateur: signal de fin reçu")
            break

        print(f"Consommateur: réception de '{message}'")

if __name__ == "__main__":
    # Création d'une queue partagée
    queue = multiprocessing.Queue()

    # Création des processus
    proc_producteur = multiprocessing.Process(target=producteur, args=(queue,))
    proc_consommateur = multiprocessing.Process(target=consommateur, args=(queue,))

    # Démarrage des processus
    proc_producteur.start()
    proc_consommateur.start()

    # Attente de la fin des processus
    proc_producteur.join()
    proc_consommateur.join()

    print("Programme principal: tous les processus sont terminés")

Les deux processus communiquent entre eux par l'intermédiaire de l'instance de Queue. La méthode put permet d'insérer un élément dans la file d'attente, tandis que get permet de les récupérer.

Pourquoi faire queue.put(None) ?

L'ajout de None permet de dire : "j'arrête d'insérer des informations". Sans ce signal de fin, le consommateur resterait bloqué sur queue.get() en attendant les données.

La classe Pipe

Il existe un autre mécanisme de communication : la classe Pipe.

Pipe renvoie une paire d'objets Connection représentant les deux extrémités qui communiquent.

import multiprocessing


def producteur(conn):
    """Processus qui produit des données et les envoie via Pipe."""
    for i in range(3):
        message = f"Message {i}"
        conn.send(message)
        print(f"Producteur: envoi de '{message}'")

    # Fermeture de la connexion pour signaler la fin
    conn.close()
    print("Producteur: travail terminé")

def consommateur(conn):
    """Processus qui consomme les données reçues via Pipe."""
    # Continue tant que la connexion n'est pas fermée
    while conn.poll(timeout=1.0):  # Timeout d'une seconde
        message = conn.recv()
        print(f"Consommateur: réception de '{message}'")

    print("Consommateur: fin de la communication")

if __name__ == "__main__":
    # Création d'un pipe (deux extrémités de connexion)
    conn_producteur, conn_consommateur = multiprocessing.Pipe()

    # Création des processus
    proc_producteur = multiprocessing.Process(target=producteur, args=(conn_producteur,))
    proc_consommateur = multiprocessing.Process(target=consommateur, args=(conn_consommateur,))

    # Démarrage des processus
    proc_producteur.start()
    proc_consommateur.start()

    # Attente de la fin des processus
    proc_producteur.join()
    proc_consommateur.join()

    print("Programme principal: tous les processus sont terminés")

Le producteur envoie des messages avec la méthode send, tandis que le consommateur les reçoit avec la méthode recv. La méthode poll vérifie si des données sont disponibles. Pour signaler la fin de la communication, on utilise la méthode close.

À noter

Pipe est limité à une communication entre deux points, alors que Queue permet à plus de deux processus de communiquer.

Synchronisation

Les processus peuvent accéder à des ressources partagées. Pour éviter les problèmes de concurrence, la classe Lock permet de protéger l'accès.

import multiprocessing


def modifier_ressource(lock, compteur, id_processus):
    # Acquisition du verrou
    with lock:
        # Section critique - un seul processus à la fois
        valeur = compteur.value
        compteur.value = valeur + 1
        print(f"Processus {id_processus}: compteur = {compteur.value}")

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    compteur = multiprocessing.Value('i', 0)  # Valeur partagée

    processus = []
    for i in range(3):
        p = multiprocessing.Process(target=modifier_ressource, 
                                   args=(lock, compteur, i))
        processus.append(p)
        p.start()

    for p in processus:
        p.join()

    print(f"Valeur finale: {compteur.value}")

Lock garantit qu'un seul processus exécute le code protégé à la fois.
Avec compteur = multiprocessing.Value('i', 0), le premier argument est le type de données ('i' comme integer), alors que le deuxième argument est la valeur initiale.

Quand utiliser multiprocessing ?

  • multiprocessing est utilisé pour les calculs intensifs

  • threading pour les opérations I/O avec des bibliothèques non compatibles async

  • async pour les opérations I/O avec des bibliothèques compatibles

Bravo, tu es prêt à passer à la suite

Rechercher sur le site

Formulaire de contact

Inscris-toi à Docstring

Pour commencer ton apprentissage.

Tu as déjà un compte ? Connecte-toi.