Spring boot integrated Rabbit MQ uses a first experience

First experience of using Spring boot integrated Rabbit MQ

undefined

1.rabbit mq basic characteristics

First introduce several characteristics of rabbitMQ

Asynchronous Messaging
Supports multiple messaging protocols, message queuing, delivery acknowledgement, flexible routing to queues, multiple exchange type.

Asynchronous messages

Support multiple messaging protocols, message queuing, delivery confirmation, flexible routing rules, and multiple exchange types. These should be the core features of rabbitmq.

Developer Experience

Deploy with BOSH, Chef, Docker and Puppet. Develop cross-language messaging with favorite programming languages ​​such as: Java, .NET, PHP, Python, JavaScript, Ruby, Go, and many others.

Deployment experience?

Deploy with BOSH, Chef, Docker and Puppet. Use your favorite programming language to develop cross-language messaging, such as Java, .NET, PHP, Python, JavaScript, Ruby, Go, etc.

Distributed Deployment

Deploy as clusters for high availability and throughput; federate across multiple availability zones and regions.

Distributed deployment

Deployed as a cluster to achieve high availability and throughput; across multiple availability zones and regional unions.

Enterprise & Cloud Ready

Pluggable authentication, authorisation, supports TLS and LDAP. Lightweight and easy to deploy in public and private clouds.< /p>

Enterprise and cloud ready

Plugable authentication, authorization, support for TLS and LDAP. Lightweight and easy to deploy in public and private clouds.

Tools & Plugins

Diverse array of tools and plugins supporting continuous integration, operational metrics, and integration to other enterprise systems. Flexible plug- in approach for extending RabbitMQ functionality.

Tools&Plugins

There are a wide variety of tools and plug-ins, supporting continuous integration, operational indicators, and Integration of other enterprise systems. Flexible plug-in method for extending RabbitMQ functions.

Management & Monitoring

HTTP-API, command line tool, and UI for managing and monitoring RabbitMQ.

Management and monitoring

HTTP-API support, command line tools, management and monitoring interface.

undefined

2. The core concept of rabbit mq

①Message

Message, message is the carrier of data, composed of message header and message body. The message body is opaque, and the message header consists of a series of optional attributes, including routing-key (routing key, that is, how the message is distributed to the queue), priority (priority relative to other messages), delivery-mode (specify whether persistent storage is required)

②Publisher

The producer of the message, the client application that publishes the message to the exchange.

③Exchange

The exchange is used to receive the messages sent by the producer and route these messages to the specified queue according to the routing rules or exchange types. There are four types of switches: direct (default), fanout, topic, and headers. These four types support different routing strategies.

undefined

④Queue

The message queue is used to store messages until they are sent to consumers, and is a container for messages. A message can be stored in one or more queues, and will not be deleted from the queue until the consumer consumes the message.

⑤Binding

Binding, specify the binding rules of the switch and the queue, can be understood as a filter, when the routing key meets this binding rule, the message will be sent to queue. The binding between the switch and the queue can be a many-to-many relationship

⑥Connection

A TCP connection

⑦Channel

Channel, An independent bidirectional data stream channel in a multiplexed connection. A channel is a virtual connection established in a real TCP connection. AMQP commands are sent through the channel. Whether it is publishing messages, subscribing to queues or receiving messages, these actions are all completed through the channel. Because it is very expensive for the operating system to establish and destroy TCP, the concept of channel is introduced to reuse a TCP connection.

⑧Consumer

The consumer of the message represents a client application that obtains the message from the message queue.

⑨Virtual Host

Virtual Host means a batch of switches, message queues and related objects. A virtual host is an independent server domain that shares the same authentication and encryption environment. Each vhost is essentially a mini version of RabbitMQ server, with its own queue, switch, binding, and permission mechanism. vhost is the basis of AMQP concept and must be specified when connecting. The default vhost of RabbitMQ is /.

⑩Broker

Indicates the message queue server entity.

For more detailed instructions, please refer to the official document: https://www.rabbitmq.com/tutorials/amqp-concepts.html

3 .Switch type and message routing

  • Direct Exchange

rabbitmq-direct.png

If the routing key in the message is consistent with the binding key in the Binding, the exchange will send the message to the corresponding queue. The routing key exactly matches the queue name. If a queue is bound to the switch and requires the routing key to be “dog”, only messages marked with the routing key “dog” will be forwarded, and “dog.puppy” or “dog.puppy” will not be forwarded. dog.guard” and so on. It is an exact match, unicast mode.

  • Fanout Exchange

