Logo

dev-resources.site

for different kinds of informations.

Reducing S3 API Calls by 98% | Exploring the Secrets of OpenDAL's RangeReader

Published at
1/4/2024
Categories
programming
database
api
greptime
Author
Greptime
Categories
4 categories in total
programming
open
database
open
api
open
greptime
open
Reducing S3 API Calls by 98% | Exploring the Secrets of OpenDAL's RangeReader

Preface

At GreptimeDB, we utilize OpenDAL as our unified data access layer. Recently, a colleague informed me that it took 10 seconds to execute a Copy From statement to import an 800 KiB Parquet file from S3. After some investigation and reviewing related Reader of OpenDAL documentation and its implementation (realizing we hadn't RTFSC 🥲), I document and briefly summarize our findings here.

Relevant OpenDAL source code Commit: 6980cd1

Understanding OpenDAL Source Code

Frankly speaking, it was only recently that I fully grasped the intricacies of the OpenDAL source code and its invocation relationships, after previously having only a partial understanding of it.

Starting with the Operator

All our IO operations revolve around the Operator. Let's see how the Operator is constructed. In main.rs, we first create a file-system-based Backend Builder; subsequently build it into an accessor (implementing the Accessor trait); and then pass this accessor into OperatorBuilder::new, finally calling finish.

OpenDAL unifies the behavior of different storage backends through the Accessor trait, exposing a unified IO interface to the upper layer, like create_dir, read, write, etc.

use opendal::services::Fs;
use opendal::Operator;

#[tokio::main]
async fn main() -> Result<()> {
    // Create fs backend builder.
    let mut builder = Fs::default();
    // Set the root for fs, all operations will happen under this root.
    //
    // NOTE: the root must be absolute path.
    builder.root("/tmp");

    let accessor = builder.build()?;
    let op: Operator = OperatorBuilder::new(accessor)?.finish();

    Ok(())
}

What Happens in OperatorBuilder::new

The accessor we pass in is attached with two layers when invoking new, and an additional internal Layer is added when invoking finish. With these layers added, when we invoke interfaces exposed by Operator, the invoking starts from the outermost CompleteLayer and eventually reaches the innermost FsAccessor.

FsAccessor
ErrorContextLayer
CompleteLayer
^
|
| Invoking (`read`, `reader_with`, `stat`...)
impl<A: Accessor> OperatorBuilder<A> {
    /// Create a new operator builder.
    #[allow(clippy::new_ret_no_self)]
    pub fn new(accessor: A) -> OperatorBuilder<impl Accessor> {
        // Make sure error context layer has been attached.
        OperatorBuilder { accessor }
            .layer(ErrorContextLayer)
            .layer(CompleteLayer)
    }

    ...

    /// Finish the building to construct an Operator.
    pub fn finish(self) -> Operator {
        let ob = self.layer(TypeEraseLayer);
        Operator::from_inner(Arc::new(ob.accessor) as FusedAccessor)
    }
}

TL;DR: I just want to emphasize that we should read the source code of OpenDAL starting from CompleteLayer (an epiphany).

Background Information

Let me provide some necessary context here to understand the following content.

LruCacheLayer

Currently, in query scenarios, we add a LruCacheLayer while building the Operator, so our Operator looks like the diagram below:

S3Accessor                FsAccessor
ErrorContextLayer         ErrorContextLayer
CompleteLayer             CompleteLayer
    ^                         ^  |
    |                         |  |
    |`inner`           `cache`|  |
    |                         |  |
    |                         |  |
    |                         |  |
    +----- LruCacheLayer -----+  |
                 ^               |
                 |               |
                 |               |
                 |               v
                 |               FileReader::new(oio::TokioReader<tokio::fs::File>)
                 |
                 Invoking(`reader`, `reader_with`)

For example, with the read interface, LruCacheLayer caches S3 files in the file system, returning the cached file-system-based Box<dyn oio::Read>(FileReader::new(oio::TokioReader<tokio::fs::File>)) to the upper layer; if the file to be read is not in the cache, it's first loaded in full from S3 to the local file system.

struct LruCacheLayer {
  inner: Operator, // S3Backend
  cache: Operator, // FsBackend
  index: CacheIndex
}

impl LayeredAccessor for LruCacheLayer {
  ...
  async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
        if self.index.hit(path, args) {
          // Returns `Box<dyn oio::Read>`
          self.cache.read(path, args).await 
        } else {
          // Fetches cache and stores...
        }
  }
  ...
}

The Copy From Scenario

