dynamic-datasource实现数据读写分离

dynamic-datasource实现数据读写分离

背景

随着业务流量越来越大,所有数据库请求都访问mysql主库,给主库造成了较大的压力

目标

减轻mysql主库压力

现状

当前数据库部署架构为一主一从,从库只是单纯作为备份,没有承接线上流量,资源有闲置

方案

方案对比

通常会有其中思路

分库

分库的思路是讲数据库中表根据业务紧密程度拆分成几个不同的库,不同的表访问不同的库,不同的库分布在不同的节点,从而可以减轻单个库的压力,是一种横向扩展的思路,类似的还有分表,这里不做介绍

分库通常会涉及到系统的改造,比如原先在一个库的表,一个事务就可以保证,拆分成多个库之后,原先一个事务可以完成的,现在不能保证正确,通常涉及到分布式事务或者BASE等,改造成本比较高

读写分离

读写分离也是业界常用的方案之一,思路是在数据库主从同步延迟可以接受的范围内,讲一部分查询请求分流到数据库从库,从来降低主库压力

考虑到目前的数据容量以及改造成本,现阶段先采用读写分离的方式

实施

因为系统已经集成了dynamic-datasource介入了多个不同的数据源,因此方案基于此作

先了解下dynamic-datasource核心类

首先是com.baomidou.dynamic.datasource.spring.boot.autoconfigure.DynamicDataSourceAutoConfiguration

@Slf4j
@Configuration
@EnableConfigurationProperties(DynamicDataSourceProperties.class)
@AutoConfigureBefore(value = DataSourceAutoConfiguration.class, name = "com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure")
@Import(value = {DruidDynamicDataSourceConfiguration.class, DynamicDataSourceCreatorAutoConfiguration.class})
@ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class DynamicDataSourceAutoConfiguration implements InitializingBean {

    private final DynamicDataSourceProperties properties;

    private final List<DynamicDataSourcePropertiesCustomizer> dataSourcePropertiesCustomizers;

    public DynamicDataSourceAutoConfiguration(
            DynamicDataSourceProperties properties,
            ObjectProvider<List<DynamicDataSourcePropertiesCustomizer>> dataSourcePropertiesCustomizers) {
        this.properties = properties;
        this.dataSourcePropertiesCustomizers = dataSourcePropertiesCustomizers.getIfAvailable();
    }

    @Bean
    public DynamicDataSourceProvider ymlDynamicDataSourceProvider() {
        return new YmlDynamicDataSourceProvider(properties.getDatasource());
    }

    @Bean
    @ConditionalOnMissingBean
    public DataSource dataSource() {
        DynamicRoutingDataSource dataSource = new DynamicRoutingDataSource();
        dataSource.setPrimary(properties.getPrimary());
        dataSource.setStrict(properties.getStrict());
        dataSource.setStrategy(properties.getStrategy());
        dataSource.setP6spy(properties.getP6spy());
        dataSource.setSeata(properties.getSeata());
        return dataSource;
    }

    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Bean
    @ConditionalOnProperty(prefix = DynamicDataSourceProperties.PREFIX + ".aop", name = "enabled", havingValue = "true", matchIfMissing = true)
    public Advisor dynamicDatasourceAnnotationAdvisor(DsProcessor dsProcessor) {
        DynamicDatasourceAopProperties aopProperties = properties.getAop();
        DynamicDataSourceAnnotationInterceptor interceptor = new DynamicDataSourceAnnotationInterceptor(aopProperties.getAllowedPublicOnly(), dsProcessor);
        DynamicDataSourceAnnotationAdvisor advisor = new DynamicDataSourceAnnotationAdvisor(interceptor, DS.class);
        advisor.setOrder(aopProperties.getOrder());
        return advisor;
    }

    // ...

    @Bean
    @ConditionalOnMissingBean
    public DsProcessor dsProcessor(BeanFactory beanFactory) {
        DsHeaderProcessor headerProcessor = new DsHeaderProcessor();
        DsSessionProcessor sessionProcessor = new DsSessionProcessor();
        DsSpelExpressionProcessor spelExpressionProcessor = new DsSpelExpressionProcessor();
        spelExpressionProcessor.setBeanResolver(new BeanFactoryResolver(beanFactory));
        headerProcessor.setNextProcessor(sessionProcessor);
        sessionProcessor.setNextProcessor(spelExpressionProcessor);
        return headerProcessor;
    }

    // ...
}

