Concurrency Frameworks in Android are Overrated

In my recent articles I argued (among other things) that concurrency frameworks are neither required nor sufficient to write clean concurrent code. In other words, you can write clean concurrent code without concurrency frameworks, and if you do use them, it won’t automatically make your code better.

This idea turned out to be both interesting and controversial because it led to two distinct types of feedback. One group of readers got interested and asked how to implement more complex concurrent requirements without frameworks. Another group of readers expressed their deepest disagreement with my statements and some developers even argued that my ideas are harmful and dangerous.

In this post I’m going to show you how I write complex and clean concurrent code without frameworks. In my opinion, this demonstration should cover both the questions about more complex scenarios and the concerns about the feasibility of this approach.

Requirements

The example which I’ll show you in this article is based on a real-world scenario that I dealt with in a client’s codebase last week. Obviously, I can’t share the details or show you the client’s code, but, luckily, these aren’t required to demonstrate the fundamentals of concurrent code.

So, imagine that you have these requirements for a specific app’s feature:

  1. Process and merge two groups of files into two new individual files
  2. Compress the resulting files into a zip archive
  3. Upload the zip archive to a server
  4. Notify other interested components within the codebase about the result of the flow’s execution

Since files and networking are involved, this flow shouldn’t be executed on the UI thread. However, I do want to be able to call it from the UI thread and I do want to get completion notifications on the UI thread. Therefore, I’ll need to write concurrent code.

Concurrency Implementation

When I implemented the requirements, the client’s codebase already used the ThreadPoster library. Therefore, I used it for this new functionality as well. However, to make a point, in this article I’ll use the bare Thread class for background work. To return execution from the background to the UI thread, I’ll use Android’s standard Handler class.

Basic Implementation

The most basic implementation of the above flow can be written as follows:

public class UploadFilesUseCase extends BaseObservable<UploadFilesUseCase.Listener> {

    public interface Listener {
        void onFilesUploaded();
        void onFilesUploadFailed();
    }

    private final Handler uiHandler = new Handler(Looper.getMainLooper());

    public void uploadFiles() {
        new Thread(() -> uploadFilesSync()).start();
    }

    @WorkerThread
    private void uploadFilesSync() {
        File mergedA = processAndMergeFilesOfTypeA();
        File mergedB = processAndMergeFilesOfTypeB();
        File archive = compressMergedFiles(mergedA, mergedB);

        HttpManager.getInstance().uploadFiles(
                archive,
                new HttpRequestListener() {
                    @Override
                    public void onDone(int code, byte[] body) {
                        if (code / 100 == 2) {
                            notifySuccess();
                        } else {
                            notifyFailure();
                        }
                    }

                    @Override
                    public void onFailure() {
                        notifyFailure();
                    }
                }
        );
    }

    @WorkerThread
    private File processAndMergeFilesOfTypeA() { ... }

    @WorkerThread
    private File processAndMergeFilesOfTypeB() { ... }

    @WorkerThread
    private File compressMergedFiles(File fileA, File fileB) { ... }

    private void notifySuccess() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploaded();
            }
        });
    }

    private void notifyFailure() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploadFailed();
            }
        });
    }

}

If you don’t understand why this “use case” class is structured the way it is, then you can read this article about use cases.

I left the methods related to file manipulations blank. Their logic is not relevant to our discussion here.

You might be surprised by the fact that I use HttpManager as a Singleton. Given that I always advocate against Singletons, that’s odd. Well, if I were writing this app from scratch, I wouldn’t do that. However, in this case, I had to work with legacy code and refactoring the networking layer wasn’t the goal of this project. Theoretically, I could inject HttpManager into the constructor to make this class a bit cleaner. However, I knew that the codebase suffered from bad temporal coupling and I didn’t want to take on additional risk by (potentially) changing the initialization order.

On the other hand, this Singleton with its callbacks will allow us to make this example a bit more interesting later, so that’s a plus.

Error Handling

The above basic implementation accounts for networking errors, but network isn’t the only source of potential errors in this flow. File operations can fail too.

Therefore, let’s assume that the file-related methods can throw a checked OperationFailedException if anything goes wrong:

@WorkerThread
private File processAndMergeFilesOfTypeA() throws OperationFailedException { ... }

@WorkerThread
private File processAndMergeFilesOfTypeB() throws OperationFailedException { ... }

@WorkerThread
private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException { ... }

Then I can add the following error handling:

@WorkerThread
private void uploadFilesSync() {
    File archive = null;
    try {
        File mergedA = processAndMergeFilesOfTypeA();
        File mergedB = processAndMergeFilesOfTypeB();
        archive = compressMergedFiles(mergedA, mergedB);
    } catch (OperationFailedException e) {
        notifyFailure();
        return;
    }

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    if (code / 100 == 2) {
                        notifySuccess();
                    } else {
                        notifyFailure();
                    }
                }

                @Override
                public void onFailure() {
                    notifyFailure();
                }
            }
    );
}

This is very simple error handling, but you can implement much more complex schemes if needed.

For example, let’s say that in case of any error I want to retry this flow a predefined number of times before I declare it a “failure”. That would be surprisingly simple to achieve:

public void uploadFiles() {
    new Thread(() -> uploadFilesSync(0)).start();
}

@WorkerThread
private void uploadFilesSync(int retryCount) {
    File archive = null;
    try {
        File mergedA = processAndMergeFilesOfTypeA();
        File mergedB = processAndMergeFilesOfTypeB();
        archive = compressMergedFiles(mergedA, mergedB);
    } catch (OperationFailedException e) {
        retryOrFail(retryCount);
        return;
    }

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    if (code / 100 == 2) {
                        notifySuccess();
                    } else {
                        retryOrFail(retryCount);
                    }
                }

                @Override
                public void onFailure() {
                    retryOrFail(retryCount);
                }
            }
    );
}

@WorkerThread
private void retryOrFail(int currentRetryCount) {
    if (currentRetryCount >= MAX_RETRIES - 1) {
        notifyFailure();
        return;
    }
    uploadFilesSync(currentRetryCount + 1);
}

As an exercise, I invite you to think how you’d implement exponential back-off for retry attempts. Hint: only retryOrFail method would be affected.
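
One possible answer to the exercise above, as a minimal sketch. BackOff, BASE_DELAY_MS and MAX_DELAY_MS are hypothetical names and values that don’t exist in the client’s code. The only assumption is that the delay doubles on each retry until it reaches a cap.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of the original use case. */
final class BackOff {

    // Assumed values, chosen only for this example.
    static final long BASE_DELAY_MS = 500;
    static final long MAX_DELAY_MS = 8_000;

    private BackOff() {}

    /** Delay to wait after the attempt with the given zero-based retry count. */
    static long delayMs(int currentRetryCount) {
        // Clamp the shift so that the doubling can never overflow a long.
        int shift = Math.min(Math.max(currentRetryCount, 0), 20);
        return Math.min(BASE_DELAY_MS << shift, MAX_DELAY_MS);
    }

    /** Blocks the calling thread for the computed delay. */
    static void sleepBeforeRetry(int currentRetryCount) {
        try {
            TimeUnit.MILLISECONDS.sleep(delayMs(currentRetryCount));
        } catch (InterruptedException e) {
            // Restore the interrupt flag before giving up.
            Thread.currentThread().interrupt();
            throw new RuntimeException("unexpected interrupt", e);
        }
    }
}
```

With a helper like this, retryOrFail would call BackOff.sleepBeforeRetry(currentRetryCount) right before uploadFilesSync(currentRetryCount + 1). Blocking is acceptable there only because retryOrFail is expected to run on a worker thread.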

Using this approach you can implement clean error handling of any complexity.

Performance Optimization

The implementation that I have so far is great and will suffice in most situations. However, very rarely, you might need to optimize it. This was one of these rare cases.

See, the files in question are relatively big, so this flow took a considerable time. While it’s not the end of the world, client’s staff initiated the upload remotely and then waited to get the files. Given the fact that they’ll do it hundreds, or even thousands of times in the future, I decided that it’s worth complicating the implementation of this class to reduce the execution time.

What can be optimized here? Well, not that much because most of the steps in this flow are inter-dependent. However, the processing of two sets of files can be done concurrently. Therefore, I decided to process every set of files on a dedicated background thread.

The resulting implementation looks like this:

@WorkerThread
private void uploadFilesSync(int retryCount) {

    final AtomicReference<File> mergedA = new AtomicReference<>(null);
    final AtomicReference<File> mergedB = new AtomicReference<>(null);

    final CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        try {
            mergedA.set(processAndMergeFilesOfTypeA());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    new Thread(() -> {
        try {
            mergedB.set(processAndMergeFilesOfTypeB());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException("unexpected interrupt");
    }

    if (mergedA.get() == null || mergedB.get() == null) {
        retryOrFail(retryCount);
        return;
    }

    File archive;
    try {
        archive = compressMergedFiles(mergedA.get(), mergedB.get());
    } catch (OperationFailedException e) {
        retryOrFail(retryCount);
        return;
    }

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    if (code / 100 == 2) {
                        notifySuccess();
                    } else {
                        retryOrFail(retryCount);
                    }
                }

                @Override
                public void onFailure() {
                    retryOrFail(retryCount);
                }
            }
    );
}

As you see, performance optimization using concurrency caused quite a jump in the complexity of this class. That’s why I recommend avoiding optimization for the sake of optimization. Do that only if you have a clear and compelling business case which justifies the trade-off.
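
If the “start two threads and wait for both” part feels noisy, it can be isolated. The following is only a sketch: ParallelPair and runBoth are hypothetical names, and it uses unchecked exceptions instead of OperationFailedException to stay self-contained. As in the code above, a failed task simply leaves its result as null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of the original use case. */
final class ParallelPair {

    private ParallelPair() {}

    /**
     * Runs both tasks on dedicated threads and blocks until both are done.
     * Returns [resultA, resultB]; a task that threw contributes null.
     */
    static <T> List<T> runBoth(Supplier<T> taskA, Supplier<T> taskB) {
        AtomicReference<T> resultA = new AtomicReference<>(null);
        AtomicReference<T> resultB = new AtomicReference<>(null);
        CountDownLatch latch = new CountDownLatch(2);

        startWorker(taskA, resultA, latch);
        startWorker(taskB, resultB, latch);

        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("unexpected interrupt", e);
        }
        // Arrays.asList tolerates null elements.
        return Arrays.asList(resultA.get(), resultB.get());
    }

    private static <T> void startWorker(
            Supplier<T> task, AtomicReference<T> result, CountDownLatch latch) {
        new Thread(() -> {
            try {
                result.set(task.get());
            } catch (RuntimeException e) {
                // log the exception; the result stays null
            } finally {
                // Always count down, otherwise the caller would wait forever.
                latch.countDown();
            }
        }).start();
    }
}
```

The finally block is the important detail: the latch must be counted down even when a task fails, otherwise await() never returns.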

Cleanup Of Temporary Files

The flow encapsulated in this use case class creates temporary files along the way. They aren’t needed after the flow completes and can even cause trouble. Therefore, I want to clean them up.

When should these files be deleted? When the flow either completes successfully or fails, of course. That’s right, but there is more. Since we added retries to this algorithm, I’d like to employ defensive programming and delete these files on each retry.

To simplify the implementation, I made sure that temp files are always created in one specific directory. This way the cleanup amounts to just deleting this directory each time.

That’s the result:

@WorkerThread
private void uploadFilesSync(int retryCount) {

    final AtomicReference<File> mergedA = new AtomicReference<>(null);
    final AtomicReference<File> mergedB = new AtomicReference<>(null);

    final CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        try {
            mergedA.set(processAndMergeFilesOfTypeA());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    new Thread(() -> {
        try {
            mergedB.set(processAndMergeFilesOfTypeB());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException("unexpected interrupt");
    }

    if (mergedA.get() == null || mergedB.get() == null) {
        retryOrFail(retryCount);
        return;
    }

    File archive;
    try {
        archive = compressMergedFiles(mergedA.get(), mergedB.get());
    } catch (OperationFailedException e) {
        retryOrFail(retryCount);
        return;
    }

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    if (code / 100 == 2) {
                        notifySuccess();
                    } else {
                        retryOrFail(retryCount);
                    }
                }

                @Override
                public void onFailure() {
                    retryOrFail(retryCount);
                }
            }
    );

    deleteTempDir();
}

@WorkerThread
private void retryOrFail(int currentRetryCount) {
    deleteTempDir();
    if (currentRetryCount >= MAX_RETRIES - 1) {
        notifyFailure();
        return;
    }
    uploadFilesSync(currentRetryCount + 1);
}

Have you noticed the bug, though? If you haven’t, go on and look for it. This was a real bug that I planted in the original implementation and found during testing.

Hint: the uploaded file will always be 0 bytes in size.

The problem, of course, is that I start the upload of the archive and concurrently delete the temporary directory. Not a smart thing to do.

The fix is quite simple: just move the final call to deleteTempDir() into HttpRequestListener’s onDone method:

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    if (code / 100 == 2) {
                        deleteTempDir();
                        notifySuccess();
                    } else {
                        retryOrFail(retryCount);
                    }
                }

                @Override
                public void onFailure() {
                    retryOrFail(retryCount);
                }
            }
    );

Now it should work as expected.

Synchronizing Asynchronous Code

One of the reasons for the bug in the previous section is that I mixed synchronous code with an asynchronous callback. In essence, by using HttpManager inside this use case, I made the first step towards so-called callback hell. That bug was a direct consequence of the additional complexity.

As I already wrote, refactoring the networking layer of this app wasn’t on the table. So, it looks like I had no other options left. Not exactly. I could still convert the entire flow to synchronous execution.

That’s how I could do that:

@WorkerThread
private void uploadFilesSync(int retryCount) {

    final AtomicReference<File> mergedA = new AtomicReference<>(null);
    final AtomicReference<File> mergedB = new AtomicReference<>(null);

    final CountDownLatch countDownLatch = new CountDownLatch(2);

    new Thread(() -> {
        try {
            mergedA.set(processAndMergeFilesOfTypeA());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    new Thread(() -> {
        try {
            mergedB.set(processAndMergeFilesOfTypeB());
        } catch (OperationFailedException e) {
            // log the exception
        } finally {
            countDownLatch.countDown();
        }
    }).start();

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException("unexpected interrupt");
    }

    if (mergedA.get() == null || mergedB.get() == null) {
        retryOrFail(retryCount);
        return;
    }

    File archive;
    try {
        archive = compressMergedFiles(mergedA.get(), mergedB.get());
    } catch (OperationFailedException e) {
        retryOrFail(retryCount);
        return;
    }

    int responseCode = uploadFileToServer(archive);

    if (responseCode / 100 == 2) {
        deleteTempDir();
        notifySuccess();
    } else {
        retryOrFail(retryCount);
    }
}

@WorkerThread
private int uploadFileToServer(File archive) {
    final AtomicInteger responseCode = new AtomicInteger(0);
    final CountDownLatch countDownLatch = new CountDownLatch(1);

    HttpManager.getInstance().uploadFiles(
            archive,
            new HttpRequestListener() {
                @Override
                public void onDone(int code, byte[] body) {
                    responseCode.set(code);
                    countDownLatch.countDown();
                }

                @Override
                public void onFailure() {
                    responseCode.set(0);
                    countDownLatch.countDown();
                }
            }
    );

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException("unexpected interrupt");
    }

    return responseCode.get();
}

One exceptionally important point to note here is that even though the code became longer, it also became much simpler. Now the algorithm in uploadFilesSync() is crystal clear and its steps form a simple sequential structure. That’s a big win for long-term maintainability.

However, that’s not all. There are also major design benefits.

Note that the original implementation assumed (implicitly) that HttpManager would invoke the callback on a background thread. If it used the UI thread instead, the app would freeze for prolonged periods of time. The “synchronized” implementation makes no such assumption anymore, so it’s less coupled to the implementation of HttpManager.

In addition, note that the uploadFileToServer() method doesn’t depend on anything inside this use case. Therefore, I can extract it into a standalone class which will abstract out all the irrelevant details, including the fact that HttpManager is a Singleton. If I did the same for other methods inside HttpManager, I’d create a convenient synchronous networking API inside the codebase. Furthermore, if I broke the API of HttpManager into multiple narrowly focused classes, it would be highly beneficial if I ever needed to actually refactor the networking layer in this app. That’s the so-called Interface Segregation Principle in action.
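
To make the extraction idea more concrete, here is a rough sketch of such a standalone class. Since I can’t show HttpManager, AsyncUploader below is a hypothetical stand-in for a callback-based API, and SyncUploader and NO_RESPONSE are illustrative names as well.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a callback-based networking API. */
interface AsyncUploader {
    interface Callback {
        void onDone(int code);
        void onFailure();
    }

    void upload(String payload, Callback callback);
}

/** Hypothetical synchronous wrapper around the asynchronous API. */
final class SyncUploader {

    static final int NO_RESPONSE = 0;

    private final AsyncUploader asyncUploader;

    SyncUploader(AsyncUploader asyncUploader) {
        this.asyncUploader = asyncUploader;
    }

    /** Blocks until a callback fires; returns the response code or NO_RESPONSE. */
    int upload(String payload) {
        final AtomicInteger responseCode = new AtomicInteger(NO_RESPONSE);
        final CountDownLatch latch = new CountDownLatch(1);

        asyncUploader.upload(payload, new AsyncUploader.Callback() {
            @Override
            public void onDone(int code) {
                responseCode.set(code);
                latch.countDown();
            }

            @Override
            public void onFailure() {
                // responseCode keeps its NO_RESPONSE default
                latch.countDown();
            }
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("unexpected interrupt", e);
        }
        return responseCode.get();
    }
}
```

One caveat with this kind of wrapper: it must never be called from the thread that delivers the callbacks, because that thread would block on the latch and the callback could never run.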

So, this implementation has more code, but it’s much better in pretty much every aspect. I want you to remember this point because there is so much hype about “less lines of code” out there, which is an absolutely meaningless metric on this scale. Especially in concurrent code.

Preventing Concurrent Invocations

After I implemented all the business requirements in this use case, I realized that there is one additional implicit technical requirement here: prevent multiple concurrent invocations of this flow. It doesn’t make any sense to upload the same data multiple times concurrently, and doing so could lead to serious bugs, including data corruption.

I meet this requirement quite often, so I already have a standard pattern to implement it. First, I’ll make UploadFilesUseCase extend BaseBusyObservable instead of just BaseObservable. This will add a “busy” flag to the use case, as well as several convenience methods to handle transitions to and from the “busy” state.

Once I have that, the implementation of this requirement amounts to proper management of “busy” flag and aborting use case invocations when it’s “busy”.
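
BaseBusyObservable itself isn’t shown in this article, so the following is not its real implementation. It’s only a guess at how such a flag could work, built on AtomicBoolean. BusyFlag is a hypothetical name; the method names mirror the ones used in the final implementation.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of a "busy" flag; the real BaseBusyObservable may differ. */
class BusyFlag {

    private final AtomicBoolean isBusy = new AtomicBoolean(false);

    /**
     * Atomically switches from "free" to "busy".
     * Returns true only for the single caller that performed the switch.
     */
    boolean isFreeAndBecomeBusy() {
        return isBusy.compareAndSet(false, true);
    }

    /** Returns to the "free" state, allowing the next invocation. */
    void becomeNotBusy() {
        isBusy.set(false);
    }

    boolean isBusy() {
        return isBusy.get();
    }
}
```

The check and the transition happen in one atomic step, so two threads that call isFreeAndBecomeBusy() at the same time can’t both get true.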

Final Implementation

The final implementation of this complex use case, excluding irrelevant details, will look like this:

public class UploadFilesUseCase extends BaseBusyObservable<UploadFilesUseCase.Listener> {

    public interface Listener {
        void onFilesUploaded();
        void onFilesUploadFailed();
    }

    private final static int MAX_RETRIES = 3;

    private final Handler uiHandler = new Handler(Looper.getMainLooper());

    public void uploadFiles() {
        if (!isFreeAndBecomeBusy()) {
            // log concurrent invocation attempt
            return;
        }
        new Thread(() -> uploadFilesSync(0)).start();
    }

    @WorkerThread
    private void uploadFilesSync(int retryCount) {
        final AtomicReference<File> mergedA = new AtomicReference<>(null);
        final AtomicReference<File> mergedB = new AtomicReference<>(null);

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(() -> {
            try {
                mergedA.set(processAndMergeFilesOfTypeA());
            } catch (OperationFailedException e) {
                // log the exception
            } finally {
                countDownLatch.countDown();
            }
        }).start();

        new Thread(() -> {
            try {
                mergedB.set(processAndMergeFilesOfTypeB());
            } catch (OperationFailedException e) {
                // log the exception
            } finally {
                countDownLatch.countDown();
            }
        }).start();

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("unexpected interrupt");
        }

        if (mergedA.get() == null || mergedB.get() == null) {
            retryOrFail(retryCount);
            return;
        }

        File archive;
        try {
            archive = compressMergedFiles(mergedA.get(), mergedB.get());
        } catch (OperationFailedException e) {
            retryOrFail(retryCount);
            return;
        }

        int responseCode = uploadFileToServer(archive);

        if (responseCode / 100 == 2) {
            deleteTempDir();
            notifySuccess();
        } else {
            retryOrFail(retryCount);
        }
    }

    @WorkerThread
    private int uploadFileToServer(File archive) {
        final AtomicInteger responseCode = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        HttpManager.getInstance().uploadFiles(
                archive,
                new HttpRequestListener() {
                    @Override
                    public void onDone(int code, byte[] body) {
                        responseCode.set(code);
                        countDownLatch.countDown();
                    }

                    @Override
                    public void onFailure() {
                        responseCode.set(0);
                        countDownLatch.countDown();
                    }
                }
        );

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("unexpected interrupt");
        }

        return responseCode.get();
    }

    @WorkerThread
    private void retryOrFail(int currentRetryCount) {
        deleteTempDir();
        if (currentRetryCount >= MAX_RETRIES - 1) {
            notifyFailure();
            return;
        }
        uploadFilesSync(currentRetryCount + 1);
    }

    @WorkerThread
    private File processAndMergeFilesOfTypeA() throws OperationFailedException { ... }

    @WorkerThread
    private File processAndMergeFilesOfTypeB() throws OperationFailedException { ... }

    @WorkerThread
    private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException { ... }

    @WorkerThread
    private void deleteTempDir() { ... }

    private void notifySuccess() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploaded();
            }
            becomeNotBusy();
        });
    }

    private void notifyFailure() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploadFailed();
            }
            becomeNotBusy();
        });
    }

}

Evidently, there is quite a bit of code in this class. Some of it might be extracted into collaborator objects, but I kept it grouped in a single class for the purpose of this article.

However, what’s important here is not the amount of code, but the fact that every developer who understands concurrency on the JVM can maintain this class. Even though the requirements are complex, the code is very explicit about what’s going on and no knowledge of any framework is required. As you saw, even something as complex as exponential back-off can be added to this implementation by making a small change in a single method.

This code would have been good ten years ago (with some syntactic changes), it’s good today and it’s going to be good ten years from now. That’s what makes for maintainable code, not the number of lines of code.

Alternative Implementations Using Frameworks

I posted this article on Reddit to get a bit of critical feedback, and, boy, did I get it. There are some very interesting discussions there and other developers started to provide alternative implementations. I invite you to read that thread to see other opinions on this topic.

Currently, this gist is the best alternative implementation using RxJava. It uses Kotlin instead of Java, assumes a different implementation of HttpManager and lacks logging, but, in principle, it should implement similar requirements. You can see that it’s much shorter than my original implementation and has fewer “moving parts” (at least on paper). However, even though I’m familiar with RxJava in general, I can’t verify the correctness of this solution from just reading the code. This has always been my biggest problem with this framework: you need to invest a lot of time into it to be able to use it, or even just read someone else’s code. There is no way to reason about RxJava code from the fundamental principles. Therefore, I’ve always recommended against coupling your project to this third-party framework. In addition, RxJava is long past its popularity peak and will soon become legacy in Android.

The version which uses Coroutines is much more interesting in my opinion. It’s definitely much shorter than a solution that uses multithreading primitives and easier to understand. However, it’s not simpler and surely not better. This implementation contains at least two bugs, one of which is directly caused by the complexity of Coroutines. Try to find these bugs, and, maybe, others. You can find the discussion of the bug related to Coroutines in the comments section after the post (I failed to notice it myself). So, would you prefer nice-looking concurrent code, or reliable and maintainable code?

Lastly, this gist shows the implementation using the Flow framework. This version of the algorithm implements the “granular retry” described below, so it packs more functionality than all the preceding implementations. In my opinion, it looks like a more complicated version of the solution with RxJava. It’s too early to say anything specific about the Flow framework, but, as of today, I wouldn’t use it in production code.

More Granular Retry

During the discussion with other developers, I’ve been asked to implement one additional requirement: more granular retry mechanism. For example, if the upload fails, we can just retry the upload and there is really no need to re-process the files.

This is the result:

public class UploadFilesUseCase extends BaseBusyObservable<UploadFilesUseCase.Listener> {

    public interface Listener {
        void onFilesUploaded();
        void onFilesUploadFailed();
    }

    private final static int MAX_RETRIES = 3;

    private final Handler uiHandler = new Handler(Looper.getMainLooper());

    public void uploadFiles() {
        if (!isFreeAndBecomeBusy()) {
            // log concurrent invocation attempt
            return;
        }
        new Thread(this::uploadFilesSync).start();
    }

    @WorkerThread
    private void uploadFilesSync() {
        final AtomicReference<File> mergedA = new AtomicReference<>(null);
        final AtomicReference<File> mergedB = new AtomicReference<>(null);

        final CountDownLatch countDownLatch = new CountDownLatch(2);

        new Thread(() -> {
            mergedA.set(processAndMergeFilesOfTypeAWithRetry());
            countDownLatch.countDown();
        }).start();

        new Thread(() -> {
            mergedB.set(processAndMergeFilesOfTypeBWithRetry());
            countDownLatch.countDown();
        }).start();

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("unexpected interrupt");
        }

        if (mergedA.get() == null || mergedB.get() == null) {
            flowFailed();
            return;
        }

        File archive;
        try {
            archive = compressMergedFiles(mergedA.get(), mergedB.get());
        } catch (OperationFailedException e) {
            flowFailed();
            return;
        }

        if (uploadFileToServerWithRetry(archive)) {
            deleteTempDir();
            notifySuccess();
        } else {
            flowFailed();
        }
    }

    @WorkerThread
    private boolean uploadFileToServerWithRetry(File archive) {
        for (int i = 0; i < MAX_RETRIES; i++) {
            int responseCode = uploadFileToServer(archive);
            if (responseCode / 100 == 2) {
                return true;
            }
        }
        return false;
    }

    @WorkerThread
    private int uploadFileToServer(File archive) {

        final AtomicInteger responseCode = new AtomicInteger(0);
        final CountDownLatch countDownLatch = new CountDownLatch(1);

        HttpManager.getInstance().uploadFiles(
                archive,
                new HttpRequestListener() {
                    @Override
                    public void onDone(int code, byte[] body) {
                        responseCode.set(code);
                        countDownLatch.countDown();
                    }

                    @Override
                    public void onFailure() {
                        responseCode.set(0);
                        countDownLatch.countDown();
                    }
                }
        );

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            throw new RuntimeException("unexpected interrupt");
        }

        return responseCode.get();
    }

    @WorkerThread
    private void flowFailed() {
        deleteTempDir();
        notifyFailure();
    }

    @WorkerThread
    private @Nullable File processAndMergeFilesOfTypeAWithRetry() {
        for (int i = 0; i < MAX_RETRIES; i++) {
            try {
                ...
            } catch (OperationFailedException e) {
                // log the exception
            }
        }
        return null;
    }

    @WorkerThread
    private @Nullable File processAndMergeFilesOfTypeBWithRetry() {
        for (int i = 0; i < MAX_RETRIES; i++) {
            try {
                ...
            } catch (OperationFailedException e) {
                // log the exception
            }
        }
        return null;
    }

    @WorkerThread
    private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException { ... }

    @WorkerThread
    private void deleteTempDir() { ... }

    private void notifySuccess() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploaded();
            }
            becomeNotBusy();
        });
    }

    private void notifyFailure() {
        uiHandler.post(() -> {
            for (Listener listener : getListeners()) {
                listener.onFilesUploadFailed();
            }
            becomeNotBusy();
        });
    }

}

As you can see, if your code already executes synchronously on a background thread, retries become just loops.
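
Since the bodies of the retry methods above are elided, here is a minimal, self-contained sketch of the same idea. The names `RetryLoopSketch`, `FlakyOperation` and `runWithRetry` are hypothetical and not taken from the client's code, and the nested `OperationFailedException` is only a stand-in for the exception used above.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RetryLoopSketch {

    /** Stand-in for the checked exception used in the article's code. */
    static class OperationFailedException extends Exception {
        OperationFailedException(String message) {
            super(message);
        }
    }

    /** Hypothetical blocking operation that may fail. */
    interface FlakyOperation<T> {
        T run() throws OperationFailedException;
    }

    /**
     * Runs the operation synchronously on the calling (worker) thread,
     * retrying up to maxRetries times. Returns null if every attempt failed.
     */
    static <T> T runWithRetry(FlakyOperation<T> operation, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            try {
                return operation.run();
            } catch (OperationFailedException e) {
                // a real implementation would log the exception here
            }
        }
        return null;
    }

    public static void main(String[] args) {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated operation: fails twice, then succeeds on the third attempt.
        String result = runWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new OperationFailedException("simulated failure");
            }
            return "merged";
        }, 5);
        System.out.println(result + " after " + attempts.get() + " attempts");
    }
}
```

Because the caller is already blocked on a worker thread, there is no need for a retry operator or a scheduler: the loop itself is the whole retry policy.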

It’s not as short and sweet as RxJava or Flow operators, that’s for sure. However, every developer who understands concurrency on the JVM can maintain this code in the future. In my opinion, that’s not something you can say about the RxJava or Flow solutions linked above.

Conclusion

In this article I demonstrated how to implement a complex concurrent flow using just the most fundamental concurrency tools. The requirements were based on a real-world scenario that solved a real problem of a real business, though I complicated them a bit to cover more ground.

The resulting implementation is robust (unless I made a mistake, of course), readable, maintainable and very flexible. You could easily change this implementation because the code is explicit and straightforward.

By the way, I’m not against frameworks. Well, not against all of them, at least. In this article I intentionally used the bare Thread class to make a point, but that’s not something I’d do in real production code. Instead, I used the ThreadPoster framework there. For a pure Kotlin codebase I’d use Coroutines. So, there is definitely a place for frameworks in your toolbox (especially if you want to write unit tests for concurrent code). But don’t buy into the hype and don’t be that developer who uses concurrency frameworks, but doesn’t know the principles of good design and doesn’t understand concurrency fundamentals.
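
On the unit testing remark: one common way to make such code testable is to inject the threading mechanism instead of creating threads inline. The sketch below only illustrates that idea and is not ThreadPoster's API; `UploadUseCase`, its constructor parameters and the `upload` method are hypothetical names, and the string result stands in for the real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class InjectedExecutorSketch {

    /** Hypothetical use case; its threading is decided entirely by the injected executors. */
    static final class UploadUseCase {
        private final Executor backgroundExecutor;
        private final Executor callbackExecutor;

        UploadUseCase(Executor backgroundExecutor, Executor callbackExecutor) {
            this.backgroundExecutor = backgroundExecutor;
            this.callbackExecutor = callbackExecutor;
        }

        void upload(String fileName, Consumer<String> onDone) {
            backgroundExecutor.execute(() -> {
                // placeholder for the real blocking work (merge, zip, upload)
                String result = "uploaded:" + fileName;
                callbackExecutor.execute(() -> onDone.accept(result));
            });
        }
    }

    public static void main(String[] args) {
        // In a test, both executors run tasks inline on the calling thread,
        // so the callback has already fired when upload() returns.
        Executor direct = Runnable::run;
        List<String> received = new ArrayList<>();
        new UploadUseCase(direct, direct).upload("archive.zip", received::add);
        System.out.println(received);
    }
}
```

In production, the first executor could be backed by a thread pool and the second could post to the UI thread, for example `command -> uiHandler.post(command)`.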

Lastly, if you found the concurrency details in this post a bit challenging, check out my multithreading course. It’s the best resource to learn about concurrency for Android developers. In fact, I’m sure that all developers who work on JVM will find it useful.

Thanks for reading and don’t forget to subscribe for notifications about new posts.


30 comments on "Concurrency Frameworks in Android are Overrated"

  1. I’ve thrown together a version using plain Kotlin Coroutines instead of the Java primitives: https://gist.github.com/roschlau/246e9ae7f2f8e4abb0576e4d8415c858

    So obviously this won’t work if you’re bound to Java, but I think the readability of the `uploadFilesSync` function benefits hugely from being able to leave out things like the CountDownLatch for synchronization.

    Granted, the `uploadFileToServer` function is a little awkward with that `suspendCancellableCoroutine` call now, but I did this to touch as little of its code as I could. In a real scenario I’d spend some time making that easier to understand, or just use Ktor as my HTTP client, which integrates much more seamlessly with coroutines.

    Would like to hear your opinion on it!

  2. Hi Vasily,
    Thanks for this post!

    I summarise this post and your general attitude as “For a software development team, it does not pay off to learn a concurrency framework, as the abstractions they provide don’t bring any benefits in terms of code maintainability”. So you stick with the “concurrency fundamentals” and rather use low-level concurrency primitives.

    You also said that you are not against frameworks. So I am interested to hear in which situations frameworks do make sense for you. Furthermore, I wonder how you decided that, in the case of concurrency frameworks on Android, using a framework does not make sense.

    Thanks!

    • Hello Lukas,
      Allow me to correct your summary a bit.
      As I stated in the article, I used bare Thread just as an example and I use a simple multithreading framework in production myself. In addition, I think that if you write a purely Kotlin app, then you should probably use Coroutines for concurrency. So, I’m not against all frameworks. The goal of this post was to show that you don’t need them, not that you shouldn’t use them.
      As for how to evaluate the trade-offs, that’s an outstanding question. The answer is very long, though, so I won’t be able to write it as a comment. Therefore, I’ll add this topic to my list of ideas for posts and try to address it in the future.

  3. I have watched your interview with Mitch and I strongly disagree with your concurrency attitude. On one hand, you’re saying you like simple code and on another hand, you’re taking the harder path of concurrency implementation.
    I just think you are stubborn on an issue for some personal reason.

  4. This is a nice illustration of the plain Java API. Is there any reason you didn’t use Executors and Futures (available since Java 5)? This is my preferred option. I do believe it can simplify your code a lot: no need to use AtomicReference and CountDownLatch.

    E.g.
    val f1 = executor.submit(…)
    val f2 = executor.submit(…)
    f1.get()
    f2.get()
    // work is done

    • Hi Dmytro,
      As I wrote somewhere in the text, I intentionally wanted to use the most fundamental constructs in this post. The real implementation uses my ThreadPoster library. If you take a look at its source, you’ll notice that it’s a very thin wrapper around Executor.
      I don’t use Futures in my code because I don’t see much value in them TBH. In most cases, all you need is to offload to bg thread once and then get back to UI thread once. ThreadPoster is ideal for that.
      That said, Futures is a valid implementation choice in my opinion.
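
      For comparison, here is a rough Java sketch of the Executor and Future variant outlined in the comment above. The names `FuturesSketch` and `runBoth` are made up for illustration, and the two callables stand in for the processing of file groups A and B.

      ```java
      import java.util.concurrent.Callable;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      import java.util.concurrent.Future;

      final class FuturesSketch {

          /** Runs two tasks in parallel and blocks until both results are available. */
          static String[] runBoth(Callable<String> taskA, Callable<String> taskB)
                  throws InterruptedException, ExecutionException {
              ExecutorService executor = Executors.newFixedThreadPool(2);
              try {
                  Future<String> futureA = executor.submit(taskA);
                  Future<String> futureB = executor.submit(taskB);
                  // get() blocks until the task completes; a failure inside the task
                  // is rethrown here wrapped in ExecutionException.
                  return new String[] { futureA.get(), futureB.get() };
              } finally {
                  executor.shutdown();
              }
          }

          public static void main(String[] args) throws Exception {
              String[] merged = runBoth(() -> "mergedA", () -> "mergedB");
              System.out.println(merged[0] + ", " + merged[1]);
          }
      }
      ```

      The blocking `get()` calls play the role that `countDownLatch.await()` plays in the article's code, so this variant still has to run on a worker thread.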

  5. Thanks for taking time to write this controversial post. Sometimes, different opinions can bring good solutions.

    I’ve found a typo in the code.
    mergedB.set(processAndMergeFilesOfTypeA());

    Have a good day

  6. In general, I am all for using simple, specialized solutions instead of generic, heavyweight libraries. However, this proposed solution completely lacks any form of thread management: you can’t control the number of threads created, you can’t cancel a running thread, you can’t reuse threads, you can’t detect deadlocked threads (i.e. indefinite `await()`), etc.
    Adding this functionality and properly validating its correctness will make you change your mind about concurrency frameworks, because with rx (or any other framework) such functionality usually comes out of the box.
    I would strongly advise against using home-brewed solutions in case of concurrency, unless you’re a concurrency expert with years of experience with multithreaded programming. In this case, it’s a very strong “don’t reinvent the wheel”.
    For 99.9% of developers, using frameworks that are properly tested, optimized, supported and used by thousands of other developers will be a much better choice.

    • Hi Kamil,
      Thanks for sharing your opinion.
      You’re right that this solution doesn’t implement all the features that you listed. However, in my experience, they aren’t needed in the absolute majority of use cases and some of them bring more trouble than benefits.
      For example, I spent quite a bit of time in my multithreading course demonstrating why limiting the number of threads in your app will have negligible impact in terms of perf improvements (unless you have a very special use case). On the other hand, I also demonstrated that configuring ThreadPoolExecutor is one of the most difficult tasks and that even Google’s and JetBrains’ devs got it wrong (in the cases of AsyncTask and Coroutines).
      In addition, note that Thread, Atomic* classes, CountDownLatch, etc. aren’t “home-brewed solutions”. These are the most stable and battle-tested concurrency constructs. Therefore, if you’re indeed looking for “properly tested, optimized, supported and used by thousands of other developers” tools, these are your best bet.

      • I do believe that at least being able to track and cancel async tasks is a must, as those are very common sources of resource leaks, unnecessary battery drain and very hard to debug race conditions in apps.

        In terms of “home-brewed solutions” I didn’t mean the basic building blocks provided by the language, which I 100% agree that are the most stable and well tested, but solutions that put these constructs together in order to achieve a specific result (especially if those results can already be achieved properly by open-source frameworks).

        I.e. the last source code example:
        “new Thread(() -> {
        mergedB.set(processAndMergeFilesOfTypeBWithRetry());
        countDownLatch.countDown();
        }).start();”
        will cause a deadlock if runnable (`processAndMergeFilesOfTypeBWithRetry()` function) throws an exception, even though it uses only “the most stable and battle-tested concurrency constructs” 🙂
        Other code examples properly wrap latch operations in `try/finally` block, but this just shows how easy it is to write a very simple multithreaded code with very serious (potential) bugs.

        • Kamil,
          In the last code example, there will be no exceptions since processAndMergeFilesOfTypeBWithRetry() doesn’t throw any. I wrote this code and I know what it does. In addition, even if it throws some unexpected RuntimeException, the app will crash, not hang (I guess that’s what you meant by “deadlock”, since there is absolutely no deadlock here). So, there are no “serious (potential) bugs” here, as far as I can tell.
          As for cancellation due to “resource leaks and race conditions”, it sounds like you don’t trust your own code. If you suspect that any of these can happen, then the proper response is to make the code robust, because cancellation won’t help solve the problem. And to make it robust, you actually need to understand all the concepts that I used in this article.
          As for battery drain, that would be premature optimization in the absolute majority of cases. Even for this flow, which might take on the order of 1 minute, I wouldn’t write one single line of code to make it cancellable. The potential benefits don’t justify the additional complexity and risks. In the last 4 projects I’ve been involved in, I had to cancel async flows just twice: once to cancel a big download and once to stop playback in a custom media player that I designed. So, once again, it’s important to understand that in most cases cancellation is premature optimization and unneeded complexity (which is one of the biggest drawbacks of Coroutines, for example).
          Therefore, all in all, what you say kind of demonstrates why I wrote this article to begin with. There is too much emphasis on frameworks among Android devs.

  7. “For example, in my estimation, many developers who’d implement this algorithm using Coroutines would not use SupervisorJob” – actually in that particular implementation there is no difference if it uses Job or SupervisorJob because only one coroutine is launched on this scope

    • Hello Roman,
      Please correct me if I’m wrong, but I thought that if you don’t use SupervisorJob in this case, then, if one of the async blocks fails, it’ll cause cancellation of the parent, in which case the retries will be skipped.

      • [code language=”java”]
        fun test1() {
            val scope = CoroutineScope(Job() + Dispatchers.IO)

            scope.launch {
                repeat(2) { i ->
                    try {
                        val result = async { if (i == 0) throw Exception("Fail") else 42 }.await()
                        println(result)
                    } catch (t: Throwable) {
                        println("catch $t")
                    }
                }
            }
        }

        fun test2() {
            val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

            scope.launch {
                repeat(2) { i ->
                    try {
                        val result = async { if (i == 0) throw Exception("Fail") else 42 }.await()
                        println(result)
                    } catch (t: Throwable) {
                        println("catch $t")
                    }
                }
            }
        }

        fun test3() {
            val scope = CoroutineScope(Job() + Dispatchers.IO)

            scope.launch {
                repeat(2) { i ->
                    try {
                        val result = supervisorScope {
                            async { if (i == 0) throw Exception("Fail") else 42 }.await()
                        }
                        println(result)
                    } catch (t: Throwable) {
                        println("catch $t")
                    }
                }
            }
        }
        [/code]

        Output of the test1 and test2 is the same:
        catch java.lang.Exception: Fail
        catch kotlinx.coroutines.JobCancellationException: Parent job is Cancelling;…

        Output of the test3 is:
        catch java.lang.Exception: Fail
        42

        But if the code is like this:
        [code language=”java”]
        GlobalScope.launch {
            repeat(2) { i ->
                try {
                    val a = scope.async {
                        if (i == 0) throw Exception("Fail") else 42
                    }.await()

                    println(a)
                } catch (t: Throwable) {
                    println("catch $t")
                }
            }
        }
        [/code]

        then there is a huge difference between using Job and SupervisorJob (as you expected, I think).

        The overall difference is that in the second case the asyncs are siblings launched directly on scope, while in the first case the asyncs are child coroutines of the parent coroutine, which is itself created on scope.

        • Crap. Each time I think I finally “got it” with Coroutines, reality proves me wrong. I’ll need to dig deeper once again to understand why it works this way.
          I also noticed that if you just add SupervisorJob to async, it also works reliably.
          Anyway, this example just proves my point so much more: if you want to write reliable concurrent code, frameworks aren’t your friends. One could argue: “just learn them properly”, but that’s just Stockholm syndrome at this point. The author of that code obviously did not realize that it’s buggy either. I personally invested into Coroutines so much time that there is no way it’ll ever pay off. And mind you, I didn’t start from scratch.
          So much waste…

  8. Hi Vasiliy, good job questioning authorities! I really respect everyone who does that in a non-trolling way and offers an alternative to start a discussion. I love your post about MVC (Netflix inspired), so please continue what you’re doing!

    Picking a real-life use case was a really good idea, and I totally agree with the comment about RxJava’s learning curve and complexity. I have committed a lot of time to learning it and tried to encourage my team to do the same (and I failed: people have more interesting stuff to do in their spare time, sic! 😉). I also tried to mentor others in it (but I failed miserably here as well, because even after weeks of practising, reading and learning I still didn’t feel like I could verify anyone’s code by just reading it!). So from my perspective, RxJava is a perfect tool to use among perfect developers who are super excited about their work and spend their own time learning and exploring their tools and techniques. But it isn’t a reality I have ever had a chance to work in. Moreover, “most” developers have less than 5 years of experience (StackOverflow survey 2015-2018, it has changed slightly in 2019), and I really do think it isn’t a good idea to assume that picking up RxJava is a better idea than sticking to the basics as you just presented!

    And as the cherry on top of this topic’s pie, Danny Preussler posted on Twitter:
    “While we are discussing RxJava vs Flow, Google uses Java with Guava’s ListenableFuture in their Corona reference app, stay tuned, let’s see what’s next – https://github.com/google/exposure-notifications-android/blob/fbba9296bda9ae3b2c02d2bfd7590c742963875e/app/src/main/java/com/google/android/apps/exposurenotification/activities/ExposureFragment.java#L142” – https://twitter.com/PreusslerBerlin/status/1259895427555622912

  9. I am currently getting started with the Camera2 APIs, and the number of callbacks that comes with them does not really make the code look concise. On top of that, you can specify the thread on which each callback will be invoked. Every time I read or debug a callback, I have to keep this info at the back of my mind.

    Compared to this coroutines look a more readable API.

    A good article nevertheless. It’s just that, from my past experience, even this code with language threading primitives can be difficult to maintain if it changes hands or undergoes feature modifications.

  10. I’m surprised nobody’s brought up the fact that the way your code is written makes it virtually impossible to unit test. Both Coroutines and RxJava take a declarative approach to concurrency by introducing the concepts of Dispatchers and Schedulers. Want your code to run on a managed pool of threads and call back on the Android’s UI thread? Just inject the right Dispatchers/Schedulers and you’re good to go. And more importantly, this approach allows you to execute your code synchronously on the test runner’s thread, which makes reasoning about the data flow and the expected output trivial. Achieving something like this with the tools you’ve used in this article is possible, but would likely increase the complexity tenfold. You can definitely make an argument that this code doesn’t require unit tests, and you’ve mentioned that you’ve tested it manually and managed to uncover some bugs, but relying on manual testing alone is slow and error-prone. Testability is very important to me personally, so if I had to write concurrent code using primitives only I’d likely introduce some abstractions to make my design testable, and those abstractions will likely be very similar to what concurrency libraries already provide, so I’ll essentially be reinventing the wheel.

    • Egor, you’re pushing at an open door here 🙂
      First of all, nobody needs to bring this fact up because it’s in the article:

      So, there is definitely a place for frameworks in your toolbox (especially if you want to write unit tests for concurrent code)

      Second, as I said, the real implementation used the ThreadPoster lib, not bare threads. If you bother to check it out, you’ll find out that unit-testability is its main feature.

      • Ah, apologies Vasiliy, I must’ve missed it – my bad!

        Should mention though that it’s not about writing unit tests for concurrent code (which is exactly what you want to avoid), it’s about writing tests for the business logic. It has to run concurrently in production code to be performant but under test you want it to run synchronously – this is where using bare threads falls short!

        The fact that your real implementation does use a library for more controlled threading makes your argument for using primitives much weaker, in my opinion. Anyway, I completely agree that having a good knowledge of concurrency primitives is mandatory, even if you rely on higher-level abstractions in your application code, so thanks for bringing this topic up.

  11. Vasiliy, thank you for writing this post and sharing your point of view. I both agree and disagree with you. I do not prefer RxJava myself when I have the option to choose, although I have to use it here and there in existing projects. Maybe it’s just me, but sometimes my mind can’t properly reason about what I’m seeing in the case of multiple chained RxJava operators, although in theory I know what they are supposed to do individually.

    Your approach is more easily verifiable if the developer is aware of fundamental Java concurrency primitives.

    My favorite variant would probably be Kotlin Coroutines, as it keeps the imperative style I’m more fluent with, compared to the declarative magic that RxJava presents to the uninitiated developer. At the same time it hides away some of the synchronization details. With Coroutines I can follow the code logically and understand the sequence of events better. Of course, one needs to know the cancellation rules and exception handling well, and these are not trivial.

    I have one question: I can totally understand why you are using a CountDownLatch, but why do you need the two AtomicReference wrappers for the files? You do not access their values concurrently, because you first wait for both threads that handle files A and B to finish by calling countDownLatch.await(). Then you get the values from both AtomicReferences when there’s no concurrency involved anymore. Simple volatile variables for file A and file B should suffice in this case. What am I missing?

    • Hello Georgi,
      That’s an excellent question.
      Looking at this code right now, there is no need for atomic variables or even for volatiles. Additional thread-safety isn’t needed here because there are already two happens-before edges in this code: 1) between the call that starts a new thread and any subsequent action in the newly started thread, and 2) between calls to countDownLatch.countDown() and the return of countDownLatch.await().
      I copy-pasted the initial version of this code from the project, so maybe there was some additional logic there. It’s also possible that these additional thread-safety mechanisms weren’t required there either. It’s not uncommon for me to use defensive programming and plant extra safety measures into my code for the future.
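
      A minimal sketch of the second happens-before edge, using plain fields instead of AtomicReference. The class and method names are hypothetical, and the strings stand in for the merged files.

      ```java
      import java.util.concurrent.CountDownLatch;

      final class LatchVisibilitySketch {

          // Plain (non-volatile, non-atomic) holders for the two results.
          private String mergedA;
          private String mergedB;

          String[] mergeInParallel() throws InterruptedException {
              CountDownLatch latch = new CountDownLatch(2);

              new Thread(() -> {
                  try {
                      mergedA = "resultA"; // this write happens-before countDown()
                  } finally {
                      latch.countDown();
                  }
              }).start();

              new Thread(() -> {
                  try {
                      mergedB = "resultB";
                  } finally {
                      latch.countDown();
                  }
              }).start();

              // A successful return from await() happens-after both countDown() calls,
              // so both plain writes above are guaranteed to be visible here.
              latch.await();
              return new String[] { mergedA, mergedB };
          }

          public static void main(String[] args) throws InterruptedException {
              String[] merged = new LatchVisibilitySketch().mergeInParallel();
              System.out.println(merged[0] + ", " + merged[1]);
          }
      }
      ```

      Note that a lambda can’t assign to a local variable, so some kind of holder (a field as in this sketch, or an AtomicReference) is still needed even when its atomicity isn’t.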
