博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
069 在SparkStreaming的窗口分析
阅读量:6207 次
发布时间:2019-06-21

本文共 2340 字,大约阅读时间需要 7 分钟。

一:说明

1.图例说明

  

  -------------------------------------------------------------------------------------------------------------------------------------------------------------------------

  

 

2.对比说明

  DStream:

    batchInterval: 批次产生间隔时间
  Window DStream:
    windowInterval: 窗口间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)
    slideInterval:窗口滑动间隔时间, 必须是父DStream的batchInterval的倍数(k >= 1, 整数)

 

3.API

  使用CTRL+F3,可以参考这篇文档的快捷键:https://blog.csdn.net/qq_36901488/article/details/80704245

  

 

二:程序

1.程序

1 package com.window.it 2 import org.apache.spark.{SparkConf, SparkContext} 3 import org.apache.spark.storage.StorageLevel 4 import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} 5 import org.apache.spark.streaming.dstream.DStream 6 import org.apache.spark.streaming.kafka.KafkaUtils 7  8 object ReduceWindow { 9   def main(args: Array[String]): Unit = {10     val conf = new SparkConf()11       .setAppName("StreamingWindowOfKafka")12       .setMaster("local[*]")13     val sc = SparkContext.getOrCreate(conf)14     val ssc = new StreamingContext(sc, Seconds(5))15     // 当调用updateStateByKey函数API的时候,必须给定checkpoint dir16     // 路径对应的文件夹不能存在17     ssc.checkpoint("hdfs://linux-hadoop01.ibeifeng.com:8020/beifeng/spark/streaming/452512")18 19     val kafkaParams = Map(20       "group.id" -> "streaming-kafka-78912151",21       "zookeeper.connect" -> "linux-hadoop01.ibeifeng.com:2181/kafka",22       "auto.offset.reset" -> "smallest"23     )24     val topics = Map("beifeng" -> 4) // topics中value是读取数据的线程数量,所以必须大于等于125     val dstream = KafkaUtils.createStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](26       ssc, // 给定SparkStreaming上下文27       kafkaParams, // 给定连接kafka的参数信息 ===> 通过Kafka HighLevelConsumerAPI连接28       topics, // 给定读取对应topic的名称以及读取数据的线程数量29       StorageLevel.MEMORY_AND_DISK_2 // 指定数据接收器接收到kafka的数据后保存的存储级别30     ).map(_._2)31 32     val resultWordCount = dstream33       .filter(line => line.nonEmpty)34       .flatMap(line => line.split(" ").map((_, 1)))35       .reduceByKeyAndWindow(36         (a: Int, b: Int) => a + b,37         Seconds(15), // 窗口大小38         Seconds(10) // 滑动大小39       )40     resultWordCount.print() // 这个也是打印数据41     42     // 启动开始处理43     ssc.start()44     ssc.awaitTermination() // 等等结束,监控一个线程的中断操作45   }46 }

 

2.效果

  这里主要看的是页面的DAG。

 

你可能感兴趣的文章
垃圾回收算法优缺点对比
查看>>
正则表达式 匹配常用手机号 (13、15\17\18开头的十一位手机号)
查看>>
GitLab 11.9 正式发布,自动化工具 ChatOps 已开源
查看>>
交换机的基本原理配置(一)
查看>>
android baidupush
查看>>
Lottie 站在巨人的肩膀上实现 Android 酷炫动画效果
查看>>
“陪护机器人”研报:距离真正“陪护”还差那么一点
查看>>
深入框架本源系列 —— Virtual Dom
查看>>
mongodb分布式集群搭建手记
查看>>
您有一个上云锦囊尚未领取!
查看>>
Java Web的web.xml文件作用及基本配置(转)
查看>>
区块链101:区块链的应用和用例是什么?
查看>>
马约拉纳费米子:推动量子计算的“天使粒子”
查看>>
使用ActionTrail Python SDK
查看>>
数据显示,中国近一半的独角兽企业由“BATJ”四巨头投资
查看>>
log日志轮转--logrotate
查看>>
安装输入发
查看>>
用户配置相关文件
查看>>
老王学linux-ftp
查看>>
kvm vnc的使用,鼠标漂移等
查看>>