//! # Importer Task //! This module contains the import task which is responsible for //! importing blocks from the network into the local blockchain. use fuel_core_services::{ SharedMutex, StateWatcher, }; use fuel_core_types::{ self, blockchain::{ block::Block, SealedBlock, SealedBlockHeader, }, fuel_types::BlockHeight, services::p2p::{ PeerId, SourcePeer, Transactions, }, }; use futures::{ stream::StreamExt, FutureExt, Stream, }; use std::{ future::Future, ops::{ Range, RangeInclusive, }, sync::Arc, }; use tokio::sync::Notify; use tracing::Instrument; use crate::{ ports::{ BlockImporterPort, ConsensusPort, PeerReportReason, PeerToPeerPort, }, state::State, tracing_helpers::TraceErr, }; #[cfg(any(test, feature = "benchmarking"))] /// Accessories for testing the sync. Available only when compiling under test /// or benchmarking. pub mod test_helpers; #[cfg(test)] mod tests; #[cfg(test)] mod back_pressure_tests; #[derive(Clone, Copy, Debug)] /// Parameters for the import task. pub struct Config { /// The maximum number of get transaction requests to make in a single batch. pub block_stream_buffer_size: usize, /// The maximum number of headers to request in a single batch. pub header_batch_size: usize, } impl Default for Config { fn default() -> Self { Self { block_stream_buffer_size: 10, header_batch_size: 100, } } } /// The combination of shared state, configuration, and services that define /// import behavior. pub struct Import { /// Shared state between import and sync tasks. state: SharedMutex, /// Notify import when sync has new work. notify: Arc, /// Configuration parameters. params: Config, /// Network port. p2p: Arc

, /// Executor port. executor: Arc, /// Consensus port. consensus: Arc, } impl Import { /// Configure an import behavior from a shared state, configuration and /// services that can be executed by an ImportTask. pub fn new( state: SharedMutex, notify: Arc, params: Config, p2p: Arc

, executor: Arc, consensus: Arc, ) -> Self { Self { state, notify, params, p2p, executor, consensus, } } /// Signal other asynchronous tasks that an import event has occurred. pub fn notify_one(&self) { self.notify.notify_one() } } #[derive(Debug)] struct Batch { peer: PeerId, range: Range, results: Vec, } impl Batch { pub fn new(peer: PeerId, range: Range, results: Vec) -> Self { Self { peer, range, results, } } pub fn is_err(&self) -> bool { self.results.len() < self.range.len() } } type SealedHeaderBatch = Batch; type SealedBlockBatch = Batch; impl Import where P: PeerToPeerPort + Send + Sync + 'static, E: BlockImporterPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, { #[tracing::instrument(skip_all)] /// Execute imports until a shutdown is requested. pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result { self.import_inner(shutdown).await?; Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await) } async fn import_inner(&self, shutdown: &StateWatcher) -> anyhow::Result<()> { // If there is a range to process, launch the stream. if let Some(range) = self.state.apply(|s| s.process_range()) { // Launch the stream to import the range. let count = self.launch_stream(range.clone(), shutdown).await; // Get the size of the range. let range_len = range.size_hint().0; // If we did not process the entire range, mark the failed heights as failed. if count < range_len { let count = u32::try_from(count) .expect("Size of the range can't be more than maximum `BlockHeight`"); let incomplete_range = range.start().saturating_add(count)..=*range.end(); self.state .apply(|s| s.failed_to_process(incomplete_range.clone())); Err(anyhow::anyhow!( "Failed to import range of blocks: {:?}", incomplete_range ))?; } } Ok(()) } #[tracing::instrument(skip(self, shutdown))] /// Launches a stream to import and execute a range of blocks. /// /// This stream will process all blocks up to the given range or /// an error occurs. /// If an error occurs, the preceding blocks still be processed /// and the error will be returned. async fn launch_stream( &self, range: RangeInclusive, shutdown: &StateWatcher, ) -> usize { let Self { state, params, p2p, executor, consensus, .. } = &self; let shutdown_signal = shutdown.clone(); let (shutdown_guard, mut shutdown_guard_recv) = tokio::sync::mpsc::channel::<()>(1); let block_stream = get_block_stream(range.clone(), params, p2p.clone(), consensus.clone()); let result = block_stream .map(move |stream_block_batch| { let shutdown_guard = shutdown_guard.clone(); let shutdown_signal = shutdown_signal.clone(); tokio::spawn(async move { // Hold a shutdown sender for the lifetime of the spawned task let _shutdown_guard = shutdown_guard.clone(); let mut shutdown_signal = shutdown_signal.clone(); tokio::select! { // Stream a batch of blocks blocks = stream_block_batch => Some(blocks), // If a shutdown signal is received during the stream, terminate early and // return an empty response _ = shutdown_signal.while_started() => None } }).map(|task| { task.trace_err("Failed to join the task").ok().flatten() }) }) // Request up to `block_stream_buffer_size` transactions from the network. .buffered(params.block_stream_buffer_size) // Continue the stream until the shutdown signal is received. .take_until({ let mut s = shutdown.clone(); async move { let _ = s.while_started().await; tracing::info!("In progress import stream shutting down"); } }) .into_scan_none() .scan_none() .into_scan_err() .scan_err() .then(|batch| { async move { let Batch { peer, range, results, } = batch; let mut done = vec![]; for sealed_block in results { let res = execute_and_commit(executor.as_ref(), state, sealed_block).await; match &res { Ok(_) => { done.push(()); }, Err(e) => { // If this fails, then it means that consensus has approved a block that is invalid. // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client. tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer, e); break; }, }; } let batch = Batch::new(peer.clone(), range, done); if !batch.is_err() { report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport); } batch } .instrument(tracing::debug_span!("execute_and_commit")) .in_current_span() }) // Continue the stream unless an error occurs. .into_scan_err() .scan_err() // Count the number of successfully executed blocks. // Fold the stream into a count. .fold(0usize, |count, batch| async move { count.checked_add(batch.results.len()).expect("It is impossible to fetch so much data to overflow `usize`") }) .await; // Wait for any spawned tasks to shutdown let _ = shutdown_guard_recv.recv().await; result } } fn get_block_stream< P: PeerToPeerPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, >( range: RangeInclusive, params: &Config, p2p: Arc

