signed

QiShunwang

“诚信为本、客户至上”

消息队列(针对RabbitMQ)

2021/1/28 11:45:25   来源:

一 MQ基础

主要针对RabbitMq,基础概念所有的mq 都类似

1.基础概念

​ MQ 全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。

简单架构模型如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bXm6fwzm-1611805091728)(image在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
\image-20210127115436373.png)]

2.MQ 优劣势

优势:
1.系统解耦:
	如果系统的耦合度过高,不利于 高可用的原则。通过mq解耦后,可以提升我们系统的容错性。
2.异步提速:
	对应原来的分布式系统来说,模块和模块之间是通过RPC或者http接口的方式调用的。这种服务调用的方式相对mq来讲,会比较耗时。假设有A、B两个系统.A系统的业务逻辑可能需要5S,B系统的逻辑可能需要5S,理想状态下,那么可能客户需要等待10S后,才可能收到前端页面的反馈。但是用了mq后,可能需要的就是前端模块请求A系统耗时5S后,将消息扔到消息队列里面,后续的执行步骤通过消息队列异步执行,首先增高了系统的让容错率,降低了系统的耦合度,并且客户可能在5S后就收到前端反馈的结果
	但是同时可能会降低系统的健壮性。如果系统过分依赖mq的话,如果出现mq集群全部宕机,可能会造成服务全线瘫痪。这一点系统架构设计也要考虑清楚。
3.削峰填谷:
	这一点主要针对一些电商的秒杀业务,可能会在同一瞬间,有超高的并发请求打进我们的系统,可能对系统性能造成影响。严重的话,可能会造成服务器的cpu过高,产生警告。
	使用mq后,可以设定消费的阈值。高峰期如果请求量超过阈值的时候,高峰期产生的数据会被挤压在消息队列里面.这样一来,高峰期的数据就被削掉了。等到高峰期一过后,消息的消费速度还是会保持阈值的速度消费,一直到全部消息被消费完成后,这就就做消息的填谷。
	总结一点,使用mq后,会增加web系统的稳定性。
劣势:
	1.系统可用性降低
		系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。
	2.系统复杂度提高
		MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。

二 市场常见MQ 产品

1 市场常见MQ 产品

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9ehYG00d-1611805091729)(image\image-20210127115419088.png)]

三 MQ协议

​ AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

​ 2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1 基础架构图如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XaHUqB6B-1611805091731)(image\image-20210127123217673.png)]

四 MQ应用概念 (RabbitMq 举例)

1.各个模块的简介:

  1. Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
  2. Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
  3. Connection:publisher/consumer 和 broker 之间的 TCP 连接
  4. Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  5. Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  6. Queue:消息最终被送到这里等待 consumer 取走
  7. Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

2.MQ 的工作模式

	RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
官网对应模式介绍: https://www.rabbitmq.com/getstarted.html

​ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1TqbCCe8-1611805091733)(image\image-20210127124654034.png)]

五 RabbitMq的安装和使用

1.rabbitmq 的安装

RabbitMQ 官方地址:http://www.rabbitmq.com/
网盘地址: https://pan.baidu.com/s/1jNLwwNJtV15GQGQI13jgqQ  5ip4    如果可以连外网无需下载这些软件,可以直接从外网下载
只针对linux服务器

安装环境以Centos7为主

rabbit mq 安装问题:

​ https://blog.csdn.net/wanghuawei19930812/article/details/103431777

1.激活网卡

cd /etc/sysconfig/network-scripts
vi ifcfg-eno16777736
修改ONBOOT 为yes
然后
service network restart

2.安装软件包

安装erlang
rpm -Uvh http://www.rabbitmq.com/releases/erlang/erlang-18.1-1.el7.centos.x86_64.rpm
安装RabbitMQ-Server
rpm -Uvh http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-3.5.6-1.noarch.rpm 

3.关闭SELINUX (可先执行第四步,如果出现相同报错回头执行第三步)

​ 因为我本地启动mq报错,所以我把这个工具关闭了

​ 问题:

