聊聊Flink必知必会(四)
时间:2023-06-17 01:40:39来源:博客园

概述

Flink Streaming API借鉴了谷歌数据流模型(Google Data Flow Model),它的流API支持不同的时间概念。Flink明确支持以下3个不同的时间概念。


【资料图】

Flink明确支持以下3个不同的时间概念。(1)事件时间:事件发生的时间,由产生(或存储)事件的设备记录。

(2)接入时间:Flink在接入事件时记录的时间戳。

(3)处理时间:管道中特定操作符处理事件的时间。

支持事件时间的流处理器需要一种方法来度量事件时间的进度。在Flink中测量事件时间进展的机制是水印(watermark)。水印是一种特殊类型的事件,是告诉系统事件时间进度的一种方式。水印流是数据流的一部分,并带有时间戳t。水印(t)声明事件时间已经到达该流中的时间t,这意味着时间戳t′≤t(时间戳更早或等于水印的事件)的流中不应该有更多的元素。

Flink中水印的处理

水印的时间戳

时间t的水印标记了数据流中的一个位置,并断言此时的流在时间t之前已经完成。为了执行基于事件时间的事件处理,Flink需要知道与每个事件相关联的时间,它还需要包含水印的流。水印就是系统事件时间的时钟。水印触发是基于事件时间的计时器的触发。

事件流的类型有两种,一个是顺序的,一个是无序的。先看顺序场景下,水印的排列。

对于无序流,水印是至关重要的,其中事件不是按照它们的时间戳排序的。

例如,当操作符接收到w(11)这条水印时,可以认为时间戳小于或等于11的数据已经到达,此时可以触发计算。同样,当接收到w(17)这条水印时,可以认为时间戳小于或等于17的数据已经到达,此时可以触发计算。

可以看出,水印的时间戳是单调递增的,时间戳为t的水印意味着所有后续记录的时间戳将大于t。一般来讲,水印是一种声明,在流中的那个点之前,即在某个时间戳之前的所有事件都应该已经到达。

水印是在源函数处或直接在源函数之后生成的。源函数的每个并行子任务通常可以独立地生成水印。这些水印定义了特定并行源处的事件时间。

水印的生成

Flink提供了用于处理事件时间、时间戳和水印的API。

为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过TimestampAssigner从元素中的某个字段访问/提取时间戳实现的。

Flink提供了两种方式创建水印。

1.使用WatermarkStrategy上的静态辅助方法实现公共水印策略:

2.实现WatermarkStrategy接口,自定义TimestampAssigner与WatermarkGenerator捆绑在一起:

@Publicpublic interface WatermarkStrategy        extends TimestampAssignerSupplier, WatermarkGeneratorSupplier {    @Override    WatermarkGenerator createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);    @Override    default TimestampAssigner createTimestampAssigner(            TimestampAssignerSupplier.Context context) {        return new RecordTimestampAssigner<>();    }    @Experimental    default WatermarkAlignmentParams getAlignmentParameters() {        return WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED;    }    default WatermarkStrategy withTimestampAssigner(            TimestampAssignerSupplier timestampAssigner) {        checkNotNull(timestampAssigner, "timestampAssigner");        return new WatermarkStrategyWithTimestampAssigner<>(this, timestampAssigner);    }    default WatermarkStrategy withTimestampAssigner(            SerializableTimestampAssigner timestampAssigner) {        checkNotNull(timestampAssigner, "timestampAssigner");        return new WatermarkStrategyWithTimestampAssigner<>(                this, TimestampAssignerSupplier.of(timestampAssigner));    }    default WatermarkStrategy withIdleness(Duration idleTimeout) {        checkNotNull(idleTimeout, "idleTimeout");        checkArgument(                !(idleTimeout.isZero() || idleTimeout.isNegative()),                "idleTimeout must be greater than zero");        return new WatermarkStrategyWithIdleness<>(this, idleTimeout);    }    @Experimental    default WatermarkStrategy withWatermarkAlignment(            String watermarkGroup, Duration maxAllowedWatermarkDrift) {        return withWatermarkAlignment(                watermarkGroup,                maxAllowedWatermarkDrift,                WatermarksWithWatermarkAlignment.DEFAULT_UPDATE_INTERVAL);    }    @Experimental    default WatermarkStrategy withWatermarkAlignment(            String watermarkGroup, Duration maxAllowedWatermarkDrift, Duration updateInterval) {        return new WatermarksWithWatermarkAlignment(                this, watermarkGroup, maxAllowedWatermarkDrift, updateInterval);    }    static  WatermarkStrategy forMonotonousTimestamps() {        return (ctx) -> new AscendingTimestampsWatermarks<>();    }    static  WatermarkStrategy forBoundedOutOfOrderness(Duration maxOutOfOrderness) {        return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);    }    static  WatermarkStrategy forGenerator(WatermarkGeneratorSupplier generatorSupplier) {        return generatorSupplier::createWatermarkGenerator;    }    static  WatermarkStrategy noWatermarks() {        return (ctx) -> new NoWatermarksGenerator<>();    }}

