CQRS with Spring Boot, Kafka & MongoDB — Part 5: Putting It All Together
Docker Compose setup, end-to-end flow walkthrough, integration tests with Testcontainers, and a production readiness checklist for the complete CQRS system.
This is Part 5 — the final post — of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.
- What is CQRS and why you need it
- Command side — writes done right
- Query side — reads at scale
- The hard parts
- Putting it all together (this post)
We’ve built the command service, the query service, and discussed every hard problem you’ll hit in production. Now let’s run the whole thing.
This post is hands-on. By the end of it, you’ll have a running CQRS system on your machine — two Spring Boot services, Kafka, MongoDB, all wired together.
Full architecture
Here’s what we’re running:
            ┌───────────────────────┐
            │        Client         │
            │ (curl / Postman / UI) │
            └────┬─────────────┬────┘
   POST /orders  │             │  GET /orders
                 │             │
     ┌───────────▼─────┐    ┌──▼─────────────┐
     │ Command Service │    │ Query Service  │
     │      :8081      │    │     :8082      │
     │ - Validates     │    │ - Consumes     │
     │ - Persists      │    │   events       │
     │ - Writes to     │    │ - Builds       │
     │   outbox        │    │   projections  │
     │ - Publishes     │    │ - Serves reads │
     └────┬──────┬─────┘    └────────▲───────┘
          │      │                   │
          │      │  ┌──────────────┐ │
          │      └─▶│    Kafka     │─┘
          │         │ broker:9092  │
          │         │              │
          │         │ Topic:       │
          │         │ order-events │
          │         └──────────────┘
          │
   ┌──────▼───────┐
   │   MongoDB    │
   │  rs0:27017   │
   │              │
   │ DB: orders   │
   │ Collections: │
   │ - orders     │
   │ - outbox     │
   │              │
   │ DB: order_   │
   │     query    │
   │ Collections: │
   │ - order_     │
   │   projections│
   │ - processed_ │
   │   events     │
   └──────────────┘
Two services. One Kafka broker. One MongoDB replica set. The command service writes to the orders database. The query service writes to the order_query database. Same MongoDB instance, different databases — in production you might separate them entirely.
Docker Compose setup
Here’s the full docker-compose.yml. This uses KRaft mode for Kafka (no Zookeeper) and sets up MongoDB as a single-node replica set.
version: "3.9"

services:
  mongodb:
    image: mongo:7.0
    container_name: cqrs-mongodb
    ports:
      - "27017:27017"
    command: ["mongod", "--replSet", "rs0", "--bind_ip_all"]
    volumes:
      - mongodb_data:/data/db
    healthcheck:
      test: >
        mongosh --eval "try { rs.status().ok } catch(e) { rs.initiate({_id:'rs0', members:[{_id:0, host:'mongodb:27017'}]}).ok }"
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s

  kafka:
    image: apache/kafka:3.7.0
    container_name: cqrs-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    healthcheck:
      test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 30s

  command-service:
    build:
      context: ./order-command-service
      dockerfile: Dockerfile
    container_name: cqrs-command-service
    ports:
      - "8081:8081"
    environment:
      SPRING_DATA_MONGODB_URI: mongodb://mongodb:27017/orders?replicaSet=rs0
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SERVER_PORT: 8081
    depends_on:
      mongodb:
        condition: service_healthy
      kafka:
        condition: service_healthy

  query-service:
    build:
      context: ./order-query-service
      dockerfile: Dockerfile
    container_name: cqrs-query-service
    ports:
      - "8082:8082"
    environment:
      SPRING_DATA_MONGODB_URI: mongodb://mongodb:27017/order_query?replicaSet=rs0
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SERVER_PORT: 8082
    depends_on:
      mongodb:
        condition: service_healthy
      kafka:
        condition: service_healthy

volumes:
  mongodb_data:
Each Spring Boot service has a standard Dockerfile (shown for the command service; the query service exposes 8082 instead):
FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY build/libs/*-SNAPSHOT.jar app.jar
EXPOSE 8081
ENTRYPOINT ["java", "-jar", "app.jar"]
Running the complete system
Step 1: Build both services
cd order-command-service && ./gradlew bootJar && cd ..
cd order-query-service && ./gradlew bootJar && cd ..
Step 2: Start everything
docker-compose up -d
Wait for all services to be healthy:
docker-compose ps
You should see all four containers running. MongoDB needs a few seconds to initialize the replica set, and Kafka needs about 30 seconds to be ready in KRaft mode.
Step 3: Place an order
curl -X POST http://localhost:8081/api/v1/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "CUST-001",
    "items": [
      {
        "productId": "PROD-100",
        "productName": "Mechanical Keyboard",
        "quantity": 1,
        "price": 129.99
      },
      {
        "productId": "PROD-200",
        "productName": "USB-C Cable",
        "quantity": 2,
        "price": 12.99
      }
    ]
  }'
Response:
{
  "orderId": "ORD-a1b2c3d4",
  "status": "PLACED",
  "totalAmount": 155.97,
  "createdAt": "2025-06-29T10:30:00Z"
}
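That totalAmount is computed on the command side: price times quantity per line item, summed. A minimal sketch of the calculation (the OrderItem class here is hypothetical, not the series' actual model), using BigDecimal so money math stays exact:

```kotlin
import java.math.BigDecimal

// Hypothetical item shape mirroring the request payload
data class OrderItem(val productId: String, val quantity: Int, val price: BigDecimal)

// BigDecimal keeps currency exact; summing Doubles accumulates rounding error
fun totalAmount(items: List<OrderItem>): BigDecimal =
    items.fold(BigDecimal.ZERO) { acc, item ->
        acc + item.price * BigDecimal(item.quantity)
    }

fun main() {
    val items = listOf(
        OrderItem("PROD-100", 1, BigDecimal("129.99")),
        OrderItem("PROD-200", 2, BigDecimal("12.99"))
    )
    println(totalAmount(items)) // 155.97, matching the response above
}
```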
Step 4: Query the order
Give it a second for the event to flow through Kafka, then:
curl http://localhost:8082/api/v1/orders/ORD-a1b2c3d4
Response:
{
  "orderId": "ORD-a1b2c3d4",
  "customerId": "CUST-001",
  "items": [
    {
      "productId": "PROD-100",
      "productName": "Mechanical Keyboard",
      "quantity": 1,
      "price": 129.99
    },
    {
      "productId": "PROD-200",
      "productName": "USB-C Cable",
      "quantity": 2,
      "price": 12.99
    }
  ],
  "totalAmount": 155.97,
  "status": "PLACED",
  "placedAt": "2025-06-29T10:30:00Z"
}
Step 5: Query the customer dashboard
curl http://localhost:8082/api/v1/customers/CUST-001/orders
This returns all orders for the customer — a denormalized, pre-joined projection optimized for the dashboard.
End-to-end flow walkthrough
Let’s trace what happens when you POST that order. Every step, every system boundary.
1.  Client sends POST /api/v1/orders to Command Service (:8081)
    │
2.  ▼ PlaceOrderController receives the request
    │   - Deserializes JSON into PlaceOrderCommand
    │
3.  ▼ OrderCommandService.placeOrder(command)
    │   - Validates: customerId not blank, items not empty, prices > 0
    │   - Generates orderId (e.g., "ORD-a1b2c3d4")
    │   - Calculates totalAmount
    │
4.  ▼ MongoDB transaction begins
    │   - Inserts Order document into 'orders' collection
    │   - Inserts OutboxEntry into 'outbox' collection
    │   - Transaction commits — both writes are atomic
    │
5.  ▼ Returns 201 Created with orderId to client
    │
    │   ── Client is done. Everything below is async. ──
    │
6.  ▼ OutboxPublisher (scheduled every 500ms) picks up the entry
    │   - Reads unpublished entries from 'outbox'
    │   - Serializes OrderPlacedEvent to JSON
    │   - Publishes to Kafka topic 'order-events' with key = orderId
    │   - Marks outbox entry as published
    │
7.  ▼ Kafka receives the message
    │   - Routes to partition based on hash(orderId)
    │   - Replicates (single broker in dev, 3 in production)
    │
8.  ▼ OrderEventConsumer in Query Service picks up the message
    │   - Deserializes JSON into OrderPlacedEvent
    │   - Checks processed_events collection — is this a duplicate?
    │   - Not a duplicate, proceed
    │
9.  ▼ OrderProjectionHandler.handleOrderPlaced(event)
    │   - Upserts into 'order_projections' collection
    │   - Denormalizes data for fast reads
    │   - Saves eventId to 'processed_events'
    │
10. ▼ Consumer commits Kafka offset
    │   - Next poll will start from the next message
    │
11. ▼ GET /api/v1/orders/ORD-a1b2c3d4 now returns the order
Steps 1-5 take milliseconds. Steps 6-10 take somewhere between 500ms (outbox poll interval) and a few seconds. That’s the eventual consistency window we discussed in Part 4.
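Step 6 is the piece that makes the outbox pattern tick, so here is a tiny in-memory model of that drain loop. The names are stand-ins: the real publisher reads unpublished rows from the 'outbox' collection and sends through a KafkaTemplate.

```kotlin
// In-memory stand-in for the outbox drain in step 6. In the real service the
// store is the 'outbox' collection and 'publish' wraps a Kafka producer send.
data class OutboxEntry(
    val eventId: String,
    val orderId: String,
    val payload: String,
    var published: Boolean = false
)

fun drainOutbox(entries: List<OutboxEntry>, publish: (key: String, payload: String) -> Unit): Int {
    var sent = 0
    for (entry in entries.filter { !it.published }) {
        publish(entry.orderId, entry.payload) // key = orderId keeps per-order ordering
        entry.published = true                // marked only after the send succeeds
        sent++
    }
    return sent
}

fun main() {
    val outbox = listOf(
        OutboxEntry("evt-1", "ORD-1", "{...}"),
        OutboxEntry("evt-2", "ORD-2", "{...}", published = true)
    )
    val sent = drainOutbox(outbox) { key, _ -> println("publishing key=$key") }
    println(sent) // only evt-1 was still unpublished
}
```

If the process crashes between the send and the mark, the entry is sent again on the next poll; that is exactly why the query side must be idempotent.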
Testing the full system
Unit tests cover individual components. Integration tests verify the full flow — command in, event through Kafka, projection out.
Testcontainers gives us real MongoDB and Kafka instances in Docker, spun up and torn down per test class.
Dependencies
// build.gradle.kts
dependencies {
    testImplementation("org.springframework.boot:spring-boot-starter-test")
    testImplementation("org.testcontainers:testcontainers:1.19.7")
    testImplementation("org.testcontainers:mongodb:1.19.7")
    testImplementation("org.testcontainers:kafka:1.19.7")
    testImplementation("org.testcontainers:junit-jupiter:1.19.7")
    testImplementation("org.awaitility:awaitility-kotlin:4.2.1")
}
Full flow integration test
import java.time.Duration

import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.bson.Document
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.http.HttpStatus
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.MongoDBContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
class OrderFlowIntegrationTest {

    companion object {
        @Container
        @JvmStatic
        val mongodb = MongoDBContainer("mongo:7.0")
            .withCommand("--replSet", "rs0")

        @Container
        @JvmStatic
        val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))

        @JvmStatic
        @DynamicPropertySource
        fun properties(registry: DynamicPropertyRegistry) {
            registry.add("spring.data.mongodb.uri") {
                "${mongodb.replicaSetUrl}?replicaSet=rs0"
            }
            registry.add("spring.kafka.bootstrap-servers") { kafka.bootstrapServers }
        }
    }

    @Autowired
    lateinit var restTemplate: TestRestTemplate

    @Autowired
    lateinit var mongoTemplate: MongoTemplate

    @Test
    fun `placing an order creates a queryable projection`() {
        // Given — a valid order command
        val command = mapOf(
            "customerId" to "CUST-TEST-001",
            "items" to listOf(
                mapOf(
                    "productId" to "PROD-100",
                    "productName" to "Test Product",
                    "quantity" to 1,
                    "price" to 49.99
                )
            )
        )

        // When — we place the order via the command API
        val response = restTemplate.postForEntity(
            "/api/v1/orders",
            command,
            Map::class.java
        )

        // Then — the command succeeds
        assertThat(response.statusCode).isEqualTo(HttpStatus.CREATED)
        val orderId = response.body!!["orderId"] as String
        assertThat(orderId).isNotBlank()

        // And — the projection is available on the query side
        // (with some delay for async processing)
        await()
            .atMost(Duration.ofSeconds(10))
            .pollInterval(Duration.ofMillis(500))
            .untilAsserted {
                val projection = mongoTemplate.findOne(
                    Query.query(Criteria.where("orderId").`is`(orderId)),
                    Document::class.java,
                    "order_projections"
                )
                assertThat(projection).isNotNull
                assertThat(projection!!.getString("customerId")).isEqualTo("CUST-TEST-001")
                assertThat(projection.getString("status")).isEqualTo("PLACED")
            }
    }

    @Test
    fun `placing an order with invalid data returns 400`() {
        val command = mapOf(
            "customerId" to "",             // Invalid — blank
            "items" to emptyList<Any>()     // Invalid — no items
        )

        val response = restTemplate.postForEntity(
            "/api/v1/orders",
            command,
            Map::class.java
        )

        assertThat(response.statusCode).isEqualTo(HttpStatus.BAD_REQUEST)
    }

    @Test
    fun `duplicate event processing is idempotent`() {
        // Place an order
        val command = mapOf(
            "customerId" to "CUST-TEST-002",
            "items" to listOf(
                mapOf(
                    "productId" to "PROD-200",
                    "productName" to "Another Product",
                    "quantity" to 2,
                    "price" to 25.00
                )
            )
        )
        val response = restTemplate.postForEntity(
            "/api/v1/orders",
            command,
            Map::class.java
        )
        val orderId = response.body!!["orderId"] as String

        // Wait for the projection to appear
        await()
            .atMost(Duration.ofSeconds(10))
            .untilAsserted {
                val count = mongoTemplate.count(
                    Query.query(Criteria.where("orderId").`is`(orderId)),
                    "order_projections"
                )
                assertThat(count).isEqualTo(1)
            }

        // Give any at-least-once redelivery a chance to arrive before re-checking
        Thread.sleep(2000)

        // There should still be exactly one projection
        // (no duplicates from at-least-once delivery)
        val count = mongoTemplate.count(
            Query.query(Criteria.where("orderId").`is`(orderId)),
            "order_projections"
        )
        assertThat(count).isEqualTo(1)
    }
}
The key here is Awaitility. The test places an order synchronously but waits asynchronously for the projection to appear. That’s the nature of CQRS — the write is synchronous, the read is eventually consistent.
What to test at each level
| Level | What to test | Tools |
|---|---|---|
| Unit | Command validation, event serialization, projection mapping | JUnit, MockK |
| Integration | Full flow: command → Kafka → projection | Testcontainers, Awaitility |
| Contract | Event schema compatibility between services | JSON Schema validation |
| E2E | User scenarios against Docker Compose | REST Assured, curl scripts |
Production checklist
Before deploying this to production, make sure you have these in place.
Monitoring
- Kafka consumer lag alerts (warning at 1K, critical at 10K)
- Dead letter topic monitoring (alert on any message)
- Projection freshness metric (alert if stale > 60s)
- Spring Boot Actuator /health endpoint exposed and monitored
- MongoDB connection pool metrics in Micrometer
- Kafka producer delivery failure rate
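The projection freshness check in that list can be as simple as comparing the current time against the timestamp of the last event the projector applied. A sketch, with the function name, the 60-second threshold, and the idea of a recorded last-applied timestamp all being assumptions rather than series code:

```kotlin
import java.time.Duration
import java.time.Instant

// Stale when the last applied event is older than the alert threshold.
// 'lastApplied' would come from a timestamp the projector records per event.
fun isProjectionStale(
    lastApplied: Instant,
    now: Instant = Instant.now(),
    threshold: Duration = Duration.ofSeconds(60)
): Boolean = Duration.between(lastApplied, now) > threshold

fun main() {
    val now = Instant.parse("2025-06-29T10:31:30Z")
    println(isProjectionStale(Instant.parse("2025-06-29T10:31:00Z"), now)) // false: 30s behind
    println(isProjectionStale(Instant.parse("2025-06-29T10:30:00Z"), now)) // true: 90s behind
}
```

Exposed as a Micrometer gauge, this is the metric the "stale > 60s" alert fires on.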
Resilience
- Dead letter topic configured for poison pill events
- Idempotent consumers with upserts or event ID tracking
- Manual Kafka offset commits (not auto-commit)
- Transactional outbox pattern (not dual writes)
- Graceful shutdown — finish processing current batch before stopping
- Circuit breaker on external calls (if any)
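The idempotent-consumer item deserves a concrete shape. Here is a minimal sketch using an in-memory set as a stand-in for the processed_events collection; in MongoDB you would rely on a unique index on eventId instead:

```kotlin
// In-memory sketch of event-ID deduplication. The real consumer persists IDs
// in 'processed_events' with a unique index so a duplicate insert fails fast.
class IdempotentHandler {
    private val processed = mutableSetOf<String>()
    var applied = 0
        private set

    fun handle(eventId: String, apply: () -> Unit) {
        // Set.add returns false when the ID was already recorded
        if (!processed.add(eventId)) return // at-least-once redelivery: drop it
        apply()
        applied++
    }
}

fun main() {
    val handler = IdempotentHandler()
    handler.handle("evt-1") { println("projecting evt-1") }
    handler.handle("evt-1") { println("projecting evt-1 again") } // skipped
    println(handler.applied) // 1
}
```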
Data
- MongoDB replica set with at least 3 members
- Write concern majority on the command service
- Backup strategy for both MongoDB databases
- Kafka topic retention policy set (7 days is a reasonable default)
- Kafka topic with enough partitions for your throughput (start with 6, increase if needed)
- Index on outbox.published and outbox.createdAt for the polling publisher
Operations
- Runbook for “consumer lag is growing” — check consumer health, restart if stuck, check for poison pills in DLT
- Runbook for “projection is stale” — check consumer group, check Kafka connectivity, check MongoDB connectivity
- Runbook for “outbox table is growing” — check outbox publisher, check Kafka connectivity
- Ability to replay events from Kafka to rebuild projections (reset consumer group offset)
- Log correlation IDs across both services for distributed tracing
Capacity planning
- Load test the command service independently
- Load test the query service independently
- Measure maximum sustainable throughput of the Kafka consumer
- Size MongoDB collections — estimate growth per month
- Plan Kafka storage — how much data at your retention period
What we built
Let’s take a step back and look at the whole series.
Part 1 — We defined CQRS. Separate the reads from the writes. Use Kafka as the bridge. MongoDB for storage. We drew the line between CQRS and event sourcing and established when CQRS makes sense.
Part 2 — We built the command service. A Spring Boot application in Kotlin that accepts commands, validates them, persists orders to MongoDB, and publishes domain events to Kafka. Write concern majority. Clean separation of commands and events.
Part 3 — We built the query service. Another Spring Boot application that consumes events from Kafka, builds denormalized projections in MongoDB, and serves fast read APIs. Read preference secondaryPreferred. Projections shaped for specific consumers.
Part 4 — We tackled the hard parts. Eventual consistency strategies. Idempotent consumers with upserts and event ID tracking. Consumer failure handling with manual offsets and dead letter topics. Schema evolution. Partition ordering. The transactional outbox pattern.
Part 5 — This post. Docker Compose setup, running it end-to-end, integration tests with Testcontainers, and the production checklist.
Where to go from here
This series covers CQRS with state-based persistence — the command service stores current state, not events. There’s more to explore.
Event sourcing — instead of storing the latest order state, store every event that ever happened to that order. Rebuild state by replaying events. Powerful for audit trails and temporal queries, but significantly more complex. Consider it when you need a complete history of every state change.
Saga pattern — when a command spans multiple services (place order → reserve inventory → charge payment), you need a saga to coordinate them. Each step publishes an event that triggers the next. If any step fails, compensating events undo previous steps.
Different database combinations — CQRS doesn’t require MongoDB on both sides. A common production setup is PostgreSQL for the command side (strong consistency, ACID transactions) and Elasticsearch for the query side (full-text search, faceted queries). Or PostgreSQL for writes and Redis for reads if your query patterns are simple key-value lookups.
Scaling Kafka consumers — add more consumer instances to your consumer group. Kafka automatically rebalances partitions across consumers. With 6 partitions, you can have up to 6 consumer instances processing in parallel.
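Why keys matter for that scaling story can be shown in a few lines. Note this is an illustration only: Kafka's default partitioner hashes the key bytes with murmur2, not String.hashCode(), but the guarantee demonstrated is the same.

```kotlin
// Illustration of key-based routing: equal keys always map to the same
// partition, so every event for one orderId stays in order even as you
// add consumers. (Kafka's real partitioner uses murmur2, not hashCode.)
fun partitionFor(key: String, partitionCount: Int): Int =
    (key.hashCode() and Int.MAX_VALUE) % partitionCount

fun main() {
    val a = partitionFor("ORD-a1b2c3d4", 6)
    val b = partitionFor("ORD-a1b2c3d4", 6)
    println(a == b) // true: deterministic routing per key
}
```

This is also why increasing the partition count later reshuffles key-to-partition assignments: size partitions generously up front.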
The full source code for this series is on GitHub: github.com/krrishnaaaa/cqrs-spring-boot-kafka
Final thoughts
CQRS is a powerful pattern, but it’s not free. You’re trading the simplicity of a single service for the operational complexity of two services, a message broker, eventual consistency, and all the failure modes that come with distributed systems.
Use it when the trade-off is worth it — when your reads and writes have fundamentally different requirements, when you need to scale them independently, when multiple consumers need to react to domain events.
Don’t use it when a simple service with a single database solves your problem. Most applications don’t need CQRS. A well-structured monolith with clean separation of concerns handles a surprising amount of traffic.
But when you do need it — when the dashboard is slow because your aggregation pipeline runs on every request, when your write path is bottlenecked by read traffic, when three different teams need to consume your domain events — CQRS gives you a clean, scalable architecture with well-defined boundaries.
Start simple. Add complexity when the pain is real. And when you add it, do it properly — with the outbox pattern, idempotent consumers, proper monitoring, and a team that understands the trade-offs.