
Accelerating GPU Analytics Using RAPIDS and Ray

RAPIDS is a suite of open-source GPU-accelerated data science and AI libraries that are well supported for scale-out with distributed engines like Spark and Dask. Ray is a popular open-source distributed Python framework commonly used to scale AI and machine learning (ML) applications. Ray particularly excels at simplifying and scaling training and inference pipelines and can easily target both CPU and GPU devices. 

In this post, we explore how Ray and RAPIDS can be used to accelerate novel analytics pipelines.

Ray Actors

While Ray offers high-level abstractions for training and serving ML models, we will be experimenting with the core of Ray, specifically with Ray Actors. Actors are stateful workers: each worker can store, manage, and mutate the data it holds. For example, if you want to use cuDF to load some data on the GPU, you could do the following:

import cudf
import ray

# Each actor reserves one GPU
@ray.remote(num_gpus=1)
class cuDFActor:
    def __init__(self):
        ...

    def read_parquet(self, filepath: str, columns: list = None) -> cudf.DataFrame:
        # cuDF reads the Parquet file directly into GPU memory
        return cudf.read_parquet(filepath, columns=columns)

# Start four workers, one per GPU
pool_size = 4
actor_pool = [cuDFActor.remote() for i in range(pool_size)]

This example uses Ray to create four actors, one per GPU, and cuDF to accelerate IO. The same pattern composes with other RAPIDS optimizations, such as memory configuration with RMM, and with common ETL routines like filtering and custom user-defined functions; see the cuDF Ray Actor example.
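For example, here is a minimal sketch of the memory configuration: calling rmm.reinitialize in the actor's constructor so that each worker's cuDF allocations are served from a pre-allocated pool (the 2 GiB pool size is an illustrative assumption, not a recommendation):

import rmm

class cuDFActor:
    def __init__(self):
        # Create a 2 GiB RMM memory pool on this worker's GPU;
        # subsequent cuDF allocations on this actor come from the pool
        rmm.reinitialize(pool_allocator=True, initial_pool_size=2**31)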

Ray Actors are wonderfully general: they can quickly parallelize Python libraries and integrate easily with existing distributed algorithms. Additionally, with Ray, you can easily scale this work across multiple GPUs and multiple nodes.
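As a minimal sketch of fanning work out across the pool created above (the Parquet file paths are hypothetical):

# Round-robin hypothetical files across the four actors
files = [f"data_{i}.parquet" for i in range(8)]
futures = [
    actor_pool[i % pool_size].read_parquet.remote(path)
    for i, path in enumerate(files)
]

# Block until all reads finish; each result is a cudf.DataFrame
dfs = ray.get(futures)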

NCCL and cuGraph

Many RAPIDS implementations of popular algorithms are already built for distributed, accelerated GPU computing in C++. These implementations are highly tuned, relying on NCCL for accelerated communications and on primitives and solvers from RAFT (pairwise distances, k-means clustering, iterative solvers, and more). RAFT primitives are used in several RAPIDS libraries, including cuML and cuGraph.

The cuGraph weakly connected components (WCC) implementation, for example, largely amounts to pipelining already-cleaned data as quickly as possible from disk to the lower-level CUDA C++ implementation. WCC is a good target for demonstrating how developers can use RAPIDS (cuGraph) and Ray together to gain access to powerful, distributed, accelerated algorithms.

Implementing WCC requires the following steps:

  1. Loading data into GPU memory
  2. Starting NCCL comms (and cuGraph sub-communicator)
  3. Instantiating and configuring internal multi-GPU cuGraph implementation
  4. Executing WCC

The first step was demonstrated above. And while Ray has its own NCCL hooks, we will rely on the RAFT NCCL interfaces, because cuGraph has a hard dependency on RAFT for managing communications. The following stubs out the requirements just outlined:

class RAFTActor:
    def __init__(self, index, pool_size, session_id, actor_name_prefix="RAFT"):
        ...

    def broadcast_root_unique_id(self):
        # Broadcast the root/rank-0 unique ID to all actors
        ...

    def _setup_nccl(self):
        # Start NCCL with the identified rank-0 actor
        ...

    def _setup_raft(self):
        # Configure RAFT and NCCL together
        ...

    def set_root_unique_id(self, root_uniqueId):
        # Set the rank-0 unique ID on this actor
        ...

@ray.remote(num_gpus=1)
class WCCActor(RAFTActor):
    def __init__(self, index, pool_size, session_id):
        super().__init__(index=index, pool_size=pool_size, session_id=session_id, actor_name_prefix="WCC")

    def weakly_connected_components(self, df):
        """
        1. Each actor loads in a chunk
        2. Each actor has a NCCL/RAFT handle
        3. Pass each chunk and the handle to MGGraph
        """
        src_array = df['src']
        dst_array = df['dst']
        weights = df['wgt']

        # Configure and set up a multi-GPU cuGraph object with
        # edge list data and NCCL
        graph = MGGraph(src_array, dst_array, weights, ...)

        # Execute WCC
        weakly_connected_components(graph)
# Initialize Ray and Run WCC algorithm

This covers the two classes required to run cuGraph weakly connected components. Much of the work is configuring NCCL and RAFT; to learn more, see a full implementation of weakly connected components. The same pattern works for other libraries, like cuML, as demonstrated by the cuML k-means implementation.
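For reference, here is a rough sketch of the driver step stubbed out above. It assumes each actor also exposes a read_parquet method like the cuDFActor shown earlier; the session ID and file paths are hypothetical:

import ray

ray.init()

# One actor per GPU, all joining the same session
pool_size = 4
actors = [WCCActor.remote(i, pool_size, session_id="wcc-session")
          for i in range(pool_size)]

# Rank 0 generates the NCCL unique ID and shares it so
# every actor can join the same NCCL clique
ray.get(actors[0].broadcast_root_unique_id.remote())

# Each actor reads its own edge-list chunk, then runs WCC on it
futures = []
for i, actor in enumerate(actors):
    df = actor.read_parquet.remote(f"edges_{i}.parquet")  # assumed helper method
    futures.append(actor.weakly_connected_components.remote(df))
ray.get(futures)

ray.shutdown()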

Conclusion

Ray provides an expressive and scalable Actor interface that can be easily leveraged with RAPIDS. We’ve explored how to connect Ray Actors to optimized CUDA C++ and NCCL implementations. This exploration has largely focused on a Level 1 integration, with Ray Actors acting as launchers.

To learn more about GPU-accelerated data processing, join the 3,500+ members in the RAPIDS Slack community.
