Last week, we shared data among threads to solve the now well-known word frequencies problem. The very next day, I joined Hazelcast as a Developer Advocate. I therefore thought it would be extremely interesting to use the Hazelcast In-Memory Data Grid to improve on the previous solution.
This is the 18th post in the Exercises in Programming Style focus series. Other posts include:
- Introducing Exercises in Programming Style
- Exercises in Programming Style, stacking things up
- Exercises in Programming Style, Kwisatz Haderach-style
- Exercises in Programming Style, recursion
- Exercises in Programming Style with higher-order functions
- Composing Exercises in Programming Style
- Exercises in Programming Style, back to Object-Oriented Programming
- Exercises in Programming Style: maps are objects too
- Exercises in Programming Style: Event-Driven Programming
- Exercises in Programming Style and the Event Bus
- Reflecting over Exercises in Programming Style
- Exercises in Aspect-Oriented Programming Style
- Exercises in Programming Style: FP & I/O
- Exercises in Relational Database Style
- Exercises in Programming Style: spreadsheets
- Exercises in Concurrent Programming Style
- Exercises in Programming Style: sharing data among threads
- Exercises in Programming Style with Hazelcast (this post)
- Exercises in MapReduce Style
- Conclusion of Exercises in Programming Style
What’s an In-Memory Data Grid?
According to Wikipedia, a Data Grid is:
[…] an architecture or set of services that gives individuals or groups of users the ability to access, modify and transfer extremely large amounts of geographically distributed data for research purposes
Let’s drop the research purposes. The important bits of the above definition are that a Data Grid contains large amounts of data and is distributed among nodes.
The main issue with such architectures is that they require the following steps:
- Read data from disk
- Load it into memory (RAM)
- Process it there
- And finally write it back on disk
While the processing part is actually quite fast because it happens in-memory, the reading/writing part is not. Hence, a data grid’s overall performance is bound by the time it takes to access and read from/write to the storage. To better understand the issue, here are the generally accepted metrics at the time of this writing:
Storage | Access time | Read/write speed |
---|---|---|
Random Access Memory (RAM) | ~5 ns | > 10 GB/s |
Solid-State Disk (SSD) | < 1 ms | 200 MB/s to 3.5+ GB/s |
Hard Disk Drive (HDD) | 3-10 ms | < 200 MB/s |
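To make the gap concrete, here's a quick back-of-the-envelope calculation based on the table's ballpark throughputs (not measured figures):

```kotlin
// Back-of-the-envelope: seconds needed to read 1 GB at each medium's throughput
fun secondsToRead(bytes: Double, bytesPerSecond: Double) = bytes / bytesPerSecond

val oneGigabyte = 1e9
println(secondsToRead(oneGigabyte, 10e9))   // RAM at ~10 GB/s -> 0.1 s
println(secondsToRead(oneGigabyte, 200e6))  // HDD at ~200 MB/s -> 5.0 s
```

In other words, a single pass over the data is roughly fifty times slower on spinning disks than in RAM - before even counting access latency.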
If the disk-related steps could be removed, the data grid would be as fast as the memory in which it’s executed. That’s the whole point of an IMDG: trading off persistence for high speed and low latency.
Setting up Hazelcast IMDG
Hazelcast is able to run in two different modes:
- Client-server
- Embedded
The embedded mode is a breeze to set up. Just adding the Hazelcast core JAR to the application’s classpath makes it possible to start an embedded instance that will automatically join any other Hazelcast cluster. Note that while multicast discovery is the default, it’s possible to be more selective by configuring which clusters can be joined.
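For instance, multicast can be disabled in favor of an explicit member list. Here's a sketch of such a hazelcast.xml (element names as in Hazelcast 3.x; the IP address is a placeholder):

```xml
<hazelcast>
    <network>
        <join>
            <!-- Disable multicast discovery -->
            <multicast enabled="false"/>
            <!-- Only join the members listed explicitly -->
            <tcp-ip enabled="true">
                <member>192.168.1.10</member>
            </tcp-ip>
        </join>
    </network>
</hazelcast>
```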
With Maven, starting Hazelcast embedded is just a matter of adding a single dependency to the POM:
<?xml version="1.0" encoding="UTF-8"?>
<project>
<!-- ... -->
<dependencies>
<!-- Other dependencies -->
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
<version>3.12.2</version>
</dependency>
</dependencies>
</project>
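If the build uses Gradle instead, the equivalent declaration would be (Kotlin DSL, same artifact coordinates as the POM above):

```kotlin
// build.gradle.kts - same Hazelcast artifact as declared in the Maven POM
dependencies {
    implementation("com.hazelcast:hazelcast:3.12.2")
}
```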
At that point, it’s possible to create the Hazelcast instance and related data structures.
Distributed data structures and IMap
Hazelcast is all about distributed data structures: lists, sets, queues, maps, etc. Among them, the IMap interface is of particular interest. In essence, one can think of IMap as a distributed version of Java’s ConcurrentMap; in fact, IMap inherits from ConcurrentMap. In addition to the standard concurrent map’s features, IMap offers the following:
- Transformation of all or only specific entries
- A rich event-listener model around all standard operations
- Statistics about the map
- A locking feature on specific entries
- And much more…
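Because IMap inherits from ConcurrentMap, code written against the standard interface carries over unchanged. As a plain-JVM illustration (using ConcurrentHashMap as a stand-in for IMap), the merge() call used later in this post behaves identically:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// ConcurrentHashMap stands in for IMap here: both implement ConcurrentMap
val frequencies = ConcurrentHashMap<String, Int>()
listOf("white", "rabbit", "white").forEach { word ->
    // Insert 1 if the key is absent, otherwise add 1 to the existing count
    frequencies.merge(word, 1) { count, increment -> count + increment }
}
println(frequencies["white"]) // 2
```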
The design of Hazelcast’s API is such that one doesn’t instantiate distributed data structures such as IMap directly. Instead, the entry point to get a handle on such a data structure is a Hazelcast instance.
Here’s a simplified summary of the API:
With that, it’s straightforward to create the Hazelcast instance and get handles on the map:
val hazelcastInstance = Hazelcast.newHazelcastInstance() (1)
val freqSpace = hazelcastInstance.getMap<String, Int>("map") (2)
1 | Create a new instance |
2 | Get a handle on a distributed map |
Migrating the code to use Hazelcast
Only one thing needs to be updated to migrate to Hazelcast: the executor service must come from the Hazelcast instance instead of from the standard Java API. This translates into the following code:
val executorService = hazelcastInstance.getExecutorService("executorService")
There’s still one not-so-minor caveat. Since the map might be distributed among different JVMs - and that’s the whole point of distributed systems - stored objects need to be serializable: in Java, this means they need to implement the Serializable interface. Given the current state of the code, that’s not the case: neither the top-level Kotlin function nor the Callable wrapper around it is serializable. Hence, a full-fledged class implementing the necessary interfaces is required:
class ProcessWords(val words: List<String>) :
    Callable<Unit>, Serializable, HazelcastInstanceAware {              (1) (2)

    private lateinit var hazelcastInstance: HazelcastInstance

    override fun setHazelcastInstance(hazelcastInstance: HazelcastInstance) { (1)
        this.hazelcastInstance = hazelcastInstance
    }

    override fun call() {                                               (2)
        val frequencies = hazelcastInstance.getMap<String, Int>("map")  (3)
        val stopWords = read("stop_words.txt").flatMap { it.split(",") }
        words.forEach {
            if (!stopWords.contains(it))
                frequencies.merge(it, 1) { count, value ->
                    count + value
                }
        }
    }
}

val callables = words.chunked(words.size / 4).map { ProcessWords(it) }  (2)
executorService.invokeAll(callables)                                    (4)
1 | HazelcastInstanceAware allows the injection of… a Hazelcast instance.
The IExecutorService will inject it when executing the task. |
2 | The previous version used a Runnable.
As explained above, it cannot be wrapped in a Callable using the standard Executors class.
The Runnable.run() function needs to be changed to Callable.call(), but since there’s no need to return a result, the bound type can safely be Unit. |
3 | Instead of using a shared concurrent hash map, get one from the injected Hazelcast instance |
4 | There’s no need to change anything! As in the previous version, it will block until all threads have terminated |
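A side note on the chunking above: Kotlin's chunked() takes a chunk size, not a chunk count, so words.size / 4 produces batches of that size - four full ones, plus a smaller fifth one when the list size isn't a multiple of four:

```kotlin
// chunked() takes a chunk *size*: 9 elements with chunk size 9 / 4 = 2 -> 5 batches
val words = listOf("it", "was", "a", "bright", "cold", "day", "in", "april", "and")
val batches = words.chunked(words.size / 4)
println(batches.size)    // 5
println(batches.first()) // [it, was]
```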
At this point, the code can be scaled by running on multiple JVMs.
Granted, we still need to correctly handle the word input; that would just require the use of an IList.
Forewarned is forearmed
As with other posts in this series, the above code is just an exercise that aims to showcase how easy the migration to Hazelcast is.
This is by no means a showcase of the performance gains one can get from Hazelcast, especially given the limited size of the test data sets.
Just to begin with, if you run tests, you’ll notice that Hazelcast’s initialization takes some time when the JVM starts.
Perhaps optimizing this setup will be the subject of a future post.
Conclusion
Once a program correctly handles multi-threading, it’s very straightforward to migrate it to Hazelcast: the API maps exactly to the java.util.concurrent one!
However, the fact that the abstraction is the same doesn’t mean the implementation is. One needs to understand the constraints brought by the use of a distributed data structure.