rabbitmq-fanout.png

Every message sent to the fanout exchange will be distributed to all bound queues. The fanout exchange does not process the routing key [routing key is ignored], but simply binds the queue to the exchange, and each message sent to the exchange will be forwarded to all the queues bound to the exchange. Much like a subnet broadcast, every host in the subnet gets a copy of the message. The fanout type is the fastest to forward messages.

  • Topic Exchange

rabbitmq-topic.png

The topic exchange assigns the routing key attribute of the message through pattern matching, and matches the routing key with a pattern. At this time, the queue needs to be bound to a pattern. It divides the string of routing key and binding key into words, and these words are separated by dots. It also recognizes two wildcard characters: the symbol “#” and the symbol “.

Note #Match 0 or more words, *Match one word

4. Start using

< p>Let’s take a look at what the official spring boot documentation says.

First, add these configurations

spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /test

The default username and password of rabbitmq is guest:guest

First, configure the class, RabbitTemplate uses the auto-configured one, and it is automatically injected in. We also need to configure a RabbitAdmin object. There are two RabbitAdmins. Construction method

/*** Construct an instance using the provided {@link ConnectionFactory}.* @param connectionFactory the connection factory-must not be null.*/public RabbitAdmin(ConnectionFactory connectionFactory ) {Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); this.connectionFactory = connectionFactory; this.rabbitTemplate = new RabbitTemplate(connectionFactory);}/*** Construct an instance using the provided {@link RabbitTemplate}. Use this* constructor when, for example, you want the admin operations to be performed within* the scope of the provided template's {@code invoke()} method.* @param rabbitTemplate the template-must not be null and must have a connection * f actory.* @since 2.0*/public RabbitAdmin(RabbitTemplate rabbitTemplate) {Assert.notNull(rabbitTemplate, "RabbitTemplate must not be null"); Assert.notNull(rabbitTemplate.getConnectionFactory(), "RabbitTemplate's ConnectionFactory must not be null"); this.connectionFactory = rabbitTemplate.getConnectionFactory(); this.rabbitTemplate = rabbitTemplate;}

But actually looking at their constructors, I found that if we don’t need to customize the RabbitTemplate ourselves, just use the first construction method. .

@Configurationpublic class RabbitMqConfig {@Autowired RabbitTemplate rabbitTemplate; @Bean public RabbitAdmin rabbitAdmin() {return new RabbitAdmin(rabbitTemplate); }}

Similar to this, just Configured.

Next, write a test class to test the declaration of exchanges, queues, and operations such as sending and receiving messages.

First, declare the switch type. The corresponding construction methods for the four switches are as follows

//The parameter lists are: 1. The name of the switch, 2. Whether it is persistent 3. Whether to automatically delete [refers to whether to automatically delete the exchange when the last queue bound to it is deleted] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false);FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);rabbitAdmin.declareExchange (topicExchange);rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareExchange(fanoutExchange);rabbitAdmin.declareExchange(headersExchange);

Then the declaration queue

// 1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", true, false, false);Queue queue2 = new Queue("queue2 ", true, false, false);Queue queue3 = new Queue("queue3", true, false, false);Queue queue4 = new Queue("queue4", true, false, false);rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2);rabbitAd min.declareQueue(queue3);rabbitAdmin.declareQueue(queue4);

Then bind the queue and the switch to each other

//1.queue: bound Queue, 2.topicExchange: bound to that exchange, 3.test.send.topic: bound routing name [routing key]rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding (BindingBuilder.bind(queue2).to(fanoutExchange));rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));rabbitAdmin.declareBinding(BindingBuilder.bind(queue4) .to(directExchange).with("mq.direct"));

Because the fanout type switch ignores the routing key attribute, it does not need to be set.

The complete test code is as follows

