Apache Spark 是用于大数据处理和分析的行业领先平台。随着非结构化数据(documents、emails、multimedia content)的日益普及,深度学习 (DL) 和大语言模型 (LLMs) 已成为现代数据分析工作流的核心组成部分。这些模型支持各种下游任务,例如图像描述、语义标记、文档摘要等。
然而,将 GPU 密集型 DL 与 Spark 相结合一直是一项挑战。 NVIDIA RAPIDS Accelerator for Apache Spark 和 Spark RAPIDS ML 库可实现无缝 GPU 加速,但主要用于提取、转换和加载 (ETL) 以及传统机器学习 (ML) 工作负载。
最近用于分布式训练和推理的 Spark API (如上一篇博客中所述) 在 DL 集成方面取得了重大进展。本文以这项工作为基础,介绍了 Spark 上分布式推理的最佳实践。我们将展示与 NVIDIA Triton Inference Server 等服务平台的集成、使用 vLLM 进行的高性能 LLM 推理以及在云平台上的部署。
为什么要进行批量推理?
虽然实时推理最适合交互式应用,但批量推理提供了一种可扩展的高吞吐量范式,可一次性处理大量数据集。一些关键用例包括:
- 语义搜索 :为大型内容库批量生成 embeddings 和语义元数据,从而提高搜索质量。
- 数据转换: 将非结构化数据集 (例如自由格式的文本或图像) 转换、汇总或转换为结构化模式,以执行下游任务。
- 内容创作 :自动生成用于大规模内容制作的产品描述、图像描述、社交媒体帖子或营销文案。
将 DL/LLM 模型集成到现有的 Spark 工作流中,可将 DL 和生成式 AI 的功能直接引入统一工作流中的企业数据。现在,我们来探索实现,首先回顾一下 Spark 的 predict_batch_udf
API。
基本部署:使用 predict_batch_udf 进行分布式推理
Spark 3.4 引入了 predict_batch_udf API,可为 DL 模型推理提供简单的接口。此 API 可自动将 Spark DataFrame 列转换为批量处理的 NumPy 输入,并在 Spark 执行程序上缓存模型。有关更多详细信息,请参阅 Distributed Deep Learning Made Easy with Spark 3.4 。
例如,以下代码演示了如何使用 Huggingface Sentence Transformers 在包含文本数据的 Spark DataFrame 上执行分布式文本嵌入:
from pyspark.sql.functions import predict_batch_udf
from pyspark.sql.types import *
def predict_batch_fn():
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("paraphrase-MiniLM-L6-v2", device="cuda")
def predict(inputs):
return model.encode(inputs)
return predict
embed_udf = predict_batch_udf(predict_batch_fn,
return_type=ArrayType(FloatType()),
batch_size=128)
df = spark.read.parquet("/path/to/text_data")
embeddings_df = df.withColumn("embedding", embed("text"))
embeddings_df.write.parquet("/path/to/embeddings")
请注意,这是一个数据并行架构 (图 1) ,其中每个 Python 工作节点将模型副本加载到 GPU 上,并在其数据集分区上进行预测。

predict_batch_udf
API 进行分布式推理。每个 Python 工作者加载一个模型副本。借助这种直接推理方法,您可以将现有的 PyTorch、TensorFlow 或 Hugging Face 框架代码移植到 Spark,以进行分布式推理,同时尽可能减少代码更改。
但是,我们将看到,将多个模型副本加载到 GPU 上可能会给大型模型带来问题。我们将讨论推理服务如何解决这一问题,从而改进资源分离。
高级部署:分布式推理服务
在基本方法中,使用 predict_batch_udf
并行执行任务会使每个 Python 工作者在 GPU 上加载模型副本。因此,您必须调整每个执行程序的任务,以确定可以运行的模型副本数量,而不会出现内存不足错误或过度开销。占用整个 GPU 显存的大型模型(例如 LLMs)可能需要每个执行程序仅执行一项任务(即 spark.task.resource.gpu.amount=1
,适用于整个应用,如图 2 所示)。

predict_batch_udf
仅有 1 个模型适合 GPU,则我们必须将每个执行程序限制为 1 项任务。predict_batch_udf
的这种局限性凸显了 Spark 调度的挑战:它统一处理所有任务,而不区分 CPU 和 GPU 资源利用率。
推理服务通过 将 GPU 执行与 Spark 任务调度解 。我们可以在每个执行程序上部署专用推理服务器,而不是在每个 Spark 任务中加载模型。然后,许多任务可以并行加载、预处理和写入数据,以充分利用执行程序 CPU,而服务器将占用和利用 GPU 进行推理,如下所示。

