Why Message Queues
Message queues decouple producers from consumers, enabling:
- Async processing — Return 202 Accepted immediately, process later
- Load leveling — Buffer traffic spikes instead of overwhelming downstream services
- Reliability — Messages persist even if consumers crash
- Fan-out — One event triggers multiple independent actions
RabbitMQ with amqplib
Exchange Types
RabbitMQ routes messages through exchanges to queues using different strategies.
Connection Setup
const amqp = require('amqplib');
class RabbitMQ {
  constructor() {
    this.connection = null;
    this.channel = null;
    // Set while close() runs so the 'close' handler can tell an intentional
    // shutdown apart from a dropped connection.
    this.closing = false;
  }

  /**
   * Connect to the broker, create a channel, and install recovery handlers.
   * Retries every 5 seconds after an unexpected connection loss.
   */
  async connect() {
    this.closing = false;
    this.connection = await amqp.connect(process.env.RABBITMQ_URL);
    this.channel = await this.connection.createChannel();
    // Prefetch: process one message at a time per consumer
    await this.channel.prefetch(1);
    this.connection.on('error', (err) => {
      console.error('RabbitMQ connection error:', err);
    });
    this.connection.on('close', () => {
      // BUG FIX: the original reconnected even when close() was called
      // deliberately; skip the retry on intentional shutdown.
      if (this.closing) return;
      console.warn('RabbitMQ connection closed, reconnecting...');
      setTimeout(() => {
        // BUG FIX: catch so a failed reconnect attempt is logged instead of
        // becoming an unhandled promise rejection.
        this.connect().catch((err) => {
          console.error('RabbitMQ reconnect failed:', err);
        });
      }, 5000);
    });
  }

  /** Close channel and connection without triggering the reconnect loop. */
  async close() {
    this.closing = true;
    await this.channel?.close();
    await this.connection?.close();
  }
}
Publishing Messages
/**
 * Declare the exchanges, queues, and bindings for the order pipeline.
 *
 * Topology:
 *   orders (topic)      -- 'order.created' --> orders.process
 *   orders (topic)      -- 'order.*'       --> orders.notify
 *   orders.dlx (direct) -- ''              --> orders.dead-letter
 *
 * @param {object} channel - An open amqplib channel.
 */
async function setupExchangeAndQueues(channel) {
  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });
  // Declare queues with DLX (Dead Letter Exchange)
  await channel.assertExchange('orders.dlx', 'direct', { durable: true });
  await channel.assertQueue('orders.dead-letter', { durable: true });
  await channel.bindQueue('orders.dead-letter', 'orders.dlx', '');
  // Main queue with dead-letter routing
  await channel.assertQueue('orders.process', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'orders.dlx',
      // BUG FIX: dead-lettered messages keep their original routing key
      // ('order.created') unless overridden; 'orders.dlx' is a direct
      // exchange whose only binding uses '', so without this override
      // dead letters were silently dropped instead of reaching the DLQ.
      'x-dead-letter-routing-key': '',
      'x-message-ttl': 30000, // 30s timeout
    },
  });
  await channel.bindQueue('orders.process', 'orders', 'order.created');
  // Notification queue
  await channel.assertQueue('orders.notify', { durable: true });
  await channel.bindQueue('orders.notify', 'orders', 'order.*');
}
// Publish a message
/**
 * Publish an order-created event to the 'orders' topic exchange.
 *
 * @param {object} channel - An open amqplib channel.
 * @param {object} order - Order with id, userId, and total.
 */
async function publishOrder(channel, order) {
  const { id, userId, total } = order;
  const payload = { id, userId, total, timestamp: Date.now() };
  const options = {
    persistent: true, // Survives broker restart
    contentType: 'application/json',
    messageId: `order-${id}`,
    headers: {
      'x-retry-count': 0,
    },
  };
  const body = Buffer.from(JSON.stringify(payload));
  channel.publish('orders', 'order.created', body, options);
}
Consuming with Retry Logic
/**
 * Consume from 'orders.process' with bounded retries.
 *
 * On failure the message is republished with an incremented 'x-retry-count'
 * header and the original is acked; after 3 failed retries it is nacked
 * without requeue so the queue's DLX routes it to the dead-letter queue.
 *
 * @param {object} channel - An open amqplib channel.
 */
