大数据 Spark平台5-3、spark-sql

你挡我一时,挡不了我一世

发布日期: 2019-04-10 15:19:41 浏览量: 832
评分:
star star star star star star star star star star_border
*转载请注明来自write-bug.com

前文链接:https://write-bug.com/article/2361.html

Spark-Sql

官网:http://spark.apache.org/docs/latest/sql-getting-started.html

这里对Spark家族进一步介绍,偏入门实践,优化概念会少一些。

我们在学习Hive时,本质上是把sql语句转换成MR job,节省工作时间,提升效率,其在存储数据时,分为这几个层次:table / partition / bucket / hdfs

spark sql同样也处理结构化数据,把数据源传来的数据用表格的形式解析并维护起来,与此同时也可和Hive结合使用(数据存储在Hive中)

在spark streaming中我们通常开发一个模板——Dstream, SparkStreamingContext

spark SQL同样也有类似的概念——DataFrame, 当成一个table(关系型表格)

  • 外部数据源(SQLContext):HDFS、网络接口、Mysql等

  • Hive数据源(HiveContext):Hive

两者关系:HiveContext继承自SQLContext,HiveContext只能支持HQL,SQLContext支持语法更多

DataFrame(由很多RDD组成)让数据处理更为简单,是一个分布式的Table

  • 与RDD区别:传统RDD以行为单位读数据,DataFrame基于列的内部优化

  • 与RDD相同点:都具备懒惰机制(基于RDD的抽象)

数据格式

Spark SQL处理核心:Catalyst工作流程(本质:把sql、dataframe相结合,以树tree的形式来存储、优化)

流程

把sql语句和Dataframe输入形成未解析的逻辑计划,加上元数据Catalog的支持,形成逻辑计划,再经过优化,形成物理计划,最后再通过代价模型转化成可执行代码。

优化点

  • 基于规则

    • 一种经验式、启发式优化思路(如sql语句优化)
    • join算子——两张表做join
      • 外排
        • 大循环外排:A、B,两张表都很大,O(M*N)——不用
        • 游标式外排:归并排序(指针滑动比大小)
      • 内排:小表放内存,大表做遍历(hive中的mapside join)
  • 基于代价

    • 评估每种策略选择的代价,根据代价估算,确定代价最小的方案
    • 代价估算模型——调整join的顺序,减少中间shuffle数据的规模

catalyst工作流程

  • parser:针对sql解析

  • 词法分析:讲输入的sql语句串解析为一个一个的token

  • 语法分析:再词法分析基础上,将单词序列组合成各类语法短语,组成各个LogicPlan

  1. SELECT sum(v)
  2. FROM(
  3. SELECT score.id, 100+80+score.math_score+score.english_score AS v
  4. FROM people JOIN score
  5. WHERE people.id=score.id
  6. AND people.age>10
  7. ) a

解析sql

  • analyzer:借助元数据(catalog)解析
    根据元数据表解析为包含必要列的表,并且相应字段解析为相应的数据类型,相应的计算逻辑解析为对应的函数

解析成对应函数

  • optimizer:基于规则的优化策略
    经典规则:谓词下推(Predicate Pushdown)、常量累加(Constant Folding)和列值裁剪(Column Pruning)
    谓词下推:把过滤条件放在join之前执行

谓词下推

常量累加(180)、列值裁剪(提前过滤掉不用的列值字段):

  • 物理计划:基于代价的优化策略
    用物理操作算子产生一个或者多个物理计划。然后用cost模型选择一个物理计划。目前基于cost-based的优化仅仅用 于选择join算法:对已知的很小的relations,sparksql会选择使用spark的提供的点对点的广播功能实现Broadcast join

执行计划查看方式:

  • Spark网页sql

  • sql语句后面追加 . queryExecution方法查看

官方:catalyst优化引擎,执行时间减少75%

内存管理:Tungsten 内存管理器—— off-heap(堆外内存)

  • 本质:突破JVM内存管理限制,分配堆外内存(GC、与磁盘做交换dump)

  • JVM:GC带来时间开销,可能出现“假死”情况