通过在 CPU 和 GPU 并行性之间提供逻辑分离,推理服务无需根据 GPU 显存调整每个执行程序的任务。此外,它还支持轻松集成服务功能,例如模型管理和动态批处理。
我们在 Spark-RAPIDS-Examples DL 推理库 中提供服务器实用程序,以在 Spark 集群中启动和管理服务器实例,并支持 NVIDIA Triton Inference Server 和 vLLM 。请注意,这些示例正在不断发展:我们可能很快会将支持范围扩展到 NVIDIA Dynamo 和 NVIDIA NIMs 等推理解决方案。
与 Triton Inference Server 配合使用
NVIDIA Triton 推理服务器是用于高性能模型服务的行业标准平台,支持许多 主要功能 ,包括模型集成、并发执行和动态批处理。由于 Triton 通常在 Docker 容器中运行,因此在基于云的 Spark 环境(执行程序本身在容器中运行)中进行部署需要在 Docker 中部署 Docker,这带来了权限要求和缺乏资源隔离等挑战。
幸运的是, PyTriton 提供了一个 Python 原生接口,可直接在 Python 进程中运行 Triton,从而简化云端部署。有关 PyTriton 部署基础知识的 博客 ,请查看简要概述。
Spark-RAPIDS-Examples DL Inference repo 中的 server_utils
模块提供 TritonServerManager
,用于管理整个 Spark 集群中服务器的生命周期,包括查找和分配可用端口、在每个执行程序上启动服务器进程以及处理推理后的正常关闭。
在本课程中,部署 Triton 服务器的步骤很简单:
- 使用 PyTriton 服务器逻辑定义
triton_server
函数,其中包含您的 inference 框架代码。 - 使用您的模型名称和路径初始化
TritonServerManager
。 - 调用
TritonServerManager.start_servers
(triton_server
),在集群中分配triton_server
函数。
我们将介绍以下步骤。首先,定义 triton_server
函数。为简洁起见,我们省略了这一点 – 请参阅 notebooks 以获取您选择的框架中的大量示例。
def triton_server(ports: List[int], model_path: str):
# Load model to GPU, define inference logic, bind to server
定义服务器逻辑后,使用模型名称和路径初始化服务器管理器,并在启动服务器时传递 triton_server
函数:
from server_utils import TritonServerManager
server_manager = TritonServerManager(model_name="my-model", model_path="path/to/my-model")
server_manager.start_servers(triton_server)
host_to_grpc_url = server_manager.host_to_grpc_url
驱动上的 ServerManager 向每个执行程序分配启动任务,从而生成运行用户定义的 triton_server
函数的 Python 进程,如图 4 所示。

