Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ parking_lot = { workspace = true }
rayon = { workspace = true }
thiserror = { workspace = true }
tl-proto = { workspace = true }
tracing = { workspace = true }
tycho-types = { workspace = true, features = ["blake3", "rayon"] }

# local deps
Expand Down
52 changes: 52 additions & 0 deletions block-util/src/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::VecDeque;

use tycho_types::cell::HashBytes;
use tycho_types::models::ShardIdent;

Expand All @@ -14,6 +16,44 @@ mod block_proof_stuff;
mod block_stuff;
mod top_blocks;

pub fn split_shard_ident(workchain: i32, depth: u8) -> Vec<ShardIdent> {
assert!(
depth <= ShardIdent::MAX_SPLIT_DEPTH,
"split depth is too big",
);
struct SplitShardCx {
shard: ShardIdent,
remaining_split_depth: u8,
}
let mut split_queue = VecDeque::new();
split_queue.push_back(SplitShardCx {
shard: ShardIdent::new_full(workchain),
remaining_split_depth: depth,
});
let mut shards = vec![];
while let Some(shard_split_cx) = split_queue.pop_front() {
if shard_split_cx.remaining_split_depth > 0
&& let Some((left, right)) = shard_split_cx.shard.split()
{
let remaining_split_depth = shard_split_cx.remaining_split_depth.saturating_sub(1);
if remaining_split_depth > 0 {
split_queue.push_back(SplitShardCx {
shard: left,
remaining_split_depth,
});
split_queue.push_back(SplitShardCx {
shard: right,
remaining_split_depth,
});
} else {
shards.push(left);
shards.push(right);
}
}
}
shards
}

pub fn shard_ident_at_depth(workchain: i32, account: &HashBytes, depth: u8) -> ShardIdent {
assert!(
depth <= ShardIdent::MAX_SPLIT_DEPTH,
Expand All @@ -31,6 +71,18 @@ pub fn shard_ident_at_depth(workchain: i32, account: &HashBytes, depth: u8) -> S
ShardIdent::new(workchain, (prefix & mask) | tag).expect("computed prefix should be valid")
}

pub struct DisplayShardPrefix<'a>(pub &'a u64);
impl std::fmt::Display for DisplayShardPrefix<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("{:016x}", self.0))
}
}
impl std::fmt::Debug for DisplayShardPrefix<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
80 changes: 79 additions & 1 deletion block-util/src/dict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,14 @@ where
if prefix_len > 0 {
builder.store_uint(shard.prefix() >> (64 - prefix_len), prefix_len)?;
}
tracing::debug!(
depth,
%shard,
hash = ?dict.dict().root().as_ref().map(|c| c.repr_hash()),
left_shard = %left_shard_ident,
right_shard = %right_shard_ident,
"split_aug_dict step",
);
dict.split_by_prefix(&builder.as_data_slice())?
};

Expand Down Expand Up @@ -211,6 +219,15 @@ where
K: DictKey,
A: Default,
{
let (dict, _) = dict.into_parts();
split_dict_raw(dict.into_root(), K::BITS, depth)
}

pub fn split_dict_raw(
dict: Option<Cell>,
key_bit_len: u16,
depth: u8,
) -> Result<FastHashMap<HashBytes, Cell>, Error> {
fn split_dict_impl(
dict: Option<Cell>,
key_bit_len: u16,
Expand Down Expand Up @@ -241,8 +258,69 @@ where
let mut shards =
FastHashMap::with_capacity_and_hasher(2usize.pow(depth as _), Default::default());

split_dict_impl(dict, key_bit_len, depth, &mut shards)?;

Ok(shards)
}

pub fn split_aug_dict_raw_by_shards<K, A, V>(
workchain: i32,
dict: AugDict<K, A, V>,
depth: u8,
) -> Result<Vec<(ShardIdent, Option<Cell>)>, Error>
where
K: DictKey,
A: Default,
{
fn split_dict_impl(
shard: &ShardIdent,
dict: Option<Cell>,
key_bit_len: u16,
depth: u8,
shards: &mut Vec<(ShardIdent, Option<Cell>)>,
) -> Result<(), Error> {
if depth == 0 {
shards.push((*shard, dict));
return Ok(());
}

let Some((left_shard, right_shard)) = shard.split() else {
shards.push((*shard, dict));
return Ok(());
};

let PartialSplitDict {
remaining_bit_len,
left_branch,
right_branch,
} = dict_split_raw(dict.as_ref(), key_bit_len, Cell::empty_context())?;

split_dict_impl(
&left_shard,
left_branch,
remaining_bit_len,
depth - 1,
shards,
)?;
split_dict_impl(
&right_shard,
right_branch,
remaining_bit_len,
depth - 1,
shards,
)
}

let mut shards = Vec::with_capacity(2usize.pow(depth as _));

let (dict_root, _) = dict.into_parts();
split_dict_impl(dict_root.into_root(), K::BITS, depth, &mut shards)?;
split_dict_impl(
&ShardIdent::new_full(workchain),
dict_root.into_root(),
K::BITS,
depth,
&mut shards,
)?;

Ok(shards)
}
Expand Down
2 changes: 2 additions & 0 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl Phase<FinalizeState> {
))
}

