diff --git a/go/worker/storage/committee/diffsync/fetcher.go b/go/worker/storage/committee/diffsync/fetcher.go new file mode 100644 index 00000000000..c017620a8ef --- /dev/null +++ b/go/worker/storage/committee/diffsync/fetcher.go @@ -0,0 +1,394 @@ +package diffsync + +import ( + "container/heap" + "context" + "fmt" + "sync" + "time" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/p2p/rpc" + "github.com/oasisprotocol/oasis-core/go/roothash/api/block" + "github.com/oasisprotocol/oasis-core/go/runtime/history" + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" + "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" +) + +type P2PFetcher struct { + logger *logging.Logger + history history.History + diffSync diffsync.Client + legacyStorageSync synclegacy.Client + undefinedRound uint64 + lastFullyApplied uint64 + nextCh chan Diff + acceptCh chan struct{} + rejectCh chan struct{} + rejectTaskCh chan fetchTask + diffCh chan diffRes + fetcherCount uint +} + +func NewP2PFetcher( + history history.History, + diffSync diffsync.Client, + legacyStorageSync synclegacy.Client, + lastFullyApplied, + undefinedRound uint64, + fetcherCount uint, +) *P2PFetcher { + return &P2PFetcher{ + logger: logging.GetLogger("worker/storage/committee/fetcher").With("runtime_id", history.RuntimeID()), + history: history, + diffSync: diffSync, + legacyStorageSync: legacyStorageSync, + undefinedRound: undefinedRound, + lastFullyApplied: lastFullyApplied, + nextCh: make(chan Diff), + diffCh: make(chan diffRes), + rejectCh: make(chan struct{}, 1), + acceptCh: make(chan struct{}, 1), + rejectTaskCh: make(chan fetchTask), // TODO consider adding more. + fetcherCount: fetcherCount, + } +} + +// Next returns whean a next storage diff is ready to be applied. +// +// Invariants: +// - A call to Next is blocking. +// - It is not safe to call next before either Acceept and or Reject. +// - The order of storage and IO diffs for a single round is not guaranteed. +func (f *P2PFetcher) Next(ctx context.Context) (Diff, error) { + for { + select { + case <-ctx.Done(): + return Diff{}, ctx.Err() + case diff, ok := <-f.nextCh: + if !ok { + return Diff{}, fmt.Errorf("fetcher closed") + } + return diff, nil + } + } +} + +// Accept accepts a storage diff obtained via call to Next. +func (f *P2PFetcher) Accept() { + select { + case f.acceptCh <- struct{}{}: + default: + } +} + +// Reject rejects a storage diff obtained via call to Next. +func (f *P2PFetcher) Reject() { + select { + case f.rejectCh <- struct{}{}: + default: + } +} + +func (f *P2PFetcher) Serve(ctx context.Context) error { + var ( + lastDiff diffRes + acceptedCount int + pendingAck bool + ) + pendingApply := &minRoundQueue{} + + lastFullyApplied := f.lastFullyApplied + + var wg sync.WaitGroup + defer wg.Wait() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + producer, taskCh := newTaskProducer(f.history, f.logger, lastFullyApplied, f.undefinedRound, f.fetcherCount) + wg.Go(func() { + if err := producer.serve(ctx); err != nil { + f.logger.Error("producer failed", "err", err) + } + }) + + for i := uint(0); i < f.fetcherCount; i++ { + wg.Go(func() { + for task := range taskCh { + res, _ := f.fetchWithRetry(ctx, task) // TODO handle the error or omit it. + select { + case f.diffCh <- res: + case <-ctx.Done(): + } + } + }) + } + + wg.Go(func() { + for { + select { + case <-ctx.Done(): + return + case task := <-f.rejectTaskCh: + res, _ := f.fetchWithRetry(ctx, task) // TODO handle the error or omit it. + select { + case f.diffCh <- res: + case <-ctx.Done(): + } + } + } + }) + + trySendingNextForApply := func() { + if pendingAck { + return + } + hasNext := pendingApply.Len() > 0 && lastFullyApplied+1 == (*pendingApply)[0].round + if !hasNext { + return + } + pendingAck = true + lastDiff = heap.Pop(pendingApply).(diffRes) + wg.Go(func() { + select { + case <-ctx.Done(): + return + case f.nextCh <- Diff{ + round: lastDiff.round, + prevRoot: lastDiff.prevRoot, + thisRoot: lastDiff.thisRoot, + writeLog: lastDiff.writeLog}: + } + }) + } + + for { + select { // For optimal performance no case should be blocking. + case <-ctx.Done(): + return ctx.Err() + case diff := <-f.diffCh: + heap.Push(pendingApply, diff) + trySendingNextForApply() + case <-f.rejectCh: + lastDiff.pf.RecordBadPeer() + pendingAck = false + select { + case <-ctx.Done(): + return ctx.Err() + case f.rejectTaskCh <- lastDiff.fetchTask: + } + case <-f.acceptCh: + lastDiff.pf.RecordSuccess() + acceptedCount++ + if acceptedCount == 2 { + lastFullyApplied++ + acceptedCount = 0 + } + pendingAck = false + trySendingNextForApply() + } + } +} + +// TODO add backoff. +func (f *P2PFetcher) fetchWithRetry(ctx context.Context, task fetchTask) (diffRes, error) { + result := diffRes{ + fetchTask: task, + pf: rpc.NewNopPeerFeedback(), + } + + if task.thisRoot.Hash.Equal(&task.prevRoot.Hash) { + result.writeLog = storageApi.WriteLog{} + return result, nil + } + + for { + select { + case <-ctx.Done(): + return result, ctx.Err() + default: + } + + wl, pf, err := f.getDiff(ctx, task.prevRoot, task.thisRoot) + if err != nil { + f.logger.Error("failed to fetch storage diff", "err", err) + continue + } + result.pf = pf + result.writeLog = wl + return result, err + } +} + +// getDiff fetches writelog using diff sync p2p protocol client. +// +// In case of no peers or error, it fallbacks to the legacy storage sync protocol. +func (f *P2PFetcher) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { + f.logger.Debug("calling GetDiff", + "old_root", prevRoot, + "new_root", thisRoot, + ) + + // diffResponseTimeout is the maximum time for fetching storage diff from the peer. + const diffResponseTimeout = 15 * time.Second + + ctx, cancel := context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() + rsp1, pf, err := f.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err == nil { // if NO error + return rsp1.WriteLog, pf, nil + } + + ctx, cancel = context.WithTimeout(ctx, diffResponseTimeout) + defer cancel() + rsp2, pf, err := f.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) + if err != nil { + return nil, nil, err + } + return rsp2.WriteLog, pf, nil +} + +type fetchTask struct { + round uint64 + prevRoot storageApi.Root + thisRoot storageApi.Root +} + +type taskProducer struct { + history history.History + logger *logging.Logger + undefinedRound uint64 + lastEnqueued uint64 + prevStateRoot storageApi.Root + tasks chan fetchTask +} + +func newTaskProducer( + history history.History, + logger *logging.Logger, + lastFullyApplied uint64, + undefinedRound uint64, + queueSize uint, +) (*taskProducer, <-chan fetchTask) { + producer := &taskProducer{ + history: history, + logger: logger, + undefinedRound: undefinedRound, + lastEnqueued: lastFullyApplied, + tasks: make(chan fetchTask, queueSize), + } + return producer, producer.tasks +} + +func (p *taskProducer) serve(ctx context.Context) error { + blkCh, sub, err := p.history.WatchCommittedBlocks() + if err != nil { + return fmt.Errorf("subscribing to commited blocks: %w", err) + } + if err := p.initState(ctx); err != nil { + sub.Close() + return err + } + + defer close(p.tasks) + defer sub.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case next, ok := <-blkCh: + if !ok { + return nil + } + p.logger.Debug("producer received new block", "blk", next.Block) + if err := p.fillUntil(ctx, next.Block); err != nil { + p.logger.Warn("failed to fill fetch task gap: %w", err) + } + } + } +} + +func (p *taskProducer) initState(ctx context.Context) error { + if p.lastEnqueued == p.undefinedRound { + p.prevStateRoot = storageApi.Root{ + Namespace: p.history.RuntimeID(), + Version: p.lastEnqueued + 1, + Type: storageApi.RootTypeState, + } + p.prevStateRoot.Empty() + return nil + } + + blk, err := p.history.GetCommittedBlock(ctx, p.lastEnqueued) + if err != nil { + return fmt.Errorf("get history block (round: %d): %w", p.lastEnqueued, err) + } + p.prevStateRoot = blk.Header.StorageRootState() + return nil +} + +func (p *taskProducer) enqueue(ctx context.Context, blk *block.Block) { + emit := func(task fetchTask) { + select { + case <-ctx.Done(): + return + case p.tasks <- task: + p.logger.Debug("enqueued new fetch task", "task", task) + } + } + + thisIORoot := blk.Header.StorageRootIO() + prevIORoot := thisIORoot + prevIORoot.Hash.Empty() + emit(fetchTask{blk.Header.Round, prevIORoot, thisIORoot}) + + thisStateRoot := blk.Header.StorageRootState() + emit(fetchTask{blk.Header.Round, p.prevStateRoot, thisStateRoot}) + p.prevStateRoot = thisStateRoot + + p.lastEnqueued = blk.Header.Round +} + +func (p *taskProducer) fillUntil(ctx context.Context, blk *block.Block) error { + for r := p.lastEnqueued + 1; r < blk.Header.Round; r++ { + blk, err := p.history.GetCommittedBlock(ctx, r) + if err != nil { + return fmt.Errorf("failed to get light block (round: %d): %w", r, err) + } + p.enqueue(ctx, blk) + } + p.enqueue(ctx, blk) + return nil +} + +// minRoundQueue is a round-based min priority queue. +type minRoundQueue []diffRes + +// Sorting interface. +func (q minRoundQueue) Len() int { return len(q) } +func (q minRoundQueue) Less(i, j int) bool { return q[i].round < q[j].round } +func (q minRoundQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] } + +// Push appends x as the last element in the heap's array. +func (q *minRoundQueue) Push(x any) { + *q = append(*q, x.(diffRes)) +} + +// Pop removes and returns the last element in the heap's array. +func (q *minRoundQueue) Pop() any { + old := *q + n := len(old) + x := old[n-1] + *q = old[0 : n-1] + return x +} + +// diffRes has all the context needed for a single GetDiff operation. +type diffRes struct { + fetchTask + pf rpc.PeerFeedback + writeLog storageApi.WriteLog +} diff --git a/go/worker/storage/committee/diffsync/worker.go b/go/worker/storage/committee/diffsync/worker.go new file mode 100644 index 00000000000..6e431b7a12e --- /dev/null +++ b/go/worker/storage/committee/diffsync/worker.go @@ -0,0 +1,174 @@ +package diffsync + +import ( + "context" + "errors" + "fmt" + + storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" + dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" + "golang.org/x/sync/errgroup" + + "github.com/oasisprotocol/oasis-core/go/common/logging" + "github.com/oasisprotocol/oasis-core/go/runtime/history" +) + +type Fetcher interface { + Next(ctx context.Context) (Diff, error) + Accept() + Reject() +} + +// Diff has a writelog, so that applying it to the state with prevRoot +// produces a new state that corresponds to thisRoot. +type Diff struct { + round uint64 + prevRoot storageApi.Root + thisRoot storageApi.Root + writeLog storageApi.WriteLog +} + +type Finalized struct { + Round uint64 + Roots []storageApi.Root +} + +type Worker struct { + logger *logging.Logger + fetcher Fetcher + localStorage storageApi.LocalBackend + updates chan Finalized +} + +// New creates a new diff sync worker. +// +// Fetcher implements fetching storage diffs, that are then applied ot the localStorage. +func New(history history.History, localStorage storageApi.LocalBackend, fetcher Fetcher) *Worker { + return &Worker{ + logger: logging.GetLogger("worker/storage/committee/diffsync").With("runtime_id", history.RuntimeID()), + localStorage: localStorage, + fetcher: fetcher, + updates: make(chan Finalized, 1), + } +} + +func (w *Worker) Updates() <-chan Finalized { + return w.updates +} + +// Serve fetches, applies and finalizes storage diffs. +func (w *Worker) Serve(ctx context.Context) error { + defer close(w.updates) + + w.logger.Info("starting") + defer w.logger.Info("stopping") + + g, ctx := errgroup.WithContext(ctx) + + appliedCh := make(chan []storageApi.Root, dbApi.MaxPendingVersions) + defer close(appliedCh) + + // Apply. + g.Go(func() error { + var applied []storageApi.Root + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + diff, err := w.fetcher.Next(ctx) + if err != nil { + w.logger.Error("fetcher failed to fetch next diff", "err", err) + continue + } + + // If thisRoot already exists then localStorage.Apply ignores request. + // Consider writting a test for this assumption, so that if Apply changes + // we catch semantic change. + if err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ + Namespace: diff.thisRoot.Namespace, + RootType: diff.thisRoot.Type, + SrcRound: diff.prevRoot.Version, + SrcRoot: diff.prevRoot.Hash, + DstRound: diff.thisRoot.Version, + DstRoot: diff.thisRoot.Hash, + WriteLog: diff.writeLog, + }); err != nil { + w.logger.Error("failed to apply storage diff", "err", err) + w.fetcher.Reject() + continue + // TODO error handling has now changed. + } + + w.logger.Debug("applied", "root", diff.thisRoot) + w.fetcher.Accept() + applied = append(applied, diff.thisRoot) + + // The worker expected that two storage roots are always finalized. + // Maybe it should not. + if len(applied) < 2 { + continue + } + + select { + case <-ctx.Done(): + return ctx.Err() + case appliedCh <- applied: + applied = nil + } + } + }) + + // Finalize. + g.Go(func() error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case roots := <-appliedCh: + finalized, err := w.finalize(roots) + if err != nil { + return fmt.Errorf("failed to finalize: %w", err) + } + w.notify(finalized) + } + } + }) + + return g.Wait() +} + +func (w *Worker) finalize(roots []storageApi.Root) (Finalized, error) { + err := w.localStorage.NodeDB().Finalize(roots) + + var round uint64 + for _, root := range roots { + round = root.Version + } + + switch { + case err == nil: + w.logger.Debug("finalized", "round", round) + case errors.Is(err, storageApi.ErrAlreadyFinalized): + // This can happen if we are restoring after a roothash migration or if + // we crashed before updating the sync state. + w.logger.Warn("already finalized", "round", round) + default: + return Finalized{}, fmt.Errorf("failed to finalize (round: %d): %w", round, err) + } + return Finalized{round, roots}, nil +} + +func (w *Worker) notify(f Finalized) { + select { + case w.updates <- f: + default: + select { + case <-w.updates: + default: + } + w.updates <- f + } +} diff --git a/go/worker/storage/committee/utils.go b/go/worker/storage/committee/utils.go index 863b9fc7bd0..d74f92081f9 100644 --- a/go/worker/storage/committee/utils.go +++ b/go/worker/storage/committee/utils.go @@ -3,12 +3,8 @@ package committee import ( "fmt" "strings" - "time" - - "github.com/cenkalti/backoff/v4" "github.com/oasisprotocol/oasis-core/go/common" - cmnBackoff "github.com/oasisprotocol/oasis-core/go/common/backoff" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" ) @@ -53,22 +49,6 @@ func (o outstandingMask) hasAll() bool { return o == outstandingMaskFull } -type inFlight struct { - startedAt time.Time - outstanding outstandingMask - awaitingRetry outstandingMask -} - -func (i *inFlight) scheduleDiff(rootType storageApi.RootType) { - i.outstanding.add(rootType) - i.awaitingRetry.remove(rootType) -} - -func (i *inFlight) retry(rootType storageApi.RootType) { - i.outstanding.remove(rootType) - i.awaitingRetry.add(rootType) -} - // blockSummary is a short summary of a single block.Block. type blockSummary struct { Namespace common.Namespace `json:"namespace"` @@ -87,21 +67,3 @@ func summaryFromBlock(blk *block.Block) *blockSummary { Roots: blk.Header.StorageRoots(), } } - -type heartbeat struct { - *backoff.Ticker -} - -func (h *heartbeat) reset() { - if h.Ticker != nil { - h.Stop() - } - - boff := cmnBackoff.NewExponentialBackOff() - boff.InitialInterval = 5 * time.Second - boff.MaxInterval = 20 * time.Second - h.Ticker = backoff.NewTicker(boff) - - // Gobble the first tick, which is immediate. - <-h.C -} diff --git a/go/worker/storage/committee/worker.go b/go/worker/storage/committee/worker.go index 05016f34998..05fcbecd7f6 100644 --- a/go/worker/storage/committee/worker.go +++ b/go/worker/storage/committee/worker.go @@ -3,7 +3,6 @@ package committee import ( - "container/heap" "context" "errors" "fmt" @@ -14,7 +13,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/node" "github.com/oasisprotocol/oasis-core/go/common/pubsub" - "github.com/oasisprotocol/oasis-core/go/common/workerpool" "github.com/oasisprotocol/oasis-core/go/config" consensus "github.com/oasisprotocol/oasis-core/go/consensus/api" commonFlags "github.com/oasisprotocol/oasis-core/go/oasis-node/cmd/common/flags" @@ -26,13 +24,13 @@ import ( "github.com/oasisprotocol/oasis-core/go/runtime/host" storageApi "github.com/oasisprotocol/oasis-core/go/storage/api" "github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint" - dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" "github.com/oasisprotocol/oasis-core/go/worker/common/committee" "github.com/oasisprotocol/oasis-core/go/worker/registration" "github.com/oasisprotocol/oasis-core/go/worker/storage/api" + "github.com/oasisprotocol/oasis-core/go/worker/storage/committee/diffsync" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/checkpointsync" - "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" + diffsyncp2p "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/diffsync" storagePub "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/pub" "github.com/oasisprotocol/oasis-core/go/worker/storage/p2p/synclegacy" ) @@ -142,7 +140,7 @@ type Worker struct { localStorage storageApi.LocalBackend - diffSync diffsync.Client + diffSync diffsyncp2p.Client checkpointSync checkpointsync.Client legacyStorageSync synclegacy.Client @@ -235,7 +233,7 @@ func New( // Advertise and serve p2p protocols. commonNode.P2P.RegisterProtocolServer(synclegacy.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) - commonNode.P2P.RegisterProtocolServer(diffsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) + commonNode.P2P.RegisterProtocolServer(diffsyncp2p.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) if config.GlobalConfig.Storage.Checkpointer.Enabled { commonNode.P2P.RegisterProtocolServer(checkpointsync.NewServer(commonNode.ChainContext, commonNode.Runtime.ID(), localStorage)) } @@ -245,7 +243,7 @@ func New( // Create p2p protocol clients. w.legacyStorageSync = synclegacy.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) - w.diffSync = diffsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) + w.diffSync = diffsyncp2p.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) w.checkpointSync = checkpointsync.NewClient(commonNode.P2P, commonNode.ChainContext, commonNode.Runtime.ID()) return w, nil @@ -367,103 +365,6 @@ func (w *Worker) GetLastSynced() (uint64, storageApi.Root, storageApi.Root) { return w.syncedState.Round, io, state } -func (w *Worker) fetchDiff(ctx context.Context, round uint64, prevRoot, thisRoot storageApi.Root) { - result := &fetchedDiff{ - fetched: false, - pf: rpc.NewNopPeerFeedback(), - round: round, - prevRoot: prevRoot, - thisRoot: thisRoot, - } - defer func() { - select { - case w.diffCh <- result: - case <-ctx.Done(): - } - }() - - // Check if the new root doesn't already exist. - if w.localStorage.NodeDB().HasRoot(thisRoot) { - return - } - - result.fetched = true - - // Even if HasRoot returns false the root can still exist if it is equal - // to the previous root and the root was emitted by the consensus committee - // directly (e.g., during an epoch transition). - if thisRoot.Hash.Equal(&prevRoot.Hash) { - result.writeLog = storageApi.WriteLog{} - return - } - - // New root does not yet exist in storage and we need to fetch it from a peer. - w.logger.Debug("calling GetDiff", - "old_root", prevRoot, - "new_root", thisRoot, - ) - - wl, pf, err := w.getDiff(ctx, prevRoot, thisRoot) - if err != nil { - result.err = err - return - } - result.pf = pf - result.writeLog = wl -} - -// getDiff fetches writelog using diff sync p2p protocol client. -// -// In case of no peers or error, it fallbacks to the legacy storage sync protocol. -func (w *Worker) getDiff(ctx context.Context, prevRoot, thisRoot storageApi.Root) (storageApi.WriteLog, rpc.PeerFeedback, error) { - ctx, cancel := context.WithTimeout(ctx, diffResponseTimeout) - defer cancel() - rsp1, pf, err := w.diffSync.GetDiff(ctx, &diffsync.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err == nil { // if NO error - return rsp1.WriteLog, pf, nil - } - - ctx, cancel = context.WithTimeout(ctx, diffResponseTimeout) - defer cancel() - rsp2, pf, err := w.legacyStorageSync.GetDiff(ctx, &synclegacy.GetDiffRequest{StartRoot: prevRoot, EndRoot: thisRoot}) - if err != nil { - return nil, nil, err - } - return rsp2.WriteLog, pf, nil -} - -func (w *Worker) finalize(ctx context.Context, summary *blockSummary) { - err := w.localStorage.NodeDB().Finalize(summary.Roots) - switch err { - case nil: - w.logger.Debug("storage round finalized", - "round", summary.Round, - ) - case storageApi.ErrAlreadyFinalized: - // This can happen if we are restoring after a roothash migration or if - // we crashed before updating the sync state. - w.logger.Warn("storage round already finalized", - "round", summary.Round, - ) - err = nil - default: - w.logger.Error("failed to finalize storage round", - "err", err, - "round", summary.Round, - ) - } - - result := finalizeResult{ - summary: summary, - err: err, - } - - select { - case w.finalizeCh <- result: - case <-ctx.Done(): - } -} - func (w *Worker) initGenesis(ctx context.Context, rt *registryApi.Runtime, genesisBlock *block.Block) error { w.logger.Info("initializing storage at genesis") @@ -1033,287 +934,65 @@ func (w *Worker) Serve(ctx context.Context) error { // nolint: gocyclo w.checkpointer.ForceCheckpoint(genesisBlock.Header.Round) w.checkpointer.Flush() - // Don't register availability immediately, we want to know first how far behind consensus we are. - latestBlockRound := w.undefinedRound - lastFullyAppliedRound := cachedLastRound - - heartbeat := heartbeat{} - heartbeat.reset() - - syncingRounds := make(map[uint64]*inFlight) - summaryCache := make(map[uint64]*blockSummary) - - fetchPool := workerpool.New("storage_fetch/" + w.commonNode.Runtime.ID().String()) - fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - defer fetchPool.Stop() - - triggerRoundFetches := func() { - for i := lastFullyAppliedRound + 1; i <= latestBlockRound; i++ { - syncing, ok := syncingRounds[i] - if ok && syncing.outstanding.hasAll() { - continue - } - - if !ok { - if len(syncingRounds) >= maxInFlightRounds { - break - } - - syncing = &inFlight{ - startedAt: time.Now(), - awaitingRetry: outstandingMaskFull, - } - syncingRounds[i] = syncing - - if i == latestBlockRound { - storageWorkerLastPendingRound.With(w.getMetricLabels()).Set(float64(i)) - } - } - w.logger.Debug("preparing round sync", - "round", i, - "outstanding_mask", syncing.outstanding, - "awaiting_retry", syncing.awaitingRetry, - ) - - prev := summaryCache[i-1] - this := summaryCache[i] - prevRoots := make([]storageApi.Root, len(prev.Roots)) - copy(prevRoots, prev.Roots) - for i := range prevRoots { - if prevRoots[i].Type == storageApi.RootTypeIO { - // IO roots aren't chained, so clear it (but leave cache intact). - prevRoots[i] = storageApi.Root{ - Namespace: this.Namespace, - Version: this.Round, - Type: storageApi.RootTypeIO, - } - prevRoots[i].Hash.Empty() - break - } - } - - for i := range prevRoots { - rootType := prevRoots[i].Type - if !syncing.outstanding.contains(rootType) && syncing.awaitingRetry.contains(rootType) { - syncing.scheduleDiff(rootType) - doneCh := fetchPool.Submit(func() { - w.fetchDiff(ctx, this.Round, prevRoots[i], this.Roots[i]) - }) - wg.Go(func() { - <-doneCh - }) - } - } - } - } + fetcher := diffsync.NewP2PFetcher( + w.commonNode.Runtime.History(), + w.diffSync, + w.legacyStorageSync, + cachedLastRound, + w.undefinedRound, + config.GlobalConfig.Storage.FetcherCount, + ) + wg.Go(func() { + fetcher.Serve(ctx) + }) + syncWorker := diffsync.New(w.commonNode.Runtime.History(), w.localStorage, fetcher) + wg.Go(func() { + syncWorker.Serve(ctx) + }) w.statusLock.Lock() w.status = api.StatusSyncingRounds w.statusLock.Unlock() - pendingApply := &minRoundQueue{} - pendingFinalize := &minRoundQueue{} - - // Main processing loop. When a new block arrives, its state and I/O roots are inspected. - // If missing locally, diffs are fetched from peers, possibly for many rounds in parallel, - // including all missing rounds since the last fully applied one. Fetched diffs are then applied - // in round order, ensuring no gaps. Once a round has all its roots applied, background finalization - // for that round is triggered asynchronously, not blocking concurrent fetching and diff application. blkCh, blkSub, err := w.commonNode.Runtime.History().WatchCommittedBlocks() if err != nil { return fmt.Errorf("failed to subscribe to runtime blocks: %w", err) } defer blkSub.Close() + finalizedCh := syncWorker.Updates() + lastFinalized := cachedLastRound + latestRound := w.undefinedRound + for { - // Drain the Apply and Finalize queues first, before waiting for new events in the select below. - - // Apply fetched writelogs, but only if they are for the round after the last fully applied one - // and current number of pending roots to be finalized is smaller than max allowed. - applyNext := pendingApply.Len() > 0 && - lastFullyAppliedRound+1 == (*pendingApply)[0].GetRound() && - pendingFinalize.Len() < dbApi.MaxPendingVersions-1 // -1 since one may be already finalizing. - if applyNext { - lastDiff := heap.Pop(pendingApply).(*fetchedDiff) - // Apply the write log if one exists. - err = nil - if lastDiff.fetched { - err = w.localStorage.Apply(ctx, &storageApi.ApplyRequest{ - Namespace: lastDiff.thisRoot.Namespace, - RootType: lastDiff.thisRoot.Type, - SrcRound: lastDiff.prevRoot.Version, - SrcRoot: lastDiff.prevRoot.Hash, - DstRound: lastDiff.thisRoot.Version, - DstRoot: lastDiff.thisRoot.Hash, - WriteLog: lastDiff.writeLog, - }) - switch { - case err == nil: - lastDiff.pf.RecordSuccess() - case errors.Is(err, storageApi.ErrExpectedRootMismatch): - lastDiff.pf.RecordBadPeer() - default: - w.logger.Error("can't apply write log", - "err", err, - "old_root", lastDiff.prevRoot, - "new_root", lastDiff.thisRoot, - ) - lastDiff.pf.RecordSuccess() - } + select { + case <-ctx.Done(): + return ctx.Err() + case f, ok := <-finalizedCh: + if !ok { + return fmt.Errorf("diff syncer closed") } - - syncing := syncingRounds[lastDiff.round] + lastFinalized, err = w.flushSyncedState(&blockSummary{ + Namespace: w.commonNode.Runtime.History().RuntimeID(), + Round: f.Round, + Roots: f.Roots, + }) if err != nil { - syncing.retry(lastDiff.thisRoot.Type) + w.logger.Error("failed to flush synced state: %w", err) continue } - syncing.outstanding.remove(lastDiff.thisRoot.Type) - if !syncing.outstanding.isEmpty() || !syncing.awaitingRetry.isEmpty() { - continue + if config.GlobalConfig.Storage.Checkpointer.Enabled { + w.checkpointer.NotifyNewVersion(lastFinalized) } + storageWorkerLastFullRound.With(w.getMetricLabels()).Set(float64(f.Round)) - // We have fully synced the given round. - w.logger.Debug("finished syncing round", "round", lastDiff.round) - delete(syncingRounds, lastDiff.round) - summary := summaryCache[lastDiff.round] - delete(summaryCache, lastDiff.round-1) - lastFullyAppliedRound = lastDiff.round - - storageWorkerLastSyncedRound.With(w.getMetricLabels()).Set(float64(lastDiff.round)) - storageWorkerRoundSyncLatency.With(w.getMetricLabels()).Observe(time.Since(syncing.startedAt).Seconds()) - - // Finalize storage for this round. This happens asynchronously - // with respect to Apply operations for subsequent rounds. - heap.Push(pendingFinalize, summary) - - continue - } - - // Check if any new rounds were fully applied and need to be finalized. - // Only finalize if it's the round after the one that was finalized last. - // As a consequence at most one finalization can be happening at the time. - if len(*pendingFinalize) > 0 && cachedLastRound+1 == (*pendingFinalize)[0].GetRound() { - lastSummary := heap.Pop(pendingFinalize).(*blockSummary) - wg.Go(func() { // Don't block fetching and applying remaining rounds. - w.finalize(ctx, lastSummary) - }) - continue - } - - select { - case blk := <-blkCh: + case annBlk := <-blkCh: + latestRound = annBlk.Block.Header.Round w.logger.Debug("incoming block", - "round", blk.Block.Header.Round, - "last_synced", lastFullyAppliedRound, - "last_finalized", cachedLastRound, + "latest_round", latestRound, + "last_finalized", lastFinalized, ) - - // Check if we're far enough to reasonably register as available. - latestBlockRound = blk.Block.Header.Round - w.nudgeAvailability(cachedLastRound, latestBlockRound) - - if _, ok := summaryCache[lastFullyAppliedRound]; !ok && lastFullyAppliedRound == w.undefinedRound { - dummy := blockSummary{ - Namespace: blk.Block.Header.Namespace, - Round: lastFullyAppliedRound + 1, - Roots: []storageApi.Root{ - { - Version: lastFullyAppliedRound + 1, - Type: storageApi.RootTypeIO, - }, - { - Version: lastFullyAppliedRound + 1, - Type: storageApi.RootTypeState, - }, - }, - } - dummy.Roots[0].Empty() - dummy.Roots[1].Empty() - summaryCache[lastFullyAppliedRound] = &dummy - } - // Determine if we need to fetch any old block summaries. In case the first - // round is an undefined round, we need to start with the following round - // since the undefined round may be unsigned -1 and in this case the loop - // would not do any iterations. - startSummaryRound := lastFullyAppliedRound - if startSummaryRound == w.undefinedRound { - startSummaryRound++ - } - for i := startSummaryRound; i < blk.Block.Header.Round; i++ { - if _, ok := summaryCache[i]; ok { - continue - } - var oldBlock *block.Block - oldBlock, err = w.commonNode.Runtime.History().GetCommittedBlock(ctx, i) - if err != nil { - return fmt.Errorf("failed to get block for round %d (current round: %d): %w", i, blk.Block.Header.Round, err) - } - summaryCache[i] = summaryFromBlock(oldBlock) - } - if _, ok := summaryCache[blk.Block.Header.Round]; !ok { - summaryCache[blk.Block.Header.Round] = summaryFromBlock(blk.Block) - } - - triggerRoundFetches() - heartbeat.reset() - - case <-heartbeat.C: - if latestBlockRound != w.undefinedRound { - w.logger.Debug("heartbeat", "in_flight_rounds", len(syncingRounds)) - triggerRoundFetches() - } - - case item := <-w.diffCh: - if item.err != nil { - w.logger.Error("error calling getdiff", - "err", item.err, - "round", item.round, - "old_root", item.prevRoot, - "new_root", item.thisRoot, - "fetched", item.fetched, - ) - syncingRounds[item.round].retry(item.thisRoot.Type) - break - } - - heap.Push(pendingApply, item) - // Item was successfully processed, trigger more round fetches. - // This ensures that new rounds are processed as fast as possible - // when we're syncing and are far behind. - triggerRoundFetches() - - case finalized := <-w.finalizeCh: - // If finalization failed, things start falling apart. - // There's no point redoing it, since it's probably not a transient - // error, and cachedLastRound also can't be updated legitimately. - if finalized.err != nil { - return fmt.Errorf("failed to finalize (round: %d): %w", finalized.summary.Round, finalized.err) - } - - // No further sync or out of order handling needed here, since - // only one finalize at a time is triggered (for round cachedLastRound+1) - cachedLastRound, err = w.flushSyncedState(finalized.summary) - if err != nil { - w.logger.Error("failed to flush synced state", - "err", err, - ) - } - storageWorkerLastFullRound.With(w.getMetricLabels()).Set(float64(finalized.summary.Round)) - - // Check if we're far enough to reasonably register as available. - w.nudgeAvailability(cachedLastRound, latestBlockRound) - - // Notify the checkpointer that there is a new finalized round. - if config.GlobalConfig.Storage.Checkpointer.Enabled { - w.checkpointer.NotifyNewVersion(finalized.summary.Round) - } - - case <-ctx.Done(): - return ctx.Err() } + w.nudgeAvailability(lastFinalized, latestRound) } - - // blockCh will be garbage-collected without being closed. It can potentially still contain - // some new blocks, but only as many as were already in-flight at the point when the main - // context was canceled. }