在使用事件时间和时间窗口的过程中,当最后一个事件的事件时间未达到时间窗口的最大时间,窗口不会触发。
举例说明,在按小时的滚动窗口中,假设当前时间是12:05点,按照正常预想13:00时窗口会触发执行,但是在12:00到13:00的时间段内,最后一个事件的时间是12:50,之后再未产生新的事件,那么在13:00的时候,窗口并不会触发执行,只有当后续再产生新的事件,并且事件时间大于13:00时,12-13的窗口才会执行。
在实际开发过程中可能会带来一些问题,当事件不是源源不断的产生时,最后一个窗口不执行,影响结果。
示例代码见末尾。
2. 分析原因源码基于flink 1.14.4
窗口是否执行是由trigger控制,从trigger中逐步寻找原因,在示例代码中使用了TumblingEventTimeWindows,其默认trigger是EventTimeTrigger(通过方法org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows#getDefaultTrigger可知)。
2.1. 窗口触发从EventTimeTrigger实现可知,只有当窗口最大时间小于等于当前水位(每个事件到达时判断) 或 当eventime timer触发时的触发时间戳等于窗口最大时间时才会触发窗口。因此最终决定窗口是否触发的原因是水位值。
flink自身提供的其他和事件时间有关的trigger触发时机类似。
// 返回TriggerResult.FIRE时触发计算public EventTimeTrigger extends Trigger<Object, TimeWindow> { @Override public TriggerResult onElement( Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // 窗口最大时间小于等于水位值时,触发 return TriggerResult.FIRE; } else { // 大于水位值时,将窗口最大时间注册timer,注册timer后,当水印时间超过参数指定的时间时,会调用下面的onEvenTime方法 ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } // time是timer触发时的事件戳 @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; }}因此再了解下水位值如何更新。
2.2. 事件时间和水位DataStrema中默认提供了3种分配水位的方式,重点了解下通过WatermarkStrategy实现指定水位的方法。
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy) @Deprecatedpublic SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)@Deprecatedpublic SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)WatermarkStrategy的核心是TimestampAssigner和WatermarkGenerator(按照字面意思暂且称之为时间戳分配器和水位生成器),水位生成器根据指定的时间戳来生成水位。同时WatermarkStrategy默认提供了两种快速实现WatermarkStrategy的方法forMonotonousTimestamps和forBoundedOutOfOrderness,分别表示顺序递增和允许乱序的水位生成策略。
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> { /** 重点,需要在实现WatermarkStrategy时,实现此方法 */ @Override WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context); @Override default TimestampAssigner<T> createTimestampAssigner( TimestampAssignerSupplier.Context context) { // 提供了默认实现,用于类似于kafka等数据源时 return new RecordTimestampAssigner<>(); } static <T> WatermarkStrategy<T> forMonotonousTimestamps() { return (ctx) -> new AscendingTimestampsWatermarks<>(); } static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) { return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness); }}AscendingTimestampsWatermarks是BoundedOutOfOrdernessWatermarks的特例(outOfOrdernessMillis=0)。
由于WatermarkGenerator负责水位的具体生成,因此从该接口中寻找真相,接口内容如下,包含两个方法。具体作用见注释
public interface WatermarkGenerator<T> { /** * 指定每个事件到达后,如何处理 */ void onEvent(T event, long eventTimestamp, WatermarkOutput output); /** * 周期调用,调用时处理水位。 * 调用时间间隔取决于ExecutionConfig#getAutoWatermarkInterval(),默认值为200ms */ void onPeriodicEmit(WatermarkOutput output);}public BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> { private long maxTimestamp; private final long outOfOrdernessMillis; public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) { checkNotNull(maxOutOfOrderness, "maxOutOfOrderness"); checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative"); this.outOfOrdernessMillis = maxOutOfOrderness.toMillis(); // 默认最大时间是long最小值+允许的延迟时间+1,在允许延迟时间合理时,默认最大时间可以认为时long的最小值 this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1; } @Override public void onEvent(T event, long eventTimestamp, WatermarkOutput output) { maxTimestamp = Math.max(maxTimestamp, eventTimestamp); } @Override public void onPeriodicEmit(WatermarkOutput output) { output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); }}从BoundedOutOfOrdernessWatermark的实现中可知,每个事件到达后,仅获取当前最大的时间戳。而水位是通过周期方法来生成的,默认情况下200ms生成一次水位。水位值=当前事件的最大时间戳-允许延迟的事件值-1。
到此真相就找到了,水位是根据当前已到事件的最大时间决定的,当后续无更大的时间的事件到之前,水位将暂停。由于水位值无法增加,当窗口到达自然时间点时,无法触发。
示例代码
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = EnvConf.getEnv(); env.setParallelism(1); DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999); DataStream<SensorRecord> inputStream = source.map(line -> { String[] fields = line.split(","); return new SensorRecord(fields[0], new Double(fields[1])); }); WatermarkStrategy<SensorRecord> watermarkStrategy = WatermarkStrategy // 单调递增的水位生成器 .<SensorRecord>forMonotonousTimestamps() // 时间戳生成器,时间戳和 watermark 都是从 1970-01-01T00:00:00Z 起的 Java 纪元开始,并以毫秒为单位。 .withTimestampAssigner((event, ts) -> event.getTs()); SingleOutputStreamOperator<SensorRecord> input = inputStream.assignTimestampsAndWatermarks(watermarkStrategy); SingleOutputStreamOperator<Tuple2<String, Integer>> result = input.keyBy(SensorRecord::getId) .window(TumblingEventTimeWindows.of(Time.hours(1))) .aggregate(new AggregateFunction<SensorRecord, Tuple2<String, Integer>, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> createAccumulator() { return null; } @Override public Tuple2<String, Integer> add(SensorRecord value, Tuple2<String, Integer> accumulator) { return null; } @Override public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) { return null; } @Override public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) { return null; } }); result.print(); env.execute();}作者:有数的编程笔记链接:https://juejin.cn/post/7354940462061846569