Apache Spark Operations implementation in Java

How to use non-serializable classes in Spark closures-

Objects referenced inside Spark closures must be serializable, otherwise the Spark engine throws a NotSerializableException. You will often come across situations where you cannot change the class implementation itself. You can resolve this error by registering the class with Kryo.

Spark init-

/*
The following objects (SparkConf, SparkSession and JavaSparkContext) are used throughout the examples.
*/

SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Test");
SparkSession session = SparkSession.builder().config(conf).getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());

String path = "<dataset path>/employee";

Register non-serializable classes with Kryo in SparkConf-

//Exact exception Spark throws when a class is not serializable

java.io.NotSerializableException: com.ts.blog.batch.serialize.ABean
Serialization stack:
- object not serializable (class: com.ts.blog.batch.serialize.ABean, value: com.ts.blog.batch.serialize.ABean@27f71dca)

/**
 * Custom KryoRegistrator implementation that registers non-serializable classes with Kryo
 */
public class CustomKryoRegistrator implements org.apache.spark.serializer.KryoRegistrator {
    @Override
    public void registerClasses(com.esotericsoftware.kryo.Kryo kryo) {
        //Register non-serializable classes here
        kryo.register(ABean.class);
    }
}

//Enable the Kryo serializer and register the registrator in SparkConf-
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", CustomKryoRegistrator.class.getName());
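
If a dedicated KryoRegistrator is not needed, SparkConf also provides registerKryoClasses as a shortcut. A minimal sketch, assuming the same conf object from the init section:

//Alternative: register classes directly on SparkConf without a custom KryoRegistrator
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class<?>[]{ABean.class});
//Optional: make Kryo fail fast when an unregistered class is serialized
conf.set("spark.kryo.registrationRequired", "true");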

Create Dataset using Encoder-

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
Dataset<Employee> dataset = session.read().json("employee.json").as(employeeEncoder);

public class Employee {
    //Encoders.bean requires getters and setters for each field
    private Integer id;
    private String name;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}

cat employee.json
{"id": 100, "name": "xyz"}
{"id": 200, "name": "prq"}
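
With the bean encoder in place the Dataset supports typed operations. A small illustrative sketch (the filter and map below are additions, not part of the original example):

//Typed filter and map over the Employee bean, using the dataset created above
Dataset<Employee> filtered = dataset.filter(
        (org.apache.spark.api.java.function.FilterFunction<Employee>) e -> e.getId() > 100);
Dataset<String> names = filtered.map(
        (org.apache.spark.api.java.function.MapFunction<Employee, String>) Employee::getName,
        Encoders.STRING());
names.show();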

Find the max value of a Dataset column-

Row max = dataset.agg(org.apache.spark.sql.functions.max(dataset.col("id"))).as("max").head();
System.out.println(max);
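
head() returns a Row, so the numeric value has to be read out of it. A short illustrative addition, assuming the id column is inferred as a long by the JSON reader (use getInt(0) if the schema carries an integer type):

//Extract the aggregated value from the Row
long maxId = max.getLong(0);
System.out.println("max id = " + maxId);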

Define a custom UDF function with SparkSession-

Dataset<Long> ds = session.range(1, 20);

//Register the UDF with the session; the lambda must be typed as UDF1
ds.sparkSession().udf().register("add100",
        (org.apache.spark.sql.api.java.UDF1<Long, Long>) l -> l + 100,
        org.apache.spark.sql.types.DataTypes.LongType);

ds.show();
ds.createOrReplaceTempView("allnum");

ds.sparkSession().sql("select add100(id) from allnum").show();
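
The registered UDF can also be invoked from the DataFrame API through functions.callUDF, without writing SQL:

//Call the registered UDF directly on a column
ds.select(org.apache.spark.sql.functions.callUDF("add100", ds.col("id"))).show();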

Custom property file in Spark-

Create a property file, e.g. job.properties-

custom.prop=xyz

//Supply the property file to Spark using spark-submit
${SPARK_HOME}/bin/spark-submit --files job.properties --class <main-class> <application-jar>
//Read the file in the driver

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;

//Load the file into a Properties object using the HDFS FileSystem API
String fileName = SparkFiles.get("job.properties");
Configuration hdfsConf = new Configuration();
FileSystem fs = FileSystem.get(hdfsConf);

//The file name contains the absolute path of the file
FSDataInputStream is = fs.open(new Path(fileName));

Properties prop = new Properties();
//load properties
prop.load(is);
//retrieve a property
prop.getProperty("custom.prop");
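
Putting the pieces together, the loading logic can be wrapped in a small helper that also closes the stream. A minimal sketch; the loadJobProperties name is only illustrative:

//Hypothetical helper: load a file shipped via --files into a Properties object
public static Properties loadJobProperties(String name) throws java.io.IOException {
    Configuration hdfsConf = new Configuration();
    FileSystem fs = FileSystem.get(hdfsConf);
    Properties prop = new Properties();
    //try-with-resources ensures the input stream is closed
    try (FSDataInputStream is = fs.open(new Path(SparkFiles.get(name)))) {
        prop.load(is);
    }
    return prop;
}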

Accumulator implementation-

import java.util.HashSet;
import java.util.Set;
import org.apache.spark.util.AccumulatorV2;

/**
 * A sample accumulator that collects a set of string values
 */
class CustomAccumulator extends AccumulatorV2<String, Set<String>> {
    private final Set<String> myval = new HashSet<>();

    @Override
    public void add(String v) {
        myval.add(v);
    }

    @Override
    public void merge(AccumulatorV2<String, Set<String>> other) {
        myval.addAll(other.value());
    }

    @Override
    public boolean isZero() {
        return myval.isEmpty();
    }

    @Override
    public AccumulatorV2<String, Set<String>> copy() {
        //Return an independent copy rather than this instance
        CustomAccumulator copy = new CustomAccumulator();
        copy.myval.addAll(myval);
        return copy;
    }

    @Override
    public void reset() {
        myval.clear();
    }

    @Override
    public Set<String> value() {
        return myval;
    }
}

//Register the accumulator with the SparkContext; the jsc object was created in the init section at the beginning

AccumulatorV2<String, Set<String>> accumulatorV2 = new CustomAccumulator();
jsc.sc().register(accumulatorV2);
//Use accumulatorV2 like a normal accumulator
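
For example, the accumulator can collect values from inside an action; a minimal illustrative sketch (the sample data is made up):

//Illustrative usage: collect distinct values seen on the executors
JavaRDD<String> words = jsc.parallelize(Arrays.asList("spark", "java", "spark"));
words.foreach(w -> accumulatorV2.add(w));
//The merged result is available on the driver once the action completes
System.out.println(accumulatorV2.value());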

Custom Comparator implementation for compare operations-

/**
 * Comparator that orders integers by the length of their string representation
 */
public class LengthComparator implements Comparator<Integer>, java.io.Serializable {

    @Override
    public int compare(Integer o1, Integer o2) {
        //Compare by number of digits so that the "longest" number wins in max()
        return Integer.compare(String.valueOf(o1).length(), String.valueOf(o2).length());
    }
}
//jsc is the JavaSparkContext defined in the init section at the beginning
JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(100, 20, 10, 1020, 100));
//Find the max value using the custom comparator
Integer maxVal = javaRDD.max(new LengthComparator());
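
The same comparator works with other ordering-based actions; a short illustrative usage with takeOrdered on the RDD above:

//Take the two values with the shortest string representation
java.util.List<Integer> shortest = javaRDD.takeOrdered(2, new LengthComparator());
System.out.println(shortest);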
