RAPIDS 是一套开源 GPU 加速的数据科学和 AI 库,可通过 Spark 和 Dask 等分布式引擎进行横向扩展。 Ray 是一种热门的开源分布式 Python 框架,常用于扩展 AI 和机器学习 (ML) 应用。Ray 特别擅长简化和扩展训练和推理工作流,并且可以轻松面向 CPU 和 GPU 设备。
在本文中,我们将探讨如何使用 Ray 和 RAPIDS 加速新型分析流程。
Ray Actors
Ray 提供用于 训练 和 服务 ML 模型的高级抽象概念 ,同时我们将试验 Ray 的核心,尤其是 Ray Actors。Actors 是有状态的 workers,这意味着每个 worker 都可以存储、管理和变异存储的任何数据。例如,如果您想使用 cuDF 在 GPU 上加载一些数据,可以执行以下操作:
@ray.remote(num_gpus=1)
class cuDFActor:
def __init__(self):
...
def read_parquet(self, filepath: str, columns: list = None) -> cudf.DataFrame:
return cudf.read_parquet(filepath, columns=columns)
# Start 4 Workers
pool_size = 4
actor_pool = [cuDFActor.remote() for i in range(pool_size)]
此示例使用 Ray 在四个 GPU 上创建四个 Actor,并使用 cuDF 加速 IO。此示例可与其他 RAPIDS 优化 (使用 RMM 进行内存配置) 或常见 ETL 例程 (例如过滤/自定义函数和用户定义函数) 一起使用:cudf ray-actor 示例。
Ray Actors 非常通用,可以快速用于并行 Python 库,还可以轻松集成现有的分布式算法。此外,借助 Ray,您可以在多个 GPUs 和多个节点上轻松扩展这项工作。
NCCL 和 cuGraph
许多 RAPIDS 实现的流行算法已经为 C++ 中的分布式加速 GPU 计算而构建。这些实现经过高度调整,并且依赖于与 NCCL 以及 RAFT 中的基元和求解器 (成对距离、k-means 聚类、迭代求解器等) 的加速通信。RAFT 基元用于多个 RAPIDS 库 (包括 cuML 和 cuGraph)。
例如,cuGraph 弱连接组件 (WCC) 实现在很大程度上基于已尽快清理数据的流水线,从磁盘到较低级别的 CUDA C++ 实现。WCC 是一个很好的目标,它展示了开发者如何同时使用 RAPIDS (cuGraph) 和 Ray 来访问功能强大的分布式加速算法。
要实施 WCC,需要以下内容:
- 将数据加载到 GPU 显存中
- 启动 NCCL 通信 (以及 cuGraph 子通信器)
- 实例化和配置内部多 GPU cuGraph 实现
- 执行 WCC
第一步已经演示。 虽然 Ray 具有 NCCL hook ,但由于 cuGraph 很难管理通信,因此我们将依赖 RAFT NCCL 接口。以下是对刚才概述的要求的省略:
class RAFTActor:
def __init__(self, index, pool_size, session_id):
...
def broadcast_root_unique_id(self):
# broadcast root/rank-0 to all actors
def _setup_nccl(self):
# start NCCL with identified rank-0 actor
def _setup_raft(self):
# configure RAFT and NCCL together
def set_root_unique_id(self, root_uniqueId):
# To be set rank-0 for all actors
@ray.remote(num_gpus=1)
class WCCActor(RAFTActor):
def __init__(self, index, pool_size, session_id):
super().__init__(index=index, pool_size=pool_size, session_id=session_id, actor_name_prefix="WCC")
def weakly_connected_components(self, df):
"""
1. Each actor loads in a chunk
2. Each actor has a NCCL/RAFT Handle
3. Pass each chunk and handle to MGGraph
"""
src_array = df['src']
dst_array = df['dst']
weights = df['wgt']
# Configure and setup a Multi-GPU cuGraph Object with
# edge list data and NCCL
graph = MGGraph(src_array, dst_array, weights, ...)
# Execute WCC
weakly_connected_components(graph)
# Initialize Ray and Run WCC algorithm
这包括运行 cuGraph 弱连接组件所需的两个类别。如需了解详情,请参阅 弱连接组件的实现 。大部分工作是配置 NCCL/RAFT。此模式适用于 cuML 等其他库,如 cuML k-means 实现所示。
结束语
Ray 提供可表达且可扩展的 Actor 接口,可轻松用于 RAPIDS。我们探讨了如何连接 Ray Actors 以使用优化的 CUDA C++ 和 NCCL 实现。本次探索主要侧重于将 Ray Actors 作为启动器的 Level 1 集成。
要详细了解 GPU 加速数据处理,请加入 RAPIDS Slack 社区 中的 3500 多名成员。