use anyhow::{anyhow, Context};
use parquet::{
    data_type::AsBytes,
    file::{
        reader::{ChunkReader, FileReader},
        serialized_reader::SerializedFileReader,
    },
    record::RowAccessor,
};

/// Reads a Parquet file row group by row group, yielding the raw byte
/// payload stored in the first column of every row.
pub struct Decoder<R: ChunkReader> {
    data_source: SerializedFileReader<R>,
    group_index: usize,
}

impl<R> Decoder<R>
where
    R: ChunkReader + 'static,
{
    /// Total number of row groups in the underlying file.
    pub fn num_groups(&self) -> usize {
        self.data_source.num_row_groups()
    }

    /// Collects the byte payload of every row in the current row group.
    fn current_group(&self) -> anyhow::Result<Vec<Vec<u8>>> {
        let data = self
            .data_source
            .get_row_group(self.group_index)?
            .get_row_iter(None)?
            .map(|result| {
                result.map_err(|e| anyhow!(e)).and_then(|row| {
                    // The encoded payload lives in the first column of each row.
                    const FIELD_IDX: usize = 0;
                    Ok(row
                        .get_bytes(FIELD_IDX)
                        .context("While decoding postcard bytes")?
                        .as_bytes()
                        .to_vec())
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        Ok(data)
    }
}

impl<R> Iterator for Decoder<R>
where
    R: ChunkReader + 'static,
{
    type Item = anyhow::Result<Vec<Vec<u8>>>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.group_index >= self.data_source.metadata().num_row_groups() {
            return None;
        }
        let group = self.current_group();
        self.group_index = self.group_index.saturating_add(1);
        Some(group)
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        // Skip ahead `n` row groups, then yield the next one.
        self.group_index = self.group_index.saturating_add(n);
        self.next()
    }
}

impl<R> Decoder<R>
where
    R: ChunkReader + 'static,
{
    pub fn new(reader: R) -> anyhow::Result<Self> {
        Ok(Self {
            data_source: SerializedFileReader::new(reader)?,
            group_index: 0,
        })
    }
}
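
// A minimal usage sketch, not part of the original module: the path
// `records.parquet` is a hypothetical example input. `std::fs::File`
// implements `ChunkReader`, so it can back the `Decoder` directly.
#[cfg(test)]
mod example {
    use super::Decoder;

    #[test]
    #[ignore = "requires a Parquet file on disk"]
    fn drain_all_row_groups() -> anyhow::Result<()> {
        let file = std::fs::File::open("records.parquet")?;
        let decoder = Decoder::new(file)?;
        println!("row groups: {}", decoder.num_groups());
        for group in decoder {
            for payload in group? {
                // Each payload is the raw first-column bytes of one row,
                // e.g. a postcard-encoded record ready for deserialization.
                println!("{} bytes", payload.len());
            }
        }
        Ok(())
    }
}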