use super::{ mailbox::{Mailbox, Message}, resolve_state_sync_floor, BlockDigest, StateSyncMetadata, SyncResult, }; use crate::stateful::{ db::{Anchor, DatabaseSet, StateSyncSet, SyncEngineConfig}, Application, }; use commonware_actor::mailbox::{self as actor_mailbox, Receiver}; use commonware_consensus::{ marshal::core::{Mailbox as MarshalMailbox, Variant}, simplex::types::Finalization, }; use commonware_cryptography::certificate::Scheme; use commonware_macros::select_loop; use commonware_runtime::{spawn_cell, Clock, ContextCell, Handle, Metrics, Spawner, Storage}; use commonware_utils::{ channel::{fallible::OneshotExt, oneshot, ring}, futures::OptionFuture, sync::AsyncMutex, NZUsize, }; use futures::SinkExt; use rand::Rng; use std::sync::Arc; use tracing::debug; /// Configuration for [`Syncer`]. pub struct Config where E: Rng + Spawner + Metrics + Clock + Storage, A: Application, A::Databases: StateSyncSet>, S: Scheme, V: Variant, { /// Runtime context used for metadata and database initialization. pub context: E, /// Database configuration for the managed set. pub db_config: >::Config, /// Per-database sync engine parameters. pub sync_config: SyncEngineConfig, /// Per-database resolvers used to fetch state from peers. pub resolvers: R, /// Durable state-sync metadata. pub sync_metadata: Arc>>, /// Finalized floor marshal should resolve before sync starts. pub finalization: Finalization, /// Marshal mailbox used to query the finalized floor. pub marshal: MarshalMailbox, /// Notifies the stateful actor when state sync has produced an artifact. pub sync_complete: oneshot::Sender>, } pub struct Syncer where E: Rng + Spawner + Metrics + Clock + Storage, A: Application, A::Databases: StateSyncSet>, S: Scheme, V: Variant, { /// Runtime context. context: ContextCell, /// The mailbox. mailbox: Receiver>, /// The produced state sync artifact, if complete. artifact: Option>, /// Database configuration for the managed set. db_config: >::Config, /// Per-database sync engine parameters. sync_config: SyncEngineConfig, /// Per-database resolvers used to fetch state from peers. resolvers: R, /// Durable state-sync metadata. sync_metadata: Arc>>, /// Finalized floor marshal should resolve before sync starts. finalization: Finalization, /// Marshal mailbox used to query the finalized floor. marshal: MarshalMailbox, /// Notifies the stateful actor when state sync has produced an artifact. sync_complete: Option>>, } impl Syncer where E: Rng + Spawner + Metrics + Clock + Storage, A: Application, A::Databases: StateSyncSet>, R: Send + Sync + 'static, S: Scheme, V: Variant, { pub fn new(config: Config) -> (Self, Mailbox) { let (sender, receiver) = actor_mailbox::new(config.context.child("mailbox"), NZUsize!(1)); let mailbox = Mailbox::new(sender); ( Self { context: ContextCell::new(config.context), mailbox: receiver, artifact: None, db_config: config.db_config, sync_config: config.sync_config, resolvers: config.resolvers, sync_metadata: config.sync_metadata, finalization: config.finalization, marshal: config.marshal, sync_complete: Some(config.sync_complete), }, mailbox, ) } pub fn start(mut self) -> Handle<()> { spawn_cell!(self.context, self.run()) } pub async fn run(mut self) { let resolved_floor = resolve_state_sync_floor::(&self.marshal, &self.finalization).await; { let mut sync_metadata = self.sync_metadata.lock().await; sync_metadata.begin_sync(resolved_floor.marker).await; } let (mut tip_updates_tx, tip_updates_rx) = ring::channel(NZUsize!(1)); let mut state_sync_task = OptionFuture::from(Some(Box::pin(A::Databases::sync( self.context.child("state_sync"), self.db_config, self.resolvers, resolved_floor.anchor, resolved_floor.targets, tip_updates_rx, self.sync_config, )))); select_loop! { self.context, on_stopped => { debug!("syncer received stop signal, shutting down"); }, result = &mut state_sync_task => { match result { Ok((databases, anchor)) => { Self::publish_artifact( &mut self.artifact, &mut self.sync_complete, databases, anchor, ); state_sync_task = None.into(); } Err(err) => { panic!("state sync task failed: {err:?}"); } } }, Some(message) = self.mailbox.recv() else { debug!("mailbox closed, shutting down syncer"); break; } => match message { Message::UpdateTargets { update, response } => { if let Some(artifact) = self.artifact.clone() { response.send_lossy(Some(artifact)); continue; } // If sync had already completed, the state-sync branch above would // have published `self.artifact` before this mailbox branch ran. if tip_updates_tx.send(update).await.is_err() { // Tuple sync closes the live tip-update receiver as soon as the // coordinator converges, before the database tasks have necessarily // finished. Treat that close as "wait for the in-flight sync task to // publish its artifact", not as a hard failure. match (&mut state_sync_task).await { Ok((databases, anchor)) => { Self::publish_artifact( &mut self.artifact, &mut self.sync_complete, databases, anchor, ); state_sync_task = None.into(); } Err(err) => { panic!("state sync task failed: {err:?}"); } } response.send_lossy(self.artifact.clone()); continue; } response.send_lossy(None); } }, } } fn publish_artifact( artifact: &mut Option>, sync_complete: &mut Option>>, databases: A::Databases, anchor: Anchor>, ) { let sync_result = SyncResult { databases, anchor }; *artifact = Some(sync_result.clone()); if let Some(sync_complete) = sync_complete.take() { sync_complete.send_lossy(sync_result); } } }