flink的集成测试

news/2024/7/24 12:19:10 标签: flink, 集成测试, java

背景

日常测试中我们使用flink的TestHarness只能测试单个算子,很多情况下我们需要集成测试来测试真正的问题,所以在flink中进行集成测试还是非常有必要的,本文就来记录下如何在flink中进行集成测试

flink_4">flink中进行集成测试

flink中进行集成测试的关键类MiniClusterWithClientResource,这是一个启动本地flink集群的关键类,先看一下集成测试的关键代码:

java">/**
 * FLINK集成测试
 * https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/
 *
 */
public class FlinkIntegrationTest {

    public static final Configuration config = Configuration.fromMap(new HashMap<String, String>() {
        {
            put("heartbeat.timeout", "300000");
        }
    });

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config)
                    .setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());

    @Test
    public void testStateFlatMap() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi").keyBy(e -> "1").flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi world")));
    }

    @Test
    public void testStateFlatMap1() throws Exception {
        StatefulFlatMap statefulFlatMap = new StatefulFlatMap();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements("world", "hi", "world").keyBy(e -> e).flatMap(statefulFlatMap).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(Lists.newArrayList("hello world", "hello hi", "hello world world")));
    }



    // create a testing sink
    private static class CollectSink implements SinkFunction<String> {

        // must be static
        public static final List<String> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(String value, Context context) throws Exception {
            values.add(value);
        }
    }


}

public class StatefulFlatMap extends RichFlatMapFunction<String, String> {

    ValueState<String> previousInput;

    @Override
    public void open(Configuration parameters) throws Exception {
        previousInput = getRuntimeContext().getState(
                new ValueStateDescriptor<String>("previousInput", Types.STRING));
    }

    @Override
    public void flatMap(String in, Collector<String> collector) throws Exception {
        String out = "hello " + in;
        if(previousInput.value() != null){
            out = out + " " + previousInput.value();
        }
        previousInput.update(in);
        collector.collect(out);
    }

由于我们是集成测试,我们一般输入source和输出sink是自己构造的,比如这里的CollectSink,这里就可以正常测试包括状态在内的pineline集成测试


http://www.niftyadmin.cn/n/5209231.html

相关文章

BUUCTF [WUSTCTF2020]find_me 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。 感谢 Iven Huang 师傅供题。 比赛平台&#xff1a;https://ctfgame.w-ais.cn/ 密文&#xff1a; 下载附件&#xff0c;得到一个.jpg图片。 解题思路&#xff1a; 1、得到一张图…

关于easy-es的聚合问题

es实体类&#xff1a; public class ChemicalES {IndexId(type IdType.CUSTOMIZE)private Long id;HighLightIndexField(fieldType FieldType.TEXT, analyzer "ik_max_word")private String name;IndexField(fieldType FieldType.KEYWORD)private List<Stri…

【Vue3】解决Vue打包后上传服务器 资源路径加载错误

问题&#xff1a; 我这里在打包Vue之后将打包后的dist 上传至服务器站点根目录内子目录 名为 "adminstore" , 但是当我通过域名打开站点后发现 资源加载路径内并没有携带 子目录 "adminstore" 文件名称 错误&#xff1a;http://your website domain/js/app…

优化数据分析——理解与运用各类指标

写在开头 数据分析在当今信息时代扮演着至关重要的角色&#xff0c;而指标则是我们理解数据、揭示模式、支持决策的关键工具。本文将深入讨论各类指标的应用场景和解读方法&#xff0c;以帮助更全面、深入地理解数据。 1. 中心趋势指标 1.1 均值&#xff1a;更深层次的理解 …

BUUCTF [ACTF新生赛2020]outguess 1

BUUCTF:https://buuoj.cn/challenges 题目描述&#xff1a; 得到的 flag 请包上 flag{} 提交。 密文&#xff1a; 下载附件&#xff0c;得到一堆文件。 解题思路&#xff1a; 1、根据题目和flag.txt文件提示&#xff0c;猜测为outguess隐写。 outguess下载安装 kail 终端命…

【漏洞复现】金蝶云星空管理中心 ScpSupRegHandler接口存在任意文件上传漏洞 附POC

漏洞描述 金蝶云星空是一款云端企业资源管理(ERP)软件,为企业提供财务管理、供应链管理以及业务流程管理等一体化解决方案。金蝶云星空聚焦多组织,多利润中心的大中型企业,以 “开放、标准、社交”三大特性为数字经济时代的企业提供开放的 ERP 云平台。服务涵盖:财务、供…

电脑软件:SmartSystemMenu(窗口置顶工具)介绍

目录 一、软件介绍 二、软件用途 三、安装教程 注意事项 四、功能介绍 五、软件设置 六、软件下载 一、软件介绍 SmartSystemMenu 是一款简单实用的 Windows 窗口增强工具&#xff0c;它可以为窗口的标题栏右键菜单新增 17 个新功能。 二、软件用途 SmartSystemMenu(窗口…

springboot项目之计算购物车

商城购物车 商城购物车计算&#xff0c;感觉还有很多的优化空间&#xff0c;后期再做优化方案 代码实现 public AjaxResult calCart(RequestParam("bgroupId") String bgroupId,RequestParam("cartIds") String cartIds){**********************略****…