Apache Spark Operations implementation in Java

How to use non-serializable classes in Spark closures-

In Spark closures, objects must be serializable; otherwise the Spark engine throws a NotSerializableException. You will often come across situations where you cannot change the actual class implementation. You can resolve this error using Kryo.

Spark init-

The following objects (SparkConf, SparkSession and JavaSparkContext) are used throughout the examples-

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Test");
SparkSession session = SparkSession.builder().config(conf).getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

String path= "<dataset path>/employee";

Register classes as serializable in SparkContext-

//Exact exception Spark throws when a class is not serializable

java.io.NotSerializableException: com.ts.blog.batch.serialize.ABean
Serialization stack:
- object not serializable (class: com.ts.blog.batch.serialize.ABean, value: com.ts.blog.batch.serialize.ABean@27f71dca)

* Create Custom KryoRegistrator implementation
public class CustomKryoRegistrator implements org.apache.spark.serializer.KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        //Register the non-serializable classes here
        kryo.register(ABean.class);
    }
}

//Register Kryo in SparkConf-
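With the registrator in place, Kryo still has to be wired into the SparkConf before the session is created. A minimal sketch, assuming the registrator lives in the same com.ts.blog.batch.serialize package as ABean (adjust the fully-qualified name to match your registrator class):

```java
SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("Test")
        //Switch from default Java serialization to Kryo
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        //Point Spark at the custom registrator
        .set("spark.kryo.registrator", "com.ts.blog.batch.serialize.CustomKryoRegistrator");
```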

Create Dataset using Encoder-

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Employee> dataset = session.read().json("employee.json").as(employeeEncoder);

public class Employee {
    private Integer id;
    private String name;
    //Encoders.bean requires public getters and setters
    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

cat employee.json
{"id": 100, "name": "xyz"}
{"id": 200, "name": "prq"}

Find out the Max value from Dataset column-

Row max = dataset.agg(org.apache.spark.sql.functions.max(dataset.col("id"))).head();
//Retrieve the aggregated value from the first (and only) Row
Object maxValue = max.get(0);

Define a custom UDF with SparkSession-

Dataset<Long> ds = session.range(1, 20);

//Register the UDF with the session
ds.sparkSession().udf().register("add100", (Long l) -> l + 100, org.apache.spark.sql.types.DataTypes.LongType);

//Expose the Dataset as a temp view so SQL can reference it
ds.createOrReplaceTempView("allnum");
ds.sparkSession().sql("select add100(id) from allnum").show();

Custom Property file in Spark-

Create a property file, e.g. job.properties


//Supply the property file to Spark using spark-submit
${SPARK_HOME}/bin/spark-submit --files job.properties <application jar>
//Read the file in the driver

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;

//Load the file into a Properties object using the HDFS FileSystem
String fileName = SparkFiles.get("job.properties");
Configuration hdfsConf = new Configuration();
FileSystem fs = FileSystem.get(hdfsConf);

//The fileName contains the absolute path of the file
FSDataInputStream is = fs.open(new Path(fileName));

Properties prop = new Properties();
//load properties
prop.load(is);
is.close();
//retrieve a property
String value = prop.getProperty("<property name>");
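The Properties handling itself is plain java.util and can be checked outside Spark. A standalone sketch (the temp file and the key batch.size are illustrative, not part of the job above):

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class PropsSketch {
    //Load a properties file from an absolute path, as done with SparkFiles.get above
    static Properties load(Path path) throws Exception {
        Properties prop = new Properties();
        try (InputStream is = Files.newInputStream(path)) {
            prop.load(is);
        }
        return prop;
    }

    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("job", ".properties");
        Files.write(file, "batch.size=500\n".getBytes());
        Properties prop = load(file);
        System.out.println(prop.getProperty("batch.size"));
    }
}
```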

Accumulator implementation-

 * A sample accumulator that stores a set of String values
class CustomAccumulator extends AccumulatorV2<String, Set<String>> {
    Set<String> myval = new HashSet<>();

    public void merge(AccumulatorV2<String, Set<String>> other) {
        myval.addAll(other.value());
    }

    public boolean isZero() {
        return myval.isEmpty();
    }

    public AccumulatorV2<String, Set<String>> copy() {
        CustomAccumulator copy = new CustomAccumulator();
        copy.myval.addAll(myval);
        return copy;
    }

    public void reset() {
        myval.clear();
    }

    public void add(String v) {
        myval.add(v);
    }

    public Set<String> value() {
        return myval;
    }
}
//Register the accumulator with the SparkContext. The jsc object was created in the init section (beginning)
AccumulatorV2<String, Set<String>> accumulatorV2 = new CustomAccumulator();
jsc.sc().register(accumulatorV2, "customAccumulator");
//Use accumulatorV2 like a normal accumulator
jsc.parallelize(Arrays.asList("a", "b", "a")).foreach(s -> accumulatorV2.add(s));
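The merge contract behind AccumulatorV2 - union the incoming accumulator's value into the local set - can be exercised without a cluster. A plain-Java sketch of the same logic (the partition contents here are made up):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MergeSketch {
    //Driver-side union of per-partition sets, mirroring CustomAccumulator.merge
    static Set<String> merge(List<Set<String>> partitions) {
        Set<String> acc = new HashSet<>();
        for (Set<String> part : partitions) {
            acc.addAll(part); //same operation as myval.addAll(other.value())
        }
        return acc;
    }

    public static void main(String[] args) {
        Set<String> merged = merge(Arrays.asList(
                new HashSet<>(Arrays.asList("a", "b")),
                new HashSet<>(Arrays.asList("b", "c"))));
        System.out.println(merged); //duplicates collapse into a single set
    }
}
```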

Custom Comparator implementation for compare operations-

* Comparator for Integer (must also be Serializable so Spark can ship it to executors)
public class LengthComparator implements Comparator<Integer>, Serializable {

    public int compare(Integer o1, Integer o2) {
        return Integer.compare(o1, o2);
    }
}
//jsc is JavaSparkContext defined in the beginning during init. 
JavaRDD<Integer> javaRDD  = jsc.parallelize(Arrays.asList(new Integer[]{100,20,10,1020,100}));
//Find max value using custom implementation
Integer maxVal=  javaRDD.max(new LengthComparator());
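The same comparator logic can be sanity-checked outside Spark with java.util.Collections. A standalone sketch (class and method names here are illustrative):

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class ComparatorSketch {
    //Same ordering the Spark max() call relies on
    static class ValueComparator implements Comparator<Integer>, Serializable {
        public int compare(Integer o1, Integer o2) {
            return Integer.compare(o1, o2);
        }
    }

    static Integer maxOf(List<Integer> values) {
        return Collections.max(values, new ValueComparator());
    }

    public static void main(String[] args) {
        System.out.println(maxOf(Arrays.asList(100, 20, 10, 1020, 100))); //1020
    }
}
```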
