大数据Spark平台5-2、spark-streaming

你挡我一时,挡不了我一世

发布日期: 2019-04-08 17:49:12 浏览量: 762
评分:
star star star star star star star star star star_border
*转载请注明来自write-bug.com

前文链接:https://write-bug.com/article/2091.html

Spark Streaming

官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html

在之前我们已经对Storm流式计算框架和Spark-core核心计算引擎进行了介绍,以此为基础更好理解SparkStreaming。

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。

对于大多数业务而言,这两种并没有很大差别:

  • Storm的数据是类似水流式的流转数据,毫秒级别

  • Spark Streaming的数据是类似离散化后的水状数据,秒级别

数据来源:

处理后的数据可以推送到文件系统,数据库和实时仪表板。

在Spark-core中我们主要处理的其实就是不同的RDD算子(Transformation&action),实行懒惰机制。

在这里我们的action类算子换为了output类算子,其实开发的时候也是有细微的不同,Spark Streaming针对Dstream开发处理结构。
Dstream:每个RDD包含特定的时间间隔。

这里文字说可能不太明白,这一张图就能直接明了。

Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理以批量生成最终结果流。

在这里与其说是对Dstream做开发不如说是把不同的RDD排列组合,再在不同的入口处声明批次秒数生成新的Dstream罢了。(即DStream中的各种操作是可以映射到内部的RDD上运行的)

在transformation算子中Spark提供了窗口操作,例如:

统计最近一个小时的PV量,要求每分钟更新。

参数

  • window length - The duration of the window (3 in the figure).

  • sliding interval - The interval at which the window operation is performed (2 in the figure).

output内分两种算子

  • 执行算子:foreachRDD:主要负责对接外部开发,Hbase,Kafka,Hive

  • 输出算子:print()、saveAsTextFiles(prefix, [suffix])、saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix, [suffix])

Streaming架构

  • master:分配任务(画Graph蓝图)

  • worker:处理任务(接收、发送)

  • client:喂数据

模式

  • Recevier:被动接收数据-异步两线程

  • direct:主动拉数据-同步

容错—WAL

数据流从右侧进入,先在内存中顺序存储(有offset偏移量),再同步存储在磁盘文件系统中(如HDFS),executor处理后再把存储的元数据发送给AM,AM得到请求后并且已知数据存储结构就可以通过SSC入口处理数据,处理数据之前根据元数据把内存中的数据先同步过来,处理数据时可能有数据落地需求;与此同时,元数据结构和数据checkpoint形式存储在文件系统中比便数据恢复。

重启恢复

实践

1.word count:

无状态:不记录上一批次数据

  1. //定义入口,初始化配置
  2. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("wordCount")
  3. val ssc = new StreamingContext(sparkConf, Seconds(5))
  4. //和之前的Spark-core入口不同,并且参数多了个批次时间设置
  5. ssc.checkpoint("hdfs://master:9000/hdfs_checkpoint")
  6. //可设置checkpoint点以便恢复数据

在context被初始化后,你还需要做如下几点:

  • 通过input DStream来定义输入源

  • 通过DStream的转换操作和输出操作来定义流数据处理规则

  • 开始接受数据并处理:ssc.start()

  • 等待流处理终止(或者出现异常退出):
    ssc.awaitTermination()

  • 手动终止流处理:ssc.stop()

  1. //这里从一个TCP数据源接收流式数据,在这里我们需要指定主机和端口。还指定了存储等级:内存、磁盘、序列
  2. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
  1. //处理数据逻辑后想要真正开始启动任务调用的方法
  2. ssc.start()
  3. ssc.awaitTermination()

我们用netcat模拟服务器发送数据:

  1. //监听模式向9999端口发送数据
  2. nc -l 9999

有状态的:记录上批次数据并累加

  1. //改变原来的reducebykey为updateStateByKey(updateFunction _)
  2. //updateFunction为自己开发的函数,即把前面批次数据和后面批次数据相加
  3. def updateFunction(currentValues: Seq[Int], preValues: Option[Int]): Option[Int] = {
  4. //Scala Option(选项)类型用来表示一个值是可选的(有值或无值)。
  5. //Option[T] 是一个类型为 T 的可选值的容器: 如果值存在, Option[T] 就是一个 Some[T] 即成功返回T类型,如果不存在, Option[T] 就是对象 None 。
  6. val current = currentValues.sum
  7. val pre = preValues.getOrElse(0)// getOrElse() 方法来获取元组中存在的元素或者使用其默认的值
  8. Some(current + pre)//Scala数据类型
  9. }

2.时间窗口

这里只需要把我们的reducebykey算子改为:

  1. reduceByKeyAndWindow((v1: Int, v2:Int) => v1 + v2, Seconds(30), Seconds(10))

每10秒更新一次数据,更新最近30秒钟的结果,后面10秒参数要和前面设置的批次时间相同;如果批次时间小于10秒,则更新数据时间和批次数据无关,如果大于10秒,则无论时间窗口怎样更新数据,都不会显示。

3.Kafka+Streaming -wordcount

conf 加:set(“spark.cores.max”, “8”)

Recevier模式:

如果改为local【1】个线程,将不会正常工作出结果。

在有状态的基础上添加:

  1. //ReceiverInputDStream类型
  2. val zkQuorum = "master:2181,slave1:2181,slave2:2181"
  3. val groupId = "group_1"
  4. val topicAndLine: ReceiverInputDStream[(String, String)] =
  5. KafkaUtils.createStream(ssc, zkQuorum, groupId,
  6. Map("topic_1013" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
  7. val lines: DStream[String] = topicAndLine.map{ x =>
  8. x._2
  9. }

KafkaUtils.createDstream

构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] )

