Improving Amazon S3 Performance

In general, reading and writing data is slower with Amazon S3 than with HDFS, even when virtual clusters are running on Amazon EC2 infrastructure in the same datacenter where the Amazon S3 buckets are located. This is due to the following reasons.

On Amazon S3, renaming is a very expensive copy operation. The slow performance of renames surfaces during the commit phase of work, when output is renamed into its final location, for example by the MapReduce FileOutputCommitter or by DistCp after each file is copied.

Because the write only begins on a close() operation, the upload may not start until the final phase of a process. This can take so long that some processes may time out.

Performance problems may also occur due to the size of the HTTP requests made.

This section offers tips on how to improve performance when accessing data in Amazon S3 buckets.

Note

You can set configuration properties using the Custom Properties option available in the cluster create form in the cloud controller UI or using a corresponding option in the CLI. Refer to Setting Configuration Properties.

Improving Container Allocation Performance

As AWS does not have the concept of rack locality, set yarn.scheduler.capacity.node-locality-delay = 0 to have faster container launches when using the Capacity Scheduler. For more information, refer to the Apache documentation.
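
For example, the setting would look like this in capacity-scheduler.xml (a sketch; apply it through your cluster management tooling if that owns the file):

<property>
  <name>yarn.scheduler.capacity.node-locality-delay</name>
  <value>0</value>
  <description>Number of missed scheduling opportunities after which the
    CapacityScheduler relaxes its locality constraints; 0 disables the delay,
    which is appropriate when there is no rack locality, as on AWS.</description>
</property>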

Optimizing HTTP GET Requests

Warning

The following feature is experimental and its behavior may change in the future.

Experimental fadvise Policy

The S3A filesystem client supports the notion of input policies, similar to that of the POSIX fadvise() API call. This tunes the behavior of the S3A client to optimize HTTP GET requests for various use cases. To optimize HTTP GET requests, you can take advantage of the S3A experimental input policy fs.s3a.experimental.input.fadvise:

"sequential" (default)

Read through the file, possibly with some short forward seeks.

The whole document is requested in a single HTTP request; forward seeks within the readahead range are supported by skipping over the intermediate data.

This leads to maximum read throughput, but with very expensive backward seeks.

"normal"

This is currently the same as "sequential".

"random"

Optimized for random IO, specifically the Hadoop `PositionedReadable` operations — though `seek(offset); read(byte_buffer)` also benefits.

Rather than asking for the whole file, the range of each HTTP request is set to the length of data requested in the read operation, rounded up to the readahead value set in setReadahead() if necessary.

By reducing the cost of closing existing HTTP requests, this is highly efficient for file IO accessing a binary file through a series of `PositionedReadable.read()` and `PositionedReadable.readFully()` calls. Sequential reading of a file is expensive, as now many HTTP requests must be made to read through the file.

For operations simply reading through a file (copying, DistCp, reading gzip or other compressed formats, parsing .csv files, and so on) the sequential policy is appropriate. This is the default, so you don't need to configure it.

For the specific case of high-performance random access IO (for example, accessing ORC files), you may consider using the random policy, provided the workload genuinely reads the data through seek() and positioned-read calls rather than sequentially.

Setting Experimental fadvise Policy

You must set the desired fadvise policy in the configuration option fs.s3a.experimental.input.fadvise when the filesystem instance is created. It can only be set on a per-filesystem basis, not on a per-file-read basis. You can set it in core-site.xml:

<property>
  <name>fs.s3a.experimental.input.fadvise</name>
  <value>random</value>
  <description>Policy for reading files.
   Values: 'random', 'sequential' or 'normal'
   </description>
</property>

Or, you can set it in the spark-defaults.conf configuration of Spark:

spark.hadoop.fs.s3a.experimental.input.fadvise random
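
If you construct the filesystem programmatically, you can also set the policy on the Hadoop Configuration before the S3A instance is created. The following is a minimal Java sketch of that approach; the bucket and file names are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AFadviseExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Must be set before the filesystem instance is created;
    // the policy applies per filesystem, not per file read.
    conf.set("fs.s3a.experimental.input.fadvise", "random");

    Path file = new Path("s3a://my-bucket/data.orc");   // placeholder path
    FileSystem fs = file.getFileSystem(conf);
    long length = fs.getFileStatus(file).getLen();

    // Read the last 16 KB of the file, as a column-oriented reader might.
    byte[] footer = new byte[16 * 1024];
    try (FSDataInputStream in = fs.open(file)) {
      // With the "random" policy, this positioned read issues a ranged
      // HTTP GET rather than requesting the whole object.
      in.readFully(length - footer.length, footer);
    }
  }
}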

Be aware that this random access performance comes at the expense of sequential IO — which includes reading files compressed with gzip.

Writing Data with S3A Fast Upload

Warning

This new feature is still stabilizing. These tuning recommendations are experimental and may change in the future.

Because of the nature of the S3 object store, data written to an S3A OutputStream is not written incrementally; instead, by default, it is buffered to disk until the stream is closed in its close() method. This can make output slow, because the time taken by close() is proportional to the amount of data buffered and inversely proportional to the available upload bandwidth to S3, and the output is not visible until that final upload completes.

In summary, the further the process is from the S3 endpoint, or the smaller the EC2 VM is, the longer it will take to complete the work. This can create problems in application code which assumes that close() is fast, or which does not expect a long pause at the end of writing output.

Work to address this has been ongoing through "S3A Fast Upload", which is now considered ready for use, though the exact configuration options may change in the future.

Features of "S3A Fast Upload"

Features of "S3A Fast Upload" include incremental upload of data in blocks as it is written, parallel block uploads in background threads, and a choice of buffering mechanisms: local disk, arrays in the JVM heap, or off-heap ByteBuffers.

With incremental writes of blocks, "S3A fast upload" offers an upload time at least as fast as the "classic" mechanism, with significant benefits on long-lived output streams and when very large amounts of data are generated. The in-memory buffering mechanisms may also offer a speedup when running adjacent to S3 endpoints, as disks are not used for intermediate data storage.

Enabling S3A Fast Upload

To enable the fast upload mechanism, set the fs.s3a.fast.upload property to true:

<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
  <description>
    Use the incremental block upload mechanism with
    the buffering mechanism set in fs.s3a.fast.upload.buffer.
    The number of threads performing uploads in the filesystem is defined
    by fs.s3a.threads.max; the queue of waiting uploads limited by
    fs.s3a.max.total.tasks.
    The size of each buffer is set by fs.s3a.multipart.size.
  </description>
</property>
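
If you are configuring Spark rather than core-site.xml, the same property can be passed through the spark.hadoop. prefix in spark-defaults.conf, following the same pattern as the fadvise example earlier:

spark.hadoop.fs.s3a.fast.upload true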

Core Configuration Options

The following major configuration options are available for the "S3A fast upload":

<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>disk</value>
  <description>
    The buffering mechanism to use when using S3A fast upload
    (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
    This configuration option has no effect if fs.s3a.fast.upload is false.

    "disk" will use the directories listed in fs.s3a.buffer.dir as
    the location(s) to save data prior to being uploaded.

    "array" uses arrays in the JVM heap

    "bytebuffer" uses off-heap memory within the JVM.

    Both "array" and "bytebuffer" will consume memory in a single stream up to the number
    of blocks set by: fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.

    If using either of these mechanisms, keep this value low.

    The total number of threads performing work across all streams is set by
    fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
    work items.
  </description>
</property>

<property>
  <name>fs.s3a.multipart.size</name>
  <value>100M</value>
  <description>How big (in bytes) to split upload or copy operations up into.
    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
  </description>
</property>

<property>
  <name>fs.s3a.fast.upload.active.blocks</name>
  <value>8</value>
  <description>
    Maximum number of blocks a single output stream can have
    active (uploading, or queued to the central FileSystem
    instance's pool of queued operations).

    This stops a single stream overloading the shared thread pool.
  </description>
</property>
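
With the values shown above (fs.s3a.multipart.size of 100M and fs.s3a.fast.upload.active.blocks of 8), a single output stream using the array or bytebuffer mechanism can therefore hold up to 8 * 100 MB = 800 MB of data in memory before further writes block.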

Note that if the total amount of data written to a stream is below the value set in fs.s3a.multipart.size, the upload is performed in the close() call, just as with the original output mechanism.

Fast Upload with Disk Buffers

When fs.s3a.fast.upload.buffer is set to disk, all data is buffered to local hard disks prior to upload. This minimizes the amount of memory consumed, and so eliminates heap size as the limiting factor in queued uploads — exactly as the original "direct to disk" buffering used when fs.s3a.fast.upload=false.

<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
</property>

<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>disk</value>
</property>

<property>
  <name>fs.s3a.buffer.dir</name>
  <value></value>
  <description>Comma separated list of temporary directories used for
  storing blocks of data prior to their being uploaded to S3.
  When unset, the Hadoop temporary directory hadoop.tmp.dir is used.</description>
</property>

This is the default buffer mechanism. The amount of data which can be buffered is limited by the amount of available disk space.
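
For example, to buffer blocks on specific local disks rather than under hadoop.tmp.dir, list the directories explicitly (the paths below are placeholders for whatever local storage your nodes actually have):

<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/mnt/disk1/s3a,/mnt/disk2/s3a</value>
</property>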

Fast Upload with ByteBuffers

When fs.s3a.fast.upload.buffer is set to bytebuffer, all data is buffered in "direct" ByteBuffers prior to upload. This may be faster than buffering to disk and avoids consuming local disk space, which can be in short supply on small EC2 VMs.

The ByteBuffers are created in the memory of the JVM, but not in the Java Heap itself. The amount of data which can be buffered is limited by the Java runtime, the operating system, and, for YARN applications, the amount of memory requested for each container.

The slower the upload bandwidth to S3, the greater the risk of running out of memory, and so the more care is needed in tuning the upload thread settings to reduce the maximum amount of data which can be buffered awaiting upload (see below).

<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
</property>

<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>bytebuffer</value>
</property>

Fast Upload with Arrays

When fs.s3a.fast.upload.buffer is set to array, all data is buffered in byte arrays in the JVM's heap prior to upload. This may be faster than buffering to disk.

The amount of data which can be buffered is limited by the available size of the JVM heap. The slower the write bandwidth to S3, the greater the risk of heap overflows. This risk can be mitigated by tuning the upload thread settings (see below).

<property>
  <name>fs.s3a.fast.upload</name>
  <value>true</value>
</property>

<property>
  <name>fs.s3a.fast.upload.buffer</name>
  <value>array</value>
</property>

S3A Fast Upload Thread Tuning

Both the array and bytebuffer buffering mechanisms can consume very large amounts of memory, on-heap or off-heap respectively. The disk buffer mechanism does not use much memory, but consumes hard disk capacity instead.

If there are many output streams being written to in a single process, the amount of memory or disk used is the sum of all the streams' active memory and disk use.

Careful tuning may be needed to reduce the risk of running out of memory, especially if the data is buffered in memory. There are a number of parameters which can be tuned:

  1. The total number of threads available in the filesystem for data uploads or any other queued filesystem operation. This is set in fs.s3a.threads.max

  2. The number of operations which can be queued for execution, awaiting a thread: fs.s3a.max.total.tasks

  3. The number of blocks which a single output stream can have active, that is, being uploaded by a thread, or queued in the filesystem thread queue: fs.s3a.fast.upload.active.blocks

  4. How long an idle thread can stay in the thread pool before it is retired: fs.s3a.threads.keepalivetime

When the maximum allowed number of active blocks of a single stream is reached, no more blocks can be uploaded from that stream until one or more of those active blocks' uploads completes. That is: a write() call which would trigger an upload of a now full datablock will instead block until there is capacity in the queue.

In conclusion:

We recommend a low value of fs.s3a.fast.upload.active.blocks — enough to start background upload without overloading other parts of the system. Then experiment to see if higher values deliver more throughput — especially from VMs running on EC2.


<property>
  <name>fs.s3a.fast.upload.active.blocks</name>
  <value>4</value>
  <description>
    Maximum number of blocks a single output stream can have
    active (uploading, or queued to the central FileSystem
    instance's pool of queued operations).

    This stops a single stream overloading the shared thread pool.
  </description>
</property>

<property>
  <name>fs.s3a.threads.max</name>
  <value>10</value>
  <description>The total number of threads available in the filesystem for data
    uploads *or any other queued filesystem operation*.</description>
</property>

<property>
  <name>fs.s3a.max.total.tasks</name>
  <value>5</value>
  <description>The number of operations which can be queued for execution</description>
</property>

<property>
  <name>fs.s3a.threads.keepalivetime</name>
  <value>60</value>
  <description>Number of seconds a thread can be idle before being
    terminated.</description>
</property>
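
As a worked example with the settings above and the default fs.s3a.multipart.size of 100M: a single output stream can have up to 4 * 100 MB = 400 MB of blocks active at once, while across the whole filesystem instance at most 10 blocks can be uploading and 5 more can be queued, bounding in-flight data to roughly 1.5 GB plus the partially filled block of each open stream.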

Improving Load-Balancing Behavior

Amazon S3 uses a set of front-end servers to provide access to the underlying data. The decision about which front-end server to use is handled via a load-balancing DNS service. When the IP address of an Amazon S3 bucket is looked up, the choice of which IP address to return to the client is made based on the current load of the front-end servers.

Over time, the load across the front end changes, so those servers that are considered "lightly loaded" change. This means that if the DNS value is cached for any length of time, applications may end up talking to an overloaded server; or, in the case of failures, they may end up trying to talk to a server that is no longer there.

For historical security reasons dating back to the era of Java applets, the JVM caches DNS lookups "forever" by default.

To improve AWS load-balancing, set the DNS time-to-live of an application which works with Amazon S3 to something lower than the default. See Setting the JVM TTL for DNS Name Lookups in the AWS documentation.
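
A minimal Java sketch, assuming you prefer to lower the TTL programmatically at application startup rather than editing the JVM's java.security file; the 60-second value is only an illustration:

import java.security.Security;

public class DnsTtlSetting {
  public static void main(String[] args) {
    // Must run before any S3 host names are looked up and cached.
    Security.setProperty("networkaddress.cache.ttl", "60");

    // ... create Hadoop/S3A clients and do the real work after this point ...
  }
}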

More Amazon S3 Performance Tips

For more tips on how to improve performance when working with Amazon S3, refer to the Amazon S3 performance guidelines in the AWS documentation and to the Apache Hadoop S3A documentation.

Amazon S3 Performance Checklists

The following checklists will help you optimize your environment for working with Amazon S3:

Performance: Checklist for Data

Performance: Checklist for Cluster Configs

Performance: Checklist for Code