基于Redis实现延时任务
延时任务
1.实现技术栈
- Redis
- MQ
我们选用Redis
2.创建表
- 表1 任务日志表
- 表2 任务表
CREATE TABLE `taskinfo_logs` (
`task_id` bigint(20) NOT NULL COMMENT '任务id',
`execute_time` datetime(3) NOT NULL COMMENT '执行时间',
`parameters` longblob NULL COMMENT '参数',
`priority` int(11) NOT NULL COMMENT '优先级',
`task_type` int(11) NOT NULL COMMENT '任务类型',
`version` int(11) NOT NULL COMMENT '版本号,用乐观锁',
`status` int(11) NULL DEFAULT 0 COMMENT '状态 0=初始化状态 1=EXECUTED 2=CANCELLED',
PRIMARY KEY (`task_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
CREATE TABLE `taskinfo` (
`task_id` bigint(20) NOT NULL COMMENT '任务id',
`execute_time` datetime(3) NOT NULL COMMENT '执行时间',
`parameters` longblob NULL COMMENT '参数',
`priority` int(11) NOT NULL COMMENT '优先级',
`task_type` int(11) NOT NULL COMMENT '任务类型',
PRIMARY KEY (`task_id`) USING BTREE,
INDEX `index_taskinfo_time`(`task_type`, `priority`, `execute_time`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
3.操作
1. 添加任务
1.1.代码
@Autowired
private CacheService cacheService;
@Override
public long addTask(Task task) {
//1.添加任务到数据库中
boolean success = addTaskToDb(task);
if (success) {
//2.添加任务到redis
addTaskToCache(task);
}
return task.getTaskId();
}
/**
* 添加任务到数据库中
*
* @param task
* @return
*/
private boolean addTaskToDb(Task task) {
boolean flag = false;
try {
//保存任务表
Taskinfo taskinfo = new Taskinfo();
BeanUtils.copyProperties(task, taskinfo);
taskinfo.setExecuteTime(new Date(task.getExecuteTime()));
taskinfoMapper.insert(taskinfo);
//设置taskID
task.setTaskId(taskinfo.getTaskId());
//保存任务日志数据
TaskinfoLogs taskinfoLogs = new TaskinfoLogs();
BeanUtils.copyProperties(taskinfo, taskinfoLogs);
taskinfoLogs.setVersion(1);
taskinfoLogs.setStatus(ScheduleConstants.SCHEDULED);
taskinfoLogsMapper.insert(taskinfoLogs);
flag = true;
} catch (Exception e) {
e.printStackTrace();
}
return flag;
}
/**
* 把任务添加到redis中
*
* @param task
*/
private void addTaskToCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
//获取5分钟之后的时间 毫秒值
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
long nextScheduleTime = calendar.getTimeInMillis();
//2.1 如果任务的执行时间小于等于当前时间,存入list
if (task.getExecuteTime() <= System.currentTimeMillis()) {
cacheService.lLeftPush(ScheduleConstants.TOPIC + key, JSON.toJSONString(task));
} else if (task.getExecuteTime() <= nextScheduleTime) {
//2.2 如果任务的执行时间大于当前时间 && 小于等于预设时间(未来5分钟) 存入zset中
cacheService.zAdd(ScheduleConstants.FUTURE + key, JSON.toJSONString(task), task.getExecuteTime());
}
}
1.2 逻辑
2.删除任务
2.1代码
@Override
public boolean cancelTask(long taskId) {
//删除任务,更新任务日志
Task task = updateDb(taskId, ScheduleConstants.EXECUTED);
//删除Redis数据
if (task != null) {
removeTaskFromCache(task);
}
return false;
}
private void removeTaskFromCache(Task task) {
String key = task.getTaskType() + "_" + task.getPriority();
if (task.getExecuteTime() <= System.currentTimeMillis()) {
cacheService.lRemove(ScheduleConstants.TOPIC + key, 0, JSON.toJSONString(task));
} else {
cacheService.zRemove(ScheduleConstants.FUTURE + key, JSON.toJSONString(task));
}
}
private Task updateDb(long taskId, int status) {
Task task = null;
try {
//删除任务
taskinfoMapper.deleteById(taskId);
TaskinfoLogs taskinfoLogs = taskinfoLogsMapper.selectById(taskId);
taskinfoLogs.setStatus(status);
taskinfoLogsMapper.updateById(taskinfoLogs);
task = new Task();
BeanUtils.copyProperties(taskinfoLogs, task);
task.setExecuteTime(taskinfoLogs.getExecuteTime().getTime());
} catch (Exception e) {
log.error("task cancel exception taskid={}", taskId);
}
return task;
}
3.定时刷新任务
3.1代码
@Autowired
private CacheService cacheService;
/**
* 定时刷新任务
*/
@Scheduled(cron = "0 */1 * * * ?")
public void refresh() {
String token = cacheService.tryLock("FUTURE_TASK_SYNC", 1000 * 30);
if (StringUtils.isNotBlank(token)) {
log.info("未来数据定时刷新---定时任务");
//获取所有未来数据的集合key
Set<String> futureKeys = cacheService.scan(ScheduleConstants.FUTURE + "*");
for (String futureKey : futureKeys) {//future_100_50
//获取当前数据的key topic
String topicKey = ScheduleConstants.TOPIC + futureKey.split(ScheduleConstants.FUTURE)[1];
//按照key和分值查询符合条件的数据
Set<String> tasks = cacheService.zRangeByScore(futureKey, 0, System.currentTimeMillis());
//同步数据
if (!tasks.isEmpty()) {
cacheService.refreshWithPipeline(futureKey, topicKey, tasks);
log.info("成功的将" + futureKey + "刷新到了" + topicKey);
}
}
}
}
4.定时拉取任务
4.1 代码
@Override
public Task poll(int type, int priority) {
Task task = null;
try {
String key = type + "_" + priority;
//从Redis中拉去数据
String task_json = cacheService.lRightPop(ScheduleConstants.TOPIC + key);
if (StringUtils.isNotBlank(task_json)) {
task = JSON.parseObject(task_json, Task.class);
//修改数据库信息
updateDb(task.getTaskId(), ScheduleConstants.EXECUTED);
}
} catch (Exception e) {
e.printStackTrace();
log.error("poll task exception");
}
return task;
}
@Scheduled(cron = "0 */5 * * * ?")
@PostConstruct
public void reloadData() {
clearCache();
log.info("数据库数据同步到缓存");
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.MINUTE, 5);
//查看小于未来5分钟的所有任务
List<Taskinfo> allTasks = taskinfoMapper.selectList(Wrappers.<Taskinfo>lambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));
if (allTasks != null && allTasks.size() > 0) {
for (Taskinfo taskinfo : allTasks) {
Task task = new Task();
BeanUtils.copyProperties(taskinfo, task);
task.setExecuteTime(taskinfo.getExecuteTime().getTime());
addTaskToCache(task);
}
}
}
private void clearCache() {
// 删除缓存中未来数据集合和当前消费者队列的所有key
Set<String> futurekeys = cacheService.scan(ScheduleConstants.FUTURE + "*");// future_
Set<String> topickeys = cacheService.scan(ScheduleConstants.TOPIC + "*");// topic_
cacheService.delete(futurekeys);
cacheService.delete(topickeys);
}
4.2 开启调度事务
@EnableScheduling //开启调度任务
public class ScheduleApplication {
}
4.使用Feign调度
4.1Feign暴露接口
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.schedule.dtos.Task;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@FeignClient("leadnews-schedule")
public interface IScheduleClient {
/**
* 添加任务
* @param task 任务对象
* @return 任务id
*/
@PostMapping("/api/v1/task/add")
public ResponseResult addTask(@RequestBody Task task);
/**
* 取消任务
* @param taskId 任务id
* @return 取消结果
*/
@GetMapping("/api/v1/task/cancel/{taskId}")
public ResponseResult cancelTask(@PathVariable("taskId") long taskId);
/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/task/poll/{type}/{priority}")
public ResponseResult poll(@PathVariable("type") int type,@PathVariable("priority") int priority);
}
4.2 实现接口
import com.heima.apis.schedule.IScheduleClient;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.schedule.dtos.Task;
import com.heima.schedule.service.TaskService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RestController
public class ScheduleClient implements IScheduleClient {
@Autowired
private TaskService taskService;
/**
* 添加任务
* @param task 任务对象
* @return 任务id
*/
@PostMapping("/api/v1/task/add")
@Override
public ResponseResult addTask(@RequestBody Task task) {
return ResponseResult.okResult(taskService.addTask(task));
}
/**
* 取消任务
* @param taskId 任务id
* @return 取消结果
*/
@GetMapping("/api/v1/task/cancel/{taskId}")
@Override
public ResponseResult cancelTask(@PathVariable("taskId") long taskId) {
return ResponseResult.okResult(taskService.cancelTask(taskId));
}
/**
* 按照类型和优先级来拉取任务
* @param type
* @param priority
* @return
*/
@GetMapping("/api/v1/task/poll/{type}/{priority}")
@Override
public ResponseResult poll(@PathVariable("type") int type, @PathVariable("priority") int priority) {
return ResponseResult.okResult(taskService.poll(type,priority));
}
}
4.3 使用
@Resource
IScheduleClient scheduleClient;
@Override
@Async
public void addNewsToTask(Integer id, Date publishTime) {
log.info("添加任务到延迟服务中----begin");
Task task = new Task();
task.setExecuteTime(publishTime.getTime());
task.setTaskType(TaskTypeEnum.NEWS_SCAN_TIME.getTaskType());
task.setPriority(TaskTypeEnum.NEWS_SCAN_TIME.getPriority());
WmNews wmNews = new WmNews();
wmNews.setId(id);
task.setParameters(ProtostuffUtil.serialize(wmNews));
scheduleClient.addTask(task);
log.info("添加任务到延迟服务中----end");
}
在配置类中添加注解
@EnableAsync //开启异步调用
@EnableFeignClients(basePackages = "com.heima.apis.*")
public class WemediaApplication {
因为通过异步调用,所以要开启事务,并且Feign要扫描到