Kafka Python Feature Store Example

Producing and Consuming Messages to/from Kafka, using Python Clients

Tested with Python 3.6 and Python 2.7.

Before running this notebook, you should have created a Kafka topic. Set the TOPIC_NAME variable in the code below to the name of that topic.

The screenshots below illustrate the steps necessary to create a Kafka topic on Hops:

(Screenshots: kafka.png, kafka2.png, kafka3.png, kafka4.png, kafka5.png, kafka6.png, kafka7.png, kafka8.png)

In this notebook we use two Python dependencies: hops-util-py (imported as hops) and confluent-kafka-python (imported as confluent_kafka).

To install the confluent-kafka-python library, use the Hopsworks UI:

(Screenshots: kafka9.png, kafka10.png)

The hops-util library is installed by default when projects are created on Hops. If you need to reinstall it for some reason, use the Hopsworks UI to uninstall it first. Then install it from pip using the same method as described above.

Imports

from hops import kafka
from hops import tls
from confluent_kafka import Producer, Consumer
Starting Spark application
ID  YARN Application ID             Kind     State  Spark UI  Driver log  Current session?
31  application_1538483294796_0034  pyspark  idle   Link      Link
SparkSession available as 'spark'.

Constants

Define the name of the topic you created here:

TOPIC_NAME = "test"
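
A typo in TOPIC_NAME only surfaces later as a failed produce or an empty consume, so it can help to sanity-check the name up front. The sketch below is a minimal local check. It assumes Kafka's own topic-name rules: ASCII letters, digits, '.', '_' and '-'; at most 249 characters; and not "." or "..". is_valid_topic_name is a hypothetical helper and is not part of hops or confluent_kafka. It does not check that the topic actually exists on Hopsworks.

```python
import re

# Hypothetical helper (not part of hops or confluent_kafka).
# Kafka's topic-name rules: ASCII letters, digits, '.', '_', '-'; max 249 chars.
# \Z (instead of $) avoids accepting a trailing newline; works on Python 2 and 3.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+\Z")
_MAX_TOPIC_LEN = 249


def is_valid_topic_name(name):
    """Return True if `name` satisfies Kafka's topic-name rules."""
    if name in (".", ".."):
        # Kafka rejects these two names explicitly
        return False
    return len(name) <= _MAX_TOPIC_LEN and bool(_LEGAL_TOPIC.match(name))
```

For example, is_valid_topic_name(TOPIC_NAME) is True for "test" and False for a name containing a space.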

We can retrieve the schema defined for the topic with the utility library, which makes a REST call to Hopsworks:

kafka.get_schema(TOPIC_NAME)
{'contents': '[]', 'version': 0}
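
In the output above, the schema arrives as a dict whose 'contents' value is a string rather than a parsed object. The sketch below decodes it. It assumes the 'contents' string is JSON (Avro schemas are JSON documents, and '[]' is valid JSON) and that the dict shape matches the output shown. parse_topic_schema is a hypothetical helper, not a hops function.

```python
import json


def parse_topic_schema(schema_response):
    """Decode the 'contents' string of a schema response (hypothetical helper).

    Assumes the shape {'contents': <JSON string>, 'version': <int>},
    as printed by kafka.get_schema(TOPIC_NAME) in this notebook.
    """
    contents = json.loads(schema_response["contents"])  # '[]' -> []
    return contents, schema_response["version"]


contents, version = parse_topic_schema({'contents': '[]', 'version': 0})
```

Here contents is an empty list, so there is no schema content to validate messages against.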

Define Kafka Config

The hops-util-py library provides utility methods for setting up secure communication for Kafka producers and consumers running inside a Hopsworks cluster. You can use these utility methods with any Python Kafka client. In this notebook we use confluent-kafka-python.

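config = {
    # broker addresses and security protocol of the Hopsworks Kafka cluster
    "bootstrap.servers": kafka.get_broker_endpoints(),
    "security.protocol": kafka.get_security_protocol(),
    # TLS material provided by Hopsworks: CA chain, client certificate and private key
    "ssl.ca.location": tls.get_ca_chain_location(),
    "ssl.certificate.location": tls.get_client_certificate_location(),
    "ssl.key.location": tls.get_client_key_location(),
    # consumer group id; only the consumer uses this setting
    "group.id": "something"
}
# equivalently you can use:
# config = kafka.get_kafka_default_config()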
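
The same config dict is passed to both the producer and the consumer. However, group.id is a consumer-only property, which librdkafka ignores on a producer, possibly with a warning. A consumer group with no committed offsets also starts reading according to auto.offset.reset. The sketch below derives two separate configs from the shared one. split_client_configs is a hypothetical helper, and the "earliest" default is an assumption for this example, not something the notebook sets.

```python
def split_client_configs(base_config, offset_reset="earliest"):
    """Derive producer and consumer configs from one shared dict (hypothetical helper)."""
    # group.id only matters to consumers, so drop it for the producer
    producer_config = {k: v for k, v in base_config.items() if k != "group.id"}
    consumer_config = dict(base_config)
    # only applies when the consumer group has no committed offset for a partition
    consumer_config.setdefault("auto.offset.reset", offset_reset)
    return producer_config, consumer_config
```

Usage would be producer_config, consumer_config = split_client_configs(config), then Producer(producer_config) and Consumer(consumer_config).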

Create Kafka Producer and Consumer

producer = Producer(config)
consumer = Consumer(config)

Subscribe the Consumer to your Topic

The confluent_kafka API provides a callback hook, on_assign, that notifies you when partitions of the subscribed topics are assigned to the consumer:

def print_assignment(consumer, partitions):
    """ 
    Callback called when a Kafka consumer is assigned to a partition
    """
    print('Assignment:', partitions)
# the consumer can be subscribed to multiple topics
topics = [TOPIC_NAME]
consumer.subscribe(topics, on_assign=print_assignment)

Produce Messages to your Topic Using the Producer

The confluent_kafka API also provides a callback hook that notifies us once each message has been acknowledged by the Kafka brokers, or has permanently failed. This matters because produce() is asynchronous: when it returns, there is no guarantee that the message has actually been received by the brokers.

def delivery_callback(err, msg):
    """
    Optional per-message delivery callback (triggered by poll() or flush())
    when a message has been successfully delivered or permanently
    failed delivery (after retries).
    """
    if err:
        print("Message failed delivery: {}".format(err))
    else:
        print('Message: {} delivered to topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))
for i in range(0, 10):
    producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)

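# Block until all queued messages are delivered (this also triggers the delivery callbacks)
# or the 10 s timeout expires; flush() returns the number of messages still in the queue
producer.flush(10)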
Message: b'message 0' delivered to topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)
Message: b'message 1' delivered to topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)
Message: b'message 2' delivered to topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)
Message: b'message 3' delivered to topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)
Message: b'message 4' delivered to topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)
Message: b'message 5' delivered to topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)
Message: b'message 6' delivered to topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)
Message: b'message 7' delivered to topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)
Message: b'message 8' delivered to topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)
Message: b'message 9' delivered to topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)
0
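
The final 0 above is the return value of flush(): no messages were left in the queue. In a script rather than a notebook, you may prefer to collect delivery reports instead of printing them, so that failures can be checked programmatically after flush(). The sketch below does this with a callable object that uses the same (err, msg) signature as delivery_callback. DeliveryTracker is a hypothetical class and not part of confluent_kafka. It relies only on the msg.value(), msg.topic(), msg.partition() and msg.offset() accessors used in this notebook.

```python
class DeliveryTracker(object):
    """Collect per-message delivery reports (hypothetical helper)."""

    def __init__(self):
        self.delivered = []  # (topic, partition, offset) of acknowledged messages
        self.failed = []     # (value, error) of messages that permanently failed

    def __call__(self, err, msg):
        # Invoked from producer.poll()/flush(), like delivery_callback
        if err is not None:
            self.failed.append((msg.value(), err))
        else:
            self.delivered.append((msg.topic(), msg.partition(), msg.offset()))
```

Usage: pass callback=tracker to producer.produce(...), then call remaining = producer.flush(10). The sends succeeded if remaining is 0 and tracker.failed is empty.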

Poll Messages from your Topic Using the Consumer

for i in range(0, 10):
    msg = consumer.poll(timeout=5.0)
    if msg is None:
        # no message arrived before the 5 s timeout
        print("Topic empty, timeout when trying to consume message, try to produce messages to the topic and then re-consume")
    elif msg.error():
        # poll() can also return an event carrying an error instead of a record
        print("Consumer error: {}".format(msg.error()))
    else:
        print('Consumed Message: {} from topic: {}, partition: {}, offset: {}, timestamp: {}'.format(msg.value(), msg.topic(), msg.partition(), msg.offset(), msg.timestamp()))
Consumed Message: b'message 0' from topic: test, partition: 1, offset: 70, timestamp: (1, 1538566389535)
Consumed Message: b'message 1' from topic: test, partition: 1, offset: 71, timestamp: (1, 1538566389535)
Consumed Message: b'message 2' from topic: test, partition: 1, offset: 72, timestamp: (1, 1538566389535)
Consumed Message: b'message 3' from topic: test, partition: 1, offset: 73, timestamp: (1, 1538566389535)
Consumed Message: b'message 4' from topic: test, partition: 1, offset: 74, timestamp: (1, 1538566389535)
Consumed Message: b'message 5' from topic: test, partition: 1, offset: 75, timestamp: (1, 1538566389535)
Consumed Message: b'message 6' from topic: test, partition: 1, offset: 76, timestamp: (1, 1538566389535)
Consumed Message: b'message 7' from topic: test, partition: 1, offset: 77, timestamp: (1, 1538566389535)
Consumed Message: b'message 8' from topic: test, partition: 1, offset: 78, timestamp: (1, 1538566389535)
Consumed Message: b'message 9' from topic: test, partition: 1, offset: 79, timestamp: (1, 1538566389535)
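
The timestamp printed for each message, such as (1, 1538566389535), is the (type, value) tuple returned by Message.timestamp(). The value is in milliseconds since the Unix epoch. confluent_kafka defines type 0 as TIMESTAMP_NOT_AVAILABLE, 1 as TIMESTAMP_CREATE_TIME and 2 as TIMESTAMP_LOG_APPEND_TIME. The sketch below turns the tuple into a readable UTC datetime. describe_timestamp is a hypothetical helper. It hard-codes the three type codes to avoid importing confluent_kafka.

```python
import datetime

# Type codes as defined by confluent_kafka's TIMESTAMP_* constants
_TIMESTAMP_TYPES = {0: "not available", 1: "create time", 2: "log append time"}
_EPOCH = datetime.datetime(1970, 1, 1)  # naive datetime, interpreted as UTC


def describe_timestamp(ts):
    """Turn a (type, milliseconds) tuple into (type name, naive UTC datetime or None)."""
    ts_type, ts_ms = ts
    kind = _TIMESTAMP_TYPES.get(ts_type, "unknown")
    if ts_type == 0:
        # no timestamp is available; the value carries no meaning
        return kind, None
    return kind, _EPOCH + datetime.timedelta(milliseconds=ts_ms)


kind, when = describe_timestamp((1, 1538566389535))
```

For the messages above this gives a create-time timestamp of 2018-10-03 11:33:09.535 UTC. Adding a timedelta to a fixed epoch (instead of calling utcfromtimestamp) works the same way on Python 2.7 and 3.x.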