
CQRS with Spring Boot, Kafka & MongoDB — Part 5: Putting It All Together

Docker Compose setup, end-to-end flow walkthrough, integration tests with Testcontainers, and a production readiness checklist for the complete CQRS system.


This is Part 5 — the final post — of a series on building a CQRS architecture with Spring Boot, Kafka, and MongoDB.

  1. What is CQRS and why you need it
  2. Command side — writes done right
  3. Query side — reads at scale
  4. The hard parts
  5. Putting it all together (this post)

We’ve built the command service, the query service, and discussed every hard problem you’ll hit in production. Now let’s run the whole thing.

This post is hands-on. By the end of it, you’ll have a running CQRS system on your machine — two Spring Boot services, Kafka, MongoDB, all wired together.

Full architecture

Here’s what we’re running:

                          ┌───────────────────────┐
                          │       Client           │
                          │  (curl / Postman / UI) │
                          └───────┬───────┬────────┘
                        POST /orders  │   │  GET /orders
                                  │   │   │
                    ┌─────────────▼───┘   └───▼──────────────┐
                    │                                         │
              ┌─────▼──────────┐                  ┌──────────▼─────┐
              │ Command Service │                  │  Query Service  │
              │   :8081         │                  │   :8082         │
              │                 │                  │                 │
              │ - Validates     │                  │ - Consumes      │
              │ - Persists      │                  │   events        │
              │ - Writes to     │                  │ - Builds        │
              │   outbox        │                  │   projections   │
              │ - Publishes     │                  │ - Serves reads  │
              └────┬────────┬──┘                  └──────▲──────────┘
                   │        │                            │
                   │        │     ┌──────────────┐       │
                   │        └────▶│    Kafka      │──────┘
                   │              │  broker:9092  │
                   │              │               │
                   │              │ Topic:        │
                   │              │ order-events  │
                   │              └──────────────┘


            ┌──────────────┐
            │   MongoDB     │
            │  rs0:27017    │
            │               │
            │ DB: orders    │
            │ Collections:  │
            │ - orders      │
            │ - outbox      │
            │               │
            │ DB: order_    │
            │     query     │
            │ Collections:  │
            │ - order_      │
            │   projections │
            │ - processed_  │
            │   events      │
            └──────────────┘

Two services. One Kafka broker. One MongoDB replica set. The command service writes to the orders database. The query service writes to the order_query database. Same MongoDB instance, different databases — in production you might separate them entirely.

Docker Compose setup

Here’s the full docker-compose.yml. This uses KRaft mode for Kafka (no Zookeeper) and sets up MongoDB as a single-node replica set.

version: "3.9"

services:

  mongodb:
    image: mongo:7.0
    container_name: cqrs-mongodb
    ports:
      - "27017:27017"
    command: ["mongod", "--replSet", "rs0", "--bind_ip_all"]
    volumes:
      - mongodb_data:/data/db
    healthcheck:
      test: >
        mongosh --eval "try { rs.status().ok } catch(e) { rs.initiate({_id:'rs0', members:[{_id:0, host:'mongodb:27017'}]}).ok }"
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 10s

  kafka:
    image: apache/kafka:3.7.0
    container_name: cqrs-kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    healthcheck:
      test: /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
      interval: 10s
      timeout: 10s
      retries: 5
      start_period: 30s

  command-service:
    build:
      context: ./order-command-service
      dockerfile: Dockerfile
    container_name: cqrs-command-service
    ports:
      - "8081:8081"
    environment:
      SPRING_DATA_MONGODB_URI: mongodb://mongodb:27017/orders?replicaSet=rs0
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SERVER_PORT: 8081
    depends_on:
      mongodb:
        condition: service_healthy
      kafka:
        condition: service_healthy

  query-service:
    build:
      context: ./order-query-service
      dockerfile: Dockerfile
    container_name: cqrs-query-service
    ports:
      - "8082:8082"
    environment:
      SPRING_DATA_MONGODB_URI: mongodb://mongodb:27017/order_query?replicaSet=rs0
      SPRING_KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      SERVER_PORT: 8082
    depends_on:
      mongodb:
        condition: service_healthy
      kafka:
        condition: service_healthy

volumes:
  mongodb_data:

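Each Spring Boot service has a standard Dockerfile. The one below is for the command service; the query service is identical except that it exposes 8082: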

FROM eclipse-temurin:21-jre-alpine
WORKDIR /app
COPY build/libs/*-SNAPSHOT.jar app.jar
EXPOSE 8081
ENTRYPOINT ["java", "-jar", "app.jar"]

Running the complete system

Step 1: Build both services

cd order-command-service && ./gradlew bootJar && cd ..
cd order-query-service && ./gradlew bootJar && cd ..

Step 2: Start everything

docker-compose up -d

Wait for all services to be healthy:

docker-compose ps

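You should see all four containers running, with mongodb and kafka reported as healthy. MongoDB needs a few seconds to initialize the replica set, and Kafka can take up to about 30 seconds to become ready in KRaft mode (its health check allows a 30-second start_period).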

Step 3: Place an order

curl -X POST http://localhost:8081/api/v1/orders \
  -H "Content-Type: application/json" \
  -d '{
    "customerId": "CUST-001",
    "items": [
      {
        "productId": "PROD-100",
        "productName": "Mechanical Keyboard",
        "quantity": 1,
        "price": 129.99
      },
      {
        "productId": "PROD-200",
        "productName": "USB-C Cable",
        "quantity": 2,
        "price": 12.99
      }
    ]
  }'

Response:

{
  "orderId": "ORD-a1b2c3d4",
  "status": "PLACED",
  "totalAmount": 155.97,
  "createdAt": "2025-06-29T10:30:00Z"
}

Step 4: Query the order

Give it a second for the event to flow through Kafka, then:

curl http://localhost:8082/api/v1/orders/ORD-a1b2c3d4

Response:

{
  "orderId": "ORD-a1b2c3d4",
  "customerId": "CUST-001",
  "items": [
    {
      "productId": "PROD-100",
      "productName": "Mechanical Keyboard",
      "quantity": 1,
      "price": 129.99
    },
    {
      "productId": "PROD-200",
      "productName": "USB-C Cable",
      "quantity": 2,
      "price": 12.99
    }
  ],
  "totalAmount": 155.97,
  "status": "PLACED",
  "placedAt": "2025-06-29T10:30:00Z"
}

Step 5: Query the customer dashboard

curl http://localhost:8082/api/v1/customers/CUST-001/orders

This returns all orders for the customer — a denormalized, pre-joined projection optimized for the dashboard.
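To make "denormalized, pre-joined" concrete, here is a minimal in-memory sketch of a per-customer projection. The names `OrderSummary` and `DashboardProjection` are hypothetical and the map stands in for the order_projections collection; the real query service from Part 3 stores this in MongoDB.

```kotlin
// Hypothetical read model: one entry per customer, already shaped for the dashboard.
data class OrderSummary(val orderId: String, val status: String, val totalAmount: Double)

class DashboardProjection {
    // Stand-in for a MongoDB collection keyed by customerId.
    private val byCustomer = mutableMapOf<String, MutableList<OrderSummary>>()

    // Applied when an order-placed event arrives. Replacing any existing
    // entry with the same orderId gives upsert semantics, so a redelivered
    // event does not create a duplicate row.
    fun onOrderPlaced(customerId: String, summary: OrderSummary) {
        val orders = byCustomer.getOrPut(customerId) { mutableListOf() }
        orders.removeAll { it.orderId == summary.orderId }
        orders += summary
    }

    // The read path is a single lookup: no joins, no aggregation at query time.
    fun ordersFor(customerId: String): List<OrderSummary> = byCustomer[customerId].orEmpty()
}

fun main() {
    val projection = DashboardProjection()
    projection.onOrderPlaced("CUST-001", OrderSummary("ORD-a1b2c3d4", "PLACED", 155.97))
    projection.onOrderPlaced("CUST-001", OrderSummary("ORD-a1b2c3d4", "PLACED", 155.97)) // redelivery
    println(projection.ordersFor("CUST-001").size) // 1
}
```

The work happens at write time, when the event is applied, which is what keeps the GET endpoint cheap.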

End-to-end flow walkthrough

Let’s trace what happens when you POST that order. Every step, every system boundary.

1. Client sends POST /api/v1/orders to Command Service (:8081)

2. ▼ PlaceOrderController receives the request
   │  - Deserializes JSON into PlaceOrderCommand

3. ▼ OrderCommandService.placeOrder(command)
   │  - Validates: customerId not blank, items not empty, prices > 0
   │  - Generates orderId (e.g., "ORD-a1b2c3d4")
   │  - Calculates totalAmount

4. ▼ MongoDB transaction begins
   │  - Inserts Order document into 'orders' collection
   │  - Inserts OutboxEntry into 'outbox' collection
   │  - Transaction commits — both writes are atomic

5. ▼ Returns 201 Created with orderId to client

   │  ── Client is done. Everything below is async. ──

6. ▼ OutboxPublisher (scheduled every 500ms) picks up the entry
   │  - Reads unpublished entries from 'outbox'
   │  - Serializes OrderPlacedEvent to JSON
   │  - Publishes to Kafka topic 'order-events' with key = orderId
   │  - Marks outbox entry as published

7. ▼ Kafka receives the message
   │  - Routes to partition based on hash(orderId)
   │  - Replicates (single broker in dev, 3 in production)

8. ▼ OrderEventConsumer in Query Service picks up the message
   │  - Deserializes JSON into OrderPlacedEvent
   │  - Checks processed_events collection — is this a duplicate?
   │  - Not a duplicate, proceed

9. ▼ OrderProjectionHandler.handleOrderPlaced(event)
   │  - Upserts into 'order_projections' collection
   │  - Denormalizes data for fast reads
   │  - Saves eventId to 'processed_events'

10.▼ Consumer commits Kafka offset
   │  - Next poll will start from the next message

11.▼ GET /api/v1/orders/ORD-a1b2c3d4 now returns the order

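Steps 1-5 take milliseconds. Steps 6-10 add up to 500 ms of waiting for the next outbox poll, plus Kafka delivery and projection time, so the read side usually catches up within about a second and can lag by a few seconds under load. That’s the eventual consistency window we discussed in Part 4.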
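The walkthrough above can be condensed into a small in-memory sketch of steps 4 to 9. This is not the series' actual code: `CommandSide`, `QuerySide`, and `OutboxEntry` are hypothetical names, and plain lists stand in for the MongoDB collections and the Kafka topic.

```kotlin
// In-memory sketch of steps 4-9. All names are illustrative assumptions.
data class Order(val orderId: String, val customerId: String, val total: Double)
data class OutboxEntry(val eventId: String, val orderId: String, var published: Boolean = false)

class CommandSide {
    val orders = mutableListOf<Order>()
    val outbox = mutableListOf<OutboxEntry>()

    // Step 4: the order and its outbox entry are written together
    // (a single MongoDB transaction in the real service).
    fun placeOrder(order: Order) {
        orders += order
        outbox += OutboxEntry(eventId = "evt-${order.orderId}", orderId = order.orderId)
    }

    // Step 6: the scheduled publisher drains unpublished entries into the topic.
    fun publishPending(topic: MutableList<OutboxEntry>) {
        outbox.filter { !it.published }.forEach { entry ->
            topic += entry
            entry.published = true
        }
    }
}

class QuerySide {
    val processedEvents = mutableSetOf<String>()
    val projections = mutableMapOf<String, String>() // orderId -> status

    // Steps 8-9: skip duplicates, otherwise upsert the projection.
    fun consume(entry: OutboxEntry) {
        if (!processedEvents.add(entry.eventId)) return // already handled
        projections[entry.orderId] = "PLACED"
    }
}

fun main() {
    val command = CommandSide()
    val query = QuerySide()
    val topic = mutableListOf<OutboxEntry>()

    command.placeOrder(Order("ORD-1", "CUST-001", 155.97))
    command.publishPending(topic)
    // Deliver every message twice to mimic at-least-once delivery.
    (topic + topic).forEach(query::consume)

    println(query.projections)          // {ORD-1=PLACED}
    println(query.processedEvents.size) // 1
}
```

The sketch leaves out everything that makes the real system hard (crashes between publish and mark-as-published, partitioning, offsets), but it shows why the outbox and the processed_events check sit where they do.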

Testing the full system

Unit tests cover individual components. Integration tests verify the full flow — command in, event through Kafka, projection out.

Testcontainers gives us real MongoDB and Kafka instances in Docker, spun up and torn down per test class.

Dependencies

// build.gradle.kts
dependencies {
  testImplementation("org.springframework.boot:spring-boot-starter-test")
  testImplementation("org.testcontainers:testcontainers:1.19.7")
  testImplementation("org.testcontainers:mongodb:1.19.7")
  testImplementation("org.testcontainers:kafka:1.19.7")
  testImplementation("org.testcontainers:junit-jupiter:1.19.7")
  testImplementation("org.awaitility:awaitility-kotlin:4.2.1")
}

Full flow integration test

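import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.bson.Document
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.data.mongodb.core.MongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Query
import org.springframework.http.HttpStatus
import org.springframework.test.context.DynamicPropertyRegistry
import org.springframework.test.context.DynamicPropertySource
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.containers.MongoDBContainer
import org.testcontainers.junit.jupiter.Container
import org.testcontainers.junit.jupiter.Testcontainers
import org.testcontainers.utility.DockerImageName
import java.time.Duration

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)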
@Testcontainers
class OrderFlowIntegrationTest {

  companion object {
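    // MongoDBContainer already starts mongod as a single-node replica set and
    // initiates it on startup, so no custom --replSet command is needed.
    @Container
    val mongodb = MongoDBContainer("mongo:7.0")

    @Container
    val kafka = KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.0"))

    @JvmStatic
    @DynamicPropertySource
    fun properties(registry: DynamicPropertyRegistry) {
      // Use the URL as Testcontainers provides it. Appending ?replicaSet=... makes
      // the driver discover the member by its in-container hostname, which the
      // test JVM usually cannot resolve.
      registry.add("spring.data.mongodb.uri") { mongodb.replicaSetUrl }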
      registry.add("spring.kafka.bootstrap-servers") { kafka.bootstrapServers }
    }
  }

  @Autowired
  lateinit var restTemplate: TestRestTemplate

  @Autowired
  lateinit var mongoTemplate: MongoTemplate

  @Test
  fun `placing an order creates a queryable projection`() {
    // Given — a valid order command
    val command = mapOf(
      "customerId" to "CUST-TEST-001",
      "items" to listOf(
        mapOf(
          "productId" to "PROD-100",
          "productName" to "Test Product",
          "quantity" to 1,
          "price" to 49.99
        )
      )
    )

    // When — we place the order via the command API
    val response = restTemplate.postForEntity(
      "/api/v1/orders",
      command,
      Map::class.java
    )

    // Then — the command succeeds
    assertThat(response.statusCode).isEqualTo(HttpStatus.CREATED)
    val orderId = response.body!!["orderId"] as String
    assertThat(orderId).isNotBlank()

    // And — the projection is available on the query side
    // (with some delay for async processing)
    await()
      .atMost(Duration.ofSeconds(10))
      .pollInterval(Duration.ofMillis(500))
      .untilAsserted {
        val projection = mongoTemplate.findOne(
          Query.query(Criteria.where("orderId").`is`(orderId)),
          Document::class.java,
          "order_projections"
        )
        assertThat(projection).isNotNull
        assertThat(projection!!.getString("customerId")).isEqualTo("CUST-TEST-001")
        assertThat(projection.getString("status")).isEqualTo("PLACED")
      }
  }

  @Test
  fun `placing an order with invalid data returns 400`() {
    val command = mapOf(
      "customerId" to "",  // Invalid — blank
      "items" to emptyList<Any>()  // Invalid — no items
    )

    val response = restTemplate.postForEntity(
      "/api/v1/orders",
      command,
      Map::class.java
    )

    assertThat(response.statusCode).isEqualTo(HttpStatus.BAD_REQUEST)
  }

  @Test
  fun `duplicate event processing is idempotent`() {
    // Place an order
    val command = mapOf(
      "customerId" to "CUST-TEST-002",
      "items" to listOf(
        mapOf(
          "productId" to "PROD-200",
          "productName" to "Another Product",
          "quantity" to 2,
          "price" to 25.00
        )
      )
    )

    val response = restTemplate.postForEntity(
      "/api/v1/orders",
      command,
      Map::class.java
    )
    val orderId = response.body!!["orderId"] as String

    // Wait for projection
    await()
      .atMost(Duration.ofSeconds(10))
      .untilAsserted {
        val count = mongoTemplate.count(
          Query.query(Criteria.where("orderId").`is`(orderId)),
          "order_projections"
        )
        assertThat(count).isEqualTo(1)
      }

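    // Give any redelivered message time to arrive, then confirm there is still
    // exactly one projection. This does not force a redelivery; to exercise the
    // idempotency check directly, publish the same event to order-events twice.
    Thread.sleep(2_000)
    val count = mongoTemplate.count(
      Query.query(Criteria.where("orderId").`is`(orderId)),
      "order_projections"
    )
    assertThat(count).isEqualTo(1)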
  }
}

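The key here is Awaitility. The test places the order with a blocking HTTP call, then polls until the projection appears or the 10-second timeout expires. That’s the nature of CQRS: the write is synchronous, the read is eventually consistent. Note that the test posts a command and reads order_projections through the same application context, so it assumes the command-side and query-side components are both loaded in that context.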
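If the polling behaviour feels like magic, the idea fits in a few lines. The `eventually` helper below is a hypothetical stand-in written for illustration, not Awaitility's implementation: it retries an assertion until it passes or a deadline is reached.

```kotlin
import java.time.Duration

// Hypothetical helper: retry `assertion` until it passes or `timeout` elapses.
fun eventually(
    timeout: Duration = Duration.ofSeconds(10),
    pollInterval: Duration = Duration.ofMillis(500),
    assertion: () -> Unit
) {
    val deadline = System.nanoTime() + timeout.toNanos()
    while (true) {
        try {
            assertion()
            return // assertion passed
        } catch (e: AssertionError) {
            // Out of time: surface the last failure to the test runner.
            if (System.nanoTime() >= deadline) throw e
            Thread.sleep(pollInterval.toMillis())
        }
    }
}

fun main() {
    var attempts = 0
    eventually(timeout = Duration.ofSeconds(2), pollInterval = Duration.ofMillis(50)) {
        attempts++
        // Fails twice, then succeeds, like a projection that shows up late.
        if (attempts < 3) throw AssertionError("projection not there yet")
    }
    println(attempts) // 3
}
```

In real tests, prefer Awaitility itself: it handles timeouts, poll delays, and failure messages more carefully than this sketch.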

What to test at each level

Level        | What to test                                                 | Tools
Unit         | Command validation, event serialization, projection mapping  | JUnit, MockK
Integration  | Full flow: command → Kafka → projection                      | Testcontainers, Awaitility
Contract     | Event schema compatibility between services                  | JSON Schema validation
E2E          | User scenarios against Docker Compose                        | REST Assured, curl scripts
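The contract row is the one most often skipped, so here is a rough sketch of the idea. It swaps JSON Schema validation for a hand-rolled field check to stay dependency-free, and the field list in `orderPlacedContract` is an assumption about the event shape, not the series' published schema.

```kotlin
// Fields the consumer relies on, with the Java type each must have.
// Assumed event shape for illustration only.
val orderPlacedContract: Map<String, Class<*>> = mapOf(
    "eventId" to String::class.java,
    "orderId" to String::class.java,
    "customerId" to String::class.java,
    "totalAmount" to Number::class.java
)

// Returns one message per violated expectation. Extra fields are ignored,
// so producers can add fields without breaking consumers.
fun violations(event: Map<String, Any?>, contract: Map<String, Class<*>>): List<String> =
    contract.mapNotNull { (field, type) ->
        val value = event[field]
        when {
            value == null -> "missing field '$field'"
            !type.isInstance(value) -> "field '$field' is not a ${type.simpleName}"
            else -> null
        }
    }

fun main() {
    val sample: Map<String, Any?> = mapOf(
        "eventId" to "evt-1",
        "orderId" to "ORD-1",
        "customerId" to "CUST-001",
        "totalAmount" to 155.97,
        "currency" to "USD" // unknown to the consumer, and that is fine
    )
    println(violations(sample, orderPlacedContract)) // []

    val broken = sample - "customerId" + ("totalAmount" to "155.97")
    println(violations(broken, orderPlacedContract))
    // [missing field 'customerId', field 'totalAmount' is not a Number]
}
```

Run a check like this in the producer's build against a sample of every event it emits, so a breaking change fails before it reaches the topic.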

Production checklist

Before deploying this to production, make sure you have these in place.

Monitoring

  • Kafka consumer lag alerts (warning at 1K, critical at 10K)
  • Dead letter topic monitoring (alert on any message)
  • Projection freshness metric (alert if stale > 60s)
  • Spring Boot Actuator /health endpoint exposed and monitored
  • MongoDB connection pool metrics in Micrometer
  • Kafka producer delivery failure rate
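The two numeric alerts in this list reduce to simple threshold checks. The sketch below encodes them using the values from the checklist; the function names and the `Severity` enum are hypothetical, and in practice these rules would normally live in your alerting system rather than in application code.

```kotlin
import java.time.Duration
import java.time.Instant

enum class Severity { OK, WARNING, CRITICAL }

// Thresholds from the checklist: warning at 1K messages of lag, critical at 10K.
fun lagSeverity(lag: Long): Severity = when {
    lag >= 10_000 -> Severity.CRITICAL
    lag >= 1_000 -> Severity.WARNING
    else -> Severity.OK
}

// A projection counts as stale when the last applied event is older than maxAge (60s by default).
fun isStale(
    lastEventAppliedAt: Instant,
    now: Instant,
    maxAge: Duration = Duration.ofSeconds(60)
): Boolean = Duration.between(lastEventAppliedAt, now) > maxAge

fun main() {
    println(lagSeverity(250))    // OK
    println(lagSeverity(1_000))  // WARNING
    println(lagSeverity(42_000)) // CRITICAL

    val now = Instant.parse("2025-06-29T10:31:30Z")
    println(isStale(Instant.parse("2025-06-29T10:30:00Z"), now)) // true (90s old)
}
```

Treat the numbers as starting points: the right lag threshold depends on your throughput and on how stale a read your users can tolerate.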

Resilience

  • Dead letter topic configured for poison pill events
  • Idempotent consumers with upserts or event ID tracking
  • Manual Kafka offset commits (not auto-commit)
  • Transactional outbox pattern (not dual writes)
  • Graceful shutdown — finish processing current batch before stopping
  • Circuit breaker on external calls (if any)
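Three of these items (dead letter topic, manual offset commits, poison pills) interact, so a small sketch may help. `ResilientConsumer` is a hypothetical class with an in-memory dead letter list and offset counter; it is not the Spring Kafka error handler configuration from Part 4, only the control flow behind it.

```kotlin
// Hypothetical sketch: retry a handler a few times, then dead-letter the event.
data class Event(val eventId: String, val payload: String)

class ResilientConsumer(
    private val maxAttempts: Int = 3,
    private val handler: (Event) -> Unit
) {
    val deadLetters = mutableListOf<Event>() // stand-in for the dead letter topic
    var committedOffset = 0L
        private set

    fun onMessage(offset: Long, event: Event) {
        repeat(maxAttempts) {
            try {
                handler(event)
                committedOffset = offset + 1 // commit only after successful processing
                return
            } catch (e: Exception) {
                // Swallow and retry; a real consumer would log and back off here.
            }
        }
        // Poison pill: park it for inspection and move on so the partition is not blocked.
        deadLetters += event
        committedOffset = offset + 1
    }
}

fun main() {
    val consumer = ResilientConsumer { event ->
        require(event.payload != "boom") { "cannot process ${event.eventId}" }
    }
    consumer.onMessage(0, Event("evt-1", "ok"))
    consumer.onMessage(1, Event("evt-2", "boom"))
    println(consumer.deadLetters.map { it.eventId }) // [evt-2]
    println(consumer.committedOffset)                // 2
}
```

The important property is the ordering: the offset moves forward only after the event has either been processed or safely parked.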

Data

  • MongoDB replica set with at least 3 members
  • Write concern majority on the command service
  • Backup strategy for both MongoDB databases
  • Kafka topic retention policy set (7 days is a reasonable default)
  • Kafka topic with enough partitions for your throughput (start with 6, increase if needed)
  • Index on outbox.published and outbox.createdAt for the polling publisher

Operations

  • Runbook for “consumer lag is growing” — check consumer health, restart if stuck, check for poison pills in DLT
  • Runbook for “projection is stale” — check consumer group, check Kafka connectivity, check MongoDB connectivity
  • Runbook for “outbox collection is growing” — check outbox publisher, check Kafka connectivity
  • Ability to replay events from Kafka to rebuild projections (reset consumer group offset)
  • Log correlation IDs across both services for distributed tracing

Capacity planning

  • Load test the command service independently
  • Load test the query service independently
  • Measure maximum sustainable throughput of the Kafka consumer
  • Size MongoDB collections — estimate growth per month
  • Plan Kafka storage — how much data at your retention period

What we built

Let’s take a step back and look at the whole series.

Part 1 — We defined CQRS. Separate the reads from the writes. Use Kafka as the bridge. MongoDB for storage. We drew the line between CQRS and event sourcing and established when CQRS makes sense.

Part 2 — We built the command service. A Spring Boot application in Kotlin that accepts commands, validates them, persists orders to MongoDB, and publishes domain events to Kafka. Write concern majority. Clean separation of commands and events.

Part 3 — We built the query service. Another Spring Boot application that consumes events from Kafka, builds denormalized projections in MongoDB, and serves fast read APIs. Read preference secondaryPreferred. Projections shaped for specific consumers.

Part 4 — We tackled the hard parts. Eventual consistency strategies. Idempotent consumers with upserts and event ID tracking. Consumer failure handling with manual offsets and dead letter topics. Schema evolution. Partition ordering. The transactional outbox pattern.

Part 5 — This post. Docker Compose setup, running it end-to-end, integration tests with Testcontainers, and the production checklist.

Where to go from here

This series covers CQRS with state-based persistence — the command service stores current state, not events. There’s more to explore.

Event sourcing — instead of storing the latest order state, store every event that ever happened to that order. Rebuild state by replaying events. Powerful for audit trails and temporal queries, but significantly more complex. Consider it when you need a complete history of every state change.
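As a taste of what "rebuild state by replaying events" means, here is a minimal sketch. The event types beyond order placement (`OrderShipped`, `OrderCancelled`) and the `OrderState` class are hypothetical; the series itself only stores current state.

```kotlin
// Hypothetical event types for one order's history.
sealed interface OrderEvent
data class OrderPlaced(val orderId: String, val total: Double) : OrderEvent
data class OrderShipped(val orderId: String) : OrderEvent
data class OrderCancelled(val orderId: String, val reason: String) : OrderEvent

data class OrderState(val orderId: String = "", val total: Double = 0.0, val status: String = "NONE")

// Pure function: previous state + one event = next state.
fun applyEvent(state: OrderState, event: OrderEvent): OrderState = when (event) {
    is OrderPlaced -> OrderState(event.orderId, event.total, "PLACED")
    is OrderShipped -> state.copy(status = "SHIPPED")
    is OrderCancelled -> state.copy(status = "CANCELLED")
}

// Current state is a left fold over the event history.
fun replay(events: List<OrderEvent>): OrderState = events.fold(OrderState(), ::applyEvent)

fun main() {
    val history = listOf(OrderPlaced("ORD-1", 155.97), OrderShipped("ORD-1"))
    println(replay(history))
    // OrderState(orderId=ORD-1, total=155.97, status=SHIPPED)

    // Temporal query: what did the order look like after the first event?
    println(replay(history.take(1)).status) // PLACED
}
```

The temporal query on the last line is the kind of question that state-based persistence cannot answer without a separate audit log.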

Saga pattern — when a command spans multiple services (place order → reserve inventory → charge payment), you need a saga to coordinate them. Each step publishes an event that triggers the next. If any step fails, compensating events undo previous steps.
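The compensation logic is easier to see in code. The sketch below deliberately collapses the event-driven flow described above into a single in-process loop, so it shows only the ordering rule: undo completed steps in reverse when a later step fails. `SagaStep` and `runSaga` are hypothetical names.

```kotlin
// One saga step: an action plus the compensation that undoes it.
class SagaStep(val name: String, val action: () -> Unit, val compensate: () -> Unit)

// Runs steps in order. On the first failure, compensates the steps that
// already completed, most recent first, and reports failure.
fun runSaga(steps: List<SagaStep>, log: MutableList<String>): Boolean {
    val completed = ArrayDeque<SagaStep>()
    for (step in steps) {
        try {
            step.action()
            log += "done:${step.name}"
            completed.addFirst(step) // newest first, so iteration is reverse order
        } catch (e: Exception) {
            log += "failed:${step.name}"
            for (done in completed) {
                done.compensate()
                log += "undone:${done.name}"
            }
            return false
        }
    }
    return true
}

fun main() {
    val log = mutableListOf<String>()
    val ok = runSaga(
        listOf(
            SagaStep("placeOrder", action = {}, compensate = {}),
            SagaStep("reserveInventory", action = {}, compensate = {}),
            SagaStep("chargePayment", action = { error("card declined") }, compensate = {})
        ),
        log
    )
    println(ok)  // false
    println(log)
    // [done:placeOrder, done:reserveInventory, failed:chargePayment, undone:reserveInventory, undone:placeOrder]
}
```

In a distributed saga each action and compensation is a message to another service, and the compensations themselves must be idempotent because they can be redelivered too.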

Different database combinations — CQRS doesn’t require MongoDB on both sides. A common production setup is PostgreSQL for the command side (strong consistency, ACID transactions) and Elasticsearch for the query side (full-text search, faceted queries). Or PostgreSQL for writes and Redis for reads if your query patterns are simple key-value lookups.

Scaling Kafka consumers — add more consumer instances to your consumer group. Kafka automatically rebalances partitions across consumers. With 6 partitions, you can have up to 6 consumer instances processing in parallel.
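Two facts drive consumer scaling: a key always maps to the same partition, and a partition is read by at most one consumer in a group. The sketch below illustrates both. Note the assumption: Kafka's default partitioner hashes the serialized key with murmur2, while this stand-in uses `String.hashCode()` purely for illustration, so the partition numbers it produces will not match a real cluster.

```kotlin
// Illustrative stand-in for Kafka's partitioner (real Kafka uses murmur2 on the key bytes).
fun partitionFor(key: String, partitions: Int): Int = Math.floorMod(key.hashCode(), partitions)

// Within one consumer group, consumers beyond the partition count sit idle.
fun activeConsumers(partitions: Int, consumers: Int): Int = minOf(partitions, consumers)

fun main() {
    val first = partitionFor("ORD-a1b2c3d4", 6)
    val second = partitionFor("ORD-a1b2c3d4", 6)
    println(first == second)       // true: same key, same partition, so per-order ordering holds

    println(activeConsumers(6, 4)) // 4
    println(activeConsumers(6, 9)) // 6 (three instances idle)
}
```

This is also why the partition count is worth choosing with some headroom: it caps how far you can scale a single consumer group.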

The full source code for this series is on GitHub: github.com/krrishnaaaa/cqrs-spring-boot-kafka

Final thoughts

CQRS is a powerful pattern, but it’s not free. You’re trading the simplicity of a single service for the operational complexity of two services, a message broker, eventual consistency, and all the failure modes that come with distributed systems.

Use it when the trade-off is worth it — when your reads and writes have fundamentally different requirements, when you need to scale them independently, when multiple consumers need to react to domain events.

Don’t use it when a simple service with a single database solves your problem. Most applications don’t need CQRS. A well-structured monolith with clean separation of concerns handles a surprising amount of traffic.

But when you do need it — when the dashboard is slow because your aggregation pipeline runs on every request, when your write path is bottlenecked by read traffic, when three different teams need to consume your domain events — CQRS gives you a clean, scalable architecture with well-defined boundaries.

Start simple. Add complexity when the pain is real. And when you add it, do it properly — with the outbox pattern, idempotent consumers, proper monitoring, and a team that understands the trade-offs.