Flink原理:定时器

1. 用途

Flink定时器存在于窗口的触发,TTL等诸多用途,因此搞清楚其原理对于理解这些知识点至关重要。

2. 示例

在flink实时处理中,涉及到延时处理可使用KeyedProcessFunction来完成,KeyedProcessFunction是flink提供面向用户的low level api,可以访问状态、当前的watermark或者当前的processingtime, 更重要的是提供了注册定时器的功能,分为:

  1. 注册处理时间定时器,直到系统的processingTime超过了注册的时间就会触发定时任务
  2. 注册事件时间定时器,直到watermark值超过了注册的时间就会触发定时任务另外也可以删除已经注册的定时器。

示例代码如下:

// 创建bean类CountWithTimestamp,里面有三个字段
package com.bolingcavalry.keyedprocessfunction;

public class CountWithTimestamp {
    public String key;
    public long count;
    public long lastModified;
}
// 创建FlatMapFunction的实现类Splitter,作用是将字符串分割后生成多个Tuple2实例,f0是分隔后的单词,f1等于1:
package com.bolingcavalry;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;

public class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {

        if(StringUtils.isNullOrWhitespaceOnly(s)) {
            System.out.println("invalid line");
            return;
        }

        for(String word : s.split(" ")) {
            collector.collect(new Tuple2<String, Integer>(word, 1));
        }
    }
}
// 最后是整个逻辑功能的主体:ProcessTime.java,这里面有自定义的KeyedProcessFunction子类,还有程序入口的main方法
package com.bolingcavalry.keyedprocessfunction;

import com.bolingcavalry.Splitter;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;


/**
 * @author will
 * @email zq2599@gmail.com
 * @date 2020-05-17 13:43
 * @description 体验KeyedProcessFunction类(时间类型是处理时间)
 */
public class ProcessTime {

    /**
     * KeyedProcessFunction的子类,作用是将每个单词最新出现时间记录到backend,并创建定时器,
     * 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
     */
    static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {

        // 自定义状态
        private ValueState<CountWithTimestamp> state;

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化状态,name是myState
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }

        @Override
        public void processElement(
                Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // 取得当前是哪个单词
            Tuple currentKey = ctx.getCurrentKey();

            // 从backend取得当前单词的myState状态
            CountWithTimestamp current = state.value();

            // 如果myState还从未没有赋值过,就在此初始化
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }

            // 单词数量加一
            current.count++;

            // 取当前元素的时间戳,作为该单词最后一次出现的时间
            current.lastModified = ctx.timestamp();

            // 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
            state.update(current);

            // 为当前单词创建定时器,十秒后后触发
            long timer = current.lastModified + 10000;

            ctx.timerService().registerProcessingTimeTimer(timer);

            // 打印所有信息,用于核对数据正确性
            System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",
                    currentKey.getField(0),
                    current.count,
                    current.lastModified,
                    time(current.lastModified),
                    timer,
                    time(timer)));

        }

        /**
         * 定时器触发后执行的方法
         * @param timestamp 这个时间戳代表的是该定时器的触发时间
         * @param ctx
         * @param out
         * @throws Exception
         */
        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {

            // 取得当前单词
            Tuple currentKey = ctx.getCurrentKey();

            // 取得该单词的myState状态
            CountWithTimestamp result = state.value();

            // 当前元素是否已经连续10秒未出现的标志
            boolean isTimeout = false;

            // timestamp是定时器触发时间,如果等于最后一次更新时间+10秒,就表示这十秒内已经收到过该单词了,
            // 这种连续十秒没有出现的元素,被发送到下游算子
            if (timestamp == result.lastModified + 10000) {
                // 发送
                out.collect(new Tuple2<String, Long>(result.key, result.count));

                isTimeout = true;
            }

            // 打印数据,用于核对是否符合预期
            System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
                    currentKey.getField(0),
                    result.count,
                    result.lastModified,
                    time(result.lastModified),
                    timestamp,
                    time(timestamp),
                    String.valueOf(isTimeout)));
        }
    }


    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 并行度1
        env.setParallelism(1);

       // 处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // 监听本地9999端口,读取字符串
        DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);

        // 所有输入的单词,如果超过10秒没有再次出现,都可以通过CountWithTimeoutFunction得到
        DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream
                // 对收到的字符串用空格做分割,得到多个单词
                .flatMap(new Splitter())
                // 设置时间戳分配器,用当前时间作为时间戳
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Integer>>() {

                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long previousElementTimestamp) {
                        // 使用当前系统时间作为时间戳
                        return System.currentTimeMillis();
                    }

                    @Override
                    public Watermark getCurrentWatermark() {
                        // 本例不需要watermark,返回null
                        return null;
                    }
                })
                // 将单词作为key分区
                .keyBy(0)
                // 按单词分区后的数据,交给自定义KeyedProcessFunction处理
                .process(new CountWithTimeoutFunction());

        // 所有输入的单词,如果超过10秒没有再次出现,就在此打印出来
        timeOutWord.print();

        env.execute("ProcessFunction demo : KeyedProcessFunction");
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }
}

