Read/write splitting with Lettuce on a Redis master-replica deployment

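Background

As business traffic grew, every request still went to the Redis master, which put it under heavy load.

Goal

Reduce the load on the Redis master without affecting the business.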

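Current state

Redis is currently deployed as one master and one replica. The replica serves only as a backup, so most of its capacity sits idle.

Approach

If the replica also takes part of the online traffic, the load on the master drops accordingly. In theory, the approach is feasible.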

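Problem

The project accesses Redis through lettuce + spring-boot-starter-data-redis.

Lettuce itself supports master-replica access. However, while spring-boot-starter-data-redis supports Redis Sentinel and Redis Cluster well, its auto-configuration offers no equivalent support for a plain master-replica setup.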

Code analysis

Entry point

org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration

@Configuration(
    proxyBeanMethods = false
)
@ConditionalOnClass({RedisOperations.class})
@EnableConfigurationProperties({RedisProperties.class})
@Import({LettuceConnectionConfiguration.class, JedisConnectionConfiguration.class})
public class RedisAutoConfiguration {
    // ...
}

Look only at the @Import line above: LettuceConnectionConfiguration takes effect by default, so that is the class to read next.

@Bean
@ConditionalOnMissingBean(RedisConnectionFactory.class)
LettuceConnectionFactory redisConnectionFactory(
		ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
		ClientResources clientResources) {
	LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers, clientResources,
			getProperties().getLettuce().getPool());
	return createLettuceConnectionFactory(clientConfig);
}

private LettuceConnectionFactory createLettuceConnectionFactory(LettuceClientConfiguration clientConfiguration) {
	if (getSentinelConfig() != null) {
		return new LettuceConnectionFactory(getSentinelConfig(), clientConfiguration);
	}
	if (getClusterConfiguration() != null) {
		return new LettuceConnectionFactory(getClusterConfiguration(), clientConfiguration);
	}
	return new LettuceConnectionFactory(getStandaloneConfig(), clientConfiguration);
}

private LettuceClientConfiguration getLettuceClientConfiguration(
		ObjectProvider<LettuceClientConfigurationBuilderCustomizer> builderCustomizers,
		ClientResources clientResources, Pool pool) {
	LettuceClientConfigurationBuilder builder = createBuilder(pool);
	applyProperties(builder);
	if (StringUtils.hasText(getProperties().getUrl())) {
		customizeConfigurationFromUrl(builder);
	}
	builder.clientOptions(createClientOptions());
	builder.clientResources(clientResources);
	builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder));
	return builder.build();
}

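redisConnectionFactory creates the final RedisConnectionFactory. createLettuceConnectionFactory shows that Sentinel and Cluster are supported out of the box; how to configure them is skipped here. When neither is configured, the factory is built from the standalone configuration.

Next comes LettuceConnectionFactory, where the method to focus on is afterPropertiesSet.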

public void afterPropertiesSet() {
	this.client = this.createClient();
	this.connectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceConnection.CODEC));
	this.reactiveConnectionProvider = new LettuceConnectionFactory.ExceptionTranslatingConnectionProvider(this.createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC));
	if (this.isClusterAware()) {
		this.clusterCommandExecutor = new ClusterCommandExecutor(new LettuceClusterTopologyProvider((RedisClusterClient)this.client), new LettuceClusterNodeResourceProvider(this.connectionProvider), EXCEPTION_TRANSLATION);
	}

	this.initialized = true;
	if (this.getEagerInitialization() && this.getShareNativeConnection()) {
		this.initConnection();
	}
}

Now look at createConnectionProvider.

private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
	if (this.pool != null) {
		return new LettucePoolConnectionProvider(this.pool);
	} else {
		LettuceConnectionProvider connectionProvider = this.doCreateConnectionProvider(client, codec);
		return (LettuceConnectionProvider)(this.clientConfiguration instanceof LettucePoolingClientConfiguration ? new LettucePoolingConnectionProvider(connectionProvider, (LettucePoolingClientConfiguration)this.clientConfiguration) : connectionProvider);
	}
}

protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
	ReadFrom readFrom = (ReadFrom)this.getClientConfiguration().getReadFrom().orElse((Object)null);
	if (this.isStaticMasterReplicaAware()) {
		List<RedisURI> nodes = (List)((RedisStaticMasterReplicaConfiguration)this.configuration).getNodes().stream().map((it) -> {
			return this.createRedisURIAndApplySettings(it.getHostName(), it.getPort());
		}).peek((it) -> {
			it.setDatabase(this.getDatabase());
		}).collect(Collectors.toList());
		return new StaticMasterReplicaConnectionProvider((RedisClient)client, codec, nodes, readFrom);
	} else {
		return (LettuceConnectionProvider)(this.isClusterAware() ? new ClusterConnectionProvider((RedisClusterClient)client, codec, readFrom) : new StandaloneConnectionProvider((RedisClient)client, codec, readFrom));
	}
}

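For a standalone configuration, execution ends up at the last expression of the last statement: new StandaloneConnectionProvider((RedisClient)client, codec, readFrom).

Next is StandaloneConnectionProvider, in particular its getConnection method.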

@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {

	if (connectionType.equals(StatefulRedisSentinelConnection.class)) {
		return connectionType.cast(client.connectSentinel());
	}

	if (connectionType.equals(StatefulRedisPubSubConnection.class)) {
		return connectionType.cast(client.connectPubSub(codec));
	}

	if (StatefulConnection.class.isAssignableFrom(connectionType)) {

		return connectionType.cast(readFrom.map(it -> this.masterReplicaConnection(redisURISupplier.get(), it))
				.orElseGet(() -> client.connect(codec)));
	}

	throw new UnsupportedOperationException("Connection type " + connectionType + " not supported!");
}

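When the requested connectionType is a StatefulConnection and readFrom is present, the call goes to masterReplicaConnection; without readFrom it falls back to client.connect(codec).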

private StatefulRedisConnection masterReplicaConnection(RedisURI redisUri, ReadFrom readFrom) {

	StatefulRedisMasterReplicaConnection<?, ?> connection = MasterReplica.connect(client, codec, redisUri);
	connection.setReadFrom(readFrom);

	return connection;
}
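The branch in getConnection hinges on whether the optional readFrom holds a value. The following is a minimal sketch of that pattern using only the JDK; ConnectionChoice and choose are hypothetical names for illustration and are not part of Lettuce or Spring Data Redis.

```java
import java.util.Optional;

// Simplified stand-in for the readFrom branch in StandaloneConnectionProvider.getConnection.
// Hypothetical names; strings replace the real connection objects.
final class ConnectionChoice {

    // Describes which kind of connection would be opened for the given read preference.
    static String choose(Optional<String> readFrom) {
        return readFrom
                .map(policy -> "master-replica connection, reads use " + policy)
                .orElseGet(() -> "plain standalone connection");
    }

    public static void main(String[] args) {
        System.out.println(choose(Optional.empty()));
        System.out.println(choose(Optional.of("REPLICA_PREFERRED")));
    }
}
```

With an empty Optional, the sketch takes the orElseGet path, which mirrors why a default setup without readFrom never reaches the master-replica code.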

Following the call chain further leads to MasterReplica.connectAsyncSentinelOrAutodiscovery.

private static <K, V> CompletableFuture<StatefulRedisMasterReplicaConnection<K, V>> connectAsyncSentinelOrAutodiscovery(
		RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) {

	LettuceAssert.notNull(redisClient, "RedisClient must not be null");
	LettuceAssert.notNull(codec, "RedisCodec must not be null");
	LettuceAssert.notNull(redisURI, "RedisURI must not be null");

	if (isSentinel(redisURI)) {
		return new SentinelConnector<>(redisClient, codec, redisURI).connectAsync();
	}

	return new AutodiscoveryConnector<>(redisClient, codec, redisURI).connectAsync();
}

Because the URI here is not a Sentinel URI, execution ends up in AutodiscoveryConnector, where initializeConnection is the method to focus on.

private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(RedisCodec<K, V> codec,
		Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionAndUri) {

	ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider(connectionAndUri.getT2(),
			connectionAndUri.getT1());

	MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient, topologyProvider);
	MasterReplicaConnectionProvider<K, V> connectionProvider = new MasterReplicaConnectionProvider<>(redisClient, codec,
			redisURI, (Map) initialConnections);

	Mono<List<RedisNodeDescription>> refreshFuture = refresh.getNodes(redisURI);

	return refreshFuture.map(nodes -> {

		EventRecorder.getInstance().record(new MasterReplicaTopologyChangedEvent(redisURI, nodes));

		connectionProvider.setKnownNodes(nodes);

		MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider,
				redisClient.getResources());

		StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(
				channelWriter, codec, redisURI.getTimeout());

		connection.setOptions(redisClient.getOptions());

		return connection;
	});
}

The returned StatefulRedisMasterReplicaConnection uses a MasterReplicaChannelWriter as its channelWriter. Its write method is the one to focus on, since most Redis operations pass through it.

@Override
@SuppressWarnings("unchecked")
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

	LettuceAssert.notNull(command, "Command must not be null");

	if (closed) {
		throw new RedisException("Connection is closed");
	}

	if (isStartTransaction(command.getType())) {
		inTransaction = true;
	}

	Intent intent = inTransaction ? Intent.WRITE : getIntent(command.getType());
	CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider
			.getConnectionAsync(intent);

	if (isEndTransaction(command.getType())) {
		inTransaction = false;
	}

	if (isSuccessfullyCompleted(future)) {
		writeCommand(command, future.join(), null);
	} else {
		future.whenComplete((c, t) -> writeCommand(command, c, t));
	}

	return command;
}
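The routing rule in write can be modelled in a few lines: a command is routed by its own intent unless a transaction is open, in which case everything goes to the master. The sketch below is a simplified model under stated assumptions: IntentRouter, its tiny READ_ONLY set, and plain string command names are hypothetical and stand in for Lettuce's real command types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of the intent decision in MasterReplicaChannelWriter.write.
// Hypothetical class; not Lettuce code.
final class IntentRouter {

    enum Intent { READ, WRITE }

    // Assumption: a deliberately small set of read-only commands for illustration.
    private static final Set<String> READ_ONLY =
            new HashSet<>(Arrays.asList("GET", "MGET", "EXISTS", "TTL"));

    private boolean inTransaction;

    // Returns the intent for one command, tracking MULTI ... EXEC/DISCARD state.
    Intent route(String command) {
        if (command.equals("MULTI")) {
            inTransaction = true;
        }
        // Inside a transaction every command is treated as a write.
        Intent intent = inTransaction
                ? Intent.WRITE
                : (READ_ONLY.contains(command) ? Intent.READ : Intent.WRITE);
        if (command.equals("EXEC") || command.equals("DISCARD")) {
            inTransaction = false;
        }
        return intent;
    }

    // Routes a sequence of commands through a fresh router.
    static List<Intent> routeAll(String... commands) {
        IntentRouter router = new IntentRouter();
        List<Intent> intents = new ArrayList<>();
        for (String command : commands) {
            intents.add(router.route(command));
        }
        return intents;
    }

    public static void main(String[] args) {
        System.out.println(routeAll("GET", "MULTI", "GET", "EXEC", "GET"));
    }
}
```

The point of the model is that a GET issued between MULTI and EXEC is sent to the master even though it is a read, so transactions are unaffected by read/write splitting.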

The key line in write is CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider.getConnectionAsync(intent);

public CompletableFuture<StatefulRedisConnection<K, V>> getConnectionAsync(Intent intent) {

	if (debugEnabled) {
		logger.debug("getConnectionAsync(" + intent + ")");
	}

	if (readFrom != null && intent == Intent.READ) {
		List<RedisNodeDescription> selection = readFrom.select(new ReadFrom.Nodes() {

			@Override
			public List<RedisNodeDescription> getNodes() {
				return knownNodes;
			}

			@Override
			public Iterator<RedisNodeDescription> iterator() {
				return knownNodes.iterator();
			}

		});

		if (selection.isEmpty()) {
			throw new RedisException(String.format("Cannot determine a node to read (Known nodes: %s) with setting %s",
					knownNodes, readFrom));
		}

		try {

			Flux<StatefulRedisConnection<K, V>> connections = Flux.empty();

			for (RedisNodeDescription node : selection) {
				connections = connections.concatWith(Mono.fromFuture(getConnection(node)));
			}

			if (OrderingReadFromAccessor.isOrderSensitive(readFrom) || selection.size() == 1) {
				return connections.filter(StatefulConnection::isOpen).next().switchIfEmpty(connections.next()).toFuture();
			}

			return connections.filter(StatefulConnection::isOpen).collectList().map(it -> {
				int index = ThreadLocalRandom.current().nextInt(it.size());
				return it.get(index);
			}).switchIfEmpty(connections.next()).toFuture();
		} catch (RuntimeException e) {
			throw Exceptions.bubble(e);
		}
	}

	return getConnection(getMaster());
}

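The part to note is the if (readFrom != null && intent == Intent.READ) block.

For a read request with a non-null readFrom, a node-selection strategy is applied.

So to get read/write splitting, it is enough that readFrom, by the time execution reaches this point, is a strategy that selects replica nodes.

The built-in ReadFrom.REPLICA_PREFERRED meets that requirement.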
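The idea behind a replica-preferred strategy, reading from a replica when one is usable and falling back to the master otherwise, can be sketched with plain JDK types. This is a simplified model, not Lettuce's implementation: ReplicaPreferredSketch, Node, and the reachable flag are hypothetical, and the real code works with RedisNodeDescription and open connections.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Simplified model of "replicas first, master as fallback" node selection.
// Hypothetical types; not Lettuce code.
final class ReplicaPreferredSketch {

    enum Role { MASTER, REPLICA }

    static final class Node {
        final String name;
        final Role role;
        final boolean reachable;

        Node(String name, Role role, boolean reachable) {
            this.name = name;
            this.role = role;
            this.reachable = reachable;
        }
    }

    // Orders candidates so that replicas come before the master.
    static List<Node> select(List<Node> known) {
        List<Node> ordered = new ArrayList<>();
        for (Node node : known) {
            if (node.role == Role.REPLICA) ordered.add(node);
        }
        for (Node node : known) {
            if (node.role == Role.MASTER) ordered.add(node);
        }
        return ordered;
    }

    // Picks the first reachable candidate in preference order.
    static Optional<String> pickForRead(List<Node> known) {
        return select(known).stream()
                .filter(node -> node.reachable)
                .map(node -> node.name)
                .findFirst();
    }

    public static void main(String[] args) {
        List<Node> nodes = Arrays.asList(
                new Node("master", Role.MASTER, true),
                new Node("replica-1", Role.REPLICA, false));
        System.out.println(pickForRead(nodes).orElse("none"));
    }
}
```

In the one-master, one-replica deployment described earlier, this means reads go to the replica while it is available and return to the master if it is not.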

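What remains is making sure readFrom is non-null at that point. This cannot be done through configuration alone: RedisProperties defines no such field. Tracing back up the call chain leads to LettuceConnectionConfiguration.getLettuceClientConfiguration, where an additional LettuceClientConfigurationBuilderCustomizer can set readFrom. The full chain of reasoning is not repeated here.

So all that is needed is one more LettuceClientConfigurationBuilderCustomizer implementation.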

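import io.lettuce.core.ReadFrom;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.data.redis.LettuceClientConfigurationBuilderCustomizer;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.stereotype.Component;

// Registered only when spring.redis.replica-preferred=true, so the behavior can be switched off.
@ConditionalOnProperty(value = "spring.redis.replica-preferred", havingValue = "true")
@Component
@Slf4j
public class RedisPreferredLettuceClientConfigurationBuilderCustomizer implements LettuceClientConfigurationBuilderCustomizer, InitializingBean {

    // Called from getLettuceClientConfiguration before the client configuration is built.
    @Override
    public void customize(LettuceClientConfiguration.LettuceClientConfigurationBuilder clientConfigurationBuilder) {
        log.info("RedisPreferredLettuceClientConfigurationBuilderCustomizer.customize");
        // Prefer replicas for reads; writes still go to the master.
        clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED);
    }

    // Only logs that the bean was created, which confirms the condition matched.
    @Override
    public void afterPropertiesSet() throws Exception {
        log.info("RedisPreferredLettuceClientConfigurationBuilderCustomizer inited");
    }
}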
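Because the customizer is guarded by @ConditionalOnProperty, it only takes effect once the property is set. A possible application.properties fragment is sketched below; the host and port values are placeholders rather than values from the project, and the spring.redis.* prefix assumes the same Spring Boot generation as the code shown above.

```properties
# Placeholder address of the Redis node the application already connects to
spring.redis.host=127.0.0.1
spring.redis.port=6379
# Custom switch read by the @ConditionalOnProperty on the customizer
spring.redis.replica-preferred=true
```

Leaving the switch unset or setting it to false keeps the customizer out of the context, so all traffic continues to go to the master as before.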