【ERROR: distribution port 25672 in use on localhost (by non-Erlang process?)】
解决步骤: https://jingyan.baidu.com/article/6d704a13409b4128da51ca4e.html
	1.getenforce 查看状态是否为Enforcing。 如果是Enforcing 表示SELINUX 强制启用 
	2.vi /etc/selinux/config  修改SELINUX 的值为 disabled
	3.reboot
	4.查看getenforce 状态是否为 disabled

4.启动rabbitmq

service rabbitmq-server start
service rabbitmq-server restart
service rabbitmq-server stop
rabbitmqctl status  # 查看状态

5.安装维护插件

rabbitmq-plugins enable rabbitmq_management

6.启动成功

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-liStGib3-1611805091734)(image\image-20210127151521952.png)]

7.rabbitmq创建账号 赋权

(1) 新增一个用户
rabbitmqctl  add_user  Username  Password
(2) 删除一个用户
rabbitmqctl  delete_user  Username
(3) 修改用户的密码
rabbitmqctl  change_password  Username  Newpassword
(4) 查看当前用户列表
rabbitmqctl  list_users
// User为用户名, Tag为角色名(对应于上面的administrator,monitoring,policymaker,management,或其他自定义名称)。
//超级管理员(administrator) 监控者(monitoring) 策略制定者(policymaker)  普通管理者(management) 其他   Tag
rabbitmqctl  set_user_tags  User  Tag

(1) 设置用户权限
rabbitmqctl  set_permissions  -p  VHostPath  User  ConfP  WriteP  ReadP
(2) 查看(指定hostpath)所有用户的权限信息
rabbitmqctl  list_permissions  [-p  VHostPath]
(3) 查看指定用户的权限信息
rabbitmqctl  list_user_permissions  User
(4)  清除用户的权限信息
rabbitmqctl  clear_permissions  [-p VHostPath]  User

8.本地访问需要开放端口,rabbitmq页面的端口为 15672 应用的端口为5672

firewall-cmd --add-port=15672/tcp --permanent
firewall-cmd --reload

firewall-cmd --add-port=5672/tcp --permanent
firewall-cmd --reload

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JE3QrTvj-1611805091735)(image\image-20210127152154866.png)]

2.原生rabbitmq的使用

(1) 原生api调用 需要创建 Virtual host 和 对应用户
//1.创建mq的连接util 类
    public class RabbitUtil {
	private static ConnectionFactory connectionFactory = new ConnectionFactory ();
	static {
		connectionFactory.setHost ("192.168.232.146");
		connectionFactory.setPort (5672);
		connectionFactory.setUsername ("test");
		connectionFactory.setPassword ("123456");
		connectionFactory.setVirtualHost ("virtual");
	}

	public static Connection getConnection() {
		Connection connection = null;
		try {
			connection = connectionFactory.newConnection();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (TimeoutException e) {
			e.printStackTrace();
		}
		return connection;
	}
}

//2.测试
public class TestDemo {
	@Test
	public void test(){
		Connection connection = RabbitUtil.getConnection ();
		System.out.println (connection);
	}
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3n5BSe9s-1611805091735)(image\image-20210127155356791.png)]

2.hello world 模式 一对一模式

​ 生产者–> message Queue --> 消费者

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AVjMd1GT-1611805091736)(image\image-20210127160632764.png)]

在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息

实现代码:

//创建队列接口  页面创建 hello_world 队列
public interface QueueName {
   String HELLO_WORLD_QUEUE = "hello_world";
}

//生产者
public class RabbitMqProvider {
	public static void main (String[] args) throws Exception{
		//创建mq连接
		Connection connection = RabbitUtil.getConnection ();
		//创建连接通道
		Channel channel = connection.createChannel ();
		//创建队列,声明并创建一个队列,如果队列已存在,则使用这个队列
		//第一个参数:队列名称ID
		//第二个参数:是否持久化,false对应不持久化数据,MQ停掉数据就会丢失
		//第三个参数:是否队列私有化,false则代表所有消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用,其他消费者不让访问
		//第四个:是否自动删除,false代表连接停掉后不自动删除掉这个队列
		//其他额外的参数, null
		channel.queueDeclare(QueueName.HELLO_WORLD_QUEUE, true, false, false, null);

		for (int i = 0; i < 10; i++) {
			//四个参数
			//exchange 交换机,暂时用不到,在后面进行发布订阅时才会用到
			//队列名称
			//额外的设置属性
			//最后一个参数是要传递的消息字节数组
			channel.basicPublish ("", QueueName.HELLO_WORLD_QUEUE, null, ("hello rabbitmq" + i).getBytes ());
		}
		channel.close ();
		connection.close ();
		System.out.println ("数据发送成功");
	}
}

