How Our Services Talk to Each Other: Message Queues Explained
When two services need to communicate without blocking each other, a message queue is often the right tool. RabbitMQ decouples our services, handles retries, and bridges our Node.js and Python microservices. Here's how it works and why we chose it.
When a NestJS service needs to call Python services, HTTP adds latency and coupling. RabbitMQ provides asynchronous, language-agnostic messaging with built-in reliability. This guide walks through a complete RPC-pattern implementation for NestJS and Python microservices.
Installation
NestJS
npm install @nestjs/microservices amqplib amqp-connection-manager
Python
pip install aio-pika
What is RabbitMQ?
RabbitMQ is an open source message broker that enables applications to communicate asynchronously. Instead of services calling each other directly, they send messages through RabbitMQ, which handles routing, queuing, and delivery.
Core Concepts
MESSAGE FLOW
Producer ──▶ Exchange ──▶ Binding ──▶ Queue ──▶ Consumer
                │                       │
                │      routing_key      │
                └───────────────────────┘
- Producer: Application that sends messages
- Exchange: Receives messages and routes them to queues based on rules
- Binding: Link between exchange and queue with routing rules
- Queue: Buffer that stores messages until consumed
- Consumer: Application that receives and processes messages
Exchange Types
- Direct: Routes to queues where routing key exactly matches binding key
- Fanout: Broadcasts to all bound queues, ignores routing key
- Topic: Pattern matching with wildcards (* for one word, # for zero or more)
- Headers: Routes based on message header attributes
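To make the topic wildcard rules concrete, here is a small illustrative matcher (not part of RabbitMQ; just a sketch of the semantics described above, where keys are dot-separated words):

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key matches binding_key under AMQP topic rules:
    '*' matches exactly one word, '#' matches zero or more words."""
    def match(b, r):
        if not b:
            return not r
        head, rest = b[0], b[1:]
        if head == "#":
            # '#' can absorb any number of words, including none
            return any(match(rest, r[i:]) for i in range(len(r) + 1))
        if not r:
            return False
        if head == "*" or head == r[0]:
            return match(rest, r[1:])
        return False
    return match(binding_key.split("."), routing_key.split("."))
```

For example, `topic_matches("*.orange.*", "quick.orange.rabbit")` is true, while `topic_matches("*.*.rabbit", "quick.orange.fox")` is not.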
Use Cases
- Service decoupling: Services communicate without knowing about each other
- Load balancing: Distribute tasks among multiple workers
- Async processing: Offload heavy tasks (email, reports, image processing)
- Cross-language communication: Connect services written in different languages
- RPC: Request-response pattern using correlation IDs and reply queues
Architecture Overview
RabbitMQ acts as the message broker for Python services, while TCP handles NestJS-to-NestJS communication. The RPC pattern enables request-response across languages.
MICROSERVICES ARCHITECTURE
                 ┌─────────────────┐
                 │   API Gateway   │
                 │     (NestJS)    │
                 └────────┬────────┘
                          │
           ┌──────────────┼──────────────┐
           │ TCP          │ TCP          │ RMQ
           ▼              ▼              ▼
     ┌───────────┐  ┌───────────┐  ┌─────────────┐
     │  NestJS   │  │  NestJS   │  │   Python    │
     │ Service A │  │ Service B │  │   Service   │
     └─────┬─────┘  └───────────┘  └─────────────┘
           │ RMQ                          ▲
           │                              │
           └─────────┐       ┌────────────┘
                     ▼       │
                 ┌─────────────────┐
                 │    RabbitMQ     │
                 │ Message Broker  │
                 └─────────────────┘
aio-pika: Async RabbitMQ for Python
aio-pika is an async wrapper around aiormq that provides a Pythonic interface for RabbitMQ. Unlike the standard pika library which is synchronous, aio-pika is built for asyncio, enabling high-concurrency message handling.
Key Features
- Native asyncio: Non-blocking operations for high concurrency
- Auto-reconnection: connect_robust() recovers queues, exchanges, and consumers after disconnection
- Publisher confirms: Validate message acceptance by the broker
- Context managers: async with message.process() for safe message handling
- Full type hints: Better IDE support and code safety
Core Methods
- connect_robust(): Auto-reconnecting connection
- channel.declare_queue(): Create or get a queue
- queue.consume(): Start receiving messages
- exchange.publish(): Send messages
- message.ack() / message.reject(): Acknowledge or reject messages
NestJS Configuration
Register both TCP (for NestJS services) and RMQ (for Python services) transports in the same module.
import { Module } from "@nestjs/common";
import { ConfigModule } from "@nestjs/config";
import { ClientsModule, Transport } from "@nestjs/microservices";
import { AppController } from "./app.controller";
import { AppService } from "./app.service";

@Module({
  imports: [
    ConfigModule.forRoot(),
    ClientsModule.register([
      {
        name: "PYTHON_SERVICE",
        transport: Transport.RMQ,
        options: {
          urls: [
            `amqp://${process.env.RABBITMQ_USER}:${process.env.RABBITMQ_PASSWORD}@${process.env.RABBITMQ_HOST}:${process.env.RABBITMQ_PORT}`,
          ],
          queue: "python-service-queue",
          queueOptions: {
            durable: true,
          },
        },
      },
      {
        name: "NESTJS_SERVICE",
        transport: Transport.TCP,
        options: {
          host: process.env.NESTJS_SERVICE_HOST,
          port: 3003,
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}
Sending Messages to Python
import { Injectable, Inject } from "@nestjs/common";
import { ClientProxy, RpcException } from "@nestjs/microservices";
import { firstValueFrom, timeout } from "rxjs";

@Injectable()
export class AppService {
  constructor(
    @Inject("PYTHON_SERVICE") private pythonClient: ClientProxy,
  ) {}

  // AnalyzeDto is the request DTO defined elsewhere in the application
  async analyzeData(data: AnalyzeDto) {
    const result = await firstValueFrom(
      this.pythonClient.send({ cmd: "analyze" }, data).pipe(
        timeout(30000),
      ),
    );
    if (result.error) {
      throw new RpcException(result.message);
    }
    return result;
  }
}
Python RabbitMQ Connection
A singleton connection manager ensures efficient resource usage with automatic reconnection.
import logging
import os

import aio_pika

logger = logging.getLogger(__name__)

# Connection settings come from the environment, mirroring the NestJS config
RABBITMQ_USER = os.environ["RABBITMQ_USER"]
RABBITMQ_PASSWORD = os.environ["RABBITMQ_PASSWORD"]
RABBITMQ_HOST = os.environ["RABBITMQ_HOST"]
RABBITMQ_PORT = os.environ["RABBITMQ_PORT"]


class ConnectionManager:
    _connection = None
    _channel = None

    async def get_connection(self):
        if self._connection and not self._connection.is_closed:
            return self._connection
        self._connection = await aio_pika.connect_robust(
            f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/",
            heartbeat=30,
        )
        return self._connection

    async def get_channel(self):
        if self._channel and not self._channel.is_closed:
            return self._channel
        connection = await self.get_connection()
        self._channel = await connection.channel()
        return self._channel

    async def close(self):
        if self._channel:
            await self._channel.close()
        if self._connection:
            await self._connection.close()


connection_manager = ConnectionManager()
Python RPC Consumer
The consumer listens for messages, processes them, and sends responses back using the reply_to queue and correlation_id.
import asyncio
import json
import logging

import aio_pika

from .connection import connection_manager

logger = logging.getLogger(__name__)


class RpcConsumer:
    def __init__(self, queue_name):
        self.queue_name = queue_name

    async def process_message(self, body):
        raise NotImplementedError

    async def on_request(self, message: aio_pika.IncomingMessage):
        async with message.process():
            try:
                request_body = json.loads(message.body)
                response = await self.process_message(request_body)
            except json.JSONDecodeError:
                logger.error("Failed to decode message body.")
                response = {"error": "invalid_request", "message": "Invalid JSON format."}
            except Exception as e:
                logger.error(f"Error processing message: {e}", exc_info=True)
                response = {"error": "internal_server_error", "message": str(e)}
            try:
                channel = await connection_manager.get_channel()
                await channel.default_exchange.publish(
                    aio_pika.Message(
                        body=json.dumps(response).encode(),
                        correlation_id=message.correlation_id,
                        content_type="application/json",
                    ),
                    routing_key=message.reply_to,
                )
            except Exception as e:
                logger.error(f"Failed to publish response: {e}")

    async def start_consuming(self):
        channel = await connection_manager.get_channel()
        queue = await channel.declare_queue(self.queue_name, durable=True)
        await channel.set_qos(prefetch_count=1)
        logger.info(f"Waiting for messages on queue '{self.queue_name}'")
        try:
            await queue.consume(self.on_request)
            await asyncio.Future()  # run until cancelled
        except asyncio.CancelledError:
            logger.info("Consumer task cancelled.")
        finally:
            await connection_manager.close()
Implementing a Consumer
class AnalyzeConsumer(RpcConsumer):
    def __init__(self):
        super().__init__("python-service-queue")

    async def process_message(self, body):
        pattern = body.get("pattern", {})
        data = body.get("data", {})
        if pattern.get("cmd") == "analyze":
            # self.analyze_service is the application's analysis service,
            # wired up elsewhere in the app
            result = await self.analyze_service.process(data)
            return {"status": "success", "data": result}
        return {"error": "unknown_pattern", "message": f"Unknown pattern: {pattern}"}
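If a service handles several commands, the if-chain grows quickly. A handler-table variant (an illustrative sketch, not from the original service; the `analyze` handler below is a placeholder) keeps the routing declarative:

```python
import asyncio

async def dispatch(body, handlers):
    """Route a NestJS-style envelope to a handler keyed by pattern['cmd']."""
    pattern = body.get("pattern", {})
    handler = handlers.get(pattern.get("cmd"))
    if handler is None:
        return {"error": "unknown_pattern", "message": f"Unknown pattern: {pattern}"}
    return {"status": "success", "data": await handler(body.get("data", {}))}

async def analyze(data):
    # Placeholder for the real analysis service
    return {"length": len(data.get("text", ""))}

result = asyncio.run(dispatch(
    {"pattern": {"cmd": "analyze"}, "data": {"text": "hello"}},
    {"analyze": analyze},
))
```

New commands then only require a new entry in the handler table.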
Starting the Consumer
import asyncio


async def main():
    consumer = AnalyzeConsumer()
    await consumer.start_consuming()


if __name__ == "__main__":
    asyncio.run(main())
Python RPC Producer
When Python needs to call other services, the producer sends requests and waits for responses using correlation IDs.
import asyncio
import json
import logging
import uuid

import aio_pika

from .connection import connection_manager

logger = logging.getLogger(__name__)


class RpcProducer:
    def __init__(self):
        self.futures = {}

    async def on_response(self, message: aio_pika.IncomingMessage):
        async with message.process():
            correlation_id = message.correlation_id
            if correlation_id in self.futures:
                future = self.futures.pop(correlation_id)
                future.set_result(message.body)

    async def call(self, queue_name, message_data, timeout=10):
        channel = await connection_manager.get_channel()
        if not channel:
            raise ConnectionError("Could not establish channel to RabbitMQ.")
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.futures[correlation_id] = future
        callback_queue = await channel.declare_queue(exclusive=True)
        await callback_queue.consume(self.on_response)
        try:
            await channel.default_exchange.publish(
                aio_pika.Message(
                    body=json.dumps(message_data).encode(),
                    content_type="application/json",
                    correlation_id=correlation_id,
                    reply_to=callback_queue.name,
                ),
                routing_key=queue_name,
            )
            response_body = await asyncio.wait_for(future, timeout=timeout)
            return json.loads(response_body)
        except asyncio.TimeoutError:
            logger.warning(f"Request to queue '{queue_name}' timed out.")
            self.futures.pop(correlation_id, None)
            return {"error": "timeout", "message": "The request timed out."}
        except Exception as e:
            logger.error(f"An unexpected error occurred during RPC call: {e}")
            self.futures.pop(correlation_id, None)
            raise
NestJS Message Format
NestJS sends messages in a specific format that Python must parse.
{
  "pattern": { "cmd": "analyze" },
  "data": { "text": "content to analyze" },
  "id": "unique-message-id"
}
The Python consumer extracts pattern to route the request and data as the payload.
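Unwrapping that envelope on the Python side is a one-liner per field. A minimal sketch (the helper name is an illustration, not part of the services above):

```python
import json

def parse_nest_envelope(raw: bytes):
    """Split a NestJS RMQ envelope into its pattern and data parts."""
    envelope = json.loads(raw)
    return envelope.get("pattern", {}), envelope.get("data", {})

# The exact JSON shape NestJS puts on the wire, as shown above
pattern, data = parse_nest_envelope(
    b'{"pattern": {"cmd": "analyze"}, '
    b'"data": {"text": "content to analyze"}, '
    b'"id": "unique-message-id"}'
)
```

Using `.get()` with defaults keeps the consumer from raising on malformed envelopes.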
RPC Pattern Explained
RPC MESSAGE FLOW
┌──────────┐ ┌──────────┐ ┌──────────┐
│ NestJS │ │ RabbitMQ │ │ Python │
│ Client │ │ Broker │ │ Consumer │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
│ 1. Send message │ │
│ (correlation_id, reply_to) │ │
│────────────────────────────▶│ │
│ │ 2. Deliver to queue │
│ │────────────────────────────▶│
│ │ │
│ │ │ 3. Process
│ │ │
│ │ 4. Response to reply_to │
│ │◀────────────────────────────│
│ 5. Match correlation_id │ │
│◀────────────────────────────│ │
│ │ │
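On the client side, steps 1 and 5 reduce to a map of pending futures keyed by correlation ID, exactly as in the producer shown earlier. Here is a broker-free sketch of that matching mechanism (the `fake_consumer` stands in for RabbitMQ plus the Python service):

```python
import asyncio
import uuid

async def rpc_roundtrip():
    futures = {}  # pending requests keyed by correlation_id

    # Step 1: the client registers a future under a fresh correlation_id
    correlation_id = str(uuid.uuid4())
    future = asyncio.get_running_loop().create_future()
    futures[correlation_id] = future

    # Steps 2-4: the consumer processes the request and replies to the
    # reply_to queue, echoing the same correlation_id
    async def fake_consumer(corr_id, body):
        futures.pop(corr_id).set_result(body)

    asyncio.create_task(fake_consumer(correlation_id, b'{"status": "success"}'))

    # Step 5: the client resolves the matching future by correlation_id
    return await asyncio.wait_for(future, timeout=1)

response = asyncio.run(rpc_roundtrip())
```

Because each response carries its correlation ID, many requests can be in flight on the same reply queue without getting mixed up.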
Running RabbitMQ
Local Development with Docker Compose
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

volumes:
  rabbitmq_data:
Port 5672 is the AMQP protocol port for message communication. Port 15672 exposes the management UI at http://localhost:15672.
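Note that the image's default guest/guest account is only accepted from localhost, so containers connecting over the Docker network need explicit credentials. A sketch of the extra compose lines, assuming the same host-side variable names the NestJS and Python clients read:

```yaml
services:
  rabbitmq:
    environment:
      # Match the credentials the clients read from RABBITMQ_USER / RABBITMQ_PASSWORD
      RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-guest}
      RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASSWORD:-guest}
```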
Azure Container Apps Deployment
Deploy RabbitMQ to Azure Container Apps with internal ingress for secure microservice communication.
az login --service-principal \
--username $AZURE_SERVICE_PRINCIPAL_ID \
--password $AZURE_SERVICE_PRINCIPAL_SECRET \
--tenant $AZURE_TENANT_ID
az acr login --name $AZURE_REGISTRY_NAME
docker tag rabbitmq:3-management $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management
docker push $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management
az containerapp create \
--name rabbitmq \
--resource-group $RESOURCE_GROUP \
--environment $CONTAINER_APP_ENV \
--image $AZURE_CONTAINER_REGISTRY/rabbitmq:3-management \
--target-port 5672 \
--ingress 'internal' \
--transport 'tcp' \
--min-replicas 1 \
--max-replicas 10 \
--registry-server $AZURE_CONTAINER_REGISTRY \
--registry-username $AZURE_REGISTRY_USERNAME \
--registry-password $AZURE_REGISTRY_PASSWORD \
--env-vars \
"RABBITMQ_DEFAULT_USER=secretref:rabbitmq-user" \
"RABBITMQ_DEFAULT_PASS=secretref:rabbitmq-password"
Key configuration:
- ingress: internal - Only accessible within the Container Apps environment
- transport: tcp - Required for AMQP protocol (not HTTP)
- secretref - References secrets stored in Azure Container Apps
Conclusion
RabbitMQ solves the fundamental challenge of cross-language microservice communication. By decoupling services through message queues, you gain reliability, scalability, and flexibility that direct HTTP calls cannot provide.
The RPC pattern with correlation IDs and reply queues enables request-response communication while maintaining the benefits of async messaging. Combined with proper error handling and connection management, this architecture handles production workloads reliably.
Whether you're connecting NestJS to Python, processing background tasks, or building event-driven systems, RabbitMQ provides the foundation for robust microservice communication.
Written by

Technical Lead and Full Stack Engineer leading a 5-engineer team at Fygurs (Paris, Remote) on Azure cloud-native SaaS. Graduate of 1337 Coding School (42 Network / UM6P). Writes about architecture, cloud infrastructure, and engineering leadership.