Overview

In this article, I will use Go as an example to demonstrate how to produce and consume Kafka messages with the Sarama SDK. For consumption, I will demonstrate two modes:

  • automatically committing the offsets of consumed messages;
  • manually committing them (sending the consumption ACK ourselves) after processing.

These two modes correspond to two message processing semantics: at most once and at least once. I will not demonstrate exactly-once consumption here, because it is not something a message queue can provide on its own. It requires additional support from the business logic, so I will only discuss it briefly without a deep dive.

Producer

A producer can send messages to Kafka in either synchronous or asynchronous mode.

Synchronous mode is easy to understand: after sending a message, we wait for the result, that is, whether the message queue successfully received and stored it.

Asynchronous mode is a bit more complicated. The code that produces the message cannot wait for Kafka's response on the spot. Instead, we have to listen on a channel for the results (success or failure). We then use identifiers carried in each result to determine which message it refers to and whether that message was processed successfully.

Both modes have their pros and cons:

Synchronous mode:
  • Advantage, high reliability: the business code gets immediate feedback on whether each message was processed, so it can react right away (retry or return an error).
  • Disadvantage, lower performance: every send blocks the business code, so latency between the business and Kafka, or problems among Kafka's brokers, directly stall the business.
Asynchronous mode:
  • Advantage, higher performance: the business code is not blocked by Kafka's communication and processing delays and can immediately move on to other work.
  • Disadvantages:
    • Lower maintainability: asynchronous logic is less straightforward and often needs a cache or another third-party dependency to track the state of in-flight messages;
    • Lower reliability: if the producer fails, the business code may never receive the events that say whether a message was processed successfully.

In practice, we can choose the mode that suits our needs. Below I briefly introduce a simple implementation of each mode. The complete code has been uploaded to Github: Golang Kafka Producer.

Synchronous message production

```go
// producer/standalone_sync.go (excerpt; imports fmt, log, strings and github.com/IBM/sarama;
// brokers and topic are defined elsewhere in the file)
// Create configuration
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll          // Wait for all in-sync replicas to acknowledge the write
config.Producer.Partitioner = sarama.NewRandomPartitioner // Random partitioning strategy
config.Producer.Return.Successes = true                   // Required by SyncProducer
// Create a synchronous producer with the given broker addresses and configuration
producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
if err != nil {
	log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close() // Deferred only after err is checked, since producer is nil on error
// Build the message to be sent
msg := &sarama.ProducerMessage{
	Topic: topic,
	Key:   sarama.StringEncoder("hello-world"),
	Value: sarama.StringEncoder("Hello, Kafka!"),
}
// Send the message and block until Kafka responds
partition, offset, err := producer.SendMessage(msg)
if err != nil {
	log.Fatalf("Failed to send message: %v", err)
}
fmt.Printf("Message %s sent to partition %d at offset %d\n", msg.Key, partition, offset)
```

Asynchronous message production

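```go
// producer/standalone_async.go (excerpt; imports fmt, log, strings, sync and
// github.com/IBM/sarama; brokers is defined elsewhere in the file)
// Create configuration
config := sarama.NewConfig()
config.Producer.Return.Successes = true // Report successful deliveries on Successes()
config.Producer.Return.Errors = true    // Report failures on Errors() (already the default)
// Create an asynchronous producer
producer, err := sarama.NewAsyncProducer(strings.Split(brokers, ","), config)
if err != nil {
	log.Fatalf("Failed to create producer: %v", err)
}
// Both result channels must be drained, otherwise the producer eventually blocks
var wg sync.WaitGroup
wg.Add(2)
go func() {
	defer wg.Done()
	for msg := range producer.Successes() {
		fmt.Printf("Produced message to topic %s partition %d at offset %d\n", msg.Topic, msg.Partition, msg.Offset)
	}
}()
go func() {
	defer wg.Done()
	for err := range producer.Errors() {
		log.Printf("Error producing message: %v\n", err)
	}
}()
// Hand the message over; this returns without waiting for Kafka's response
producer.Input() <- &sarama.ProducerMessage{
	Topic: "test-topic",
	Key:   sarama.StringEncoder("hello-world"),
	Value: sarama.StringEncoder("Hello, Kafka!"),
}
// Flush in-flight messages, then close Successes() and Errors() so both loops end
producer.AsyncClose()
wg.Wait()
```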
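As described earlier, in asynchronous mode each result has to be matched back to its message using some identifier. Sarama's ProducerMessage has a Metadata field for this kind of pass-through data. Sarama ignores it and hands it back unchanged:

  • on the Successes() channel;
  • on the Errors() channel, through the Msg field of each ProducerError.

The sketch below simulates this pattern with plain channels so that it runs without a broker. message, producerError, fakeAsyncProducer, sendAll and the req-N IDs are all hypothetical stand-ins, not Sarama APIs.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// message is a hypothetical stand-in for sarama.ProducerMessage with only the fields used here.
type message struct {
	Value    string
	Metadata interface{} // pass-through data, returned unchanged with the result
}

// producerError is a hypothetical stand-in for sarama.ProducerError.
type producerError struct {
	Msg *message
	Err error
}

// fakeAsyncProducer mimics the channel shape of an async producer: messages go
// in on input, outcomes come back on successes or errs. It "fails" empty values.
func fakeAsyncProducer(input <-chan *message) (<-chan *message, <-chan *producerError) {
	successes := make(chan *message)
	errs := make(chan *producerError)
	go func() {
		defer close(successes)
		defer close(errs)
		for m := range input {
			if m.Value == "" {
				errs <- &producerError{Msg: m, Err: errors.New("empty value")}
			} else {
				successes <- m
			}
		}
	}()
	return successes, errs
}

// sendAll tags each message with a request ID in Metadata, sends them all
// without waiting, and sorts the returned IDs into delivered and failed.
func sendAll(values []string) (delivered, failed []string) {
	input := make(chan *message)
	successes, errs := fakeAsyncProducer(input)
	go func() {
		for i, v := range values {
			input <- &message{Value: v, Metadata: fmt.Sprintf("req-%d", i)}
		}
		close(input)
	}()
	// A nil channel blocks forever, so each closed channel drops out of the select.
	for successes != nil || errs != nil {
		select {
		case m, ok := <-successes:
			if !ok {
				successes = nil
				continue
			}
			delivered = append(delivered, m.Metadata.(string))
		case e, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			failed = append(failed, e.Msg.Metadata.(string))
		}
	}
	sort.Strings(delivered)
	sort.Strings(failed)
	return delivered, failed
}
```

Calling sendAll([]string{"a", "", "c"}) reports req-0 and req-2 as delivered. It reports req-1, the empty value the fake producer rejects, as failed. In real code, the same loop would read from producer.Successes() and producer.Errors(), and the request IDs would typically key a map of pending requests.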

Consumer

Kafka consumers also come in different modes, distinguished by when we tell the message queue that a message has been consumed. There are two scenarios:

  • As soon as we receive a message, we tell the message queue that it has been consumed, and only then process it. Processing may still fail, for example if the program crashes midway or the processing function cannot handle the message at that moment. The message is then already recorded as consumed and will not be delivered again, so it is effectively lost. We call this the at most once scenario.
  • We hand the message to the processing function and commit the offset to Kafka only after the function returns successfully. If the program crashes before the commit, the message is delivered again after a restart, so it may be processed more than once. This is the at least once scenario.

The complete example code has been submitted to Github: Golang Kafka Consumer.
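The only difference between the two scenarios above is the order of "commit" and "process". The following Go simulation makes that concrete. It uses only the standard library (no Kafka or Sarama involved), and the function names are hypothetical. A consumer crashes while handling one offset, restarts from the committed offset, and we count how often each offset was processed.

```go
package main

// consumeWithCrash simulates one consumer run over offsets [committed, total).
// It crashes while handling offset crashAt (pass -1 for no crash) and returns
// the offsets it processed plus the committed offset it leaves behind.
// commitFirst=true commits before processing (at most once);
// commitFirst=false processes before committing (at least once).
func consumeWithCrash(total, committed, crashAt int, commitFirst bool) ([]int, int) {
	var processed []int
	for off := committed; off < total; off++ {
		if commitFirst {
			committed = off + 1 // commit first...
			if off == crashAt {
				return processed, committed // ...crash: committed but never processed
			}
			processed = append(processed, off)
		} else {
			processed = append(processed, off) // process first...
			if off == crashAt {
				return processed, committed // ...crash: processed but not committed
			}
			committed = off + 1
		}
	}
	return processed, committed
}

// runTwice lets the consumer crash once at crashAt, restarts it from the
// committed offset, and returns how many times each offset was processed.
func runTwice(total, crashAt int, commitFirst bool) []int {
	counts := make([]int, total)
	first, committed := consumeWithCrash(total, 0, crashAt, commitFirst)
	second, _ := consumeWithCrash(total, committed, -1, commitFirst) // no crash on restart
	for _, off := range append(first, second...) {
		counts[off]++
	}
	return counts
}
```

runTwice(5, 2, true) returns [1 1 0 1 1], so offset 2 is lost. runTwice(5, 2, false) returns [1 1 2 1 1], so offset 2 is processed twice.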

Automatic commit mode (at most once)

In fact, Sarama's automatic commit mode does not give strict at most once semantics. Sarama does not commit an offset at the moment it hands the message to the processing function. Instead, it commits offsets periodically, every Consumer.Offsets.AutoCommit.Interval (1 second by default). Suppose some messages have already been handed to the processing function, but their offsets have not been committed yet because the interval has not elapsed. If the application crashes at that point, those messages will be consumed again after a restart.

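However, this automatic commit mode is generally regarded as at most once.

One caveat about the example below: it uses the plain Consumer, which does not track or commit offsets at all. AutoCommit only applies to offsets managed by a consumer group and marked with MarkMessage. As a result, every run of this example starts again from the initial offset passed to ConsumePartition.

The example code is as follows: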

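```go
// consumer/at_most_once.go (excerpt; imports fmt, log, strings and github.com/IBM/sarama;
// brokers and topic are defined elsewhere in the file)
// Create configuration
config := sarama.NewConfig()
config.Consumer.Return.Errors = true             // Deliver consumer errors on Errors(), which must then be read
config.Consumer.Offsets.AutoCommit.Enable = true // Default is true; only used for offsets tracked by a consumer group
config.Version = sarama.V3_6_0_0                 // Set Kafka version
consumer, err := sarama.NewConsumer(strings.Split(brokers, ","), config)
if err != nil {
	log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
partitions, err := consumer.Partitions(topic)
if err != nil {
	log.Fatalf("Failed to list partitions: %v", err)
}
for _, partition := range partitions {
	pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
	if err != nil {
		log.Fatalf("Failed to consume partition %d: %v", partition, err)
	}
	go func(pc sarama.PartitionConsumer) {
		for err := range pc.Errors() { // Drain errors because Return.Errors is true
			log.Printf("Consumer error: %v", err)
		}
	}(pc)
	go func(pc sarama.PartitionConsumer) {
		for msg := range pc.Messages() { // Ends when the partition consumer is closed
			fmt.Printf("Partition: %d, Offset: %d, Key: %s, Value: %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
		}
	}(pc)
}
// Block here (for example until SIGTERM) so that main does not exit while the goroutines consume
```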

Manual commit mode (at least once)

In manual commit mode, we need to implement a custom ConsumerGroupHandler, an interface with three methods:

```go
type ConsumerGroupHandler interface {
	Setup(ConsumerGroupSession) error
	Cleanup(ConsumerGroupSession) error
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
```

They are called at the following times and must follow these rules:

  • Setup: called at the beginning of each new session (including the sessions created after a rebalance), before any ConsumeClaim runs. It is used to prepare the consumer, and actual consumption starts after it returns;
  • ConsumeClaim: called once per claimed partition, from several goroutines concurrently, so any shared state must be protected. It must run a loop that consumes messages from the ConsumerGroupClaim's Messages() channel, and once that channel is closed it must exit the loop and return;
  • Cleanup: called at the end of a session, after all ConsumeClaim goroutines have returned and before the offsets are committed for the last time.

Below is an example using this custom Handler:

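```go
// consumer/at_least_once.go (excerpt; imports context, log, os, os/signal, strings,
// syscall and github.com/IBM/sarama; brokers, group and topic are defined elsewhere)
func main() {
	// Create configuration
	config := sarama.NewConfig()
	config.Version = sarama.V3_6_0_0
	config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
		sarama.NewBalanceStrategyRoundRobin(),
	}
	config.Consumer.Offsets.Initial = sarama.OffsetNewest
	consumer := CustomCommitConsumer{
		ready: make(chan bool),
	}
	client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
	if err != nil {
		log.Fatalf("Failed to create consumer group: %v", err)
	}
	defer client.Close()
	ctx, cancel := context.WithCancel(context.Background())
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		for {
			// Consume runs one session and returns when a rebalance occurs,
			// so it is called again in a loop to join the next session
			if err := client.Consume(ctx, strings.Split(topic, ","), &consumer); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return // Context cancelled: stop consuming
			}
			consumer.ready = make(chan bool) // Setup closes it again in the next session
		}
	}()
	<-sigterm
	cancel()
}

type CustomCommitConsumer struct {
	ready chan bool
}

func (c *CustomCommitConsumer) Setup(sarama.ConsumerGroupSession) error {
	close(c.ready) // Mark the consumer as ready
	return nil
}

func (c *CustomCommitConsumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (c *CustomCommitConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				return nil // Channel closed, e.g. during a rebalance
			}
			log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
			// Mark the offset only after processing succeeds. With AutoCommit enabled (the default),
			// Sarama commits marked offsets periodically; otherwise call session.Commit()
			session.MarkMessage(message, "")
		case <-session.Context().Done():
			return nil // The session is ending
		}
	}
}
```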

Stream API

The Java SDK offers a Streams API (Kafka Streams). It lets an application define a processing function for messages and specify the input and output topics. The SDK then reads messages from the input topic, processes them, and writes the results to the output topic. That is only a superficial description, though. Kafka Streams also supports:

  • stateless operations such as map;
  • stateful operations such as aggregate, reduce and join.

It also has to deal with the processing thread model and fault tolerance. I only took a brief look at it and did not try it out in depth. If you are interested, see the Kafka official documentation: STREAMS DSL.

Sarama, the Go SDK, does not support stream semantics, so I will not go into it here.

Admin API

In a previous article on installing and operating Kafka (Kafka Installation and Operation), I introduced many Kafka operational commands. If we want to build on Kafka or develop a platform to manage it, we need to check whether the SDK offers similar functionality, that is, an API for getting and controlling Kafka's status and configuration.

Unlike the Stream API, the Admin API is supported by Sarama. To use it, we create a ClusterAdmin object. For example, the following code retrieves all topics from Kafka:

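```go
// admin/topics.go (excerpt; imports fmt, log, strings and github.com/IBM/sarama;
// brokers is defined elsewhere in the file)
// Create Kafka configuration
config := sarama.NewConfig()
config.Version = sarama.V3_6_0_0
// Create the admin client
admin, err := sarama.NewClusterAdmin(strings.Split(brokers, ","), config)
if err != nil {
	log.Fatalf("Failed to create cluster admin: %v", err)
}
defer admin.Close()
// Get the topics (a map from topic name to sarama.TopicDetail)
topics, err := admin.ListTopics()
if err != nil {
	log.Fatalf("Failed to list topics: %v", err)
}
// Print the topic names
for name := range topics {
	fmt.Println(name)
}
```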

Besides topics, ClusterAdmin can also view and manage other resources such as ACLs, consumer groups and partitions. I won't demonstrate them one by one; it is enough to know that they are supported.

Additional Topics

Exactly Once

When using a message queue, we care about message processing semantics. We have already covered at most once and at least once, but sometimes we need exactly once: every message is processed exactly once, no more and no less. This is a more complex problem involving several parts:

  • Producer: To achieve exactly once, the first step is to make sure the producer writes each message only once. Kafka supports this with an idempotent producer. The broker tracks a sequence number for each producer and partition, so if a message with an already-seen sequence number arrives again (for example, because of a retry), Kafka keeps only one copy;
    • To learn: How is it implemented? What should we pay attention to?
  • Message queue: Once the message queue has received a message, we also need to make sure that failures of the message queue itself do not lose it. For example, if a Kafka node goes down, the replication mechanism ensures that other nodes still hold replicas, so the data is not lost. (If all nodes holding replicas go down at the same time and cannot be recovered, there is nothing we can do; the messages are lost.)
  • Consumer: Consumers need help from the business logic to ensure that messages are neither lost nor processed twice. The approach is therefore to start from the at least once mode and optimize further:
    • Kafka itself cannot know when a message counts as processed, so this has to be supported by the business;
    • The consumer can detect duplicates when messages are re-consumed, for example by reusing Kafka's sequence number (can it be obtained?) and so on;
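On the consumer side, a common way to tolerate at least once redelivery is an idempotent consumer that remembers which messages it has already handled. The sketch below keys on topic/partition/offset. That catches messages redelivered after a crash or rebalance. It does not catch a duplicate written by a retrying producer, because that copy lands at a different offset; producer idempotence or a business ID is needed for that case.

The type and method names are hypothetical. The in-memory map is only for illustration: a real service would store the keys durably, ideally in the same transaction as the business side effect.

```go
package main

import "fmt"

// dedupHandler wraps a business handler so that a message whose key was
// already processed is skipped. Keys are "topic/partition/offset" strings.
// Assumption: the map lives in memory here; in practice it must be durable.
type dedupHandler struct {
	seen   map[string]bool
	handle func(value string)
}

func newDedupHandler(handle func(string)) *dedupHandler {
	return &dedupHandler{seen: make(map[string]bool), handle: handle}
}

// Process runs the handler at most once per (topic, partition, offset) and
// reports whether the message was actually processed.
func (d *dedupHandler) Process(topic string, partition int32, offset int64, value string) bool {
	key := fmt.Sprintf("%s/%d/%d", topic, partition, offset)
	if d.seen[key] {
		return false // Redelivered duplicate: skip the side effect
	}
	d.handle(value)
	d.seen[key] = true // Record only after the handler has run
	return true
}
```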

Passing Meta Information

When using a message queue, we may also need to pass along extra information, such as the commonly used tracing information. When two services in a call chain communicate through Kafka, we certainly do not want the trace to break between them. So a natural question is whether Kafka can carry our tracing information directly.

Let's compare Sarama's ProducerMessage and ConsumerMessage:

```go
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	Key   Encoder
	Value Encoder
	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers   []RecordHeader
	Metadata  interface{}
	Offset    int64
	Partition int32
	// ...
}

type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
}
```

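We can see that both message types have a Headers field for passing metadata, which Kafka carries transparently from producer to consumer.

Note that ProducerMessage also has a Metadata field, but it is not passed between producer and consumer. It is only for the producer's own use, for example to recognize a message when it comes back on the Successes or Errors channel.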
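Below is a minimal sketch of carrying a trace ID in headers. To keep it runnable without the Sarama module, recordHeader is a local stand-in with the same Key/Value []byte fields as sarama.RecordHeader. Note that the producer side uses []RecordHeader while the consumer side receives []*RecordHeader. The header name trace-id is an assumption; tracing standards define their own (for example, traceparent in W3C Trace Context). Headers require Kafka 0.11+, as the ConsumerMessage comment above notes, so config.Version must be set to at least that version.

```go
package main

// recordHeader is a hypothetical stand-in for sarama.RecordHeader.
type recordHeader struct {
	Key   []byte
	Value []byte
}

// traceHeaderKey is an assumed header name, chosen for illustration only.
const traceHeaderKey = "trace-id"

// injectTrace appends the trace ID to the headers of an outgoing message
// (producer side, where headers are a slice of values).
func injectTrace(headers []recordHeader, traceID string) []recordHeader {
	return append(headers, recordHeader{Key: []byte(traceHeaderKey), Value: []byte(traceID)})
}

// extractTrace looks for the trace ID in a received message's headers
// (consumer side, where headers are a slice of pointers).
func extractTrace(headers []*recordHeader) (string, bool) {
	for _, h := range headers {
		if h != nil && string(h.Key) == traceHeaderKey {
			return string(h.Value), true
		}
	}
	return "", false
}
```

In real code, the same two helpers would be written against sarama.RecordHeader. The producer would set msg.Headers before sending, and the consumer would read message.Headers inside ConsumeClaim to continue the trace.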

Disadvantages of Sarama Library

While researching, I found that many people report the following problems with using the Sarama client to send and receive messages:

  • Sarama client cannot detect partition changes. When the number of partitions of a topic increases, the client needs to be restarted to consume messages normally.
  • Sarama client’s maximum message processing time (MaxProcessingTime) default value is 100ms. Exceeding the maximum processing time may cause the consumer to be unable to consume messages.
  • When the consumer offset reset strategy is set to Oldest (earliest), the offset may be reset when the client restarts, so the client may start re-consuming all messages from the smallest offset.
  • When consumers subscribe to multiple topics at the same time, some partitions may not consume messages.

The suggested solution is usually to switch to confluent-kafka-go as the Kafka client library (😂), and several comparisons of common Go Kafka SDKs are easy to find.

However, while studying Sarama, I found that maintenance of this SDK has moved from Shopify to IBM. We used to import the package as github.com/Shopify/sarama, but as you can see in my sample code, it is now github.com/IBM/sarama. So, regarding these issues:

  • Sarama client cannot detect partition changes. When the number of partitions of a topic increases, the client needs to be restarted to consume messages normally.
    • This has been resolved. As the manual offset commit example shows, when the server triggers a rebalance, Consume returns and the consumer is rebuilt for the new session.
  • Sarama client's maximum message processing time (MaxProcessingTime) default value is 100ms. Exceeding the maximum processing time may cause the consumer to be unable to consume messages.
    • I am not sure about this yet; I need to look into it further.
  • When the consumer offset reset strategy is set to Oldest (earliest), the offset may be reset when the client restarts, so the client may start re-consuming all messages from the smallest offset.
    • I am not sure about this yet; I need to look into it further.
  • When consumers subscribe to multiple topics at the same time, some partitions may not consume messages.
    • I am not sure about this yet; I need to look into it further.