Kafka3.1部署和Topic主题数据生产与消费

文章目录

  • 前言
  • 一、Kafka3.1X版本在Windows11主机部署
  • 二、Kafk生产Topic主题数据
    • 1.kafka生产数据
    • 2.JAVA kafka客户端消费数据
  • 总结


前言

本章节主要讲述Kafka3.1X版本在Windows11主机下部署以及JAVA对Kafka应用:

一、Kafka3.1X版本在Windows11主机部署

1.安装JDK配置环境变量

2.Zookeeper(zookeeper-3.7.1)
zk
部署后的目录位置:D:\setup\apache-zookeeper-3.7.1

3.安装Kafka3.1X
3.1 下载包(kafka_2.12-3.1.2.tgz)
Kafka
在这里插入图片描述
3.2、 解压并进入Kafka目录:
根目录:D:\setup\kafka3.1.2

3、 编辑config/server.properties文件
注意 log.dirs=D:\setup\kafka3.1.2\logs 为根目录下的\logs

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=D:\\setup\\kafka3.1.2\\logs

4.运行Zookeeper
Zookeeper安装目录D:\setup\apache-zookeeper-3.7.1\bin,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

  .\zkServer.cmd;

在这里插入图片描述
5.运行Kafka
Kafka安装目录D:\setup\kafka3.1.2,按下Shift+右键,选择“打开命令窗口”选项,打开命令行

.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述

二、Kafk生产Topic主题数据

kafka_44">1.kafka生产数据

创建Topic主题heima

.\bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic heima --partitions 2 --replication-factor 1
Created topic heima.

查看Topic主题heima

.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092  --topic heima

在这里插入图片描述
Topic主题heima生产数据

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic heima

在 > 符号后输入数据:

{"mobilePhone":"186xxxx1234","roleCode":"super_admin_xxx"}

在这里插入图片描述

kafka_64">2.JAVA kafka客户端消费数据

2.1 pom.xml文件配置kafka客户端-kafka-clients-2.0.1版本

        <!-- kafka客户端 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.1</version>
        </dependency>

2.2 JAVA数据读取文件

package com.ems.mgr.web.controller.thirdparty;
import com.alibaba.fastjson.JSONObject;
import com.ems.mgr.common.utils.spring.SpringUtils;
import com.ems.mgr.system.service.ISysUserService;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Kafka服务器操作与数据读取
 */
public class KafkaUtilDemo {
    public static final Logger log = LoggerFactory.getLogger(KafkaUtilDemo.class);
    public static final Properties props = new Properties();
//    protected ISysUserService userService = SpringUtils.getBean(ISysUserService.class);

    public static void init(String kafakservers) {
        // 配置Kafka消费者属性
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafakservers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    }

    /**
     * 持续监听并处理kafa消息,当手机号mobilePhone非空时进入数据同步操作
     * @param kafaktopic
     * @return
     */
    public static String poll(String kafaktopic) {
        String msg = "";
        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList(kafaktopic));
            log.info("Kafka消费者订阅指定主题,持续监听并处理消息");
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(60000));
                for (ConsumerRecord<String, String> record : records) {
                    log.info("offset = " + record.offset() + ",key = " + record.key() + ",value = " + record.value());
                    msg = record.value();
                    if (!StringUtils.isBlank(record.value())) {
                        JSONObject jsonObject = JSONObject.parseObject(record.value());
                        String mobilePhone = jsonObject.getString("mobilePhone");
                        if (StringUtils.isBlank(mobilePhone)) {
                            log.error("Kafka消费者手机号mobilePhone为空");
                        } else {
                            KafkaUtilDemo kafkaUtil = new KafkaUtilDemo();
                            kafkaUtil.syncSystemInfoTask(jsonObject);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error("Kafka消费者订阅指定主题,持续监听并处理消息 error msg=" + e.getMessage());
        }
        return msg;
    }

    public boolean syncSystemInfoTask(JSONObject jsonObject) {
        boolean repsBln = true;
        try {
            String mobilePhone = jsonObject.getString("mobilePhone");
            String roleType = jsonObject.getString("roleType");
            String roleCode = jsonObject.getString("roleCode");
            log.info("业务数据同步操作................");
        } catch (Exception e) {
            repsBln = false;
            log.error("Kafka消费者同步入库异常,error msg=" + e.getMessage());
        }
        return repsBln;
    }

    public static void main(String[] args) {
        try {
            String kafakservers = "localhost:9092";
            String kafaktopic = "heima";
            init(kafakservers);
            poll(kafaktopic);
        } catch (Exception e) {
            log.error("error msg=" + e.getMessage());
        }
    }

}

