Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ module github.com/tikv/pd

go 1.25.5

replace github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251225090614-a3753d45c0a7

// When you modify PD cooperatively with kvproto, this will be useful to submit the PR to PD and the PR to
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
Expand Down
1,674 changes: 1,665 additions & 9 deletions go.sum

Large diffs are not rendered by default.

26 changes: 24 additions & 2 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,14 @@ type RegionInfo struct {
queryStats *pdpb.QueryStats
flowRoundDivisor uint64
// buckets is not thread unsafe, it should be accessed by the request `report buckets` with greater version.
// todo: keep it compatible with previous design, we can remove it later.
buckets unsafe.Pointer
// source is used to indicate region's source, such as Storage/Sync/Heartbeat.
source RegionSource
// ref is used to indicate the reference count of the region in root-tree and sub-tree.
ref atomic.Int32
// bucketMeta is used to store the bucket meta reported by tikv.
bucketMeta *metapb.BucketMeta
}

// RegionSource is the source of region.
Expand Down Expand Up @@ -215,6 +218,7 @@ type RegionHeartbeatRequest interface {
GetQueryStats() *pdpb.QueryStats
GetApproximateSize() uint64
GetApproximateKeys() uint64
GetBucketMeta() *metapb.BucketMeta
}

// RegionFromHeartbeat constructs a Region from region heartbeat.
Expand Down Expand Up @@ -243,6 +247,7 @@ func RegionFromHeartbeat(heartbeat RegionHeartbeatRequest, flowRoundDivisor uint
queryStats: heartbeat.GetQueryStats(),
source: Heartbeat,
flowRoundDivisor: flowRoundDivisor,
bucketMeta: heartbeat.GetBucketMeta(),
}

// scheduling service doesn't need the following fields.
Expand Down Expand Up @@ -293,7 +298,8 @@ func (r *RegionInfo) Inherit(origin *RegionInfo, bucketEnable bool) {
r.approximateSize = EmptyRegionApproximateSize
}
}
if bucketEnable && origin != nil && r.buckets == nil {
// skip bucket meta update if tikv has report bucket meta.
if r.bucketMeta == nil && bucketEnable && origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}
Expand Down Expand Up @@ -328,6 +334,7 @@ func (r *RegionInfo) Clone(opts ...RegionCreateOption) *RegionInfo {
replicationStatus: r.replicationStatus,
buckets: r.buckets,
queryStats: typeutil.DeepClone(r.queryStats, QueryStatsFactory),
bucketMeta: r.bucketMeta,
}

for _, opt := range opts {
Expand Down Expand Up @@ -619,7 +626,15 @@ func (r *RegionInfo) GetStat() *pdpb.RegionStat {
}
}

// UpdateBuckets sets the buckets of the region.
// SetBucketMeta sets the bucket meta of the region, used by region sync.
func (r *RegionInfo) SetBucketMeta(buckets *metapb.Buckets) {
r.bucketMeta = &metapb.BucketMeta{
Version: buckets.GetVersion(),
Keys: buckets.GetKeys(),
}
}

// UpdateBuckets sets the buckets of the region, used by bucket report.
func (r *RegionInfo) UpdateBuckets(buckets, old *metapb.Buckets) bool {
if buckets == nil {
atomic.StorePointer(&r.buckets, nil)
Expand All @@ -639,6 +654,13 @@ func (r *RegionInfo) GetBuckets() *metapb.Buckets {
if r == nil {
return nil
}
if meta := r.bucketMeta; meta != nil {
return &metapb.Buckets{
RegionId: r.GetID(),
Version: meta.GetVersion(),
Keys: meta.GetKeys(),
}
}
buckets := atomic.LoadPointer(&r.buckets)
return (*metapb.Buckets)(buckets)
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1456,3 +1456,35 @@ func TestResetRegionCache(t *testing.T) {
re.Equal(1, regions.GetTotalRegionCount())
re.NotNil(regions.GetRegion(4))
}

func TestGetBucketMeta(t *testing.T) {
re := require.New(t)
region := NewTestRegionInfo(1, 1, []byte("a"), []byte("d"))
re.Nil(region.GetBuckets())
origin := NewTestRegionInfo(1, 2, []byte("a"), []byte("d"))
bucket := &metapb.Buckets{
RegionId: 100,
Version: 1,
Keys: [][]byte{[]byte("a"), []byte("b"), []byte("d")},
}
origin.UpdateBuckets(bucket, nil)
re.Equal(uint64(1), origin.GetBuckets().GetVersion())
region.Inherit(origin, true)
re.Equal(uint64(1), region.GetBuckets().GetVersion())

// Inherit false if region has bucket meta
bucket1 := &metapb.Buckets{
RegionId: 100,
Version: 2,
Keys: [][]byte{[]byte("a"), []byte("b"), []byte("d")},
}
re.True(origin.UpdateBuckets(bucket1, origin.GetBuckets()))
re.Equal(uint64(2), origin.GetBuckets().GetVersion())
region.bucketMeta = &metapb.BucketMeta{
Version: 1,
Keys: [][]byte{[]byte("a"), []byte("b"), []byte("d")},
}
region.Inherit(origin, true)
// Inherit false if region
re.Equal(uint64(1), region.GetBuckets().GetVersion())
}
1 change: 1 addition & 0 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
if hasBuckets {
if old := origin.GetBuckets(); buckets[i].GetVersion() > old.GetVersion() {
region.UpdateBuckets(buckets[i], old)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use SetBucketMeta, do we still need UpdateBuckets?

region.SetBucketMeta(buckets[i])
}
}
if saveKV {
Expand Down
1 change: 1 addition & 0 deletions tests/integrations/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/tikv/pd/tests/integrations
go 1.25.5

replace (
github.com/pingcap/kvproto => github.com/bufferflies/kvproto v0.0.0-20251225090614-a3753d45c0a7
github.com/tikv/pd => ../../
github.com/tikv/pd/client => ../../client
github.com/tikv/pd/tests/integrations/mcs => ./mcs
Expand Down
Loading