分布式定时任务qutz利用redis 实现防重

2021/12/30 2:07:12

本文主要是介绍分布式定时任务qutz利用redis 实现防重,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1.设计思路:

①利用spring aop做定时任务的拦截

②利用redis实现注册中心

③利用redis ttl机制结合java代码实现心跳检测,机器淘汰,故障转移

2.拦截器代码

package cn.togeek.conf;

import cn.togeek.tools.UUIDUtil;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Aspect that wraps every {@code @Scheduled} method and consults Redis to decide whether
 * this node runs the task on the current tick or leaves it to another node.
 *
 * <p>Redis keys used per task, where {@code id} is {@code <targetClassName>#<methodName>}:
 * <ul>
 *   <li>{@code id} - string value holding the task id of the node permitted to run the task;</li>
 *   <li>{@code id&set} - set of task ids registered as online;</li>
 *   <li>{@code <mId>#id} - this node's heartbeat key, rewritten on every tick with a TTL equal
 *       to the task's {@code fixedRate}.</li>
 * </ul>
 *
 * <p>Only tasks declared with a positive {@code fixedRate} are supported; any other task is
 * logged as unsupported and is not executed.
 *
 * <p>NOTE(review): the election reads the permit key and writes it back with separate Redis
 * commands and no lock, so two nodes ticking at the same moment may both elect themselves.
 * Confirm whether that window is acceptable for the tasks guarded by this aspect.
 */
@Aspect
@Component
public class ScheduledStatisticsHandler2 {

    private static final Logger logger = LoggerFactory.getLogger(ScheduledStatisticsHandler2.class);

    @Autowired
    private StringRedisTemplate redisTemplate;

    // Injected but not read anywhere in this class.
    @Value("${server.port}")
    private String port;

    // Machine id: a fresh UUID per instance, mimicking k8s where the identity changes on every start.
    private final String mId = UUIDUtil.getUUID();

    // Not read anywhere in this class.
    private double permitRate = 0.01;

    /** Matches every method annotated with {@code @Scheduled}. */
    @Pointcut("@annotation(org.springframework.scheduling.annotation.Scheduled)")
    public void proxyAspect() {}

    /**
     * Refreshes this node's heartbeat, evicts expired nodes from the online set, and then
     * either runs the intercepted task (when this node holds or takes over the permit) or
     * skips it because another node holds the permit.
     *
     * @param joinPoint the intercepted {@code @Scheduled} method invocation
     * @throws Throwable if the target method cannot be looked up reflectively or a Redis call fails
     */
    @Around("proxyAspect()")
    public void doInvoke(ProceedingJoinPoint joinPoint) throws Throwable {
        Class<?> targetClass = joinPoint.getTarget().getClass();
        String methodName = joinPoint.getSignature().getName();
        Object[] args = joinPoint.getArgs();
        String id = targetClass.getName() + "#" + methodName;

        // Looked up without parameter types: only zero-argument methods are found.
        Method method = targetClass.getDeclaredMethod(methodName);
        Scheduled scheduled = method.getAnnotation(Scheduled.class);
        long fixedRate = scheduled.fixedRate();
        // fixedRate is used as the heartbeat TTL below, so it must be positive. An unset
        // fixedRate is -1 (cron / fixedDelay tasks), which the former "== 0" check let through.
        if (fixedRate <= 0) {
            logger.error("unsupported scheduled task,the aop current only support fixrate");
            return;
        }

        String setId = id + "&set";
        String taskId = mId + "#" + id;

        // Snapshot of the online node list, taken before this tick's heartbeat and eviction.
        Set<String> onlineNodes = redisTemplate.opsForSet().members(setId);
        boolean isMember = onlineNodes.contains(taskId);

        // Heartbeat: the key disappears if this node stops ticking for one fixedRate period.
        redisTemplate.opsForValue().set(taskId, "1", fixedRate, TimeUnit.MILLISECONDS);
        evictExpiredNodes(setId, onlineNodes);

        // Task id of the node currently permitted to run the task; null when none is elected.
        String permitKey = redisTemplate.opsForValue().get(id);

        if (!isMember) {
            // KEYS walks the whole keyspace; kept as in the original design.
            Set<String> heartbeatKeys = redisTemplate.keys("*#" + id);
            redisTemplate.opsForSet().add(setId, taskId);
            if (permitKey == null || heartbeatKeys.isEmpty()) {
                // Nobody is elected yet: claim the permit. The null check also keeps a null
                // member from being passed to the set removal below.
                process(joinPoint, args, id, setId, taskId);
                return;
            }
            if (!heartbeatKeys.contains(permitKey)) {
                // The permitted node has no heartbeat key any more: take over.
                redisTemplate.opsForValue().set(id, taskId);
                redisTemplate.opsForSet().remove(setId, permitKey);
                execute(joinPoint, args);
                return;
            }
        } else {
            if (permitKey == null) {
                process(joinPoint, args, id, setId, taskId);
                return;
            }
            if (!onlineNodes.contains(permitKey)) {
                // The permitted node is not in the online snapshot: take over.
                redisTemplate.opsForValue().set(id, taskId);
                execute(joinPoint, args);
                return;
            }
        }

        if (taskId.equals(permitKey)) {
            execute(joinPoint, args);
        } else {
            logger.info("其他节点执行任务");
        }
    }

    /** Removes from the online set every node whose heartbeat key reports a negative TTL. */
    private void evictExpiredNodes(String setId, Set<String> onlineNodes) {
        for (String node : onlineNodes) {
            Long expire = redisTemplate.getExpire(node);
            if (expire != null && expire < 0) {
                redisTemplate.opsForSet().remove(setId, node);
            }
        }
    }

    /**
     * Runs the intercepted task. A failure is logged and not propagated, so it never reaches
     * the caller of the aspect.
     */
    private void execute(ProceedingJoinPoint joinPoint, Object[] args) {
        try {
            joinPoint.proceed(args);
        } catch (Throwable throwable) {
            logger.error("scheduled task {} failed", joinPoint.getSignature(), throwable);
        }
    }

    /** Records this node as the permitted node, registers it as online, and runs the task. */
    private void process(ProceedingJoinPoint joinPoint, Object[] args, String id, String setId, String taskId) throws Throwable {
        redisTemplate.opsForValue().set(id, taskId);
        redisTemplate.opsForSet().add(setId, taskId);
        execute(joinPoint, args);
    }

    /**
     * Returns the numeric value of the first (seconds) field of a space-separated cron
     * expression, or 0 when that field is {@code *}. All other fields are ignored.
     * Only simple expressions are supported; not called anywhere in this class.
     *
     * @param corn the cron expression
     * @return the seconds field as a number, or 0 for {@code *}
     * @throws NumberFormatException if the seconds field is neither {@code *} nor an integer
     */
    private Long calculate(String corn) {
        String secondsField = corn.split(" ")[0];
        long times = "*".equals(secondsField) ? 0L : Integer.parseInt(secondsField);
        return times;
    }
}

3.application.yml

# Redis connection settings (host and port of a local Redis instance).
# NOTE(review): the aspect above also injects ${server.port}, which is not defined here;
# confirm it is provided elsewhere.
spring:
  redis:
    host: 127.0.0.1
    port: 6379

4.缺陷:只能处理 fixedRate 类型的定时任务

    /**
     * Demo fixed-rate task that prints the current wall-clock time each time it runs.
     * The period is {@code 5 * SECONDS}; {@code SECONDS} is defined outside this snippet
     * (presumably the number of milliseconds in one second - confirm).
     */
    @Scheduled(fixedRate = 5 * SECONDS)
    public void test() throws Exception {
        // "HH" is the 24-hour field. The former "hh" is the 12-hour field and, with no am/pm
        // marker in the pattern, printed the same text for morning and afternoon runs.
        String format = new SimpleDateFormat("HH:mm:ss").format(new Date());
        System.out.println("task execute in time:" + format);
    }

 



这篇关于分布式定时任务qutz利用redis 实现防重的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程