该Configuration定义了几个bean

  • ymlDynamicDataSourceProvider,用于创建实际的数据库如druid/c3p等,代码比较简单这里忽略

  • dataSource,实际类型是DynamicRoutingDataSource,是对众多dataSource的封装,接下来会细讲

  • dynamicDatasourceAnnotationAdvisor,主要功能就是解析接口或者方法上的@DS注解并将值并放入线程上下文(DynamicDataSourceContextHolder)中

接下来看DynamicRoutingDataSource类

先看afterPropertiesSet

@Override
public void afterPropertiesSet() throws Exception {
	// 检查开启了配置但没有相关依赖
	checkEnv();
	// 添加并分组数据源
	Map<String, DataSource> dataSources = new HashMap<>(16);
	for (DynamicDataSourceProvider provider : providers) {
		dataSources.putAll(provider.loadDataSources());
	}
	for (Map.Entry<String, DataSource> dsItem : dataSources.entrySet()) {
		addDataSource(dsItem.getKey(), dsItem.getValue());
	}
	// 检测默认数据源是否设置
	if (groupDataSources.containsKey(primary)) {
		log.info("dynamic-datasource initial loaded [{}] datasource,primary group datasource named [{}]", dataSources.size(), primary);
	} else if (dataSourceMap.containsKey(primary)) {
		log.info("dynamic-datasource initial loaded [{}] datasource,primary datasource named [{}]", dataSources.size(), primary);
	} else {
		log.warn("dynamic-datasource initial loaded [{}] datasource,Please add your primary datasource or check your configuration", dataSources.size());
	}
}

主要是讲所有的实际数据库连接放到两个map

  • Map<String, DataSource>,最简单的分组,key是配置的数据库name,value是具体的数据库

  • Map<String, GroupDataSource>,将配置的数据库name按照下划线分割之后,取第一个分组

代码如下

/**
 * 添加数据源
 *
 * @param ds         数据源名称
 * @param dataSource 数据源
 */
public synchronized void addDataSource(String ds, DataSource dataSource) {
	DataSource oldDataSource = dataSourceMap.put(ds, dataSource);
	// 新数据源添加到分组
	this.addGroupDataSource(ds, dataSource);
	// 关闭老的数据源
	if (oldDataSource != null) {
		closeDataSource(ds, oldDataSource);
	}
	log.info("dynamic-datasource - add a datasource named [{}] success", ds);
}

/**
 * 新数据源添加到分组
 *
 * @param ds         新数据源的名字
 * @param dataSource 新数据源
 */
private void addGroupDataSource(String ds, DataSource dataSource) {
	if (ds.contains(UNDERLINE)) {
		String group = ds.split(UNDERLINE)[0];
		GroupDataSource groupDataSource = groupDataSources.get(group);
		if (groupDataSource == null) {
			try {
				groupDataSource = new GroupDataSource(group, strategy.getDeclaredConstructor().newInstance());
				groupDataSources.put(group, groupDataSource);
			} catch (Exception e) {
				throw new RuntimeException("dynamic-datasource - add the datasource named " + ds + " error", e);
			}
		}
		groupDataSource.addDatasource(ds, dataSource);
	}
}

接下来看下AbstractRoutingDataSource,是DynamicRoutingDataSource的基类

public abstract class AbstractRoutingDataSource extends AbstractDataSource {

    /**
     * 抽象获取连接池
     *
     * @return 连接池
     */
    protected abstract DataSource determineDataSource();

    @Override
    public Connection getConnection() throws SQLException {
        String xid = TransactionContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            return determineDataSource().getConnection();
        } else {
            String ds = DynamicDataSourceContextHolder.peek();
            ds = StringUtils.isEmpty(ds) ? "default" : ds;
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection()) : connection;
        }
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        String xid = TransactionContext.getXID();
        if (StringUtils.isEmpty(xid)) {
            return determineDataSource().getConnection(username, password);
        } else {
            String ds = DynamicDataSourceContextHolder.peek();
            ds = StringUtils.isEmpty(ds) ? "default" : ds;
            ConnectionProxy connection = ConnectionFactory.getConnection(ds);
            return connection == null ? getConnectionProxy(ds, determineDataSource().getConnection(username, password))
                    : connection;
        }
    }
    // ...
}

getConnection方法可以看到动态切换数据的逻辑,具体逻辑留给了子类实现即determineDataSource方法

接下来,我们DynamicDataSourceAutoConfiguration.determineDataSource方法

@Override
public DataSource determineDataSource() {
    String dsKey = DynamicDataSourceContextHolder.peek();
    return getDataSource(dsKey);
}

/**
 * 获取数据源
 *
 * @param ds 数据源名称
 * @return 数据源
 */
