kafka多个消费者,怎么收到所有的消息.

news/2024/7/24 10:18:17

kafka多个消费者,怎么收到所有的消息.

 

 

消费者组是Kafka实现单播和广播两种消息模型的手段。同一个topic,每个消费者组都可以拿到相同的全部数据。

 

组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。

 

一、

1、原理图

2、原理描述

一个topic 可以配置几个partition,produce发送的消息分发到不同的partition中,consumer接受数据的时候是按照group来接受,kafka确保每个partition只能同一个group中的同一个consumer消费,如果想要重复消费,那么需要其他的组来消费。Zookeerper中保存这每个topic下的每个partition在每个group中消费的offset 

 

新版kafka把这个offsert保存到了一个__consumer_offsert的topic下 

 

这个__consumer_offsert 有50个分区,通过将group的id哈希值%50的值来确定要保存到那一个分区.  这样也是为了考虑到zookeeper不擅长大量读写的原因。

 

所以,如果要一个group用几个consumer来同时读取的话,需要多线程来读取,一个线程相当于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。 

 

假设一个topic test 被groupA消费了,现在启动另外一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新建立,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据 


http://www.niftyadmin.cn/n/958003.html

相关文章

小程序web-view转发

对应页面 js // index.js Page({data: {shareObj: },onLoad: function (options) {console.log(options, options)},bindGetMsg: function (e) {this.data.shareObj e.detail.data[e.detail.data.length - 1];},// 分享onShareAppMessage(options) {let shareObj this.data.…

不同的vite命令建vue,是不同的vite版本,一个是vite1,一个是vite2.0

npm init vite-app daily-cost这个1,是培训的,写教程的时候vite还没更新,过年前的教程,过年了,就升2.0了。 npm init vitejs/app这个是2.0,是官网的

jq设置ul自动滚动

ui,li {list-style: none;}#news{height: 175px;overflow: hidden;}<div id"news"><ul><li>白天极限挑战</li><li>晚上挑战极限</li><li>我就是肾虚公子罗志祥</li></ul></div>$(function() {var $this…

router也升级了,不同的命令升不同的级,现在已经到4级了,官网的弄出来还是3级?

官网的 npm install vue-router升的是3.0级&#xff0c;不是最新版&#xff0c;真是坑死人。 培训机构的这个命令也不行&#xff0c;升级错误 yarn add vue-routernext这个命令才行&#xff0c;升的是4.0 npm install vue-routernext我从哪里知道会是这个呢&#xff0c;是试…

SpringBoot 处理异常的几种常见姿势

1. 使用 ControllerAdvice 和 ExceptionHandler 处理全局异常 这是目前很常用的一种方式&#xff0c;非常推荐。测试代码中用到了 Junit 5&#xff0c;如果你新建项目验证下面的代码的话&#xff0c;记得添加上相关依赖。 1. 新建异常信息实体类 非必要的类&#xff0c;主要…

mysql索引类型和索引方法

索引类型 mysql索引类型normal&#xff0c;unique&#xff0c;full text的区别是什么&#xff1f; normal&#xff1a;表示普通索引 unique&#xff1a;表示唯一的&#xff0c;不允许重复的索引&#xff0c;如果该字段信息保证不会重复例如身份证号用作索引时&#xff0c;可…