Accelerating Apache Spark 3.0 with GPUs and RAPIDS

Given the parallel nature of many data processing tasks, it’s only natural that the massively parallel architecture of a GPU should be able to accelerate Apache Spark data processing queries, in the same way that a GPU accelerates deep learning (DL) in artificial intelligence (AI).

NVIDIA has worked with the Apache Spark community to implement GPU acceleration through the release of Spark 3.0 and the open source RAPIDS Accelerator for Spark. In this post, we dive into how the RAPIDS Accelerator for Apache Spark uses GPUs to:

  • Accelerate end-to-end data preparation and model training on the same Spark cluster.
  • Accelerate Spark SQL and DataFrame operations without requiring any code changes.
  • Accelerate data transfer performance across nodes (Spark shuffles).

Data preparation and model training on Spark 2.x

GPUs have been responsible for the advancement of DL and machine learning (ML) model training in the past several years. However, by many estimates, data scientists spend roughly 80% of their time on data preprocessing.

Preparing a data set for ML requires understanding the data set, cleaning and manipulating data types and formats, and extracting features for the learning algorithm. These tasks are grouped under the term ETL (extract, transform, load). ETL is often an iterative, exploratory process.

As ML and DL are increasingly applied to larger datasets, Spark has become a commonly used vehicle for the data preprocessing and feature engineering needed to prepare raw input data for the learning phase. Because Spark 2.x has no knowledge of GPUs, data scientists and engineers perform the ETL on CPUs and then send the data over to GPUs for model training, which is where the real performance gains lie. As data sets grow, the interactivity of this process suffers.

The diagram shows separate clusters for data preparation and model training with Spark 2.x.
Figure 1. In Spark 2.x, separate clusters were needed for ETL on CPUs, and model training on GPUs.

Accelerated end-to-end data analytics and ML pipelines

The Apache Spark community has been focused on bringing both phases of this end-to-end pipeline together, so that data scientists can work with a single Spark cluster and avoid the penalty of moving data between phases.

Apache Spark 3.0 represents a key milestone, as Spark can now schedule GPU-accelerated ML and DL applications on Spark clusters with GPUs, removing bottlenecks, increasing performance, and simplifying clusters.

The diagram shows Spark 3.0 data preparation and model training on a GPU-powered cluster.
Figure 2. In Apache Spark 3.0, you can now have a single pipeline, from data ingest to data preparation to model training, on a GPU-powered cluster.

Figure 3 shows the complete stack for this accelerated data science.

The diagram shows accelerated Spark components and ML layered on top of RAPIDS and a GPU-accelerated infrastructure.
Figure 3. Apache Spark accelerated end-to-end AI platform stack.

Figure 4 shows a measure of data preprocessing time improvement for Spark on GPUs. The Criteo Terabyte click logs public dataset, one of the largest public datasets for recommendation tasks, was used to demonstrate the efficiency of a GPU-optimized DLRM training pipeline. With eight V100 32-GB GPUs, processing time was sped up by a factor of up to 43X compared to an equivalent Spark-CPU pipeline.

The diagram shows improved performance on GPU, up to 43X on 8xV100 32 GB with no frequency cap.
Figure 4. Spark performance improvement on GPU vs CPU. CPU model: AWS r5d.24xl, 96 cores, 768 GB RAM. Bars represent speedup factor for GPU vs. CPU. The higher, the better.

Next, we cover the key advancements in Apache Spark 3.0 that contribute to transparent GPU acceleration:

  • The new RAPIDS Accelerator for Apache Spark 3.0
  • RAPIDS-accelerated Spark SQL/DataFrame and shuffle operations
  • GPU-aware scheduling in Spark

RAPIDS Accelerator for Apache Spark

RAPIDS is a suite of open-source software libraries and APIs for executing end-to-end data science and analytics pipelines entirely on GPUs, allowing for substantial speedups, particularly on large data sets. Built on top of NVIDIA CUDA and UCX, the RAPIDS Accelerator for Apache Spark enables applications to take advantage of GPU parallelism and high-bandwidth memory speed with no code changes, through the Spark SQL and DataFrame APIs and a new Spark shuffle implementation.

The diagram shows Spark SQL/DataFrame and Spark shuffle components built on the RAPIDS Accelerator for Apache Spark, which is built on the RAPIDS, CUDA, and UCX libraries.
Figure 5. RAPIDS Accelerator for the Apache Spark technology stack.
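In practice, the accelerator is enabled purely through configuration. The PySpark sketch below uses config keys from the RAPIDS Accelerator documentation; it assumes pyspark is installed and the RAPIDS Accelerator jars are already on the classpath, and exact jar names and defaults vary by release, so treat it as illustrative:

```python
from pyspark.sql import SparkSession

# Sketch only: assumes pyspark is installed and the RAPIDS Accelerator
# jar (and, in early releases, a matching cudf jar) are on the classpath.
# Config keys follow the RAPIDS Accelerator documentation; exact values
# vary by release.
spark = (
    SparkSession.builder
    .appName("rapids-example")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  # load the RAPIDS SQL plugin
    .config("spark.rapids.sql.enabled", "true")             # turn on GPU SQL acceleration
    .getOrCreate()
)
```

With these settings in place, existing Spark SQL and DataFrame code runs unchanged; operators the plugin supports are moved to the GPU, and the rest fall back to the CPU.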

RAPIDS-Accelerated Spark SQL and DataFrame

RAPIDS offers a powerful GPU DataFrame based on Apache Arrow data structures. Arrow specifies a standardized, language-independent, columnar memory format, optimized for data locality, to accelerate analytical processing performance on modern CPUs or GPUs. With the GPU DataFrame, batches of column values from multiple records take advantage of modern GPU designs and accelerate reading, queries, and writing.

The diagram shows arrow columnar memory layout grouping columns for parallel CPU or GPU processing.
Figure 6. Columnar memory layout accelerates analytical processing performance on modern CPUs and GPUs.
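To make the row-versus-columnar distinction concrete, here is a minimal, library-free Python sketch (not Arrow itself) of the two layouts. A per-column aggregate over the row layout must touch every field of every record, while the columnar layout reads one dense array, which is the access pattern vectorized CPU and GPU kernels exploit:

```python
# Row layout: one record at a time; a per-column aggregate hops across
# records, touching every field of every row.
rows = [
    {"id": 1, "price": 10.0, "qty": 2},
    {"id": 2, "price": 20.0, "qty": 1},
    {"id": 3, "price": 30.0, "qty": 4},
]

# Columnar layout: each column's values sit contiguously, so a scan or
# reduction over one column reads a single dense array -- the pattern the
# Arrow format standardizes for data locality.
columns = {
    "id":    [r["id"] for r in rows],
    "price": [r["price"] for r in rows],
    "qty":   [r["qty"] for r in rows],
}

total_row_major = sum(r["price"] for r in rows)  # touches whole records
total_columnar  = sum(columns["price"])          # touches one array
assert total_row_major == total_columnar == 60.0
```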

For Apache Spark 3.0, Spark SQL and DataFrames use new RAPIDS APIs for GPU-accelerated, memory-efficient columnar data processing and query plans. When a Spark query executes, it goes through the following steps:

  • Creating a logical plan
  • Transforming the logical plan into a physical plan with the Catalyst query optimizer
  • Generating code
  • Executing the tasks on a cluster

With the RAPIDS accelerator, the Catalyst query optimizer has been modified to identify operators within a query plan that can be accelerated with the RAPIDS API, mostly a one-to-one mapping. It also schedules those operators on GPUs within the Spark cluster when executing the query plan.
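The operator replacement can be pictured with a toy rewrite pass. The names below are hypothetical, not the plugin's actual classes: each physical operator is swapped for a GPU equivalent when one exists and left on the CPU otherwise:

```python
# Hypothetical sketch of the plan-rewrite idea; the real plugin walks
# Spark's physical plan and substitutes GPU operators where supported.
GPU_EQUIVALENTS = {  # mostly a one-to-one mapping, as in the plugin
    "FileScan": "GpuFileScan",
    "Filter": "GpuFilter",
    "Project": "GpuProject",
    "HashAggregate": "GpuHashAggregate",
}

def rewrite(plan):
    """Replace each operator with its GPU version when supported,
    otherwise keep the CPU operator (transparent fallback)."""
    return [GPU_EQUIVALENTS.get(op, op) for op in plan]

cpu_plan = ["FileScan", "Filter", "CustomUdfOp", "HashAggregate"]
print(rewrite(cpu_plan))
# -> ['GpuFileScan', 'GpuFilter', 'CustomUdfOp', 'GpuHashAggregate']
```

Because unsupported operators simply stay on the CPU, existing queries keep working even when only part of the plan is accelerated.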

With a physical plan for CPUs, the DataFrame data is transformed into RDD row format and usually processed one row at a time. Spark supports columnar batch processing, but in Spark 2.x only the vectorized Parquet and ORC readers use it. The RAPIDS plugin extends columnar batch processing on GPUs to most Spark operations.

The diagram shows a query execution flow from a DataFrame to a logical plan to a GPU physical plan to columnar batch execution.
Figure 7. Accelerated Spark SQL query execution plan flow.

RAPIDS-accelerated Spark shuffles 

Spark operations that sort, group, or join data by value must move data between partitions when creating a new DataFrame from an existing one between stages, in a process called a shuffle.

The diagram shows that data is moved between partitions when creating a new DataFrame from an existing one between stages.
Figure 8. Example of a Spark shuffle. Data is grouped by value and exchanged between partitions (white rectangles) when creating a new DataFrame (green rectangles) from an existing one between stages.
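The repartition-by-value step can be sketched in plain Python (illustration only, not Spark's implementation): each record is routed to an output partition by hashing its key, which is why every output partition may need data from every input partition:

```python
def shuffle(partitions, num_out, key=lambda rec: rec):
    """Route every record to partition hash(key) % num_out, as a
    groupBy/join boundary does; records with equal keys land together."""
    out = [[] for _ in range(num_out)]
    for part in partitions:
        for rec in part:
            out[hash(key(rec)) % num_out].append(rec)
    return out

# Two input partitions with mixed keys...
before = [["a", "b", "a"], ["b", "c", "a"]]
after = shuffle(before, 3)

# ...afterwards, each key lives in exactly one output partition.
homes = {k: {i for i, p in enumerate(after) for r in p if r == k}
         for k in "abc"}
assert all(len(v) == 1 for v in homes.values())
```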

Shuffles first write data to local disk and then transfer data across the network to partitions on other CPUs or GPUs. Shuffles are expensive in terms of CPU, RAM, disk, network, and PCI-e bus traffic as they involve disk I/O, data serialization, and network I/O.   

The diagram shows data movement to local storage, network, and another GPU using the PCI-e bus and CPU.
Figure 9. Shuffles involve disk I/O, data serialization, and network I/O which causes CPU, RAM, disk, network, and PCI-e bus traffic and congestion.

The new Spark shuffle implementation is built upon the GPU-accelerated Unified Communication X (UCX) library to dramatically optimize the data transfer between Spark processes. UCX exposes a set of abstract communication primitives which utilize the best of available hardware resources and offloads, including RDMA, TCP, GPUs, shared memory, and network atomic operations.

In the new shuffle process, as much data as possible is first cached on the GPU. This means no shuffling of data for the next task on that GPU. Next, if GPUs are on the same node and connected with NVIDIA NVLink high-speed interconnect, data is transferred at up to 300 GB/s. If GPUs are on different nodes, RDMA allows GPUs to communicate directly with each other, across nodes, at up to 100 Gb/s. Each of these cases avoids traffic on the PCI-e bus and CPU.

The diagram shows data movement to local storage, network, and another GPU using GPU direct storage, RDMA, and NVLink.
Figure 10. The new Spark shuffle implementation avoids traffic on the PCI-e bus and CPU, with UCX.

If the shuffle data cannot all be cached locally, it is first pushed to host memory and then spilled to disk when that is exhausted. Fetching data from host memory avoids PCI-e bus traffic by using RDMA transfer.

The diagram shows data movement to local storage, network, and another GPU using host memory, RDMA, and less CPU and PCI-e bus.
Figure 11. For shuffles that spill to disk, the new Spark shuffle implementation avoids PCI-e bus traffic by using host memory and RDMA.
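Like the SQL plugin, the accelerated shuffle is enabled through configuration. The sketch below is illustrative only: it assumes UCX is installed on every node, the shuffle-manager class name is tied to the exact Spark version, and the key names have changed across plugin releases, so check the RAPIDS Accelerator documentation for your release:

```python
from pyspark.sql import SparkSession

# Sketch only: the RapidsShuffleManager class name is version-specific
# (spark300 here stands for Spark 3.0.0), and these key names have
# changed across plugin releases -- treat them as illustrative.
spark = (
    SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.shuffle.manager",
            "com.nvidia.spark.rapids.spark300.RapidsShuffleManager")  # version-specific
    .config("spark.rapids.shuffle.transport.enabled", "true")         # early-release key
    .getOrCreate()
)
```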

Accelerated shuffle results

Figure 12 shows a measure of how much faster the new Spark shuffle implementation runs, with an inventory pricing query running at 10 terabyte scale with 192 CPU cores and 32 GPUs. The standard Spark-CPU shuffle took 228 seconds compared to 8.4 seconds for the new shuffle using GPUs with UCX.

The diagram shows the CPU query took 228 seconds and the GPUs+UCX query took 8.4 seconds.
Figure 12. Spark Query performance improvement on GPU vs CPU. Bars represent query duration for GPU vs. CPU. The lower, the better.

Figure 13 shows the results of an ETL query shuffling 800 GB of data that spills to disk. Using GPUs with UCX took 79 seconds, compared to 1,555 seconds for CPUs.

The diagram shows the CPU query took 1,555 seconds and the GPUs+UCX query took 79 seconds.
Figure 13. Spark Query performance improvement on GPU vs CPU. Bars represent query duration for GPU vs. CPU. The lower, the better.

GPU-aware scheduling in Spark

GPUs are now a schedulable resource in Apache Spark 3.0. This allows Spark to schedule executors with a specified number of GPUs, and you can specify how many GPUs each task requires. Spark conveys these resource requests to the underlying cluster manager (Kubernetes, YARN, or Spark standalone). You can also configure a discovery script that detects which GPUs were assigned by the cluster manager. This greatly simplifies running ML applications that need GPUs, as you previously had to work around the lack of GPU scheduling in Spark applications.
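The resource requests use Spark 3.0's standard resource-scheduling configuration keys. In the sketch below, each executor asks the cluster manager for one GPU and each task claims a quarter of it, so four tasks can share a GPU; the script path is a placeholder for your own discovery script:

```python
from pyspark.sql import SparkSession

# Sketch of Spark 3.0's GPU resource-scheduling configs (standard Spark
# keys; the discovery-script path is a placeholder). Each executor
# requests 1 GPU, and each task claims 0.25 of it, so 4 tasks can share
# one GPU.
spark = (
    SparkSession.builder
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "0.25")
    .config("spark.executor.resource.gpu.discoveryScript",
            "/opt/spark/scripts/getGpusResources.sh")  # placeholder path
    .getOrCreate()
)
```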

Figure 14 shows an example flow for GPU scheduling. The user submits an application with a GPU resource configuration and discovery script. Spark starts the driver, which passes the configuration to the cluster manager to request a container with a specified amount of resources and GPUs. The cluster manager returns the container, and Spark launches it. When the executor starts, it runs the discovery script. Spark sends that information back to the driver, which can then use it to schedule tasks on GPUs.

The diagram shows GPU scheduling flow from the Spark driver to the cluster manager, to executor launch, GPU assignment, and task launch.
Figure 14. Example of a flow for Spark GPU scheduling.
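The discovery script itself is simple: Spark expects it to print a JSON object in the format of Spark's ResourceInformation class, with a resource name and a list of string addresses. A real script would query the node (for example, via nvidia-smi); the sketch below hardcodes the GPU indices for illustration:

```python
import json

# The discovery script must print a JSON object in Spark's
# ResourceInformation format: a resource "name" plus a list of string
# "addresses". A real script would query the node (e.g. via nvidia-smi);
# the indices here are hardcoded for illustration.
def discover_gpus(indices):
    return json.dumps({"name": "gpu", "addresses": [str(i) for i in indices]})

print(discover_gpus([0, 1]))
# -> {"name": "gpu", "addresses": ["0", "1"]}
```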

Next steps

In this post, we discussed how the new RAPIDS Accelerator for Apache Spark enables GPU acceleration of end-to-end data analytic pipelines, Spark SQL operations, and Spark shuffle operations.
