First experience of using Spring boot integrated Rabbit MQ
1.rabbit mq basic characteristics
First introduce several characteristics of rabbitMQ
Asynchronous Messaging
Supports multiple messaging protocols, message queuing, delivery acknowledgement, flexible routing to queues, multiple exchange type.
Asynchronous messages
Support multiple messaging protocols, message queuing, delivery confirmation, flexible routing rules, and multiple exchange types. These should be the core features of rabbitmq.
Developer Experience
Deploy with BOSH, Chef, Docker and Puppet. Develop cross-language messaging with favorite programming languages such as: Java, .NET, PHP, Python, JavaScript, Ruby, Go, and many others.
Deployment experience?
Deploy with BOSH, Chef, Docker and Puppet. Use your favorite programming language to develop cross-language messaging, such as Java, .NET, PHP, Python, JavaScript, Ruby, Go, etc.
Distributed Deployment
Deploy as clusters for high availability and throughput; federate across multiple availability zones and regions.
Distributed deployment
Deployed as a cluster to achieve high availability and throughput; across multiple availability zones and regional unions.
Enterprise & Cloud Ready
Pluggable authentication, authorisation, supports TLS and LDAP. Lightweight and easy to deploy in public and private clouds.< /p>
Enterprise and cloud ready
Plugable authentication, authorization, support for TLS and LDAP. Lightweight and easy to deploy in public and private clouds.
Tools & Plugins
Diverse array of tools and plugins supporting continuous integration, operational metrics, and integration to other enterprise systems. Flexible plug- in approach for extending RabbitMQ functionality.
Tools&Plugins
There are a wide variety of tools and plug-ins, supporting continuous integration, operational indicators, and Integration of other enterprise systems. Flexible plug-in method for extending RabbitMQ functions.
Management & Monitoring
HTTP-API, command line tool, and UI for managing and monitoring RabbitMQ.
Management and monitoring
HTTP-API support, command line tools, management and monitoring interface.
2. The core concept of rabbit mq
①Message
Message, message is the carrier of data, composed of message header and message body. The message body is opaque, and the message header consists of a series of optional attributes, including routing-key (routing key, that is, how the message is distributed to the queue), priority (priority relative to other messages), delivery-mode (specify whether persistent storage is required)
②Publisher
The producer of the message, the client application that publishes the message to the exchange.
③Exchange
The exchange is used to receive the messages sent by the producer and route these messages to the specified queue according to the routing rules or exchange types. There are four types of switches: direct (default), fanout, topic, and headers. These four types support different routing strategies.
④Queue
The message queue is used to store messages until they are sent to consumers, and is a container for messages. A message can be stored in one or more queues, and will not be deleted from the queue until the consumer consumes the message.
⑤Binding
Binding, specify the binding rules of the switch and the queue, can be understood as a filter, when the routing key meets this binding rule, the message will be sent to queue. The binding between the switch and the queue can be a many-to-many relationship
⑥Connection
A TCP connection
⑦Channel
Channel, An independent bidirectional data stream channel in a multiplexed connection. A channel is a virtual connection established in a real TCP connection. AMQP commands are sent through the channel. Whether it is publishing messages, subscribing to queues or receiving messages, these actions are all completed through the channel. Because it is very expensive for the operating system to establish and destroy TCP, the concept of channel is introduced to reuse a TCP connection.
⑧Consumer
The consumer of the message represents a client application that obtains the message from the message queue.
⑨Virtual Host
Virtual Host means a batch of switches, message queues and related objects. A virtual host is an independent server domain that shares the same authentication and encryption environment. Each vhost is essentially a mini version of RabbitMQ server, with its own queue, switch, binding, and permission mechanism. vhost is the basis of AMQP concept and must be specified when connecting. The default vhost of RabbitMQ is /.
⑩Broker
Indicates the message queue server entity.
For more detailed instructions, please refer to the official document: https://www.rabbitmq.com/tutorials/amqp-concepts.html
3 .Switch type and message routing
- Direct Exchange
If the routing key in the message is consistent with the binding key in the Binding, the exchange will send the message to the corresponding queue. The routing key exactly matches the queue name. If a queue is bound to the switch and requires the routing key to be “dog”, only messages marked with the routing key “dog” will be forwarded, and “dog.puppy” or “dog.puppy” will not be forwarded. dog.guard” and so on. It is an exact match, unicast mode.
- Fanout Exchange
Every message sent to the fanout exchange will be distributed to all bound queues. The fanout exchange does not process the routing key [routing key is ignored], but simply binds the queue to the exchange, and each message sent to the exchange will be forwarded to all the queues bound to the exchange. Much like a subnet broadcast, every host in the subnet gets a copy of the message. The fanout type is the fastest to forward messages.
- Topic Exchange
The topic exchange assigns the routing key attribute of the message through pattern matching, and matches the routing key with a pattern. At this time, the queue needs to be bound to a pattern. It divides the string of routing key and binding key into words, and these words are separated by dots. It also recognizes two wildcard characters: the symbol “#” and the symbol ““.
Note #Match 0 or more words, *Match one word
4. Start using
< p>Let’s take a look at what the official spring boot documentation says.
First, add these configurations
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /test
The default username and password of rabbitmq is guest:guest
First, configure the class, RabbitTemplate uses the auto-configured one, and it is automatically injected in. We also need to configure a RabbitAdmin object. There are two RabbitAdmins. Construction method
/*** Construct an instance using the provided {@link ConnectionFactory}.* @param connectionFactory the connection factory-must not be null.*/public RabbitAdmin(ConnectionFactory connectionFactory ) {Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); this.connectionFactory = connectionFactory; this.rabbitTemplate = new RabbitTemplate(connectionFactory);}/*** Construct an instance using the provided {@link RabbitTemplate}. Use this* constructor when, for example, you want the admin operations to be performed within* the scope of the provided template's {@code invoke()} method.* @param rabbitTemplate the template-must not be null and must have a connection * f actory.* @since 2.0*/public RabbitAdmin(RabbitTemplate rabbitTemplate) {Assert.notNull(rabbitTemplate, "RabbitTemplate must not be null"); Assert.notNull(rabbitTemplate.getConnectionFactory(), "RabbitTemplate's ConnectionFactory must not be null"); this.connectionFactory = rabbitTemplate.getConnectionFactory(); this.rabbitTemplate = rabbitTemplate;}
But actually looking at their constructors, I found that if we don’t need to customize the RabbitTemplate ourselves, just use the first construction method. .
@Configurationpublic class RabbitMqConfig {@Autowired RabbitTemplate rabbitTemplate; @Bean public RabbitAdmin rabbitAdmin() {return new RabbitAdmin(rabbitTemplate); }}
Similar to this, just Configured.
Next, write a test class to test the declaration of exchanges, queues, and operations such as sending and receiving messages.
First, declare the switch type. The corresponding construction methods for the four switches are as follows
//The parameter lists are: 1. The name of the switch, 2. Whether it is persistent 3. Whether to automatically delete [refers to whether to automatically delete the exchange when the last queue bound to it is deleted] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false);FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false);HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false);rabbitAdmin.declareExchange (topicExchange);rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareExchange(fanoutExchange);rabbitAdmin.declareExchange(headersExchange);
Then the declaration queue
// 1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", true, false, false);Queue queue2 = new Queue("queue2 ", true, false, false);Queue queue3 = new Queue("queue3", true, false, false);Queue queue4 = new Queue("queue4", true, false, false);rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2);rabbitAd min.declareQueue(queue3);rabbitAdmin.declareQueue(queue4);
Then bind the queue and the switch to each other
//1.queue: bound Queue, 2.topicExchange: bound to that exchange, 3.test.send.topic: bound routing name [routing key]rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding (BindingBuilder.bind(queue2).to(fanoutExchange));rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*"));rabbitAdmin.declareBinding(BindingBuilder.bind(queue4) .to(directExchange).with("mq.direct"));
Because the fanout type switch ignores the routing key attribute, it does not need to be set.
The complete test code is as follows
@SpringBootTest@RunWith(SpringJUnit4ClassRunner.class) public class RabbitMqTest {@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitAdmin rabbitAdmin; @Test public void testDeclare() {//The parameter lists are: 1. The name of the exchange, 2. Whether it is persistent, 3. Whether it is automatically deleted [refers to whether it is automatically deleted when the last queue bound to it is deleted Switch] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false); FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false); HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareExchange(directExchange); rabbitAdmin.declareExchange(fanoutExchange); rabbitAdmin.declareExchange(headersExchange); // 1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", t rue, false, false); Queue queue2 = new Queue("queue2", true, false, false); Queue queue3 = new Queue("queue3", true, false, false); Queue queue4 = new Queue("queue4" , true, false, false); rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2); rabbitAdmin.declareQueue(queue3); rabbitAdmin.declareQueue(queue4); //1.queue: bound queue, 2.topicExchange : Bind to that exchange, 3.test.send.topic: binding routing name [routing key] rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind( queue2).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*")); rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange) .with("mq.direct")); }}
The results are as follows:
Look at the binding situation again:
Direct Switch
p>
Fanout switch
Topic Switch
All tests are successful, and then you can start sending messages.
There are multiple APIs available for sending messages. Choose the highlighted API here. There is actually a send method available, but you need to build the message yourself
@Testpublic void testSendMessage() {//1. Switch , 2. Routing key, 3. Message body sent [The message body here will be automatically converted into a message, or you can build a message object yourself] rabbitTemplate.convertAndSend("default.topic","mq.whatever.this.is",new Student(1,"mmp","male",234));}
The test results are as follows:
Be sure to pay attention to the matching rules of the routing keys of topic-type switches, #match 0 or more A word, *match a word
Then if you don’t want to create a switch and queue in the test class every time, what can you do? The CommandLineRunner interface can be implemented in the program entry class. The code is as follows. In this case, it will be declared once every time it is started. Of course, repeated declarations will not report an error, but it will overwrite the previous declaration. For example, the routing key defined in the previous declaration may be Is covered.
@SpringBootApplication@EnableRabbitpublic class AmqpApplication implements CommandLineRunner {@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitAdmin rabbitAdmin; public static void main(String[] args) {SpringApplication.run(AmqpApplication. class, args);} @Override public void run(String... args) throws Exception {//The parameter lists are: 1. Exchanger name, 2. Whether to persist, 3. Whether to delete automatically [refers to when When the last queue bound to it is deleted, whether to automatically delete the exchange] TopicExchange topicExchange = new TopicExchange("default.topic", true, false); DirectExchange directExchange = new DirectExchange("default.direct", true, false) ; FanoutExchange fanoutExchange = new FanoutExchange("default.fanout", true, false); HeadersExchange headersExchange = new HeadersExchange("default.headers", true, false); rabbitAdmin.declareExchange(topicExchange); rabbitAdmin.declareExchange(directExchange); rabbitAdmin .declareExchange(fanoutExchange); rabbit Admin.declareExchange(headersExchange); //1. Queue name, 2. Declare a persistent queue, 3. Declare an independent queue, 4. Whether to automatically delete the queue Queue queue1 = new Queue("queue1", true, false, false) ; Queue queue2 = new Queue("queue2", true, false, false); Queue queue3 = new Queue("queue3", true, false, false); Queue queue4 = new Queue("queue4", true, false, false ); rabbitAdmin.declareQueue(queue1); rabbitAdmin.declareQueue(queue2); rabbitAdmin.declareQueue(queue3); rabbitAdmin.declareQueue(queue4); //1.queue: bound queue, 2.topicExchange: bound to that exchange器,3.test.send.topic: binding routing name [routing key] rabbitAdmin.declareBinding(BindingBuilder.bind(queue1).to(fanoutExchange)); rabbitAdmin.declareBinding(BindingBuilder.bind(queue2).to(fanoutExchange )); rabbitAdmin.declareBinding(BindingBuilder.bind(queue3).to(topicExchange).with("mq.*")); rabbitAdmin.declareBinding(BindingBuilder.bind(queue4).to(directExchange).with("mq. direct")); }}
Continue to test the received message, it is convenient to have a comment.
< pre class="java">@Servicepublic class ReceiverService {@RabbitListener(queues = {“queue3”}) public void receive(Student student) {System.out.println(“Receive the message and print it:”+student);} }
The test results are as follows:
The message is received and printed: student{id=1, name='mmp', gender='male', age=234}
You can also use methods to receive messages directly
@Testpublic void testReceive() {Student student = (Student) rabbitTemplate.receiveAndConvert("queue3"); System.out.println (student);}
The test results are as follows:
student{id=1, name='mmp', gender='male', age=234}
What if you want to send student objects in JSON format?
Need to customize:
@Configurationpublic class RabbitMqConfig {@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter (new Jackson2JsonMessageConverter()); return rabbitTemplate;} @Bean public RabbitAdmin rabbitAdmin(RabbitTemplate rabbitTemplate) {return new RabbitAdmin(rabbitTemplate); }}
Test it:
Source code address: https://github .com/lingEric/springboot-integration-hello
For more official tutorials, please visit https://github.com/rabbitmq/rabbitmq-tutorials
p>