Kotlin Flows — Cold Flows, Hot Flows & StateFlow Explained
Understand Kotlin Flow from the ground up — cold vs hot flows, Flow builders, operators, StateFlow, SharedFlow, and when to use each in Android and backend projects.
If you’ve worked with coroutines, you already know how to handle one-shot async operations — make a network call, get a result. But what about streams of data? A search bar emitting queries as the user types. A WebSocket pushing real-time prices. A database that notifies you when rows change.
That’s what Kotlin Flow is for. It’s a coroutine-based stream API that handles sequential, asynchronous data emission.
This post covers:
- What a Flow is and how it works
- Cold flows vs hot flows — and why the distinction matters
- Flow builders and operators
- StateFlow and SharedFlow for state management
- Practical examples for Android and backend
What is a Flow?
A Flow is a type that emits multiple values sequentially, as opposed to a suspend function that returns a single value.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay
fun numberFlow(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(500)
        emit(i)
    }
}
Nothing happens until you collect it:
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.collect
fun main() = runBlocking {
    numberFlow().collect { value ->
        println("Received: $value")
    }
}
Received: 1
Received: 2
Received: 3
The flow { } builder is a suspend lambda. Inside it, you call emit() to send values downstream. The collector receives them one at a time, in order.
Key point: Flow is cold by default. The code inside flow { } doesn’t run until someone collects it. And each collector gets its own independent execution.
Cold Flows
A cold flow starts producing values only when collected. Each collector triggers a fresh execution.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.launch
fun coldFlow(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(300)
        emit(i)
    }
}
// <Unit> keeps main() returning Unit even though the last statement is launch { }
fun main() = runBlocking<Unit> {
    val myFlow = coldFlow()
    // First collector
    launch {
        myFlow.collect { println("Collector A: $it") }
    }
    // Second collector
    launch {
        myFlow.collect { println("Collector B: $it") }
    }
}
Flow started
Flow started
Collector A: 1
Collector B: 1
Collector A: 2
Collector B: 2
Collector A: 3
Collector B: 3
"Flow started" prints twice. Each collector gets its own independent execution of the flow body. This is exactly like how calling a function twice runs the function body twice.
Flow builders
There are several ways to create cold flows:
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.asFlow
// From fixed values
val fixedFlow = flowOf(1, 2, 3)
// From a collection
val listFlow = listOf("a", "b", "c").asFlow()
// From a suspend function
val computedFlow = flow {
    val result = fetchFromNetwork() // placeholder for any suspend function
    emit(result)
}
flowOf and asFlow are convenience builders. The flow { } builder is the most flexible — you can call suspend functions, loop, branch, whatever you need inside it.
Flow Operators
Flows support operators similar to collections — map, filter, take, etc. But they’re lazy. No intermediate collections are created. Values flow through the chain one at a time.
Transforming
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
    flowOf(1, 2, 3, 4, 5)
        .filter { it % 2 != 0 }
        .map { it * it }
        .collect { println(it) }
}
1
9
25
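To make the "one at a time" behaviour visible, here is a small sketch that records when each stage sees each value. The `tracedPipeline` function and its `trace` list are illustrative names, not part of the Flow API.

```kotlin
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Records the order in which each stage of the chain sees each value.
suspend fun tracedPipeline(): List<String> {
    val trace = mutableListOf<String>()
    flowOf(1, 2, 3)
        .filter {
            trace.add("filter $it")
            it % 2 != 0
        }
        .map {
            trace.add("map $it")
            it * it
        }
        .collect { trace.add("collect $it") }
    return trace
}

fun main() = runBlocking {
    tracedPipeline().forEach(::println)
}
```

The trace interleaves the stages: value 1 passes through filter, map and collect before value 2 is even filtered, and value 2 never reaches map because the filter drops it.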
Combining flows
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
    val names = flowOf("Alice", "Bob", "Charlie")
    val scores = flowOf(95, 87, 92)
    names.zip(scores) { name, score -> "$name: $score" }
        .collect { println(it) }
}
Alice: 95
Bob: 87
Charlie: 92
zip pairs elements one-to-one. If one flow is shorter, the result stops when the shorter one ends.
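A quick way to check the "shorter flow wins" rule is to drop one score from the example. The `zipUneven` name is only for illustration.

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Zips three names with only two scores and returns what comes out.
suspend fun zipUneven(): List<String> {
    val names = flowOf("Alice", "Bob", "Charlie")
    val scores = flowOf(95, 87) // one element short
    return names.zip(scores) { name, score -> "$name: $score" }.toList()
}

fun main() = runBlocking {
    println(zipUneven()) // [Alice: 95, Bob: 87]
}
```

"Charlie" is never emitted because there is no score left to pair it with.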
Error handling
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.runBlocking
fun riskyFlow() = flow {
    emit(1)
    emit(2)
    throw RuntimeException("Something broke")
}
fun main() = runBlocking {
    riskyFlow()
        .catch { e -> println("Caught: ${e.message}") }
        .collect { println("Value: $it") }
}
Value: 1
Value: 2
Caught: Something broke
catch intercepts upstream exceptions. It doesn’t catch exceptions in the collect block itself — only what happens before it in the chain.
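One common way to work with that limitation is to move the per-value logic into onEach, placed before catch, so that a failure in it counts as upstream. The sketch below assumes a hypothetical `handle` function that fails on one value; the names are illustrative.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical handler that fails on the value 2.
fun handle(value: Int, log: MutableList<String>) {
    check(value != 2) { "Cannot handle $value" }
    log.add("Handled $value")
}

suspend fun handleWithCatch(): List<String> {
    val log = mutableListOf<String>()
    flowOf(1, 2, 3)
        .onEach { handle(it, log) }                     // runs upstream of catch
        .catch { e -> log.add("Caught: ${e.message}") } // sees the failure from onEach
        .collect { }                                    // nothing left here that can throw
    return log
}

fun main() = runBlocking {
    handleWithCatch().forEach(::println)
}
```

The flow ends at the first failure, so the log holds "Handled 1" followed by "Caught: Cannot handle 2", and the value 3 is never processed.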
Common operators at a glance
| Operator | What it does |
|---|---|
| map | Transforms each value |
| filter | Drops values that don’t match |
| take(n) | Takes first n values, then cancels |
| zip | Pairs two flows element-by-element |
| combine | Emits when either flow emits, using latest from both |
| flatMapConcat | Maps each value to a flow, collects them sequentially |
| flatMapMerge | Maps each value to a flow, collects them concurrently |
| debounce | Waits for a pause before emitting (search input) |
| distinctUntilChanged | Drops consecutive duplicates |
| catch | Catches upstream exceptions |
| onEach | Side effect on each value (logging) |
| flowOn | Changes the dispatcher for upstream operations |
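As a small sketch of how two of these operators chain together, the following drops consecutive duplicates and then stops after three values. The function name is illustrative.

```kotlin
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Drops consecutive duplicates, then keeps only the first three values.
suspend fun firstThreeDistinct(): List<Int> =
    flowOf(1, 1, 2, 2, 2, 3, 3, 4)
        .distinctUntilChanged()
        .take(3)
        .toList()

fun main() = runBlocking {
    println(firstThreeDistinct()) // [1, 2, 3]
}
```

Because take(3) cancels the upstream once it has three values, the trailing 3 and 4 are never processed.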
Hot Flows — SharedFlow and StateFlow
Cold flows are great when each collector should get its own run of the stream. But sometimes you need a flow that:
- Emits values regardless of whether anyone is collecting
- Shares a single stream across multiple collectors
- Holds the latest value for new subscribers
That’s what hot flows are for.
SharedFlow
A SharedFlow emits values to all collectors simultaneously. Values are broadcast — every collector gets the same values.
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
    val events = MutableSharedFlow<String>()
    // Collector 1
    val jobA = launch {
        events.collect { println("Collector A: $it") }
    }
    // Collector 2
    val jobB = launch {
        events.collect { println("Collector B: $it") }
    }
    delay(100) // give collectors time to subscribe
    events.emit("click")
    events.emit("scroll")
    delay(100) // let both collectors print the last event
    // A SharedFlow never completes, so cancel the collectors to let main() return
    jobA.cancel()
    jobB.cancel()
}
Collector A: click
Collector B: click
Collector A: scroll
Collector B: scroll
Both collectors receive every event. Unlike cold flows, the emission happens once — not per collector.
MutableSharedFlow has a replay parameter. replay = 1 means new collectors immediately get the last emitted value:
import kotlinx.coroutines.flow.MutableSharedFlow
val events = MutableSharedFlow<String>(replay = 1)
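A minimal sketch of what replay = 1 means for a late subscriber: the values are emitted before anyone collects, and a collector that arrives afterwards still receives the most recent one. The function name is illustrative.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

suspend fun lateSubscriberSees(): String {
    val events = MutableSharedFlow<String>(replay = 1)
    events.emit("first")  // no collectors yet; kept in the replay cache
    events.emit("second") // replaces "first" in the cache
    return events.first() // a new collector starts with the replayed value
}

fun main() = runBlocking {
    println(lateSubscriberSees()) // second
}
```

With the default replay = 0, the same call to first() would suspend until something new is emitted.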
StateFlow
StateFlow is a specialized SharedFlow that always holds a value. Think of it as a reactive variable — it has a current state, and collectors get notified when it changes.
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
    val counter = MutableStateFlow(0)
    val job = launch {
        counter.collect { println("Counter is now: $it") }
    }
    delay(100)
    counter.value = 1
    delay(100)
    counter.value = 2
    delay(100)
    counter.value = 2 // same value, so it won't emit again
    delay(100)
    counter.value = 3
    delay(100)
    job.cancel() // a StateFlow never completes, so stop collecting explicitly
}
Counter is now: 0
Counter is now: 1
Counter is now: 2
Counter is now: 3
Notice: setting counter.value = 2 the second time doesn’t trigger the collector. StateFlow is conflated and uses structural equality — it only emits when the new value is different from the current one (via equals()).
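The equals() check matters most with data classes, where a brand new instance can still be "the same" state. Here is a small sketch with an illustrative `Point` class; the short delays only give the collector time to run.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Point(val x: Int, val y: Int)

// Returns every value the collector saw.
suspend fun observedPoints(): List<Point> = coroutineScope {
    val position = MutableStateFlow(Point(0, 0))
    val seen = mutableListOf<Point>()
    val job = launch { position.collect { seen.add(it) } }
    delay(50)
    position.value = Point(0, 0) // new instance but equal, so nothing is emitted
    delay(50)
    position.value = Point(1, 0) // different value, so it is emitted
    delay(50)
    job.cancel() // StateFlow collection never completes on its own
    seen
}

fun main() = runBlocking {
    println(observedPoints()) // [Point(x=0, y=0), Point(x=1, y=0)]
}
```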
StateFlow vs SharedFlow
| Feature | StateFlow | SharedFlow |
|---|---|---|
| Has initial value | Yes (required) | No |
| Holds current value | Yes (.value) | No |
| Replays to new collectors | Always replays latest | Configurable (replay) |
| Conflation | Yes (skips equal values) | No (emits everything) |
| Use case | UI state, current value | Events, one-time actions |
Rule of thumb: Use StateFlow for state (loading indicator, form data, current user). Use SharedFlow for events (navigation, snackbars, errors that should be handled once).
StateFlow in Android ViewModel
This is the most common use of StateFlow — holding UI state in a ViewModel:
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
data class UiState(
    val items: List<String> = emptyList(),
    val isLoading: Boolean = false,
    val error: String? = null
)
// ItemRepository is assumed to expose a suspend fun getItems(): List<String>
class ItemsViewModel(
    private val repository: ItemRepository
) : ViewModel() {
    private val _uiState = MutableStateFlow(UiState())
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
    fun loadItems() {
        viewModelScope.launch {
            // update() applies the read-modify-write atomically
            _uiState.update { it.copy(isLoading = true) }
            try {
                val items = repository.getItems()
                _uiState.value = UiState(items = items)
            } catch (e: CancellationException) {
                throw e // let coroutine cancellation propagate
            } catch (e: Exception) {
                _uiState.value = UiState(error = e.message)
            }
        }
    }
}
In the Activity or Fragment, collect the state:
import android.os.Bundle
import androidx.activity.viewModels
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.lifecycleScope
import androidx.lifecycle.repeatOnLifecycle
import kotlinx.coroutines.launch
// showLoading(), showError() and showItems() are placeholders for your own UI code.
class ItemsActivity : AppCompatActivity() {
    // ItemsViewModel takes a repository, so this needs a ViewModel factory or DI setup
    private val viewModel: ItemsViewModel by viewModels()
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        lifecycleScope.launch {
            repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.uiState.collect { state ->
                    when {
                        state.isLoading -> showLoading()
                        state.error != null -> showError(state.error)
                        else -> showItems(state.items)
                    }
                }
            }
        }
    }
}
repeatOnLifecycle(Lifecycle.State.STARTED) runs the block only while the Activity is at least STARTED. When the Activity goes to the background, the collecting coroutine is cancelled. When it comes back, the block is launched again and collection restarts from the current state. This prevents wasted work and memory leaks.
flowOn — Changing Dispatchers
By default, a flow runs in the coroutine context of the collector. To move upstream work to a different dispatcher:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking
fun heavyComputation() = flow {
    for (i in 1..5) {
        Thread.sleep(100) // simulate CPU work
        emit(i)
    }
}.flowOn(Dispatchers.Default) // runs on Default dispatcher
fun main() = runBlocking {
    heavyComputation()
        .map { it * 2 }
        .collect { println("$it on ${Thread.currentThread().name}") }
}
flowOn only affects operations upstream of it. The map and collect still run on the collector’s dispatcher. This is different from withContext — it doesn’t change the context for everything, just for what comes before it in the chain.
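To see the upstream/downstream split directly, a small sketch can capture the thread name on both sides of flowOn. The function names are illustrative, and the exact thread names depend on your environment.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits the name of the thread the flow body runs on.
fun upstreamThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}

// Pairs the upstream thread name with the thread that runs map.
suspend fun threadsSeen(): List<Pair<String, String>> =
    upstreamThread()
        .flowOn(Dispatchers.Default) // affects only the flow body above
        .map { upstream -> upstream to Thread.currentThread().name }
        .toList()

fun main() = runBlocking {
    val (upstream, downstream) = threadsSeen().single()
    println("flow body ran on $upstream, map ran on $downstream")
}
```

When run from runBlocking on the main thread, the flow body typically reports a DefaultDispatcher worker thread while map reports main.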
Practical: Debounced Search
A common real-world pattern — debouncing a search input so you don’t fire a network request on every keystroke:
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.Flow
// SearchRepository.search(query) is assumed to return Flow<List<SearchResult>>
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
class SearchViewModel(
    private val searchRepository: SearchRepository
) : ViewModel() {
    val queryFlow = MutableStateFlow("")
    val searchResults: Flow<List<SearchResult>> = queryFlow
        .debounce(300) // wait 300ms after last keystroke
        .distinctUntilChanged() // skip if query hasn't changed
        .flatMapLatest { query ->
            if (query.length < 2) {
                // too short to search: clear the results instead of keeping stale ones
                flowOf(emptyList())
            } else {
                searchRepository.search(query)
            }
        }
}
flatMapLatest cancels the previous search when a new query arrives. If the user types “kot”, then “kotl”, the search for “kot” is cancelled and only “kotl” runs.
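The cancellation is easy to reproduce without Android. The sketch below uses a hypothetical `fakeSearch` that takes 200 ms to answer and sends a second query after 50 ms; both timings are arbitrary.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical slow search: takes 200 ms to produce a result.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(200)
    emit("results for $query")
}

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun latestOnly(): List<String> =
    flow {
        emit("kot")
        delay(50) // the next query arrives while "kot" is still being searched
        emit("kotl")
    }
        .flatMapLatest { fakeSearch(it) }
        .toList()

fun main() = runBlocking {
    println(latestOnly()) // [results for kotl]
}
```

The search for "kot" is cancelled during its delay, so its result is never emitted.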
Practical: Combining Multiple Data Sources
When your UI depends on data from multiple sources:
import androidx.lifecycle.ViewModel
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.Flow
data class DashboardState(
    val user: User,
    val notifications: List<Notification>,
    val recentOrders: List<Order>
)
// Each repository method is assumed to return a Flow of the matching type.
class DashboardViewModel(
    userRepository: UserRepository,
    notificationRepository: NotificationRepository,
    orderRepository: OrderRepository
) : ViewModel() {
    val dashboardState: Flow<DashboardState> = combine(
        userRepository.currentUser(),
        notificationRepository.unreadNotifications(),
        orderRepository.recentOrders()
    ) { user, notifications, orders ->
        DashboardState(user, notifications, orders)
    }
}
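combine emits a new value whenever any of the source flows emits, once every source has emitted at least once. It always uses the latest value from each source. This is different from zip, which pairs values strictly by position: it waits for a new value from every source before emitting, and it uses each value only once.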
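Running both operators over the same two sources makes the difference concrete. In this sketch the `numbers` and `letters` flows and their delays are made up for illustration; the combine result shown assumes the emissions arrive in the order the delays suggest.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Emits 1, 2, 3 at roughly 0, 100 and 200 ms.
fun numbers(): Flow<Int> = flow {
    emit(1)
    delay(100)
    emit(2)
    delay(100)
    emit(3)
}

// Emits "a" and "b" at roughly 50 and 250 ms.
fun letters(): Flow<String> = flow {
    delay(50)
    emit("a")
    delay(200)
    emit("b")
}

suspend fun combined(): List<String> =
    numbers().combine(letters()) { n, l -> "$n$l" }.toList()

suspend fun zipped(): List<String> =
    numbers().zip(letters()) { n, l -> "$n$l" }.toList()

fun main() = runBlocking {
    println("combine: ${combined()}") // combine: [1a, 2a, 3a, 3b]
    println("zip: ${zipped()}")       // zip: [1a, 2b]
}
```

combine reuses the latest letter for every new number, while zip consumes each value once and stops when letters runs out.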
When to Use What
| Scenario | Use |
|---|---|
| One-shot API call | Suspend function |
| Stream from database/network | Cold Flow |
| UI state in ViewModel | StateFlow |
| One-time events (navigation, snackbar) | SharedFlow |
| Combining multiple streams | combine / zip |
| Search with debounce | MutableStateFlow + debounce + flatMapLatest |
What’s Next
If you haven’t already, read the coroutines series that leads into this:
- Kotlin Coroutines — The Basics
- Kotlin Coroutines — Structured Concurrency
- Kotlin Coroutines — Real-World Patterns
Flows build directly on coroutines. If launch, async, and dispatchers feel solid, flows are the natural next step for handling streams of data in your Kotlin projects.