In the last episode of Exercises in Programming Style, we solved the word frequency problem with the Hazelcast library. This time, we are going to use the MapReduce approach for that.
This is the 19th post in the Exercises in Programming Style focus series. Other posts include:
- Introducing Exercises in Programming Style
- Exercises in Programming Style, stacking things up
- Exercises in Programming Style, Kwisatz Haderach-style
- Exercises in Programming Style, recursion
- Exercises in Programming Style with higher-order functions
- Composing Exercises in Programming Style
- Exercises in Programming Style, back to Object-Oriented Programming
- Exercises in Programming Style: maps are objects too
- Exercises in Programming Style: Event-Driven Programming
- Exercises in Programming Style and the Event Bus
- Reflecting over Exercises in Programming Style
- Exercises in Aspect-Oriented Programming Style
- Exercises in Programming Style: FP & I/O
- Exercises in Relational Database Style
- Exercises in Programming Style: spreadsheets
- Exercises in Concurrent Programming Style
- Exercises in Programming Style: sharing data among threads
- Exercises in Programming Style with Hazelcast
- Exercises in MapReduce Style (this post)
- Conclusion of Exercises in Programming Style
MapReduce in a few words
MapReduce is a process that consists of two steps:
- Map: Performs transformations, filtering and sorting into different "queues"
- Reduce: Aggregates the content of the "queues" into a result
The biggest benefit of MapReduce is that both the map and reduce steps can potentially be executed in parallel. This makes it a great fit for handling large data sets.
The following diagram helps visualize the overall flow:
While MapReduce makes parallelism possible, implementing it is not mandatory: parallelism is only an option. It’s used neither in the original Python code nor in the Kotlin port.
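To make the two steps concrete, here is a minimal, self-contained sketch (not taken from the original code) that sums the squares of ten numbers: each chunk is mapped independently - the part that could run in parallel - and the partial results are then reduced into a single value:

```kotlin
fun main() {
    val data = (1..10).toList()

    // Map: transform each chunk independently - these could run in parallel
    val partials = data.chunked(3)
        .map { chunk -> chunk.map { it * it }.sum() }

    // Reduce: aggregate the partial results into a single value
    val total = partials.reduce { acc, partial -> acc + partial }

    println(total) // 385
}
```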
Migrating to Kotlin
The reference code uses the Python yield keyword: with it, functions don’t return plain collections of items, but streams instead.
While the code looks much the same, what happens under the hood is different.
Compared to standard collections, performance improves in proportion to the number of items flowing through the stream.
In Java, streams are implemented with Stream; in Kotlin, with Sequence.
To convert a collection into a sequence, Kotlin provides the Iterable&lt;T&gt;.asSequence() extension function.
For a more in-depth understanding of collections and sequences in Kotlin, and how they relate to Java streams, please check this earlier post.
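A quick way to see the difference: with a Sequence, intermediate operations are lazy and run element by element, so the pipeline can short-circuit. The following sketch (names are illustrative) records which items actually get mapped:

```kotlin
fun main() {
    val evaluated = mutableListOf<Int>()

    val first = listOf(1, 2, 3, 4, 5)
        .asSequence()                       // switch to lazy evaluation
        .map { evaluated.add(it); it * 2 }  // records which items are actually mapped
        .first { it > 4 }

    println(first)      // 6
    println(evaluated)  // [1, 2, 3] - items 4 and 5 were never touched
}
```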
Applied to the problem at hand, the processing pipeline looks like the following:
This translates into the following code:
```kotlin
fun run(filename: String): Map<String, Int> = read(filename)
    .asSequence()
    .chunked(200)
    .map(::splitWords)
    .reduce { acc, pair -> countWords(acc, pair) }
    .sortedBy { it.second }
    .takeLast(25)
    .toMap()
```
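The chunked() call is worth a closer look: it groups the sequence of lines into chunks of at most 200 lines each, so that the mapping function processes each chunk independently. A small sketch with fake lines and a smaller chunk size shows the mechanics:

```kotlin
fun main() {
    val lines = ('a'..'j').map { "line $it" }  // ten fake lines standing in for read()

    val chunks = lines.asSequence()
        .chunked(4)        // groups of at most 4 lines
        .toList()

    println(chunks.size)    // 3 chunks: 4 + 4 + 2
    println(chunks.last())  // [line i, line j]
}
```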
An overview of reduction
The map() function has been amply used in previous posts of this series.
Besides, I believe most developers are already familiar with it, in one way or another.
The reduce() function, on the other hand, is much less understood, even though it’s used just as much.
For example, in Java, streams' terminal operations are reduction functions:
those include sum(), average(), max(), but also collect()!
From the collect() Javadoc:

> Performs a mutable reduction operation on the elements of this stream using a Collector.
Collect functions are only specializations of the more general reduce() functions provided by Stream:
As seen in the diagram, reduction is made available in three "flavors":
- The first flavor accepts a single BinaryOperator parameter. A BinaryOperator&lt;T&gt; accepts two parameters of type T, and combines them to return a T. Note that because the starting stream might be empty, the method returns an Optional&lt;T&gt;. For example, the following snippet sums the items in the stream:

  ```kotlin
  Stream.of(1, 2, 3, 4, 5)
      .reduce { a, b -> a + b }
  ```
- The second flavor is similar to the first one, but requires a starting value. For this reason, the return type is not an Optional&lt;T&gt; but a T. If the stream is empty, the result is the starting value. Otherwise, it’s the same as with the previous flavor, provided the starting value is the neutral element of the reduction function.

  ```kotlin
  Stream.of(1, 2, 3, 4, 5)
      .reduce(0) { a, b -> a + b }
  ```
- The third and last flavor additionally allows changing the returned type, at the cost of more complexity.
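For completeness, here is a sketch of that third flavor, called from Kotlin: the identity value sets the result type, the accumulator folds each item into it, and the combiner merges partial results when the stream runs in parallel:

```kotlin
import java.util.stream.Stream

fun main() {
    val result = Stream.of(1, 2, 3, 4, 5)
        .reduce(
            "",                                 // identity: starting value, sets the result type
            { acc: String, i: Int -> acc + i }, // accumulator: folds an Int into the String
            { a, b -> a + b }                   // combiner: merges partial results in parallel runs
        )
    println(result) // 12345
}
```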
In Kotlin, reduction is quite similar to what exists in Java.
There’s a slight difference, though:
the function that accepts a starting value is called fold()
, the one that doesn’t is called reduce()
.
In addition, both of them also provide an alternative signature that passes the index of the current item.
Signature | Starting value | Indexed
---|---|---
reduce() | No | No
reduceIndexed() | No | Yes
fold() | Yes | No
foldIndexed() | Yes | Yes

Called on an empty sequence, the functions without a starting value will throw an exception at runtime.
This is one of the few places where I believe the Kotlin API pales compared to Java’s.
I’ve raised the point on Kotlin’s Slack; there might be a new xxxOrNull() variant coming in a future version of the stdlib.
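The difference shows in code; note that the indexed variants receive the position of the current item as an extra first parameter:

```kotlin
fun main() {
    val numbers = listOf(1, 2, 3, 4, 5)

    // reduce(): no starting value, the first item seeds the accumulator
    val sum = numbers.reduce { acc, i -> acc + i }
    println(sum) // 15

    // fold(): the starting value seeds the accumulator, and may change the result type
    val csv = numbers.fold("") { acc, i -> if (acc.isEmpty()) "$i" else "$acc,$i" }
    println(csv) // 1,2,3,4,5

    // foldIndexed(): additionally receives the index of the current item
    val weighted = numbers.foldIndexed(0) { index, acc, i -> acc + index * i }
    println(weighted) // 0*1 + 1*2 + 2*3 + 3*4 + 4*5 = 40

    // On an empty collection, reduce() throws; fold() returns the starting value
    // emptyList<Int>().reduce { acc, i -> acc + i } // UnsupportedOperationException
    println(emptyList<Int>().fold(0) { acc, i -> acc + i }) // 0
}
```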
The implemented functions in Kotlin
The mapping function just creates pairs of words with count of 1 from a list of lines:
```kotlin
fun splitWords(lines: Iterable<String>): Iterable<Pair<String, Int>> {
    fun Iterable<String>.scan() = this
        .flatMap { it.toLowerCase().split("\\W|_".toRegex()) }
        .filter { it.isNotBlank() && it.length >= 2 }
    fun Iterable<String>.removeStopWords(): Iterable<String> {
        val stopWords = read("stop_words.txt")
            .flatMap { it.split(",") }
        return this - stopWords
    }
    return lines.scan()
        .removeStopWords()
        .map { it to 1 }
}
```
I believe this is pretty straightforward. The reducing function combines two iterables of word frequencies together to create a new iterable.
```kotlin
fun countWords(frequencies1: Iterable<Pair<String, Int>>,
               frequencies2: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> {
    val results = mutableMapOf<String, Int>()
    frequencies1.forEach {
        results.merge(it.first, it.second) { count, value -> count + value }
    }
    frequencies2.forEach {
        results.merge(it.first, it.second) { count, value -> count + value }
    }
    return results.toList()
}
```
It’s crude, but it works. Sadly, I found no more elegant way to achieve the merging. Proposals welcome!
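For what it’s worth, here is one candidate - a sketch, equivalent as far as I can tell - that merges the two iterables with groupingBy() and fold() instead of a mutable map:

```kotlin
fun countWords(frequencies1: Iterable<Pair<String, Int>>,
               frequencies2: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> =
    (frequencies1 + frequencies2)
        .groupingBy { it.first }                     // group the pairs by word
        .fold(0) { acc, (_, count) -> acc + count }  // sum the counts for each word
        .toList()

fun main() {
    val merged = countWords(
        listOf("lion" to 2, "tiger" to 1),
        listOf("lion" to 1, "india" to 1)
    )
    println(merged) // [(lion, 3), (tiger, 1), (india, 1)]
}
```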
A little hiccup along the way
That could be the end, but unfortunately it’s not. Most tests succeed: for the smallest data set, where the frequency of each word is one, and for the largest one, Pride and Prejudice. Yet, it fails for the following sample:
White tigers live mostly in India Wild lions live mostly in Africa
The reason is that the mapping function assigns a default frequency of 1 to each word, while the lines read are grouped into chunks of 200 (see above).
Since the sample is not big enough to fill more than one chunk, the countWords()
reducing function is never applied.
Hence, the word/1 pairs are never merged together, and the resulting map has a frequency of 1 for each word.
To return the correct result, one should first check the sample size before sending it through the MapReduce process.
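An alternative to checking the size upfront - my own sketch, not the fix above - is to replace reduce() with fold() and an empty starting value: fold() always invokes the reducing function, so even a single chunk gets its duplicated pairs merged. A self-contained illustration, with a simplified merge standing in for countWords():

```kotlin
fun main() {
    // The failing sample: a single chunk, containing duplicate words
    val words = "white tigers live mostly in india wild lions live mostly in africa"
        .split(" ")
        .map { it to 1 }

    // A simplified merge, standing in for countWords()
    fun merge(a: Iterable<Pair<String, Int>>,
              b: Iterable<Pair<String, Int>>): Iterable<Pair<String, Int>> {
        val results = mutableMapOf<String, Int>()
        (a + b).forEach { results.merge(it.first, it.second) { count, value -> count + value } }
        return results.toList()
    }

    // reduce() on a single chunk returns it untouched: duplicates survive
    val reduced = listOf(words).reduce(::merge)
    println(reduced.count { it.first == "live" }) // 2 - two separate ("live", 1) pairs

    // fold() always calls the reducing function, merging even a single chunk
    val seed: Iterable<Pair<String, Int>> = emptyList()
    val folded = listOf(words).fold(seed, ::merge)
    println(folded.single { it.first == "live" }.second) // 2 - one ("live", 2) pair
}
```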
Conclusion
MapReduce is nothing mind-blowing. Most developers already use it without knowing. The only challenge is to write correct mapping and reducing functions.
Also, one should be extra careful about the size of the input and how it’s processed: if it’s too small, the reducing function won’t be necessary and its code won’t be executed. In most scenarios, however, this shouldn’t be an issue, as MapReduce targets large data sets.