Flink零基础学习(二)DataSource以及kafka和Socket信息源传输实例

news/2024/7/24 12:11:50
一:数据源有以下几种类型

* 1.基于集合:有界数据集,更偏向本地测试
* 2.基于文件,适合监听文件修改并读取内容
* 3.基于socket:监听主机的host port,从socket中获取数据
* 4.自定义addSource:无界

二:详细分析

    1.集合

     DataStreamSource<OUT>  input =env.fromCollection(Collection<OUT> data);

    2.迭代器

     DataStreamSource<OUT>  input =env.fromCollection(Iterable,Class);

    3.给定的数据

     DataStream<Out> input =env.fromElements();

    4.从一个迭代器里创建并行数据流

    DataStreamSource<OUT>  input = env.fromParallelCollection(SplittableIterator,Class)

    5.创建一个生成制定区间范围内的数字序列的并行数据流
    DataStreamSource<OUT>  input = env.generateSequence(from,to);   6.基于文件
     
   DataStreamSource<String>  text = env.readFile(path);

    指定格式的文件输入格式读取文件

    DataStreamSource<String>  text = env.readFile(fileInputFormat,path);

    解释:上面两个方法内部调用的方法。它根据给定的 fileInputFormat 和读取路径读取文件。
         根据提供的 watchType,这个 source 可以定期(每隔 interval 毫秒)
         监测给定路径的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),
         或者处理一次路径对应文件的数据并退出(FileProcessingMode.PROCESS_ONCE)。
         你可以通过 pathFilter 进一步排除掉需要处理的文件,如下:

    DataStreamSource<MyEvent>  stream = env.readFile(
            myFormat,myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY,100,
            FilePathFilter.createDefaultFilter(),typeInfo
    );

三:Socket实现demo

  1.maven引入

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
   <scope>provided</scope>
</dependency>
<flink.version>1.8.1</flink.version>
<scala.binary.version>2.11</scala.binary.version>

一个是flink版本,一个是scala预言版本

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String,Integer>> dataStream=env.socketTextStream("localhost",9999)
                                                 .flatMap(new SocketTextStreamWordCount.LinSplitter())
                                                 .keyBy(0)
        .timeWindow(Time.seconds(5))
        .sum(1);

统计方法类:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author zhouxl
 * 实例demo
 */
public class SocketTextStreamWordCount {
    public static void  main(String[] args) throws Exception{
      //参数检查
        if(args.length !=2){
            System.err.println("USAGE:\\nSocketTextStreamWordCount <hostname> <port>");
            return;
        }
       String hostname =args[0];
       Integer port =Integer.parseInt(args[1]);


       //设置流执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //获取数据
        DataStreamSource<String> stream  =env.socketTextStream(hostname,port);
      //计数
        SingleOutputStreamOperator<Tuple2<String,Integer>> sum =stream.flatMap(new LinSplitter())
                .keyBy(0)
                .sum(1);

        sum.print();

        env.execute("Java WordCount from SocketTextStream Example");
    }


    public static  final  class LinSplitter implements FlatMapFunction<String,Tuple2<String,Integer>>{


        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
           String[] tokens =s.toLowerCase().split("\\W+");

           for(String token :tokens){
               if(token.length()>0){
                   collector.collect(new Tuple2<String, Integer>(token,1));
               }
           }

        }
    }
}

实验结果

1maven打包好启动jar

flink run -c com.tuya.SocketTextStreamWordCount /Users/zhouxl/flinkdemo/target/original-flinkdemo-1.0-SNAPSHOT.jar 127.0.0.1 9000

2设置控制台监听 nc -l 9000

3启动flink(sh文件启动)

启动后

在idea控制台打印任意字符,都可以在日志(flink的log文件夹下面)看到统计结果

四:kafka实现demo

1.引入参考三

2.定义传递dto

@data
public class Metric {

    public String name;

    public long timestamp;

}

3.kafka发布类

/**
 * @author zhouxl
 * kafka发送工具类
 */
public class kafkaUtils {

    public static  final String broker_list ="kafka地址";

    public static  final String topic ="flink_demo";

