基于Redis实现延时任务

延时任务

1.实现技术栈

  • Redis
  • MQ

我们选用Redis

2.创建表

  • 表1 任务日志表
  • 表2 任务表
-- Task log table: one row per task (primary key task_id) holding the task's
-- fields plus a status and a version.
-- Per the column comments: status 0 = initial state, 1 = EXECUTED, 2 = CANCELLED;
-- version is a version number intended for optimistic locking.
CREATE TABLE `taskinfo_logs`  (
  `task_id` bigint(20) NOT NULL COMMENT '任务id',
  `execute_time` datetime(3) NOT NULL COMMENT '执行时间',
  `parameters` longblob NULL COMMENT '参数',
  `priority` int(11) NOT NULL COMMENT '优先级',
  `task_type` int(11) NOT NULL COMMENT '任务类型',
  `version` int(11) NOT NULL COMMENT '版本号,用乐观锁',
  `status` int(11) NULL DEFAULT 0 COMMENT '状态 0=初始化状态 1=EXECUTED 2=CANCELLED',
  PRIMARY KEY (`task_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;


-- Task table: one row per task, keyed by task_id, with its execution time
-- (millisecond precision), binary parameters, priority and task type.
-- The secondary index covers (task_type, priority, execute_time).
CREATE TABLE `taskinfo`  (
  `task_id` bigint(20) NOT NULL COMMENT '任务id',
  `execute_time` datetime(3) NOT NULL COMMENT '执行时间',
  `parameters` longblob NULL COMMENT '参数',
  `priority` int(11) NOT NULL COMMENT '优先级',
  `task_type` int(11) NOT NULL COMMENT '任务类型',
  PRIMARY KEY (`task_id`) USING BTREE,
  INDEX `index_taskinfo_time`(`task_type`, `priority`, `execute_time`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

3.操作

1. 添加任务

1.1.代码
@Autowired
private CacheService cacheService;

/**
 * Stores a task in the database and, if that worked, also places it in Redis.
 *
 * @param task the task to add; its id is filled in by the database step
 * @return the id carried by {@code task} once the database step has run
 */
@Override
public long addTask(Task task) {
    // The cache is only written when the database write reported success.
    if (addTaskToDb(task)) {
        addTaskToCache(task);
    }
    return task.getTaskId();
}

/**
 * Saves the task to the task table and writes its initial log record.
 *
 * <p>After the task row is inserted, its id is copied back onto {@code task}.
 * The log row is created with version 1 and status
 * {@link ScheduleConstants#SCHEDULED}.
 *
 * @param task the task to persist
 * @return {@code true} when both inserts completed, {@code false} when any step threw
 */
private boolean addTaskToDb(Task task) {
    try {
        // Task table row.
        Taskinfo taskinfo = new Taskinfo();
        BeanUtils.copyProperties(task, taskinfo);
        taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
        taskinfoMapper.insert(taskinfo);
        // Hand the id of the inserted row back to the caller's object.
        task.setTaskId(taskinfo.getTaskId());

        // Matching log row.
        TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
        BeanUtils.copyProperties(taskinfo, taskinfoLogs);
        taskinfoLogs.setVersion(1);
        taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
        taskinfoLogsMapper.insert(taskinfoLogs);
        return true;
    } catch (Exception e) {
        // Failure is still reported through the return value, but the cause is
        // recorded by the logger (with stack trace) instead of printed to stderr.
        log.error("add task to db exception", e);
        return false;
    }
}
/**
 * Places a task in Redis according to how soon it is due.
 *
 * <ul>
 *   <li>execute time at or before now: pushed onto the list under the TOPIC prefix;</li>
 *   <li>execute time within the next 5 minutes: added to the sorted set under the
 *       FUTURE prefix, scored by its execute time;</li>
 *   <li>anything later: not cached by this method.</li>
 * </ul>
 *
 * @param task the task to cache
 */
private void addTaskToCache(Task task) {
    // Upper bound of the preload window: five minutes from now, in epoch millis.
    Calendar windowEnd = Calendar.getInstance();
    windowEnd.add(Calendar.MINUTE, 5);
    long preloadLimit = windowEnd.getTimeInMillis();

    String keySuffix = task.getTaskType() + "_" + task.getPriority();
    long executeAt = task.getExecuteTime();

    if (executeAt <= System.currentTimeMillis()) {
        // Already due: goes straight to the consumer list.
        cacheService.lLeftPush(ScheduleConstants.TOPIC + keySuffix, JSON.toJSONString(task));
        return;
    }
    if (executeAt <= preloadLimit) {
        // Due soon: parked in the sorted set until its time arrives.
        cacheService.zAdd(ScheduleConstants.FUTURE + keySuffix, JSON.toJSONString(task), executeAt);
    }
}
1.2 逻辑

在这里插入图片描述

2.删除任务

2.1代码
/**
 * Cancels a task: removes it from the task table, marks its log row as
 * cancelled and removes its cached copy from Redis.
 *
 * @param taskId id of the task to cancel
 * @return {@code true} if the database update succeeded and the cache removal
 *         was issued; {@code false} if the database update failed
 */
@Override
public boolean cancelTask(long taskId) {
    // A cancellation is recorded as CANCELLED, not EXECUTED, so the log row
    // reflects what actually happened to the task.
    // NOTE(review): assumes ScheduleConstants declares CANCELLED (status 2) — confirm.
    Task task = updateDb(taskId, ScheduleConstants.CANCELLED);
    if (task == null) {
        // updateDb returns null when the database step failed; nothing to remove.
        return false;
    }

    removeTaskFromCache(task);
    return true;
}


/**
 * Removes a task's serialized form from Redis.
 *
 * <p>A task whose execute time is at or before now is removed from the list
 * under the TOPIC prefix; otherwise it is removed from the sorted set under
 * the FUTURE prefix. The value removed is {@code JSON.toJSONString(task)}.
 *
 * @param task the task to remove
 */
private void removeTaskFromCache(Task task) {
    String keySuffix = task.getTaskType() + "_" + task.getPriority();
    boolean alreadyDue = task.getExecuteTime() <= System.currentTimeMillis();
    String payload = JSON.toJSONString(task);

    if (alreadyDue) {
        cacheService.lRemove(ScheduleConstants.TOPIC + keySuffix, 0, payload);
    } else {
        cacheService.zRemove(ScheduleConstants.FUTURE + keySuffix, payload);
    }
}

/**
 * Deletes the task row and stamps the given status on its log row.
 *
 * @param taskId id of the task
 * @param status status value to write to the log row
 * @return a {@link Task} rebuilt from the log row, or {@code null} when the log
 *         row does not exist or any database step threw
 */
private Task updateDb(long taskId, int status) {
    try {
        // Remove the task row.
        taskinfoMapper.deleteById(taskId);

        TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
        if (taskinfoLogs == null) {
            // Handle the missing row explicitly rather than via a caught NPE.
            log.warn("task log not found taskid={}", taskId);
            return null;
        }
        taskinfoLogs.setStatus(status);
        taskinfoLogsMapper.updateById(taskinfoLogs);

        Task task = new Task();
        BeanUtils.copyProperties(taskinfoLogs, task);
        // Set the execute time explicitly as epoch millis from the log row.
        task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
        return task;
    } catch (Exception e) {
        // This method serves both cancel and poll, so the message names the
        // status being written; the exception is passed so the cause is kept.
        log.error("task status update exception taskid={} status={}", taskId, status, e);
        return null;
    }
}

3.定时刷新任务

3.1代码
@Autowired
private CacheService cacheService;

/**
 * Periodic job (cron {@code 0 *}{@code /1 * * * ?}) that promotes due entries
 * from the FUTURE sorted sets to the matching TOPIC keys.
 *
 * <p>It first asks {@code cacheService.tryLock} for the {@code FUTURE_TASK_SYNC}
 * token (second argument 30 000 — presumably the lock lifetime in ms; confirm)
 * and does nothing when the returned token is blank.
 */
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
    String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
    if (!StringUtils.isNotBlank(token)) {
        return;
    }
    log.info("未来数据定时刷新---定时任务");

    // Every key under the FUTURE prefix, e.g. future_100_50.
    Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
    for (String futureKey : futureKeys) {
        // Swap the FUTURE prefix for the TOPIC prefix to get the target key.
        String suffix = futureKey.split(ScheduleConstants.FUTURE)[1];
        String topicKey = ScheduleConstants.TOPIC + suffix;

        // Entries scored between 0 and the current time are due.
        Set<String> dueTasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
        if (dueTasks.isEmpty()) {
            continue;
        }

        cacheService.refreshWithPipeline(futureKey, topicKey, dueTasks);
        log.info("成功的将" + futureKey + "刷新到了" + topicKey);
    }
}

4.定时拉取任务

4.1 代码
/**
 * Pops one task of the given type and priority from its Redis list and records
 * it as executed in the database.
 *
 * @param type     task type, first part of the Redis key
 * @param priority task priority, second part of the Redis key
 * @return the popped task, or {@code null} when the list yielded nothing or the
 *         pop/parse step threw
 */
@Override
public Task poll(int type, int priority) {
    Task task = null;
    try {
        String key = type + "_" + priority;
        // Pull one entry from the consumer list.
        String taskJson = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
        if (StringUtils.isNotBlank(taskJson)) {
            task = JSON.parseObject(taskJson, Task.class);
            // Remove the task row and mark the log row as executed.
            updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
        }
    } catch (Exception e) {
        // Log once through the logger, with context and the full cause,
        // instead of printing the stack trace to stderr.
        log.error("poll task exception type={} priority={}", type, priority, e);
    }
    return task;
}

/**
 * Rebuilds the Redis cache from the task table.
 *
 * <p>Annotated to run after construction and on the cron
 * {@code 0 *}{@code /5 * * * ?}. It clears the cached FUTURE/TOPIC keys, then
 * loads every task whose execute time is earlier than five minutes from now
 * and feeds each one through {@code addTaskToCache}.
 */
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
    clearCache();
    log.info("数据库数据同步到缓存");

    // Cut-off: five minutes from now.
    Calendar cutoff = Calendar.getInstance();
    cutoff.add(Calendar.MINUTE, 5);

    List<Taskinfo> dueSoon = taskinfoMapper.selectList(
            Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, cutoff.getTime()));
    if (dueSoon == null || dueSoon.isEmpty()) {
        return;
    }

    for (Taskinfo row : dueSoon) {
        Task task = new Task();
        BeanUtils.copyProperties(row, task);
        // Set the execute time explicitly as epoch millis from the row's date.
        task.setExecuteTime(row.getExecuteTime().getTime());
        addTaskToCache(task);
    }
}

/**
 * Deletes every Redis key found under the FUTURE prefix and under the TOPIC prefix.
 */
private void clearCache() {
    // Collect both key sets first, then delete them.
    Set<String> futureKeySet = cacheService.scan(ScheduleConstants.FUTURE + "*");
    Set<String> topicKeySet = cacheService.scan(ScheduleConstants.TOPIC + "*");

    cacheService.delete(futureKeySet);
    cacheService.delete(topicKeySet);
}
4.2 开启调度任务
// Class that carries @EnableScheduling; shown here as an abbreviated snippet
// with an empty body.
@EnableScheduling // turns on processing of @Scheduled methods
public class ScheduleApplication {
}

4.使用Feign调度

4.1Feign暴露接口

import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;

/**
 * Feign client for the {@code leadnews-schedule} service: add, cancel and poll tasks.
 */
@FeignClient("leadnews-schedule")
public interface IScheduleClient {

    /**
     * Adds a task.
     *
     * @param task the task to add
     * @return response wrapping the task id
     */
    @PostMapping("/api/v1/task/add")
    ResponseResult addTask(@RequestBody Task task);

    /**
     * Cancels a task.
     *
     * @param taskId id of the task to cancel
     * @return response wrapping the cancellation result
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    ResponseResult cancelTask(@PathVariable("taskId") long taskId);

    /**
     * Pulls a task by type and priority.
     *
     * @param type     task type
     * @param priority task priority
     * @return response wrapping the pulled task
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority);
}

4.2 实现接口

import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;


/**
 * REST controller implementing {@link IScheduleClient} by delegating every
 * call to {@link TaskService} and wrapping the outcome in an OK response.
 */
@RestController
public class ScheduleClient implements IScheduleClient {

    @Autowired
    private TaskService taskService;

    /**
     * Adds a task.
     *
     * @param task the task to add
     * @return OK response wrapping the task id
     */
    @PostMapping("/api/v1/task/add")
    @Override
    public ResponseResult addTask(@RequestBody Task task) {
        long taskId = taskService.addTask(task);
        return ResponseResult.okResult(taskId);
    }

    /**
     * Cancels a task.
     *
     * @param taskId id of the task to cancel
     * @return OK response wrapping the cancellation result
     */
    @GetMapping("/api/v1/task/cancel/{taskId}")
    @Override
    public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
        boolean cancelled = taskService.cancelTask(taskId);
        return ResponseResult.okResult(cancelled);
    }

    /**
     * Pulls a task by type and priority.
     *
     * @param type     task type
     * @param priority task priority
     * @return OK response wrapping whatever the service returned
     */
    @GetMapping("/api/v1/task/poll/{type}/{priority}")
    @Override
    public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
        Task polled = taskService.poll(type, priority);
        return ResponseResult.okResult(polled);
    }
}

4.3 使用

@Resource
IScheduleClient scheduleClient;

/**
 * Asynchronously registers a delayed task for a news item with the schedule service.
 *
 * <p>The task runs at {@code publishTime}, uses the type and priority of
 * {@code TaskTypeEnum.NEWS_SCAN_TIME}, and carries a serialized {@code WmNews}
 * holding only the id.
 *
 * @param id          id of the news item
 * @param publishTime time at which the task should execute
 */
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {
    log.info("添加任务到延迟服务中----begin");

    long executeAt = publishTime.getTime();
    TaskTypeEnum scanType = TaskTypeEnum.NEWS_SCAN_TIME;

    // Only the id travels in the payload.
    WmNews payload = new WmNews();
    payload.setId(id);

    Task task = new Task();
    task.setExecuteTime(executeAt);
    task.setTaskType(scanType.getTaskType());
    task.setPriority(scanType.getPriority());
    task.setParameters(ProtostuffUtil.serialize(payload));

    scheduleClient.addTask(task);

    log.info("添加任务到延迟服务中----end");
}

在配置类中添加注解

@EnableAsync  //开启异步调用
@EnableFeignClients(basePackages = "com.heima.apis.*")
public class WemediaApplication {

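因为 addNewsToTask 使用了 @Async 异步调用,所以启动类上要加 @EnableAsync 开启异步支持;同时 @EnableFeignClients 的 basePackages 必须能扫描到 IScheduleClient 所在的包(com.heima.apis),否则 Feign 客户端无法注入。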