3. 原理

在这里插入图片描述
上图表示flink延时调用的总体流程,其设计也是借助于优先级队列(小顶堆)来完成,堆使用二叉树实现,而二叉树使用数组存储。队列中存储的数据结构如下:

@Internal
public final class TimerHeapInternalTimer<K, N> implements InternalTimer<K, N>, HeapPriorityQueueElement {

	// Key 表示KeyedStream中提取的Key
	@Nonnull
	private final K key;

	// Namespace 表示命名空间,在普通的KeyedStream中是固定的VoidNamespace,在WindowedStream表示的是Window
	@Nonnull
	private final N namespace;

	// Timestamp表示触发的时间戳,在优先级队列中升序排序               
	// 由于该类重写了equals方法,在插入队列,即使尝试重复插入相同的TimerHeapInternalTimer对象多次,也会确保只有一个TimerHeapInternalTimer对象入队成功。详情见下文。
	private final long timestamp;

	@Override
	public boolean equals(Object o) {
		if (this == o) {
			return true;
		}

		if (o instanceof InternalTimer) {
			InternalTimer<?, ?> timer = (InternalTimer<?, ?>) o;
			return timestamp == timer.getTimestamp()
				&& key.equals(timer.getKey())
				&& namespace.equals(timer.getNamespace());
		}

		return false;
	}
}
3.1 注册
  • ProcessingTime类型注册使用registerProcessingTimeTimer,传入的是一个触发的时间戳,内部会将获取到当前的Key、VoidNamespace 、timestamp封装成为一个InternalTimer对象存入优先级队列(小顶堆)中。并且会针对堆顶元素,使用ScheduledThreadPoolExecutor注册一个堆顶元素触发时间与当前时间差值大小的延时调用;
  • EventTime类型注册使用registerEventTimeTimer,与ProcessingTime类型注册不同的是不需要做延时调用,并且二者使用的是不同的队列

下面以TimerService#registerProcessingTimeTimer为入口,分析一下基于process time的定时器的入队过程:

