PCSalt
YouTube GitHub
Back to Kotlin
Kotlin · 6 min read

Kotlin Coroutines — Part 3: Real-World Patterns

Flows, StateFlow, coroutines in Android and Spring Boot, Retrofit, Room, and testing — practical patterns you'll use every day.


This is Part 3 — the final post in the Kotlin Coroutines series.

  1. The Basics
  2. Structured Concurrency
  3. Real-World Patterns (this post)

Parts 1 and 2 covered the foundations — suspend functions, dispatchers, structured concurrency, exception handling, and cancellation. This post is about applying all of that. We’ll look at the patterns you’ll actually use in Android apps, Spring Boot services, and everywhere in between.

Flow basics — cold streams

A suspend function returns a single value. But what if you need a stream of values over time? That’s what Flow is for.

A Flow is a cold stream. It doesn’t produce values until someone collects it. Think of it like a Kotlin Sequence, but asynchronous.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay

fun countDown(from: Int): Flow<Int> = flow {
  for (i in from downTo 0) {
    delay(1000)
    emit(i)
  }
}

Nothing happens until you collect:

import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
  countDown(5).collect { value ->
    println(value) // prints 5, 4, 3, 2, 1, 0 — one per second
  }
}

Flow vs Sequence

Sequences are synchronous. They block the thread while producing values. Flows are asynchronous — they suspend instead of blocking.

// Sequence — blocks the thread during each delay
fun sequenceCountDown(from: Int): Sequence<Int> = sequence {
  for (i in from downTo 0) {
    Thread.sleep(1000) // can't use delay() here
    yield(i)
  }
}

// Flow — suspends, doesn't block
fun flowCountDown(from: Int): Flow<Int> = flow {
  for (i in from downTo 0) {
    delay(1000) // suspend, not block
    emit(i)
  }
}

Use Sequence when the data is already in memory and you want lazy evaluation. Use Flow when the data comes from async sources — network, database, sensors, user input.

Flow operators

Flows support operators that look similar to collection operations but work on streams.

Transforming and filtering

import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach

fun evenSquares(): Flow<Int> =
  countDown(10)
    .filter { it % 2 == 0 }
    .map { it * it }
    .onEach { println("Processing: $it") }

Error handling with catch

The catch operator handles upstream exceptions. It doesn’t catch errors in collect.

import kotlinx.coroutines.flow.catch

fun riskyFlow(): Flow<String> = flow {
  emit("first")
  emit("second")
  throw RuntimeException("something broke")
  emit("third") // never reached
}

suspend fun main() {
  riskyFlow()
    .catch { e -> emit("fallback: ${e.message}") }
    .collect { println(it) }
  // prints: first, second, fallback: something broke
}

Search-as-you-type with debounce and distinctUntilChanged

This is where Flow operators really shine. Imagine a search box where you want to query an API as the user types, but not on every keystroke.

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf

class SearchViewModel(
  private val repository: SearchRepository
) : ViewModel() {

  private val queryFlow = MutableStateFlow("")

  val searchResults: Flow<List<SearchResult>> =
    queryFlow
      .debounce(300) // wait 300ms after the user stops typing
      .distinctUntilChanged() // skip if the query hasn't changed
      .flatMapLatest { query ->
        if (query.isBlank()) {
          flowOf(emptyList())
        } else {
          repository.search(query) // returns Flow<List<SearchResult>>
        }
      }

  fun onQueryChanged(query: String) {
    queryFlow.value = query
  }
}

Without debounce, you’d fire a network request on every keystroke. Without distinctUntilChanged, you’d re-query when the user types a character and immediately deletes it. Without flatMapLatest, a slow first query could overwrite a fast second query’s results.

Three operators. Problem solved.

StateFlow and SharedFlow — hot streams

Flows are cold — they restart from scratch for each collector. Sometimes you need a hot stream that has a current value and shares it with multiple collectors. That’s StateFlow.

StateFlow

StateFlow always has a value. It’s like a reactive variable.

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow

class CounterViewModel : ViewModel() {

  private val _count = MutableStateFlow(0)
  val count: StateFlow<Int> = _count.asStateFlow()

  fun increment() {
    _count.value++
  }

  fun decrement() {
    _count.value--
  }
}

SharedFlow

SharedFlow is like StateFlow but without an initial value and with configurable replay. Use it for one-time events — navigation, snackbars, toasts.

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow

