项目使用的lettuce + spring-boot-starter-data-redis做redis访问
Copy @ Configuration (
proxyBeanMethods = false
@ ConditionalOnClass ({ RedisOperations . class })
@ EnableConfigurationProperties ({ RedisProperties . class })
@ Import ({ LettuceConnectionConfiguration . class , JedisConnectionConfiguration . class })
public class RedisAutoConfiguration {
// ...
Copy @ Bean
@ ConditionalOnMissingBean ( RedisConnectionFactory . class )
LettuceConnectionFactory redisConnectionFactory(
ObjectProvider< LettuceClientConfigurationBuilderCustomizer > builderCustomizers ,
ClientResources clientResources) {
LettuceClientConfiguration clientConfig = getLettuceClientConfiguration(builderCustomizers , clientResources ,
getProperties() . getLettuce() . getPool()) ;
return createLettuceConnectionFactory(clientConfig) ;
private LettuceConnectionFactory createLettuceConnectionFactory( LettuceClientConfiguration clientConfiguration) {
if ( getSentinelConfig() != null ) {
return new LettuceConnectionFactory(getSentinelConfig() , clientConfiguration) ;
if ( getClusterConfiguration() != null ) {
return new LettuceConnectionFactory(getClusterConfiguration() , clientConfiguration) ;
return new LettuceConnectionFactory(getStandaloneConfig() , clientConfiguration) ;
private LettuceClientConfiguration getLettuceClientConfiguration(
ObjectProvider< LettuceClientConfigurationBuilderCustomizer > builderCustomizers ,
ClientResources clientResources , Pool pool) {
LettuceClientConfigurationBuilder builder = createBuilder(pool) ;
applyProperties(builder) ;
if ( StringUtils . hasText ( getProperties() . getUrl ())) {
customizeConfigurationFromUrl(builder) ;
builder . clientOptions ( createClientOptions() );
builder . clientResources (clientResources);
builderCustomizers . orderedStream () . forEach ((customizer) -> customizer . customize (builder));
return builder . build ();
Copy public void afterPropertiesSet() {
this . client = this . createClient ();
this . connectionProvider = new LettuceConnectionFactory . ExceptionTranslatingConnectionProvider ( this . createConnectionProvider ( this . client , LettuceConnection . CODEC ));
this . reactiveConnectionProvider = new LettuceConnectionFactory . ExceptionTranslatingConnectionProvider ( this . createConnectionProvider ( this . client , LettuceReactiveRedisConnection . CODEC ));
if ( this . isClusterAware ()) {
this . clusterCommandExecutor = new ClusterCommandExecutor( new LettuceClusterTopologyProvider((RedisClusterClient) this . client ) , new LettuceClusterNodeResourceProvider( this . connectionProvider ) , EXCEPTION_TRANSLATION) ;
this . initialized = true ;
if ( this . getEagerInitialization () && this . getShareNativeConnection ()) {
this . initConnection ();
Copy private LettuceConnectionProvider createConnectionProvider( AbstractRedisClient client , RedisCodec<? , ?> codec) {
if ( this . pool != null ) {
return new LettucePoolConnectionProvider( this . pool ) ;
} else {
LettuceConnectionProvider connectionProvider = this . doCreateConnectionProvider (client , codec);
return (LettuceConnectionProvider)( this . clientConfiguration instanceof LettucePoolingClientConfiguration ? new LettucePoolingConnectionProvider(connectionProvider , (LettucePoolingClientConfiguration) this . clientConfiguration ) : connectionProvider);
protected LettuceConnectionProvider doCreateConnectionProvider( AbstractRedisClient client , RedisCodec<? , ?> codec) {
ReadFrom readFrom = (ReadFrom) this . getClientConfiguration () . getReadFrom () . orElse ((Object) null );
if ( this . isStaticMasterReplicaAware ()) {
List < RedisURI > nodes = (List)((RedisStaticMasterReplicaConfiguration) this . configuration ) . getNodes () . stream () . map ((it) -> {
return this . createRedisURIAndApplySettings ( it . getHostName () , it . getPort ());
}) . peek ((it) -> {
it . setDatabase ( this . getDatabase ());
}) . collect ( Collectors . toList ());
return new StaticMasterReplicaConnectionProvider((RedisClient)client , codec , nodes , readFrom) ;
} else {
return (LettuceConnectionProvider)( this . isClusterAware () ? new ClusterConnectionProvider((RedisClusterClient)client , codec , readFrom) : new StandaloneConnectionProvider((RedisClient)client , codec , readFrom) );
最终会走到最后一行的最后一条语句new StandaloneConnectionProvider((RedisClient)client, codec, readFrom));
Copy @ Override
public <T extends StatefulConnection<? , ?>> T getConnection( Class< T > connectionType) {
if ( connectionType . equals ( StatefulRedisSentinelConnection . class )) {
return connectionType . cast ( client . connectSentinel ());
if ( connectionType . equals ( StatefulRedisPubSubConnection . class )) {
return connectionType . cast ( client . connectPubSub (codec));
if ( StatefulConnection . class . isAssignableFrom (connectionType)) {
return connectionType . cast ( readFrom . map (it -> this . masterReplicaConnection ( redisURISupplier . get () , it))
. orElseGet (() -> client . connect (codec)));
throw new UnsupportedOperationException( "Connection type " + connectionType + " not supported!" ) ;
Copy private StatefulRedisConnection masterReplicaConnection( RedisURI redisUri , ReadFrom readFrom) {
StatefulRedisMasterReplicaConnection < ? , ? > connection = MasterReplica . connect (client , codec , redisUri);
connection . setReadFrom (readFrom);
return connection;
Copy private static < K , V > CompletableFuture<StatefulRedisMasterReplicaConnection< K , V >> connectAsyncSentinelOrAutodiscovery(
RedisClient redisClient , RedisCodec< K , V > codec , RedisURI redisURI) {
LettuceAssert . notNull (redisClient , "RedisClient must not be null" );
LettuceAssert . notNull (codec , "RedisCodec must not be null" );
LettuceAssert . notNull (redisURI , "RedisURI must not be null" );
if ( isSentinel(redisURI) ) {
return new SentinelConnector <>(redisClient , codec , redisURI) . connectAsync ();
return new AutodiscoveryConnector <>(redisClient , codec , redisURI) . connectAsync ();
Copy private Mono<StatefulRedisMasterReplicaConnection< K , V >> initializeConnection( RedisCodec< K , V > codec ,
Tuple2< RedisURI , StatefulRedisConnection< K , V >> connectionAndUri) {
ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider( connectionAndUri . getT2() ,
connectionAndUri . getT1()) ;
MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient , topologyProvider) ;
MasterReplicaConnectionProvider < K , V > connectionProvider = new MasterReplicaConnectionProvider <>(redisClient , codec ,
redisURI , (Map) initialConnections);
Mono < List < RedisNodeDescription >> refreshFuture = refresh . getNodes (redisURI);
return refreshFuture . map (nodes -> {
EventRecorder . getInstance () . record ( new MasterReplicaTopologyChangedEvent(redisURI , nodes) );
connectionProvider . setKnownNodes (nodes);
MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider ,
redisClient . getResources()) ;
StatefulRedisMasterReplicaConnectionImpl < K , V > connection = new StatefulRedisMasterReplicaConnectionImpl <>(
channelWriter , codec , redisURI . getTimeout ());
connection . setOptions ( redisClient . getOptions ());
return connection;
Copy @ Override
@ SuppressWarnings ( "unchecked" )
public < K , V , T > RedisCommand< K , V , T > write( RedisCommand< K , V , T > command) {
LettuceAssert . notNull (command , "Command must not be null" );
if (closed) {
throw new RedisException( "Connection is closed" ) ;
if ( isStartTransaction( command . getType()) ) {
inTransaction = true ;
Intent intent = inTransaction ? Intent . WRITE : getIntent( command . getType()) ;
CompletableFuture < StatefulRedisConnection < K , V >> future = (CompletableFuture) masterReplicaConnectionProvider
. getConnectionAsync (intent);
if ( isEndTransaction( command . getType()) ) {
inTransaction = false ;
if ( isSuccessfullyCompleted(future) ) {
writeCommand(command , future . join() , null ) ;
} else {
future . whenComplete ((c , t) -> writeCommand(command , c , t) );
return command;
重点在CompletableFuture<StatefulRedisConnection<K, V>> future = (CompletableFuture) masterReplicaConnectionProvider .getConnectionAsync(intent);
Copy public CompletableFuture<StatefulRedisConnection< K , V >> getConnectionAsync( Intent intent) {
if (debugEnabled) {
logger . debug ( "getConnectionAsync(" + intent + ")" );
if (readFrom != null && intent == Intent . READ ) {
List < RedisNodeDescription > selection = readFrom . select ( new ReadFrom . Nodes () {
@ Override
public List< RedisNodeDescription > getNodes() {
return knownNodes;
@ Override
public Iterator< RedisNodeDescription > iterator() {
return knownNodes . iterator ();
if ( selection . isEmpty ()) {
throw new RedisException( String . format( "Cannot determine a node to read (Known nodes: %s) with setting %s" ,
knownNodes , readFrom)) ;
try {
Flux < StatefulRedisConnection < K , V >> connections = Flux . empty ();
for ( RedisNodeDescription node : selection) {
connections = connections . concatWith ( Mono . fromFuture ( getConnection(node) ));
if ( OrderingReadFromAccessor . isOrderSensitive (readFrom) || selection . size () == 1 ) {
return connections . filter (StatefulConnection :: isOpen) . next () . switchIfEmpty ( connections . next ()) . toFuture ();
return connections . filter (StatefulConnection :: isOpen) . collectList () . map (it -> {
int index = ThreadLocalRandom . current () . nextInt ( it . size ());
return it . get (index);
}) . switchIfEmpty ( connections . next ()) . toFuture ();
} catch ( RuntimeException e) {
throw Exceptions . bubble (e);
return getConnection(getMaster()) ;
其中有一行需要注意;if (readFrom != null && intent == Intent.READ) 块中的内容
Copy @ ConditionalOnProperty (value = "spring.redis.replica-preferred" , havingValue = "true" )
@ Component
@ Slf4j
public class RedisPreferredLettuceClientConfigurationBuilderCustomizer implements LettuceClientConfigurationBuilderCustomizer , InitializingBean {
@ Override
public void customize ( LettuceClientConfiguration . LettuceClientConfigurationBuilder clientConfigurationBuilder) {
log . info ( "RedisPreferredLettuceClientConfigurationBuilderCustomizer.customize" );
clientConfigurationBuilder . readFrom ( ReadFrom . REPLICA_PREFERRED );
@ Override
public void afterPropertiesSet () throws Exception {
log . info ( "RedisPreferredLettuceClientConfigurationBuilderCustomizer inited" );
Last updated 8 months ago