CQRS with Spring Boot, Kafka & MongoDB — Part 2: Command Side — Writes Done Right
Building the command service — accepting orders, validating business rules, persisting to MongoDB, and publishing domain events to Kafka.
This is Part 2 of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.
- What is CQRS and why you need it
- Command side — writes done right (this post)
- Query side — reads at scale
- The hard parts
- Putting it all together
What we’re building
The command service. It does exactly three things:
- Accept a command (place order, update address, cancel order)
- Validate and persist to MongoDB
- Publish an event to Kafka saying what happened
No read APIs. No dashboards. No aggregation queries. Just writes.
┌──────────┐     ┌────────────────────────────────────────────┐     ┌──────────┐
│          │     │              Command Service               │     │          │
│  Client  │────▶│   Controller → Handler → MongoDB + Kafka   │────▶│  Kafka   │
│          │     │                                            │     │          │
└──────────┘     └────────────────────────────────────────────┘     └──────────┘
Project setup
Start with Spring Initializr. Select:
- Language: Kotlin
- Dependencies: Spring Web, Spring Data MongoDB, Spring for Apache Kafka
Your build.gradle.kts should have these dependencies:
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-data-mongodb")
implementation("org.springframework.kafka:spring-kafka")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("io.mockk:mockk:1.13.10")
}
Nothing unusual here. The Jackson Kotlin module lets Jackson build Kotlin data classes through their primary constructors. MockK is there because its DSL fits Kotlin more naturally than Mockito's.
Your application.yml:
spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders_write
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
We’ll come back to why acks: all and the idempotent producer matter. First, the domain.
The domain
An order has an ID, a customer, items, a shipping address, and a status. Nothing fancy.
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document
import java.math.BigDecimal
import java.time.Instant
@Document(collection = "orders")
data class Order(
@Id val id: String? = null,
val customerId: String,
val items: List<OrderItem>,
val shippingAddress: Address,
val status: OrderStatus,
val totalAmount: BigDecimal,
val createdAt: Instant = Instant.now(),
val updatedAt: Instant = Instant.now()
)
data class OrderItem(
val productId: String,
val productName: String,
val quantity: Int,
val unitPrice: BigDecimal
)
data class Address(
val street: String,
val city: String,
val state: String,
val zipCode: String,
val country: String
)
enum class OrderStatus {
PLACED,
CONFIRMED,
SHIPPED,
DELIVERED,
CANCELLED
}
This is the write model. It’s normalized, validated, and represents the true state of an order. The query side will have its own model shaped differently — but that’s Part 3.
Commands — what the outside world can ask for
Commands are requests to change state. They carry all the data needed to perform the change, and they can be rejected.
import java.math.BigDecimal
sealed class OrderCommand {
data class PlaceOrder(
val customerId: String,
val items: List<OrderItemRequest>,
val shippingAddress: Address
) : OrderCommand()
data class UpdateShippingAddress(
val orderId: String,
val newAddress: Address
) : OrderCommand()
data class CancelOrder(
val orderId: String,
val reason: String
) : OrderCommand()
}
data class OrderItemRequest(
val productId: String,
val productName: String,
val quantity: Int,
val unitPrice: BigDecimal
)
Kotlin sealed classes are a good fit for this. When a when expression branches over OrderCommand, the compiler requires it to cover every command type. Miss one and it won’t compile.
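To see the exhaustiveness check in isolation, here is a minimal sketch. DemoCommand and describe are hypothetical names used only for this illustration; they are not part of the service.

```kotlin
// Hypothetical two-command hierarchy, only to illustrate an exhaustive `when`.
sealed class DemoCommand {
    data class Place(val customerId: String) : DemoCommand()
    data class Cancel(val orderId: String) : DemoCommand()
}

// `when` is used as an expression here, so the compiler requires every
// subclass of DemoCommand to be covered. No `else` branch is needed.
fun describe(command: DemoCommand): String = when (command) {
    is DemoCommand.Place -> "place order for ${command.customerId}"
    is DemoCommand.Cancel -> "cancel order ${command.orderId}"
}

fun main() {
    println(describe(DemoCommand.Place("cust-1")))
    println(describe(DemoCommand.Cancel("order-1")))
}
```

Adding a third subclass to DemoCommand without adding a branch makes describe fail to compile, which is exactly the safety net you want when a new command type appears.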
Notice PlaceOrder uses OrderItemRequest instead of OrderItem directly. Commands represent input from the outside world. Domain entities represent internal state. Keep them separate — even if they look identical today, they’ll diverge.
Domain events — what happened
Events are facts. A command says “I want this to happen.” An event says “this happened.” Commands can be rejected. Events cannot.
import java.math.BigDecimal
import java.time.Instant
sealed class OrderEvent {
abstract val orderId: String
abstract val occurredAt: Instant
data class OrderPlaced(
override val orderId: String,
val customerId: String,
val items: List<OrderItemData>,
val shippingAddress: Address,
val totalAmount: BigDecimal,
override val occurredAt: Instant = Instant.now()
) : OrderEvent()
data class ShippingAddressUpdated(
override val orderId: String,
val oldAddress: Address,
val newAddress: Address,
override val occurredAt: Instant = Instant.now()
) : OrderEvent()
data class OrderCancelled(
override val orderId: String,
val reason: String,
override val occurredAt: Instant = Instant.now()
) : OrderEvent()
}
data class OrderItemData(
val productId: String,
val productName: String,
val quantity: Int,
val unitPrice: BigDecimal
)
Each event carries enough data for any consumer to do its job without calling back to the command service. OrderPlaced includes items, address, and total — the query service needs all of it to build its projection.
ShippingAddressUpdated includes both old and new address. The old address isn’t strictly necessary for the query side, but it’s useful for audit logs and debugging. Include it.
The command handler
This is where the business logic lives. Validate the command. Apply the rules. Persist. Publish.
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.math.BigDecimal
@Service
class OrderCommandHandler(
private val orderRepository: OrderRepository,
private val kafkaTemplate: KafkaTemplate<String, OrderEvent>
) {
fun handle(command: OrderCommand): Order {
return when (command) {
is OrderCommand.PlaceOrder -> handlePlaceOrder(command)
is OrderCommand.UpdateShippingAddress -> handleUpdateAddress(command)
is OrderCommand.CancelOrder -> handleCancelOrder(command)
}
}
private fun handlePlaceOrder(command: OrderCommand.PlaceOrder): Order {
require(command.items.isNotEmpty()) { "Order must have at least one item" }
require(command.items.all { it.quantity > 0 }) { "Item quantity must be positive" }
require(command.items.all { it.unitPrice > BigDecimal.ZERO }) { "Item price must be positive" }
val totalAmount = command.items.sumOf {
it.unitPrice.multiply(BigDecimal(it.quantity))
}
val order = Order(
customerId = command.customerId,
items = command.items.map { item ->
OrderItem(
productId = item.productId,
productName = item.productName,
quantity = item.quantity,
unitPrice = item.unitPrice
)
},
shippingAddress = command.shippingAddress,
status = OrderStatus.PLACED,
totalAmount = totalAmount
)
val savedOrder = orderRepository.save(order)
val event = OrderEvent.OrderPlaced(
orderId = savedOrder.id!!,
customerId = savedOrder.customerId,
items = savedOrder.items.map { item ->
OrderItemData(
productId = item.productId,
productName = item.productName,
quantity = item.quantity,
unitPrice = item.unitPrice
)
},
shippingAddress = savedOrder.shippingAddress,
totalAmount = savedOrder.totalAmount
)
// event.orderId is a non-null String, unlike the nullable savedOrder.id
kafkaTemplate.send("order-events", event.orderId, event)
return savedOrder
}
private fun handleUpdateAddress(command: OrderCommand.UpdateShippingAddress): Order {
val order = orderRepository.findById(command.orderId)
.orElseThrow { IllegalArgumentException("Order not found: ${command.orderId}") }
require(order.status == OrderStatus.PLACED || order.status == OrderStatus.CONFIRMED) {
"Cannot update address for order in status: ${order.status}"
}
val oldAddress = order.shippingAddress
val updatedOrder = orderRepository.save(
order.copy(
shippingAddress = command.newAddress,
updatedAt = java.time.Instant.now()
)
)
val event = OrderEvent.ShippingAddressUpdated(
orderId = updatedOrder.id!!,
oldAddress = oldAddress,
newAddress = command.newAddress
)
// event.orderId is a non-null String, unlike the nullable updatedOrder.id
kafkaTemplate.send("order-events", event.orderId, event)
return updatedOrder
}
private fun handleCancelOrder(command: OrderCommand.CancelOrder): Order {
val order = orderRepository.findById(command.orderId)
.orElseThrow { IllegalArgumentException("Order not found: ${command.orderId}") }
require(order.status != OrderStatus.CANCELLED) {
"Order is already cancelled"
}
require(order.status != OrderStatus.DELIVERED) {
"Cannot cancel a delivered order"
}
val cancelledOrder = orderRepository.save(
order.copy(
status = OrderStatus.CANCELLED,
updatedAt = java.time.Instant.now()
)
)
val event = OrderEvent.OrderCancelled(
orderId = cancelledOrder.id!!,
reason = command.reason
)
// event.orderId is a non-null String, unlike the nullable cancelledOrder.id
kafkaTemplate.send("order-events", event.orderId, event)
return cancelledOrder
}
}
A few things to notice:
- Validation happens before persistence. If the command is invalid, we throw immediately. No half-written data.
- Kafka key is the order ID. This ensures all events for the same order land in the same partition, maintaining ordering per order.
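- Events are published after the database write succeeds. Note that kafkaTemplate.send is asynchronous: it returns a future, and the handler as written never checks it, so a failed publish goes unnoticed. This ordering is important, and also a problem we’ll discuss shortly.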
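The per-order ordering guarantee comes from key-based partitioning: records with the same key are routed to the same partition. The sketch below illustrates the idea with a stand-in hash. It is not Kafka's actual partitioner, which hashes the serialized key bytes with murmur2; partitionFor and the partition count of 6 are assumptions made for illustration.

```kotlin
// Stand-in for key-based partitioning. Kafka's default partitioner uses a
// murmur2 hash of the serialized key; String.hashCode() is used here only
// to show the "same key, same partition" property.
fun partitionFor(key: String, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    return Math.floorMod(key.hashCode(), numPartitions)
}

fun main() {
    val numPartitions = 6 // assumed partition count for the order-events topic
    val keys = listOf("order-1", "order-2", "order-1", "order-3", "order-1")
    for (key in keys) {
        println("$key -> partition ${partitionFor(key, numPartitions)}")
    }
}
```

Every occurrence of order-1 maps to the same partition, and a partition is an ordered log, so consumers see that order's events in the sequence they were written. There is no ordering guarantee across different orders.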
The repository is straightforward:
import org.springframework.data.mongodb.repository.MongoRepository
interface OrderRepository : MongoRepository<Order, String>
Spring Data does the rest.
MongoDB write concern — why w: majority
When the command service saves an order, MongoDB writes to the primary node. If the write is acknowledged by the primary alone and the primary crashes before replicating it, the newly elected primary never saw it and the write is rolled back. Data lost.
Write concern w: majority tells MongoDB: “don’t acknowledge this write until a majority of replica set members have it.” For a 3-node replica set, that means at least 2 nodes.
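The majority threshold is simple arithmetic. A small sketch, assuming every voting member is data-bearing (no arbiters); majorityOf is a hypothetical helper, not a MongoDB API.

```kotlin
// Number of acknowledgements needed for a majority of `votingMembers`.
// Assumes all voting members hold data (no arbiters in the replica set).
fun majorityOf(votingMembers: Int): Int {
    require(votingMembers > 0) { "votingMembers must be positive" }
    return votingMembers / 2 + 1
}

fun main() {
    for (n in listOf(1, 3, 5, 7)) {
        println("$n members -> majority ${majorityOf(n)}")
    }
}
```

Read the other way around: a 3-node set can keep acknowledging majority writes with one node down, a 5-node set with two.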
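Since MongoDB 5.0, w: majority is already the implicit default for most replica set configurations, but setting it explicitly keeps the guarantee visible and independent of server defaults. Configure it in your MongoDB connection string: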
spring:
  data:
    mongodb:
      uri: mongodb://localhost:27017/orders_write?w=majority&wtimeoutMS=5000
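The wtimeoutMS is a safety net. If majority acknowledgement takes longer than 5 seconds, the write returns an error rather than blocking indefinitely. A timeout does not roll the write back: it may already be applied on the primary and can still replicate later, so treat the outcome as unknown rather than failed. In production, tune this based on your replica set’s replication lag.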
This matters for CQRS because the command service is the source of truth. If it says an order is placed, that order better exist — even if a node crashes.
Kafka producer — acks=all and idempotence
Look at the application.yml again:
spring:
  kafka:
    producer:
      acks: all
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5
acks: all means the Kafka broker won’t acknowledge the message until all in-sync replicas have written it. Same idea as MongoDB’s write concern — we don’t want to lose events.
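enable.idempotence: true means the producer attaches a producer ID and a sequence number to what it sends. If a network glitch causes the producer to retry, the broker uses the sequence number to discard the duplicate. That rules out duplicates caused by producer retries. It does not deduplicate events your application sends twice, for example after a restart.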
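A toy model of the deduplication idea, as a sketch only. ToyPartitionLog is hypothetical and far simpler than the real broker logic, which tracks sequence numbers per producer and partition and also deals with batches and producer epochs.

```kotlin
// Toy model of broker-side deduplication for an idempotent producer.
// This sketch keeps only the last sequence number seen per producer.
class ToyPartitionLog {
    private val lastSequence = mutableMapOf<Long, Int>()
    val records = mutableListOf<String>()

    // Returns true if the record was appended, false if it was a duplicate retry.
    fun append(producerId: Long, sequence: Int, value: String): Boolean {
        val last = lastSequence[producerId] ?: -1
        if (sequence <= last) return false // already written: drop the retry
        lastSequence[producerId] = sequence
        records.add(value)
        return true
    }
}

fun main() {
    val log = ToyPartitionLog()
    log.append(producerId = 1L, sequence = 0, value = "OrderPlaced")
    // The acknowledgement was lost, so the producer retries with the same sequence number.
    log.append(producerId = 1L, sequence = 0, value = "OrderPlaced")
    log.append(producerId = 1L, sequence = 1, value = "OrderCancelled")
    println(log.records)
}
```

The retried OrderPlaced is dropped, so the log ends up with two records rather than three.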
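max.in.flight.requests.per.connection: 5 is the maximum allowed with an idempotent producer. With idempotence enabled, Kafka preserves ordering even with 5 requests in flight. Without idempotence, this has to be 1 to keep ordering when a failed batch is retried. Since Kafka 3.0 the producer defaults to acks=all with idempotence enabled, so these settings mostly make the intent explicit.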
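These settings depend on each other: an idempotent producer needs acks=all, at most 5 in-flight requests, and a non-zero retry count. Here is a sketch of that consistency check in plain Kotlin. idempotenceProblems is a hypothetical helper; the property keys are the real producer config names, and the fallback values are assumed from the defaults of recent Kafka clients.

```kotlin
// Checks the producer settings that an idempotent producer depends on.
// Returns a list of problems; an empty list means the combination is compatible.
fun idempotenceProblems(props: Map<String, String>): List<String> {
    if (props["enable.idempotence"] != "true") return emptyList()

    val problems = mutableListOf<String>()
    val acks = props["acks"] ?: "all" // assumed default
    if (acks != "all" && acks != "-1") {
        problems += "acks must be all, was $acks"
    }
    val inFlight = (props["max.in.flight.requests.per.connection"] ?: "5").toInt()
    if (inFlight > 5) {
        problems += "max.in.flight.requests.per.connection must be <= 5, was $inFlight"
    }
    val retries = (props["retries"] ?: Int.MAX_VALUE.toString()).toInt()
    if (retries <= 0) {
        problems += "retries must be > 0, was $retries"
    }
    return problems
}

fun main() {
    val fromThisPost = mapOf(
        "acks" to "all",
        "enable.idempotence" to "true",
        "max.in.flight.requests.per.connection" to "5"
    )
    val conflicting = fromThisPost +
        mapOf("acks" to "1", "max.in.flight.requests.per.connection" to "10")

    println(idempotenceProblems(fromThisPost))
    println(idempotenceProblems(conflicting).joinToString("\n"))
}
```

A real Kafka client performs its own validation when the producer is created, so this helper is only a way to make the coupling between the three settings visible.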
The elephant in the room — what if Kafka is down?
Look at the command handler again. We save to MongoDB, then publish to Kafka. Two separate operations. What if MongoDB succeeds but Kafka fails?
1. Save order to MongoDB ✅ Success
2. Publish event to Kafka ❌ Failure (Kafka is down)
Now the order exists in MongoDB but no event was published. The query side never hears about it. The customer can see their order if they hit the command service directly… except the command service doesn’t serve reads. The order is invisible.
This is the dual-write problem. You’re writing to two systems, and no single transaction spans both.
The standard solution is the transactional outbox pattern:
- Instead of publishing directly to Kafka, write the event to an outbox collection in the same MongoDB transaction as the order.
- A separate process (or CDC connector like Debezium) reads the outbox and publishes to Kafka.
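- Since the order and the outbox entry are in the same transaction, they either both succeed or both fail. Multi-document transactions in MongoDB require a replica set or sharded cluster, so a standalone mongod won’t do.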
┌─────────────────────────────────────┐
│         MongoDB Transaction         │
│                                     │
│  1. Save order to 'orders'          │
│  2. Save event to 'outbox'          │
│                                     │
└─────────────────────────────────────┘
                  │
                  ▼
┌─────────────────────────────────────┐
│    Outbox Relay (CDC / Polling)     │
│                                     │
│    Reads from 'outbox' → Kafka      │
└─────────────────────────────────────┘
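The flow in the diagram can be sketched with in-memory stand-ins. Nothing below is MongoDB or Kafka API: InMemoryStore, its transaction function, and relayOnce are hypothetical names that only model the "both writes or neither" step and a polling relay.

```kotlin
// In-memory stand-in for one row of the 'outbox' collection.
data class OutboxEntry(
    val id: Long,
    val orderId: String,
    val payload: String,
    var published: Boolean = false
)

class InMemoryStore {
    val orders = mutableMapOf<String, String>() // orderId -> status
    val outbox = mutableListOf<OutboxEntry>()
    private var nextId = 1L

    // Buffers writes until the transaction block finishes.
    class Tx {
        val pendingOrders = mutableMapOf<String, String>()
        val pendingEvents = mutableListOf<Pair<String, String>>()
        fun saveOrder(orderId: String, status: String) { pendingOrders[orderId] = status }
        fun saveEvent(orderId: String, payload: String) { pendingEvents += orderId to payload }
    }

    // Stand-in for a multi-document transaction: both writes are applied
    // together, or neither is applied if `block` throws.
    fun <T> transaction(block: Tx.() -> T): T {
        val tx = Tx()
        val result = tx.block() // may throw; nothing has been applied yet
        orders.putAll(tx.pendingOrders)
        outbox.addAll(tx.pendingEvents.map { (orderId, payload) ->
            OutboxEntry(nextId++, orderId, payload)
        })
        return result
    }
}

// Polling relay: publishes unpublished entries in insertion order and marks
// each one only after `publish` returns without throwing (at-least-once).
fun relayOnce(store: InMemoryStore, publish: (key: String, payload: String) -> Unit): Int {
    var count = 0
    for (entry in store.outbox.filter { !it.published }) {
        publish(entry.orderId, entry.payload)
        entry.published = true
        count++
    }
    return count
}

fun main() {
    val store = InMemoryStore()
    store.transaction {
        saveOrder("order-1", "PLACED")
        saveEvent("order-1", "OrderPlaced")
    }
    val sent = mutableListOf<String>()
    relayOnce(store) { key, payload -> sent += "$key:$payload" }
    println(sent)
}
```

If publish throws, the entry stays unpublished and the next relay pass retries it. That means an event can be delivered more than once, so consumers should tolerate duplicates.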
We won’t implement the full outbox pattern in this post — it deserves its own treatment. For now, the direct publish approach works for development. Just know it’s not production-safe without outbox or a similar guarantee.
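Even without an outbox, the direct approach can at least surface publish failures instead of ignoring them. In Spring Kafka 3.x, KafkaTemplate.send returns a CompletableFuture; the sketch below uses a plain CompletableFuture<String> as a stand-in so it runs without Kafka. publishOrFail, EventPublishException, and the 5-second timeout are assumptions for illustration.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException

// Hypothetical exception type for a publish that did not succeed.
class EventPublishException(message: String, cause: Throwable) : RuntimeException(message, cause)

// Blocks until the send completes or the timeout elapses, and turns any
// failure into an exception the caller cannot silently ignore.
fun publishOrFail(timeoutSeconds: Long = 5, send: () -> CompletableFuture<String>): String =
    try {
        send().get(timeoutSeconds, TimeUnit.SECONDS)
    } catch (e: ExecutionException) {
        throw EventPublishException("Publish failed", e.cause ?: e)
    } catch (e: TimeoutException) {
        throw EventPublishException("Publish timed out after ${timeoutSeconds}s", e)
    }

fun main() {
    // Stand-in for a successful send; the string mimics an acknowledgement.
    val ack = publishOrFail { CompletableFuture.completedFuture("acknowledged") }
    println("result: $ack")

    // Stand-in for a send that fails because the broker is unreachable.
    val broken = CompletableFuture<String>()
    broken.completeExceptionally(IllegalStateException("broker unavailable"))
    try {
        publishOrFail { broken }
    } catch (e: EventPublishException) {
        println("failed: ${e.message} (${e.cause?.message})")
    }
}
```

This narrows the problem but does not solve it: when the publish fails, the order is already saved, so the caller sees an error for a write that actually happened. That gap is what the outbox closes.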
REST controller
The controller is thin. It translates HTTP requests into commands and hands them to the handler.
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.DeleteMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/orders")
class OrderCommandController(
private val commandHandler: OrderCommandHandler
) {
@PostMapping
fun placeOrder(@RequestBody request: PlaceOrderRequest): ResponseEntity<OrderResponse> {
val command = OrderCommand.PlaceOrder(
customerId = request.customerId,
items = request.items,
shippingAddress = request.shippingAddress
)
val order = commandHandler.handle(command)
return ResponseEntity
.status(HttpStatus.CREATED)
.body(order.toResponse())
}
@PutMapping("/{orderId}/address")
fun updateAddress(
@PathVariable orderId: String,
@RequestBody request: UpdateAddressRequest
): ResponseEntity<OrderResponse> {
val command = OrderCommand.UpdateShippingAddress(
orderId = orderId,
newAddress = request.newAddress
)
val order = commandHandler.handle(command)
return ResponseEntity.ok(order.toResponse())
}
@DeleteMapping("/{orderId}")
fun cancelOrder(
@PathVariable orderId: String,
@RequestBody request: CancelOrderRequest
): ResponseEntity<OrderResponse> {
val command = OrderCommand.CancelOrder(
orderId = orderId,
reason = request.reason
)
val order = commandHandler.handle(command)
return ResponseEntity.ok(order.toResponse())
}
}
data class PlaceOrderRequest(
val customerId: String,
val items: List<OrderItemRequest>,
val shippingAddress: Address
)
data class UpdateAddressRequest(
val newAddress: Address
)
data class CancelOrderRequest(
val reason: String
)
data class OrderResponse(
val id: String,
val customerId: String,
val status: OrderStatus,
val totalAmount: java.math.BigDecimal,
val createdAt: java.time.Instant
)
fun Order.toResponse() = OrderResponse(
id = id!!,
customerId = customerId,
status = status,
totalAmount = totalAmount,
createdAt = createdAt
)
Notice:
- No GET endpoints. This is the command service. Reads happen elsewhere.
- DELETE for cancellation. It takes a body with a reason. Some REST purists don’t like bodies on DELETE. Use POST /orders/{id}/cancel if you prefer; the point is the command reaches the handler.
- Thin mapping. The controller maps HTTP to commands. That’s it. No business logic.
Tests
Tests for the command handler focus on business rules. Mock the repository and Kafka template — we’re testing logic, not infrastructure.
import io.mockk.every
import io.mockk.mockk
import io.mockk.slot
import io.mockk.verify
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.springframework.kafka.core.KafkaTemplate
import java.math.BigDecimal
import java.util.Optional
class OrderCommandHandlerTest {
private lateinit var orderRepository: OrderRepository
private lateinit var kafkaTemplate: KafkaTemplate<String, OrderEvent>
private lateinit var handler: OrderCommandHandler
@BeforeEach
fun setUp() {
orderRepository = mockk()
kafkaTemplate = mockk(relaxed = true)
handler = OrderCommandHandler(orderRepository, kafkaTemplate)
}
@Nested
inner class PlaceOrderTests {
@Test
fun `should place order and publish event`() {
val command = OrderCommand.PlaceOrder(
customerId = "cust-1",
items = listOf(
OrderItemRequest("prod-1", "Widget", 2, BigDecimal("10.00"))
),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US")
)
// Echo back the order the handler built, with a generated id, so the
// assertions below exercise the handler's own total calculation
// instead of values hard-coded into a stub.
every { orderRepository.save(any()) } answers { firstArg<Order>().copy(id = "order-1") }
val result = handler.handle(command)
assertEquals("order-1", result.id)
assertEquals(BigDecimal("20.00"), result.totalAmount)
assertEquals(OrderStatus.PLACED, result.status)
val eventSlot = slot<OrderEvent.OrderPlaced>()
verify { kafkaTemplate.send("order-events", "order-1", capture(eventSlot)) }
assertEquals("cust-1", eventSlot.captured.customerId)
}
@Test
fun `should reject order with no items`() {
val command = OrderCommand.PlaceOrder(
customerId = "cust-1",
items = emptyList(),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US")
)
assertThrows(IllegalArgumentException::class.java) {
handler.handle(command)
}
}
@Test
fun `should reject order with zero quantity`() {
val command = OrderCommand.PlaceOrder(
customerId = "cust-1",
items = listOf(
OrderItemRequest("prod-1", "Widget", 0, BigDecimal("10.00"))
),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US")
)
assertThrows(IllegalArgumentException::class.java) {
handler.handle(command)
}
}
}
@Nested
inner class CancelOrderTests {
@Test
fun `should cancel a placed order`() {
val existingOrder = Order(
id = "order-1",
customerId = "cust-1",
items = listOf(OrderItem("prod-1", "Widget", 1, BigDecimal("10.00"))),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
status = OrderStatus.PLACED,
totalAmount = BigDecimal("10.00")
)
every { orderRepository.findById("order-1") } returns Optional.of(existingOrder)
every { orderRepository.save(any()) } answers { firstArg() }
val command = OrderCommand.CancelOrder("order-1", "Changed my mind")
val result = handler.handle(command)
assertEquals(OrderStatus.CANCELLED, result.status)
verify { kafkaTemplate.send("order-events", "order-1", any<OrderEvent.OrderCancelled>()) }
}
@Test
fun `should reject cancellation of delivered order`() {
val deliveredOrder = Order(
id = "order-1",
customerId = "cust-1",
items = listOf(OrderItem("prod-1", "Widget", 1, BigDecimal("10.00"))),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
status = OrderStatus.DELIVERED,
totalAmount = BigDecimal("10.00")
)
every { orderRepository.findById("order-1") } returns Optional.of(deliveredOrder)
assertThrows(IllegalArgumentException::class.java) {
handler.handle(OrderCommand.CancelOrder("order-1", "Too late"))
}
}
}
@Nested
inner class UpdateAddressTests {
@Test
fun `should update address for a placed order`() {
val existingOrder = Order(
id = "order-1",
customerId = "cust-1",
items = listOf(OrderItem("prod-1", "Widget", 1, BigDecimal("10.00"))),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
status = OrderStatus.PLACED,
totalAmount = BigDecimal("10.00")
)
every { orderRepository.findById("order-1") } returns Optional.of(existingOrder)
every { orderRepository.save(any()) } answers { firstArg() }
val newAddress = Address("456 Oak Ave", "Chicago", "IL", "60601", "US")
val command = OrderCommand.UpdateShippingAddress("order-1", newAddress)
val result = handler.handle(command)
assertEquals(newAddress, result.shippingAddress)
}
@Test
fun `should reject address update for shipped order`() {
val shippedOrder = Order(
id = "order-1",
customerId = "cust-1",
items = listOf(OrderItem("prod-1", "Widget", 1, BigDecimal("10.00"))),
shippingAddress = Address("123 Main St", "Springfield", "IL", "62701", "US"),
status = OrderStatus.SHIPPED,
totalAmount = BigDecimal("10.00")
)
every { orderRepository.findById("order-1") } returns Optional.of(shippedOrder)
assertThrows(IllegalArgumentException::class.java) {
handler.handle(
OrderCommand.UpdateShippingAddress(
"order-1",
Address("456 Oak Ave", "Chicago", "IL", "60601", "US")
)
)
}
}
}
}
These tests verify:
- Positive cases — placing an order calculates the total and publishes an event. Cancelling a placed order works. Updating address for a non-shipped order works.
- Negative cases — empty items rejected. Zero quantity rejected. Can’t cancel a delivered order. Can’t update address on a shipped order.
Mock the repository and Kafka template. Capture the event sent to Kafka to verify its contents. This gives you fast, isolated tests that run without Docker, MongoDB, or Kafka.
Project structure
Here’s how the command service looks when put together:
order-command-service/
├── src/main/kotlin/com/example/orders/command/
│ ├── OrderCommandApplication.kt
│ ├── domain/
│ │ ├── Order.kt
│ │ ├── OrderStatus.kt
│ │ ├── Address.kt
│ │ └── OrderItem.kt
│ ├── command/
│ │ └── OrderCommand.kt
│ ├── event/
│ │ └── OrderEvent.kt
│ ├── handler/
│ │ └── OrderCommandHandler.kt
│ ├── repository/
│ │ └── OrderRepository.kt
│ └── controller/
│ ├── OrderCommandController.kt
│ └── Requests.kt
└── src/test/kotlin/com/example/orders/command/
└── handler/
└── OrderCommandHandlerTest.kt
Clean separation. The domain’s only tie to Spring is the Spring Data mapping annotations (@Document, @Id) on Order. Commands and events are plain data classes. The handler orchestrates. The controller translates HTTP.
What’s next
In Part 3, we’ll build the query side. A separate Spring Boot service that consumes events from Kafka, builds read-optimized projections in MongoDB, and serves fast queries for the customer dashboard and operations team.
Different service. Different model. Same events. That’s where CQRS starts to pay off.