diff --git a/Cargo.lock b/Cargo.lock index a5e250b864..69ea087e57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3715,6 +3715,7 @@ dependencies = [ "rayon", "thiserror 2.0.12", "tl-proto", + "tracing", "tycho-storage-traits", "tycho-types", "tycho-util", @@ -3909,6 +3910,7 @@ dependencies = [ "rand 0.9.1", "scopeguard", "serde", + "serde_json", "sha2", "similar-asserts", "smallvec", diff --git a/block-util/Cargo.toml b/block-util/Cargo.toml index 9b953f5272..7ba7ba27cf 100644 --- a/block-util/Cargo.toml +++ b/block-util/Cargo.toml @@ -28,6 +28,7 @@ parking_lot = { workspace = true } rayon = { workspace = true } thiserror = { workspace = true } tl-proto = { workspace = true } +tracing = { workspace = true } tycho-types = { workspace = true, features = ["blake3", "rayon"] } # local deps diff --git a/block-util/src/block/mod.rs b/block-util/src/block/mod.rs index 9923abf079..5ec098d0ca 100644 --- a/block-util/src/block/mod.rs +++ b/block-util/src/block/mod.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use tycho_types::cell::HashBytes; use tycho_types::models::ShardIdent; @@ -14,6 +16,44 @@ mod block_proof_stuff; mod block_stuff; mod top_blocks; +pub fn split_shard_ident(workchain: i32, depth: u8) -> Vec { + assert!( + depth <= ShardIdent::MAX_SPLIT_DEPTH, + "split depth is too big", + ); + struct SplitShardCx { + shard: ShardIdent, + remaining_split_depth: u8, + } + let mut split_queue = VecDeque::new(); + split_queue.push_back(SplitShardCx { + shard: ShardIdent::new_full(workchain), + remaining_split_depth: depth, + }); + let mut shards = vec![]; + while let Some(shard_split_cx) = split_queue.pop_front() { + if shard_split_cx.remaining_split_depth > 0 + && let Some((left, right)) = shard_split_cx.shard.split() + { + let remaining_split_depth = shard_split_cx.remaining_split_depth.saturating_sub(1); + if remaining_split_depth > 0 { + split_queue.push_back(SplitShardCx { + shard: left, + remaining_split_depth, + }); + split_queue.push_back(SplitShardCx { + shard: right, + remaining_split_depth, + }); + } else { + shards.push(left); + shards.push(right); + } + } + } + shards +} + pub fn shard_ident_at_depth(workchain: i32, account: &HashBytes, depth: u8) -> ShardIdent { assert!( depth <= ShardIdent::MAX_SPLIT_DEPTH, @@ -31,6 +71,18 @@ pub fn shard_ident_at_depth(workchain: i32, account: &HashBytes, depth: u8) -> S ShardIdent::new(workchain, (prefix & mask) | tag).expect("computed prefix should be valid") } +pub struct DisplayShardPrefix<'a>(pub &'a u64); +impl std::fmt::Display for DisplayShardPrefix<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{:016x}", self.0)) + } +} +impl std::fmt::Debug for DisplayShardPrefix<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/block-util/src/dict.rs b/block-util/src/dict.rs index 4eb42f73ae..ba9a20e78a 100644 --- a/block-util/src/dict.rs +++ b/block-util/src/dict.rs @@ -183,6 +183,14 @@ where if prefix_len > 0 { builder.store_uint(shard.prefix() >> (64 - prefix_len), prefix_len)?; } + tracing::debug!( + depth, + %shard, + hash = ?dict.dict().root().as_ref().map(|c| c.repr_hash()), + left_shard = %left_shard_ident, + right_shard = %right_shard_ident, + "split_aug_dict step", + ); dict.split_by_prefix(&builder.as_data_slice())? }; @@ -211,6 +219,15 @@ where K: DictKey, A: Default, { + let (dict, _) = dict.into_parts(); + split_dict_raw(dict.into_root(), K::BITS, depth) +} + +pub fn split_dict_raw( + dict: Option, + key_bit_len: u16, + depth: u8, +) -> Result, Error> { fn split_dict_impl( dict: Option, key_bit_len: u16, @@ -241,8 +258,69 @@ where let mut shards = FastHashMap::with_capacity_and_hasher(2usize.pow(depth as _), Default::default()); + split_dict_impl(dict, key_bit_len, depth, &mut shards)?; + + Ok(shards) +} + +pub fn split_aug_dict_raw_by_shards( + workchain: i32, + dict: AugDict, + depth: u8, +) -> Result)>, Error> +where + K: DictKey, + A: Default, +{ + fn split_dict_impl( + shard: &ShardIdent, + dict: Option, + key_bit_len: u16, + depth: u8, + shards: &mut Vec<(ShardIdent, Option)>, + ) -> Result<(), Error> { + if depth == 0 { + shards.push((*shard, dict)); + return Ok(()); + } + + let Some((left_shard, right_shard)) = shard.split() else { + shards.push((*shard, dict)); + return Ok(()); + }; + + let PartialSplitDict { + remaining_bit_len, + left_branch, + right_branch, + } = dict_split_raw(dict.as_ref(), key_bit_len, Cell::empty_context())?; + + split_dict_impl( + &left_shard, + left_branch, + remaining_bit_len, + depth - 1, + shards, + )?; + split_dict_impl( + &right_shard, + right_branch, + remaining_bit_len, + depth - 1, + shards, + ) + } + + let mut shards = Vec::with_capacity(2usize.pow(depth as _)); + let (dict_root, _) = dict.into_parts(); - split_dict_impl(dict_root.into_root(), K::BITS, depth, &mut shards)?; + split_dict_impl( + &ShardIdent::new_full(workchain), + dict_root.into_root(), + K::BITS, + depth, + &mut shards, + )?; Ok(shards) } diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index 2ce28c137f..44929cee88 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -284,6 +284,7 @@ impl Phase { )) } + #[tracing::instrument(skip_all)] pub fn finalize_block( mut self, ctx: FinalizeBlockContext, @@ -1179,6 +1180,7 @@ impl Phase { Ok(min_ref_mc_seqno) } + #[tracing::instrument(skip_all)] fn build_accounts( executor: MessagesExecutor, config_address: &HashBytes, diff --git a/collator/src/collator/execution_manager.rs b/collator/src/collator/execution_manager.rs index ac4ae00fca..82e922711c 100644 --- a/collator/src/collator/execution_manager.rs +++ b/collator/src/collator/execution_manager.rs @@ -103,6 +103,11 @@ impl MessagesExecutor { tracing::trace!(target: tracing_targets::EXEC_MANAGER, "execute messages group"); let labels = &[("workchain", self.shard_id.workchain().to_string())]; + let _hist = HistogramGuard::begin_with_labels( + "tycho_do_collate_one_tick_group_exec_time_high", + labels, + ); + let mut ext_msgs_skipped = 0; let group_horizontal_size = msg_group.len(); @@ -116,7 +121,6 @@ impl MessagesExecutor { let mut items = Vec::with_capacity(group_messages_count); let mut ext_msgs_error_count = 0; - let mut max_account_msgs_exec_time = Duration::ZERO; let mut total_exec_time = Duration::ZERO; let mut group_gas = 0u128; @@ -147,7 +151,6 @@ impl MessagesExecutor { for executed in result { self.save_subgroup_result( &mut ext_msgs_skipped, - &mut max_account_msgs_exec_time, &mut total_exec_time, &mut ext_msgs_error_count, &mut group_max_vert_size, @@ -173,7 +176,6 @@ impl MessagesExecutor { group_horizontal_size, group_max_vert_size, group_accounts_count, total_exec_time = %format_duration(total_exec_time), mean_account_msgs_exec_time = %format_duration(mean_account_msgs_exec_time), - max_account_msgs_exec_time = %format_duration(max_account_msgs_exec_time), group_messages_count, group_gas, group_exec_wu, "execute_group", ); @@ -191,11 +193,6 @@ impl MessagesExecutor { labels ) .record(mean_account_msgs_exec_time); - metrics::histogram!( - "tycho_do_collate_one_tick_account_msgs_exec_max_time", - labels - ) - .record(max_account_msgs_exec_time); Ok(ExecutedGroup { items, @@ -206,6 +203,7 @@ impl MessagesExecutor { } #[allow(clippy::vec_box)] + #[tracing::instrument(skip_all, fields(%account_id))] fn execute_subgroup( account_id: HashBytes, msgs: Vec>, @@ -222,7 +220,6 @@ impl MessagesExecutor { fn save_subgroup_result( &mut self, ext_msgs_skipped: &mut u64, - max_account_msgs_exec_time: &mut Duration, total_exec_time: &mut Duration, ext_msgs_error_count: &mut u64, group_max_vert_size: &mut usize, @@ -233,7 +230,6 @@ impl MessagesExecutor { ) -> Result<()> { *ext_msgs_skipped += executed.ext_msgs_skipped; - *max_account_msgs_exec_time = (*max_account_msgs_exec_time).max(executed.exec_time); *total_exec_time += executed.exec_time; *group_max_vert_size = cmp::max(*group_max_vert_size, executed.transactions.len()); @@ -276,6 +272,7 @@ impl MessagesExecutor { } #[allow(clippy::vec_box)] + #[tracing::instrument(skip_all)] fn execute_messages( mut account_state: Box, msgs: Vec>, @@ -403,7 +400,13 @@ impl AccountsCache { Ok(None) } + #[tracing::instrument(skip_all)] fn get_account_stuff(&self, account_id: &AccountId) -> Result> { + let labels = [("workchain", self.workchain_id.to_string())]; + let _hist = HistogramGuard::begin_with_labels( + "tycho_collator_get_account_stuff_time_high", + &labels, + ); if let Some(account) = self.items.get(account_id) { Ok(account.clone()) } else if let Some((_depth, shard_account)) = self.shard_accounts.get(account_id)? { diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index b858f9187c..f33d9419a1 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -666,8 +666,15 @@ impl CollatorStdImpl { // get last store task let last_task = self.store_new_state_tasks.pop().expect("shouldn't happen"); + let is_last = self.store_new_state_tasks.is_empty(); + // if it is finished, then we can just reload prev state - if last_task.store_new_state_task.is_finished() { + if last_task.store_new_state_task.is_finished() + // or when unfinished tasks limit is 0 + || self.config.merkle_chain_limit == 0 + // or this task is the last in queue + || is_last + { last_task.store_new_state_task.await?; // and reload pure prev state in the working state @@ -686,7 +693,9 @@ impl CollatorStdImpl { // collect all unfinished tasks if !task.store_new_state_task.is_finished() + // until the limit reached && unfinished_tasks.len() < self.config.merkle_chain_limit + // or until we reached the last task && !is_last { unfinished_tasks.push(task); @@ -1016,7 +1025,7 @@ impl CollatorStdImpl { }); // Keep only the last `merkle_chain_limit` states alive - if self.store_state_refs.len() == self.config.merkle_chain_limit + if self.store_state_refs.len() >= self.config.merkle_chain_limit && let Some(old_ref) = self.store_state_refs.pop_front() { // NOTE: State update can be quite huge so drop it outside of tokio. diff --git a/core/Cargo.toml b/core/Cargo.toml index 4bb927f58f..fba2ea63ed 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -77,6 +77,7 @@ tycho-crypto = { workspace = true } tycho-network = { workspace = true, features = ["test"] } tycho-storage = { workspace = true, features = ["test"] } tycho-util = { workspace = true, features = ["test"] } +serde_json = { workspace = true, features = ["preserve_order"] } similar-asserts = { workspace = true } [features] diff --git a/core/src/storage/block_handle/handle.rs b/core/src/storage/block_handle/handle.rs index da498b64f4..6f0e67e46d 100644 --- a/core/src/storage/block_handle/handle.rs +++ b/core/src/storage/block_handle/handle.rs @@ -105,7 +105,19 @@ impl BlockHandle { } pub fn has_state(&self) -> bool { - self.inner.meta.flags().contains(BlockFlags::HAS_STATE) + let main_state_and_parts = BlockFlags::HAS_STATE_MAIN.union(BlockFlags::HAS_STATE_PARTS); + self.inner.meta.flags().contains(main_state_and_parts) + } + + pub fn has_state_main(&self) -> bool { + self.inner.meta.flags().contains(BlockFlags::HAS_STATE_MAIN) + } + + pub fn has_state_parts(&self) -> bool { + self.inner + .meta + .flags() + .contains(BlockFlags::HAS_STATE_PARTS) } pub fn has_persistent_shard_state(&self) -> bool { diff --git a/core/src/storage/block_handle/meta.rs b/core/src/storage/block_handle/meta.rs index 800e29f4d6..cd459e05ae 100644 --- a/core/src/storage/block_handle/meta.rs +++ b/core/src/storage/block_handle/meta.rs @@ -119,7 +119,7 @@ bitflags::bitflags! { const HAS_PROOF = 1 << 1; const HAS_QUEUE_DIFF = 1 << 2; - const HAS_STATE = 1 << 3; + const HAS_STATE_MAIN = 1 << 3; const HAS_PERSISTENT_SHARD_STATE = 1 << 4; const HAS_PERSISTENT_QUEUE_STATE = 1 << 5; @@ -131,6 +131,8 @@ bitflags::bitflags! { const IS_KEY_BLOCK = 1 << 11; const IS_PERSISTENT = 1 << 12; + const HAS_STATE_PARTS = 1 << 13; + const IS_REMOVED = 1 << 15; // Composite flags diff --git a/core/src/storage/config.rs b/core/src/storage/config.rs index 9c231cd786..97babc6fda 100644 --- a/core/src/storage/config.rs +++ b/core/src/storage/config.rs @@ -1,9 +1,10 @@ +use std::path::PathBuf; use std::time::Duration; use bytesize::ByteSize; use serde::{Deserialize, Serialize}; use tycho_util::config::PartialConfig; -use tycho_util::serde_helpers; +use tycho_util::{FastHashMap, serde_helpers}; #[derive(Debug, Clone, Serialize, Deserialize, PartialConfig)] #[serde(deny_unknown_fields, default)] @@ -29,6 +30,11 @@ pub struct CoreStorageConfig { /// States GC is disabled if this field is `None`. pub states_gc: Option, + /// State partitions config. + /// + /// State partitioning is disabled if this field is `None`. + pub state_parts: Option, + /// Blocks GC config. /// /// Blocks GC is disabled if this field is `None`. @@ -61,6 +67,7 @@ impl Default for CoreStorageConfig { drop_interval: 3, archives_gc: Some(ArchivesGcConfig::default()), states_gc: Some(StatesGcConfig::default()), + state_parts: None, blocks_gc: Some(BlocksGcConfig::default()), blocks_cache: BlocksCacheConfig::default(), blob_db: BlobDbConfig::default(), @@ -191,3 +198,128 @@ impl Default for BlobDbConfig { } } } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(default, deny_unknown_fields)] +pub struct StatePartitionsConfig { + /// State partitions split depth + /// + /// Default: 0 -> 2^0 = 1 partition, no split. + pub split_depth: u8, + + /// Map of state partitions directories. + /// + /// Default: empty, relative paths will be generated. + #[serde(with = "serde_shard_part_dirs_map")] + pub part_dirs: FastHashMap, +} + +mod serde_shard_part_dirs_map { + use std::path::PathBuf; + + use serde::de::Deserializer; + use serde::ser::{SerializeMap, Serializer}; + use tycho_block_util::block::DisplayShardPrefix; + use tycho_util::FastHashMap; + + use super::*; + + pub fn serialize(value: &FastHashMap, serializer: S) -> Result + where + S: Serializer, + { + #[repr(transparent)] + struct WrappedKey<'a>(&'a u64); + impl Serialize for WrappedKey<'_> { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + serializer.serialize_str(&DisplayShardPrefix(self.0).to_string()) + } + } + + let mut ser = serializer.serialize_map(Some(value.len()))?; + for (prefix, path) in value { + ser.serialize_entry(&WrappedKey(prefix), &path)?; + } + ser.end() + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + #[derive(PartialEq, Eq, Hash)] + #[repr(transparent)] + struct WrappedKey(u64); + impl<'de> Deserialize<'de> for WrappedKey { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let tmp = String::deserialize(deserializer)?; + if tmp.len() != 16 || !tmp.chars().all(|c| c.is_ascii_hexdigit()) { + return Err(serde::de::Error::custom(format!( + "invalid shard prefix key format '{tmp}', expected 16 hex digits (e.g. a000000000000000)" + ))); + } + let prefix = u64::from_str_radix(&tmp, 16).map_err(serde::de::Error::custom)?; + Ok(Self(prefix)) + } + } + + >::deserialize(deserializer) + .map(|map| map.into_iter().map(|(k, v)| (k.0, v)).collect()) + } +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use super::*; + + #[test] + pub fn test_state_partitions_config() { + let test: &str = r#"{ + "split_depth": 2, + "part_dirs": { + "2000000000000000": "/dev1/node/data/cells-part-1", + "6000000000000000": "/dev2/node/data/cells-part-2", + "a000000000000000": "/dev3/node/data/cells-part-3", + "e000000000000000": "/dev4/node/data/cells-part-4" + } + }"#; + + let mut value = StatePartitionsConfig::default(); + value + .part_dirs + .insert(234567890, "/dev1/node/data/cells-part".into()); + println!("test: {:?}", test); + + let parsed: StatePartitionsConfig = serde_json::from_str(test).unwrap(); + println!("parsed: {:?}", parsed); + + assert_eq!(parsed.split_depth, 2); + let path1 = parsed.part_dirs.get(&2305843009213693952).unwrap(); + let expected = PathBuf::from_str("/dev1/node/data/cells-part-1").unwrap(); + assert_eq!(path1, &expected); + + let serialized = serde_json::to_string(&parsed).unwrap(); + println!("test serialized: {:?}", serialized); + + let mut value = StatePartitionsConfig::default(); + value + .part_dirs + .insert(234567890, "/dev99/node/data/cells-part".into()); + + let test = serde_json::to_string(&value).unwrap(); + println!("test: {:?}", test); + + let parsed: StatePartitionsConfig = serde_json::from_str(&test).unwrap(); + println!("parsed: {:?}", parsed); + + assert_eq!(parsed, value); + } +} diff --git a/core/src/storage/db.rs b/core/src/storage/db.rs index a42ca149e2..067545913e 100644 --- a/core/src/storage/db.rs +++ b/core/src/storage/db.rs @@ -1,13 +1,17 @@ +use std::sync::Arc; + use tycho_storage::kv::{ - Migrations, NamedTables, StateVersionProvider, TableContext, WithMigrations, + Migrations, NamedTables, StateVersionProvider, StoredValue, TableContext, WithMigrations, }; use tycho_util::sync::CancellationFlag; -use weedb::{MigrationError, Semver, VersionProvider, WeeDb}; +use weedb::{MigrationError, Semver, Table, VersionProvider, WeeDb, rocksdb}; +use super::block_handle::{BlockFlags, BlockMeta}; use super::tables; pub type CoreDb = WeeDb; pub type CellsDb = WeeDb; +pub type CellsPartDb = WeeDb; pub trait CoreDbExt { fn normalize_version(&self) -> anyhow::Result<()>; @@ -71,7 +75,7 @@ impl NamedTables for CoreTables { } impl WithMigrations for CoreTables { - const VERSION: Semver = [0, 0, 4]; + const VERSION: Semver = [0, 0, 5]; type VersionProvider = StateVersionProvider; @@ -80,9 +84,79 @@ impl WithMigrations for CoreTables { } fn register_migrations( - _migrations: &mut Migrations, - _cancelled: CancellationFlag, + migrations: &mut Migrations, + cancelled: CancellationFlag, ) -> Result<(), MigrationError> { + let cancelled = cancelled.clone(); + migrations.register([0, 0, 4], [0, 0, 5], move |db| { + const BATCH_LIMIT: usize = 1000; + + let mut batch = rocksdb::WriteBatch::default(); + let mut pending = 0; + let mut updated = 0; + + let mut iter = db.block_handles.raw_iterator(); + iter.seek_to_first(); + iter.status().map_err(MigrationError::DbError)?; + let mut started = false; + while { + if started { + iter.next(); + } + iter.valid() + } { + started = true; + + if cancelled.check() { + return Err(MigrationError::Custom(Box::new(std::io::Error::new( + std::io::ErrorKind::Interrupted, + "migration cancelled", + )))); + } + + let key = match iter.key() { + Some(key) => key, + None => break, + }; + let value = match iter.value() { + Some(value) => value, + None => continue, + }; + + let meta = BlockMeta::from_slice(value); + if !meta.add_flags(BlockFlags::HAS_STATE_PARTS) { + continue; + } + + batch.merge_cf(&db.block_handles.cf(), key, meta.to_vec()); + + // write batch when limit reached + pending += 1; + updated += 1; + if pending >= BATCH_LIMIT { + db.rocksdb() + .write_opt(batch, db.block_handles.write_config()) + .map_err(MigrationError::DbError)?; + batch = rocksdb::WriteBatch::default(); + pending = 0; + } + } + + // write last batch + if pending > 0 { + db.rocksdb() + .write_opt(batch, db.block_handles.write_config()) + .map_err(MigrationError::DbError)?; + } + + tracing::info!( + updated, + "migration: added HAS_STATE_PARTS flag to existing block handles" + ); + + Ok(()) + })?; + Ok(()) } } @@ -129,3 +203,104 @@ impl WithMigrations for CellsTables { Ok(()) } } + +weedb::tables! { + pub struct CellsPartTables { + pub state: tables::State, + + pub shard_states: tables::ShardStates, + pub cells: tables::Cells, + pub temp_cells: tables::TempCells, + } +} + +impl NamedTables for CellsPartTables { + const NAME: &'static str = "cells-part"; +} + +impl WithMigrations for CellsPartTables { + const VERSION: Semver = [0, 0, 1]; + + type VersionProvider = StateVersionProvider; + + fn new_version_provider() -> Self::VersionProvider { + StateVersionProvider::new::() + } + + fn register_migrations( + _migrations: &mut Migrations, + _cancelled: CancellationFlag, + ) -> Result<(), MigrationError> { + Ok(()) + } +} + +impl CoreDbExt for CellsPartDb { + fn normalize_version(&self) -> anyhow::Result<()> { + let provider = CellsPartTables::new_version_provider(); + + // Check if there is NO VERSION + if provider.get_version(self.raw())?.is_some() { + return Ok(()); + } + + // Check if the DB is NOT EMPTY + { + let mut cells_iter = self.cells.raw_iterator(); + cells_iter.seek_to_first(); + cells_iter.status()?; + if cells_iter.item().is_none() { + return Ok(()); + } + } + + // Set the initial version + tracing::warn!("normalizing DB version for cells parts"); + provider.set_version(self.raw(), [0, 0, 1])?; + Ok(()) + } +} + +/// The abstraction over `CellsDb` and `CellsShardDb` +pub(super) trait CellsDbOps: Send + Sync { + fn shard_states(&self) -> &Table; + fn cells(&self) -> &Table; + fn temp_cells(&self) -> &Table; + fn rocksdb(&self) -> &Arc; +} + +#[derive(Clone)] +pub(super) enum CellStorageDb { + Main(CellsDb), + Part(CellsPartDb), +} + +impl CellsDbOps for CellStorageDb { + fn shard_states(&self) -> &Table { + match self { + Self::Main(db) => &db.shard_states, + Self::Part(db) => &db.shard_states, + } + } + + fn cells(&self) -> &Table { + match self { + Self::Main(db) => &db.cells, + Self::Part(db) => &db.cells, + } + } + + fn temp_cells(&self) -> &Table { + match self { + Self::Main(db) => &db.temp_cells, + Self::Part(db) => &db.temp_cells, + } + } + + fn rocksdb(&self) -> &Arc { + match self { + Self::Main(db) => db.rocksdb(), + Self::Part(db) => db.rocksdb(), + } + } +} diff --git a/core/src/storage/mod.rs b/core/src/storage/mod.rs index cb6d580b93..a8b8ff81e0 100644 --- a/core/src/storage/mod.rs +++ b/core/src/storage/mod.rs @@ -1,6 +1,9 @@ +use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use anyhow::Result; +use tycho_block_util::block::{DisplayShardPrefix, split_shard_ident}; use tycho_storage::StorageContext; use tycho_storage::kv::ApplyMigrations; @@ -17,14 +20,15 @@ pub use self::config::{ ArchivesGcConfig, BlocksCacheConfig, BlocksGcConfig, BlocksGcType, CoreStorageConfig, StatesGcConfig, }; -pub use self::db::{CellsDb, CoreDb, CoreDbExt, CoreTables}; +pub use self::db::{CellsDb, CellsPartDb, CoreDb, CoreDbExt, CoreTables}; pub use self::node_state::{NodeStateStorage, NodeSyncState}; pub use self::persistent_state::{ BriefBocHeader, PersistentStateInfo, PersistentStateKind, PersistentStateStorage, QueueDiffReader, QueueStateReader, QueueStateWriter, ShardStateReader, ShardStateWriter, }; pub use self::shard_state::{ - ShardStateStorage, ShardStateStorageError, ShardStateStorageMetrics, StoreStateHint, + ShardStateStorage, ShardStateStorageContext, ShardStateStorageError, ShardStateStorageMetrics, + ShardStateStoragePart, ShardStateStoragePartImpl, StoragePartsMap, StoreStateHint, }; pub mod tables; @@ -80,14 +84,20 @@ impl CoreStorage { ) .await?; let block_storage = Arc::new(block_storage); - let shard_state_storage = ShardStateStorage::new( - cells_db.clone(), - block_handle_storage.clone(), - block_storage.clone(), - ctx.temp_files().clone(), - config.cells_cache_size, - config.drop_interval, - )?; + + // try init state partitions if configured + let (part_split_depth, storage_parts) = try_init_state_partitions(&ctx, &config).await?; + + let shard_state_storage = ShardStateStorage::new(ShardStateStorageContext { + cells_db: cells_db.clone(), + block_handle_storage: block_handle_storage.clone(), + block_storage: block_storage.clone(), + temp_file_storage: ctx.temp_files().clone(), + cache_size_bytes: config.cells_cache_size, + drop_interval: config.drop_interval, + part_split_depth, + storage_parts, + })?; let persistent_state_storage = PersistentStateStorage::new( cells_db.clone(), ctx.files_dir(), @@ -174,3 +184,42 @@ struct Inner { node_state_storage: NodeStateStorage, persistent_state_storage: PersistentStateStorage, } + +async fn try_init_state_partitions( + ctx: &StorageContext, + config: &CoreStorageConfig, +) -> Result<(u8, Arc)> { + let Some(state_parts_config) = &config.state_parts else { + return Ok(Default::default()); + }; + + let mut storage_parts = StoragePartsMap::default(); + + // NOTE: workchain_id does not matter because we use only shard prefixes + let shards = split_shard_ident(0, state_parts_config.split_depth); + for shard in shards { + let shard_prefix = shard.prefix(); + let path = match state_parts_config.part_dirs.get(&shard_prefix) { + Some(p) => p.clone(), + None => PathBuf::from_str(&format!( + "cells-parts/cells-part-{}", + DisplayShardPrefix(&shard_prefix) + ))?, + }; + let cells_part_db: CellsPartDb = + ctx.open_preconfigured_partition(path, Some(shard_prefix))?; + cells_part_db.normalize_version()?; + cells_part_db.apply_migrations().await?; + storage_parts.insert( + shard_prefix, + Arc::new(ShardStateStoragePartImpl::new( + shard_prefix, + cells_part_db, + config.cells_cache_size, + config.drop_interval, + )), + ); + } + + Ok((state_parts_config.split_depth, Arc::new(storage_parts))) +} diff --git a/core/src/storage/shard_state/cell_storage.rs b/core/src/storage/shard_state/cell_storage.rs index 87c633e606..e06f41f869 100644 --- a/core/src/storage/shard_state/cell_storage.rs +++ b/core/src/storage/shard_state/cell_storage.rs @@ -15,6 +15,7 @@ use bytesize::ByteSize; use dashmap::Map; use quick_cache::sync::{Cache, DefaultLifecycle}; use triomphe::ThinArc; +use tycho_block_util::block::DisplayShardPrefix; use tycho_storage::kv::refcount; use tycho_types::cell::*; use tycho_util::metrics::{HistogramGuard, spawn_metrics_loop}; @@ -22,13 +23,19 @@ use tycho_util::{FastDashMap, FastHashMap, FastHashSet, FastHasherState}; use weedb::rocksdb::WriteBatch; use weedb::{BoundedCfHandle, rocksdb}; -use crate::storage::CellsDb; +use super::{SplitAccountEntry, StoragePartsMap}; +use crate::storage::db::{CellStorageDb, CellsDbOps}; +use crate::storage::{CellsDb, CellsPartDb}; pub struct CellStorage { - cells_db: CellsDb, + cells_db: CellStorageDb, cells_cache: Arc, raw_cells_cache: Arc, drop_interval: u32, + /// Contains the shard prefix of the state partition when used in `ShardStateStoragePart` + part_shard_prefix: Option, + /// State storage partitions + storage_parts: Option>, } type CellsIndex = FastDashMap; @@ -39,7 +46,43 @@ struct CachedCell { } impl CellStorage { - pub fn new(cells_db: CellsDb, cache_size_bytes: ByteSize, drop_interval: u32) -> Arc { + pub fn new( + cells_db: CellsDb, + cache_size_bytes: ByteSize, + drop_interval: u32, + storage_parts: Option>, + ) -> Arc { + Self::new_inner( + CellStorageDb::Main(cells_db), + cache_size_bytes, + drop_interval, + None, + storage_parts, + ) + } + + pub fn new_for_shard( + cells_db: CellsPartDb, + cache_size_bytes: ByteSize, + drop_interval: u32, + part_shard_prefix: ShardPrefix, + ) -> Arc { + Self::new_inner( + CellStorageDb::Part(cells_db), + cache_size_bytes, + drop_interval, + Some(part_shard_prefix), + None, + ) + } + + fn new_inner( + cells_db: CellStorageDb, + cache_size_bytes: ByteSize, + drop_interval: u32, + part_shard_prefix: Option, + storage_parts: Option>, + ) -> Arc { let cells_cache = Default::default(); let raw_cells_cache = Arc::new(RawCellsCache::new(cache_size_bytes.as_u64())); @@ -54,6 +97,8 @@ impl CellStorage { cells_cache, raw_cells_cache, drop_interval, + part_shard_prefix, + storage_parts, }) } @@ -100,9 +145,9 @@ impl CellStorage { Existing, } - struct Context<'a> { + struct Context<'a, D: CellsDbOps> { cells_cf: BoundedCfHandle<'a>, - cells_db: &'a CellsDb, + cells_db: &'a D, buffer: Vec, transaction: FastHashMap, new_cells_batch: rocksdb::WriteBatch, @@ -110,10 +155,10 @@ impl CellStorage { raw_cache: &'a RawCellsCache, } - impl<'a> Context<'a> { - fn new(cells_db: &'a CellsDb, raw_cache: &'a RawCellsCache) -> Self { + impl<'a, D: CellsDbOps> Context<'a, D> { + fn new(cells_db: &'a D, raw_cache: &'a RawCellsCache) -> Self { Self { - cells_cf: cells_db.cells.cf(), + cells_cf: cells_db.cells().cf(), cells_db, buffer: Vec::with_capacity(512), transaction: Default::default(), @@ -124,9 +169,9 @@ impl CellStorage { } fn load_temp(&self, key: &HashBytes) -> Result, CellStorageError> { - let data = match self.cells_db.temp_cells.get(key) { + let data = match self.cells_db.temp_cells().get(key) { Ok(Some(data)) => data, - Ok(None) => return Err(CellStorageError::CellNotFound), + Ok(None) => return Err(CellStorageError::CellNotFound(*key)), Err(e) => return Err(CellStorageError::Internal(e)), }; @@ -163,7 +208,7 @@ impl CellStorage { InsertedCell::Existing } hash_map::Entry::Vacant(entry) => { - if let Some(value) = self.cells_db.cells.get(key)? { + if let Some(value) = self.cells_db.cells().get(key)? { let (rc, value) = refcount::decode_value_with_rc(value.as_ref()); debug_assert!(rc > 0 && value.is_some() || rc == 0 && value.is_none()); if value.is_some() { @@ -270,7 +315,8 @@ impl CellStorage { &self, root: &DynCell, batch: &mut WriteBatch, - split_at: FastHashMap, + split_at: FastHashMap, + split_by_partitions: bool, capacity: usize, ) -> Result { type StoreResult = Result<(), CellStorageError>; @@ -286,12 +332,14 @@ impl CellStorage { buffer: Vec, } - struct StoreContext<'a> { - db: &'a CellsDb, + struct StoreContext<'a, D: CellsDbOps> { + db: &'a D, herd: &'a Herd, raw_cache: &'a RawCellsCache, /// Subtrees to process in parallel. - split_at: FastHashMap, + split_at: FastHashMap, + /// Indicates that accounts are splitted by partitions + split_by_partitions: bool, // TODO: Use `&'a HashBytes` for key? // Pros: // - Less `memcpy` calls; @@ -306,12 +354,13 @@ impl CellStorage { delayed_additions: std::sync::Mutex>, } - impl<'a> StoreContext<'a> { + impl<'a, D: CellsDbOps> StoreContext<'a, D> { fn new( - db: &'a CellsDb, + db: &'a D, herd: &'a Herd, raw_cache: &'a RawCellsCache, - split_accounts: FastHashMap, + split_accounts: FastHashMap, + split_by_partitions: bool, capacity: usize, ) -> Self { Self { @@ -319,6 +368,7 @@ impl CellStorage { raw_cache, herd, split_at: split_accounts, + split_by_partitions, transaction: FastDashMap::with_capacity_and_hasher_and_shard_amount( capacity, Default::default(), @@ -352,9 +402,14 @@ impl CellStorage { }; for child in &mut *iter { - // Skip cell to store it later in parallel let child_hash = child.repr_hash(); if self.split_at.contains_key(child_hash) { + // skip if subtree should be stored in partition + if self.split_by_partitions { + continue 'outer; + } + + // skip cell to store it in parallel let mut delayed_additions = self.delayed_additions.lock().unwrap(); match delayed_additions.entry(*child_hash) { hash_map::Entry::Vacant(entry) => { @@ -364,8 +419,7 @@ impl CellStorage { drop(delayed_additions); // Spawn processing. - // TODO: Handle error properly. - scope.spawn(|| self.traverse_cell(child, scope).unwrap()); + scope.spawn(|| self.traverse_cell(child, scope)); } hash_map::Entry::Occupied(mut entry) => { // Other thread will add this subtree only once, @@ -508,7 +562,7 @@ impl CellStorage { // Merge transaction items into the final batch. let mut buffer = Vec::with_capacity(512); let total = self.transaction.len(); - let cells_cf = &self.db.cells.cf(); + let cells_cf = &self.db.cells().cf(); for kv in self.transaction.iter() { let key = kv.key(); let item = kv.value(); @@ -517,6 +571,7 @@ impl CellStorage { refcount::add_positive_refount(item.additions, item.data, &mut buffer); batch.merge_cf(cells_cf, key.as_slice(), &buffer); } + total }) } @@ -528,6 +583,7 @@ impl CellStorage { &herd, &self.raw_cells_cache, split_at, + split_by_partitions, capacity, ); @@ -548,15 +604,15 @@ impl CellStorage { data: Option<&'a [u8]>, } - struct Context<'a> { - db: &'a CellsDb, + struct Context<'a, D: CellsDbOps> { + db: &'a D, raw_cells_cache: &'a RawCellsCache, alloc: &'a Bump, transaction: FastHashMap<&'a HashBytes, AddedCell<'a>>, buffer: Vec, } - impl<'a> Context<'a> { + impl<'a, D: CellsDbOps> Context<'a, D> { fn insert_cell( &mut self, cell: &'a DynCell, @@ -596,7 +652,7 @@ impl CellStorage { fn finalize(mut self, batch: &mut rocksdb::WriteBatch) -> usize { let total = self.transaction.len(); - let cells_cf = &self.db.cells.cf(); + let cells_cf = &self.db.cells().cf(); for (key, item) in self.transaction { self.buffer.clear(); @@ -658,26 +714,103 @@ impl CellStorage { self: &Arc, hash: &HashBytes, epoch: u32, + ) -> Result, CellStorageError> { + self.load_cell_ext(hash, epoch, None) + } + + pub fn load_cell_ext( + self: &Arc, + hash: &HashBytes, + epoch: u32, + shard_router: Option, ) -> Result, CellStorageError> { #[cfg(feature = "cells-metrics")] let _histogram = HistogramGuard::begin("tycho_storage_load_cell_time"); + let need_replace_cell_in_cache = + |cached_router: &Option, load_router: &Option| { + match (cached_router, load_router) { + (Some(cached), Some(required)) if cached.eq_by_inner_refs(required) => { + // do not reload cell if routers match + false + } + (_, None) => { + // do not reload cell if loading without router + false + } + _ => { + // do not use cached cell + true + } + } + }; + + tracing::trace!( + storage_shard = ?self.part_shard_prefix.as_ref().map(DisplayShardPrefix), + %hash, + load_router = ?shard_router, + "try load cell", + ); + + // try to take cell from the cache if let Some(cell) = self.cells_cache.get(hash) && cell.epoch.saturating_add(self.drop_interval) >= epoch && let Some(cell) = cell.weak.upgrade() + // cell may be cached before without shard router + // when we reload cell with specified shard router we should replace it in the cache + && !need_replace_cell_in_cache(&cell.shard_router, &shard_router) { + tracing::trace!( + storage_shard = ?self.part_shard_prefix.as_ref().map(DisplayShardPrefix), + %hash, + cell_shard_router = ?cell.shard_router, + load_router = ?shard_router, + "return cell from cache", + ); return Ok(cell); } + // load cell from separate storage partition if required + if self.part_shard_prefix.is_none() + && let Some(CellShardRouter::Shard { shard_prefix }) = &shard_router + && let Some(storage_part) = self + .storage_parts + .as_ref() + .and_then(|parts| parts.get(shard_prefix)) + .cloned() + { + let cell = storage_part + .cell_storage() + .load_cell_ext(hash, epoch, None)?; + return Ok(cell); + } + + // fallback: if no shard partition storage, do not inherit router + let target_shard_router = match &shard_router { + Some(CellShardRouter::Shard { .. }) => None, + _ => shard_router.clone(), + }; + + // otherwise load from raw cache or db let mut cell = match self.raw_cells_cache.get_raw(&self.cells_db, hash) { - Ok(Some(value)) => match StorageCell::deserialize(self.clone(), &value.slice, epoch) { - Some(cell) => Arc::new(cell), - None => return Err(CellStorageError::InvalidCell), - }, - Ok(None) => return Err(CellStorageError::CellNotFound), + Ok(Some(value)) => { + match StorageCell::deserialize( + self.clone(), + &value.slice, + epoch, + target_shard_router.clone(), + ) { + Some(cell) => Arc::new(cell), + None => return Err(CellStorageError::InvalidCell), + } + } + Ok(None) => return Err(CellStorageError::CellNotFound(*hash)), Err(e) => return Err(CellStorageError::Internal(e)), }; + let mut replaced_in_cache = false; + let mut loaded_prev_from_cache = false; + let has_new; match self.cells_cache.entry(*hash) { dashmap::Entry::Vacant(entry) => { @@ -689,20 +822,35 @@ impl CellStorage { } dashmap::Entry::Occupied(mut entry) => { has_new = false; - if entry.get().epoch >= epoch + let cached_epoch = entry.get().epoch; + if cached_epoch >= epoch && let Some(mut prev) = entry.get().weak.upgrade() + // we should replace cell in the cache if reloaded with specified shard + && !need_replace_cell_in_cache(&prev.shard_router, &cell.shard_router) { drop(entry); std::mem::swap(&mut prev, &mut cell); + loaded_prev_from_cache = true; } else { entry.insert(CachedCell { epoch, weak: Arc::downgrade(&cell), }); + replaced_in_cache = true; } } }; + tracing::trace!( + storage_shard = ?self.part_shard_prefix.as_ref().map(DisplayShardPrefix), + %hash, + cell_shard_router = ?cell.shard_router, + load_router = ?shard_router, + loaded_prev_from_cache, + replaced_in_cache, + "cell loaded", + ); + if has_new { #[cfg(feature = "cells-metrics")] metrics::gauge!("tycho_storage_cells_tree_cache_size").increment(1f64); @@ -716,6 +864,7 @@ impl CellStorage { herd: &Herd, root: &HashBytes, split_at: FastHashSet, + split_by_partitions: bool, ) -> Result<(usize, WriteBatch), CellStorageError> { type RemoveResult = Result<(), CellStorageError>; @@ -724,12 +873,14 @@ impl CellStorage { buffer: Vec, } - struct RemoveContext<'a> { - db: &'a CellsDb, + struct RemoveContext<'a, D: CellsDbOps> { + db: &'a D, herd: &'a Herd, raw_cache: &'a RawCellsCache, /// Subtrees to process in parallel. split_at: FastHashSet, + /// Indicates that accounts are splitted by partitions + split_by_partitions: bool, // TODO: Use `&'a HashBytes` for key? // Pros: // - Less `memcpy` calls; @@ -744,18 +895,20 @@ impl CellStorage { delayed_removes: std::sync::Mutex>, } - impl<'a> RemoveContext<'a> { + impl<'a, D: CellsDbOps> RemoveContext<'a, D> { fn new( - db: &'a CellsDb, + db: &'a D, herd: &'a Herd, raw_cache: &'a RawCellsCache, split_at: FastHashSet, + split_by_partitions: bool, ) -> Self { Self { db, raw_cache, herd, split_at, + split_by_partitions, transaction: FastDashMap::with_capacity_and_hasher_and_shard_amount( 128, Default::default(), @@ -789,33 +942,37 @@ impl CellStorage { }; for child_hash in iter.by_ref() { - // Skip cell to remove it later in parallel - if self.split_at.contains(child_hash) { - let mut delayed_removes = self.delayed_removes.lock().unwrap(); - match delayed_removes.entry(*child_hash) { - hash_map::Entry::Vacant(entry) => { - // This subtree will be removed by another thread, - // so no removes is needed on first occurrence. - entry.insert(0); - drop(delayed_removes); - - // Spawn processing. - // TODO: Handle error properly. - scope.spawn(|| { - self.traverse_cell(child_hash, scope).unwrap(); - }); - } - hash_map::Entry::Occupied(mut entry) => { - // Other thread will remove this subtree only once, - // so we need to adjust references to keep them in sync. - *entry.get_mut() += 1; + let split_at_reached = self.split_at.contains(child_hash); + if split_at_reached { + if self.split_by_partitions { + // do not try to remove subtree in parallel if split is by partitions + // subtree will be removed from partition in a separate task + continue 'outer; + } else { + // Skip cell to remove it later in parallel + let mut delayed_removes = self.delayed_removes.lock().unwrap(); + match delayed_removes.entry(*child_hash) { + hash_map::Entry::Vacant(entry) => { + // This subtree will be removed by another thread, + // so no removes is needed on first occurrence. + entry.insert(0); + drop(delayed_removes); + + // Spawn processing. + scope.spawn(|| self.traverse_cell(child_hash, scope)); + } + hash_map::Entry::Occupied(mut entry) => { + // Other thread will remove this subtree only once, + // so we need to adjust references to keep them in sync. + *entry.get_mut() += 1; + } } - } - continue 'outer; + continue 'outer; + } } - // Process the current cell. + // Process the current cell let refs = self.remove_cell(child_hash, &mut alloc)?; if let Some(refs) = refs { @@ -929,7 +1086,7 @@ impl CellStorage { // Merge transaction items into the final batch. let total = self.transaction.len(); - let cells_cf = &self.db.cells.cf(); + let cells_cf = &self.db.cells().cf(); for kv in self.transaction.iter() { let key = kv.key(); let item = kv.value(); @@ -945,7 +1102,13 @@ impl CellStorage { } } - let ctx = RemoveContext::new(&self.cells_db, herd, &self.raw_cells_cache, split_at); + let ctx = RemoveContext::new( + &self.cells_db, + herd, + &self.raw_cells_cache, + split_at, + split_by_partitions, + ); std::thread::scope(|scope| ctx.traverse_cell(root, scope))?; @@ -963,7 +1126,7 @@ impl CellStorage { alloc: &Bump, hash: &HashBytes, ) -> Result<(usize, WriteBatch), CellStorageError> { - let cells = &self.cells_db.cells; + let cells = self.cells_db.cells(); let cells_cf = &cells.cf(); let mut transaction: FastHashMap<&HashBytes, RemovedCell<'_>> = @@ -1078,8 +1241,8 @@ impl<'a> RemovedCell<'a> { #[derive(thiserror::Error, Debug)] pub enum CellStorageError { - #[error("Cell not found in cell db")] - CellNotFound, + #[error("Cell not found in cell db (hash: {0})")] + CellNotFound(HashBytes), #[error("Invalid cell")] InvalidCell, #[error("Cell counter mismatch: expected refcount {expected}, got {actual} removes")] @@ -1088,6 +1251,46 @@ pub enum CellStorageError { Internal(#[from] rocksdb::Error), } +pub type ShardPrefix = u64; +pub type ShardStatePartitionsMap = FastHashMap; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CellShardRouter { + /// Cell belongs to specified shard, stored in a separate partition + Shard { shard_prefix: ShardPrefix }, + /// Defines that specified child is a root cell for shard accounts, + /// defines roots in descending cells to split on shard partitions + ChildIsShardAccountsRoot(u8, Arc), + /// Cell belongs to shard accounts subtree, + /// defines roots in descending cells to split on shard partitions + SplitOnPartitionsAt(Arc), +} + +impl CellShardRouter { + fn eq_by_inner_refs(&self, other: &Self) -> bool { + match (self, other) { + ( + Self::Shard { + shard_prefix: shard, + }, + Self::Shard { + shard_prefix: other_shard, + }, + ) if shard == other_shard => true, + ( + Self::ChildIsShardAccountsRoot(idx, map), + Self::ChildIsShardAccountsRoot(other_idx, other_map), + ) if idx == other_idx && Arc::ptr_eq(map, other_map) => true, + (Self::SplitOnPartitionsAt(map), Self::SplitOnPartitionsAt(other_map)) + if Arc::ptr_eq(map, other_map) => + { + true + } + _ => false, + } + } +} + pub struct StorageCell { cell_storage: Arc, descriptor: CellDescriptor, @@ -1099,6 +1302,9 @@ pub struct StorageCell { reference_states: [AtomicU8; 4], reference_data: [UnsafeCell; 4], + + /// Defines cell relation to accounts shard + shard_router: Option, } impl StorageCell { @@ -1108,7 +1314,12 @@ impl StorageCell { const HASHES_ITEM_LEN: usize = 32 + 2; - pub fn deserialize(cell_storage: Arc, buffer: &[u8], epoch: u32) -> Option { + pub fn deserialize( + cell_storage: Arc, + buffer: &[u8], + epoch: u32, + shard_router: Option, + ) -> Option { if buffer.len() < 4 { return None; } @@ -1141,6 +1352,7 @@ impl StorageCell { ); } + // preload child hashes and map to shards Some(Self { cell_storage, bit_len, @@ -1151,6 +1363,7 @@ impl StorageCell { epoch, reference_states, reference_data, + shard_router, }) } @@ -1217,14 +1430,60 @@ impl StorageCell { let current_state = state.load(Ordering::Acquire); if current_state == Self::REF_STORAGE { - return Some(unsafe { &(*slot).storage_cell }); + let cell = unsafe { &(*slot).storage_cell }; + return Some(cell); } + let child_hash = unsafe { (*slot).hash }; + + // detect shard router for child cell + let child_shard_router = match &self.shard_router { + Some(CellShardRouter::ChildIsShardAccountsRoot(idx, map)) => { + if *idx == index { + if let Some(child_prefix) = map.get(&child_hash) { + Some(CellShardRouter::Shard { + shard_prefix: *child_prefix, + }) + } else { + Some(CellShardRouter::SplitOnPartitionsAt(map.clone())) + } + } else { + None + } + } + Some(CellShardRouter::SplitOnPartitionsAt(map)) => { + if let Some(child_shard) = map.get(&child_hash) { + Some(CellShardRouter::Shard { + shard_prefix: *child_shard, + }) + } else { + self.shard_router.clone() + } + } + Some(CellShardRouter::Shard { .. }) + if self.cell_storage.part_shard_prefix.is_none() => + { + self.shard_router.clone() + } + Some(CellShardRouter::Shard { .. }) | None => None, + }; + + tracing::trace!( + storage_shard = ?self.cell_storage.part_shard_prefix.as_ref().map(DisplayShardPrefix), + index, + child_cell = %child_hash, + ?child_shard_router, + current_cell = %DynCell::repr_hash(self), + current_shard_router = ?self.shard_router, + "try load child cell", + ); + let mut res = Ok(()); - Self::initialize_inner(state, &mut || match self - .cell_storage - .load_cell(unsafe { &(*slot).hash }, self.epoch) - { + Self::initialize_inner(state, &mut || match self.cell_storage.load_cell_ext( + &child_hash, + self.epoch, + child_shard_router.clone(), + ) { Ok(cell) => unsafe { *slot = StorageCellReferenceData { storage_cell: ManuallyDrop::new(cell), @@ -1232,6 +1491,13 @@ impl StorageCell { true }, Err(err) => { + tracing::error!( + child_cell = %child_hash, + ?child_shard_router, + current_cell = %DynCell::repr_hash(self), + current_shard_router = ?self.shard_router, + "unable to load child cell", + ); res = Err(err); false } @@ -1455,9 +1721,9 @@ impl RawCellsCache { } } - fn get_raw( + fn get_raw( &self, - db: &CellsDb, + db: &D, key: &HashBytes, ) -> Result, rocksdb::Error> { use quick_cache::sync::GuardResult; @@ -1471,7 +1737,7 @@ impl RawCellsCache { self.rocksdb_access_histogram.record(started_at.elapsed()); }); - db.cells.get(key.as_slice())? + db.cells().get(key.as_slice())? }; Ok(if let Some(value) = value { @@ -1492,9 +1758,9 @@ impl RawCellsCache { } } - fn get_rc_for_insert( + fn get_rc_for_insert( &self, - db: &CellsDb, + db: &D, key: &HashBytes, depth: usize, ) -> Result { @@ -1516,7 +1782,7 @@ impl RawCellsCache { } } - match db.cells.get(key).map_err(CellStorageError::Internal)? { + match db.cells().get(key).map_err(CellStorageError::Internal)? { Some(value) => { let (rc, value) = refcount::decode_value_with_rc(value.as_ref()); @@ -1530,9 +1796,9 @@ impl RawCellsCache { } } - fn get_rc_for_delete( + fn get_rc_for_delete( &self, - db: &CellsDb, + db: &D, key: &HashBytes, refs_buffer: &mut Vec, ) -> Result { @@ -1542,7 +1808,7 @@ impl RawCellsCache { if let Some(value) = self.inner.peek(key) { let rc = value.header.header.load(Ordering::Acquire); if rc <= 0 { - return Err(CellStorageError::CellNotFound); + return Err(CellStorageError::CellNotFound(*key)); } else if rc != i64::MAX { return StorageCell::deserialize_references(&value.slice, refs_buffer) .then_some(rc) @@ -1550,7 +1816,7 @@ impl RawCellsCache { } } - match db.cells.get(key.as_slice()) { + match db.cells().get(key.as_slice()) { Ok(value) => { if let Some(value) = value && let (rc, Some(value)) = refcount::decode_value_with_rc(&value) @@ -1560,7 +1826,7 @@ impl RawCellsCache { .ok_or(CellStorageError::InvalidCell); } - Err(CellStorageError::CellNotFound) + Err(CellStorageError::CellNotFound(*key)) } Err(e) => Err(CellStorageError::Internal(e)), } diff --git a/core/src/storage/shard_state/mod.rs b/core/src/storage/shard_state/mod.rs index c39f3d058c..0eaa9b04d5 100644 --- a/core/src/storage/shard_state/mod.rs +++ b/core/src/storage/shard_state/mod.rs @@ -1,15 +1,20 @@ use std::fs::File; +use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::time::Instant; use anyhow::{Context, Result}; +use bytes::Buf; use bytesize::ByteSize; +use futures_util::StreamExt; +use futures_util::stream::FuturesUnordered; +use tokio::sync::OwnedMutexGuard; use tycho_block_util::block::*; -use tycho_block_util::dict::split_aug_dict_raw; +use tycho_block_util::dict::{split_aug_dict_raw_by_shards, split_dict_raw}; use tycho_block_util::state::*; use tycho_storage::fs::TempFileStorage; -use tycho_storage::kv::StoredValue; +use tycho_storage::kv::{StoredValue, StoredValueBuffer}; use tycho_types::models::*; use tycho_types::prelude::*; use tycho_util::mem::Reclaimer; @@ -19,14 +24,232 @@ use weedb::rocksdb; use self::cell_storage::*; use self::store_state_raw::StoreStateContext; -use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb}; +use super::{BlockFlags, BlockHandle, BlockHandleStorage, BlockStorage, CellsDb, CellsPartDb}; +use crate::storage::db::{CellStorageDb, CellsDbOps}; mod cell_storage; mod entries_buffer; mod store_state_raw; +pub type StoragePartsMap = FastHashMap>; + +#[derive(Debug, Clone)] +pub struct SplitAccountEntry { + pub shard: Option, + pub cell: Cell, +} + +pub trait ShardStateStoragePart: Send + Sync { + fn shard_prefix(&self) -> ShardPrefix; + fn cell_storage(&self) -> &Arc; + fn store_accounts_subtree( + self: Arc, + block_id: &BlockId, + cell: Cell, + estimated_cell_count: usize, + split_depth: u8, + ) -> Pin> + Send>>; + fn blocking_store_accounts_subtree( + self: Arc, + block_id: &BlockId, + cell: Cell, + estimated_cell_count: usize, + split_depth: u8, + ) -> Result; + fn remove_outdated_states_in_partition( + self: Arc, + top_blocks: TopBlocks, + remaining_split_depth: u8, + ) -> Pin> + Send>>; +} + +pub struct ShardStateStoragePartImpl { + shard_prefix: ShardPrefix, + cells_db: CellsPartDb, + cell_storage: Arc, + gc_lock: Arc>, +} + +impl ShardStateStoragePart for ShardStateStoragePartImpl { + fn shard_prefix(&self) -> ShardPrefix { + self.shard_prefix + } + + fn cell_storage(&self) -> &Arc { + &self.cell_storage + } + + fn blocking_store_accounts_subtree( + self: Arc, + block_id: &BlockId, + cell: Cell, + estimated_cell_count: usize, + split_depth: u8, + ) -> Result { + self.store_accounts_subtree_impl(block_id, cell, estimated_cell_count, split_depth) + } + + fn store_accounts_subtree( + self: Arc, + block_id: &BlockId, + cell: Cell, + estimated_cell_count: usize, + split_depth: u8, + ) -> Pin> + Send>> { + let block_id = *block_id; + let fut = async move { + tokio::task::spawn_blocking(move || { + self.store_accounts_subtree_impl(&block_id, cell, estimated_cell_count, split_depth) + }) + .await? + }; + Box::pin(fut) + } + + fn remove_outdated_states_in_partition( + self: Arc, + top_blocks: TopBlocks, + remaining_split_depth: u8, + ) -> Pin> + Send>> { + let fut = self.remove_outdated_states_impl(top_blocks, remaining_split_depth); + Box::pin(fut) + } +} + +impl ShardStateStoragePartImpl { + pub fn new( + shard_prefix: ShardPrefix, + cells_db: CellsPartDb, + cache_size_bytes: ByteSize, + drop_interval: u32, + ) -> Self { + let cell_storage = CellStorage::new_for_shard( + cells_db.clone(), + cache_size_bytes, + drop_interval, + shard_prefix, + ); + Self { + shard_prefix, + cells_db, + cell_storage, + gc_lock: Default::default(), + } + } + + fn store_accounts_subtree_impl( + self: Arc, + block_id: &BlockId, + cell: Cell, + estimated_cell_count: usize, + split_depth: u8, + ) -> Result { + // check if subtree already stored + let check_if_subtree_already_stored = || { + let mut already_stored = false; + if let Some(value) = self.cells_db.shard_states.get(block_id.to_vec())? { + let entry = ShardStateEntry::from_slice(value.as_ref()); + already_stored = *cell.repr_hash() == entry.root_hash; + } + Ok::<_, anyhow::Error>(already_stored) + }; + + if check_if_subtree_already_stored()? { + return Ok(0); + } + + let _guard = { + let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high"); + self.gc_lock.clone().blocking_lock_owned() + }; + + // Double check if subtree already stored + if check_if_subtree_already_stored()? { + return Ok(0); + } + + let raw_db = self.cells_db.rocksdb().clone(); + let cf = self.cells_db.shard_states.get_unbounded_cf(); + let cell_storage = self.cell_storage.clone(); + + let estimated_update_size_bytes = estimated_cell_count * 192; // p50 cell size in bytes + let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes); + + let split_at = split_accounts_subtree(cell.clone(), split_depth)?; + + let new_cell_count = cell_storage.store_cell_mt( + cell.as_ref(), + &mut batch, + split_at, + false, + estimated_cell_count, + )?; + + let entry = ShardStateEntry { + root_hash: *cell.repr_hash(), + partitions: None, + }; + batch.put_cf(&cf.bound(), block_id.to_vec(), entry.to_vec()); + + raw_db.write(batch)?; + + tracing::debug!( + shard = %DisplayShardPrefix(&self.shard_prefix), + %block_id, + subtree_root_hash = %cell.repr_hash(), + new_cell_count, + "stored subtree in storage part" + ); + + Reclaimer::instance().drop(cell); + + Ok(new_cell_count) + } + + async fn remove_outdated_states_impl( + self: Arc, + top_blocks: TopBlocks, + remaining_split_depth: u8, + ) -> Result { + let cells_db = CellStorageDb::Part(self.cells_db.clone()); + + let mut cx = RemoveOutdatedStatesContext::new( + true, + cells_db.clone(), + self.cell_storage.clone(), + remaining_split_depth, + self.gc_lock.clone(), + ); + + cx.remove_outdated_states(&top_blocks, |removed_cells, block_id| { + tracing::debug!( + removed_cells, + %block_id, + part_shard = %DisplayShardPrefix(&self.shard_prefix), + "removed state in partition", + ); + }) + .await?; + + metrics::counter!("tycho_storage_state_gc_cells_count").increment(cx.removed_cells as u64); + + Ok(cx.removed_cells) + } +} + +pub struct ShardStateStorageContext { + pub cells_db: CellsDb, + pub block_handle_storage: Arc, + pub block_storage: Arc, + pub temp_file_storage: TempFileStorage, + pub cache_size_bytes: ByteSize, + pub drop_interval: u32, + pub part_split_depth: u8, + pub storage_parts: Arc, +} + pub struct ShardStateStorage { - cells_db: CellsDb, + cells_db: CellStorageDb, block_handle_storage: Arc, block_storage: Arc, @@ -39,22 +262,42 @@ pub struct ShardStateStorage { max_new_sc_cell_count: AtomicUsize, accounts_split_depth: u8, + + /// The target split depth of shard accounts cells on partitions. + /// E.g. `3` -> `2^2=8` partitions + part_split_depth: u8, + storage_parts: Option>, } impl ShardStateStorage { - // TODO: Replace args with a config. - pub fn new( - cells_db: CellsDb, - block_handle_storage: Arc, - block_storage: Arc, - temp_file_storage: TempFileStorage, - cache_size_bytes: ByteSize, - drop_interval: u32, - ) -> Result> { - let cell_storage = CellStorage::new(cells_db.clone(), cache_size_bytes, drop_interval); + pub fn new(cx: ShardStateStorageContext) -> Result> { + let ShardStateStorageContext { + cells_db, + block_handle_storage, + block_storage, + temp_file_storage, + cache_size_bytes, + drop_interval, + part_split_depth, + storage_parts, + } = cx; + + let expected_parts_count = if part_split_depth == 0 { + 0 + } else { + 1usize << part_split_depth + }; + assert_eq!(storage_parts.len(), expected_parts_count); + + let cell_storage = CellStorage::new( + cells_db.clone(), + cache_size_bytes, + drop_interval, + Some(storage_parts.clone()), + ); Ok(Arc::new(Self { - cells_db, + cells_db: CellStorageDb::Main(cells_db), block_handle_storage, block_storage, temp_file_storage, @@ -64,9 +307,15 @@ impl ShardStateStorage { max_new_mc_cell_count: AtomicUsize::new(0), max_new_sc_cell_count: AtomicUsize::new(0), accounts_split_depth: 4, + part_split_depth, + storage_parts: Some(storage_parts), })) } + fn uses_partitions(&self) -> bool { + self.part_split_depth > 0 + } + pub fn metrics(&self) -> ShardStateStorageMetrics { ShardStateStorageMetrics { max_new_mc_cell_count: self.max_new_mc_cell_count.swap(0, Ordering::AcqRel), @@ -101,12 +350,14 @@ impl ShardStateStorage { .await } + #[tracing::instrument(skip_all, fields(block_id = %handle.id().as_short_id()))] pub async fn store_state_root( &self, handle: &BlockHandle, root_cell: Cell, hint: StoreStateHint, ) -> Result { + // check if the main state and parts are already stored if handle.has_state() { return Ok(false); } @@ -115,76 +366,214 @@ impl ShardStateStorage { let _hist = HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high"); self.gc_lock.clone().lock_owned().await }; + let mut gc_lock = Some(gc_lock); - // Double check if the state is already stored + // Double check if the main state and parts are already stored if handle.has_state() { return Ok(false); } + let _hist = HistogramGuard::begin("tycho_storage_state_store_time"); let block_id = *handle.id(); - let raw_db = self.cells_db.rocksdb().clone(); - let cf = self.cells_db.shard_states.get_unbounded_cf(); - let cell_storage = self.cell_storage.clone(); - let block_handle_storage = self.block_handle_storage.clone(); - let handle = handle.clone(); - let accounts_split_depth = self.accounts_split_depth; - // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task. - let (new_cell_count, updated) = tokio::task::spawn_blocking(move || { - let root_hash = *root_cell.repr_hash(); - let estimated_merkle_update_size = hint.estimate_cell_count(); + let estimated_merkle_update_size = hint.estimate_cell_count(); - let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes - let mut batch = rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes); + // calculate split depth to store accounts in partitions + // or to store in main in parallel + let (accounts_split_depth, remaining_split_depth) = + calc_split_depth(self.accounts_split_depth, self.part_split_depth); - let in_mem_store = HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high"); + // NOTE: we split shard state only for shard blocks + let split_at = if !block_id.is_masterchain() { + split_shard_accounts(&block_id.shard, &root_cell, accounts_split_depth)? + } else { + FastHashMap::default() + }; - let new_cell_count = if block_id.is_masterchain() { - cell_storage.store_cell( - &mut batch, - root_cell.as_ref(), - estimated_merkle_update_size, - )? + // compute partitions info for ShardStateEntry + let partitions_info = + if !block_id.is_masterchain() && self.uses_partitions() && !split_at.is_empty() { + let mut partitions = Vec::new(); + for (hash, entry) in &split_at { + if let Some(shard) = &entry.shard { + partitions.push(ShardStatePartition { + hash: *hash, + prefix: shard.prefix(), + }); + } + } + partitions.sort_unstable(); + Some(partitions) } else { - let split_at = split_shard_accounts(&root_cell, accounts_split_depth)?; - - cell_storage.store_cell_mt( - root_cell.as_ref(), - &mut batch, - split_at, - estimated_merkle_update_size, - )? + None }; - in_mem_store.finish(); - metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64); + // run store tasks in partitions + let mut part_store_tasks = FuturesUnordered::new(); + if !handle.has_state_parts() && !block_id.is_masterchain() { + tracing::debug!(accounts_split_depth, remaining_split_depth, ?split_at); + + if let Some(storage_parts) = &self.storage_parts { + // estimate partition cells count + let denom = 1 << (self.part_split_depth.saturating_add(1)); + let part_estimated_cells_count = + (estimated_merkle_update_size / denom.max(1)).max(1); + + for v in split_at.values() { + if let Some(shard) = &v.shard + && let Some(storage_part) = storage_parts.get(&shard.prefix()) + { + let storage_part = storage_part.clone(); + part_store_tasks.push(tokio::spawn(storage_part.store_accounts_subtree( + &block_id, + v.cell.clone(), + part_estimated_cells_count, + remaining_split_depth, + ))); + } + } + } + } - batch.put_cf(&cf.bound(), block_id.to_vec(), root_hash.as_slice()); + // store main state in a separate task + let mut new_cell_count = 0; + let mut updated = false; + if !handle.has_state_main() { + let root_hash = *root_cell.repr_hash(); - let hist = HistogramGuard::begin("tycho_storage_state_update_time_high"); - metrics::histogram!("tycho_storage_state_update_size_bytes") - .record(batch.size_in_bytes() as f64); - metrics::histogram!("tycho_storage_state_update_size_predicted_bytes") - .record(estimated_update_size_bytes as f64); + let block_handle_storage = self.block_handle_storage.clone(); + let handle = handle.clone(); - raw_db.write(batch)?; + let update_block_handle = move || { + // update block handle flags that main state stored + let updated = handle.meta().add_flags(BlockFlags::HAS_STATE_MAIN); + if updated { + block_handle_storage.store_handle(&handle, false); + } + updated + }; - Reclaimer::instance().drop(root_cell); + // check if state already stored + let mut already_stored = false; + if let Some(value) = self.cells_db.shard_states().get(block_id.to_vec())? { + let entry = ShardStateEntry::from_slice(value.as_ref()); + if entry.root_hash == root_hash && entry.partitions == partitions_info { + already_stored = true; + } + } - hist.finish(); + if already_stored { + // update block handle flags that main state stored + updated = update_block_handle(); - let updated = handle.meta().add_flags(BlockFlags::HAS_STATE); - if updated { - block_handle_storage.store_handle(&handle, false); + // NOTE: Ensure that GC lock is dropped only after storing the state. + drop(gc_lock.take()); + } else { + // NOTE: `spawn_blocking` is used here instead of `rayon_run` as it is IO-bound task. + let raw_db = self.cells_db.rocksdb().clone(); + let cf = self.cells_db.shard_states().get_unbounded_cf(); + let cell_storage = self.cell_storage.clone(); + let uses_partitions = self.uses_partitions(); + let gc_lock = gc_lock.take(); + (new_cell_count, updated) = tokio::task::spawn_blocking(move || { + let estimated_merkle_update_size = hint.estimate_cell_count(); + let estimated_update_size_bytes = estimated_merkle_update_size * 192; // p50 cell size in bytes + let mut batch = + rocksdb::WriteBatch::with_capacity_bytes(estimated_update_size_bytes); + + let in_mem_store = + HistogramGuard::begin("tycho_storage_cell_in_mem_store_time_high"); + + let new_cell_count = if block_id.is_masterchain() { + cell_storage.store_cell( + &mut batch, + root_cell.as_ref(), + estimated_merkle_update_size, + )? + } else { + cell_storage.store_cell_mt( + root_cell.as_ref(), + &mut batch, + split_at, + uses_partitions, + estimated_merkle_update_size, + )? + }; + + in_mem_store.finish(); + + let entry = ShardStateEntry { + root_hash, + partitions: partitions_info, + }; + batch.put_cf(&cf.bound(), block_id.to_vec(), entry.to_vec()); + + let hist = HistogramGuard::begin("tycho_storage_state_update_time_high"); + metrics::histogram!("tycho_storage_state_update_size_bytes") + .record(batch.size_in_bytes() as f64); + metrics::histogram!("tycho_storage_state_update_size_predicted_bytes") + .record(estimated_update_size_bytes as f64); + + raw_db.write(batch)?; + + Reclaimer::instance().drop(root_cell); + + hist.finish(); + + // update block handle flags that main state stored + let updated = update_block_handle(); + + // NOTE: Ensure that GC lock is dropped only after storing the state. + drop(gc_lock); + + Ok::<_, anyhow::Error>((new_cell_count, updated)) + }) + .await??; } + } - // NOTE: Ensure that GC lock is dropped only after storing the state. - drop(gc_lock); + // wait for all store tasks in partitions + let mut parts_store_failed = false; + while let Some(store_res) = part_store_tasks.next().await { + match store_res { + Ok(Ok(part_new_cells)) => { + new_cell_count += part_new_cells; + } + Ok(Err(store_error)) => { + parts_store_failed = true; + tracing::error!(?store_error, "error in store_accounts_subtree method"); + } + Err(join_error) => { + parts_store_failed = true; + tracing::error!(?join_error, "error executing store_accounts_subtree task"); + } + } + } - Ok::<_, anyhow::Error>((new_cell_count, updated)) - }) - .await??; + // update block handle flags that state parts stored + if !parts_store_failed { + let updated_state_parts = handle.meta().add_flags(BlockFlags::HAS_STATE_PARTS); + if updated_state_parts { + updated = true; + + // TODO: maybe this gc lock is excessive? + + // acquire new lock if it was dropped before + if gc_lock.is_none() { + let lock = { + let _hist = + HistogramGuard::begin("tycho_storage_cell_gc_lock_store_time_high"); + self.gc_lock.clone().lock_owned().await + }; + gc_lock = Some(lock); + } + self.block_handle_storage.store_handle(handle, false); + drop(gc_lock); + } + } + + metrics::histogram!("tycho_storage_cell_count").record(new_cell_count as f64); let count = if block_id.shard.is_masterchain() { &self.max_new_mc_cell_count @@ -222,6 +611,7 @@ impl ShardStateStorage { // knowing its `min_ref_mc_seqno` which can only be found out by // parsing the state. Creating a "Brief State" struct won't work either // because due to model complexity it is going to be error-prone. + #[tracing::instrument(skip_all, fields(block_id = %block_id.as_short_id()))] pub async fn load_state( &self, ref_by_mc_seqno: u32, @@ -230,25 +620,45 @@ impl ShardStateStorage { // NOTE: only for metrics. static MAX_KNOWN_EPOCH: AtomicU32 = AtomicU32::new(0); - let root_hash = self.load_state_root_hash(block_id)?; + let Some(shard_state) = self.load_state_entry(block_id)? else { + anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id())) + }; + + let root_hash = shard_state.root_hash; let root = self.cell_storage.load_cell(&root_hash, ref_by_mc_seqno)?; - let root = Cell::from(root as Arc<_>); + let mut root = Cell::from(root as Arc<_>); let max_known_epoch = MAX_KNOWN_EPOCH .fetch_max(ref_by_mc_seqno, Ordering::Relaxed) .max(ref_by_mc_seqno); metrics::gauge!("tycho_storage_state_max_epoch").set(max_known_epoch); + if !block_id.shard.is_masterchain() { + init_shard_partitions_router( + &mut root, + &self.cell_storage, + ref_by_mc_seqno, + &block_id.shard, + shard_state.partitions, + )?; + } + let shard_state = root.parse::>()?; let handle = self.min_ref_mc_state.insert(&shard_state); ShardStateStuff::from_state_and_root(block_id, shard_state, root, handle) } + pub fn load_state_entry(&self, block_id: &BlockId) -> Result> { + let shard_states = &self.cells_db.shard_states(); + let value = shard_states.get(block_id.to_vec())?; + let entry = value.map(|v| ShardStateEntry::from_slice(v.as_ref())); + Ok(entry) + } + pub fn load_state_root_hash(&self, block_id: &BlockId) -> Result { - let shard_states = &self.cells_db.shard_states; - let shard_state = shard_states.get(block_id.to_vec())?; - match shard_state { - Some(root) => Ok(HashBytes::from_slice(&root[..32])), + let entry = self.load_state_entry(block_id)?; + match entry { + Some(entry) => Ok(entry.root_hash), None => { anyhow::bail!(ShardStateStorageError::NotFound(block_id.as_short_id())) } @@ -267,108 +677,72 @@ impl ShardStateStorage { target_block_id = %top_blocks.mc_block, "started states GC", ); + // TODO: add overall metric for main db and partitions let started_at = Instant::now(); - let raw = self.cells_db.rocksdb(); - - // Manually get required column factory and r/w options - let snapshot = raw.snapshot(); - let shard_states_cf = self.cells_db.shard_states.get_unbounded_cf(); - let mut states_read_options = self.cells_db.shard_states.new_read_config(); - states_read_options.set_snapshot(&snapshot); - - let mut alloc = bumpalo_herd::Herd::new(); - - // Create iterator - let mut iter = raw.raw_iterator_cf_opt(&shard_states_cf.bound(), states_read_options); - iter.seek_to_first(); - - // Iterate all states and remove outdated - let mut removed_states = 0usize; - let mut removed_cells = 0usize; - loop { - let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high"); - let (key, value) = match iter.item() { - Some(item) => item, - None => match iter.status() { - Ok(()) => break, - Err(e) => return Err(e.into()), - }, - }; - - let block_id = BlockId::from_slice(key); - let root_hash = HashBytes::from_slice(value); - - // Skip blocks from zero state and top blocks - if block_id.seqno == 0 - || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno) - { - iter.next(); - continue; + // calculate split depth + let (accounts_split_depth, remaining_split_depth) = + calc_split_depth(self.accounts_split_depth, self.part_split_depth); + + // run remove tasks in partitions + let mut part_remove_tasks = FuturesUnordered::new(); + if let Some(storage_parts) = &self.storage_parts { + for (_, storage_part) in storage_parts.iter() { + let storage_part = storage_part.clone(); + part_remove_tasks.push(tokio::spawn( + storage_part.remove_outdated_states_in_partition( + top_blocks.clone(), + remaining_split_depth, + ), + )); } + } - alloc.reset(); - - let guard = { - let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high"); - self.gc_lock.clone().lock_owned().await - }; - - let db = self.cells_db.clone(); - let cell_storage = self.cell_storage.clone(); - let key = key.to_vec(); - let accounts_split_depth = self.accounts_split_depth; - let (total, inner_alloc) = tokio::task::spawn_blocking(move || { - let in_mem_remove = - HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high"); - - let (stats, mut batch) = if block_id.is_masterchain() { - cell_storage.remove_cell(alloc.get().as_bump(), &root_hash)? - } else { - // NOTE: We use epoch `0` here so that cells of old states - // will not be used by recent loads. - let root_cell = Cell::from(cell_storage.load_cell(&root_hash, 0)? as Arc<_>); - - let split_at = split_shard_accounts(&root_cell, accounts_split_depth)? - .into_keys() - .collect::>(); - cell_storage.remove_cell_mt(&alloc, &root_hash, split_at)? - }; - - in_mem_remove.finish(); - - batch.delete_cf(&db.shard_states.get_unbounded_cf().bound(), key); - db.raw() - .rocksdb() - .write_opt(batch, db.cells.write_config())?; - - // NOTE: Ensure that guard is dropped only after writing the batch. - drop(guard); - - Ok::<_, anyhow::Error>((stats, alloc)) - }) - .await??; - - removed_cells += total; - alloc = inner_alloc; // Reuse allocation without passing alloc by ref - - tracing::debug!(removed_cells = total, %block_id); - - removed_states += 1; - iter.next(); + // remove state data in main db + let mut cx = RemoveOutdatedStatesContext::new( + false, + self.cells_db.clone(), + self.cell_storage.clone(), + accounts_split_depth, + self.gc_lock.clone(), + ); - metrics::counter!("tycho_storage_state_gc_count").increment(1); - metrics::counter!("tycho_storage_state_gc_cells_count").increment(1); + cx.remove_outdated_states(&top_blocks, |removed_cells, block_id| { + tracing::debug!(removed_cells, %block_id, "removed state"); if block_id.is_masterchain() { metrics::gauge!("tycho_gc_states_seqno").set(block_id.seqno as f64); } - tracing::debug!(removed_states, removed_cells, %block_id, "removed state"); + }) + .await?; + + metrics::counter!("tycho_storage_state_gc_count").increment(cx.removed_states as u64); + metrics::counter!("tycho_storage_state_gc_cells_count").increment(cx.removed_cells as u64); + + // wait for all remove tasks in partitions + while let Some(remove_res) = part_remove_tasks.next().await { + match remove_res { + Ok(Ok(removed_cells)) => { + cx.removed_cells += removed_cells; + } + Ok(Err(remove_error)) => { + tracing::error!( + ?remove_error, + "error in remove_outdated_states_in_partition method" + ); + } + Err(join_error) => { + tracing::error!( + ?join_error, + "error executing remove_outdated_states_in_partition task" + ); + } + } } // Done tracing::info!( - removed_states, - removed_cells, + removed_states = cx.removed_states, + removed_cells = cx.removed_cells, block_id = %top_blocks.mc_block, elapsed_sec = started_at.elapsed().as_secs_f64(), "finished states GC", @@ -379,7 +753,7 @@ impl ShardStateStorage { /// Searches for an edge with the least referenced masterchain block /// /// Returns `None` if all states are recent enough - pub async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result> { + async fn compute_recent_blocks(&self, mut mc_seqno: u32) -> Result> { // 0. Adjust masterchain seqno with minimal referenced masterchain state if let Some(min_ref_mc_seqno) = self.min_ref_mc_state.seqno() && min_ref_mc_seqno < mc_seqno @@ -441,7 +815,7 @@ impl ShardStateStorage { mc_seqno: u32, snapshot: &rocksdb::Snapshot<'_>, ) -> Result> { - let shard_states = &self.cells_db.shard_states; + let shard_states = self.cells_db.shard_states(); let mut bound = BlockId { shard: ShardIdent::MASTERCHAIN, @@ -466,6 +840,203 @@ impl ShardStateStorage { } } +#[derive(Clone)] +struct RemoveOutdatedStatesContext { + in_partition: bool, + + cells_db: CellStorageDb, + cell_storage: Arc, + + split_depth: u8, + + gc_lock: Arc>, + + // alloc: bumpalo_herd::Herd, + removed_cells: usize, + removed_states: usize, +} + +impl RemoveOutdatedStatesContext { + fn new( + in_partition: bool, + cells_db: CellStorageDb, + cell_storage: Arc, + split_depth: u8, + gc_lock: Arc>, + ) -> Self { + Self { + in_partition, + cells_db, + cell_storage, + split_depth, + gc_lock, + removed_cells: 0, + removed_states: 0, + } + } + + async fn acquire_gc_lock(&self) -> OwnedMutexGuard<()> { + let _h = HistogramGuard::begin("tycho_storage_cell_gc_lock_remove_time_high"); + self.gc_lock.clone().lock_owned().await + } + + fn create_states_iterator_to_top_blocks( + cells_db: &CellStorageDb, + top_blocks: &TopBlocks, + ) -> impl Iterator> { + let raw = cells_db.rocksdb(); + + // Manually get required column factory and r/w options + let snapshot = raw.snapshot(); + let shard_states_cf = cells_db.shard_states().get_unbounded_cf(); + let mut states_read_options = cells_db.shard_states().new_read_config(); + states_read_options.set_snapshot(&snapshot); + + // Create iterator + let iter = raw.iterator_cf_opt( + &shard_states_cf.bound(), + states_read_options, + rocksdb::IteratorMode::Start, + ); + + // skip zerostae and states equal or above top blocks + iter.filter_map(|item| match item { + Ok((key, value)) => { + let block_id = BlockId::from_slice(&key); + if block_id.seqno == 0 + || top_blocks.contains_shard_seqno(&block_id.shard, block_id.seqno) + { + None + } else { + Some(Ok((block_id, ShardStateEntry::from_slice(value.as_ref())))) + } + } + Err(err) => Some(Err(err)), + }) + } + + fn blocking_remove_state( + &mut self, + block_id: BlockId, + entry: ShardStateEntry, + guard: OwnedMutexGuard<()>, + mut alloc: bumpalo_herd::Herd, + ) -> Result { + alloc.reset(); + + let in_mem_remove = HistogramGuard::begin("tycho_storage_cell_in_mem_remove_time_high"); + + let (removed_cells, mut batch) = if block_id.is_masterchain() { + self.cell_storage + .remove_cell(alloc.get().as_bump(), &entry.root_hash)? + } else { + // NOTE: We use epoch `0` here so that cells of old states + // will not be used by recent loads. + let root_cell = Cell::from(self.cell_storage.load_cell(&entry.root_hash, 0)? as Arc<_>); + + // try to split state + let (split_at, split_by_partitions) = if self.in_partition { + // in partition we split subtree for parallel processing + let split_at = split_accounts_subtree(root_cell, self.split_depth)?; + let split_at: FastHashSet<_> = split_at.into_keys().collect(); + (split_at, false) + } else if let Some(partitions_info) = entry.partitions + && !partitions_info.is_empty() + { + // in main tree we use stored split when exists + let split_at: FastHashSet<_> = + partitions_info.into_iter().map(|p| p.hash).collect(); + (split_at, true) + } else { + // otherwise in main tree we split for parallel processing + let split_at = split_shard_accounts(&block_id.shard, root_cell, self.split_depth)?; + let split_at: FastHashSet<_> = split_at.into_keys().collect(); + (split_at, false) + }; + + self.cell_storage.remove_cell_mt( + &alloc, + &entry.root_hash, + split_at, + split_by_partitions, + )? + }; + + in_mem_remove.finish(); + + let db = &self.cells_db; + batch.delete_cf( + &db.shard_states().get_unbounded_cf().bound(), + block_id.to_vec(), + ); + db.rocksdb().write_opt(batch, db.cells().write_config())?; + + // NOTE: Ensure that guard is dropped only after writing the batch. + drop(guard); + + self.removed_cells += removed_cells; + self.removed_states += 1; + + Ok(RemoveStateResult { + removed_cells, + inner_alloc: alloc, + }) + } + + async fn remove_state( + &mut self, + block_id: BlockId, + entry: ShardStateEntry, + guard: OwnedMutexGuard<()>, + alloc: bumpalo_herd::Herd, + ) -> Result { + let mut this = self.clone(); + let (this, res) = tokio::task::spawn_blocking(move || { + let res = this.blocking_remove_state(block_id, entry, guard, alloc)?; + Ok::<_, anyhow::Error>((this, res)) + }) + .await??; + *self = this; + Ok(res) + } + + async fn remove_outdated_states( + &mut self, + top_blocks: &TopBlocks, + handle_state_removed: F, + ) -> Result<()> + where + F: Fn(usize, BlockId), + { + let cells_db = self.cells_db.clone(); + let iter = RemoveOutdatedStatesContext::create_states_iterator_to_top_blocks( + &cells_db, top_blocks, + ); + + let mut alloc = bumpalo_herd::Herd::new(); + + for item in iter { + let _hist = HistogramGuard::begin("tycho_storage_state_gc_time_high"); + let (block_id, entry) = item?; + let guard = self.acquire_gc_lock().await; + let RemoveStateResult { + removed_cells, + inner_alloc, + } = self.remove_state(block_id, entry, guard, alloc).await?; + alloc = inner_alloc; + + handle_state_removed(removed_cells, block_id); + } + + Ok(()) + } +} + +struct RemoveStateResult { + removed_cells: usize, + inner_alloc: bumpalo_herd::Herd, +} + #[derive(Default, Debug, Clone, Copy)] pub struct StoreStateHint { pub block_data_size: Option, @@ -483,6 +1054,93 @@ impl StoreStateHint { } } +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +pub struct ShardStatePartition { + pub hash: HashBytes, + pub prefix: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ShardStateEntry { + pub root_hash: HashBytes, + pub partitions: Option>, +} + +impl StoredValue for ShardStateEntry { + /// 32 bytes root hash, + /// 2 bytes partitions map size + /// ( + /// 32 bytes partition root cell hash + + /// 8 bytes shard ident prefix + /// ) * 16 max partitions + /// = 640 bytes partitions map + const SIZE_HINT: usize = 32 + 2 + (32 + 8) * 16; + + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + buffer.write_raw_slice(self.root_hash.as_slice()); + + if let Some(partitions) = self.partitions.as_ref() { + if partitions.is_empty() { + return; + } + + let count = partitions.len() as u16; + assert!(count <= 16, "too many shard state partitions"); + buffer.write_raw_slice(&count.to_be_bytes()); + + for partition in partitions { + buffer.write_raw_slice(partition.hash.as_slice()); + buffer.write_raw_slice(&partition.prefix.to_be_bytes()); + } + } + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + debug_assert!(reader.remaining() >= 32); + + let root_hash = HashBytes::from_slice(&reader[..32]); + *reader = &reader[32..]; + + if reader.is_empty() { + return Self { + root_hash, + partitions: None, + }; + } + + debug_assert!(reader.remaining() >= 2); + + let count = reader.get_u16(); + if count == 0 { + return Self { + root_hash, + partitions: None, + }; + } + + let mut partitions = Vec::with_capacity(count as usize); + for _ in 0..count { + debug_assert!(reader.remaining() >= 32 + 8); + + let hash = HashBytes::from_slice(&reader[..32]); + *reader = &reader[32..]; + + let prefix = reader.get_u64(); + partitions.push(ShardStatePartition { hash, prefix }); + } + + Self { + root_hash, + partitions: Some(partitions), + } + } +} + #[derive(Debug, Copy, Clone)] pub struct ShardStateStorageMetrics { pub max_new_mc_cell_count: usize, @@ -500,10 +1158,44 @@ pub enum ShardStateStorageError { }, } +/// Calculates `(target_split_depth, remaining_split_depth)`: +/// * `target_split_depth` - target accounts split depth. Returns `part_split_depth` when `part_split_depth > 0` (partitions used). +/// * `remaining_split_depth` - remaining accounts split depth inside partitions when partitions used. +/// When we split accounts on partitions we can split accounts inside partition more and store in parallel. +fn calc_split_depth(accounts_split_depth: u8, part_split_depth: u8) -> (u8, u8) { + let target_split_depth = if part_split_depth > 0 { + part_split_depth + } else { + accounts_split_depth + }; + let mut remaining_split_depth = accounts_split_depth.saturating_sub(target_split_depth); + if accounts_split_depth >= part_split_depth { + remaining_split_depth += 1; + } + (target_split_depth, remaining_split_depth) +} + +fn split_accounts_subtree( + subtree_root_cell: Cell, + split_depth: u8, +) -> Result> { + let shards = split_dict_raw(Some(subtree_root_cell), HashBytes::BITS, split_depth) + .context("failed to split shard accounts subtree")?; + + let mut result = FastHashMap::default(); + + for (hash, cell) in shards { + result.insert(hash, SplitAccountEntry { shard: None, cell }); + } + + Ok(result) +} + fn split_shard_accounts( + state_shard: &ShardIdent, root_cell: impl AsRef, split_depth: u8, -) -> Result> { +) -> Result> { // Cell#0 - processed_upto // Cell#1 - accounts let shard_accounts = root_cell @@ -513,5 +1205,56 @@ fn split_shard_accounts( .parse::() .context("failed to load shard accounts")?; - split_aug_dict_raw(shard_accounts, split_depth).context("failed to split shard accounts") + let shards = split_aug_dict_raw_by_shards(state_shard.workchain(), shard_accounts, split_depth) + .context("failed to split shard accounts")?; + + let mut result = FastHashMap::default(); + + for (shard, dict) in shards { + if let Some(cell) = dict { + result.insert(*cell.repr_hash(), SplitAccountEntry { + shard: Some(shard), + cell, + }); + } + } + + Ok(result) +} + +#[tracing::instrument(skip_all)] +fn init_shard_partitions_router( + root_cell: &mut Cell, + cell_storage: &Arc, + ref_by_mc_seqno: u32, + state_shard: &ShardIdent, + partitions_info: Option>, +) -> Result<()> { + // get partitions info + let partitions_info = match partitions_info { + Some(p) if !p.is_empty() => p, + _ => return Ok(()), + }; + + // build router map + let split_at: FastHashMap<_, _> = partitions_info + .into_iter() + .map(|p| (p.hash, p.prefix)) + .collect(); + + tracing::debug!(%state_shard, ?split_at); + + // reload state root cell with shard router + let cell = cell_storage.load_cell_ext( + root_cell.repr_hash(), + ref_by_mc_seqno, + Some(CellShardRouter::ChildIsShardAccountsRoot( + 1, + Arc::new(split_at), + )), + )?; + + *root_cell = Cell::from(cell as Arc<_>); + + Ok(()) } diff --git a/core/src/storage/shard_state/store_state_raw.rs b/core/src/storage/shard_state/store_state_raw.rs index e2c7a254d5..69d7399b62 100644 --- a/core/src/storage/shard_state/store_state_raw.rs +++ b/core/src/storage/shard_state/store_state_raw.rs @@ -13,14 +13,16 @@ use tycho_util::io::ByteOrderRead; use tycho_util::progress_bar::*; use weedb::{BoundedCfHandle, rocksdb}; +use super::ShardStateEntry; use super::cell_storage::*; use super::entries_buffer::*; -use crate::storage::{BriefBocHeader, CellsDb, ShardStateReader}; +use crate::storage::db::{CellStorageDb, CellsDbOps}; +use crate::storage::{BriefBocHeader, ShardStateReader}; pub const MAX_DEPTH: u16 = u16::MAX - 1; pub struct StoreStateContext { - pub cells_db: CellsDb, + pub cells_db: CellStorageDb, pub cell_storage: Arc, pub temp_file_storage: TempFileStorage, } @@ -114,9 +116,9 @@ impl StoreStateContext { .open_as_mapped_mut()?; let raw = self.cells_db.rocksdb().as_ref(); - let write_options = self.cells_db.temp_cells.write_config(); + let write_options = self.cells_db.temp_cells().write_config(); - let mut ctx = FinalizationContext::new(&self.cells_db); + let mut ctx = FinalizationContext::new(self.cells_db.temp_cells().cf()); ctx.clear_temp_cells(&self.cells_db)?; // Allocate on heap to prevent big future size @@ -211,15 +213,19 @@ impl StoreStateContext { ctx.clear_temp_cells(&self.cells_db)?; let shard_state_key = block_id.to_vec(); + let entry = ShardStateEntry { + root_hash: HashBytes(*root_hash), + partitions: None, + }; self.cells_db - .shard_states - .insert(&shard_state_key, root_hash)?; + .shard_states() + .insert(&shard_state_key, entry.to_vec())?; pg.complete(); // Load stored shard state - match self.cells_db.shard_states.get(shard_state_key)? { - Some(root) => Ok(HashBytes::from_slice(&root[..32])), + match self.cells_db.shard_states().get(shard_state_key)? { + Some(value) => Ok(ShardStateEntry::from_slice(value.as_ref()).root_hash), None => Err(StoreStateError::NotFound.into()), } } @@ -235,18 +241,18 @@ struct FinalizationContext<'a> { } impl<'a> FinalizationContext<'a> { - fn new(db: &'a CellsDb) -> Self { + fn new(temp_cells_cf: BoundedCfHandle<'a>) -> Self { Self { pruned_branches: Default::default(), cell_usages: FastHashMap::with_capacity_and_hasher(128, Default::default()), entries_buffer: EntriesBuffer::new(), output_buffer: Vec::with_capacity(1 << 10), - temp_cells_cf: db.temp_cells.cf(), + temp_cells_cf, write_batch: rocksdb::WriteBatch::default(), } } - fn clear_temp_cells(&self, db: &CellsDb) -> std::result::Result<(), rocksdb::Error> { + fn clear_temp_cells(&self, db: &CellStorageDb) -> std::result::Result<(), rocksdb::Error> { let from = &[0x00; 32]; let to = &[0xff; 32]; db.rocksdb().delete_range_cf(&self.temp_cells_cf, from, to) @@ -544,7 +550,7 @@ mod test { use weedb::rocksdb::{IteratorMode, WriteBatch}; use super::*; - use crate::storage::{CoreStorage, CoreStorageConfig}; + use crate::storage::{CellsDb, CoreStorage, CoreStorageConfig}; #[tokio::test] #[ignore] @@ -586,7 +592,7 @@ mod test { let cell_storage = &storage.shard_state_storage().cell_storage; let store_ctx = StoreStateContext { - cells_db: cells_db.clone(), + cells_db: CellStorageDb::Main(cells_db.clone()), cell_storage: cell_storage.clone(), temp_file_storage: storage.context().temp_files().clone(), }; @@ -619,7 +625,8 @@ mod test { let (_, value) = state?; // check that state actually exists - let cell = cell_storage.load_cell(&HashBytes::from_slice(value.as_ref()), 0)?; + let entry = ShardStateEntry::from_slice(value.as_ref()); + let cell = cell_storage.load_cell(&entry.root_hash, 0)?; let (_, batch) = cell_storage.remove_cell(&bump, cell.hash(LevelMask::MAX_LEVEL))?; @@ -710,6 +717,7 @@ mod test { new_dict_cell.as_ref(), &mut batch, Default::default(), + false, MODIFY_COUNT * 3, )?; diff --git a/core/src/storage/tables.rs b/core/src/storage/tables.rs index e0e3a4c4d1..5aecd2ebf2 100644 --- a/core/src/storage/tables.rs +++ b/core/src/storage/tables.rs @@ -130,7 +130,7 @@ impl ColumnFamilyOptions for FullBlockIds { /// Maps `BlockId` to root cell hash /// - Key: `BlockId` -/// - Value: `[u8; 32]` +/// - Value: `ShardStateEntry` pub struct ShardStates; impl ColumnFamily for ShardStates { diff --git a/rpc/src/state/mod.rs b/rpc/src/state/mod.rs index 70787c5a3e..21e362fe85 100644 --- a/rpc/src/state/mod.rs +++ b/rpc/src/state/mod.rs @@ -661,6 +661,7 @@ impl Default for LatestBlockchainConfig { } impl Inner { + #[tracing::instrument("rpc_init", skip_all, fields(mc_block_id = %mc_block_id.as_short_id()))] async fn init(self: &Arc, mc_block_id: &BlockId) -> Result<()> { anyhow::ensure!(mc_block_id.is_masterchain(), "not a masterchain state"); diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index f674255418..56d6fdf375 100755 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -1460,12 +1460,12 @@ def collator_execution_metrics() -> RowPanel: ), create_heatmap_panel( "tycho_do_collate_one_tick_account_msgs_exec_mean_time", - "MEAN exec time in group", + "MEAN one account msgs exec time in group", labels=['workchain=~"$workchain"'], ), create_heatmap_panel( - "tycho_do_collate_one_tick_account_msgs_exec_max_time", - "MAX exec time in group", + "tycho_do_collate_one_tick_group_exec_time_high", + "One exec tick group exec time", labels=['workchain=~"$workchain"'], ), create_gauge_panel( @@ -1739,11 +1739,6 @@ def collator_time_metrics() -> RowPanel: "tycho_collator_try_collate_next_shard_block_time", "Try collate next shard block", ), - create_heatmap_panel( - "tycho_collator_import_next_anchor_time_high", - "Import next anchor time", - labels=['workchain=~"$workchain"'], - ), create_gauge_panel( "tycho_collator_anchor_importing_lag_ms", "Anchor importing lag (ms)", @@ -2993,6 +2988,11 @@ def collator_execution_manager() -> RowPanel: create_heatmap_panel( "tycho_collator_execute_ticktock_time", "Execute ticktock time" ), + create_heatmap_panel( + "tycho_collator_get_account_stuff_time_high", + "Time to get account stuff", + labels=['workchain=~"$workchain"'], + ), ] return create_row("collator: Execution Manager", metrics) diff --git a/storage/src/context.rs b/storage/src/context.rs index 6d7e79488e..e0fa938c7c 100644 --- a/storage/src/context.rs +++ b/storage/src/context.rs @@ -187,12 +187,29 @@ impl StorageContext { P: AsRef, T: NamedTables + 'static, { - let subdir = subdir.as_ref(); - tracing::debug!(subdir = %subdir.display(), "opening RocksDB instance"); + self.open_preconfigured_partition(subdir, None) + } + + pub fn open_preconfigured_partition( + &self, + dir_path: P, + partition_id: Option, + ) -> Result> + where + P: AsRef, + T: NamedTables + 'static, + { + let dir_path = dir_path.as_ref(); + tracing::debug!(dir_path = %dir_path.display(), "opening RocksDB instance"); let this = self.inner.as_ref(); - let db_dir = this.root_dir.create_subdir(subdir)?; + let db_dir = if dir_path.is_relative() { + this.root_dir.create_subdir(dir_path)? + } else { + Dir::new(dir_path)? + }; + let db = weedb::WeeDb::::builder_prepared(db_dir.path(), this.rocksdb_table_context.clone()) .with_metrics_enabled(this.config.rocksdb_enable_metrics) @@ -200,7 +217,9 @@ impl StorageContext { .build()?; if let Some(name) = db.db_name() { - self.add_rocksdb_instance(name, db.raw()); + let name_w_part = partition_id.map(|id| format!("{}-{:016x}", name, id)); + let name = name_w_part.unwrap_or_else(|| name.to_owned()); + self.add_rocksdb_instance(&name, db.raw()); } tracing::debug!(current_rocksdb_buffer_usage = ?self.rocksdb_table_context().buffer_usage());