PCSalt

Kotlin Flows — Cold Flows, Hot Flows & StateFlow Explained

Understand Kotlin Flow from the ground up — cold vs hot flows, Flow builders, operators, StateFlow, SharedFlow, and when to use each in Android and backend projects.


If you’ve worked with coroutines, you already know how to handle one-shot async operations — make a network call, get a result. But what about streams of data? A search bar emitting queries as the user types. A WebSocket pushing real-time prices. A database that notifies you when rows change.

That’s what Kotlin Flow is for. It’s a coroutine-based stream API that handles sequential, asynchronous data emission.

This post covers:

  • What a Flow is and how it works
  • Cold flows vs hot flows — and why the distinction matters
  • Flow builders and operators
  • StateFlow and SharedFlow for state management
  • Practical examples for Android and backend

What is a Flow?

A Flow is a type that emits multiple values sequentially, as opposed to a suspend function that returns a single value.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay

fun numberFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}

Nothing happens until you collect it:

import kotlinx.coroutines.runBlocking
// In kotlinx.coroutines 1.6 and later, collect { } is a member of Flow and needs no import.

fun main() = runBlocking {
    numberFlow().collect { value ->
        println("Received: $value")
    }
}

Output:

Received: 1
Received: 2
Received: 3

The flow { } builder takes a suspending lambda. Inside it, you call emit() to send values downstream. The collector receives them one at a time, in order.

Key point: Flow is cold by default. The code inside flow { } doesn’t run until someone collects it. And each collector gets its own independent execution.

Cold Flows

A cold flow starts producing values only when collected. Each collector triggers a fresh execution.

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.launch

fun coldFlow(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(300)
        emit(i)
    }
}

fun main() = runBlocking {
    val myFlow = coldFlow()

    // First collector
    launch {
        myFlow.collect { println("Collector A: $it") }
    }

    // Second collector
    launch {
        myFlow.collect { println("Collector B: $it") }
    }
}

Output:

Flow started
Flow started
Collector A: 1
Collector B: 1
Collector A: 2
Collector B: 2
Collector A: 3
Collector B: 3

"Flow started" prints twice. Each collector gets its own independent execution of the flow body. This is exactly like how calling a function twice runs the function body twice.

Flow builders

There are several ways to create cold flows:

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf

// Hypothetical placeholder standing in for a real network call.
suspend fun fetchFromNetwork(): String = "response"

// From fixed values
val fixedFlow = flowOf(1, 2, 3)

// From a collection
val listFlow = listOf("a", "b", "c").asFlow()

// From a suspend function
val computedFlow = flow {
    val result = fetchFromNetwork() // suspend function
    emit(result)
}

flowOf and asFlow are convenience builders. The flow { } builder is the most flexible — you can call suspend functions, loop, branch, whatever you need inside it.

Flow Operators

Flows support operators similar to collections — map, filter, take, etc. But they’re lazy. No intermediate collections are created. Values flow through the chain one at a time.
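To make the "one at a time" behavior visible, here is a minimal sketch that records the order in which each stage sees each value. The function name traceLazyPipeline and the log list are illustrative only, not part of the Flow API.

```kotlin
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the order in which the stages ran.
suspend fun traceLazyPipeline(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter { value ->
            log += "filter $value"
            value % 2 != 0
        }
        .map { value ->
            log += "map $value"
            value * 10
        }
        .collect { value -> log += "collect $value" }
    return log
}

fun main() = runBlocking {
    // Each value travels through the whole chain before the next one starts:
    // filter 1, map 1, collect 10, filter 2, filter 3, map 3, collect 30
    traceLazyPipeline().forEach { println(it) }
}
```

With a List, filter would process all three elements and build an intermediate list before map ran at all. Here, 1 reaches collect before 2 is even filtered.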

Transforming

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .filter { it % 2 != 0 }
        .map { it * it }
        .collect { println(it) }
}

Output:

1
9
25

Combining flows

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val names = flowOf("Alice", "Bob", "Charlie")
    val scores = flowOf(95, 87, 92)

    names.zip(scores) { name, score -> "$name: $score" }
        .collect { println(it) }
}

Output:

Alice: 95
Bob: 87
Charlie: 92

zip pairs elements one-to-one. If one flow is shorter, the result stops when the shorter one ends.
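A small sketch of the shorter-flow case, reusing the names and scores from above with one score removed. The helper name zipUneven is made up for this illustration.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical helper: zips three names with only two scores.
suspend fun zipUneven(): List<String> {
    val names = flowOf("Alice", "Bob", "Charlie")
    val scores = flowOf(95, 87) // one element fewer than names
    return names.zip(scores) { name, score -> "$name: $score" }.toList()
}

fun main() = runBlocking {
    println(zipUneven()) // [Alice: 95, Bob: 87]
}
```

"Charlie" is never paired, because the scores flow completed first.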

Error handling

import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.runBlocking

fun riskyFlow() = flow {
    emit(1)
    emit(2)
    throw RuntimeException("Something broke")
}

fun main() = runBlocking {
    riskyFlow()
        .catch { e -> println("Caught: ${e.message}") }
        .collect { println("Value: $it") }
}

Output:

Value: 1
Value: 2
Caught: Something broke

catch intercepts upstream exceptions. It doesn’t catch exceptions in the collect block itself — only what happens before it in the chain.
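The upstream-only rule can be sketched as follows. Both helper names are invented for this example. The first throws inside collect, so catch never sees the failure. The second moves the same check into onEach, which sits upstream of catch.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Failure inside collect: it is downstream of catch, so it escapes to the caller.
suspend fun failInCollect(): String = try {
    flowOf(1, 2, 3)
        .catch { println("never printed") }
        .collect { value -> check(value < 3) { "too large: $value" } }
    "completed"
} catch (e: IllegalStateException) {
    "escaped: ${e.message}"
}

// Same check moved into onEach: now it is upstream of catch and gets intercepted.
suspend fun failUpstream(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { value -> check(value < 3) { "too large: $value" } }
        .catch { e -> log += "caught: ${e.message}" }
        .collect { value -> log += "value: $value" }
    return log
}

fun main() = runBlocking {
    println(failInCollect()) // escaped: too large: 3
    println(failUpstream())  // [value: 1, value: 2, caught: too large: 3]
}
```

If you want catch to cover your processing logic, one option is to put that logic in onEach and finish the chain with an empty collect().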

Common operators at a glance

| Operator | What it does |
| --- | --- |
| map | Transforms each value |
| filter | Drops values that don’t match |
| take(n) | Takes the first n values, then cancels the upstream flow |
| zip | Pairs two flows element-by-element |
| combine | Emits when either flow emits, using the latest value from both |
| flatMapConcat | Maps each value to a flow, collects them sequentially |
| flatMapMerge | Maps each value to a flow, collects them concurrently |
| debounce | Waits for a pause before emitting (search input) |
| distinctUntilChanged | Drops consecutive duplicates |
| catch | Catches upstream exceptions |
| onEach | Side effect on each value (logging) |
| flowOn | Changes the context (dispatcher) for upstream operations |
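As a quick illustration of two operators from the table that have no example elsewhere in this post, here is a sketch that chains distinctUntilChanged and take. The helper name firstThreeDistinct is hypothetical.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: drop consecutive duplicates, then keep the first three values.
suspend fun firstThreeDistinct(): List<Int> =
    flowOf(1, 1, 2, 2, 3, 3, 4)
        .distinctUntilChanged() // 1, 2, 3, 4
        .take(3)                // stops after the third value; 4 is never processed
        .toList()

fun main() = runBlocking {
    println(firstThreeDistinct()) // [1, 2, 3]
}
```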

Hot Flows — SharedFlow and StateFlow

Cold flows are great when each collector should get its own stream, produced on demand. But sometimes you need a flow that:

  • Emits values regardless of whether anyone is collecting
  • Shares a single stream across multiple collectors
  • Holds the latest value for new subscribers

That’s what hot flows are for.

SharedFlow

A SharedFlow emits values to all collectors simultaneously. Values are broadcast — every collector gets the same values.

import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

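fun main() = runBlocking {
    val events = MutableSharedFlow<String>()

    // Collector 1
    val collectorA = launch {
        events.collect { println("Collector A: $it") }
    }

    // Collector 2
    val collectorB = launch {
        events.collect { println("Collector B: $it") }
    }

    delay(100) // give collectors time to subscribe

    events.emit("click")
    events.emit("scroll")

    delay(100) // let both collectors finish printing
    // A SharedFlow never completes, so cancel the collectors
    // explicitly; otherwise runBlocking would wait forever.
    collectorA.cancel()
    collectorB.cancel()
}

Output:

Collector A: click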
Collector B: click
Collector A: scroll
Collector B: scroll

Both collectors receive every event. Unlike cold flows, the emission happens once — not per collector.

MutableSharedFlow has a replay parameter. replay = 1 means new collectors immediately get the last emitted value:

import kotlinx.coroutines.flow.MutableSharedFlow

val events = MutableSharedFlow<String>(replay = 1)
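A minimal sketch of what replay = 1 means for a late subscriber. The helper name lateSubscriberSees is made up for this example. Both values are emitted before anyone collects, and first() then subscribes and returns the first value it receives.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical helper: emit twice with no collectors, then subscribe late.
suspend fun lateSubscriberSees(): String {
    val events = MutableSharedFlow<String>(replay = 1)
    events.emit("click")  // no collectors yet; kept in the replay cache
    events.emit("scroll") // replaces "click" in the one-slot cache
    return events.first() // a new subscriber receives the replayed value
}

fun main() = runBlocking {
    println(lateSubscriberSees()) // scroll
}
```

With the default replay = 0, both values would be dropped and first() would suspend until the next emission.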

StateFlow

StateFlow is a specialized SharedFlow that always holds a value. Think of it as a reactive variable — it has a current state, and collectors get notified when it changes.

import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

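fun main() = runBlocking {
    val counter = MutableStateFlow(0)

    val collector = launch {
        counter.collect { println("Counter is now: $it") }
    }

    delay(100)
    counter.value = 1
    delay(100)
    counter.value = 2
    delay(100)
    counter.value = 2 // same value, so it won't emit again
    delay(100)
    counter.value = 3

    delay(100) // let the collector print the last value
    // A StateFlow never completes, so cancel the collector to let main return.
    collector.cancel()
}

Output:

Counter is now: 0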
Counter is now: 1
Counter is now: 2
Counter is now: 3

Notice: setting counter.value = 2 the second time doesn’t trigger the collector. StateFlow is conflated and uses structural equality — it only emits when the new value is different from the current one (via equals()).
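The equals() check matters most with data classes, where a new instance can still be equal to the current one. Below is a small sketch of that case. Profile and observedProfiles are hypothetical names, and the short delays are only there to give the collector time to run.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical state type; data classes compare by content.
data class Profile(val name: String)

// Hypothetical helper: returns every value the collector actually observed.
suspend fun observedProfiles(): List<Profile> = coroutineScope {
    val state = MutableStateFlow(Profile("Ann"))
    val seen = mutableListOf<Profile>()
    val job = launch { state.collect { seen += it } }

    delay(50)
    state.value = Profile("Ann") // new instance, but equal: no emission
    delay(50)
    state.value = Profile("Bob") // different content: emitted
    delay(50)

    job.cancel() // StateFlow collection never completes on its own
    seen
}

fun main() = runBlocking {
    println(observedProfiles()) // [Profile(name=Ann), Profile(name=Bob)]
}
```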

StateFlow vs SharedFlow

| Feature | StateFlow | SharedFlow |
| --- | --- | --- |
| Has initial value | Yes (required) | No |
| Holds current value | Yes (.value) | No |
| Replays to new collectors | Always replays latest | Configurable (replay) |
| Conflation | Yes (skips equal values) | No (emits everything) |
| Use case | UI state, current value | Events, one-time actions |

Rule of thumb: Use StateFlow for state (loading indicator, form data, current user). Use SharedFlow for events (navigation, snackbars, errors that should be handled once).

StateFlow in Android ViewModel

This is the most common use of StateFlow — holding UI state in a ViewModel:

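import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch

// Assumed shape of the repository used below (not defined in this post).
interface ItemRepository {
    suspend fun getItems(): List<String>
}

data class UiState(
    val items: List<String> = emptyList(),
    val isLoading: Boolean = false,
    val error: String? = null
)

class ItemsViewModel(
    private val repository: ItemRepository
) : ViewModel() {

    // Mutable state stays private; the UI only sees the read-only StateFlow.
    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun loadItems() {
        viewModelScope.launch {
            // update { } applies the read-modify-write atomically.
            _uiState.update { it.copy(isLoading = true) }
            try {
                val items = repository.getItems()
                _uiState.value = UiState(items = items)
            } catch (e: CancellationException) {
                throw e // never swallow coroutine cancellation
            } catch (e: Exception) {
                _uiState.value = UiState(error = e.message)
            }
        }
    }
}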

In the Activity or Fragment, collect the state:

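import android.os.Bundle
import androidx.activity.viewModels
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch

class ItemsActivity : AppCompatActivity() {

    // ItemsViewModel takes a repository, so a real app also needs a
    // ViewModelProvider.Factory or dependency injection here (not shown).
    private val viewModel: ItemsViewModel by viewModels()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.uiState.collect { state ->
                    // showLoading, showError and showItems are this Activity's
                    // own rendering functions (not shown).
                    when {
                        state.isLoading -> showLoading()
                        state.error != null -> showError(state.error)
                        else -> showItems(state.items)
                    }
                }
            }
        }
    }
}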

repeatOnLifecycle(STARTED) ensures collection only happens when the Activity is at least STARTED. When the Activity goes to the background, collection stops. When it comes back, collection resumes. This prevents wasted work and memory leaks.

flowOn — Changing Dispatchers

By default, a flow runs in the coroutine context of the collector. To move upstream work to a different dispatcher:

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

fun heavyComputation() = flow {
    for (i in 1..5) {
        Thread.sleep(100) // simulate CPU work
        emit(i)
    }
}.flowOn(Dispatchers.Default) // runs on Default dispatcher

fun main() = runBlocking {
    heavyComputation()
        .map { it * 2 }
        .collect { println("$it on ${Thread.currentThread().name}") }
}

flowOn only affects operations upstream of it. The map and collect still run on the collector’s dispatcher. This is different from withContext — it doesn’t change the context for everything, just for what comes before it in the chain.
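To illustrate the difference from withContext, here is a sketch of what happens when a flow body tries to switch context itself. The function names are invented for this example. The flow builder checks that emit() is called from the collector's context, so emitting from inside withContext fails at collection time, while flowOn is the supported way to move the upstream work.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Not allowed: emit() is called from a context other than the collector's.
fun wrongContextFlow(): Flow<Int> = flow<Int> {
    withContext(Dispatchers.Default) { emit(1) }
}

// Supported: keep the body plain and move it upstream with flowOn.
fun flowOnFlow(): Flow<Int> = flow<Int> { emit(1) }.flowOn(Dispatchers.Default)

// Hypothetical helper: returns the error thrown while collecting, if any.
suspend fun collectionError(source: Flow<Int>): Throwable? =
    runCatching { source.toList() }.exceptionOrNull()

fun main() = runBlocking {
    // Prints an IllegalStateException about a violated flow invariant.
    println(collectionError(wrongContextFlow()))
    // Prints null: collecting the flowOn version succeeds.
    println(collectionError(flowOnFlow()))
}
```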

Practical: Search with Debounce

A common real-world pattern is debouncing a search input so you don’t fire a network request on every keystroke:

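import androidx.lifecycle.ViewModel
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf

// SearchRepository.search(query) is assumed to return Flow<List<SearchResult>>;
// neither type is defined in this post.
// debounce and flatMapLatest are opt-in APIs, hence the @OptIn annotation.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
class SearchViewModel(
    private val searchRepository: SearchRepository
) : ViewModel() {

    val queryFlow = MutableStateFlow("")

    val searchResults: Flow<List<SearchResult>> = queryFlow
        .debounce(300) // wait 300ms after last keystroke
        .distinctUntilChanged() // skip if query hasn't changed
        .flatMapLatest { query ->
            if (query.length < 2) {
                // Too short to search: clear the results instead.
                // (Filtering these queries out upstream would leave stale results on screen.)
                flowOf(emptyList())
            } else {
                searchRepository.search(query)
            }
        }
}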

flatMapLatest cancels the previous search when a new query arrives. If the user types “kot”, then “kotl”, the search for “kot” is cancelled and only “kotl” runs.
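The cancellation can be observed in a small standalone sketch. fakeSearch is a hypothetical stand-in for a repository call, and the delays are arbitrary values chosen so that the second query arrives while the first search is still running.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a repository call with 200ms of latency.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(200)
    emit("results for $query")
}

// Hypothetical helper: two queries, the second arriving mid-search.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestResultsOnly(): List<String> =
    flow {
        emit("kot")
        delay(50) // the user keeps typing before the first search finishes
        emit("kotl")
    }
        .flatMapLatest { query -> fakeSearch(query) }
        .toList()

fun main() = runBlocking {
    println(latestResultsOnly()) // [results for kotl]
}
```

The search for "kot" is cancelled during its delay, so it never emits.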

Practical: Combining Multiple Data Sources

When your UI depends on data from multiple sources:

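import androidx.lifecycle.ViewModel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine

// User, Notification, Order and the three repositories are assumed to exist
// elsewhere in the app; each repository method returns a Flow.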

data class DashboardState(
    val user: User,
    val notifications: List<Notification>,
    val recentOrders: List<Order>
)

class DashboardViewModel(
    userRepository: UserRepository,
    notificationRepository: NotificationRepository,
    orderRepository: OrderRepository
) : ViewModel() {

    val dashboardState: Flow<DashboardState> = combine(
        userRepository.currentUser(),
        notificationRepository.unreadNotifications(),
        orderRepository.recentOrders()
    ) { user, notifications, orders ->
        DashboardState(user, notifications, orders)
    }
}

combine emits a new value whenever any of the source flows emits, once every source has emitted at least one value. It always uses the latest value from each source. This is different from zip, which waits for a new value from each source before producing a pair.
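A side-by-side sketch of the two operators on the same inputs. The flows, delays, and helper names are all made up for this illustration. The delays only exist so that the two sources emit at different times.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical sources that emit at different paces.
fun numbers(): Flow<Int> = flowOf(1, 2, 3).onEach { delay(100) }
fun letters(): Flow<String> = flowOf("a", "b").onEach { delay(130) }

// zip: strict one-to-one pairing, stops when the shorter flow ends.
suspend fun zipped(): List<String> =
    numbers().zip(letters()) { n, l -> "$n$l" }.toList()

// combine: re-emits on every change, using the latest value from each side.
suspend fun combined(): List<String> =
    numbers().combine(letters()) { n, l -> "$n$l" }.toList()

fun main() = runBlocking {
    println(zipped())   // [1a, 2b]
    // Starts with 1a and ends with 3b; the values in between depend on timing.
    println(combined())
}
```

zip never produces a pair containing 3, because there is no third letter to pair it with. combine ends on 3b because that is the latest value from each side when both flows complete.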

When to Use What

| Scenario | Use |
| --- | --- |
| One-shot API call | Suspend function |
| Stream from database/network | Cold Flow |
| UI state in ViewModel | StateFlow |
| One-time events (navigation, snackbar) | SharedFlow |
| Combining multiple streams | combine / zip |
| Search with debounce | MutableStateFlow + debounce + flatMapLatest |

What’s Next

If you haven’t already, read the coroutines series that leads into this:

  1. Kotlin Coroutines — The Basics
  2. Kotlin Coroutines — Structured Concurrency
  3. Kotlin Coroutines — Real-World Patterns

Flows build directly on coroutines. If launch, async, and dispatchers feel solid, flows are the natural next step for handling streams of data in your Kotlin projects.