In the Copy From scenario, I didn't add this LruCacheLayer layer. Thus, our Operator looks like the diagram below:

``plain text
S3Accessor
ErrorContextLayer
CompleteLayer
▲ │
│ │
│ │
│ ▼
│ RangeReader::new(IncomingAsyncBody)
│
Invoking (
reader,reader_with`)




## Issues Encountered with OpenDAL RangeReader

### Starting with the Construction of ParquetRecordBatchStream
In `Copy From`, after obtaining the file information(i.e., the file location on the S3), we first invoke `operator.reader` to return a `reader` implementing `AsyncReader + AsyncSeek`, then wrap it with a `BufReader`. Ultimately, this `reader` is passed into `ParquetRecordBatchStreamBuilder`.

> Here, the use of `BufReader` is superfluous because it clears its internal buffer after invoking the `seek` method, negating any potential performance benefits.



```rust
  ...
  let reader = operator
      .reader(path)
      .await
      .context(error::ReadObjectSnafu { path })?;

  let buf_reader = BufReader::new(reader.compat());

  let builder = ParquetRecordBatchStreamBuilder::new(buf_reader)
      .await
      .context(error::ReadParquetSnafu)?;

  let upstream = builder
      .build()
      .context(error::BuildParquetRecordBatchStreamSnafu)?;

  ...

Reading Metadata in ParquetRecordBatchStream::new

The metadata reading logic is as follows: first, invokes seek(SeekFrom::End(-FOOTER_SIZE_I64)), reads FOOTER_SIZE bytes and parse metadata_len; then invokes seek again, and reads metadata_len bytes to parse the metadata.

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        const FOOTER_SIZE_I64: i64 = FOOTER_SIZE as i64;
        async move {
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64)).await?;

            let mut buf = [0_u8; FOOTER_SIZE];
            self.read_exact(&mut buf).await?;

            let metadata_len = decode_footer(&buf)?;
            self.seek(SeekFrom::End(-FOOTER_SIZE_I64 - metadata_len as i64))
                .await?;

            let mut buf = Vec::with_capacity(metadata_len);
            self.take(metadata_len as _).read_to_end(&mut buf).await?;

            Ok(Arc::new(decode_metadata(&buf)?))
        }
        .boxed()
    }
}

The Real Problem

Up to this point, we've discussed some minor issues. The more challenging problem arises here, where the variable stream is the ParquetRecordBatchStream we've built above. When we invoke next, ParquetRecordBatchStream invokes RangeReader's seek and read multiple times. However, each call to seek resets RangeReader's internal state (discarding the previous byte stream) and, on the subsequent read call, initiates a new remote request (in the S3 backend scenario).

You can see detailed information in this issue and the discussion here).

When using ParquetRecordBatchStream to retrieve each column's data, it'll first invoke RangeReader seek, then read some bytes. Thus, the total number of remote calls required is the number of RowGroups multiplied by the number of columns in a RowGroup. Our 800KiB file contains 50 RowGroups and 12 columns (per RowGroup), which results in 600 S3 get requests!

        pub async fn copy_table_from(
    ...
            while let Some(r) = stream.next().await {
                let record_batch = r.context(error::ReadDfRecordBatchSnafu)?;
                let vectors =
                    Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?;

                pending_mem_size += vectors.iter().map(|v| v.memory_size()).sum::<usize>();

                let columns_values = fields
                    .iter()
                    .cloned()
                    .zip(vectors)
                    .collect::<HashMap<_, _>>();

                pending.push(self.inserter.handle_table_insert(
                    InsertRequest {
                        catalog_name: req.catalog_name.to_string(),
                        schema_name: req.schema_name.to_string(),
                        table_name: req.table_name.to_string(),
                        columns_values,
                    },
                    query_ctx.clone(),
                ));

                if pending_mem_size as u64 >= pending_mem_threshold {
                    rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
                }
            }

    ...

Explore the RangeReader Source Code

Take a look at self.poll_read()

In RangeReader, the self.state initially starts as State::Idle. Let's assume that self.offset is Some(0), then
self.state is set to State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>) and self.poll_read(cx, buf) is invoked again.

impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        ...
        match &mut self.state {
            State::Idle => {
                self.state = if self.offset.is_none() {
                    // When the offset is none, it means we are performing tailing reading.
                    // We should start by getting the correct offset through a stat operation.
                    State::SendStat(self.stat_future())
                } else {
                    State::SendRead(self.read_future())
                };

                self.poll_read(cx, buf)
            }
            ...
        }
    }
}