class OrderViewModel : ViewModel() {

  private val _events = MutableSharedFlow<UiEvent>()
  val events: SharedFlow<UiEvent> = _events.asSharedFlow()

  fun placeOrder(order: Order) {
    viewModelScope.launch {
      try {
        repository.placeOrder(order)
        _events.emit(UiEvent.ShowSnackbar("Order placed"))
        _events.emit(UiEvent.NavigateTo(Screen.OrderConfirmation))
      } catch (e: Exception) {
        _events.emit(UiEvent.ShowSnackbar("Failed: ${e.message}"))
      }
    }
  }

  sealed interface UiEvent {
    data class ShowSnackbar(val message: String) : UiEvent
    data class NavigateTo(val screen: Screen) : UiEvent
  }
}

StateFlow vs LiveData

If you’re coming from LiveData, StateFlow is the coroutine-native replacement.

StateFlowLiveData
Requires initial valueYesNo
Lifecycle-awareNo (need repeatOnLifecycle)Yes
Kotlin-firstYesDesigned for Java too
OperatorsFull Flow operator supportLimited transformations
TestingStandard coroutine testingNeeds InstantTaskExecutorRule

Use StateFlow for new code. It integrates naturally with coroutines and gives you the full power of Flow operators.

Coroutines in Android

viewModelScope

Every ViewModel gets viewModelScope out of the box. It’s tied to the ViewModel’s lifecycle — when the ViewModel is cleared, all coroutines launched in this scope are cancelled automatically.

import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.launch

class UserProfileViewModel(
  private val userRepository: UserRepository
) : ViewModel() {

  private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
  val uiState: StateFlow<UiState> = _uiState.asStateFlow()

  fun loadProfile(userId: String) {
    viewModelScope.launch {
      _uiState.value = UiState.Loading
      try {
        val user = userRepository.getUser(userId)
        _uiState.value = UiState.Success(user)
      } catch (e: Exception) {
        _uiState.value = UiState.Error(e.message ?: "Unknown error")
      }
    }
  }

  sealed interface UiState {
    data object Loading : UiState
    data class Success(val user: User) : UiState
    data class Error(val message: String) : UiState
  }
}

User navigates away, ViewModel is cleared, coroutine is cancelled. No leaked network calls. No callbacks firing after the screen is gone.

lifecycleScope

For Activities and Fragments, there’s lifecycleScope. Same idea — coroutines are cancelled when the lifecycle is destroyed.

import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.launch

class UserProfileActivity : AppCompatActivity() {

  override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    lifecycleScope.launch {
      val config = configRepository.fetchRemoteConfig()
      applyConfig(config)
    }
  }
}

repeatOnLifecycle — collecting flows safely

Here’s a common mistake. You collect a Flow in lifecycleScope, but the collection keeps running when the app is in the background. The UI isn’t visible, but you’re still processing updates.

The fix is repeatOnLifecycle. It starts collection when the lifecycle reaches a given state and cancels it when it drops below.

import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch

class UserProfileActivity : AppCompatActivity() {

  private val viewModel: UserProfileViewModel by viewModels()

  override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)

    lifecycleScope.launch {
      repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.uiState.collect { state ->
          when (state) {
            is UiState.Loading -> showLoading()
            is UiState.Success -> showProfile(state.user)
            is UiState.Error -> showError(state.message)
          }
        }
      }
    }
  }
}

When the activity goes to the background (STOPPED), collection is cancelled. When it comes back to the foreground (STARTED), collection restarts. StateFlow replays the latest value, so the UI immediately gets the current state.

The pattern: ViewModel exposes StateFlow, UI collects

This is the pattern you’ll use in every screen:

  1. ViewModel holds a MutableStateFlow<UiState> privately.
  2. ViewModel exposes it as StateFlow<UiState> (read-only).
  3. ViewModel updates the state in viewModelScope.launch.
  4. Activity/Fragment collects with repeatOnLifecycle.

Clean separation. Testable ViewModel. Lifecycle-safe UI.

Coroutines in Spring Boot

Coroutines aren’t just for Android. Spring Boot with WebFlux has first-class support for suspend functions.

Suspend fun in controllers

With Spring WebFlux, your controller functions can be suspend. Spring handles the coroutine context for you.

