RabbitMQ基础

RabbitMQ

工作原理

1、客户端连接到消息队列服务器,打开一个channel1、客户端连接到消息队列服务器,打开一个\text channel。

2、客户端声明一个exchange,并设置相关属性。2、客户端声明一个\text exchange,并设置相关属性。

3、客户端声明一个queue,并设置相关属性。3、客户端声明一个\text queue,并设置相关属性。

4、客户端使用routingkey,在exchange和queue之间建立好绑定关系。4、客户端使用\text routing key,在\text exchange和\text queue之间建立好绑定关系。

5、客户端投递消息到exchange5、客户端投递消息到\text exchange。

6、exchange接收到消息后,就根据消息的key和已经设由binding,进行消息路里,将消息投递到6、\text exchange接收到消息后,就根据消息的\text key和已经设由\text binding,进行消息路里,将消息投递到

一个或多个队列里一个或多个队列里

工作模式

helloword

helloworld

ProducerProducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static final String QUEUE_NAME = "ykexc_queue";
public static void main(String[] args) throws IOException, TimeoutException {

// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http协议
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 声明队列
// String queue, 队列名
// boolean durable, 持久化
// boolean exclusive, 排他的
// boolean autoDelete, 自动删除
// Map<String, Object> arguments 属性
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发消息
String msg = "hello world";
// String exchange, 交换机 ""表示默认交换机
// String routingKey, 路由键 也就是queue的名称
// AMQP.BasicProperties props, 属性
// byte[] body 消息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 关闭连接
channel.close();
connection.close();
}

ConsumerConsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static final String QUEUE_NAME = "ykexc_queue";
public static void main(String[] args) throws Exception{
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http协议
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);


// 监听队列
Consumer consumer = new DefaultConsumer(channel){
// String consumerTag 消费者标签
// Envelope envelop 信封, 保存许多信息
// AMQP.BasicProperties properties 属性
// byte[] body 消息字节数组
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("envelope = " + envelope);
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}

workqueues

workqueues

代码与helloworld类似,一个生产者生产,多个消费者消费代码与helloworld类似,一个生产者生产,多个消费者消费

Publish/Subscribe

publish/subscribe

订阅模型中,多了一个exchange角色,而且过程略有变化;订阅模型中,多了一个exchange角色,而且过程略有变化;

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)

C:消费者,消息的接受者,会一直等待消息到来。C:消费者,消息的接受者,会一直等待消息到来。

Queue:消息队列,接收消息、缓存消息。Queue:消息队列,接收消息、缓存消息。

Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,

例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange

的类型。Exchange有常见以下3种类型:的类型。Exchange有常见以下3种类型:

Fanout:广播,将消息交给所有绑定到交换机的队列Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routingkey的队列Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routingpattern(路由模式)的队列Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

producerproducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.test.publishsubscribe;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "publish_and_subscribe_queue1";

public static final String QUEUE_NAME_3 = "publish_and_subscribe_queue2";

public static final String EXCHANGE_NAME = "publish_and_subscribe_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, true, null);

// 绑定交换机
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "", null);
channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, "", null);

String message = "hello";

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
connection.close();
}
}

consumerconsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.test.publishsubscribe;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "publish_and_subscribe_queue1";

public static final String QUEUE_NAME_3 = "publish_and_subscribe_queue2";

public static final String EXCHANGE_NAME = "publish_and_subscribe_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
// String consumerTag 消费者标签
// Envelope envelop 信封, 保存许多信息
// AMQP.BasicProperties properties 属性
// byte[] body 消息字节数组
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("envelope = " + envelope);
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_2, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.test.publishsubscribe;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main2 {
public static final String QUEUE_NAME_2 = "publish_and_subscribe_queue1";

public static final String QUEUE_NAME_3 = "publish_and_subscribe_queue2";

public static final String EXCHANGE_NAME = "publish_and_subscribe_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
// String consumerTag 消费者标签
// Envelope envelop 信封, 保存许多信息
// AMQP.BasicProperties properties 属性
// byte[] body 消息字节数组
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag = " + consumerTag);
System.out.println("envelope = " + envelope);
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_3, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}