//消费者
public class RabbitMqCustomer {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.queueDeclare(QueueName.HELLO_WORLD_QUEUE, true, false, false, null);
		Consumer consumer = new DefaultConsumer (channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String str = new String(body, "UTF-8");
				System.out.println(str);
				//envelope.getDeliveryTag() 获取这个消息的TagId
				//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QueueName.HELLO_WORLD_QUEUE, consumer);
	}
}
3.work queue 模式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7uD76NG8-1611805091736)(image\image-20210127164020726.png)]

Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。
//创建word_queue队列名称
public interface QueueName {
	String HELLO_WORLD_QUEUE = "hello_world";
	String WORD_QUEUE = "word_queue";
}

//提供者
public class RabbitProviderQueue {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.queueDeclare (QueueName.WORD_QUEUE, false, false, false, null);
		for (int i = 0; i < 100; i++) {
			channel.basicPublish ("", QueueName.WORD_QUEUE, null, ("word queue" + i).getBytes ());
		}
		channel.close ();
		connection.close ();
		System.out.println ("数据发送成功");
	}
}

//消费者1
public class RabbitCustomerQueue1 {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.basicQos (1);
		Consumer consumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String str = new String(body, "UTF-8");
				System.out.println(str);
				//envelope.getDeliveryTag() 获取这个消息的TagId
				//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QueueName.WORD_QUEUE, consumer);
	}
}
//消费者2
public class RabbitCutomerQueue2 {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.basicQos (1);
		Consumer consumer = new DefaultConsumer (channel){
			@Override
			public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String str = new String(body, "UTF-8");
				System.out.println(str);
				//envelope.getDeliveryTag() 获取这个消息的TagId
				//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicConsume(QueueName.WORD_QUEUE, consumer);
	}
}
4.发布订阅模式

​ [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2OPvpdNv-1611805091737)(image\image-20210127165619484.png)]

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
public interface QueueName {
	String HELLO_WORLD_QUEUE = "hello_world";
	String WORD_QUEUE = "word_queue";
	String PUB_SUB_EXCHANGE = "pub_sub_exchange";
	String PUB_SUB_QUEUE = "pub_sub_queue";
}

//提供者
public class PubSubProvider {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		String next = new Scanner (System.in).next ();
		//第一个参数 交换机名字
		//第二个参数 路由key  目前用不到
		//第三个参数 格外属性
		//第四个参数 消息
		channel.basicPublish (QueueName.PUB_SUB_EXCHANGE, "", null, next.getBytes ());
		channel.close ();
		connection.close ();
	}
}

//消费者
public class PubSubConsumer {
	public static void main(String[] args) throws IOException {
		Connection connection = RabbitUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QueueName.PUB_SUB_QUEUE, true, false, false, null);
		channel.queueBind(QueueName.PUB_SUB_QUEUE, QueueName.PUB_SUB_EXCHANGE, "");
		channel.basicQos(1);
		DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String string = new String(body, "UTF-8");
				channel.basicAck(envelope.getDeliveryTag(), false);
				System.out.println(string);
			}
		};
		channel.basicConsume(QueueName.PUB_SUB_QUEUE, defaultConsumer);
	}
}
5.路由模式
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
用法与发布订阅一致,不过加了路由绑定的key
public interface QueueName {
	String HELLO_WORLD_QUEUE = "hello_world";
	String WORD_QUEUE = "word_queue";
	String PUB_SUB_EXCHANGE = "pub_sub_exchange";
	String PUB_SUB_QUEUE = "pub_sub_queue";
	String ROUTE_BIND_EXCHANGE = "route_bind_exchange";
	String ROUTE_BIND_QUEUE = "route_bind_queue";
}

