Spark Socket Streaming Example on Windows

Windows does not ship with a netcat (nc) utility. If you want to test a Spark Streaming socket program on Windows, you either have to download an external netcat build or write a small socket program that does the same job.

The Spark Streaming socket word count example in the official documentation uses the netcat command (nc -lk 9999) as its data source.
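If you would rather keep Spark's built-in receiver, a closer netcat stand-in simply writes newline-terminated lines. The sketch below is an alternative under that assumption, not the approach the rest of this post takes; the class name LineSocketWriter and its pipeLines method are hypothetical. It assumes the Spark side reads the stream with ssc.socketTextStream(...), which interprets incoming bytes as UTF-8, newline-delimited lines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical netcat-like server (sketch): listens on port 9999 and sends every
 * console line to the first client that connects, as UTF-8 text ending in '\n'.
 */
public class LineSocketWriter {

    /** Copies lines from in to out until end of input or a "close" line; returns how many were sent. */
    public static int pipeLines(BufferedReader in, Writer out) throws IOException {
        int sent = 0;
        String line;
        while ((line = in.readLine()) != null && !line.equals("close")) {
            out.write(line);
            out.write('\n'); // always '\n', so the framing is identical on Windows and Linux
            out.flush();     // push each line to the receiver immediately
            sent++;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocket server = new ServerSocket(9999);
             Socket socket = server.accept(); // blocks until the Spark receiver connects
             Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
             BufferedReader console = new BufferedReader(new InputStreamReader(System.in))) {
            System.out.println("Client connected. Type lines, or close to finish");
            pipeLines(console, out);
        }
    }
}
```

With this writer running in place of nc -lk 9999, the Spark job could read from ssc.socketTextStream("localhost", 9999) and would not need a custom receiver.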

In this post we write the socket utility in Java and connect it to Spark Streaming through a custom socket receiver (JavaCustomSocketReceiver), also written in Java.

  • Create a socket server utility that opens a port and writes data to the socket:

    package com.ts.spark.streaming;

    import java.io.DataOutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.Scanner;

    /**
     * <p>This class creates a server socket on port 9999, reads lines from
     * System.in and writes each one to the connected client until the text
     * "close" is entered on a separate line.</p>
     */
    public class SocketWriter {

        public static void main(String[] args) throws Exception {
            System.out.println("Begin");
            // Create the server socket and wait for the Spark receiver to connect;
            // try-with-resources closes the stream, the socket and the server at the end
            try (ServerSocket server = new ServerSocket(9999);
                 Socket socket = server.accept();
                 DataOutputStream outputStream = new DataOutputStream(socket.getOutputStream());
                 Scanner sc = new Scanner(System.in)) {

                System.out.println("Start writing data. Enter close when finished");
                // Read content from the console and write it to the socket.
                // writeUTF() sends a 2-byte length followed by the string bytes (no newline).
                while (sc.hasNextLine()) {
                    String str = sc.nextLine();
                    if (str.equals("close")) {
                        break;
                    }
                    outputStream.writeUTF(str);
                    outputStream.flush();
                }
            }
        }
    }
     
    </pre
     
     
    <li>
    <strong>Default socket receiver will not work as DataOutputStream is required to read data from socket. Use below receiver:</strong>
    </li>
    <pre lang="java">
    package com.ts.spark.streaming;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.receiver.Receiver;

    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.net.ConnectException;
    import java.net.Socket;

    /**
     * Custom Java socket receiver.
     * Replaces the BufferedReader of the built-in socket receiver with a DataInputStream,
     * so it can read the strings that SocketWriter sends with writeUTF().
     */
    public class JavaCustomSocketReceiver extends Receiver<String> {

        private final String host;
        private final int port;

        public JavaCustomSocketReceiver(String host, int port) {
            super(StorageLevel.MEMORY_AND_DISK_2());
            this.host = host;
            this.port = port;
        }

        @Override
        public void onStart() {
            // Start the thread that receives data over a connection
            new Thread(this::receive).start();
        }

        @Override
        public void onStop() {
            // Nothing to do here: the thread calling receive()
            // stops by itself once isStopped() returns true
        }

        /** Create a socket connection and receive data until the receiver is stopped */
        private void receive() {
            try (Socket socket = new Socket(host, port);
                 DataInputStream reader = new DataInputStream(socket.getInputStream())) {
                // Read one writeUTF() string at a time until the receiver is stopped.
                // readUTF() never returns null; it throws EOFException when the writer closes.
                while (!isStopped()) {
                    store(reader.readUTF());
                }
            } catch (EOFException eof) {
                // Server closed the connection: restart and try to connect again
                restart("Trying to connect again");
            } catch (ConnectException ce) {
                // Restart if we could not connect to the server
                restart("Could not connect", ce);
            } catch (Throwable t) {
                // Restart if there is any other error
                restart("Error receiving data", t);
            }
        }
    }

  • Finally, use the new receiver in the word count job. Each batch's RDD of word counts is saved to disk:

    package com.ts.spark.streaming;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.util.LongAccumulator;
    import scala.Tuple2;

    import java.util.Arrays;

    /**
     * Socket stream word count example.
     */
    public class SocketStreamingExample {
        public static void main(String[] args) throws Exception {

            SparkConf sparkConf = new SparkConf().setAppName("word count example").setMaster("local[*]");
            // One-minute batch interval
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(1));

            // Counts received lines; updates made inside a transformation can be
            // applied more than once if a task is re-executed
            LongAccumulator count = ssc.sparkContext().sc().longAccumulator("Ts");
            JavaReceiverInputDStream<String> lines = ssc.receiverStream(new JavaCustomSocketReceiver("localhost", 9999));

            JavaDStream<String> words = lines.flatMap(x -> {
                count.add(1L);
                return Arrays.asList(x.split(" ")).iterator();
            });

            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
                    .reduceByKey((i1, i2) -> i1 + i2);

            // Save each batch into its own directory (named after the batch time),
            // so one batch's output does not overwrite or mix with another's
            wordCounts.foreachRDD((rdd, time) ->
                    rdd.saveAsTextFile("<path to save data>/" + time.milliseconds()));

            ssc.start();
            ssc.awaitTermination();
        }
    }
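To see why the built-in receiver cannot read SocketWriter's output, and why JavaCustomSocketReceiver has to handle EOFException, here is a small JDK-only sketch of the wire format. UtfFraming, encode and decodeAll are hypothetical names used only for illustration; the behaviour shown is that of java.io.DataOutputStream.writeUTF() and DataInputStream.readUTF(): each string travels as a 2-byte big-endian length followed by its (modified UTF-8) bytes with no newline, and readUTF() reports end of stream by throwing EOFException rather than returning null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper showing the format written by SocketWriter and read by the custom receiver. */
public class UtfFraming {

    /** Encodes strings as SocketWriter does: writeUTF = 2-byte length + string bytes, no newline. */
    public static byte[] encode(List<String> messages) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String m : messages) {
                out.writeUTF(m);
            }
        }
        return bytes.toByteArray();
    }

    /** Decodes strings as the custom receiver does, stopping at the EOFException thrown at end of data. */
    public static List<String> decodeAll(byte[] data) throws IOException {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (true) {
                try {
                    result.add(in.readUTF());
                } catch (EOFException endOfStream) {
                    return result;
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = encode(Arrays.asList("hello world", "spark"));
        // (2 + 11) bytes for "hello world" plus (2 + 5) bytes for "spark"
        System.out.println(data.length);     // prints 20
        System.out.println(decodeAll(data)); // prints [hello world, spark]
    }
}
```

Because no '\n' ever appears in this byte stream, a line-based reader would never see a complete line, which is why the post pairs writeUTF() with a readUTF()-based receiver.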

Thanks for reading this blog.