@SpringBootTest@RunWith(SpringJUnit4ClassRunner.class) public class RabbitMqTest {@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitAdmin rabbitAdmin; @Test public void testDeclare() {//The parameter lists are: 1. The name of the exchange, 2. Whether it is persistent, 3. Whether it is automatically deleted [refers to whether it is automatically deleted when the last queue bound to it is deleted Switch] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false); FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false); HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareExchange(directExchange); rabbitAdmin.declareExchange(fanoutExchange); rabbitAdmin.declareExchange(headersExchange); // 1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", t rue, false, false); Queue queue2 = new Queue("queue2", true, false, false); Queue queue3 = new Queue("queue3", true, false, false); Queue queue4 = new Queue("queue4" , true, false, false); rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2); rabbitAdmin.declareQueue(queue3); rabbitAdmin.declareQueue(queue4); //1.queue: bound queue, 2.topicExchange : Bind to that exchange, 3.test.send.topic: binding routing name [routing key] rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind( queue2).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*")); rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange) .with("mq.direct")); }}

The results are as follows:

Snipaste_2019-09-28_17-33-54.png

Snipaste_2019-09 -28_17-35-35.png

Look at the binding situation again:

Direct Switch

Snipaste_2019-09-28_17-36-26.png

Fanout switch

Snipaste_2019-09-28_17-37-06.png

Topic Switch

Snipaste_2019-09-28_17-38-59.png

All tests are successful, and then you can start sending messages.

There are multiple APIs available for sending messages. Choose the highlighted API here. There is actually a send method available, but you need to build the message yourself

Snipaste_2019- 09-28_17-45-28.png

Snipaste_2019-09-28_17-54 -51.png

@Testpublic void testSendMessage() {//1. Switch , 2. Routing key, 3. Message body sent [The message body here will be automatically converted into a message, or you can build a message object yourself] rabbitTemplate.convertAndSend("default.topic","mq.whatever.this.is",new Student(1,"mmp","male",234));}

The test results are as follows:

Snipaste_2019-09-28_18-01-01. png

Be sure to pay attention to the matching rules of the routing keys of topic-type switches, #match 0 or more A word, *match a word

Then if you don’t want to create a switch and queue in the test class every time, what can you do? The CommandLineRunner interface can be implemented in the program entry class. The code is as follows. In this case, it will be declared once every time it is started. Of course, repeated declarations will not report an error, but it will overwrite the previous declaration. For example, the routing key defined in the previous declaration may be Is covered.

@SpringBootApplication@EnableRabbitpublic class AmqpApplication implements CommandLineRunner {@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitAdmin rabbitAdmin; public static void main(String[] args) {SpringApplication.run(AmqpApplication. class, args);} @Override public void run(String... args) throws Exception {//The parameter lists are: 1. Exchanger name, 2. Whether to persist, 3. Whether to delete automatically [refers to when When the last queue bound to it is deleted, whether to automatically delete the exchange] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false) ; FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false); HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareExchange(directExchange); rabbitAdmin .declareExchange(fanoutExchange); rabbit Admin.declareExchange(headersExchange); //1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", true, false, false) ; Queue queue2 = new Queue("queue2", true, false, false); Queue queue3 = new Queue("queue3", true, false, false); Queue queue4 = new Queue("queue4", true, false, false ); rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2); rabbitAdmin.declareQueue(queue3); rabbitAdmin.declareQueue(queue4); //1.queue: bound queue, 2.topicExchange: bound to that exchange器,3.test.send.topic: binding routing name [routing key] rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange )); rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*")); rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq. direct")); }}

Continue to test the received message, it is convenient to have a comment.

Snipaste_2019-09-28_18-10-09.png

< pre class="java">@Servicepublic class ReceiverService {@RabbitListener(queues = {“queue3”}) public void receive(Student student) {System.out.println(“Receive the message and print it:”+student);} }

The test results are as follows:

The message is received and printed: student{id=1, name='mmp', gender='male', age=234}

You can also use methods to receive messages directly

@Testpublic void testReceive() {Student student = (Student) rabbitTemplate.receiveAndConvert("queue3"); System.out.println (student);}

The test results are as follows:

student{id=1, name='mmp', gender='male', age=234} 

What if you want to send student objects in JSON format?

Need to customize:

@Configurationpublic class RabbitMqConfig {@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter (new Jackson2JsonMessageConverter()); return rabbitTemplate;} @Bean public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {return new RabbitAdmin(rabbitTemplate); }}

Test it:

Snipaste_2019-09-28_18-21-46.png

Source code address: https://github .com/lingEric/springboot-integration-hello

For more official tutorials, please visit https://github.com/rabbitmq/rabbitmq-tutorials

Leave a Comment

Your email address will not be published.