//提供者
public class RouteBindProvider {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		String next = new Scanner (System.in).next ();
		//第一个参数 交换机名字
		//第二个参数 路由key  目前用不到
		//第三个参数 额外属性
		//第四个参数 消息
		channel.basicPublish (QueueName.ROUTE_BIND_EXCHANGE, "key3", null, next.getBytes ());
		channel.close ();
		connection.close ();
	}
}
//消费者
public class RouteBindCustomer {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection();
		Channel channel = connection.createChannel();
		channel.queueDeclare(QueueName.ROUTE_BIND_QUEUE, true, false, false, null);
		channel.queueBind(QueueName.ROUTE_BIND_QUEUE, QueueName.ROUTE_BIND_EXCHANGE, "key2");
		channel.basicQos(1);
		DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String string = new String(body, "UTF-8");
				channel.basicAck(envelope.getDeliveryTag(), false);
				System.out.println(string);
			}
		};
		channel.basicConsume(QueueName.ROUTE_BIND_QUEUE, defaultConsumer);
	}
}
6.Topic 模式
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert 
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B7Iuyxdi-1611805091737)(image\image-20210127172827263.png)]

图解:
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
public interface QueueName {
	String HELLO_WORLD_QUEUE = "hello_world";
	String WORD_QUEUE = "word_queue";
	String PUB_SUB_EXCHANGE = "pub_sub_exchange";
	String PUB_SUB_QUEUE = "pub_sub_queue";
	String ROUTE_BIND_EXCHANGE = "route_bind_exchange";
	String ROUTE_BIND_QUEUE = "route_bind_queue";
	String TOPIC_EXCHANGE = "topic_exchange";
	String TOPIC_QUEUE = "topic_queue";
}

//提供者
public class TopicProvider {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		Scanner scanner = new Scanner (System.in);
		channel.basicPublish (QueueName.TOPIC_EXCHANGE, "topic.a.b", null, scanner.next ().getBytes ());
		channel.close ();
		connection.close ();
	}
}

//消费者1
public class TopicConsumer {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.queueDeclare (QueueName.TOPIC_QUEUE, true, false, false, null);
		channel.queueBind (QueueName.TOPIC_QUEUE, QueueName.TOPIC_EXCHANGE, "topic.*");
		Consumer consumer = new DefaultConsumer (channel) {
			@Override
			public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String string = new String(body, "UTF-8");
				System.out.println(string);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicQos (1);
		channel.basicConsume (QueueName.TOPIC_QUEUE, consumer);
	}
}
//消费者2
public class TopicConsumer1 {
	public static void main (String[] args) throws Exception{
		Connection connection = RabbitUtil.getConnection ();
		Channel channel = connection.createChannel ();
		channel.queueDeclare (QueueName.TOPIC_QUEUE, true, false, false, null);
		channel.queueBind (QueueName.TOPIC_QUEUE, QueueName.TOPIC_EXCHANGE, "topic.#");
		Consumer consumer = new DefaultConsumer (channel) {
			@Override
			public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
				String string = new String(body, "UTF-8");
				System.out.println(string);
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		channel.basicQos (1);
		channel.basicConsume (QueueName.TOPIC_QUEUE, consumer);
	}
}
总结rabbitmq 模式

1.简单模式 HelloWorld

一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)。

2.工作队列模式 Work Queue

一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)。

3.发布订阅模式 Publish/subscribe

需要设置类型为 fanout 的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

4.路由模式 Routing

需要设置类型为 direct 的交换机,交换机和队列进行绑定,并且指定 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

5.通配符模式 Topic

需要设置类型为 topic 的交换机,交换机和队列进行绑定,并且指定通配符方式的 routing key,当发送消息到交换机后,交换机会根据 routing key 将消息发送到对应的队列。

git代码地址: https://github.com/zxf-json/-rabbitmq.git

