I. Overview
1. Diagram
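2. Comparison
DStream:
batchInterval: the interval at which batches are generated
Window DStream:
windowInterval: the window length; must be an integer multiple k (k >= 1) of the parent DStream's batchInterval
slideInterval: the interval by which the window slides; must be an integer multiple k (k >= 1) of the parent DStream's batchInterval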
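To make the two constraints concrete, here is a small Scala sketch that does not use Spark. `WindowMath`, `validate` and `windows` are hypothetical helpers written only for this illustration, not Spark APIs. The sketch makes two assumptions. First, times are whole seconds measured from when the streaming context starts. Second, a window ending at time t covers the batches that end in (t - windowInterval, t]. It uses the values from the program below (5 s batches, 15 s window, 10 s slide) and lists which batches each window covers.

```scala
// Hypothetical helper (not part of Spark): models the batch/window/slide rules
// with whole-second durations measured from when the streaming context starts.
object WindowMath {
  // Enforce the rule above: window and slide must be k * batch with integer k >= 1.
  def validate(batch: Int, window: Int, slide: Int): Unit = {
    require(batch > 0, "batchInterval must be positive")
    require(window >= batch && window % batch == 0,
      s"windowInterval $window is not a multiple of batchInterval $batch")
    require(slide >= batch && slide % batch == 0,
      s"slideInterval $slide is not a multiple of batchInterval $batch")
  }

  // For each window end time (slide, 2 * slide, ... up to `until`), list the end
  // times of the batches it covers, i.e. those in (end - window, end].
  def windows(batch: Int, window: Int, slide: Int, until: Int): Seq[(Int, Seq[Int])] = {
    validate(batch, window, slide)
    (slide to until by slide).map { end =>
      val covered = (end - window + batch to end by batch).filter(_ > 0)
      (end, covered)
    }
  }

  def main(args: Array[String]): Unit = {
    // Values from the program: 5 s batches, 15 s window, 10 s slide.
    for ((end, covered) <- windows(5, 15, 10, 30))
      println(s"window ending at ${end}s covers batches ending at " +
        covered.map(t => s"${t}s").mkString(", "))
  }
}
```

Under this model, the window ending at 20 s covers the batches ending at 10 s, 15 s and 20 s. The window (15 s) is longer than the slide (10 s), so consecutive windows overlap by one batch. A non-multiple such as a 12 s window makes `validate` throw, which mirrors the rule stated above.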
3.API
Use CTRL+F3 to look up the API; for the shortcut keys, refer to this article: https://blog.csdn.net/qq_36901488/article/details/80704245
II. Program
1. Code
package com.window.it

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReduceWindow {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingWindowOfKafka")
      .setMaster("local[*]")
    val sc = SparkContext.getOrCreate(conf)
    // Batch interval: 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))
    // A checkpoint directory is mandatory for stateful APIs such as updateStateByKey
    // (and for reduceByKeyAndWindow when an inverse reduce function is supplied);
    // the directory at this path should not already exist
    ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/452512")

    val kafkaParams = Map(
      "group.id" -> "streaming-kafka-78912151",
      "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",
      "auto.offset.reset" -> "smallest"
    )
    // Each value in topics is the number of threads reading that topic, so it must be >= 1
    val topics = Map("beifeng" -> 4)
    val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
      ssc,                           // the StreamingContext
      kafkaParams,                   // Kafka connection parameters (connects through the Kafka high-level consumer API)
      topics,                        // topic names and the number of reader threads for each
      StorageLevel.MEMORY_AND_DISK_2 // storage level for the data the receiver gets from Kafka
    ).map(_._2)                      // keep only the message value

    val resultWordCount = dstream
      .filter(line => line.nonEmpty)
      .flatMap(line => line.split(" ").map((_, 1)))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        Seconds(15), // window length
        Seconds(10)  // slide interval
      )
    resultWordCount.print() // print the first 10 elements of each windowed result

    // Start processing
    ssc.start()
    ssc.awaitTermination() // block until the context is stopped or fails
  }
}
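The following sketch models what `reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(10))` produces for the word count above, without Spark. `WindowedWordCountModel` is a hypothetical name, and the batch contents are made-up sample input keyed by batch end time in seconds. The window is assumed to cover the batches ending in (end - window, end]. The sketch applies the same filter/flatMap steps to each batch, then sums the counts per word across every batch in the window. This roughly mirrors the overload without an inverse function, where the window's batches are combined and then reduced by key.

```scala
// Hypothetical, Spark-free model of
// reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(10)).
// Batches are keyed by their end time in seconds; the contents are sample data.
object WindowedWordCountModel {
  // Same per-batch steps as the program: drop empty lines, split on spaces, emit (word, 1).
  def pairs(lines: Seq[String]): Seq[(String, Int)] =
    lines.filter(line => line.nonEmpty).flatMap(line => line.split(" ").toSeq.map(word => (word, 1)))

  // Gather the pairs of every batch ending in (end - window, end], then sum per word.
  def windowCounts(batches: Map[Int, Seq[String]], window: Int, end: Int): Map[String, Int] =
    batches.toSeq
      .filter { case (t, _) => t > end - window && t <= end }
      .flatMap { case (_, lines) => pairs(lines) }
      .groupBy { case (word, _) => word }
      .map { case (word, ps) => word -> ps.map(_._2).reduce((a, b) => a + b) }

  def main(args: Array[String]): Unit = {
    val batches = Map(
      5 -> Seq("a b", ""),
      10 -> Seq("a"),
      15 -> Seq("b c"),
      20 -> Seq("a c")
    )
    // Window ends advance by the 10 s slide; each window spans 15 s (three 5 s batches).
    for (end <- Seq(10, 20)) println(s"window ending at ${end}s: ${windowCounts(batches, 15, end)}")
  }
}
```

For the window ending at 20 s, the model returns a = 2, b = 1, c = 2. The batch ending at 5 s has already slid out of this window. The batch ending at 10 s is counted in both the 10 s and the 20 s windows, because consecutive windows overlap.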
2. Result
The main thing to look at here is the DAG shown on the Spark web UI page.