Last week, we solved the word count problem using the Actor model: objects running on different threads and communicating through messages. This week, we will drop objects and use data structures shared among threads: such a shared structure is called a data space in the book.
This is the 17th post in the Exercises in Programming Style focus series. Other posts include:
- Introducing Exercises in Programming Style
- Exercises in Programming Style, stacking things up
- Exercises in Programming Style, Kwisatz Haderach-style
- Exercises in Programming Style, recursion
- Exercises in Programming Style with higher-order functions
- Composing Exercises in Programming Style
- Exercises in Programming Style, back to Object-Oriented Programming
- Exercises in Programming Style: maps are objects too
- Exercises in Programming Style: Event-Driven Programming
- Exercises in Programming Style and the Event Bus
- Reflecting over Exercises in Programming Style
- Exercises in Aspect-Oriented Programming Style
- Exercises in Programming Style: FP & I/O
- Exercises in Relational Database Style
- Exercises in Programming Style: spreadsheets
- Exercises in Concurrent Programming Style
- Exercises in Programming Style: sharing data among threads (this post)
- Exercises in Programming Style with Hazelcast
- Exercises in MapReduce Style
- Conclusion of Exercises in Programming Style
Modelling the data space
The original Python code uses two dedicated data spaces:
- One to store the words read from the source file
- One to store the word frequencies
The Python code uses queues to model the data spaces. Hence, it makes sense to do the same in Kotlin. However, the Java API offers lots of choices, as this abridged diagram shows:
Let’s describe the queue and the blocking queue.
Queue
In Java, the Queue interface has no definite semantics regarding the ordering of elements: it can be FIFO, LIFO, or something completely different, such as an ordering based on a priority attribute.
Queues offer two different ways to add, check, and remove elements: one throws an exception if the operation fails, the other returns a special value. For example, removing an element might fail if the queue is empty.

Feature | Exception | Special value |
---|---|---|
Checks an element | element() | peek() |
Adds an element | add(e) | offer(e) |
Removes an element | remove() | poll() |
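To make the two flavors concrete, here is a minimal sketch of my own (not from the original post), using java.util.ArrayDeque as a plain, non-blocking Queue implementation:

import java.util.ArrayDeque
import java.util.Queue

fun main() {
    val queue: Queue<String> = ArrayDeque()

    queue.add("hello")       // throws IllegalStateException if a capacity-bounded queue is full
    queue.offer("world")     // returns false instead of throwing

    println(queue.element()) // "hello"; throws NoSuchElementException if empty
    println(queue.peek())    // "hello"; returns null if empty

    println(queue.remove())  // "hello"; throws NoSuchElementException if empty
    println(queue.poll())    // "world"; returns null if empty
    println(queue.poll())    // null, the queue is now empty
}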
BlockingQueue
A blocking queue is:
A Queue that additionally supports operations that wait for the queue to become non-empty when retrieving an element, and wait for space to become available in the queue when storing an element.
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html
It adds two more flavors for those operations: a blocking one and one with a timeout.

Feature | Blocking | Timeout |
---|---|---|
Adds | put(e) | offer(e, time, unit) |
Removes | take() | poll(time, unit) |
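Again, a minimal sketch of my own (not from the original post) to illustrate the two new flavors:

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.TimeUnit

fun main() {
    val queue = ArrayBlockingQueue<String>(2)      // bounded queue with a capacity of 2

    queue.put("hello")                             // blocks as long as the queue is full
    queue.offer("world", 1, TimeUnit.SECONDS)      // waits up to 1 second, then returns false

    println(queue.take())                          // blocks as long as the queue is empty
    println(queue.poll(1, TimeUnit.SECONDS))       // waits up to 1 second, then returns null
}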
There are two out-of-the-box implementations of BlockingQueue of interest:
- ArrayBlockingQueue, backed by a simple array
- LinkedBlockingQueue, backed by linked nodes
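The main practical difference lies in bounding; a quick illustration of my own (assuming the default constructors):

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// ArrayBlockingQueue is always bounded: the capacity is fixed at construction time
val bounded = ArrayBlockingQueue<String>(1_000)

// LinkedBlockingQueue is optionally bounded: without a capacity, it is limited only by Integer.MAX_VALUE
val unbounded = LinkedBlockingQueue<String>()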
Porting the Python code
A straightforward port of the original Python code relies on the same design: mutable data structures and timeouts.
The implementation of the main function goes like this:
fun run(filename: String): Map<String, Int> {
val freqSpace = LinkedBlockingQueue<Map<String, Int>>()
val wordSpace = read(filename)
.flatMap { it.toLowerCase().split("\\W|_".toRegex()) }
.filter { it.isNotBlank() && it.length >= 2 }
.toBlockingQueue()
val count = 4
val executorService = Executors.newFixedThreadPool(count)
val callables = IntRange(1, count).map { _ ->
{ processWords(wordSpace, freqSpace) } (1)
}.map { Executors.callable(it) } (2)
executorService.invokeAll(callables) (3)
val frequencies = mutableMapOf<String, Int>() (4)
while (freqSpace.isNotEmpty()) { (5)
val partial = freqSpace.poll(1, TimeUnit.SECONDS) (5) (6)
partial?.entries?.forEach { (5)
frequencies.merge(it.key, it.value) { (5)
count, value -> count + value (5)
}
}
}
return frequencies
.toList()
.sortedByDescending { it.second }
.take(25)
.toMap()
}
fun <E> List<E>.toBlockingQueue() = LinkedBlockingDeque<E>(this)
1 | Wrap the processWords() call in a lambda |
2 | Transform the Runnable-compatible lambda into a Callable |
3 | Run all tasks, waiting for the last one to finish |
4 | Mutable map to store the final results |
5 | Empty the queue and collect results in the map |
6 | Remove elements from the queue using the timeout flavor |
fun processWords(words: BlockingQueue<String>,
frequencies: BlockingQueue<Map<String, Int>>) {
val stopWords = read("stop_words.txt")
.flatMap { it.split(",") }
val wordFreq = mutableMapOf<String, Int>()
while (words.isNotEmpty()) { (1)
val word = words.poll(1, TimeUnit.SECONDS) (1) (2)
if (word != null && !stopWords.contains(word)) (1)
wordFreq.merge(word, 1) { (1)
count, value -> count + value (1)
}
}
frequencies.put(wordFreq)
}
1 | As above, empty the queue and collect the results in a map |
2 | Likewise, remove elements using a timeout |
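With both functions in place, the port can be exercised like this; the file name below is a placeholder of my own, not taken from the post:

fun main() {
    // "pride-and-prejudice.txt" is a hypothetical input file, assumed for illustration
    run("pride-and-prejudice.txt")
        .forEach { (word, count) -> println("$word - $count") }
}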
Introducing concurrent hash maps
The above code, just like the original Python solution, has an issue: it uses a queue to store maps of partial word frequencies, and those partial results then need to be combined to get the final word frequencies. Why not have each thread update the final counts directly?
For that, we need a thread-safe map: the Java API offers ConcurrentHashMap, which overrides the relevant Map methods with thread-safe, atomic implementations.
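In particular, merge() on a ConcurrentHashMap performs the whole read-update-write as a single atomic operation, so concurrent threads cannot lose updates. A small sketch of my own (not from the original post) to illustrate:

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

fun main() {
    val counters = ConcurrentHashMap<String, Int>()
    val executor = Executors.newFixedThreadPool(4)

    // Four threads each increment the same counter 1,000 times
    repeat(4) {
        executor.execute {
            repeat(1_000) {
                counters.merge("word", 1) { count, increment -> count + increment }
            }
        }
    }
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)

    println(counters["word"]) // prints 4000: no update is lost
}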
With concurrent hash maps, the processWords()
function can be updated to:
fun processWords(words: BlockingQueue<String>,
frequencies: ConcurrentMap<String, Int>) {
val stopWords = read("stop_words.txt")
.flatMap { it.split(",") }
while (words.isNotEmpty()) {
val word = words.poll(1, TimeUnit.SECONDS)
if (word != null && !stopWords.contains(word))
frequencies.merge(word, 1) {
count, value -> count + value
}
}
}
Since frequencies are now combined in the previous function, the run()
function can be simplified:
val executorService = Executors.newFixedThreadPool(count)
val callables = IntRange(1, count).map { _ ->
{ processWords(wordSpace, freqSpace) }
}.map { Executors.callable(it) }
executorService.invokeAll(callables)
return freqSpace
.toList()
.sortedByDescending { it.second }
.take(25)
.toMap()
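One detail the snippet leaves implicit: freqSpace is now a concurrent map rather than a blocking queue. Its declaration is not shown above, but it would presumably be along these lines:

import java.util.concurrent.ConcurrentHashMap

// Assumed declaration: a thread-safe map shared by all worker threads
val freqSpace = ConcurrentHashMap<String, Int>()

The final freqSpace.toList() call then relies on Kotlin's Map.toList() extension, which returns the entries as key-value pairs ready to be sorted.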
The share-nothing benefit
Sharing data among threads requires synchronizing access to that data. In turn, synchronization requires locks, which hurts performance. Concurrency experts know this: the best way to avoid concurrency issues is to avoid working on shared data whenever possible. In our case, the word space is shared among all threads for no good reason.
To avoid locks, we could split the words into lists and send one list to each thread to be processed separately:
val count = 4
val callables = words.chunked(words.size / count) (1)
.map { Runnable { processWords(it, freqSpace) }} (2)
.map { Executors.callable(it) } (3)
val executorService = Executors.newFixedThreadPool(count) (3)
executorService.invokeAll(callables) (3)
1 | Create chunks of nearly equal size out of the overall words list |
2 | Map each chunk to a Runnable.
Note that explicitly wrapping the lambda in a Runnable is not strictly necessary, but it improves readability |
3 | The rest of the code is as before |
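Since each chunk is now a plain List rather than a shared queue, processWords() no longer needs to poll with a timeout. A possible adaptation of the function, assumed on my part since the post keeps the new signature implicit:

import java.util.concurrent.ConcurrentMap

fun processWords(words: List<String>,
                 frequencies: ConcurrentMap<String, Int>) {
    val stopWords = read("stop_words.txt")
        .flatMap { it.split(",") }
    // No polling needed: the thread owns its chunk and simply iterates over it
    words.filter { !stopWords.contains(it) }
        .forEach { word ->
            frequencies.merge(word, 1) { count, value -> count + value }
        }
}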
Conclusion
Concurrent code is hard to write, hard to reason about, and bugs are more likely to occur. Java provides dedicated thread-safe data structures to lessen the burden, if only a little.
Finally, the best way to avoid those issues entirely is to avoid sharing data in the first place. Make that your first option whenever possible.