Using Apache Spark with Amazon S3

This section describes how to use Apache Spark with data on Amazon S3.

Using Apache Spark with HDCloud

Before you start working with Apache Spark in HDCloud, review the Accessing a Cluster and Using Apache Spark on HDCloud documentation.

Authenticating with Amazon S3

Apart from the special case of public read-only data, access to Amazon S3-hosted data requires callers to authenticate themselves to the service. For Spark applications to do this, the Spark context must be configured with the authentication details of the object store.

In Spark deployments via Hortonworks Data Cloud for AWS, these authentication details are automatically derived from information available to the VM, so you don't need to perform any configuration.

Accessing Amazon S3 Data in Spark

Amazon S3 is viewed by Spark as a filesystem, allowing Amazon S3 to be used as the source and destination of data, be it batch, SQL, DataFrame, or Spark Streaming. To load and save data on Amazon S3, Spark uses the same APIs that are used to load and save data in HDFS or other filesystems.

Provided the relevant libraries are on the classpath, a file in S3 can be referenced simply via a URL:

sparkContext.textFile("s3a://landsat-pds/scene_list.gz").count()

Similarly, an RDD can be saved to an object store via saveAsTextFile():

val numbers = sparkContext.parallelize(1 to 1000)
// save to Amazon S3 (or compatible implementation)
numbers.saveAsTextFile("s3a://bucket1/counts")

Learn More

Always use the full URI to refer to a bucket, including the s3a:// prefix.
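As a rough illustration of this rule, the following sketch checks that a path string carries the s3a scheme and a bucket name before it is handed to Spark. The helper name isFullS3aUri is hypothetical; it is not part of Spark or Hadoop and relies only on java.net.URI from the JDK.

```scala
import java.net.{URI, URISyntaxException}

// Hypothetical helper, not a Spark or Hadoop API: returns true only when the
// path is a full URI with the s3a scheme and a non-empty bucket (authority).
def isFullS3aUri(path: String): Boolean =
  try {
    val uri = new URI(path)
    uri.getScheme == "s3a" && Option(uri.getAuthority).exists(_.nonEmpty)
  } catch {
    case _: URISyntaxException => false
  }

isFullS3aUri("s3a://bucket1/counts") // full URI: scheme and bucket present
isFullS3aUri("bucket1/counts")       // no scheme, so not a full URI
```

A check of this kind could be applied to job arguments so that a relative path is rejected early rather than being resolved against the cluster's default filesystem.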

Example: DataFrames

DataFrames can be read from and written to object stores using the SparkSession read method and the DataFrame write method:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

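// Spark configuration; settings supplied at submit time are picked up here
val sparkConf = new SparkConf()
val spark = SparkSession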
    .builder
    .appName("DataFrames")
    .config(sparkConf)
    .getOrCreate()
import spark.implicits._
val numRows = 1000

// generate test data
val sourceData = spark.range(0, numRows).select($"id".as("l"), $"id".cast(StringType).as("s"))

// define the destination
val dest = "s3a://bucket1/dataframes"

// write the data
val orcFile = dest + "/data.orc"
sourceData.write.format("orc").save(orcFile)

// now read it back
val orcData = spark.read.format("orc").load(orcFile)

// finally, write the data as Parquet
orcData.write.format("parquet").save(dest + "/data.parquet")

spark.stop()

Checkpointing

Checkpointing streaming data to an S3 bucket is very slow, as the stream data is (potentially) recalculated, uploaded to S3, and then renamed into the checkpoint file (the rename being a slow copy operation). If S3 is used for checkpointing, the interval between checkpoints must be long enough to allow for this slow checkpoint.
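As an illustration, the sketch below uses a checkpoint interval that is a multiple of the batch interval and considerably longer than it, leaving time for the slow upload and rename. The bucket, the paths, and the 5 and 60 second values are assumptions chosen for the example, not tuned recommendations.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed values for illustration only; measure your own checkpoint times.
val batchInterval = Seconds(5)
val checkpointInterval = Seconds(60)

// Builds a streaming context that checkpoints to S3 at the longer interval.
def buildContext(sparkConf: SparkConf): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, batchInterval)
  ssc.checkpoint("s3a://bucket1/checkpoints") // hypothetical checkpoint path
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  // checkpoint this stream's data at the longer interval
  lines.checkpoint(checkpointInterval)
  lines.print()
  ssc
}
```

If checkpoints still take longer than the chosen interval, the interval can be raised further, as long as it remains a multiple of the batch interval.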

Example: Spark Streaming and Cloud Storage

Spark Streaming can monitor files added to object stores by creating a FileInputDStream that monitors a path under a bucket:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

val sparkConf = new SparkConf()
val ssc = new StreamingContext(sparkConf, Milliseconds(5000))
try {
  val lines = ssc.textFileStream("s3a://bucket1/incoming")
  val matches = lines.filter(_.endsWith("3"))
  matches.print()
  ssc.start()
  ssc.awaitTermination()
} finally {
  ssc.stop(true)
}

The time to scan for new files is proportional to the number of files under the path — not the number of new files — so this can become a slow operation.

Committing Output to Amazon S3

For the reasons covered in the section Working with Amazon S3, using Amazon S3 as the direct destination of work may be slow and unreliable in the presence of failures. Therefore, we recommend that you use HDFS as the destination of work, using DistCp to copy to Amazon S3 afterwards if you wish to persist the data beyond the life of the cluster. HDFS provides the behaviors (atomic directory renames and consistent directory listings) that the output committer used by Spark and Hadoop MapReduce relies on to ensure that the output is correctly generated.
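The two-step workflow can be sketched as follows. The HDFS path and the bucket name are hypothetical, and saveToHdfs is an illustrative helper rather than a Spark API; the DistCp step is shown as the command-line arguments that would be run after the job completes.

```scala
import org.apache.spark.SparkContext

// Hypothetical locations: adjust to your cluster and bucket.
val hdfsDest = "hdfs:///tmp/counts"
val s3Dest = "s3a://bucket1/counts"

// Step 1: commit the job output to HDFS, which supports atomic renames.
def saveToHdfs(sc: SparkContext): Unit =
  sc.parallelize(1 to 1000).saveAsTextFile(hdfsDest)

// Step 2: once the job has finished, copy the committed output to Amazon S3.
// Equivalent shell command: hadoop distcp hdfs:///tmp/counts s3a://bucket1/counts
val distCpCommand = Seq("hadoop", "distcp", hdfsDest, s3Dest)
```

Keeping the copy as a separate step means a failed or retried Spark task never leaves partial output in the bucket; only the finished HDFS directory is uploaded.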

Improving Performance for Spark Jobs

This section includes performance tips related to Spark.

Reading Sequentially Through Files

The most effective way to scan a large file is in a single HTTPS request, which is the default behavior. If the scanning code skips parts of the file using seek(), the performance of these forward seeks can potentially be improved by tuning the option spark.hadoop.fs.s3a.readahead.range. For example:

spark.hadoop.fs.s3a.readahead.range 512M

This option declares the number of bytes to read when seeking forwards in a file before closing and re-opening the HTTPS connection to Amazon S3. That close/reopen operation can be so slow that simply reading and discarding the data is actually faster. This is particularly true when working with remote Amazon S3 buckets over "long-haul" connections.
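The same option can also be set programmatically before the Spark context is created. A minimal sketch, reusing the 512M value from the example above (the value is illustrative, not a recommendation for every workload):

```scala
import org.apache.spark.SparkConf

// Properties prefixed with spark.hadoop. are passed through to the Hadoop
// configuration, so this sets fs.s3a.readahead.range for the S3A connector.
val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.s3a.readahead.range", "512M")
```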

Reading ORC and Parquet Datasets

When reading binary ORC and Parquet datasets, you should configure Spark to use the S3A connector's random IO read policy, as described in Optimizing HTTP GET Requests.

With fs.s3a.experimental.input.fadvise set to random, rather than asking for the entire file in one HTTPS request (the "normal" operation), the S3A connector only asks for part of a file at a time. If it needs to seek backwards, the remaining data in this part is discarded, and then a new request is made on the same HTTPS connection. This reduces the time wasted on closing and opening up new HTTPS connections.

This dramatically speeds up random access, but actually reduces performance on queries performing sequential reads through an entire file, so do not use the random setting for such jobs.
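Because the setting helps some jobs and hurts others, it can be applied per job rather than cluster-wide. The sketch below assumes a hypothetical helper, withS3aInputPolicy, that sets the random policy only for jobs known to do random-access reads and leaves the default in place otherwise.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not a Spark API: enables the S3A random IO policy
// only when the job is known to perform random-access reads.
def withS3aInputPolicy(conf: SparkConf, randomAccess: Boolean): SparkConf =
  if (randomAccess) {
    conf.set("spark.hadoop.fs.s3a.experimental.input.fadvise", "random")
  } else {
    conf // sequential scans keep the default policy
  }

// Example: a job that reads ORC or Parquet data with column projection
val columnarJobConf = withS3aInputPolicy(new SparkConf(), randomAccess = true)
```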

For optimal performance when reading files saved in the Apache ORC format, read and write operations must be minimized. To achieve this, set the following options:

spark.sql.orc.filterPushdown true
spark.sql.hive.metastorePartitionPruning true

The spark.sql.orc.filterPushdown option enables the ORC library to skip un-needed columns and to use index information to filter out parts of the file where it can be determined that no columns match the predicate.

With the spark.sql.hive.metastorePartitionPruning option enabled, predicates are pushed down into the Hive Metastore to eliminate unmatched partitions.

For optimal performance when reading files saved in the Apache Parquet format, read and write operations must be minimized, including the generation of summary metadata and the coalescing of metadata from multiple files. The predicate pushdown option enables the Parquet library to skip unneeded columns, saving bandwidth. To achieve this, set the following options:

spark.hadoop.parquet.enable.summary-metadata false
spark.sql.parquet.mergeSchema false
spark.sql.parquet.filterPushdown true
spark.sql.hive.metastorePartitionPruning true
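When the options are set in application code rather than in a configuration file, the ORC and Parquet settings listed in this section can be grouped and applied to a session builder. This is a sketch only: the option names and values are those given above, while withOptions is a hypothetical helper introduced for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Option names and values are taken from the lists in this section.
val orcReadOptions = Map(
  "spark.sql.orc.filterPushdown" -> "true",
  "spark.sql.hive.metastorePartitionPruning" -> "true")

val parquetReadOptions = Map(
  "spark.hadoop.parquet.enable.summary-metadata" -> "false",
  "spark.sql.parquet.mergeSchema" -> "false",
  "spark.sql.parquet.filterPushdown" -> "true",
  "spark.sql.hive.metastorePartitionPruning" -> "true")

// Hypothetical helper: applies each option to the builder in turn.
def withOptions(
    builder: SparkSession.Builder,
    options: Map[String, String]): SparkSession.Builder =
  options.foldLeft(builder) { case (b, (key, value)) => b.config(key, value) }
```

A session for a Parquet workload could then be created with withOptions(SparkSession.builder.appName("ParquetRead"), parquetReadOptions).getOrCreate(), where the application name is again only an example.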

For more information on how to improve performance when working with Amazon S3, refer to Improving Amazon S3 Performance.