Beginner’s Guide to GPU-Accelerated Event Stream Processing in Python

This tutorial is the seventh installment in a series of introductions to the RAPIDS ecosystem. The series explores various aspects of RAPIDS that allow its users to solve ETL (Extract, Transform, Load) problems, build ML (Machine Learning) and DL (Deep Learning) models, explore expansive graphs, process geospatial, signal, and system log data, or query data with SQL via BlazingSQL.

In the age of the Internet, with abundant IoT devices, social media, web servers, and more, data flows at incredible speeds. In 2019, Forbes reported that every minute, Americans use approximately 4.4 PB of internet data, which converts to roughly 1 MB of data per Internet user per minute.


Not only is the volume of data increasing over time, but so is the speed at which it arrives. Over the years, we went from dial-up modem connections with speeds of up to 56 kbit/s in the late 1990s to contemporary 10 Gbit/s networks, which are starting to gain popularity. Unless you are on a WiFi network, 1 Gbit/s networks are still the most widely used way of interconnecting devices at home and in the office.

Many of the Internet services offered these days rely on prompt processing of this constant waterfall of data. cuStreamz is one of the newer additions to the RAPIDS stack. It aims to take the streaming data processing historically done on the CPU and accelerate it on the GPU. Thanks to the immense parallelism of GPUs, processing streaming data becomes much faster while keeping a friendly Python interface.

In the previous posts, we showcased other areas of RAPIDS.

Today, we talk about cuStreamz, a library that uses GPUs to process streaming data. To help you get familiar with cuStreamz, we also published a downloadable cuStreamz cheatsheet and an interactive notebook that showcases all the current functionality of cuStreamz.

Streaming frameworks

First released in 2011, Apache Kafka has quickly become a standard for managing vast quantities of fast-moving data with low latency and high-level APIs. Kafka is a distributed platform that maintains a list of topics that systems can subscribe to (the so-called consumers) and publish their data onto (the producers). Data in Kafka, as in many other distributed systems, is replicated among multiple workers (or brokers): if any of the brokers disconnects from the cluster, or otherwise dies, the data is not lost and is still available from other brokers. This provides the resiliency and availability that today's Internet service companies require.
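To make the producer, consumer, and replication ideas concrete, here is a toy in-memory sketch in plain Python. It is not Kafka's API: the Broker and ToyCluster classes and their methods are hypothetical names invented for illustration, and real Kafka adds partitions, leader election, and persistent storage on top of this idea.

```python
class Broker:
    """A toy broker holding its own copy of each topic's message log."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.logs = {}  # topic name -> list of messages


class ToyCluster:
    """Hypothetical cluster that replicates every message to all live brokers."""

    def __init__(self, broker_names):
        self.brokers = [Broker(name) for name in broker_names]

    def publish(self, topic, message):
        # Producer side: append the message to the topic log on each live broker.
        for broker in self.brokers:
            if broker.alive:
                broker.logs.setdefault(topic, []).append(message)

    def consume(self, topic, offset=0):
        # Consumer side: read from the first live broker that holds the topic.
        for broker in self.brokers:
            if broker.alive and topic in broker.logs:
                return broker.logs[topic][offset:]
        return []


cluster = ToyCluster(["kafka0", "kafka1"])
cluster.publish("test", "RAPIDS rocks!")
cluster.brokers[0].alive = False  # simulate losing one broker
surviving = cluster.consume("test")
print(surviving)
```

Because the message was copied to both brokers before kafka0 went away, the consumer can still read it from kafka1.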

Streamz is a Python framework that focuses on processing high-velocity data and allows for branching, joining, controlling the flow of messages, and sinking the data to disk or other streams. Here's what a set of distinct pipelines might look like:


Figure 1: Source: https://streamz.readthedocs.io/en/latest/_images/complex.svg

A pipeline can split into multiple branches. The popular Lambda architecture, for example, uses two branches: one to process fast-moving, near real-time data, and another to provide batch processing.
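The branching idea can be sketched with a small push-based stream written in plain Python. The ToyStream class below is a hypothetical stand-in built only for illustration; its map, sink, and emit methods borrow the names streamz uses, but none of this is streamz code.

```python
class ToyStream:
    """Minimal push-based stream node; a stand-in for illustration only."""

    def __init__(self, func=None):
        self.func = func        # transformation applied at this node
        self.downstreams = []   # nodes that receive this node's output

    def map(self, func):
        # Attach a new node that transforms every message it receives.
        node = ToyStream(func)
        self.downstreams.append(node)
        return node

    def sink(self, func):
        # Attach a terminal node; nothing is connected after it here.
        return self.map(func)

    def emit(self, message):
        # Apply this node's function, then push the result to every branch.
        result = message if self.func is None else self.func(message)
        for node in self.downstreams:
            node.emit(result)


source = ToyStream()
speed_view, batch_store = [], []

# Branch 1 (speed layer): transform and publish each message immediately.
source.map(str.upper).sink(speed_view.append)
# Branch 2 (batch layer): keep the raw messages for later bulk processing.
source.sink(batch_store.append)

for text in ["hello", "world"]:
    source.emit(text)

print(speed_view, batch_store)
```

Every message emitted at the source reaches both branches, which is the property a two-branch design such as Lambda relies on.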

RAPIDS cuStreamz builds on top of the streamz framework and allows the messages to be batched into cuDF DataFrames instead of being handled as text messages. This, on its own, enables significant speed-ups when processing messages that conform to the same schema, by tapping into the power of GPUs. Also, if the data volume cannot fit on a single machine, cuStreamz supports distributing the processing using Dask-cuDF DataFrames.
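Why a shared schema matters can be seen even without a GPU. The sketch below is a CPU-only illustration in plain Python, not cuStreamz code: batch_to_columns is a hypothetical helper, and the sensor and value fields are made-up sample data. It turns a batch of JSON messages into columns, which is the layout a DataFrame works on.

```python
import json


def batch_to_columns(raw_messages):
    """Turn JSON messages that share one schema into a dict of columns."""
    records = [json.loads(raw) for raw in raw_messages]
    if not records:
        return {}
    schema = list(records[0].keys())
    for record in records:
        # A shared schema is what makes columnar batching possible.
        if set(record) != set(schema):
            raise ValueError(f"schema mismatch: {sorted(record)} vs {sorted(schema)}")
    return {name: [record[name] for record in records] for name in schema}


raw_batch = [
    '{"sensor": "a", "value": 1.5}',
    '{"sensor": "b", "value": 2.5}',
]
columns = batch_to_columns(raw_batch)
total = sum(columns["value"])  # one operation over a column, not one per message
print(columns, total)
```

Once the batch is columnar, an operation runs over a whole column at once, and that is the kind of work a GPU parallelizes well.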

Setting up locally

It is easy to get started. In this section, we will show you how to set up your own mini-Kafka cluster using Docker. To use cuStreamz, you will, of course, need an NVIDIA GPU with Pascal architecture (GTX 1000-series) or newer as required by RAPIDS.

To get started with Kafka, you need to install Docker and Docker-compose: the installation instructions for Docker can be found here https://docs.docker.com/engine/install/ while the Docker-compose installation manual is here https://docs.docker.com/compose/install/. Please note that you will need a Linux machine to run this, as neither Windows nor macOS is officially supported: https://github.com/NVIDIA/nvidia-docker/wiki/Frequently-Asked-Questions#is-microsoft-windows-supported.

To use cuStreamz, your machine will need the NVIDIA drivers and a CUDA environment installed (instructions can be found here https://developer.nvidia.com/cuda-downloads) as well as the NVIDIA-docker addition so that Docker can connect to your GPU: find it here https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/install-guide.html#docker.

Kafka cluster

Next, let's set up our Kafka cluster. If you clone the GitHub repository, navigate to the Kafka folder inside cheatsheets/cuStreamz and open the docker-compose.yaml file.

Docker-compose uses YAML configuration files to set up the whole cluster. The first service we start is Zookeeper. Zookeeper is a service used to track naming and configuration data for Kafka; it maintains information about the status of the cluster nodes and their topics, partitions, replication, etc. In addition, the Zookeeper service allows multiple clients to carry out concurrent reads and writes to keep up with the volume and velocity of the incoming and outgoing data calls.

services:
 zookeeper:
   image: 'confluentinc/cp-zookeeper:5.4.3'
   hostname: zookeeper
   networks:
     - kafka
   environment:
     - ZOO_MY_ID=1
     - ZOOKEEPER_CLIENT_PORT=2181
     - ZOO_SERVERS=zookeeper:2888:3888
   ports:
     - 2181:2181
   volumes:
     - ./data/zookeeper/data:/data
     - ./data/zookeeper/datalog:/datalog

In this example, we use the cp-zookeeper:5.4.3 image from Confluent to start our Zookeeper service; the server will be named zookeeper. The Zookeeper service can be replicated among multiple servers to make it resilient; in that setup, followers connect to the leader on port 2888, and leader election takes place on port 3888. Clients that want to use Zookeeper connect to the service on port 2181, and that port is forwarded to the host via the ports setting. We also map some host folders into the container so that the data Zookeeper stores is persisted.

Next, we start two Kafka worker nodes (one shown here for brevity).

kafka0:
 image: confluentinc/cp-kafka:5.4.3
 hostname: kafka0
 networks:
   - kafka
 ports:
   - "9092:9092"
 environment:
   KAFKA_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka0:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
   KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka0:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
   KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
   KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
   KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
   KAFKA_BROKER_ID: 0
   KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
 volumes:
   - ./data/kafka0/data:/var/lib/kafka/data
 depends_on:
   - zookeeper

The cp-kafka image comes from Confluent's Docker Hub repository; here, we also use version 5.4.3. There are plenty of environment variables, but let's review just the most important ones from our point of view:

  • KAFKA_LISTENERS identifies a list of server names and ports the server will be listening on. Note that the external and internal ports are different: to facilitate communication between multiple Docker containers, the server is placed on the Docker internal network (in our case kafka_kafka), where the kafka0 server listens on port 19092. To connect to this service from the host, use localhost and port 9092. The same list is provided in the KAFKA_ADVERTISED_LISTENERS environment variable.
  • KAFKA_INTER_BROKER_LISTENER_NAME tells Kafka which listener to use for communication between brokers: in our case, this is LISTENER_DOCKER_INTERNAL, but any recognizable name should work. Should you change this name, however, you will also have to change it in KAFKA_LISTENERS, KAFKA_ADVERTISED_LISTENERS, and KAFKA_LISTENER_SECURITY_PROTOCOL_MAP.
  • KAFKA_ZOOKEEPER_CONNECT specifies the address of the Zookeeper service to connect to; in our case, that is zookeeper:2181.
  • KAFKA_BROKER_ID is a unique identifier of the Kafka node and, by convention, should be included in the service name and the server name.
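The listener settings are easier to read once they are split into their parts. The following is a small sketch in plain Python; parse_listeners is a hypothetical helper written for this post, not part of Kafka or Docker, and it assumes the ${DOCKER_HOST_IP:-127.0.0.1} placeholder has already been resolved to its default value.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# The KAFKA_LISTENERS value from the compose file, with the
# ${DOCKER_HOST_IP:-127.0.0.1} placeholder replaced by its default.
kafka_listeners = (
    "LISTENER_DOCKER_INTERNAL://kafka0:19092,"
    "LISTENER_DOCKER_EXTERNAL://127.0.0.1:9092"
)
parsed = parse_listeners(kafka_listeners)

# The inter-broker listener name has to match one of the configured listeners.
inter_broker = "LISTENER_DOCKER_INTERNAL"
if inter_broker not in parsed:
    raise ValueError(f"{inter_broker} is not defined in KAFKA_LISTENERS")

for name, (host, port) in parsed.items():
    print(f"{name}: host={host}, port={port}")
```

The output makes the split visible: containers on the kafka_kafka network use kafka0:19092, while clients on the host use 127.0.0.1:9092.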

We also identify the zookeeper as a service this container depends on.

To start all these services, simply navigate to the folder where the docker-compose.yaml file is saved and run docker-compose up in the terminal (to stop the services, press Ctrl-C or type docker-compose down in another terminal window). Once the services are running, you can list all containers by running the docker ps command.
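Besides docker ps, a quick way to confirm from the host that the published ports accept connections is a plain TCP check. This is a minimal sketch using only the Python standard library; port_is_open is a hypothetical helper, and an open port only shows that something is listening, not that Kafka has finished starting up.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports published to the host in the compose file shown earlier.
for service, port in [("zookeeper", 2181), ("kafka0", 9092)]:
    status = "reachable" if port_is_open("127.0.0.1", port) else "not reachable"
    print(f"{service} on port {port}: {status}")
```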

With all the services running, let's create a sample topic. First, open a shell inside the kafka0 container by running the following command in the terminal (docker ps shows the container hash).

docker exec -ti <HASH_OF_KAFKA0_CONTAINER> bash

Once inside, run the following command.

kafka-topics --create --zookeeper zookeeper:2181 --replication-factor <NO_OF_REPLICATIONS> --partitions <NO_OF_PARTITIONS> --topic test

Now, you should be able to either publish messages to the topic test or subscribe to it and consume them. Your Kafka service is running!

Let’s get streaming!

In this example, we will be using the official RAPIDS container. Go ahead and pull the latest one following the examples here https://rapids.ai/start.html. Start the container using the command listed on the RAPIDS website. You should now be able to navigate to http://localhost:8888 and access JupyterLab.

Before we move forward, we need to connect this container to the kafka_kafka network: do so with the following command from the terminal.

docker network connect kafka_kafka <RAPIDS_CONTAINER_HASH> 

From now on, we should be able to access the kafka0:19092 server from the RAPIDS container.

Note that if you do not have custreamz available in your container, you can install it using the following command.

conda install -c rapidsai -c rapidsai-nightly -c nvidia -c conda-forge -c defaults custreamz python=3.7 cudatoolkit=11

Next, let’s subscribe to our test topic with cuStreamz!

import streamz

consumer_conf = {'bootstrap.servers': 'kafka0:19092',
                 'group.id': 'custreamz',
                 'session.timeout.ms': '60000'
                }

source = streamz.Stream.from_kafka_batched(
    'test'
    , consumer_conf
    , poll_interval='2s'
    , asynchronous=True
    , dask=False
    , engine="cudf"
    , start=False
)

We use the .from_kafka_batched(...) method to subscribe, as this allows us to use the CUDA Kafka connector and return the messages in the form of a cuDF DataFrame. The first parameter specifies the topic name and is followed by the dictionary with the consumer configuration. Next, we set the interval at which the stream object checks the Kafka topic for new messages; 2 seconds in this example. Setting engine to cudf specifies that the messages should be returned as cuDF DataFrames. We can now provide the rest of the pipeline and start the listener.

import cudf
from streamz.dataframe import DataFrame

def process_batch(messages):
    # messages is a cudf DataFrame holding one batch polled from Kafka
    batch_df = cudf.DataFrame()

    # Iterating over a DataFrame yields its column names
    for column in messages:
        # Split every message in the column into whitespace-separated tokens
        df_split = messages[column].str.tokenize()
        # Count how many times each word occurs in this batch
        df_split = (
            df_split
            .to_frame('word')
            .reset_index()
            .groupby(by='word')
            .agg({'index': 'count'})
            .rename(columns={'index': 'count'})
            .reset_index()
        )
        print("\nWord Count for this batch:")
        print(df_split)

        batch_df = cudf.concat([batch_df, df_split])

    return batch_df

stream_df = source.map(process_batch)

# Create a streamz dataframe to get stateful word count
sdf = DataFrame(stream_df, example=cudf.DataFrame({'word':[], 'count':[]}))

# Formatting the print statements
def print_format(sdf):
    print("\nGlobal Word Count:")
    print(sdf)
    return sdf

# Print cumulative word count from the start of the stream, after every batch.
# One can also sink the output to a list.
sdf.groupby('word').sum().stream.gather().map(print_format)

After this, run:

source.start()


Et voila! We now have a running listener to the test topic!

At a high level, we expect each batch of messages to arrive as a DataFrame. We split the messages into words with the .tokenize() functionality of RAPIDS cuDF and then count how many times each word occurs in the batch. Finally, we create a Streamz DataFrame that we use to produce the running tally of words by summing the occurrences of each word across batches.
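If the GPU pipeline feels opaque, the same logic can be traced on the CPU with the standard library. This is only an illustrative sketch with made-up sample batches, not the cuStreamz implementation: count_words plays the role of the per-batch count, and global_counts plays the role of the stateful sum kept across batches.

```python
from collections import Counter


def count_words(texts):
    """Per-batch word count: split each text on whitespace and tally the tokens."""
    counts = Counter()
    for text in texts:
        counts.update(text.split())
    return counts


global_counts = Counter()  # running state kept across batches
batches = [
    ["RAPIDS rocks!", "cuStreamz rocks!"],
    ["RAPIDS streams"],
]
for batch in batches:
    batch_counts = count_words(batch)
    global_counts.update(batch_counts)  # same role as the stateful groupby-sum
    print(dict(batch_counts), dict(global_counts))
```

Each batch is counted on its own, and the global tally only ever adds the new per-batch counts to what it already holds.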

With the consumer running now, let's produce some messages! Open a new notebook and install the kafka-python package by running the following in a cell.

!pip install kafka-python

Next, we start a producer.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='kafka0:19092'
    , value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

The bootstrap_servers parameter is the address of our kafka0 server. Every message we emit will be serialized to a JSON string and encoded as UTF-8. Now we can start pushing messages onto the topic:

producer.send('test', {'text': 'RAPIDS rocks!'})
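To see exactly which bytes the value_serializer puts on the wire, the same transformation can be run on its own. The sketch below uses only the standard library; the serialize function repeats the lambda passed to KafkaProducer, while deserialize is a hypothetical consumer-side inverse added for illustration.

```python
import json


def serialize(value):
    # Same transformation as the value_serializer passed to KafkaProducer.
    return json.dumps(value).encode('utf-8')


def deserialize(payload):
    # Hypothetical consumer-side inverse: bytes -> str -> Python object.
    return json.loads(payload.decode('utf-8'))


payload = serialize({'text': 'RAPIDS rocks!'})
restored = deserialize(payload)
print(payload, restored)
```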

The notebook running the cuStreamz consumer should now produce a DataFrame indexed by word, with one row each for RAPIDS and rocks!, and a count of 1 against each word. You can now play more with it!

With the introduction of cuStreamz, the RAPIDS ecosystem can speed up the processing of fast-moving data. You can try the above examples and more for yourself at app.blazingsql.com and download the cuStreamz cheatsheet here.