public DataSource getDataSource(String ds) {
    if (StringUtils.isEmpty(ds)) {
        return determinePrimaryDataSource();
    } else if (!groupDataSources.isEmpty() && groupDataSources.containsKey(ds)) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return groupDataSources.get(ds).determineDataSource();
    } else if (dataSourceMap.containsKey(ds)) {
        log.debug("dynamic-datasource switch to the datasource named [{}]", ds);
        return dataSourceMap.get(ds);
    }
    if (strict) {
        throw new CannotFindDataSourceException("dynamic-datasource could not find a datasource named" + ds);
    }
    return determinePrimaryDataSource();
}

private DataSource determinePrimaryDataSource() {
    log.debug("dynamic-datasource switch to the primary datasource");
    DataSource dataSource = dataSourceMap.get(primary);
    if (dataSource != null) {
        return dataSource;
    }
    GroupDataSource groupDataSource = groupDataSources.get(primary);
    if (groupDataSource != null) {
        return groupDataSource.determineDataSource();
    }
    throw new CannotFindDataSourceException("dynamic-datasource can not find primary datasource");
}

可以发现,如果从上下文可以获取到ds,将根据ds选择数据库,如果没有ds,将直接根据配置的primary选择数据库

还记得上文中提到的两个map吗,可以发现如果ds命中第一map时,返回的数据库是固定的,也就无法实现所谓的读写分离;除非每个方法都手动通过@DS注解指定,这种方式不友好;当第一个map不命中且第二个map命中时,实现轻松在多个数据库之间动态切换,且不需要在方法上指定@DS注解

因此第一个关键点就是要对 数据库进行分组,配置方式如下

spring:
  datasource:
    dynamic:
      datasource:
        group1_master:
          # 具体数据库配置
        group1_slave1:
          # 具体数据库配置
        group1_slave2:
          # 具体数据库配置
        group2_master:
          # 具体数据库配置
        group2_slave1:
          # 具体数据库配置
        group2_slave2:
          # 具体数据库配置

如果走到分组,具体分组逻辑是GroupDataSource.determineDataSource方法

public String determineDsKey() {
    return dynamicDataSourceStrategy.determineKey(new ArrayList<>(dataSourceMap.keySet()));
}

public DataSource determineDataSource() {
    return dataSourceMap.get(determineDsKey());
}

可以看到,具体选择分组中的那个数据库,是根据dynamicDataSourceStrategy来决定的,因此我们只需要实现一个dynamicDataSourceStrategy的实现能够根据读写请求选择不同的ds即可。

策略基本如下:

1、普通读请求,走从库或者主库都可以

2、普通写请求,走主库

3、强制指定了走主库的请求,走主库

4、事务执行,强制走主库

接下来难点在于

1、如何判断是读请求还是写请求

2、如何判断强制走主库

3、对于事物,比较特殊,在事务开启之前就需要切到主库然后获取数据库连接

一个一个解决

首先判断是否读请求还是写请求,这个需要使用mybatis拦截器,拦截Executor方法

public interface Executor {
    ResultHandler NO_RESULT_HANDLER = null;

    int update(MappedStatement var1, Object var2) throws SQLException;

    <E> List<E> query(MappedStatement var1, Object var2, RowBounds var3, ResultHandler var4, CacheKey var5, BoundSql var6) throws SQLException;

    <E> List<E> query(MappedStatement var1, Object var2, RowBounds var3, ResultHandler var4) throws SQLException;
   // ...
}

通过第一个参数MappedStatement可以判断

SqlCommandType.SELECT == MappedStatement.getSqlCommandType()

接下来就是如何判断强制走主库了,这个需要自定义实现,通常是使用注解+AOP方式

所以整体实现比较简单

首先定义上下文,保存是否走主库标志

public class ForceMasterContext {

    private static final ThreadLocal<Boolean> FORCE_MASTER = new ThreadLocal<>();

    public static void setForceMaster(boolean forceMaster) {
        FORCE_MASTER.set(forceMaster);
    }

    public static boolean isForceMaster() {
        return FORCE_MASTER.get() != null && FORCE_MASTER.get();
    }

    public static void clearForceMaster() {
        FORCE_MASTER.remove();
    }
}

接下来实现策略,根据标志位选择主库或者从库

@Slf4j
public class ForceMasterDynamicDataSourceStrategy extends LoadBalanceDynamicDataSourceStrategy {

    private static final String MASTER_SUFFIX = "_master";

