Le module multiprocessing permet d'exécuter plusieurs processus en même temps, permettant d'exploiter les processeurs multicoeur. Les processus ont leur propre espace mémoire, contrairement au module threading où les threads partagent le même espace mémoire.
multiprocessing est efficace pour les tâches qui nécessitent beaucoup de calculs (CPU-bound) comme les calculs complexes ou les algorithmes de traitement d'images et vidéos. Le module permet de contourner le GIL et d'exécuter du code en parallèle.
Le programme ne se terminera qu'une fois que les processus sont terminés.
Création des processus
Il y a deux façons principales de créer des processus.
En passant une fonction à la classe Process :
import multiprocessing def tache_intensive(nom) -> None: print(f"Début de {nom}") # Simulation d'un calcul intensif resultat = 0 for i in range(5000000): resultat += i print(f"Fin de {nom}, résultat: {resultat}") if __name__ == "__main__": # Création de 2 processus process1 = multiprocessing.Process(target=tache_intensive, args=("Tâche 1",)) process2 = multiprocessing.Process(target=tache_intensive, args=("Tâche 2",))
Ou en héritant de Process :
import multiprocessing class MonProcessus(multiprocessing.Process): def __init__(self, name): super().__init__() self.name = name def run(self): print(f"Début de {self.name}") # Simulation d'un calcul intensif resultat = 0 for i in range(500000): resultat += i print(f"Fin de {self.name}, résultat: {resultat}") if __name__ == "__main__": # Création de 2 processus process1 = MonProcessus("Tâche 1") process2 = MonProcessus("Tâche 2")
Exécuter les processus
Pour exécuter les processus, on utilise les méthodes start et join :
import multiprocessing class MonProcessus(multiprocessing.Process): def __init__(self, name): super().__init__() self.name = name def run(self): print(f"Début de {self.name}") # Simulation d'un calcul intensif resultat = 0 for i in range(500000): resultat += i print(f"Fin de {self.name}, résultat: {resultat}") if __name__ == "__main__": # Création de 2 processus process1 = MonProcessus("Tâche 1") process2 = MonProcessus("Tâche 2") # Démarrage des processus print("Démarrage des processus") process1.start() process2.start() # Attendre la fin des processus process1.join() process2.join() print("Les processus sont terminés")
Attention
if __name__ == "__main__": est obligatoire lorsqu'on utilise le module multiprocessing pour éviter des boucles infinies, car sur les systèmes qui utilisent le mode spawn, chaque processus enfant réexécute le script en entier lors de son démarrage.
La méthode start permet de lancer l'exécution du processus en parallèle, dans un processus qui lui est propre.
La méthode join permet de synchroniser les processus. Sans cette méthode, le programme principal pourrait se terminer alors que des processus sont en cours d'exécution.
Communication entre processus
Bien que les processus soient isolés les uns des autres, multiprocessing propose des mécanismes pour qu'ils puissent communiquer entre eux.
La classe Queue
La classe Queue fonctionne comme une file d'attente. Elle permet d'échanger des données entre processus.
import multiprocessing def producteur(queue): """Processus qui produit des données et les met dans la queue.""" for i in range(3): message = f"Message {i}" queue.put(message) print(f"Producteur: envoi de '{message}'") # Envoi d'un signal de fin queue.put(None) print("Producteur: travail terminé") def consommateur(queue): """Processus qui consomme les données de la queue.""" while True: message = queue.get() # Vérification du signal de fin if message is None: print("Consommateur: signal de fin reçu") break print(f"Consommateur: réception de '{message}'") if __name__ == "__main__": # Création d'une queue partagée queue = multiprocessing.Queue() # Création des processus proc_producteur = multiprocessing.Process(target=producteur, args=(queue,)) proc_consommateur = multiprocessing.Process(target=consommateur, args=(queue,)) # Démarrage des processus proc_producteur.start() proc_consommateur.start() # Attente de la fin des processus proc_producteur.join() proc_consommateur.join() print("Programme principal: tous les processus sont terminés")
Les deux processus communiquent entre eux par l'intermédiaire de l'instance de Queue. La méthode put permet d'insérer un élément dans la file d'attente, tandis que get permet de les récupérer.
Pourquoi faire queue.put(None) ?
L'ajout de None permet de dire : "j'arrête d'insérer des informations". Sans ce signal de fin, le consommateur resterait bloqué sur queue.get() en attendant les données.
La classe Pipe
Il existe un autre mécanisme de communication : la classe Pipe.
Pipe renvoie une paire d'objets Connection représentant les deux extrémités qui communiquent.
import multiprocessing def producteur(conn): """Processus qui produit des données et les envoie via Pipe.""" for i in range(3): message = f"Message {i}" conn.send(message) print(f"Producteur: envoi de '{message}'") # Fermeture de la connexion pour signaler la fin conn.close() print("Producteur: travail terminé") def consommateur(conn): """Processus qui consomme les données reçues via Pipe.""" # Continue tant que la connexion n'est pas fermée while conn.poll(timeout=1.0): # Timeout d'une seconde message = conn.recv() print(f"Consommateur: réception de '{message}'") print("Consommateur: fin de la communication") if __name__ == "__main__": # Création d'un pipe (deux extrémités de connexion) conn_producteur, conn_consommateur = multiprocessing.Pipe() # Création des processus proc_producteur = multiprocessing.Process(target=producteur, args=(conn_producteur,)) proc_consommateur = multiprocessing.Process(target=consommateur, args=(conn_consommateur,)) # Démarrage des processus proc_producteur.start() proc_consommateur.start() # Attente de la fin des processus proc_producteur.join() proc_consommateur.join() print("Programme principal: tous les processus sont terminés")
Le producteur envoie des messages avec la méthode send, tandis que le consommateur les reçoit avec la méthode recv. La méthode poll vérifie si des données sont disponibles. Pour signaler la fin de la communication, on utilise la méthode close.
À noter
Pipe est limité à une communication entre deux points, alors que Queue permet à plus de deux processus de communiquer.
Synchronisation
Les processus peuvent accéder à des ressources partagées. Pour éviter les problèmes de concurrence, la classe Lock permet de protéger l'accès.
import multiprocessing def modifier_ressource(lock, compteur, id_processus): # Acquisition du verrou with lock: # Section critique - un seul processus à la fois valeur = compteur.value compteur.value = valeur + 1 print(f"Processus {id_processus}: compteur = {compteur.value}") if __name__ == "__main__": lock = multiprocessing.Lock() compteur = multiprocessing.Value('i', 0) # Valeur partagée processus = [] for i in range(3): p = multiprocessing.Process(target=modifier_ressource, args=(lock, compteur, i)) processus.append(p) p.start() for p in processus: p.join() print(f"Valeur finale: {compteur.value}")
Lock garantit qu'un seul processus exécute le code protégé à la fois.
Avec compteur = multiprocessing.Value('i', 0), le premier argument est le type de données ('i' comme integer), alors que le deuxième argument est la valeur initiale.
Quand utiliser multiprocessing ?
-
multiprocessingest utilisé pour les calculs intensifs -
threadingpour les opérations I/O avec des bibliothèques non compatiblesasync -
asyncpour les opérations I/O avec des bibliothèques compatibles