async function consumeOrders(channel) {
  await channel.consume('orders.process', async (msg) => {
    if (!msg) return; // consumer was cancelled by the broker
    // BUG FIX: headers may be absent entirely; the original threw a
    // TypeError on msg.properties.headers['x-retry-count'].
    const retryCount = msg.properties.headers?.['x-retry-count'] ?? 0;
    let order;
    try {
      // BUG FIX: parse inside try — a malformed body previously threw
      // outside the try/catch, leaving the message unacked forever.
      order = JSON.parse(msg.content.toString());
      await processOrder(order);
      channel.ack(msg);
      console.log(`Order ${order.id} processed successfully`);
    } catch (err) {
      // order is undefined when the body failed to parse
      const orderId = order?.id ?? msg.properties.messageId;
      console.error(`Order ${orderId} failed (attempt ${retryCount + 1}):`, err.message);
      if (retryCount < 3) {
        // Retry with exponential backoff
        const delay = Math.pow(2, retryCount) * 5000; // 5s, 10s, 20s
        // NOTE(review): 'x-delay' only has effect with the delayed-message
        // plugin and an x-delayed-message exchange — confirm deployment.
        // NOTE(review): republishing with 'order.created' also re-delivers
        // to every 'order.*' binding (e.g. orders.notify); those consumers
        // must be idempotent.
        channel.publish('orders', 'order.created', msg.content, {
          ...msg.properties,
          headers: {
            ...msg.properties.headers,
            'x-retry-count': retryCount + 1,
            'x-delay': delay,
          },
        });
        channel.ack(msg); // Ack original to remove from queue
      } else {
        // Max retries exceeded → dead letter queue
        channel.nack(msg, false, false);
        console.error(`Order ${orderId} sent to DLQ after ${retryCount} retries`);
      }
    }
  });
}
AWS SQS
SQS is a fully managed queue — no infrastructure to maintain.
Setup with AWS SDK v3
const {
SQSClient,
SendMessageCommand,
ReceiveMessageCommand,
DeleteMessageCommand,
} = require('@aws-sdk/client-sqs');
// SQS client — honor AWS_REGION when set, falling back to the original
// hard-coded default so existing deployments behave identically. This keeps
// region configuration consistent with the env-driven queue URL below.
const sqs = new SQSClient({ region: process.env.AWS_REGION ?? 'us-east-1' });
const QUEUE_URL = process.env.SQS_QUEUE_URL;
// Send message
/**
 * Send a JSON-serialized event to the configured SQS queue, tagged with an
 * EventType message attribute of 'order.created'.
 *
 * @param {object} data - Serializable event payload.
 */
async function sendToSQS(data) {
  const params = {
    QueueUrl: QUEUE_URL,
    MessageBody: JSON.stringify(data),
    MessageAttributes: {
      EventType: {
        DataType: 'String',
        StringValue: 'order.created',
      },
    },
    // For FIFO queues:
    // MessageGroupId: 'orders',
    // MessageDeduplicationId: `order-${data.id}`,
  };
  const command = new SendMessageCommand(params);
  await sqs.send(command);
}
// Poll for messages
/**
 * Long-poll the queue and process messages until aborted.
 *
 * Successfully processed messages are deleted; failures are left in place
 * and reappear after the visibility timeout.
 *
 * @param {AbortSignal} [signal] - Optional stop signal. Backward compatible:
 *   with no argument the loop polls forever, as before. Note an in-flight
 *   long poll may delay shutdown by up to WaitTimeSeconds.
 */
async function pollMessages(signal) {
  while (!signal?.aborted) {
    let Messages;
    try {
      ({ Messages } = await sqs.send(new ReceiveMessageCommand({
        QueueUrl: QUEUE_URL,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20, // Long polling (reduces empty responses)
        VisibilityTimeout: 60, // 60s to process before reappearing
        MessageAttributeNames: ['All'],
      })));
    } catch (err) {
      // BUG FIX: a single transient network/SQS error previously crashed
      // the whole polling loop; back off briefly and keep polling.
      console.error('ReceiveMessage failed:', err);
      await new Promise((resolve) => setTimeout(resolve, 1000));
      continue;
    }
    if (!Messages?.length) continue;
    await Promise.allSettled(
      Messages.map(async (msg) => {
        try {
          const data = JSON.parse(msg.Body);
          await processOrder(data);
          // Delete after successful processing
          await sqs.send(new DeleteMessageCommand({
            QueueUrl: QUEUE_URL,
            ReceiptHandle: msg.ReceiptHandle,
          }));
        } catch (err) {
          console.error('Failed to process message:', err);
          // Message will reappear after VisibilityTimeout
        }
      })
    );
  }
}
SQS Standard vs FIFO
| Feature | Standard | FIFO |
|---|---|---|
| Throughput | Unlimited | 3,000 msg/s (with batching) |
| Ordering | Best-effort | Strict FIFO |
| Delivery | At-least-once | Exactly-once |
| Deduplication | None | 5-minute window |
| Cost | Lower | Higher |
Idempotent Consumers
Since messages can be delivered more than once, consumers must be idempotent.
/**
 * Idempotently process an order: skip if a processed-event record already
 * exists for this order's idempotency key, otherwise update the order
 * status, reserve inventory, and record the event in one transaction.
 *
 * NOTE(review): the findOne check runs outside the transaction, so two
 * concurrent deliveries could both pass it; this is only safe if
 * processedEvents.eventId carries a unique constraint — confirm the schema.
 */
async function processOrder(order) {
// Check if already processed (using a unique idempotency key)
const existing = await db.processedEvents.findOne({
eventId: `order-${order.id}`,
});
if (existing) {
console.log(`Order ${order.id} already processed, skipping`);
return;
}
// Process the order
// Writing the processed-event marker in the same transaction as the side
// effects keeps "did the work" and "recorded the work" atomic.
await db.transaction(async (tx) => {
await tx.orders.updateStatus(order.id, 'processing');
await tx.inventory.reserve(order.items);
await tx.processedEvents.create({
eventId: `order-${order.id}`,
processedAt: new Date(),
});
});
}
RabbitMQ vs SQS
| Factor | RabbitMQ | SQS |
|---|---|---|
| Management | Self-hosted | Fully managed |
| Routing | Exchanges, bindings, topics | Simple queue |
| Protocol | AMQP | HTTP/REST |
| Latency | ~1ms | ~20-50ms |
| Cost | Server costs | Pay per request |
| Best for | Complex routing, low latency | Simple queuing on AWS |
Choose RabbitMQ when you need complex routing patterns, low latency, or are not on AWS. Choose SQS when you want zero infrastructure management and are already on AWS.