    @Override
    public String determineKey(List<String> dsNames) {
        String ds = null;
        if (ForceMasterContext.isForceMaster()) {
            ds = Optional.ofNullable(dsNames)
                    .orElse(new ArrayList<>())
                    .stream()
                    .filter(StringUtils::isNotBlank)
                    .filter(o -> o.endsWith(MASTER_SUFFIX))
                    .findFirst()
                    .orElse(null);
        }

        if (ds == null) {
            ds = super.determineKey(dsNames);
        }

        return ds;
    }
}

需要修改配置,指定spring.datasource.dynamic.strategy=Xxx.class才会生效

定义mybatis拦截器,设置

@Intercepts({@Signature(
        type = Executor.class,
        method = "query",
        args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}
), @Signature(
        type = Executor.class,
        method = "query",
        args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}
), @Signature(
        type = Executor.class,
        method = "update",
        args = {MappedStatement.class, Object.class}
)})
@Slf4j
@Component
public class ForceMasterPlugin implements Interceptor {

    @Override
    public Object intercept(Invocation invocation) throws Throwable {
        // 事务操作,正常不应该走到这里
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            return invocation.proceed();
        }
        
        // 已经强制走到主库了
        if (ForceMasterContext.isForceMaster()) {
            return invocation.proceed();
        }

        Object[] args = invocation.getArgs();
        MappedStatement ms = (MappedStatement) args[0];
        // 非事务中的查询操作直接返回
        if (SqlCommandType.SELECT == ms.getSqlCommandType()) {
            return invocation.proceed();
        }

        try {
            ForceMasterContext.setForceMaster(true);
            return invocation.proceed();
        } finally {
            ForceMasterContext.clearForceMaster();
        }
    }

    @Override
    public Object plugin(Object target) {
        return target instanceof Executor ? Plugin.wrap(target, this) : target;
    }

    @Override
    public void setProperties(Properties properties) {
    }
}

接下来自定义强制走主库注解

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ForceMaster {
}

定义Aop

@Configuration
@Aspect
@Slf4j
public class ForceMasterAspectConfiguration {

    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    @Bean
    public Advisor forceMasterAdvisor() {
        MethodInterceptor interceptor = invocation -> {
            if (ForceMasterContext.isForceMaster()) {
                return invocation.proceed();
            }

            try {
                ForceMasterContext.setForceMaster(true);
                return invocation.proceed();
            } finally {
                ForceMasterContext.clearForceMaster();
            }
        };
        return new DynamicDataSourceAnnotationAdvisor(interceptor, ForceMaster.class);
    }
}

最后如何判断事务即将开启,spring-tx开启事务在AbstractPlatformTransactionManager.getTransaction方法,代码如下

@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Use defaults if no transaction definition given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Check definition settings for new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            return startTransaction(def, transaction, debugEnabled, suspendedResources);
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}

/**
 * Start a new transaction.
 */
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
        boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {

    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}

最终会走到doBegin方法,具体实现类在DataSourceTransactionManager中

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
    Connection con = null;

    try {
        if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            Connection newCon = this.obtainDataSource().getConnection();
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }

            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();
        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);
        txObject.setReadOnly(definition.isReadOnly());
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }

            con.setAutoCommit(false);
        }

        this.prepareTransactionalConnection(con, definition);
        txObject.getConnectionHolder().setTransactionActive(true);
        int timeout = this.determineTimeout(definition);
        if (timeout != -1) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
        }

    } catch (Throwable var7) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.obtainDataSource());
            txObject.setConnectionHolder((ConnectionHolder)null, false);
        }

        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
    }
}

可以看到doBegin方法中获取了数据连接(Connection对象),结合上面的两个方法,可以发现两点

1、在获取数据库连接之前没有任何的回调和埋点,也就是说无法通过添加钩子在做额外逻辑,也不能通过上下文去判断

2、方法要么是protected,要么是final修饰,也就是说不能通过AOP来扩展

因此只剩下一种方法,通过继承去扩展,且需要修改默认的PlatformTransactionManager实现类

代码如下

@Component
public class ForceMasterDataSourceTransactionManager extends DataSourceTransactionManager {

    public ForceMasterDataSourceTransactionManager(DataSource dataSource, ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
        super(dataSource);

        TransactionManagerCustomizers customizers = transactionManagerCustomizers.getIfAvailable();
        if (customizers != null) {
            customizers.customize(this);
        }
    }

    @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        if (!ForceMasterContext.isForceMaster()) {
            ForceMasterContext.setForceMaster(true);
        }

        super.doBegin(transaction, definition);
    }


    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        try {
            super.doCleanupAfterCompletion(transaction);
        } finally {
            ForceMasterContext.clearForceMaster();
        }
    }
}

Last updated