当前位置:首页 » 《随便一记》 » 正文

SparkStreaming接收Flume数据方式一:Push模式_若兰幽竹

18 人参与  2022年04月19日 13:25  分类 : 《随便一记》  评论

点击全文阅读


SparkStreaming接收Flume数据方式一:Push模式

    • 一、前置工作
    • 二、Flume之Push模式
    • 三、Flume配置
    • 四、编写程序
    • 五、运行验证

一、前置工作


  • 需安装Flume,安装可参考:Flume安装配置与基本操作

二、Flume之Push模式


  • Push模式说明

    Flume 被 用 于 在 Flume agents 之 间 推 送 数 据 . 在 这 种 方 式 下 ,Spark Streaming 可以很方便的建立一个 receiver,起到一个 Avro agent 的作用.Flume 可以将数据推送到改 receiver.

三、Flume配置


  1. Flume之Agent的结构,如下
    在这里插入图片描述
  2. 确定source、channel、sink,如下:
    1. source为:spooldir
    2. channel:memory
    3. sink:avro
  3. 在Flume的安装目录下的conf目录下创建a3.conf,添加如下内容
    # 定义agent的三大组件
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #定义source
    a1.sources.r1.type=spooldir
    a1.sources.r1.spoolDir=/training/nginx/logs/flumeLogs
    a1.sources.r1.fileHeader=true
    
    # 定义sink
    a1.sinks.k1.type=avro
    a1.sinks.k1.channel=c1
    a1.sinks.k1.hostname=192.168.88.1
    a1.sinks.k1.port=1234
    
    # 定义channel
    a1.channels.c1.type=memory
    a1.channels.c1.capacity=10000
    a1.channels.c1.transactionCapacity= 100
    
    # 定义三者关系
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
    

四、编写程序


  • 程序功能说明

    该程序接收Flume推送过来的数据:
    一、查看下原始数据;
    二、将数据简单处理,完成数据的统计,如求各个部门员工工资

  • 编写程序,代码如下

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object FlumeDemo {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumeDemo")
        val streamingContext = new StreamingContext(sparkConf, Seconds(2))
    	// 	192.168.88.1为Windows中虚拟机网卡vmnet8的IP地址
        val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(streamingContext, "192.168.88.1", 2345,StorageLevel.MEMORY_AND_DISK_SER)
    
        // 查看接收到的原始数据
        val flumeDStream: DStream[String] = flumeEvent.map(e => {
          println(new String(e.event.getBody.array()))
          new String(e.event.getBody.array())
        })
    
        // 分词,组合
        val mapDStream: DStream[(Int,Int)] = flumeDStream.map(x => {
          val strings = x.trim.split(",")
          val salary = strings(5).toInt
          val deptNo = strings(7).toInt
          (deptNo,salary)
        })
        // 实现统计
        val resultDStream: DStream[(Int, Int)] = mapDStream.reduceByKey(_ + _)
    
        // 打印结果
        resultDStream.print()
    
        // 启动实时计算
        streamingContext.start()
        // 等待计算结束
        streamingContext.awaitTermination()
      }
    }
    

    特别注意: 程序中的IP地址为Windows上的虚拟机网卡即vmnet8的IP地址,如下图所示
    在这里插入图片描述

五、运行验证


  1. 实验成功关键需关闭Windows的防火墙,代码中的IP地址为Windows中虚拟机网卡vmnet8的IP地址,虚拟机中flume启动后需要访问到该地址,故需要关闭Windows防火墙。关闭后,如下图所示:
    在这里插入图片描述

  2. 运行上述编写的程序,如下所示:
    在这里插入图片描述

  3. 启动Flume,如下命令:

    flume-ng agent -n a1 -f conf/a3.conf -Dflume.root.logger=INFO,console
    

    启动成功后,如下所示:
    在这里插入图片描述

  4. 将文件emp.csv(如没有,则先上传到虚拟机中)复制到Fume的source所对应的目录下(/training/nginx/logs/flumeLogs),如下所示:

    cp /root/emp.csv /training/nginx/logs/flumeLogs/
    
  5. 查看程序是否读取到数据并完成数据的处理,结果如下图所示:
    在这里插入图片描述


点击全文阅读


本文链接:http://m.zhangshiyu.com/post/38441.html

所示  程序  数据  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

最新文章

  • 山海不相逢内容精选(温逸尘沈衿)_山海不相逢内容精选(温逸尘沈衿)
  • (番外)+(全书)霍沉洲沈青禾此去经年人未还(霍沉洲沈青禾)_(霍沉洲沈青禾此去经年人未还)列表_笔趣阁(霍沉洲沈青禾)
  • (番外)+(全书)霍沉洲沈青禾(此去经年人未还霍沉洲沈青禾)完结_(霍沉洲沈青禾)列表_笔趣阁(此去经年人未还霍沉洲沈青禾)
  • 「重回八零,拒绝替嫁冲喜」章节彩蛋限时释出‌_卫东玉兰苏夏人气小说未删减节选
  • 重生七零祁同伟不再是农民儿子结局+番外纯净版全书免费重生七零祁同伟不再是农民儿子结局+番外纯净版全书免费
  • 傅雅宁的神女老婆,却在背地承欢作乐顾尘傅雅宁全书在线
  • 全文神女老婆,却在背地承欢作乐全局(顾尘傅雅宁)列表_全文神女老婆,却在背地承欢作乐全局
  • (番外)+(全书)此去经年人未还全书+番外+后续免费下载_(沈青禾霍沉洲)此去经年人未还全书+番外+后续列表_笔趣阁(沈青禾霍沉洲)
  • 完结文毁容的姐姐和瞎眼的我离开后,姜家两兄弟悔哭了+后续列表_完结文毁容的姐姐和瞎眼的我离开后,姜家两兄弟悔哭了+后续(林梦婉)
  • 妻子辱我爸受贿自杀,我掏出一等军功章节选推荐_[陈素云辰朋友]小说精彩章节分享
  • 全书浏览苔藓爬满旧日诺言新上(顾砚廷慕晚夏)_苔藓爬满旧日诺言新上(顾砚廷慕晚夏)全书结局
  • 顾尘傅雅宁(神女老婆,却在背地承欢作乐+后续+结局)结局_(顾尘傅雅宁神女老婆,却在背地承欢作乐+后续+结局全书结局)结局列表_笔趣阁(顾尘傅雅宁)

    关于我们 | 我要投稿 | 免责申明

    Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1