[flink 实时流基础]源算子和转换算子

news/2024/7/24 5:49:42 标签: flink, linq, 大数据

文章目录

    • 1. 源算子 Source
        • 1. 从集合读
        • 2. 从文件读取
        • 3. 从 socket 读取
        • 4. 从 kafka 读取
        • 5. 从数据生成器读取数据
    • 2. 转换算子
        • 基本转换算子(map/ filter/ flatMap)


1. 源算子 Source

Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端。
image.png
在Flink1.12以前,旧的添加source的方式,是调用执行环境的addSource()方法:
DataStream stream = env.addSource(…);
方法传入的参数是一个“源函数”(source function),需要实现SourceFunction接口。
从Flink1.12开始,主要使用流批统一的新Source架构:
DataStreamSource stream = env.fromSource(…)
Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的Source,通常情况下足以应对我们的实际需求。

1. 从集合读
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 1. 从集合读
//        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 2, 3));

        // 2. 直接填元素
        DataStreamSource<Integer> source = env.fromElements(1, 2, 3, 4);

        source.print();

        env.execute();
    }
2. 从文件读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-files</artifactId>
			<version>${flink.version}</version>
		</dependency>

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source = FileSource.forRecordStreamFormat(
            new TextLineInputFormat(),
            new Path("input/world.txt"))
            .build();

        env
            .fromSource(source, WatermarkStrategy.noWatermarks(), "fileSource")
            .print();


        env.execute();
    }
3. 从 socket 读取
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 7777);
        source.print();


        env.execute();
    }

可以使用 nc -l 7777创建一个监听链接的 tcp

4. 从 kafka 读取
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("hadoop102:9092")
            .setTopics("topic_1")
            .setGroupId("atguigu")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new SimpleStringSchema()) 
            .build();

        DataStreamSource<String> stream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print("Kafka");

        env.execute();
    }
5. 从数据生成器读取数据
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-datagen</artifactId>
			<version>${flink.version}</version>
		</dependency>
 public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(new GeneratorFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "Number:" + value;
            }
        }, 10, // 自动生成的数字序列
            RateLimiterStrategy.perSecond(10), // 限速策略,每秒生成10条
            Types.STRING // 返回类型
        );


        env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "datagenerator").print();


        env.execute();


    }

2. 转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。
image.png

基本转换算子(map/ filter/ flatMap)

map是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素。
image.png
filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。
image.png
flatMap操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。
:::info
消费一个元素,可以产生0到多个元素。
:::
flatMap可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。
image.png


http://www.niftyadmin.cn/n/5459455.html

相关文章

UE4_Mouse_Interaction——拖拽物体的实现

鼠标拖拽物体&#xff0c;效果如下图&#xff1a; 1、新建PlayerController,更名字为MI_PlayerController&#xff0c;双击打开并设置参数&#xff1a; 2、新建GameMode&#xff0c;更名为MI_Gameinfo。参数如下设置&#xff1a; 3、新建材质&#xff0c;更名为BasicAsset02.参…

bizcharts中LineChart时间戳使用moment转化出现Invalid Date

文章目录 一、前言1.1、问题1.2、解决 二、bizcharts三、moment.js四、在线源码五、最后 一、前言 1.1、问题 最近在使用bizcharts绘制折线图LineChart的时候&#xff0c;发现X轴的时间显示成了Invalid Date。如下图所示&#xff1a; 发现是后端返回了时间戳字符串"1572…

前端开发学习笔记(1)

文章目录 基础概念VSCode常用插件和快捷键VSCode常用插件VSCode常用快捷键 常用标签和属性 基础概念 网页和网站&#xff1a; 网页&#xff1a;网站中的一页&#xff0c;通常是HTML格式的文件。网页是由网页元素组成的&#xff0c;这些元素用HTML标签描述&#xff0c;然后通过…

服务器配置入门教程

问题环境&#xff1a; 现场调试的时候遇到很多离奇的问题&#xff0c;部分设备已经老到需要使用清华同方 Windows XP 系统的接口&#xff0c;所以写下这边记录&#xff0c;本文主要是基础教程。 快速入门常识 服务器基础知识_mezz卡-CSDN博客 基本接口识别 IOIOI-RJ45串口&a…

OriginBot智能机器人开源套件

详情可参见&#xff1a;OriginBot智能机器人开源套件——支持ROS2/TogetherROS&#xff0c;算力强劲&#xff0c;配套古月居定制课程 (guyuehome.com) OriginBot智能机器人开源套件 最新消息&#xff1a;OriginBot V2.1.0版本正式发布&#xff0c;新增车牌识别&#xff0c;点击…

opencv-python库 cv2.imwrite() 保存图片

cv2.imwrite 是 OpenCV 库中的一个函数&#xff0c;用于将图像数据保存为文件。其基本语法如下&#xff1a; python cv2.imwrite(filename, img, [params]) 参数说明&#xff1a; filename&#xff1a;要保存的图像的文件名&#xff0c;包括文件路径和扩展名&#xff08;如 …

gdb | 实战调试死循环

事情是这样的&#xff0c;运行一个程序&#xff0c;然后资源直接拉满&#xff0c;风扇呼呼的叫 然后去定位问题 采用gdb 断点调试&#xff0c;挨个 定位 如果想查看数组的所有元素 print *cntn n 是数组cnt 的长度 如果在死循环&#xff0c;就终止&#xff0c;然后 l 查看环境代…

FreeRTOS day1

1.总结keil5下载代码和编译代码需要注意的事项 需要与板子连通 配置完成后才点击下载 2.总结STM32Cubemx的使用方法和需要注意的事项 下载支持包 打开芯片配置界面 3.总结STM32Cubemx配置GPIO的方法