数据科学

利用 RAPIDS 和 Ray 加速 GPU 数据分析

RAPIDS 是一套开源 GPU 加速的数据科学和 AI 库,可通过 Spark Dask 等分布式引擎进行横向扩展。 Ray 是一种热门的开源分布式 Python 框架,常用于扩展 AI 和机器学习 (ML) 应用。Ray 特别擅长简化和扩展训练和推理工作流,并且可以轻松面向 CPU 和 GPU 设备。

在本文中,我们将探讨如何使用 Ray 和 RAPIDS 加速新型分析流程。

Ray Actors 

Ray 提供用于 训练 服务 ML 模型的高级抽象概念 ,同时我们将试验 Ray 的核心,尤其是 Ray Actors。Actors 是有状态的 workers,这意味着每个 worker 都可以存储、管理和变异存储的任何数据。例如,如果您想使用 cuDF 在 GPU 上加载一些数据,可以执行以下操作:

@ray.remote(num_gpus=1)
class cuDFActor:
    def __init__(self):
        ...

    def read_parquet(self, filepath: str, columns: list = None) -> cudf.DataFrame:
        return cudf.read_parquet(filepath, columns=columns)

# Start 4 Workers 
pool_size = 4
actor_pool = [cuDFActor.remote() for i in range(pool_size)]

此示例使用 Ray 在四个 GPU 上创建四个 Actor,并使用 cuDF 加速 IO。此示例可与其他 RAPIDS 优化 (使用 RMM 进行内存配置) 或常见 ETL 例程 (例如过滤/自定义函数和用户定义函数) 一起使用:cudf ray-actor 示例。

Ray Actors 非常通用,可以快速用于并行 Python 库,还可以轻松集成现有的分布式算法。此外,借助 Ray,您可以在多个 GPUs 和多个节点上轻松扩展这项工作。

NCCL 和 cuGraph 

许多 RAPIDS 实现的流行算法已经为 C++ 中的分布式加速 GPU 计算而构建。这些实现经过高度调整,并且依赖于与 NCCL 以及 RAFT 中的基元和求解器 (成对距离、k-means 聚类、迭代求解器等) 的加速通信。RAFT 基元用于多个 RAPIDS 库 (包括 cuML 和 cuGraph)。

例如,cuGraph 弱连接组件 (WCC) 实现在很大程度上基于已尽快清理数据的流水线,从磁盘到较低级别的 CUDA C++ 实现。WCC 是一个很好的目标,它展示了开发者如何同时使用 RAPIDS (cuGraph) 和 Ray 来访问功能强大的分布式加速算法。

要实施 WCC,需要以下内容:

  1. 将数据加载到 GPU 显存中
  2. 启动 NCCL 通信 (以及 cuGraph 子通信器)
  3. 实例化和配置内部多 GPU cuGraph 实现
  4. 执行 WCC

第一步已经演示。 虽然 Ray 具有 NCCL hook ,但由于 cuGraph 很难管理通信,因此我们将依赖 RAFT NCCL 接口。以下是对刚才概述的要求的省略:

class RAFTActor:
    def __init__(self, index, pool_size, session_id):
        ...

    def broadcast_root_unique_id(self):
        # broadcast root/rank-0 to all actors

    def _setup_nccl(self):
        # start NCCL with identified rank-0 actor

    def _setup_raft(self):
        # configure RAFT and NCCL together

    def set_root_unique_id(self, root_uniqueId):
        # To be set rank-0 for all actors

@ray.remote(num_gpus=1)
class WCCActor(RAFTActor):
    def __init__(self, index, pool_size, session_id):
        super().__init__(index=index, pool_size=pool_size, session_id=session_id, actor_name_prefix="WCC")

    def weakly_connected_components(self, df):
        """
        1. Each actor loads in a chunk
        2. Each actor has a NCCL/RAFT Handle
        3. Pass each chunk and handle to MGGraph
        """

        src_array = df['src']
        dst_array = df['dst']
        weights = df['wgt']

        # Configure and setup a Multi-GPU cuGraph Object with 
        # edge list data and NCCL
	 graph = MGGraph(src_array, dst_array, weights, ...)
	 
        # Execute WCC
	 weakly_connected_components(graph)

# Initialize Ray and Run WCC algorithm

这包括运行 cuGraph 弱连接组件所需的两个类别。如需了解详情,请参阅 弱连接组件的实现 。大部分工作是配置 NCCL/RAFT。此模式适用于 cuML 等其他库,如 cuML k-means 实现所示。

结束语 

Ray 提供可表达且可扩展的 Actor 接口,可轻松用于 RAPIDS。我们探讨了如何连接 Ray Actors 以使用优化的 CUDA C++ 和 NCCL 实现。本次探索主要侧重于将 Ray Actors 作为启动器的 Level 1 集成。

要详细了解 GPU 加速数据处理,请加入 RAPIDS Slack 社区 中的 3500 多名成员。

 

标签