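Why use a message queue
1. Message queues decouple systems
2. Message queues enable asynchronous calls
3. Message queues smooth out traffic peaks
Features of RabbitMQ
1. Open source and cross-language
2. Written in Erlang, with high performance
3. Widely used
4. Active community and a rich API
AMQP (the protocol RabbitMQ uses)
Advanced Message Queuing Protocol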

RabbitMQ core concepts

Sending and receiving a simple message with RabbitMQ
1. Add the dependencies
```xml
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.36</version>
</dependency>
```
2. Create the sender
```java
package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: the Hello World sender. Connects to the RabbitMQ server,
 * sends one message, then exits.
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address ("ip" is a placeholder) and credentials
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Obtain a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish the message through the default exchange ("")
        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}
```
3. Create the receiver
```java
package helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: receives messages and prints them; keeps running.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address ("ip" is a placeholder) and credentials
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Obtain a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive and consume messages (autoAck = true)
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        });
    }
}
```
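Balancing load across multiple consumers
When several consumers read from the same queue, RabbitMQ by default hands messages out evenly, one consumer after another. Consumers differ in processing speed, though, so an even split can leave a slow consumer with a backlog while a fast one sits idle. Fair dispatch addresses this: RabbitMQ decides whether to give a consumer the next message based on how much unacknowledged work that consumer still holds. To enable it, the consumer limits its prefetch count and acknowledges messages manually.
```java
// 1. Limit how many unacknowledged messages this consumer is willing to hold
channel.basicQos(prefetchCount);
// 2. Turn off automatic acknowledgement by passing autoAck = false
channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) { /* ... */ });
// 3. Acknowledge each message manually (inside handleDelivery)
channel.basicAck(envelope.getDeliveryTag(), false);
```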
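To see why fair dispatch helps, the following toy model compares the two strategies without touching RabbitMQ at all. It is a sketch under simplifying assumptions: all messages are already queued, each has a fixed processing cost in seconds, and a prefetch count of 1 is modelled as "the next message goes to whichever worker becomes free first". The class `DispatchSketch` and its method names are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;

/** Toy model (not RabbitMQ client code) comparing round-robin and prefetch = 1 dispatch. */
final class DispatchSketch {

    /** Round-robin: message i always goes to worker i % workers, busy or not. */
    static int roundRobinMakespan(int[] costs, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < costs.length; i++) {
            busyUntil[i % workers] += costs[i];
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    /** Prefetch = 1: the next message goes to the worker that is free earliest. */
    static int fairMakespan(int[] costs, int workers) {
        int[] busyUntil = new int[workers];
        for (int cost : costs) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += cost;
        }
        return Arrays.stream(busyUntil).max().orElse(0);
    }

    public static void main(String[] args) {
        // Ten tasks; every other task is slow (3 s), the rest are instant.
        int[] costs = {3, 0, 3, 0, 3, 0, 3, 0, 3, 0};
        System.out.println("round-robin: " + roundRobinMakespan(costs, 2) + " s"); // 15 s
        System.out.println("prefetch=1:  " + fairMakespan(costs, 2) + " s");       // 9 s
    }
}
```

With two workers, round-robin sends every slow task to the same worker, so it finishes after 15 s while the other worker does nothing. In the prefetch model the slow tasks are shared and the last one finishes after 9 s.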
```java
package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: sends several tasks, some of which take time to process.
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address ("ip" is a placeholder) and credentials
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Obtain a channel
        Channel channel = connection.createChannel();
        // Declare a durable queue
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        for (int i = 0; i < 10; i++) {
            // Even-numbered tasks carry three dots; each dot costs the worker one second
            String message;
            if (i % 2 == 0) {
                message = i + "...";
            } else {
                message = String.valueOf(i);
            }
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);
        }
        channel.close();
        connection.close();
    }
}
```
```java
package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Consumer: receives the batch of tasks.
 */
public class Worker {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address ("ip" is a placeholder) and credentials
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Obtain a channel
        Channel channel = connection.createChannel();
        // Declare the queue
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages");
        // Hold at most one unacknowledged message at a time
        channel.basicQos(1);
        // Turn off automatic acknowledgement by passing autoAck = false
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                try {
                    doWork(message);
                } finally {
                    System.out.println("Message processed");
                    // Acknowledge the message manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    // Simulates work: sleeps one second for every '.' in the task
    private static void doWork(String task) {
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before propagating
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
```
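RabbitMQ exchange modes
fanout
Broadcast mode. A queue only needs to be bound to the exchange; no routing key has to be set, and every bound queue receives a copy of each message.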
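As a rough mental model, a fanout exchange can be pictured as a list of bound queues that each get a copy of whatever is published. The sketch below is an in-memory illustration only, not the RabbitMQ client API; the class `FanoutSketch` and the queue names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of a fanout exchange (not RabbitMQ client code). */
final class FanoutSketch {
    // Queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binding needs only the queue; there is no routing key to register. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** The routing key is accepted but ignored: every bound queue gets a copy. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Messages delivered so far to the given queue (empty if it was never bound). */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, Collections.emptyList());
    }

    public static void main(String[] args) {
        FanoutSketch logs = new FanoutSketch();
        logs.bind("console");
        logs.bind("file");
        logs.publish("ignored.key", "info: Hello World!");
        System.out.println(logs.messagesIn("console")); // [info: Hello World!]
        System.out.println(logs.messagesIn("file"));    // [info: Hello World!]
    }
}
```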

The producer sends a message to the exchange
```java
package fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: sends log messages.
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange; BuiltinExchangeType.FANOUT makes it a broadcast exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "info: Hello World!";
        // The second argument is the routing key; a fanout exchange ignores it
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
        channel.close();
        connection.close();
    }
}
```
The consumer binds a queue to the exchange and consumes from it
```java
package fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: receives log messages.
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (fanout type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
```
direct
Direct mode. A message is routed to the queues whose binding key exactly matches the message's routing key.
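The exact-match rule can be pictured as a lookup table from routing key to bound queues. The sketch below is an in-memory illustration under that assumption, not the RabbitMQ client API; `DirectSketch` and the queue names `allLevels` and `errorsOnly` are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy in-memory model of a direct exchange (not RabbitMQ client code). */
final class DirectSketch {
    // Routing key -> names of the queues bound with exactly that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    /** A queue may be bound several times, once per routing key it wants. */
    void bind(String queueName, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new TreeSet<>()).add(queueName);
    }

    /** Returns the queues that would receive a message published with this key. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        for (String level : new String[] {"info", "warning", "error"}) {
            exchange.bind("allLevels", level);
        }
        exchange.bind("errorsOnly", "error");
        System.out.println("info  -> " + exchange.route("info"));  // [allLevels]
        System.out.println("error -> " + exchange.route("error")); // [allLevels, errorsOnly]
        System.out.println("debug -> " + exchange.route("debug")); // []
    }
}
```

A message whose routing key has no matching binding, such as `debug` here, reaches no queue at all.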

```java
package direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: sends messages to a direct exchange.
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (direct type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "Hello World!";
        // Publish the same message once per log level; the level is the routing key
        channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level info, content: " + message);
        channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level warning, content: " + message);
        channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level error, content: " + message);
        channel.close();
        connection.close();
    }
}
```
```java
package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: receives logs of all three levels.
 */
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (direct type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a random name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange three times, once per routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
```
```java
package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: receives logs of one level only.
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (direct type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a random name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a single routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
```
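topic
Topic mode. The producer publishes with a routing key, and the exchange forwards the message to every queue whose binding pattern matches that key. A routing key is a list of words separated by dots.
Pattern matching:
* stands for exactly one word
# stands for zero or more words

In this example the routing key has the form speed.colour.species.
Q1 is interested in all orange animals.
Q2 wants everything about rabbits, plus everything about lazy animals.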
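The matching rules can be made concrete with a small word-by-word matcher. This is a sketch of the rules as stated above, not RabbitMQ's actual implementation; the class `TopicMatchSketch` is hypothetical. The patterns `*.orange.*` for Q1 and `*.*.rabbit` plus `lazy.#` for Q2 are the bindings used by the receivers in this section.

```java
/** Word-by-word matcher for topic binding patterns (illustrative, not RabbitMQ's code). */
final class TopicMatchSketch {

    /** True if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;            // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero words (move on in the pattern) or one more word of the key
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;                    // pattern still needs a word, key has none left
        }
        // '*' accepts any single word; otherwise the words must be identical
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"quick.orange.rabbit", "lazy.brown.fox", "orange", "lazy.orange.male.rabbit"};
        for (String key : keys) {
            boolean q1 = matches("*.orange.*", key);
            boolean q2 = matches("*.*.rabbit", key) || matches("lazy.#", key);
            System.out.println(key + " -> Q1=" + q1 + ", Q2=" + q2);
        }
    }
}
```

Note that `orange` on its own matches neither queue, because `*` must consume exactly one word, while a four-word key such as `lazy.orange.male.rabbit` still reaches Q2 through `lazy.#`.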
Example:
```java
package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: sends messages to a topic exchange.
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String message = "Animal World!";

        String[] routingKeys = {
            "quick.orange.rabbit",
            "lazy.orange.elephant",
            "quick.orange.fox",
            "lazy.brown.fox",
            "lazy.pink.rabbit",
            "quick.brown.fox",
            "orange",
            "quick.orange.male.rabbit",
            "lazy.orange.male.rabbit"
        };

        // Publish the same message once per routing key
        for (int i = 0; i < routingKeys.length; i++) {
            channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, message.getBytes("UTF-8"));
            System.out.println("Sent: " + message + " routingKey: " + routingKeys[i]);
        }

        channel.close();
        connection.close();
    }
}
```
```java
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: Q1, bound with a specific routing-key pattern.
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a random name
        String queueName = channel.queueDeclare().getQueue();
        // All orange animals
        String routingKey = "*.orange.*";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
```
```java
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Description: Q2, bound with two routing-key patterns.
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a random name
        String queueName = channel.queueDeclare().getQueue();
        // Everything about rabbits
        String routingKey = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        // Everything about lazy animals
        String routingKey1 = "lazy.#";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
```
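headers
Headers mode. Messages are matched on the headers attribute carried by the message rather than on the routing key.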
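The original gives no example for this mode, so here is a simplified sketch of how header matching can be pictured. In RabbitMQ the binding argument `x-match` selects between `all` (every listed header must match, the default) and `any` (one match is enough). Everything else below is an assumption made for illustration: the class `HeadersMatchSketch`, the header names `format` and `type`, and the plain equality comparison of values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Simplified model of headers-exchange matching (illustrative, not RabbitMQ's code). */
final class HeadersMatchSketch {

    /**
     * bindingArgs holds the headers to look for plus the special "x-match" entry.
     * Entries whose key starts with "x-" are treated as options, not as headers.
     */
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> messageHeaders) {
        boolean any = "any".equals(bindingArgs.get("x-match"));
        int compared = 0;
        int matched = 0;
        for (Map.Entry<String, Object> entry : bindingArgs.entrySet()) {
            if (entry.getKey().startsWith("x-")) {
                continue;
            }
            compared++;
            if (messageHeaders.containsKey(entry.getKey())
                    && Objects.equals(entry.getValue(), messageHeaders.get(entry.getKey()))) {
                matched++;
            }
        }
        return any ? matched > 0 : matched == compared;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        Map<String, Object> binding = new HashMap<>();
        binding.put("format", "pdf");
        binding.put("type", "report");

        binding.put("x-match", "all");
        System.out.println("all: " + matches(binding, headers)); // false, "type" differs
        binding.put("x-match", "any");
        System.out.println("any: " + matches(binding, headers)); // true, "format" matches
    }
}
```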
Integrating RabbitMQ with Spring Boot
Message producer
1. Add the RabbitMQ dependency
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>
```
2. Configure the connection to RabbitMQ
```properties
server.port=8080
spring.application.name=producer

spring.rabbitmq.addresses=ip:5672

spring.rabbitmq.username=silkage
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
```
3. Create the RabbitMQ configuration class
```java
package com.slkage.springbootrabbitmqproducter;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * Description: RabbitMQ configuration class.
 */
@Configuration
public class TopicRabbitConfig {

    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("bootExchange");
    }

    // queue1 receives only messages whose routing key is exactly dog.red
    @Bean
    Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    }

    // queue2 receives every message whose routing key starts with dog
    @Bean
    Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    }
}
```
4. Send messages from the producer
```java
package com.slkage.springbootrabbitmqproducter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/*
 * Description: sends messages.
 */
@Component
public class MsgSender {
    @Autowired
    private AmqpTemplate rabbitmqTemplate;

    // Routing key dog.red matches both bindings (dog.red and dog.#)
    public void send1() {
        String message = "This is message 1,routing key is dog.red";
        System.out.println("Sent: " + message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message);
    }

    // Routing key dog.black matches only dog.#
    public void send2() {
        String message = "This is message 2,routing key is dog.black";
        System.out.println("Sent: " + message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message);
    }
}
```
Message consumer
1. Add the dependency
2. Configure the connection to RabbitMQ
3. Consume messages
```java
package com.silkage.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
 * Description: consumer 1, listens on queue1.
 */
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Receiver1:" + message);
    }
}
```
```java
package com.silkage.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
 * Description: consumer 2, listens on queue2.
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {

    @RabbitHandler
    public void process(String message) {
        System.out.println("Receiver2 :" + message);
    }
}
```