Kafka Code Examples
Introduction
This page will show you example code to connect to our Kafka cluster in PHP, Python, and Go. You will need to fill in your username, password, consumer group id and the topic you wish to pull from. For ACLs, our cluster requires using the SASL mechanism “SCRAM-SHA-256” which will be shown in the example code. Instructions on installing necessary libraries will also be provided. The IPs listed in each example are the actual IPs you should point at to connect to the cluster.
Python
Installation
The sample code requires the library “kafka-python”. You may also need to install pip to install this library.
apt install python3-pip
pip3 install kafka-python
Code
from kafka import KafkaConsumer
servers = ['10.131.0.11:9092',
'10.131.0.12:9092',
'10.131.0.13:9092',
'10.131.0.14:9092',
]
consumer = KafkaConsumer(
'', #Fill in topic here
bootstrap_servers=servers,
auto_offset_reset='latest', #Or 'earliest'
enable_auto_commit=True,
group_id='', #Consumer group id
security_protocol="SASL_PLAINTEXT",
sasl_mechanism = "SCRAM-SHA-256",
sasl_plain_username='', #username
sasl_plain_password='', #password
value_deserializer=lambda x: x.decode('utf-8'))
for message in consumer:
print(message)
PHP
Installation
The sample code requires the library “php-rdkafka”. You will need to add “extension=rdkafka.so” to your php.ini. You may also need to install PECL.
apt install librdkafka-dev
apt install php-pear
pecl install rdkafka
#Add extension=rdkafka.so to php.ini
Code
<?php
$brokers = array(
'10.131.0.11:9092',
'10.131.0.12:9092',
'10.131.0.13:9092',
'10.131.0.14:9092'
);
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', implode(",", $brokers));
$conf->set('auto.offset.reset', 'latest'); //Or 'earliest'
$conf->set('group.id', ''); #Consumer group id
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanisms', 'SCRAM-SHA-256');
$conf->set('sasl.username', ''); #username
$conf->set('sasl.password', ''); #password
$consumer = new RdKafka\KafkaConsumer($conf);
// Fill in topic here
$consumer->subscribe(['']);
while (true) {
$message = $consumer->consume(120*1000);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
}
?>
Go
Installation
This sample code requires two libraries “github.com/Shopify/sarama” and “github.com/xdg/
scram”. If you are not using Go modules you can simply “go get” these libraries. I will show
instructions to install both ways.
Without Go modules:
go get github.com/twmb/franz-go/pkg/kgo
go get github.com/twmb/franz-go/pkg/sasl/scram
With Go modules:
#Run in a directory containing the sample code
go mod init example.com/kafka-sample-code
go mod tidy
Code
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/scram"
)
var sig = make(chan os.Signal, 1)
func main() {
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
signal.Ignore(os.Signal(syscall.SIGHUP))
var brokers = []string{
"10.131.0.11:9092",
"10.131.0.12:9092",
"10.131.0.13:9092",
"10.131.0.14:9092",
}
TOPIC := "" // Fill in topic here
GROUP := "" //Consumer group id
KafkaClient, err := kgo.NewClient(
kgo.SeedBrokers(brokers...),
kgo.ConsumerGroup(GROUP),
kgo.ConsumeTopics(TOPIC),
kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), // Or kgo.NewOffset().AtStart()
kgo.SASL(scram.Auth{
User: "", //username
Pass: "", //password
}.AsSha256Mechanism()),
)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer KafkaClient.Close()
// Iterate over consumer sessions.
ctx, cancel := context.WithCancel(context.Background())
listenSig(cancel)
mainLoop:
for {
fetches := KafkaClient.PollFetches(ctx)
if errs := fetches.Errors(); len(errs) > 0 {
log.Fatal(fmt.Sprint(errs))
break mainLoop
}
iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
//fmt.Printf("%#v\n", record)
fmt.Println(string(record.Value))
}
}
fmt.Println("Consumer Killed Gracefully")
}
func listenSig(cancel context.CancelFunc) {
go func() {
<-sig
cancel()
}()
}