//! Service engine for `commonware-reshare` validators.
//!
//! The engine wires together four actors: a DKG/reshare actor, a buffered
//! block broadcast engine, a marshal that archives finalized blocks and
//! finalization certificates, and a consensus orchestrator.

use crate::{
    application::{Application, Block, EpochProvider, Provider},
    dkg::{self, UpdateCallBack},
    orchestrator,
    setup::PeerConfig,
    BLOCKS_PER_EPOCH,
};
use commonware_broadcast::buffered;
use commonware_consensus::{
    application::marshaled::Marshaled,
    marshal::{self, ingress::handler},
    simplex::{elector::Config as Elector, scheme::Scheme, types::Finalization},
    types::{FixedEpocher, ViewDelta},
};
use commonware_cryptography::{
    bls12381::{
        dkg::Output,
        primitives::{group, variant::Variant},
    },
    Hasher, Signer,
};
use commonware_p2p::{Blocker, Manager, Receiver, Sender};
use commonware_runtime::{
    buffer::PoolRef, spawn_cell, Clock, ContextCell, Handle, Metrics, Network, Spawner, Storage,
};
use commonware_storage::archive::immutable;
use commonware_utils::{ordered::Set, union, NZUsize, NZU32, NZU64};
use futures::{channel::mpsc, future::try_join_all};
use rand::{CryptoRng, Rng};
use std::{marker::PhantomData, num::NonZero, time::Instant};
use tracing::{error, info, warn};

const MAILBOX_SIZE: usize = 10;
const DEQUE_SIZE: usize = 10;
const ACTIVITY_TIMEOUT: ViewDelta = ViewDelta::new(256);
const SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER: u64 = 10;
const PRUNABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(4_096);
const IMMUTABLE_ITEMS_PER_SECTION: NonZero<u64> = NZU64!(262_144);
const FREEZER_TABLE_RESIZE_FREQUENCY: u8 = 4;
const FREEZER_TABLE_RESIZE_CHUNK_SIZE: u32 = 2u32.pow(16); // 3MB
const FREEZER_JOURNAL_TARGET_SIZE: u64 = 1024 * 1024 * 1024; // 1GB
const FREEZER_JOURNAL_COMPRESSION: Option<u8> = Some(3);
const REPLAY_BUFFER: NonZero<usize> = NZUsize!(8 * 1024 * 1024); // 8MB
const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024); // 1MB
const BUFFER_POOL_PAGE_SIZE: NonZero<usize> = NZUsize!(4_096); // 4KB
const BUFFER_POOL_CAPACITY: NonZero<usize> = NZUsize!(8_192); // 32MB
const MAX_REPAIR: NonZero<usize> = NZUsize!(50);

/// Configuration for the [Engine].
pub struct Config<P, C, B, V>
where
    P: Manager<Peers = Set<C::PublicKey>>,
    C: Signer,
    B: Blocker,
    V: Variant,
{
    pub signer: C,
    pub manager: P,
    pub blocker: B,
    pub namespace: Vec<u8>,
    pub output: Option<Output<V>>,
    pub share: Option<group::Share>,
    pub peer_config: PeerConfig,
    pub partition_prefix: String,
    pub freezer_table_initial_size: u32,
}

/// Engine that runs the DKG, block broadcast, marshal, and orchestrator
/// actors for a single validator.
pub struct Engine<E, C, P, B, H, V, S, L>
where
    E: Spawner + Metrics + Rng + CryptoRng + Clock + Storage + Network,
    C: Signer,
    P: Manager<Peers = Set<C::PublicKey>>,
    B: Blocker,
    H: Hasher,
    V: Variant,
    S: Scheme,
    L: Elector,
    Provider<C, V>: EpochProvider,
{
    context: ContextCell<E>,
    config: Config<P, C, B, V>,

    dkg: dkg::Actor<E, P, C, V>,
    dkg_mailbox: dkg::Mailbox<V>,

    buffer: buffered::Engine<E, C::PublicKey, Block<H::Digest>>,
    buffered_mailbox: buffered::Mailbox<C::PublicKey, Block<H::Digest>>,

    #[allow(clippy::type_complexity)]
    marshal: marshal::Actor<
        E,
        Block<H::Digest>,
        Provider<C, V>,
        immutable::Archive<E, H::Digest, Finalization<S, H::Digest>>,
        immutable::Archive<E, H::Digest, Block<H::Digest>>,
        FixedEpocher,
    >,

    #[allow(clippy::type_complexity)]
    orchestrator: orchestrator::Actor<
        E,
        B,
        V,
        C,
        H,
        Marshaled<E, Application<V>, Block<H::Digest>, FixedEpocher>,
        S,
        L,
    >,
    orchestrator_mailbox: orchestrator::Mailbox<V, C::PublicKey>,
}

impl<E, C, P, B, H, V, S, L> Engine<E, C, P, B, H, V, S, L>
where
    E: Spawner + Metrics + Rng + CryptoRng + Clock + Storage + Network,
    C: Signer,
    P: Manager<Peers = Set<C::PublicKey>>,
    B: Blocker,
    H: Hasher,
    V: Variant,
    S: Scheme,
    L: Elector,
    Provider<C, V>: EpochProvider,
{
    /// Create a new [Engine], restoring any persisted state from storage.
    pub async fn new(context: E, config: Config<P, C, B, V>) -> Self {
        let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY);
        let consensus_namespace = union(&config.namespace, b"_CONSENSUS");
        let num_participants = NZU32!(config.peer_config.max_participants_per_round());

        // Create the DKG actor
        let (dkg, dkg_mailbox) = dkg::Actor::init(
            context.with_label("dkg"),
            dkg::Config {
                manager: config.manager.clone(),
                signer: config.signer.clone(),
                mailbox_size: MAILBOX_SIZE,
                partition_prefix: config.partition_prefix.clone(),
                peer_config: config.peer_config.clone(),
            },
        )
        .await;
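        // Create the buffered broadcast engine used to gossip blocks between
        // validators (each peer's most recent DEQUE_SIZE messages are cached).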
        let (buffer, buffered_mailbox) = buffered::Engine::new(
            context.with_label("buffer"),
            buffered::Config {
                public_key: config.signer.public_key(),
                mailbox_size: MAILBOX_SIZE,
                deque_size: DEQUE_SIZE,
                priority: true,
                codec_config: num_participants,
            },
        );

        // Initialize finalizations by height
        let start = Instant::now();
        let finalizations_by_height = immutable::Archive::init(
            context.with_label("finalizations_by_height"),
            immutable::Config {
                metadata_partition: format!(
                    "{}-finalizations-by-height-metadata",
                    config.partition_prefix
                ),
                freezer_table_partition: format!(
                    "{}-finalizations-by-height-freezer-table",
                    config.partition_prefix
                ),
                freezer_table_initial_size: config.freezer_table_initial_size,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_partition: format!(
                    "{}-finalizations-by-height-freezer-journal",
                    config.partition_prefix
                ),
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                freezer_journal_buffer_pool: buffer_pool.clone(),
                ordinal_partition: format!(
                    "{}-finalizations-by-height-ordinal",
                    config.partition_prefix
                ),
                items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                codec_config: S::certificate_codec_config_unbounded(),
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
            },
        )
        .await
        .expect("failed to initialize finalizations by height archive");
        info!(elapsed = ?start.elapsed(), "restored finalizations by height archive");

        // Initialize finalized blocks archive
        let start = Instant::now();
        let finalized_blocks = immutable::Archive::init(
            context.with_label("finalized_blocks"),
            immutable::Config {
                metadata_partition: format!(
                    "{}-finalized_blocks-metadata",
                    config.partition_prefix
                ),
                freezer_table_partition: format!(
                    "{}-finalized_blocks-freezer-table",
                    config.partition_prefix
                ),
                freezer_table_initial_size: config.freezer_table_initial_size,
                freezer_table_resize_frequency: FREEZER_TABLE_RESIZE_FREQUENCY,
                freezer_table_resize_chunk_size: FREEZER_TABLE_RESIZE_CHUNK_SIZE,
                freezer_journal_partition: format!(
                    "{}-finalized_blocks-freezer-journal",
                    config.partition_prefix
                ),
                freezer_journal_target_size: FREEZER_JOURNAL_TARGET_SIZE,
                freezer_journal_compression: FREEZER_JOURNAL_COMPRESSION,
                freezer_journal_buffer_pool: buffer_pool.clone(),
                ordinal_partition: format!("{}-finalized_blocks-ordinal", config.partition_prefix),
                items_per_section: IMMUTABLE_ITEMS_PER_SECTION,
                codec_config: num_participants,
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
            },
        )
        .await
        .expect("failed to initialize finalized blocks archive");
        info!(elapsed = ?start.elapsed(), "restored finalized blocks archive");

        // Create the certificate verifier from the initial output (if available).
        // This allows epoch-independent certificate verification after the DKG is complete.
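        // If no output is available yet (i.e. the initial DKG has not completed),
        // no verifier is constructed and the provider starts without one.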
        let certificate_verifier = config
            .output
            .as_ref()
            .and_then(<Provider<C, V> as EpochProvider>::certificate_verifier);
        let provider = Provider::new(config.signer.clone(), certificate_verifier);

        // Initialize the marshal
        let (marshal, marshal_mailbox, _processed_height) = marshal::Actor::init(
            context.with_label("marshal"),
            finalizations_by_height,
            finalized_blocks,
            marshal::Config {
                provider: provider.clone(),
                epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
                partition_prefix: format!("{}_marshal", config.partition_prefix),
                mailbox_size: MAILBOX_SIZE,
                // Retain views long enough for lagging syncers to catch up
                view_retention_timeout: ViewDelta::new(
                    ACTIVITY_TIMEOUT
                        .get()
                        .saturating_mul(SYNCER_ACTIVITY_TIMEOUT_MULTIPLIER),
                ),
                namespace: consensus_namespace.clone(),
                prunable_items_per_section: PRUNABLE_ITEMS_PER_SECTION,
                buffer_pool: buffer_pool.clone(),
                replay_buffer: REPLAY_BUFFER,
                write_buffer: WRITE_BUFFER,
                block_codec_config: num_participants,
                max_repair: MAX_REPAIR,
            },
        )
        .await;

        // Wrap the application so blocks are produced and verified via the marshal
        let application = Marshaled::new(
            context.with_label("application"),
            Application::new(dkg_mailbox.clone()),
            marshal_mailbox.clone(),
            FixedEpocher::new(BLOCKS_PER_EPOCH),
        );

        // Initialize the orchestrator
        let (orchestrator, orchestrator_mailbox) = orchestrator::Actor::new(
            context.with_label("orchestrator"),
            orchestrator::Config {
                oracle: config.blocker.clone(),
                application,
                provider,
                marshal: marshal_mailbox,
                namespace: consensus_namespace,
                muxer_size: MAILBOX_SIZE,
                mailbox_size: MAILBOX_SIZE,
                partition_prefix: format!("{}_consensus", config.partition_prefix),
                _phantom: PhantomData,
            },
        );

        Self {
            context: ContextCell::new(context),
            config,
            dkg,
            dkg_mailbox,
            buffer,
            buffered_mailbox,
            marshal,
            orchestrator,
            orchestrator_mailbox,
        }
    }

    /// Start the engine, returning a handle that resolves when it stops.
    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
    pub fn start(
        mut self,
        votes: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        dkg: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<Block<H::Digest>>>,
            commonware_resolver::p2p::Mailbox<handler::Request<Block<H::Digest>>, C::PublicKey>,
        ),
        callback: Box<dyn UpdateCallBack<V>>,
    ) -> Handle<()> {
        spawn_cell!(
            self.context,
            self.run(
                votes,
                certificates,
                resolver,
                broadcast,
                dkg,
                marshal,
                callback
            )
            .await
        )
    }

    #[allow(clippy::type_complexity, clippy::too_many_arguments)]
    async fn run(
        self,
        votes: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        certificates: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        resolver: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        broadcast: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        dkg: (
            impl Sender<PublicKey = C::PublicKey>,
            impl Receiver<PublicKey = C::PublicKey>,
        ),
        marshal: (
            mpsc::Receiver<handler::Message<Block<H::Digest>>>,
            commonware_resolver::p2p::Mailbox<handler::Request<Block<H::Digest>>, C::PublicKey>,
        ),
        callback: Box<dyn UpdateCallBack<V>>,
    ) {
        // Start the DKG actor
        let dkg_handle = self.dkg.start(
            self.config.output,
            self.config.share,
            self.orchestrator_mailbox,
            dkg,
            callback,
        );

        // Start the block broadcast engine
        let buffer_handle = self.buffer.start(broadcast);

        // Start the marshal
        let marshal_handle = self
            .marshal
            .start(self.dkg_mailbox, self.buffered_mailbox, marshal);

        // Start the orchestrator
        let orchestrator_handle = self.orchestrator.start(votes, certificates, resolver);

        // Wait for all actors; any exit (error or not) stops the engine
        if let Err(e) = try_join_all(vec![
            dkg_handle,
            buffer_handle,
            marshal_handle,
            orchestrator_handle,
        ])
        .await
        {
            error!(?e, "task failed");
        } else {
            warn!("engine stopped");
        }
    }
}
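// Usage sketch (illustrative only): construct the engine with a runtime
// context and `Config`, then hand it the pre-registered p2p channels. The
// bindings below (`context`, `config`, `votes`, `certificates`, `resolver`,
// `broadcast`, `dkg_channels`, `marshal_channels`, `callback`) are assumed to
// be produced by the caller's setup code; they are not defined in this module.
//
// ```ignore
// let engine = Engine::new(context, config).await;
// let handle = engine.start(
//     votes,
//     certificates,
//     resolver,
//     broadcast,
//     dkg_channels,
//     marshal_channels,
//     callback,
// );
// handle.await;
// ```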