What happens in self.read_future()

Clearly, self.read_future() returns a BoxedFuture. Within this BoxedFuture, the underlying Accessor's read method (acc.read(&path, op).await) is invoked. The Accessor can be an implementation for some storage backend. In our context, this Accessor represents an S3 storage backend. When its read interface is invoked, it establishes a TCP connection to retrieve the file data and returns a byte stream from S3's response to the upper layer.

impl<A, R> RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn read_future(&self) -> BoxFuture<'static, Result<(RpRead, R)>> {
        let acc = self.acc.clone();
        let path = self.path.clone();

        let mut op = self.op.clone();
        if self.cur != 0 {
            op = op.into_deterministic();
        }
        op = op.with_range(self.calculate_range());

        Box::pin(async move { acc.read(&path, op).await })
    }

    ...
}

Continuing from where we left off in self.poll_read()

At this point, poll_read has not yet returned. In the previous section, self.poll_read() was invoked again with self.state being State::SendRead(BoxFuture<'static, Result<(RpRead, R)>>). The value returned by ready!(Pin::new(fut).poll(cx)) corresponds to the result of acc.read(&path, op).await from the previous section. For the S3 storage backend, remote calls happen here.

Afterward, the internal state self.poll_read is set to State::Read(r), and self.poll_read(cx, buf) is invoked once more. Upon entering self.poll_read() again, the internal state of RangeReader is set to State::Reader(R). Here, R(r) represents the byte stream of the read request's response. For the S3 storage backend, the Pin::new(r).poll_read(cx, buf) writes the byte data from the TCP buffer into the upper-level application.

impl<A, R> oio::Read for RangeReader<A, R>
where
    A: Accessor<Reader = R>,
    R: oio::Read,
{
    fn poll_read(&mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
        // Sanity check for normal cases.
        if buf.is_empty() || self.cur >= self.size.unwrap_or(u64::MAX) {
            return Poll::Ready(Ok(0));
        }

        match &mut self.state {
            ...
            State::SendRead(fut) => {
                let (rp, r) = ready!(Pin::new(fut).poll(cx)).map_err(|err| {
                    // If the read future returns an error, reset the state to Idle to retry.
                    self.state = State::Idle;
                    err
                })?;

                // Set the size if the read returns a size hint.
                if let Some(size) = rp.size() {
                    if size != 0 && self.size.is_none() {
                        self.size = Some(size + self.cur);
                    }
                }
                self.state = State::Read(r);
                self.poll_read(cx, buf)
            }
            State::Read(r) => match ready!(Pin::new(r).poll_read(cx, buf)) {
                Ok(0) => {
                    // Reset the state to Idle after all data has been consumed.
                    self.state = State::Idle;
                    Poll::Ready(Ok(0))
                }
                Ok(n) => {
                    self.cur += n as u64;
                    Poll::Ready(Ok(n))
                }
                Err(e) => {
                    self.state = State::Idle;
                    Poll::Ready(Err(e))
                }
            },
        }
    }
}

Final Look at self.poll_seek()

Remember the internal state of RangeReader we discussed earlier? Yes, it was State::Reader(R). When we call seek after a read, the byte stream inside RangeReader is discarded, and the state is reset to State::Idle. In other words, every time read is invoked after seek, RangeReader requests the read method of the underlying Accessor (acc.read(&path, op).await) to initiate a remote call. For the S3 storage backend, invoking this interface incurs significant overhead (typically around hundreds of milliseconds).

Additionally, there's a performance-related point to be considered. When attempting SeekFrom::End() and self.size is unknown, an additional stat operation is performed. After invoking self.poll_seek(), self.cur will be set to base.checked_add(amt).

Summary

  • We've implemented a quick fix that decreased the number of RowGroups imported from 50 to just 1. However, this solution still necessitates 12 remote calls. Moving forward, we plan to contribute a BufferReader to OpenDAL (details available at RFC here), which is expected to significantly reduce the number of consecutive remote calls triggered by 'seek' and 'read' operations in RangeReader. In certain cases, these calls could be entirely eliminated.

  • When invokes seek on a RangeReader, the internal state will be reset, and a subsequent read invoking results in a remote call that happens in the underlying Accessor (in scenarios where the backend is S3). (For related information, please refer to this issue and discussion links provided).

  • Both std::io::BufReader and tokio::io::BufReader clear their internal buffers after seek. If you wish to continue reading from the Buffer, you should use seek_relative.

Featured ones: