CQRS with Spring Boot, Kafka & MongoDB — Part 4: The Hard Parts
Eventual consistency, idempotent consumers, schema evolution, the outbox problem — the real-world challenges that make CQRS hard and how to handle them.
This is Part 4 of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.
- What is CQRS and why you need it
- Command side — writes done right
- Query side — reads at scale
- The hard parts (this post)
- Putting it all together
Parts 2 and 3 gave us a working system. The command service accepts writes, publishes events to Kafka, and the query service consumes them to build read-optimized projections. It works on your machine. It works in staging.
Then production happens.
This post covers the problems you’ll actually hit. None of them are theoretical. All of them have bitten real systems.
1. Eventual consistency — your query side is always behind
This is the defining trade-off of CQRS. When the command service persists an order and publishes an event, there’s a gap before the query service processes that event and updates its projection.
How big is that gap? In a healthy system, milliseconds to low single-digit seconds. Under load or during consumer rebalances, it can stretch to minutes.
The problem shows up like this: a customer places an order, gets a 201 response, immediately navigates to “My Orders” — and their order isn’t there. They place it again. Now you have two orders.
What to tell users
Be honest in your UI. After a successful command, show a confirmation that doesn’t depend on the query side:
"Your order #12345 has been placed. It will appear in your order history shortly."
This isn’t a hack. It’s accurate. The order has been placed — it’s persisted on the command side. The query side just hasn’t caught up yet.
Strategies to bridge the gap
Optimistic UI — the simplest approach. After the command succeeds, the client adds the new order to its local state immediately. When the query side catches up, the real data replaces the optimistic version. This is what most modern SPAs do anyway.
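The idea fits in a few lines. Here is a minimal sketch with a hypothetical in-memory store standing in for the SPA's client state — the names (`OrderListStore`, `OrderView`, the `pending` flag) are illustrative, not part of the series' code:

```kotlin
// Hypothetical client-side state sketch: show the order immediately,
// then let the query side's answer replace the optimistic copy
data class OrderView(val orderId: String, val status: String, val pending: Boolean)

class OrderListStore(private val fetchFromQueryApi: (String) -> OrderView?) {
    val orders = linkedMapOf<String, OrderView>()

    // Called right after the command returns 201
    fun addOptimistic(orderId: String) {
        orders[orderId] = OrderView(orderId, "PLACED", pending = true)
    }

    // Called once the projection exists (via polling or a WebSocket push)
    fun reconcile(orderId: String) {
        fetchFromQueryApi(orderId)?.let { orders[orderId] = it }
    }
}
```

The user sees their order instantly; the `pending` flag lets the UI render it slightly differently until the query side confirms.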
Polling — the client polls the query API after a write. Simple, works everywhere, but wasteful.
// Client-side pseudo-code for polling
suspend fun waitForOrder(orderId: String, maxAttempts: Int = 10): Order? {
repeat(maxAttempts) { attempt ->
val order = queryApi.getOrder(orderId)
if (order != null) return order
delay(500L * (attempt + 1)) // Back off
}
return null
}
WebSocket notifications — the query service pushes an update when the projection is ready. More complex to set up, but gives real-time feedback.
@Configuration
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun configureMessageBroker(config: MessageBrokerRegistry) {
config.enableSimpleBroker("/topic")
config.setApplicationDestinationPrefixes("/app")
}
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
// setAllowedOriginPatterns rather than setAllowedOrigins("*"), which
// Spring rejects when credentials are involved (as with SockJS transports)
registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS()
}
}
After the query service updates a projection, it sends a notification:
@Component
class OrderProjectionNotifier(
private val messagingTemplate: SimpMessagingTemplate
) {
fun notifyOrderUpdated(orderId: String) {
messagingTemplate.convertAndSend(
"/topic/orders/$orderId",
mapOf("status" to "ready", "orderId" to orderId)
)
}
}
Measuring lag
You can’t fix what you can’t measure. Track consumer lag — the difference between the latest offset in the Kafka topic and the offset your consumer group has committed.
@Component
class ConsumerLagMetrics(
    private val meterRegistry: MeterRegistry,
    private val adminClient: AdminClient
) {
    // Micrometer gauges hold weak references to their backing value, so keep
    // strong references here and register each partition's gauge only once —
    // re-registering with a fresh number every tick would report nothing
    private val lagByPartition = ConcurrentHashMap<Int, AtomicLong>()

    @Scheduled(fixedRate = 10_000)
    fun recordLag() {
        val groupOffsets = adminClient
            .listConsumerGroupOffsets("order-query-group")
            .partitionsToOffsetAndMetadata()
            .get()
        val endOffsets = adminClient
            .listOffsets(groupOffsets.keys.associateWith { OffsetSpec.latest() })
            .all()
            .get()
        groupOffsets.forEach { (partition, offsetAndMetadata) ->
            val lag = endOffsets.getValue(partition).offset() - offsetAndMetadata.offset()
            lagByPartition.computeIfAbsent(partition.partition()) { p ->
                meterRegistry.gauge(
                    "kafka.consumer.lag",
                    Tags.of("partition", p.toString()),
                    AtomicLong(0)
                )!!
            }.set(lag)
        }
    }
}
Alert when lag exceeds a threshold. In our order system, if lag exceeds 30 seconds, someone should look at it.
2. Idempotent consumers — Kafka delivers at-least-once
With standard consumer configuration, Kafka delivers at-least-once. That means your consumer will see the same event more than once: network hiccups, consumer rebalances, and offset commit failures all cause redelivery.
What happens when your consumer processes OrderPlaced twice? If your projection handler does an insert, you get a duplicate order in the query database. If it increments a counter, your order count is wrong.
Strategy 1: Idempotent operations with upserts
Design your projection updates to be naturally idempotent. Use upserts instead of inserts.
@Component
class OrderProjectionHandler(
private val mongoTemplate: MongoTemplate
) {
fun handleOrderPlaced(event: OrderPlacedEvent) {
val query = Query.query(Criteria.where("orderId").`is`(event.orderId))
val update = Update()
.set("orderId", event.orderId)
.set("customerId", event.customerId)
.set("items", event.items)
.set("totalAmount", event.totalAmount)
.set("status", "PLACED")
.set("placedAt", event.timestamp)
.set("lastEventId", event.eventId)
mongoTemplate.upsert(query, update, OrderProjection::class.java)
}
}
Processing the same OrderPlaced event twice results in the same document. No duplicates, no corruption.
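The property is easy to sanity-check in isolation. This toy model uses an in-memory map standing in for the MongoDB collection — not the real driver, just the upsert semantics:

```kotlin
// Toy model of upsert semantics: a keyed write is idempotent, an append is not
class ProjectionStore {
    val documents = mutableMapOf<String, Map<String, Any>>()

    fun upsertByOrderId(doc: Map<String, Any>) {
        // Same key, same document — replaying the write changes nothing
        documents[doc.getValue("orderId") as String] = doc
    }
}
```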
Strategy 2: Event ID tracking
For operations that aren’t naturally idempotent — like sending a notification or calling an external API — track which events you’ve already processed.
@Document("processed_events")
data class ProcessedEvent(
@Id val eventId: String,
val processedAt: Instant
)
@Component
class IdempotentEventHandler(
private val processedEventRepository: ProcessedEventRepository,
private val projectionHandler: OrderProjectionHandler
) {
fun handle(event: OrderEvent) {
// Check if already processed
if (processedEventRepository.existsById(event.eventId)) {
log.info("Event ${event.eventId} already processed, skipping")
return
}
// Process the event
projectionHandler.handleEvent(event)
// Mark as processed
processedEventRepository.save(
ProcessedEvent(
eventId = event.eventId,
processedAt = Instant.now()
)
)
}
companion object {
private val log = LoggerFactory.getLogger(IdempotentEventHandler::class.java)
}
}
This has a race condition — two threads could both check, find nothing, and both process. The fix is a uniqueness constraint, and you already have one: since eventId is the document's @Id, it maps to MongoDB's _id, which carries a unique index by default. No extra @CompoundIndex is needed. The catch is that save() silently replaces an existing document — use insert(), which throws DuplicateKeyException on a duplicate:
fun handle(event: OrderEvent) {
    try {
        // insert(), not save(): _id is unique, so a duplicate key throws
        processedEventRepository.insert(
            ProcessedEvent(eventId = event.eventId, processedAt = Instant.now())
        )
    } catch (e: DuplicateKeyException) {
        log.info("Event ${event.eventId} already processed, skipping")
        return
    }
    projectionHandler.handleEvent(event)
}
Claiming the event ID before processing means a crash mid-processing strands a marker for work that never happened — so pair this with the idempotent projection updates from Strategy 1, or delete the marker when processing fails.
Strategy 3: Make your events carry enough context
Include enough information in each event to compute the final state, not a delta. Instead of OrderTotalChanged(delta = +5.00), use OrderTotalChanged(newTotal = 25.00). Applying the same absolute value twice is safe. Applying the same delta twice is not.
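The difference is mechanical enough to show in a few lines (toy types, not from the series' event model):

```kotlin
// Redelivery safety: absolute values converge, deltas drift
data class TotalChangedAbsolute(val newTotal: Double)
data class TotalChangedDelta(val delta: Double)

// Applying an absolute value is idempotent — the result ignores current state
fun applyAbsolute(current: Double, e: TotalChangedAbsolute): Double = e.newTotal

// Applying a delta is not — each redelivery compounds the change
fun applyDelta(current: Double, e: TotalChangedDelta): Double = current + e.delta
```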
3. Consumer failure and replay
Your query service crashes mid-batch. It processed 8 out of 10 events in a poll, then died. What happens to those 10 events?
It depends on your offset commit strategy.
Auto-commit (the default, and usually wrong)
With enable.auto.commit=true, Kafka commits offsets periodically in the background. If your consumer crashes after auto-commit but before finishing processing, those events are gone — skipped. If it crashes before auto-commit, you’ll reprocess everything since the last commit.
Manual commit after processing
The safer approach. Commit offsets only after you’ve successfully processed the batch.
@Component
class OrderEventConsumer(
private val projectionHandler: OrderProjectionHandler
) {
@KafkaListener(
topics = ["order-events"],
groupId = "order-query-group",
properties = ["enable.auto.commit=false"]
)
fun consume(
records: List<ConsumerRecord<String, String>>,
acknowledgment: Acknowledgment
) {
records.forEach { record ->
val event = deserialize(record.value())
projectionHandler.handleEvent(event)
}
// Commit only after all records are processed
acknowledgment.acknowledge()
}
}
Configure the container for manual acknowledgment:
@Configuration
class KafkaConsumerConfig {
@Bean
fun kafkaListenerContainerFactory(
consumerFactory: ConsumerFactory<String, String>
): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory
factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
factory.isBatchListener = true
return factory
}
}
If the consumer crashes mid-batch, those events will be redelivered on restart. Combined with idempotent consumers from the previous section, this is safe.
Dead letter topics
Some events are poison pills — they’ll never process successfully no matter how many times you retry. A malformed event, a bug in your handler, a missing reference. Without a dead letter topic, your consumer gets stuck in an infinite retry loop.
This replaces the factory bean from the previous section — it keeps manual acknowledgment and batch mode, and adds the error handler:
@Configuration
class KafkaConsumerConfig {
    @Bean
    fun kafkaListenerContainerFactory(
        consumerFactory: ConsumerFactory<String, String>,
        kafkaTemplate: KafkaTemplate<String, String>
    ): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
        factory.consumerFactory = consumerFactory
        factory.containerProperties.ackMode = ContainerProperties.AckMode.MANUAL
        factory.isBatchListener = true
        // After retries are exhausted, route the failing record to the DLT,
        // preserving its original partition
        val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { record, _ ->
            TopicPartition("order-events.DLT", record.partition())
        }
        factory.setCommonErrorHandler(
            DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3L))
        )
        return factory
    }
}
Three retries with 1-second backoff, then the event goes to order-events.DLT. Monitor the dead letter topic. Alert on it. Have a process to investigate and replay dead letters after fixing the root cause.
4. Schema evolution — events are forever
You deployed version 1 with this event:
data class OrderPlacedEvent(
val eventId: String,
val orderId: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
val timestamp: Instant
)
Now the business wants to track the order source — was it from the website, mobile app, or API? You need to add a source field.
The problem: your Kafka topic still has thousands of old events without source. If you replay your consumer from the beginning to rebuild projections (a legitimate operation), it needs to handle both old and new events.
Backward-compatible changes
Add new fields as optional with sensible defaults:
data class OrderPlacedEvent(
val eventId: String,
val orderId: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
val timestamp: Instant,
val source: String? = null // New field, defaults to null
)
Old events deserialize fine — source is null. New events have the value. Your projection handler deals with both:
fun handleOrderPlaced(event: OrderPlacedEvent) {
val update = Update()
.set("orderId", event.orderId)
.set("customerId", event.customerId)
.set("items", event.items)
.set("totalAmount", event.totalAmount)
.set("status", "PLACED")
.set("source", event.source ?: "UNKNOWN")
mongoTemplate.upsert(
Query.query(Criteria.where("orderId").`is`(event.orderId)),
update,
OrderProjection::class.java
)
}
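The deserialization behavior can be sanity-checked in isolation. This stand-in deserializer reads from a field map; a real JSON deserializer (e.g. Jackson with its Kotlin module) applies the same rule — a missing field falls back to the declared default. The event class is trimmed to the two relevant fields:

```kotlin
// Trimmed event: one original field, one optional addition
data class OrderPlacedEvent(
    val orderId: String,
    val source: String? = null
)

// Stand-in for a JSON deserializer over a map of raw fields
fun deserialize(fields: Map<String, String>): OrderPlacedEvent = OrderPlacedEvent(
    orderId = fields.getValue("orderId"),
    source = fields["source"] // absent in old events → null
)
```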
What breaks backward compatibility
- Removing a field
- Renaming a field
- Changing a field’s type
If you must make breaking changes, version your events:
// V1 — original
data class OrderPlacedEventV1(
val eventId: String,
val orderId: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
val timestamp: Instant
)
// V2 — breaking change: items structure changed
data class OrderPlacedEventV2(
val eventId: String,
val orderId: String,
val customerId: String,
val lineItems: List<LineItem>, // Different structure
val totalAmount: BigDecimal,
val currency: String, // New required field
val timestamp: Instant
)
Include a version field or use different event type names. Your consumer needs to handle both:
fun handleEvent(eventType: String, payload: String) {
when (eventType) {
"OrderPlaced.v1" -> handleOrderPlacedV1(deserializeV1(payload))
"OrderPlaced.v2" -> handleOrderPlacedV2(deserializeV2(payload))
else -> log.warn("Unknown event type: $eventType")
}
}
This is ugly. This is also reality. Events in Kafka are immutable. You can’t go back and rewrite them. Design your events carefully from the start, and prefer additive changes whenever possible.
5. Ordering guarantees — partitions matter
Kafka guarantees ordering within a partition, not across partitions. If your order-events topic has 6 partitions and you publish without a message key, events for the same order can land on different partitions.
That means OrderPlaced might be processed after OrderShipped for the same order. Your projection is now broken.
Partition by entity ID
Always set the message key to the entity ID that requires ordering:
@Component
class OrderEventPublisher(
private val kafkaTemplate: KafkaTemplate<String, String>
) {
fun publish(event: OrderEvent) {
kafkaTemplate.send(
"order-events",
event.orderId, // Key — Kafka hashes this to pick a partition
serialize(event)
)
}
}
All events for order ORD-123 go to the same partition. Kafka guarantees they’re consumed in order.
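The property that matters is stability: any deterministic hash of the key sends every event for one order to the same partition. Kafka's default partitioner uses murmur2; plain `hashCode` below is a stand-in that illustrates the same guarantee:

```kotlin
// Stand-in for key-based partitioning: deterministic hash, modulo partition count
// (Kafka's real partitioner uses murmur2, but the stability property is identical)
fun partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode() and Int.MAX_VALUE) % numPartitions
```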
The cross-entity ordering problem
What if you need to process all events for a customer in order? A customer might have multiple orders, each partitioned by order ID — potentially on different partitions processed by different consumer instances.
There’s no clean solution. Options:
- Partition by customer ID instead of order ID. Customer events are now ordered, but a high-traffic customer concentrates load on a single partition, serializing all of their orders behind each other.
- Accept eventual consistency at the customer level. The customer dashboard might briefly show stale data. For most use cases, this is fine.
- Use a single partition. Guarantees global ordering. Kills throughput. Almost never the right answer.
Pick your ordering granularity based on your domain. For an order management system, ordering by orderId is almost always correct.
6. Monitoring and observability
A CQRS system has more moving parts than a monolith. If you can’t see what’s happening, you can’t operate it.
Consumer lag
The most important metric. It tells you how far behind the query side is.
# application.yml for the query service
management:
endpoints:
web:
exposure:
include: health,metrics,prometheus
metrics:
tags:
application: order-query-service
Spring Boot auto-binds the Kafka client's own metrics through Micrometer — including kafka.consumer.fetch.manager.records.lag — so you can scrape them with Prometheus and visualize them in Grafana.
Alert thresholds:
- Warning at 1,000 messages behind — something is slow
- Critical at 10,000 messages behind — something is broken
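Those thresholds translate into alert rules in whatever alerting stack you run. As a sketch, hypothetical Prometheus rules might look like this — the metric name depends on how your exporter flattens `kafka.consumer.lag`, so treat `kafka_consumer_lag` as an assumed name, not a given:

```yaml
# Hypothetical Prometheus alerting rules for the thresholds above
groups:
  - name: cqrs-query-side
    rules:
      - alert: ConsumerLagWarning
        expr: sum(kafka_consumer_lag) > 1000
        for: 5m
        labels:
          severity: warning
      - alert: ConsumerLagCritical
        expr: sum(kafka_consumer_lag) > 10000
        for: 5m
        labels:
          severity: critical
```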
Projection freshness
Track when each projection was last updated:
@Component
class ProjectionFreshnessMetrics(
    private val mongoTemplate: MongoTemplate,
    meterRegistry: MeterRegistry
) {
    // Register the gauge once against a strongly held AtomicLong; Micrometer
    // gauges hold weak references, so re-registering with a fresh value each
    // tick would be garbage-collected and report nothing
    private val freshnessSeconds: AtomicLong =
        meterRegistry.gauge("projection.freshness.seconds", AtomicLong(0))!!

    @Scheduled(fixedRate = 30_000)
    fun recordFreshness() {
        val latestProjection = mongoTemplate.findOne(
            Query().with(Sort.by(Sort.Direction.DESC, "lastUpdatedAt")).limit(1),
            OrderProjection::class.java
        )
        latestProjection?.let {
            freshnessSeconds.set(Duration.between(it.lastUpdatedAt, Instant.now()).seconds)
        }
    }
}
If this number keeps climbing, your consumer is stuck or dead.
Health checks
Add a custom health indicator that checks both Kafka connectivity and consumer group status:
@Component
class KafkaConsumerHealthIndicator(
private val adminClient: AdminClient
) : HealthIndicator {
override fun health(): Health {
return try {
val description = adminClient
.describeConsumerGroups(listOf("order-query-group"))
.describedGroups()["order-query-group"]
?.get(5, TimeUnit.SECONDS)
if (description?.state() == ConsumerGroupState.STABLE) {
Health.up()
.withDetail("state", description.state().name)
.withDetail("members", description.members().size)
.build()
} else {
Health.down()
.withDetail("state", description?.state()?.name ?: "UNKNOWN")
.build()
}
} catch (e: Exception) {
Health.down(e).build()
}
}
}
7. The outbox problem
This is the sneakiest failure mode in CQRS. The command service does two things: write to MongoDB and publish to Kafka. What if the MongoDB write succeeds but the Kafka publish fails?
The order is saved. The event is never published. The query side never knows about it. The order exists in the command database but is invisible to every read consumer.
The reverse is also bad. Kafka publish succeeds, MongoDB write fails (maybe due to a timeout). The query side processes an event for an order that doesn’t exist.
Why this happens
MongoDB and Kafka are two separate systems. You can’t do an atomic transaction across both. This is the dual-write problem.
// THIS IS BROKEN — dual write
fun placeOrder(command: PlaceOrderCommand): Order {
val order = Order.from(command)
orderRepository.save(order) // Step 1: DB write — succeeds
// Application crashes here
kafkaTemplate.send("order-events", order.id, serialize(OrderPlacedEvent(order)))
// Step 2: Kafka publish — never happens
return order
}
Solution: Transactional outbox pattern
Instead of publishing directly to Kafka, write the event to an outbox collection in the same MongoDB transaction as the order. A separate process reads the outbox and publishes to Kafka.
@Service
class OrderCommandService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository
) {
    // Both writes run in one MongoDB transaction. Multi-document transactions
    // require a replica set and a MongoTransactionManager bean (see below)
    @Transactional
    fun placeOrder(command: PlaceOrderCommand): Order {
        val order = Order.from(command)
        orderRepository.save(order)
        outboxRepository.save(
            OutboxEntry(
                id = UUID.randomUUID().toString(),
                aggregateId = order.id,
                eventType = "OrderPlaced",
                payload = serialize(OrderPlacedEvent.from(order)),
                createdAt = Instant.now(),
                published = false
            )
        )
        return order
    }
}
Spring Data MongoDB doesn't enable transactions until you register the transaction manager:
@Configuration
class MongoTxConfig {
    @Bean
    fun transactionManager(dbFactory: MongoDatabaseFactory): MongoTransactionManager =
        MongoTransactionManager(dbFactory)
}
@Document("outbox")
data class OutboxEntry(
@Id val id: String,
val aggregateId: String,
val eventType: String,
val payload: String,
val createdAt: Instant,
val published: Boolean
)
Now the order and the event are written atomically. Either both succeed or both fail.
Publishing from the outbox
Two approaches:
Polling publisher — a scheduled job reads unpublished entries and sends them to Kafka.
@Component
class OutboxPublisher(
private val outboxRepository: OutboxRepository,
private val kafkaTemplate: KafkaTemplate<String, String>
) {
@Scheduled(fixedDelay = 500)
fun publishPending() {
val pending = outboxRepository.findByPublishedFalseOrderByCreatedAtAsc()
pending.forEach { entry ->
try {
kafkaTemplate.send("order-events", entry.aggregateId, entry.payload).get()
outboxRepository.save(entry.copy(published = true))
} catch (e: Exception) {
log.error("Failed to publish outbox entry ${entry.id}", e)
// Will retry on next poll
}
}
}
companion object {
private val log = LoggerFactory.getLogger(OutboxPublisher::class.java)
}
}
Simple, reliable, but adds latency (up to fixedDelay milliseconds). Good enough for most systems.
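Note one subtlety: the polling publisher is itself at-least-once. It can send to Kafka and crash before marking the entry published, in which case the next poll sends the event again — another reason downstream consumers must stay idempotent. A toy run (plain Kotlin stand-ins, no Kafka) shows the failure mode:

```kotlin
// Simulate: publish succeeds, marking fails, next poll republishes
data class Entry(val id: String, var published: Boolean = false)

fun poll(outbox: List<Entry>, sent: MutableList<String>, crashBeforeMark: Boolean) {
    for (entry in outbox.filter { !it.published }) {
        sent += entry.id            // Kafka send succeeds
        if (crashBeforeMark) return // crash before published = true is saved
        entry.published = true
    }
}
```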
MongoDB change streams — MongoDB can notify you when documents are inserted into the outbox collection. Near real-time, but more complex to operate.
@Component
class OutboxChangeStreamPublisher(
    private val mongoTemplate: MongoTemplate,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @PostConstruct
    fun startWatching() {
        // Raw driver pipeline (com.mongodb.client.model.Aggregates/Filters):
        // only react to inserts into the outbox collection
        val pipeline = listOf(
            Aggregates.match(Filters.eq("operationType", "insert"))
        )
        Thread {
            mongoTemplate.getCollection("outbox")
                .watch(pipeline)
                .forEach { change ->
                    val doc = change.fullDocument ?: return@forEach
                    kafkaTemplate.send(
                        "order-events",
                        doc.getString("aggregateId"),
                        doc.getString("payload")
                    ).get()
                }
        }.apply {
            isDaemon = true
            name = "outbox-change-stream"
            start()
        }
    }
}
Change streams require a MongoDB replica set. If you’re already running a replica set (you should be for production), this is a natural fit.
Clean up the outbox
Published entries are dead weight. Schedule a cleanup:
@Scheduled(fixedRate = 3_600_000) // Every hour
fun cleanupPublished() {
val cutoff = Instant.now().minus(Duration.ofHours(24))
outboxRepository.deleteByPublishedTrueAndCreatedAtBefore(cutoff)
}
Keep them for 24 hours so you can debug issues, then delete.
The common thread
Every problem in this post boils down to one thing: distributed systems don’t give you atomicity for free. CQRS splits your system across multiple services and data stores. That’s the point — it gives you independent scalability and optimized models. But it means you have to handle the gaps explicitly.
The good news: these are solved problems. Idempotent consumers, transactional outbox, manual offset management, dead letter topics — these patterns exist because thousands of production systems have hit these exact issues and built solutions.
The bad news: you have to actually implement them. They don’t come for free.
What’s next
In Part 5, we’ll put everything together. Docker Compose with MongoDB, Kafka, and both services. A full end-to-end walkthrough from placing an order to seeing it on the dashboard. Integration tests with Testcontainers. And a production readiness checklist so you know what to have in place before deploying CQRS for real.