# Graph processing with Apache Flink

Graphs are everywhere. The Internet, maps, and social networks, to name just a few, are all examples of massive graphs that contain vast amounts of useful information. As these networks grow and processing them becomes more and more common, we need better tools to do the job.

In this article, I’ll describe how we can use the Flink Gelly library to process large graphs, and I’ll provide a simple example of how to find the shortest path between two users in the Twitter graph.

# Introduction to Gelly

In a nutshell, Flink Gelly is a library for graph processing implemented on top of the batch processing API in Apache Flink. It allows us to process huge graphs in a distributed fashion using the Apache Flink API.

You may wonder why we need one more graph library. There are already dedicated graph processing systems (e.g. Apache Giraph) and general-purpose Big Data systems that can be used for some graph processing, so a new graph processing library may seem superfluous.

Gelly still has one important advantage over other systems. Since it is part of Apache Flink, it allows us to preprocess graph data, process graphs, and transform the resulting graphs using one system and one API. This is convenient from both a development and an operational standpoint, since we only have one API to learn and one system to operate.

# Graph intro

As you probably know, a graph is a set of vertices connected by edges. Flink represents a graph as two datasets: a dataset of vertices and a dataset of edges.

```
/**
 * @param <K> the key type for edge and vertex identifiers
 * @param <VV> the value type for vertices
 * @param <EV> the value type for edges
 */
public class Graph<K, VV, EV> {
    private final DataSet<Vertex<K, VV>> vertices;
    private final DataSet<Edge<K, EV>> edges;
    ...
}
```

The `Graph` class has three generic type arguments: the type of the keys that uniquely identify vertices, the type of the values associated with vertices, and the type of the values associated with edges. So the following `Graph` definition:

```
Graph<Long, String, Integer> graph = ...
```

represents a graph whose vertices have keys of type `Long` and values of type `String`, and whose edges have values of type `Integer`.

A `Vertex` in Gelly is essentially a tuple with two values: the ID of the vertex and an optional value associated with it. Similarly, an `Edge` is a tuple with three elements: the IDs of the source and target vertices and an optional value associated with the edge.

Since a `DataSet` is immutable, the `Graph` class is immutable as well, and all operations that change a graph create a new instance. Notice that a `Graph` in Gelly is always directed.
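To make the immutability and direction points concrete, here is a minimal sketch in plain Java. It is not Gelly code: `ImmutableGraphSketch` and `SimpleEdge` are hypothetical names I made up for illustration, and they only mimic the shape of Gelly's `Graph` and `Edge`. The things to notice are that `addEdge` returns a new instance and that an edge from 1 to 2 says nothing about an edge from 2 to 1:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a graph: an immutable, directed edge list. */
final class ImmutableGraphSketch {

    /** Mirrors the (source, target, value) shape of an edge tuple. */
    static final class SimpleEdge {
        final long source;
        final long target;
        final String value;

        SimpleEdge(long source, long target, String value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    private final List<SimpleEdge> edges;

    ImmutableGraphSketch(List<SimpleEdge> edges) {
        // Defensive copy, so later changes to the caller's list cannot leak in.
        this.edges = Collections.unmodifiableList(new ArrayList<>(edges));
    }

    /** Returns a NEW graph containing the extra edge; this instance is untouched. */
    ImmutableGraphSketch addEdge(SimpleEdge edge) {
        List<SimpleEdge> copy = new ArrayList<>(edges);
        copy.add(edge);
        return new ImmutableGraphSketch(copy);
    }

    /** Edges are directed: 1 -> 2 does not imply 2 -> 1. */
    boolean hasEdge(long source, long target) {
        for (SimpleEdge e : edges) {
            if (e.source == source && e.target == target) {
                return true;
            }
        }
        return false;
    }

    int edgeCount() {
        return edges.size();
    }
}
```

The practical consequence is the same for the real `Graph` class: if we ignore the value returned by a method that changes the graph, the change is lost.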

# Creating Graph

We can create a graph in one of three ways:

- Generate a graph using one of the existing graph generators
- Create a graph from one or two `DataSet` instances
- Read a graph from a CSV file

Let’s see how we can create a graph in Gelly in different ways.

## Graph generators

Flink supports a number of graph generators to generate star graphs, cycle graphs, complete graphs and so on. Here is an example of how to generate a complete graph where every vertex is connected to all other vertices:

```
Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
    .generate();
```

Graph generators are useful for testing and allow you to quickly create a graph for your experiments.

## Create a graph from datasets

The most common way to create a graph with Gelly is from one or several `DataSet` instances. They are usually read from an external system, such as a distributed filesystem or a database.

Gelly allows a lot of freedom here. The simplest way is to create a dataset of vertices and a dataset of edges and pass them to the `Graph.fromDataSet` method:

```
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Vertex<Long, NullValue>> vertices = ...
DataSet<Edge<Long, NullValue>> edges = ...
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
```

For testing purposes, we can use the `fromCollection` method, which creates small graphs from in-memory collections:

```
List<Vertex<Long, NullValue>> vertices = ...
List<Edge<Long, NullValue>> edges = ...
Graph<Long, NullValue, NullValue> graph = Graph.fromCollection(vertices, edges, env);
```

We can also create a graph using only a dataset of edges:

```
DataSet<Edge<Long, String>> edges = ...
Graph<Long, NullValue, String> graph = Graph.fromDataSet(edges, env);
```

In this case, Gelly extracts the vertex keys from the edges and assigns no values to the vertices.
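Conceptually, deriving vertices from edges just means collecting every ID that appears as a source or a target. The following plain-Java sketch shows the idea on an in-memory list. `VerticesFromEdges` is a hypothetical helper, not part of Gelly, and it says nothing about how Gelly does this internally:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: collects vertex IDs from a list of edges. */
final class VerticesFromEdges {

    /** Each edge is a {source, target} pair of vertex IDs. */
    static Set<Long> vertexIds(List<long[]> edges) {
        Set<Long> ids = new TreeSet<>(); // a set, so each ID is kept once
        for (long[] edge : edges) {
            ids.add(edge[0]); // source
            ids.add(edge[1]); // target
        }
        return ids;
    }
}
```

One consequence is worth keeping in mind: a vertex that has no edges at all cannot be recovered this way, so isolated vertices need an explicit vertex dataset.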

Gelly provides more methods to create a graph, with variations that accept datasets of `Tuple`s and collections. You can find them in the official documentation.

## Reading graph from CSV files

Of course, we can create a `DataSet` instance from a CSV file and then create a `Graph` instance out of it. But to simplify this common case, Gelly has special support for it: the `Graph.fromCsvReader` method reads vertex and edge data from two CSV files:

```
Graph<Long, Double, String> graph =
    Graph.fromCsvReader("vertices.csv", "edges.csv", env)
        .types(Long.class, Double.class, String.class);
```

The `types` method specifies the types of the keys and values in the graph.

As before, Gelly can create a graph from edges only:

```
Graph<String, NullValue, NullValue> simpleGraph =
    Graph.fromCsvReader("edges.csv", env)
        .keyType(String.class);
```

# Working with graphs

Once we have a graph, it would be handy to be able to process it in some way. Let’s briefly go through what we can do with Gelly graphs:

## Graph properties

These are the most straightforward methods. They let us query basic graph properties, such as the number of vertices, the number of edges, in-degrees, and so on:

```
Graph<Long, Double, String> graph = ...
// Get graph vertices
DataSet<Vertex<Long, Double>> vertices = graph.getVertices();
// Get graph edges
DataSet<Edge<Long, String>> edges = graph.getEdges();
// Get the IDs of the vertices as a DataSet
DataSet<Long> vertexIds = graph.getVertexIds();
// Get a DataSet of <vertex ID, in-degree> pairs for all vertices
DataSet<Tuple2<Long, LongValue>> inDegrees = graph.inDegrees();
// Get a DataSet of <vertex ID, out-degree> pairs for all vertices
DataSet<Tuple2<Long, LongValue>> outDegrees = graph.outDegrees();
```
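If in-degree and out-degree are new to you, this small plain-Java sketch shows what the `<vertex ID, degree>` pairs mean for a directed edge list. `DegreeSketch` is a hypothetical helper written for this post, not Gelly API. In this sketch every vertex that appears in an edge is listed, even when its degree is 0:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: counts incoming and outgoing edges per vertex. */
final class DegreeSketch {

    /** Each edge is a {source, target} pair; the in-degree counts edges pointing AT a vertex. */
    static Map<Long, Long> inDegrees(List<long[]> edges) {
        Map<Long, Long> degrees = new HashMap<>();
        for (long[] e : edges) {
            degrees.putIfAbsent(e[0], 0L);      // source is listed, possibly with 0
            degrees.merge(e[1], 1L, Long::sum); // target gains one incoming edge
        }
        return degrees;
    }

    /** The out-degree counts edges LEAVING a vertex. */
    static Map<Long, Long> outDegrees(List<long[]> edges) {
        Map<Long, Long> degrees = new HashMap<>();
        for (long[] e : edges) {
            degrees.merge(e[0], 1L, Long::sum); // source gains one outgoing edge
            degrees.putIfAbsent(e[1], 0L);      // target is listed, possibly with 0
        }
        return degrees;
    }
}
```

For the edges 1→2, 1→3 and 2→3, vertex 1 has in-degree 0 and out-degree 2, while vertex 3 has in-degree 2 and out-degree 0.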

## Graph Transformations

With these methods, we can update vertices or edges. The group includes methods like:

- **map** – change values associated with edges or vertices
- **filter** – leave only edges and vertices that match a predefined predicate
- **reverse** – creates a graph where the direction of edges is reversed
- **union** – merges two graphs together
- **difference** – leave only vertices and edges that do not exist in another graph

Here is an example of filtering edges that keeps only edges where source and target vertices are different:

```
Graph<Integer, NullValue, NullValue> graph = ...
Graph<Integer, NullValue, NullValue> filteredGraph = graph.filterOnEdges(
    new FilterFunction<Edge<Integer, NullValue>>() {
        @Override
        public boolean filter(Edge<Integer, NullValue> edge) throws Exception {
            // Keep only edges where source and target are different
            return !edge.getSource().equals(edge.getTarget());
        }
    });
```
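To see what two of these transformations do without setting up Flink, here is a plain-Java sketch of the self-loop filter and of reversing edges on an in-memory edge list. `TransformSketch` is a hypothetical helper, not Gelly code. Like the real transformations, both methods leave their input untouched and return a new list:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: filter and reverse on a list of {source, target} edges. */
final class TransformSketch {

    /** Keeps only edges whose source and target differ (drops self-loops). */
    static List<int[]> withoutSelfLoops(List<int[]> edges) {
        List<int[]> kept = new ArrayList<>();
        for (int[] e : edges) {
            if (e[0] != e[1]) {
                kept.add(e);
            }
        }
        return kept;
    }

    /** Swaps source and target of every edge. */
    static List<int[]> reversed(List<int[]> edges) {
        List<int[]> result = new ArrayList<>();
        for (int[] e : edges) {
            result.add(new int[] {e[1], e[0]});
        }
        return result;
    }
}
```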

## Graph Mutations

This group includes methods that add or remove edges and vertices. Because a graph is immutable, each of them returns a new `Graph` instance:

```
Graph<Integer, Double, String> graph = ...
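// Add an edge; addEdge takes the source and target vertices plus the edge value
// (the value 1.5 of vertex 2 is only illustrative)
graph = graph.addEdge(new Vertex<Integer, Double>(1, 4.2), new Vertex<Integer, Double>(2, 1.5), "1-2");
// Add a vertex to the graph
graph = graph.addVertex(new Vertex<Integer, Double>(1, 4.2));
// Remove an edge from the graph
graph = graph.removeEdge(new Edge<Integer, String>(1, 2, "1-2"));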
```

# Shortest Path in Twitter

Let’s get more practical. In the following example, I’ll show how to find the shortest path between two users in the Twitter social graph. To do this, I’ll use the `Graph` methods mentioned above and the `SingleSourceShortestPaths` algorithm, which calculates the path length from a source vertex to all other vertices in a graph.

Instead of reading data from Twitter, I use the Stanford Twitter Dataset. If you want to know more about the data format and how to read it, please refer to my previous post. All that is important to know here is that before processing, I load the data into a dataset of `TwitterFollower` objects:

```
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<TwitterFollower> twitterFollowers = env.createInput(new StanfordTweetsDataSetInputFormat("/Users/ivanmushketyk/Flink/twitter"));
```

Every `TwitterFollower` object contains a pair of Twitter users: a follower and the user they follow.

To process this data, we first need to convert it into a dataset of `Edge`s that we will use later to create a graph instance. To do this, we can use the `map` method:

```
DataSet<Edge<Integer, NullValue>> twitterEdges = twitterFollowers
    .map(new MapFunction<TwitterFollower, Edge<Integer, NullValue>>() {
        @Override
        public Edge<Integer, NullValue> map(TwitterFollower value) throws Exception {
            Edge<Integer, NullValue> edge = new Edge<>();
            edge.setSource(value.getFollower());
            edge.setTarget(value.getUser());
            // Set an explicit NullValue: Flink cannot serialize tuples with null fields
            edge.setValue(NullValue.getInstance());
            return edge;
        }
    });
Graph<Integer, NullValue, NullValue> followersGraph = Graph.fromDataSet(twitterEdges, env);
```

When we create a graph from a dataset of edges, Gelly populates the dataset of vertices using the keys specified in the input edges.

To calculate the shortest path, we will use the `SingleSourceShortestPaths` algorithm, which calculates the shortest path from a source vertex to all other vertices in the graph. The problem is that `SingleSourceShortestPaths` only works on a weighted graph, meaning that every edge must have a `Double` value associated with it. Since our graph so far has no values associated with edges, we need to add them first.

To do this, we can use the `mapEdges` method, which updates all edges in the graph. In this case, we simply set `1.0` as the weight of every edge:

```
Graph<Integer, NullValue, Double> weightedFollowersGraph = followersGraph.mapEdges(
    new MapFunction<Edge<Integer, NullValue>, Double>() {
        @Override
        public Double map(Edge<Integer, NullValue> edge) throws Exception {
            return 1.0;
        }
    });
```

Now that we have a weighted graph, we can use the `SingleSourceShortestPaths` algorithm.

The process is pretty straightforward. First, we need to initialize the algorithm with a source vertex and the maximum number of iterations the algorithm will perform before it returns a result:

```
// @fourzerotwo
int sourceVertex = 3359851;
int maxIterations = 10;
SingleSourceShortestPaths<Integer, NullValue> singleSourceShortestPaths = new SingleSourceShortestPaths<>(sourceVertex, maxIterations);
DataSet<Vertex<Integer, Double>> result = singleSourceShortestPaths.run(weightedFollowersGraph);
```

Once we have an instance of `SingleSourceShortestPaths`, we need to call the `run` method and pass the weighted graph to it. This method returns a dataset of vertices. The IDs of these vertices correspond to the vertex IDs in the original graph, while the values represent distances from the source vertex.

The `SingleSourceShortestPaths` algorithm calculates the shortest path to all vertices in the network. Just out of curiosity, we can display the path length from the source vertex to a random Twitter user:

```
// @soulpancake
int targetVertex = 19636959;
result.filter(vertex -> vertex.getId().equals(targetVertex))
.print();
```

Here is the output that I got:

```
(19636959,3.0)
```
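Because every edge has a weight of `1.0`, the distance reported above is simply the number of "follows" hops between the two users. On a small graph, such a result can be cross-checked with a plain breadth-first search. The sketch below is plain Java rather than Gelly code, `HopCountSketch` is a hypothetical helper, and it is only practical for graphs that fit in memory:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Hypothetical helper: breadth-first search over a directed edge list. */
final class HopCountSketch {

    /**
     * Returns the number of hops from the source to every reachable vertex.
     * Each edge is a {source, target} pair; unreachable vertices are absent from the result.
     */
    static Map<Integer, Integer> hopCounts(List<int[]> edges, int source) {
        // Build an adjacency list that follows the edge direction only.
        Map<Integer, List<Integer>> adjacency = new HashMap<>();
        for (int[] e : edges) {
            adjacency.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }

        Map<Integer, Integer> distance = new HashMap<>();
        Queue<Integer> queue = new ArrayDeque<>();
        distance.put(source, 0);
        queue.add(source);

        while (!queue.isEmpty()) {
            int current = queue.remove();
            List<Integer> neighbours = adjacency.getOrDefault(current, Collections.emptyList());
            for (int next : neighbours) {
                // The first visit is the shortest one, since all edges cost the same.
                if (!distance.containsKey(next)) {
                    distance.put(next, distance.get(current) + 1);
                    queue.add(next);
                }
            }
        }
        return distance;
    }
}
```

For the edges 1→2, 2→3, 3→4, 1→3 and 5→1 with source 1, vertex 4 is two hops away (via 3), and vertex 5 is not reachable because its only edge points towards 1.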

# Conclusions

Graph processing is ubiquitous and can be used in different domains. Gelly lets us use the power of the Flink API to process large-scale graphs. It provides a simple API to create and edit graphs and has a plethora of handy algorithms for different graph processing tasks.

You can find the full code of the example from this post in my Git repository with other Flink examples.

# More information

If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink

