11use crate :: {
22 AbortReason , CONCURRENT_LEVEL , FALLBACK_SEQUENTIAL , GrevmError , LocationAndType , MemoryEntry ,
33 ParallelState , ReadVersion , Task , TransactionResult , TransactionStatus , TxId , TxState ,
4- TxVersion , async_commit:: StateAsyncCommit , hint:: ParallelExecutionHints , storage:: CacheDB ,
5- tx_dependency:: TxDependency , utils:: ContinuousDetectSet ,
4+ TxVersion ,
5+ async_commit:: { CommitGuard , StateAsyncCommit } ,
6+ hint:: ParallelExecutionHints ,
7+ storage:: CacheDB ,
8+ tx_dependency:: TxDependency ,
9+ utils:: ContinuousDetectSet ,
610} ;
711use ahash:: { AHashMap as HashMap , AHashSet as HashSet } ;
812use alloy_evm:: {
@@ -25,6 +29,7 @@ use revm_inspector::NoOpInspector;
2529use revm_primitives:: Address ;
2630
2731use std:: {
32+ cell:: UnsafeCell ,
2833 cmp:: max,
2934 collections:: BTreeMap ,
3035 fmt:: Debug ,
@@ -36,6 +41,9 @@ use std::{
3641 time:: Instant ,
3742} ;
3843
44+ /// min number of txs to parallel execute.
45+ const MIN_PARALLEL_TXS : usize = 64 ;
46+
3947pub ( crate ) type MVMemory = DashMap < LocationAndType , BTreeMap < TxId , MemoryEntry > > ;
4048
4149#[ derive( Metrics ) ]
@@ -261,7 +269,7 @@ where
261269 env : BlockEnv ,
262270 block_size : usize ,
263271 txs : Arc < Vec < TxEnv > > ,
264- state : ParallelState < DB > ,
272+ state : UnsafeCell < ParallelState < DB > > ,
265273 results : Mutex < Vec < ExecutionResult > > ,
266274 tx_states : Vec < Mutex < TxState > > ,
267275 tx_results : Vec < Mutex < Option < TransactionResult < DB :: Error > > > > ,
@@ -276,6 +284,12 @@ where
276284 metrics : ExecuteMetricsCollector ,
277285}
278286
287+ // SAFETY: Scheduler is shared across threads via `thread::scope`. The `UnsafeCell<ParallelState>`
288+ // is safe because: (1) only the commit thread mutates it (via StateAsyncCommit), serialized by
289+ // finality ordering, (2) worker threads only read via DatabaseRef (DashMap, thread-safe),
290+ // (3) fallback_sequential() is only called after all threads have joined.
291+ unsafe impl < DB : DatabaseRef + Send + Sync > Sync for Scheduler < DB > where DB :: Error : Send + Sync { }
292+
279293impl < DB > Debug for Scheduler < DB >
280294where
281295 DB : DatabaseRef ,
@@ -315,7 +329,7 @@ where
315329 env,
316330 block_size : num_txs,
317331 txs,
318- state,
332+ state : UnsafeCell :: new ( state ) ,
319333 results : Mutex :: new ( vec ! [ ] ) ,
320334 tx_states : ( 0 ..num_txs) . map ( |_| Mutex :: new ( TxState :: default ( ) ) ) . collect ( ) ,
321335 tx_results : ( 0 ..num_txs) . map ( |_| Mutex :: new ( None ) ) . collect ( ) ,
@@ -375,17 +389,14 @@ where
375389
376390 if ( Instant :: now ( ) - start) . as_millis ( ) > 8_000 {
377391 start = Instant :: now ( ) ;
378- println ! (
379- "stuck..., block_number: {}, finality_idx: {}, validation_idx: {}, execution_idx: {}" ,
380- self . env. number,
381- self . scheduler_ctx. finality_idx( ) ,
382- self . scheduler_ctx. validation_idx( ) ,
383- self . scheduler_ctx. executed_set. continuous_idx( ) ,
392+ tracing:: warn!(
393+ target: "grevm::scheduler" ,
394+ block_number = %self . env. number,
395+ finality_idx = self . scheduler_ctx. finality_idx( ) ,
396+ validation_idx = self . scheduler_ctx. validation_idx( ) ,
397+ execution_idx = self . scheduler_ctx. executed_set. continuous_idx( ) ,
398+ "parallel execution stuck" ,
384399 ) ;
385- let status: Vec < ( TxId , TransactionStatus ) > =
386- self . tx_states . iter ( ) . map ( |s| s. lock ( ) . status . clone ( ) ) . enumerate ( ) . collect ( ) ;
387- println ! ( "transaction status: {:?}" , status) ;
388- self . tx_dependency . print ( ) ;
389400 }
390401 }
391402 }
@@ -394,7 +405,7 @@ where
394405 let mut commit_idx = 0 ;
395406 let mut commiter = commiter. lock ( ) ;
396407 let async_commit_state =
397- std:: env:: var ( "ASYNC_COMMIT_STATE" ) . map_or ( true , |s| s. parse ( ) . unwrap ( ) ) ;
408+ std:: env:: var ( "ASYNC_COMMIT_STATE" ) . map_or ( true , |s| s. parse ( ) . unwrap_or ( true ) ) ;
398409 while !self . abort . load ( Ordering :: Acquire ) && commit_idx < self . block_size {
399410 while commit_idx < self . scheduler_ctx . finality_idx . load ( Ordering :: Acquire ) {
400411 if async_commit_state {
@@ -420,7 +431,7 @@ where
420431
421432 /// Take `ExecutionResult` and `ParallelState`
422433 pub fn take_result_and_state ( self ) -> ( Vec < ExecutionResult > , ParallelState < DB > ) {
423- ( self . results . into_inner ( ) , self . state )
434+ ( self . results . into_inner ( ) , self . state . into_inner ( ) )
424435 }
425436
426437 /// Paralle execution
@@ -432,16 +443,18 @@ where
432443 self . metrics . total_tx_cnt . store ( self . block_size , Ordering :: Relaxed ) ;
433444 let concurrency_level = concurrency_level. unwrap_or (
434445 std:: env:: var ( "GREVM_CONCURRENT_LEVEL" )
435- . map_or ( * CONCURRENT_LEVEL , |s| s. parse ( ) . unwrap ( ) ) ,
446+ . map_or ( * CONCURRENT_LEVEL , |s| s. parse ( ) . unwrap_or ( * CONCURRENT_LEVEL ) ) ,
436447 ) ;
437- if * FALLBACK_SEQUENTIAL {
448+ if * FALLBACK_SEQUENTIAL || self . block_size < MIN_PARALLEL_TXS {
438449 return self . fallback_sequential ( ) ;
439450 }
440451 let commiter = Mutex :: new ( StateAsyncCommit :: new (
441452 self . env . beneficiary ,
442- & self . state ,
453+ CommitGuard :: new ( & self . state ) ,
443454 self . cfg . disable_nonce_check ,
444455 ) ) ;
456+
457+ let state_ref = unsafe { & * self . state . get ( ) } ;
445458 commiter. lock ( ) . init ( ) . map_err ( |e| GrevmError { txid : 0 , error : EVMError :: Database ( e) } ) ?;
446459 thread:: scope ( |scope| {
447460 scope. spawn ( || {
@@ -458,7 +471,7 @@ where
458471 let cache_db = CacheDB :: new (
459472 self . cfg . spec ,
460473 self . env . beneficiary ,
461- & self . state ,
474+ state_ref ,
462475 & self . mv_memory ,
463476 & self . scheduler_ctx . commit_idx ,
464477 ) ;
@@ -548,13 +561,6 @@ where
548561 Ok ( ( ) )
549562 }
550563
551- fn state_mut ( & self ) -> & mut ParallelState < DB > {
552- #[ allow( invalid_reference_casting) ]
553- unsafe {
554- & mut * ( & self . state as * const ParallelState < DB > as * mut ParallelState < DB > )
555- }
556- }
557-
558564 /// Fallback to sequential execution
559565 pub fn fallback_sequential ( & self ) -> Result < ( ) , GrevmError < DB :: Error > > {
560566 let mut results = self . results . lock ( ) ;
@@ -564,7 +570,8 @@ where
564570 }
565571
566572 let mut sequential_results = Vec :: with_capacity ( self . block_size - num_commit) ;
567- let state_mut = self . state_mut ( ) ;
573+ let mut commit_guard = CommitGuard :: new ( & self . state ) ;
574+ let state_mut = commit_guard. state_mut ( ) ;
568575 {
569576 let evm = Context :: mainnet ( )
570577 . with_db ( state_mut)
0 commit comments