实践

1、基本demo:

读数据:

1)从hdfs的原始text中读数据:sqlTest

  1. //建立学生表Scheme
  2. val StudentSchema: StructType = StructType(mutable.ArraySeq(
  3. StructField("Sno", StringType, nullable = false),
  4. StructField("Sname", StringType, nullable = false),
  5. StructField("Ssex", StringType, nullable = false),
  6. StructField("Sbirthday", StringType, nullable = true),
  7. StructField("SClass", StringType, nullable = true)
  8. ))
  9. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sqltest")
  10. val sc = new SparkContext(sparkConf)
  11. //sqlContext入口
  12. val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  13. //RDD导入并解析数据
  14. val StudentData = sc.textFile("hdfs://master:9000/sql_stu.data").map{
  15. lines =>
  16. val line = lines.split(",")
  17. Row(line(0),line(1),line(2),line(3),line(4))
  18. }
  19. //把RDD数据和Schema通过sqlContext维护起来形成DataFrame
  20. val StudentTable = sqlContext.createDataFrame(StudentData, StudentSchema)
  21. StudentTable.registerTempTable("Student")//表名
  22. //sql语句使用
  23. sqlContext.sql("SELECT Sname, Ssex, SClass FROM Student").show()

2)从hdfs的原始text中读数据(json串):sqlJsonText

  1. val personInfo = sqlContext.read.json("hdfs://master:9000/person_info.json")
  2. //json串中的数据都是维护好的数据,不需要schema
  3. personInfo.registerTempTable("personInfo")
  4. sqlContext.sql("SELECT id, name, age FROM personInfo").show()
  5. println(personInfo.schema)

3)从hive中读数据:sqlHiveTest

启动mysql:]# systemctl start mariadb(hive中元数据)

终端submit需jar包:lib/mysql-connector-java-5.1.41-bin.jar

  1. val hiveContext = new HiveContext(sc)
  2. hiveContext.table("movie_table").registerTempTable("movie_table")
  3. //可对现有表直接进行操作
  4. hiveContext.sql("SELECT movieid, title FROM movie_table").show()

2、UDF相关操作:

1)udf:单条记录处理(map):sqlUdf

  1. sqlContext.udf.register("strlen", (input: String) => input.length)
  2. //函数名字注册,及简单实现功能
  3. val personInfo = sqlContext.read.json("hdfs://master:9000/person_info.json")
  4. personInfo.registerTempTable("personInfo")
  5. sqlContext.sql("SELECT id, name, strlen(name), age FROM personInfo").show()//字段套用函数

2)udaf:聚合场景(groupby)

例子:每一个打分背后,有多少人参与

  1. class WordcountUdaf extends UserDefinedAggregateFunction {
  2. // 该方法指定具体输入数据的类型
  3. override def inputSchema: StructType = StructType(Array(StructField("input", StringType, true)))
  4. //在进行聚合操作的时候所要处理的数据的结果的类型
  5. override def bufferSchema: StructType = StructType(Array(StructField("count", IntegerType, true)))
  6. //指定UDAF函数计算后返回的结果类型
  7. override def dataType: DataType = IntegerType
  8. // 确保一致性 一般用true
  9. override def deterministic: Boolean = true
  10. //在Aggregate之前每组数据的初始化结果
  11. override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) =0}
  12. // 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
  13. // 本地的聚合操作,相当于Hadoop MapReduce模型中的Combiner
  14. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  15. buffer(0) = buffer.getAs[Int](0) + 1
  16. }
  17. //最后在分布式节点进行Local Reduce完成后需要进行全局级别的Merge操作
  18. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  19. buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  20. }
  21. //返回UDAF最后的计算结果
  22. override def evaluate(buffer: Row): Any = buffer.getAs[Int](0)
  23. }
  1. val hiveContext = new HiveContext(sc)
  2. hiveContext.table("rating_table").registerTempTable("rating_table")
  3. hiveContext.udf.register("strlen", (input: String) => input.length)
  4. hiveContext.udf.register("wordCount", new WordcountUdaf)
  5. //注册
  6. hiveContext.sql("select rating, wordCount(rating) as count, strlen(rating) as length" +
  7. " from rating_table group by rating").show()
  8. //这里两函数可做个对比

