kafka 并发数配置过程中踩到的坑 InstanceAlreadyExistsException Error registering AppInfo mbean

news/2024/7/24 1:40:00

java 服务器

kafka 并发数配置过程中踩到的坑 InstanceAlreadyExistsException

2017-07-05 13:09:15.460 [kafka_spout:7-MultipleThreadSpoutExecutors] WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:57)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:512)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:73)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:69)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:284)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:222)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:179)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:126)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:204)
at com.mapbar.stream.qingqi.core.spout.KafkaSpout.open(KafkaSpout.java:74)
at com.alibaba.jstorm.task.execute.spout.SpoutExecutors.init(SpoutExecutors.java:142)
at com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors.init(MultipleThreadSpoutExecutors.java:64)
at com.alibaba.jstorm.task.execute.BaseExecutors.initWrapper(BaseExecutors.java:154)
at com.alibaba.jstorm.task.execute.spout.MultipleThreadSpoutExecutors.run(MultipleThreadSpoutExecutors.java:76)
at com.alibaba.jstorm.callback.AsyncLoopRunnable.run(AsyncLoopRunnable.java:95)
at java.lang.Thread.run(Thread.java:745)

上面是本人在使用spring kafka中所遇到的问题,针对此问题做一个记录,整理到此处

出现上述问题的原因:

ConcurrentMessageListenerContainer factory = new ConcurrentMessageListenerContainer(cf, containerProps);
factory.setConcurrency(kafkaConfig.getConcurrencySize());
如果使用了ConcurrentMessageListenerContainer 的实现,并且配置了并发度大于1,同时配置了kafka的 client.id属性则会出现上述问题,而当你配置为1的时候不会出现上述log
解决方式:不配置client.id这一项,kakfa中会默认为多个线程生成id
详细解析:
  org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:640)
  从error log中可以看出

       在调用AppInfoParser.registerAppInfo方法时出现的异常

  

at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

  从堆栈信息中定位到 Repository.addMBean()

截取其中一部分代码

上图中可以得知,一个clientId对应一个,不可重复

最后找到KafkaConsumer,发现是因为自己配置了client.id导致的,如果不配置的话

会为每一个线程生成一个clientid,"consumer" +  自增id,原子性递增

看到这里,发现了一个问题,如果自己不配置client.id的话,那从config里取出来的数据直接判断的length,猜测是有默认配置,当自己不配置的时候给赋值为"" 空串,于是又查了一下代码来验证

创建consumer对象的时候,会创建ConsumerConfig 配置,new ConsumerConfig()会调用父类构造方法
ConsumerConfig(Map<?, ?> props) {
    super(CONFIG, props);
}
AbstractConfig

ConsumerConfig中有static静态初始化块,来初始化 ConfigDef

把client.id赋值为""


                

http://www.niftyadmin.cn/n/957943.html

相关文章

vue.js获取目标标签距离顶部的距离,获得距离后滚到到目标标签位置

动态获得目标高度 <div ref"pronbit"><div>我的动态高度</div> </div> mounted(){window.addEventListener(scroll,this.handleScrollx,true) }, methods: {handleScrollx() {console.log(滚动高度,window.pageYOffset)console.log(距离顶部…

IDEA可视化Log理解Git仓库\历史状态\commit\分支

IDEA可视化Log理解Git仓库\历史状态\commit\分支 from&#xff1a;https://github.com/tintinng/advanced-git/tree/master fork from&#xff1a;https://github.com/hcsp/advanced-git ref:https://xiedaimala.com/courses 什么是仓库&#xff1f; 仓库是指一个历史可追溯&a…

尼克发的可以运行,我的就不行,登录注册页面出不来,不知道是为什么

怎么办&#xff0c;重新倒回去试一遍&#xff1f; 还是两个一起比一下&#xff0c;比得眼睛都绿了 好慢啊&#xff0c;太慢了

vue循环定时器可用

beforeDestroy() {clearInterval(this.timer);},created () {this.timer setInterval(() > {this.autoPlay() //你的方法}, 1000)},

使用 v-on 指令来监听DOM事件

监听事件 我们可以使用 v-on 指令 (通常缩写为 符号) 来监听 DOM 事件&#xff0c;并在触发事件时执行一些 JavaScript。用法为 v-on:click“methodName” 或使用快捷方式 click“methodName” 例如&#xff1a; <div id"basic-event"><button click&quo…

spring mvc获取header

两种方法&#xff1a; 1.在方法参数中加入RequestHeader 2.在类级别注入HttpServletRequest 建议使用第二种方法&#xff0c;这样可避免每个方法都加入HttpHeaders参数 Controller RequestMapping("/hello") public class HelloController { Autowired …

域名/IP访问显示“拒绝了我们的连接请求”

1.先ping看ping的通不 2.查看Apache,Nginx等web服务器是否正常 3.查看服务器防火墙是否关闭

Git Reset 三种模式

有时候&#xff0c;我们用Git的时候有可能commit提交代码后&#xff0c;发现这一次commit的内容是有错误的&#xff0c;那么有两种处理方法&#xff1a; 1、修改错误内容&#xff0c;再次commit一次 2、使用git reset 命令撤销这一次错误的commit 第一种方法比较直接&#xff0…