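Flink: Data Eviction and Custom Evictors
1 Window Data Eviction Mechanism
In Flink, removing elements from a window is controlled by an Evictor. Flink ships with the following built-in Evictors: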
- DeltaEvictor
- TimeEvictor
- CountEvictor
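As a rough mental model of two of these policies, here is a plain-Java sketch with no Flink dependency. The class and method names are hypothetical. The absolute difference used as the delta is an assumption made for illustration, since a real DeltaEvictor takes a user-supplied delta function.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of two eviction policies; this is not Flink code.
class BuiltInEvictionSketch {

    // Count policy: keep at most maxCount elements, dropping from the front of the buffer.
    static <T> void evictByCount(List<T> elements, int maxCount) {
        int toEvict = elements.size() - maxCount;
        Iterator<T> it = elements.iterator();
        while (toEvict > 0 && it.hasNext()) {
            it.next();
            it.remove();
            toEvict--;
        }
    }

    // Delta policy: drop elements whose delta to the last element is >= threshold.
    // Assumption: the delta is the absolute difference between two doubles.
    static void evictByDelta(List<Double> elements, double threshold) {
        if (elements.isEmpty()) {
            return;
        }
        double last = elements.get(elements.size() - 1);
        elements.removeIf(e -> Math.abs(last - e) >= threshold);
    }

    public static void main(String[] args) {
        List<Integer> byCount = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
        evictByCount(byCount, 3);
        System.out.println(byCount); // [3, 4, 5]

        List<Double> byDelta = new ArrayList<>(Arrays.asList(1.0, 8.0, 9.5, 10.0));
        evictByDelta(byDelta, 2.0);
        System.out.println(byDelta); // [9.5, 10.0]
    }
}
```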
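1.1 Source Code Walkthrough
To understand an Evictor's source, three methods are enough: evictBefore, evictAfter, and evict. The first two are the overridden callbacks; evict is the private helper that does the actual removal. The TimeEvictor source looks like this: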
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    // ...
    // doEvictAfter defaults to false

    // Evict elements before the window function is applied
    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // Evict elements after the window function is applied
    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            // Run the eviction logic
            evict(elements, size, ctx);
        }
    }

    // The eviction logic itself
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }
        long currentTime = getMaxTimestamp(elements);
        long evictCutoff = currentTime - windowSize;
        // Walk through the elements held by the iterator
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator();
                iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Is this element's event time <= the eviction cutoff?
            if (record.getTimestamp() <= evictCutoff) {
                iterator.remove(); // evict the element
            }
        }
    }
    // ...
}
The comments in the code explain what evictBefore, evictAfter, and evict each do, so they are not covered in more detail here.
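To make the cutoff arithmetic concrete, the following plain-Java sketch mirrors the evict logic on a list of bare timestamps. It is an illustration only: the class name is hypothetical and the list of Long values stands in for the TimestampedValue elements of a real window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of the time-based cutoff; not Flink code.
class TimeCutoffSketch {

    // Removes every timestamp that is <= (max timestamp - windowSize).
    static void evict(List<Long> timestamps, long windowSize) {
        if (timestamps.isEmpty()) {
            return;
        }
        long currentTime = Long.MIN_VALUE;
        for (long ts : timestamps) {
            currentTime = Math.max(currentTime, ts);
        }
        long evictCutoff = currentTime - windowSize;
        for (Iterator<Long> it = timestamps.iterator(); it.hasNext(); ) {
            if (it.next() <= evictCutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Long> timestamps = new ArrayList<>(Arrays.asList(1000L, 2000L, 4000L, 12000L));
        // max = 12000, cutoff = 12000 - 10000 = 2000, so 1000 and 2000 are evicted
        evict(timestamps, 10_000L);
        System.out.println(timestamps); // [4000, 12000]
    }
}
```

Note that the comparison is inclusive: an element whose timestamp equals the cutoff is evicted as well.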
1.2 Implementation
- Custom Evictor
import java.util.Iterator;

import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

public class CustomEvictor extends TimeEvictor<TimeWindow> {

    private long size;
    private boolean isAfter;

    public CustomEvictor(long windowSize) {
        super(windowSize);
        this.size = windowSize;
    }

    public CustomEvictor(long windowSize, boolean doEvictAfter) {
        super(windowSize, doEvictAfter);
        this.size = windowSize;
        this.isAfter = doEvictAfter;
    }

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
        if (!isAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, TimeWindow window, EvictorContext ctx) {
        if (isAfter) {
            evict(elements, size, ctx);
        }
    }

    /**
     * The actual eviction logic.
     *
     * @param elements elements currently in the window
     * @param size number of elements in the window
     * @param ctx evictor context
     */
    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (!hasTimestamp(elements)) {
            return;
        }
        // Largest event time currently in the window
        long currentTime = getMaxTimestamp(elements);
        // Eviction cutoff
        long evictCutoff = currentTime - this.size;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            // Cast the value so the ID can be inspected
            UserEvent2 userEvent2 = (UserEvent2) record.getValue();
            // Evict if the event time is <= the cutoff,
            // or (the extra custom rule) if the record's ID is 1001.
            // The constant goes first so a null ID does not throw.
            if (record.getTimestamp() <= evictCutoff || "1001".equals(userEvent2.getUId())) {
                iterator.remove();
            }
        }
    }

    /**
     * Same as the private method in TimeEvictor; copied over as-is.
     *
     * @param elements elements currently in the window
     * @return whether the first element carries a timestamp
     */
    private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
        Iterator<TimestampedValue<Object>> it = elements.iterator();
        if (it.hasNext()) {
            return it.next().hasTimestamp();
        }
        return false;
    }

    /**
     * Same as the private method in TimeEvictor; copied over as-is.
     *
     * @param elements elements currently in the window
     * @return the largest timestamp among the elements
     */
    private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
        long currentTime = Long.MIN_VALUE;
        for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
            TimestampedValue<Object> record = iterator.next();
            currentTime = Math.max(currentTime, record.getTimestamp());
        }
        return currentTime;
    }
}
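The combined rule (time cutoff or ID match) can be tried out without a Flink cluster. The sketch below is a plain-Java stand-in, not the Evictor itself: the Event class is a hypothetical substitute for UserEvent2 and carries only the two fields the rule looks at.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical plain-Java model of the custom eviction rule; not Flink code.
class CustomRuleSketch {

    // Minimal stand-in for UserEvent2 (assumed shape: a string ID plus an event time).
    static final class Event {
        final String uId;
        final long timestamp;

        Event(String uId, long timestamp) {
            this.uId = uId;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return uId + "@" + timestamp;
        }
    }

    // Evicts events that are at or before the cutoff, or whose ID is "1001".
    static void evict(List<Event> events, long size) {
        if (events.isEmpty()) {
            return;
        }
        long currentTime = Long.MIN_VALUE;
        for (Event e : events) {
            currentTime = Math.max(currentTime, e.timestamp);
        }
        long evictCutoff = currentTime - size;
        for (Iterator<Event> it = events.iterator(); it.hasNext(); ) {
            Event e = it.next();
            if (e.timestamp <= evictCutoff || "1001".equals(e.uId)) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("1001", 9000L));  // evicted by the ID rule
        events.add(new Event("1002", 1000L));  // evicted by the time rule (cutoff = 2000)
        events.add(new Event("1003", 9500L));
        events.add(new Event("1002", 12000L));
        evict(events, 10_000L);
        System.out.println(events); // [1003@9500, 1002@12000]
    }
}
```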
- Job code
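public class FlinkWindowDataDelay {
    public static void main(String[] args) throws Exception {
        // ...
        SingleOutputStreamOperator<UserEvent2> windowedStream = keyedStream
                // Sliding window: size 10 s, slide 5 s
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // Attach the custom Evictor (size in milliseconds; false = evict before the window function)
                .evictor(new CustomEvictor(10 * 1000, false))
                // Take the maximum of the "time" field, i.e. the latest user-event time
                // (use maxBy("time") if the whole record holding that maximum is needed)
                .max("time");
        // ...
        env.execute();
    }
}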
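One thing worth checking in this configuration: the Evictor size (10 s) equals the window size (10 s). Assuming every element's timestamp lies inside its window's range, the largest timestamp minus 10 s always falls before the window start, so the time condition never evicts anything and only the ID rule has an effect. The plain-Java sketch below (hypothetical names, not Flink code) counts how many timestamps the time rule would evict for two Evictor sizes.

```java
// Hypothetical plain-Java check of evictor size versus window size; not Flink code.
class WindowVsEvictorSizeSketch {

    // Counts timestamps the time rule would evict: timestamp <= max - evictorSize.
    static int evictedByTime(long[] timestamps, long evictorSize) {
        if (timestamps.length == 0) {
            return 0;
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        long cutoff = max - evictorSize;
        int evicted = 0;
        for (long ts : timestamps) {
            if (ts <= cutoff) {
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        // Timestamps inside one 10 s window covering [10000, 20000)
        long[] window = {10_000L, 12_500L, 16_000L, 19_999L};
        System.out.println(evictedByTime(window, 10_000L)); // 0: cutoff 9999 is before the window start
        System.out.println(evictedByTime(window, 3_000L));  // 3: cutoff 16999 evicts the first three
    }
}
```

If time-based eviction is meant to matter, the Evictor size has to be smaller than the window size.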
The actual eviction logic should follow whatever your business rules require.