"<p>In this tutorial, we will setup RabbitMQ in a Node.js project using amqplib.</p> <h2><strong>What is RabbitMQ? </strong></h2> <p>RabbitMQ is a messaging broker - an intermediary for messaging. It gives your applications a common platform to send and receive messages, and your messages a safe place to live until received.<br /> Also, it’s lightweight and easy to deploy on premises and in the cloud. It supports multiple messaging protocols (AMQP, MQTT STOMP)</p> <p>In this short post, we will setup RabbitMQ in a Symfony 5 project using the messenger component of Symfony. We already published a tutorial that clarify all the steps above and more : <a href="https://devbrains.tn/tutorials/discover-rabbitmq-with-symfony-5-project">RabbitMQ Tutorial</a></p> <h3><strong>Requirements: </strong></h3> <p>1- Node.js installed</p> <p>2- RabbitMQ Instance</p> <p>3- Symfony Application (see this Tutorial)</p> <p> </p> <h3><strong>Intitializing the Node.js project:</strong></h3> <pre> <code>npm init</code></pre> <h3><strong>Install amqplib:</strong></h3> <pre> <code>npm i amqplib</code></pre> <h2><strong>Create Node.js server (server.js):</strong></h2> <pre> <code>const http = require("http"); const MyMessage = require("./MyMessage"); const server = http.createServer(); server.listen(3000); console.log("Server Started"); let producer = require("./producer"); producer.start(); setTimeout(()=>{ for (let i=0;i<500000;i++){ setTimeout(()=>{ let myMessage = JSON.stringify(new MyMessage("Node.js "+i,"Symfony","Hello World!")); producer.publish("","DevBrainsQueue",new Buffer.alloc(myMessage.length,myMessage)); console.log("Message Sent "+i); },(i*1000+1000)); } },3000);</code></pre> <h3><strong>Define your message structure:</strong></h3> <pre> <code>class MyMessage{ constructor(sender,receiver,content){ this.sender=sender; this.receiver=receiver; this.content=content; } } module.exports =MyMessage</code></pre> <h3><strong>Import producer script (producer.js):</strong></h3> <pre> <code>module.exports = { start, publish } var amqp = require('amqplib/callback_api'); //TODO Add Your CloudAMQP URL (create your instance on cloudamqp.com) process.env.CLOUDAMQP_URL = 'URL'; // if the connection is closed or fails to be established, we will reconnect var amqpConn = null; function start() { amqp.connect(process.env.CLOUDAMQP_URL + "?heartbeat=60", function(err, conn) { if (err) { console.error("[AMQP]", err.message); return setTimeout(start, 7000); } conn.on("error", function(err) { if (err.message !== "Connection closing") { console.error("[AMQP] conn error", err.message); } }); conn.on("close", function() { console.error("[AMQP] reconnecting"); return setTimeout(start, 7000); }); console.log("[AMQP] connected"); amqpConn = conn; whenConnected(); }); } function whenConnected() { startPublisher(); } var pubChannel = null; var offlinePubQueue = []; function startPublisher() { amqpConn.createConfirmChannel(function(err, ch) { if (closeOnErr(err)) return; ch.on("error", function(err) { console.error("[AMQP] channel error", err.message); }); ch.on("close", function() { console.log("[AMQP] channel closed"); }); pubChannel = ch; while (true) { var m = offlinePubQueue.shift(); console.log('M = ', m); if (!m) break; publish(m[0], m[1], m[2]); } }); } // method to publish a message, will queue messages internally if the connection is down and resend later function publish(exchange, routingKey, content) { try { // TODO change header pubChannel.publish(exchange, routingKey, content, { persistent: false, headers: { 'type': 'App\\Entity\\MyMessage','content_type':'application/json' } }, function(err, ok) { if (err) { console.error("[AMQP] publish", err); offlinePubQueue.push([exchange, routingKey, content]); pubChannel.connection.close(); } }); } catch (e) { console.error("[AMQP] publish", e.message); offlinePubQueue.push([exchange, routingKey, content]); } } function closeOnErr(err) { if (!err) return false; console.error("[AMQP] error", err); amqpConn.close(); return true; }</code></pre> <p> </p>"