#[tracing::instrument(skip_all)]
pub fn finalize_block(
mut self,
ctx: FinalizeBlockContext,
Expand Down Expand Up @@ -1179,6 +1180,7 @@ impl Phase<FinalizeState> {
Ok(min_ref_mc_seqno)
}

#[tracing::instrument(skip_all)]
fn build_accounts(
executor: MessagesExecutor,
config_address: &HashBytes,
Expand Down
23 changes: 13 additions & 10 deletions collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ impl MessagesExecutor {
tracing::trace!(target: tracing_targets::EXEC_MANAGER, "execute messages group");

let labels = &[("workchain", self.shard_id.workchain().to_string())];
let _hist = HistogramGuard::begin_with_labels(
"tycho_do_collate_one_tick_group_exec_time_high",
labels,
);

let mut ext_msgs_skipped = 0;

let group_horizontal_size = msg_group.len();
Expand All @@ -116,7 +121,6 @@ impl MessagesExecutor {
let mut items = Vec::with_capacity(group_messages_count);
let mut ext_msgs_error_count = 0;

let mut max_account_msgs_exec_time = Duration::ZERO;
let mut total_exec_time = Duration::ZERO;

let mut group_gas = 0u128;
Expand Down Expand Up @@ -147,7 +151,6 @@ impl MessagesExecutor {
for executed in result {
self.save_subgroup_result(
&mut ext_msgs_skipped,
&mut max_account_msgs_exec_time,
&mut total_exec_time,
&mut ext_msgs_error_count,
&mut group_max_vert_size,
Expand All @@ -173,7 +176,6 @@ impl MessagesExecutor {
group_horizontal_size, group_max_vert_size, group_accounts_count,
total_exec_time = %format_duration(total_exec_time),
mean_account_msgs_exec_time = %format_duration(mean_account_msgs_exec_time),
max_account_msgs_exec_time = %format_duration(max_account_msgs_exec_time),
group_messages_count, group_gas, group_exec_wu,
"execute_group",
);
Expand All @@ -191,11 +193,6 @@ impl MessagesExecutor {
labels
)
.record(mean_account_msgs_exec_time);
metrics::histogram!(
"tycho_do_collate_one_tick_account_msgs_exec_max_time",
labels
)
.record(max_account_msgs_exec_time);

Ok(ExecutedGroup {
items,
Expand All @@ -206,6 +203,7 @@ impl MessagesExecutor {
}

#[allow(clippy::vec_box)]
#[tracing::instrument(skip_all, fields(%account_id))]
fn execute_subgroup(
account_id: HashBytes,
msgs: Vec<Box<ParsedMessage>>,
Expand All @@ -222,7 +220,6 @@ impl MessagesExecutor {
fn save_subgroup_result(
&mut self,
ext_msgs_skipped: &mut u64,
max_account_msgs_exec_time: &mut Duration,
total_exec_time: &mut Duration,
ext_msgs_error_count: &mut u64,
group_max_vert_size: &mut usize,
Expand All @@ -233,7 +230,6 @@ impl MessagesExecutor {
) -> Result<()> {
*ext_msgs_skipped += executed.ext_msgs_skipped;

*max_account_msgs_exec_time = (*max_account_msgs_exec_time).max(executed.exec_time);
*total_exec_time += executed.exec_time;
*group_max_vert_size = cmp::max(*group_max_vert_size, executed.transactions.len());

Expand Down Expand Up @@ -276,6 +272,7 @@ impl MessagesExecutor {
}

#[allow(clippy::vec_box)]
#[tracing::instrument(skip_all)]
fn execute_messages(
mut account_state: Box<ShardAccountStuff>,
msgs: Vec<Box<ParsedMessage>>,
Expand Down Expand Up @@ -403,7 +400,13 @@ impl AccountsCache {
Ok(None)
}

#[tracing::instrument(skip_all)]
fn get_account_stuff(&self, account_id: &AccountId) -> Result<Box<ShardAccountStuff>> {
let labels = [("workchain", self.workchain_id.to_string())];
let _hist = HistogramGuard::begin_with_labels(
"tycho_collator_get_account_stuff_time_high",
&labels,
);
if let Some(account) = self.items.get(account_id) {
Ok(account.clone())
} else if let Some((_depth, shard_account)) = self.shard_accounts.get(account_id)? {
Expand Down
13 changes: 11 additions & 2 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,15 @@ impl CollatorStdImpl {
// get last store task
let last_task = self.store_new_state_tasks.pop().expect("shouldn't happen");

let is_last = self.store_new_state_tasks.is_empty();

// if it is finished, then we can just reload prev state
if last_task.store_new_state_task.is_finished() {
if last_task.store_new_state_task.is_finished()
// or when unfinished tasks limit is 0
|| self.config.merkle_chain_limit == 0
// or this task is the last in queue
|| is_last
{
last_task.store_new_state_task.await?;

// and reload pure prev state in the working state
Expand All @@ -686,7 +693,9 @@ impl CollatorStdImpl {

// collect all unfinished tasks
if !task.store_new_state_task.is_finished()
// until the limit reached
&& unfinished_tasks.len() < self.config.merkle_chain_limit
// or until we reached the last task
&& !is_last
{
unfinished_tasks.push(task);
Expand Down Expand Up @@ -1016,7 +1025,7 @@ impl CollatorStdImpl {
});

// Keep only the last `merkle_chain_limit` states alive
if self.store_state_refs.len() == self.config.merkle_chain_limit
if self.store_state_refs.len() >= self.config.merkle_chain_limit
&& let Some(old_ref) = self.store_state_refs.pop_front()
{
// NOTE: State update can be quite huge so drop it outside of tokio.
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ tycho-crypto = { workspace = true }
tycho-network = { workspace = true, features = ["test"] }
tycho-storage = { workspace = true, features = ["test"] }
tycho-util = { workspace = true, features = ["test"] }
serde_json = { workspace = true, features = ["preserve_order"] }
similar-asserts = { workspace = true }

[features]
Expand Down
14 changes: 13 additions & 1 deletion core/src/storage/block_handle/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,19 @@ impl BlockHandle {
}

pub fn has_state(&self) -> bool {
self.inner.meta.flags().contains(BlockFlags::HAS_STATE)
let main_state_and_parts = BlockFlags::HAS_STATE_MAIN.union(BlockFlags::HAS_STATE_PARTS);
self.inner.meta.flags().contains(main_state_and_parts)
}

pub fn has_state_main(&self) -> bool {
self.inner.meta.flags().contains(BlockFlags::HAS_STATE_MAIN)
}

pub fn has_state_parts(&self) -> bool {
self.inner
.meta
.flags()
.contains(BlockFlags::HAS_STATE_PARTS)
}

pub fn has_persistent_shard_state(&self) -> bool {
Expand Down
4 changes: 3 additions & 1 deletion core/src/storage/block_handle/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ bitflags::bitflags! {
const HAS_PROOF = 1 << 1;
const HAS_QUEUE_DIFF = 1 << 2;

const HAS_STATE = 1 << 3;
const HAS_STATE_MAIN = 1 << 3;
const HAS_PERSISTENT_SHARD_STATE = 1 << 4;
const HAS_PERSISTENT_QUEUE_STATE = 1 << 5;

Expand All @@ -131,6 +131,8 @@ bitflags::bitflags! {
const IS_KEY_BLOCK = 1 << 11;
const IS_PERSISTENT = 1 << 12;

const HAS_STATE_PARTS = 1 << 13;

const IS_REMOVED = 1 << 15;

// Composite flags
Expand Down
Loading
Loading