9 changes: 1 addition & 8 deletions pkg/utils/testutil/testutil.go
@@ -80,14 +80,7 @@ func NewRequestHeader(clusterID uint64) *pdpb.RequestHeader {
}

// MustNewGrpcClient must create a new PD grpc client.
func MustNewGrpcClient(re *require.Assertions, addr string) pdpb.PDClient {
Contributor: How about adding a new close function for the pdClient that closes this conn?

Member (Author): Here we just create a gRPC client instead of a pd client.

A usage sketch of the updated helper follows this file's diff.

conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithTransportCredentials(insecure.NewCredentials()))
re.NoError(err)
return pdpb.NewPDClient(conn)
}

// MustNewGrpcClientWithConn must create a new PD grpc client and return both client and connection.
func MustNewGrpcClientWithConn(re *require.Assertions, addr string) (pdpb.PDClient, *grpc.ClientConn) {
func MustNewGrpcClient(re *require.Assertions, addr string) (pdpb.PDClient, *grpc.ClientConn) {
conn, err := grpc.Dial(strings.TrimPrefix(addr, "http://"), grpc.WithTransportCredentials(insecure.NewCredentials()))
re.NoError(err)
return pdpb.NewPDClient(conn), conn
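
The exchange above is resolved by the new two-value signature: rather than wrapping the connection in a separate close helper, MustNewGrpcClient now hands the *grpc.ClientConn back to the caller, which closes it. Below is a minimal sketch, not code from this PR, of the calling pattern the remaining files adopt; the package and function names are illustrative, and the import paths are assumed from the repository layout shown in the diff.

```go
// Illustrative sketch only: package/function names are made up; import paths
// are assumed from the repository layout shown in this diff.
package clienttest

import (
	"context"

	"github.com/stretchr/testify/require"

	"github.com/tikv/pd/pkg/utils/testutil"
)

// useGrpcClient shows the caller-owned cleanup pattern: the helper returns the
// *grpc.ClientConn alongside the pdpb.PDClient, so the caller closes the
// connection and any streams it opens.
func useGrpcClient(ctx context.Context, re *require.Assertions, addr string) {
	grpcPDClient, conn := testutil.MustNewGrpcClient(re, addr)
	defer conn.Close() // close the raw gRPC connection when the test is done

	stream, err := grpcPDClient.RegionHeartbeat(ctx)
	re.NoError(err)
	defer func() {
		_ = stream.CloseSend() // release the send side of the heartbeat stream
	}()
	// ... send pdpb.RegionHeartbeatRequest messages on stream ...
}
```

This is why every call site in the rest of the diff pairs the helper with defer conn.Close() and, where a stream is opened, a deferred CloseSend().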

25 changes: 19 additions & 6 deletions tests/integrations/client/client_test.go
@@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/goleak"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

@@ -503,11 +504,17 @@ func (suite *followerForwardAndHandleTestSuite) SetupSuite() {
suite.endpoints = runServer(re, cluster)
re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, leader.GetAddr())
defer conn.Close()
suite.regionID = regionIDAllocator.alloc()
testutil.Eventually(re, func() bool {
regionHeartbeat, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
if err != nil {
return false
}
defer func() {
_ = stream.CloseSend()
}()
region := &metapb.Region{
Id: suite.regionID,
RegionEpoch: &metapb.RegionEpoch{
Expand All @@ -521,9 +528,9 @@ func (suite *followerForwardAndHandleTestSuite) SetupSuite() {
Region: region,
Leader: peers[0],
}
err = regionHeartbeat.Send(req)
err = stream.Send(req)
re.NoError(err)
_, err = regionHeartbeat.Recv()
_, err = stream.Recv()
return err == nil
})
}
@@ -1040,6 +1047,7 @@ type clientTestSuiteImpl struct {
grpcSvr *server.GrpcServer
client pd.Client
grpcPDClient pdpb.PDClient
conn *grpc.ClientConn
regionHeartbeat pdpb.PD_RegionHeartbeatClient
reportBucket pdpb.PD_ReportBucketsClient
}
@@ -1049,7 +1057,7 @@ func (suite *clientTestSuiteImpl) setup() {
re := suite.Require()
suite.srv, suite.cleanup, err = tests.NewServer(re, assertutil.CheckerWithNilAssert(re))
re.NoError(err)
suite.grpcPDClient = testutil.MustNewGrpcClient(re, suite.srv.GetAddr())
suite.grpcPDClient, suite.conn = testutil.MustNewGrpcClient(re, suite.srv.GetAddr())
suite.grpcSvr = &server.GrpcServer{Server: suite.srv}

tests.MustWaitLeader(re, []*server.Server{suite.srv})
@@ -1092,6 +1100,11 @@ func (suite *clientTestSuiteImpl) setup() {

func (suite *clientTestSuiteImpl) tearDown() {
suite.client.Close()
_ = suite.regionHeartbeat.CloseSend()
_ = suite.reportBucket.CloseSend()
if suite.conn != nil {
_ = suite.conn.Close()
}
suite.clean()
suite.cleanup()
}

9 changes: 8 additions & 1 deletion tests/integrations/client/router_client_test.go
@@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
@@ -56,6 +57,7 @@ type routerClientSuite struct {
cluster *tests.TestCluster
client pd.Client
grpcPDClient pdpb.PDClient
conn *grpc.ClientConn
regionHeartbeat pdpb.PD_RegionHeartbeatClient
reportBucket pdpb.PD_ReportBucketsClient

@@ -73,7 +75,7 @@ func (suite *routerClientSuite) SetupSuite() {

re.NotEmpty(suite.cluster.WaitLeader())
leader := suite.cluster.GetLeaderServer()
suite.grpcPDClient = testutil.MustNewGrpcClient(re, leader.GetAddr())
suite.grpcPDClient, suite.conn = testutil.MustNewGrpcClient(re, leader.GetAddr())
suite.client = setupCli(suite.ctx, re, endpoints,
opt.WithEnableRouterClient(suite.routerClientEnabled),
opt.WithEnableFollowerHandle(true))
@@ -90,6 +92,11 @@ func (suite *routerClientSuite) SetupSuite() {
// TearDownSuite cleans up the test cluster and client.
func (suite *routerClientSuite) TearDownSuite() {
suite.client.Close()
_ = suite.regionHeartbeat.CloseSend()
_ = suite.reportBucket.CloseSend()
if suite.conn != nil {
_ = suite.conn.Close()
}
suite.clean()
suite.cluster.Destroy()
}

51 changes: 43 additions & 8 deletions tests/integrations/mcs/scheduling/server_test.go
@@ -545,9 +545,13 @@ func (suite *serverTestSuite) TestForwardRegionHeartbeat() {
re.Empty(resp.GetHeader().GetError())
}

grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
@@ -626,11 +630,15 @@ func (suite *serverTestSuite) TestForwardReportBuckets() {
re.Empty(resp.GetHeader().GetError())
}

grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()

// First create a region via region heartbeat
heartbeatStream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = heartbeatStream.CloseSend()
}()
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
@@ -665,6 +673,9 @@ func (suite *serverTestSuite) TestForwardReportBuckets() {
// Now test ReportBuckets forwarding with multiple requests
bucketStream, err := grpcPDClient.ReportBuckets(suite.ctx)
re.NoError(err)
defer func() {
_ = bucketStream.CloseSend()
}()

// Send multiple bucket reports to test streaming
for version := uint64(1); version <= 3; version++ {
@@ -745,7 +756,8 @@ func (suite *serverTestSuite) TestStoreLimit() {
conf.MaxReplicas = 1
err = leaderServer.SetReplicationConfig(*conf)
re.NoError(err)
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
for i := uint64(1); i <= 2; i++ {
resp, err := grpcPDClient.PutStore(
context.Background(), &pdpb.PutStoreRequest{
@@ -764,6 +776,9 @@

stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()
for i := uint64(2); i <= 10; i++ {
peers := []*metapb.Peer{{Id: i, StoreId: 1}}
if len(peers) == 0 {
@@ -998,9 +1013,13 @@ func (suite *serverTestSuite) TestPrepareChecker() {
}

// Send enough regions to satisfy the prepare checker in the initial cluster
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()

peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
@@ -1215,9 +1234,13 @@ func (suite *serverTestSuite) TestBatchSplit() {
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
@@ -1292,9 +1315,13 @@ func (suite *serverTestSuite) TestBatchSplitCompatibility() {
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
@@ -1435,9 +1462,13 @@ func (suite *serverTestSuite) TestConcurrentBatchSplit() {
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
}
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
@@ -1561,9 +1592,13 @@ func (suite *serverTestSuite) TestForwardSplitRegion() {
}

// Create a region via region heartbeat
grpcPDClient := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, suite.pdLeader.GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(suite.ctx)
re.NoError(err)
defer func() {
_ = stream.CloseSend()
}()

peers := []*metapb.Peer{
{Id: 11, StoreId: 1},

6 changes: 5 additions & 1 deletion tests/integrations/tso/consistency_test.go
@@ -52,6 +52,7 @@ type tsoConsistencyTestSuite struct {
tsoClientConn *grpc.ClientConn

pdClient pdpb.PDClient
conn *grpc.ClientConn
tsoClient tsopb.TSOClient
}

@@ -87,7 +88,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() {
re.NoError(err)
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = testutil.MustNewGrpcClient(re, backendEndpoints)
suite.pdClient, suite.conn = testutil.MustNewGrpcClient(re, backendEndpoints)
} else {
suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr())
@@ -100,6 +101,9 @@ func (suite *tsoConsistencyTestSuite) TearDownSuite() {
suite.tsoClientConn.Close()
suite.tsoServerCleanup()
}
if suite.conn != nil {
suite.conn.Close()
}
suite.cluster.Destroy()
}


6 changes: 5 additions & 1 deletion tests/integrations/tso/server_test.go
@@ -51,6 +51,7 @@ type tsoServerTestSuite struct {
tsoClientConn *grpc.ClientConn

pdClient pdpb.PDClient
conn *grpc.ClientConn
tsoClient tsopb.TSOClient
}

@@ -86,7 +87,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
re.NoError(err)
backendEndpoints := suite.pdLeaderServer.GetAddr()
if suite.legacy {
suite.pdClient = testutil.MustNewGrpcClient(re, backendEndpoints)
suite.pdClient, suite.conn = testutil.MustNewGrpcClient(re, backendEndpoints)
} else {
suite.tsoServer, suite.tsoServerCleanup = tests.StartSingleTSOTestServer(suite.ctx, re, backendEndpoints, tempurl.Alloc())
suite.tsoClientConn, suite.tsoClient = tso.MustNewGrpcClient(re, suite.tsoServer.GetAddr())
@@ -103,6 +104,9 @@ func (suite *tsoServerTestSuite) TearDownSuite() {
suite.tsoClientConn.Close()
suite.tsoServerCleanup()
}
if suite.conn != nil {
suite.conn.Close()
}
suite.cluster.Destroy()
}


2 changes: 1 addition & 1 deletion tests/scheduling_cluster.go
@@ -125,7 +125,7 @@ func (tc *TestSchedulingCluster) WaitForPrimaryServing(re *require.Assertions) *
return tc.pd.GetLeaderServer().GetRaftCluster().IsServiceIndependent(constant.SchedulingServiceName)
})
// send a heartbeat immediately to make prepare checker pass
grpcPDClient, conn := testutil.MustNewGrpcClientWithConn(re, tc.pd.GetLeaderServer().GetServer().GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, tc.pd.GetLeaderServer().GetServer().GetAddr())
defer conn.Close()
stream, err := grpcPDClient.RegionHeartbeat(tc.ctx)
re.NoError(err)

9 changes: 6 additions & 3 deletions tests/server/api/api_test.go
@@ -838,7 +838,8 @@ func TestRemovingProgress(t *testing.T) {

re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, leader.GetAddr())
defer conn.Close()
clusterID := leader.GetClusterID()
req := &pdpb.BootstrapRequest{
Header: testutil.NewRequestHeader(clusterID),
@@ -1153,7 +1154,8 @@ func TestSendApiWhenRestartRaftCluster(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()

grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, leader.GetAddr())
defer conn.Close()
clusterID := leader.GetClusterID()
req := &pdpb.BootstrapRequest{
Header: testutil.NewRequestHeader(clusterID),
@@ -1197,7 +1199,8 @@ func TestPreparingProgress(t *testing.T) {

re.NotEmpty(cluster.WaitLeader())
leader := cluster.GetLeaderServer()
grpcPDClient := testutil.MustNewGrpcClient(re, leader.GetAddr())
grpcPDClient, conn := testutil.MustNewGrpcClient(re, leader.GetAddr())
defer conn.Close()
clusterID := leader.GetClusterID()
req := &pdpb.BootstrapRequest{
Header: testutil.NewRequestHeader(clusterID),