Java’s Synchronized Keyword in Three Minutes

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

The synchronized keyword can be used to ensure that only one thread at a time executes a particular section of code.
This is a simple way to prevent race conditions, which occur when several threads change shared data at the same time in a way that leads to incorrect results.
With synchronized, either entire methods or selected blocks of code can be restricted to one thread at a time.
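As a quick illustration (a minimal sketch of my own, not code from the article): a counter whose updates all go through synchronized methods stays consistent even when several threads increment it concurrently.

```java
public class SynchronizedCounter {
    private int count = 0;

    // Only one thread at a time may execute this method on the same instance
    public synchronized void increment() {
        count++;
    }

    public synchronized int get() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedCounter counter = new SynchronizedCounter();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) {
                    counter.increment();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        // With synchronization the total is always 40000
        System.out.println(counter.get());
    }
}
```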

This article requires basic knowledge of Java threads and race conditions.

Continue reading

The Dangers of Race Conditions in Five Minutes


A race condition is an undesired property of multithreaded code.
It expresses that the program’s outcome depends on a particular order of operations but that the underlying platform (in the case of Java, the JVM) does not guarantee that order.
As a consequence, the outcome often varies between runs, as it depends on how exactly the operations from different threads interleave.
In Java, race conditions occur most often when multiple threads share and mutate the same object.
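A classic illustration (my own sketch, not taken from the article): an unsynchronized counter incremented from two threads can lose updates, because `count++` is really a read-modify-write sequence that threads can interleave.

```java
public class RacyCounter {
    private int count = 0;

    // Not atomic: read, add, write — another thread can run between the steps
    public void increment() {
        count++;
    }

    public int get() {
        return count;
    }

    public static void main(String[] args) throws InterruptedException {
        RacyCounter counter = new RacyCounter();
        Runnable task = () -> {
            for (int i = 0; i < 100_000; i++) {
                counter.increment();
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // Expected 200000, but lost updates typically make the result smaller,
        // and the exact value changes from run to run
        System.out.println(counter.get());
    }
}
```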

The only prerequisite for this article is a basic knowledge of threads.

Continue reading

Java Thread Class in Five Minutes


In a running Java program, all code is executed in threads and within a thread everything happens sequentially, one instruction after another.
When Java (or rather the JVM) launches, it creates one thread for the main method to be executed in.
From there, new threads can be created to execute code in parallel to the main one.
The most basic way to do that is to use the Thread class.
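A minimal sketch of that basic usage (mine, not from the article): create a Thread with a Runnable, start it, and wait for it to finish with join.

```java
public class HelloThread {
    public static void main(String[] args) throws InterruptedException {
        // Create a new thread that will run the given Runnable
        Thread worker = new Thread(() ->
            System.out.println("Hello from " + Thread.currentThread().getName()));

        worker.start(); // start() launches a new thread; calling run() directly would not
        worker.join();  // wait for the worker to finish before continuing

        System.out.println("Back in " + Thread.currentThread().getName());
    }
}
```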

This article does not require any knowledge of multithreaded programming, but you need to be familiar with core Java concepts such as classes and interfaces.

Continue reading

Beyond POJOs – Ten More Ways to Reduce Boilerplate with Lombok


Lombok is a great library and its main selling point is how it declutters POJO definitions.
But it is not limited to that use case!
In this article, I will show you six stable and four experimental Lombok features that can make your Java code even cleaner.
They cover many different topics, from logging to accessors and from null safety to utility classes.
But they all have one thing in common: reducing boilerplate to make code easier to read and more expressive.

Continue reading

Distributed graphs processing with Pregel

Graph processing is an important part of data analysis in many domains. But it can be tricky, since general-purpose distributed computing tools are not well suited for it.

It is not surprising that an important advancement in distributed graph processing came from Google, which has to process one of the biggest graphs of all: the Web graph. Google engineers described their system for distributed graph processing, called Pregel, in a seminal paper.

In this article, I will explain how Pregel works and demonstrate how to implement Pregel algorithms with Apache Flink's API.

If you are not familiar with the graph API in Apache Flink, you can read about it in my previous article.

How Pregel works

The basic idea of Pregel is that we implement an algorithm that is executed on every vertex of a graph. The algorithm works in iterations: on every iteration it processes the messages that arrived at a vertex, and it can update the vertex's value and send messages to other vertices.

Pregel stops the execution when no vertex sends a message during an iteration.
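To make this model concrete, here is a deliberately simplified single-machine sketch of the Pregel loop, using the classic "propagate the maximum value" example. All names here are mine, and real Pregel of course runs this distributed across many machines:

```java
import java.util.*;

// Simplified Pregel sketch: each vertex processes its incoming messages,
// may update its value, and may send messages to its neighbours.
// Execution stops when an iteration produces no messages at all.
class PregelMaxValue {
    // graph: vertex id -> neighbour ids; values: vertex id -> current value
    static Map<Integer, Integer> run(Map<Integer, List<Integer>> graph,
                                     Map<Integer, Integer> values) {
        // First iteration: every vertex announces its value to its neighbours
        Map<Integer, List<Integer>> inbox = new HashMap<>();
        graph.forEach((v, neighbours) ->
            neighbours.forEach(n ->
                inbox.computeIfAbsent(n, k -> new ArrayList<>()).add(values.get(v))));

        while (!inbox.isEmpty()) {
            Map<Integer, List<Integer>> outbox = new HashMap<>();
            for (Map.Entry<Integer, List<Integer>> e : inbox.entrySet()) {
                int vertex = e.getKey();
                int max = Collections.max(e.getValue());
                // Update the vertex value and notify neighbours only on change
                if (max > values.get(vertex)) {
                    values.put(vertex, max);
                    for (int n : graph.get(vertex)) {
                        outbox.computeIfAbsent(n, k -> new ArrayList<>()).add(max);
                    }
                }
            }
            inbox = outbox; // empty outbox means no messages were sent: we stop
        }
        return values;
    }
}
```

When the loop exits, every vertex holds the maximum value in its connected component.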

To implement an algorithm using Pregel, we need to implement two functions:

  • Compute function – executed on every vertex once per iteration. It receives all messages sent to the vertex and can optionally send messages to other vertices or update the vertex's value. Messages sent by this function will be received on the next iteration.
  • Combiner function – receives all messages sent to a particular vertex and combines them into fewer messages. It is used to reduce the amount of data that has to be processed on every iteration.

Now let’s take a look at how we can implement algorithms in Pregel using Flink’s API.

Pregel algorithm in Flink

The main class that we need to extend is called ComputeFunction. Here are the methods of this class that we will use in this article:

public abstract class ComputeFunction<K, VV, EV, Message> implements Serializable {

    public abstract void compute(Vertex<K, VV> vertex, MessageIterator<Message> messages) throws Exception;

    public final Iterable<Edge<K, EV>> getEdges() {...}

    public final void sendMessageToAllNeighbors(Message m) {...}

    public final void sendMessageTo(K target, Message m) {...}

    public final int getSuperstepNumber() {...}

    public final void setNewVertexValue(VV newValue) {...}
}
The only method that we have to implement is the compute method. It receives the vertex on which it operates and an iterator over the messages that were sent to this vertex during the previous iteration.

The compute method does not return anything; it changes the state of the graph only through the other methods of the class. The three most important ones are sendMessageToAllNeighbors, which sends a message to all neighbour vertices; sendMessageTo, which sends a message to a single vertex; and setNewVertexValue, which updates the value of the current vertex. The getEdges method returns all outgoing edges of the current vertex.

The last method that we will cover here is getSuperstepNumber, which simply returns the number of the current Pregel iteration. We will use it later to send an initial batch of messages on the first iteration to kick off the processing.

Gelly also provides the MessageCombiner class, which we extend to implement a custom combiner. Here we only need to implement the combineMessages method, which receives all messages sent to a particular target vertex. To define which messages should actually be delivered, it calls sendCombinedMessage with the combined message for that vertex.

public abstract class MessageCombiner<K, Message> implements Serializable {

    public abstract void combineMessages(MessageIterator<Message> messages) throws Exception;

    public final void sendCombinedMessage(Message combinedMessage) {...}
}
Compute functions and message combiners are not supposed to be used directly by the users of our algorithm. Instead, in Flink we implement the GraphAlgorithm interface, which receives a Graph instance, processes it, and returns the algorithm's output.

public interface GraphAlgorithm<K, VV, EV, T> {

    T run(Graph<K, VV, EV> input) throws Exception;
}
This indirection exists because Pregel-based algorithms usually need some pre- and post-processing of the graph, and this logic is typically placed in the GraphAlgorithm implementation.

Implementing the Single Source Shortest Path algorithm

Now we will apply this knowledge to implement the Single Source Shortest Path algorithm using Pregel. The algorithm receives a source vertex and calculates the lengths of the shortest paths from it to all other vertices in the graph.

Algorithm design

Before we start implementing the algorithm, let's discuss how it is going to work. Say we have a graph in the following intermediate state (every vertex stores the length of the shortest path to it found so far, and every edge has a length associated with it):

[Figure: Initial graph state]

On every iteration a vertex receives messages from other vertices, each containing a new candidate shortest path to it:

[Figure: Processing input messages]

The vertex finds the minimum of the incoming values on this iteration, and if it is lower than the previous shortest path, it sends messages to its neighbours containing new candidate shortest paths to them:

[Figure: Sending output messages]

In a nutshell, the algorithm sends messages whenever it discovers a shorter path to one of the vertices. When all shortest paths have been calculated, no more messages are sent, and the algorithm terminates.

Algorithm implementation

Let's start with the main part of the algorithm: the implementation of the ComputeFunction. It closely matches the general structure presented in the previous section. On every iteration the function receives messages from the vertex's neighbours containing new candidate distances and computes their minimum. If this minimum is less than the distance recorded so far, the function updates the shortest path using setNewVertexValue and sends the new minimum distance to all neighbours.

class ShortestPathComputeFunction<K> extends ComputeFunction<K, Double, Double, NewMinDistance> {

    private final K sourceVertex;

    public ShortestPathComputeFunction(K sourceVertex) {
        this.sourceVertex = sourceVertex;
    }

    public void compute(Vertex<K, Double> vertex, MessageIterator<NewMinDistance> messageIterator) throws Exception {
        // Send the initial group of messages from the source vertex
        if (vertex.getId().equals(sourceVertex) && getSuperstepNumber() == 1) {
            sendNewDistanceToAll(0d);
            return;
        }

        // Calculate the new min distance from the source vertex
        double minDistance = minDistance(messageIterator);

        // Update the vertex and notify neighbours if the new min distance is less
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            sendNewDistanceToAll(minDistance);
        }
    }

    private double minDistance(MessageIterator<NewMinDistance> messageIterator) {
        double minDistance = Double.MAX_VALUE;
        for (NewMinDistance message : messageIterator) {
            minDistance = Math.min(message.getDistance(), minDistance);
        }
        return minDistance;
    }

    private void sendNewDistanceToAll(double newDistance) {
        for (Edge<K, Double> edge : getEdges()) {
            sendMessageTo(edge.getTarget(), new NewMinDistance(edge.getValue() + newDistance));
        }
    }
}
To transfer distance values between vertices we use a custom NewMinDistance class, a simple POJO with a single distance field.

class NewMinDistance {

    private final double distance;

    public NewMinDistance(double distance) {
        this.distance = distance;
    }

    public double getDistance() {
        return distance;
    }
}

All that is left is to implement a combiner that reduces the number of outgoing messages from every vertex. During one iteration the ShortestPathComputeFunction may recompute the shortest path for a single vertex multiple times, and this generates multiple messages to the same neighbour.

To optimize this somewhat, we can provide a combiner that finds the minimum distance among all messages destined for a vertex and combines them into a single message:

class ShortestPathCombiner<K> extends MessageCombiner<K, NewMinDistance> {

    public void combineMessages(MessageIterator<NewMinDistance> messageIterator) throws Exception {
        double minDistance = Double.MAX_VALUE;
        for (NewMinDistance message : messageIterator) {
            minDistance = Math.min(message.getDistance(), minDistance);
        }

        sendCombinedMessage(new NewMinDistance(minDistance));
    }
}

This is all we need to implement an algorithm with the Pregel framework. To make the algorithm more convenient to use, though, we wrap it into an implementation of the GraphAlgorithm interface.

class ShortestPath<K, VV> implements GraphAlgorithm<K, VV, Double, DataSet<Vertex<K, Double>>> {

    private final K sourceVertex;
    private final int maxIterations;

    public ShortestPath(K sourceVertex, int maxIterations) {
        this.sourceVertex = sourceVertex;
        this.maxIterations = maxIterations;
    }

    public DataSet<Vertex<K, Double>> run(Graph<K, VV, Double> graph) throws Exception {
        Graph<K, Double, Double> resultGraph = graph
            .mapVertices(new ShortestPathInit<>(sourceVertex))
            .runVertexCentricIteration(new ShortestPathComputeFunction<>(sourceVertex),
                                       new ShortestPathCombiner<>(),
                                       maxIterations);
        return resultGraph.getVertices();
    }
}

First we set the initial path lengths for all vertices in the graph, which is done with the mapVertices method. The runVertexCentricIteration call then runs our algorithm on the input graph and produces a graph whose vertex values contain the shortest path lengths from the sourceVertex. All that remains is to return the dataset of vertices, which contains vertex ids and path lengths.

Initializing the state of the graph is straightforward: for the source vertex we set the value to 0, and for every other vertex we set the maximum double value:

private static class ShortestPathInit<K, VV> implements MapFunction<Vertex<K, VV>, Double> {

    private final K sourceVertex;

    public ShortestPathInit(K sourceVertex) {
        this.sourceVertex = sourceVertex;
    }

    public Double map(Vertex<K, VV> vertex) throws Exception {
        if (vertex.getId().equals(sourceVertex)) {
            return 0d;
        }
        return Double.MAX_VALUE;
    }
}

That's it! Now we can apply the algorithm to a Graph instance by calling its run method and passing in an instance of our algorithm:

Graph<Integer, String, Double> graph = ...

int sourceVertex = 1;
int maxIterations = 10;

graph.run(new ShortestPath<>(sourceVertex, maxIterations)).print();

You can find the full source code for this article in my GitHub repository, along with other Flink examples.

Declutter Your POJOs with Lombok


I have a love/hate relationship with Java.
On one hand, it's a mature programming language with a wide range of frameworks and libraries that make development relatively easy.
On the other hand, it’s very verbose and requires writing massive amounts of boilerplate code for common tasks.
The situation got better with the introduction of lambdas and streams in Java 8, but it is still sub-par in some areas, like writing plain old Java objects (POJOs).
In this post, I’ll show you how to rewrite POJOs in only a few lines of code with Lombok.

Continue reading

Graphs processing with Apache Flink

Graphs are everywhere. The Internet, maps, and social networks, to name just a few, are all examples of massive graphs that contain vast amounts of useful information. Since these networks keep growing and processing them becomes more and more common, we need better tools for the job.

In this article, I'll describe how we can use the Flink Gelly library to process large graphs, and I'll provide a simple example: finding the shortest path between two users in the Twitter graph.

Continue reading

Implementing Flink batch data connector

Apache Flink has a versatile set of connectors for external data sources. It can read and write data from databases and from local and distributed file systems. However, sometimes what Flink provides out of the box is not enough, and we need to read an uncommon data format.

In this article, I will show you how to implement a custom connector for reading a dataset in Flink.

Continue reading

Using Apache Flink with Java 8

JDK 8 introduced a number of long-anticipated features to the Java language. The most notable among them is lambda functions, which enabled new frameworks such as Java 8 Streams, as well as new features in existing frameworks like JUnit 5.

Apache Flink also supports lambda functions, and in this post, I’ll show how to enable them and how to use them in your applications.

Continue reading

Calculating movies ratings distribution with Apache Flink

If you've been following recent news in the Big Data world, you've probably heard about Apache Flink. This platform for batch and stream processing, built on a few significant technical innovations, could become a real game changer, and it is starting to compete with established products like Apache Spark.

In this post, I would like to show how to implement a simple batch processing algorithm with Apache Flink. We will work with a dataset of movie ratings and produce a distribution of user ratings. Along the way, I'll show a few tricks that you can use to improve the performance of your Flink applications.
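To give a flavour of the core computation (sketched here with plain Java streams rather than Flink's DataSet API, so the names are mine, not from the post): a ratings distribution is essentially a group-by-and-count over the rating values.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Count how many ratings fall into each score bucket,
// keeping the buckets sorted by score.
class RatingsDistribution {
    static Map<Double, Long> distribution(Stream<Double> ratings) {
        return ratings.collect(
            Collectors.groupingBy(r -> r, TreeMap::new, Collectors.counting()));
    }
}
```

In Flink the same shape of computation would use groupBy on a DataSet followed by an aggregation, running in parallel over a much larger input.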

Continue reading


© 2017 Brewing Codes
