Kotlin Coroutines and Flows: tips and tricks

Mauricio Andrada
6 min readJul 14, 2024

--

Introduction

In this article I will describe some interesting ways — in my opinion —for using coroutines and flows and how to avoid some of the pitfalls and mistakes which can lead to your app behaving incorrectly.

As usual I will use Android as the platform for the code below, but they are applicable to any application using these features.

All the examples below were tested and run inside the onCreate() method from an Activity. You can use string System.out to show the output strings (it’s easier to see what’s happeing in the logs).

Without further ado let’s dig into it.

Use flows to handle exceptions and retries

If you have a function that can potentially throw exceptions, and you want to be able to retry in case some specific exception happens, do this:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val coroutine = CoroutineScope(SupervisorJob())

generateExceptionInAFlow()
.retry(3) {
println("retrying...")
it is IOException
}
.catch {
println("caght the exception $it")
}
.onEach {
println("show $it if it gets here (not supposed to")
}.launchIn(coroutine)
}

private fun generateExceptionInAFlow() = flow{
throw IOException()
// your real code will emit some data if there was no exception
emit("something")
}

It’s elegant, no cumbersome try/catch blocks to handle the errors, it’s easy to read and understand, and has very little memory or CPU overhead, if any.

Use flows and filters instead of if/else blocks

If you have a data source that streams data “forever” — e.g. a messaging service — and you have conditional blocks of code to execute depending on different independent conditions for the data values, do this:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val coroutine = CoroutineScope(SupervisorJob())

//this handles even numbers
generateNumbersInACycle()
.filter {
it % 2 == 0
}
.onEach {
println("Caught an even number = $it")
}
.launchIn(coroutine)

//this handles odd numbers
generateNumbersInACycle()
.filter {
it % 2 != 0
}
.onEach {
println("Caught an odd number = $it")
}
.launchIn(coroutine)
}

private fun generateNumbersInACycle() = flow {
var couter = 0;
while(true) {
emit(couter++)
delay(2000)
}
}

Each condition is handled independently, so it’s easier to maintain the code and collaborate; code is executed concurrently, so there’s potential for some performance gain.

The caveat is that both collectors receive all data, but even that may be an advantage if the filters have to be changed.

It also may become a bit repetitive if the filters replace nested if/else conditions.

Be careful when calling coroutineScope and supervisorScope inside a coroutine

These functions, although very useful for creating and launching coroutines, are suspending functions, so they will suspend the coroutine execution until they are completed; depending on code block being executed (e.g. collecting from a flow) they may never complete!!

Try this example with a hot flow that never completes to confirm that:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val emptyFlow = MutableStateFlow(Unit)

val coroutine = CoroutineScope(SupervisorJob())

coroutine.launch {
supervisorScope {
emptyFlow.collect {
println("will never complete")
}
}

println("print if it gets here (it doesn't)")
}
}

If that happens, other code in the parent coroutine may never get executed. To avoid this pitfall do this:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val emptyFlow = MutableStateFlow(Unit)

val coroutine = CoroutineScope(SupervisorJob())

coroutine.launch {
launch { <= put the supervisor scope in a child coroutine
supervisorScope {
emptyFlow.collect {
println("will never complete")
}
}
}

println("now it gets here while the flow is running in a child coroutine")
}
}

Remember that a parent coroutine only completes after all its children do

Every launch() called inside a coroutine is a child coroutine; the parent coroutine will not complete until all children are.

You can confirm that by running the code below:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val emptyFlow = MutableStateFlow(Unit)

val coroutine = CoroutineScope(SupervisorJob())

coroutine.launch {
launch {
supervisorScope {
emptyFlow.collect {
println("will never complete")
}
}
}.invokeOnCompletion {
println("this is never called, this coroutine doe snot complete")
}

launch {
println("now it gets here while the flow is running in a child coroutine")
}.invokeOnCompletion {
println("the println coroutine completed")
}

}.invokeOnCompletion {
println("this is never printed, the parent coroutine doe snot complete")
}
}

This is important if you have code that depends on the parent coroutine to complete for it to be executed.

