PCSalt
Architecture · 6 min read

CQRS with Spring Boot, Kafka & MongoDB — Part 3: Query Side — Reads at Scale

Building the query service — consuming Kafka events, building read-optimized projections, and serving fast queries from MongoDB.


This is Part 3 of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.

  1. What is CQRS and why you need it
  2. Command side — writes done right
  3. Query side — reads at scale (this post)
  4. The hard parts
  5. Putting it all together

A separate service, not a module

The query service is a completely separate Spring Boot application. Different codebase. Different deployment. Different MongoDB database (or at least different collections). It doesn’t share code with the command service — not even domain classes.

Why this strict separation? Because the read model and write model serve different purposes. The moment you share an Order class between them, you’ve coupled their evolution. The write model needs to change for business rules. The read model needs to change for UI requirements. Shared code means one change breaks both.

┌──────────┐                        ┌──────────────────────────────────────┐
│          │                        │          Query Service               │
│  Kafka   │───────────────────────▶│  Consumer → Projection → MongoDB    │
│ (events) │                        │                                      │
└──────────┘                        └──────────────┬───────────────────────┘

                                    ┌──────────────▼───────────────────────┐
                                    │        REST API (reads only)         │
                                    │  GET /orders/{id}                    │
                                    │  GET /orders?customerId=X            │
                                    │  GET /orders/stats                   │
                                    └──────────────┬───────────────────────┘

                                    ┌──────────────▼───────────────────────┐
                                    │           Client (dashboard)         │
                                    └──────────────────────────────────────┘

Project setup

Same Spring Initializr recipe as the command service. Spring Web, Spring Data MongoDB, Spring for Apache Kafka.

application.yml for the query service:

server:
  port: 8081

spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders_read?readPreference=secondaryPreferred
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: order-query-service
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: earliest
      properties:
        spring.json.trusted.packages: "com.example.orders.command.event"
        spring.json.use.type.headers: true

Port 8081 because the command service is on 8080. In production they’d be on separate hosts, but for local development you need different ports.

The important bits:

  • readPreference=secondaryPreferred — reads go to secondary nodes when available, falling back to the primary.
  • group-id: order-query-service — the Kafka consumer group. All instances of the query service share this group and split partition processing.
  • auto-offset-reset: earliest — if this is a fresh consumer group, start from the beginning of the topic. You want every event, not just new ones.
  • spring.json.trusted.packages and spring.json.use.type.headers — the JsonDeserializer resolves each record's class from the producer's type headers, and only instantiates classes from trusted packages. Those headers carry the command service's class names, which couples the two services at the serialization level; spring.json.type.mapping lets each side map a shared token to its own class instead.

Read-optimized projections

The command service stores an Order with IDs referencing products and customers. The query service stores an OrderView — a denormalized document ready to serve the UI without any joins.

import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.index.Indexed
import org.springframework.data.mongodb.core.mapping.Document
import java.math.BigDecimal
import java.time.Instant

@Document(collection = "order_views")
data class OrderView(
  @Id val orderId: String,
  @Indexed val customerId: String,
  val items: List<OrderItemView>,
  val shippingAddress: AddressView,
  val status: String,
  val totalAmount: BigDecimal,
  val itemCount: Int,
  val createdAt: Instant,
  val updatedAt: Instant,
  val cancelledAt: Instant? = null,
  val cancellationReason: String? = null
)

data class OrderItemView(
  val productId: String,
  val productName: String,
  val quantity: Int,
  val unitPrice: BigDecimal,
  val lineTotal: BigDecimal
)

data class AddressView(
  val street: String,
  val city: String,
  val state: String,
  val zipCode: String,
  val country: String
)

Compare this to the write model:

  • itemCount — pre-computed so the dashboard doesn’t count items on every render.
  • lineTotal on each item — pre-computed quantity * unitPrice so the UI doesn’t do math.
  • customerId is indexed — the dashboard queries by customer. The write model might not need this index.
  • cancellationReason and cancelledAt — flattened into the document instead of being in a separate audit log.
  • status is a String, not an enum — the query side doesn’t need to validate status values. It just stores whatever the event says.

This is the power of separate read models. Shape the data for the consumer, not for the business rules.

MongoDB read preference — secondaryPreferred

In Part 2, the command service wrote with w: majority to ensure durability. Here, the query service reads from secondaries.

Why secondaries? Because the query service handles the bulk of the traffic. Dashboard pages, search results, reports. All reads. Directing this traffic to secondaries keeps the primary free for writes.

secondaryPreferred means: read from a secondary if one is available. If all secondaries are down, read from the primary. It’s a graceful fallback.

The tradeoff is eventual consistency. A write to the primary might not be replicated to secondaries for a few milliseconds. In CQRS, this is already expected — the query side is behind the command side by the time it takes to publish an event, consume it, and update the projection. A few extra milliseconds of replication lag is negligible.

Consuming events — the Kafka listener

The consumer receives events from the order-events topic and updates projections.
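The listener below pattern-matches on an OrderEvent hierarchy. Part 2 defines the command side's version; the query service keeps its own copy (see the project structure section at the end). Here is a minimal sketch of that copy, with fields inferred from the projection code and tests in this post rather than taken verbatim from Part 2:

```kotlin
import java.math.BigDecimal
import java.time.Instant

// Query-side copy of the event hierarchy. Field names are inferred from
// the projection service and tests below — adjust to match your Part 2 events.
data class OrderItemData(
  val productId: String,
  val productName: String,
  val quantity: Int,
  val unitPrice: BigDecimal
)

data class Address(
  val street: String,
  val city: String,
  val state: String,
  val zipCode: String,
  val country: String
)

sealed class OrderEvent {
  abstract val orderId: String
  abstract val occurredAt: Instant

  data class OrderPlaced(
    override val orderId: String,
    val customerId: String,
    val items: List<OrderItemData>,
    val shippingAddress: Address,
    val totalAmount: BigDecimal,
    override val occurredAt: Instant = Instant.now()
  ) : OrderEvent()

  data class ShippingAddressUpdated(
    override val orderId: String,
    val oldAddress: Address,
    val newAddress: Address,
    override val occurredAt: Instant = Instant.now()
  ) : OrderEvent()

  data class OrderCancelled(
    override val orderId: String,
    val reason: String,
    override val occurredAt: Instant = Instant.now()
  ) : OrderEvent()
}
```

The sealed class is what makes the `when` in the consumer exhaustive: the compiler refuses to compile if a new event type is added without a matching branch.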

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

@Component
class OrderEventConsumer(
  private val projectionService: OrderProjectionService
) {

  private val log = LoggerFactory.getLogger(OrderEventConsumer::class.java)

  @KafkaListener(
    topics = ["order-events"],
    groupId = "order-query-service"
  )
  fun consume(record: ConsumerRecord<String, OrderEvent>) {
    val event = record.value()
    log.info("Received event: ${event::class.simpleName} for order ${event.orderId}")

    try {
      when (event) {
        is OrderEvent.OrderPlaced -> projectionService.onOrderPlaced(event)
        is OrderEvent.ShippingAddressUpdated -> projectionService.onShippingAddressUpdated(event)
        is OrderEvent.OrderCancelled -> projectionService.onOrderCancelled(event)
      }
    } catch (e: Exception) {
      log.error("Failed to process event for order ${event.orderId}", e)
      throw e
    }
  }
}

Note that we re-throw the exception. Spring Kafka's DefaultErrorHandler then retries the record (ten attempts with no backoff, by default) and, once retries are exhausted, logs the failure and moves on — or routes the record to a dead letter topic if a DeadLetterPublishingRecoverer is configured. Either way, failures stay visible instead of being silently swallowed.
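Dead letter routing is not enabled automatically. Here is a sketch of wiring a DeadLetterPublishingRecoverer into the container's error handler; it assumes a KafkaTemplate bean is available, and the retry settings are illustrative, not a recommendation:

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer
import org.springframework.kafka.listener.DefaultErrorHandler
import org.springframework.util.backoff.FixedBackOff

@Configuration
class KafkaErrorConfig {

  // Spring Boot applies a CommonErrorHandler bean to @KafkaListener
  // containers automatically. After three retries one second apart,
  // the failed record is published to "order-events.DLT" (the
  // recoverer's default topic naming) instead of being skipped.
  @Bean
  fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler =
    DefaultErrorHandler(
      DeadLetterPublishingRecoverer(kafkaTemplate),
      FixedBackOff(1000L, 3L)
    )
}
```

With this in place the consumer keeps its re-throw, and exhausted records land on the dead letter topic where they can be inspected and replayed.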

Building projections from events

The projection service translates events into read model updates.

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.time.Instant

@Service
class OrderProjectionService(
  private val orderViewRepository: OrderViewRepository
) {

  private val log = LoggerFactory.getLogger(OrderProjectionService::class.java)

  fun onOrderPlaced(event: OrderEvent.OrderPlaced) {
    val orderView = OrderView(
      orderId = event.orderId,
      customerId = event.customerId,
      items = event.items.map { item ->
        OrderItemView(
          productId = item.productId,
          productName = item.productName,
          quantity = item.quantity,
          unitPrice = item.unitPrice,
          lineTotal = item.unitPrice.multiply(BigDecimal(item.quantity))
        )
      },
      shippingAddress = AddressView(
        street = event.shippingAddress.street,
        city = event.shippingAddress.city,
        state = event.shippingAddress.state,
        zipCode = event.shippingAddress.zipCode,
        country = event.shippingAddress.country
      ),
      status = "PLACED",
      totalAmount = event.totalAmount,
      itemCount = event.items.sumOf { it.quantity },
      createdAt = event.occurredAt,
      updatedAt = event.occurredAt
    )

    orderViewRepository.save(orderView)
    log.info("Projection created for order ${event.orderId}")
  }

  fun onShippingAddressUpdated(event: OrderEvent.ShippingAddressUpdated) {
    val existing = orderViewRepository.findById(event.orderId).orElse(null)

    if (existing == null) {
      log.warn("OrderView not found for address update: ${event.orderId}")
      return
    }

    val updated = existing.copy(
      shippingAddress = AddressView(
        street = event.newAddress.street,
        city = event.newAddress.city,
        state = event.newAddress.state,
        zipCode = event.newAddress.zipCode,
        country = event.newAddress.country
      ),
      updatedAt = event.occurredAt
    )

    orderViewRepository.save(updated)
    log.info("Projection updated for order ${event.orderId} — address changed")
  }

  fun onOrderCancelled(event: OrderEvent.OrderCancelled) {
    val existing = orderViewRepository.findById(event.orderId).orElse(null)

    if (existing == null) {
      log.warn("OrderView not found for cancellation: ${event.orderId}")
      return
    }

    val updated = existing.copy(
      status = "CANCELLED",
      cancelledAt = event.occurredAt,
      cancellationReason = event.reason,
      updatedAt = event.occurredAt
    )

    orderViewRepository.save(updated)
    log.info("Projection updated for order ${event.orderId} — cancelled")
  }
}

Each event handler does exactly one thing — update the read model. No business logic. No validation. No side effects. If the event says the order was cancelled, the projection gets a CANCELLED status. The command side already validated that the cancellation was allowed.

Notice the null checks. If we receive a ShippingAddressUpdated event but the OrderView doesn’t exist yet, we log a warning and skip. This can happen if events arrive out of order (unlikely when all of an order’s events are keyed to the same partition, but defensive coding costs nothing).

The repository:

import org.springframework.data.mongodb.repository.MongoRepository

interface OrderViewRepository : MongoRepository<OrderView, String> {

  fun findByCustomerId(customerId: String): List<OrderView>

  fun findByStatus(status: String): List<OrderView>
}

Spring Data generates the queries from the method names. One caveat: findByStatus has no supporting index — OrderView only indexes customerId — so add @Indexed to the status field if the dashboard filters by status at any real scale.
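findByCustomerId returns a customer’s entire history in one list. For customers with long histories, a paged variant is the safer default. A sketch — the paged method is an assumption added here, not part of the series’ code:

```kotlin
import org.springframework.data.domain.Page
import org.springframework.data.domain.Pageable
import org.springframework.data.mongodb.repository.MongoRepository

// Hypothetical paged variant of the repository shown above.
interface PagedOrderViewRepository : MongoRepository<OrderView, String> {

  // Spring Data derives the query from the method name and applies the
  // Pageable's skip/limit/sort on the server side, so only one page of
  // documents crosses the wire.
  fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderView>
}
```

Callers pass something like `PageRequest.of(0, 20, Sort.by(Sort.Direction.DESC, "createdAt"))` to get the newest twenty orders, and the returned Page carries total counts for the UI’s pagination controls.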

REST endpoints — reads only

import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/orders")
class OrderQueryController(
  private val orderViewRepository: OrderViewRepository,
  private val statsService: OrderStatsService
) {

  @GetMapping("/{orderId}")
  fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderView> {
    val order = orderViewRepository.findById(orderId).orElse(null)
      ?: return ResponseEntity.notFound().build()
    return ResponseEntity.ok(order)
  }

  @GetMapping
  fun getOrdersByCustomer(@RequestParam customerId: String): ResponseEntity<List<OrderView>> {
    val orders = orderViewRepository.findByCustomerId(customerId)
    return ResponseEntity.ok(orders)
  }

  @GetMapping("/stats")
  fun getOrderStats(): ResponseEntity<OrderStats> {
    return ResponseEntity.ok(statsService.getStats())
  }
}

Three endpoints. Three different use cases:

  1. GET /orders/{id} — single order detail for a customer viewing their order.
  2. GET /orders?customerId=X — customer dashboard showing all their orders.
  3. GET /orders/stats — operations dashboard showing aggregate numbers.

No POST. No PUT. No DELETE. This service only reads.

Different read models for different consumers

The OrderView works for the customer dashboard. But the operations team needs different data — total revenue, order counts by status, average order value. They don’t care about individual shipping addresses.

Instead of bending OrderView to serve both, build a separate projection.

import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import java.math.BigDecimal
import java.time.LocalDate

@Document(collection = "order_stats_daily")
data class DailyOrderStats(
  @Id val date: LocalDate,
  val totalOrders: Long,
  val totalRevenue: BigDecimal,
  val cancelledOrders: Long,
  val averageOrderValue: BigDecimal,
  val statusBreakdown: Map<String, Long>
)

The stats service aggregates from this collection:

import org.springframework.stereotype.Service
import java.math.BigDecimal
import java.math.RoundingMode

@Service
class OrderStatsService(
  private val orderViewRepository: OrderViewRepository
) {

  fun getStats(): OrderStats {
    val allOrders = orderViewRepository.findAll()

    val totalOrders = allOrders.size.toLong()
    val totalRevenue = allOrders.sumOf { it.totalAmount }
    val cancelledOrders = allOrders.count { it.status == "CANCELLED" }.toLong()
    val averageOrderValue = if (totalOrders > 0) {
      totalRevenue.divide(BigDecimal(totalOrders), 2, RoundingMode.HALF_UP)
    } else {
      BigDecimal.ZERO
    }

    val statusBreakdown = allOrders.groupBy { it.status }
      .mapValues { it.value.size.toLong() }

    return OrderStats(
      totalOrders = totalOrders,
      totalRevenue = totalRevenue,
      cancelledOrders = cancelledOrders,
      averageOrderValue = averageOrderValue,
      statusBreakdown = statusBreakdown
    )
  }
}

data class OrderStats(
  val totalOrders: Long,
  val totalRevenue: BigDecimal,
  val cancelledOrders: Long,
  val averageOrderValue: BigDecimal,
  val statusBreakdown: Map<String, Long>
)

In production, you wouldn’t load all orders into memory. You’d pre-compute the DailyOrderStats projection in the event consumer and read directly from that collection. But the pattern is the same — different consumers get different read models, each shaped for its use case.
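The incremental approach can be sketched as an extra handler called from the event consumer alongside OrderProjectionService. This is an assumption layered on the series’ code — the class and the minor-units revenue field are illustrative, not from the article:

```kotlin
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.data.mongodb.core.query.Update
import org.springframework.stereotype.Service
import java.time.LocalDate
import java.time.ZoneOffset

@Service
class DailyStatsProjector(private val mongoTemplate: MongoTemplate) {

  // Upsert keeps this atomic: the day's document is created on first
  // touch and incremented afterwards, with no read-modify-write race.
  fun onOrderPlaced(event: OrderEvent.OrderPlaced) {
    val day = LocalDate.ofInstant(event.occurredAt, ZoneOffset.UTC).toString()
    mongoTemplate.upsert(
      Query(Criteria.where("_id").`is`(day)),
      Update()
        .inc("totalOrders", 1)
        // Revenue tracked in minor units (cents) so $inc stays numeric;
        // MongoDB cannot $inc a String-serialized BigDecimal.
        .inc("totalRevenueCents", event.totalAmount.movePointRight(2).longValueExact())
        .inc("statusBreakdown.PLACED", 1),
      "order_stats_daily"
    )
  }
}
```

Counters like these are exactly the case where event redelivery causes double counting, which is why the deduplication discussed under consumer failures matters here.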

This is the real payoff of CQRS. The customer dashboard query is a single document lookup by customer ID. The operations dashboard reads pre-aggregated stats. Neither query is compromised by the other’s requirements.

Handling consumer failures

What happens when the consumer crashes halfway through processing an event?

Kafka tracks the consumer’s offset — the position in the partition it has read up to. Spring Kafka disables Kafka’s own auto-commit and commits offsets itself, by default after the listener has processed a batch of records. If the listener throws, the offset is not committed, and the container seeks back and re-delivers the record, subject to the error handler’s retry policy.

This means your projection updates must be idempotent. Processing the same event twice should produce the same result.

Our projections are already idempotent:

  • OrderPlaced — we save a new document with orderId as the @Id. Saving it twice overwrites with the same data.
  • ShippingAddressUpdated — we load the existing document, update the address, and save. Doing this twice with the same address produces the same result.
  • OrderCancelled — same pattern. Setting status to CANCELLED twice is fine.

If your projections involve counters or increments (like totalOrders += 1), idempotency is harder. You need to track which events have been processed — typically by storing the event’s offset or a unique event ID alongside the projection.

import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import java.time.Instant

@Document(collection = "processed_events")
data class ProcessedEvent(
  @Id val eventId: String,
  val processedAt: Instant = Instant.now()
)

Before processing an event, check whether its ID exists in processed_events. If yes, skip; if no, process it and save the ID. Done naively (find, then save) the check can race with a concurrent redelivery, so insert the document instead — a second insert with the same _id fails with a duplicate key error. This gives you effectively exactly-once processing at the application level.
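That guard can be sketched as a small wrapper around the projection call. The repository and wrapper here are assumptions, not the series’ code, and they presume the events carry a unique ID. Using insert rather than save is what makes the duplicate check race-safe — insert throws on a duplicate _id, where save would silently overwrite:

```kotlin
import org.springframework.dao.DuplicateKeyException
import org.springframework.data.mongodb.repository.MongoRepository
import org.springframework.stereotype.Component

interface ProcessedEventRepository : MongoRepository<ProcessedEvent, String>

@Component
class IdempotentProcessor(private val processedEvents: ProcessedEventRepository) {

  // Returns true if the handler ran, false if the event was a duplicate.
  fun process(eventId: String, handler: () -> Unit): Boolean =
    try {
      // insert() fails on a duplicate _id, unlike save(), which overwrites.
      processedEvents.insert(ProcessedEvent(eventId))
      handler()
      true
    } catch (e: DuplicateKeyException) {
      false
    }
}
```

One caveat: a crash between the insert and the handler leaves the event marked processed without the projection updated. Recording the event ID and updating the projection in one transaction closes that gap; keeping the handler idempotent is the simpler backstop.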

Tests

Consumer tests verify that events correctly update projections. Projection tests verify the shape of the read model.

import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import java.math.BigDecimal
import java.time.Instant
import java.util.Optional

class OrderProjectionServiceTest {

  private lateinit var orderViewRepository: OrderViewRepository
  private lateinit var projectionService: OrderProjectionService

  @BeforeEach
  fun setUp() {
    orderViewRepository = mockk()
    projectionService = OrderProjectionService(orderViewRepository)
  }

  @Nested
  inner class OnOrderPlacedTests {

    @Test
    fun `should create projection with pre-computed fields`() {
      val event = OrderEvent.OrderPlaced(
        orderId = "order-1",
        customerId = "cust-1",
        items = listOf(
          OrderItemData("prod-1", "Widget", 2, BigDecimal("10.00")),
          OrderItemData("prod-2", "Gadget", 1, BigDecimal("25.00"))
        ),
        shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
        totalAmount = BigDecimal("45.00"),
        occurredAt = Instant.parse("2025-06-01T10:00:00Z")
      )

      val savedSlot = slot<OrderView>()
      every { orderViewRepository.save(capture(savedSlot)) } answers { savedSlot.captured }

      projectionService.onOrderPlaced(event)

      val saved = savedSlot.captured
      assertEquals("order-1", saved.orderId)
      assertEquals(3, saved.itemCount)
      assertEquals("PLACED", saved.status)
      assertEquals(BigDecimal("20.00"), saved.items[0].lineTotal)
      assertEquals(BigDecimal("25.00"), saved.items[1].lineTotal)
    }

    @Test
    fun `should be idempotent — processing same event twice produces same result`() {
      val event = OrderEvent.OrderPlaced(
        orderId = "order-1",
        customerId = "cust-1",
        items = listOf(
          OrderItemData("prod-1", "Widget", 1, BigDecimal("10.00"))
        ),
        shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
        totalAmount = BigDecimal("10.00")
      )

      val savedSlot = mutableListOf<OrderView>()
      every { orderViewRepository.save(capture(savedSlot)) } answers { savedSlot.last() }

      projectionService.onOrderPlaced(event)
      projectionService.onOrderPlaced(event)

      assertEquals(savedSlot[0].orderId, savedSlot[1].orderId)
      assertEquals(savedSlot[0].totalAmount, savedSlot[1].totalAmount)
      assertEquals(savedSlot[0].status, savedSlot[1].status)
    }
  }

  @Nested
  inner class OnShippingAddressUpdatedTests {

    @Test
    fun `should update address in projection`() {
      val existingView = OrderView(
        orderId = "order-1",
        customerId = "cust-1",
        items = listOf(
          OrderItemView("prod-1", "Widget", 1, BigDecimal("10.00"), BigDecimal("10.00"))
        ),
        shippingAddress = AddressView("123 Main St", "Springfield", "IL", "62701", "US"),
        status = "PLACED",
        totalAmount = BigDecimal("10.00"),
        itemCount = 1,
        createdAt = Instant.parse("2025-06-01T10:00:00Z"),
        updatedAt = Instant.parse("2025-06-01T10:00:00Z")
      )

      every { orderViewRepository.findById("order-1") } returns Optional.of(existingView)
      every { orderViewRepository.save(any()) } answers { firstArg() }

      val event = OrderEvent.ShippingAddressUpdated(
        orderId = "order-1",
        oldAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
        newAddress = Address("456 Oak Ave", "Chicago", "IL", "60601", "US"),
        occurredAt = Instant.parse("2025-06-02T10:00:00Z")
      )

      projectionService.onShippingAddressUpdated(event)

      val savedSlot = slot<OrderView>()
      verify { orderViewRepository.save(capture(savedSlot)) }
      assertEquals("Chicago", savedSlot.captured.shippingAddress.city)
      assertEquals(Instant.parse("2025-06-02T10:00:00Z"), savedSlot.captured.updatedAt)
    }

    @Test
    fun `should handle missing projection gracefully`() {
      every { orderViewRepository.findById("order-999") } returns Optional.empty()

      val event = OrderEvent.ShippingAddressUpdated(
        orderId = "order-999",
        oldAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
        newAddress = Address("456 Oak Ave", "Chicago", "IL", "60601", "US")
      )

      // Should not throw — just logs a warning
      projectionService.onShippingAddressUpdated(event)

      verify(exactly = 0) { orderViewRepository.save(any()) }
    }
  }

  @Nested
  inner class OnOrderCancelledTests {

    @Test
    fun `should mark projection as cancelled with reason`() {
      val existingView = OrderView(
        orderId = "order-1",
        customerId = "cust-1",
        items = listOf(
          OrderItemView("prod-1", "Widget", 1, BigDecimal("10.00"), BigDecimal("10.00"))
        ),
        shippingAddress = AddressView("123 Main St", "Springfield", "IL", "62701", "US"),
        status = "PLACED",
        totalAmount = BigDecimal("10.00"),
        itemCount = 1,
        createdAt = Instant.parse("2025-06-01T10:00:00Z"),
        updatedAt = Instant.parse("2025-06-01T10:00:00Z")
      )

      every { orderViewRepository.findById("order-1") } returns Optional.of(existingView)
      every { orderViewRepository.save(any()) } answers { firstArg() }

      val event = OrderEvent.OrderCancelled(
        orderId = "order-1",
        reason = "Customer changed their mind",
        occurredAt = Instant.parse("2025-06-03T10:00:00Z")
      )

      projectionService.onOrderCancelled(event)

      val savedSlot = slot<OrderView>()
      verify { orderViewRepository.save(capture(savedSlot)) }
      assertEquals("CANCELLED", savedSlot.captured.status)
      assertEquals("Customer changed their mind", savedSlot.captured.cancellationReason)
      assertEquals(Instant.parse("2025-06-03T10:00:00Z"), savedSlot.captured.cancelledAt)
    }
  }
}

These tests cover:

  • Projection creation — OrderPlaced builds a view with pre-computed itemCount and lineTotal.
  • Idempotency — processing the same OrderPlaced event twice produces identical results.
  • Projection updates — address changes and cancellations modify the right fields.
  • Missing projections — if a view doesn’t exist yet, the update is skipped gracefully.
  • Timestamp tracking — updatedAt reflects the event’s timestamp, not the processing time.

Project structure

order-query-service/
├── src/main/kotlin/com/example/orders/query/
│   ├── OrderQueryApplication.kt
│   ├── model/
│   │   ├── OrderView.kt
│   │   ├── DailyOrderStats.kt
│   │   └── OrderStats.kt
│   ├── event/
│   │   └── OrderEvent.kt
│   ├── consumer/
│   │   └── OrderEventConsumer.kt
│   ├── projection/
│   │   └── OrderProjectionService.kt
│   ├── repository/
│   │   └── OrderViewRepository.kt
│   ├── service/
│   │   └── OrderStatsService.kt
│   └── controller/
│       └── OrderQueryController.kt
└── src/test/kotlin/com/example/orders/query/
    └── projection/
        └── OrderProjectionServiceTest.kt

Notice the event/ package. Yes, the query service has its own copy of OrderEvent. Same fields, different package. This is intentional. The command and query services should be independently deployable. If they share a library, deploying one requires deploying the other. That defeats the purpose.

In practice, you might share event schemas through a schema registry (like Confluent Schema Registry with Avro or Protobuf). But at the code level, each service owns its own representation.

The full picture so far

┌──────────┐     ┌────────────────┐     ┌──────────┐     ┌────────────────┐     ┌──────────┐
│          │     │   Command      │     │          │     │   Query        │     │          │
│  Client  │────▶│   Service      │────▶│  Kafka   │────▶│   Service      │────▶│  Client  │
│ (writes) │     │   (8080)       │     │          │     │   (8081)       │     │ (reads)  │
│          │     │                │     │          │     │                │     │          │
└──────────┘     └───────┬────────┘     └──────────┘     └───────┬────────┘     └──────────┘
                         │                                       │
                         ▼                                       ▼
                 ┌───────────────┐                       ┌───────────────┐
                 │   MongoDB     │                       │   MongoDB     │
                 │   (primary)   │                       │ (secondaries) │
                 │   w:majority  │                       │ readPref:     │
                 │               │                       │ secondary     │
                 └───────────────┘                       └───────────────┘

Two services. One Kafka topic. One MongoDB cluster with different access patterns. The command side writes to the primary with durability guarantees. The query side reads from secondaries for scale. Events flow through Kafka to keep them in sync.

What’s next

In Part 4, we’ll tackle the hard parts. What happens when the consumer falls behind? How do you handle schema evolution when events change? What about the gap between when a customer places an order and when it shows up on the dashboard? Eventual consistency is easy to explain and hard to get right. We’ll dig into the real-world problems and practical solutions.