A long time ago, one had to manually start new threads when wanting to run code concurrently in Java. Not only was this hard to write, it was easy to introduce bugs that were hard to find. Testing, reading and maintaining such code was no walk in the park either. Since that time - and with a little incentive coming from multi-core machines, the Java API has evolved to make developing concurrent code easier. Meanwhile, alternative JVM languages also have their opinion about helping developers write such code. In this post, I’ll compare how it’s implemented in Java and Kotlin.
To keep the post focused, I deliberately left out performance to write about code readability.
About the use-case
The use-case is not very original. We need to call different web services. The naive solution would be to call them sequentially, one after the other, and collect the result of each of them. In that case, the overall call time would be the sum of the call time of each service. An easy improvement is to call them in parallel, and wait for the last one to finish. Thus, performance improves from linear to constant - or for the more mathematically inclined, from o(n) to o(1).
To simulate calling a web service with a delay, let’s use the following code (in Kotlin because this is so much less verbose):
class DummyService(private val name: String) {
private val random = SecureRandom()
val content: ContentDuration
get() {
val duration = random.nextInt(5000)
Thread.sleep(duration.toLong())
return ContentDuration(name, duration)
}
}
data class ContentDuration(val content: String, val duration: Int)
The Java Future API
Java offers a whole class hierarchy to handle concurrent calls. It’s based on the following classes:
- Callable
-
A
Callable
is a "task that returns a result". From another view point, it’s similar to a function that takes no parameter and returns this result. - Future
-
A
Future
is "the result of an asynchronous computation". Also, "The result can only be retrieved using methodget
when the computation has completed, blocking if necessary until it is ready". In other words, it represents a wrapper around a value, where this value is the outcome of a calculation. - Executor Service
-
An
ExecutorService
"provides methods to manage termination and methods that can produce aFuture
for tracking progress of one or more asynchronous tasks". It is the entry point into concurrent handling code in Java. Implementations of this interface - as well are more specialized ones, can be obtained through static methods in theExecutors
class.
This is summarized in the following class diagram:
Calling our services using the concurrent package is a 2-steps process.
Creating a collection of callables
First, there need to be a collection of Callable
to pass to the executor service.
This is how it might go:
- From a stream of service names
- For each service name, create a new dummy service initialized with the string
- For every service, return the service’s
getContent()
method reference as aCallable
. This works because the method signature, matchesCallable.call()
andCallable
is a functional interface.
This is the preparation phase. It translates into the following code:
List<Callable<ContentDuration>> callables = Stream.of("Service A", "Service B", "Service C")
.map(DummyService::new)
.map(service -> (Callable<ContentDuration>) service::getContent)
.collect(Collectors.toList());
Processing the callables
Once the list has been prepared, it’s time for the ExecutorService
to process it aka the "real work".
- Create a new executor service - any will do
- Pass the list of
Callable
to the executor service, and stream the resulting list ofFuture
- For every future,
- Either return the result
- Or handle the exception
The following snippet is a possible implementation:
ExecutorService executor = Executors.newWorkStealingPool();
List<ContentDuration> results = executor.invokeAll(callables).stream()
.map(future -> {
try { return future.get(); }
catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); }
}).collect(Collectors.toList());
The Future API, but in Kotlin
Let’s face it, while Java makes it possible to write concurrent code, reading and maintaining it is not that easy, mainly due to:
- Going back and forth between collections and streams
- Handling checked exception in lambdas
- Casting explicitly
Just porting the above code to Kotlin removes those limitations and makes it more straightforward:
var callables: List<Callable<ContentDuration>> = arrayOf("Service A", "Service B", "Service C")
.map { DummyService(it) }
.map { Callable<ContentDuration> { it.content } }
val executor = Executors.newWorkStealingPool()
val results = executor.invokeAll(callables).map { it.get() }
Kotlin Coroutines
With version 1.1 of Kotlin comes a new experimental feature called coroutines.
Basically, coroutines are computations that can be suspended without blocking a thread. Blocking threads is often expensive, especially under high load […]. Coroutine suspension is almost free, on the other hand. No context switch or any other involvement of the OS is required.
The leading design principle behind coroutines is that they must feel like sequential code but run like concurrent code. They are based on the following (simplified) class diagram:
Nothing beats the code itself though. Let’s implement the same as above, but with coroutines in Kotlin instead of Java futures.
As a pre-step, let’s just extend the service to ease further processing by adding a new computed property wrapped around content
, of type Deferred
:
val DummyService.asyncContent: Deferred<ContentDuration>
get() = async(CommonPool) { content }
This is standard Kotlin extension property code, but notice the CommonPool
parameter.
This is the magic that makes the code run concurrent.
It’s a companion object (i.e. a singleton) that uses a multi-fallback algorithm to get an ExecutorService
instance.
Now, on to the code flow proper:
- Coroutines are handled inside a block. Declare a variable list outside the block to be assigned inside it
- Open the synchronization block
- Create the array of service names
- For each name, create a service and return it
- For each service, get its async content (declared above) and return it
- For each deferred, get the result and return it
// Variable must be initialized or the compiler complains
// And the variable cannot be used afterwards
var results = runBlocking {
arrayOf("Service A", "Service B", "Service C")
.map { DummyService(it) }
.map { it.asyncContent }
.map { it.await() }
}
Takeaways
The Future API is not so much a problem than the Java language itself. As soon as the code is translated into Kotlin, readability improves a lot. Yet, having to create a collection to pass to the executor service breaks the nice functional pipeline.
On the coroutines side, remember that they are still experimental. Despite that, code does look sequential - and is thus more readable, and behaves parallel.