Interaction with Apache Kafka

Apache Kafka is a distributed, scalable message broker. Foresight Analytics Platform can interact with Kafka through Python units. You write the interaction logic as procedures in Python, and you can then call these procedures from Fore code.

Consider an example of deploying a Kafka server and working with it. If a Kafka server is already running, skip steps 2-4. Follow these steps:

  1. Check whether Java and Python are installed. If either is missing, follow the instructions in the Connecting External Units to Foresight Analytics Platform section.

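  2. Download a .tgz archive with Kafka from https://downloads.apache.org/kafka/. The steps below start ZooKeeper, which is bundled with Kafka 3.x releases but was removed in Kafka 4.0, so choose a 3.x release. Unpack the archive and open the bin folder on Linux or the bin\windows folder on Windows.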

  3. Start ZooKeeper (the first command is for Linux, the second for Windows):

./zookeeper-server-start.sh ../config/zookeeper.properties

zookeeper-server-start.bat ../../config/zookeeper.properties

  4. Start Kafka (the first command is for Linux, the second for Windows):

./kafka-server-start.sh ../config/server.properties

kafka-server-start.bat ../../config/server.properties

  5. Create a topic to which messages will be sent. The example uses the address localhost and port 9092; your values may differ. The first command is for Linux, the second for Windows:

./kafka-topics.sh --create --topic mytopic --bootstrap-server localhost:9092

kafka-topics.bat --create --topic mytopic --bootstrap-server localhost:9092

  6. Download and unpack the PyKafka archive from https://github.com/Parsely/pykafka.

  7. Install PyKafka with the command:

pip install pykafka
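Before running the Python unit, it can help to confirm that something is listening at the broker address used in this example (localhost, port 9092). The sketch below uses only the Python standard library and checks whether a TCP connection can be opened. The function name broker_is_reachable is hypothetical. A successful connection only shows that a process accepts connections on that port, not that it is a healthy Kafka broker.

```python
import socket


def broker_is_reachable(host: str = "localhost", port: int = 9092, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds.

    Hypothetical helper: it does not speak the Kafka protocol, it only checks
    that some process is listening on the given port.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unknown hosts and timeouts
        return False


if __name__ == "__main__":
    # Adjust the host and port if your broker is configured differently
    print("Kafka broker reachable:", broker_is_reachable("localhost", 9092))
```

If the check prints False, make sure the ZooKeeper and Kafka processes started above are still running and that the address matches the one in config/server.properties.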

Next, create and populate objects in the repository. Create a Python unit and add the following code to it:

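from pykafka import KafkaClient
from pykafka.common import OffsetType

# Broker address; it may differ in your environment
KAFKA_HOSTS = "localhost:9092"

def sendKafkaMessage(topic, message):
    client = KafkaClient(hosts=KAFKA_HOSTS)
    topic = client.topics[topic]
    # The synchronous producer waits until each message is delivered
    with topic.get_sync_producer() as producer:
        producer.produce(message.encode('utf-8'))

def readKafkaMessages(topic):
    client = KafkaClient(hosts=KAFKA_HOSTS)
    topic = client.topics[topic]
    # Position the consumer at the end of each partition
    consumer = topic.get_simple_consumer(auto_offset_reset=OffsetType.LATEST, reset_offset_on_start=True)
    LAST_N_MESSAGES = 6
    # Rewind each partition so that the most recent messages are read again
    offsets = [(p, op.next_offset - LAST_N_MESSAGES) for p, op in consumer._partitions.items()]
    # Do not rewind past the beginning of a short partition (-1 would mean OffsetType.LATEST)
    offsets = [(p, o if o > -1 else OffsetType.EARLIEST) for p, o in offsets]
    consumer.reset_offsets(offsets)
    i = 0
    for message in consumer:
        print(str(message.offset) + ' : ' + message.value.decode('utf-8'))
        i += 1
        # Stop after five messages; otherwise the loop keeps waiting for new ones
        if i == 5:
            break
    consumer.stop()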

# Send messages to Kafka
for i in range(10):
    sendKafkaMessage('mytopic', 'Test message: ' + str(i))
# Read messages from Kafka
readKafkaMessages('mytopic')

The sendKafkaMessage procedure sends a message to the specified topic. The example uses the address localhost and port 9092; in practice they may differ. The readKafkaMessages procedure moves to the end of the specified topic, then reads the last five messages and prints them to the development environment console. If everything is set up correctly, running the example sends ten messages to the topic. The last five of them are then read and printed to the development environment console.
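The rewind in readKafkaMessages is plain arithmetic on per-partition offsets. The sketch below reproduces it without a broker and adds a guard for partitions that hold fewer than LAST_N_MESSAGES messages. The dictionary passed to the hypothetical function rewind_offsets stands in for the next_offset values that the consumer reports for each partition. Any result below zero is replaced with -2, the value of OffsetType.EARLIEST in PyKafka, because -1 is the value of OffsetType.LATEST. Pairing LAST_N_MESSAGES = 6 with a loop that stops after five messages suggests that PyKafka treats the reset offset as already consumed and resumes from the next one. Under that assumption, a partition whose next_offset is 10 is reset to 4, and reading resumes at offset 5.

```python
from typing import Dict

# Value of pykafka.common.OffsetType.EARLIEST, copied so the sketch runs without PyKafka
EARLIEST = -2


def rewind_offsets(next_offsets: Dict[int, int], last_n: int) -> Dict[int, int]:
    """Move each partition's next offset back by last_n (hypothetical helper).

    next_offsets maps a partition id to the next offset the consumer would read.
    Results below zero are clamped to EARLIEST so that a short partition is read
    from its beginning instead of being interpreted as a special offset type.
    """
    result = {}
    for partition, next_offset in next_offsets.items():
        offset = next_offset - last_n
        result[partition] = offset if offset > -1 else EARLIEST
    return result


# Partition 0 holds ten messages (offsets 0-9), partition 1 only three (offsets 0-2)
print(rewind_offsets({0: 10, 1: 3}, 6))  # {0: 4, 1: -2}
```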

To call Python procedures from Fore code, use the resources of the Python assembly. The Fore code below calls the Python procedures above in two ways. TestSendMessage calls Python.InvokeModule to send a message to the Kafka topic. TestReadMessage creates a PythonUtils object and calls its InvokeModule method to read messages from the topic. It is assumed that the Python procedures are implemented in the Python unit with the MOD_PY_KAFKA identifier.

Sub TestSendMessage;
Begin
    Python.InvokeModule("MOD_PY_KAFKA", "sendKafkaMessage", "mytopic", "Message sent to Kafka from Fore");
End Sub TestSendMessage;

Function TestReadMessage: Variant;
Var
    pUtils: IPythonUtils;
    Result: Variant;
Begin
    pUtils := New PythonUtils.Create;
    Result := pUtils.InvokeModule("MOD_PY_KAFKA", "readKafkaMessages", "mytopic");
    Return Result;
End Function TestReadMessage;
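readKafkaMessages prints the messages and does not return a value, so the Result in TestReadMessage cannot contain them. If the Fore side needs the messages, one option is to have the Python procedure collect them into a list and return that list. The helper below, format_messages, is hypothetical. It takes (offset, value) pairs, where value is the raw bytes from message.value, and builds the same "offset : text" lines that readKafkaMessages prints. This sketch does not cover how a returned Python list is converted to a Fore Variant; check that in the Python assembly documentation.

```python
from typing import Iterable, List, Tuple


def format_messages(messages: Iterable[Tuple[int, bytes]], limit: int = 5) -> List[str]:
    """Build "offset : text" lines for at most `limit` messages (hypothetical helper).

    Inside readKafkaMessages it could be fed with (message.offset, message.value)
    pairs from the consumer, and its result returned instead of printing each line.
    """
    if limit <= 0:
        return []
    lines = []
    for offset, value in messages:
        lines.append(str(offset) + ' : ' + value.decode('utf-8'))
        # Check right after appending so the loop never waits for an extra message
        if len(lines) == limit:
            break
    return lines


# Stand-in for messages read from the consumer
sample = [(5, b'Test message: 5'), (6, b'Test message: 6')]
print(format_messages(sample))  # ['5 : Test message: 5', '6 : Test message: 6']
```

The limit is checked immediately after each message is added, as in the original loop. With default settings, iterating a PyKafka consumer waits for the next message to arrive. Checking the limit before reading another message therefore avoids blocking on a message that would be discarded.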

See also:

Developers Knowledge Base