使用了receivers来接收数据,利用的是Kafka高层次的消费者api,对于所有的receivers接收到的数据将会保存在spark executors中,然后通过Spark Streaming启动job来处理这些数据,默认会丢失,可启用WAL日志,该日志存储在HDFS上

  • 创建一个receiver来对kafka进行定时拉取数据,ssc的rdd分区和kafka的topic分区不是一个概念,故如果增加特定主体分区数仅仅是增加一个receiver中消费topic的线程数,并不增加spark的并行处理数据数量

  • 对于不同的group和topic可以使用多个receivers创建不同的DStream

  • 如果启用了WAL,需要设置存储级别,即KafkaUtils.createStream(….,StorageLevel.MEMORY_AND_DISK_SER)

Direct模式:

  1. val brokers = "192.168.88.101:9092";
  2. val topics = "topic_1013";
  3. val topicSet = topics.split(",").toSet
  4. val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
  5. val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

KafkaUtils.createDirectStream

区别Receiver接收数据,这种方式定期地从kafka的topic+partition中查询最新的偏移量,再根据偏移量范围在每个batch里面处理数据,使用的是kafka的简单消费者api

优点:

  • 简化并行,不需要多个kafka输入流,该方法将会创建和kafka分区一样的rdd个数,而且会从kafka并行读取。

  • 高效,这种方式并不需要WAL,WAL模式需要对数据复制两次,第一次是被kafka复制,另一次是写到wal中

  • 恰好一次语义(Exactly-once-semantics),传统的读取kafka数据是通过kafka高层次api把偏移量写入zookeeper中,存在数据丢失的可能性是zookeeper中和ssc的偏移量不一致。EOS通过实现kafka低层次api,偏移量仅仅被ssc保存在checkpoint中,消除了zk和ssc偏移量不一致的问题。缺点是无法使用基于zookeeper的kafka监控工具

数据挤压问题:

数据挤压:下游处理速度慢(并发不够、处理速度慢)

kafka -> streaming

  • 数据分布,调节offset——紧急

  • 并发调大,需要kafka配合(增加分区数),提高线程数量

  • 控制批次的规模—— max.poll.records

  • 控制数据处理时间(timeout)—— max.poll.interval.ms

4.Kafka+Streaming+Kafka

上面对接数据后,这里后面要对接外部服务,用到了前面说的执行算子foreachRdd:

  1. val array = ArrayBuffer[String]()
  2. lines.foreachRDD(rdd => {//遍历每批次数据
  3. val count = rdd.count().toInt
  4. rdd.take(count + 1).take(count).foreach(x => {//遍历每条数据
  5. array += x + "--read"
  6. })
  7. ProducerSender(array)
  8. array.clear()
  9. })

自己为producer发送数据:

  1. def ProducerSender(args: ArrayBuffer[String]): Unit = {
  2. if (args != null) {
  3. val brokers = "192.168.88.101:9092"
  4. // Zookeeper connection properties
  5. val props = new HashMap[String, Object]()
  6. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
  7. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  8. "org.apache.kafka.common.serialization.StringSerializer")
  9. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  10. "org.apache.kafka.common.serialization.StringSerializer")
  11. val producer = new KafkaProducer[String, String](props)
  12. val topic = "topic_1013"//可输出到不同的topic,相同会有好玩现象
  13. // Send some messages
  14. for (arg <- args) {
  15. println("i have send message: " + arg)
  16. val message = new ProducerRecord[String, String](topic, null, arg)
  17. producer.send(message)
  18. }
  19. Thread.sleep(500)
  20. producer.close()
  21. }

5.Kafka+Streaming+Hbase
HBase 配置:

  1. object HbaseHandler {
  2. def insert(row: String, column: String, value: String) {
  3. // Hbase配置
  4. val tableName = "sparkstream_kafkahbase_table" // 定义表名
  5. val hbaseConf = HBaseConfiguration.create()
  6. hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
  7. hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  8. hbaseConf.set("hbase.defaults.for.version.skip", "true")
  9. val hTable = new HTable(hbaseConf, tableName)
  10. val thePut = new Put(row.getBytes)
  11. thePut.add("info".getBytes,column.getBytes,value.getBytes)
  12. hTable.setAutoFlush(false, false)
  13. // 写入数据缓存
  14. hTable.setWriteBufferSize(3*1024*1024)
  15. hTable.put(thePut)
  16. // 提交
  17. hTable.flushCommits()
  18. }
  19. }
  1. val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
  2. val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
  3. val line = lines.flatMap(_.split("\n"))
  4. val words = line.map(_.split("\\|"))
  5. words.foreachRDD(rdd => {//遍历批次数据
  6. rdd.foreachPartition(partitionOfRecords => {//遍历Kafka分区数据
  7. partitionOfRecords.foreach(pair => {//遍历每条记录
  8. val key = pair(0)
  9. val col = pair(1)
  10. val value = pair(2)
  11. println(key + "_" + col + " : " + value)
  12. HbaseHandler.insert(key, col, value)
  13. })
  14. })
  15. })

运行模式:

  • idea中:注意pom中指定好版本文件

  • Linux终端中:
    /bin/spark-submit —master local[2] (代码中也可指定), —class classname jar-path IP port
    Linux文件重定向:bash run.sh 1>1.log 2>2.log 便于查看数据

  • Linux终端中Standard: —master spark: //master: 7077

  • Linux终端中yarn模式: —master yarn-cluster

后两种可指定参数:

  1. --num-executors 2 \
  2. --executor-memory 1g \
  3. --executor-cores 2 \
  4. --driver-memory 1g \
上传的附件
最近文章
eject