PCSalt

Spring Boot + Kafka — Producer, Consumer & Dead Letter Topics

Integrate Apache Kafka with Spring Boot 4 — producer configuration, consumer groups, error handling, dead letter topics, and JSON serialization.


Kafka is the default choice for event streaming in the Java ecosystem. Spring Boot makes it easy to produce and consume messages, but the defaults aren’t production-ready. This post covers setup, producer/consumer configuration, JSON serialization, error handling, and dead letter topics.

Dependencies

dependencies {
  implementation("org.springframework.boot:spring-boot-starter-web")
  implementation("org.springframework.kafka:spring-kafka")
  implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

  testImplementation("org.springframework.boot:spring-boot-starter-test")
  testImplementation("org.springframework.kafka:spring-kafka-test")
  testImplementation("org.springframework.boot:spring-boot-testcontainers")
  testImplementation("org.testcontainers:kafka")
}

Configuration

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.add.type.headers: false
      acks: all
      retries: 3
    consumer:
      group-id: my-app
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
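      properties:
        spring.json.trusted.packages: com.example.demo.event
        # Needed because the producer omits type headers (spring.json.add.type.headers: false)
        spring.json.value.default.type: com.example.demo.event.ProductCreatedEvent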
    listener:
      ack-mode: record

Key settings:

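  • acks: all makes the producer wait until every in-sync replica has acknowledged the write. It is slower, and it only protects against message loss when the topic is replicated and min.insync.replicas is at least 2.
  • auto-offset-reset: earliest makes a consumer group that has no committed offsets start from the beginning of the topic.
  • spring.json.trusted.packages restricts JSON deserialization to classes in the listed packages, so a message cannot make the consumer instantiate arbitrary types.
  • ack-mode: record commits the offset after the listener has processed each record.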

Event classes

package com.example.demo.event

import java.math.BigDecimal
import java.time.Instant
import java.util.UUID

data class ProductCreatedEvent(
  val productId: UUID,
  val name: String,
  val price: BigDecimal,
  val timestamp: Instant = Instant.now()
)

data class ProductUpdatedEvent(
  val productId: UUID,
  val name: String,
  val price: BigDecimal,
  val timestamp: Instant = Instant.now()
)

data class OrderPlacedEvent(
  val orderId: UUID,
  val userId: UUID,
  val productIds: List<UUID>,
  val totalAmount: BigDecimal,
  val timestamp: Instant = Instant.now()
)

Producer

Basic producer

package com.example.demo.producer

import com.example.demo.event.ProductCreatedEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import java.util.concurrent.CompletableFuture

@Component
class ProductEventProducer(
  private val kafkaTemplate: KafkaTemplate<String, Any>
) {

  private val log = LoggerFactory.getLogger(ProductEventProducer::class.java)

  fun sendProductCreated(event: ProductCreatedEvent) {
    val future = kafkaTemplate.send("product-events", event.productId.toString(), event)

    future.whenComplete { result, ex ->
      if (ex != null) {
        log.error("Failed to send product event: ${event.productId}", ex)
      } else {
        val metadata = result.recordMetadata
        log.info(
          "Sent product event: ${event.productId} to ${metadata.topic()} " +
          "partition=${metadata.partition()} offset=${metadata.offset()}"
        )
      }
    }
  }
}

The key (event.productId.toString()) determines which partition receives the message. Messages with the same key go to the same partition as long as the topic’s partition count does not change, which preserves ordering per product.
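To make the key-to-partition mapping concrete, here is a minimal sketch. It is not Kafka’s actual partitioner: the real client hashes the serialized key bytes with murmur2, while this sketch uses String.hashCode() to stay dependency-free. The function name partitionFor is hypothetical.

```kotlin
// Simplified stand-in for key-based partitioning.
// Assumption: hashCode() replaces the murmur2 hash that the Kafka client
// applies to the serialized key bytes.
fun partitionFor(key: String, partitionCount: Int): Int {
  require(partitionCount > 0) { "partitionCount must be positive" }
  // floorMod keeps the result in 0 until partitionCount, even for negative hashes
  return Math.floorMod(key.hashCode(), partitionCount)
}
```

Calling partitionFor with the same key and the same partition count always returns the same value, which is what gives per-key ordering. Changing the partition count can move a key to a different partition, so the ordering guarantee only holds while the count stays fixed.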

Producing from the service layer

package com.example.demo.service

import com.example.demo.domain.Product
import com.example.demo.dto.CreateProductRequest
import com.example.demo.dto.ProductResponse
import com.example.demo.event.ProductCreatedEvent
import com.example.demo.producer.ProductEventProducer
import com.example.demo.repository.ProductRepository
import org.springframework.stereotype.Service
import org.springframework.transaction.annotation.Transactional

@Service
class ProductService(
  private val productRepository: ProductRepository,
  private val productEventProducer: ProductEventProducer
) {

  @Transactional
  fun create(request: CreateProductRequest): ProductResponse {
    val product = Product(
      name = request.name,
      description = request.description,
      price = request.price,
      stockQuantity = request.stockQuantity
    )
    val saved = productRepository.save(product)

    productEventProducer.sendProductCreated(
      ProductCreatedEvent(
        productId = saved.id!!,
        name = saved.name,
        price = saved.price
      )
    )

    return saved.toResponse()
  }
}

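Note: the Kafka send is not part of the database transaction. In this code it is triggered before the transaction commits and completes asynchronously, so two inconsistencies are possible: the transaction can roll back after the event has already been sent, or the send can fail after the database has committed. For critical flows, use the transactional outbox pattern instead.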
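The outbox idea can be sketched in plain Kotlin. This is an in-memory illustration under stated assumptions, not a production implementation: OutboxRow, OutboxTable and relay are hypothetical names, the list stands in for a database table written in the same transaction as the product, and the send parameter stands in for a blocking KafkaTemplate call.

```kotlin
import java.util.UUID

// Hypothetical outbox row, stored in the same database as the product itself.
data class OutboxRow(
  val id: UUID,
  val topic: String,
  val key: String,
  val payload: String,
  var published: Boolean = false
)

// In-memory stand-in for the outbox table.
class OutboxTable {
  private val rows = mutableListOf<OutboxRow>()
  fun insert(row: OutboxRow) { rows += row }
  fun pending(): List<OutboxRow> = rows.filter { !it.published }
}

// Relay step, run on a schedule: publish pending rows in insertion order and
// mark a row as published only after its send succeeded.
// Returns the number of rows published in this run.
fun relay(
  table: OutboxTable,
  send: (topic: String, key: String, payload: String) -> Unit
): Int {
  var publishedCount = 0
  for (row in table.pending()) {
    try {
      send(row.topic, row.key, row.payload)
      row.published = true
      publishedCount++
    } catch (ex: Exception) {
      // Stop here so the failed row and everything after it stay pending;
      // the next run retries them in the same order.
      break
    }
  }
  return publishedCount
}
```

Because a row is marked only after a successful send, a crash between the send and the update causes the event to be published again on the next run. The pattern therefore gives at-least-once delivery, and consumers need to tolerate duplicates.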

Consumer

Basic consumer

package com.example.demo.consumer

import com.example.demo.event.ProductCreatedEvent
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class ProductEventConsumer {

  private val log = LoggerFactory.getLogger(ProductEventConsumer::class.java)

  @KafkaListener(
    topics = ["product-events"],
    groupId = "inventory-service"
  )
  fun handleProductCreated(event: ProductCreatedEvent) {
    log.info("Received product event: ${event.productId} - ${event.name}")

    // Process the event — update inventory, notify subscribers, etc.
    processEvent(event)
  }

  private fun processEvent(event: ProductCreatedEvent) {
    // Business logic here
    log.info("Processed product event: ${event.productId}")
  }
}

Multiple consumer groups

Different services can consume the same topic independently:

// Inventory service — updates stock
@KafkaListener(topics = ["product-events"], groupId = "inventory-service")
fun handleForInventory(event: ProductCreatedEvent) {
  inventoryService.initializeStock(event.productId)
}

// Search service — updates search index
@KafkaListener(topics = ["product-events"], groupId = "search-service")
fun handleForSearch(event: ProductCreatedEvent) {
  searchService.indexProduct(event.productId, event.name)
}

// Analytics service — records metrics
@KafkaListener(topics = ["product-events"], groupId = "analytics-service")
fun handleForAnalytics(event: ProductCreatedEvent) {
  analyticsService.recordProductCreation(event.productId)
}

Each consumer group gets every message independently. Consumers within the same group share the load (each partition is assigned to one consumer in the group).
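How partitions are shared inside one group can be illustrated with a small sketch. It uses simple round-robin, which is an assumption made for readability; Kafka’s real assignors use different strategies, but they keep the same invariant that each partition has exactly one owner per group. The function name assignPartitions is hypothetical.

```kotlin
// Round-robin illustration of partition ownership within ONE consumer group.
// Every other group would get its own, independent assignment of the same partitions.
fun assignPartitions(partitionCount: Int, consumers: List<String>): Map<String, List<Int>> {
  require(consumers.isNotEmpty()) { "a group needs at least one consumer" }
  val assignment = consumers.associateWith { mutableListOf<Int>() }
  for (partition in 0 until partitionCount) {
    val owner = consumers[partition % consumers.size]
    assignment.getValue(owner).add(partition)
  }
  return assignment
}
```

With 6 partitions and 3 consumers, each consumer owns two partitions. With more consumers than partitions, the extra consumers own nothing and sit idle, which is why the partition count caps the useful parallelism of a group.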

Error handling

Default behavior

By default, if a listener throws an exception, Spring Kafka’s DefaultErrorHandler retries the record 9 times with no delay (10 delivery attempts in total), then logs the error and moves on to the next message. The failed message is effectively lost.
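The retry-then-skip flow can be sketched in plain Kotlin. This models only the control flow, not Spring Kafka’s implementation: deliverWithRetries and its parameters are hypothetical names, and the back-off sleep between attempts is left out.

```kotlin
// Deliver a record once, retry up to maxRetries times on failure, then hand it
// to a recoverer. Returns the total number of delivery attempts.
fun <T> deliverWithRetries(
  record: T,
  maxRetries: Int,
  listener: (T) -> Unit,
  recoverer: (T, Exception) -> Unit
): Int {
  var attempts = 0
  while (true) {
    attempts++
    try {
      listener(record)
      return attempts
    } catch (ex: Exception) {
      if (attempts > maxRetries) {
        // Retries exhausted: log, skip, or publish to a dead letter topic
        recoverer(record, ex)
        return attempts
      }
      // A real error handler would wait for the back-off interval here
    }
  }
}
```

With maxRetries = 9, a listener that always fails is invoked 10 times before the recoverer runs, which corresponds to DefaultErrorHandler’s default FixedBackOff(0, 9). When no recoverer is configured, the record is only logged.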

Custom error handler

package com.example.demo.config

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaConfig {

  @Bean
  fun kafkaListenerContainerFactory(
    consumerFactory: ConsumerFactory<String, Any>,
    kafkaTemplate: KafkaTemplate<String, Any>
  ): ConcurrentKafkaListenerContainerFactory<String, Any> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
    factory.consumerFactory = consumerFactory

    // Retry 3 times with 1 second delay, then send to DLT
    val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate)
    val errorHandler = DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))

    // Don't retry on certain exceptions
    errorHandler.addNotRetryableExceptions(
      com.fasterxml.jackson.core.JsonParseException::class.java,
      IllegalArgumentException::class.java
    )

    factory.setCommonErrorHandler(errorHandler)
    return factory
  }
}

This configuration:

  1. Retries failed messages 3 times with a 1-second delay
  2. After all retries fail, sends the message to a dead letter topic (DLT)
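  3. Skips retries for JSON parsing errors and validation errors, because retrying those won’t help. Deserialization failures only reach the error handler if the consumer’s value deserializer is wrapped in ErrorHandlingDeserializer; otherwise they are thrown inside poll(), before the error handler is involved.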
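For the third point to cover malformed payloads, deserialization failures have to reach the error handler. Spring Kafka provides ErrorHandlingDeserializer for this: it wraps a delegate deserializer and passes failures on to the container. A hedged sketch of the consumer properties follows, written as a plain map so the keys are visible. The function name consumerOverrides is hypothetical, and the default type assumes the topic only carries ProductCreatedEvent.

```kotlin
// Consumer properties that wrap JsonDeserializer in ErrorHandlingDeserializer.
// The keys are the string values behind ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
// ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS and JsonDeserializer's constants.
fun consumerOverrides(): Map<String, Any> = mapOf(
  "value.deserializer" to
    "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer",
  "spring.deserializer.value.delegate.class" to
    "org.springframework.kafka.support.serializer.JsonDeserializer",
  "spring.json.trusted.packages" to "com.example.demo.event",
  // Assumption: every record on the topic is a ProductCreatedEvent
  "spring.json.value.default.type" to "com.example.demo.event.ProductCreatedEvent"
)
```

The same keys can be set in application.yml: the first as value-deserializer, the rest under spring.kafka.consumer.properties. A record that fails to deserialize then surfaces as a DeserializationException, which DefaultErrorHandler treats as not retryable by default, so it goes straight to the recoverer.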

Dead letter topics

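When a message fails all retries, DeadLetterPublishingRecoverer sends it to a topic named {original-topic}.DLT by default, targeting the same partition number as the original record. The dead letter topic should therefore have at least as many partitions as the source topic.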

Custom DLT topic naming

val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
  org.apache.kafka.common.TopicPartition(
    "${record.topic()}-dlq",
    record.partition()
  )
}

DLT consumer for monitoring

@Component
class DeadLetterConsumer {

  private val log = LoggerFactory.getLogger(DeadLetterConsumer::class.java)

  @KafkaListener(
    topics = ["product-events.DLT"],
    groupId = "dlt-monitor"
  )
  fun handleDeadLetter(record: ConsumerRecord<String, Any>) {
    log.error(
      "Dead letter received — topic: ${record.topic()}, " +
      "key: ${record.key()}, value: ${record.value()}, " +
      // Header values may be null, so convert them null-safely
      "headers: ${record.headers().map { "${it.key()}=${it.value()?.let { v -> String(v) }}" }}"
    )

    // Alert, store for manual review, etc.
  }
}

Reprocessing dead letters

When the underlying issue is fixed (bug patched, downstream service recovered), reprocess the DLT:

@Component
class DeadLetterReprocessor(
  private val kafkaTemplate: KafkaTemplate<String, Any>
) {

  fun reprocess(dltTopic: String, targetTopic: String) {
    // Read from DLT and republish to original topic
    // In practice, use a consumer to read and a producer to republish
  }
}

Topic configuration

Create topics programmatically:

package com.example.demo.config

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder

@Configuration
class KafkaTopicConfig {

  @Bean
  fun productEventsTopic(): NewTopic {
    return TopicBuilder.name("product-events")
      .partitions(6)
      .replicas(3)
      .config("retention.ms", "604800000") // 7 days
      .build()
  }

  @Bean
  fun productEventsDltTopic(): NewTopic {
    return TopicBuilder.name("product-events.DLT")
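      .partitions(6) // same count as product-events: the default recoverer targets the original record's partition number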
      .replicas(3)
      .config("retention.ms", "2592000000") // 30 days
      .build()
  }
}

DLT topics need longer retention — you want time to investigate and reprocess failures.

Testing with embedded Kafka

package com.example.demo.consumer

import com.example.demo.event.ProductCreatedEvent
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.context.EmbeddedKafka
import java.math.BigDecimal
import java.time.Duration
import java.util.UUID
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

@SpringBootTest
@EmbeddedKafka(
  partitions = 1,
  topics = ["product-events"],
  // Point Spring Boot at the embedded broker instead of localhost:9092 from application.yml
  bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
class ProductEventConsumerTest {

  @Autowired
  lateinit var kafkaTemplate: KafkaTemplate<String, Any>

  @Test
  fun `consumer processes product event`() {
    val event = ProductCreatedEvent(
      productId = UUID.randomUUID(),
      name = "Test Widget",
      price = BigDecimal("19.99")
    )

    kafkaTemplate.send("product-events", event.productId.toString(), event).get()

    // In a real test, verify the side effect (database update, etc.)
    // with Awaitility or a CountDownLatch instead of a fixed sleep.
    Thread.sleep(2000) // Simplified: gives the listener time to run
  }
}
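The fixed sleep can be replaced by waiting on a latch. The sketch below is plain Kotlin and independent of Spring; RecordingHandler is a hypothetical test double that the listener (or a wrapper around it) would call.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical test double: records what the listener saw and releases the latch.
class RecordingHandler(expectedEvents: Int = 1) {
  private val latch = CountDownLatch(expectedEvents)

  @Volatile
  var lastName: String? = null
    private set

  // Called from the listener thread for each consumed event
  fun handle(name: String) {
    lastName = name
    latch.countDown()
  }

  // Returns true as soon as the expected number of events has arrived,
  // or false if the timeout elapses first.
  fun await(timeoutMs: Long): Boolean = latch.await(timeoutMs, TimeUnit.MILLISECONDS)
}
```

In the test, asserting that handler.await(5000) returns true replaces Thread.sleep(2000): the test finishes as soon as the event arrives and fails with a clear timeout when it never does.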

For more realistic tests, use Testcontainers with a real Kafka broker:

companion object {
  @Container
  @ServiceConnection
  val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))
}

Production checklist

  • Set acks=all on producers for durability
  • Configure idempotence: enable.idempotence=true (the default since Kafka clients 3.0)
  • Set meaningful consumer group IDs per service
  • Configure DLT for every topic
  • Monitor consumer lag — if consumers fall behind, you have a throughput problem
  • Set appropriate partition counts — you can’t reduce partitions after creation
  • Use schema registry (Avro/Protobuf) for production event contracts

Kafka with Spring Boot is straightforward for basic produce/consume flows. The complexity is in error handling, ordering guarantees, and exactly-once semantics. Start with the setup in this post, add monitoring, and evolve from there.