CQRS with Spring Boot, Kafka & MongoDB — Part 3: Query Side — Reads at Scale
Building the query service — consuming Kafka events, building read-optimized projections, and serving fast queries from MongoDB.
This is Part 3 of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.
- What is CQRS and why you need it
- Command side — writes done right
- Query side — reads at scale (this post)
- The hard parts
- Putting it all together
A separate service, not a module
The query service is a completely separate Spring Boot application. Different codebase. Different deployment. Different MongoDB database (or at least different collections). It doesn’t share code with the command service — not even domain classes.
Why this strict separation? Because the read model and write model serve different purposes. The moment you share an Order class between them, you’ve coupled their evolution. The write model needs to change for business rules. The read model needs to change for UI requirements. Shared code means one change breaks both.
┌──────────┐ ┌──────────────────────────────────────┐
│ │ │ Query Service │
│ Kafka │───────────────────────▶│ Consumer → Projection → MongoDB │
│ (events) │ │ │
└──────────┘ └──────────────┬───────────────────────┘
│
┌──────────────▼───────────────────────┐
│ REST API (reads only) │
│ GET /orders/{id} │
│ GET /orders?customerId=X │
│ GET /orders/stats │
└──────────────┬───────────────────────┘
│
┌──────────────▼───────────────────────┐
│ Client (dashboard) │
└──────────────────────────────────────┘
Project setup
Same Spring Initializr recipe as the command service. Spring Web, Spring Data MongoDB, Spring for Apache Kafka.
application.yml for the query service:
server:
  port: 8081
spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders_read?readPreference=secondaryPreferred
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-query-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.orders.command.event"
        spring.json.use.type.headers: true
Port 8081 because the command service is on 8080. In production they’d be on separate hosts, but for local development you need different ports.
The important bits:
- readPreference=secondaryPreferred — reads go to secondary nodes when available, falling back to the primary.
- group-id: order-query-service — the Kafka consumer group. All instances of the query service share this group and split partition processing.
- auto-offset-reset: earliest — if this is a fresh consumer group, start from the beginning of the topic. You want every event, not just new ones.
Read-optimized projections
The command service stores an Order with IDs referencing products and customers. The query service stores an OrderView — a denormalized document ready to serve the UI without any joins.
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.index.Indexed
import org.springframework.data.mongodb.core.mapping.Document
import java.math.BigDecimal
import java.time.Instant
@Document(collection = "order_views")
data class OrderView(
@Id val orderId: String,
@Indexed val customerId: String,
val items: List<OrderItemView>,
val shippingAddress: AddressView,
val status: String,
val totalAmount: BigDecimal,
val itemCount: Int,
val createdAt: Instant,
val updatedAt: Instant,
val cancelledAt: Instant? = null,
val cancellationReason: String? = null
)
data class OrderItemView(
val productId: String,
val productName: String,
val quantity: Int,
val unitPrice: BigDecimal,
val lineTotal: BigDecimal
)
data class AddressView(
val street: String,
val city: String,
val state: String,
val zipCode: String,
val country: String
)
Compare this to the write model:
- itemCount — pre-computed so the dashboard doesn’t count items on every render.
- lineTotal on each item — pre-computed quantity * unitPrice so the UI doesn’t do math.
- customerId is indexed — the dashboard queries by customer. The write model might not need this index.
- cancellationReason and cancelledAt — flattened into the document instead of being in a separate audit log.
- status is a String, not an enum — the query side doesn’t need to validate status values. It just stores whatever the event says.
This is the power of separate read models. Shape the data for the consumer, not for the business rules.
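If the customer dashboard also sorts a customer's orders by date, it can be worth taking the indexing one step further with a compound index. This is a hedged sketch of our own, not code from the series; the trimmed class stands in for the real OrderView, where the same annotation would sit next to @Document:

```kotlin
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.index.CompoundIndex
import org.springframework.data.mongodb.core.mapping.Document
import java.time.Instant

// Trimmed illustration: the compound index serves "orders for customer X,
// newest first" directly from the index, with no in-memory sort.
@Document(collection = "order_views")
@CompoundIndex(name = "customer_created", def = "{'customerId': 1, 'createdAt': -1}")
data class IndexedOrderView(
    @Id val orderId: String,
    val customerId: String,
    val createdAt: Instant
)
```

Whether this earns its keep depends on the query pattern; an index on customerId alone is enough if the UI never sorts.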
MongoDB read preference — secondaryPreferred
In Part 2, the command service wrote with w: majority to ensure durability. Here, the query service reads from secondaries.
Why secondaries? Because the query service handles the bulk of the traffic. Dashboard pages, search results, reports. All reads. Directing this traffic to secondaries keeps the primary free for writes.
secondaryPreferred means: read from a secondary if one is available. If all secondaries are down, read from the primary. It’s a graceful fallback.
The tradeoff is eventual consistency. A write to the primary might not be replicated to secondaries for a few milliseconds. In CQRS, this is already expected — the query side is behind the command side by the time it takes to publish an event, consume it, and update the projection. A few extra milliseconds of replication lag is negligible.
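The URI parameter is the simplest way to set the read preference, but the Mongo driver also accepts it programmatically, which can be easier to vary per environment. A sketch, with the configuration class and bean name being our own invention rather than code from the series:

```kotlin
import com.mongodb.ConnectionString
import com.mongodb.MongoClientSettings
import com.mongodb.ReadPreference
import com.mongodb.client.MongoClient
import com.mongodb.client.MongoClients
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class MongoReadConfig {

    // Equivalent to readPreference=secondaryPreferred in the connection string
    @Bean
    fun mongoClient(): MongoClient {
        val settings = MongoClientSettings.builder()
            .applyConnectionString(ConnectionString("mongodb://localhost:27017/orders_read"))
            .readPreference(ReadPreference.secondaryPreferred())
            .build()
        return MongoClients.create(settings)
    }
}
```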
Consuming events — the Kafka listener
The consumer receives events from the order-events topic and updates projections.
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
@Component
class OrderEventConsumer(
private val projectionService: OrderProjectionService
) {
private val log = LoggerFactory.getLogger(OrderEventConsumer::class.java)
@KafkaListener(
topics = ["order-events"],
groupId = "order-query-service"
)
fun consume(record: ConsumerRecord<String, OrderEvent>) {
val event = record.value()
log.info("Received event: ${event::class.simpleName} for order ${event.orderId}")
try {
when (event) {
is OrderEvent.OrderPlaced -> projectionService.onOrderPlaced(event)
is OrderEvent.ShippingAddressUpdated -> projectionService.onShippingAddressUpdated(event)
is OrderEvent.OrderCancelled -> projectionService.onOrderCancelled(event)
}
} catch (e: Exception) {
log.error("Failed to process event for order ${event.orderId}", e)
throw e
}
}
}
Note that we re-throw the exception. By default, Spring Kafka retries a failed record a few times and then logs and skips it; with a DeadLetterPublishingRecoverer configured, it publishes the record to a dead letter topic instead. Either way, we want failures to be visible, not silently swallowed.
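Wiring up that dead letter topic takes only a few lines. A hedged sketch using Spring Kafka's DefaultErrorHandler (the class and bean names are ours; the recoverer's default convention names the DLT after the source topic with a .DLT suffix):

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaErrorHandlingConfig {

    // Retry each failed record three times, one second apart, then publish it
    // to order-events.DLT via the recoverer.
    @Bean
    fun errorHandler(template: KafkaTemplate<Any, Any>): DefaultErrorHandler {
        val recoverer = DeadLetterPublishingRecoverer(template)
        return DefaultErrorHandler(recoverer, FixedBackOff(1000L, 3))
    }
}
```

Recent Spring Boot versions pick up a CommonErrorHandler bean like this automatically for annotated listeners; on older setups you attach it to the listener container factory yourself.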
Building projections from events
The projection service translates events into read model updates.
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.time.Instant
@Service
class OrderProjectionService(
private val orderViewRepository: OrderViewRepository
) {
private val log = LoggerFactory.getLogger(OrderProjectionService::class.java)
fun onOrderPlaced(event: OrderEvent.OrderPlaced) {
val orderView = OrderView(
orderId = event.orderId,
customerId = event.customerId,
items = event.items.map { item ->
OrderItemView(
productId = item.productId,
productName = item.productName,
quantity = item.quantity,
unitPrice = item.unitPrice,
lineTotal = item.unitPrice.multiply(BigDecimal(item.quantity))
)
},
shippingAddress = AddressView(
street = event.shippingAddress.street,
city = event.shippingAddress.city,
state = event.shippingAddress.state,
zipCode = event.shippingAddress.zipCode,
country = event.shippingAddress.country
),
status = "PLACED",
totalAmount = event.totalAmount,
itemCount = event.items.sumOf { it.quantity },
createdAt = event.occurredAt,
updatedAt = event.occurredAt
)
orderViewRepository.save(orderView)
log.info("Projection created for order ${event.orderId}")
}
fun onShippingAddressUpdated(event: OrderEvent.ShippingAddressUpdated) {
val existing = orderViewRepository.findById(event.orderId).orElse(null)
if (existing == null) {
log.warn("OrderView not found for address update: ${event.orderId}")
return
}
val updated = existing.copy(
shippingAddress = AddressView(
street = event.newAddress.street,
city = event.newAddress.city,
state = event.newAddress.state,
zipCode = event.newAddress.zipCode,
country = event.newAddress.country
),
updatedAt = event.occurredAt
)
orderViewRepository.save(updated)
log.info("Projection updated for order ${event.orderId} — address changed")
}
fun onOrderCancelled(event: OrderEvent.OrderCancelled) {
val existing = orderViewRepository.findById(event.orderId).orElse(null)
if (existing == null) {
log.warn("OrderView not found for cancellation: ${event.orderId}")
return
}
val updated = existing.copy(
status = "CANCELLED",
cancelledAt = event.occurredAt,
cancellationReason = event.reason,
updatedAt = event.occurredAt
)
orderViewRepository.save(updated)
log.info("Projection updated for order ${event.orderId} — cancelled")
}
}
Each event handler does exactly one thing — update the read model. No business logic. No validation. No side effects. If the event says the order was cancelled, the projection gets a CANCELLED status. The command side already validated that the cancellation was allowed.
Notice the null checks. If we receive a ShippingAddressUpdated event but the OrderView doesn’t exist yet, we log a warning and skip. This can happen if events arrive out of order (unlikely when all of an order’s events are keyed to the same partition, but defensive coding costs nothing).
The repository:
import org.springframework.data.mongodb.repository.MongoRepository
interface OrderViewRepository : MongoRepository<OrderView, String> {
fun findByCustomerId(customerId: String): List<OrderView>
fun findByStatus(status: String): List<OrderView>
}
Spring Data generates the queries from method names.
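As a customer's order history grows, you will likely want pagination rather than an unbounded list. Spring Data derives paged queries the same way; a sketch, with the repository name being illustrative:

```kotlin
import org.springframework.data.domain.Page
import org.springframework.data.domain.PageRequest
import org.springframework.data.domain.Pageable
import org.springframework.data.domain.Sort
import org.springframework.data.mongodb.repository.MongoRepository

interface PagedOrderViewRepository : MongoRepository<OrderView, String> {
    // Same derived-query mechanism, but returns one page plus paging metadata
    fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderView>
}

// Usage: first page of 20 orders, newest first
// val page = repository.findByCustomerId(
//     "cust-1",
//     PageRequest.of(0, 20, Sort.by(Sort.Direction.DESC, "createdAt"))
// )
```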
REST endpoints — reads only
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/orders")
class OrderQueryController(
private val orderViewRepository: OrderViewRepository,
private val statsService: OrderStatsService
) {
@GetMapping("/{orderId}")
fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderView> {
val order = orderViewRepository.findById(orderId).orElse(null)
?: return ResponseEntity.notFound().build()
return ResponseEntity.ok(order)
}
@GetMapping
fun getOrdersByCustomer(@RequestParam customerId: String): ResponseEntity<List<OrderView>> {
val orders = orderViewRepository.findByCustomerId(customerId)
return ResponseEntity.ok(orders)
}
@GetMapping("/stats")
fun getOrderStats(): ResponseEntity<OrderStats> {
return ResponseEntity.ok(statsService.getStats())
}
}
Three endpoints. Three different use cases:
- GET /orders/{id} — single order detail for a customer viewing their order.
- GET /orders?customerId=X — customer dashboard showing all their orders.
- GET /orders/stats — operations dashboard showing aggregate numbers.
No POST. No PUT. No DELETE. This service only reads.
Different read models for different consumers
The OrderView works for the customer dashboard. But the operations team needs different data — total revenue, order counts by status, average order value. They don’t care about individual shipping addresses.
Instead of bending OrderView to serve both, build a separate projection.
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import java.math.BigDecimal
import java.time.LocalDate
@Document(collection = "order_stats_daily")
data class DailyOrderStats(
@Id val date: LocalDate,
val totalOrders: Long,
val totalRevenue: BigDecimal,
val cancelledOrders: Long,
val averageOrderValue: BigDecimal,
val statusBreakdown: Map<String, Long>
)
The stats service aggregates from this collection:
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.aggregation.Aggregation
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.math.RoundingMode
import java.time.LocalDate
@Service
class OrderStatsService(
private val mongoTemplate: MongoTemplate,
private val orderViewRepository: OrderViewRepository
) {
fun getStats(): OrderStats {
val allOrders = orderViewRepository.findAll()
val totalOrders = allOrders.size.toLong()
val totalRevenue = allOrders.sumOf { it.totalAmount }
val cancelledOrders = allOrders.count { it.status == "CANCELLED" }.toLong()
val averageOrderValue = if (totalOrders > 0) {
totalRevenue.divide(BigDecimal(totalOrders), 2, RoundingMode.HALF_UP)
} else {
BigDecimal.ZERO
}
val statusBreakdown = allOrders.groupBy { it.status }
.mapValues { it.value.size.toLong() }
return OrderStats(
totalOrders = totalOrders,
totalRevenue = totalRevenue,
cancelledOrders = cancelledOrders,
averageOrderValue = averageOrderValue,
statusBreakdown = statusBreakdown
)
}
}
data class OrderStats(
val totalOrders: Long,
val totalRevenue: BigDecimal,
val cancelledOrders: Long,
val averageOrderValue: BigDecimal,
val statusBreakdown: Map<String, Long>
)
In production, you wouldn’t load all orders into memory. You’d pre-compute the DailyOrderStats projection in the event consumer and read directly from that collection. But the pattern is the same — different consumers get different read models, each shaped for its use case.
This is the real payoff of CQRS. The customer dashboard query is a single document lookup by customer ID. The operations dashboard reads pre-aggregated stats. Neither query is compromised by the other’s requirements.
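The pre-computed variant mentioned above can be sketched as an atomic upsert in the event consumer: one $inc per event instead of a full scan at read time. Names here are illustrative, and note that increments like these raise exactly the idempotency concern covered in the next section:

```kotlin
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.stereotype.Service
import java.time.LocalDate
import java.time.ZoneOffset

@Service
class DailyStatsProjectionService(private val mongoTemplate: MongoTemplate) {

    fun onOrderPlaced(event: OrderEvent.OrderPlaced) {
        val day = LocalDate.ofInstant(event.occurredAt, ZoneOffset.UTC)
        val query = Query(Criteria.where("_id").`is`(day))
        // Upsert: creates the day's document on the first order,
        // increments it atomically on every order after that.
        val update = Update()
            .inc("totalOrders", 1)
            .inc("totalRevenue", event.totalAmount)
            .inc("statusBreakdown.PLACED", 1)
        mongoTemplate.upsert(query, update, DailyOrderStats::class.java)
    }
}
```

A real version would also handle the non-counter fields, for instance computing averageOrderValue at read time from totalRevenue and totalOrders rather than storing it.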
Handling consumer failures
What happens when the consumer crashes halfway through processing an event?
Kafka tracks the consumer’s offset — the position in the partition up to which it has read. By default, Spring Kafka commits the offset after the listener method returns successfully. If the method throws an exception, the offset is not committed, and Kafka will re-deliver the record.
This means your projection updates must be idempotent. Processing the same event twice should produce the same result.
Our projections are already idempotent:
- OrderPlaced — we save a new document with orderId as the @Id. Saving it twice overwrites with the same data.
- ShippingAddressUpdated — we load the existing document, update the address, and save. Doing this twice with the same address produces the same result.
- OrderCancelled — same pattern. Setting status to CANCELLED twice is fine.
If your projections involve counters or increments (like totalOrders += 1), idempotency is harder. You need to track which events have been processed — typically by storing the event’s offset or a unique event ID alongside the projection.
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
@Document(collection = "processed_events")
data class ProcessedEvent(
@Id val eventId: String,
val processedAt: java.time.Instant = java.time.Instant.now()
)
Before processing an event, check whether its ID exists in processed_events. If yes, skip. If no, process the event and save the ID. This gives you effectively-once processing at the application level; true exactly-once would additionally require the projection update and the marker write to happen atomically.
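That check-then-process-then-record flow can be sketched as a small guard. This is our own illustration rather than code from the series, and the comment flags the crash window it leaves open:

```kotlin
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.stereotype.Component

@Component
class IdempotencyGuard(private val mongoTemplate: MongoTemplate) {

    fun runOnce(eventId: String, handler: () -> Unit) {
        val seen = Query(Criteria.where("_id").`is`(eventId))
        if (mongoTemplate.exists(seen, ProcessedEvent::class.java)) {
            return  // already processed, skip
        }
        handler()
        // A crash between handler() and this insert means one redelivery gets
        // re-processed, so handler() should still tolerate a duplicate.
        mongoTemplate.insert(ProcessedEvent(eventId))
    }
}
```

A TTL index on processedAt keeps the processed_events collection from growing without bound.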
Tests
Consumer tests verify that events correctly update projections. Projection tests verify the shape of the read model.
import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.math.BigDecimal
import java.time.Instant
import java.util.Optional
class OrderProjectionServiceTest {
private lateinit var orderViewRepository: OrderViewRepository
private lateinit var projectionService: OrderProjectionService
@BeforeEach
fun setUp() {
orderViewRepository = mockk()
projectionService = OrderProjectionService(orderViewRepository)
}
@Nested
inner class OnOrderPlacedTests {
@Test
fun `should create projection with pre-computed fields`() {
val event = OrderEvent.OrderPlaced(
orderId = "order-1",
customerId = "cust-1",
items = listOf(
OrderItemData("prod-1", "Widget", 2, BigDecimal("10.00")),
OrderItemData("prod-2", "Gadget", 1, BigDecimal("25.00"))
),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
totalAmount = BigDecimal("45.00"),
occurredAt = Instant.parse("2025-06-01T10:00:00Z")
)
val savedSlot = slot<OrderView>()
every { orderViewRepository.save(capture(savedSlot)) } answers { savedSlot.captured }
projectionService.onOrderPlaced(event)
val saved = savedSlot.captured
assertEquals("order-1", saved.orderId)
assertEquals(3, saved.itemCount)
assertEquals("PLACED", saved.status)
assertEquals(BigDecimal("20.00"), saved.items[0].lineTotal)
assertEquals(BigDecimal("25.00"), saved.items[1].lineTotal)
}
@Test
fun `should be idempotent — processing same event twice produces same result`() {
val event = OrderEvent.OrderPlaced(
orderId = "order-1",
customerId = "cust-1",
items = listOf(
OrderItemData("prod-1", "Widget", 1, BigDecimal("10.00"))
),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
totalAmount = BigDecimal("10.00"),
occurredAt = Instant.parse("2025-06-01T10:00:00Z")
)
val savedSlot = mutableListOf<OrderView>()
every { orderViewRepository.save(capture(savedSlot)) } answers { savedSlot.last() }
projectionService.onOrderPlaced(event)
projectionService.onOrderPlaced(event)
assertEquals(savedSlot[0].orderId, savedSlot[1].orderId)
assertEquals(savedSlot[0].totalAmount, savedSlot[1].totalAmount)
assertEquals(savedSlot[0].status, savedSlot[1].status)
}
}
@Nested
inner class OnShippingAddressUpdatedTests {
@Test
fun `should update address in projection`() {
val existingView = OrderView(
orderId = "order-1",
customerId = "cust-1",
items = listOf(
OrderItemView("prod-1", "Widget", 1, BigDecimal("10.00"), BigDecimal("10.00"))
),
shippingAddress = AddressView("123 Main St", "Springfield", "IL", "62701", "US"),
status = "PLACED",
totalAmount = BigDecimal("10.00"),
itemCount = 1,
createdAt = Instant.parse("2025-06-01T10:00:00Z"),
updatedAt = Instant.parse("2025-06-01T10:00:00Z")
)
every { orderViewRepository.findById("order-1") } returns Optional.of(existingView)
every { orderViewRepository.save(any()) } answers { firstArg() }
val event = OrderEvent.ShippingAddressUpdated(
orderId = "order-1",
oldAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
newAddress = Address("456 Oak Ave", "Chicago", "IL", "60601", "US"),
occurredAt = Instant.parse("2025-06-02T10:00:00Z")
)
projectionService.onShippingAddressUpdated(event)
val savedSlot = slot<OrderView>()
verify { orderViewRepository.save(capture(savedSlot)) }
assertEquals("Chicago", savedSlot.captured.shippingAddress.city)
assertEquals(Instant.parse("2025-06-02T10:00:00Z"), savedSlot.captured.updatedAt)
}
@Test
fun `should handle missing projection gracefully`() {
every { orderViewRepository.findById("order-999") } returns Optional.empty()
val event = OrderEvent.ShippingAddressUpdated(
orderId = "order-999",
oldAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
newAddress = Address("456 Oak Ave", "Chicago", "IL", "60601", "US"),
occurredAt = Instant.parse("2025-06-02T10:00:00Z")
)
// Should not throw — just logs a warning
projectionService.onShippingAddressUpdated(event)
verify(exactly = 0) { orderViewRepository.save(any()) }
}
}
@Nested
inner class OnOrderCancelledTests {
@Test
fun `should mark projection as cancelled with reason`() {
val existingView = OrderView(
orderId = "order-1",
customerId = "cust-1",
items = listOf(
OrderItemView("prod-1", "Widget", 1, BigDecimal("10.00"), BigDecimal("10.00"))
),
shippingAddress = AddressView("123 Main St", "Springfield", "IL", "62701", "US"),
status = "PLACED",
totalAmount = BigDecimal("10.00"),
itemCount = 1,
createdAt = Instant.parse("2025-06-01T10:00:00Z"),
updatedAt = Instant.parse("2025-06-01T10:00:00Z")
)
every { orderViewRepository.findById("order-1") } returns Optional.of(existingView)
every { orderViewRepository.save(any()) } answers { firstArg() }
val event = OrderEvent.OrderCancelled(
orderId = "order-1",
reason = "Customer changed their mind",
occurredAt = Instant.parse("2025-06-03T10:00:00Z")
)
projectionService.onOrderCancelled(event)
val savedSlot = slot<OrderView>()
verify { orderViewRepository.save(capture(savedSlot)) }
assertEquals("CANCELLED", savedSlot.captured.status)
assertEquals("Customer changed their mind", savedSlot.captured.cancellationReason)
assertEquals(Instant.parse("2025-06-03T10:00:00Z"), savedSlot.captured.cancelledAt)
}
}
}
These tests cover:
- Projection creation — OrderPlaced builds a view with pre-computed itemCount and lineTotal.
- Idempotency — processing the same OrderPlaced event twice produces identical results.
- Projection updates — address changes and cancellations modify the right fields.
- Missing projections — if a view doesn’t exist yet, the update is skipped gracefully.
- Timestamp tracking — updatedAt reflects the event’s timestamp, not the processing time.
Project structure
order-query-service/
├── src/main/kotlin/com/example/orders/query/
│ ├── OrderQueryApplication.kt
│ ├── model/
│ │ ├── OrderView.kt
│ │ ├── DailyOrderStats.kt
│ │ └── OrderStats.kt
│ ├── event/
│ │ └── OrderEvent.kt
│ ├── consumer/
│ │ └── OrderEventConsumer.kt
│ ├── projection/
│ │ └── OrderProjectionService.kt
│ ├── repository/
│ │ └── OrderViewRepository.kt
│ ├── service/
│ │ └── OrderStatsService.kt
│ └── controller/
│ └── OrderQueryController.kt
└── src/test/kotlin/com/example/orders/query/
└── projection/
└── OrderProjectionServiceTest.kt
Notice the event/ package. Yes, the query service has its own copy of OrderEvent. Same fields, different package. This is intentional. The command and query services should be independently deployable. If they share a library, deploying one requires deploying the other. That defeats the purpose.
In practice, you might share event schemas through a schema registry (like Confluent Schema Registry with Avro or Protobuf). But at the code level, each service owns its own representation.
The full picture so far
┌──────────┐ ┌────────────────┐ ┌──────────┐ ┌────────────────┐ ┌──────────┐
│ │ │ Command │ │ │ │ Query │ │ │
│ Client │────▶│ Service │────▶│ Kafka │────▶│ Service │────▶│ Client │
│ (writes) │ │ (8080) │ │ │ │ (8081) │ │ (reads) │
│ │ │ │ │ │ │ │ │ │
└──────────┘ └───────┬────────┘ └──────────┘ └───────┬────────┘ └──────────┘
│ │
▼ ▼
┌───────────────┐ ┌───────────────┐
│ MongoDB │ │ MongoDB │
│ (primary) │ │ (secondaries) │
│ w:majority │ │ readPref: │
│ │ │ secondary │
└───────────────┘ └───────────────┘
Two services. One Kafka topic. One MongoDB cluster with different access patterns. The command side writes to the primary with durability guarantees. The query side reads from secondaries for scale. Events flow through Kafka to keep them in sync.
What’s next
In Part 4, we’ll tackle the hard parts. What happens when the consumer falls behind? How do you handle schema evolution when events change? What about the gap between when a customer places an order and when it shows up on the dashboard? Eventual consistency is easy to explain and hard to get right. We’ll dig into the real-world problems and practical solutions.