, consensus: Arc, ) -> impl Stream> + '_ { let header_stream = get_header_batch_stream(range.clone(), params, p2p.clone()); header_stream .map({ let consensus = consensus.clone(); let p2p = p2p.clone(); move |header_batch: SealedHeaderBatch| { let Batch { peer, range, results, } = header_batch; let checked_headers = results .into_iter() .take_while(|header| { check_sealed_header(header, peer.clone(), &p2p, &consensus) }) .collect::>(); Batch::new(peer, range, checked_headers) } }) .map(move |headers| { let consensus = consensus.clone(); let p2p = p2p.clone(); async move { let Batch { peer, range, results, } = headers; if results.is_empty() { SealedBlockBatch::new(peer, range, vec![]) } else { await_da_height( results .last() .expect("We checked headers are not empty above"), &consensus, ) .await; let headers = SealedHeaderBatch::new(peer, range, results); get_blocks(&p2p, headers).await } } .instrument(tracing::debug_span!("consensus_and_transactions")) .in_current_span() }) } fn get_header_batch_stream( range: RangeInclusive, params: &Config, p2p: Arc

, ) -> impl Stream { let Config { header_batch_size, .. } = params; let ranges = range_chunks(range, *header_batch_size); futures::stream::iter(ranges).then(move |range| { let p2p = p2p.clone(); async move { get_headers_batch(range, &p2p).await } }) } fn range_chunks( range: RangeInclusive, chunk_size: usize, ) -> impl Iterator> { let end = range.end().saturating_add(1); let chunk_size_u32 = u32::try_from(chunk_size).expect("The size of the chunk can't exceed `u32`"); range.step_by(chunk_size).map(move |chunk_start| { let block_end = (chunk_start.saturating_add(chunk_size_u32)).min(end); chunk_start..block_end }) } fn check_sealed_header< P: PeerToPeerPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, >( header: &SealedBlockHeader, peer_id: PeerId, p2p: &Arc

, consensus: &Arc, ) -> bool { let validity = consensus .check_sealed_header(header) .trace_err("Failed to check consensus on header") .unwrap_or(false); if !validity { report_peer(p2p, peer_id.clone(), PeerReportReason::BadBlockHeader); } validity } async fn await_da_height( header: &SealedBlockHeader, consensus: &Arc, ) { let _ = consensus .await_da_height(&header.entity.da_height) .await .trace_err("Failed to wait for DA layer to sync"); } /// Waits for a notify or shutdown signal. /// Returns true if the notify signal was received. async fn wait_for_notify_or_shutdown( notify: &Notify, shutdown: &mut StateWatcher, ) -> bool { let n = notify.notified(); let s = shutdown.while_started(); futures::pin_mut!(n); futures::pin_mut!(s); // Select the first signal to be received. let r = futures::future::select(n, s).await; // Check if the notify signal was received. matches!(r, futures::future::Either::Left(_)) } async fn get_sealed_block_headers

