关于Flink Checkpoint 参数详解

在工作中,使用到了flink的 Checkpoint 配置操作

遇到了个问题,业务逻辑是:使用flink流 处理数据后,放入kafka中,

但是在flink程序 kafka流消费时,发现只有100条的,现在有 > 100条数据进行消费,

查询问题后,发现flink 的 Checkpoint 配置含义,如下:
 

什么是 checkpoint

保存状态

Checkpoint 参数详解

 

 

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每 60s 做一次 checkpoint env.enableCheckpointing(60000); // 高级配置: // checkpoint 语义设置为 EXACTLY_ONCE,这是默认语义 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 两次 checkpoint 的间隔时间至少为 1 s,默认是 0,立即进行下一次 checkpoint env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // checkpoint 必须在 60s 内结束,否则被丢弃,默认是 10 分钟 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只能允许有一个 checkpoint env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 最多允许 checkpoint 失败 3 次 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 当 Flink 任务取消时,保留外部保存的 checkpoint 信息 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 当有较新的 Savepoint 时,作业也会从 Checkpoint 处恢复 env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 允许实验性的功能:非对齐的 checkpoint,以提升性能 env.getCheckpointConfig().enableUnalignedCheckpoints();

相关参数的文字描述:

  1. env.enableCheckpointing(60000),1 分钟触发一次 checkpoint;
  2. setCheckpointTimeout,checkpoint 超时时间,默认是 10 分钟超时,超过了超时时间就会被丢弃;
  3. setCheckpointingMode,设置 checkpoint 语义,可以设置为 EXACTLY_ONCE,表示既不重复消费也不丢数据;AT_LEAST_ONCE,表示至少消费一次,可能会重复消费;
  4. setMinPauseBetweenCheckpoints,两次 checkpoint 之间的间隔时间。假如设置每分钟进行一次 checkpoint,两次 checkpoint 间隔时间为 30s。假设某一次 checkpoint 耗时 40s,那么理论上20s 后就要进行一次 checkpoint,但是设置了两次 checkpoint 之间的间隔时间为 30s,所以是 30s 之后才会进行 checkpoint。另外,如果配置了该参数,那么同时进行的 checkpoint 数量只能为 1;
  5. enableExternalizedCheckpoints,Flink 任务取消后,外部 checkpoint 信息是否被清理。
  • DELETE_ON_CANCELLATION,任务取消后,所有的 checkpoint 都将会被清理。只有在任务失败后,才会被保留;
  • RETAIN_ON_CANCELLATION,任务取消后,所有的 checkpoint 都将会被保留,需要手工清理。
  1. setPreferCheckpointForRecovery,恢复任务时,是否从最近一个比较新的 savepoint 处恢复,默认是 false;
  2. enableUnalignedCheckpoints,是否开启试验性的非对齐的 checkpoint,可以在反压情况下极大减少 checkpoint 的次数;

在以前,在进行对齐的过程中,算子是不会再接着处理数据了,一定要等到对齐动作完成之后,才能继续对齐

在 Flink 1.11 版本中,引入了一个 Unaligned Checkpointing 的模块,主要功能是,在 barrier 到达之后,不必等待所有的输入流的 barrier,而是继续处理数据

以上文章描述,是参考以下链接文章。

zhttps://www.cnblogs.com/weijiqian/p/14159326.html#:~:text=setCheckpointingMode%EF%BC%8C%E8%AE%BE%E7%BD%AE%20checkpoint%20%E8%AF%AD%E4%B9%89%EF%BC%8C%E5%8F%AF%E4%BB%A5%E8%AE%BE%E7%BD%AE%E4%B8%BA,EXACTLY_ONCE%EF%BC%8C%E8%A1%A8%E7%A4%BA%E6%97%A2%E4%B8%8D%E9%87%8D%E5%A4%8D%E6%B6%88%E8%B4%B9%E4%B9%9F%E4%B8%8D%E4%B8%A2%E6%95%B0%E6%8D%AE%EF%BC%9BAT_LEAST_ONCE%EF%BC%8C%E8%A1%A8%E7%A4%BA%E8%87%B3%E5%B0%91%E6%B6%88%E8%B4%B9%E4%B8%80%E6%AC%A1%EF%BC%8C%E5%8F%AF%E8%83%BD%E4%BC%9A%E9%87%8D%E5%A4%8D%E6%B6%88%E8%B4%B9%EF%BC%9B%20setMinPauseBetweenCheckpoints%EF%BC%8C%E4%B8%A4%E6%AC%A1%20checkpoint%20%E4%B9%8B%E9%97%B4%E7%9A%84%E9%97%B4%E9%9A%94%E6%97%B6%E9%97%B4%E3%80%82