//! # Importer Task
//! This module contains the import task which is responsible for
//! importing blocks from the network into the local blockchain.
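//!
//! At a high level, the import task runs as a pipeline:
//! 1. Request sealed block headers from a peer in configurable batches.
//! 2. Check each header with the consensus port and wait for the DA layer to
//!    sync to the header's `da_height`.
//! 3. Request the transactions belonging to the verified headers.
//! 4. Execute and commit each block, reporting successful and misbehaving
//!    peers along the way.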

use fuel_core_services::{
    SharedMutex,
    StateWatcher,
};
use fuel_core_types::{
    self,
    blockchain::{
        block::Block,
        SealedBlock,
        SealedBlockHeader,
    },
    fuel_types::BlockHeight,
    services::p2p::{
        PeerId,
        SourcePeer,
        Transactions,
    },
};
use futures::{
    stream::StreamExt,
    FutureExt,
    Stream,
};
use std::{
    future::Future,
    ops::{
        Range,
        RangeInclusive,
    },
    sync::Arc,
};
use tokio::sync::Notify;
use tracing::Instrument;
use crate::{
    ports::{
        BlockImporterPort,
        ConsensusPort,
        PeerReportReason,
        PeerToPeerPort,
    },
    state::State,
    tracing_helpers::TraceErr,
};
#[cfg(any(test, feature = "benchmarking"))]
/// Accessories for testing the sync. Available only when compiling under test
/// or benchmarking.
pub mod test_helpers;
#[cfg(test)]
mod tests;
#[cfg(test)]
mod back_pressure_tests;

#[derive(Clone, Copy, Debug)]
/// Parameters for the import task.
pub struct Config {
    /// The maximum number of get transaction requests to make in a single batch.
    pub block_stream_buffer_size: usize,
    /// The maximum number of headers to request in a single batch.
    pub header_batch_size: usize,
}

impl Default for Config {
    fn default() -> Self {
        Self {
            block_stream_buffer_size: 10,
            header_batch_size: 100,
        }
    }
}
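
// An illustrative sketch (not part of the import pipeline): overriding a
// single `Config` field while keeping the other default.
#[cfg(test)]
mod config_example {
    use super::Config;

    #[test]
    fn overriding_a_single_field_keeps_the_other_default() {
        let params = Config {
            block_stream_buffer_size: 20,
            ..Config::default()
        };
        assert_eq!(params.block_stream_buffer_size, 20);
        assert_eq!(params.header_batch_size, 100);
    }
}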

/// The combination of shared state, configuration, and services that define
/// import behavior.
pub struct Import<P, E, C> {
    /// Shared state between import and sync tasks.
    state: SharedMutex<State>,
    /// Notify import when sync has new work.
    notify: Arc<Notify>,
    /// Configuration parameters.
    params: Config,
    /// Network port.
    p2p: Arc<P>,
    /// Executor port.
    executor: Arc<E>,
    /// Consensus port.
    consensus: Arc<C>,
}

impl<P, E, C> Import<P, E, C> {
    /// Configure an import behavior from a shared state, configuration and
    /// services that can be executed by an `ImportTask`.
    pub fn new(
        state: SharedMutex<State>,
        notify: Arc<Notify>,
        params: Config,
        p2p: Arc<P>,
        executor: Arc<E>,
        consensus: Arc<C>,
    ) -> Self {
        Self {
            state,
            notify,
            params,
            p2p,
            executor,
            consensus,
        }
    }

    /// Signal other asynchronous tasks that an import event has occurred.
    pub fn notify_one(&self) {
        self.notify.notify_one()
    }
}

#[derive(Debug)]
struct Batch<T> {
    peer: PeerId,
    range: Range<u32>,
    results: Vec<T>,
}

impl<T> Batch<T> {
    pub fn new(peer: PeerId, range: Range<u32>, results: Vec<T>) -> Self {
        Self {
            peer,
            range,
            results,
        }
    }

    /// A batch is considered failed when it produced fewer results than the
    /// range it was asked to cover.
    pub fn is_err(&self) -> bool {
        self.results.len() < self.range.len()
    }
}
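
// An illustrative sketch of the `Batch::is_err` rule above. It assumes
// `PeerId: From<Vec<u8>>`, which `fuel_core_types::services::p2p` provides.
#[cfg(test)]
mod batch_example {
    use super::Batch;
    use fuel_core_types::services::p2p::PeerId;

    #[test]
    fn a_short_batch_counts_as_an_error() {
        let peer = PeerId::from(vec![0u8; 32]);
        // Two heights requested, two results returned: success.
        assert!(!Batch::new(peer.clone(), 0..2, vec![(), ()]).is_err());
        // Two heights requested, one result returned: failure.
        assert!(Batch::new(peer, 0..2, vec![()]).is_err());
    }
}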

type SealedHeaderBatch = Batch<SealedBlockHeader>;
type SealedBlockBatch = Batch<SealedBlock>;

impl<P, E, C> Import<P, E, C>
where
    P: PeerToPeerPort + Send + Sync + 'static,
    E: BlockImporterPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
{
    #[tracing::instrument(skip_all)]
    /// Execute imports until a shutdown is requested.
    /// Returns `Ok(true)` if a new notification arrived before the shutdown.
    pub async fn import(&self, shutdown: &mut StateWatcher) -> anyhow::Result<bool> {
        self.import_inner(shutdown).await?;
        Ok(wait_for_notify_or_shutdown(&self.notify, shutdown).await)
    }

    async fn import_inner(&self, shutdown: &StateWatcher) -> anyhow::Result<()> {
        // If there is a range to process, launch the stream.
        if let Some(range) = self.state.apply(|s| s.process_range()) {
            // Launch the stream to import the range.
            let count = self.launch_stream(range.clone(), shutdown).await;

            // Get the size of the range.
            let range_len = range.size_hint().0;

            // If we did not process the entire range, mark the failed heights as failed.
            if count < range_len {
                let count = u32::try_from(count)
                    .expect("Size of the range can't be more than maximum `BlockHeight`");
                let incomplete_range =
                    range.start().saturating_add(count)..=*range.end();
                self.state
                    .apply(|s| s.failed_to_process(incomplete_range.clone()));
                Err(anyhow::anyhow!(
                    "Failed to import range of blocks: {:?}",
                    incomplete_range
                ))?;
            }
        }
        Ok(())
    }

    #[tracing::instrument(skip(self, shutdown))]
    /// Launches a stream to import and execute a range of blocks.
    ///
    /// The stream will process blocks until the end of the given range is
    /// reached or an error occurs. If an error occurs, the preceding blocks
    /// will still be processed, and only the count of successfully imported
    /// blocks is returned.
    async fn launch_stream(
        &self,
        range: RangeInclusive<u32>,
        shutdown: &StateWatcher,
    ) -> usize {
        let Self {
            state,
            params,
            p2p,
            executor,
            consensus,
            ..
        } = &self;

        let shutdown_signal = shutdown.clone();
        let (shutdown_guard, mut shutdown_guard_recv) =
            tokio::sync::mpsc::channel::<()>(1);

        let block_stream =
            get_block_stream(range.clone(), params, p2p.clone(), consensus.clone());

        let result = block_stream
            .map(move |stream_block_batch| {
                let shutdown_guard = shutdown_guard.clone();
                let shutdown_signal = shutdown_signal.clone();
                tokio::spawn(async move {
                    // Hold a shutdown sender for the lifetime of the spawned task
                    let _shutdown_guard = shutdown_guard.clone();
                    let mut shutdown_signal = shutdown_signal.clone();
                    tokio::select! {
                        // Stream a batch of blocks
                        blocks = stream_block_batch => Some(blocks),
                        // If a shutdown signal is received during the stream, terminate early and
                        // return an empty response
                        _ = shutdown_signal.while_started() => None
                    }
                })
                .map(|task| task.trace_err("Failed to join the task").ok().flatten())
            })
            // Request up to `block_stream_buffer_size` transactions from the network.
            .buffered(params.block_stream_buffer_size)
            // Continue the stream until the shutdown signal is received.
            .take_until({
                let mut s = shutdown.clone();
                async move {
                    let _ = s.while_started().await;
                    tracing::info!("In progress import stream shutting down");
                }
            })
            .into_scan_none()
            .scan_none()
            .into_scan_err()
            .scan_err()
            .then(|batch| {
                async move {
                    let Batch {
                        peer,
                        range,
                        results,
                    } = batch;

                    let mut done = vec![];
                    for sealed_block in results {
                        let res =
                            execute_and_commit(executor.as_ref(), state, sealed_block)
                                .await;
                        match &res {
                            Ok(_) => {
                                done.push(());
                            }
                            Err(e) => {
                                // If this fails, then it means that consensus has approved a block that is invalid.
                                // This would suggest a more serious issue than a bad peer, e.g. a fork or an out-of-date client.
                                tracing::error!("Failed to execute and commit block from peer {:?}: {:?}", peer, e);
                                break;
                            }
                        };
                    }

                    let batch = Batch::new(peer.clone(), range, done);
                    if !batch.is_err() {
                        report_peer(p2p, peer, PeerReportReason::SuccessfulBlockImport);
                    }
                    batch
                }
                .instrument(tracing::debug_span!("execute_and_commit"))
                .in_current_span()
            })
            // Continue the stream unless an error occurs.
            .into_scan_err()
            .scan_err()
            // Count the number of successfully executed blocks.
            // Fold the stream into a count.
            .fold(0usize, |count, batch| async move {
                count.checked_add(batch.results.len()).expect(
                    "It is impossible to fetch so much data to overflow `usize`",
                )
            })
            .await;

        // Wait for any spawned tasks to shutdown
        let _ = shutdown_guard_recv.recv().await;
        result
    }
}

fn get_block_stream<
    P: PeerToPeerPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
>(
    range: RangeInclusive<u32>,
    params: &Config,
    p2p: Arc<P>,
    consensus: Arc<C>,
) -> impl Stream<Item = impl Future<Output = SealedBlockBatch>> + '_ {
    let header_stream = get_header_batch_stream(range.clone(), params, p2p.clone());
    header_stream
        .map({
            let consensus = consensus.clone();
            let p2p = p2p.clone();
            move |header_batch: SealedHeaderBatch| {
                let Batch {
                    peer,
                    range,
                    results,
                } = header_batch;
                // Keep headers only up to the first one that fails the
                // consensus check; the faulty peer is reported inside
                // `check_sealed_header`.
                let checked_headers = results
                    .into_iter()
                    .take_while(|header| {
                        check_sealed_header(header, peer.clone(), &p2p, &consensus)
                    })
                    .collect::<Vec<_>>();
                Batch::new(peer, range, checked_headers)
            }
        })
        .map(move |headers| {
            let consensus = consensus.clone();
            let p2p = p2p.clone();
            async move {
                let Batch {
                    peer,
                    range,
                    results,
                } = headers;
                if results.is_empty() {
                    SealedBlockBatch::new(peer, range, vec![])
                } else {
                    // Wait for the DA layer to reach the height of the last
                    // header before fetching the transactions.
                    await_da_height(
                        results
                            .last()
                            .expect("We checked headers are not empty above"),
                        &consensus,
                    )
                    .await;
                    let headers = SealedHeaderBatch::new(peer, range, results);
                    get_blocks(&p2p, headers).await
                }
            }
            .instrument(tracing::debug_span!("consensus_and_transactions"))
            .in_current_span()
        })
}

fn get_header_batch_stream<P: PeerToPeerPort + Send + Sync + 'static>(
    range: RangeInclusive<u32>,
    params: &Config,
    p2p: Arc<P>,
) -> impl Stream<Item = SealedHeaderBatch> {
    let Config {
        header_batch_size, ..
    } = params;
    let ranges = range_chunks(range, *header_batch_size);
    futures::stream::iter(ranges).then(move |range| {
        let p2p = p2p.clone();
        async move { get_headers_batch(range, &p2p).await }
    })
}

fn range_chunks(
    range: RangeInclusive<u32>,
    chunk_size: usize,
) -> impl Iterator<Item = Range<u32>> {
    let end = range.end().saturating_add(1);
    let chunk_size_u32 =
        u32::try_from(chunk_size).expect("The size of the chunk can't exceed `u32`");
    range.step_by(chunk_size).map(move |chunk_start| {
        let block_end = (chunk_start.saturating_add(chunk_size_u32)).min(end);
        chunk_start..block_end
    })
}
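
// An illustrative sketch of `range_chunks`: an inclusive height range is
// split into half-open `Range<u32>` chunks of at most `chunk_size` heights.
#[cfg(test)]
mod range_chunks_example {
    use super::range_chunks;

    #[test]
    fn splits_an_inclusive_range_into_half_open_chunks() {
        let chunks: Vec<_> = range_chunks(3..=10, 3).collect();
        // Heights 3 through 10 inclusive; the final chunk is short.
        assert_eq!(chunks, vec![3..6, 6..9, 9..11]);
    }
}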

fn check_sealed_header<
    P: PeerToPeerPort + Send + Sync + 'static,
    C: ConsensusPort + Send + Sync + 'static,
>(
    header: &SealedBlockHeader,
    peer_id: PeerId,
    p2p: &Arc<P>,
    consensus: &Arc<C>,
) -> bool {
    let validity = consensus
        .check_sealed_header(header)
        .trace_err("Failed to check consensus on header")
        .unwrap_or(false);
    if !validity {
        report_peer(p2p, peer_id.clone(), PeerReportReason::BadBlockHeader);
    }
    validity
}

async fn await_da_height<C: ConsensusPort + Send + Sync + 'static>(
    header: &SealedBlockHeader,
    consensus: &Arc<C>,
) {
    let _ = consensus
        .await_da_height(&header.entity.da_height)
        .await
        .trace_err("Failed to wait for DA layer to sync");
}

/// Waits for a notify or shutdown signal.
/// Returns true if the notify signal was received.
async fn wait_for_notify_or_shutdown(
    notify: &Notify,
    shutdown: &mut StateWatcher,
) -> bool {
    let n = notify.notified();
    let s = shutdown.while_started();
    futures::pin_mut!(n);
    futures::pin_mut!(s);

    // Select the first signal to be received.
    let r = futures::future::select(n, s).await;
    // Check if the notify signal was received.
    matches!(r, futures::future::Either::Left(_))
}

async fn get_sealed_block_headers<P>(
    range: Range<u32>,
    p2p: &Arc<P>,
) -> SourcePeer<Vec<SealedBlockHeader>>
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    tracing::debug!(
        "getting header range from {} to {} exclusive",
        range.start,
        range.end
    );
    p2p.get_sealed_block_headers(range)
        .await
        .trace_err("Failed to get headers")
        .unwrap_or_default()
        .map(|inner| inner.unwrap_or_default())
}

async fn get_transactions<P>(
    peer_id: PeerId,
    range: Range<u32>,
    p2p: &Arc<P>,
) -> Option<Vec<Transactions>>
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    let range = peer_id.clone().bind(range);
    let res = p2p
        .get_transactions(range)
        .await
        .trace_err("Failed to get transactions");
    match res {
        Ok(Some(transactions)) => Some(transactions),
        _ => {
            report_peer(p2p, peer_id.clone(), PeerReportReason::MissingTransactions);
            None
        }
    }
}

async fn get_headers_batch<P>(range: Range<u32>, p2p: &Arc<P>) -> SealedHeaderBatch
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    tracing::debug!(
        "getting header range from {} to {} exclusive",
        range.start,
        range.end
    );
    let sourced_headers = get_sealed_block_headers(range.clone(), p2p).await;
    let SourcePeer {
        peer_id,
        data: headers,
    } = sourced_headers;
    let heights = range.clone().map(BlockHeight::from);
    // Keep the headers only while they line up with the heights we asked for.
    let headers = headers
        .into_iter()
        .zip(heights)
        .take_while(move |(header, expected_height)| {
            let height = header.entity.height();
            height == expected_height
        })
        .map(|(header, _)| header)
        .collect::<Vec<_>>();
    if headers.len() != range.len() {
        report_peer(p2p, peer_id.clone(), PeerReportReason::MissingBlockHeaders);
    }
    Batch::new(peer_id, range, headers)
}

fn report_peer<P>(p2p: &Arc<P>, peer_id: PeerId, reason: PeerReportReason)
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    tracing::info!("Reporting peer for {:?}", reason);

    // Failure to report a peer is a non-fatal error; ignore the error
    let _ = p2p
        .report_peer(peer_id.clone(), reason)
        .trace_err(&format!("Failed to report peer {:?}", peer_id));
}

/// Get blocks correlating to the headers from a specific peer
#[tracing::instrument(skip(p2p, headers))]
async fn get_blocks<P>(p2p: &Arc<P>, headers: SealedHeaderBatch) -> SealedBlockBatch
where
    P: PeerToPeerPort + Send + Sync + 'static,
{
    let Batch {
        results: headers,
        peer,
        range,
    } = headers;

    let Some(transaction_data) =
        get_transactions(peer.clone(), range.clone(), p2p).await
    else {
        return Batch::new(peer, range, vec![])
    };

    let iter = headers.into_iter().zip(transaction_data.into_iter());
    let mut blocks = vec![];
    for (block_header, transactions) in iter {
        let SealedBlockHeader {
            consensus,
            entity: header,
        } = block_header;
        let block =
            Block::try_from_executed(header, transactions.0).map(|block| SealedBlock {
                entity: block,
                consensus,
            });
        if let Some(block) = block {
            blocks.push(block);
        } else {
            report_peer(p2p, peer.clone(), PeerReportReason::InvalidTransactions);
            break
        }
    }
    Batch::new(peer, range, blocks)
}

#[tracing::instrument(
    skip_all,
    fields(
        height = **block.entity.header().height(),
        id = %block.entity.header().consensus().generated.application_hash
    ),
    err
)]
async fn execute_and_commit<E>(
    executor: &E,
    state: &SharedMutex<State>,
    block: SealedBlock,
) -> anyhow::Result<()>
where
    E: BlockImporterPort + Send + Sync + 'static,
{
    // Execute and commit the block.
    let height = *block.entity.header().height();
    let r = executor.execute_and_commit(block).await;

    // If the block executed successfully, mark it as committed.
    if r.is_ok() {
        state.apply(|s| s.commit(*height));
    } else {
        tracing::error!("Execution of height {} failed: {:?}", *height, r);
    }
    r
}

/// Extra stream utilities.
trait StreamUtil: Sized {
    /// Scan the stream for `None`.
    fn into_scan_none(self) -> ScanNone<Self> {
        ScanNone(self)
    }

    /// Scan the stream for errors.
    fn into_scan_err(self) -> ScanErr<Self> {
        ScanErr(self)
    }
}

impl<S> StreamUtil for S {}

struct ScanErr<S>(S);
struct ScanNone<S>(S);

impl<S> ScanNone<S> {
    fn scan_none<'a, T: 'a>(self) -> impl Stream<Item = T> + 'a
    where
        S: Stream<Item = Option<T>> + Send + 'a,
    {
        let stream = self.0.boxed::<'a>();
        futures::stream::unfold((false, stream), |(_, mut stream)| async move {
            let element = stream.next().await?;
            element.map(|e| (e, (false, stream)))
        })
    }
}

impl<S> ScanErr<S> {
    fn scan_err<'a, T: 'a>(self) -> impl Stream<Item = Batch<T>> + 'a
    where
        S: Stream<Item = Batch<T>> + Send + 'a,
    {
        let stream = self.0.boxed::<'a>();
        futures::stream::unfold((false, stream), |(mut err, mut stream)| async move {
            if err {
                None
            } else {
                let batch = stream.next().await?;
                err = batch.is_err();
                Some((batch, (err, stream)))
            }
        })
    }
}
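
// An illustrative sketch of `scan_none`: the stream ends at the first `None`
// instead of yielding it. Assumes `tokio` with the `rt` and `macros` features
// is available to tests, as the sibling test modules already require.
#[cfg(test)]
mod scan_example {
    use super::StreamUtil;
    use futures::stream::StreamExt;

    #[tokio::test]
    async fn scan_none_ends_the_stream_at_the_first_none() {
        let input = futures::stream::iter(vec![Some(1), Some(2), None, Some(3)]);
        let output: Vec<_> = input.into_scan_none().scan_none().collect().await;
        // `Some(3)` is never yielded because the stream ended at `None`.
        assert_eq!(output, vec![1, 2]);
    }
}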