Comment nos services communiquent entre eux : les files de messages expliquées
Quand deux services doivent communiquer sans se bloquer mutuellement, vous avez besoin d'une file de messages. RabbitMQ découple nos services, gère les retries, et fait le pont entre nos microservices Node.js et Python — voici comment ça fonctionne et pourquoi nous l'avons choisi.
Lorsque vous avez besoin que NestJS communique avec des services Python, HTTP ajoute de la latence et du couplage. RabbitMQ fournit une messagerie asynchrone, indépendante du langage, avec une fiabilité intégrée. Ce guide couvre l'implémentation complète du pattern RPC pour les microservices NestJS et Python.
Installation
NestJS
npm install @nestjs/microservices amqplib amqp-connection-manager
Python
pip install aio-pika
Qu'est-ce que RabbitMQ ?
RabbitMQ est un broker de messages open source qui permet aux applications de communiquer de manière asynchrone. Au lieu que les services s'appellent directement, ils envoient des messages via RabbitMQ, qui gère le routage, la mise en file d'attente et la livraison.
Concepts fondamentaux
FLUX DE MESSAGES
Producteur ──▶ Exchange ──▶ Liaison ──▶ File ──▶ Consommateur
│ │
│ routing_key │
└────────────────────────┘
- Producteur : Application qui envoie des messages
- Exchange : Reçoit les messages et les route vers les files selon des règles
- Liaison : Lien entre l'exchange et la file avec des règles de routage
- File : Tampon qui stocke les messages jusqu'à leur consommation
- Consommateur : Application qui reçoit et traite les messages
Types d'exchanges
- Direct : Route vers les files où la clé de routage correspond exactement à la clé de liaison
- Fanout : Diffuse vers toutes les files liées, ignore la clé de routage
- Topic : Correspondance de patterns avec des caractères génériques (* pour un mot, # pour zéro ou plusieurs)
- Headers : Route en fonction des attributs d'en-tête du message
Cas d'utilisation
- Découplage de services : Les services communiquent sans se connaître
- Équilibrage de charge : Distribue les tâches entre plusieurs workers
- Traitement asynchrone : Déleste les tâches lourdes (e-mails, rapports, traitement d'images)
- Communication inter-langages : Connecte des services écrits dans différents langages
- RPC : Pattern requête-réponse utilisant des identifiants de corrélation et des files de réponse
Vue d'ensemble de l'architecture
RabbitMQ joue le rôle de broker de messages pour les services Python, tandis que TCP gère la communication NestJS-vers-NestJS. Le pattern RPC permet la communication requête-réponse entre différents langages.
ARCHITECTURE DE MICROSERVICES
┌─────────────────┐
│ API Gateway │
│ (NestJS) │
└────────┬────────┘
│
┌──────────────┼──────────────┐
│ TCP │ TCP │ RMQ
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌─────────────┐
│ NestJS │ │ NestJS │ │ Python │
│ Service A │ │ Service B │ │ Service │
└─────┬─────┘ └───────────┘ └─────────────┘
│ RMQ ▲
│ │
└──────────┐ ┌──────────────┘
▼ │
┌─────────────────┐
│ RabbitMQ │
│ Message Broker │
└─────────────────┘
aio-pika : RabbitMQ asynchrone pour Python
aio-pika est un wrapper asynchrone autour d'aiormq qui fournit une interface Pythonique pour RabbitMQ. Contrairement à la bibliothèque standard pika qui est synchrone, aio-pika est conçu pour asyncio, permettant une gestion de messages à haute concurrence.
Fonctionnalités clés
- asyncio natif : Opérations non bloquantes pour une haute concurrence
- Reconnexion automatique :
connect_robust()récupère les files, exchanges et consommateurs après une déconnexion - Confirmations d'éditeur : Valide l'acceptation des messages par le broker
- Gestionnaires de contexte :
async with message.process()pour une gestion sûre des messages - Annotations de type complètes : Meilleur support IDE et sécurité du code
Méthodes principales
- connect_robust() : Connexion avec reconnexion automatique
- channel.declare_queue() : Créer ou obtenir une file
- queue.consume() : Commencer à recevoir des messages
- exchange.publish() : Envoyer des messages
- message.ack() / message.reject() : Acquitter ou rejeter des messages
Configuration NestJS
Enregistrez les transports TCP (pour les services NestJS) et RMQ (pour les services Python) dans le même module.
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { ClientsModule, Transport } from "@nestjs/microservices";
@Module({
imports: [
ConfigModule.forRoot(),
ClientsModule.register([
{
name: "PYTHON_SERVICE",
transport: Transport.RMQ,
options: {
urls: [
`amqp://${process.env.RABBITMQ_USER}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`,
],
queue: "python-service-queue",
queueOptions: {
durable: true,
},
},
},
{
name: "NESTJS_SERVICE",
transport: Transport.TCP,
options: {
host: process.env.NESTJS_SERVICE_HOST,
port: 3003,
},
},
]),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}
Envoi de messages vers Python
import { Injectable, Inject } from "@nestjs/common";
import { ClientProxy } from "@nestjs/microservices";
import { firstValueFrom, timeout } from "rxjs";
@Injectable()
export class AppService {
constructor(
@Inject("PYTHON_SERVICE") private pythonClient: ClientProxy,
) {}
async analyzeData(data: AnalyzeDto) {
const result = await firstValueFrom(
this.pythonClient.send({ cmd: "analyze" }, data).pipe(
timeout(30000),
),
);
if (result.error) {
throw new RpcException(result.message);
}
return result;
}
}
Connexion Python à RabbitMQ
Un gestionnaire de connexion singleton assure une utilisation efficace des ressources avec une reconnexion automatique.
import aio_pika
import logging
logger = logging.getLogger(__name__)
class ConnectionManager:
_connection = None
_channel = None
async def get_connection(self):
if self._connection and not self._connection.is_closed:
return self._connection
self._connection = await aio_pika.connect_robust(
f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/",
heartbeat=30
)
return self._connection
async def get_channel(self):
if self._channel and not self._channel.is_closed:
return self._channel
connection = await self.get_connection()
self._channel = await connection.channel()
return self._channel
async def close(self):
if self._channel:
await self._channel.close()
if self._connection:
await self._connection.close()
connection_manager = ConnectionManager()
Consommateur RPC Python
Le consommateur écoute les messages, les traite et renvoie les réponses en utilisant la file reply_to et le correlation_id.
import asyncio
import json
import logging
import aio_pika
from .connection import connection_manager
logger = logging.getLogger(__name__)
class RpcConsumer:
def __init__(self, queue_name):
self.queue_name = queue_name
async def process_message(self, body):
raise NotImplementedError
async def on_request(self, message: aio_pika.IncomingMessage):
async with message.process():
try:
request_body = json.loads(message.body)
response = await self.process_message(request_body)
except json.JSONDecodeError:
logger.error("Failed to decode message body.")
response = {"error": "invalid_request", "message": "Invalid JSON format."}
except Exception as e:
logger.error(f"Error processing message: {e}", exc_info=True)
response = {"error": "internal_server_error", "message": str(e)}
try:
channel = await connection_manager.get_channel()
await channel.default_exchange.publish(
aio_pika.Message(
body=json.dumps(response).encode(),
correlation_id=message.correlation_id,
content_type="application/json"
),
routing_key=message.reply_to,
)
except Exception as e:
logger.error(f"Failed to publish response: {e}")
async def start_consuming(self):
channel = await connection_manager.get_channel()
queue = await channel.declare_queue(self.queue_name, durable=True)
await channel.set_qos(prefetch_count=1)
logger.info(f"Waiting for messages on queue '{self.queue_name}'")
try:
await queue.consume(self.on_request)
await asyncio.Future()
except asyncio.CancelledError:
logger.info("Consumer task cancelled.")
finally:
await connection_manager.close()
Implémentation d'un consommateur
class AnalyzeConsumer(RpcConsumer):
def __init__(self):
super().__init__("python-service-queue")
async def process_message(self, body):
pattern = body.get("pattern", {})
data = body.get("data", {})
if pattern.get("cmd") == "analyze":
result = await self.analyze_service.process(data)
return {"status": "success", "data": result}
return {"error": "unknown_pattern", "message": f"Unknown pattern: {pattern}"}
Démarrage du consommateur
import asyncio
async def main():
consumer = AnalyzeConsumer()
await consumer.start_consuming()
if __name__ == "__main__":
asyncio.run(main())
Producteur RPC Python
Lorsque Python a besoin d'appeler d'autres services, le producteur envoie des requêtes et attend les réponses en utilisant des identifiants de corrélation.
import asyncio
import uuid
import json
import logging
import aio_pika
from .connection import connection_manager
logger = logging.getLogger(__name__)
class RpcProducer:
def __init__(self):
self.futures = {}
async def on_response(self, message: aio_pika.IncomingMessage):
async with message.process():
correlation_id = message.correlation_id
if correlation_id in self.futures:
future = self.futures.pop(correlation_id)
future.set_result(message.body)
async def call(self, queue_name, message_data, timeout=10):
channel = await connection_manager.get_channel()
if not channel:
raise ConnectionError("Could not establish channel to RabbitMQ.")
correlation_id = str(uuid.uuid4())
future = asyncio.get_event_loop().create_future()
self.futures[correlation_id] = future
callback_queue = await channel.declare_queue(exclusive=True)
await callback_queue.consume(self.on_response)
try:
await channel.default_exchange.publish(
aio_pika.Message(
body=json.dumps(message_data).encode(),
content_type="application/json",
correlation_id=correlation_id,
reply_to=callback_queue.name,
),
routing_key=queue_name,
)
response_body = await asyncio.wait_for(future, timeout=timeout)
return json.loads(response_body)
except asyncio.TimeoutError:
logger.warning(f"Request to queue '{queue_name}' timed out.")
if correlation_id in self.futures:
self.futures.pop(correlation_id)
return {"error": "timeout", "message": "The request timed out."}
except Exception as e:
logger.error(f"An unexpected error occurred during RPC call: {e}")
if correlation_id in self.futures:
self.futures.pop(correlation_id)
raise
Format des messages NestJS
NestJS envoie les messages dans un format spécifique que Python doit analyser.
{
"pattern": { "cmd": "analyze" },
"data": { "text": "content to analyze" },
"id": "unique-message-id"
}
Le consommateur Python extrait pattern pour router la requête et data comme charge utile.
Le pattern RPC expliqué
FLUX DE MESSAGES RPC
┌──────────┐ ┌──────────┐ ┌──────────┐
│ NestJS │ │ RabbitMQ │ │ Python │
│ Client │ │ Broker │ │ Consumer │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ 1. Envoi du message │ │
│ (correlation_id, reply_to) │ │
│────────────────────────────▶│ │
│ │ 2. Livraison dans la file │
│ │────────────────────────────▶│
│ │ │
│ │ │ 3. Traitement
│ │ │
│ │ 4. Réponse vers reply_to │
│ │◀────────────────────────────│
│ 5. Correspondance corr. ID │ │
│◀────────────────────────────│ │
│ │ │
Exécution de RabbitMQ
Développement local avec Docker Compose
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
volumes:
rabbitmq_data:
Le port 5672 est le port du protocole AMQP pour la communication des messages. Le port 15672 expose l'interface de gestion à l'adresse http://localhost:15672.
Déploiement sur Azure Container Apps
Déployez RabbitMQ sur Azure Container Apps avec une ingress interne pour une communication sécurisée entre microservices.
az login --service-principal \
--username $AZURE_SERVICE_PRINCIPAL_ID \
--password $AZURE_SERVICE_PRINCIPAL_SECRET \
--tenant $AZURE_TENANT_ID
az acr login --name $AZURE_REGISTRY_NAME
docker tag rabbitmq:3-management $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management
docker push $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management
az containerapp create \
--name rabbitmq \
--resource-group $RESOURCE_GROUP \
--environment $CONTAINER_APP_ENV \
--image $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management \
--target-port 5672 \
--ingress 'internal' \
--transport 'tcp' \
--min-replicas 1 \
--max-replicas 10 \
--registry-server $AZURE_CONTAINER_REGISTRY \
--registry-username $AZURE_REGISTRY_USERNAME \
--registry-password $AZURE_REGISTRY_PASSWORD \
--env-vars \
"RABBITMQ_DEFAULT_USER=secretref:rabbitmq-user" \
"RABBITMQ_DEFAULT_PASS=secretref:rabbitmq-password"
Configuration clé :
- ingress: internal - Accessible uniquement au sein de l'environnement Container Apps
- transport: tcp - Requis pour le protocole AMQP (pas HTTP)
- secretref - Référence les secrets stockés dans Azure Container Apps
Conclusion
RabbitMQ résout le défi fondamental de la communication entre microservices de différents langages. En découplant les services via des files de messages, vous gagnez en fiabilité, évolutivité et flexibilité que les appels HTTP directs ne peuvent pas offrir.
Le pattern RPC avec les identifiants de corrélation et les files de réponse permet la communication requête-réponse tout en conservant les avantages de la messagerie asynchrone. Combiné à une gestion appropriée des erreurs et à la gestion des connexions, cette architecture gère les charges de travail en production de manière fiable.
Que vous connectiez NestJS à Python, traitiez des tâches en arrière-plan, ou construisiez des systèmes orientés événements, RabbitMQ fournit les fondations pour une communication robuste entre microservices.
Écrit par

Tech Lead et Ingénieur Full Stack pilotant une équipe de 5 ingénieurs chez Fygurs (Paris, Remote) sur un SaaS cloud-native Azure. Diplômé de 1337 Coding School (42 Network / UM6P). Écrit sur l'architecture, l'infrastructure cloud et le leadership technique.