处理大量数据的工作负载 (尤其是在云端运行的工作负载) 通常会使用对象存储服务 (S3、Google Cloud Storage、Azure Blob Storage 等) 作为数据源。对象存储服务可以存储和提供海量数据,但要想获得最佳性能,可能需要根据远程对象存储的行为方式调整工作负载。本文适用于希望尽快将数据读或写到对象存储,以便 IO 不会限制工作负载的 RAPIDS 用户。
您对本地文件系统行为方式的一些了解可转换为远程对象存储,但它们本质上是不同的。这两者之间的最大区别 (至少对于数据分析工作负载而言) 可能在于,对象存储上的读取和写入操作具有越来越高的可变延迟。每个存储服务 (AWS、Azure) 都有自己的一套最佳实践和性能指南。在这里,我们将提供一些专注于数据分析工作负载的一般指南。
地址
将计算节点放置在存储服务附近 (理想情况下,应位于同一云区域),可在运行工作负载的计算机和为数据提供服务的计算机之间提供速度最快、最可靠的网络。在一天结束时,传输将受到光速的限制,因此最大限度地减少物理距离不会造成伤害。
文件格式
“云原生”文件格式的开发能够很好地与对象存储配合使用。这些文件格式通常可让用户快速轻松地访问元数据 (元数据包括列名称或数据类型等高级信息,以及文件特定数据子集所在位置等低级信息)。 Apache Parquet 、 Zarr 和 Cloud Optimized GeoTIFF 是适用于各种类型数据的云原生文件格式的一些示例。
由于对象存储服务通常支持范围请求,因此客户端 (如 cuDF ) 可以读取元数据,然后只下载您实际需要的数据。例如,cuDF 只能从包含多列的 Parquet 文件中读取几列数据,或者 Zarr 客户端可以从大型 n 维数组中读取单个 chunk。这些读取只需通过几次 HTTP 请求即可完成,而且无需下载一堆刚刚被过滤掉的不相干数据。
文件大小
由于每个读取操作都需要 (至少) 一个 HTTP 请求,因此我们倾向于在合理数量的字节数上分担每个 HTTP 请求的用度。如果您控制数据写入过程,则需要确保文件足够大,以便下游处理任务获得良好性能。最佳值取决于您的工作负载,但 parquet 文件的大小通常介于数十 MB 到数百 MB 之间 (请参阅下文,了解一些特定示例)。
也就是说,您需要注意文件大小与 Kit 中的下一个工具:并发的交互方式。
并发
使用并发同时下载多个 blobs (或单个 blob 的多个部分) 对于从远程存储服务中获得良好性能至关重要。由于这是一项远程服务,您的流程将花费一些时间 (可能会花费大量时间) 四处等待,不执行任何操作。此等待时间为 HTTP 请求被发送到响应被接收之间的时间。在此期间,我们会等待网络执行请求,等待存储服务处理并发送响应,等待网络执行响应 (可能较大)。虽然该请求/响应周期的一部分会随所涉及的数据量而扩展,但其他部分只是固定的开销。
对象存储服务旨在处理许多并发请求。我们可以将这一点与每个请求都涉及一些时间来等待不执行任何操作的事实相结合,以发出许多并发请求来提高整体吞吐量。在 Python 中,这通常使用线程池完成:
pool = concurrent.futures.ThreadPoolExecutor()
futures = pool.map(request_chunk, chunks)
或使用 异步 :
tasks = [request_chunk_async(chunk) for chunk in chunks]
await asyncio.gather(*tasks)
我们能够让大量读取 同时 不执行任何操作,从而提高吞吐量。由于每个线程/任务通常不执行任何任务,因此拥有比计算机核心数更多的线程/任务也是可以的。如果并发请求数量足够多,您最终会使存储服务饱和,而存储服务试图满足一些每秒请求数和带宽目标数。但这些目标很高;您通常需要多台机器使存储服务饱和,并且应该实现非常高的吞吐量。
库
上述内容基本上适用于从对象存储服务执行远程 IO 的任何库。在 RAPIDS 环境中, NVIDIA KvikIO 值得注意,因为
- 它会自动将大型请求分块为多个较小的请求,并并发发出这些请求。
- 它可以高效读取主机或设备内存,尤其是启用 GPU Direct Storage 时。
- 速度很快。
正如 RADIDS 24.12 发布公告中提到的那样,从 S3 读取数据时,KvikIO 可以实现惊人的吞吐量。我们来看看一些基准测试,看看效果如何。
基准测试
当您读取文件时,KvikIO 会将读取的文件拆分成较小的 kvikio.defaults.task_size
字节读取。它使用具有 kvikio.defaults.num_threads
工作线程的线程池并行执行这些读取请求。可以使用环境变量 KVIKIO_TASK_SIZE
和 KVIKIO_NTHREADS
控制这些内容,也可以通过 Python 使用:
with kvikio.defaults.set_num_threads(num_threads), kvikio.defaults.set_task_size(size):
...
详情请参阅 Runtime Settings 。
此图表显示了在同一区域内,针对不同大小的线程池,从 S3 到 g4dn EC2 实例读取 1 GB Blob 的吞吐量 (以 Mbps 为单位) (越高越好)。
图 1、从 S3 读取 1 GB 文件的基准测试,到具有高达 25 Gbps 已发布带宽的 g4dn.xlarge EC2 实例。这是kvikio.RemoteFile.read
的吞吐量,适用于各种值的 kvikio.defaults.num
_threads 和 16 MiB 的任务。随着我们添加更多线程并对读取进行并行化,吞吐量会增加到一定程度。
线程越少 (少于 4 个),吞吐量越低,读取文件的时间越长。更多线程 (64、128、256) 通过将请求并行化到以并行方式提供服务的存储服务,实现更高的吞吐量。当我们遇到系统中存储服务、网络或其他瓶颈的限制时,会出现递减甚至负回报的情况。
借助远程 IO,每个线程都会在相对较长的时间内等待响应,因此对于您的工作负载,可能适合使用更多线程 (相对于核心数量而言)。我们看到,在本例中,吞吐量最高,介于 64 到 128 个线程之间。
如下图所示,任务大小也会影响最大吞吐量。
图 2、从 S3 读取 1 GB 文件的基准测试,到具有高达 25 Gbps 已发布带宽的 g4dn.xlarge EC2 实例 。这显示了kvikio.RemoteFile.read
吞吐量的热图。水平轴显示各种任务大小的吞吐量,而垂直轴显示各种线程数量。
只要任务大小不是太小(大约或低于 4 MiB)或太大(大约或超过 128 MiB),吞吐量就会达到 10 Gbps 左右。由于任务规模过小,发出许多 HTTP 请求会降低吞吐量。由于任务规模过大,我们无法获得足够的并发能力来最大限度地提高吞吐量。
与 boto3 (适用于 Python 的 AWS SDK) 相比,即使在线程池中使用 boto3 并发执行请求,KvikIO 也能实现更高的吞吐量。
图 3、从从 S3 读取 1 GB 的基准测试,到具有高达 25 Gbps 已发布带宽的 g4dn.xlarge EC2 实例。KvikIO 基准测试使用 64 个线程和 16 MiB 任务大小。Boto3 基准测试使用 ThreadPool 并行读取许多 4 MB 字节的块,而参数搜索表明,对于 Boto3 而言,这是最快的块大小。对于略为逼真的工作负载 (尽管仍然仅有一个工作负载专注于 IO),我们比较了读取一批 360 个 parquet 文件 (每个文件约 128 MB) 的性能。这在 AWS g4dn.12xlarge
实例上运行,该实例包含 4 个 NVIDIA T4 GPU 和 48 个 vCPUs。
启用 KvikIO 后,四个 Dask 工作进程能够共同实现从 S3 到此单个节点的近 20 Gbps 吞吐量。
结束语
随着 RAPIDS 加速工作负载的其他部分,IO 可能会成为瓶颈。如果您使用的是对象存储,并且已经疲于等待数据加载,请尝试本博文中的一些建议。让我们了解如何在 Github 上使用 KvikIO。您还可以与 RAPIDS Slack 社区的 3,500 多名成员一起讨论 GPU 加速的数据处理。