Kafka Code Examples


This page shows example code for connecting to our Kafka cluster in PHP, Python, and Go. You will need to fill in your username, password, consumer group ID, and the topic you wish to consume from. For ACLs, our cluster requires the SASL mechanism “SCRAM-SHA-256”, which is shown in the example code. Instructions for installing the necessary libraries are also provided. The IPs listed in each example are the actual IPs you should point at to connect to the cluster.



## Python

### Installation

The sample code requires the library “kafka-python”. You may also need to install pip in order to install this library.

apt install python3-pip
pip3 install kafka-python

### Code

from kafka import KafkaConsumer

# Broker addresses, each given as 'host:port'.
servers = ['', '', '', '']

consumer = KafkaConsumer(
    '',  # Fill in topic here
    bootstrap_servers=servers,
    auto_offset_reset='latest',  # Or 'earliest'
    group_id='',  # Consumer group id
    # SASL settings only take effect when the security protocol is a SASL one.
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='',  # username
    sasl_plain_password='',  # password
    value_deserializer=lambda x: x.decode('utf-8'))

# Iterating the consumer blocks and yields messages as they arrive.
for message in consumer:
    print(message.value)
## PHP

### Installation

The sample code requires the library "php-rdkafka". You will need to add
"extension=rdkafka.so" to your php.ini. You may also need to install PECL.

apt install librdkafka-dev
apt install php-pear
pecl install rdkafka
#Add extension=rdkafka.so to php.ini

### Code
<?php
// Broker addresses, each given as "host:port".
$brokers = ['', '', '', ''];

// Consumer configuration: broker list, offset policy, group and SASL/SCRAM credentials.
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', implode(",", $brokers));
$conf->set('auto.offset.reset', 'latest'); // Or 'earliest'
$conf->set('group.id', ''); // Consumer group id
$conf->set('security.protocol', 'SASL_PLAINTEXT');
$conf->set('sasl.mechanisms', 'SCRAM-SHA-256');
$conf->set('sasl.username', ''); // username
$conf->set('sasl.password', ''); // password

$consumer = new RdKafka\KafkaConsumer($conf);

// Fill in topic here
$consumer->subscribe(['']);

// Poll forever, dumping each message and reporting idle conditions.
while (true) {
    // The argument is the poll timeout passed to consume().
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            // Any other error code is treated as fatal.
            throw new \Exception($message->errstr(), $message->err);
    }
}
?>

## Go

### Installation

This sample code requires the "github.com/twmb/franz-go" library; the code imports its
"pkg/kgo" and "pkg/sasl/scram" packages. If you are not using Go modules you can simply
"go get" these packages. I will show instructions to install both ways.
Without Go modules:

go get github.com/twmb/franz-go/pkg/kgo
go get github.com/twmb/franz-go/pkg/sasl/scram

With Go modules:

#Run in a directory containing the sample code
go mod init example.com/kafka-sample-code
go mod tidy

### Code

// Command main is a sample Kafka consumer that authenticates with
// SASL/SCRAM-SHA-256 and prints the value of every record it receives.
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/scram"
)

// sig receives the termination signals that stop the consumer.
var sig = make(chan os.Signal, 1)

// main connects to the cluster, joins the consumer group and prints record
// values until SIGINT or SIGTERM is received.
func main() {
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	signal.Ignore(syscall.SIGHUP)

	brokers := []string{
		"",
		"",
		"",
		"",
	}
	topic := "" // Fill in topic here
	group := "" // Consumer group id

	client, err := kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.ConsumerGroup(group),
		kgo.ConsumeTopics(topic),
		kgo.ConsumeResetOffset(kgo.NewOffset().AtEnd()), // Or kgo.NewOffset().AtStart()
		kgo.SASL(scram.Auth{
			User: "", // username
			Pass: "", // password
		}.AsSha256Mechanism()),
	)
	if err != nil {
		fmt.Println(err)
		os.Exit(1)
	}
	defer client.Close()

	// Cancel the poll context on a termination signal so a blocked
	// PollFetches returns promptly instead of waiting for the next record.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-sig
		cancel()
	}()

	// Iterate over consumer sessions.
	for {
		fetches := client.PollFetches(ctx)
		// A cancelled context means shutdown was requested; check it before
		// the error check so the cancellation is not reported as fatal.
		if ctx.Err() != nil {
			break
		}
		if errs := fetches.Errors(); len(errs) > 0 {
			log.Fatal(fmt.Sprint(errs))
		}

		iter := fetches.RecordIter()
		for !iter.Done() {
			record := iter.Next()
			//fmt.Printf("%#v\n", record)
			fmt.Println(string(record.Value))
		}
	}

	fmt.Println("Consumer Killed Gracefully")
}