import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/api/v1/orders")
class OrderController(
  private val orderService: OrderService
) {

  @GetMapping("/{id}")
  suspend fun getOrder(@PathVariable id: String): OrderResponse {
    return orderService.findById(id)
      ?: throw OrderNotFoundException(id)
  }

  @PostMapping
  suspend fun createOrder(@RequestBody request: CreateOrderRequest): OrderResponse {
    return orderService.create(request)
  }
}

No Mono<OrderResponse>. No Flux. Just regular Kotlin functions that happen to be suspend. Spring WebFlux converts them to reactive types under the hood.

If you’re building something like the order management system from the CQRS series, suspend controllers are a natural fit for the command side.

Coroutines with Spring Data

Spring Data’s ReactiveCrudRepository works with coroutines through CoroutineCrudRepository:

import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import kotlinx.coroutines.flow.Flow

interface OrderRepository : CoroutineCrudRepository<Order, String> {

  suspend fun findByCustomerId(customerId: String): List<Order>

  fun findByStatus(status: OrderStatus): Flow<Order>
}

Single results use suspend. Multiple results return Flow. No reactive wrappers needed.

runBlocking in tests

In tests, you need to bridge between the synchronous test framework and suspend functions. That’s what runBlocking is for — though as we’ll see later, runTest is better for most cases.

import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest

@SpringBootTest
class OrderServiceTest {

  @Autowired
  private lateinit var orderService: OrderService

  @Test
  fun `should create order`() = runBlocking {
    val request = CreateOrderRequest(
      customerId = "cust-1",
      items = listOf(OrderItem("product-1", 2))
    )

    val order = orderService.create(request)

    assert(order.customerId == "cust-1")
    assert(order.items.size == 1)
  }
}

Retrofit + Coroutines

Remember callback hell? Nested enqueue calls, error handling scattered everywhere? Retrofit with coroutines makes that a thing of the past.

Just declare your API methods as suspend:

import retrofit2.http.Body
import retrofit2.http.GET
import retrofit2.http.POST
import retrofit2.http.Path
import retrofit2.http.Query

interface OrderApi {

  @GET("api/v1/orders/{id}")
  suspend fun getOrder(@Path("id") id: String): OrderResponse

  @GET("api/v1/orders")
  suspend fun listOrders(
    @Query("customerId") customerId: String,
    @Query("page") page: Int = 0
  ): PagedResponse<OrderResponse>

  @POST("api/v1/orders")
  suspend fun createOrder(@Body request: CreateOrderRequest): OrderResponse
}

No Call<OrderResponse>. No callbacks. The function suspends while the network request is in flight and resumes with the result.

Use it in a repository:

class OrderRepository(
  private val api: OrderApi
) {

  suspend fun getOrder(id: String): Order {
    return api.getOrder(id).toDomain()
  }

  suspend fun createOrder(request: CreateOrderRequest): Order {
    return api.createOrder(request).toDomain()
  }
}

Straightforward, readable, and cancellation-safe. If the coroutine is cancelled (say, the user navigated away), the HTTP request is cancelled too.

Room + Coroutines

Room has excellent coroutine support. Use suspend for one-shot queries and Flow for observable queries.

import androidx.room.Dao
import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import kotlinx.coroutines.flow.Flow

@Dao
interface OrderDao {

  // One-shot — returns once, used for writes or single reads
  @Insert(onConflict = OnConflictStrategy.REPLACE)
  suspend fun insert(order: OrderEntity)

  @Query("SELECT * FROM orders WHERE id = :id")
  suspend fun getById(id: String): OrderEntity?

  // Observable — emits whenever the orders table changes
  @Query("SELECT * FROM orders WHERE customer_id = :customerId ORDER BY created_at DESC")
  fun observeByCustomerId(customerId: String): Flow<List<OrderEntity>>

  @Query("SELECT COUNT(*) FROM orders WHERE status = :status")
  fun observeCountByStatus(status: String): Flow<Int>
}

Notice that observable queries return Flow but are not suspend. That’s because Flow is already lazy — it starts querying when you collect it. The flow emits a new list every time the underlying data changes. No manual refresh needed.

Combine Room flows with the ViewModel pattern:

class OrderListViewModel(
  private val orderDao: OrderDao
) : ViewModel() {

  private val _customerId = MutableStateFlow("")

  val orders: StateFlow<List<Order>> =
    _customerId
      .flatMapLatest { id ->
        if (id.isBlank()) {
          flowOf(emptyList())
        } else {
          orderDao.observeByCustomerId(id)
            .map { entities -> entities.map { it.toDomain() } }
        }
      }
      .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = emptyList()
      )

  fun setCustomerId(id: String) {
    _customerId.value = id
  }
}

