When building a Magento website that involves many services, a developer can run into problems with cross-calls between services. Message queues let components and applications communicate asynchronously: a client sends a request and receives an acknowledgment right away, while the message is added to a queue and stored until a consumer retrieves it. Let us show you how to create a message queue in Magento 2.

Magento provides the Message Queue Framework (MQF) for publishing messages to queues and creating consumers to receive them asynchronously. MQF supports MySQL and RabbitMQ as queue systems.

In this article, we will create a message queue in Magento 2 using the MySQL adapter.

Let’s start with an issue: a merchant bought our Salesforce integration to sync their Magento 2 site with Salesforce. The site already has 20k orders, and adding all of them in a single request to the ‘magenest_salesfore_queue’ table (which the integration uses to prepare entities for syncing) often fails with timeouts or out-of-memory errors. To work around this issue, we will create a message queue that adds orders to our table asynchronously.

The message queue described in this article applies to Magento 2.2 or newer.

Configuring the message queue topology

A message queue requires four XML files in the <vendor>/<module>/etc folder:

  • communication.xml - Defines aspects of the message queue system that all communication types have in common.
  • queue_consumer.xml - Defines the relationship between an existing queue and its consumer.
  • queue_topology.xml - Defines the message routing rules and declares queues and exchanges.
  • queue_publisher.xml - Defines the exchange where a topic is published.

communication.xml

We declare our topic as ‘salesforce.queue.order’:

  • Set its data type to ‘string’ with the request attribute. This is the data type of every message published to this topic.
  • Declare the handler class ‘Magenest\Salesforce\Model\Queue\Consumer’ with the ‘process’ method to handle input from the queue.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="salesforce.queue.order" request="string">
        <handler name="processAddOrderToQueue"  type="Magenest\Salesforce\Model\Queue\Consumer" method="process" />
    </topic>
</config>

Next up, the handler class Magenest\Salesforce\Model\Queue\Consumer.php

class Consumer
{
    /** @var \Psr\Log\LoggerInterface */
    protected $_logger;

    /**
     * Handle one message taken from the queue.
     *
     * @param string $orders serialized message body
     */
    public function process($orders)
    {
        try {
            // execute() handles saving the orders to the table
            $this->execute($orders);
        } catch (\Exception $e) {
            // log the error instead of letting it stop the consumer
            $this->_logger->critical($e->getMessage());
        }
    }
}

The execute function is a placeholder for now; we’ll return to it after step 2.

queue_consumer.xml

In this file, we define the consumer parameters. It’s also possible to declare a handler class in this file.

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="salesforce.queue.order"
              queue="salesforce.queue.order"
              connection="db"
              maxMessages="5000"
              consumerInstance="Magento\Framework\MessageQueue\Consumer"
              handler="Magenest\Salesforce\Model\Queue\Consumer::process"/>
</config>

  • The name and queue attributes are required.
  • The connection attribute: for AMQP connections, the connection name must match the connection attribute in the queue_topology.xml file. Otherwise, the connection name must be ‘db’.

Note: the handler class can be declared in either communication.xml or queue_consumer.xml. It does not need to be declared in both places.
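
To make the maxMessages attribute more concrete, here is a minimal conceptual sketch in plain PHP. It is not Magento's consumer implementation: the function name consumeBatch and the in-memory array standing in for the queue are assumptions made purely for illustration. It only shows the idea that a consumer run stops once it has handled the configured number of messages and leaves the rest for the next run.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: process at most $maxMessages messages from an
 * in-memory queue and return how many were handled.
 */
function consumeBatch(array &$queue, callable $handler, int $maxMessages): int
{
    $processed = 0;
    while ($processed < $maxMessages && $queue !== []) {
        // take the oldest message and pass it to the handler
        $handler(array_shift($queue));
        $processed++;
    }
    return $processed;
}

// seven pending messages, but the consumer is limited to five per run
$queue = ['m1', 'm2', 'm3', 'm4', 'm5', 'm6', 'm7'];
$seen = [];
$processed = consumeBatch(
    $queue,
    function (string $message) use (&$seen): void {
        $seen[] = $message;
    },
    5
);

echo "processed {$processed}, remaining " . count($queue) . "\n";
```

In this sketch, two messages stay in the queue after the first run and would be picked up by the next one.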

queue_topology.xml

This file defines the message routing rules and declares queues and exchanges:

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento-db" type="topic" connection="db">
        <binding id="processAddOrderToQueueBinding"
                 topic="salesforce.queue.order"
                 destinationType="queue"
                 destination="salesforce.queue.order"/>
    </exchange>
</config>

  • The exchange name, type, and connection attributes are required.
  • The exchange name must match the exchange attribute of the connection node in the queue_publisher.xml file.
  • The type ‘topic’ means that the exchange routes messages to queues by matching the topic.
  • The connection is ‘db’ because we’re using MySQL as the queue system.
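
As an illustration of what "matching the topic" means, the following plain PHP sketch compares a binding pattern with a published topic. It assumes AMQP-style wildcards, where ‘*’ stands for exactly one dot-separated word and ‘#’ for zero or more words. The function names topicMatches and matchWords are hypothetical, and this is not the code Magento itself uses. Our binding uses the exact topic name, so only the first case applies to this article.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: does a dot-separated topic match a binding pattern?
 * Assumes '*' matches exactly one word and '#' matches zero or more words.
 */
function topicMatches(string $pattern, string $topic): bool
{
    return matchWords(explode('.', $pattern), explode('.', $topic));
}

function matchWords(array $patternWords, array $topicWords): bool
{
    if ($patternWords === []) {
        // the pattern is used up: match only if the topic is used up too
        return $topicWords === [];
    }
    $head = $patternWords[0];
    $rest = array_slice($patternWords, 1);
    if ($head === '#') {
        // try letting '#' absorb 0, 1, 2, ... words of the topic
        for ($i = 0; $i <= count($topicWords); $i++) {
            if (matchWords($rest, array_slice($topicWords, $i))) {
                return true;
            }
        }
        return false;
    }
    if ($topicWords === []) {
        return false;
    }
    if ($head === '*' || $head === $topicWords[0]) {
        return matchWords($rest, array_slice($topicWords, 1));
    }
    return false;
}

var_dump(topicMatches('salesforce.queue.order', 'salesforce.queue.order')); // bool(true)
var_dump(topicMatches('salesforce.queue.*', 'salesforce.queue.order'));     // bool(true)
var_dump(topicMatches('salesforce.*', 'salesforce.queue.order'));           // bool(false)
```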

queue_publisher.xml

<?xml version="1.0"?>
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="salesforce.queue.order">
        <connection name="db" exchange="magento-db" />
    </publisher>
</config>

The topic attribute matches the one defined in communication.xml.

Sending a message from the publisher to a queue

In this step, we will create a controller that publishes messages to the queue. Since we’re using MySQL as the queue system, messages are stored in Magento’s database:

  • The queue table manages queues.
  • The queue_message table stores messages.
  • The queue_message_status table manages the message queue workload.

Our module has a button on the config page that sends an AJAX request to salesforce/queue/order to start adding orders to the queue. With message queue integration, the action class looks like this:

<?php
namespace Magenest\Salesforce\Controller\Adminhtml\Queue;

/**
 * Class Order
 * @package Magenest\Salesforce\Controller\Adminhtml\Queue
 */
class Order extends \Magento\Backend\App\Action
{
	/**
 	* Authorization level of a basic admin session
 	*/
	const ADMIN_RESOURCE = 'Magenest_Salesforce::config_salesforce';

	const TOPIC_NAME = 'salesforce.queue.order';

	const SIZE = 5000;

	/** @var \Magento\Sales\Model\ResourceModel\Order\CollectionFactory */
	protected $_orderColFactory;

	/** @var \Magento\Framework\Serialize\Serializer\Json */
	protected $_json;

	/** @var \Magento\Framework\MessageQueue\PublisherInterface */
	protected $_publisher;

	/**
 	* Order constructor.
 	*
 	* @param \Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory
 	* @param \Magento\Framework\MessageQueue\PublisherInterface $publisher
 	* @param \Magento\Framework\Serialize\Serializer\Json $json
 	* @param \Magento\Backend\App\Action\Context $context
 	*/
	public function __construct(
		\Magento\Sales\Model\ResourceModel\Order\CollectionFactory $orderColFactory,
		\Magento\Framework\MessageQueue\PublisherInterface $publisher,
		\Magento\Framework\Serialize\Serializer\Json $json,
		\Magento\Backend\App\Action\Context $context
	) {
		// required: the parent constructor initializes the request and response used in execute()
		parent::__construct($context);
		$this->_orderColFactory = $orderColFactory;
		$this->_json = $json;
		$this->_publisher = $publisher;
	}

	/**
 	* @return \Magento\Framework\App\ResponseInterface|\Magento\Framework\Controller\ResultInterface|void
 	*/
	public function execute()
	{
		if ($this->getRequest()->isAjax()) {
			try {
				// get the list of order IDs
				$orderIds = $this->_orderColFactory->create()->addFieldToSelect('entity_id')->getAllIds();
				// pass the IDs to publishData(), keyed by the entity type
				$this->publishData($orderIds, 'order');
				$this->getResponse()->setBody($this->_json->serialize([
					'error' => 0,
					'message' => __('Orders are being added to queue')
				]));
				return;
			} catch (\Exception $e) {
				// report the failure to the caller with a non-zero error flag
				$this->getResponse()->setBody($this->_json->serialize([
					'error' => 1,
					'message' => __('Something went wrong while adding record(s) to queue. Error: %1', $e->getMessage())
				]));
				return;
			}
		}
		return $this->_redirect('*/*/index');
	}

	/**
 	* @param $data
 	* @param $type
 	*/
	public function publishData($data, $type)
	{
		if (is_array($data)) {
			// split the list of IDs into arrays of 5000 IDs each
			$chunks = array_chunk($data, self::SIZE);
			foreach ($chunks as $chunk) {
				// publish each chunk of IDs to the queue as one message
				$rawData = [$type => $chunk];
				$this->_publisher->publish(self::TOPIC_NAME, $this->_json->serialize($rawData));
			}
		}
	}
}

In this action class, we fetch the list of order IDs and split them into chunks of 5000 IDs each, then add each chunk to the message queue. This solves two problems:

  • An unresponsive Magento backend
  • PHP timeouts caused by processing too many orders in a single request
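
The chunking step can be tried outside Magento with the following plain PHP sketch. The InMemoryPublisher class is a hypothetical stand-in for Magento's PublisherInterface that only records what is published, the order IDs are simulated with range(), and json_encode replaces Magento's Json serializer. It assumes PHP 7.4 or newer. With the 20k orders from our example and a chunk size of 5000, the loop publishes four messages.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical stand-in for Magento's publisher so that the sketch runs
 * without a Magento installation. It only records published messages.
 */
final class InMemoryPublisher
{
    /** @var array<int, array{topic: string, body: string}> */
    public array $messages = [];

    public function publish(string $topic, string $body): void
    {
        $this->messages[] = ['topic' => $topic, 'body' => $body];
    }
}

/**
 * Split the IDs into chunks and publish one JSON message per chunk.
 * Returns the number of messages published.
 */
function publishInChunks(InMemoryPublisher $publisher, string $topic, array $ids, int $size): int
{
    $published = 0;
    foreach (array_chunk($ids, $size) as $chunk) {
        // same message shape as in the controller: {"order": [id, id, ...]}
        $publisher->publish($topic, json_encode(['order' => $chunk], JSON_THROW_ON_ERROR));
        $published++;
    }
    return $published;
}

$publisher = new InMemoryPublisher();
$orderIds = range(1, 20000); // simulated IDs for the 20k orders in the example
$count = publishInChunks($publisher, 'salesforce.queue.order', $orderIds, 5000);

echo $count . " messages published\n"; // prints "4 messages published"
```

Each message stays small enough to be handled quickly, and the admin request returns as soon as the four messages are stored.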

Processing message from queue

In this step we’ll expand the handler class declared in step 1:

Magenest\Salesforce\Model\Queue\Consumer.php

<?php
namespace Magenest\Salesforce\Model\Queue;

/**
 * Class Consumer
 * @package Magenest\Salesforce\Model\Queue
 */
class Consumer
{
	// ... (the remaining properties and the constructor are omitted here)
	/** @var \Magento\Framework\Serialize\Serializer\Json */
	protected $_json;

	/**
 	* @param string $orders
 	*/
	public function process($orders)
	{
    	try{
        	$this->execute($orders);
        	
    	}catch (\Exception $e){
        	$errorCode = $e->getCode();
        	$message = __('Something went wrong while adding orders to queue');
        	$this->_notifier->addCritical(
            	$errorCode,
            	$message
        	);
        	$this->_logger->critical($errorCode .": ". $message);
    	}
	}

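	/**
	 * @param string $orderItems JSON message in the form {"<type>": [<order ID>, ...]}
	 *
	 * @throws \Magento\Framework\Exception\LocalizedException
	 */
	private function execute($orderItems)
	{
		$orderCollectionArr = [];
		/** @var \Magenest\Salesforce\Model\Queue $queue */
		$queue = $this->_queueFactory->create();
		$orderItems = $this->_json->unserialize($orderItems);
		if (is_array($orderItems)) {
			// each message maps an entity type to a chunk of order IDs,
			// so loop over the IDs inside the chunk, not over the chunk itself
			foreach ($orderItems as $type => $orderIds) {
				foreach ((array) $orderIds as $orderId) {
					$orderCollectionArr[] = [
						'type' => 'order',
						'entity_id' => $orderId,
						'priority' => 1,
					];
				}
			}
			// insert all rows into the Salesforce queue table in one call
			$queue->add($orderCollectionArr);
		}
	}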
}

You can find the full content of this class here
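
To check the message handling without a Magento installation, the decoding step can be sketched in plain PHP. The function name buildQueueRows is hypothetical, json_decode stands in for Magento's Json serializer, and the message shape {"order": [id, ...]} is the one produced by the controller above. The sketch only builds the rows; inserting them into the table is left out.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: turn one queue message into rows for the sync table.
 * Assumed message shape: {"<type>": [id, id, ...]}.
 */
function buildQueueRows(string $message): array
{
    $payload = json_decode($message, true, 512, JSON_THROW_ON_ERROR);
    $rows = [];
    if (!is_array($payload)) {
        return $rows;
    }
    foreach ($payload as $type => $ids) {
        // every ID in the chunk becomes its own row
        foreach ((array) $ids as $id) {
            $rows[] = [
                'type' => (string) $type,
                'entity_id' => (int) $id,
                'priority' => 1,
            ];
        }
    }
    return $rows;
}

$rows = buildQueueRows('{"order":[101,102,103]}');
echo count($rows) . " rows built\n"; // prints "3 rows built"
```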

Executing message queue in Magento 2

For Magento to recognize our queue, run the following commands from the Magento root folder:

  • php bin/magento setup:upgrade
  • php bin/magento setup:di:compile

You can check whether the consumer is registered by running ‘php bin/magento queue:consumers:list’. Registered consumers appear in the output, which should now include salesforce.queue.order.

Consumers are executed by cron, as declared in Magento/MessageQueue/etc/crontab.xml.

Aside from running via cron, consumers can be executed with the following command:

php bin/magento queue:consumers:start [--max-messages=<value>] [--batch-size=<value>] [--single-thread] [--area-code=<value>] <consumer_name>

For example, to run the queue we created in this article:

php bin/magento queue:consumers:start salesforce.queue.order

After messages are added to the queue, the three tables are updated accordingly:

  • queue table
  • queue_message table
  • queue_message_status table

We hope this blog has given you a clear view of message queues in Magento 2 using MySQL. Thank you for reading!