这里面提供了很多静态的方法和带有缺省实现的方法,只有一个方法是非default和没有缺省实现的,就是createWatermarkGenerator方法。

所以默认情况下,我们只需要实现这个方法就行了,这个方法主要是返回一个 WatermarkGenerator。

@Publicpublic interface WatermarkGenerator {/** * Called for every event, allows the watermark generator to examine and remember the * event timestamps, or to emit a watermark based on the event itself. */void onEvent(T event, long eventTimestamp, WatermarkOutput output);/** * Called periodically, and might emit a new watermark, or not. * * 

The interval in which this method is called and Watermarks are generated * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. */void onPeriodicEmit(WatermarkOutput output);}

这个方法简单明了,主要是有两个方法:

  • onEvent :每个元素都会调用这个方法,如果我们想依赖每个元素生成一个水印,然后发射到下游(可选,就是看是否用output来收集水印),我们可以实现这个方法.

  • onPeriodicEmit : 如果数据量比较大的时候,我们每条数据都生成一个水印的话,会影响性能,所以这里还有一个周期性生成水印的方法。这个水印的生成周期可以这样设置:env.getConfig().setAutoWatermarkInterval(5000L)

标签:

最新
  • 聊聊Flink必知必会(四)

    概述FlinkStreamingAPI借鉴了谷歌数据流模型(GoogleDataFlowModel)

  • 西延高铁马坊隧道及北村隧道相继贯通 每日热闻

    (张远 焦键 陈晖)记者16日从中国铁路西安局集团有限公司获悉,西

  • 枫香湖南城步群体家系H3_关于枫香湖南城步群体家系H3概略

    1、枫香湖南城步群体家系H3是国家林木种质资源平台收载的金缕梅科,枫

  • 上市险企前五月保费收入同比大增 基本面好转获进一步验证

    日前,上市险企前五月保费收入悉数出炉,头部险企均交出一份令人振奋的

  • 保障房产业创新联盟在京成立 由56家保障房上下游产业单位组成

    保障房产业创新联盟是包括北京保障房中心有限公司、深圳市人才安居集团

  • 度小满逾期一个月会被起诉吗?度小满逾期怎么办?|当前热文

    随着互联网金融的快速发展和普及,越来越多的人开始使用P2P网络借贷来

  • 热点!热身-武磊6分钟2球!张琳芃林良铭破门 国足4-0缅甸

    热身-武磊6分钟2球!张琳芃林良铭破门国足4-0缅甸,武磊,张琳,林良铭,张

  • 沪滇广告与品牌联合研究与协同促进中心在云南民族大学成立 全球热闻

    6月16日,由上海大学、上海市广告协会、云南民族大学、云南省广告协会

  • 踢得像梅西!缅甸球员戏耍国足中场,徐新犯规送任意球

    直播吧6月16日讯国足vs缅甸第21分钟,缅甸中场带球,王上源提前预判往

  • 浦发银行重庆分行多形式开展金融知识宣教活动

    近日,浦发银行重庆分行秉持“金融为民”初心,积极组织全辖网点开展“

  • 今日播报!五部门:截至4月末涉农贷款余额53.16万亿元 同比增长16.4%

    五部门:截至4月末涉农贷款余额53 16万亿元同比增长16 4%---人民网北京

  • Satz-Batz·Night·By·Night 夜夜杀伐骑士谭 #6

    仍在战斗的二丁目·street,响彻着重低音cybersound(电子音)!小太鼓

  • 北京大学鄂尔多斯能源研究院正式揭牌

    中新网鄂尔多斯6月16日电(记者 李爱平)北京大学鄂尔多斯能源研究院

  • 位运算与集合_天天观热点

    前言在刷LeetCode的时候,我们常常碰到需要枚举同时选择几个元素,或者

  • 全球热点!海珠区东方红印刷厂地块规划调整公示,拟变身住宅用地

    该地块原来主要为商业金融业用地和公园绿地,本次拟调整为二类居住用地

  • 环球快报:鲍·维昂小说五篇_关于鲍·维昂小说五篇简述

    小伙伴们,你们好,今天小夏来聊聊一篇关于鲍·维昂小说五篇,关于鲍·

  • 旅游
    • 全面注册制什么意思? 全面注册制对股民有什么影响?

    • 股票池是什么意思?怎么建立自己的股票池?

    • 防晒喷雾什么时候喷效果比较好呢?你知道吗?

    • 仙佑医药科技有限公司怎么样? 仙佑集团口碑为什么这么好?