If you know anything about Apache Flink, you are probably familiar with how to send data to it and how to get results back. But in some cases, we need to send configuration data to the Flink cluster and receive some additional data from it.

In the first part of the article, I’ll describe how to send configuration data to our Flink cluster. There are many things that we want to configure: function parameters, configuration files, machine learning models. Flink provides several different ways to do this, and we will cover how to use them and when to use each one. In the second part of the article, I will describe a non-trivial way of sending data back from a Flink cluster.

This article requires some basic knowledge of Apache Flink. If you are not familiar with it, you can read some of my other articles on the topic: here, here, and here.

Sending data to task managers

Before we dig into the details of how to send data between different components in Apache Flink, let’s first talk about what components there are in a Flink cluster and what we are trying to achieve. The following diagram presents the main moving parts of Flink and how they interact:

When we need to execute a Flink application, we interact with a job manager that stores details about the job it is running, such as an execution graph. It controls task managers, and each task manager holds a portion of the data and executes the data processing functions that we’ve defined.

In many cases, we would like to configure the behavior of our functions that run in the Flink cluster. Depending on the use case, we may need to set a single variable or submit a file with a static configuration, and we will discuss how Flink supports these and other cases.

In addition to sending configuration data to task managers, we may sometimes want to return data from our functions alongside their regular outputs.

Configuring user-defined functions

Let’s say we have an application that reads a list of movies from a CSV file and needs to filter all movies of a particular genre:

// Read a dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
        .ignoreFirstLine()
        .parseQuotedStrings('"')
        .ignoreInvalidLines()
        .types(Long.class, String.class, String.class);

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    // Genres for a movie separated by the "|" symbol
    String[] genres = movie.f2.split("\\|");

    // Find all movies that have the "Action" genre
    return Stream.of(genres).anyMatch(g -> g.equals("Action"));
}).print();

It is very likely that we would like to extract movies of a different genre, and to do this we need to be able to configure our filter function. When you implement a function like this, the most straightforward way to configure it is to implement a constructor:

// Pass a genre name
lines.filter(new FilterGenre("Action"))
    .print();

...

class FilterGenre implements FilterFunction<Tuple3<Long, String, String>> {

    String genre;
    // Initialize filter function
    public FilterGenre(String genre) {
        this.genre = genre;
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

Alternatively, if you are using lambda functions, you can simply use a variable captured from the enclosing scope:

final String genre = "Action";

lines.filter((FilterFunction<Tuple3<Long, String, String>>) movie -> {
    String[] genres = movie.f2.split("\\|");

    // Using variable
    return Stream.of(genres).anyMatch(g -> g.equals(genre));
}).print();

Flink will serialize this variable and send it with the function to the cluster.
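To make the serialization step more concrete, here is a minimal plain-Java sketch that does not use Flink at all. It assumes that the function travels as a serialized Java object, and it imitates the trip to the cluster by writing a lambda to a byte array and reading it back. The `SerializableFilter` interface and the `ClosureShippingSketch` class are hypothetical names that I made up for this illustration; they are not part of Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

class ClosureShippingSketch {

    // Hypothetical stand-in for a serializable filter interface (not a Flink type)
    interface SerializableFilter extends Serializable {
        boolean filter(String genresField);
    }

    // Builds a filter whose lambda captures the "genre" variable
    static SerializableFilter genreFilter(final String genre) {
        return genresField -> Arrays.asList(genresField.split("\\|")).contains(genre);
    }

    // Serializes the function to bytes and reads it back,
    // imitating what happens when a function is shipped over the network
    static SerializableFilter roundTrip(SerializableFilter filter) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(filter);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableFilter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not serialize the function", e);
        }
    }

    public static void main(String[] args) {
        // The captured value "Action" survives the round trip together with the lambda
        SerializableFilter shipped = roundTrip(genreFilter("Action"));
        System.out.println(shipped.filter("Action|Comedy")); // prints true
        System.out.println(shipped.filter("Drama"));         // prints false
    }
}
```

The practical consequence is that everything a function captures, whether through a constructor field or a closure, has to be serializable itself. A captured database connection or an open file handle would fail at the `writeObject` step.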

These approaches can get annoying if you need to pass a lot of variables to your function. To help with that, Apache Flink provides the withParameters method. To use it, you need to implement a Rich version of the function you are interested in, so instead of implementing the MapFunction interface, you will have to extend the RichMapFunction class.

Rich functions allow you to pass a number of parameters using the withParameters method:

```java
// Class in Flink to store parameters
Configuration configuration = new Configuration();
configuration.setString("genre", "Action");

lines.filter(new FilterGenreWithParameters())
        // Pass parameters to a function
        .withParameters(configuration)
        .print();
```
 
To read these parameters, we need to override the `open` method and read them there:

 ```java
class FilterGenreWithParameters extends RichFilterFunction<Tuple3<Long, String, String>> {

    String genre;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read the parameter
        genre = parameters.getString("genre", "");
    }

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}
 ```
 
All these options will work, but they can be tedious if you need to set the same parameter for multiple functions. To handle this, Flink allows setting global job parameters that are accessible from the functions running on all task managers.

To do this, you first need to read the command-line arguments using `ParameterTool.fromArgs`:

```java
public static void main(String... args) {
    // Read command line arguments
    ParameterTool parameterTool = ParameterTool.fromArgs(args);
    ...
}
```

and then set global job parameters using the setGlobalJobParameters:

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
...

// This function will be able to read these global parameters
lines.filter(new FilterGenreWithGlobalEnv())
                .print();

Now we can implement a function that will read these parameters. As before, it should be a rich function:

class FilterGenreWithGlobalEnv extends RichFilterFunction<Tuple3<Long, String, String>> {

    @Override
    public boolean filter(Tuple3<Long, String, String> movie) throws Exception {
        String[] genres = movie.f2.split("\\|");
        // Get global parameters
        ParameterTool parameterTool = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        // Read parameter
        String genre = parameterTool.get("genre");

        return Stream.of(genres).anyMatch(g -> g.equals(genre));
    }
}

To read a parameter, we need to call the getGlobalJobParameters method to get all global parameters and then use the get method to read the parameter we are interested in.

Broadcast variables

The methods that we’ve discussed so far will suit you if you want to send data from a client to task managers, but what if the data already exists in the cluster in the form of a dataset? In this case, it’s better to use another Flink feature called broadcast variables. It simply allows sending a dataset to the task managers that will execute your functions.

Let’s say we have a dataset that contains words that we should ignore when we do text processing, and we want to pass it to our function. To set a broadcast variable for a single function, we need to use the withBroadcastSet method and pass a dataset to it.

// Get a dataset with words to ignore
DataSet<String> wordsToIgnore = ...

data.flatMap(new RichFlatMapFunction<String, String>() {

    // A collection to store words. This will be stored in memory
    // of a task manager
    Collection<String> wordsToIgnore;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read a collection of words to ignore
        wordsToIgnore = getRuntimeContext().getBroadcastVariable("wordsToIgnore");
    }

    @Override
    public void flatMap(String line, Collector<String> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            // Emit only the words that are not in the collection of words to ignore
            if (!wordsToIgnore.contains(word)) {
                out.collect(word);
            }
        }
    }
    // Pass a dataset via a broadcast variable
}).withBroadcastSet(wordsToIgnore, "wordsToIgnore");

You should keep in mind that if you use broadcast variables, a dataset will be stored in a task manager’s memory, so you should only use it for small datasets.
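Since the broadcast dataset arrives as a list, every `contains` call on it scans the whole list. One possible refinement, which is my suggestion rather than something Flink requires, is to copy the words into a `HashSet` once in `open` and use the set for lookups. The sketch below shows only that per-line logic in plain Java so it can be run without a cluster; `IgnoreWordsSketch`, `toLookupSet`, and `keepWords` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class IgnoreWordsSketch {

    // Copy the broadcast collection into a HashSet once (this would happen in open())
    static Set<String> toLookupSet(Collection<String> broadcastWords) {
        return new HashSet<>(broadcastWords);
    }

    // Per-line logic (this would happen in flatMap()):
    // split the line and keep only the words that are not ignored
    static List<String> keepWords(String line, Set<String> wordsToIgnore) {
        List<String> kept = new ArrayList<>();
        for (String word : line.split("\\W+")) {
            // split() can produce an empty first element if the line starts with a separator
            if (!word.isEmpty() && !wordsToIgnore.contains(word)) {
                kept.add(word);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> ignore = toLookupSet(Arrays.asList("the", "a", "of"));
        System.out.println(keepWords("the art of a deal", ignore)); // prints [art, deal]
    }
}
```

The set still lives in the task manager’s memory, so the advice about keeping broadcast datasets small applies here as well.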

If you want to send more data to each task manager and do not want to store this data in memory, you can send a static file to task managers using Flink’s distributed cache. To use it, you first need to store the file in a distributed file system such as HDFS and then register it in the cache:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Register a file from HDFS
env.registerCachedFile("hdfs:///path/to/file", "machineLearningModel");

...

env.execute();

To access the distributed cache, we again need to implement a rich function:

class MyClassifier extends RichMapFunction<String, Integer> {

    @Override
    public void open(Configuration config) {
      File machineLearningModel = getRuntimeContext().getDistributedCache().getFile("machineLearningModel");
      ...
    }

    @Override
    public Integer map(String value) throws Exception {
      ...
    }
}

Notice that to access a file in the distributed cache we need to use the same key that we used to register it.

Accumulators

We’ve covered how we can send data to task managers, but now let’s talk about how we can send data back from them. You may wonder why we need to do anything special. After all, Apache Flink is all about building data processing pipelines that read input data, process it, and return a result.

To clarify what else we might want, let’s take a look at an example. Let’s say we need to count how many times each word occurs in a text and, at the same time, count how many lines the text has:

// Text to process
DataSet<String> lines = ...

// Word count algorithm
lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
    }
})
.groupBy(0)
.sum(1)
.print();

// Count the number of lines in the text to process
long linesCount = lines.count();
System.out.println(linesCount);

The problem is that if we run this application as it is, it will run two Flink jobs: the first to get the word count and the second to count the number of lines.

This is definitely inefficient, but how can we avoid it? One way is to use accumulators. They allow you to send data from task managers and have it aggregated using a predefined function. Flink has the following built-in accumulators:

  • IntCounter, LongCounter, DoubleCounter – sum int, long, and double values sent from task managers
  • AverageAccumulator – calculates an average of double values
  • LongMaximum, LongMinimum, IntMaximum, IntMinimum, DoubleMaximum, DoubleMinimum – accumulators to determine maximum and minimum values for different types
  • Histogram – computes the distribution of values sent from task managers

To use an accumulator, we need to create it, register it in a user-defined function, and then read the result on the client. Here is how we can do this:

lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Integer>>() {

    // Create an accumulator
    private IntCounter linesNum = new IntCounter();

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register accumulator
        getRuntimeContext().addAccumulator("linesNum", linesNum);
    }

    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
        String[] words = line.split("\\W+");
        for (String word : words) {
            out.collect(new Tuple2<>(word, 1));
        }
        
        // Increment after each line is processed
        linesNum.add(1);
    }
})
.groupBy(0)
.sum(1)
.print();

// Get accumulator result
int linesNum = env.getLastJobExecutionResult().getAccumulatorResult("linesNum");
System.out.println(linesNum);

This allows us to count how many times each word occurs in the input text and how many lines it has, all in a single job.

If you need a custom accumulator, you can implement your own using the Accumulator or SimpleAccumulator interfaces.
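To give a rough idea of what a custom accumulator involves, here is a plain-Java sketch that runs without Flink. The `SimpleAccumulatorLike` interface is a local stand-in that I declared only for this example; its method names are modeled on my understanding of Flink’s accumulator contract (add a value, expose the local value, reset, merge another instance), so check the Flink Javadoc for the exact signatures before implementing the real interface. The `LongestLine` accumulator and the `longestLine` helper are hypothetical as well.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

class LongestLineSketch {

    // Local stand-in declared only for this sketch. It is NOT Flink's interface;
    // the method names are an assumption modeled on Flink's accumulator contract.
    interface SimpleAccumulatorLike<T extends Serializable> extends Serializable {
        void add(T value);
        T getLocalValue();
        void resetLocal();
        void merge(SimpleAccumulatorLike<T> other);
    }

    // Hypothetical custom accumulator: remembers the longest line seen so far
    static class LongestLine implements SimpleAccumulatorLike<String> {
        private String longest = "";

        @Override
        public void add(String line) {
            if (line.length() > longest.length()) {
                longest = line;
            }
        }

        @Override
        public String getLocalValue() {
            return longest;
        }

        @Override
        public void resetLocal() {
            longest = "";
        }

        @Override
        public void merge(SimpleAccumulatorLike<String> other) {
            // Merging a partial result is the same as adding its local value
            add(other.getLocalValue());
        }
    }

    // Simulates parallel tasks: each partition fills its own accumulator,
    // then the partial results are merged into one final value
    static String longestLine(List<List<String>> partitions) {
        LongestLine total = new LongestLine();
        for (List<String> partition : partitions) {
            LongestLine local = new LongestLine();
            for (String line : partition) {
                local.add(line);
            }
            total.merge(local);
        }
        return total.getLocalValue();
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a bb", "ccc ddd eee"),
                Arrays.asList("short", "x"));
        System.out.println(longestLine(partitions)); // prints ccc ddd eee
    }
}
```

The important design point is the merge step: each parallel task only ever sees its own portion of the data, so the final result must be computable by combining partial results in any order.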

More information

I hope you liked this article and found it useful. You can find the source code for this article in my git repository with other Apache Flink examples.

I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course where I cover Apache Flink in more detail: Understanding Apache Flink. Here is a short preview of this course.

Posted by Ivan Mushketyk

Principal Software engineer and life-long learner.
Creating courses for Pluralsight.
Writing for DZone, SitePoint, and SimpleProgrammer.