
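Why use a message queue

1. A message queue decouples systems from one another
2. A message queue allows asynchronous calls
3. A message queue smooths out traffic peaks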

Features of RabbitMQ

1. Open source and cross-language
2. Written in Erlang, with high performance
3. Widely used
4. Active community and a rich API

The AMQP protocol (the protocol RabbitMQ uses)

Advanced Message Queuing Protocol

RabbitMQ core concepts


Sending and receiving a simple message with RabbitMQ

1. Add the dependencies

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.36</version>
</dependency>

2. Create the sender

package helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Sender for the hello-world example: connects to the RabbitMQ server,
 * sends one message, then exits.
 */
public class Send {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address and credentials ("ip" stands for the server address)
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue (not durable, not exclusive, not auto-deleted)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish the message through the default exchange, routed by queue name
        String message = "Hello World";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
        // Close the channel and the connection
        channel.close();
        connection.close();
    }
}

3. Create the receiver

package helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receiver: receives messages and prints them; keeps running until stopped.
 */
public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address and credentials ("ip" stands for the server address)
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue with the same arguments as the sender
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive and consume messages (autoAck = true)
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
            }
        });
        // The channel and connection stay open so the consumer keeps listening
    }
}

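Balancing load across multiple consumers

When several consumers read from the same queue, RabbitMQ by default hands messages out evenly, one consumer after another, regardless of how busy each consumer is. Consumers differ in processing speed, though, so a slow consumer can accumulate a backlog while a fast one sits idle. To balance the load we use fair dispatch: RabbitMQ decides whether to give a consumer another message based on how many messages that consumer has not yet acknowledged. Fair dispatch therefore requires manual message acknowledgement in the consumer.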

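1. Set how many unacknowledged messages the consumer is willing to hold (the prefetch count)
channel.basicQos(1);
2. Turn off automatic acknowledgement by passing autoAck = false
channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) { /* handleDelivery goes here */ });
3. Acknowledge each message manually once it has been processed
channel.basicAck(envelope.getDeliveryTag(), false);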
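package workqueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Producer: sends several tasks, some of which take time to process.
 */
public class NewTask {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address and credentials ("ip" stands for the server address)
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare a durable queue
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        for (int i = 0; i < 10; i++) {
            // Even-numbered tasks carry three dots; the worker sleeps one second per dot
            String message;
            if (i % 2 == 0) {
                message = i + "...";
            } else {
                message = String.valueOf(i);
            }
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("Sent message: " + message);
        }
        channel.close();
        connection.close();
    }
}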
package workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Consumer: processes the batch of task messages.
 */
public class Worker {
    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // Create the connection factory
        ConnectionFactory factory = new ConnectionFactory();
        // Set the RabbitMQ address and credentials ("ip" stands for the server address)
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        // Open the connection
        Connection connection = factory.newConnection();
        // Get a channel
        Channel channel = connection.createChannel();
        // Declare the queue with the same arguments as the producer
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages");
        // Prefetch count: hold at most one unacknowledged message at a time
        channel.basicQos(1);
        // Turn off automatic acknowledgement by passing autoAck = false
        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message);
                try {
                    doWork(message);
                } finally {
                    System.out.println("Finished processing message");
                    // Acknowledge this single message manually
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    // Simulates work: sleeps one second for every '.' in the task
    private static void doWork(String task) {
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

}
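
The effect of fair dispatch on the ten messages sent by NewTask can be estimated without a broker. The sketch below is a simplified model, not RabbitMQ code: the class and method names are made up for illustration, each '.' costs one second as in doWork, plain round-robin is modelled as giving message i to worker i mod n, and basicQos(1) with manual acknowledgement is approximated by giving each message to the worker that becomes free first.

```java
import java.util.Arrays;

// Hypothetical model of the two dispatch strategies; it does not talk to RabbitMQ.
class DispatchSimulation {

    // The same ten messages NewTask publishes: even i gets three dots.
    static String[] newTaskMessages() {
        String[] messages = new String[10];
        for (int i = 0; i < messages.length; i++) {
            messages[i] = (i % 2 == 0) ? i + "..." : String.valueOf(i);
        }
        return messages;
    }

    // Seconds of work for one message: one second per '.', as in Worker.doWork.
    static int cost(String message) {
        int dots = 0;
        for (char ch : message.toCharArray()) {
            if (ch == '.') {
                dots++;
            }
        }
        return dots;
    }

    // Round-robin: message i always goes to worker i mod n.
    static int roundRobinMakespan(String[] messages, int workers) {
        int[] busyUntil = new int[workers];
        for (int i = 0; i < messages.length; i++) {
            busyUntil[i % workers] += cost(messages[i]);
        }
        return Arrays.stream(busyUntil).max().getAsInt();
    }

    // Fair dispatch (approximation): each message goes to the worker that is free earliest.
    static int fairMakespan(String[] messages, int workers) {
        int[] busyUntil = new int[workers];
        for (String message : messages) {
            int next = 0;
            for (int w = 1; w < workers; w++) {
                if (busyUntil[w] < busyUntil[next]) {
                    next = w;
                }
            }
            busyUntil[next] += cost(message);
        }
        return Arrays.stream(busyUntil).max().getAsInt();
    }

    public static void main(String[] args) {
        String[] messages = newTaskMessages();
        System.out.println("round-robin: " + roundRobinMakespan(messages, 2) + " s");
        System.out.println("fair dispatch: " + fairMakespan(messages, 2) + " s");
    }
}
```

With two workers this model gives 15 seconds for round-robin, because one worker receives every three-dot task, and 9 seconds for fair dispatch.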

RabbitMQ exchange types

fanout

Broadcast mode. A queue only needs to be bound to the exchange; no routing key is required.
The producer sends a message to the exchange

package fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Sends a log message.
 */
public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange; BuiltinExchangeType.FANOUT makes it a broadcast exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "info: Hello World!";
        // The second argument is the routing key; a fanout exchange ignores it, so it is left empty
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: " + message);
        channel.close();
        connection.close();
    }
}

The consumer binds a queue to the exchange and consumes from it

package fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receives log messages.
 */
public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (fanout, i.e. broadcast)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange; the routing key is empty for fanout
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

direct

Direct mode. A message is routed to the queues whose binding key exactly matches the message's routing key.

package direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Sends messages through a direct exchange.
 */
public class EmitLogDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare a direct exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String message = "Hello World!";
        // Publish the same message once per log level; the level is the routing key
        channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level info, content: " + message);
        channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level warning, content: " + message);
        channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
        System.out.println("Sent message: level error, content: " + message);
        channel.close();
        connection.close();
    }
}

package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receives log messages of all three levels.
 */
public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (direct type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange three times, once per routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }

}
package direct;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receives log messages of a single level.
 */
public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (direct type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Bind the queue to the exchange with a single routing key
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }

}
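
To see which of the two receivers gets which log level, the exact-match rule of a direct exchange can be modelled in a few lines of plain Java. This is only a sketch under assumptions: the class DirectExchangeSketch and the queue names "allLevels" and "errorOnly" are invented here to stand for the temporary queues of ReceiveLogsDirect and ReceiveLogsDirect1, and nothing in it calls the RabbitMQ client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of direct-exchange routing (exact key match).
class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Mirrors the idea of channel.queueBind(queue, exchange, bindingKey).
    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message with this routing key.
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    // Bindings equivalent to the two receivers above.
    static DirectExchangeSketch example() {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("allLevels", "info");
        exchange.bind("allLevels", "warning");
        exchange.bind("allLevels", "error");
        exchange.bind("errorOnly", "error");
        return exchange;
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = example();
        for (String level : new String[] {"info", "warning", "error", "debug"}) {
            System.out.println(level + " -> " + exchange.route(level));
        }
    }
}
```

In this model "info" and "warning" reach only the three-key queue, "error" reaches both queues, and a key nobody bound, such as "debug", reaches none.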

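topic

Topic mode. The producer sets a routing key on each message, and the exchange forwards the message to every queue whose binding key pattern, specified on the consumer side, matches that routing key.
Pattern matching:
* stands for exactly one word
# stands for zero or more words
Routing keys in the example have the form speed.colour.species
Q1 is interested in all orange animals (binding key *.orange.*)
Q2 wants everything about rabbits, and everything about lazy animals (binding keys *.*.rabbit and lazy.#)
Example: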

package topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Sends messages through a topic exchange.
 */
public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String message = "Animal World!";

        // Routing keys to try, in the form speed.colour.species
        String[] routingKeys = {
                "quick.orange.rabbit",
                "lazy.orange.elephant",
                "quick.orange.fox",
                "lazy.brown.fox",
                "lazy.pink.rabbit",
                "quick.brown.fox",
                "orange",
                "quick.orange.male.rabbit",
                "lazy.orange.male.rabbit"
        };

        // Publish the same message once per routing key
        for (String routingKey : routingKeys) {
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println("Sent: " + message + " routingKey: " + routingKey);
        }

        channel.close();
        connection.close();
    }
}
package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receives messages matching one binding key pattern (Q1: all orange animals).
 */
public class ReceiveLogsTopic1 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Match any three-word key whose second word is "orange"
        String routingKey = "*.orange.*";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

package topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
 * Receives messages matching two binding key patterns (Q2: rabbits and lazy animals).
 */
public class ReceiveLogsTopic2 {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setUsername("silkage");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange (topic type)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // Create a temporary queue with a server-generated name
        String queueName = channel.queueDeclare().getQueue();
        // Match any three-word key whose last word is "rabbit"
        String routingKey = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
        // Match any key whose first word is "lazy", whatever follows
        String routingKey1 = "lazy.#";
        channel.queueBind(queueName, EXCHANGE_NAME, routingKey1);
        System.out.println("Waiting for messages");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("Received message: " + s + " routingKey: " + envelope.getRoutingKey());
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}
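
To predict which of the nine routing keys each receiver sees, the two wildcard rules can be written as a small word-by-word matcher. The code below is an illustrative sketch, not the broker's implementation: TopicMatchSketch and its methods are names invented here, and the matcher only assumes what the text states, that * stands for exactly one word and # for zero or more words.

```java
// Hypothetical stand-alone matcher for topic binding keys; it does not use the RabbitMQ client.
class TopicMatchSketch {

    // True if the routing key satisfies the binding key pattern.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    // Compares pattern words from index i against routing key words from index j.
    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;          // pattern used up: key must be used up too
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point
            for (int next = j; next <= words.length; next++) {
                if (match(pattern, i + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;                      // pattern still expects a word
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(words[j]);
        return wordMatches && match(pattern, i + 1, words, j + 1);
    }

    public static void main(String[] args) {
        String[] routingKeys = {
                "quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
                "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox",
                "orange", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"
        };
        for (String key : routingKeys) {
            boolean q1 = matches("*.orange.*", key);
            boolean q2 = matches("*.*.rabbit", key) || matches("lazy.#", key);
            System.out.println(key + " -> Q1: " + q1 + ", Q2: " + q2);
        }
    }
}
```

Under this model quick.orange.rabbit and lazy.orange.elephant reach both queues, quick.orange.fox reaches only Q1, lazy.brown.fox, lazy.pink.rabbit and lazy.orange.male.rabbit reach only Q2, and quick.brown.fox, orange and quick.orange.male.rabbit match no binding at all.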

headers

Headers mode. Messages are matched on the headers attribute of the message rather than on the routing key.
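
As a rough illustration of header matching, the sketch below compares a binding's arguments with a message's headers. It is a simplified model under stated assumptions, not the broker's code: the class HeadersMatchSketch, its helper mapOf, and the header names "format" and "type" are invented for this example. The only RabbitMQ detail relied on is the "x-match" binding argument, where "all" (the default) requires every listed header to match and "any" requires at least one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of headers-exchange matching; it does not use the RabbitMQ client.
class HeadersMatchSketch {

    // Small helper to build a map from alternating key/value arguments.
    static Map<String, Object> mapOf(Object... keyValues) {
        Map<String, Object> map = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            map.put((String) keyValues[i], keyValues[i + 1]);
        }
        return map;
    }

    // True if the message headers satisfy the binding arguments.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        boolean any = "any".equals(bindingArgs.get("x-match")); // anything else is treated as "all"
        int compared = 0;
        int matched = 0;
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().equals("x-match")) {
                continue; // the mode itself is not a header to compare
            }
            compared++;
            if (headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()))) {
                matched++;
            }
        }
        return any ? matched > 0 : matched == compared;
    }

    public static void main(String[] args) {
        Map<String, Object> message = mapOf("format", "pdf", "type", "log");
        Map<String, Object> all = mapOf("x-match", "all", "format", "pdf", "type", "report");
        Map<String, Object> any = mapOf("x-match", "any", "format", "pdf", "type", "report");
        System.out.println("all: " + matches(all, message));
        System.out.println("any: " + matches(any, message));
    }
}
```

Here the message shares only the "format" header with the bindings, so the "any" binding accepts it and the "all" binding rejects it.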

Integrating RabbitMQ with Spring Boot

Message producer

1. Add the RabbitMQ dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.4.1</version>
</dependency>

2. Configure the connection to RabbitMQ in the properties file

server.port=8080
spring.application.name=producer

spring.rabbitmq.addresses=ip:5672

spring.rabbitmq.username=silkage
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

3. Create the RabbitMQ configuration class

package com.slkage.springbootrabbitmqproducter;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/*
 * RabbitMQ configuration class: two queues bound to one topic exchange.
 */
@Configuration
public class TopicRabbitConfig {
    @Bean
    public Queue queue1() {
        return new Queue("queue1");
    }

    @Bean
    public Queue queue2() {
        return new Queue("queue2");
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("bootExchange");
    }

    // queue1 receives only messages whose routing key is exactly dog.red
    @Bean
    Binding bingdingExchangeMessage1(Queue queue1, TopicExchange exchange) {
        return BindingBuilder.bind(queue1).to(exchange).with("dog.red");
    }

    // queue2 receives every message whose routing key starts with the word dog
    @Bean
    Binding bingdingExchangeMessage2(Queue queue2, TopicExchange exchange) {
        return BindingBuilder.bind(queue2).to(exchange).with("dog.#");
    }
}

4. The producer sends messages

package com.slkage.springbootrabbitmqproducter;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/*
 * Sends messages.
 */
@Component
public class MsgSender {
    @Autowired
    private AmqpTemplate rabbitmqTemplate;

    // Routing key dog.red matches both bindings (dog.red and dog.#)
    public void send1() {
        String message = "This is message 1,routing key is dog.red";
        System.out.println("Sent: " + message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.red", message);
    }

    // Routing key dog.black matches only the dog.# binding
    public void send2() {
        String message = "This is message 2,routing key is dog.black";
        System.out.println("Sent: " + message);
        this.rabbitmqTemplate.convertAndSend("bootExchange", "dog.black", message);
    }
}

Message consumer

1. Add the dependency
2. Configure the connection to RabbitMQ in the properties file
3. The consumers consume messages

package com.silkage.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
 * Consumer 1: listens on queue1.
 */
@Component
@RabbitListener(queues = "queue1")
public class Receiver1 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Receiver1: " + message);
    }
}

package com.silkage.springbootrabbitmqconsumer;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/*
 * Consumer 2: listens on queue2.
 */
@Component
@RabbitListener(queues = "queue2")
public class Receiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Receiver2: " + message);
    }
}