( range: Range, p2p: &Arc

, ) -> SourcePeer> where P: PeerToPeerPort + Send + Sync + 'static, { tracing::debug!( "getting header range from {} to {} inclusive", range.start, range.end ); p2p.get_sealed_block_headers(range) .await .trace_err("Failed to get headers") .unwrap_or_default() .map(|inner| inner.unwrap_or_default()) } async fn get_transactions

( peer_id: PeerId, range: Range, p2p: &Arc

, ) -> Option> where P: PeerToPeerPort + Send + Sync + 'static, { let range = peer_id.clone().bind(range); let res = p2p .get_transactions(range) .await .trace_err("Failed to get transactions"); match res { Ok(Some(transactions)) => Some(transactions), _ => { report_peer(p2p, peer_id.clone(), PeerReportReason::MissingTransactions); None } } } async fn get_headers_batch

(range: Range, p2p: &Arc

) -> SealedHeaderBatch where P: PeerToPeerPort + Send + Sync + 'static, { tracing::debug!( "getting header range from {} to {} inclusive", range.start, range.end ); let sourced_headers = get_sealed_block_headers(range.clone(), p2p).await; let SourcePeer { peer_id, data: headers, } = sourced_headers; let heights = range.clone().map(BlockHeight::from); let headers = headers .into_iter() .zip(heights) .take_while(move |(header, expected_height)| { let height = header.entity.height(); height == expected_height }) .map(|(header, _)| header) .collect::>(); if headers.len() != range.len() { report_peer(p2p, peer_id.clone(), PeerReportReason::MissingBlockHeaders); } Batch::new(peer_id, range, headers) } fn report_peer

(p2p: &Arc

, peer_id: PeerId, reason: PeerReportReason) where P: PeerToPeerPort + Send + Sync + 'static, { tracing::info!("Reporting peer for {:?}", reason); // Failure to report a peer is a non-fatal error; ignore the error let _ = p2p .report_peer(peer_id.clone(), reason) .trace_err(&format!("Failed to report peer {:?}", peer_id)); } /// Get blocks correlating to the headers from a specific peer #[tracing::instrument(skip(p2p, headers))] async fn get_blocks

(p2p: &Arc

, headers: SealedHeaderBatch) -> SealedBlockBatch where P: PeerToPeerPort + Send + Sync + 'static, { let Batch { results: headers, peer, range, } = headers; let Some(transaction_data) = get_transactions(peer.clone(), range.clone(), p2p).await else { return Batch::new(peer, range, vec![]) }; let iter = headers.into_iter().zip(transaction_data.into_iter()); let mut blocks = vec![]; for (block_header, transactions) in iter { let SealedBlockHeader { consensus, entity: header, } = block_header; let block = Block::try_from_executed(header, transactions.0).map(|block| SealedBlock { entity: block, consensus, }); if let Some(block) = block { blocks.push(block); } else { report_peer(p2p, peer.clone(), PeerReportReason::InvalidTransactions); break } } Batch::new(peer, range, blocks) } #[tracing::instrument( skip_all, fields( height = **block.entity.header().height(), id = %block.entity.header().consensus().generated.application_hash ), err )] async fn execute_and_commit( executor: &E, state: &SharedMutex, block: SealedBlock, ) -> anyhow::Result<()> where E: BlockImporterPort + Send + Sync + 'static, { // Execute and commit the block. let height = *block.entity.header().height(); let r = executor.execute_and_commit(block).await; // If the block executed successfully, mark it as committed. if r.is_ok() { state.apply(|s| s.commit(*height)); } else { tracing::error!("Execution of height {} failed: {:?}", *height, r); } r } /// Extra stream utilities. trait StreamUtil: Sized { /// Scan the stream for `None`. fn into_scan_none(self) -> ScanNone { ScanNone(self) } /// Scan the stream for errors. fn into_scan_err(self) -> ScanErr { ScanErr(self) } } impl StreamUtil for S {} struct ScanErr(S); struct ScanNone(S); impl ScanNone { fn scan_none<'a, T: 'a>(self) -> impl Stream + 'a where S: Stream> + Send + 'a, { let stream = self.0.boxed::<'a>(); futures::stream::unfold((false, stream), |(_, mut stream)| async move { let element = stream.next().await?; element.map(|e| (e, (false, stream))) }) } } impl ScanErr { fn scan_err<'a, T: 'a>(self) -> impl Stream> + 'a where S: Stream> + Send + 'a, { let stream = self.0.boxed::<'a>(); futures::stream::unfold((false, stream), |(mut err, mut stream)| async move { if err { None } else { let batch = stream.next().await?; err = batch.is_err(); Some((batch, (err, stream))) } }) } }