3.rabbitmq 和 springboot 整合

使用idea 创建一个新的工程,名字定义为 rabbitspringbootdemo

接下来导入rabbitmq的依赖

rabbitmq 依赖
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
    
使用idea 创建提供者工程 和 消费者工程
rabbitprovider
rabbitconsumer

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Z7BMxAOH-1611805091738)(image\image-20210128100215472.png)]

1.hello world 模式 其他模式就是在rabbitTemplate 提供者侧调用不同的api ,这边就不做赘述了
1.生产者:
//pom 文件
    <dependencies>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    </dependencies>

//yml文件
  server:
  port: 8081
spring:
  application:
    name: springboot-rabbitmq
  rabbitmq:
    port: 5672
    host: 192.168.232.146
    username: test
    password: 123456
    virtual-host: virtual

//配置类:
@Configuration
public class WebConfiguration {
	@Bean
	public Queue getQueue(){
		return new Queue ("hello_world_springboot");
	}
}

//provider
@Component
public class RabbitMqProvider {
	@Autowired
	private RabbitTemplate rabbitTemplate;

	public void send (String msg) {
		rabbitTemplate.convertAndSend ("hello_world_springboot", msg);
	}
}

//测试类
@SpringBootTest(classes = RabbitproviderApplication.class)
@RunWith(SpringRunner.class)
public class TestProvider {
	@Autowired
	private RabbitMqProvider rabbitMqProvider;

	@Test
	public void send(){
		String msg = "sender " + LocalDate.now();
		rabbitMqProvider.send (msg);
	}
}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8vdhAfmk-1611805091738)(image\image-20210128104203538.png)]

2.消费者:
//pom文件
	<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
        <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

//yml文件
server:
  port: 8082
spring:
  application:
    name: springboot-rabbitmq-consumer
  rabbitmq:
    port: 5672
    host: 192.168.232.146
    username: test
    password: 123456
    virtual-host: virtual
//配置文件
@Configuration
public class WebConfiguration {
	@Bean
	public Queue getQueue(){
		return new Queue ("hello_world_springboot");
	}
}
//消费者
@Component
public class ConsumerMessage {

	@RabbitListener(queues = {"hello_world_springboot"})
	@RabbitHandler
	public void process(String json, Channel channel, Message message) throws IOException {
		System.out.println("DirectReceiver消费者收到消息  : " + message.getBody ());
		System.out.println (json);
	}
}

六 RabbitMq 高级特性

RabbitMQ高级特性
消息可靠性投递
Consumer ACK
消费端限流
TTL
死信队列
延迟队列

1.消息的可靠性投递
在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
    confirm 确认模式
    return  退回模式
    
rabbitmq 整个消息投递的路径为:
producer--->rabbitmq broker--->exchange--->queue--->consumer
消息从 producer 到 exchange 则会返回一个 confirmCallback 。
消息从 exchange-->queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递

消息的可靠投递小结
设置ConnectionFactory的publisher-confirms="true" 开启 确认模式。

使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack,如果为true,则发送成功,如果为false,则发送失败,需要处理
。
设置ConnectionFactory的publisher-returns="true" 开启 退回模式。

使用rabbitTemplate.setReturnCallback设置退回函数,当消息从exchange路由到queue失败后,如果设置了
rabbitTemplate.setMandatory(true)参数,则会将消息退回给producer。并执行回调函数returnedMessage。

在RabbitMQ中也提供了事务机制,但是性能较差,此处不做讲解。
使用channel下列方法,完成事务控制:
txSelect(), 用于将当前channel设置成transaction模式
txCommit(),用于提交事务
txRollback(),用于回滚事务
2.Consumer Ack
ack指Acknowledge,确认。 表示消费端收到消息后的确认方式。
有三种确认方式:
自动确认:acknowledge="none"
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。如果设置了手动确认方式,则需要在业务处理成功后,调用channel.basicAck(),手动签收,如果出现异常,则调用channel.basicNack()方法,让其自动重新发送消息。

