Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,18 @@ public GapicSpannerRpc(final SpannerOptions options) {
options, headerProviderWithUserAgent, isEnableDirectAccess);
GrpcGcpEndpointChannelConfigurator endpointChannelConfigurator =
createGrpcGcpEndpointChannelConfigurator(defaultChannelProviderBuilder, options);
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);

if (options.getChannelProvider() == null
&& isEnableDirectAccess
&& options.isEnableGcpFallback()) {
boolean useGcpFallback =
options.getChannelProvider() == null
&& isEnableDirectAccess
&& options.isEnableGcpFallback();
if (useGcpFallback) {
setupGcpFallback(
defaultChannelProviderBuilder,
options,
headerProviderWithUserAgent,
credentialsProvider);
} else {
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
}
Comment on lines +385 to 387
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When the GCP extension is enabled, the GAX pool size should be set to 1 to avoid redundant connection pooling, as the extension manages its own internal pool. This ensures consistency with the logic used when fallback is enabled (see line 701).

Suggested change
} else {
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
}
} else {
maybeEnableGrpcGcpExtension(defaultChannelProviderBuilder, options);
if (options.isGrpcGcpExtensionEnabled()) {
defaultChannelProviderBuilder.setPoolSize(1);
}
}


boolean enableLocationApi = options.isEnableLocationApi();
Expand All @@ -397,6 +399,12 @@ public GapicSpannerRpc(final SpannerOptions options) {
options.getChannelEndpointCacheFactory(),
endpointChannelConfigurator)
: baseChannelProvider;
TransportChannelProvider adminChannelProvider =
useGcpFallback
? createChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false)
.build()
: channelProvider;
Comment on lines +402 to +407
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When useGcpFallback is true, the adminChannelProvider is created without calling maybeEnableGrpcGcpExtension. This is a regression compared to the case where fallback is disabled, as admin clients will miss out on extension features like metrics and pooling. Additionally, the pool size should be set to 1 if the extension is enabled to avoid nested pools.

Suggested change
TransportChannelProvider adminChannelProvider =
useGcpFallback
? createChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false)
.build()
: channelProvider;
TransportChannelProvider adminChannelProvider;
if (useGcpFallback) {
InstantiatingGrpcChannelProvider.Builder adminBuilder =
createChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
maybeEnableGrpcGcpExtension(adminBuilder, options);
if (options.isGrpcGcpExtensionEnabled()) {
adminBuilder.setPoolSize(1);
}
adminChannelProvider = adminBuilder.build();
} else {
adminChannelProvider = channelProvider;
}


