dedecms建站教程,福田网站设计哪家好,wordpress网站菜单固定,有人免费资源吗窗口操作就是把多个采集周期设置成一个窗口#xff0c;一起来计算#xff0c;然后进行滑动#xff0c;根据设置的滑动大小。
窗口大小和滑动大小#xff0c;要是采集周期的倍数
package date_10_17_SparkStreamingimport org.apache.spark.SparkConf
import org.apache.s…窗口操作就是把多个采集周期设置成一个窗口一起来计算然后进行滑动根据设置的滑动大小。
窗口大小和滑动大小要是采集周期的倍数
package date_10_17_SparkStreamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtilsobject ss {def main(args: Array[String]): Unit {//Scala中的窗口
// val ints List(1,2,3,4,5)
//
// val ites ints.sliding(2,2)
//
// for (list - ites){
// println(list.mkString(,))
// }//SparkStreaming窗口val conf new SparkConf().setAppName(wordCount).setMaster(local[*])val streamingContext new StreamingContext(conf,Seconds(3))streamingContext.checkpoint(cp)//连接kafkaval kafkaStream KafkaUtils.createStream(streamingContext,chun1:2181,chun,Map(chun-3))//一个是窗口大小和滑动大小要是采集周期的倍数val windowDStream kafkaStream.window(Seconds(6),Seconds(3))//wordcount运算val mapDStream windowDStream.flatMap(_._2.split( )).map((_,1))val resultDStream mapDStream.reduceByKey(__)resultDStream.print()//启动采集器streamingContext.start()//等待采集器关闭才关闭DriverstreamingContext.awaitTermination()}}