spark在什么时候会使用内存存储处理spark读取数据库库

&&国之画&&&&&&
版权所有 京ICP备号-2
迷上了代码!Tachyon:Spark生态系统中的分布式内存文件系统
发表于 17:05|
来源《程序员》电子刊|
作者刘少山
摘要:Tachyon把内存存储的功能从Spark中分离出来, 使Spark可以更专注计算的本身, 以求通过更细的分工达到更高的执行效率。
Tachyon是Spark生态系统内快速崛起的一个新项目。 本质上, Tachyon是个分布式的内存文件系统, 它在减轻Spark内存压力的同时,也赋予了Spark内存快速大量数据读写的能力。Tachyon把内存存储的功能从Spark中分离出来, 使Spark可以更专注计算的本身, 以求通过更细的分工达到更高的执行效率。 本文将先向读者介绍Tachyon在Spark生态系统中的使用, 也将分享百度在大数据平台上利用Tachyon取得的性能改善的用例,以及在实际使用Tachyon过程中遇到的一些问题和解决方案。最后我们将介绍一下Tachyon的一些新功能。 &Tachyon简介Spark平台以分布式内存计算的模式达到更高的计算性能,在最近引起了业界的广泛关注,其开源社区也十分活跃。以百度为例,在百度内部计算平台已经搭建并运行了千台规模的Spark计算集群,百度也通过其BMR的开放云平台对外提供Spark计算平台服务。然而,分布式内存计算的模式也是一柄双刃剑,在提高性能的同时不得不面对分布式数据存储所产生的问题,具体问题主要有以下几个:当两个Spark作业需要共享数据时,必须通过写磁盘操作。比如:作业1要先把生成的数据写入HDFS,然后作业2再从HDFS把数据读出来。在此,磁盘的读写可能造成性能瓶颈。由于Spark会利用自身的JVM对数据进行缓存,当Spark程序崩溃时,JVM进程退出,所缓存数据也随之丢失,因此在工作重启时又需要从HDFS把数据再次读出。当两个Spark作业需操作相同的数据时,每个作业的JVM都需要缓存一份数据,不但造成资源浪费,也极易引发频繁的垃圾收集,造成性能的降低。仔细分析这些问题后,可以确认问题的根源来自于数据存储,由于计算平台尝试自行进行存储管理,以至于Spark不能专注于计算本身,造成整体执行效率的降低。Tachyon的提出就是为了解决这些问题:本质上,Tachyon是个分布式的内存文件系统,它在减轻Spark内存压力的同时赋予了Spark内存快速大量数据读写的能力。Tachyon把存储与数据读写的功能从Spark中分离,使得Spark更专注在计算的本身,以求通过更细的分工达到更高的执行效率。图1: Tachyon的部署图1显示了Tachyon的部署结构。Tachyon被部署在计算平台(Spark,MR)之下以及存储平台(HDFS, S3)之上,通过全局地隔离计算平台与存储平台, Tachyon可以有效地解决上文列举的几个问题,:当两个Spark作业需要共享数据时,无需再通过写磁盘,而是借助Tachyon进行内存读写,从而提高计算效率。在使用Tachyon对数据进行缓存后,即便在Spark程序崩溃JVM进程退出后,所缓存数据也不会丢失。这样,Spark工作重启时可以直接从Tachyon内存读取数据了。当两个Spark作业需要操作相同的数据时,它们可以直接从Tachyon获取,并不需要各自缓存一份数据,从而降低JVM内存压力,减少垃圾收集发生的频率。Tachyon系统架构在上一章我们介绍了Tachyon的设计,本章我们来简单看看Tachyon的系统架构以及实现。 图2显示了Tachyon在Spark平台的部署:总的来说,Tachyon有三个主要的部件:Master, Client,与Worker。在每个Spark Worker节点上,都部署了一个Tachyon Worker,Spark Worker通过Tachyon Client访问Tachyon进行数据读写。所有的Tachyon Worker都被Tachyon Master所管理,Tachyon Master通过Tachyon Worker定时发出的心跳来判断Worker是否已经崩溃以及每个Worker剩余的内存空间量。图2: Tachyon在Spark平台的部署图3显示了Tachyon Master的结构,其主要功能如下:首先,Tachyon Master是个主管理器,处理从各个Client发出的请求,这一系列的工作由Service Handler来完成。这些请求包括:获取Worker的信息,读取File的Block信息, 创建File等等;其次,Tachyon Master是个Name Node,存放着所有文件的信息,每个文件的信息都被封装成一个Inode,每个Inode都记录着属于这个文件的所有Block信息。在Tachyon中,Block是文件系统存储的最小单位,假设每个Block是256MB,如果有一个文件的大小是1GB,那么这个文件会被切为4个Block。每个Block可能存在多个副本,被存储在多个Tachyon Worker中,因此Master里面也必须记录每个Block被存储的Worker地址;第三,Tachyon Master同时管理着所有的Worker,Worker会定时向Master发送心跳通知本次活跃状态以及剩余存储空间。Master是通过Master Worker Info去记录每个Worker的上次心跳时间,已使用的内存空间,以及总存储空间等信息。&图3: Tachyon的Master设计图4显示了Tachyon Worker的结构,它主要负责存储管理:首先,Tachyon Worker的Service Handler处理来自Client发来的请求,这些请求包括:读取某个Block的信息,缓存某个Block,锁住某个Block,向本地内存存储要求空间等等。第二,Tachyon Worker的主要部件是Worker Storage,其作用是管理Local Data(本地的内存文件系统)以及Under File System(Tachyon以下的磁盘文件系统,比如HDFS)。第三,Tachyon Worker还有个Data Server以便处理其他的Client对其发起的数据读写请求。当由请求达到时,Tachyon会先在本地的内存存储找数据,如果没有找到则会尝试去其他的Tachyon Worker的内存存储中进行查找。如果数据完全不在Tachyon里,则需要通过Under File System的接口去磁盘文件系统(HDFS)中读取。图4: Tachyon的Worker设计图5显示了Tachyon Client的结构,它主要功能是向用户抽象一个文件系统接口以屏蔽掉底层实现细节。首先,Tachyon Client会通过Master Client部件跟Tachyon Master交互,比如可以向Tachyon Master查询某个文件的某个Block在哪里。Tachyon Client也会通过Worker Client部件跟Tachyon Worker交互, 比如向某个Tachyon Worker请求存储空间。在Tachyon Client实现中最主要的是Tachyon File这个部件。在Tachyon File下实现了Block Out Stream,其主要用于写本地内存文件;实现了Block In Stream主要负责读内存文件。在Block In Stream内包含了两个不同的实现:Local Block In Stream主要是用来读本地的内存文件,而Remote Block In Stream主要是读非本地的内存文件。请注意,非本地可以是在其它的Tachyon Worker的内存文件里,也可以是在Under File System的文件里。图5: Tachyon的Client设计现在我们通过一个简单的场景把各个部件都串起来:假设一个Spark作业发起了一个读请求,它首先会通过Tachyon Client去Tachyon Master查询所需要的Block所在的位置。如果所在的Block不在本地的Tachyon Worker里,此Client则会通过Remote Block In Stream向别的Tachyon Worker发出读请求,同时在Block读入的过程中,Client也会通过Block Out Stream把Block写入到本地的内存存储里,这样就可以保证下次同样的请求可以由本机完成。Tachyon在百度内部的使用在百度内部,我们使用Spark SQL进行大数据分析工作, 由于Spark是个基于内存的计算平台,我们预计绝大部分的数据查询应该在几秒或者十几秒完成以达到互动查询的目的。可是在Spark计算平台的运行中,我们却发现查询都需要上百秒才能完成,其原因如图6所示:我们的计算资源(Data Center 1)与数据仓库(Data Center 2)可能并不在同一个数据中心里面,在这种情况下,我们每一次数据查询都可能需要从远端的数据中心读取数据,由于数据中心间的网络带宽以及延时的问题,导致每次查询都需要较长的时间(&100秒)才能完成。更糟糕的是,很多查询的重复性很高,同样的数据很可能会被查询多次,如果每次都从远端的数据中心读取,必然造成资源浪费。&为了解决这个问题,我们借助Tachyon把数据缓存在本地,尽量避免跨数据中心调数据。当Tachyon被部署到Spark所在的数据中心后,每次数据冷查询时,我们还是从远端数据仓库拉数据,但是当数据再次被查询时,Spark将从同一数据中心的Tachyon中读取数据,从而提高查询性能。实验表明:如果从非本机的Tachyon读取数据,耗时降到10到15秒,比原来的性能提高了10倍;最好的情况下,如果从本机的Tachyon读数据,查询仅需5秒,比原来的性能提高了30倍,效果相当明显。在使用了这个优化后,热查询性能达到了互动查询的要求,可是冷查询的用户体验还是很差。分析了用户行为后,我们发现用户查询的模式比较固定:比如很多用户每天都会跑同一个查询,只是所使用过滤数据的日期会发生改变。借助这次特性,我们可以根据用户的需求进行线下预查询,提前把所需要的数据导入Tachyon,从而避免用户冷查询。图6: Tachyon在百度大数据平台的部署在使用Tachyon过程中,我们也遇到了一些问题:在刚开始部署Tachyon的时候, 我们发现数据完全不能被缓存,第一次与后续的查询耗时是一样的。如图7的源代码所示:只有整个数据Block被读取后,这个Block才会被缓存住;否则缓存的操作会被取消。比如一个Block是256MB,如果你读了其中的255MB,这个Block还是不会被缓存,因为它只需读取整个block中的部分数据。在百度内部,我们很多数据是用行列式存储的,比如ORC与Parquet文件,每次查询只会读其中的某几列, 因此不会读取完整的Block, 以致block缓存失败。为了解决这个问题,我们对Tachyon进行了修改,如果数据Block不是太大的话,冷查询时即使用户请求的只是其中几列,我们也会把整个Block都读进来,保证整个Block能被缓存住,然后再次查询的话就可以直接从Tachyon读取了。在使用了修改的版本后,Tachyon达到了我们期待的效果,大部分查询可以在10秒内完成。&图7: Tachyon缓存数据逻辑Tachyon的一些新功能我们把Tachyon当作缓存来使用,但是每台机器的内存有限,内存很快会被用完。 如果我们有50台机器,每台分配20GB的内存给Tachyon,那么总共也只有1TB的缓存空间,远远不能满足我们的需要。在Tachyon最新版本有一个新的功能: Hierarchical Storage,即使用不同的存储媒介对数据分层次缓存。如图8所示,它类于CPU的缓存设计:内存的读写速度最快所以可以用于第0级缓存,然后SSD可以用于第1级缓存,最后本地磁盘可以作为底层缓存。这样的设计可以为我们提供更大的缓存空间,同样50台机器,现在我们每台可贡献出20TB的缓存空间,使总缓存空间达到1PB,基本可以满足我们的储存需求。与CPU缓存类似,如果Tachyon的block Replacement Policy设计得当,99%的请求可以被第0级缓存(内存)所满足,从而在绝大部分时间可以做到秒级响应。图8: Tachyon Hierarchical Storage当Tachyon收到读请求时,它首先检查数据是否在第0层,如果命中,直接返回数据,否则它会查询下一层缓存,直到找到被请求的数据为止。数据找到后会直接返回给用户,同时也会被Promote到第0层缓存,然后第0层被替换的数据Block会被LRU算法置换到下一层缓存。如此一来,如果用户再次请求相同的数据就会直接从第0层快速得到,从而充分发挥缓存的Locality特性。&当Tachyon收到写请求时,它首先检查第0层是否有足够空间,如果有,则直接写入数据后返回。否则它会查询下一层缓存,直到找到一层缓存有足够空间,然后把上一层的一个Block用LRU算法推到下一层,如此类推,直到把第0层有足够空间以写入新的数据,然后再返回。这么做的目的是保证数据被写入第0层,如果读请求马上发生在写请求后,数据可以快速被读取。可是,这样做的话写的性能有可能变的很差:比如头两层缓存都满的话,它需要把一个Block从第1层丢到第2层,再把一个Block从第0层丢到第1层,然后才能写数据到第0层,再返回给用户。对此我们做了个优化, 与其层层类推腾出空间,我们的算法直接把数据写入有足够空间的缓存层,然后快速返回给用户。如果缓存全满,则把底层的一个Block置换掉,然后把数据写入底层缓存后返回。经过实验,我们发现优化后的做法会把写延时降低约50%,大大的提高了写的效率。但是读的效率又如何呢,由于在TACHYON里,写是通过Memory-Mapped File进行的,所以是先写入内存,再Flush到磁盘,如果读是马上发生在写之后的话,其实会从操作系统的Buffer,也就是内存里读数据,因此读的性能也不会下降。Hierarchical Storage很好地解决了我们缓存不够用的问题,下一步我们将继续对其进行优化。比如,现在它只有LRU一种置换算法,并不能满足所有的应用场景, 我们将针对不同的场景设计更高效的置换算法,尽量提高缓存命中率。结语我个人相信更细的分工会达到更高的效率,Spark作为一个内存计算平台,如果使用过多的资源去缓存数据,会引发频繁的垃圾收集,造成系统的不稳定,或者影响性能。在我们使用Spark的初期,系统不稳定是我们面临的最大挑战,而频繁的垃圾收集正是引起系统不稳定最大的原因。比如当一次垃圾收集耗时过长时,Spark Worker变的响应非常不及时,很容易被误认为已经崩溃,导致任务重新执行。Tachyon通过把内存存储的功能从Spark中分离出来,让Spark更专注在计算本身,从而很好的解决了这个问题。随着内存变的越来越便宜,我们可以预期未来一段时间里,我们的服务器里可使用的内存会不断增长,Tachyon会在大数据平台中发挥越来越重要的作用。现在还是Tachyon发展的初期,在本文完成时Tachyon才准备发布0.6版,还有很多功能亟需完善,这也是一个好机遇,有兴趣的同学们可以多关注Tachyon,到社区里进行技术讨论以及功能开发。刘少山百度美国硅谷研发中心高级架构师,主要研究方向分布式系统以及大数据计算与存储平台。
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章Spark 官方文档(5)——Spark SQL,DataFrames和Datasets 指南
Spark版本:1.6.2
Spark SQL用于处理结构化数据,与Spark RDD API不同,它提供更多关于数据结构信息和计算任务运行信息的接口,Spark SQL内部使用这些额外的信息完成特殊优化。可以通过SQL、DataFrames API、Datasets API与Spark SQL进行交互,无论使用何种方式,SparkSQL使用统一的执行引擎记性处理。用户可以根据自己喜好,在不同API中选择合适的进行处理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中执行。
执行SQL语句的方法有多种:
可以使用基础SQL语法或HiveQL语法在Spark SQL上执行查询,SparkSQL可以从已安装的Hive中读取数据。当使用其他编程语言时,结果集以DataFrame类型返回
通过SQL命令行进行交互(spark-sql)
可以通过JDBC/ODBC驱动进行交互
DataFrames
DataFrame是由分布式数据集合组成的一系列命名列,它与关系数据库的表类似,但有很多优化的地方。DataFrame支持多种数据源,包括结构化数据、Hive的表、外部数据库、RDDs等。DataFrame API支持scala 、java、Python和R语言。
数据集接口在Spark1.6才加入,它可以使用Spark SQL的优化器对RDD操作进行优化。Dataset有JVM对象构建,并可以进行map、flatMap、filter等操作。Dataset API统一接口支持java和scala语言。
程序入口: SQLContext
SQLContext是Spark SQL所有功能的入口,通过SparkContext可以创建该对象的实例:
val sc: SparkContext
除了SQLContext,还可以创建HiveContext对象,它包含更多的功能,例如HiveQL解析器支持更完善的语法、使用Hive用户自定义函数UDFs、从Hive表中读取数据等。HiveContext不依赖Hive是否安装,Spark默认支持HiveContext。从Spark1.3以后,推荐使用HiveContext,未来SQLContext会包含HiveContext中的功能。可以通过spark.sql.dialect选项更改SQL解析器,这个参数可以再SQLContext的setConf方法设置,也可以通过SQL的ky=value语法设计。在SQLContext中dialect只支持一种简单的SQL解析器&sql&。HiveContext默认解析器是&hiveql&,同时支持&sql&,但一般推荐hiveql,因为它语法更全。
创建DataFrames
DataFrames的数据源多种多样,例如RDD、Hive table或者其他数据源。 下面代码从JSON文件创建了一个DataFrame
JavaSparkContext sc = ...;
DataFrame 操作
DataFrame支持结构化数据领域常用的数据操作,支持Scala、Java、Python和R语言,下面是一些基本操作示例:
JavaSparkContext sc
对于DataFrame的所有操作类型可以参考。除了简单的列操作,DataFrame还支持字符串操作、日期算法、数据操作等等,可以参考
编码实现SQL查询
SQLContext的sql方法支持运行sql语法的查询,并返回DataFrame类型的结果集:
SQLContext sqlContext = ...
创建Datasets
Dataset与RDD类似,但它不适用java序列化也不适用Kryo,而是使用特定的Encoder作为序列化工具。Encoder可以对Spark对象进行序列化和反序列化,同时不需要反序列化在字节级别就能支持filtering、sorting和hashing等操作。
RDD交互操作
在Spark SQL中有两种方式可以在DataFrame和RDD进行转换,第一种方法是利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。第二种方法通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。
使用反射推断Schema
Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被利用反射机制作为列名。case class可以嵌套组合成Sequences或者Array。这种RDD可以高效的转换为DataFrame并注册为表。
编程指定schema
当case class不能提前定义好时,可以通过以下三步通过代码创建DataFrame
将RDD转为包含row对象的RDD
基于structType类型创建schema,与第一步创建的RDD相匹配
通过SQLContext的createDataFrame方法对第一步的RDD应用schema
DataFrame接口支持一系列的数据源,它可以按照普通RDD进行操作,也能被注册为临时表进行操作。注册临时表后可以使用SQL查询操作数据集,本章节介绍了常用加载保存数据的方法,同时给出了内部数据源的特殊操作。
常规Load/Save函数
未配置spark.sql.sources.default情况下,默认使用parquet数据源处理所有操作。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手动指定选项
用户可以手动指定数据源加载的选项,对于数据源类型需要使用完整名称指定例如(org.apache.spark.sql.parquet),但对于内部类型可以使用简称,例如(json parquet jdbc等)。可以通过以上方法在不同DataFrame之间进行转换。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
在文件上直接执行SQL
除了需要将文件加载到DataFrame再执行sql以外,还可以直接执行sql
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Save通过SaveMode指定如何维护现有的数据。需要注意的是savemode未对数据加锁,因而不是源自操作。若使用overwrite模式时,原有数据会先被清空。
Scala/JavaAny Language含义
SaveMode.ErrorIfExists (default)
"error" (default)
当数据输出的位置已存在时,抛出此异常
SaveMode.Append
当数据输出的位置已存在时,在文件后面追加
SaveMode.Overwrite
"overwrite"
当数据输出的位置已存在时,重写
SaveMode.Ignore
当数据输出的位置已存在时,不执行任何操作,与 CREATE IF NOT EXISTS类似
保存到持久化表中
使用HiveContext时,DataFrame可以使用saveAsTable方法保存到持久化表中。与registerTempTable不同,saveASTable会为其真正创建数据区并创建指向该区域的指针放入HiveMetaStore中。在持有同一个metastore的连接期间,持久化的数据会一直存在,即使spark程序重启也不影响。可以通过SQLContext的table方法创建用于持久化表的DataFrame。默认的saveASTable会创建&managed table&,其数据位置会被metastore维护,被管理的表数据会在表被删除时清空。
Parquet文件
parquet是一种流行的列式存储格式。SparkSQL支持对parquet的读写以及schema和数据的维护。在写parquet文件时,为了兼容,所有列都会转换为nullable格式。
编程实现数据加载
表分区是Hive等系统的常用优化手段。在一个分区表中,数据经常分布在不同目录下,分区列的值相同的数据分布在同一目录中。目前支持对parquet文件进行自动推断分区。例如我们可以将之前的数据增加两列gender和country,并将两列作为分区列进行数据分区。
└── table
├── gender=male
├── ...
├── country=US
└── data.parquet
├── country=CN
└── data.parquet
└── ...
└── gender=female
├── ...
├── country=US
└── data.parquet
├── country=CN
└── data.parquet
└── ...
将数据路径传给SQLContext后,可以自动推断DataFrame数据的分区信息。注意,数据的分区列是自动推断出来你的,目前分区列支持数值类型和string类型。若用户不希望自动推断分区列时,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自动推断,此时会使用string类型列进行分区。 分区类型会根据传入的路径进行推断,但用户可以配置数据源的basePath属性设置分析的路径。
Schema合并
parquet支持列增加等操作,当出现多个互相兼容的schemas时,parquet可以自动检测并合并这些文件的schema。由于schema 合并会消耗大量的资源,默认关闭该操作,可以通过以下方法打开:
设置数据源mergeSchema属性为true
设置SQL的选项spark.sql.parquet.mergeSchema为true
Hive metasotre Parquet表转化
SparkSQL使用内部库而不是Hive SerDe,对Hive metasotre Parquet表进行读写,性能很好,可以通过spark.sql.hive.convertMetastoreParquet配置。
Hive/Parquet Schema Reconciliation
由于Hive和Parquet的元数据处理方式不同,如下所示
Hive忽略大小写,而Parquet没有
Hive所有字段都是nullable,而parquet中null是有意义的值(避免理解错误,贴上原文:Hive considers all columns nullable, while nullability in Parquet is significant)
将Hive metastore Parquet table转换为Spark SQL parquet表时,遵从以下规则:
相同名称的字段的数据类型必须相同,nullable类型被忽略。由于融合的数据类型需要在parquet中有对应的类型,所以nullability类型需要处理。
融合后schema中包含了Hive元数据中定义的值
任何只在Parquet schema中出现的字段被抛弃
任何旨在Hive元数据中出现的字段作为nullable增加到融合后元数据中
元数据刷新
Spark SQL会缓存parquet元数据以便提高性能。若Hive metastore Parquet table转换被启用,则转换的表元数据也会被cache。若这些元数据被外部工具修改,则需要手动更新缓存元数据保持一致性。
与parquet相关的配置参数如下所示
参数默认值描述
spark.sql.parquet.binaryAsString
该选项让SparkSQL将string安装二进制数据按照字符串处理,以便兼容老系统
spark.sql.parquet.int96AsTimestamp
Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata
缓存Parquet的Schema元数据,提高查询静态数据效率
spark.pression.codec
设置Parquet文件的压缩编码方式,支持 uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdown
启用过滤谓词下推优化,将过滤下推到抽取数据时,取得性能的提升
spark.sql.hive.convertMetastoreParquet
若设为false,Spark SQL使用Hive SerDe支持对Parquet tables的操作.
spark.sql.mitter.class
org.apache.parquet.hadoop.ParquetOutputCommitter
The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
spark.sql.parquet.mergeSchema
是否开启Schema合并
JSON数据集
SQLContext.read.josn()接口可以自动推断JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一个完整的JSON对象,不支持跨行的json对象。
Spark SQL支持从Hive中读取数据,但由于Hive依赖过多,默认不支持Hive,需要在编译时添加-Phive -Phive-thriftserver选项。由于用到Hive的序列化和反序列化需要保证Hive包在各个worker中都存在。
将hive-site.xml、core-site.xml和hdfs-site.xml放入conf目录下配置Hive环境。在Yarn集群上面运行时,需要确定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通过spark-submit的--jars和--file参数检查是否存在。若通过Spark SQL操作Hive需要创建HiveContext,增加元数据功能及HiveQL支持。若没有部署Hive环境同样可以创建HiveContext。若没有在hive-site.xml中配置,会自动在当前目录创建metastore_db并在/user/hive/warehouse创建仓储目录,需要给hive对/user/hive/warehouse的写权限。
与不同版本Hive Metastore交互
由于Spark SQL可以与不同版本的Hive Metastor(而不是Hive的版本)进行交互,只需要修改部分的配置信息,相关配置如下:
属性默认值描述
spark.sql.hive.metastore.version
Hive metastore的版本信息,从0.12.0到1.2.1
spark.sql.hive.metastore.jars
指定metastore的Jar包位置,builtin:该jar被打包到spark应用程序中;maven:使用maven远程仓储下载;类路径:需要包含hive所有的依赖包
spark.sql.hive.metastore.sharedPrefixes
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc
一个逗号分隔的类名前缀列表,这些类使用classloader加载,且可以在Spark SQL和特定版本的Hive间共享。例如,用来访问hive metastore 的JDBC的driver就需要这种共享。其他需要共享的类,是与某些已经共享的类有交互的类。例如,自定义的log4j appender。
spark.sql.hive.metastore.barrierPrefixes
使用逗号分隔的类名前缀列表,Spark SQL所访问的每个Hive版本都会被显式的reload这些类。
JDBC连接其他数据库
SparkSQL通过JdbcRDD实现对支持jdbc的数据库进行数据加载,将其作为DataFrame进行操作。JDBC加载的数据源不需要提供classTag。使用前需要将JDBC Driver包含在spark的classpath中。例如连接postgres需要如下设置
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
数据库中的表可以作为DataFrame或SparkSQL的临时表加载,支持以下的选项:
JDBC连接URL
需要读取的JDBC表。任何在From子句中的元素都可以,例如表或者子查询等。
partitionColumn, lowerBound, upperBound, numPartitions
这些选项需要同时制定,他们制定了如何并发读取数据的同时进行分区。lowerBound, upperBound仅用于确定分区边界不用于过滤数据,所有数据都会被分区
决定了每次数据取多少行
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -& "jdbc:postgresql:dbserver",
"dbtable" -& "schema.tablename")).load()
JDBC的driver类需要在所有executor可见,因为Java的DriverManager会进行安全检查,忽略所有不可见的类。可以通过修改每个worker节点的compute_classpath.sh以便包含Jar包
有些数据库例如H2的名称是大写,需要在SparkSQL中同样使用大写
对于一些负载可以通过内存缓存数据或者调整参数提高性能。
内存缓存数据
Spark SQL可以通过sqlContext.cacheTable("tableName") 或 dataFrame.cache()接口将RDD数据缓存到内存中。SparkSql可以近扫描需要的列并自动压缩、进行垃圾回收等。可以通过sqlContext.uncacheTable("Tablename")从内存中移除表。
属性默认值描述
spark.pressed
若设为true,Spark SQL会基于列的统计数据自动选择压缩器进行数据压缩
spark.sql.inMemoryColumnarStorage.batchSize
控制列缓存的每批次的数据大小,数据越大则内存利用率及压缩比例越大,但OOM风险也越大
其他配置信息
可以通过修改以下配置提高查询执行的性能,以后可能会弃用以下设置,而变为自动进行最优化配置。
属性默认值描述
spark.sql.autoBroadcastJoinThreshold
配置做join操作时被广播变量的表的大小。当设为-1时禁用广播。目前只有Hive元数据支持统计信息,可以通过ANALYZE TABLE &tablename& COMPUTE STATISTICS进行信息统计
spark.sql.tungsten.enabled
若为true,或使用tungsten物理优化执行,显式地管理内存并动态生成表达式计算的字节码
spark.sql.shuffle.partitions
配置shuffle操作时的分区数量
分布式SQL引擎
当使用JDBC/ODBC或者命令行进行交互时,SparkSQL可以作为分布式查询引擎执行。在这种模式下,Spark SQL的应用能够不写代码便执行查询。
运行Thrift JDBC/ODBC驱动
这里的实现与HiveServer2类似,可以通过beeline测试Spakr或者Hive1.2.1的JDBC驱动。通过以下命令启动jdbc驱动
./sbin/start-thriftserver.sh
这脚本支持所有的spark-submit的参数,还支持--hiveconf指定特定的Hive属性。可以通过--help查看本脚本具体参数。默认server监听的端口是10000,可以覆盖一些环境变量:
export HIVE_SERVER2_THRIFT_PORT=
或者修改系统属性
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=&listening-port& \
--hiveconf hive.server2.thrift.bind.host=&listening-host& \
--master &master-uri&
可以通过beeline测试Thrift JDBC/ODBC驱动
./bin/beeline
连接JDBC/ODBC驱动
beeline& !connect jdbc:hive2:
可能需要输入用户和密码进行安全验证,在非安全模式下,只需要本机的用户名和空密码即可。通过hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驱动同时支持通过HTTP端口发送thrift RPC消息。通过hive-site.xml中的配置开启HTTP模式作为系统属性:
beeline可以通过http模式连接JDBC/ODBC
beeline& !connect jdbc:hive2://
通过Spark SQL CLI运行
CLI是在单点模式下执行Hive元数据服务和查询的命令工具,但它不能与Thrift JDBC驱动进行会话。
./bin/spark-sql
与Apache Hive的兼容性
Spark SQL设计时考虑对Hive metastore,SerDes以及UDF的兼容。目前是基于Hive-1.2.1版本,并且Spark SQL可以连到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已经部署Hive的环境运行。
不支持的Hive功能
bucket表:butcket是Hive的哈希分区
unique join
字段统计信息
Hadoop归档文件
Hive的部分优化功能
Spark SQL和DataFrame支持以下数据类型
numeric类型
ByteType:单字节有符号整数
ShortType:2个字节的有符号整数
IntegerType:4字节整数
LongType:8字节整数
FloatType:4字节单精度浮点数
DoubleType:8字节双精度浮点数
DecimalType:任意精度有符号带小数的数值
String类型
Binary二进制类型
Boolean布尔类型
Datetime时间类型
TimestampType:时间戳类型
DateType:日期类型,只包含年月日
Complex复杂类型
ArrayType:数组类型
MapType:map类型
StructType:包含StructField序列的结构体
所有的数据类型都在org.apache.spark.sql.types中。
NaN是not a number的简写,用于处理不符合浮点数格式的float和double数据,其语义需要特殊处理:
NaN = NaN返回true
聚集过程中,所有NaN会被放到同一分组中
NaN在join过程中被看成普通的值
NaN在升序排序时放到最后,被认为是最大的数值
版权所有 爱编程 (C) Copyright 2012. . All Rights Reserved.
闽ICP备号-3
微信扫一扫关注爱编程,每天为您推送一篇经典技术文章。

我要回帖

更多关于 spark sql 数据库 的文章

 

随机推荐