ServerManager start_servers()
函数在每个 executor 上启动 triton_server
进程的部署。 然后,使用 predict_batch_udf
预处理一批输入,并使用 PyTriton 的 ModelClient API 向服务器发送推理请求。
def triton_predict_fn(model_name, host_to_url):
import socket
from pytriton.client import ModelClient
url = host_to_url.get(socket.gethostname())
def infer_batch(inputs):
with ModelClient(url, model_name) as client:
# Do some preprocessing...
result_data = client.infer_batch(inputs) # Send batch to server
return result_data["predictions"] # Return predictions
return infer_batch
predict_udf = predict_batch_udf(partial(triton_predict_fn, model_name="my-model", host_to_url=host_to_grpc_url),
return_type=ArrayType(FloatType()),
batch_size=32)
# Run inference
df = spark.read.parquet("/path/to/my-data")
predictions_df = df.withColumn("predictions", predict_udf(col("data")))
predictions_df.write.parquet("/path/to/predictions.parquet")
# Once we're finished, stop servers
server_manager.stop_servers()
请注意,在 CPU 上的 UDF 中执行的加载和预处理现已与 GPU 上的推理解耦 — Spark 可以自由并行安排这些任务,而无需在 GPU 显存中创建额外的模型副本。
使用 vLLM 服务器提供服务
虽然 Triton 擅长处理自定义推理逻辑、多个框架和不同的模型类型,但 vLLM 是一种直接的替代服务,专门针对 LLM 进行了优化。它为生产部署提供了兼容 OpenAI 的 HTTP 服务器。
我们支持通过实用程序类中的 VLLMServerManager 在 Spark 上提供 vLLM 服务。与图 3 类似,此方法会在每个 Spark 执行程序上启动 vLLM 服务器进程,从而将 CPU 和 GPU 执行解耦。启动服务器时,您可以传递任何受支持的 vLLM CLI 参数,而不是使用自定义服务器功能:
from server_utils import VLLMServerManager
server_manager = VLLMServerManager(model_name="qwen-2.5-7b",
model_path="/path/to/Qwen2.5-7B")
server_manager.start_servers(gpu_memory_utilization=0.95,
max_model_len=6600,
task="generate")
host_to_http_url = server_manager.host_to_http_url
同样,您可以通过向服务器发送请求来运行分布式推理,在本例中,您可以使用兼容 Open-AI 的 JSON 格式:
def vllm_fn(model_name, host_to_url):
import socket
import numpy as np
import requests
url = host_to_url[socket.gethostname()]
def predict(inputs):
response = requests.post(
f"{url}/v1/completions",
json={
"model": model_name,
"prompt": inputs.tolist(),
"max_tokens": 256,
}
)
return np.array([r["text"] for r in response.json()["choices"]])
return predict
generate = predict_batch_udf(partial(vllm_fn, model_name="qwen-2.5-7b", host_to_url=host_to_http_url),
return_type=StringType(),
batch_size=32)
# Run inference
preds = text_df.withColumn("response", generate("prompt"))
# Once we're finished, stop servers
server_manager.stop_servers()
摘要:选择部署策略
在探索了这两种部署方法后,我们来比较它们的优势和权衡,以指导您的实施。我们通常推荐简单原型设计的基本方法,以及实现灵活性和清洁资源分离的高级方法,如下表总结所示。
注意事项 | 基本部署 (predict_batch_udf) | 高级部署 (Inference Server) |
资源管理 | 需要调整任务并行性以适应 GPU 内存 | 无需任务并行调优 – 将 CPU 和 GPU 调度解耦 |
设置复杂性 | 简单、直接移植您的 framework 代码 | 使用 ServerManager 非常简单,但需要一些额外的客户端/服务器代码 |
推理功能 | 受限于 framework 功能 | 其他特定于服务器的功能 (dynamic batching, model ensembles) |
可移植性 | 推理代码特定于 UDF | 只需在服务器中定义一次推理逻辑,即可在在线/离线应用程序中重复使用 |
最适合 | 小模型、简单流程和原型设计 | 更大的模型和复杂的 pipelines |
在云平台上部署
虽然我们 之前的博客 演示了本地部署,但我们已更新 Spark-RAPIDS-Examples DL 推理资源库 ,为您提供在 CSP Spark 集群上部署 DL/LLM 推理工作负载所需的一切。
云就绪模板
CSP 说明 提供了用于设置和运行工作负载的云就绪模板,目标是 Databricks 和 Dataproc Spark 环境。其中包括:
- 预配置初始化脚本,用于 spin-up 集群并安装必要的库。
- 推荐用于批量推理的 Spark 配置。
- 从云存储保存和加载模型的最佳实践。
无论您的集群环境如何 (包括本地独立集群、Databricks 集群或 Dataproc 集群) ,Notebooks 均配置为端到端工作,且无需更改代码。
配置 GPU 实例
要运行这些示例,我们推荐 A10/L4 或更高版本的 GPU 实例 (例如 Azure 上的 NVadsA10、AWS 上的 g5/g6、GCP 上的 g2-standard) ,可确保有足够的 GPU 显存。A100/H100 GPU 将是大型 LLM 的更好选择,例如,半精度的 Llama 70b 可在两台 H100 上舒适运行。
对于大型模型,通常需要跨多个 GPU 对模型进行分片,并且可以在每个节点有多个 GPU 的 Spark 集群中完成。设置 spark.executor.resource.gpu.amount=(gpus per node)
将为每个执行程序分配必要的 GPU,进而使推理服务器可见这些 GPU。然后,可以通过框架并行处理模型:例如,通过设置 tensor_parallel_size=(gpus per node)
与 vLLM。有关此示例,请参阅 vLLM Tensor Parallel Notebook 。
入门指南
首先,您可以浏览 示例 notebooks ,其中展示了使用开源模型和数据集的一系列用例 (包括图像分类、情感分析、文档摘要等) 的端到端 Spark 应用。要在云端部署这些应用程序,请参阅我们的 CSP 平台运行指南 。