Consumer Ack 小结
在rabbit:listener-container标签中设置acknowledge属性,设置ack方式 none:自动确认,manual:手动确认
如果在消费端没有出现异常,则调用channel.basicAck(deliveryTag,false);方法确认签收消息
如果出现异常,则在catch中调用 basicNack或 basicReject,拒绝消息,让MQ重新发送消息。
3.消费端限流
在<rabbit:listener-container> 中配置 prefetch属性设置消费端一次拉取多少消息

消费端的确认模式一定为手动确认。acknowledge="manual"
4.TTL
TTL 全称 Time To Live(存活时间/过期时间)。
当消息到达存活时间后,还没有被消费,会被自动清除。
RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

TTL 小结
设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mrVm5xWI-1611805091739)(image\image-20210128112703707.png)]

5.死信队列
死信队列,英文缩写:DLX  。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

消息成为死信的三种情况:
1. 队列消息长度到达限制;
2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3. 原队列存在消息过期设置,消息到达超时时间未被消费;

队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

死信队列小节
1. 死信交换机和死信队列和普通的没有区别
2. 当消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
3. 消息成为死信的三种情况:
    (1). 队列消息长度到达限制;
    (2). 消费者拒接消费消息,并且不重回队列;
    (3). 原队列存在消息过期设置,消息到达超时时间未被消费;

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5gbI3hpM-1611805091739)(image\image-20210128112816121.png)]

延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

需求:
1. 下单后,30分钟未支付,取消订单,回滚库存。
2. 新用户注册成功7天后,发送短信问候。

实现方式:
1. 定时器
2. 延迟队列

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-O2SURiNO-1611805091739)(image\image-20210128113024058.png)]

很可惜,在RabbitMQ中并未提供延迟队列功能。
但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vv1BRgrc-1611805091740)(image\image-20210128113057240.png)]

七 RabbitMq集群

	一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了,但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑,在生产环境上一般都会考虑使用RabbitMQ的集群方案。
1.1 集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写,Erlang语言天生具备分布式特性(通过同步Erlang集群各节点的cookie来实现)。RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-65eA9r4Y-1611805091740)(image\image-20210128113237423.png)]