// 自定义KeyedProcessFunction
static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {
		// 每来一条数据,都会经过此方法进行处理
       @Override
        public void processElement(
        		Tuple2<String, Integer> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
        ...
	        long timer = current.lastModified + 10000;
	        // 注册定时器
			ctx.timerService().registerProcessingTimeTimer(timer);
		...
		}

		// 定时器回调函数
       @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
		...
		CountWithTimestamp result = state.value();
		// 定时器到达时间,则往下游发送数据
		out.collect(new Tuple2<String, Long>(result.key, result.count));
		...
		}

}
@Internal
public class SimpleTimerService implements TimerService {
	@Override
	public void registerProcessingTimeTimer(long time) {
		// 注册定时器
		internalTimerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, time);
	}
}
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N> {
	@Override
	public void registerProcessingTimeTimer(N namespace, long time) {
		InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
		// 定时器入队
		// 一旦if条件满足,则证明入队成功且入队的是小顶堆的堆顶元素,要针对小顶堆堆顶元素创建延迟调用
		if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
			long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
			// check if we need to re-schedule our timer to earlier
			if (time < nextTriggerTime) {
				if (nextTimer != null) {
					nextTimer.cancel(false);
				}
				nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
			}
		}
	}
}
public class HeapPriorityQueueSet<T extends HeapPriorityQueueElement>
	extends HeapPriorityQueue<T>
	implements KeyGroupedInternalPriorityQueue<T> {
	@Override
	public boolean add(@Nonnull T element) {
		// 定时器入队
		// 获取keygroup对应的hashmap,并往其中插入定时器,如果key、namespace、time均相同,则不让其入队
		// 这里一旦super.add(element)返回true,则表明当前插入的是小顶堆的堆顶元素,需要针对堆顶元素建立延迟
		return getDedupMapForElement(element).putIfAbsent(element, element) == null && super.add(element);
	}
	// keyedstream在shuffle时,是将一批key放入到一个key group中,然后根据key group进行shuffle的
	// 该方法拿到当前key所在keygroup,一个keygroup维护了一个hashmap,获取该hashmap
	private HashMap<T, T> getDedupMapForElement(T element) {
		int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(
			keyExtractor.extractKeyFromElement(element),
			totalNumberOfKeyGroups);
		return getDedupMapForKeyGroup(keyGroup);
	}

}
public abstract class AbstractHeapPriorityQueue<T extends HeapPriorityQueueElement> implements InternalPriorityQueue<T> {
	@Override
	public boolean add(@Nonnull T toAdd) {
		// 定时器入队
		addInternal(toAdd);
		// 如果入队后的定时器是堆顶节点,则返回true,后面的逻辑会根据这里是否返回true,来判断是否需要建立ScheduledThreadPoolExecutor延迟调用;换言之,延迟调用只会根据堆顶节点来建立
		return toAdd.getInternalIndex() == getHeadElementIndex();
	}
}
public class HeapPriorityQueue<T extends HeapPriorityQueueElement>
	extends AbstractHeapPriorityQueue<T> {
	// 定时器入队
	@Override
	protected void addInternal(@Nonnull T element) {
		final int newSize = increaseSizeByOne();
		// 先将定时器插入数组
		moveElementToIdx(element, newSize);
		// 然后对有序的队列进行siftup操作,以保持小顶堆的特性
		// 这里有个有意思的现象,由于小顶堆按照定时器中的时间戳来比大小,而后来的定时器时间必然大于先来的定时器时间,底层数组一直就是一个单调递增的序列。因此,siftup操作其实并没有做任何调整,天然能保持小顶堆的特性。
		siftUp(newSize);
	}
}

经过以上步骤,就完成了定时器的入队操作。如果入队的是小顶堆的堆顶元素,则需要针对其创建延迟调用。代码如下:

public class SystemProcessingTimeService implements TimerService {
	@Override
	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {
		// 这里延迟调用的时间颇有讲究,并非是用定时器时间减去当前时间这么简单,而是要将相减的值再加1ms
		// 这么做是为了与watemark的语义保持一致(虽然基于processtime的定时器用不到watermark)
		// 例如,在窗口 [20000, 30000)中,30000这个时间点是不会触发窗口计算的,只有当watermark至少为30001时,才会触发窗口操作。有兴趣的同学可以看一下该方法源码中的注释。
		long delay = ProcessingTimeServiceUtil.getProcessingTimeDelay(timestamp, getCurrentProcessingTime());

		// we directly try to register the timer and only react to the status on exception
		// that way we save unnecessary volatile accesses for each timer
		try {
		    // 这里wrapOnTimerCallback(callback, timestamp)中的一波lambda操作秀我一脸,看似复杂,其实直接看作是callback即可
		    // 往上追溯一下,你就会发现callback其实就是InternalTimerServiceImpl#onProcessingTime方法
			return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
		}
		catch (RejectedExecutionException e) {
			final int status = this.status.get();
			if (status == STATUS_QUIESCED) {
				return new NeverCompleteFuture(delay);
			}
			else if (status == STATUS_SHUTDOWN) {
				throw new IllegalStateException("Timer service is shut down");
			}
			else {
				// something else happened, so propagate the exception
				throw e;
			}
		}
	}

}

