Hikari数据源项目启动后覆盖为新数据源【多库】

【适用场景:动态切换数据库(单库或者多库)。 切换新库完成后可不改配置,重启也能直接连新库】

方案:启动时多连接两个新库,即初始化4个库。在切换时,是通过实现protected abstract DataSource determineDataSource();的方法,进行数据源的决策,动态切换到新库。

项目情况:本身使用hikari的数据源,且连接了两个库。 现在希望运行中动态切换为另外两个数据库。项目本身就使用了dynamic-datasource-spring-boot-starter多数据源连接。

项目在启动时,就会读取下方配置,创建动态数据源对象:com.baomidou.dynamic.datasource.DynamicRoutingDataSource,内部的map存储了多个数据源。

/**所有数据库*/

private Map<String, DataSource> dataSourceMap = new LinkedHashMap<>();

使用时默认使用的master主数据源,需要切换时,在具体的service实现类或者mapper上,加上注解:@DS("second")即可使用从数据源的链接。

本次功能项目新增结构:

  1. 启动连接配置,是从nacos配置读取的(原本的配置)

spring:
  main: 
    allow-bean-definition-overriding: true
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    type: com.zaxxer.hikari.HikariDataSource
    hikari:
      maximum-pool-size: 10
      idle-timeout: 180000
      connection-timeout: 60000            
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
    dynamic:
      #修改默认数据源名称,默认值是master
      primary: master
      datasource:
        master:
          type: com.zaxxer.hikari.HikariDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://x1111111:3306/db_master?characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=Asia/Shanghai
          username: u1111111
          password: p1111111
        second:
          type: com.zaxxer.hikari.HikariDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://x2222222:3306/db_second?characterEncoding=UTF-8&amp&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
          username: u2222222
          password: p2222222
## 注意:如果原本是单库,也需要调整为这种格式的多库配置。不过此时需要额外引入dynamic-datasource-spring-boot-starter的jar包

配置调整:

增加以下新数据源配置:

        masterOfNew:
          type: com.zaxxer.hikari.HikariDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://new1111111:3306/db_master?characterEncoding=UTF-8&amp&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
          username: newU111111
          password: newP111111
        secondOfNew:
          type: com.zaxxer.hikari.HikariDataSource
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://new222222:3306/db_second?characterEncoding=UTF-8&amp&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai
          username: newU222222
          password: newP222222
## 注意:新数据源名字不能带下划线(会被放进分组数据库groupDataSources中,影响数据源决策,具体自己看DynamicRoutingDataSource源码,不复杂),不能带#号会被直接去除。我这里用的是加个“OfNew”后缀

  1. 自己的动态数据源实现类MyDynamicDsSource

import com.baomidou.dynamic.datasource.DynamicRoutingDataSource;
import javax.sql.DataSource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;

@Slf4j
public class MyDynamicDsSource extends DynamicRoutingDataSource {

    /** 新加字段,当前是否使用新数据源 */
    @Getter
    @Setter
    private volatile boolean useNewSource = false;

    /**
     * 其实就是父类的primary字段,只因在父类是私有的get不到,所以初始化set到这个字段直接使用
     */
    @Setter
    private String myPrimary;

    /**
     * 获取数据源
     *
     * @param ds 数据源名称
     * @return 数据源   说明:数据源名称不配置下划线,groupDataSources则为空,永远只会从dataSourceMap中获取。 而且ds为空时,默认获取primary。 因此我只需要提前处理ds即可
     */
    public DataSource getDataSource(String ds) {
        // 如果使用的是新数据源,则补充后缀
        if (useNewSource) {
            ds = (StringUtils.isEmpty(ds) ? myPrimary : ds) + DSConstants.DS_SUFFIX;
        }
        return super.getDataSource(ds);
    }
}

  1. 替换jar包的默认bean,使用自己新建的动态数据源bean。

import java.util.Map;
import javax.annotation.Resource;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.baomidou.dynamic.datasource.provider.DynamicDataSourceProvider;
import com.baomidou.dynamic.datasource.provider.YmlDynamicDataSourceProvider;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DataSourceProperty;
import com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceProperties;
import com.baomidou.dynamic.datasource.strategy.DynamicDataSourceStrategy;

import lombok.extern.slf4j.Slf4j;

/**
 * @see com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration
 */
@Slf4j
@Configuration
public class MyDynamicDsBeanCreator {
    /**
     * nacos配置的字段,项目启动时使用,判断是否直接使用新的数据源
     */
    @Value("${want.change.ds.operate:none}")
    private String changeOperate;

    @Resource
    private DynamicDataSourceProperties dynamicDataSourceProperties;

    @Bean
    public DynamicDataSourceProvider dynamicDataSourceProvider() {
        Map<String, DataSourceProperty> datasourceMap = dynamicDataSourceProperties.getDatasource();
        return new YmlDynamicDataSourceProvider(datasourceMap);
    }

    @Bean
    public DynamicDataSourceStrategy dynamicDataSourceStrategy() throws Exception {
        return dynamicDataSourceProperties.getStrategy().newInstance();
    }