1.2 如下案例中使用多台云服务器进行集群搭建
主要参考官方文档:https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运行没有问题
1 #修改hostname
2 vim /etc/hostname
3 m1
4 m2
5 #修改hosts集群设备
6 vim /etc/hosts
7 192.168.132.137 m1
8 192.168.132.139 m2
9
10 #开放防火墙 4369/5672/15672/25672端口
11 firewall‐cmd ‐‐zone=public ‐‐add‐port=4369/tcp ‐‐permanent
12 firewall‐cmd ‐‐zone=public ‐‐add‐port=5672/tcp ‐‐permanent
13 firewall‐cmd ‐‐zone=public ‐‐add‐port=15672/tcp ‐‐permanent
14 firewall‐cmd ‐‐zone=public ‐‐add‐port=25672/tcp ‐‐permanent
15
16 #重载防火墙
17 firewall‐cmd ‐‐reload
18
19 #重启服务器
20 reboot
21
22 #同步.erlang.coolie
23 find / ‐name *.cookie
24
25 #将文件发送至指定ip的服务器中,发送的过程中需要指定另一台服务器的密码信息
26 scp /var/lib/rabbitmq/.erlang.cookie 192.168.132.134:/var/lib/rabbitmq/
27
28 #两个电脑启动MQ服务
29 rabbit‐server
30
31
32 # 停止应用 通过rabbitmqctl status 可以查看当前rabbitmactl默认操作的节点信息
33 rabbitmqctl stop_app
34
35 # 将当前节点加入到一个集群中 默认磁盘节点被加入的节点只要是集群中的一员,其他节
点都能够马上感受到集群节点的变化
36 rabbitmqctl join_cluster rabbit@m1
37
38 # 重新启动当前节点
39 rabbitmqctl start_app
40
41 #查看集群信息
42 rabbitmqctl cluster_status
1.5 负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是
免费、快速并且可靠的一种解决方案,包括Twitter,Reddit,StackOverflow,GitHub在内
的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型,此模型支
持非常大的并发连接数。
1.5.1 安装HAProxy
1 #安装
2 yum install haproxy
3
4
5 #检测安装是否成功
6 haproxy
7
8 #查找haproxy.cfg文件的位置
9 find / ‐name haproxy.cfg
10
11 #配置haproxy.cfg文件 具体参照 如下 1.5.2 配置HAProxy
12 vim /etc/haproxy/haproxy.cfg
13
14
15 #启动haproxy
16 haproxy ‐f /etc/haproxy/haproxy.cfg
17
18 #查看haproxy进程状态
19 systemctl status haproxy.service
20 #状态如下说明 已经启动成功 Active: active (running)
21
22 #访问如下地址对mq节点进行监控
23 http://47.114.175.29:1080/haproxy_stats
24
25 #代码中访问mq集群地址,则变为访问haproxy地址:5672
1.5.2 配置HAProxy
配置文件路径:/etc/haproxy/haproxy.cfg
1 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
2 # Example configuration for a possible web application. See the
3 # full configuration options online.
4 #
5 # http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
6 #
7 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
8
9 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
10 # Global settings
11 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
12 global
13 # to have these messages end up in /var/log/haproxy.log you will
14 # need to:
15 #
16 # 1) configure syslog to accept network log events. This is done
17 # by adding the '‐r' option to the SYSLOGD_OPTIONS in
18 # /etc/sysconfig/syslog
19 #
20 # 2) configure local2 events to go to the /var/log/haproxy.log
21 # file. A line like the following can be added to
22 # /etc/sysconfig/syslog
23 #
24 # local2.* /var/log/haproxy.log
25 #
26 log 127.0.0.1 local2
27
28 chroot /var/lib/haproxy
29 pidfile /var/run/haproxy.pid
30 maxconn 4000
31 user haproxy
32 group haproxy
33 daemon
34
35 # turn on stats unix socket
36 stats socket /var/lib/haproxy/stats
37
38 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
39 # common defaults that all the 'listen' and 'backend' sections will
40 # use if not designated in their block
41 #‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐‐
42 defaults
43 mode http
44 log global
45 option httplog
46 option dontlognull
47 option http‐server‐close
48 option forwardfor except 127.0.0.0/8
49 option redispatch
50 retries 3
51 timeout http‐request 10s
52 timeout queue 1m
53 timeout connect 10s
54 timeout client 1m
55 timeout server 1m
56 timeout http‐keep‐alive 10s
57 timeout check 10s
58 maxconn 3000
59
60
61 #对MQ集群进行监听
62 listen rabbitmq_cluster
63 bind 0.0.0.0:5672
64 option tcplog
65 mode tcp
66 option clitcpka
67 timeout connect 1s
68 timeout client 10s
69 timeout server 10s
70 balance roundrobin
71 server node1 节点1 ip地址:5672 check inter 5s rise 2 fall 3
72 server node2 节点2 ip地址:5672 check inter 5s rise 2 fall 3
73
74 #开启haproxy监控服务
75 listen http_front
76 bind 0.0.0.0:1080
77 stats refresh 30s
78 stats uri /haproxy_stats
79 stats auth admin:admin

八 RabbitMq的常见面试题

1 如何保证mq的幂等性?
	首先解释一下什么叫mq幂等性和非等幂性。
	幂等性:消息提供者可能每天都会多次发送消息到mq,不论某个环节出现了什么情况,都要保证消息只被消费一次。
	非幂等性:提供者发送消息后,消费端给生产端返回ack标记是发生网络中断,导致生产者没有收到消费端完成消费的信息,导致生产者重新发送此条消息,导致消息重复消费。 或者是消费端完成消费后给mq发送ack时,出现网络中断,导致生产者没有收到消费已经被消费的结果,导致消息多次消费的问题。
	解决方案:
		1.mq内部会为每条消息生成一个全局唯一、与业务无关的消息id,当mq接收到消息时,会先根据该id判断消息是否重复发送,mq再决定是否接收该消息。
