
Accelerating Sequential Python User-Defined Functions with RAPIDS on GPUs for 100X Speedups


Custom “row-by-row” processing logic (sometimes called sequential User-Defined Functions) is prevalent in ETL workflows. The sequential nature of UDFs makes parallelization on GPUs tricky. This blog post covers how to implement the same UDF logic using RAPIDS to parallelize computation on GPUs and unlock 100x speedups.


Typically, sequential UDFs revolve around records that share a grouping key. For example, you might analyze a user’s clicks per session on a website, where a session ends after a fixed period of inactivity (the code in this post uses a 60-minute timeout). In Apache Spark, you’d first cluster on the user key and then separate the user sessions with a UDF like the one in the example below. Such a UDF runs on multiple CPU threads, each executing the sequential function on its own share of the data.

# Requires rows in the format <user_sk>\t<timestamp>, clustered by user_sk
# and sorted by <timestamp> ascending. click_rows is any iterable of such lines.
current_uid = ''
last_click_time = -1
perUser_sessionID_counter = 1
timeout = 60*60  # session timeout in seconds
for line in click_rows:
    user_sk, tstamp_str = line.strip().split("\t")
    tstamp = int(tstamp_str)
    # reset if the next partition begins
    if current_uid != user_sk:
        current_uid = user_sk
        perUser_sessionID_counter = 1
        last_click_time = tstamp
    # time between clicks exceeds session timeout?
    if tstamp - last_click_time > timeout:
        perUser_sessionID_counter += 1
    last_click_time = tstamp
    print(f"{user_sk}, {perUser_sessionID_counter}")
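To make the behavior of the loop above concrete, here is a minimal sketch that wraps the same logic in a function and runs it on a handful of rows. The function name `sessionize` and the sample rows are hypothetical and chosen only for illustration.

```python
def sessionize(click_rows, timeout=60 * 60):
    """Return (user_sk, session_number) for each tab-separated input line.

    Assumes the rows are clustered by user and sorted by timestamp.
    """
    results = []
    current_uid = None
    last_click_time = -1
    session_counter = 1
    for line in click_rows:
        user_sk, tstamp_str = line.strip().split("\t")
        tstamp = int(tstamp_str)
        # a new user starts: reset the per-user state
        if current_uid != user_sk:
            current_uid = user_sk
            session_counter = 1
            last_click_time = tstamp
        # gap larger than the timeout: the next session starts
        if tstamp - last_click_time > timeout:
            session_counter += 1
        last_click_time = tstamp
        results.append((user_sk, session_counter))
    return results


# Hypothetical sample: two users, timestamps in seconds
sample_rows = [
    "u1\t0",
    "u1\t600",   # 10 minutes later: same session
    "u1\t5000",  # 4400 s later: new session
    "u2\t100",   # new user: counter resets
    "u2\t3700",  # exactly 3600 s later: not greater than the timeout
]
sessions = sessionize(sample_rows)
print(sessions)
```

Note that every row depends on state carried over from the previous row, which is what makes this loop sequential.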

Running on the GPU with Numba

Dask-CUDA clusters use a single process and thread per GPU, so running a single sequential function won’t work for this model. To run serial functions on the hundreds of CUDA cores available in GPUs, you can take advantage of the following properties:

  • The function can be applied independently across keys.
  • The grouping key has high cardinality (there are many unique keys).
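The first property can be illustrated on the CPU with a small sketch. It assumes hypothetical `(user_sk, timestamp)` tuples and a hypothetical helper, `sessionize_one_user`; the thread pool is used only to show that no group needs another group's state, not as a performance technique.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import groupby


def sessionize_one_user(timestamps, timeout=60 * 60):
    """Sequential logic for a single key: number the sessions from 1."""
    session, out, last = 1, [], None
    for t in timestamps:
        if last is not None and t - last > timeout:
            session += 1
        out.append(session)
        last = t
    return out


# Hypothetical clicks, clustered by user and sorted by time within each user
clicks = [("u1", 0), ("u1", 600), ("u1", 5000), ("u2", 100), ("u2", 9000), ("u3", 50)]

# Split the rows into one list of timestamps per user
groups = [
    (user, [t for _, t in rows])
    for user, rows in groupby(clicks, key=lambda r: r[0])
]

# Each group is processed on its own, so the order of execution does not matter
with ThreadPoolExecutor() as pool:
    per_user = list(pool.map(lambda g: sessionize_one_user(g[1]), groups))

result = {user: ids for (user, _), ids in zip(groups, per_user)}
print(result)
```

The more unique keys there are, the more independent pieces of work exist, which is why high cardinality matters on a GPU.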

Let’s look at how you can use Numba to transform our sessionization UDF for use on GPUs. Let’s break the algorithm into two parts.

Figure 1: Comparison of execution on CPUs and GPUs for the UDF. On the GPU, each CUDA thread does the sequential work for one session; with many sessions and many CUDA threads, we get a lot of parallelism. The CPU has fewer threads, so we get less parallelism.

Part 1. Session change flag

This function creates the session boundaries. It is embarrassingly parallel, so we can launch one thread for each row of the dataframe.

# UDF-1
# Make an array of session change flags.
# For every row x, set the flag if
# user_sk[x] != user_sk[x-1] or tstamp_inSec[x] - tstamp_inSec[x-1] > time_out (60 minutes here)
import numpy as np
from numba import cuda

@cuda.jit
def make_session_change_flag_kernel(
    user_sk, tstamp_inSec, session_change_flag, time_out
):
    gid = cuda.grid(1)  # one thread per row
    if 0 < gid < len(user_sk):
        if (
            user_sk[gid] != user_sk[gid - 1]
            or tstamp_inSec[gid] - tstamp_inSec[gid - 1] > time_out
        ):
            session_change_flag[gid] = np.int32(1)
        else:
            session_change_flag[gid] = np.int32(0)
    elif gid == 0:
        # the first row always starts a session
        session_change_flag[gid] = np.int32(1)
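One way to sanity-check the kernel's output on a small input is a plain-Python reference that applies the same rule row by row. The sketch below is such a reference, not part of the GPU code; the function name `session_change_flags` and the sample data are hypothetical.

```python
def session_change_flags(user_sk, tstamp_in_sec, time_out):
    """Pure-Python reference: flags[i] is 1 where row i starts a new session."""
    flags = [0] * len(user_sk)
    # Each iteration reads only rows i and i - 1, so iterations are independent,
    # which is what lets the GPU version use one thread per row.
    for i in range(len(user_sk)):
        if i == 0:
            flags[i] = 1  # the first row always starts a session
        elif (
            user_sk[i] != user_sk[i - 1]
            or tstamp_in_sec[i] - tstamp_in_sec[i - 1] > time_out
        ):
            flags[i] = 1
    return flags


# Hypothetical sample: user keys and timestamps in seconds, sorted by user then time
users = [1, 1, 1, 2, 2]
times = [0, 600, 5000, 100, 3700]
flags = session_change_flags(users, times, 60 * 60)
print(flags)
```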

Part 2. Serial logic at the session level

For session analysis, we have to set a unique session ID across all the rows that belong to each session for each user. This function is serial at the session level, but because we have many sessions (approximately 18 million) in each dataframe, we can take advantage of GPU parallelism by having a separate thread process each session.

# UDF-2
# For each row:
#   if its session change flag is set,
#     set its session id to x (the row index)
#     set all following clicks to x until the next session change flag
#   otherwise do nothing
@cuda.jit
def populate_session_ids_kernel(session_boundary):
    gid = cuda.grid(1)
    # don't loop if we're off the edge
    if gid < len(session_boundary):
        # if this is a session boundary...
        if session_boundary[gid] == 1:
            # this thread marks the start of a session
            session_boundary[gid] = gid
            look_ahead = 1
            # check elements 'forward' of this one until a new session
            # boundary is found; the bounds check keeps the last session
            # from reading past the end of the array
            while (
                gid + look_ahead < len(session_boundary)
                and session_boundary[gid + look_ahead] == 0
            ):
                session_boundary[gid + look_ahead] = gid
                look_ahead += 1
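The forward fill can be simulated on the CPU to see what the kernel leaves in the array. The sketch below is a plain-Python stand-in with one loop iteration per "thread". It stops at the end of the array with a bounds check in the loop condition, which is an assumption about the intended behavior, and it visits the rows in reverse to suggest that the result does not depend on thread order.

```python
def populate_session_ids(session_boundary):
    """Pure-Python reference: replace flags with session ids, in place.

    The session id is the row index of the first click in the session,
    so it is unique within the array (one partition in the GPU version).
    """
    n = len(session_boundary)
    # GPU threads run in no guaranteed order; iterating in reverse shows
    # that the outcome is the same regardless of the order.
    for gid in reversed(range(n)):
        if session_boundary[gid] == 1:
            # this "thread" marks the start of a session
            session_boundary[gid] = gid
            look_ahead = 1
            # fill forward until the next boundary or the end of the array
            while gid + look_ahead < n and session_boundary[gid + look_ahead] == 0:
                session_boundary[gid + look_ahead] = gid
                look_ahead += 1
    return session_boundary


# Flags as produced by the session change step for a hypothetical partition
ids = populate_session_ids([1, 0, 1, 1, 0])
print(ids)
```

Only the threads that land on a boundary do any work, and each of them walks its own session, so the amount of parallelism scales with the number of sessions.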

Using these functions with a dask-cuDF DataFrame

We invoke the functions above on each partition of a dask-cuDF DataFrame using a map_partitions call. The main logic stays close to the pure Python version while running much faster. A similar serial function in pure Python takes about `17.7s` vs. `14.3 ms` on GPUs. Going by those timings, that is well over a 100x speedup.

import cupy as cp

def get_session_id(df, time_out):
    """
    This function creates a session id column for each click.
    The session id increases for each user's subsequent session.
    The session boundary is defined by time_out.
    """
    # Preallocate destination column for Numba
    df["session_change_flag"] = cp.zeros(len(df), dtype="int32")

    # Device array views of the columns (private cuDF attribute; may differ across cuDF versions)
    wcs_user_sk = df["wcs_user_sk"]._column.data_array_view
    tstamp_inSec = df["tstamp_inSec"]._column.data_array_view
    session_change_flag = df["session_change_flag"]._column.data_array_view

    ## configure kernels based on number of tasks (one per row)
    conf_session_change_flag_kernel = make_session_change_flag_kernel.forall(len(df))
    conf_populate_session_ids_kernel = populate_session_ids_kernel.forall(len(df))

    ## Determine session boundaries
    conf_session_change_flag_kernel(
        wcs_user_sk, tstamp_inSec, session_change_flag, time_out
    )

    ## Populate session ids
    conf_populate_session_ids_kernel(session_change_flag)

    df = df.rename(columns={"session_change_flag": "session_id"})
    return df

# dask cudf dataframe call to sessionize
# after repartitioning along a user key
df = df.map_partitions(get_session_id, 60 * 60)
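The comment about repartitioning along a user key matters because each partition is processed on its own: all of a user's clicks have to sit in the same partition, sorted by user and time, before the kernels run. The sketch below shows that precondition in plain Python. It is not how Dask shuffles data; the helper `partition_by_user`, the modulo placement rule, and the sample clicks are hypothetical.

```python
from collections import defaultdict


def partition_by_user(clicks, n_partitions):
    """Place every click of a given user in one partition, then sort each partition."""
    partitions = defaultdict(list)
    for user_sk, tstamp in clicks:
        # the same key always maps to the same partition
        partitions[user_sk % n_partitions].append((user_sk, tstamp))
    # sort by user key, then by timestamp, as the kernels expect
    return [sorted(partitions[p]) for p in range(n_partitions)]


# Hypothetical unordered clicks as (user_sk, timestamp_in_seconds)
clicks = [(2, 3700), (1, 0), (2, 100), (1, 5000), (3, 50), (1, 600)]
parts = partition_by_user(clicks, n_partitions=2)
for part in parts:
    print(part)
```

With the data laid out this way, a per-partition function never needs to look at another partition to decide where a session starts or ends.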


When working with serial user-defined functions that operate on many small groups, transforming the UDF using Numba, as shown above, can greatly improve performance on GPUs. With the power of Numba, Dask, and RAPIDS, you can now supercharge your map-reduce UDF ETL jobs to run at the speed of light.

Want to start using Dask and RAPIDS to analyze big data at the speed of light? Check out the RAPIDS Getting Started webpage, with links to help you download pre-built Docker containers or install directly via Conda.