    @Bean
    public DataSource dataSource(DynamicDataSourceProvider dynamicDataSourceProvider, DynamicDataSourceStrategy dynamicDataSourceStrategy) {
        MyDynamicDsSource dataSource = new MyDynamicDsSource();
        dataSource.setPrimary(dynamicDataSourceProperties.getPrimary());
        dataSource.setStrict(dynamicDataSourceProperties.getStrict());
        dataSource.setStrategy(dynamicDataSourceStrategy);
        dataSource.setProvider(dynamicDataSourceProvider);
        dataSource.setP6spy(dynamicDataSourceProperties.getP6spy());
        dataSource.setSeata(dynamicDataSourceProperties.getSeata());
        log.info("MyDynamicDsBeanCreator::starting, changeOperate={}", changeOperate);
        //这里,如果一致,则为true,代表启动时就链接新数据源。否则是链接老数据源
        dataSource.setUseNewSource("change".equals(changeOperate));
        dataSource.setMyPrimary(dynamicDataSourceProperties.getPrimary());
        return dataSource;
    }
}

  1. 事件监听NacosListener用于动态切换

可以使用mq,或者apollo,或其他的。这里是监听的nacos配置文件:

获取到nacos变更信息,取出我需要的字段,哪种操作,新数据源链接配置等,进行切换新数据源操作。顺带一提,如果多个项目都使用的同一个nacos配置文件,那么多个项目都能监听到。


import com.alibaba.cloud.nacos.NacosConfigManager;
import com.alibaba.cloud.nacos.NacosConfigProperties;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.Constructor;
import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;

/**
 * 
 */
@Component
@Slf4j
public class NacosListener implements ApplicationRunner {
    @Resource
    private NacosConfigProperties nacosConfigProperties;
    @Autowired
    private DSChangeService dsChangeService;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        ConfigService configService = new NacosConfigManager(nacosConfigProperties).getConfigService();
        try {
            // 配置文件名称,这里是例子找出第一个配置文件此处文件自定义配置
            // NacosConfigProperties.Config config = nacosConfigProperties.getSharedConfigs().get(0);
            // configService.addListener(config.getDataId(), config.getGroup(), new Listener() {
            configService.addListener(nacosConfigProperties.getName(), nacosConfigProperties.getGroup(),
                    new Listener() {
                        @Override
                        public ScheduledThreadPoolExecutor getExecutor() {
                            return new ScheduledThreadPoolExecutor(1, new BasicThreadFactory.Builder()
                                    .namingPattern("nacos-listen-pool-%d").daemon(Boolean.TRUE).build());
                        }

                        @Override
                        public void receiveConfigInfo(String content) {
                            if (StringUtils.isNotEmpty(content)) {
                                loadData(content);
                            }
                        }
                    });
            log.info("nacos配置文件动态更新已添加监听");
        } catch (NacosException e) {
            log.error("nacos配置文件错误");
        }
    }

    // 监听到的content是nacos全部配置信息,最后我们只需要获取自己需要的配置即可
    private void loadData(String content) {
        try {
            // 默认解析yaml格式的文件,其他类型自行配置解析
            Yaml yaml = new Yaml(new Constructor(JSONObject.class));
            InputStream inputStream = new ByteArrayInputStream(content.getBytes());
            Map<String, Object> load = yaml.loadAs(inputStream, Map.class);

            String task = String.valueOf(load.get("want.change.ds.operate"));
            log.info("NacosListener:loadData:task={}", task);
            String result = null;
            if (task.contains("data")) {  //测试获取数据,观察数据源切换是否成功
                result = dsChangeService.getData();
            } else if (task.contains("back")) {  // 切换回老的数据源
                result = dsChangeService.setBack();
            } else if (task.contains("change")) {   // 切换到新的数据源
                result = dsChangeService.changeDS();
            }
            log.info("NacosListener:loadData:result={}", result);
        } catch (Exception e) {
            log.error("ERROR:NacosListener:loadData:", e);
        }
    }
}

  1. service操作类

import java.util.Map;
public interface DSChangeService{
    /**
     * 切换数据源
     */
    String changeDS();

    /**
     * 测试数据查询
     */
    String getData();

    /**
     * 设置回去老数据源
     */
    String setBack();
}
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.uwork.common.util.json.JsonHelper;
import javax.annotation.Resource;
import java.util.List;

@Slf4j
@Service("dsChangeService")
public class DSChangeServiceImpl implements DSChangeService {

    @Resource
    private MyDynamicDsSource dataSource;

    /**
     * 切换到新数据源,使之走新的数据源决策逻辑
     * @return
     */
    @Override
    public String changeDS() {
        if (dataSource.isUseNewSource()) {
            return "changeDS::数据源已切换,无需重复切换";
        }
        // 设置为已经切换
        dataSource.setUseNewSource(true);
        return "changeDS:切换数据源完成";
    }


    @Override
    public String getData() {
        List list = xxxxxx(); //自行获取测试数据
        String s = JsonHelper.toJson(list);
        return "getData::" + s;
    }

    @Override
    public String setBack() {
        // 当前使用的是新数据源,才能切回老数据源
        if (dataSource.isUseNewSource()) {
            dataSource.setUseNewSource(false);
            return "setBack::切回老数据源完成!";
        }
        return "setBack::nothing--";
    }
}

至此,我们可以通过动态变更nacos配置,实现对新数据源的切换,设置回去,和数据验证。

  • 项目启动完成,加载多个数据源 -> naocs配置变更 -> NacosListener监听到,获取要执行的操作 -> 切换到新数据源

  • 且后续此配置存在时,项目需要重启,则可以直接使新数据源生效。

//nacos配置变更 
want.change.ds.operate: change