Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,18 @@ public GapicSpannerRpc(final SpannerOptions options) {
options, headerProviderWithUserAgent, isEnableDirectAccess);
GrpcGcpEndpointChannelConfigurator endpointChannelConfigurator =
createGrpcGcpEndpointChannelConfigurator(defaultChannelProviderBuilder, options);
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

if (options.getChannelProvider() == null
&& isEnableDirectAccess
&& options.isEnableGcpFallback()) {
boolean useGcpFallback =
options.getChannelProvider() == null
&& isEnableDirectAccess
&& options.isEnableGcpFallback();
if (useGcpFallback) {
setupGcpFallback(
defaultChannelProviderBuilder,
options,
headerProviderWithUserAgent,
credentialsProvider);
} else {
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
}

boolean enableLocationApi = options.isEnableLocationApi();
Expand Down Expand Up @@ -656,8 +658,11 @@ private void setupGcpFallback(
final HeaderProvider headerProviderWithUserAgent,
final CredentialsProvider credentialsProvider) {
InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder =
createChannelProviderBuilder(
createBaseChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
if (options.isGrpcGcpExtensionEnabled()) {
cloudPathProviderBuilder.setPoolSize(1);
}

InstantiatingGrpcChannelProvider cloudPathProvider = cloudPathProviderBuilder.build();
ManagedChannelBuilder cloudPathBuilder;
Expand Down Expand Up @@ -689,29 +694,35 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> existingConfigurator =
defaultChannelProviderBuilder.getChannelConfigurator();
if (options.isGrpcGcpExtensionEnabled()) {
defaultChannelProviderBuilder.setPoolSize(1);
}
defaultChannelProviderBuilder.setChannelConfigurator(
directPathBuilder -> {
ManagedChannelBuilder builder = directPathBuilder;
if (existingConfigurator != null) {
builder = existingConfigurator.apply(builder);
}

String jsonApiConfig = parseGrpcGcpApiConfig();
GcpManagedChannelOptions gcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
if (gcpOptions == null) {
gcpOptions = GcpManagedChannelOptions.newBuilder().build();
ManagedChannelBuilder<?> primaryBuilder = builder;
ManagedChannelBuilder<?> fallbackBuilder = cloudPathBuilder;
if (options.isGrpcGcpExtensionEnabled()) {
String jsonApiConfig = parseGrpcGcpApiConfig();
GcpManagedChannelOptions primaryGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
GcpManagedChannelOptions fallbackGcpOptions =
GcpManagedChannelOptions.newBuilder(primaryGcpOptions)
.withChannelPoolOptions(getGrpcGcpLazyFallbackChannelPoolOptions(options))
.build();
primaryBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(primaryGcpOptions);
fallbackBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(fallbackGcpOptions);
}
Comment on lines +707 to 724
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The logic for configuring GCP Managed Channel options has a regression compared to the previous implementation. Specifically, the null check for grpcGcpOptionsWithMetricsAndDcp(options) has been removed, which can lead to a NullPointerException when calling GcpManagedChannelOptions.newBuilder(...). Additionally, grpcGcpOptionsWithMetricsAndDcp(options) is called twice (once directly and once inside grpcGcpOptionsWithMetricsAndLazyFallback), which is redundant.

I recommend refactoring this block to compute the base options once, ensure they are non-null, and then derive the fallback options from them.

          ManagedChannelBuilder<?> primaryBuilder = builder;
          ManagedChannelBuilder<?> fallbackBuilder = cloudPathBuilder;
          if (options.isGrpcGcpExtensionEnabled()) {
            String jsonApiConfig = parseGrpcGcpApiConfig();
            GcpManagedChannelOptions primaryGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
            if (primaryGcpOptions == null) {
              primaryGcpOptions = GcpManagedChannelOptions.newBuilder().build();
            }
            GcpManagedChannelOptions fallbackGcpOptions =
                GcpManagedChannelOptions.newBuilder(primaryGcpOptions)
                    .withChannelPoolOptions(getGrpcGcpLazyFallbackChannelPoolOptions(options))
                    .build();
            primaryBuilder =
                GcpManagedChannelBuilder.forDelegateBuilder(builder)
                    .withApiConfigJsonString(jsonApiConfig)
                    .withOptions(primaryGcpOptions);
            fallbackBuilder =
                GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
                    .withApiConfigJsonString(jsonApiConfig)
                    .withOptions(fallbackGcpOptions);
          }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


GcpManagedChannelBuilder primaryGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpManagedChannelBuilder fallbackGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpFallbackOpenTelemetry fallbackTelemetry =
GcpFallbackOpenTelemetry.newBuilder()
.withSdk(getFallbackOpenTelemetry(options))
Expand All @@ -720,9 +731,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
.build();

return new FallbackChannelBuilder(
primaryGcpBuilder,
fallbackGcpBuilder,
createFallbackChannelOptions(fallbackTelemetry, 1));
primaryBuilder, fallbackBuilder, createFallbackChannelOptions(fallbackTelemetry, 1));
});
}

Expand Down Expand Up @@ -841,6 +850,14 @@ static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options
.build();
}

@VisibleForTesting
static GcpChannelPoolOptions getGrpcGcpLazyFallbackChannelPoolOptions(SpannerOptions options) {
return GcpChannelPoolOptions.newBuilder(getGrpcGcpChannelPoolOptions(options))
.setMinSize(0)
.setInitSize(0)
.build();
}

@VisibleForTesting
static GcpChannelPoolOptions getGrpcGcpEndpointChannelPoolOptions(SpannerOptions options) {
GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions();
Expand Down Expand Up @@ -2595,15 +2612,15 @@ private static class FallbackChannelBuilder
extends ForwardingChannelBuilder2<FallbackChannelBuilder> {
private final GcpFallbackChannelOptions options;

private final GcpManagedChannelBuilder primaryGcpBuilder;
private final GcpManagedChannelBuilder fallbackGcpBuilder;
private final ManagedChannelBuilder<?> primaryBuilder;
private final ManagedChannelBuilder<?> fallbackBuilder;

private FallbackChannelBuilder(
GcpManagedChannelBuilder primary,
GcpManagedChannelBuilder fallback,
ManagedChannelBuilder<?> primary,
ManagedChannelBuilder<?> fallback,
GcpFallbackChannelOptions options) {
this.primaryGcpBuilder = primary;
this.fallbackGcpBuilder = fallback;
this.primaryBuilder = primary;
this.fallbackBuilder = fallback;
this.options = options;
}

Expand All @@ -2613,7 +2630,7 @@ private FallbackChannelBuilder(
*/
@Override
protected ManagedChannelBuilder<?> delegate() {
return primaryGcpBuilder;
return primaryBuilder;
}

/**
Expand All @@ -2622,7 +2639,7 @@ protected ManagedChannelBuilder<?> delegate() {
*/
@Override
public ManagedChannel build() {
return new GcpFallbackChannel(options, primaryGcpBuilder, fallbackGcpBuilder);
return new GcpFallbackChannel(options, primaryBuilder, fallbackBuilder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -172,6 +177,25 @@ public class GapicSpannerRpcTest {
new java.util.Date(
System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS))));

private static final String GRPC_GCP_CHANNEL_REF_CLASS_NAME =
"com.google.cloud.grpc.GcpManagedChannel$ChannelRef";

private static final class GrpcGcpObjectCounts {
int gcpManagedChannels;
int channelRefs;

GrpcGcpObjectCounts minus(GrpcGcpObjectCounts other) {
GrpcGcpObjectCounts difference = new GrpcGcpObjectCounts();
difference.gcpManagedChannels = gcpManagedChannels - other.gcpManagedChannels;
difference.channelRefs = channelRefs - other.channelRefs;
return difference;
}

String debugString() {
return "GcpManagedChannel=" + gcpManagedChannels + ", ChannelRef=" + channelRefs;
}
}

private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static InetSocketAddress address;
Expand Down Expand Up @@ -1240,6 +1264,27 @@ public void testGrpcGcpOptionsIncludeStaticChannelPoolSettingsWithoutDcp() throw
assertEquals(Duration.ZERO, grpcGcpOptions.getChannelPoolOptions().getScaleDownInterval());
}

@Test
public void testGrpcGcpLazyFallbackChannelPoolOptionsStartWithoutChannels() {
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("[PROJECT]")
.enableGrpcGcpExtension()
.disableDynamicChannelPool()
.setNumChannels(8)
.build();

GcpChannelPoolOptions poolOptions =
GapicSpannerRpc.getGrpcGcpLazyFallbackChannelPoolOptions(options);

assertEquals(8, poolOptions.getMaxSize());
assertEquals(0, poolOptions.getMinSize());
assertEquals(0, poolOptions.getInitSize());
assertEquals(0, poolOptions.getMinRpcPerChannel());
assertEquals(0, poolOptions.getMaxRpcPerChannel());
assertEquals(Duration.ZERO, poolOptions.getScaleDownInterval());
}

@Test
public void testGrpcGcpOptionsRetainDynamicChannelPoolSettingsWithDcp() throws Exception {
Duration affinityKeyLifetime = Duration.ofMinutes(10);
Expand Down Expand Up @@ -1409,6 +1454,134 @@ private SpannerOptions createSpannerOptions() {
.build();
}

@Test
public void testDirectPathFallbackCreatesLazyCloudPathGrpcGcpPool() {
SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {});
GapicSpannerRpc rpc = null;
try {
SpannerOptions options = createDirectPathFallbackObjectCountOptions().build();
assumeTrue(
"GCP fallback must be enabled for this DirectPath fallback test",
options.isEnableGcpFallback());
GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz();
rpc = new GapicSpannerRpc(options);
GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before);
assertEquals(counts.debugString(), 3, counts.gcpManagedChannels);
assertEquals(counts.debugString(), 24, counts.channelRefs);
} finally {
if (rpc != null) {
rpc.shutdown();
}
SpannerOptions.useDefaultEnvironment();
}
}

@Test
public void testDirectPathFallbackWithGaxChannelPoolDoesNotCreateGrpcGcpChannelRefs() {
SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {});
GapicSpannerRpc rpc = null;
try {
SpannerOptions options =
createDirectPathFallbackObjectCountOptions().disableGrpcGcpExtension().build();
assumeTrue(
"GCP fallback must be enabled for this DirectPath fallback test",
options.isEnableGcpFallback());
GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz();
rpc = new GapicSpannerRpc(options);
GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before);
assertEquals(counts.debugString(), 0, counts.gcpManagedChannels);
assertEquals(counts.debugString(), 0, counts.channelRefs);
} finally {
if (rpc != null) {
rpc.shutdown();
}
SpannerOptions.useDefaultEnvironment();
}
}