The stateIn operator converts the cold Flow into a hot StateFlow. WhileSubscribed(5000) keeps the upstream active for 5 seconds after the last subscriber disappears — so a quick configuration change (like screen rotation) doesn’t restart the database query.

Testing coroutines

runTest and TestDispatcher

runTest is the standard way to test coroutines. It uses a TestDispatcher that makes delay execute immediately — so your tests don’t actually wait.

import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class OrderRepositoryTest {

  @Test
  fun `should fetch order by id`() = runTest {
    val fakeApi = FakeOrderApi()
    fakeApi.addOrder(OrderResponse(id = "order-1", customerId = "cust-1"))

    val repository = OrderRepository(fakeApi)
    val order = repository.getOrder("order-1")

    assertEquals("cust-1", order.customerId)
  }
}

advanceUntilIdle

When testing code that launches coroutines (like a ViewModel), you may need to advance the test dispatcher to let those coroutines complete.

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class UserProfileViewModelTest {

  private val testDispatcher = StandardTestDispatcher()

  @BeforeTest
  fun setup() {
    Dispatchers.setMain(testDispatcher)
  }

  @AfterTest
  fun tearDown() {
    Dispatchers.resetMain()
  }

  @Test
  fun `should load user profile`() = runTest(testDispatcher) {
    val fakeRepository = FakeUserRepository()
    fakeRepository.addUser(User(id = "user-1", name = "Alice"))

    val viewModel = UserProfileViewModel(fakeRepository)
    viewModel.loadProfile("user-1")

    advanceUntilIdle() // let viewModelScope.launch complete

    val state = viewModel.uiState.value
    assertEquals(UiState.Success(User(id = "user-1", name = "Alice")), state)
  }
}

Dispatchers.setMain replaces the Main dispatcher (which doesn’t exist in tests) with the test dispatcher. advanceUntilIdle runs all pending coroutines to completion.

Turbine for Flow testing

Testing Flows manually is verbose. Turbine makes it concise.

import app.cash.turbine.test
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class OrderFlowTest {

  @Test
  fun `should emit loading then success`() = runTest {
    val viewModel = OrderListViewModel(FakeOrderDao())
    viewModel.setCustomerId("cust-1")

    viewModel.orders.test {
      assertEquals(emptyList(), awaitItem()) // initial value
      assertEquals(listOf(testOrder), awaitItem()) // after query completes
      cancelAndIgnoreRemainingEvents()
    }
  }
}

test opens the flow, awaitItem suspends until the next emission, and cancelAndIgnoreRemainingEvents cleans up. If an unexpected value is emitted or the flow completes early, the test fails with a clear message.

Wrapping up the series

That’s all three parts done. Let’s recap where we’ve been.

Part 1 — The Basics covered what coroutines are, why they exist, suspend functions, dispatchers, and how coroutines compare to threads. The foundation.

Part 2 — Structured Concurrency covered coroutine scopes, launch vs async, parent-child relationships, exception handling with SupervisorJob and CoroutineExceptionHandler, and cancellation. The safety net.

Part 3 — Real-World Patterns (this post) covered Flows, StateFlow, SharedFlow, coroutines in Android and Spring Boot, Retrofit, Room, and testing. The application layer.

Here’s a quick reference for when to use what:

ScenarioUse
Single async resultsuspend fun
Stream of valuesFlow
UI stateStateFlow
One-time eventsSharedFlow
Parallel work, combine resultsasync / awaitAll
Fire-and-forgetlaunch in a proper scope
Debounced user inputFlow + debounce + distinctUntilChanged
Observable database queriesRoom Flow return type
Network callsRetrofit suspend functions
Spring Boot endpointssuspend controller functions

Coroutines are one of those Kotlin features that change how you think about async code. Once they click, callbacks and reactive chains feel unnecessarily complicated for most use cases.

If you want to practice with more Kotlin, the JSON Parsing series is a good next step — it walks through five different libraries for parsing JSON in Kotlin, with hands-on examples for each. And if you’re building backend services, the CQRS series puts coroutines to work in a real distributed architecture with Spring Boot, Kafka, and MongoDB.

Thanks for following along. Now go build something with coroutines.