Use callbackFlow when dealing with listeners/callbacks

I recently learned about callbackFlows and I say this is an useful and elegant tool.

Android has quite a bit of functionality that requires the application to implement some listener or callback interface; right off top of my head, binding to a service requires providing an implementation of ServiceConnection.

The biggest advantage of using callbackFlow is that data is generated by the callback independently in its own coroutine — from handling it, so it removes some concerns with blocking the main thread or potential ANRs due to long execution times.

Here is an implementation for ServiceConnection, when connected the service sends a Messenger back to the Activity, which then sends a string back to the service:

package com.example.coroutineexample

import android.app.Service
import android.content.ComponentName
import android.content.Intent
import android.content.ServiceConnection
import android.os.Bundle
import android.os.Handler
import android.os.IBinder
import android.os.Looper
import android.os.Message
import android.os.Messenger
import androidx.activity.ComponentActivity
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach

class MainActivity : ComponentActivity() {

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val coroutineScope = CoroutineScope(Dispatchers.Default)

val serviceIntent = Intent().apply {
setClass(applicationContext, TestCallbackFlowService::class.java)
}

serviceConnection(serviceIntent)
.onEach {
it?.send(Message().apply {
obj = "got the message connection"
})
}
.launchIn(coroutineScope) <= this coroutine handles the data
}

private fun serviceConnection(serviceIntent: Intent) = callbackFlow {
val serviceConnection = object : ServiceConnection {
override fun onServiceConnected(name: ComponentName?, service: IBinder?) {
trySend(Messenger(service)) <= sends the messenger in
a separate coroutine
}

override fun onServiceDisconnected(name: ComponentName?) {}
}

bindService(serviceIntent, serviceConnection, Service.BIND_AUTO_CREATE)

awaitClose() // <= keeps the channel open for further communication
}
}

class TestCallbackFlowService : Service() {

private val serviceHandler = Handler(Looper.getMainLooper()) {
println(it.obj)
true
}

override fun onBind(intent: Intent?): IBinder? {
return Messenger(serviceHandler).binder
}
}

A cancelled coroutine scope cannot be re-used to launch new coroutines

The Kotlin documentation is not very explicit about this, but once cancel() is called on a coroutine scope, that scope can no longer be used to launch new coroutines and a new scope is required.

You can test this with the code below:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val emptyFlow = MutableStateFlow(Unit)

val coroutine = CoroutineScope(SupervisorJob())

coroutine.launch {
println("This is only called once")
delay(10000)
}.invokeOnCompletion {
println("Called when the coroutine is cancelled")
}

coroutine.cancel()

coroutine.launch {
println("This is never called")
delay(10000)
}.invokeOnCompletion {
println("Called immediately after launch")
}
}

Coroutine scopes created inside another coroutine are not cancelled when the surrounding coroutine scope is cancelled

This one is just something to be careful about, since it may lead to memory leaks if the coroutines don’t complete.

Here is an example you can use to verify this:

    override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)

val coroutine = CoroutineScope(SupervisorJob())

val sharedFlow = MutableSharedFlow<Unit>()

coroutine.launch {
launch {
sharedFlow.collect()
}.invokeOnCompletion {
println("cancelled because the parent coroutine was cancelled")
}

CoroutineScope(SupervisorJob()).launch {
launch {
sharedFlow.collect()
}.invokeOnCompletion {
println("not a child coroutine, will not be called")
}
}
}.invokeOnCompletion {
println("the parent coroutine was cancelled = $it")
}

coroutine.cancel()
}
}

Conclusion

In my opinion, using concepts like coroutines and flows requires a far from trivial shift in the way one thinks how an application is structured, in particular for those that worked with object oriented for a significant part of their careers.

However, I also think that, when combined with Kotlin’s conciseness, coroutines and flows allow for applications that are faster to code, easier to debug and less prone to errors.

So I hope this small collection of tips will be useful in your day-to-day work.

--

--

Mauricio Andrada

20+ years of experience with software development in the Telecommunications industry; currently DMTS at Verizon working on applications for 5G, AI and MEC