How to handle poison pills in Spring Kafka?

Modern applications are increasingly built from smaller components that communicate with each other using events, a so-called event-driven architecture. Event-driven architectures have three key components: event producers, an event broker, and event consumers. It is important that the consumers can consume the events the producers have produced. Whenever this is not the case and you're not prepared for this scenario, a single event can bring your systems to a halt. Such an event is a poison pill.

In this blog I will discuss what poison pills are, how they occur, and how to deal with them. I have also created an example project where you can see the effects of a poison pill and what you can do to handle one.

Serialization is the process of converting an object's state into a format (a series of bytes) that can be stored or transmitted and reconstructed later, possibly in a different computer environment. The opposite operation, converting a series of bytes back into an object's state, is called deserialization.

In the context of Kafka, serialization is used to convert a Java object into a byte array that is subsequently produced by the producer to a topic. Deserialization works in the opposite direction: a series of bytes is consumed from a topic and converted back into a Java object. Both the key and the value of a record are serialized on produce and deserialized on consume.
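As a minimal illustration of the round trip, here is the same idea in plain Java, mimicking what a string serializer/deserializer pair does (the class and method names are mine, not Kafka's):

```java
import java.nio.charset.StandardCharsets;

public class SerializationDemo {

    // Serialize: convert the object state (here a String) into a series of bytes
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize: convert the series of bytes back into the object state
    static String deserialize(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("hello kafka");       // what the producer sends to the topic
        System.out.println(deserialize(bytes));        // prints "hello kafka"
    }
}
```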

If the producer and consumer are using compatible serializers and deserializers, everything will work as expected. The image below shows an example of compatible JSON serializers and deserializers; in this case the poison pill scenario is avoided.

Deserialization errors occur when producers and consumers use incompatible serializers and deserializers. Both the key and the value deserializers must be compatible in order to prevent deserialization errors. In the example below, the producer uses a StringSerializer while the consumer uses the incompatible FloatDeserializer.
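To see why this combination can never work, here is a plain-Java sketch of what a float deserializer effectively does: it requires exactly 4 bytes (an IEEE 754 float), while a serialized string can be any length. The class, method, and error message below are mine for illustration, not Kafka's actual code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class MismatchDemo {

    // What a float deserializer effectively does: interpret exactly 4 bytes as a float
    static float deserializeFloat(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException("expected 4 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getFloat();
    }

    public static void main(String[] args) {
        // Produced by a string serializer: 5 bytes, so deserialization fails every single time
        byte[] stringBytes = "hello".getBytes(StandardCharsets.UTF_8);
        try {
            deserializeFloat(stringBytes);
        } catch (IllegalArgumentException e) {
            System.out.println("Deserialization failed: " + e.getMessage());
        }
    }
}
```

No matter how often the consumer retries, the bytes stay the same and the failure repeats: that is exactly what makes the record a poison pill.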

After running the application, you can run the following command or paste the URL in your browser to trigger the poison pill:
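The exact command depends on the example project; it would be something along these lines, where the host, port, and endpoint path are hypothetical placeholders:

```shell
# Hypothetical endpoint of the example project; adjust host, port and path to your setup
curl http://localhost:8080/produce/poison-pill
```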

In the logs you will now see a continuous stream of error logs. Here is an example of a log message:
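The repeated message looks roughly like this (the topic name, partition, offset, and the cause reported by the deserializer will differ per setup):

```
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value
for partition my-topic-0 at offset 0. If needed, please seek past the record to
continue consumption.
```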

Here is what happened: the consumer fetched the record, and the FloatDeserializer threw an exception before the record ever reached your listener. Because the exception was thrown during deserialization, the offset was never advanced, so on the next poll the consumer fetched the exact same record, failed again, and kept retrying in an endless loop.

You can imagine that if a message is logged for every deserialization failure, many gigabytes of logs are written to disk rapidly. Many production-grade systems also ship their logs to a centralized system, where this flood can lead to huge costs. On top of that, finding other, non-Kafka logs becomes practically impossible. Depending on the amount of storage you have in place, Kafka logs could fill all of it, which will probably result in a non-functioning machine. So, in short, a poison pill can have quite a severe impact.

There are four options for dealing with poison pills; however, I would strongly recommend only one of them.

The first option is to wait until the retention period of the Kafka topic has passed, which is 7 days by default. After the retention period the poison pill is discarded. Waiting 7 days until the consumer can consume messages again is far from ideal. You could set the retention period to a lower value, which discards poison pills earlier. However, records produced shortly after the poison pill will likely be discarded as well. Depending on the situation, this can be even more damaging.

The second option is to manually update the offset to just past the offset of the poison pill. The advantage of this option is the amount of control: you can set the offset to exactly the offset after the poison pill. The disadvantage is that it is not straightforward. You must have access to the production Kafka cluster (which is never a good sign), and you need to know your way around the Kafka command-line tools.

In case you do need to reset the offset manually, you can use the following command:
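A sketch of the offset reset using the kafka-consumer-groups tool that ships with Kafka; the group and topic names are placeholders, and depending on your situation you may prefer --to-offset over --shift-by:

```shell
# Shift the committed offset of the consumer group forward by one,
# skipping the poison pill (run this while the consumer is stopped)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group --topic my-topic \
  --reset-offsets --shift-by 1 --execute
```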

Note: the consumer should be stopped before resetting the offset.

The third option is easier than the second and does not require executing any commands: change the consumer group and start consuming from the latest offset. The disadvantage of this 'solution' is that the messages between the poison pill and the last record produced to the topic won't be consumed and are lost. And should another poison pill occur in the future, the consumer group has to be changed again. The image below shows what happens after the consumer group is changed.

The last, and recommended, option is to configure an ErrorHandlingDeserializer using Spring Kafka.

The first thing we have to do is replace the deserializers with the ErrorHandlingDeserializer for both the key and the value. Since the ErrorHandlingDeserializer delegates the actual deserialization to the real deserializers, we need to tell it which classes to delegate to. The delegate classes can be added to the config via the spring.deserializer.key.delegate.class and spring.deserializer.value.delegate.class properties:

application.yml
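A sketch of what this configuration could look like; the delegate classes shown here (StringDeserializer for the key, JsonDeserializer for the value) are assumptions based on the earlier JSON example, so substitute your own:

```yaml
spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
```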

How does the ErrorHandlingDeserializer work in practice? It tries to deserialize the key and value using the delegate classes. If no DeserializationException is thrown, the record is passed to the consumer and processed as normal. If a DeserializationException is thrown, however, the record is not passed to the consumer; instead, the configured ErrorHandler is called with the thrown exception and the failing record. After the ErrorHandler has handled the error, the consumer resumes consuming as if nothing happened, because the offset has been moved forward. The ErrorHandler swallowed the poison pill, and there is no more continuous stream of error messages.

Optionally, a simple bean can be configured that logs some additional information about the failing record compared to the default SeekToCurrentErrorHandler:
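A sketch of such a bean, using the Spring Kafka 2.x ErrorHandler interface (newer versions use CommonErrorHandler instead); the class name and log message are mine:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.ErrorHandler;

@Configuration
public class KafkaErrorHandlerConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaErrorHandlerConfig.class);

    // Logs the topic, partition and offset of the failing record,
    // so the poison pill can be located without digging through stack traces
    @Bean
    public ErrorHandler errorHandler() {
        return (exception, record) -> log.error(
                "Failed to process record from topic {} partition {} at offset {}",
                record.topic(), record.partition(), record.offset(), exception);
    }
}
```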

In case you have multiple consumers, it is also possible to configure the ErrorHandlingDeserializer programmatically. The following example demonstrates how to configure a JsonDeserializer with the LoggingErrorHandler:
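A sketch of the programmatic configuration, using the Spring Kafka 2.x API (setErrorHandler and LoggingErrorHandler are deprecated in later versions); the bootstrap server, group id, and value type are placeholders:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.LoggingErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@Configuration
public class ConsumerConfiguration {

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // placeholder group id
        // Wrap the real deserializers in ErrorHandlingDeserializer instances
        return new DefaultKafkaConsumerFactory<>(props,
                new ErrorHandlingDeserializer<>(new StringDeserializer()),
                new ErrorHandlingDeserializer<>(new JsonDeserializer<>(Object.class)));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setErrorHandler(new LoggingErrorHandler()); // Spring Kafka 2.x API
        return factory;
    }
}
```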

With the ErrorHandlingDeserializer now configured you should see only one log message with the DeserializationException. Crisis averted 😃 !

You might have noticed that the LoggingErrorHandler does not actually log the value of the poison pill, but only shows the sequence of bytes. It is possible to log the actual value of the poison pill by publishing it to a so-called dead letter topic (DLT). You can think of a dead letter topic as a backup topic to which records that could not be consumed are sent.

You can configure a DeadLetterPublishingRecoverer that will send the poison pill to the dead letter topic. It can be configured as follows:
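A sketch of the wiring, assuming a KafkaTemplate<byte[], byte[]> bean (here called bytesTemplate) is available elsewhere in the application; this uses the Spring Kafka 2.x SeekToCurrentErrorHandler, and the exact generics accepted by the recoverer's constructor vary across versions:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class DeadLetterConfig {

    // Publishes failed records to <original-topic>.DLT, the recoverer's default naming
    @Bean
    public DeadLetterPublishingRecoverer recoverer(KafkaOperations<byte[], byte[]> bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }

    // The error handler calls the recoverer once retries are exhausted;
    // FixedBackOff(0L, 0L) means: do not retry, recover immediately
    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 0L));
    }
}
```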

Since you have most likely already configured a serializer and deserializer that is not using the byte[] type, we'll need to configure a separate serializer and deserializer for the DeadLetterPublishingRecoverer:
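A sketch of a byte[]-based producer for the recoverer; the point of using byte[] is that the poison pill is forwarded exactly as it was received, without any attempt to serialize it again (bean names and the bootstrap server are placeholders):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class DeadLetterProducerConfig {

    @Bean
    public ProducerFactory<byte[], byte[]> bytesProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Pass the raw bytes of the poison pill through untouched
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<byte[], byte[]> bytesTemplate(ProducerFactory<byte[], byte[]> pf) {
        return new KafkaTemplate<>(pf);
    }
}
```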

You can now create a separate consumer that is able to consume messages from the DLT topic:
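A sketch of such a consumer, assuming a listener container factory backed by a ByteArrayDeserializer is configured for it; the topic and group names are placeholders, and ".DLT" is the suffix the DeadLetterPublishingRecoverer appends by default:

```java
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterConsumer {

    private static final Logger log = LoggerFactory.getLogger(DeadLetterConsumer.class);

    @KafkaListener(topics = "my-topic.DLT", groupId = "my-group-dlt")
    public void listen(byte[] value) {
        // Only valid when the original value was a serialized string
        log.info("Received poison pill: {}", new String(value, StandardCharsets.UTF_8));
    }
}
```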

Note: in the consumer above, the poison pill's value is converted from bytes to a string. This conversion only works if the event value is a serialized string. If the event value is of a different type, the poison pill consumer should convert it to that type instead.

A poison pill is a record that cannot be consumed, no matter how many times the consumer attempts it. Poison pills occur because of a corrupt record, a deserialization error, or a schema mismatch. Without a configured ErrorHandlingDeserializer, many gigabytes of logs can rapidly be written to disk, which can bring the system to a halt. The recommended way to handle a deserialization error is to configure an ErrorHandlingDeserializer, which makes sure that the consumer can continue processing new records. If you need the actual value of a poison pill, a DeadLetterPublishingRecoverer can be configured, which allows you to log the actual value of the record.

Follow my blog for more upcoming posts about Kafka and related topics!