private SpannerOptions.Builder createDirectPathFallbackObjectCountOptions() {
return SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1")
.setCredentials(NoCredentials.getInstance());
}

private static GrpcGcpObjectCounts countGrpcGcpObjectsFromChannelz() {
GrpcGcpObjectCounts counts = new GrpcGcpObjectCounts();
Object channelz = io.grpc.InternalChannelz.instance();
Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
countGrpcGcpObjectsFromChannelzField(channelz, "rootChannels", visited, counts);
countGrpcGcpObjectsFromChannelzField(channelz, "subchannels", visited, counts);
return counts;
}

private static void countGrpcGcpObjectsFromChannelzField(
Object channelz, String fieldName, Set<Object> visited, GrpcGcpObjectCounts counts) {
try {
java.lang.reflect.Field field = channelz.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
countGrpcGcpObjects(field.get(channelz), visited, counts);
} catch (RuntimeException | ReflectiveOperationException ignored) {
// Ignore fields that are not reflectively accessible in this runtime.
}
}

private static void countGrpcGcpObjects(
Object object, Set<Object> visited, GrpcGcpObjectCounts counts) {
if (object == null || !visited.add(object)) {
return;
}
if (object instanceof GcpManagedChannel) {
counts.gcpManagedChannels++;
}
Class<?> clazz = object.getClass();
if (clazz.getName().equals(GRPC_GCP_CHANNEL_REF_CLASS_NAME)) {
counts.channelRefs++;
}
if (object instanceof Collection<?>) {
for (Object value : (Collection<?>) object) {
countGrpcGcpObjects(value, visited, counts);
}
return;
}
if (object instanceof Map<?, ?>) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
countGrpcGcpObjects(entry.getKey(), visited, counts);
countGrpcGcpObjects(entry.getValue(), visited, counts);
}
return;
}
if (clazz.isArray()) {
int length = Array.getLength(object);
for (int i = 0; i < length; i++) {
countGrpcGcpObjects(Array.get(object, i), visited, counts);
}
return;
}
if (!shouldInspectFields(clazz)) {
return;
}
for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
for (java.lang.reflect.Field field : current.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
try {
field.setAccessible(true);
countGrpcGcpObjects(field.get(object), visited, counts);
} catch (RuntimeException | IllegalAccessException ignored) {
// Ignore fields that are not reflectively accessible in this runtime.
}
}
}
}

private static boolean shouldInspectFields(Class<?> clazz) {
String name = clazz.getName();
return name.startsWith("com.google.") || name.startsWith("io.grpc.");
}

static class TestableGapicSpannerRpc extends GapicSpannerRpc {
public TestableGapicSpannerRpc(SpannerOptions options) {
super(options);
Expand Down
Loading