Flink: Data Eviction and Custom Evictors

1 Window Data Eviction Mechanism

In Flink, removing data from a window is handled by an Evictor. Flink ships with the following built-in Evictors:
  • DeltaEvictor
  • TimeEvictor
  • CountEvictor
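An Evictor removes some of a window's elements when the window fires, either before the window function is applied (evictBefore) or after it (evictAfter).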
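To make the before/after distinction concrete, here is a minimal plain-Java simulation of a single window firing. It does not use Flink: the `fire` and `evict` helpers are hypothetical, a `List<Long>` stands in for the window's buffer, and taking the maximum stands in for the window function. Evicting before the function changes the result. Evicting after leaves the result unchanged and only shrinks the buffer that later firings of the same window would see.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java simulation of one window firing (hypothetical helpers, not Flink's API).
class EvictionOrderDemo {

    // Removes every element matching the rule, the way an Evictor mutates the window buffer.
    static void evict(List<Long> buffer, Predicate<Long> rule) {
        for (Iterator<Long> it = buffer.iterator(); it.hasNext(); ) {
            if (rule.test(it.next())) {
                it.remove();
            }
        }
    }

    // One firing: optional evict-before, window function (max), optional evict-after.
    static long fire(List<Long> buffer, Predicate<Long> rule, boolean evictAfter) {
        if (!evictAfter) {
            evict(buffer, rule); // evictBefore: the window function never sees these elements
        }
        long result = Long.MIN_VALUE;
        for (long v : buffer) {
            result = Math.max(result, v); // stand-in window function
        }
        if (evictAfter) {
            evict(buffer, rule); // evictAfter: result is already computed, only the buffer shrinks
        }
        return result;
    }

    public static void main(String[] args) {
        Predicate<Long> dropLarge = v -> v > 100;
        List<Long> before = new ArrayList<>(List.of(5L, 50L, 500L));
        List<Long> after = new ArrayList<>(List.of(5L, 50L, 500L));
        System.out.println(fire(before, dropLarge, false) + " " + before); // 50 [5, 50]
        System.out.println(fire(after, dropLarge, true) + " " + after);   // 500 [5, 50]
    }
}
```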
1.1 Source Code Walkthrough

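When reading the Evictor source, you only need to focus on three methods: evictBefore and evictAfter, which the Evictor interface declares, and evict, a private helper that TimeEvictor uses to do the actual removal.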

The relevant part of TimeEvictor's source is shown below:

public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    // ...

    // doEvictAfter defaults to false
    // Evict elements before the window function is applied
    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // Evict elements after the window function is applied
    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // Eviction logic
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }

        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;

        // Iterate over the elements in the window
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Check whether this element's timestamp is <= the eviction cutoff
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove(); // remove the element
            }
        }
    }

    // ...
}

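The comments above cover evictBefore, evictAfter, and evict. In short, evict finds the largest timestamp among the window's elements, computes the cutoff as that maximum minus windowSize, and removes every element whose timestamp is less than or equal to the cutoff; doEvictAfter decides whether this happens before or after the window function runs.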
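As a worked example of that cutoff rule, the sketch below re-creates the body of TimeEvictor's evict in plain Java. It is an illustration, not Flink code: bare `Long` timestamps in milliseconds replace `TimestampedValue`, and an empty-list check stands in for `hasTimestamp`. With a window size of 5000 ms and a maximum timestamp of 9000, the cutoff is 4000, so the element at exactly 4000 is removed as well, because the comparison is `<=`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java re-creation of TimeEvictor's evict() on bare timestamps (milliseconds).
class TimeEvictorSketch {

    // Keeps only elements whose timestamp is strictly greater than (max - windowSize).
    static void evict(List<Long> timestamps, long windowSize) {
        if (timestamps.isEmpty()) {
            return; // stands in for the hasTimestamp() guard
        }
        long currentTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            currentTime = Math.max(currentTime, ts); // same idea as getMaxTimestamp()
        }
        long evictCutoff = currentTime - windowSize;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext(); ) {
            if (it.next() <= evictCutoff) {
                it.remove(); // at or before the cutoff: evicted
            }
        }
    }

    public static void main(String[] args) {
        List<Long> ts = new ArrayList<>(List.of(1_000L, 4_000L, 6_000L, 9_000L));
        evict(ts, 5_000L); // max = 9000, cutoff = 4000
        System.out.println(ts); // [6000, 9000]
    }
}
```

Because the cutoff is measured from the newest element rather than from the window's end, TimeEvictor keeps only the elements that lie within windowSize milliseconds of the latest element in the window.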

1.2 Implementation
  • Custom Evictor

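    import java.util.Iterator;

    import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

    // Extends TimeEvictor and adds a business rule on top of the time-based cutoff
    public class CustomEvictor extends TimeEvictor<TimeWindow> {
        private long size;        // window size in milliseconds
        private boolean isAfter;  // true: evict after the window function; false (default): before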
    
        public CustomEvictor(long windowSize) {
            super(windowSize);
            this.size = windowSize;
        }
    
        public CustomEvictor(long windowSize, boolean doEvictAfter) {
            super(windowSize, doEvictAfter);
            this.size = windowSize;
            this.isAfter = doEvictAfter;
        }
    
        @Override
        public void evictBefore(
                Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
            if (!isAfter) {
                evict(elements, size, ctx);
            }
        }
    
        @Override
        public void evictAfter(
                Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
            if (isAfter) {
                evict(elements, size, ctx);
            }
        }
    
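        /**
         * The actual eviction logic: the same time-based cutoff as TimeEvictor, plus a business rule.
         *
         * @param elements the elements currently in the window
         * @param size     the number of elements in the window (unused here)
         * @param ctx      the evictor context
         */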
        private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
            if (!hasTimestamp(elements)) {
                return;
            }
    
            // Largest event timestamp among the window's elements
            long currentTime = getMaxTimestamp(elements);
            // Eviction cutoff
            long evictCutoff = currentTime - this.size;
    
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                 iterator.hasNext(); ) {
                TimestampedValue<Object> record = iterator.next();
    
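                // Cast the value so that its user ID can be checked
                UserEvent2 userEvent2 = (UserEvent2) record.getValue();
    
                // Remove the element if its timestamp is <= the eviction cutoff,
                // or (the extra business rule) if its user ID is "1001";
                // "1001".equals(...) also avoids a NullPointerException when the ID is null
                if (record.getTimestamp() <= evictCutoff || "1001".equals(userEvent2.getUId())) {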
                    iterator.remove();
                }
            }
        }
    
    
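        /**
         * Copied from TimeEvictor, where it is private: checks whether the elements carry timestamps.
         *
         * @param elements the elements currently in the window
         * @return true if the first element has a timestamp
         */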
        private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
            Iterator<TimestampedValue<Object>> it = elements.iterator();
            if (it.hasNext()) {
                return it.next().hasTimestamp();
            }
            return false;
        }
    
    
        /**
         * Copied from TimeEvictor, where it is private: returns the largest timestamp among the elements.
         *
         * @param elements the elements currently in the window
         * @return the maximum timestamp
         */
        private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
            long currentTime = Long.MIN_VALUE;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                 iterator.hasNext(); ) {
                TimestampedValue<Object> record = iterator.next();
                currentTime = Math.max(currentTime, record.getTimestamp());
            }
            return currentTime;
        }
    }
    
  • Application code

    public class FlinkWindowDataDelay {
        public static void main(String[] args) throws Exception {
            // ...
            SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
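                    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) // sliding window: size 10 s, slide 5 s
                    .evictor(new CustomEvictor(10 * 1000, false)) // custom Evictor: size in ms, evict before the window function
                    .maxBy("time"); // the element with the latest event time (max("time") only guarantees the time field itself)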
    
            // ...
            env.execute();
        }
    }
    

    The actual eviction logic should follow your own business rules.
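As one illustration of such a rule, the sketch below pulls CustomEvictor's decision logic out into plain Java so it can be tried without a Flink job. The nested `Event` class is a hypothetical stand-in for `UserEvent2`, whose definition the article does not show, and passing the business rule as a `Predicate` is a design choice of this sketch rather than anything Flink requires. With a 10000 ms window and a newest timestamp of 12000, the cutoff is 2000, so only the event for user "1001" is removed, and it is removed by the business rule, not by the time cutoff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java version of CustomEvictor's decision logic (not Flink's API).
class RuleBasedEviction {

    // Hypothetical stand-in for UserEvent2: a user ID plus an event timestamp in milliseconds.
    static final class Event {
        final String uid;
        final long timestamp;

        Event(String uid, long timestamp) {
            this.uid = uid;
            this.timestamp = timestamp;
        }
    }

    // Removes events at or before (max timestamp - windowSize), plus any event matching the rule.
    static void evict(List<Event> events, long windowSize, Predicate<Event> businessRule) {
        if (events.isEmpty()) {
            return;
        }
        long maxTs = Long.MIN_VALUE;
        for (Event e : events) {
            maxTs = Math.max(maxTs, e.timestamp);
        }
        long cutoff = maxTs - windowSize;
        for (Iterator<Event> it = events.iterator(); it.hasNext(); ) {
            Event e = it.next();
            if (e.timestamp <= cutoff || businessRule.test(e)) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>(List.of(
                new Event("1001", 3_000L),
                new Event("1002", 4_000L),
                new Event("1003", 12_000L)));
        // Same rule as CustomEvictor: drop user "1001" (null-safe comparison)
        evict(events, 10_000L, e -> "1001".equals(e.uid));
        for (Event e : events) {
            System.out.println(e.uid + "@" + e.timestamp); // 1002@4000, then 1003@12000
        }
    }
}
```

Keeping the rule in a Predicate makes it easy to unit-test separately from the Evictor; inside a real Evictor the same check runs on each `TimestampedValue` and removes matches with `iterator.remove()`, as CustomEvictor does.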