-
Notifications
You must be signed in to change notification settings - Fork 1.1k
fix(spanner): avoid double grpc-gcp wrapping for directpath fallback #13155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -103,12 +103,17 @@ | |||||||||||
| import io.opentelemetry.sdk.trace.SdkTracerProvider; | ||||||||||||
| import io.opentelemetry.sdk.trace.samplers.Sampler; | ||||||||||||
| import java.io.IOException; | ||||||||||||
| import java.lang.reflect.Array; | ||||||||||||
| import java.lang.reflect.Modifier; | ||||||||||||
| import java.net.InetSocketAddress; | ||||||||||||
| import java.time.Duration; | ||||||||||||
| import java.util.Collection; | ||||||||||||
| import java.util.Collections; | ||||||||||||
| import java.util.HashMap; | ||||||||||||
| import java.util.IdentityHashMap; | ||||||||||||
| import java.util.Map; | ||||||||||||
| import java.util.Objects; | ||||||||||||
| import java.util.Set; | ||||||||||||
| import java.util.concurrent.Executor; | ||||||||||||
| import java.util.concurrent.ScheduledExecutorService; | ||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||
|
|
@@ -172,6 +177,25 @@ public class GapicSpannerRpcTest { | |||||||||||
| new java.util.Date( | ||||||||||||
| System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(1L, TimeUnit.DAYS)))); | ||||||||||||
|
|
||||||||||||
| private static final String GRPC_GCP_CHANNEL_REF_CLASS_NAME = | ||||||||||||
| "com.google.cloud.grpc.GcpManagedChannel$ChannelRef"; | ||||||||||||
|
|
||||||||||||
| private static final class GrpcGcpObjectCounts { | ||||||||||||
| int gcpManagedChannels; | ||||||||||||
| int channelRefs; | ||||||||||||
|
|
||||||||||||
| GrpcGcpObjectCounts minus(GrpcGcpObjectCounts other) { | ||||||||||||
| GrpcGcpObjectCounts difference = new GrpcGcpObjectCounts(); | ||||||||||||
| difference.gcpManagedChannels = gcpManagedChannels - other.gcpManagedChannels; | ||||||||||||
| difference.channelRefs = channelRefs - other.channelRefs; | ||||||||||||
| return difference; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| String debugString() { | ||||||||||||
| return "GcpManagedChannel=" + gcpManagedChannels + ", ChannelRef=" + channelRefs; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static MockSpannerServiceImpl mockSpanner; | ||||||||||||
| private static Server server; | ||||||||||||
| private static InetSocketAddress address; | ||||||||||||
|
|
@@ -1409,6 +1433,134 @@ private SpannerOptions createSpannerOptions() { | |||||||||||
| .build(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| public void testDirectPathFallbackCreatesOneGrpcGcpLayerPerPath() { | ||||||||||||
| SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {}); | ||||||||||||
| GapicSpannerRpc rpc = null; | ||||||||||||
| try { | ||||||||||||
| SpannerOptions options = createDirectPathFallbackObjectCountOptions().build(); | ||||||||||||
| assumeTrue( | ||||||||||||
| "GCP fallback must be enabled for this DirectPath fallback test", | ||||||||||||
| options.isEnableGcpFallback()); | ||||||||||||
| GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz(); | ||||||||||||
| rpc = new GapicSpannerRpc(options); | ||||||||||||
| GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before); | ||||||||||||
| assertEquals(counts.debugString(), 6, counts.gcpManagedChannels); | ||||||||||||
| assertEquals(counts.debugString(), 48, counts.channelRefs); | ||||||||||||
| } finally { | ||||||||||||
| if (rpc != null) { | ||||||||||||
| rpc.shutdown(); | ||||||||||||
| } | ||||||||||||
| SpannerOptions.useDefaultEnvironment(); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| @Test | ||||||||||||
| public void testDirectPathFallbackWithGaxChannelPoolDoesNotCreateGrpcGcpChannelRefs() { | ||||||||||||
| SpannerOptions.useEnvironment(new SpannerOptions.SpannerEnvironment() {}); | ||||||||||||
| GapicSpannerRpc rpc = null; | ||||||||||||
| try { | ||||||||||||
| SpannerOptions options = | ||||||||||||
| createDirectPathFallbackObjectCountOptions().disableGrpcGcpExtension().build(); | ||||||||||||
| assumeTrue( | ||||||||||||
| "GCP fallback must be enabled for this DirectPath fallback test", | ||||||||||||
| options.isEnableGcpFallback()); | ||||||||||||
| GrpcGcpObjectCounts before = countGrpcGcpObjectsFromChannelz(); | ||||||||||||
| rpc = new GapicSpannerRpc(options); | ||||||||||||
| GrpcGcpObjectCounts counts = countGrpcGcpObjectsFromChannelz().minus(before); | ||||||||||||
| assertEquals(counts.debugString(), 0, counts.gcpManagedChannels); | ||||||||||||
| assertEquals(counts.debugString(), 0, counts.channelRefs); | ||||||||||||
| } finally { | ||||||||||||
| if (rpc != null) { | ||||||||||||
| rpc.shutdown(); | ||||||||||||
| } | ||||||||||||
| SpannerOptions.useDefaultEnvironment(); | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private SpannerOptions.Builder createDirectPathFallbackObjectCountOptions() { | ||||||||||||
| return SpannerOptions.newBuilder() | ||||||||||||
| .setProjectId("test-project") | ||||||||||||
| .setEnableDirectAccess(true) | ||||||||||||
| .setHost("http://localhost:1") | ||||||||||||
|
Comment on lines
+1484
to
+1485
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default is true |
||||||||||||
| .setCredentials(NoCredentials.getInstance()); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static GrpcGcpObjectCounts countGrpcGcpObjectsFromChannelz() { | ||||||||||||
| GrpcGcpObjectCounts counts = new GrpcGcpObjectCounts(); | ||||||||||||
| Object channelz = io.grpc.InternalChannelz.instance(); | ||||||||||||
| Set<Object> visited = Collections.newSetFromMap(new IdentityHashMap<>()); | ||||||||||||
| countGrpcGcpObjectsFromChannelzField(channelz, "rootChannels", visited, counts); | ||||||||||||
| countGrpcGcpObjectsFromChannelzField(channelz, "subchannels", visited, counts); | ||||||||||||
| return counts; | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static void countGrpcGcpObjectsFromChannelzField( | ||||||||||||
| Object channelz, String fieldName, Set<Object> visited, GrpcGcpObjectCounts counts) { | ||||||||||||
| try { | ||||||||||||
| java.lang.reflect.Field field = channelz.getClass().getDeclaredField(fieldName); | ||||||||||||
| field.setAccessible(true); | ||||||||||||
| countGrpcGcpObjects(field.get(channelz), visited, counts); | ||||||||||||
| } catch (RuntimeException | ReflectiveOperationException ignored) { | ||||||||||||
| // Ignore fields that are not reflectively accessible in this runtime. | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static void countGrpcGcpObjects( | ||||||||||||
| Object object, Set<Object> visited, GrpcGcpObjectCounts counts) { | ||||||||||||
| if (object == null || !visited.add(object)) { | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| if (object instanceof GcpManagedChannel) { | ||||||||||||
| counts.gcpManagedChannels++; | ||||||||||||
| } | ||||||||||||
| Class<?> clazz = object.getClass(); | ||||||||||||
| if (clazz.getName().equals(GRPC_GCP_CHANNEL_REF_CLASS_NAME)) { | ||||||||||||
| counts.channelRefs++; | ||||||||||||
| } | ||||||||||||
| if (object instanceof Collection<?>) { | ||||||||||||
| for (Object value : (Collection<?>) object) { | ||||||||||||
| countGrpcGcpObjects(value, visited, counts); | ||||||||||||
| } | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| if (object instanceof Map<?, ?>) { | ||||||||||||
| for (Map.Entry<?, ?> entry : ((Map<?, ?>) object).entrySet()) { | ||||||||||||
| countGrpcGcpObjects(entry.getKey(), visited, counts); | ||||||||||||
| countGrpcGcpObjects(entry.getValue(), visited, counts); | ||||||||||||
| } | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| if (clazz.isArray()) { | ||||||||||||
| int length = Array.getLength(object); | ||||||||||||
| for (int i = 0; i < length; i++) { | ||||||||||||
| countGrpcGcpObjects(Array.get(object, i), visited, counts); | ||||||||||||
| } | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| if (!shouldInspectFields(clazz)) { | ||||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| for (Class<?> current = clazz; current != null; current = current.getSuperclass()) { | ||||||||||||
| for (java.lang.reflect.Field field : current.getDeclaredFields()) { | ||||||||||||
| if (Modifier.isStatic(field.getModifiers())) { | ||||||||||||
| continue; | ||||||||||||
| } | ||||||||||||
| try { | ||||||||||||
| field.setAccessible(true); | ||||||||||||
| countGrpcGcpObjects(field.get(object), visited, counts); | ||||||||||||
| } catch (RuntimeException | IllegalAccessException ignored) { | ||||||||||||
| // Ignore fields that are not reflectively accessible in this runtime. | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| private static boolean shouldInspectFields(Class<?> clazz) { | ||||||||||||
| String name = clazz.getName(); | ||||||||||||
| return name.startsWith("com.google.") || name.startsWith("io.grpc."); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| static class TestableGapicSpannerRpc extends GapicSpannerRpc { | ||||||||||||
| public TestableGapicSpannerRpc(SpannerOptions options) { | ||||||||||||
| super(options); | ||||||||||||
|
|
||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method
createBaseChannelProviderBuilderis called here, but its definition is not included in the pull request. Additionally, the call site at line 371 (visible in context but not in the diff) still usescreateChannelProviderBuilder. IfcreateBaseChannelProviderBuilderis a new method intended to provide a base configuration without the GCP extension, please ensure its definition is included and consider if other call sites should also be updated for consistency.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's there in line 746 createBaseChannelProviderBuilder