{
/// Receives signals when the relayer reaches consistency with the DA layer.
synced: Synced,
start_da_block_height: DaBlockHeight,
database: D,
}
/// Not initialized version of the [`Task`].
///
/// Carries every field of [`Task`] except the shutdown watcher; `into_task`
/// moves these fields into a [`Task`] and adds a clone of the service's
/// `StateWatcher`.
pub struct NotInitializedTask {
/// Sends signals when the relayer reaches consistency with the DA layer.
/// Receivers for shared state are obtained from it via `subscribe`.
synced: NotifySynced,
/// The node that communicates with Ethereum.
eth_node: P,
/// The fuel database.
database: D,
/// Configuration settings.
config: Config,
/// Retry-on-error flag; handed over unchanged to the [`Task`].
retry_on_error: bool,
}
/// The actual relayer background task that syncs with the DA layer.
///
/// Built from a [`NotInitializedTask`] by `into_task`, which adds the
/// `shutdown` watcher to the fields carried over.
pub struct Task
{
/// Sends signals when the relayer reaches consistency with the DA layer.
/// Updated through `send_if_modified` with the synced DA block height.
synced: NotifySynced,
/// The node that communicates with Ethereum.
eth_node: P,
/// The fuel database.
database: D,
/// Configuration settings.
config: Config,
/// The watcher used to track the state of the service. If the service stops,
/// the task will stop synchronization.
shutdown: StateWatcher,
/// Controls what `run` does with a failed sync iteration: when `true` the
/// error is returned as `Err`; when `false` it is logged and `run` reports
/// that the task should not continue.
retry_on_error: bool,
}
impl
NotInitializedTask
{
    /// Builds a relayer task that is not yet attached to a service state watcher.
    ///
    /// The sync-notification channel is created holding `None`. Only its
    /// sending half is stored; receivers are produced later by calling
    /// `subscribe` on that sender.
    fn new(eth_node: P, database: D, config: Config, retry_on_error: bool) -> Self {
        // Keep the sender, let the initial receiver drop right here.
        let synced = watch::channel(None).0;
        Self {
            synced,
            eth_node,
            database,
            config,
            retry_on_error,
        }
    }
}
#[async_trait]
impl
RelayerData for Task
where
    P: Middleware + 'static,
    D: RelayerDb + 'static,
{
    /// Races the Ethereum node's "still syncing" wait against the service's
    /// shutdown watcher.
    ///
    /// The watcher branch is checked first (`biased`); when it completes, an
    /// error is returned. Otherwise the outcome of
    /// `syncing::wait_if_eth_syncing` is passed through unchanged.
    async fn wait_if_eth_syncing(&self) -> anyhow::Result<()> {
        // Work on a clone so the watcher can be borrowed mutably from `&self`.
        let mut stop_signal = self.shutdown.clone();
        tokio::select! {
            biased;
            _ = stop_signal.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            outcome = syncing::wait_if_eth_syncing(
                &self.eth_node,
                self.config.syncing_call_frequency,
                self.config.syncing_log_frequency,
            ) => {
                outcome
            }
        }
    }

    /// Downloads the logs covering `eth_sync_gap` from the configured
    /// contracts and writes them into the database.
    ///
    /// The log stream is truncated as soon as the shutdown watcher future
    /// completes, so only logs received before that point are written.
    async fn download_logs(
        &mut self,
        eth_sync_gap: &state::EthSyncGap,
    ) -> anyhow::Result<()> {
        let contracts = self.config.eth_v2_listening_contracts.clone();
        let page_size = self.config.log_page_size;
        let bounded_logs =
            download_logs(eth_sync_gap, contracts, &self.eth_node, page_size)
                .take_until(self.shutdown.while_started());
        write_logs(&mut self.database, bounded_logs).await
    }

    /// Publishes the synced DA height to subscribers.
    ///
    /// The channel value is replaced (and receivers notified) only when
    /// `state.is_synced_at()` yields a height; otherwise nothing is sent.
    fn update_synced(&self, state: &state::EthState) {
        self.synced
            .send_if_modified(|last_state| match state.is_synced_at() {
                Some(height) => {
                    *last_state = Some(DaBlockHeight::from(height));
                    true
                }
                None => false,
            });
    }
}
#[async_trait]
impl RunnableService for NotInitializedTask
where
    P: Middleware + 'static,
    D: RelayerDb + Clone + 'static,
{
    const NAME: &'static str = "Relayer";
    type SharedData = SharedState;
    type Task = Task;
    type TaskParams = ();

    /// Creates the state handed out to other services: a fresh receiver for
    /// the sync-notification channel, the configured DA deploy height as the
    /// starting height, and a clone of the database.
    fn shared_data(&self) -> Self::SharedData {
        SharedState {
            synced: self.synced.subscribe(),
            start_da_block_height: self.config.da_deploy_height,
            database: self.database.clone(),
        }
    }

    /// Turns the not-initialized task into a running [`Task`].
    ///
    /// All fields are moved over as they are; the only addition is a clone of
    /// `watcher`, stored as the task's shutdown signal. This never fails.
    async fn into_task(
        // `self` is only destructured, never mutated, so no `mut` binding is needed.
        self,
        watcher: &StateWatcher,
        _: Self::TaskParams,
    ) -> anyhow::Result {
        let NotInitializedTask {
            synced,
            eth_node,
            database,
            config,
            retry_on_error,
        } = self;

        Ok(Task {
            synced,
            eth_node,
            database,
            config,
            shutdown: watcher.clone(),
            retry_on_error,
        })
    }
}
#[async_trait]
impl RunnableTask for Task
where
    P: Middleware + 'static,
    D: RelayerDb + 'static,
{
    /// Performs one synchronization iteration via `run::run`.
    ///
    /// After the iteration, if the service is still in the started state and
    /// the iteration either failed or the relayer has already published a
    /// synced height, the task sleeps for whatever remains of
    /// `config.sync_minimum_duration` so the DA node is not spammed.
    ///
    /// # Returns
    ///
    /// * `Ok(true)` when the iteration succeeded and the task should continue.
    /// * `Ok(false)` when the iteration failed and `retry_on_error` is `false`;
    ///   the error is logged.
    ///
    /// # Errors
    ///
    /// Returns the iteration's error when `retry_on_error` is `true`.
    async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result {
        let now = tokio::time::Instant::now();
        let result = run::run(self).await;

        // Short-circuit with `||`: the watch channel is only inspected when
        // the iteration itself did not fail.
        if self.shutdown.borrow_and_update().started()
            && (result.is_err() || self.synced.borrow().is_some())
        {
            // Sleep the loop so the da node is not spammed.
            tokio::time::sleep(
                self.config
                    .sync_minimum_duration
                    .saturating_sub(now.elapsed()),
            )
            .await;
        }

        match result {
            Ok(_) => {
                let should_continue = true;
                Ok(should_continue)
            }
            // Hand the error back to the caller when retrying is enabled.
            Err(err) if self.retry_on_error => Err(err),
            Err(err) => {
                tracing::error!("Exiting due to Error in relayer task: {:?}", err);
                let should_continue = false;
                Ok(should_continue)
            }
        }
    }

    /// Shuts the task down; always succeeds.
    async fn shutdown(self) -> anyhow::Result<()> {
        // Nothing to shut down because we don't have any temporary state that should be dumped,
        // and we don't spawn any sub-tasks that we need to finish or await.
        Ok(())
    }
}
impl SharedState {
    /// Waits until the relayer has reported being in sync with the data
    /// availability layer at least once.
    ///
    /// Completes immediately when a synced height has already been
    /// published; otherwise waits for the next change on the channel.
    /// There is no promise that the relayer stays caught up afterwards:
    /// it may fall behind again right after this future resolves. The
    /// only guarantee is that consistency with the DA layer was reached
    /// for some period of time.
    ///
    /// # Errors
    ///
    /// Fails when waiting for a change on the channel fails.
    pub async fn await_synced(&self) -> anyhow::Result<()> {
        let mut receiver = self.synced.clone();
        // The borrow guard is released at the end of this statement, before any await.
        let already_synced = receiver.borrow_and_update().deref().is_some();
        if !already_synced {
            receiver.changed().await?;
        }
        Ok(())
    }

