
Rabbitmq Headers Exchange Example Java

Headers Exchange in AMQP – RabbitMQ

The Headers Exchange is the most flexible exchange type in AMQP (RabbitMQ). It routes a message by matching the message headers against the header arguments of each binding, and it ignores the routing key.

The diagram here shows 3 Queues (HealthQ, SportsQ, and EducationQ) bound to the Headers Exchange (my-header-exchange) with JSON-like headers. We will discuss each of these headers below in detail.

The flow of a message in Headers Exchange

  • One or more Queues are bound (linked) to a Headers Exchange using header properties (H).
  • A Producer sends a message to this Exchange with header properties (MH).
  • If MH matches H, the message is forwarded to the Queue. The Headers matching algorithm is discussed next.
  • The consumers listening to the Queue receive the message and process it.

The Headers matching algorithm

  • Two types of headers matching are allowed: any (similar to logical OR) and all (similar to logical AND).
  • They are represented in the bindings as { "x-match", "any" ..} or { "x-match", "all" ..}.
  • x-match = any means that a message sent to the Exchange is routed to the Queue if it contains at least one of the headers the Queue is bound with, with a matching value.
  • On the other hand, if a Queue is bound with x-match = all, only messages that carry all of the listed headers, with matching values, are forwarded to the Queue.
  • You can see in the above diagram that if a message with header {"h1": "Header1"} is sent to my-header-exchange, it will be routed to both HealthQ and EducationQ.
  • Only a message that has both the h1 and h2 headers with the correct values is forwarded to SportsQ. Such a message reaches HealthQ and EducationQ as well.
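
The matching rules above can be approximated in plain Java, without a broker. This is only a sketch of the idea, not RabbitMQ's actual implementation: the class HeaderMatchSketch and its methods are hypothetical names, and it assumes that a binding without x-match behaves like all and that binding arguments starting with "x-" are not compared. The bindings are the ones described for HealthQ, SportsQ and EducationQ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class HeaderMatchSketch {

    // Approximates the check of one message's headers against one binding.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        // Assumption: a binding without x-match behaves like "all".
        boolean matchAll = !"any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // x-match is a directive, not a header to compare
            }
            boolean hit = messageHeaders.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), messageHeaders.get(arg.getKey()));
            if (hit && !matchAll) {
                return true;  // any: the first matching header is enough
            }
            if (!hit && matchAll) {
                return false; // all: the first missing header rules the binding out
            }
        }
        return matchAll; // all: nothing was missing; any: nothing matched
    }

    // Returns the names of the queues whose binding accepts the message.
    static List<String> route(Map<String, Map<String, Object>> bindings,
                              Map<String, Object> messageHeaders) {
        List<String> queues = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> binding : bindings.entrySet()) {
            if (matches(binding.getValue(), messageHeaders)) {
                queues.add(binding.getKey());
            }
        }
        return queues;
    }

    // Binding arguments h1 = Header1 and h2 = Header2 with the given x-match mode.
    static Map<String, Object> binding(String xMatch) {
        Map<String, Object> args = new HashMap<>();
        args.put("x-match", xMatch);
        args.put("h1", "Header1");
        args.put("h2", "Header2");
        return args;
    }

    // The three bindings from the diagram, in a fixed order.
    static Map<String, Map<String, Object>> diagramBindings() {
        Map<String, Map<String, Object>> bindings = new LinkedHashMap<>();
        bindings.put("HealthQ", binding("any"));
        bindings.put("SportsQ", binding("all"));
        bindings.put("EducationQ", binding("any"));
        return bindings;
    }

    public static void main(String[] args) {
        Map<String, Object> onlyH1 = new HashMap<>();
        onlyH1.put("h1", "Header1");
        Map<String, Object> both = new HashMap<>(onlyH1);
        both.put("h2", "Header2");

        System.out.println(route(diagramBindings(), onlyH1)); // [HealthQ, EducationQ]
        System.out.println(route(diagramBindings(), both));   // [HealthQ, SportsQ, EducationQ]
    }
}
```

Note that a header with the right name but a different value does not count as a match in this sketch, which is why the bullets above speak of headers "with the correct values".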

Implementation of Headers Exchange

We will follow the same steps as for the other exchange implementations.

Step-1
Create a single, shared Connection instance (a singleton) if you have not done so yet. It is safe to share the same Connection object across the application, and that is the recommended approach.

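package com.amqp.exchanges.all;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

  private static Connection connection = null;

  // synchronized so that concurrent callers cannot create two connections
  public static synchronized Connection getConnection() {
    if (connection == null) {
      try {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // setUri configures host, port, credentials and virtual host.
        // newConnection(String) would only set a connection name, not the URI.
        // %2f is the URL-encoded default virtual host "/".
        connectionFactory.setUri("amqp://guest:guest@localhost:5672/%2f");
        connection = connectionFactory.newConnection();
      } catch (IOException | TimeoutException | URISyntaxException | GeneralSecurityException e) {
        e.printStackTrace();
      }
    }
    return connection;
  }
}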

Step-2
Create/declare a Headers Exchange with the name my-header-exchange. We will make use of the Channel.exchangeDeclare method as shown below. Create a class named HeadersExchange and add the code below as a method in it.

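package com.amqp.exchanges.all;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class HeadersExchange {

  public static void declareExchange() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();
    // Declare my-header-exchange as a durable headers exchange
    channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true);
    channel.close();
  }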

Step-3
Declare the Queues which will be linked to this exchange. We will create 3 queues named HealthQ, SportsQ and EducationQ, with durable=true, exclusive=false, autoDelete=false and arguments=null as shown below.

  public static void declareQueues() throws IOException, TimeoutException {
    // Create a channel - do not share the Channel instance
    Channel channel = ConnectionManager.getConnection().createChannel();

    // Create the Queues - (queue, durable, exclusive, autoDelete, arguments)
    channel.queueDeclare("HealthQ", true, false, false, null);
    channel.queueDeclare("SportsQ", true, false, false, null);
    channel.queueDeclare("EducationQ", true, false, false, null);

    channel.close();
  }

Step-4
So far we have declared the Headers Exchange and the Queues. It is time to set the bindings between them using the respective routing headers. Look at the routing headers for each of the Queues: HealthQ matches any of h1 = Header1 or h2 = Header2, SportsQ requires both h1 = Header1 and h2 = Header2, and EducationQ matches any of h1 = Header1 or h2 = Header2. Whether any one header or all headers must match is defined by the x-match binding argument.

  public static void declareBindings() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    // Create bindings - (queue, exchange, routingKey, headers)
    // routingKey must not be null, so an empty string is passed
    Map<String, Object> healthArgs = new HashMap<>();
    healthArgs.put("x-match", "any"); // Match any of the headers
    healthArgs.put("h1", "Header1");
    healthArgs.put("h2", "Header2");
    channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs);

    Map<String, Object> sportsArgs = new HashMap<>();
    sportsArgs.put("x-match", "all"); // Match all of the headers
    sportsArgs.put("h1", "Header1");
    sportsArgs.put("h2", "Header2");
    channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs);

    Map<String, Object> educationArgs = new HashMap<>();
    educationArgs.put("x-match", "any"); // Match any of the headers
    educationArgs.put("h1", "Header1");
    educationArgs.put("h2", "Header2");
    channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs);

    channel.close();
  }

Step-5
Now you need consumers listening to each of the Queues to see what messages they receive. As you can see, there are 2 callbacks: the first one is DeliverCallback, with the parameters (consumerTag, message), and the second one is CancelCallback, with the parameter consumerTag. I have also added message.getEnvelope() to give you an idea of how to obtain the exchange name and other properties from the received message.

  public static void subscribeMessage() throws IOException, TimeoutException {
    // This channel is left open so that the consumers keep receiving messages
    Channel channel = ConnectionManager.getConnection().createChannel();

    // basicConsume - (queue, autoAck, deliverCallback, cancelCallback)
    channel.basicConsume("HealthQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n=========== Health Queue ==========");
      System.out.println(consumerTag);
      System.out.println("HealthQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("SportsQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Sports Queue ==========");
      System.out.println(consumerTag);
      System.out.println("SportsQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });

    channel.basicConsume("EducationQ", true, ((consumerTag, message) -> {
      System.out.println("\n\n ============ Education Queue ==========");
      System.out.println(consumerTag);
      System.out.println("EducationQ: " + new String(message.getBody()));
      System.out.println(message.getEnvelope());
    }), consumerTag -> {
      System.out.println(consumerTag);
    });
  }
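
The envelope does not contain the headers themselves. In the Java client they are available inside the callback through message.getProperties().getHeaders(), which returns a Map<String, Object> and may be null when the message was published without headers. String header values typically arrive as the client's LongString type rather than java.lang.String, so a direct equals("Header1") comparison fails until the value is converted with toString(). The sketch below shows that conversion in plain Java. HeaderValuesSketch and asText are hypothetical names, and a StringBuilder stands in for the non-String value so that the example runs without a broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class HeaderValuesSketch {

    // Converts every header value to its text form; tolerates a null header map.
    static Map<String, String> asText(Map<String, Object> headers) {
        Map<String, String> text = new LinkedHashMap<>();
        if (headers == null) {
            return text; // message was published without headers
        }
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            // String.valueOf calls toString() on the value (or yields "null")
            text.put(header.getKey(), String.valueOf(header.getValue()));
        }
        return text;
    }

    public static void main(String[] args) {
        Map<String, Object> received = new LinkedHashMap<>();
        // Stand-in for a header value that is not a java.lang.String
        received.put("h1", new StringBuilder("Header1"));
        received.put("h3", "Header3");

        Map<String, String> text = asText(received);
        System.out.println(text);                                  // {h1=Header1, h3=Header3}
        System.out.println("Header1".equals(received.get("h1")));  // false
        System.out.println("Header1".equals(text.get("h1")));      // true
    }
}
```

Inside a DeliverCallback the same helper could be applied as asText(message.getProperties().getHeaders()).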

Step-6
Publish messages with different header properties. Observe the headers and compare the output. The first message is sent with h1 and h3 and the second message is sent with h1, h2 and h3.

  public static void publishMessage() throws IOException, TimeoutException {
    Channel channel = ConnectionManager.getConnection().createChannel();

    // First message: headers h1 and h3
    String message = "Header Exchange example 1";
    Map<String, Object> headerMap = new HashMap<>();
    headerMap.put("h1", "Header1");
    headerMap.put("h3", "Header3");
    BasicProperties properties = new BasicProperties.Builder()
        .headers(headerMap).build();
    // basicPublish - (exchange, routingKey, properties, body); the routing key is ignored
    channel.basicPublish("my-header-exchange", "", properties, message.getBytes());

    // Second message: the same map with h2 added, so it carries h1, h2 and h3
    message = "Header Exchange example 2";
    headerMap.put("h2", "Header2");
    properties = new BasicProperties.Builder()
        .headers(headerMap).build();
    channel.basicPublish("my-header-exchange", "", properties, message.getBytes());

    channel.close();
  }

Step-7
Let's put it all together and invoke each of these methods inside the public static void main(String[] args) of the HeadersExchange class. I will use two separate threads to run the publish and subscribe methods asynchronously.

  public static void main(String[] args) throws IOException, TimeoutException {
    HeadersExchange.declareQueues();
    HeadersExchange.declareExchange();
    HeadersExchange.declareBindings();

    // Threads created to publish-subscribe asynchronously
    Thread subscribe = new Thread() {
      @Override
      public void run() {
        try {
          HeadersExchange.subscribeMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    Thread publish = new Thread() {
      @Override
      public void run() {
        try {
          HeadersExchange.publishMessage();
        } catch (IOException | TimeoutException e) {
          e.printStackTrace();
        }
      }
    };

    subscribe.start();
    publish.start();
  }
}

Output:

=========== Health Queue ==========
amq.ctag-jQKFauizf0BhnEHToOBlBQ
HealthQ: Header Exchange example 1
Envelope(deliveryTag=1, redeliver=false, exchange=my-header-exchange, routingKey=)

=========== Health Queue ==========
amq.ctag-jQKFauizf0BhnEHToOBlBQ
HealthQ: Header Exchange example 2
Envelope(deliveryTag=2, redeliver=false, exchange=my-header-exchange, routingKey=)

 ============ Sports Queue ==========
amq.ctag-QK-momZTsJNXrLsZ4EtPBw
SportsQ: Header Exchange example 2
Envelope(deliveryTag=3, redeliver=false, exchange=my-header-exchange, routingKey=)

 ============ Education Queue ==========
amq.ctag-S9ti35xtL_2-azL6kfgNFg
EducationQ: Header Exchange example 1
Envelope(deliveryTag=4, redeliver=false, exchange=my-header-exchange, routingKey=)

 ============ Education Queue ==========
amq.ctag-S9ti35xtL_2-azL6kfgNFg
EducationQ: Header Exchange example 2
Envelope(deliveryTag=5, redeliver=false, exchange=my-header-exchange, routingKey=)

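As you can see, the first message reaches only HealthQ and EducationQ: it carries h1 but not h2, which satisfies x-match = any but not x-match = all. The second message reaches all three Queues because it carries both h1 and h2, so it meets x-match = any as well as x-match = all. The extra h3 header is not part of any binding and does not affect the routing.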

Conclusion:

I hope to have given you a practical guide on the Headers Exchange and its implementation. You need to understand its working and its application in your projects, specifically the way routing headers matching works. I would love to hear your feedback in the comments below if you have any.


Source: https://jstobigdata.com/rabbitmq/headers-exchange-in-amqp-rabbitmq/
