/**
 * Load balancer that prefers a service instance whose host equals the local machine's IP address.
 *
 * <p>When the switch property {@link #SWITCH_KEY} is not exactly {@code "true"}, or the local IP
 * is blank, every request is delegated to a {@link RoundRobinLoadBalancer}. Otherwise the first
 * instance whose host equals the local IP is chosen; if no instance matches, an instance is picked
 * by an internal round-robin counter.
 */
@Slf4j
public class NearbyServiceLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    /** Configuration key of the switch that enables the nearby strategy. */
    public static final String SWITCH_KEY = "discovery.nearby.enable";

    private final ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;

    private final String serviceId;

    private final Environment environment;

    /** Counter driving the round-robin fallback used when no instance matches the local IP. */
    private final AtomicInteger position;

    /** IP address of the local machine, resolved once at construction time. */
    private final String nearbyIp;

    /** Delegate used whenever the nearby strategy is disabled. */
    private final RoundRobinLoadBalancer roundRobinLoadBalancer;

    /**
     * Creates the load balancer for one service.
     *
     * @param serviceInstanceListSupplierProvider provider of the supplier that lists candidate instances
     * @param serviceId id of the service being balanced
     * @param environment environment from which {@link #SWITCH_KEY} is read on every call
     * @param inetUtils used to resolve the first non-loopback address of the local machine
     */
    public NearbyServiceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
                                     String serviceId,
                                     Environment environment,
                                     InetUtils inetUtils) {
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.serviceId = serviceId;
        this.environment = environment;
        this.position = new AtomicInteger(1000);
        this.nearbyIp = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
        this.roundRobinLoadBalancer = new RoundRobinLoadBalancer(serviceInstanceListSupplierProvider, serviceId);
        log.info("NearbyServiceLoadBalancer.construct, serviceId={}, nearbyIp={}", this.serviceId, this.nearbyIp);
    }

    /**
     * Tells whether the nearby strategy must be skipped.
     *
     * @return {@code true} when the local IP is blank or the {@link #SWITCH_KEY} property is not
     *         exactly {@code "true"}; {@code false} when the nearby strategy should be applied
     */
    private boolean disableNearby() {
        return StringUtils.isBlank(this.nearbyIp)
                || !Boolean.TRUE.toString().equals(environment.getProperty(SWITCH_KEY));
    }

    /**
     * Chooses an instance for the request, delegating to plain round-robin when the nearby
     * strategy is disabled.
     *
     * @param request the load-balancer request
     * @return a {@link Mono} emitting the response that wraps the selected instance, or an empty
     *         response when no instance could be selected
     */
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        if (disableNearby()) {
            return this.roundRobinLoadBalancer.choose(request);
        }
        ServiceInstanceListSupplier supplier =
                this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request)
                .next()
                .map(serviceInstances -> this.processInstanceResponse(supplier, serviceInstances));
    }

    /**
     * Selects an instance from the list and, when the supplier is a {@link SelectedInstanceCallback},
     * notifies it of the selection.
     *
     * @param supplier the supplier that produced {@code serviceInstances}
     * @param serviceInstances candidate instances
     * @return the response wrapping the selected instance, or an empty response
     */
    protected Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
                                                                List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = this.getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    /**
     * Wraps the result of {@link #selectInstance(List)} into a response.
     *
     * @param instances candidate instances
     * @return an {@link EmptyResponse} when nothing was selected, otherwise a {@link DefaultResponse}
     */
    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        ServiceInstance instance = selectInstance(instances);
        return instance == null ? new EmptyResponse() : new DefaultResponse(instance);
    }

    /**
     * Selects the most suitable instance.
     *
     * @param instances candidate instances; may be {@code null} or empty
     * @return {@code null} when there are no candidates; the only candidate when there is exactly
     *         one; otherwise the first candidate whose host equals the local IP, falling back to a
     *         round-robin pick when none matches
     */
    protected ServiceInstance selectInstance(List<ServiceInstance> instances) {
        if (log.isDebugEnabled()) {
            log.debug("NearbyServiceLoadBalancer.selectInstance, instances={}", formatServiceInstances(instances));
        }
        if (CollectionUtils.isEmpty(instances)) {
            return null;
        }
        // A single candidate needs no strategy; use it directly.
        if (instances.size() == 1) {
            return instances.get(0);
        }
        final String localIp = this.nearbyIp;
        ServiceInstance instance = instances.stream()
                // Null-safe: without a local IP nothing matches and the round-robin fallback applies.
                .filter(candidate -> localIp != null && localIp.equals(candidate.getHost()))
                .findFirst()
                .orElse(null);
        if (instance == null) {
            // Mask the sign bit instead of Math.abs: Math.abs(Integer.MIN_VALUE) is still negative,
            // which would yield a negative index once the counter wraps around.
            int pos = this.position.incrementAndGet() & Integer.MAX_VALUE;
            instance = instances.get(pos % instances.size());
        }
        if (log.isDebugEnabled()) {
            log.debug("NearbyServiceLoadBalancer.selectInstance, instance={}", formatServiceInstance(instance));
        }
        return instance;
    }

    /**
     * Renders a list of instances for debug logging.
     *
     * @param serviceInstances instances to render; may be {@code null}
     * @return {@code null} for a {@code null} list, {@code "[]"} for an empty list, otherwise the
     *         comma-separated renderings of {@link #formatServiceInstance(ServiceInstance)} in brackets
     */
    private String formatServiceInstances(List<ServiceInstance> serviceInstances) {
        if (serviceInstances == null) {
            return null;
        }
        if (serviceInstances.isEmpty()) {
            return "[]";
        }
        StringBuilder text = new StringBuilder("[");
        boolean first = true;
        for (ServiceInstance serviceInstance : serviceInstances) {
            if (!first) {
                text.append(',');
            }
            text.append(formatServiceInstance(serviceInstance));
            first = false;
        }
        return text.append(']').toString();
    }

    /**
     * Renders a single instance for debug logging.
     *
     * @param serviceInstance instance to render; may be {@code null}
     * @return {@code null} for a {@code null} instance, otherwise {@code "{instanceId=<id>}"}
     */
    private String formatServiceInstance(ServiceInstance serviceInstance) {
        if (serviceInstance == null) {
            return null;
        }
        return "{instanceId=" + serviceInstance.getInstanceId() + "}";
    }
}