Demystifying Kotlin’s Channel Flows

Why does Kotlin have two almost identical flow builders, and which one should you be using?

Sam Cooper
Better Programming


Photo by Dan Roizer on Unsplash

What’s the difference between flow and channelFlow in Kotlin?

Both of them are “flow builders” that accept a value-producing lambda and return a Flow of values. On the face of it, they look almost exactly the same, as if they’re just providing the same functions with different names.

So are channelFlow and send basically just different names for flow and emit? Well, no. Although the channelFlow builder can express the same things as a flow builder, the more sophisticated “channel flow” has some extra tricks up its sleeve.

For one thing, it introduces a buffer, so that the flow producer can prepare and queue up several items before the consumer is ready to receive them. But that’s not its main purpose. Channel flows are really designed for concurrent decomposition. The channel flow builder offers a way to break a flow down into multiple coroutines that each produce their own values. It’s a lot like using a coroutineScope to launch child coroutines inside a suspend function.
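To make the idea of concurrent decomposition concrete, here is a minimal sketch. It assumes kotlinx.coroutines 1.6 or later is on the classpath. The function name `fastAndSlow` and the delay values are made up for illustration. Two child coroutines each contribute a value to the same flow:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical example: two child coroutines each produce a value for one flow.
fun fastAndSlow(): Flow<String> = channelFlow {
    // channelFlow's lambda has a ProducerScope receiver, which is also a
    // CoroutineScope, so launch is available here.
    launch {
        delay(200)
        send("slow")
    }
    launch {
        delay(100)
        send("fast")
    }
    // The flow completes only after both child coroutines have finished.
}

fun main() = runBlocking {
    println(fastAndSlow().toList()) // [fast, slow]
}
```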

You can’t do this with a regular flow!

But why can’t you? Why can’t a regular flow call launch to start a coroutine, and why does swapping the words “flow” and “emit” for “channelFlow” and “send” suddenly make it possible?

In order to understand how it works, let’s start by breaking down what makes a flow a flow. After that, we’ll look at the simple ingredients we have to add on top to make it into a channel flow and enable concurrent decomposition.

As I’ve mentioned before, a flow in Kotlin is simply an asynchronous generator. Each time the consumer asks for a value from a flow, the flow resumes its lambda code and runs it until it produces the next item.

Here’s a reminder showing how flows enable control to pass back and forth between a producer and a consumer.
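A minimal sketch of such an example follows. It assumes kotlinx.coroutines 1.6 or later, which is needed for the lambda form of `collect`. The `numbers` function and its `log` parameter are illustrative names. The `log` parameter only exists so that the generator's output is easy to capture:

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// A simple flow acting as an asynchronous generator of two numbers.
fun numbers(log: (String) -> Unit): Flow<Int> = flow {
    delay(10) // a flow runs in a coroutine, so it may call suspend functions
    log("Generator: emitting 1")
    emit(1) // control passes to the collector until it has handled 1
    log("Generator: emitting 2")
    emit(2)
}

fun main() = runBlocking {
    numbers { println(it) }.collect { value -> println("Collector: received $value") }
    // Generator: emitting 1
    // Collector: received 1
    // Generator: emitting 2
    // Collector: received 2
}
```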

When you run the example code, notice how the outputs from the caller and the generator are interleaved. First we get a line of output from the numbers generator, then a line from the collect function, then another line from the generator, and finally another line from the collect function. That’s because the generator pauses its execution at each emission, handing control back to the caller.

In a lot of ways, a flow is pretty similar to a suspend function. It’s a control structure containing a reusable chunk of asynchronous computation that can be executed by a coroutine. And because a flow is always executed by a coroutine, it’s allowed to call other suspending functions. That’s why the flow in the example can use the delay function to pause for a few milliseconds before emitting its first value.

The difference between a flow and a suspend function is just the number of values they produce. A function performs one computation and then returns a single value, but a flow can produce several values, performing more computation between each one.

It’s a key characteristic of Kotlin coroutines that code running in a single coroutine is sequential by default. When you call a suspend function, it might pause and resume its execution, but it still only does one thing at once. The order of its operations is rigidly defined by the order of the actual lines of code.

On its own, calling a suspend function doesn’t add concurrency. If you want to change that, you have to do so explicitly by launching additional coroutines. Only when there are multiple coroutines running can those coroutines start to execute concurrently, allowing the system to advance them in parallel and in whatever order it chooses.

Flows are the same. By itself, collecting a flow doesn’t start any new coroutines, and it doesn’t give the system the ability to parallelise or reorder any operations. When collecting a basic flow, the program follows a single code path in and out of the generator, executing the lines of code in a predictable order. At any given point in time, the program is either executing inside the flow or inside the collector, but never both. Each time the flow suspends by calling emit, control always passes back immediately to the caller.

Concurrent decomposition

To introduce concurrency to a simple suspend function in Kotlin, we use the coroutineScope function. Inside the coroutine scope, we can start new coroutines with launch or async.
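Here is a minimal sketch of that pattern. `loadBoth` is a hypothetical suspend function, and each of its two `async` children waits for one second:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical suspend function that splits its work into two concurrent subtasks.
suspend fun loadBoth(): Int = coroutineScope {
    val first = async { delay(1000); 1 }
    val second = async { delay(1000); 2 }
    // coroutineScope doesn't return until both children have completed.
    first.await() + second.await()
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println("Result: ${loadBoth()}") // Result: 3
    }
    println("Took about $elapsed ms") // roughly one second, not two
}
```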

In this example, the two async operations can wait out their one-second delays concurrently, so the total execution time is still closer to one second than two.

Launching coroutines in a coroutine scope like this one is a foundational tool of structured concurrency in Kotlin. It’s a powerful form of encapsulation that lets us define self-contained units of concurrent computation that follow clear, predictable rules. That means that we can call a suspend function and not have to worry whether it does its work concurrently or not. The coroutineScope packages up the concurrency and ensures that by the time the suspend function returns, any coroutines that were launched by the function have completed safely.

Photo by Andrew Karn on Unsplash

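Just like a suspend function, a flow can’t launch new coroutines directly. The flow builder’s lambda runs with a FlowCollector as its receiver, not a CoroutineScope, so there’s no scope to call launch on. That’s deliberate: new coroutines launched without a scope would violate structured concurrency, allowing them to detach from and even outlive the coroutine that’s collecting the flow.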

So, just like in a suspend function, we need to use the coroutineScope builder to create an encapsulated scope to decompose a flow into concurrent subtasks in a controlled way. That way, the lifecycle of the coroutines is completely tied to the lifecycle of the flow itself. Errors from the coroutines propagate to the flow, and callers are guaranteed that when the flow terminates, any coroutines it created are also terminated.
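The following is a hedged sketch of that approach, and `concurrentSquares` is a made-up name. The two `async` children run concurrently inside the flow's own `coroutineScope`. However, every call to `emit` still happens in the coroutine that is collecting the flow:

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical flow that does its work concurrently but emits only from
// the flow's own coroutine.
fun concurrentSquares(): Flow<Int> = flow {
    coroutineScope {
        // These children belong to this scope, so they can't outlive the flow.
        val a = async { delay(500); 2 * 2 }
        val b = async { delay(500); 3 * 3 }
        // emit is called here, in the collecting coroutine, not inside async.
        emit(a.await())
        emit(b.await())
    }
}

fun main() = runBlocking {
    println(concurrentSquares().toList()) // [4, 9]
}
```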

There is a limitation to this approach, though: you can’t emit a value from inside one of those child coroutines. Code that calls emit from a coroutine launched inside a flow will compile, but it fails at runtime with an IllegalStateException as soon as it tries to emit.
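The following sketch uses a hypothetical `brokenFlow` to show the failure. The code compiles because `emit` resolves to the flow's `FlowCollector` receiver. At runtime, though, the call happens inside a different coroutine, so the flow's runtime check rejects it:

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical flow that (incorrectly) emits from a launched child coroutine.
fun brokenFlow(): Flow<Int> = flow {
    coroutineScope {
        launch {
            // Compiles, but throws IllegalStateException at runtime because
            // the emission comes from a coroutine other than the collector's.
            emit(1)
        }
    }
}

fun main() = runBlocking {
    try {
        brokenFlow().collect { println(it) }
    } catch (e: IllegalStateException) {
        println(e.message) // mentions using channelFlow instead of flow
    }
}
```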

Why this restriction? The error message gives a clue: it tells us that “FlowCollector is not thread-safe and concurrent emissions are prohibited.” Recall that in a flow, calling emit passes control directly back to the flow’s collector. If you could call emit from a child coroutine, you could end up with multiple calls to emit happening concurrently, in separate coroutines! From the point of view of the flow collector, it’d be like serving a tennis ball and finding two balls flying back towards you.

The flow collector is designed as a single control flow that suspends and resumes as each new item becomes available. It would be an error to “resume” the flow collector while it’s already running, and so concurrent emissions from the flow are strictly forbidden. Concurrency introduced inside the flow must always remain confined to the flow producer, and can never leak out to the consumer.

Introducing Channels

So what’s the solution? What do we do if we want to launch multiple coroutines and have them each compute one or more of the values for the flow?

The error message from the example above actually does have a suggestion. “To mitigate this restriction please use channelFlow builder instead of flow.” Okay, but… why? The channelFlow still just returns a Flow, so what magic does it have that lets it circumvent this apparently inherent limitation of flows?

Unsurprisingly, the clue is in the name. Under the hood, a channel flow actually uses a Channel to pass the values from the background coroutines back to the main control flow. When a background coroutine wants to emit a value, it sends the value to the channel. In the foreground, the flow’s main control flow receives those values from the channel one by one and passes them to the emit function.
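A minimal, hand-written version of that mechanism might look like this. It assumes exactly two values are produced, so it can receive a fixed number of items instead of dealing with closing the channel. `twoValues` is an illustrative name:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun twoValues(): Flow<String> = flow {
    coroutineScope {
        val channel = Channel<String>()
        // Background coroutines send to the channel instead of calling emit.
        launch { delay(200); channel.send("slow") }
        launch { delay(100); channel.send("fast") }
        // Foreground: the flow's own coroutine receives each value and emits it.
        // Receiving a known count of 2 keeps this sketch from needing close().
        repeat(2) { emit(channel.receive()) }
    }
}

fun main() = runBlocking {
    println(twoValues().toList()) // [fast, slow]
}
```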

These foreground flow emissions are happening in the same single control flow that is collecting the flow, so they don’t violate the flow’s constraints and are guaranteed never to happen concurrently. Meanwhile, the background coroutines can execute concurrently with both the flow collector and the flow producer, since they never attempt to interact with the flow collector directly.

Simply extracting the two launch blocks to a lambda function is enough to show the basics of how the channelFlow builder from the kotlinx.coroutines library might work.

I did gloss over a few details in this example, though. For instance, my code doesn’t bother closing the channel when the coroutines are complete. Depending on how it’s used, that could mean the flow never terminates, which is going to be a problem in real-world applications! Once you start to fill in those gaps, the boilerplate code gets a little more complicated. So in practice, it’s almost certainly going to be better to use the built-in channelFlow builder, with all its optimizations, than to try to come up with your own correct implementation.
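To get a feel for that extra boilerplate, here is a hedged sketch of a generic version that fills in only the channel-closing gap. The name `simpleChannelFlow` and its signature are hypothetical and not part of kotlinx.coroutines. The sketch still skips details the real builder handles, such as configurable buffer capacity:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-in for channelFlow (not the library's implementation).
fun <T> simpleChannelFlow(
    producer: suspend CoroutineScope.(SendChannel<T>) -> Unit
): Flow<T> = flow {
    coroutineScope {
        val channel = Channel<T>(Channel.BUFFERED)
        launch {
            // The inner coroutineScope waits for every coroutine the producer launches.
            coroutineScope { this.producer(channel) }
            // Closing the channel ends the loop below, so the flow can terminate.
            channel.close()
        }
        // Runs in the collecting coroutine, so emissions never overlap.
        for (value in channel) emit(value)
    }
}

fun main() = runBlocking {
    val values = simpleChannelFlow<String> { channel ->
        launch { delay(200); channel.send("slow") }
        launch { delay(100); channel.send("fast") }
    }.toList()
    println(values) // [fast, slow]
}
```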

I don’t recommend actually using this simplified example code, but I hope it demystifies the mechanism of concurrent decomposition in flows and makes it clear that there’s no real magic here.

Does that mean you should always just use channelFlow instead of flow? You certainly could, but it does have some implications. Recall that a channel flow introduces a buffer, allowing the system to vary the order of operations as control passes between producer and consumer. It also launches an extra job to run the value-producing code, even if you don’t end up launching any child coroutines of your own.
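One way to see the effect of that buffer is to record when each item is produced and when it is consumed. The names `trace`, `viaFlow`, and `viaChannelFlow` are hypothetical. The exact interleaving for the channel flow depends on the dispatcher, so only the plain flow's order is stated with certainty:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: collects a flow and records produce/consume events in order.
suspend fun trace(build: (MutableList<String>) -> Flow<Int>): List<String> {
    val events = mutableListOf<String>()
    build(events).collect { events += "consume $it" }
    return events
}

fun viaFlow(events: MutableList<String>): Flow<Int> = flow {
    for (i in 1..3) { events += "produce $i"; emit(i) }
}

fun viaChannelFlow(events: MutableList<String>): Flow<Int> = channelFlow {
    for (i in 1..3) { events += "produce $i"; send(i) }
}

fun main() = runBlocking {
    // [produce 1, consume 1, produce 2, consume 2, produce 3, consume 3]
    println(trace(::viaFlow))
    // The producer is free to run ahead and fill the buffer.
    println(trace(::viaChannelFlow))
}
```

Under `runBlocking`, the channel flow's producer can queue several items before the collector handles the first one, whereas the plain flow strictly alternates.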

If you don’t need to emit values from child coroutines inside your flow, that extra baggage is just adding weight. Flows on their own are very simple: they don’t introduce concurrency, and they don’t start coroutines. Using a channel flow when you just need a flow is a bit like wearing sunglasses indoors: it can look cool, but it’s ultimately unnecessary and a little confusing.

Photo by EJ Strat on Unsplash

Recap

Flows are just asynchronous generators that run some suspending code when you collect them. They don’t introduce new coroutines or concurrency on their own. If you want to do that, you need to introduce a coroutineScope, just like you would in a suspend function, so that the coroutines remain properly owned and contained inside the flow. A channelFlow is just a pre-packaged way to introduce that coroutineScope, along with a helpful Channel that lets those new child coroutines send values to be emitted.

So, if you want to use multiple coroutines inside a flow, use a channelFlow. It’s pre-built and convenient, and easier than trying to figure out how to build it from scratch. But if you just need to emit values one by one, without any concurrency, stick to a regular flow. It’s more lightweight, it’s less confusing, and it’s probably easier to test, too.
