博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列知识储备
阅读量:4034 次
发布时间:2019-05-24

本文共 6539 字,大约阅读时间需要 21 分钟。

学习消息队列

1.背景

​ 学习,不懂得就要学。

2.定义

  • 消息队列可以简单理解为:把要传输的数据放在队列中
    • 把数据放到消息队列叫做生产者
    • 从消息队列里边取数据叫做消费者

3.为什么要用消息队列

3.1为了解耦。

例子:

[外链图片转存失败,源站可能有在这里插入图片描述

系统A将userId写到消息队列中,系统C和系统D从消息队列中拿数据。这样有什么好处

  • 系统A只负责把数据写到队列中,谁想要或不想要这个数据(消息),系统A一点都不关心
  • 即便现在系统D不想要userId这个数据了,系统B又突然想要userId这个数据了,都跟系统A无关,系统A一点代码都不用改。
  • 系统D拿userId不再经过系统A,而是从消息队列里边拿。系统D即便挂了或者请求超时,都跟系统A无关,只跟消息队列有关

这样一来,系统A与系统B、C、D都解耦

3.2为了异步,提高性能

在这里插入图片描述

生产者生产完放入队列就去干别的事情。

3.3为了限流

在这里插入图片描述

生产者生产多少就在队列中放多少,消费者消费多少就在队列中拿多少。

4.使用消息队列要考虑什么问题

4.1高可用

项目中使用消息队列,都是得集群/分布式的。要做集群/分布式就必然希望该消息队列能够提供现成的支持,而不是自己写代码手动去实现。

4.2数据丢失问题

  • 写到消息队列上,系统B和C还没来得及取消息队列的数据,就挂掉了。如果没有做任何的措施,我们的数据就丢了。消息队列中的数据也需要存在别的地方,这样才尽可能减少数据的丢失

丢消息的原因

绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的

消息队列怎么保证消息可靠传递的

  • 利用消息队列的有序性来验证是否有消息丢失

    在 Producer 端,我们给每个发出的消息附加一个连续递增的序号,然后在 Consumer 端来检查这个序号的连续性。如果没有消息丢失,Consumer 收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号 +1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
  • 确保消息可靠传递

    在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到 Broker,Broker 收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。	注意,异步发送时,则需要在回调方法里进行检查
在存储消息的队列中,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。	对于单个节点的 Broker,需要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在 RocketMQ 中,需要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。	如果是 Broker 是由多个节点组成的集群,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失
在消费阶段,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。如果 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认

4.3消费者怎么得到消息队列的数据

  • 生产者将数据放到消息队列中,消息队列有数据了,主动叫消费者去拿(俗称push)
  • 消费者不断去轮训消息队列,看看有没有新的数据,如果有就消费(俗称pull)

4.4消息重复消费后如何保证幂等性

​ 其实消息重复消费的主要原因在于回馈机制(RabbitMQ是ack,Kafka是offset),在某些场景中我们采用的回馈机制不同,原因也不同,例如消费者消费完消息后回复ack, 但是刚消费完还没来得及提交系统就重启了,这时候上来就pull消息的时候由于没有提交ack或者offset,消费的还是上条消息。

​ 那么如何怎么来保证消息消费的幂等性呢?实际上我们只要保证多条相同的数据过来的时候只处理一条或者说多条处理和处理一条造成的结果相同即可,但是具体怎么做要根据业务需求来定,例如入库消息,先查一下消息是否已经入库啊或者说搞个唯一约束啊什么的,还有一些是天生保证幂等性就根本不用去管,例如redis就是天然幂等性。

1.比如某个数据要写库,你先根据主键查一下,如果数据有了,就别插入了,update一下好吧2.比如你是写redis,那没问题了,反正每次都是set,天然幂等性3.对于消息,我们可以建个表(专门存储消息消费记录)生产者,发送消息前判断库中是否有记录(有记录说明已发送),没有记录,先入库,状态为待消费,然后发送消息并把主键id带上。消费者,接收消息,通过主键ID查询记录表,判断消息状态是否已消费。若没消费过,则处理消息,处理完后,更新消息记录的状态为已消费。

4.5数据一致性问题

生产者与消费者之间的一致性问题。

比如,订单系统,建立订单后通过消息队列向购物车系统发送消息,如何避免创建订单却没有删除购物车,或者订单没有创建成功,但是购物车商品却被清掉了。 所以,创建订单和发送消息这两个步骤要么都操作成功,要么都操作失败,不允许一个成功而另一个失败的情况出现。

这个问题靠事务解决

什么是事务?

  • 如果我们需要对若干数据进行更新操作,为了保证这些数据的完整性和一致性,我们希望这些更新操作要么都成功,要么都失败

    这就是事务要解决的问题。

    一个严格意义的事务实现,应该具有 4 个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为 ACID 特性。原子性,是指一个事务操作不可分割,要么成功,要么失败,不能有一半成功一半失败的情况。一致性,是指这些数据在事务执行完成这个时间点之前,读到的一定是更新前的数据,之后读到的一定是更新后的数据,不应该存在一个时刻,让用户读到更新过程中的数据。隔离性,是指一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对正在进行的其他事务是隔离的,并发执行的各个事务之间不能互相干扰,这个有点儿像我们打网游中的副本,我们在副本中打的怪和掉的装备,与其他副本没有任何关联也不会互相影响。持久性,是指一个事务一旦完成提交,后续的其他操作和故障都不会对事务的结果产生任何影响。
  • 分布式系统中很难实现严格的一致性。

    例子:

在这里插入图片描述

半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的

第3步成功就提交信息,失败就回滚信息。妙啊~

  • 对于上图中万一提交时失败该怎么办? kafka是直接返回异常,让程序员自己处理…

4.6如何避免消息队列的积压

优化性能

  • 优化生产者和消费者的业务逻辑

    对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展 Broker 的实例数成倍地提升处理能力。

    我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。

  • 优化生产者:

    • 一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。

      对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。

  • 优化消费者

    使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。

    所以,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。

    消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容 Consumer 的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保 Consumer 的实例数和分区数量是相等的。如果 Consumer 的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。

    误区

    为了避免消息积压,在收到消息的 OnMessage 方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个 Consumer 不能并行消费的问题。  这是一个错误的思路,因为会造成消息丢失!

排查消息积压

  • 能导致积压突然增加,最粗粒度的原因,只有两种:要么是发送变快了,要么是消费变慢了
  • 发送快,通过扩容消费端的实例数来提升总体的消费能力。如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。
  • 你通过监控发现,无论是发送消息的速度还是消费消息的速度和原来都没什么变化,这时候你需要检查一下你的消费端,是不是消费失败导致的一条消息反复消费这种情况比较多,这种情况也会拖慢整个系统的消费速度
  • 监控到消费变慢了,需要检查消费实例,分析一下是什么原因导致消费变慢。优先检查一下日志是否有大量的消费错误,如果没有错误的话,可以通过打印堆栈信息,看一下消费线程是不是卡在什么地方不动了,比如触发了死锁或者卡在等待某些资源上了

5.参考链接

  • https://www.zhihu.com/question/54152397?sort=created
  • https://blog.csdn.net/java_kider/article/details/109014506
  • https://blog.csdn.net/java_kider/article/details/109005839?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161861798916780265494814%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=161861798916780265494814&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_v2~rank_v29-8-109005839.nonecase&utm_term=%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97&spm=1018.2226.3001.4450
  • https://blog.csdn.net/java_kider/article/details/109014118?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161861798916780265494814%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fblog.%2522%257D&request_id=161861798916780265494814&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2blogfirst_rank_v2~rank_v29-9-109014118.nonecase&utm_term=%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97&spm=1018.2226.3001.4450
  • https://www.cnblogs.com/756623607-zhang/p/10506909.html

6.消息队列知识图谱

知识图谱

7.消息队列模型演化

7.1队列模型

在这里插入图片描述

  • 消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。

  • 如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息。一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。

7.2发布-订阅者模型

在这里插入图片描述

​ 在发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息

7.3RabbitMQ 的消息模型

在这里插入图片描述

​ 在 RabbitMQ 中,Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。同一份消息如果需要被多个消费者来消费,需要配置 Exchange 将消息发送到多个队列,每个队列中都存放一份完整的消息数据,可以为一个消费者提供消费服务。

7.4 Kafka和RocketMQ 的消息模型

  • 基于发布订阅模型,又用上了队列,使用请求-确认机制保证消息的可靠性。

    在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。也保证了有序性。

    请求确认机制保证了有序性,但是也带来的至多只能有一个消费者在消费的局限,使用队列解决。

    每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。

    订阅者的概念是通过消费组来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,	消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者 Consumer1 消费了,那同组的其他消费者就不会再收到这条消息。

在这里插入图片描述

PS: kafka中的队列成为了分区。

你可能感兴趣的文章
如何构建高扩展性网站
查看>>
微服务架构的设计模式
查看>>
持续可用与CAP理论 – 一个系统开发者的观点
查看>>
nginx+tomcat+memcached (msm)实现 session同步复制
查看>>
c++字符数组和字符指针区别以及str***函数
查看>>
c++类的操作符重载注意事项
查看>>
c++模板与泛型编程
查看>>
STL::deque以及由其实现的queue和stack
查看>>
WAV文件解析
查看>>
DAC输出音乐2-解决pu pu 声
查看>>
WPF中PATH使用AI导出SVG的方法
查看>>
WPF UI&控件免费开源库
查看>>
QT打开项目提示no valid settings file could be found
查看>>
Win10+VS+ESP32环境搭建
查看>>
Ubuntu+win10远程桌面
查看>>
flutter-实现圆角带边框的view(android无效)
查看>>
android 代码实现圆角
查看>>
flutter-解析json
查看>>
android中shader的使用
查看>>
java LinkedList与ArrayList迭代器遍历和for遍历对比
查看>>