Apache Beam Spark Runner example using Maven

In this post I will show you how to create an Apache Beam Spark Runner project using Maven. The example uses Cloudera's spark-dataflow runner together with the Google Cloud Dataflow Java SDK, the codebase that later became Apache Beam, which is why the Cloudera repository and Dataflow dependencies appear below.

Tools/Frameworks used:

  • Java 8
  • Apache Spark
  • Maven
  • Intellij
  • Apache Beam

  1. Add the Cloudera repository to Maven's settings.xml:

     <repository>
       <id>cloudera</id>
       <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
     </repository>

    Full settings.xml file:

    <settings>
      <profiles>
        <profile>
          <id>cld</id>

          <repositories>
            <repository>
              <id>cloudera</id>
              <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
            <repository>
              <id>mvn</id>
              <url>http://repo1.maven.org/maven2/</url>
              <releases>
                <enabled>true</enabled>
              </releases>
              <snapshots>
                <enabled>true</enabled>
              </snapshots>
            </repository>
          </repositories>

        </profile>
      </profiles>

      <activeProfiles>
        <activeProfile>cld</activeProfile>
      </activeProfiles>

    </settings>
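
    To confirm that the profile is active and both repositories are visible, you can
    print Maven's merged settings with the standard help plugin:

    mvn help:effective-settings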


  2. Create the Maven project:

     mvn archetype:generate -DgroupId=com.ts.spark \
         -DartifactId=beam-spark \
         -DarchetypeArtifactId=maven-archetype-quickstart
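
     The quickstart archetype generates a minimal skeleton; roughly the following
     layout (the package path follows the groupId above):

     beam-spark/
       pom.xml
       src/main/java/com/ts/spark/App.java
       src/test/java/com/ts/spark/AppTest.java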

  3. Add the following dependencies to the pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
     
    	<groupId>com.ts.spark</groupId>
    	<artifactId>beam-spark</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
     
    	<name>beam-spark</name>
    	<url>http://maven.apache.org</url>
     
    	<properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
            <spark.version>1.5.2</spark.version>
            <google-cloud-dataflow-version>1.3.0</google-cloud-dataflow-version>
        </properties>
     
    	<dependencies>
    		<dependency>
    			<groupId>org.testng</groupId>
    			<artifactId>testng</artifactId>
    			<version>6.8</version>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>com.cloudera.dataflow.spark</groupId>
    			<artifactId>spark-dataflow</artifactId>
    			<version>0.4.2</version>
    		</dependency>
    		<dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.google.cloud.dataflow</groupId>
                <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
                <version>${google-cloud-dataflow-version}</version>
                <exclusions>
                    <!-- Use Hadoop/Spark's backend logger -->
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-jdk14</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.google.cloud.dataflow</groupId>
                <artifactId>google-cloud-dataflow-java-examples-all</artifactId>
                <version>${google-cloud-dataflow-version}</version>
                <exclusions>
                    <!-- Use Hadoop/Spark's backend logger -->
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-jdk14</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
           	</dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
     
            </plugins>
     
        </build>
    </project>
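
    At this point a plain build is a quick way to verify that all dependencies
    resolve from the Cloudera and central repositories:

    mvn clean compile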

  4. Function to extract words:

    static class ExtractWordsFn extends DoFn<String, String> {
    		private final Aggregator<Long, Long> emptyLines =
    				createAggregator("emptyLines", new Sum.SumLongFn());
     
    		@Override
    		public void processElement(ProcessContext c) {
    			if (c.element().trim().isEmpty()) {
    				emptyLines.addValue(1L);
    			}
     
    			// Split the line into words.
    			String[] words = c.element().split("[^a-zA-Z']+");
     
    			// Output each word encountered into the output PCollection.
    			for (String word : words) {
    				if (!word.isEmpty()) {
    					c.output(word);
    				}
    			}
    		}
    	}
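
    If you want to sanity-check the DoFn outside a full pipeline, the Dataflow SDK
    pulled in above ships a small test harness, DoFnTester (in
    com.google.cloud.dataflow.sdk.transforms); a minimal sketch:

    // Feed a couple of lines through ExtractWordsFn and collect its outputs.
    DoFnTester<String, String> fnTester = DoFnTester.of(new ExtractWordsFn());
    List<String> words = fnTester.processBatch("hi there", "hi sue bob");
    // words -> [hi, there, hi, sue, bob]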

  5. Count words function:

    public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
    		@Override
    		public PCollection<String> apply(PCollection<String> lines) {
     
    			// Convert lines of text into individual words.
    			PCollection<String> words = lines.apply(
    					ParDo.of(new ExtractWordsFn()));
     
    			// Count the number of times each word occurs.
    			PCollection<KV<String, Long>> wordCounts =
    					words.apply(Count.<String>perElement());
     
    			// Format each word and count into a printable string.
    			return wordCounts.apply(ParDo.of(new FormatCountsFn()));
    		}
     
    	}

  6. Format output:

    private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
    		@Override
    		public void processElement(ProcessContext c) {
    			c.output(c.element().getKey() + ": " + c.element().getValue());
    		}
    	}

  7. Driver method:

    public class WordCount {
        private static final String[] WORDS_ARRAY = {
                "hi there", "hi", "hi sue bob",
                "hi sue", "", "bob hi"};
        private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
        private static final Set<String> EXPECTED_COUNT_SET =
                ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");

        public static void main(String[] args) {
            // Configure the pipeline to run on Spark.
            SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
            options.setRunner(SparkPipelineRunner.class);

            Pipeline p = Pipeline.create(options);
            PCollection<String> output = p.apply(Create.of(WORDS))
                    .setCoder(StringUtf8Coder.of())
                    .apply(new CountWords());

            // Assertions must be attached before the pipeline is run.
            DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);

            SparkPipelineRunner runner = SparkPipelineRunner.create(options);
            EvaluationResult result = runner.run(p);
        }
    }

  8. Full driver class:

    package com.ts.beam.spark;

    import com.cloudera.dataflow.spark.EvaluationResult;
    import com.cloudera.dataflow.spark.SparkPipelineOptions;
    import com.cloudera.dataflow.spark.SparkPipelineOptionsFactory;
    import com.cloudera.dataflow.spark.SparkPipelineRunner;
    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
    import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableSet;
    import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
    import com.google.cloud.dataflow.sdk.transforms.*;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    import java.util.Arrays;
    import java.util.List;
    import java.util.Set;
     
    public class WordCount {
    	private static final String[] WORDS_ARRAY = {
    			"hi there", "hi", "hi sue bob",
    			"hi sue", "", "bob hi"};
    	private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
    	private static final Set<String> EXPECTED_COUNT_SET =
    			ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
     
    	public static void main(String[] args) {
    		SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
     
    		options.setRunner(SparkPipelineRunner.class);
    		Pipeline p = Pipeline.create(options);
    		PCollection<String> output = p.apply(Create.of(WORDS))
    				.setCoder(StringUtf8Coder.of())
    				.apply(new CountWords());

    		// Assertions must be attached before the pipeline is run.
    		DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);

    		SparkPipelineRunner runner = SparkPipelineRunner.create(options);
    		EvaluationResult result = runner.run(p);
     
    	}
     
    	static class ExtractWordsFn extends DoFn<String, String> {
    		private final Aggregator<Long, Long> emptyLines =
    				createAggregator("emptyLines", new Sum.SumLongFn());
     
    		@Override
    		public void processElement(ProcessContext c) {
    			if (c.element().trim().isEmpty()) {
    				emptyLines.addValue(1L);
    			}
     
    			// Split the line into words.
    			String[] words = c.element().split("[^a-zA-Z']+");
     
    			// Output each word encountered into the output PCollection.
    			for (String word : words) {
    				if (!word.isEmpty()) {
    					c.output(word);
    				}
    			}
    		}
    	}
    	public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> {
    		@Override
    		public PCollection<String> apply(PCollection<String> lines) {
     
    			// Convert lines of text into individual words.
    			PCollection<String> words = lines.apply(
    					ParDo.of(new ExtractWordsFn()));
     
    			// Count the number of times each word occurs.
    			PCollection<KV<String, Long>> wordCounts =
    					words.apply(Count.<String>perElement());
     
    			// Format each word and count into a printable string.
    			return wordCounts.apply(ParDo.of(new FormatCountsFn()));
    		}
     
    	}
    	private static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
    		@Override
    		public void processElement(ProcessContext c) {
    			c.output(c.element().getKey() + ": " + c.element().getValue());
    		}
    	}
     
    }
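
    To run the example, you can use the exec-maven-plugin (Maven resolves the exec:
    prefix on the fly, so no pom changes are needed); spark-dataflow should run
    against a local Spark master by default, so the pipeline executes in-process:

    mvn compile exec:java -Dexec.mainClass=com.ts.beam.spark.WordCount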
