Four ways to optimize your Flink applications

Flink is a complicated framework and provides many ways to tweak its execution. In this article, I’ll show four different ways to improve the performance of your Flink applications.

If you are not familiar with Flink, you can read other introductory articles like this, this, and this one. But if you are already familiar with Apache Flink, this article will help you make your applications a little bit faster.

Use Flink tuples

When you use operations like groupBy, join, or keyBy, Flink provides you with a number of options for selecting a key in your dataset. You can use a key selector function:

// Join movies and ratings datasets
movies.join(ratings)
        // Use movie id as a key in both cases
        .where(new KeySelector<Movie, String>() {
            @Override
            public String getKey(Movie m) throws Exception {
                return m.getId();
            }
        })
        .equalTo(new KeySelector<Rating, String>() {
            @Override
            public String getKey(Rating r) throws Exception {
                return r.getMovieId();
            }
        })

Or you can specify field names in POJO types:

movies.join(ratings)
    // Use same fields as in the previous example
    .where("id")
    .equalTo("movieId")

But if you are working with Flink tuple types, you can simply specify the position of the tuple field that will be used as the key:

DataSet<Tuple2<String, String>> movies = ...
DataSet<Tuple3<String, String, Double>> ratings = ...

movies.join(ratings)
    // Specify fields positions in tuples
    .where(0)
    .equalTo(1)

The last option gives you the best performance, but what about readability? Does this mean that your code will now look like this:

DataSet<Tuple3<Integer, String, Double>> result = movies.join(ratings)
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<Tuple2<Integer,String>, Tuple2<Integer,Double>, Tuple3<Integer, String, Double>>() {
        // What is happening here?
        @Override
        public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
            // Some tuples are joined with some other tuples and some fields are returned???
            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    });

A common idiom to improve readability in this case is to create a class that inherits from one of the TupleX classes and implements getters and setters for its fields. Here is how the Edge class from the Flink Gelly library, which has three fields, extends the Tuple3 class:

public class Edge<K, V> extends Tuple3<K, K, V> {

    public Edge(K source, K target, V value) {
        this.f0 = source;
        this.f1 = target;
        this.f2 = value;
    }
    
    // Getters and setters for readability
    public void setSource(K source) {
        this.f0 = source;
    }

    public K getSource() {
        return this.f0;
    }
    
    // Also has getters and setters for other fields
    ...
}
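
To see how this helps, here is a sketch of the movie/rating join rewritten with this idiom. The MovieTuple and RatingTuple classes below are hypothetical examples built the same way as Edge; they are not classes from any Flink library:

// Hypothetical tuple subclasses (each would live in its own file)
public class MovieTuple extends Tuple2<Integer, String> {
    public MovieTuple() {}

    public Integer getId() {
        return this.f0;
    }

    public String getTitle() {
        return this.f1;
    }
}

public class RatingTuple extends Tuple2<Integer, Double> {
    public RatingTuple() {}

    public Integer getMovieId() {
        return this.f0;
    }

    public Double getRating() {
        return this.f1;
    }
}

DataSet<MovieTuple> movies = ...
DataSet<RatingTuple> ratings = ...

movies.join(ratings)
    // Positional keys still give the best performance
    .where(0)
    .equalTo(0)
    .with(new JoinFunction<MovieTuple, RatingTuple, Tuple3<Integer, String, Double>>() {
        @Override
        public Tuple3<Integer, String, Double> join(MovieTuple movie, RatingTuple rating) throws Exception {
            // Field accesses are now self-explanatory
            return new Tuple3<>(movie.getId(), movie.getTitle(), rating.getRating());
        }
    });

The positional where(0)/equalTo(0) keys keep the performance benefit, while the join body becomes readable.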

Reuse Flink objects

Another option that you can use to improve the performance of your Flink application is to use mutable objects when you return data from a user-defined function. Take a look at this example:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            // A new Tuple instance is created on every execution
            collector.collect(new Tuple2<>(userName, changesCount));
        }
    });

As you can see, on every execution of the apply function we create a new instance of the Tuple2 class, which increases pressure on the garbage collector. One way to fix this problem is to reuse the same instance again and again:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create an instance that we will reuse on every call
        private Tuple2<String, Long> result = new Tuple2<>();
    
        @Override
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, Long>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Auto-boxing!! A new Long value may be created
            result.f1 = changesCount;
            
            // Reuse the same Tuple2 object
            collector.collect(result);
        }
    });

This is a bit better. We no longer create a new Tuple2 instance on every call, but we still, indirectly, create an instance of the Long class because of auto-boxing. To solve this problem, Flink has a number of so-called value classes: IntValue, LongValue, StringValue, FloatValue, etc. The point of these classes is to provide mutable versions of built-in types so that we can reuse them in our user-defined functions. Here is how we can use them:

stream
    .apply(new WindowFunction<WikipediaEditEvent, Tuple2<String, Long>, String, TimeWindow>() {
        // Create a mutable count instance
        private LongValue count = new LongValue();
        // Assign mutable count to the tuple
        private Tuple2<String, LongValue> result = new Tuple2<>("", count);
    
        @Override
        // Notice that now we have a different return type
        public void apply(String userName, TimeWindow timeWindow, Iterable<WikipediaEditEvent> iterable, Collector<Tuple2<String, LongValue>> collector) throws Exception {
            long changesCount = ...
            
            // Set fields on an existing object instead of creating a new one
            result.f0 = userName;
            // Update mutable count value
            count.setValue(changesCount);
            
            // Reuse the same tuple and the same LongValue instance
            collector.collect(result);
        }
    });

This idiom is commonly used in Flink libraries like Flink Gelly.

Use function annotations

One more way to optimize your Flink application is to provide some information about what your user-defined functions are doing with input data. Since Flink can't parse and understand the code of your user-defined functions, you can provide crucial information that helps it build a more efficient execution plan. There are three annotations that we can use:

  • @ForwardedFields – specifies what fields in an input value were left unchanged and are used in an output value.
  • @NonForwardedFields – specifies fields that were not preserved in the same positions in the output.
  • @ReadFields – specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.

Let’s take a look at how we can use the ForwardedFields annotation:

// Specify that the first element is copied without any changes
@ForwardedFields("0")
class MyFunction implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
       // Copy first field without change
        return new Tuple2<>(value.f0, value.f1 + 123);
    }
}

This means that the first element in an input tuple is not being changed and it is returned in the same position.

If you don’t change a field but simply move it into a different position, you can specify this with the ForwardedFields annotation as well. In the next example, we swap fields in an input tuple and inform Flink about it:

// 1st element goes into the 2nd position, and 2nd element goes into the 1st position
@ForwardedFields("0->1; 1->0")
class SwapArguments implements MapFunction<Tuple2<Long, Double>, Tuple2<Double, Long>> {
    @Override
    public Tuple2<Double, Long> map(Tuple2<Long, Double> value) {
       // Swap elements in a tuple
        return new Tuple2<>(value.f1, value.f0);
    }
}

The annotations mentioned above can only be applied to functions that have one input parameter, such as map or flatMap. If you have two input parameters, you can use the ForwardedFieldsFirst and the ForwardedFieldsSecond annotations that provide information about the first and the second parameters respectively.

Here is how we can use these annotations in an implementation of the JoinFunction interface:

// The first two fields of the first input tuple are copied to the first and second positions of the output tuple
@ForwardedFieldsFirst("0; 1")
// The second field of the second input tuple is copied to the third position of the output tuple
@ForwardedFieldsSecond("1->2")
class MyJoin implements JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, Double>, Tuple3<Integer, String, Double>> {
    @Override
    public Tuple3<Integer, String, Double> join(Tuple2<Integer, String> first, Tuple2<Integer, Double> second) throws Exception {
        return new Tuple3<>(first.f0, first.f1, second.f1);
    }
}

Flink also provides NonForwardedFieldsFirst, NonForwardedFieldsSecond, ReadFieldsFirst, and ReadFieldsSecond annotations for similar purposes.
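
For completeness, here is a rough sketch of how ReadFields could be combined with ForwardedFields; the function and the threshold are made up for illustration:

// Field 0 is forwarded unchanged; only field 1 is read to compute the result
@ForwardedFields("0")
@ReadFields("1")
class ScoreToFlag implements MapFunction<Tuple2<String, Double>, Tuple2<String, Boolean>> {
    @Override
    public Tuple2<String, Boolean> map(Tuple2<String, Double> value) {
        // Field 1 participates in the computation, field 0 is merely copied
        return new Tuple2<>(value.f0, value.f1 > 0.5);
    }
}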

Select join type

You can make your joins faster if you give Flink another hint, but before we discuss why it works, let’s talk about how Flink executes joins.

When Flink processes batch data, each machine in the cluster stores part of the data. To perform a join, Apache Flink needs to find all pairs of records from the two datasets that satisfy the join condition. To do this, Flink first has to put items from both datasets that have the same key on the same machine in the cluster. There are two strategies for this:

  • Repartition-Repartition strategy – in this case, both datasets are partitioned by their keys and sent across the network. This means that if the datasets are big, it may take a significant amount of time to copy them across the network.
  • Broadcast-Forward strategy – in this case, one dataset is left untouched, but the second dataset is copied to every machine in the cluster that has part of the first dataset.

If you are joining a small dataset with a much bigger one, you can use the Broadcast-Forward strategy and avoid the costly repartitioning of the bigger dataset. This is really easy to do:

ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

This hints that the first dataset is much smaller than the second one.

You can also use other join hints (a short usage sketch follows the list):

  • BROADCAST_HASH_SECOND – the second dataset is much smaller
  • REPARTITION_HASH_FIRST – the first dataset is a bit smaller
  • REPARTITION_HASH_SECOND – the second dataset is a bit smaller
  • REPARTITION_SORT_MERGE – repartition both datasets and use a sort-merge strategy
  • OPTIMIZER_CHOOSES – Flink optimizer will decide how to join datasets
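
For example, if both datasets are large, a sort-merge join can be requested like this (the dataset names below are just for illustration):

DataSet<Tuple2<Integer, String>> users = ...
DataSet<Tuple2<Integer, Long>> visits = ...

// Repartition both datasets by key, then sort and merge them
users.join(visits, JoinHint.REPARTITION_SORT_MERGE)
    .where(0)
    .equalTo(0);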

You can read more about how Flink performs joins in this article.

More information

I hope you liked this article and found it useful.

I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course where I cover Apache Flink in more detail: Understanding Apache Flink. Here is a short preview of this course.

Getting started with stream processing using Apache Flink

If in your mind “Apache Flink” and “stream processing” do not have a strong link, you probably have not been following the news recently. Apache Flink has taken the world of Big Data by storm. Now is a perfect time for a tool like this to thrive: stream processing is becoming more and more prevalent, and Apache Flink presents a number of important innovations.

In this article, I will show how to start writing stream processing algorithms using Apache Flink. We will read a stream of Wikipedia edits and see how we can get some meaningful data out of it. In the process, you will see how to read and write stream data, how to perform simple operations, and how to implement more complex algorithms.

Read More

Getting started with batch processing using Apache Flink

If you’ve been following software development news recently you probably heard about the new project called Apache Flink. I’ve already written about it a bit here and here, but if you are not familiar with it, Apache Flink is a new generation Big Data processing tool that can process either finite sets of data (this is also called batch processing) or potentially infinite streams of data (stream processing). In terms of new features, many believe Apache Flink is a game changer and can even replace Apache Spark in the future.

In this article, I’ll introduce you to how you can use Apache Flink to implement simple batch processing algorithms. We will start with setting up our development environment, and then we will see how we can load data, process a dataset, and write data back to an external system.

Read More

Apache Flink vs. Apache Spark

If you look at this image with a list of Big Data tools it may seem that all possible niches in this field are already occupied. With so much competition it should be very tough to come up with a groundbreaking technology.

The creators of Apache Flink have a different view. It started as a research project called Stratosphere. Stratosphere was forked, and this fork became what we know as Apache Flink. In 2014 it was accepted as an Apache Incubator project, and just a few months later it became a top-level Apache project. At the time of this writing, the project has almost twelve thousand commits and more than 300 contributors.

Why is there so much attention? This is because Apache Flink has been called a new generation Big Data processing framework and has enough innovations under its belt to replace Apache Spark and become the new de-facto tool for batch and stream processing.

Should you switch to Apache Flink? Should you stick with Apache Spark for a while? Or is Apache Flink just a new gimmick? This article will attempt to give you answers to these and other questions.

Read More

Java’s Synchronized Keyword in Three Minutes

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

The synchronized keyword can be used to ensure that only one thread at a time executes a particular section of code.
This is a simple way to prevent race conditions, which occur when several threads change shared data at the same time in a way that leads to incorrect results.
With synchronized either entire methods or selected blocks can be single-threaded.
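
A minimal illustration (this counter class is not from the article, just a textbook example):

public class SafeCounter {
    private int count = 0;

    // Only one thread at a time can execute this method on a given instance
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}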

This article requires basic knowledge of Java threads and race conditions.

Read More

The Dangers of Race Conditions in Five Minutes

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

A race condition is an undesired property of multithreaded code.
It expresses that the program’s outcome depends on a particular order of operations but that the underlying platform (in the case of Java, the JVM) does not guarantee that order.
As a consequence the outcome is often fluctuating across runs as it depends on how exactly operations from different threads interleave.
In Java, race conditions occur most often when multiple threads share and mutate the same object.
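
As a quick illustration (not taken from the article), the counter below can lose updates when two threads call increment() at the same time:

public class UnsafeCounter {
    private int count = 0;

    // count++ is really three steps (read, add, write),
    // so updates from two threads can interleave and one of them can be lost
    public void increment() {
        count++;
    }
}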

The only prerequisite for this article is a basic knowledge of threads.

Read More

Java Thread Class in Five Minutes

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

In a running Java program, all code is executed in threads and within a thread everything happens sequentially, one instruction after another.
When Java (or rather the JVM) launches, it creates one thread for the main method to be executed in.
From there, new threads can be created to execute code in parallel to the main one.
The most basic way to do that is to use the Thread class.
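
A minimal sketch of that basic approach:

public class ThreadExample {
    public static void main(String[] args) throws InterruptedException {
        // Create a new thread that runs in parallel to the main thread
        Thread worker = new Thread(() -> System.out.println("Hello from another thread"));
        worker.start();

        // Wait for the worker thread to finish
        worker.join();
    }
}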

This article does not require any knowledge of multithreaded programming, but you need to be familiar with core Java concepts such as classes and interfaces.

Read More

Beyond POJOs – Ten More Ways to Reduce Boilerplate with Lombok

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

Lombok is a great library and its main selling point is how it declutters POJO definitions.
But it is not limited to that use case!
In this article, I will show you six stable and four experimental Lombok features that can make your Java code even cleaner.
They cover many different topics, from logging to accessors and from null safety to utility classes.
But they all have one thing in common: reducing boilerplate to make code easier to read and more expressive.
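
To give a flavour of the kind of features meant here, a small sketch using @Slf4j for logging and @NonNull for null safety (the service class is made up):

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

// @Slf4j generates a static "log" field; @NonNull adds a null check for the parameter
@Slf4j
public class GreetingService {
    public void greet(@NonNull String name) {
        log.info("Greeting {}", name);
    }
}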

Read More

Distributed graphs processing with Pregel

Graph processing is an important part of data analysis in many domains. But graph processing may be tricky, since general-purpose distributed computing tools are not suited for it.

It is not surprising that an important advancement in the area of distributed graph processing came from Google, which has to process one of the biggest graphs of all: the Webgraph. Google engineers wrote a seminal paper describing a new system for distributed graph processing that they called Pregel.

In this article, I will explain how Pregel works and demonstrate how to implement Pregel algorithms using the Apache Flink API.

Read More

Declutter Your POJOs with Lombok

I wrote this article for SitePoint’s Java channel, where you can find a lot of interesting articles about our programming language. Check it out!

I have a love/hate relationship with Java.
On one hand, it’s a mature programming language with a diverse number of frameworks and libraries that make development relatively easy.
On the other hand, it’s very verbose and requires writing massive amounts of boilerplate code for common tasks.
The situation got better with the introduction of lambdas and streams in Java 8, but it is still sub-par in some areas, like writing plain old Java objects (POJOs).
In this post, I’ll show you how to rewrite POJOs in only a few lines of code with Lombok.
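
As a taste of what that looks like, a minimal sketch with Lombok's @Data (the Person class is just an example):

import lombok.Data;

// @Data generates getters, setters, equals, hashCode, and toString for all fields
@Data
public class Person {
    private String name;
    private int age;
}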

Read More
