Getting started with batch processing using Apache Flink

If you’ve been following software development news recently, you have probably heard about the new project called Apache Flink. I’ve already written about it a bit here and here, but if you are not familiar with it, Apache Flink is a new-generation Big Data processing tool that can process either finite sets of data (also called batch processing) or potentially infinite streams of data (stream processing). In terms of new features, many believe Apache Flink is a game changer and can even replace Apache Spark in the future.

In this article, I’ll introduce you to how you can use Apache Flink to implement simple batch processing algorithms. We will start with setting up our development environment, and then we will see how we can load data, process a dataset, and write data back to an external system.

Why batch processing?

You might have heard that stream processing is “the new hot thing right now” and that Apache Flink is a tool for stream processing. This raises a question: why do we need to learn how to implement batch processing applications?

While it is true that stream processing is becoming more and more widespread, many tasks still require batch processing. Also, if you are just getting started with Apache Flink, in my opinion it is better to start with batch processing, since it is simpler and in a way resembles working with a database. Once you’ve covered batch processing, you can learn about stream processing, where Apache Flink really shines!

How to follow the examples

If you want to implement some Apache Flink applications yourself, first you need to create a Flink project. In this article, we are going to write applications in Java, but you can also write Flink applications in Scala or Python.

To create a Flink Java project execute the following command:

 mvn archetype:generate                              \
      -DarchetypeGroupId=org.apache.flink            \
      -DarchetypeArtifactId=flink-quickstart-java    \
      -DarchetypeVersion=1.3.2

After you enter a group id, an artifact id, and a project version, this command will create the following project structure:

.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── flinkProject
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties

The most important file here is the massive pom.xml, which specifies all the necessary dependencies. The automatically created Java classes are examples of simple Flink applications that you can take a look at, but we don’t need them for our purposes.

To start developing your first Flink application create a class with the main method like this:

import org.apache.flink.api.java.ExecutionEnvironment;

public class FilterMovies {

    public static void main(String[] args) throws Exception {
        // Create Flink execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        // We will write our code here
        
        // Start Flink application
        env.execute();
    }
}

There is nothing special about this main method. All we have to do is to add some boilerplate code.

First, we need to create a Flink execution environment, which will behave differently depending on whether you run it on a local machine or in a Flink cluster:

  • On a local machine, it will start a local Flink runtime embedded in your application's JVM, which runs tasks in parallel much as a cluster would. This is a good way to test how your application will work in a realistic environment
  • On a Flink cluster, it won’t create anything but will use existing cluster resources instead

Alternatively, you could create a collection environment like this:

    ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();

This will create a Flink execution environment that, instead of running the Flink application on a local cluster, will emulate all operations using in-memory collections in a single Java process. Your application will run faster, but this environment has some subtle differences from a local cluster with multiple nodes.

Where do we start?

Before we can do anything, we need to read data into Apache Flink. We can read data from numerous systems including: local filesystem, S3, HDFS, HBase, Cassandra, etc. No matter where we read a dataset from, Apache Flink allows us to work with data in a uniform way using the DataSet class:

DataSet<Integer> numbers = ...

All items in a dataset should have the same type. The single generic parameter specifies the type of the data that is stored in the dataset.

To read data from a file, we can use the readTextFile method, which reads a file line by line and returns a dataset of type String:

DataSet<String> lines = env.readTextFile("path/to/file.txt");

If you specify a file path like this, Flink will attempt to read a local file. If you want to read a file from HDFS you need to specify the hdfs:// protocol:

env.readTextFile("hdfs:///path/to/file.txt");

Flink also has support for CSV files, but in this case, it won’t return a dataset of strings. It will try to parse every line and return a dataset of Tuple instances:

DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
                .types(Long.class, String.class);

Tuple2 is a class that stores a pair of two fields, but there are other classes like Tuple0, Tuple1, Tuple3, up to Tuple25 that store from zero to twenty-five fields. Later we will see how to work with these classes.

The types method specifies the types and the number of columns in a CSV file, so that Flink can read and parse them.

We can also create small datasets that are very good for small experiments and unit tests:

// Create from a list
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));
// Create from individual elements
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

A question that you may ask is what data we can store in a DataSet? Not every Java type can be used in a dataset, and there are four different categories of types that you can use:

  • Built-in Java types and POJO classes
  • Flink Tuples and Scala case classes
  • Values – these are special mutable wrappers for Java primitive types that you can use to increase performance (I’ll write about this in one of the upcoming articles)
  • Implementations of Hadoop Writable interface

Processing data with Apache Flink

Now to the data processing part! How do you implement an algorithm for processing your data? To do this, you can use a number of operations that resemble Java 8 streams operations, such as:

  • map – converts items in a dataset using a user-defined function. Every input element is converted into exactly one output element
  • filter – filters items in a dataset according to a user-defined function
  • flatMap – similar to the map operator, but allows returning zero, one or many elements
  • groupBy – groups elements by a key. Similar to the GROUP BY operator in SQL
  • project – selects specified fields in a dataset of tuples, similar to the SELECT operator from SQL
  • reduce – combines elements in a dataset into a single value using a user-defined function

Keep in mind that the biggest difference between Java streams and these operations is that Java 8 streams work with data in memory and can access local data, while Flink works with data on a cluster in a distributed environment.
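To make the comparison concrete, here is a minimal sketch of the in-memory side only: plain java.util.stream counterparts of flatMap, groupBy and reduce. It does not use Flink at all, and the class name, method names and sample data are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain Java 8 streams, no Flink involved: everything runs in one JVM on local data.
class StreamAnalogues {

    // flatMap analogue: one input line can produce zero, one or many words
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // groupBy analogue: group equal words together and count each group
    static Map<String, Long> countWords(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    // reduce analogue: combine all elements into a single value
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = words(Arrays.asList("to be", "", "or not to be"));
        System.out.println(words);
        System.out.println(countWords(words));
        System.out.println(sum(Arrays.asList(1, 2, 3, 4)));
    }
}
```

The shape of the code is close to what you write with Flink; the difference is where it runs. Here the whole list lives in one process, while a Flink DataSet may be split across many machines.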

Let’s take a look at a simple example that uses these operations. The following example is very straightforward: it creates a dataset of numbers, squares every number, and filters out all odd results.

// Create a dataset of numbers
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);

// Square every number
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer integer) throws Exception {
        return integer * integer;
    }
})
// Leave only even numbers
.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
});

If you have any experience with Java 8, you are probably wondering why I don’t use lambdas here. We could use lambdas, but that can cause some complications, as I’ve written here.
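For reference, this is roughly what the same computation looks like on an in-memory list with Java 8 streams and lambdas. It is a plain Java sketch, not Flink code, and the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain Java 8 streams version of the pipeline above; no Flink involved.
class SquareAndFilterSketch {

    static List<Integer> evenSquares(List<Integer> numbers) {
        return numbers.stream()
                .map(n -> n * n)          // square every number
                .filter(n -> n % 2 == 0)  // leave only even numbers
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [4, 16, 36]
        System.out.println(evenSquares(Arrays.asList(1, 2, 3, 4, 5, 6, 7)));
    }
}
```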

Saving data back

After we’ve finished processing our data it would make sense to save the result of our hard work. Flink can store data into a number of third-party systems such as HDFS, S3, Cassandra, etc.

For example, to write data to a file, we need to use the writeAsText method from the DataSet class:

DataSet<Integer> ds = ...

ds.writeAsText("path/to/file");

For debugging and testing purposes, Flink can write data to standard output or to standard error:

DataSet<Integer> ds = ...

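// Output dataset to the standard output
// (print() and printToErr() trigger execution right away; no env.execute() call is needed for them)
ds.print();

// Output dataset to the standard error
ds.printToErr();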

More complicated example

To implement some meaningful algorithms, we first need to download the GroupLens movies dataset. It contains several CSV files with information about movies and movie ratings. We are going to work with the movies.csv file from this dataset, which contains a list of all movies and looks like this:

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller

It has three columns:

  • movieId – a unique movie id for a movie in this dataset
  • title – a title of the movie
  • genres – a “|” separated list of genres for each movie

We can now load this CSV file in Apache Flink and perform some meaningful processing. Here we will load a file from a local filesystem, while in a realistic environment you would read a much bigger dataset and it would probably reside in a distributed system, such as S3 or HDFS.

In this demo, let’s find all movies of the “Action” genre. Here is a code snippet that does this:

// Load dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
            .ignoreFirstLine()
            .parseQuotedStrings('"')
            .ignoreInvalidLines()
            .types(Long.class, String.class, String.class);


DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});
DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

filteredMovies.writeAsText("output.txt");
        

Let’s break it down. First, we read a CSV file using the readCsvFile method:

DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    // ignore CSV header
    .ignoreFirstLine()
    // Set strings quotes character
    .parseQuotedStrings('"')
    // Ignore invalid lines in the CSV file
    .ignoreInvalidLines()
    // Specify types of columns in the CSV file
    .types(Long.class, String.class, String.class);

Using helper methods, we specify how to parse strings in the CSV file and that we need to skip the first line. In the last line, we specify the type of each column in the CSV file, and Flink will parse the data for us.
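Why set a quote character at all? In a CSV file, a title that contains a comma has to be wrapped in quotes, otherwise the comma would be read as a column separator. The sketch below is plain Java rather than Flink's actual parser, and the class name, method name and sample line are only for illustration. It shows the idea of splitting on commas that sit outside quotes:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: a simplified quote-aware splitter, not Flink's CSV parser.
class QuotedCsvSketch {

    // Splits a line on commas, except for commas that appear between quote characters.
    static List<String> splitLine(String line, char quote) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean insideQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == quote) {
                // Toggle quoted mode; the quote character itself is dropped
                insideQuotes = !insideQuotes;
            } else if (c == ',' && !insideQuotes) {
                // A comma outside quotes ends the current field
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        // The last field has no trailing comma
        fields.add(current.toString());
        return fields;
    }

    public static void main(String[] args) {
        // Sample line with a comma inside the quoted title
        String line = "11,\"American President, The (1995)\",Comedy|Drama|Romance";
        System.out.println(splitLine(line, '"'));
    }
}
```

Without the quote handling, the sample line would be split into four columns instead of three, which is the kind of line that ignoreInvalidLines would then have to skip.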

Now that we have a dataset loaded in a Flink cluster, we can do some data processing. First, we parse a list of genres for every movie using the map method:

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long,String,String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});

To transform every movie we need to implement the MapFunction that will receive every CSV record as a Tuple3 instance and will convert it into the Movie POJO class:

class Movie {
    private String name;
    private Set<String> genres;

    public Movie(String name, Set<String> genres) {
        this.name = name;
        this.genres = genres;
    }

    public String getName() {
        return name;
    }

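    public Set<String> getGenres() {
        return genres;
    }

    // writeAsText calls toString() on each element, so return a readable form
    @Override
    public String toString() {
        return name + " " + genres;
    }
}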

If you recall the structure of the CSV file, the second column contains the name of a movie and the third column contains a list of genres. Hence we access these columns using fields f1 and f2 respectively.
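One detail in the map function is easy to miss: String.split takes a regular expression, and | is the alternation operator in a regex, which is why the code escapes it as "\\|". A small plain-Java sketch (the class and method names are only for illustration) shows the difference:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustration only: how the genres column is turned into a set of genre names.
class GenreSplitSketch {

    // Same conversion as in the map function above: split on a literal '|'
    static Set<String> parseGenres(String genresColumn) {
        return new HashSet<>(Arrays.asList(genresColumn.split("\\|")));
    }

    // An unescaped "|" is an empty alternation that matches between characters,
    // so the string is cut into single characters instead of genres
    static String[] naiveSplit(String genresColumn) {
        return genresColumn.split("|");
    }

    public static void main(String[] args) {
        System.out.println(parseGenres("Action|Crime|Thriller"));
        System.out.println(Arrays.toString(naiveSplit("Action|Crime")));
    }
}
```

Storing the genres in a Set also means the later check for “Action” is an exact match on a genre name, not a substring search.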

Now that we have a dataset of movies, we can implement the core part of our algorithm and filter all action movies:

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

This will only return movies that contain “Action” in the set of genres.

Now the last step is very straightforward; we store result data into a file:

filteredMovies.writeAsText("output.txt");

This simply stores the result data into a local text file, but as with the readTextFile method, we could write this file into HDFS or S3 by specifying a protocol like hdfs://.

More information

This was an introductory article, and there is much more to Apache Flink. I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course where I cover Apache Flink in more detail: Understanding Apache Flink.