3、终端

  1. /usr/local/src/spark-2.0.2-bin-hadoop2.6/bin/spark-sql \
  2. --master local[2] \
  3. --jars /usr/local/src/spark-2.0.2-bin-hadoop2.6/lib/mysql-connector-java-5.1.41-bin.jar

测试与hive数据:

  1. select rating, count(*) from rating_table_ex group by rating limit 100;

4、streaming+sql:sqlAndStreamingWC

  1. nc -l 9999
  1. //单例模式
  2. object SQLContextSingleton {
  3. @transient private var instance: SQLContext = _
  4. def getInstance(sparkContext: SparkContext): SQLContext = {
  5. if (instance == null) {
  6. instance = new SQLContext(sparkContext)
  7. }
  8. instance
  9. }
  10. }
  1. if (args.length < 2) {
  2. System.err.println("Usage: NetworkWordCount <hostname> <port>")
  3. System.exit(1)
  4. }
  5. val sparkConf = new SparkConf().setMaster("local[2]").setAppName("sqlAndStreamingWC")
  6. val sc = new SparkContext(sparkConf)
  7. val ssc = new StreamingContext(sc, Seconds(30))
  8. val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
  9. val words = lines.flatMap(_.split(" "))
  10. words.foreachRDD((rdd: RDD[String], time: Time) => {
  11. val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  12. import sqlContext.implicits._
  13. //每条数据都用一个对象操作内存,复用性
  14. val wordsDataFrame = rdd.map(w => Record(w)).toDF()
  15. wordsDataFrame.registerTempTable("words")
  16. val wordCountsDataFrame =
  17. sqlContext.sql("select word, count(*) as total from words group by word")
  18. println(s"========= $time =========")
  19. wordCountsDataFrame.show()
  20. })
  21. ssc.start()
  22. ssc.awaitTermination()

5、streaming+sql + hbase:streamSqlHbase

  1. nc -l 9999
  1. object HbaseHandler {
  2. def insert(row: String, column: String, value: String) {
  3. // Hbase配置
  4. val tableName = "sparkstream_kafkahbase_table" // 定义表名
  5. val hbaseConf = HBaseConfiguration.create()
  6. hbaseConf.set("hbase.zookeeper.quorum", "master,slave1,slave2")
  7. hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
  8. hbaseConf.set("hbase.defaults.for.version.skip", "true")
  9. val hTable = new HTable(hbaseConf, tableName)
  10. val thePut = new Put(row.getBytes)
  11. thePut.add("info".getBytes,column.getBytes,value.getBytes)
  12. hTable.setAutoFlush(false, false)
  13. // 写入数据缓存
  14. hTable.setWriteBufferSize(3*1024*1024)
  15. hTable.put(thePut)
  16. // 提交
  17. hTable.flushCommits()
  18. }
  19. }
  1. //从套接字中读取到的信息遍历解析
  2. lines.foreachRDD((rdd: RDD[String], time: Time) => {
  3. val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  4. import sqlContext.implicits._
  5. val wordsDataFrame = rdd.map{ x=>
  6. (x.split(" ")(0),x.split(" ")(1),x.split(" ")(2))
  7. }.map(w => (w._1, w._2, w._3)).toDF("key", "col", "val")
  8. wordsDataFrame.registerTempTable("words")
  9. val wordCountsDataFrame =
  10. sqlContext.sql("select key, col, val from words")
  11. println(s"========= $time =========")
  12. wordCountsDataFrame.show()
  13. //对dataframe行遍历插入
  14. wordCountsDataFrame.foreach(x => HbaseHandler.insert(
  15. x.getAs[String]("key"),
  16. x.getAs[String]("col"),
  17. x.getAs[String]("val")))
  18. })
上传的附件
最近文章
eject