Kotlin Coroutines in Complex Features

2019-11-25T13:14:05+00:00 | October 25th, 2019 | Kotlin

Kotlin is steadily progressing towards becoming the dominant language in the Android world. One of Kotlin’s most intriguing features is its new concurrency framework: coroutines. Coroutines are the hottest thing in the Android community right now, and much discussion and development surrounds them.

For the past two weeks I’ve been using Kotlin coroutines to implement some advanced concurrent features. I started from zero practical experience with this framework and slowly progressed towards the end result. In this article, I’ll share my experience and insights.

The Original Feature in Java

By the time I came to work with coroutines, I already had the desired feature implemented in Java. Therefore, my experiment basically amounted to refactoring existing concurrent code into an alternative implementation.

For reference, below you can see the implementation of the class that encapsulates the “benchmarking code” in Java (it’s on GitHub). It concurrently starts X producer tasks and X consumer tasks (X is 1000 by default). The moment a producer starts, it attempts to put a simple message into a blocking queue of limited capacity. There is also a potential delay inside producers to simulate real IO. The moment a consumer starts, it attempts to take a message from the same queue. The benchmark completes after each started consumer has taken a message from the queue.

Depending on the number of messages to be exchanged, the size of the blocking queue and the delay, this benchmark results in different degrees of contention.

public class ProducerConsumerBenchmarkUseCase extends BaseObservable<ProducerConsumerBenchmarkUseCase.Listener> {

    public static interface Listener {
        void onBenchmarkCompleted(Result result);
    }

    public static class Result {
        private final long mExecutionTime;
        private final int mNumOfReceivedMessages;

        public Result(long executionTime, int numOfReceivedMessages) {
            mExecutionTime = executionTime;
            mNumOfReceivedMessages = numOfReceivedMessages;
        }

        public long getExecutionTime() {
            return mExecutionTime;
        }

        public int getNumOfReceivedMessages() {
            return mNumOfReceivedMessages;
        }
    }

    private static final int NUM_OF_MESSAGES = DefaultConfiguration.DEFAULT_NUM_OF_MESSAGES;
    private static final int BLOCKING_QUEUE_CAPACITY = DefaultConfiguration.DEFAULT_BLOCKING_QUEUE_SIZE;

    private final Object LOCK = new Object();

    private final UiThreadPoster mUiThreadPoster = new UiThreadPoster();
    private final BackgroundThreadPoster mBackgroundThreadPoster = new BackgroundThreadPoster();

    private final MyBlockingQueue mBlockingQueue = new MyBlockingQueue(BLOCKING_QUEUE_CAPACITY);

    private int mNumOfFinishedConsumers;

    private int mNumOfReceivedMessages;

    public void startBenchmarkAndNotify() {
        mBackgroundThreadPoster.post(() -> {

            mNumOfReceivedMessages = 0;
            mNumOfFinishedConsumers = 0;
            long startTimestamp = System.currentTimeMillis();

            // producers init thread
            mBackgroundThreadPoster.post(() -> {
                for (int i = 0; i < NUM_OF_MESSAGES; i++) {
                    startNewProducer(i);
                }
            });

            // consumers init thread
            mBackgroundThreadPoster.post(() -> {
                for (int i = 0; i < NUM_OF_MESSAGES; i++) {
                    startNewConsumer();
                }
            });

            waitForAllConsumersToFinish();

            Result result;
            synchronized (LOCK) {
                result = new Result(
                        System.currentTimeMillis() - startTimestamp,
                        mNumOfReceivedMessages
                );
            }

            notifySuccess(result);
            
        });

    }

    private void waitForAllConsumersToFinish() {
        synchronized (LOCK) {
            while (mNumOfFinishedConsumers < NUM_OF_MESSAGES) {
                try {
                    LOCK.wait();
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    private void startNewProducer(final int index) {
        mBackgroundThreadPoster.post(() -> {
            try {
                Thread.sleep(DefaultConfiguration.DEFAULT_PRODUCER_DELAY_MS);
            } catch (InterruptedException e) {
                return;
            }
            mBlockingQueue.put(index);
        });
    }

    private void startNewConsumer() {
        mBackgroundThreadPoster.post(() -> {
            int message = mBlockingQueue.take();
            synchronized (LOCK) {
                if (message != -1) {
                    mNumOfReceivedMessages++;
                }
                mNumOfFinishedConsumers++;
                LOCK.notifyAll();
            }
        });
    }

    private void notifySuccess(Result result) {
        mUiThreadPoster.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onBenchmarkCompleted(result);
            }
        });
    }
    
}

So, that’s the feature I wanted to re-implement using Kotlin coroutines. As a preliminary step, I converted the code to Kotlin without changing anything, and then dove into the world of coroutines.

Kotlin Coroutines Documentation

Documentation is one of the most important aspects of a decent framework. It determines the speed of adoption and also affects the number of problems you’ll have downstream. And I’m not talking exclusively about API summaries and a few simple examples. In my opinion, decent documentation should also cover the more advanced use cases and the “why” behind the framework.

The “why” part is especially important to me personally. If there is no clear “why”, then I simply pass on that tool. So far, this approach has spared me and my clients a vast amount of work. Therefore, the first thing I looked for was the “why”.

Unfortunately, I couldn’t get the rationale behind coroutines from either the official docs or Roman Elizarov’s numerous posts on Medium (Roman is kind of Kotlin coroutines’ “father”). Thankfully, at the end of one of his posts, Roman linked to this article by Nathaniel J. Smith. As far as I understand, Nathaniel is the one who coined the term “Structured Concurrency”. In that specific article he presents a very compelling case for this paradigm in both historical and technical contexts. [highly recommended read, regardless of your stance on coroutines]

So, after several hours of focused reading through docs and articles, I finally understood what all the fuss is about. Now I could proceed to learning the “how” of coroutines.

One more word about the official documentation (in fact, many words).

Coroutines are a concurrency framework. Therefore, I expected to find a list of the “happens-before” guarantees they provide in the docs, but there was none. So, I searched the source code (maybe they specified the guarantees in Javadoc?), but nope:

14:50 kotlinx.coroutines $ find ./ -type f | xargs grep -Pi 'happens[- ]?before' --color
./kotlinx-coroutines-core/common/src/sync/Mutex.kt: * An unlock on a [Mutex] happens-before every subsequent successful lock on that [Mutex].
./kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt: fun testFlowOnCancellationHappensBefore() = runTest {

For comparison, Executor’s Javadoc contains this statement:

Memory consistency effects: Actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.

The above statement in the API, though short, allows you to reason about thread-safety of any code that uses Executors.
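To make that concrete, here is a minimal sketch of the kind of reasoning the Executor guarantee allows. The class and field names are hypothetical and not taken from the benchmark project; the point is only that a plain, non-volatile field written before submission can be read safely inside the submitted Runnable.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; names are not from the benchmark project.
final class HappensBeforeSketch {

    // Deliberately neither volatile nor guarded by a lock.
    private int config;
    private int observed;

    int submitAndObserve() throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            config = 42; // written by the submitting thread before execute()
            executor.execute(() -> {
                // Safe read: submission happens-before the start of this Runnable.
                observed = config;
                done.countDown();
            });
            // countDown() happens-before a successful return from await(),
            // so reading 'observed' below is safe as well.
            done.await();
            return observed;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(new HappensBeforeSketch().submitAndObserve()); // prints 42
    }
}
```

Both reads rely only on guarantees that are written down in the java.util.concurrent Javadoc, which is exactly the kind of statement I was looking for in the coroutines docs.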

Now, since coroutines use Executors under the hood, I can assume that their guarantees are essentially the same. However, making assumptions in the context of concurrency can result in a lot of pain down the road. You could say that Kotlin isn’t exclusively a JVM language, so it wouldn’t make sense to put JVM-specific guarantees into its API. Fine, but these guarantees should be specified somewhere.

Therefore, I see missing “happens-before” guarantees in coroutines’ documentation and Javadoc as a red flag. [BTW, I asked about it on StackOverflow, but, as of now, there is still no definitive answer there]

Coroutines Dispatchers

Once I understood how to work with coroutines, I needed to choose the appropriate coroutines Dispatcher. If you aren’t familiar with this construct, then think of an Executor, because that’s basically what a Dispatcher is. There are several “background” Dispatchers provided by the framework out of the box.

Dispatchers.Default is limited to the number of cores returned by Runtime.getRuntime().availableProcessors(). In my opinion, this dispatcher is pretty much useless and the fact that it’s named “default” will lead to lots of problems in the future. Now, don’t get me wrong, if you’ve got a CPU bound flow and you want to limit its parallelism, then it might be a good idea. In fact, one of the examples in this article does exactly that. However, in such cases, you want to have a standalone Dispatcher (Executor) dedicated to that specific flow. Using one severely limited Dispatcher for your entire app by default is a recipe for disaster IMHO.
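As a rough sketch of what “a standalone Executor dedicated to that specific flow” can look like on the Java side, the hypothetical class below creates its own fixed pool for one CPU bound flow and measures how many tasks ever ran at once. The class name, the busy loop and the counters are illustrative assumptions, not code from the project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of the benchmark project.
final class DedicatedCpuPoolSketch {

    /** Runs taskCount busy tasks on a dedicated pool and returns the peak number that ran at once. */
    static int peakParallelism(int poolSize, int taskCount) throws Exception {
        ExecutorService cpuPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    long sum = 0;
                    for (int k = 0; k < 1_000_000; k++) {
                        sum += k; // stand-in for real CPU bound work
                    }
                    running.decrementAndGet();
                    return sum;
                });
            }
            cpuPool.invokeAll(tasks); // blocks until every task has completed
            return peak.get();
        } finally {
            cpuPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("peak parallelism: " + peakParallelism(cores, cores * 4) + " of " + cores);
    }
}
```

The parallelism limit here belongs to one flow only, so a slow computation can’t starve unrelated work the way a single app-wide “default” pool could.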

Another dispatcher provided by the framework is Dispatchers.IO, and that was what I needed because I knew that my “benchmark” is IO bound. So, I used this Dispatcher… and the flow hung. Have I already said that making any kind of assumption in the context of concurrency can lead to very painful results? Well, after another hour of debugging, I finally realized that I might’ve assumed something about Dispatchers.IO which isn’t true. So, I read its Javadoc (for the first time, I must admit).

Wait, what?!?! Dispatchers.IO is limited to 64 threads? I must’ve misunderstood. Nope, it’s true: limited to 64 threads!

Now, I understand that coroutines are designed to be kind of “block-free”. Not sure it’s possible to achieve that, but let’s say it is. In the ideal world of non-blocking IO, maybe, just maybe, 64 threads would suffice. However, one of Kotlin’s main selling points is its interoperability with Java. And there is a lot of blocking code in Java. So, until the entire ecosystem, including all other libraries and frameworks out there, retools to Kotlin and coroutines, I can’t see how 64 threads is a reasonable concurrency limit for an IO Dispatcher. It will work in most Android apps, of course, but JetBrains is trying to push Kotlin into the backend as well. What will backend folks say about being limited to 64 threads?
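To illustrate why a hard thread limit and blocking code don’t mix, here is a scaled-down sketch with a plain Java thread pool. It is an assumption-laden analogy, not a reproduction of what Dispatchers.IO does internally: consumers are submitted first, they occupy every thread of a small pool while blocked on an empty queue, and the producers that could unblock them are stuck in the pool’s work queue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; not part of the benchmark project.
final class PoolStarvationSketch {

    /** Starts 'pairs' consumers, then 'pairs' producers, on a pool of poolSize threads; returns messages received. */
    static int run(int poolSize, int pairs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(pairs);
        AtomicInteger received = new AtomicInteger();

        for (int i = 0; i < pairs; i++) {
            pool.execute(() -> {
                try {
                    // A timed poll instead of take(), so that this sketch terminates
                    // where an untimed blocking call would hang.
                    if (queue.poll(500, TimeUnit.MILLISECONDS) != null) {
                        received.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        for (int i = 0; i < pairs; i++) {
            int message = i;
            pool.execute(() -> queue.offer(message)); // never blocks: capacity == pairs
        }

        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("pool of 2: " + run(2, 2) + " messages received");
        System.out.println("pool of 4: " + run(4, 2) + " messages received");
    }
}
```

With two threads, both are held by waiting consumers until their polls time out, so no message is ever received. With four threads, the producers get to run and the consumers normally receive both messages. Replace the timed poll with take() and the small pool never recovers.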

One could say: “just define your own Executor, convert it to a Dispatcher, and get on with your task”. Yes, that would work, but, IMHO, it’s not that simple. What if there are many teams working on the project? How do you ensure that no one uses Dispatchers.IO? Lint rules, maybe. But then what if some of the libraries you import use Dispatchers.IO? Again, it wouldn’t be a problem if this Dispatcher weren’t called IO, but it is. So, one can expect that most developers will use it for IO bound tasks without looking too deep into the details.

Luckily, the Dispatchers.IO Javadoc specifies how to change its limit. So I did. And now I recommend that every app that uses coroutines put this line as close to the app’s entry point as possible (change the limit if you know better):

System.setProperty(IO_PARALLELISM_PROPERTY_NAME, String.valueOf(Integer.MAX_VALUE));

And there is also Dispatchers.Unconfined. To be honest, I didn’t fully understand what it does, but still decided to try it out when I struggled with Dispatchers.IO. That had the effect of speeding up the benchmark by an order of magnitude compared to the same flow implemented using ThreadPool or RxJava. Wow, amazing. By this point, however, I didn’t assume anything anymore and decided to investigate the results. As suspected, there was a gotcha: the flow turned from concurrent contention of producers and consumers into sequential execution on one or two threads (I don’t remember exactly).
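A loose Java analogy for what I observed (an analogy only; I’m not claiming this is how Dispatchers.Unconfined is implemented) is a “direct” Executor that runs every task on whatever thread submits it. The hypothetical sketch below shows that a thousand “concurrent” tasks then end up on a single thread, one after another.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;

// Hypothetical illustration; not part of the benchmark project.
final class DirectExecutorSketch {

    /** Runs taskCount tasks on a direct executor and returns how many distinct threads executed them. */
    static int distinctThreads(int taskCount) {
        Executor direct = Runnable::run; // runs each task immediately on the calling thread
        Set<String> threadNames = new HashSet<>(); // no synchronization needed: only one thread is involved
        for (int i = 0; i < taskCount; i++) {
            direct.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreads(1000)); // prints 1
    }
}
```

No thread hand-off and no contention means great numbers in a benchmark, but it’s no longer the same benchmark.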

So, all in all, the story of the framework’s Dispatchers is one of confusion, nuances and, in my opinion, misconfiguration.

Kotlin Coroutines Cancellation

When I refactored the code to coroutines, I decided to start with the basic execution and deal with flow cancellation later. Now, after I won the “Battle of Dispatchers”, it was time to add the cancellation logic back.

“Simple cancellation” is one of the main stated advantages of coroutines, and it was indeed very simple to cancel this flow. I grabbed the Job returned by the top-level launch call, and then cancelled it in onStop(). It took just a few minutes to implement and worked like a charm.

However, by this point I was already suspicious of coroutines, so I decided to look deeper than my initial “I didn’t see the notification, so everything works”. I started the CPU profiler, extracted a heap dump and there they were: memory leaks, silently eating my app’s memory away. Little bastards, where did they come from?

Long story short: coroutines designers made a conscious decision not to interrupt any thread when you cancel coroutines. Therefore, in my benchmark, if the flow is cancelled at a moment when the number of started producers differs from the number of started consumers by more than the blocking queue can absorb, then either the excess producers or the excess consumers remain blocked forever and their respective threads are leaked. Oops.
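The same effect can be shown with plain Java, where Future.cancel(false) also cancels without interrupting. In the hypothetical sketch below, a consumer blocked in take() stays blocked after such a cancellation and only exits once its thread is actually interrupted.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of the benchmark project.
final class CancelWithoutInterruptSketch {

    /** Returns {exitedAfterPlainCancel, exitedAfterInterrupt} for a task blocked in take(). */
    static boolean[] run() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        BlockingQueue<Integer> emptyQueue = new LinkedBlockingQueue<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);

        Future<?> consumer = pool.submit(() -> {
            started.countDown();
            try {
                emptyQueue.take(); // blocks: nobody ever produces
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exited.countDown();
            }
        });

        started.await();
        consumer.cancel(false); // marks the task cancelled, but sends no interrupt
        boolean exitedAfterPlainCancel = exited.await(300, TimeUnit.MILLISECONDS);

        pool.shutdownNow(); // interrupts the worker thread
        boolean exitedAfterInterrupt = exited.await(5, TimeUnit.SECONDS);
        return new boolean[] {exitedAfterPlainCancel, exitedAfterInterrupt};
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] result = run();
        System.out.println("exited after cancel(false): " + result[0]); // false
        System.out.println("exited after interrupt:     " + result[1]); // true
    }
}
```

From the caller’s point of view the task is “cancelled” in both cases; only a heap dump or a thread dump reveals that the first variant left a thread parked forever.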

There is probably some way to interrupt the threads manually on coroutine cancellation (maybe catch CancellationException), but I didn’t bother with it. You might’ve noticed that in the Java version, the process of “cancellation” is just the unregistering of listeners. This way, the benchmark always completes, but produces no side effects. I decided to implement the same approach with coroutines. Another hour of googling and I knew about GlobalScope, NonCancellable, async, Deferred and awaitAll.
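Stripped of all threading, the “let it finish and ignore the result” idea boils down to something like the following sketch. The class is a hypothetical stand-in for the observable base class used in the Java version, with made-up method names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an observable use case; not the project's BaseObservable.
final class IgnoreResultSketch {

    interface Listener {
        void onCompleted(int result);
    }

    private final Set<Listener> listeners = ConcurrentHashMap.newKeySet();

    void registerListener(Listener listener) {
        listeners.add(listener);
    }

    void unregisterListener(Listener listener) {
        listeners.remove(listener);
    }

    /** Always runs to completion, then notifies whoever is still registered. */
    void runAndNotify() {
        int result = 6 * 7; // stand-in for the real work
        for (Listener listener : listeners) {
            listener.onCompleted(result);
        }
    }

    /** Returns how many notifications a listener receives, optionally "cancelling" first. */
    static int notificationsReceived(boolean unregisterBeforeCompletion) {
        IgnoreResultSketch useCase = new IgnoreResultSketch();
        AtomicInteger notifications = new AtomicInteger();
        Listener listener = result -> notifications.incrementAndGet();
        useCase.registerListener(listener);
        if (unregisterBeforeCompletion) {
            useCase.unregisterListener(listener); // this is the whole "cancellation"
        }
        useCase.runAndNotify();
        return notifications.get();
    }

    public static void main(String[] args) {
        System.out.println(notificationsReceived(false)); // prints 1
        System.out.println(notificationsReceived(true));  // prints 0
    }
}
```

Nothing is interrupted and nothing is left half-done; the work simply completes with nobody listening.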

The end result looks like this (also on GitHub):

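import android.util.Log
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

class ProducerConsumerBenchmarkUseCase {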

    class Result(val executionTime: Long, val numOfReceivedMessages: Int)

    private val blockingQueue = MyBlockingQueue(BLOCKING_QUEUE_CAPACITY)

    private val numOfReceivedMessages: AtomicInteger = AtomicInteger(0)
    private val numOfProducers: AtomicInteger = AtomicInteger(0)
    private val numOfConsumers: AtomicInteger = AtomicInteger(0)

    suspend fun startBenchmark() : Result {

        return withContext(Dispatchers.IO) {

            numOfReceivedMessages.set(0)
            numOfProducers.set(0)
            numOfConsumers.set(0)

            val startTimestamp = System.currentTimeMillis()

            // producers init coroutine
            val deferredProducers = async(Dispatchers.IO + NonCancellable) {
                for (i in 0 until NUM_OF_MESSAGES) {
                    startNewProducer(i)
                }
            }

            // consumers init coroutine
            val deferredConsumers = async(Dispatchers.IO + NonCancellable) {
                for (i in 0 until NUM_OF_MESSAGES) {
                    startNewConsumer()
                }
            }

            awaitAll(deferredConsumers, deferredProducers)

            Result(
                    System.currentTimeMillis() - startTimestamp,
                    numOfReceivedMessages.get()
            )
        }

    }

    private fun CoroutineScope.startNewProducer(index: Int) = launch(Dispatchers.IO) {
        Log.d("Producer", "producer ${numOfProducers.incrementAndGet()} started; " +
                "on thread ${Thread.currentThread().name}")
        Thread.sleep(DefaultConfiguration.DEFAULT_PRODUCER_DELAY_MS.toLong())
        blockingQueue.put(index)
    }

    private fun CoroutineScope.startNewConsumer() = launch(Dispatchers.IO) {
        Log.d("Consumer", "consumer ${numOfConsumers.incrementAndGet()} started; " +
                "on thread ${Thread.currentThread().name}")
        val message = blockingQueue.take()
        if (message != -1) {
            numOfReceivedMessages.incrementAndGet()
        }
    }

    companion object {
        private const val NUM_OF_MESSAGES = DefaultConfiguration.DEFAULT_NUM_OF_MESSAGES
        private const val BLOCKING_QUEUE_CAPACITY = DefaultConfiguration.DEFAULT_BLOCKING_QUEUE_SIZE
    }
}

So, what can I say about the cancellation story with coroutines? As I suspected, it’s not as simple as you’d hope it to be.

Cancellation of any flow is a difficult task in general, but cancellation of a concurrent flow is an exceptionally difficult, nuanced and error-prone task. That’s why I always prefer to let these flows complete and just ignore the result. In situations when it’s not an option (e.g. transfer of files over network, long CPU intensive processes, etc.), I try to work like a heart surgeon (or, at least, like I imagine them working).

Therefore, the fact that coroutines pass the cancellation signal down the hierarchy for you is a double-edged sword. It makes it simple to implement clean cancellation flows, but it also makes it simple to introduce serious and hard-to-spot bugs into your code. The very notion of “simple cancellation” of concurrent flows is dangerously misleading.

In addition, given the fact that coroutines don’t interrupt threads, there is the question of integration with blocking code. Maybe this design choice was the correct decision, but it’s definitely a trade-off. Therefore, it would be great to have some sort of guide containing a set of best practices for calling blocking methods from within coroutines. Maybe it will be as short as “make sure all blocking calls have timeouts”, but there should be something to start with.
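For what it’s worth, here is one possible shape of the “make sure all blocking calls have timeouts” rule, sketched in plain Java. The helper name and the 50 ms slice are arbitrary assumptions; the idea is simply that a blocking wait is split into short timed waits, with a cancellation flag checked in between, so no interrupt is needed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration; not part of the benchmark project.
final class TimedBlockingCallSketch {

    /** Waits for a message in short slices; returns null once 'cancelled' is set. */
    static Integer takeOrGiveUp(BlockingQueue<Integer> queue, AtomicBoolean cancelled)
            throws InterruptedException {
        while (!cancelled.get()) {
            Integer message = queue.poll(50, TimeUnit.MILLISECONDS);
            if (message != null) {
                return message;
            }
        }
        return null;
    }

    /** Starts a consumer on an empty queue, "cancels" it, and reports whether its thread exited. */
    static boolean consumerExitsAfterCancel() throws InterruptedException {
        BlockingQueue<Integer> emptyQueue = new LinkedBlockingQueue<>();
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Thread consumer = new Thread(() -> {
            try {
                takeOrGiveUp(emptyQueue, cancelled);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        cancelled.set(true);  // no interrupt is sent
        consumer.join(5_000); // the consumer notices the flag within one poll slice
        return !consumer.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("consumer exited: " + consumerExitsAfterCancel()); // true
    }
}
```

The trade-off is latency versus wake-ups: a shorter slice reacts to cancellation faster but wakes the thread more often.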

Death at Runtime

Memory leaks are nasty and dangerous, but they are part of the game we all play. Sometimes they just happen.

When a memory leak happens in an Android app, the app becomes sluggish and eventually crashes. You get a crash report and dive into the depths of your app to find the root cause. In some cases, you fail and accept the fact that 0.x% of the app’s sessions will result in a crash. In other cases, you find that stupid forgotten listener, or a static reference to an Activity kept deep inside your business logic. The teammate responsible for that abomination buys you a beer and life becomes rainbows and unicorns again.

If a memory leak strikes in a backend server and it crashes, then the resulting scenarios are much more diverse and interesting. If it’s just a single node out of many that crashed, there is a dynamic load balancer in place and all flows inside the app were atomic, then it will be noted and a ticket will be created in Jira to take care of it. If, however, it was your single server that crashed, some data became corrupted and there are no backups, then developers will be called from their homes in the middle of the night and will work long hours until the server is up and running again.

I’ve had my fair share of memory leaks, both introduced and debugged, so I wasn’t too stressed about that. However, I was curious to see after how many start-cancel cycles my device would crash with an OutOfMemory error.

Well, it never crashed. Instead, the coroutines framework simply died. Well, I’m not sure that the verb “died” is the correct one in this case, but I haven’t seen anything like that in the past. At some point, after a chunk of the application’s memory was leaked, coroutines simply stopped working and I couldn’t submit any more tasks.

So, it looked like if you leak memory with coroutines, then, instead of crashing the app, they just stop working. Needless to say, the prospect of an app NOT crashing on memory leaks is a scary one. Now, instead of figuring out the problem from specific crash reports, you’ll need to figure it out from 1-star reviews of frustrated users. That’s in Android. In the backend, it’ll probably result in all your users’ connections timing out.

I wasn’t sure whether this was a feature or a bug, but, in my opinion, it’s terrible. However, that still wasn’t the worst part of it.

After I completed the implementation and fixed the memory leaks, I wanted to do some “load testing”. Basically, just see the difference in performance between different implementations of the same flow (threads, thread pools, RxJava, coroutines). And what do you think? It turns out that the coroutines framework dies even under normal conditions when loaded! I couldn’t believe my eyes, so I added a release build variant and tested on several devices. It simply dies and the flow hangs!

I’m really not sure what to make of it.

[After further investigation and assistance from the community, we managed to find the root cause of this problem. It’s related to an undocumented limit on the Dispatchers.IO thread pool size. I filed this issue that describes the problem and the workaround.]

Readability of Code with Coroutines

Needless to say, I was quite discouraged by the results so far. However, I had an additional class to refactor to coroutines. This time, it was CPU bound code that implemented a simple parallelization scheme for factorial computation.

Here you can find its Java implementation (too long to post here).

This time, the refactoring went much faster and I didn’t encounter any issues. Huh.

But then, after I completed the refactoring, I noticed one very interesting thing: the code after refactoring was much cleaner than before. Now, I don’t get excited by the prospect of sparing some lines of code, and I didn’t jump on the Kotlin hype-wagon to eliminate semicolons. I’m the kind of guy who can write plain Runnables instead of lambdas (as long as they’re auto-generated, of course). But this was something different.

Take a look at this code and compare it to the Java implementation.

Forget about Kotlin, Java, semicolons and the rest of the unimportant stuff for a moment. Concentrate on the fundamental stuff. Notice that there are zero class properties in the coroutines version. Notice how the code in the coroutines version reads like a story.

Now, don’t go “meditating” on coroutines just yet. I could massage the Java variant with Futures and further convert it to Kotlin to spare some lines of code, so the end result would look very similar. Therefore, the point isn’t that coroutines are rainbows and unicorns.
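To give an idea of what such a Futures-based Java variant might look like, here is a minimal sketch. It is not the implementation from the repository; the class name, the chunking strategy and the parameters are assumptions made for illustration. The range 1..n is split into sub-ranges, each sub-range is multiplied on a pool thread, and the partial products are combined from the returned Futures, with no class properties involved.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; not the implementation from the repository.
final class ParallelFactorialSketch {

    /** Computes n! by multiplying sub-ranges of 1..n in parallel ('chunks' must be at least 1). */
    static BigInteger factorial(int n, int chunks) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(chunks);
        try {
            List<Callable<BigInteger>> tasks = new ArrayList<>();
            int chunkSize = Math.max(1, (n + chunks - 1) / chunks);
            for (int start = 1; start <= n; start += chunkSize) {
                int from = start;
                int to = Math.min(n, start + chunkSize - 1);
                tasks.add(() -> productOfRange(from, to));
            }
            BigInteger result = BigInteger.ONE;
            // invokeAll() returns only after all tasks have completed
            for (Future<BigInteger> partial : pool.invokeAll(tasks)) {
                result = result.multiply(partial.get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }

    private static BigInteger productOfRange(int from, int to) {
        BigInteger product = BigInteger.ONE;
        for (int i = from; i <= to; i++) {
            product = product.multiply(BigInteger.valueOf(i));
        }
        return product;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(factorial(20, 4)); // prints 2432902008176640000
    }
}
```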

However, the fact that I wrote the coroutines version with very little experience with the framework, and without noticing how clean it was turning out, is intriguing. In some sense, I feel like the framework kind of guided me towards this result. And if that’s true, then there might be something worth exploring in more detail there.

Conclusion

So, that was my experience with coroutines in the past two weeks. I’m pretty sure that some of the problems outlined in this post are on me. Maybe I misunderstood the documentation, or didn’t follow some rules. Hopefully, coroutines pros will jump in to set me straight.

That said, I can’t get rid of the feeling that this framework is horribly immature. It needs better documentation, it needs better best practices, it needs to tune its default dispatchers, it needs to document patterns and limitations of working with blocking code explicitly (at least) and, probably, much more. And how come the framework “dies” under load? Is this some kind of feature, or did the authors not do load testing?

And it’s not like I experienced all these problems when trying to run Netflix’s entire backend using coroutines. No, I simply wanted to prepare some non-trivial tutorials and exercises for students in my new course about multithreading in Android. Given the severity of the problems I encountered with this relatively simple task, I’m really pessimistic about coroutines used in production at scale. Sure, they’ll work alright for simple use cases like sending a network request in an Android app, but that’s not enough. A multithreading framework must be rock solid.

On the positive side, I was really surprised by the cleanliness of the code after I refactored the factorial computation algorithm to coroutines. I don’t know whether it’s a general aspect of the framework, a one-off thing, or, maybe, I had just been in a flow when I wrote that code, but it’s an interesting result in any case. At least for me, personally.

So, given all these issues, should you learn coroutines?

Well, if you’re an Android developer, then, in my opinion, you don’t have a choice. Coroutines seem to be destined to become the default concurrency approach in Kotlin. Therefore, given the ongoing transition and re-tooling of the Android ecosystem, eventually you’ll meet coroutines in production. It might take several more years, but it’ll happen. If Android is still around in several years, of course. Therefore, even though you don’t need to rush and jump on the hype-wagon with all the cool folks, keep an eye on the development in this area.

If you’re a backend developer, I can’t give you any recommendations, unfortunately. I know a thing or two about backend, but I don’t live it as I live Android.

By the way, as I mentioned earlier, I had to dive that deep into Kotlin Coroutines to record one of the modules for my new course about multithreading in Android. I wanted this course to be the most complete and, at the same time, the most advanced resource out there. It took me almost three months to produce this course, but it’s finally live! So, if you feel like you could use a refresher on some of the topics discussed in this article, or you’re new to this topic in general, then check out my new course: Android Multithreading Masterclass.

That’s all for now. As usual, please leave your comments and questions below.


5 Comments

  1. Dmitry October 28, 2019 at 8:36 pm

    Most likely Thread.sleep in the first example is the reason for the unexpected result. It blocks the underlying Java thread rather than the coroutine itself, eliminating the non-blocking nature of coroutines.
    In the coroutine world use https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/delay.html
    I would be glad to see the updated results.

    • Vasiliy October 29, 2019 at 3:22 am

      Hello Dmitry,
      The sleep call is intentional and the problem reproduced before I added it. That said, the source code is on GitHub and you have reproduction steps in the article, so you can test your hypothesis. Let us know about the results.
      Regards
      Vasiliy

    • Bartek December 9, 2019 at 7:51 pm

      Sleep is not the issue. It’s a deadlock with a very small queue. ProducerConsumerBenchmarkUseCase is not proper coroutine code. Transferring threads 1:1 to coroutines gains nothing. Use the right tool for the job.

      Bartek

      • Vasiliy December 9, 2019 at 8:05 pm

        Hi Bartek,
        I’m jumping in instead of Dmitry here.
        I posted the link to the issue in the article. It was a bug in the coroutines framework: Dispatchers.IO had a second limit on the number of threads, in addition to the one that had been documented in Javadoc. It’s not a deadlock, but more like indefinite starvation.
        As for whether it’s proper coroutines code, I disagree. However, I’m open to admit my mistake if you show how to implement this benchmark using a more “proper coroutines code”.

  2. Slava December 4, 2019 at 1:59 am

    What a fascinating article, thank you! I really enjoyed your investigative approach to digging into Coroutines. Very excited to check out the multithreading class now 🙂