3 执行KafkaUtilDemo 文件,查看消费数据。
在这里插入图片描述

总结

pom.xml文件在引入spring-kafka 会由于版本问题出现


org.apache.kafka
kafka-clients
2.0.1

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.8.RELEASE</version>
    </dependency>

http://www.niftyadmin.cn/n/5012052.html

相关文章

自然语言处理学习笔记(十)———— 停用词过滤

目录 1.停用词 2.实现思路 3.全部实现代码&#xff1a; 4.运行结果&#xff1a; 1.停用词 汉语中有一类没有多少意义的词语&#xff0c;比如助词“的”、连词“以及”、副词“甚至”、语气词“吧”&#xff0c;称为停用词。一个句子去掉了停用词并不影响理解。停用词视具体任…

PHP调用java class 类实现文件签名

PHP调用java class 类实现文件签名 原始代码改造开始PHP内调用方式起因:对接某平台API接口,发送的文件需要做 SM3 签名,对方平台是java写的,只有java加密示例,照着java的加密算法翻译为PHP版本,在编码转换上始终有些差异。没办法,只能想办法使用他们的java方式。 原始代…

TGA格式文件转材质

今天淘宝上买了一个美女的模型&#xff0c;是blender的源文件&#xff0c;上面说有fbx格式的。我用unity&#xff0c;所以觉得应该可以用。文件内容如下图&#xff1a; FBX文件夹打开后&#xff0c;内容如下图所示&#xff0c;当时就预感到可能没有色彩。 unity打开后果然发现只…

【常用代码14】el-input输入框内判断正则,只能输入数字,过滤汉字+字母。

问题描述&#xff1a; el-input输入框&#xff0c;只能输入数字&#xff0c;但是不能显示输入框最右边的上下箭头&#xff0c; <el-input v-model"input" type"number" placeholder"请输入内容" style"width: 200px;margin: 50px 0;&…

pytorch学习——LSTM和GRU

参考书籍&#xff1a;https://zh-v2.d2l.ai/chapter_recurrent-modern/lstm.html 参考论文&#xff1a; https://colah.github.io/posts/2015-08-Understanding-LSTMs/ 简介&#xff1a; LSTM&#xff08;长短期记忆网络&#xff09;和GRU&#xff08;门控循环单元&#xff09;…

Yew应用中如何获取<textarea/>的值?

当我在开发Yew的组件时&#xff0c;我发现自己总是无法摆脱ReactJS的思维模式。这不在获取中的值时&#xff0c;我脑海里浮现的代码是这样的&#xff1a; <textarea onChange{(e)>console.log(e.target.value)}/>但是在Yew中&#xff0c;最终实现的代码是这样的&…

财务领域的数字助手,银企对账与到账通知软件机器人

在现代财务领域&#xff0c;高效的对账和及时的到账通知是确保财务运营流畅的关键。然而&#xff0c;财务人员通常需要花费大量时间处理繁琐的对账工作&#xff0c;以及手动发送到账通知。博为小帮软件机器人可以为财务部门提高效率和准确性。 软件机器人已经成为财务领域的利器…

Unity之3D物理导航系统

一 介绍 Unity自带寻路(导航)系统是unity官方自带的一种寻路系统。我们可以通过它来制作简单的寻路&#xff0c;比如可以制作点击某个位置&#xff0c;让角色自动的绕开障碍走到目标点的效果&#xff0c;比如可以制作敌人AI&#xff0c;让它可以通过NavMesh绕开障碍追击我方单…