Routing

routing

publishsubscribe不同的是,exchange不一样,并且给每个队列绑定了routing_key与publish-subscribe不同的是,exchange不一样,并且给每个队列绑定了routing\_key

producerproducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.test.routing;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "routing_queue1";

public static final String QUEUE_NAME_3 = "routing_queue2";

public static final String EXCHANGE_NAME = "routing_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, true, null);

// 绑定交换机
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "warning");
String message = "hello";

channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
channel.close();
connection.close();
}
}

consumerconsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package com.test.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "routing_queue1";

public static final String QUEUE_NAME_3 = "routing_queue2";

public static final String EXCHANGE_NAME = "routing_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_2, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.test.routing;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main2 {
public static final String QUEUE_NAME_2 = "routing_queue1";

public static final String QUEUE_NAME_3 = "routing_queue2";

public static final String EXCHANGE_NAME = "routing_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_3, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}

Topics

topics

当存在多个绑定关系时,主题模式就会很方便,主题模式和routing模式的区别就是exchange类型不一样。当存在多个绑定关系时,主题模式就会很方便,主题模式和routing模式的区别就是exchange类型不一样。

producerproducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.test.topics;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "topic_queue1";

public static final String QUEUE_NAME_3 = "topic_queue2";

public static final String EXCHANGE_NAME = "topic_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, true, null);

// 绑定交换机
channel.queueBind(QUEUE_NAME_2, EXCHANGE_NAME, "*.work");
channel.queueBind(QUEUE_NAME_3, EXCHANGE_NAME, "ykexc.*");
String message = "hello ykexc-work";
String message2 = "hello xyx-work";
String message3 = "hello ykexc-xyx";
channel.basicPublish(EXCHANGE_NAME, "ykexc.work", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "xyx.work", null, message2.getBytes());
channel.basicPublish(EXCHANGE_NAME, "ykexc.xyx", null, message3.getBytes());
channel.close();
connection.close();
}
}

consumerconsumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//consumer1
package com.test.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main {
public static final String QUEUE_NAME_2 = "topic_queue1";

public static final String QUEUE_NAME_3 = "topic_queue2";

public static final String EXCHANGE_NAME = "topic_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_2, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
//consumer2
package com.test.topics;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
* @author mqz
*/
public class Main2 {
public static final String QUEUE_NAME_2 = "topic_queue1";

public static final String QUEUE_NAME_3 = "topic_queue2";

public static final String EXCHANGE_NAME = "topic_exchange";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); //不是http
connectionFactory.setVirtualHost("/"); // 设置虚拟主机
connectionFactory.setUsername("huya");
connectionFactory.setPassword("huya");
// 创建长连接
Connection connection = connectionFactory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 防止生产者还未启动队列
channel.queueDeclare(QUEUE_NAME_2, true, false, false, null);
channel.queueDeclare(QUEUE_NAME_3, true, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, true, true, null);
// 监听队列
Consumer consumer = new DefaultConsumer(channel) {
/**
* @param consumerTag the <i>consumer tag</i> associated with the consumer
* @param envelope packaging data for the message
* @param properties content header data for the message
* @param body the message body (opaque, client-specific byte array)
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message" + new String(body));
}
};
// String queue 监听队列名称
// boolean autoAck 是否自动应答
// Consumer callback //怎样处理这些消息
channel.basicConsume(QUEUE_NAME_3, true, consumer);

// channel.close(); consumer需要一直保持接收状态, 所以不能断开连接
// connection.close();
}
}

RabbitMQ基础
http://example.com/2023/03/07/RabbitMQ基础/
作者
ykexc
发布于
2023年3月7日
许可协议