use crate::{ database::{ database_description::{ off_chain::OffChain, on_chain::OnChain, relayer::Relayer, DatabaseDescription, DatabaseMetadata, }, metadata::MetadataTable, Error as DatabaseError, }, graphql_api::storage::blocks::FuelBlockIdsToHeights, state::{ in_memory::memory_store::MemoryStore, ChangesIterator, DataSource, }, }; use fuel_core_chain_config::TableEntry; use fuel_core_services::SharedMutex; use fuel_core_storage::{ self, blueprint::BlueprintInspect, iter::{ BoxedIter, IterDirection, IterableStore, IteratorOverTable, }, kv_store::{ KVItem, KeyValueInspect, Value, }, not_found, structured_storage::TableWithBlueprint, tables::FuelBlocks, transactional::{ AtomicView, Changes, ConflictPolicy, Modifiable, StorageTransaction, }, Error as StorageError, Result as StorageResult, StorageAsMut, StorageInspect, StorageMutate, }; use fuel_core_types::{ blockchain::{ block::CompressedBlock, primitives::DaBlockHeight, }, fuel_types::BlockHeight, }; use itertools::Itertools; use std::{ fmt::Debug, sync::Arc, }; pub use fuel_core_database::Error; pub type Result = core::result::Result; // TODO: Extract `Database` and all belongs into `fuel-core-database`. #[cfg(feature = "rocksdb")] use crate::state::rocks_db::RocksDb; #[cfg(feature = "rocksdb")] use std::path::Path; // Storages implementation pub mod balances; pub mod block; pub mod coin; pub mod contracts; pub mod database_description; pub mod genesis_progress; pub mod message; pub mod metadata; pub mod sealed_block; pub mod state; pub mod storage; pub mod transactions; #[derive(Default, Debug, Copy, Clone)] pub struct GenesisStage; #[derive(Debug, Clone)] pub struct RegularStage where Description: DatabaseDescription, { /// Cached value from Metadata table, used to speed up lookups. height: SharedMutex>, } impl Default for RegularStage where Description: DatabaseDescription, { fn default() -> Self { Self { height: SharedMutex::new(None), } } } pub type GenesisDatabase = Database; #[derive(Clone, Debug)] pub struct Database> where Description: DatabaseDescription, { data: DataSource, stage: Stage, } impl Database { pub fn latest_block(&self) -> StorageResult { self.iter_all::(Some(IterDirection::Reverse)) .next() .transpose()? .map(|(_, block)| block) .ok_or_else(|| not_found!("FuelBlocks")) } } impl Database where DbDesc: DatabaseDescription, { pub fn entries<'a, T>( &'a self, prefix: Option>, direction: IterDirection, ) -> impl Iterator>> + 'a where T: TableWithBlueprint::Column> + 'a, T::Blueprint: BlueprintInspect, { self.iter_all_filtered::(prefix, None, Some(direction)) .map_ok(|(key, value)| TableEntry { key, value }) } } impl GenesisDatabase where Description: DatabaseDescription, { pub fn new(data_source: DataSource) -> Self { Self { stage: GenesisStage, data: data_source, } } } impl Database where Description: DatabaseDescription, Database: StorageInspect, Error = StorageError>, { pub fn new(data_source: DataSource) -> Self { let mut database = Self { stage: RegularStage { height: SharedMutex::new(None), }, data: data_source, }; let height = database .latest_height() .expect("Failed to get latest height during creation of the database"); database.stage.height = SharedMutex::new(height); database } #[cfg(feature = "rocksdb")] pub fn open_rocksdb(path: &Path, capacity: impl Into>) -> Result { use anyhow::Context; let db = RocksDb::::default_open(path, capacity.into()).map_err(Into::::into).with_context(|| format!("Failed to open rocksdb, you may need to wipe a pre-existing incompatible db e.g. `rm -rf {path:?}`"))?; Ok(Self::new(Arc::new(db))) } /// Converts to an unchecked database. /// Panics if the height is already set. pub fn into_genesis(self) -> GenesisDatabase { assert!( !self.stage.height.lock().is_some(), "Height is already set for `{}`", Description::name() ); GenesisDatabase::new(self.data) } } impl Database where Description: DatabaseDescription, Stage: Default, { pub fn in_memory() -> Self { let data = Arc::>::new(MemoryStore::default()); Self { data, stage: Stage::default(), } } #[cfg(feature = "rocksdb")] pub fn rocksdb_temp() -> Self { let data = Arc::>::new(RocksDb::default_open_temp(None).unwrap()); Self { data, stage: Stage::default(), } } } impl KeyValueInspect for Database where Description: DatabaseDescription, { type Column = Description::Column; fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult { self.data.as_ref().exists(key, column) } fn size_of_value( &self, key: &[u8], column: Self::Column, ) -> StorageResult> { self.data.as_ref().size_of_value(key, column) } fn get(&self, key: &[u8], column: Self::Column) -> StorageResult> { self.data.as_ref().get(key, column) } fn read( &self, key: &[u8], column: Self::Column, buf: &mut [u8], ) -> StorageResult> { self.data.as_ref().read(key, column, buf) } } impl IterableStore for Database where Description: DatabaseDescription, { fn iter_store( &self, column: Self::Column, prefix: Option<&[u8]>, start: Option<&[u8]>, direction: IterDirection, ) -> BoxedIter { self.data .as_ref() .iter_store(column, prefix, start, direction) } } /// Construct an ephemeral database /// uses rocksdb when rocksdb features are enabled /// uses in-memory when rocksdb features are disabled impl Default for Database where Description: DatabaseDescription, Stage: Default, { fn default() -> Self { #[cfg(not(feature = "rocksdb"))] { Self::in_memory() } #[cfg(feature = "rocksdb")] { Self::rocksdb_temp() } } } impl AtomicView for Database { type View = Self; type Height = BlockHeight; fn latest_height(&self) -> Option { *self.stage.height.lock() } fn view_at(&self, _: &BlockHeight) -> StorageResult { // TODO: Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451 Ok(self.latest_view()) } fn latest_view(&self) -> Self::View { // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 self.clone() } } impl AtomicView for Database { type View = Self; type Height = BlockHeight; fn latest_height(&self) -> Option { *self.stage.height.lock() } fn view_at(&self, _: &BlockHeight) -> StorageResult { // TODO: Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451 Ok(self.latest_view()) } fn latest_view(&self) -> Self::View { // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 self.clone() } } impl AtomicView for Database { type View = Self; type Height = DaBlockHeight; fn latest_height(&self) -> Option { *self.stage.height.lock() } fn view_at(&self, _: &Self::Height) -> StorageResult { Ok(self.latest_view()) } fn latest_view(&self) -> Self::View { self.clone() } } impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { commit_changes_with_height_update(self, changes, |iter| { iter.iter_all::(Some(IterDirection::Reverse)) .map(|result| result.map(|(height, _)| height)) .try_collect() }) } } impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { commit_changes_with_height_update(self, changes, |iter| { iter.iter_all::(Some(IterDirection::Reverse)) .map(|result| result.map(|(_, height)| height)) .try_collect() }) } } #[cfg(feature = "relayer")] impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { commit_changes_with_height_update(self, changes, |iter| { iter.iter_all::(Some( IterDirection::Reverse, )) .map(|result| result.map(|(height, _)| height)) .try_collect() }) } } #[cfg(not(feature = "relayer"))] impl Modifiable for Database { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { commit_changes_with_height_update(self, changes, |_| Ok(vec![])) } } impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { self.data.as_ref().commit_changes(None, changes) } } impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { self.data.as_ref().commit_changes(None, changes) } } impl Modifiable for GenesisDatabase { fn commit_changes(&mut self, changes: Changes) -> StorageResult<()> { self.data.as_ref().commit_changes(None, changes) } } trait DatabaseHeight: Sized { fn as_u64(&self) -> u64; fn advance_height(&self) -> Option; } impl DatabaseHeight for BlockHeight { fn as_u64(&self) -> u64 { let height: u32 = (*self).into(); height as u64 } fn advance_height(&self) -> Option { self.succ() } } impl DatabaseHeight for DaBlockHeight { fn as_u64(&self) -> u64 { self.0 } fn advance_height(&self) -> Option { self.0.checked_add(1).map(Into::into) } } fn commit_changes_with_height_update( database: &mut Database, changes: Changes, heights_lookup: impl Fn( &ChangesIterator, ) -> StorageResult>, ) -> StorageResult<()> where Description: DatabaseDescription, Description::Height: Debug + PartialOrd + DatabaseHeight, for<'a> StorageTransaction<&'a &'a mut Database>: StorageMutate, Error = StorageError>, { // Gets the all new heights from the `changes` let iterator = ChangesIterator::::new(&changes); let new_heights = heights_lookup(&iterator)?; // Changes for each block should be committed separately. // If we have more than one height, it means we are mixing commits // for several heights in one batch - return error in this case. if new_heights.len() > 1 { return Err(DatabaseError::MultipleHeightsInCommit { heights: new_heights.iter().map(DatabaseHeight::as_u64).collect(), } .into()); } let new_height = new_heights.into_iter().last(); let prev_height = *database.stage.height.lock(); match (prev_height, new_height) { (None, None) => { // We are inside the regenesis process if the old and new heights are not set. // In this case, we continue to commit until we discover a new height. // This height will be the start of the database. } (Some(prev_height), Some(new_height)) => { // Each new commit should be linked to the previous commit to create a monotonically growing database. let next_expected_height = prev_height .advance_height() .ok_or(DatabaseError::FailedToAdvanceHeight)?; // TODO: After https://github.com/FuelLabs/fuel-core/issues/451 // we can replace `next_expected_height > new_height` with `next_expected_height != new_height`. if next_expected_height > new_height { return Err(DatabaseError::HeightsAreNotLinked { prev_height: prev_height.as_u64(), new_height: new_height.as_u64(), } .into()); } } (None, Some(_)) => { // The new height is finally found; starting at this point, // all next commits should be linked(the height should increase each time by one). } (Some(prev_height), None) => { // In production, we shouldn't have cases where we call `commit_changes` with intermediate changes. // The commit always should contain all data for the corresponding height. return Err(DatabaseError::NewHeightIsNotSet { prev_height: prev_height.as_u64(), } .into()); } }; let updated_changes = if let Some(new_height) = new_height { // We want to update the metadata table to include a new height. // For that, we are building a new storage transaction around `changes`. // Modifying this transaction will include all required updates into the `changes`. let mut transaction = StorageTransaction::transaction( &database, ConflictPolicy::Overwrite, changes, ); transaction .storage_as_mut::>() .insert( &(), &DatabaseMetadata::V1 { version: Description::version(), height: new_height, }, )?; transaction.into_changes() } else { changes }; // Atomically commit the changes to the database, and to the mutex-protected field. let mut guard = database.stage.height.lock(); database.data.commit_changes(new_height, updated_changes)?; // Update the block height if let Some(new_height) = new_height { *guard = Some(new_height); } Ok(()) } #[cfg(feature = "rocksdb")] pub fn convert_to_rocksdb_direction(direction: IterDirection) -> rocksdb::Direction { match direction { IterDirection::Forward => rocksdb::Direction::Forward, IterDirection::Reverse => rocksdb::Direction::Reverse, } } #[cfg(test)] mod tests { use super::*; use crate::database::{ database_description::DatabaseDescription, Database, }; use fuel_core_storage::{ tables::FuelBlocks, StorageAsMut, }; fn column_keys_not_exceed_count() where Description: DatabaseDescription, { use enum_iterator::all; use fuel_core_storage::kv_store::StorageColumn; use strum::EnumCount; for column in all::() { assert!(column.as_usize() < Description::Column::COUNT); } } mod on_chain { use super::*; use crate::database::{ database_description::on_chain::OnChain, DatabaseHeight, }; use fuel_core_storage::{ tables::Coins, transactional::WriteTransaction, }; use fuel_core_types::{ blockchain::block::CompressedBlock, entities::coins::coin::CompressedCoin, fuel_tx::UtxoId, }; #[test] fn column_keys_not_exceed_count_test() { column_keys_not_exceed_count::(); } #[test] fn database_advances_with_a_new_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When let advanced_height = 1.into(); database .storage_as_mut::() .insert(&advanced_height, &CompressedBlock::default()) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(advanced_height)); } #[test] fn database_not_advances_without_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When database .storage_as_mut::() .insert(&UtxoId::default(), &CompressedCoin::default()) .unwrap(); // Then assert_eq!(AtomicView::latest_height(&database), None); } #[test] fn database_advances_with_linked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&starting_height, &CompressedBlock::default()) .unwrap(); assert_eq!(database.latest_height().unwrap(), Some(starting_height)); // When let next_height = starting_height.advance_height().unwrap(); database .storage_as_mut::() .insert(&next_height, &CompressedBlock::default()) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(next_height)); } #[test] fn database_fails_with_unlinked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&starting_height, &CompressedBlock::default()) .unwrap(); // When let prev_height = 0.into(); let result = database .storage_as_mut::() .insert(&prev_height, &CompressedBlock::default()); // Then assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::HeightsAreNotLinked { prev_height: 1, new_height: 0 }) .to_string() ); } #[test] fn database_fails_with_non_advancing_commit() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&starting_height, &CompressedBlock::default()) .unwrap(); // When let result = database .storage_as_mut::() .insert(&UtxoId::default(), &CompressedCoin::default()); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::NewHeightIsNotSet { prev_height: 1 }) .to_string() ); } #[test] fn database_fails_when_commit_with_several_blocks() { let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&starting_height, &CompressedBlock::default()) .unwrap(); // Given let mut transaction = database.write_transaction(); let next_height = starting_height.advance_height().unwrap(); let next_next_height = next_height.advance_height().unwrap(); transaction .storage_as_mut::() .insert(&next_height, &CompressedBlock::default()) .unwrap(); transaction .storage_as_mut::() .insert(&next_next_height, &CompressedBlock::default()) .unwrap(); // When let result = transaction.commit(); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::MultipleHeightsInCommit { heights: vec![3, 2] }) .to_string() ); } } mod off_chain { use super::*; use crate::{ database::{ database_description::off_chain::OffChain, DatabaseHeight, }, fuel_core_graphql_api::storage::messages::OwnedMessageKey, graphql_api::storage::messages::OwnedMessageIds, }; use fuel_core_storage::transactional::WriteTransaction; #[test] fn column_keys_not_exceed_count_test() { column_keys_not_exceed_count::(); } #[test] fn database_advances_with_a_new_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When let advanced_height = 1.into(); database .storage_as_mut::() .insert(&Default::default(), &advanced_height) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(advanced_height)); } #[test] fn database_not_advances_without_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When database .storage_as_mut::() .insert(&OwnedMessageKey::default(), &()) .unwrap(); // Then assert_eq!(AtomicView::latest_height(&database), None); } #[test] fn database_advances_with_linked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&Default::default(), &starting_height) .unwrap(); assert_eq!(database.latest_height().unwrap(), Some(starting_height)); // When let next_height = starting_height.advance_height().unwrap(); database .storage_as_mut::() .insert(&Default::default(), &next_height) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(next_height)); } #[test] fn database_fails_with_unlinked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&Default::default(), &starting_height) .unwrap(); // When let prev_height = 0.into(); let result = database .storage_as_mut::() .insert(&Default::default(), &prev_height); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::HeightsAreNotLinked { prev_height: 1, new_height: 0 }) .to_string() ); } #[test] fn database_fails_with_non_advancing_commit() { // Given let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&Default::default(), &starting_height) .unwrap(); // When let result = database .storage_as_mut::() .insert(&OwnedMessageKey::default(), &()); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::NewHeightIsNotSet { prev_height: 1 }) .to_string() ); } #[test] fn database_fails_when_commit_with_several_blocks() { let mut database = Database::::default(); let starting_height = 1.into(); database .storage_as_mut::() .insert(&Default::default(), &starting_height) .unwrap(); // Given let mut transaction = database.write_transaction(); let next_height = starting_height.advance_height().unwrap(); let next_next_height = next_height.advance_height().unwrap(); transaction .storage_as_mut::() .insert(&[1; 32].into(), &next_height) .unwrap(); transaction .storage_as_mut::() .insert(&[2; 32].into(), &next_next_height) .unwrap(); // When let result = transaction.commit(); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::MultipleHeightsInCommit { heights: vec![3, 2] }) .to_string() ); } } #[cfg(feature = "relayer")] mod relayer { use super::*; use crate::database::{ database_description::relayer::Relayer, DatabaseHeight, }; use fuel_core_relayer::storage::EventsHistory; use fuel_core_storage::transactional::WriteTransaction; #[test] fn column_keys_not_exceed_count_test() { column_keys_not_exceed_count::(); } #[test] fn database_advances_with_a_new_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When let advanced_height = 1u64.into(); database .storage_as_mut::() .insert(&advanced_height, &[]) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(advanced_height)); } #[test] fn database_not_advances_without_block() { // Given let mut database = Database::::default(); assert_eq!(database.latest_height().unwrap(), None); // When database .storage_as_mut::>() .insert( &(), &DatabaseMetadata::::V1 { version: Default::default(), height: Default::default(), }, ) .unwrap(); // Then assert_eq!(AtomicView::latest_height(&database), None); } #[test] fn database_advances_with_linked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1u64.into(); database .storage_as_mut::() .insert(&starting_height, &[]) .unwrap(); assert_eq!(database.latest_height().unwrap(), Some(starting_height)); // When let next_height = starting_height.advance_height().unwrap(); database .storage_as_mut::() .insert(&next_height, &[]) .unwrap(); // Then assert_eq!(database.latest_height().unwrap(), Some(next_height)); } #[test] fn database_fails_with_unlinked_blocks() { // Given let mut database = Database::::default(); let starting_height = 1u64.into(); database .storage_as_mut::() .insert(&starting_height, &[]) .unwrap(); // When let prev_height = 0u64.into(); let result = database .storage_as_mut::() .insert(&prev_height, &[]); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::HeightsAreNotLinked { prev_height: 1, new_height: 0 }) .to_string() ); } #[test] fn database_fails_with_non_advancing_commit() { // Given let mut database = Database::::default(); let starting_height = 1u64.into(); database .storage_as_mut::() .insert(&starting_height, &[]) .unwrap(); // When let result = database.storage_as_mut::>().insert( &(), &DatabaseMetadata::::V1 { version: Default::default(), height: Default::default(), }, ); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::NewHeightIsNotSet { prev_height: 1 }) .to_string() ); } #[test] fn database_fails_when_commit_with_several_blocks() { let mut database = Database::::default(); let starting_height = 1u64.into(); database .storage_as_mut::() .insert(&starting_height, &[]) .unwrap(); // Given let mut transaction = database.write_transaction(); let next_height = starting_height.advance_height().unwrap(); let next_next_height = next_height.advance_height().unwrap(); transaction .storage_as_mut::() .insert(&next_height, &[]) .unwrap(); transaction .storage_as_mut::() .insert(&next_next_height, &[]) .unwrap(); // When let result = transaction.commit(); // Then assert!(result.is_err()); assert_eq!( result.unwrap_err().to_string(), StorageError::from(DatabaseError::MultipleHeightsInCommit { heights: vec![3, 2] }) .to_string() ); } } #[cfg(feature = "rocksdb")] #[test] fn database_iter_all_by_prefix_works() { use fuel_core_storage::tables::ContractsRawCode; use fuel_core_types::fuel_types::ContractId; use std::str::FromStr; let test = |mut db: Database| { let contract_id_1 = ContractId::from_str( "5962be5ebddc516cb4ed7d7e76365f59e0d231ac25b53f262119edf76564aab4", ) .unwrap(); let mut insert_empty_code = |id| { StorageMutate::::insert(&mut db, &id, &[]).unwrap() }; insert_empty_code(contract_id_1); let contract_id_2 = ContractId::from_str( "5baf0dcae7c114f647f6e71f1723f59bcfc14ecb28071e74895d97b14873c5dc", ) .unwrap(); insert_empty_code(contract_id_2); let matched_keys: Vec<_> = db .iter_all_by_prefix::(Some(contract_id_1)) .map_ok(|(k, _)| k) .try_collect() .unwrap(); assert_eq!(matched_keys, vec![contract_id_1]); }; let temp_dir = tempfile::tempdir().unwrap(); let db = Database::::in_memory(); // in memory passes test(db); let db = Database::::open_rocksdb(temp_dir.path(), 1024 * 1024 * 1024) .unwrap(); // rocks db fails test(db); } }