2.Rabbitmq的脑裂问题?
	RabbitMQ 集群的网络分区容错性并不高,在网络质量较差的环境中会比较容易出现问题,其中最明显的就是脑裂问题
	所谓的脑裂问题,就是在多机热备的高可用 HA 系统中,当两个节点心跳突然断开,就分裂为了两个独立的个体,由于互相失去联系,都认为对方出现了故障,因此都会去争抢对方的资源,争抢启动,由此就会发生严重的后果
    举个形象的例子,A 和 B 作为一个双机热备集群的两个节点,各自持有集群的一部分数据 -- a 和 b,这时,两机器之间突然无法通信,A 认为 B 已经挂掉,B 认为 A 已经宕机,于是会出现:
    如果 A 拥有 b 数据的备份,则 A 将以完整数据运行,B 也同样以完整数据运行,这将很可能导致两个节点同时读写共享数据造成数据损坏
    如果 A、B 各自仅拥有 a、b 数据,那么两个节点要么均无法启动,要么以瓜分完整共享数据的方式启动
    总之,无论是哪一种,都不是我们希望见到的
解决方案:
    RabbitMQ 提供了三种配置:
    ignore:默认配置,发生网络分区时不作处理,当认为网络是可靠时选用该配置
    autoheal:各分区协商后重启客户端连接最少的分区节点,恢复集群(CAP 中保证 AP,有状态丢失)
    pause_minority:分区发生后判断自己所在分区内节点是否超过集群总节点数一半,如果没有超过则暂停这些节点(保证 CP,总节点数为奇数个)
3.mq的优劣势?
	可以参考上面的mq优劣势...   

ats auth admin:admin




### **八 RabbitMq的常见面试题**

```text
1 如何保证mq的幂等性?
	首先解释一下什么叫mq幂等性和非等幂性。
	幂等性:消息提供者可能每天都会多次发送消息到mq,不论某个环节出现了什么情况,都要保证消息只被消费一次。
	非幂等性:提供者发送消息后,消费端给生产端返回ack标记是发生网络中断,导致生产者没有收到消费端完成消费的信息,导致生产者重新发送此条消息,导致消息重复消费。 或者是消费端完成消费后给mq发送ack时,出现网络中断,导致生产者没有收到消费已经被消费的结果,导致消息多次消费的问题。
	解决方案:
		1.mq内部会为每条消息生成一个全局唯一、与业务无关的消息id,当mq接收到消息时,会先根据该id判断消息是否重复发送,mq再决定是否接收该消息。
2.Rabbitmq的脑裂问题?
	RabbitMQ 集群的网络分区容错性并不高,在网络质量较差的环境中会比较容易出现问题,其中最明显的就是脑裂问题
	所谓的脑裂问题,就是在多机热备的高可用 HA 系统中,当两个节点心跳突然断开,就分裂为了两个独立的个体,由于互相失去联系,都认为对方出现了故障,因此都会去争抢对方的资源,争抢启动,由此就会发生严重的后果
    举个形象的例子,A 和 B 作为一个双机热备集群的两个节点,各自持有集群的一部分数据 -- a 和 b,这时,两机器之间突然无法通信,A 认为 B 已经挂掉,B 认为 A 已经宕机,于是会出现:
    如果 A 拥有 b 数据的备份,则 A 将以完整数据运行,B 也同样以完整数据运行,这将很可能导致两个节点同时读写共享数据造成数据损坏
    如果 A、B 各自仅拥有 a、b 数据,那么两个节点要么均无法启动,要么以瓜分完整共享数据的方式启动
    总之,无论是哪一种,都不是我们希望见到的
解决方案:
    RabbitMQ 提供了三种配置:
    ignore:默认配置,发生网络分区时不作处理,当认为网络是可靠时选用该配置
    autoheal:各分区协商后重启客户端连接最少的分区节点,恢复集群(CAP 中保证 AP,有状态丢失)
    pause_minority:分区发生后判断自己所在分区内节点是否超过集群总节点数一半,如果没有超过则暂停这些节点(保证 CP,总节点数为奇数个)
3.mq的优劣势?
	可以参考上面的mq优劣势...