到这里,保存定时器的小顶堆维护好了,基于堆顶元素触发时间创建的延时调用也创建好了,接下来到时间就会触发回调函数了。

3.2 触发
  • ProcessingTime类型的定时触发由注册的时候的延时调度触发,会不断从小顶堆堆顶弹出定时器,触发KeyedProcessFunction#onTimer方法,onTimer方法中可以从上下文OnTimerContext中获取到当前的key以及触发时间,有了key就可以从ValueState中提取出当前key对应的值(ValueState是一个散列表,其根据上下文中key获取value的逻辑对用户不可见,详情可以参考《Flink源码剖析:ValueState》),进而进行某些计算。
    当获取到InternalTimer对象中的时间大于延时调度时间,停止弹出定时器并处罚onTimer方法,重新针对堆顶元素建立新的延迟调用
public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>, ProcessingTimeCallback {

  @Override
	public void onProcessingTime(long time) throws Exception {
		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
		// inside the callback.
		nextTimer = null;

		InternalTimer<K, N> timer;
		// 从小顶堆堆顶依次弹出到达时间的定时器,调用用户自定义的KeyedProcessFunction#onTimer方法
		// 一旦堆顶元素不满足触发时间,则重新针对堆顶元素建立延迟调用
		while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
			processingTimeTimersQueue.poll();
			keyContext.setCurrentKey(timer.getKey());
			triggerTarget.onProcessingTime(timer);
		}

    // 这段逻辑调用processingTimeService实现类SystemProcessingTimeService中的registerTimer方法,该方法中将上次遍历中的最后一个timer的触发时间注册到ScheduledThreadPoolExecutor线程池中,实现再次延迟调用当前  InternalTimerServiceImpl#onProcessingTime,以此实现while逻辑的不断执行,即优先级队列的不断遍历
		if (timer != null && nextTimer == null) {
			nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this);
		}
	}
  
} 
  • EventTime类型的定时器触发是由Watermark决定的,同样会不断遍历优先级队列触发任务,直到获取到InternalTimer对象中的时间大于Watermark值;
3.3 延迟队列state

为了保证任务重启仍然能够执行未完成的延时调用,flink会在checkpoint过程中将优先级队列中的数据一起持久化到hdfs上,待下次任务重启仍然能够获取到这部分数据。由于EventTime类型定时器是由Watermark,那么只要任务产生watermark就能正常触发恢复的定时任务,但是ProcessingTime类型的定时器是由系统注册的延时调度来触发,所以在重启的时候获取到队列中第一个元素来注册延时调度,保证其恢复之后的正常触发。

3.4 定时器注意事项
  1. 优先级队列默认使用的是内存存储,在一些数据量比较大并且重度依赖定时触发的任务会占用比较大的内存,可以选择Rocksdb存储定时信息

  2. flink为了保证定时触发操作(onTimer)与正常处理(processElement)操作的线程安全,做了同步处理,在调用触发时必须要获取到锁,也就是二者同时只能有一个执行,因此一定要保证onTimer处理的速度,以免任务发生阻塞。

    如果不做同步处理,processElement方法中会进行state.update(),onTimer中会进行state.value(),两者会发生不一致从而引发线程安全问题

参考:
https://blog.csdn.net/u013516966/article/details/102927825
https://mp.weixin.qq.com/s/kRpG2lQRgvIi7VHBBXV-KQ
https://www.jianshu.com/p/95b168bea405
https://www.jianshu.com/p/502f9952c09b
https://blog.csdn.net/m0_37637141/article/details/81775082