当前位置:首页 » 《随便一记》 » 正文

Spark综合学习笔记(八)SparkStreaming案例2 状态管理_斯特凡今天也很帅的博客

9 人参与  2022年05月02日 13:25  分类 : 《随便一记》  评论

点击全文阅读


学习致谢:

https://www.bilibili.com/video/BV1Xz4y1m7cv?p=42

需求:

对从Socket接收的数据做WordCoun并要求能够和历史数据进行累加!
如:先发了一个spark,得到spark,1然后不管隔多久再发一个spark,得到spark,2也就是说要对数据的历史状态进行维护!
在这里插入图片描述

实现思路:

一、updataStateByKey

先设置checkpoint存储状态status,使用updataStateByKey实现状态管理的单词统计,需要自己写一个updateFunc方法,如下:
在这里插入图片描述

代码实现

package streaming

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

/**
  * Author itcast
  * Desc 使用SparkStreaming接受node1"9999的数据并做WordCount+实现状态管理
  * 如输入 spark hadoop得到(spark,1)(hadoop,1)
  * 再下一个批次输入spark,得到spark,2
  */
object Status {
  def main(args: Array[String]): Unit = {
    //TODO 0.准备环境
    val conf:SparkConf=new SparkConf().setMaster("spark").setMaster("local[*]")
    val sc:   SparkContext=new SparkContext(conf)
    sc.setLogLevel("WARN")
    //the time interval at which streaming data will be dicided into batches
    val ssc:StreamingContext= new StreamingContext(sc,Seconds(5))

    //The checkpoint directory has not been set. PLease set it by streamingContext.checkpoint().
    //注意:state存在checkpoint中
    ssc.checkpoint("./ckp")
    //TODO 1.加载数据
    val lines:ReceiverInputDStream[String]=ssc.socketTextStream("node1",9999)
    //TODO 2.处理数据
    //定义一个函数用来处理状态:把当前数据和历史状态累加
    //currentValues:表示该key(spark)的当前批次的值,如:[1,1]
    //historyValue:表示该key(如spark)的历史值,第一次是0,后面之后就是之前的累加值,如1
    val updateFunc=(currentValues:Seq[Int],historyValue:Option[Int])=>{
      if(currentValues.size>0){
        val currentResult:Int=currentValues.sum+historyValue.getOrElse(0)
        Some(currentResult)
      }else{
        historyValue
      }
    }
    val resuleDS:DStream[(String,Int)]=lines.flatMap(_.split(" "))
      .map((_,1))
      //updateFunc:(Seq[v],Option[s]) =>option[s]
      .updateStateByKey(updateFunc)
    //TODO 3.输出结果
    resuleDS.print()
    //TODO 4.启动并等待结束
    ssc.start()
    ssc.awaitTermination()//注意:流式应用程序启动之后需要一直运行等待停止、等待到来
    //TODO 5.关闭资源
    ssc.stop(stopSparkContext = true,stopGracefully = true)//优雅关闭
  }
}

演示:

(1)如图,不同批次实现了累计
在这里插入图片描述

(2)停掉之后再重新启动,后之前的数据恢复不了了,就是说历史状态维护只能在当前应用
在这里插入图片描述

二、mapWithState

在这里插入图片描述


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/39331.html

状态  数据  累加  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 重生后我转嫁首富瘸腿独子,总裁前夫却疯了一口气看完_妹妹傅云琛沈明辉独家番外_小说后续在线阅读_无删减免费完结_
  • 我拒绝给系花捐款后,全系同学悔疯了在线阅读_小说后续在线阅读_无删减免费完结_
  • 我让位给女友的透视眼竹马,他却说如果能重生再也不来了。虐心反转_玉石林若女友推荐_小说后续在线阅读_无删减免费完结_
  • 相国独子的丫鬟砸坏我的玉佩后,我当场拒婚阅读_玉佩陈郡谢氏全新_小说后续在线阅读_无删减免费完结_
  • 手术时,我看着病人惨死最新试读_淼淼陆知衍姜颜全本完结_小说后续在线阅读_无删减免费完结_
  • 男友霸道给我开黑卡,转头却骂我是捞女最新章节_肖年顾客黑卡热文_小说后续在线阅读_无删减免费完结_
  • 他在回忆尽头全集_贺南舟许清梨叶絮完结txt_小说后续在线阅读_无删减免费完结_
  • 旅游结婚那天未婚夫另娶女秘书,我让他们一无所有连载_老公迎宾超长版_小说后续在线阅读_无删减免费完结_
  • 完结文异界修仙我的直播间能打赏核弹列表_完结文异界修仙我的直播间能打赏核弹(江流年沈红菱)
  • 全书浏览陪弟弟混骑行圈,离婚你哭什么?(韩星河柳千雪)_陪弟弟混骑行圈,离婚你哭什么?(韩星河柳千雪)全书结局
  • 老公出轨我助攻虐心反转_秦绍卿卿阿溪后续加长_小说后续在线阅读_无删减免费完结_
  • 替老婆坐牢出狱那天,我被送去斗兽场精校文本_许言沈宇郑子番茄热门_小说后续在线阅读_无删减免费完结_

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1