Below is a Spark Kafka streaming example that uses the Kafka 0.10 direct stream API:
package com.sps.test

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaStreamTest extends App {

  val spark = SparkSession
    .builder
    .appName("KafkaStreamTest")
    .master("local[*]")
    .config("spark.sql.warehouse.dir", "file:///C:/temp")
    .getOrCreate()

  // Micro-batch interval of 5 seconds
  val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

  // The Kafka 0.10 consumer talks to the brokers directly,
  // so no ZooKeeper address is needed here.
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "testGroup",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // Each topic must be a separate array element; a single
  // "topicA,topicB" string would be treated as one topic name.
  val topics = Array("topicA", "topicB")

  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  startStreamProcessing(stream)

  def startStreamProcessing(stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    stream.foreachRDD(rdd => processStream(rdd))
  }

  def processStream(rdd: RDD[ConsumerRecord[String, String]]): Unit = {
    if (!rdd.isEmpty()) {
      // Runs on the executors; the output goes to the executor logs.
      rdd.foreach { record =>
        println("TOPIC NAME:" + record.topic() + "----KEY-" + record.key() + ":" + record.value())
      }
    }
  }

  ssc.start()
  ssc.awaitTermination()
}
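Because enable.auto.commit is false, the example above never stores its progress in Kafka; after a restart the consumer group simply falls back to auto.offset.reset. If you want the offsets committed back to Kafka once each batch has been processed, a minimal sketch using the kafka010 HasOffsetRanges and CanCommitOffsets APIs could look like this (startStreamProcessingWithCommits is a hypothetical replacement for startStreamProcessing above):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: commit offsets back to Kafka after each batch is handled.
// `stream` is the InputDStream created above; processStream is unchanged.
def startStreamProcessingWithCommits(
    stream: InputDStream[ConsumerRecord[String, String]]): Unit = {
  stream.foreachRDD { rdd =>
    // Capture the offset ranges before any transformation re-partitions the RDD.
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    processStream(rdd)
    // Asynchronously commit the consumed offsets once the batch is done.
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
}

Note that commitAsync gives at-least-once semantics at best: if the job dies between processStream and the commit, the batch is reprocessed on restart.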
Here are the Maven dependencies:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
  <version>2.0.2</version>
</dependency>
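If you build with sbt instead of Maven, the same dependencies can be declared in build.sbt like this (assuming scalaVersion is set to a 2.11 release, so that %% resolves to the _2.11 artifacts):

// build.sbt -- equivalent dependencies for an sbt build
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.0.2",
  "org.apache.spark" %% "spark-sql"                  % "2.0.2",
  "org.apache.spark" %% "spark-streaming"            % "2.0.2",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.2"
)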