Processing Big Data in Batch with Apache Flink – A Java Example

Apache Flink has emerged as one of the leading open source frameworks for processing large amounts of data efficiently. While it is best known for its powerful stream processing capabilities, Flink is also highly capable when it comes to batch processing of bounded datasets. In this article, we'll dive into what makes Flink great for batch workloads and walk through an end-to-end Java example of implementing a batch analytics pipeline.

What is Apache Flink?

Apache Flink is a distributed data processing engine for stateful computations over bounded and unbounded datasets. It provides a unified API for defining data-driven applications that can process static data in batch mode as well as real-time data in streaming mode.

Some of the key features and benefits of Flink include:

  • High performance and low latency with optimized execution
  • Support for event time and stateful stream processing
  • Exactly-once state consistency guarantees
  • Connectors for integration with many data sources and sinks
  • Flexible deployment modes including YARN, Kubernetes, and standalone clusters
  • High-level APIs in Java, Scala, Python, and SQL
  • Libraries for machine learning, graph processing, and complex event processing

Compared to earlier Big Data frameworks like Hadoop MapReduce, Flink offers much faster execution times thanks to its native memory management, built-in support for iterative algorithms, and optimized execution plans. It also provides a more complete platform for many types of data processing applications beyond the MapReduce model.

In the realm of batch processing, Flink is often compared to Apache Spark. Both are general-purpose distributed processing engines with APIs for Java and Scala. Advantages of Flink for certain use cases include its pipelined, streaming-style execution model, its managed memory and automatic memory management, and its exactly-once state consistency guarantees. Note that Flink's batch-oriented DataSet API, which this article uses, is a separate API from the DataStream API; in recent releases the project has been moving toward unified batch and stream processing on the DataStream and Table APIs, with the DataSet API treated as legacy.

Setting Up a Flink Batch Project

To get started with Flink batch processing in Java, you'll need a development environment with a few prerequisites:

  • Java 8 or 11
  • Apache Maven 3.x
  • An IDE like IntelliJ or Eclipse
  • A running Flink cluster or local Flink installation

Once you have Java and Maven installed, create a new Maven project and add the following Flink dependencies to your pom.xml file:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.16.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.16.0</version>
    </dependency>
</dependencies>
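
Alternatively, Flink ships a Maven quickstart archetype that generates a project skeleton with Flink dependencies and packaging already configured (the archetype version should match the Flink version you target):

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.16.0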

The DataSet API

Flink's DataSet API is the core abstraction for implementing batch applications. It provides a rich set of operations and transformations that can be composed into dataflow programs.

The starting point is to create an ExecutionEnvironment, which serves as the context for creating DataSets and executing Flink jobs:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
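
getExecutionEnvironment() picks the right environment automatically: an embedded local environment when the program runs in your IDE, and the cluster environment when the job is submitted to a cluster. You can also request a local environment explicitly, which is handy for tests (a minimal sketch; the parallelism of 4 is arbitrary):

// A local environment with an explicit parallelism, useful for IDE runs and tests
ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(4);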

A DataSet represents a distributed collection of elements of a specific type, such as DataSet<String> or DataSet<Tuple2<String, Integer>>. You can create a DataSet from various sources like local Java collections, text files, CSV files, or connectors to external systems such as databases or message queues.

Here's an example of creating a DataSet from a text file:

DataSet<String> textLines = env.readTextFile("path/to/file.txt");
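
For quick experiments you can also build a DataSet from an in-memory collection rather than a file; a minimal sketch:

// Create small DataSets directly from Java values and collections
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<String> lines = env.fromCollection(java.util.Arrays.asList("first line", "second line"));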

Once you have a DataSet, you can apply transformations to it. These return a new DataSet representing the result of the transformation. Some common transformations include:

  • map(): Applies a user-defined function to each element
  • flatMap(): Like map but can return zero or more elements per input element
  • filter(): Applies a predicate function to each element and retains those that pass
  • groupBy(): Groups elements by key and allows aggregation or reduction on each group
  • join(): Joins two DataSets by key with support for inner, outer, and semi joins
  • reduce(): Combines grouped elements into a single value
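
To give a feel for these operators, here is a small chain of map() and filter() that trims and normalizes the lines read above (a minimal sketch; the .returns() call is a type hint that some compiler setups need for lambdas):

// Drop blank lines, then normalize the remaining ones to lower case
DataSet<String> cleaned = textLines
    .filter(line -> !line.trim().isEmpty())
    .map(line -> line.trim().toLowerCase())
    .returns(Types.STRING);  // Types is org.apache.flink.api.common.typeinfo.Types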

For example, this code performs a word count on the lines of text:

DataSet<Tuple2<String, Integer>> counts = 
    textLines
        .flatMap(new Tokenizer())
        .groupBy(0)
        .sum(1);

The Tokenizer is a user-defined FlatMapFunction that splits each line into words and emits a Tuple2<String, Integer> for each word, with the Integer set to 1 for every occurrence. The groupBy and sum then perform the actual counting.
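
A minimal Tokenizer along these lines could look like the following (splitting on non-word characters is just one reasonable choice):

public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        // Split the line on non-word characters and emit (word, 1) per token
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}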

To produce a result from a DataSet, you call an execution method such as collect(), print(), or writeAsText(). collect() and print() trigger the Flink job immediately and bring the results back to the client, while a sink like writeAsText() only runs once you call env.execute().

counts.print();
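
If you write to a file sink instead, remember to call env.execute() afterwards; a sketch (the output path is a placeholder, and FileSystem is org.apache.flink.core.fs.FileSystem):

// Write the word counts to a file and then trigger the job
counts.writeAsText("file:///tmp/word-counts", FileSystem.WriteMode.OVERWRITE);
env.execute("Word Count");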

Batch Processing Example

To illustrate Flink batch processing in action, let's walk through an example that computes some statistics on a dataset. We'll use a public dataset from GroupLens that contains movie ratings.

Download the dataset from https://files.grouplens.org/datasets/movielens/ml-latest-small.zip and unzip it. We'll be using the movies.csv and ratings.csv files.

Create a new Java class in your project called MovieAnalytics with a main method:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;

public class MovieAnalytics {

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Read the movies and ratings CSV files
    DataSet<Tuple3<Long, String, String>> movies = env
      .readCsvFile("ml-latest-small/movies.csv")
      .ignoreFirstLine()
      .parseQuotedStrings('"')
      .ignoreInvalidLines()
      .types(Long.class, String.class, String.class);

    DataSet<Tuple3<Long, Double, Long>> ratings = env
      .readCsvFile("ml-latest-small/ratings.csv")
      .ignoreFirstLine()
      .includeFields(false, true, true, true)  // skip userId; keep movieId, rating, timestamp
      .types(Long.class, Double.class, Long.class);

    // Get the average rating per movie
    DataSet<Tuple3<Long, String, Double>> avgRatings = movies
      .join(ratings)
        .where(0)
        .equalTo(0)
      .groupBy("f0.f0")  // group the joined pairs by movie ID
      .reduceGroup(new AvgRatingReducer());

    // Find the top 10 movies by average rating
    DataSet<Tuple3<Long, String, Double>> top10 = avgRatings
      .sortPartition(2, Order.DESCENDING)
      .setParallelism(1)
      .first(10);

    top10.print();
  }

  public static class AvgRatingReducer implements GroupReduceFunction<Tuple2<Tuple3<Long, String, String>, Tuple3<Long, Double, Long>>, Tuple3<Long, String, Double>> {
    @Override
    public void reduce(Iterable<Tuple2<Tuple3<Long, String, String>, Tuple3<Long, Double, Long>>> values, Collector<Tuple3<Long, String, Double>> out) {
      long movieId = 0L;
      String title = "";
      int numRatings = 0;
      double sumRatings = 0;
      for (Tuple2<Tuple3<Long, String, String>, Tuple3<Long, Double, Long>> value : values) {
        movieId = value.f0.f0;      // movie ID from the movies tuple
        title = value.f0.f1;        // title from the movies tuple
        sumRatings += value.f1.f1;  // rating from the ratings tuple
        numRatings++;
      }
      double avgRating = sumRatings / numRatings;
      out.collect(new Tuple3<>(movieId, title, avgRating));
    }
  }
}

Let's break down what's happening:

  1. We create a DataSet<Tuple3<Long, String, String>> from the movies.csv file, ignoring the first header line and any invalid records. The fields are the movie ID, title, and genres.

  2. We create a DataSet<Tuple3<Long, Double, Long>> from the ratings.csv file, again ignoring the header line. The raw file has four columns (user ID, movie ID, rating, and timestamp); includeFields skips the user ID, so the resulting tuple fields are the movie ID, rating, and timestamp.

  3. To get the average rating per movie, we perform an inner join between the movies and ratings DataSets on the movie ID field. This gives us Tuple2<Tuple3<Long, String, String>, Tuple3<Long, Double, Long>> elements, each pairing a movie with one of its ratings.

  4. We group the joined DataSet by movie ID (the field expression "f0.f0" selects the first field of the movie tuple) and apply a custom GroupReduceFunction called AvgRatingReducer. For each movie it iterates over the joined records, sums the ratings, and divides by the number of ratings to get the average.

  5. The AvgRatingReducer outputs a DataSet<Tuple3<Long, String, Double>> containing the movie ID, title, and average rating for each movie.

  6. To find the top 10 movies by average rating, we sort the avgRatings DataSet in descending order on the rating field and take the first 10 elements. Setting the parallelism of the sort to 1 puts all elements into a single, globally sorted partition; otherwise each parallel partition would only be sorted locally.

  7. Finally, we print out the results.

When you run this Flink job, you should see output like:

(318,Shawshank Redemption, The (1994),4.429022082018927)
(858,Godfather, The (1972),4.289625167334001)
(50,Usual Suspects, The (1995),4.276217133307647)
(527,Schindler's List (1993),4.275937183383992)
(2019,Seven Samurai (Shichinin no samurai) (1954),4.271929824561403)
(260,Star Wars: Episode IV - A New Hope (1977),4.2310606060606055)
(1276,Cool Hand Luke (1967),4.227129337539433)
(1210,Apocalypse Now (1979),4.220797227036395)
(7502,Band of Brothers (2001),4.2138364779874215)
(1196,Star Wars: Episode V - The Empire Strikes Back (1980),4.204721753794266)

This is just a taste of what you can do with Flink's DataSet API for batch processing. You have a wide range of transformations and utilities at your disposal for data cleansing, validation, enrichment, aggregation, joins, and analytics.
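
As one small illustration, a single filter() would restrict the analysis to one genre before the join; a hedged sketch (the genre string is just an example):

// Keep only movies whose genre list mentions "Comedy"
DataSet<Tuple3<Long, String, String>> comedies = movies
    .filter(movie -> movie.f2.contains("Comedy"));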

Packaging and Running a Flink Job

To package your Flink job for execution on a cluster, you build a fat JAR containing your application code and its dependencies. The standard way to do this is with the Apache maven-shade-plugin, which Flink's quickstart projects already configure.

Add the following build config to your pom.xml:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>org.apache.flink:force-shading</exclude>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                            </excludes>
                        </artifactSet>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Then run mvn clean package to build the JAR.

To run the job on a Flink cluster, use the flink run command:

flink run -c <your main class> target/<your jar name>.jar
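
For the MovieAnalytics example above, and assuming the built artifact is named movie-analytics-1.0.jar (a hypothetical name), the command would look like:

flink run -c MovieAnalytics target/movie-analytics-1.0.jar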

For local testing, you can also simply run the main method from your IDE: getExecutionEnvironment() creates an embedded local environment automatically when no cluster is present. Alternatively, start a local cluster with bin/start-cluster.sh and submit the job with flink run as above.

Monitoring and Debugging

Flink provides a web UI for monitoring and debugging jobs. By default, it runs on port 8081 of the JobManager node.

From the dashboard, you can drill down into each job to view key metrics, a graph visualization of the operators, timelines of events, backpressure status, and more. The Task Managers provide detailed metrics on resource consumption as well.
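
The same information is also exposed through Flink's REST API on the same port, which is convenient for scripting; for example, listing running and finished jobs (localhost assumed):

curl http://localhost:8081/jobs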

Flink also supports taking savepoints of application state for stopping and restarting jobs gracefully. You can use the command line tools to trigger savepoints and restore from them.
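
A sketch of the CLI usage (the job ID and savepoint paths are placeholders):

# Trigger a savepoint for a running job
flink savepoint <jobId> hdfs:///flink/savepoints

# Resume a job from a savepoint
flink run -s hdfs:///flink/savepoints/savepoint-xxxx -c <your main class> target/<your jar>.jar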

Conclusion

Apache Flink is a powerful framework for efficient, scalable data processing on bounded and unbounded datasets. Its DataSet API makes it easy to develop sophisticated batch applications at a high level with the full power and expressiveness of Java and Scala.

Beyond the core APIs, Flink offers many libraries for common use cases:

  • Gelly for graph processing and analysis
  • Flink ML for machine learning
  • the Table API and SQL support for a unified declarative API
  • CEP for complex event processing

Flink has seen widespread adoption across many industries for use cases like data analytics, ETL, fraud detection, inventory management, anomaly detection, and more. Its pipelined execution model and exactly-once state semantics make it especially well suited for stateful applications that require consistency and reliability.

To learn more, check out the Apache Flink documentation at https://flink.apache.org/. The user mailing list is also a great resource for questions and discussion.
