Kotlin Coroutines — Part 3: Real-World Patterns
Flows, StateFlow, coroutines in Android and Spring Boot, Retrofit, Room, and testing — practical patterns you'll use every day.
This is Part 3 — the final post in the Kotlin Coroutines series.
- The Basics
- Structured Concurrency
- Real-World Patterns (this post)
Parts 1 and 2 covered the foundations — suspend functions, dispatchers, structured concurrency, exception handling, and cancellation. This post is about applying all of that. We’ll look at the patterns you’ll actually use in Android apps, Spring Boot services, and everywhere in between.
Flow basics — cold streams
A suspend function returns a single value. But what if you need a stream of values over time? That’s what Flow is for.
A Flow is a cold stream. It doesn’t produce values until someone collects it. Think of it like a Kotlin Sequence, but asynchronous.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay
fun countDown(from: Int): Flow<Int> = flow {
for (i in from downTo 0) {
delay(1000)
emit(i)
}
}
Nothing happens until you collect:
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
countDown(5).collect { value ->
println(value) // prints 5, 4, 3, 2, 1, 0 — one per second
}
}
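To make the cold behaviour concrete, here is a minimal sketch. The `numbers` function and the `builderRuns` counter are illustrative names, not part of the series code. It counts how often the builder body actually executes:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative counter: incremented every time the builder body starts.
var builderRuns = 0

fun numbers(): Flow<Int> = flow {
    builderRuns++ // runs once per collector, not when the flow is created
    emit(1)
    emit(2)
}

fun main() = runBlocking {
    val cold = numbers() // only builds the flow, nothing executes yet
    println("runs after creation: $builderRuns") // runs after creation: 0
    val first = cold.toList()
    val second = cold.toList()
    println("first=$first second=$second runs=$builderRuns") // first=[1, 2] second=[1, 2] runs=2
}
```

Each terminal operator (`collect`, `toList`, and so on) re-runs the builder from the top, which is why a cold flow backed by a network call repeats that call for every collector.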
Flow vs Sequence
Sequences are synchronous. They block the thread while producing values. Flows are asynchronous — they suspend instead of blocking.
// Sequence — blocks the thread during each delay
fun sequenceCountDown(from: Int): Sequence<Int> = sequence {
for (i in from downTo 0) {
Thread.sleep(1000) // can't use delay() here
yield(i)
}
}
// Flow — suspends, doesn't block
fun flowCountDown(from: Int): Flow<Int> = flow {
for (i in from downTo 0) {
delay(1000) // suspend, not block
emit(i)
}
}
Use Sequence when the data is already in memory and you want lazy evaluation. Use Flow when the data comes from async sources — network, database, sensors, user input.
Flow operators
Flows support operators that look similar to collection operations but work on streams.
Transforming and filtering
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.onEach
fun evenSquares(): Flow<Int> =
countDown(10)
.filter { it % 2 == 0 }
.map { it * it }
.onEach { println("Processing: $it") }
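A quick way to see what such a chain produces is to collect it into a list. The sketch below assumes an in-memory source in place of `countDown(10)` so it finishes instantly; `evenSquaresOf` is a hypothetical variant of the function above.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Assumption: the source is passed in, so no delay() is involved.
fun evenSquaresOf(source: Flow<Int>): Flow<Int> =
    source
        .filter { it % 2 == 0 } // keep even numbers
        .map { it * it }        // square what is left

fun main() = runBlocking {
    val result = evenSquaresOf((10 downTo 0).asFlow()).toList()
    println(result) // [100, 64, 36, 16, 4, 0]
}
```

Operators are applied per element and in order, so each value passes through `filter` and then `map` before the next one is requested.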
Error handling with catch
The catch operator handles upstream exceptions. It doesn’t catch errors in collect.
import kotlinx.coroutines.flow.catch
fun riskyFlow(): Flow<String> = flow {
emit("first")
emit("second")
throw RuntimeException("something broke")
emit("third") // never reached
}
suspend fun main() {
riskyFlow()
.catch { e -> emit("fallback: ${e.message}") }
.collect { println(it) }
// prints: first, second, fallback: something broke
}
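The "upstream only" rule is easy to miss, so here is a small sketch of both cases. The function names are illustrative. In the first, the failure happens inside `collect` and escapes `catch`. In the second, the same check is moved into `onEach`, which sits upstream of `catch`.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// The failure happens inside collect, which is downstream of catch.
suspend fun failInCollect(): String =
    try {
        flowOf(1, 2)
            .catch { emit(-1) } // not invoked: nothing upstream fails
            .collect { value -> check(value < 2) { "collector failed on $value" } }
        "completed"
    } catch (e: IllegalStateException) {
        "escaped catch: ${e.message}"
    }

// The same check moved into onEach, which is upstream of catch.
suspend fun failInOnEach(): String {
    var outcome = "completed"
    flowOf(1, 2)
        .onEach { value -> check(value < 2) { "onEach failed on $value" } }
        .catch { e -> outcome = "handled: ${e.message}" }
        .collect { }
    return outcome
}

fun main() = runBlocking {
    println(failInCollect()) // escaped catch: collector failed on 2
    println(failInOnEach())  // handled: onEach failed on 2
}
```

If you want `catch` to cover your processing logic, put that logic in `onEach` and keep the collector empty.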
Search-as-you-type with debounce and distinctUntilChanged
This is where Flow operators really shine. Imagine a search box where you want to query an API as the user types, but not on every keystroke.
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
class SearchViewModel(
private val repository: SearchRepository
) : ViewModel() {
private val queryFlow = MutableStateFlow("")
val searchResults: Flow<List<SearchResult>> =
queryFlow
.debounce(300) // wait 300ms after the user stops typing
.distinctUntilChanged() // skip if the query hasn't changed
.flatMapLatest { query ->
if (query.isBlank()) {
flowOf(emptyList())
} else {
repository.search(query) // returns Flow<List<SearchResult>>
}
}
fun onQueryChanged(query: String) {
queryFlow.value = query
}
}
Without debounce, you’d fire a network request on every keystroke. Without distinctUntilChanged, you’d re-query when the user types a character and immediately deletes it. Without flatMapLatest, a slow first query could overwrite a fast second query’s results.
Three operators. Problem solved.
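The same pipeline can be tried outside Android. The sketch below is an approximation under stated assumptions: `typedQueries` simulates keystrokes with real delays, the inner flow stands in for `repository.search`, and the timings (100 ms debounce, 300 ms pauses) are arbitrary. Note that, in current kotlinx.coroutines releases, `debounce` needs an opt-in to `FlowPreview` and `flatMapLatest` to `ExperimentalCoroutinesApi`.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun searchPipeline(queries: Flow<String>): Flow<String> =
    queries
        .debounce(100)          // wait for a 100 ms pause in typing
        .distinctUntilChanged() // skip a query equal to the previous one
        .flatMapLatest { query ->
            // Hypothetical stand-in for repository.search(query)
            flow {
                delay(20)
                emit("results for '$query'")
            }
        }

// Simulated keystrokes: short gaps while typing, long gaps when pausing.
fun typedQueries(): Flow<String> = flow {
    emit("k"); delay(10)
    emit("ko"); delay(10)
    emit("kot"); delay(300)  // pause: "kot" passes debounce
    emit("kotl"); delay(10)
    emit("kot"); delay(300)  // back to "kot": dropped by distinctUntilChanged
    emit("kotlin")           // debounce emits the latest value when upstream completes
}

fun main() = runBlocking {
    println(searchPipeline(typedQueries()).toList())
    // [results for 'kot', results for 'kotlin']
}
```

Six keystrokes turn into two searches. Because the example relies on wall-clock delays, the gaps are deliberately generous; in a unit test you would more likely drive it with virtual time.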
StateFlow and SharedFlow — hot streams
Flows are cold — they restart from scratch for each collector. Sometimes you need a hot stream that has a current value and shares it with multiple collectors. That’s StateFlow.
StateFlow
StateFlow always has a value. It’s like a reactive variable.
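import androidx.lifecycle.ViewModel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
class CounterViewModel : ViewModel() {
    private val _count = MutableStateFlow(0)
    val count: StateFlow<Int> = _count.asStateFlow()
    // update {} applies the change atomically, so concurrent callers cannot lose an increment
    fun increment() {
        _count.update { it + 1 }
    }
    fun decrement() {
        _count.update { it - 1 }
    }
}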
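One detail worth knowing when a StateFlow is written from more than one thread: reading and then writing `.value` is two separate steps, while `update` retries until its change is applied atomically. A minimal sketch, with `incrementConcurrently` as an illustrative helper:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Launches several workers on a multi-threaded dispatcher, each incrementing the same counter.
suspend fun incrementConcurrently(counter: MutableStateFlow<Int>, workers: Int, perWorker: Int) {
    coroutineScope {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(perWorker) { counter.update { it + 1 } }
            }
        }
    } // coroutineScope returns only after every worker has finished
}

fun main() = runBlocking {
    val counter = MutableStateFlow(0)
    incrementConcurrently(counter, workers = 8, perWorker = 1_000)
    println(counter.value) // 8000
}
```

On the main thread of an Android app this rarely matters, but it becomes relevant as soon as state is updated from background dispatchers.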
SharedFlow
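SharedFlow is like StateFlow but without an initial value and with configurable replay. Use it for one-time events such as navigation, snackbars, and toasts. Keep in mind that with the default replay of 0, an event emitted while nobody is collecting is dropped.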
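import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
class OrderViewModel(
    // Assumed dependency: any type exposing suspend fun placeOrder(order: Order)
    private val repository: OrderRepository
) : ViewModel() {
    private val _events = MutableSharedFlow<UiEvent>()
    val events: SharedFlow<UiEvent> = _events.asSharedFlow()
    fun placeOrder(order: Order) {
        viewModelScope.launch {
            try {
                repository.placeOrder(order)
                _events.emit(UiEvent.ShowSnackbar("Order placed"))
                _events.emit(UiEvent.NavigateTo(Screen.OrderConfirmation))
            } catch (e: CancellationException) {
                throw e // cancellation is not a failure, let it propagate
            } catch (e: Exception) {
                _events.emit(UiEvent.ShowSnackbar("Failed: ${e.message}"))
            }
        }
    }
    sealed interface UiEvent {
        data class ShowSnackbar(val message: String) : UiEvent
        data class NavigateTo(val screen: Screen) : UiEvent
    }
}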
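A caveat with a default `MutableSharedFlow()` is that it has no replay and no buffer, so timing matters. The sketch below uses an illustrative function name and shows that a value emitted before anyone subscribes never reaches a later subscriber:

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

suspend fun firstEventSeenByLateSubscriber(): String = coroutineScope {
    val events = MutableSharedFlow<String>() // defaults: replay = 0, no extra buffer

    // No subscribers yet: emit returns immediately and the value is dropped.
    events.emit("emitted before anyone collects")

    // UNDISPATCHED runs the block up to its first suspension,
    // so the subscription exists before the next emit.
    val firstSeen = async(start = CoroutineStart.UNDISPATCHED) { events.first() }

    events.emit("emitted after subscribing")
    firstSeen.await()
}

fun main() = runBlocking {
    println(firstEventSeenByLateSubscriber()) // emitted after subscribing
}
```

For UI events this means a snackbar emitted while the screen is not collecting is simply lost. Whether that is acceptable, or whether you need `replay`, `extraBufferCapacity`, or a different mechanism, depends on the event.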
StateFlow vs LiveData
If you’re coming from LiveData, StateFlow is the coroutine-native replacement.
| | StateFlow | LiveData |
|---|---|---|
| Requires initial value | Yes | No |
| Lifecycle-aware | No (need repeatOnLifecycle) | Yes |
| Kotlin-first | Yes | Designed for Java too |
| Operators | Full Flow operator support | Limited transformations |
| Testing | Standard coroutine testing | Needs InstantTaskExecutorRule |
Use StateFlow for new code. It integrates naturally with coroutines and gives you the full power of Flow operators.
Coroutines in Android
viewModelScope
Every ViewModel gets viewModelScope out of the box. It’s tied to the ViewModel’s lifecycle — when the ViewModel is cleared, all coroutines launched in this scope are cancelled automatically.
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.launch
class UserProfileViewModel(
    private val userRepository: UserRepository
) : ViewModel() {
    private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
    fun loadProfile(userId: String) {
        viewModelScope.launch {
            _uiState.value = UiState.Loading
            try {
                val user = userRepository.getUser(userId)
                _uiState.value = UiState.Success(user)
            } catch (e: CancellationException) {
                throw e // do not report cancellation as an error state
            } catch (e: Exception) {
                _uiState.value = UiState.Error(e.message ?: "Unknown error")
            }
        }
    }
    sealed interface UiState {
        data object Loading : UiState
        data class Success(val user: User) : UiState
        data class Error(val message: String) : UiState
    }
}
User navigates away, ViewModel is cleared, coroutine is cancelled. No leaked network calls. No callbacks firing after the screen is gone.
lifecycleScope
For Activities and Fragments, there’s lifecycleScope. Same idea — coroutines are cancelled when the lifecycle is destroyed.
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.launch
class UserProfileActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
val config = configRepository.fetchRemoteConfig()
applyConfig(config)
}
}
}
repeatOnLifecycle — collecting flows safely
Here’s a common mistake. You collect a Flow in lifecycleScope, but the collection keeps running when the app is in the background. The UI isn’t visible, but you’re still processing updates.
The fix is repeatOnLifecycle. It starts collection when the lifecycle reaches a given state and cancels it when it drops below.
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch
class UserProfileActivity : AppCompatActivity() {
private val viewModel: UserProfileViewModel by viewModels()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.uiState.collect { state ->
when (state) {
is UiState.Loading -> showLoading()
is UiState.Success -> showProfile(state.user)
is UiState.Error -> showError(state.message)
}
}
}
}
}
}
When the activity goes to the background (onStop, which drops the lifecycle below STARTED), collection is cancelled. When it returns to the foreground and reaches STARTED again, collection restarts. StateFlow replays the latest value, so the UI immediately gets the current state.
The pattern: ViewModel exposes StateFlow, UI collects
This is the pattern you’ll use in every screen:
- ViewModel holds a MutableStateFlow<UiState> privately.
- ViewModel exposes it as StateFlow<UiState> (read-only).
- ViewModel updates the state in viewModelScope.launch.
- Activity/Fragment collects with repeatOnLifecycle.
Clean separation. Testable ViewModel. Lifecycle-safe UI.
Coroutines in Spring Boot
Coroutines aren’t just for Android. Spring Boot with WebFlux has first-class support for suspend functions.
Suspend fun in controllers
With Spring WebFlux, your controller functions can be suspend. Spring handles the coroutine context for you.
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/api/v1/orders")
class OrderController(
private val orderService: OrderService
) {
@GetMapping("/{id}")
suspend fun getOrder(@PathVariable id: String): OrderResponse {
return orderService.findById(id)
?: throw OrderNotFoundException(id)
}
@PostMapping
suspend fun createOrder(@RequestBody request: CreateOrderRequest): OrderResponse {
return orderService.create(request)
}
}
No Mono<OrderResponse>. No Flux. Just regular Kotlin functions that happen to be suspend. Spring WebFlux converts them to reactive types under the hood.
If you’re building something like the order management system from the CQRS series, suspend controllers are a natural fit for the command side.
Coroutines with Spring Data
Spring Data's coroutine counterpart to ReactiveCrudRepository is CoroutineCrudRepository:
import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import kotlinx.coroutines.flow.Flow
interface OrderRepository : CoroutineCrudRepository<Order, String> {
suspend fun findByCustomerId(customerId: String): List<Order>
fun findByStatus(status: OrderStatus): Flow<Order>
}
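Single results use suspend. Multiple results can be streamed as a Flow or, as findByCustomerId shows, loaded in full and returned as a List from a suspend function. No reactive wrappers needed.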
runBlocking in tests
In tests, you need to bridge between the synchronous test framework and suspend functions. That’s what runBlocking is for — though as we’ll see later, runTest is better for most cases.
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
@SpringBootTest
class OrderServiceTest {
@Autowired
private lateinit var orderService: OrderService
@Test
fun `should create order`() = runBlocking {
val request = CreateOrderRequest(
customerId = "cust-1",
items = listOf(OrderItem("product-1", 2))
)
val order = orderService.create(request)
assert(order.customerId == "cust-1")
assert(order.items.size == 1)
}
}
Retrofit + Coroutines
Remember callback hell? Nested enqueue calls, error handling scattered everywhere? Retrofit with coroutines makes that a thing of the past.
Just declare your API methods as suspend:
import retrofit2.http.Body
import retrofit2.http.GET
import retrofit2.http.POST
import retrofit2.http.Path
import retrofit2.http.Query
interface OrderApi {
@GET("api/v1/orders/{id}")
suspend fun getOrder(@Path("id") id: String): OrderResponse
@GET("api/v1/orders")
suspend fun listOrders(
@Query("customerId") customerId: String,
@Query("page") page: Int = 0
): PagedResponse<OrderResponse>
@POST("api/v1/orders")
suspend fun createOrder(@Body request: CreateOrderRequest): OrderResponse
}
No Call<OrderResponse>. No callbacks. The function suspends while the network request is in flight and resumes with the result.
Use it in a repository:
class OrderRepository(
private val api: OrderApi
) {
suspend fun getOrder(id: String): Order {
return api.getOrder(id).toDomain()
}
suspend fun createOrder(request: CreateOrderRequest): Order {
return api.createOrder(request).toDomain()
}
}
Straightforward, readable, and cancellation-safe. If the coroutine is cancelled (say, the user navigated away), the HTTP request is cancelled too.
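With suspend API methods, failures surface as ordinary exceptions, so error handling can live in one place. The sketch below is one possible shape, not the series' implementation: `safeApiCall` and `OfflineApi` are hypothetical, and the `OrderApi` and `OrderResponse` declarations are trimmed stand-ins so the example compiles without Retrofit. The important part is that cancellation is rethrown rather than turned into a failure value.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical stand-ins so the sketch compiles without Retrofit on the classpath.
data class OrderResponse(val id: String, val customerId: String)

interface OrderApi {
    suspend fun getOrder(id: String): OrderResponse
}

// Wraps a suspend call in Result, but never swallows cancellation.
suspend fun <T> safeApiCall(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // let structured concurrency see the cancellation
    } catch (e: IOException) {
        Result.failure(e) // connectivity problems become a value the caller can inspect
    }

// Fake API that always fails the way a dropped connection would.
class OfflineApi : OrderApi {
    override suspend fun getOrder(id: String): OrderResponse =
        throw IOException("no network")
}

fun main() = runBlocking {
    val result = safeApiCall { OfflineApi().getOrder("order-1") }
    println(result.exceptionOrNull()?.message) // no network
}
```

With real Retrofit, a non-2xx response on a method that returns the body type is reported as `retrofit2.HttpException`, so production code would probably handle that case as well.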
Room + Coroutines
Room has excellent coroutine support. Use suspend for one-shot queries and Flow for observable queries.
import androidx.room.Dao
import androidx.room.Insert
import androidx.room.OnConflictStrategy
import androidx.room.Query
import kotlinx.coroutines.flow.Flow
@Dao
interface OrderDao {
// One-shot — returns once, used for writes or single reads
@Insert(onConflict = OnConflictStrategy.REPLACE)
suspend fun insert(order: OrderEntity)
@Query("SELECT * FROM orders WHERE id = :id")
suspend fun getById(id: String): OrderEntity?
// Observable — emits whenever the orders table changes
@Query("SELECT * FROM orders WHERE customer_id = :customerId ORDER BY created_at DESC")
fun observeByCustomerId(customerId: String): Flow<List<OrderEntity>>
@Query("SELECT COUNT(*) FROM orders WHERE status = :status")
fun observeCountByStatus(status: String): Flow<Int>
}
Notice that observable queries return Flow but are not suspend. That’s because Flow is already lazy — it starts querying when you collect it. The flow emits a new list every time the underlying data changes. No manual refresh needed.
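To get a feel for this behaviour without a database, here is an in-memory stand-in. It is not Room: `InMemoryOrderDao` is a hypothetical class that keeps its "table" in a MutableStateFlow and uses a trimmed-down `OrderEntity`. It does mimic one property of Room's observable queries, namely that a change anywhere in the table triggers a new emission even when the filtered result is the same.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

data class OrderEntity(val id: String, val customerId: String)

// Hypothetical in-memory DAO: every insert replaces the whole "table" value.
class InMemoryOrderDao {
    private val table = MutableStateFlow<List<OrderEntity>>(emptyList())

    suspend fun insert(order: OrderEntity) {
        table.update { rows -> rows.filterNot { it.id == order.id } + order }
    }

    // Re-evaluates the query whenever the table changes.
    fun observeByCustomerId(customerId: String): Flow<List<OrderEntity>> =
        table.map { rows -> rows.filter { it.customerId == customerId } }
}

suspend fun observedIds(): List<List<String>> = coroutineScope {
    val dao = InMemoryOrderDao()
    // Start observing first: the initial empty result plus three table changes.
    val emissions = async(start = CoroutineStart.UNDISPATCHED) {
        dao.observeByCustomerId("cust-1")
            .take(4)
            .map { rows -> rows.map { it.id } }
            .toList()
    }
    val inserts = listOf(
        OrderEntity("o1", "cust-1"),
        OrderEntity("o2", "cust-2"), // other customer: the table changes, the result does not
        OrderEntity("o3", "cust-1"),
    )
    for (order in inserts) {
        dao.insert(order)
        yield() // give the collector a turn before the next write
    }
    emissions.await()
}

fun main() = runBlocking {
    println(observedIds()) // [[], [o1], [o1], [o1, o3]]
}
```

The repeated `[o1]` is the reason `distinctUntilChanged()` is often added to observable queries when redundant emissions are expensive downstream.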
Combine Room flows with the ViewModel pattern:
class OrderListViewModel(
private val orderDao: OrderDao
) : ViewModel() {
private val _customerId = MutableStateFlow("")
val orders: StateFlow<List<Order>> =
_customerId
.flatMapLatest { id ->
if (id.isBlank()) {
flowOf(emptyList())
} else {
orderDao.observeByCustomerId(id)
.map { entities -> entities.map { it.toDomain() } }
}
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
fun setCustomerId(id: String) {
_customerId.value = id
}
}
The stateIn operator converts the cold Flow into a hot StateFlow. WhileSubscribed(5000) keeps the upstream active for 5 seconds after the last subscriber disappears — so a quick configuration change (like screen rotation) doesn’t restart the database query.
Testing coroutines
runTest and TestDispatcher
runTest is the standard way to test coroutines. It runs the test body on a TestDispatcher with virtual time, so delay calls are skipped rather than waited out and your tests finish without real pauses.
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
class OrderRepositoryTest {
@Test
fun `should fetch order by id`() = runTest {
val fakeApi = FakeOrderApi()
fakeApi.addOrder(OrderResponse(id = "order-1", customerId = "cust-1"))
val repository = OrderRepository(fakeApi)
val order = repository.getOrder("order-1")
assertEquals("cust-1", order.customerId)
}
}
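The delay-skipping is easiest to see by reading the virtual clock. A minimal sketch, assuming kotlinx-coroutines-test and kotlin.test are on the test classpath (the class name is illustrative):

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

@OptIn(ExperimentalCoroutinesApi::class)
class VirtualTimeTest {
    @Test
    fun `delay advances virtual time only`() = runTest {
        delay(60_000) // returns almost immediately in wall-clock terms
        assertEquals(60_000L, currentTime) // the virtual clock moved by one minute
    }
}
```

This only applies to delays that run on the test dispatcher. Code that switches to `Dispatchers.IO` or another real dispatcher still waits in real time, which is one reason to inject dispatchers.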
advanceUntilIdle
When testing code that launches coroutines (like a ViewModel), you may need to advance the test dispatcher to let those coroutines complete.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import kotlin.test.AfterTest
import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
@OptIn(ExperimentalCoroutinesApi::class)
class UserProfileViewModelTest {
private val testDispatcher = StandardTestDispatcher()
@BeforeTest
fun setup() {
Dispatchers.setMain(testDispatcher)
}
@AfterTest
fun tearDown() {
Dispatchers.resetMain()
}
@Test
fun `should load user profile`() = runTest(testDispatcher) {
val fakeRepository = FakeUserRepository()
fakeRepository.addUser(User(id = "user-1", name = "Alice"))
val viewModel = UserProfileViewModel(fakeRepository)
viewModel.loadProfile("user-1")
advanceUntilIdle() // let viewModelScope.launch complete
val state = viewModel.uiState.value
assertEquals(UiState.Success(User(id = "user-1", name = "Alice")), state)
}
}
Dispatchers.setMain replaces the Main dispatcher (which doesn’t exist in tests) with the test dispatcher. advanceUntilIdle runs all pending coroutines to completion.
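The reason the explicit advance is needed shows up in a smaller example. With the StandardTestDispatcher that runTest uses by default, a launched coroutine is only queued until the scheduler is told to run. The class and test names below are illustrative.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertFalse
import kotlin.test.assertTrue

@OptIn(ExperimentalCoroutinesApi::class)
class SchedulerOrderingTest {
    @Test
    fun `launched work runs only when the scheduler advances`() = runTest {
        var done = false
        launch {
            delay(1_000)
            done = true
        }
        assertFalse(done)   // still queued on the StandardTestDispatcher
        advanceUntilIdle()  // run queued coroutines and skip the virtual delay
        assertTrue(done)
    }
}
```

Asserting right after `launch` without advancing is a common source of tests that fail for reasons unrelated to the code under test.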
Turbine for Flow testing
Testing Flows manually is verbose. Turbine makes it concise.
import app.cash.turbine.test
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
class OrderFlowTest {
@Test
fun `should emit empty list then orders`() = runTest {
val viewModel = OrderListViewModel(FakeOrderDao())
viewModel.setCustomerId("cust-1")
viewModel.orders.test {
assertEquals(emptyList(), awaitItem()) // initial value
assertEquals(listOf(testOrder), awaitItem()) // after query completes
cancelAndIgnoreRemainingEvents()
}
}
}
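test opens the flow, awaitItem suspends until the next emission, and cancelAndIgnoreRemainingEvents cleans up. If an unexpected value is emitted or the flow completes early, the test fails with a clear message. Because OrderListViewModel shares its flow in viewModelScope, a local unit test like this one typically also needs the Dispatchers.setMain setup from the previous section.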
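Turbine's API is small enough to try on a plain StateFlow first. The sketch below assumes Turbine and kotlinx-coroutines-test are on the test classpath; the class name is illustrative.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals

class StateFlowTurbineTest {
    @Test
    fun `state flow emits current value then updates`() = runTest {
        val state = MutableStateFlow(0)
        state.test {
            assertEquals(0, awaitItem()) // StateFlow replays its current value first
            state.value = 1
            assertEquals(1, awaitItem()) // then each distinct update
            cancelAndIgnoreRemainingEvents() // a StateFlow never completes on its own
        }
    }
}
```

For flows that do complete, such as one built with `flowOf`, `awaitComplete()` takes the place of the final cancel call.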
Wrapping up the series
That’s all three parts done. Let’s recap where we’ve been.
Part 1 — The Basics covered what coroutines are, why they exist, suspend functions, dispatchers, and how coroutines compare to threads. The foundation.
Part 2 — Structured Concurrency covered coroutine scopes, launch vs async, parent-child relationships, exception handling with SupervisorJob and CoroutineExceptionHandler, and cancellation. The safety net.
Part 3 — Real-World Patterns (this post) covered Flows, StateFlow, SharedFlow, coroutines in Android and Spring Boot, Retrofit, Room, and testing. The application layer.
Here’s a quick reference for when to use what:
| Scenario | Use |
|---|---|
| Single async result | suspend fun |
| Stream of values | Flow |
| UI state | StateFlow |
| One-time events | SharedFlow |
| Parallel work, combine results | async / awaitAll |
| Fire-and-forget | launch in a proper scope |
| Debounced user input | Flow + debounce + distinctUntilChanged |
| Observable database queries | Room Flow return type |
| Network calls | Retrofit suspend functions |
| Spring Boot endpoints | suspend controller functions |
Coroutines are one of those Kotlin features that change how you think about async code. Once they click, callbacks and reactive chains feel unnecessarily complicated for most use cases.
If you want to practice with more Kotlin, the JSON Parsing series is a good next step — it walks through five different libraries for parsing JSON in Kotlin, with hands-on examples for each. And if you’re building backend services, the CQRS series puts coroutines to work in a real distributed architecture with Spring Boot, Kafka, and MongoDB.
Thanks for following along. Now go build something with coroutines.