Below is a Spark Kafka streaming example using SubscribePattern.
Using SubscribePattern you can subscribe to topics with a regular expression. In this example the regex 'topic.*' is specified, so the program subscribes to every topic whose name matches that pattern — for example, topicA, topicB, etc.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.sps.test | |
import org.apache.kafka.clients.consumer.ConsumerRecord | |
import org.apache.kafka.common.serialization.StringDeserializer | |
import org.apache.spark.streaming.kafka010._ | |
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent | |
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe | |
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.streaming.StreamingContext | |
import org.apache.spark.streaming.Seconds | |
import java.util.regex.Pattern | |
import org.apache.spark.streaming.kafka010.SubscribePattern | |
import org.apache.spark.streaming.kafka010.ConsumerStrategy | |
import scala.collection.mutable.HashMap | |
import org.apache.kafka.common.TopicPartition | |
import org.apache.spark.rdd.RDD | |
import org.apache.spark.streaming.dstream.InputDStream | |
/**
 * Spark Streaming example: subscribe to Kafka topics by regex.
 *
 * Every topic whose name matches "topic.*" (e.g. topicA, topicB) is
 * consumed, and each record's topic, key and value are printed.
 *
 * Uses an explicit `main` instead of the `App` trait to avoid the
 * delayed-initialization pitfalls of `App` for non-trivial entry points.
 */
object SubscribePatternExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("SparkSqlTest") // fixed typo: was "SaprkSqlTest"
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp")
      .getOrCreate()

    // 5-second micro-batch interval.
    val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

    // Kafka consumer configuration.
    // NOTE: the kafka010 direct stream talks to the brokers directly; the
    // original "zookeeper" -> "locahost:2181" entry (also misspelled) is not
    // a valid consumer property and has been removed.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "testGroup",
      "auto.offset.reset" -> "latest",
      // Offsets are not auto-committed by the consumer.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Subscribe to every topic whose name matches this regex.
    val pattern = Pattern.compile("topic.*")
    val consumerStrategy =
      ConsumerStrategies.SubscribePattern[String, String](pattern, kafkaParams)

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      consumerStrategy
    )

    startStreamProcessing(stream)

    ssc.start()
    ssc.awaitTermination()
  }

  /** Wires the per-batch handler onto the input stream. */
  def startStreamProcessing(stream: InputDStream[ConsumerRecord[String, String]]): Unit =
    stream.foreachRDD(rdd => processStream(rdd))

  /** Prints topic, key and value for every record in a non-empty batch. */
  def processStream(rdd: RDD[ConsumerRecord[String, String]]): Unit =
    if (!rdd.isEmpty()) {
      rdd.foreach { record =>
        println("TOPIC NAME:"+record.topic()+"----KEY-"+record.key()+":"+record.value())
      }
    }
}
No comments:
Post a Comment