Suspend Mediator — A Simple Way To Communicate Between Coroutines in Kotlin

Learn how to make simple WebSocket data sources with suspend functions

José Braz
Better Programming

--

Photo by Zach Lucero on Unsplash

In this story, we will learn how to communicate different coroutines and use them to implement a remote data source with functions that look like a local function. These functions are a request returning response using WebSocket under the cloth.

Motivation

When I was writing a data source that internally uses a WebSocket connection, I noticed the send/receive socket pattern increases the software complexity and difficult maintainability. Send a request, and receiving the response in another part of the code forces the programmer to declare temporary variables to use in the response code.

Imagine if it was possible to send the request and suspend it until the response has been received and then resume it. Like the code below, the data source provides a nice suspend login function that returns the result of the login (full code at the end):

The SuspendMediator solves the problem

Well, with SuspendMediator, it is possible! SuspendMediator is my implementation with only two methods: suspend and resume.

Note that the two functions receive a key to identify the request and the response exclusively. Suppose that request has a message identifier sent back on the response. This identifier can be the key to this message. The more “Unique” the key is, the more parallel requests can be because duplicate keys need to await in the request queue. This restriction is necessary for the SuspendMediator to know who to match uniquely the request sent with the response received.

Implementation

The SuspendMediator can have several implementations, with a queue for requests or not, with request/queue timeouts, and so on.

The queue of requests is used to cache requests with the same key or avoid parallel requests:

  • NO QUEUE: Executions with different keys can be in parallel, but when the SuspendMediator detects a duplicate key, throws an error on suspend method.
  • ENQUEUE SAME KEY: Executions with different keys can be in parallel and enqueue duplicates key.
  • ENSURE ORDER: Avoid executions in parallel. It sends them one by one, awaiting the response in order.

We can have two types of timeout:

  • Queue Timeout: The timeout is used when we have duplicate keys, and it is necessary to await in a queue to do the request.
  • Request Timeout: The timeout used to avoid awaiting a response that will never be received.

Enough of the rambling. Show me the code:

Examples Using SuspendMediator

The main objective of SuspendMediator is to communicate between coroutines. Suppose a ping pong of three coroutines multiplies a number by two multiple times.

Output:

Coroutine 1 - result1 Success(5)
Coroutine 2 - result2 Success(10)
Coroutine 3 - result3 Success(20)
Coroutine 1 - result4 Success(40)

The coroutine 1 suspends with key 1 until coroutine 2 resumes this key with the value 5. Coroutine 1 resumes receiving the value and sends double this value with key 2. The value of key 3 is produced in coroutine 2 and sent to coroutine 3, and so on. Note that the SuspendMediator is not a producer/consumer but a way to communicate bidirectionally between coroutines.

The more interesting example is using the action callback of suspend function. When the action is called and returns true, it is guaranteed that the key is ready for another coroutine call to resume with this key.

Backing to the login example at the start of the story, imagine this simple login string protocol:

Request: 1|username|password
Response: 1|loginResult

Note that the first field represents the type of message (Login) that, in this example, has the value of 1, and we can use this as a Key of the message. The protocol can be implemented using sealed classes like this:

Let’s see the DataSource example code using the SuspendMediator:

When the login function is called, the function suspends until the queuedSuspendMediator has no key of value 1 in the queue because queueMode equals ENQUEUE_SAME_KEY. In the simple case, the queue is empty or does not have key 1, so no suspension is needed.

After that, the WebSocket sendMessage is called with the value of LoginRequest. When the server responds, the listener parses the message and calls the resume function with the LoginResult message.

Conclusion

With the SuspendMediator, we can write simpler codes with less complexity of temporary variables for the request and response messages. WebSocket DataSource is one of many use cases of SuspendMediator.

Thanks for reading. Stay tuned for more.

--

--