Kafka Code Examples

Introduction

This page will show you example code to connect to our Kafka cluster in PHP, Python, and Go. You will need to fill in your username, password, consumer group id and the topic you wish to pull from. For ACLs, our cluster requires using the SASL mechanism “SCRAM-SHA-256” which will be shown in the example code. Instructions on installing necessary libraries will also be provided. The IPs listed in each example are the actual IPs you should point at to connect to the cluster.

Python

Installation

The sample code requires the library “kafka-python”. You may also need to install pip to install this library.

apt install python3-pip 
pip3 install kafka-python

Code

from kafka import KafkaConsumer

# Bootstrap brokers of the cluster.
servers = [
    '10.131.0.11:9092',
    '10.131.0.12:9092',
    '10.131.0.13:9092',
    '10.131.0.14:9092',
    '10.131.0.15:9092',
    '10.131.0.16:9092',
    '10.131.0.17:9092',
    '10.131.0.18:9092',
]


def decode_value(raw):
    """Decode a record value from UTF-8 bytes to str.

    kafka-python hands the raw value to the deserializer as-is, so a record
    without a value (for example a tombstone) arrives here as None. Return
    None for those instead of raising AttributeError on ``None.decode``.
    """
    if raw is None:
        return None
    return raw.decode('utf-8')


consumer = KafkaConsumer(
    '',  # Fill in topic here
    bootstrap_servers=servers,
    auto_offset_reset='latest',  # Or 'earliest'
    enable_auto_commit=True,
    group_id='',  # Consumer group id
    security_protocol="SASL_PLAINTEXT",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username='',  # username
    sasl_plain_password='',  # password
    value_deserializer=decode_value,
)

# Print every record the consumer yields.
for message in consumer:
    print(message)
        

PHP

Installation

The sample code requires the library “php-rdkafka”. You will need to add “extension=rdkafka.so” to your php.ini. You may also need to install PECL.

apt install librdkafka-dev 
apt install php-pear 
pecl install rdkafka 
#Add extension=rdkafka.so to php.ini

Code

<?php
// Bootstrap brokers of the cluster.
$brokers = [
    '10.131.0.11:9092',
    '10.131.0.12:9092',
    '10.131.0.13:9092',
    '10.131.0.14:9092',
    '10.131.0.15:9092',
    '10.131.0.16:9092',
    '10.131.0.17:9092',
    '10.131.0.18:9092',
];

// Consumer settings, applied to the configuration object in this order.
$settings = [
    'metadata.broker.list' => implode(',', $brokers),
    'auto.offset.reset'    => 'latest', // Or 'earliest'
    'group.id'             => '', // Consumer group id
    'security.protocol'    => 'SASL_PLAINTEXT',
    'sasl.mechanisms'      => 'SCRAM-SHA-256',
    'sasl.username'        => '', // username
    'sasl.password'        => '', // password
];

$conf = new RdKafka\Conf();
foreach ($settings as $name => $value) {
    $conf->set($name, $value);
}

$consumer = new RdKafka\KafkaConsumer($conf);

// Fill in topic here
$topics = [''];
$consumer->subscribe($topics);

// Poll forever, dispatching on the error code attached to each result.
while (true) {
    // The argument to consume() is 120 * 1000; php-rdkafka takes this timeout in milliseconds.
    $message = $consumer->consume(120 * 1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // A real record: dump it.
            var_dump($message);
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;

        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;

        default:
            // Any other error code is unexpected: surface it as an exception.
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

Go

Installation

This sample code requires two packages from the franz-go library: “github.com/twmb/franz-go/pkg/kgo” and “github.com/twmb/franz-go/pkg/sasl/scram”. If you are not using Go modules, you can simply “go get” these packages. Instructions for installing them both ways are shown below.
Without Go modules:

go get github.com/twmb/franz-go/pkg/kgo
go get github.com/twmb/franz-go/pkg/sasl/scram

With Go modules:

#Run in a directory containing the sample code 
go mod init example.com/kafka-sample-code 
go mod tidy

Code

package main 
import ( 
 "context" 
 "fmt" 
 "log" 
 "os" 
 "os/signal" 
 "syscall" 
 "github.com/twmb/franz-go/pkg/kgo" 
 "github.com/twmb/franz-go/pkg/sasl/scram" 
) 
// sig is the channel on which main registers SIGINT/SIGTERM delivery via
// signal.Notify and from which listenSig's goroutine receives. It is buffered
// with capacity 1 because the os/signal package does not block when sending
// to the channel.
var sig = make(chan os.Signal, 1) 
// main connects to the Kafka cluster using SASL with SCRAM-SHA-256, joins the
// configured consumer group and prints the value of every record it receives.
// It runs until SIGINT or SIGTERM is delivered, at which point it leaves the
// poll loop, closes the client and prints a shutdown message. Any other fetch
// error is fatal.
func main() {
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	signal.Ignore(os.Signal(syscall.SIGHUP))

	brokers := []string{
		"10.131.0.11:9092",
		"10.131.0.12:9092",
		"10.131.0.13:9092",
		"10.131.0.14:9092",
		"10.131.0.15:9092",
		"10.131.0.16:9092",
		"10.131.0.17:9092",
		"10.131.0.18:9092",
	}
	topic := "" // Fill in topic here
	group := "" // Consumer group id

	client, err := kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.ConsumerGroup(group),
		kgo.ConsumeTopics(topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), // Or kgo.NewOffset().AtStart()
		kgo.SASL(scram.Auth{
			User: "", // username
			Pass: "", // password
		}.AsSha256Mechanism()),
	)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer client.Close()

	// The context is cancelled by listenSig when a termination signal arrives,
	// which unblocks PollFetches below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	listenSig(cancel)

	// Poll for records until the context is cancelled or a fetch fails.
mainLoop:
	for {
		fetches := client.PollFetches(ctx)
		if errs := fetches.Errors(); len(errs) > 0 {
			// A cancelled context is reported by PollFetches as a fetch
			// error; that is the requested shutdown, not a failure.
			if ctx.Err() != nil {
				break mainLoop
			}
			// log.Fatal exits without running deferred calls, so close the
			// client explicitly before reporting the error.
			client.Close()
			log.Fatal(fmt.Sprint(errs))
		}
		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			//fmt.Printf("%#v\n", record)
			fmt.Println(string(record.Value))
		}
	}
	fmt.Println("Consumer Killed Gracefully")
}

// listenSig starts a background goroutine that blocks until a value is
// received from the package-level sig channel and then invokes cancel.
// It returns immediately; only the first received signal is acted upon.
func listenSig(cancel context.CancelFunc) {
	waitThenCancel := func() {
		<-sig
		cancel()
	}
	go waitThenCancel()
}