Spring Boot + Kafka — Producer, Consumer & Dead Letter Topics
Integrate Apache Kafka with Spring Boot 4 — producer configuration, consumer groups, error handling, dead letter topics, and JSON serialization.
Kafka is the default choice for event streaming in the Java ecosystem. Spring Boot makes it easy to produce and consume messages, but the defaults aren’t production-ready. This post covers setup, producer/consumer configuration, JSON serialization, error handling, and dead letter topics.
Dependencies
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.kafka:spring-kafka")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.springframework.kafka:spring-kafka-test")
    testImplementation("org.springframework.boot:spring-boot-testcontainers")
    testImplementation("org.testcontainers:kafka")
}
Configuration
application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false
      acks: all
      retries: 3
    consumer:
      group-id: my-app
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: com.example.demo.event
    listener:
      ack-mode: record
Key settings:
- acks: all — the producer waits for all in-sync replicas to acknowledge. Slower, but no message loss.
- auto-offset-reset: earliest — consumers with no committed offset start from the beginning of the topic.
- spring.json.trusted.packages — only deserialize classes from trusted packages. Security measure.
- ack-mode: record — commit the offset after each message is processed.
Event classes
package com.example.demo.event

import java.math.BigDecimal
import java.time.Instant
import java.util.UUID

data class ProductCreatedEvent(
    val productId: UUID,
    val name: String,
    val price: BigDecimal,
    val timestamp: Instant = Instant.now()
)

data class ProductUpdatedEvent(
    val productId: UUID,
    val name: String,
    val price: BigDecimal,
    val timestamp: Instant = Instant.now()
)

data class OrderPlacedEvent(
    val orderId: UUID,
    val userId: UUID,
    val productIds: List<UUID>,
    val totalAmount: BigDecimal,
    val timestamp: Instant = Instant.now()
)
Producer
Basic producer
package com.example.demo.producer

import com.example.demo.event.ProductCreatedEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

@Component
class ProductEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {

    private val log = LoggerFactory.getLogger(ProductEventProducer::class.java)

    fun sendProductCreated(event: ProductCreatedEvent) {
        val future = kafkaTemplate.send("product-events", event.productId.toString(), event)
        future.whenComplete { result, ex ->
            if (ex != null) {
                log.error("Failed to send product event: ${event.productId}", ex)
            } else {
                val metadata = result.recordMetadata
                log.info(
                    "Sent product event: ${event.productId} to ${metadata.topic()} " +
                        "partition=${metadata.partition()} offset=${metadata.offset()}"
                )
            }
        }
    }
}
The key (event.productId.toString()) determines which partition receives the message. Messages with the same key always go to the same partition, preserving order per product.
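The same-key, same-partition property can be illustrated in plain Kotlin. Note this is a simplification: Kafka's default partitioner applies murmur2 to the serialized key bytes, while `String.hashCode` stands in here purely to show the behavior.

```kotlin
// Simplified sketch of key-based partitioning. The real default partitioner
// uses murmur2 over the serialized key bytes, not String.hashCode; the
// property being demonstrated is the same: a given key always maps to the
// same partition, so per-key ordering is preserved.
fun partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode() and Int.MAX_VALUE) % numPartitions

fun main() {
    val first = partitionFor("product-123", 6)
    val second = partitionFor("product-123", 6)
    println(first == second) // always true: same key, same partition
}
```

This is also why increasing the partition count later breaks per-key ordering: the modulo changes, so existing keys can start landing on different partitions.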
Producing from the service layer
package com.example.demo.service

import com.example.demo.domain.Product
import com.example.demo.dto.CreateProductRequest
import com.example.demo.dto.ProductResponse
import com.example.demo.event.ProductCreatedEvent
import com.example.demo.producer.ProductEventProducer
import com.example.demo.repository.ProductRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class ProductService(
    private val productRepository: ProductRepository,
    private val productEventProducer: ProductEventProducer
) {

    @Transactional
    fun create(request: CreateProductRequest): ProductResponse {
        val product = Product(
            name = request.name,
            description = request.description,
            price = request.price,
            stockQuantity = request.stockQuantity
        )
        val saved = productRepository.save(product)

        productEventProducer.sendProductCreated(
            ProductCreatedEvent(
                productId = saved.id!!,
                name = saved.name,
                price = saved.price
            )
        )

        return saved.toResponse()
    }
}
Note: the Kafka send is asynchronous and not covered by the database transaction. If the send fails after the commit, or the transaction rolls back after a successful send, you have an inconsistency. For critical flows, use the transactional outbox pattern instead.
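The outbox idea can be sketched in plain Kotlin with in-memory stand-ins (`OutboxRecord` and `Outbox` are illustrative names, not part of this post's codebase): the event is written to an outbox table inside the same transaction as the entity, and a scheduled relay publishes pending rows afterwards.

```kotlin
// In-memory sketch of the transactional outbox pattern. In a real system,
// OutboxRecord would be a database table written in the same transaction
// as the Product row, and drain() would run on a schedule, publishing via
// KafkaTemplate and marking rows as sent.
data class OutboxRecord(
    val topic: String,
    val key: String,
    val payload: String,
    var published: Boolean = false
)

class Outbox {
    val rows = mutableListOf<OutboxRecord>()

    // Called inside the database transaction: the event commits (or rolls
    // back) atomically with the business entity.
    fun enqueue(topic: String, key: String, payload: String) {
        rows += OutboxRecord(topic, key, payload)
    }

    // Called by the relay: publish pending rows, then mark them as sent.
    // This gives at-least-once delivery, so consumers must be idempotent.
    fun drain(publish: (OutboxRecord) -> Unit): Int {
        val pending = rows.filter { !it.published }
        pending.forEach { publish(it); it.published = true }
        return pending.size
    }
}
```

The relay trades latency for consistency: events arrive slightly later, but an event is published if and only if the corresponding database row committed.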
Consumer
Basic consumer
package com.example.demo.consumer

import com.example.demo.event.ProductCreatedEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class ProductEventConsumer {

    private val log = LoggerFactory.getLogger(ProductEventConsumer::class.java)

    @KafkaListener(
        topics = ["product-events"],
        groupId = "inventory-service"
    )
    fun handleProductCreated(event: ProductCreatedEvent) {
        log.info("Received product event: ${event.productId} - ${event.name}")
        // Process the event — update inventory, notify subscribers, etc.
        processEvent(event)
    }

    private fun processEvent(event: ProductCreatedEvent) {
        // Business logic here
        log.info("Processed product event: ${event.productId}")
    }
}
Multiple consumer groups
Different services can consume the same topic independently:
// Inventory service — updates stock
@KafkaListener(topics = ["product-events"], groupId = "inventory-service")
fun handleForInventory(event: ProductCreatedEvent) {
    inventoryService.initializeStock(event.productId)
}

// Search service — updates search index
@KafkaListener(topics = ["product-events"], groupId = "search-service")
fun handleForSearch(event: ProductCreatedEvent) {
    searchService.indexProduct(event.productId, event.name)
}

// Analytics service — records metrics
@KafkaListener(topics = ["product-events"], groupId = "analytics-service")
fun handleForAnalytics(event: ProductCreatedEvent) {
    analyticsService.recordProductCreation(event.productId)
}
Each consumer group gets every message independently. Consumers within the same group share the load (each partition is assigned to one consumer in the group).
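The one-owner-per-partition rule can be modeled in a few lines of plain Kotlin. This toy assignor is round-robin for simplicity (real assignors are range or cooperative-sticky), but the invariant it demonstrates is the same: within one group, every partition is owned by exactly one member, and a second group would get its own independent assignment.

```kotlin
// Toy model of partition assignment within a single consumer group.
// Each partition index is assigned to exactly one member (round-robin);
// Kafka's real assignors differ in strategy but preserve the same
// one-owner-per-partition invariant.
fun assignPartitions(partitions: Int, members: List<String>): Map<String, List<Int>> =
    (0 until partitions).groupBy { members[it % members.size] }

fun main() {
    // 6 partitions spread across 3 consumers in the "inventory-service" group
    println(assignPartitions(6, listOf("c1", "c2", "c3")))
}
```

This is also why running more consumers than partitions buys nothing: with 6 partitions, a 7th member of the group would sit idle.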
Error handling
Default behavior
By default, if a consumer throws an exception, Spring Kafka's DefaultErrorHandler retries the record nine more times with no delay (ten delivery attempts in total), then logs the error and moves on to the next message. The failed message is effectively lost.
Custom error handler
package com.example.demo.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaConfig {

    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, Any>,
        kafkaTemplate: KafkaTemplate<String, Any>
    ): ConcurrentKafkaListenerContainerFactory<String, Any> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
        factory.consumerFactory = consumerFactory

        // Retry 3 times with a 1-second delay, then send to the DLT
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
        val errorHandler = DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))

        // Don't retry on certain exceptions
        errorHandler.addNotRetryableExceptions(
            com.fasterxml.jackson.core.JsonParseException::class.java,
            IllegalArgumentException::class.java
        )

        factory.setCommonErrorHandler(errorHandler)
        return factory
    }
}
This configuration:
- Retries failed messages 3 times with a 1-second delay
- After all retries fail, sends the message to a dead letter topic (DLT)
- Skips retries for deserialization errors and validation errors — retrying those won’t help
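The retry-then-recover flow can be sketched in dependency-free Kotlin. This models only the attempt counting and the non-retryable short-circuit of DefaultErrorHandler; the FixedBackOff delay and all Kafka plumbing are omitted, and `processWithRecovery` is an illustrative name, not a Spring API.

```kotlin
// Plain-Kotlin sketch of the retry-then-recover flow: try the handler,
// retry up to maxRetries on failure, and hand the record to the recoverer
// (the DLT publisher, in Spring's case) when retries are exhausted or the
// exception is classified as non-retryable.
fun <T> processWithRecovery(
    record: T,
    maxRetries: Int,
    nonRetryable: Set<Class<out Exception>>,
    handler: (T) -> Unit,
    recover: (T, Exception) -> Unit // stands in for DeadLetterPublishingRecoverer
) {
    var attempt = 0
    while (true) {
        try {
            handler(record)
            return
        } catch (e: Exception) {
            attempt++
            if (attempt > maxRetries || nonRetryable.any { it.isInstance(e) }) {
                recover(record, e) // exhausted, or pointless to retry
                return
            }
        }
    }
}
```

With `maxRetries = 3` a persistently failing record is attempted four times (one initial delivery plus three retries), matching `FixedBackOff(1000L, 3L)` above; a non-retryable exception goes to the recoverer after a single attempt.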
Dead letter topics
When a message fails all retries, DeadLetterPublishingRecoverer sends it to a topic named {original-topic}.DLT by default.
Custom DLT topic naming
val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
    org.apache.kafka.common.TopicPartition(
        "${record.topic()}-dlq",
        record.partition()
    )
}
DLT consumer for monitoring
@Component
class DeadLetterConsumer {

    private val log = LoggerFactory.getLogger(DeadLetterConsumer::class.java)

    @KafkaListener(
        topics = ["product-events.DLT"],
        groupId = "dlt-monitor"
    )
    fun handleDeadLetter(record: ConsumerRecord<String, Any>) {
        log.error(
            "Dead letter received — topic: ${record.topic()}, " +
                "key: ${record.key()}, value: ${record.value()}, " +
                "headers: ${record.headers().map { "${it.key()}=${String(it.value())}" }}"
        )
        // Alert, store for manual review, etc.
    }
}
Reprocessing dead letters
When the underlying issue is fixed (bug patched, downstream service recovered), reprocess the DLT:
@Component
class DeadLetterReprocessor(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
    private val consumerFactory: ConsumerFactory<String, Any>
) {
    // One simple approach: drain a batch from the DLT and republish each
    // record to the original topic. Production code would loop until the
    // DLT is empty and handle republish failures.
    fun reprocess(dltTopic: String, targetTopic: String) {
        consumerFactory.createConsumer("dlt-reprocessor", "reprocess").use { consumer ->
            consumer.subscribe(listOf(dltTopic))
            consumer.poll(Duration.ofSeconds(5)).forEach { record ->
                kafkaTemplate.send(targetTopic, record.key(), record.value()).get()
            }
            consumer.commitSync() // don't republish the same dead letters twice
        }
    }
}
Topic configuration
Create topics programmatically:
package com.example.demo.config

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaTopicConfig {

    @Bean
    fun productEventsTopic(): NewTopic {
        return TopicBuilder.name("product-events")
            .partitions(6)
            .replicas(3)
            .config("retention.ms", "604800000") // 7 days
            .build()
    }

    @Bean
    fun productEventsDltTopic(): NewTopic {
        // The default DeadLetterPublishingRecoverer sends dead letters to
        // the same partition number as the original record, so the DLT needs
        // at least as many partitions as the source topic (or a custom
        // destination resolver).
        return TopicBuilder.name("product-events.DLT")
            .partitions(6)
            .replicas(3)
            .config("retention.ms", "2592000000") // 30 days
            .build()
    }
}
DLT topics need longer retention — you want time to investigate and reprocess failures.
Testing with embedded Kafka
package com.example.demo.consumer

import com.example.demo.event.ProductCreatedEvent
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.context.EmbeddedKafka
import java.math.BigDecimal
import java.util.UUID

@SpringBootTest
@EmbeddedKafka(
    partitions = 1,
    topics = ["product-events"],
    brokerProperties = ["listeners=PLAINTEXT://localhost:9093"]
)
class ProductEventConsumerTest {

    @Autowired
    lateinit var kafkaTemplate: KafkaTemplate<String, Any>

    @Test
    fun `consumer processes product event`() {
        val event = ProductCreatedEvent(
            productId = UUID.randomUUID(),
            name = "Test Widget",
            price = BigDecimal("19.99")
        )

        kafkaTemplate.send("product-events", event.productId.toString(), event).get()

        // In a real test, verify the side effect (database update, etc.)
        // with a CountDownLatch or Awaitility for async verification
        Thread.sleep(2000) // Simplified — use Awaitility in production code
    }
}
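Awaitility is the usual tool for replacing that Thread.sleep. As a dependency-free stand-in, a polling helper like this (`awaitUntil` is an illustrative name, not a library API) shows the idea: poll the side effect until it appears or a timeout expires, instead of sleeping a fixed amount.

```kotlin
// Poll a condition until it holds or the timeout expires. A minimal,
// dependency-free stand-in for Awaitility-style async verification:
// the test returns as soon as the side effect is observed rather than
// always paying a fixed sleep.
fun awaitUntil(
    timeoutMillis: Long = 5_000,
    pollMillis: Long = 100,
    condition: () -> Boolean
) {
    val deadline = System.currentTimeMillis() + timeoutMillis
    while (System.currentTimeMillis() < deadline) {
        if (condition()) return
        Thread.sleep(pollMillis)
    }
    error("Condition not met within ${timeoutMillis}ms")
}

fun main() {
    // Example: wait until a (hypothetical) repository reflects the consumed event
    var processed = false
    Thread { Thread.sleep(200); processed = true }.start()
    awaitUntil(timeoutMillis = 2_000) { processed }
    println("event processed")
}
```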
For more realistic tests, use Testcontainers with a real Kafka broker:
companion object {
@Container
@ServiceConnection
val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
}
Production checklist
- Set acks=all on producers for durability
- Configure idempotence: enable.idempotence=true (the default in newer Kafka clients)
- Set meaningful consumer group IDs per service
- Configure a DLT for every topic
- Monitor consumer lag — if consumers fall behind, you have a throughput problem
- Set appropriate partition counts — you can’t reduce partitions after creation
- Use a schema registry (Avro/Protobuf) for production event contracts
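For reference, the idempotence-related producer settings can be made explicit in application.yml. This is a sketch; the values shown match the modern Kafka client defaults, so setting them mainly documents intent.

```yaml
spring:
  kafka:
    producer:
      acks: all
      properties:
        enable.idempotence: true
        # idempotence requires at most 5 in-flight requests per connection
        max.in.flight.requests.per.connection: 5
```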
Kafka with Spring Boot is straightforward for basic produce/consume flows. The complexity is in error handling, ordering guarantees, and exactly-once semantics. Start with the setup in this post, add monitoring, and evolve from there.