    /// Waits until the published synced height is at least `height`.
    ///
    /// A channel that has not published any height yet counts as being
    /// behind.
    ///
    /// # Errors
    ///
    /// Fails when waiting for a change on the channel fails.
    pub async fn await_at_least_synced(
        &self,
        height: &DaBlockHeight,
    ) -> anyhow::Result<()> {
        let mut receiver = self.synced.clone();
        loop {
            let behind = receiver
                .borrow_and_update()
                .deref()
                .map_or(true, |synced| synced < *height);
            if !behind {
                return Ok(());
            }
            receiver.changed().await?;
        }
    }

    /// Returns the finalized DA height read from the database, i.e. the last
    /// block from the DA layer that got finalized.
    ///
    /// When the database lookup does not yield a height, the starting DA
    /// block height stored in this shared state is returned instead.
    pub fn get_finalized_da_height(&self) -> DaBlockHeight
    where
        D: RelayerDb + 'static,
    {
        let fallback = self.start_da_block_height;
        self.database.get_finalized_da_height().unwrap_or(fallback)
    }

    /// Borrows the database held by this shared state.
    pub fn database(&self) -> &D {
        &self.database
    }
}
#[async_trait]
impl state::EthRemote for Task
where
    P: Middleware,
    D: RelayerDb + 'static,
{
    /// Fetches the number of the block the Ethereum node reports as finalized.
    ///
    /// The request is raced against the service's shutdown watcher, which is
    /// checked first (`biased`).
    ///
    /// # Errors
    ///
    /// * The shutdown watcher future completed before the request did.
    /// * The node request itself failed.
    /// * The node returned no block, or a block without a number
    ///   (reported as "Block pending").
    async fn finalized(&self) -> anyhow::Result {
        let mut shutdown = self.shutdown.clone();
        tokio::select! {
            biased;
            _ = shutdown.while_started() => {
                Err(anyhow::anyhow!("The relayer got a stop signal"))
            },
            block = self.eth_node.get_block(ethers_core::types::BlockNumber::Finalized) => {
                let block_number = block.map_err(anyhow::Error::msg)?
                    .and_then(|block| block.number)
                    // Build the error lazily so the success path does not
                    // construct an `anyhow::Error` it never uses.
                    .ok_or_else(|| anyhow::anyhow!("Block pending"))?
                    .as_u64();
                Ok(block_number)
            }
        }
    }
}
#[async_trait]
impl EthLocal for Task
where
    P: Middleware,
    D: RelayerDb + 'static,
{
    /// Reports the DA height the local database has observed so far.
    ///
    /// Uses the finalized DA height from the database when the lookup yields
    /// one, and the configured DA deploy height otherwise. The result is
    /// always `Some`.
    fn observed(&self) -> Option {
        let deploy_height = self.config.da_deploy_height.0;
        let observed_height = self
            .database
            .get_finalized_da_height()
            .map(Into::into)
            .unwrap_or(deploy_height);
        Some(observed_height)
    }
}
/// Creates an instance of runnable relayer service backed by an HTTP
/// provider for the URL in `config.relayer`.
///
/// The service is created with retry-on-error enabled.
///
/// # Errors
///
/// Returns an error when `config.relayer` is not set.
pub fn new_service(database: D, config: Config) -> anyhow::Result>
where
    D: RelayerDb + Clone + 'static,
{
    let url = match config.relayer.clone() {
        Some(url) => url,
        None => {
            return Err(anyhow::anyhow!(
                "Tried to start Relayer without setting an eth_client in the config"
            ))
        }
    };
    // TODO: Does this handle https?
    let eth_node = Provider::new(Http::new(url));
    let retry_on_error = true;
    let service = new_service_internal(eth_node, database, config, retry_on_error);
    Ok(service)
}
#[cfg(any(test, feature = "test-helpers"))]
/// Start a test relayer around a caller-supplied Ethereum node.
///
/// The service is created with retry-on-error disabled, so a failed sync
/// iteration is logged and the task reports that it should not continue.
pub fn new_service_test(
    eth_node: P,
    database: D,
    config: Config,
) -> CustomizableService
where
    P: Middleware + 'static,
    D: RelayerDb + Clone + 'static,
{
    // Named after the parameter it feeds in `new_service_internal`.
    let retry_on_error = false;
    new_service_internal(eth_node, database, config, retry_on_error)
}
/// Shared constructor behind the public service builders: wraps a freshly
/// created [`NotInitializedTask`] in a `CustomizableService`.
fn new_service_internal(
    eth_node: P,
    database: D,
    config: Config,
    retry_on_error: bool,
) -> CustomizableService
where
    P: Middleware + 'static,
    D: RelayerDb + Clone + 'static,
{
    CustomizableService::new(NotInitializedTask::new(
        eth_node,
        database,
        config,
        retry_on_error,
    ))
}