    public static  void writeTokafak(Integer count) throws InterruptedException{

        //生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍
        Map<String,Object> config=new HashMap<String, Object>(16);
        //kafka服务器地址
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,kafka地址);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG," org.apache.kafka.clients.producer.internals.DefaultPartitioner");

        config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);
        KafkaProducer producer =new KafkaProducer<String,String>(config);
        Metric metric =new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        JSON.toJSONString(metric));
        ProducerRecord record =new ProducerRecord<String,String>(topic,metric.toString());
        Future<RecordMetadata> future= producer.send(record);
        producer.flush();
       try{
           future.get();
           System.out.println("发送"+future.isDone()+"数据:"+JSON.toJSONString(metric));
       }catch (Exception e){
       }
    }

    /**
     * 模拟kafka发送数据
     * @param args
     */
    public static void main(String[] args)throws InterruptedException{
        Integer count=1;
        while(true){
            Thread.sleep(3000);
            writeTokafak(count++);
        }
    }

4.flink流处理

/**
 * @author zhouxl
 * 接受kafka消息
 */
public class Main {
    public static  void main(String[] args) throws Exception{

        final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

        Properties  props =new Properties();

        props.put("bootstrap.servers",kafka地址);

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("auto.offset.reset","latest");

        DataStreamSource<String> dataStreamSource =env.addSource(
           new FlinkKafkaConsumer<String>("flink_demo",
                   //String序列化
                   new SimpleStringSchema(),
                   props
                   )
        ).setParallelism(1);

        //将从kafka读到的数据打印在控制台
        dataStreamSource.print();

        env.execute("Flink添加kafka资源");
    }
}

5实验结果

发送的结果

接受到的结果

 

总结:挤出点时间不容易啊


http://www.niftyadmin.cn/n/1301215.html

相关文章

javascript全局异常监听

不管我们用javascript做什么开发&#xff0c;总会出现项目运行异常&#xff0c;甚至crash&#xff0c;这个时候&#xff0c;我们希望不只是javascript会打印一行日志&#xff0c;而是可以出现一个弹窗或者其他的一些让我们开发者更能直接获取到的信息。 对于这个需求&#xff…

Flink零基础学习(三) Data Sink讲解和实例

1.概念&#xff1a; 主要是对经过flink处理后的流所做一系列的操作&#xff0c;操作完后就把计算后的数据结果 Sink 到某个地方&#xff08; Kafka、ElasticSearch、Socket、RabbitMQ、JDBC、Cassandra POJO、File、Print 等&#xff09;&#xff0c;简单的讲就是流去向 2.主要…

CocosCreator图片资源加密解密

文章转载自&#xff1a;http://www.cnblogs.com/pixs-union/p/6226337.html 主要处理png图片&#xff0c;其他格式图片也是一样的原理。阅读前可以简略了解一下png格式图片的Data trunck。 首先使用python脚本去掉png的PNG SIG(8 bytes) 以及末尾的PNGIEND(12 bytes)。然后图片…

解决go vendor版本引起的api接口异常问题

业务场景&#xff1a; 在go项目中引入vendor做版本控制&#xff08;类似maven&#xff09;&#xff0c;在gitclone本地后&#xff0c;发现部分aws接口不存在,尝试修复此bug 解决思路&#xff1a; 1.检查import各种引入是否异常,这个一般从错误日志就可以发现 2.检查vendor&a…

bat删除文本文件每行前几个字符

echo off set fna.txt (for /f "usebackq delims" %%i in ("%fn%")do ( echo;%%i>con set "h%%i" setlocal enabledelayedexpansion echo;!h:~4! endlocal))>b.txt move b.txt "%fn%" pause 该BAT脚本运行时&#xff0c;需要有…

Linux之Ansible基础操作以及注意

前段时间用到了&#xff0c;过了几个星期觉得自己差不多快忘了&#xff0c;于是留下点东西 背景&#xff1a;需要一个脚本来自动化支持文件传输功能&#xff0c;将服务端传输到客户端&#xff0c;客户端可配置无需操作 采用原因:支持api操作&#xff0c;基于python开发&#…

Java报错-Service not be found

作为Java新手&#xff0c;今天新增了一套接口&#xff0c;其中包括 Controller、Service、Imp、Mapper、Mapper.xml。 最后运行的时候&#xff0c;没有直接运行起来&#xff0c;报错为以下内容&#xff1a; Description:Field pieceService in com.heque.minigame.controlle…

JavaScript --- Map集合结构详解

Map 对象保存键值对。任何值(对象或者原始值) 都可以作为一个键或一个值。 语法 new Map([iterable]) 参数 iterable Iterable 可以是一个数组或者其他 iterable 对象&#xff0c;其元素或为键值对&#xff0c;或为两个元素的数组。 每个键值对都会添加到新的 Map。null 会被当…