Building Kafka-compatible Streaming & Batch Workers in Python

What is a worker?#

A worker is a program that listens to the events in your system and performs actions in the background asynchronously. Asynchronous means the worker performs the action outside the request-response cycle, so the user doesn’t have to wait for the execution to finish. For example, when a new user registers for your SaaS product, a worker will listen to the user registration event and send a welcome email. We’ll continue with this example throughout this article.

Generally, there are two types of workers:

  • The streaming worker will process each message that the worker sees immediately. In our case, every time a user signs up for our service, we will immediately send them a welcome email.
  • The batch worker will accumulate the messages into a buffer queue and only process them once a particular condition is met, for example, every 5 seconds. In our case, when users sign up for our service, the worker will wait for 5 seconds to accumulate the signups before sending all of them a welcome email in one batch.

Both worker types are great for their use cases. In most cases, a streaming worker is what you want, for example to send a welcome email right after a user signs up for your service. However, some operations are much more efficient when they are done in batches. For example, ClickHouse prefers data inserted in large batches rather than small, frequent insertions because of how its analytical database engine works. This is when you should use a batch worker instead of a streaming worker.
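To make the difference concrete, here is a minimal sketch with no broker involved. The names run_streaming, run_batch, handle_one, handle_many and the batch size are made up for illustration, and this version flushes on buffer size only, whereas the batch worker later in this article flushes on a timer.

```python
from typing import Callable, Iterable, List


def run_streaming(messages: Iterable[str], handle_one: Callable[[str], None]) -> None:
    """Streaming: act on every message as soon as it is seen."""
    for message in messages:
        handle_one(message)


def run_batch(
    messages: Iterable[str],
    handle_many: Callable[[List[str]], None],
    batch_size: int = 3,
) -> None:
    """Batch: buffer messages and act only when the buffer is full."""
    buffer: List[str] = []
    for message in messages:
        buffer.append(message)
        if len(buffer) >= batch_size:
            handle_many(buffer)
            buffer = []
    if buffer:
        # Flush the leftovers so the last partial batch is not lost.
        handle_many(buffer)


signups = [f"user{i}@example.com" for i in range(5)]
run_streaming(signups, lambda email: print(f"welcome email to {email}"))
run_batch(signups, lambda emails: print(f"welcome emails to {len(emails)} users"))
```

The streaming call prints one line per signup, while the batch call handles the same five signups in two calls.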

Graceful Shutdown#

It is vital to handle service termination correctly. The ‘Disposability’ factor of the Twelve-Factor App says that processes should be able to start or stop at a moment’s notice.

Let’s say that our worker performs two operations each time a user registers:

  • Sending the email to the user
  • Updating the user record from status ‘Pending’ to ‘Waiting Confirmation’

What would happen if the streaming worker is terminated when it just finished sending the email to the user BUT hasn’t updated the user record yet? The record in your database will not reflect the latest status of the user. You have sent the email, but the status hasn’t been updated like it was supposed to.

In the case of batch workers, it’s much worse. If the batch worker is terminated right after it has finished sending the email and updating the record for 100 users, the remaining users still in the queue buffer will never receive their welcome email.

When the worker receives a SIGTERM or SIGINT signal, it should stop pulling new messages and finish all the tasks at hand before terminating. Both signals are commonly used to tell your program that you want to bring down the service, for example when you press Ctrl+C in your terminal or terminate a pod in Kubernetes.
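The idea can be sketched without any broker. In this rough illustration, ShutdownFlag, install_handlers and run_worker are hypothetical names, not part of any library: the signal handler only records that a shutdown was requested, and the loop decides when it is safe to stop.

```python
import signal


class ShutdownFlag:
    """Remembers that a shutdown was requested; does no work itself."""

    def __init__(self) -> None:
        self.requested = False

    def request(self, *_args) -> None:
        # Accepts (signum, frame) so it can be registered as a signal handler.
        self.requested = True


def install_handlers(flag: ShutdownFlag) -> None:
    # signal.signal() must be called from the main thread.
    signal.signal(signal.SIGINT, flag.request)
    signal.signal(signal.SIGTERM, flag.request)


def run_worker(tasks, handle, flag: ShutdownFlag) -> list:
    """Handle tasks one by one; stop pulling new ones once shutdown is requested."""
    done = []
    for task in tasks:
        if flag.requested:
            break  # stop pulling new work
        handle(task)  # the task at hand always runs to completion
        done.append(task)
    return done
```

Because the handler never exits the process itself, a task that is already running is never cut off halfway.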

What we’ll be building#

Overview

Producer: A script that publishes the message to the message queue broker.

Message Queue Broker: We’ll launch a Redpanda docker container. Redpanda is a Kafka-compatible message broker.

Collector Worker: The worker that listens to the broker’s messages and pretends to send an email and update the user status. We’ll implement it with both the streaming approach and the batch approach.

Note: If you’re using web frameworks like Django or Flask, it’s recommended to use an asynchronous task queue library like Celery or RQ as they are well integrated.

The objective of this article is to discuss how streaming and batch workers work behind the scenes. We’ll build minimal workers to understand how both of them work.

Environment Setup#

Python Environment#

For this tutorial, I’m using Python 3.7.3 on Ubuntu 19.04. You’ll need to install the Python Kafka library.

$ pip3 install kafka-python

Message Broker#

Run the following command to launch a Redpanda container:

$ docker run --name=redpanda-1 --rm -p 9092:9092 docker.vectorized.io/vectorized/redpanda:latest redpanda start --overprovisioned --smp 1 --memory 1G --reserve-memory 0M --node-id 0 --check=false

Producer#

First, we’ll write our producer. The producer will publish messages to our message broker. Create a producer.py file with the following content:

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin import NewTopic
from kafka.errors import TopicAlreadyExistsError
from time import sleep

if __name__ == "__main__":

    topic_name = "user_signups"
    producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

    try:
        # Create Kafka topic
        topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=1)
        admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
        admin.create_topics([topic])
    except TopicAlreadyExistsError:
        # Any other error (for example, broker not reachable) is not hidden
        print(f"Topic {topic_name} is already created")

    for i in range(10):
        email = f"user{i}@gmail.com"
        producer.send(topic_name, email.encode())
        sleep(0.1)
        print(f"Published message to message broker. User email: {email}")

    # send() is asynchronous; flush so the last messages leave before exit
    producer.flush()

Bear in mind that the producer can be in many forms. You can write an API endpoint that publishes messages to the message broker, a crawler that crawls websites and publishes messages to the message broker, etc.

Here’s an example of the output of this producer script:

Producer

Notice that the producer script is creating a user_signups topic and publishing 10 messages to the topic.

Streaming Worker#

This is a minimal implementation of a streaming worker. It listens to every message in the Kafka user_signups topic and processes each message as soon as it sees it.

# streaming_worker.py
from kafka import KafkaConsumer
from time import sleep

def process_message(email):
    print(f"sending email to {email}")
    sleep(1) # pretend to do work
    print(f"updating user {email} status to Waiting Confirmation")
    sleep(1) # pretend to do work
    print("finished processing message")
    
if __name__ == "__main__":
    print("starting streaming consumer app")
    consumer = KafkaConsumer(
        'user_signups',
        bootstrap_servers=["localhost:9092"],
        group_id="group1"
    )
    
    for message in consumer:
        process_message(message.value)

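It’s essential to set a group ID so that the consumer commits its offset to the broker. Without one, kafka-python stores no position for the worker, so every start falls back to auto_offset_reset: with the default "latest" it skips whatever was published while it was down, and with "earliest" it re-reads the topic from the beginning, instead of resuming from where it left off.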
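The role of the committed offset can be pictured with a toy in-memory model. ToyBroker and its methods are invented for this illustration and are not the Kafka API. The reset argument only mimics the idea behind kafka-python’s auto_offset_reset setting.

```python
class ToyBroker:
    """In-memory stand-in for a single topic partition. Not the Kafka API."""

    def __init__(self):
        self.log = []
        self.committed = {}  # group id -> offset of the next message to read

    def publish(self, message):
        self.log.append(message)

    def start_position(self, group_id, reset="latest"):
        if group_id is not None and group_id in self.committed:
            return self.committed[group_id]
        # No stored position: fall back to the reset policy.
        return 0 if reset == "earliest" else len(self.log)

    def read(self, group_id, reset="latest"):
        start = self.start_position(group_id, reset)
        messages = self.log[start:]
        if group_id is not None:
            # Only consumers in a group get their position stored.
            self.committed[group_id] = len(self.log)
        return messages


broker = ToyBroker()
broker.publish("user0@example.com")
first_run = broker.read("group1", reset="earliest")
broker.publish("user1@example.com")  # published while the worker is down
second_run = broker.read("group1")  # resumes after the committed offset
no_group = broker.read(None)  # no stored position and "latest": nothing to read
print(first_run, second_run, no_group)
```

In this model, the worker in group1 picks up exactly the message it missed, while the consumer without a group starts from wherever the reset policy puts it.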

This is an example of the output of this worker:

Streaming Worker output

Press Ctrl+C to quit the script.

You can see that it’s sending the email and updating the user status for each user, one by one.

Graceful Shutdown#

To gracefully shut down a streaming worker, the worker must finish processing the message it is currently working on, then exit the process.

from kafka import KafkaConsumer
from time import sleep
+ import signal

+ is_shutting_down = False


def process_message(email):
    print(f"sending email to {email}")
    sleep(0.5)  # pretend to do work
    print(f"updating user {email} status to Waiting Confirmation")
    sleep(0.5)  # pretend to do work
    print("finished processing message")


+ def graceful_exit(*args, **kwargs):
+     global is_shutting_down
+     is_shutting_down = True


if __name__ == "__main__":

+     signal.signal(signal.SIGINT, graceful_exit)
+     signal.signal(signal.SIGTERM, graceful_exit)

    print("starting streaming consumer worker")

    consumer = KafkaConsumer(
        "user_signups", bootstrap_servers=["localhost:9092"], group_id="group1"
    )

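+     # poll() returns after at most one second even when the topic is idle,
+     # so the flag is also checked while no messages are arriving.
+     # max_records=1 keeps the committed offset in step with processed messages.
+     while not is_shutting_down:
+         for records in consumer.poll(timeout_ms=1000, max_records=1).values():
+             for message in records:
+                 process_message(message.value)

+     print("End of the program. I was killed gracefully")
+     consumer.close()
+     exit()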

When the worker receives a SIGTERM or SIGINT signal, it sets the is_shutting_down global variable to True. Once the current message has been processed, it closes the connection to the broker and exits the script. With auto-commit enabled (the kafka-python default when a group ID is set), closing the consumer commits the offset in the partition, so the worker won’t process the same message the next time it runs.

Streaming worker graceful exit

Before running this script, remember to run the producer.py script again to publish a fresh set of messages.

In the sample output above, I pressed Ctrl+C multiple times. However, the worker did not stop until it had finished the current operation. Only after the user status was updated successfully did it terminate the process. This is exactly what we want.

Batch Worker#

For batch workers, we’ll utilize Python multithreading features. The main thread will process the messages in the queue buffer every 5 seconds. We’ll spawn a child thread to listen to messages from the message broker and insert the data into the queue buffer.
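Before bringing Kafka into it, the two-thread layout can be tried with the standard library alone. In this sketch the list named source stands in for the broker, and listen and drain are illustrative names only.

```python
import threading
from queue import Empty, Queue


def drain(buffer: Queue) -> list:
    """Take everything currently in the buffer without blocking."""
    items = []
    try:
        while True:
            items.append(buffer.get_nowait())
    except Empty:
        pass
    return items


def listen(source, buffer: Queue) -> None:
    """Child thread: copy incoming messages into the buffer."""
    for message in source:
        buffer.put(message)


buffer = Queue()
source = [f"user{i}@example.com" for i in range(10)]  # stand-in for the broker
listener = threading.Thread(target=listen, args=(source, buffer), daemon=True)
listener.start()
listener.join()  # the fake source is finite; a real consumer keeps running

batch = drain(buffer)
print(f"processing {len(batch)} messages in one go")
```

In the real worker the main thread would call something like drain on a timer instead of waiting for the listener to finish. Queue is thread-safe, so the two threads need no extra locking.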

# batch_worker.py

from time import sleep
from kafka import KafkaConsumer
import threading
from queue import Queue, Empty

emails = Queue()


class Consumer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        
    def run(self):
        consumer = KafkaConsumer(
            "user_signups",
            bootstrap_servers=["localhost:9092"],
            auto_offset_reset="earliest",
            enable_auto_commit=True,
        )
        
        for message in consumer:
            self.process_message(message.value)
            
        consumer.close()
        
    def process_message(self, message):
        # Insert the message into buffer queue
        emails.put(message)
        
        
def process_messages():
    print("processing message in queue buffer")
    temp_emails = []
    
    try:
        while True:
            temp_emails.append(emails.get_nowait())

    except Empty:
        pass
      
    # Combine all emails in 1 call
    # This is the beauty of batch worker
    print(f"sending email to user {temp_emails}")
    sleep(0.5)  # pretend to do work
    print(f"updating status to Waiting Confirmation for users {temp_emails}")
    sleep(0.5)  # pretend to do work
    print("finished processing messages")
    
if __name__ == "__main__":
    print("starting batch consumer worker")
    
    consumer = Consumer()
    consumer.daemon = True
    consumer.start()
    
    while True:
        process_messages()
        sleep(5)

You’ll need to run the producer.py script again to publish messages to the topic. This is an example of the batch worker’s output:

Batch Worker output

Notice that it is sending the emails and updating the users all at once. This is the beauty of a batch worker: it coalesces all payloads into one call.
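As an illustration of what “one call” can look like, the status update for a whole batch can be a single SQL statement. The sketch below uses an in-memory SQLite database from the standard library. The users table, its columns and the function name are assumptions made up for the example.

```python
import sqlite3


def mark_waiting_confirmation(conn: sqlite3.Connection, emails: list) -> int:
    """Update every user in the batch with a single statement."""
    if not emails:
        return 0
    placeholders = ", ".join("?" for _ in emails)
    cursor = conn.execute(
        "UPDATE users SET status = 'Waiting Confirmation' "
        f"WHERE email IN ({placeholders})",
        emails,
    )
    conn.commit()
    return cursor.rowcount


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, 'Pending')",
    [(f"user{i}@example.com",) for i in range(3)],
)
updated = mark_waiting_confirmation(conn, ["user0@example.com", "user1@example.com"])
print(updated)  # number of rows changed by the single UPDATE
```

A streaming worker would issue one UPDATE per user, so the batch version saves a round trip to the database for every extra user in the buffer.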

Graceful Shutdown#

To gracefully shut down a batch worker, the worker must:

  • Stop pulling new messages from the broker
  • Close the broker connection
  • Process all the messages currently in the buffer one last time
  • Finally, exit the process

from time import sleep
from kafka import KafkaConsumer
import threading
from queue import Queue, Empty
+ import signal

emails = Queue()
+ is_shutting_down = False


class Consumer(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        consumer = KafkaConsumer(
            "user_signups", bootstrap_servers=["localhost:9092"], group_id="group1"
        )

+         # poll() returns after at most one second even when the topic is idle,
+         # so the shutdown flag is checked regularly
+         while not is_shutting_down:
+             for records in consumer.poll(timeout_ms=1000).values():
+                 for message in records:
+                     self.insert_to_buffer(message.value)

        # Closing the consumer commits the offsets of the buffered messages
        consumer.close()

    def insert_to_buffer(self, message):
        print("received a message, inserting into a queue buffer")
        emails.put(message)


def process_messages():
    print("processing message in queue buffer")

    temp_emails = []

    try:
        while True:
            temp_emails.append(emails.get_nowait())

    except Empty:
        pass

    # Combine all emails in 1 call
    # This is the beauty of batch worker
    print(f"sending email to user {temp_emails}")
    sleep(0.5)  # pretend to do work
    print(f"updating status to Waiting Confirmation for users {temp_emails}")
    sleep(0.5)  # pretend to do work
    print("finished processing messages")


+ def exit_gracefully(*args, **kwargs):
+     # Only set the flag. The handler interrupts the main thread, so doing
+     # the work or calling exit() here could abort a batch that is in progress.
+     global is_shutting_down
+     is_shutting_down = True


if __name__ == "__main__":

+     signal.signal(signal.SIGINT, exit_gracefully)
+     signal.signal(signal.SIGTERM, exit_gracefully)

    print("starting batch consumer worker")

    consumer = Consumer()
    consumer.daemon = True
    consumer.start()

+     while not is_shutting_down:
        process_messages()
        sleep(5)  # a signal during this sleep is acted on once the sleep ends

+     # Wait for the consumer thread to stop polling and close its connection,
+     # then process whatever is left in the buffer one last time
+     consumer.join()
+     process_messages()
+     print("End of the program. I was killed gracefully")

This is an example of the output of the batch worker script with graceful shutdown:

Batch worker graceful exit

As you can see here, when I press Ctrl+C, the worker stops listening for new messages and processes the messages in the queue buffer until all of them are done. Checked!
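The shutdown order can also be checked without a broker. In this rough sketch, FakeListener plays the role of the consumer thread and reads from a plain list. All names here are hypothetical, and the closed attribute merely marks the point where a real worker would close its broker connection.

```python
import threading
from queue import Empty, Queue


class FakeListener(threading.Thread):
    """Stand-in for the consumer thread; reads from a list, not a broker."""

    def __init__(self, source, buffer, stop_event):
        super().__init__(daemon=True)
        self.source = source
        self.buffer = buffer
        self.stop_event = stop_event
        self.closed = False

    def run(self):
        for message in self.source:
            if self.stop_event.is_set():
                break  # 1. stop pulling new messages
            self.buffer.put(message)
        self.closed = True  # 2. where the broker connection would be closed


def drain(buffer):
    """Take everything currently in the buffer without blocking."""
    items = []
    try:
        while True:
            items.append(buffer.get_nowait())
    except Empty:
        return items


def shut_down(listener, buffer, stop_event, process):
    stop_event.set()
    listener.join()  # wait until steps 1 and 2 are done
    process(drain(buffer))  # 3. process what is left in the buffer
    # 4. the caller can now exit
```

Joining the listener before the final drain is the important part: once the thread has stopped, nothing can land in the buffer after it has been emptied.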

Conclusion#

There are two important concepts in this tutorial:

  • Streaming & batch worker
  • Graceful shutdown

Both of these concepts are language agnostic and broker agnostic. You can apply them in Go, Ruby, JavaScript, and other languages, and with Kafka, Redpanda, NATS, RabbitMQ, and other message brokers.

I hope you learned something from this post. Please support me to write more tutorials.


© Fadhil Yaacob 2021