Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions app/seidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
FlagSCSnapshotMinTimeInterval = "state-commit.sc-snapshot-min-time-interval"
FlagSCSnapshotWriterLimit = "state-commit.sc-snapshot-writer-limit"
FlagSCSnapshotPrefetchThreshold = "state-commit.sc-snapshot-prefetch-threshold"
FlagSCSnapshotWriteRateMBps = "state-commit.sc-snapshot-write-rate-mbps"
FlagSCCacheSize = "state-commit.sc-cache-size"
FlagSCOnlyAllowExportOnSnapshotVersion = "state-commit.sc-only-allow-export-on-snapshot-version"

Expand Down Expand Up @@ -90,6 +91,7 @@ func parseSCConfigs(appOpts servertypes.AppOptions) config.StateCommitConfig {
scConfig.SnapshotMinTimeInterval = cast.ToUint32(appOpts.Get(FlagSCSnapshotMinTimeInterval))
scConfig.SnapshotWriterLimit = cast.ToInt(appOpts.Get(FlagSCSnapshotWriterLimit))
scConfig.SnapshotPrefetchThreshold = cast.ToFloat64(appOpts.Get(FlagSCSnapshotPrefetchThreshold))
scConfig.SnapshotWriteRateMBps = cast.ToInt(appOpts.Get(FlagSCSnapshotWriteRateMBps))
scConfig.OnlyAllowExportOnSnapshotVersion = cast.ToBool(appOpts.Get(FlagSCOnlyAllowExportOnSnapshotVersion))
return scConfig
}
Expand Down
2 changes: 2 additions & 0 deletions app/seidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ func (t TestSeiDBAppOpts) Get(s string) interface{} {
return config.DefaultStateCommitConfig().SnapshotWriterLimit
case FlagSCSnapshotPrefetchThreshold:
return config.DefaultStateCommitConfig().SnapshotPrefetchThreshold
case FlagSCSnapshotWriteRateMBps:
return config.DefaultStateCommitConfig().SnapshotWriteRateMBps
case FlagSSEnable:
return config.DefaultStateStoreConfig().Enable
case FlagSSBackend:
Expand Down
3 changes: 3 additions & 0 deletions sei-cosmos/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,10 @@ func GetConfig(v *viper.Viper) (Config, error) {
AsyncCommitBuffer: v.GetInt("state-commit.async-commit-buffer"),
SnapshotKeepRecent: v.GetUint32("state-commit.snapshot-keep-recent"),
SnapshotInterval: v.GetUint32("state-commit.snapshot-interval"),
SnapshotMinTimeInterval: v.GetUint32("state-commit.snapshot-min-time-interval"),
SnapshotWriterLimit: v.GetInt("state-commit.snapshot-writer-limit"),
SnapshotPrefetchThreshold: v.GetFloat64("state-commit.snapshot-prefetch-threshold"),
SnapshotWriteRateMBps: v.GetInt("state-commit.snapshot-write-rate-mbps"),
CacheSize: v.GetInt("state-commit.cache-size"),
OnlyAllowExportOnSnapshotVersion: v.GetBool("state-commit.only-allow-export-on-snapshot-version"),
},
Expand Down
6 changes: 6 additions & 0 deletions sei-db/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const (
DefaultSnapshotMinTimeInterval = 60 * 60 // 1 hour in seconds
DefaultAsyncCommitBuffer = 100
DefaultSnapshotPrefetchThreshold = 0.8 // prefetch if <80% pages in cache
DefaultSnapshotWriteRateMBps = 100 // 100 MB/s default
DefaultSSKeepRecent = 100000
DefaultSSPruneInterval = 600
DefaultSSImportWorkers = 1
Expand Down Expand Up @@ -56,6 +57,10 @@ type StateCommitConfig struct {
// Setting to 0 disables prefetching. Defaults to 0.8
SnapshotPrefetchThreshold float64 `mapstructure:"snapshot-prefetch-threshold"`

// SnapshotWriteRateMBps is the global snapshot write rate limit in MB/s.
// 0 = unlimited. Default 100.
SnapshotWriteRateMBps int `mapstructure:"snapshot-write-rate-mbps"`

// CacheSize defines the size of the cache for each memiavl store.
// Deprecated: this is removed, we will just rely on mmap page cache
CacheSize int `mapstructure:"cache-size"`
Expand Down Expand Up @@ -115,6 +120,7 @@ func DefaultStateCommitConfig() StateCommitConfig {
SnapshotKeepRecent: DefaultSnapshotKeepRecent,
SnapshotMinTimeInterval: DefaultSnapshotMinTimeInterval,
SnapshotPrefetchThreshold: DefaultSnapshotPrefetchThreshold,
SnapshotWriteRateMBps: DefaultSnapshotWriteRateMBps,
}
}

Expand Down
6 changes: 3 additions & 3 deletions sei-db/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@ sc-snapshot-interval = {{ .StateCommit.SnapshotInterval }}
# to allow more frequent snapshots during normal operation.
sc-snapshot-min-time-interval = {{ .StateCommit.SnapshotMinTimeInterval }}

# SnapshotWriterLimit defines the max concurrency for taking commit store snapshot
sc-snapshot-writer-limit = {{ .StateCommit.SnapshotWriterLimit }}

# SnapshotPrefetchThreshold defines the page cache residency threshold (0.0-1.0) to trigger snapshot prefetch.
# Prefetch sequentially reads nodes/leaves files into page cache for faster cold-start replay.
# Only active trees (evm/bank/acc) are prefetched, skipping sparse kv files to save memory.
# Skips prefetch if more than threshold of pages already resident (e.g., 0.8 = 80%).
# Setting to 0 disables prefetching. Defaults to 0.8
sc-snapshot-prefetch-threshold = {{ .StateCommit.SnapshotPrefetchThreshold }}

# Maximum snapshot write rate in MB/s (global across all trees). 0 = unlimited. Default 100.
sc-snapshot-write-rate-mbps = {{ .StateCommit.SnapshotWriteRateMBps }}

# OnlyAllowExportOnSnapshotVersion defines whether we only allow state sync
# snapshot creation happens after the memiavl snapshot is created.
sc-only-allow-export-on-snapshot-version = {{ .StateCommit.OnlyAllowExportOnSnapshotVersion }}
Expand Down
18 changes: 16 additions & 2 deletions sei-db/sc/memiavl/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ func (db *DB) RewriteSnapshot(ctx context.Context) error {
path := filepath.Clean(filepath.Join(db.dir, tmpDir))

writeStart := time.Now()
err := db.MultiTree.WriteSnapshot(ctx, path, db.snapshotWriterPool)
err := db.WriteSnapshotWithRateLimit(ctx, path, db.snapshotWriterPool, db.opts.SnapshotWriteRateMBps)
writeElapsed := time.Since(writeStart).Seconds()

if err != nil {
Expand Down Expand Up @@ -817,6 +817,20 @@ func (db *DB) rewriteSnapshotBackground() error {
ch <- snapshotResult{err: err}
return
}

// Switch mmap hints from SEQUENTIAL to RANDOM for tree operations.
// NewMmap() applies MADV_SEQUENTIAL by default for cold-start replay performance,
// but after loading we need MADV_RANDOM for random tree access patterns.
// Without this, the kernel aggressively discards accessed pages and does wrong-direction
// readahead, which is catastrophic on high-latency storage (e.g. NAS).
// This matches the behavior in OpenDB() which also calls PrepareForRandomRead().
for _, tree := range mtree.trees {
if tree.snapshot != nil {
tree.snapshot.nodesMap.PrepareForRandomRead()
tree.snapshot.leavesMap.PrepareForRandomRead()
}
}

cloned.logger.Info("loaded multitree after snapshot", "elapsed", time.Since(loadStart).Seconds())

// do a best effort catch-up, will do another final catch-up in main thread.
Expand Down Expand Up @@ -946,7 +960,7 @@ func (db *DB) WriteSnapshot(dir string) error {
db.mtx.Lock()
defer db.mtx.Unlock()

return db.MultiTree.WriteSnapshot(context.Background(), dir, db.snapshotWriterPool)
return db.WriteSnapshotWithRateLimit(context.Background(), dir, db.snapshotWriterPool, db.opts.SnapshotWriteRateMBps)
}

func snapshotName(version int64) string {
Expand Down
27 changes: 22 additions & 5 deletions sei-db/sc/memiavl/multitree.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/alitto/pond"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"

"github.com/cosmos/iavl"
"github.com/sei-protocol/sei-db/common/errors"
Expand Down Expand Up @@ -412,19 +413,35 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio
}

func (t *MultiTree) WriteSnapshot(ctx context.Context, dir string, wp *pond.WorkerPool) error {
t.logger.Info("starting snapshot write", "trees", len(t.trees))
return t.WriteSnapshotWithRateLimit(ctx, dir, wp, 0)
}

// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
// rateMBps is the rate limit in MB/s. 0 means unlimited.
// A single global limiter is shared across ALL trees and files to ensure
// the total write rate is capped at the configured value.
func (t *MultiTree) WriteSnapshotWithRateLimit(ctx context.Context, dir string, wp *pond.WorkerPool, rateMBps int) error {
t.logger.Info("starting snapshot write", "trees", len(t.trees), "rate_limit_mbps", rateMBps)

if err := os.MkdirAll(dir, os.ModePerm); err != nil { //nolint:gosec
return err
}

// Create a single global limiter shared by all trees and files
// This ensures total write rate is capped regardless of parallelism
limiter := NewGlobalRateLimiter(rateMBps)
if limiter != nil {
t.logger.Info("global rate limiting enabled", "rate_mbps", rateMBps)
}

// Write EVM first to avoid disk I/O contention, then parallel
return t.writeSnapshotPriorityEVM(ctx, dir, wp)
return t.writeSnapshotPriorityEVM(ctx, dir, wp, limiter)
}

// writeSnapshotPriorityEVM writes EVM tree first, then others in parallel
// Best strategy: reduces disk I/O contention for the largest tree
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool) error {
// limiter is a shared rate limiter. nil means unlimited.
func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp *pond.WorkerPool, limiter *rate.Limiter) error {
startTime := time.Now()

// Phase 1: Write EVM tree first (if it exists)
Expand All @@ -444,7 +461,7 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
if evmTree != nil {
t.logger.Info("writing evm tree", "phase", "1/2")
evmStart := time.Now()
if err := evmTree.WriteSnapshot(ctx, filepath.Join(dir, evmName)); err != nil {
if err := evmTree.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, evmName), limiter); err != nil {
return err
}
evmElapsed := time.Since(evmStart).Seconds()
Expand All @@ -471,7 +488,7 @@ func (t *MultiTree) writeSnapshotPriorityEVM(ctx context.Context, dir string, wp
wg.Add(1)
wp.Submit(func() {
defer wg.Done()
if err := entry.Tree.WriteSnapshot(ctx, filepath.Join(dir, entry.Name)); err != nil {
if err := entry.WriteSnapshotWithRateLimit(ctx, filepath.Join(dir, entry.Name), limiter); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("tree %s: %w", entry.Name, err))
mu.Unlock()
Expand Down
8 changes: 8 additions & 0 deletions sei-db/sc/memiavl/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type Options struct {
// Minimum time interval between snapshots
// This prevents excessive snapshot creation during catch-up. Default is 1 hour.
SnapshotMinTimeInterval time.Duration

// SnapshotWriteRateMBps is the global snapshot write rate limit in MB/s.
// 0 = unlimited. Default 100.
SnapshotWriteRateMBps int
}

func (opts Options) Validate() error {
Expand Down Expand Up @@ -75,6 +79,10 @@ func (opts *Options) FillDefaults() {
opts.SnapshotMinTimeInterval = 1 * time.Hour
}

if opts.SnapshotWriteRateMBps <= 0 {
opts.SnapshotWriteRateMBps = config.DefaultSnapshotWriteRateMBps
}

opts.PrefetchThreshold = 0.8
opts.Logger = logger.NewNopLogger()
opts.SnapshotKeepRecent = config.DefaultSnapshotKeepRecent
Expand Down
93 changes: 84 additions & 9 deletions sei-db/sc/memiavl/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ import (
"time"
"unsafe"

"github.com/sei-protocol/sei-db/common/errors"
"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/sc/types"
"golang.org/x/sys/unix"

"github.com/sei-protocol/sei-db/common/errors"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -54,6 +54,66 @@ func (w *monitoringWriter) Write(p []byte) (n int, err error) {
return n, err
}

// rateLimitedWriter wraps an io.Writer with rate limiting to prevent
// page cache eviction on machines with limited RAM.
type rateLimitedWriter struct {
w io.Writer
limiter *rate.Limiter
ctx context.Context
}

// NewGlobalRateLimiter creates a shared rate limiter for snapshot writes.
// rateMBps is the rate limit in MB/s. If <= 0, returns nil (no limit).
// This limiter should be shared across all files and trees in a single snapshot operation.
func NewGlobalRateLimiter(rateMBps int) *rate.Limiter {
if rateMBps <= 0 {
return nil
}
const mb = 1024 * 1024
bytesPerSec := rate.Limit(rateMBps * mb)
// Burst = 4MB: small enough to spread large bufio flushes (128MB) across
// many smaller IO ops, preventing page cache eviction spikes.
burstBytes := 4 * mb
return rate.NewLimiter(bytesPerSec, burstBytes)
}

// newRateLimitedWriter creates a rate-limited writer with a shared limiter.
// If limiter is nil, returns the original writer (no limit).
func newRateLimitedWriter(ctx context.Context, w io.Writer, limiter *rate.Limiter) io.Writer {
if limiter == nil {
return w
}
return &rateLimitedWriter{
w: w,
limiter: limiter,
ctx: ctx,
}
}

func (w *rateLimitedWriter) Write(p []byte) (n int, err error) {
// Wait for rate limiter before writing
// For large writes, we may need to wait multiple times
remaining := len(p)
written := 0
for remaining > 0 {
// Limit each wait to burst size to avoid very long waits
toWrite := remaining
if toWrite > w.limiter.Burst() {
toWrite = w.limiter.Burst()
}
if err := w.limiter.WaitN(w.ctx, toWrite); err != nil {
return written, err
}
n, err := w.w.Write(p[written : written+toWrite])
written += n
remaining -= n
if err != nil {
return written, err
}
}
return written, nil
}

// Snapshot manage the lifecycle of mmap-ed files for the snapshot,
// it must out live the objects that derived from it.
type Snapshot struct {
Expand Down Expand Up @@ -391,6 +451,12 @@ func (snapshot *Snapshot) export(callback func(*types.SnapshotNode) bool) {
}

func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
return t.WriteSnapshotWithRateLimit(ctx, snapshotDir, nil)
}

// WriteSnapshotWithRateLimit writes snapshot with optional rate limiting.
// limiter is a shared rate limiter. nil means unlimited.
func (t *Tree) WriteSnapshotWithRateLimit(ctx context.Context, snapshotDir string, limiter *rate.Limiter) error {
// Estimate tree size: root.Size() returns leaf count, total = leaves + branches ≈ 2x
treeSize := int64(0)
if t.root != nil {
Expand All @@ -400,7 +466,7 @@ func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
// Use 128MB buffer for all trees (large buffer for better performance)
bufSize := bufIOSize

err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, t.logger, func(w *snapshotWriter) (uint32, error) {
err := writeSnapshotWithBuffer(ctx, snapshotDir, t.version, bufSize, treeSize, limiter, t.logger, func(w *snapshotWriter) (uint32, error) {
if t.root == nil {
return 0, nil
}
Expand All @@ -418,12 +484,14 @@ func (t *Tree) WriteSnapshot(ctx context.Context, snapshotDir string) error {
return nil
}

// writeSnapshotWithBuffer writes snapshot with specified buffer size
// writeSnapshotWithBuffer writes snapshot with specified buffer size and optional rate limiting.
// limiter is a shared rate limiter. nil means unlimited.
func writeSnapshotWithBuffer(
ctx context.Context,
dir string, version uint32,
bufSize int,
totalNodes int64,
limiter *rate.Limiter,
log logger.Logger,
doWrite func(*snapshotWriter) (uint32, error),
) (returnErr error) {
Expand Down Expand Up @@ -470,10 +538,17 @@ func writeSnapshotWithBuffer(
leavesMonitor := &monitoringWriter{f: fpLeaves}
kvsMonitor := &monitoringWriter{f: fpKVs}

// Apply rate limiting if configured (shared limiter across all files)
// This ensures total write rate is capped regardless of file count
var nodesRateLimited, leavesRateLimited, kvsRateLimited io.Writer
nodesRateLimited = newRateLimitedWriter(ctx, nodesMonitor, limiter)
leavesRateLimited = newRateLimitedWriter(ctx, leavesMonitor, limiter)
kvsRateLimited = newRateLimitedWriter(ctx, kvsMonitor, limiter)

// Create buffered writers with buffers
nodesWriter := bufio.NewWriterSize(nodesMonitor, bufSize)
leavesWriter := bufio.NewWriterSize(leavesMonitor, bufSize)
kvsWriter := bufio.NewWriterSize(kvsMonitor, bufSize)
nodesWriter := bufio.NewWriterSize(nodesRateLimited, bufSize)
leavesWriter := bufio.NewWriterSize(leavesRateLimited, bufSize)
kvsWriter := bufio.NewWriterSize(kvsRateLimited, bufSize)

w := newSnapshotWriter(ctx, nodesWriter, leavesWriter, kvsWriter, log)
w.treeName = filepath.Base(dir) // Set tree name for progress reporting
Expand Down Expand Up @@ -547,8 +622,8 @@ func writeSnapshot(
dir string, version uint32,
doWrite func(*snapshotWriter) (uint32, error),
) error {
// Use nop logger for backward compatibility
return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, logger.NewNopLogger(), doWrite)
// Use nop logger and no rate limit for backward compatibility
return writeSnapshotWithBuffer(ctx, dir, version, bufIOSize, 0, nil, logger.NewNopLogger(), doWrite)
}

// kvWriteOp represents a key-value write operation
Expand Down
Loading
Loading