spannerWatchdog =
Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -495,7 +503,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
pdmlSettings.build(), clientContext);
this.instanceAdminStubSettings =
options.getInstanceAdminStubSettings().toBuilder()
.setTransportChannelProvider(channelProvider)
.setTransportChannelProvider(adminChannelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.setTracerFactory(
Expand All @@ -506,7 +514,7 @@ public GapicSpannerRpc(final SpannerOptions options) {

this.databaseAdminStubSettings =
options.getDatabaseAdminStubSettings().toBuilder()
.setTransportChannelProvider(channelProvider)
.setTransportChannelProvider(adminChannelProvider)
.setCredentialsProvider(credentialsProvider)
.setStreamWatchdogProvider(watchdogProvider)
.setTracerFactory(
Expand Down Expand Up @@ -656,8 +664,11 @@ private void setupGcpFallback(
final HeaderProvider headerProviderWithUserAgent,
final CredentialsProvider credentialsProvider) {
InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder =
createChannelProviderBuilder(
createBaseChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
Comment on lines 666 to 668
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The cloudPathProviderBuilder should have its pool size set to 1 when the GCP extension is enabled. Since this builder is used for the fallback path which is later wrapped in a GcpManagedChannel (line 722), leaving the default GAX pool size would result in multiple extension-managed pools, which is inefficient.

Suggested change
InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder =
createChannelProviderBuilder(
createBaseChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
InstantiatingGrpcChannelProvider.Builder cloudPathProviderBuilder =
createBaseChannelProviderBuilder(
options, headerProviderWithUserAgent, /* isEnableDirectAccess= */ false);
if (options.isGrpcGcpExtensionEnabled()) {
cloudPathProviderBuilder.setPoolSize(1);
}

if (options.isGrpcGcpExtensionEnabled()) {
cloudPathProviderBuilder.setPoolSize(1);
}

InstantiatingGrpcChannelProvider cloudPathProvider = cloudPathProviderBuilder.build();
ManagedChannelBuilder cloudPathBuilder;
Expand Down Expand Up @@ -689,29 +700,34 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(

final ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> existingConfigurator =
defaultChannelProviderBuilder.getChannelConfigurator();
if (options.isGrpcGcpExtensionEnabled()) {
defaultChannelProviderBuilder.setPoolSize(1);
}
defaultChannelProviderBuilder.setChannelConfigurator(
directPathBuilder -> {
ManagedChannelBuilder builder = directPathBuilder;
if (existingConfigurator != null) {
builder = existingConfigurator.apply(builder);
}

String jsonApiConfig = parseGrpcGcpApiConfig();
GcpManagedChannelOptions gcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
if (gcpOptions == null) {
gcpOptions = GcpManagedChannelOptions.newBuilder().build();
ManagedChannelBuilder<?> primaryBuilder = builder;
ManagedChannelBuilder<?> fallbackBuilder = cloudPathBuilder;
if (options.isGrpcGcpExtensionEnabled()) {
String jsonApiConfig = parseGrpcGcpApiConfig();
GcpManagedChannelOptions gcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);
if (gcpOptions == null) {
gcpOptions = GcpManagedChannelOptions.newBuilder().build();
}
primaryBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);
fallbackBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);
}

GcpManagedChannelBuilder primaryGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpManagedChannelBuilder fallbackGcpBuilder =
GcpManagedChannelBuilder.forDelegateBuilder(cloudPathBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(gcpOptions);

GcpFallbackOpenTelemetry fallbackTelemetry =
GcpFallbackOpenTelemetry.newBuilder()
.withSdk(getFallbackOpenTelemetry(options))
Expand All @@ -720,9 +736,7 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
.build();

return new FallbackChannelBuilder(
primaryGcpBuilder,
fallbackGcpBuilder,
createFallbackChannelOptions(fallbackTelemetry, 1));
primaryBuilder, fallbackBuilder, createFallbackChannelOptions(fallbackTelemetry, 1));
});
}

Expand Down Expand Up @@ -2595,15 +2609,15 @@ private static class FallbackChannelBuilder
extends ForwardingChannelBuilder2<FallbackChannelBuilder> {
private final GcpFallbackChannelOptions options;

private final GcpManagedChannelBuilder primaryGcpBuilder;
private final GcpManagedChannelBuilder fallbackGcpBuilder;
private final ManagedChannelBuilder<?> primaryBuilder;
private final ManagedChannelBuilder<?> fallbackBuilder;

private FallbackChannelBuilder(
GcpManagedChannelBuilder primary,
GcpManagedChannelBuilder fallback,
ManagedChannelBuilder<?> primary,
ManagedChannelBuilder<?> fallback,
GcpFallbackChannelOptions options) {
this.primaryGcpBuilder = primary;
this.fallbackGcpBuilder = fallback;
this.primaryBuilder = primary;
this.fallbackBuilder = fallback;
this.options = options;
}

Expand All @@ -2613,7 +2627,7 @@ private FallbackChannelBuilder(
*/
@Override
protected ManagedChannelBuilder<?> delegate() {
return primaryGcpBuilder;
return primaryBuilder;
}

/**
Expand All @@ -2622,7 +2636,7 @@ protected ManagedChannelBuilder<?> delegate() {
*/
@Override
public ManagedChannel build() {
return new GcpFallbackChannel(options, primaryGcpBuilder, fallbackGcpBuilder);
return new GcpFallbackChannel(options, primaryBuilder, fallbackBuilder);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,17 @@
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import java.io.IOException;
import java.lang.reflect.Array;
import java.lang.reflect.Modifier;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -172,6 +177,25 @@ public class GapicSpannerRpcTest {
new java.util.Date(
System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS))));

private static final String GRPC_GCP_CHANNEL_REF_CLASS_NAME =
"com.google.cloud.grpc.GcpManagedChannel$ChannelRef";

private static final class GrpcGcpObjectCounts {
int gcpManagedChannels;
int channelRefs;

GrpcGcpObjectCounts minus(GrpcGcpObjectCounts other) {
GrpcGcpObjectCounts difference = new GrpcGcpObjectCounts();
difference.gcpManagedChannels = gcpManagedChannels - other.gcpManagedChannels;
difference.channelRefs = channelRefs - other.channelRefs;
return difference;
}

String debugString() {
return "GcpManagedChannel=" + gcpManagedChannels + ", ChannelRef=" + channelRefs;
}
}

private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static InetSocketAddress address;
Expand Down Expand Up @@ -1409,6 +1433,134 @@ private SpannerOptions createSpannerOptions() {
.build();
}

@Test
public void testDirectPathFallbackUsesCloudPathForAdminClients() {
SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {});
GapicSpannerRpc rpc = null;
try {
SpannerOptions options = createDirectPathFallbackObjectCountOptions().build();
assumeTrue(
"GCP fallback must be enabled for this DirectPath fallback test",
options.isEnableGcpFallback());
GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz();
rpc = new GapicSpannerRpc(options);
GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before);
assertEquals(counts.debugString(), 4, counts.gcpManagedChannels);
assertEquals(counts.debugString(), 32, counts.channelRefs);
} finally {
if (rpc != null) {
rpc.shutdown();
}
SpannerOptions.useDefaultEnvironment();
}
}

@Test
public void testDirectPathFallbackWithGaxChannelPoolDoesNotCreateGrpcGcpChannelRefs() {
SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {});
GapicSpannerRpc rpc = null;
try {
SpannerOptions options =
createDirectPathFallbackObjectCountOptions().disableGrpcGcpExtension().build();
assumeTrue(
"GCP fallback must be enabled for this DirectPath fallback test",
options.isEnableGcpFallback());
GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz();
rpc = new GapicSpannerRpc(options);
GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before);
assertEquals(counts.debugString(), 0, counts.gcpManagedChannels);
assertEquals(counts.debugString(), 0, counts.channelRefs);
} finally {
if (rpc != null) {
rpc.shutdown();
}
SpannerOptions.useDefaultEnvironment();
}
}

private SpannerOptions.Builder createDirectPathFallbackObjectCountOptions() {
return SpannerOptions.newBuilder()
.setProjectId("test-project")
.setEnableDirectAccess(true)
.setHost("http://localhost:1")
.setCredentials(NoCredentials.getInstance());
}

private static GrpcGcpObjectCounts countGrpcGcpObjectsFromChannelz() {
GrpcGcpObjectCounts counts = new GrpcGcpObjectCounts();
Object channelz = io.grpc.InternalChannelz.instance();
Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>());
countGrpcGcpObjectsFromChannelzField(channelz, "rootChannels", visited, counts);
countGrpcGcpObjectsFromChannelzField(channelz, "subchannels", visited, counts);
return counts;
}

private static void countGrpcGcpObjectsFromChannelzField(
Object channelz, String fieldName, Set<Object> visited, GrpcGcpObjectCounts counts) {
try {
java.lang.reflect.Field field = channelz.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
countGrpcGcpObjects(field.get(channelz), visited, counts);
} catch (RuntimeException | ReflectiveOperationException ignored) {
// Ignore fields that are not reflectively accessible in this runtime.
}
}

private static void countGrpcGcpObjects(
Object object, Set<Object> visited, GrpcGcpObjectCounts counts) {
if (object == null || !visited.add(object)) {
return;
}
if (object instanceof GcpManagedChannel) {
counts.gcpManagedChannels++;
}
Class<?> clazz = object.getClass();
if (clazz.getName().equals(GRPC_GCP_CHANNEL_REF_CLASS_NAME)) {
counts.channelRefs++;
}
if (object instanceof Collection<?>) {
for (Object value : (Collection<?>) object) {
countGrpcGcpObjects(value, visited, counts);
}
return;
}
if (object instanceof Map<?, ?>) {
for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) {
countGrpcGcpObjects(entry.getKey(), visited, counts);
countGrpcGcpObjects(entry.getValue(), visited, counts);
}
return;
}
if (clazz.isArray()) {
int length = Array.getLength(object);
for (int i = 0; i < length; i++) {
countGrpcGcpObjects(Array.get(object, i), visited, counts);
}
return;
}
if (!shouldInspectFields(clazz)) {
return;
}
for (Class<?> current = clazz; current != null; current = current.getSuperclass()) {
for (java.lang.reflect.Field field : current.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers())) {
continue;
}
try {
field.setAccessible(true);
countGrpcGcpObjects(field.get(object), visited, counts);
} catch (RuntimeException | IllegalAccessException ignored) {
// Ignore fields that are not reflectively accessible in this runtime.
}
}
}
}

private static boolean shouldInspectFields(Class<?> clazz) {
String name = clazz.getName();
return name.startsWith("com.google.") || name.startsWith("io.grpc.");
}

static class TestableGapicSpannerRpc extends GapicSpannerRpc {
public TestableGapicSpannerRpc(SpannerOptions options) {
super(options);
Expand Down
Loading