网站怎么做团购,阿里云做网站吗,排版设计,网站开发培训多少钱官方文档
Spark Streaming 火花流是spark API的扩展#xff0c;它支持可伸缩、高吞吐量、容错的实时数据流处理。
数据可以从多种来源(如Kafka、Flume、Kinesis或tcp套接字)中摄取#xff0c;并且可以使用用高级函数表示的复杂算法进行处理#xff0c;例如map, reduce, jo…官方文档
Spark Streaming 火花流是spark API的扩展它支持可伸缩、高吞吐量、容错的实时数据流处理。
数据可以从多种来源(如Kafka、Flume、Kinesis或tcp套接字)中摄取并且可以使用用高级函数表示的复杂算法进行处理例如map, reduce, join和window…最后可以将处理过的数据推送到文件系统、数据库和活动仪表板。事实上你可以申请星火机器学习和图形处理数据流算法。 在内部它的工作方式如下。火花流接收实时输入数据流并将数据分成几个批次然后由火花引擎进行处理生成最终的结果流。 火花流提供了一个名为离散流或DStream表示连续的数据流。DStreams可以从Kafka、Flume和Kinesis等源的输入数据流中创建也可以通过对其他DStreams应用高级操作来创建。在内部dStream表示为RDD 也就是SparkStreaming是用DStream来操作的与Spark Core里RDD操作一样
下面来是第一个程序wordcount
maven配置pom.xml !--SparkStreaming--dependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.11/artifactIdversion2.1.1/version/dependencypackage date_10_16_SparkStreaming
import org.apache.spark.{SparkConf, streaming}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object wordCount {def main(args: Array[String]): Unit {//使用SparkStreaming完成wordcount//配置对象val conf new SparkConf().setMaster(local[*]).setAppName(wordcount)//实时数据分析的环境对象//StreamingContext需要两个参数一个conf一个是采集周期val streamingContext new StreamingContext(conf,Seconds(3))//从指定的端口中采集数据val socketLineDstream streamingContext.socketTextStream(chun1,9999)//将采集的数据进行分解(扁平化)val wordToSumDstream socketLineDstream.flatMap(_.split( )).map((_,1)).reduceByKey(__)wordToSumDstream.print()//这里不能停止采集功能也就是streamingContext不能结束//可以简单理解为启动采集器streamingContext.start()//Driver等待采集器采集器不挺Driver不停止streamingContext.awaitTermination()}
}
打开虚拟机安装netcat这里用netcat来写数据
yum install -y nc安装完成后输入nc -lk 9999
运行上面程序
在netcat输入数据这里设定的每三秒为一个采集周期 看到的结果如下所示