火了媒平台账号排序媒气灶火太大怎么办排的?

目前比较火的自媒体平台有哪些?哪个自媒体平台比价赚钱?_搜外问答
1、头条号头条号的注册比百家号的简单,但是头条号必须过新手期才能赚收益,过新手期因人而异,转正后可以赚收益根据你的阅读量,过万阅读可能是几块钱,过几十万也就几十,所以你的文笔一定要了得。优点:尊重不同用户的多元性,满足不同需求缺点:有一定的文笔的作者,才能赚取比较多的收益2、百度百家百度旗下的自媒体平台,是目前为止唯一一个一旦注册成功,发表文章就有收益的平台。但是准入门槛高,百度百家也是目前单价比较高的平台。优点:百度平台大,如果你发布的文章高质量、原创,很可能马上在全球最大中文搜索引擎上找到,收入可观。缺点:发文不像头条号那么随意,必须深度好文才有此希望。3、乐观号乐观号是自媒体交易服务平台,区别于头条、百家号这些自媒体平台。乐观号写手和作者可以入驻,一般几天都会通过,然后绑定自媒体账号可以自主的接单,基本上每月有一两百个单子,单篇的无流量无阅读的收益都是几十,阅读量高的话单篇可达上百上千,并且可以直接体现。优点:赚收益比较简单,功能比较强大缺点:并不是每天都有,需要等待平台派单4、UC平台UC推出的UC云观也是一个不错的自媒体平台,注册相对简单,注册后是一个试运营,非常容易通过,一般三天。但是通过试运营并不意味着有收入,还要开通广告服务才行,具体时间也要一个月到三个月左右。优点:订阅号主页定制,评论撩粉,素材管理,一步到位,缺点:知名度不高,使用的人群较少,难以获取流浪和推广。做自媒体想运营成品牌,内容需要打上自己独特的风格标签,内容要强化在读者心目当中的风格,如果这篇文章你能写,别人也能写,那么阅读率再高也没有用,因为他不具备长期品牌的构建。
管理文章:
微信自媒体实战案例
从零开始玩转微信自媒体
一周发布文章最多的作者
24小时获得「赞」最多的人
45 回复(55 赞)
16 回复(6 赞)
18 回复(5 赞)
(5 个回答)
(10 个回答)
(7 个回答)
(3 个回答)
(2 个回答)
(1 个回答)
(10 小时前)
(14 小时前)
(15 小时前)
(16 小时前)
(16 小时前)八卦的正确排列方法?_百度知道
八卦的正确排列方法?
八卦:乾、坤、坎、离、震、艮、巽、兑 的正确排列顺序是怎么样的?另外:在八卦图中,上、下、左、右、左上、左下、右上、右下 的分别是哪个卦?
我有更好的答案
逆时针方向,从正上开始:乾一,兑二,离三,震四,顺时针方向从正上开始是:乾一,巽五,坎六,艮七,坤八.正上为乾,下下为坤,左为离,右为坎.八卦:八卦图共分先天八卦及后天八卦两种。 八卦图共分两种,一种名为先天八卦,又名伏羲八卦,相传为伏羲氏的创作,据说此八卦乃太极图的演化。另一种则是后天八卦,双名文王八卦,相传是周文五的创作。 八卦为乾、坤、震、巽、离、坎、艮、兑。先天八卦与后天八卦的排列、象徵方位及所象徵的数,都有所不同。 先天八卦的所主是:1乾居南方,数目为一,与坤相对。2坤居北方,数目为八,与乾相对。3震居东北,数目为四,与巽相对。4巽居西南,数目为五,与震相对。5离居东方,数目为三,与坎相对。6坎居西方,数目为六,与离相对。7艮居西北,数目为七,与兑相对。8兑居东南,数目为二,与艮相对。 以上各相对的数字合,便为九,乾与坤合为九,震与巽合为九,离与坎合为九,艮与兑相合亦为九。 后天八卦与先天八卦所主的有异: 1离居南方,五行为火,数目为九,与坎相对。2坎居北方,五行为水,数目为一,与离相对。3震居东方,五行为木,数目为三,与兑相对。4兑居西方,五行为金,数目为七,与震相对。5乾居西北,五行为金,数目为六,与巽相对。6巽居东南,五行为木,数目为四,与乾相对。7坤居西南,五行为土,数目为二,与艮相对。8艮居东北,五行为土,数目为八,与坤相对。 八宅风水学所取的卦为后天八卦,此八卦中的每一卦皆主一位六亲:乾主父亲,坤主母亲,震主长男,巽主长女,坎主中男,离主中女,艮主少男,兑主少女。当家宅内某一方位不吉时,我们可根据该方位的卦,推断其不吉对谁的影响最大,同样道理,哪一方最吉,亦可据此方位的卦推断谁获益最大。
采纳率:83%
逆时针方向,从正上开始:乾一,兑二,离三,震四,顺时针方向从正上开始是:乾一,巽五,坎六,艮七,坤八.正上为乾,下下为坤,左为离,右为坎.
本回答被提问者采纳
为您推荐:
其他类似问题
八卦的相关知识
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。火了媒新手指南,一文教你看懂广告主如何派单? - 简书
火了媒新手指南,一文教你看懂广告主如何派单?
火了媒作为国内领先的新媒体广告交易平台,有上千万的媒体资源,是广告主的首选平台。第一步:注册1.打开火了媒网址点下方的注册。
2.进入注册页面后点广告主开始进行注册。用户名为数字或者字母,密码为数字或者字母,确认密码就重新输入密码,填入手机号,最后填入手机验证码。点立即注册。
第二步:派单1.登陆你的广告主后,上面有5个大板块,选择相对应得板板块,进行挑选资源,比如你要发自媒体客户端。
2.选择账号,可以在搜索框输入账号名字,也可以条件选择。勾选你所需要的资源,点立即投放,进入下一步。
3.点立即投放后就是添加文案。填写订单主体信息,订单标题就是要媒体主做什么事情,标题就是文章标题。文章正文内容一般是以word的文档上传附件的形式展现出来。正文内容那里不用填。
4.点提交订单,会有个提示,文章内容要符合发文规范。提交订单后如果账户余额充足,即下单完毕,如果账户余额不足,会提示你先充值。下完单后,火了媒会短信通知媒体主接单。订单信息会显示在我的订单那里,让你实时查看订单状态。
5.若媒体主拒单或者执行完毕后,会有短信通知你相应状况,你去我的订单那里查看详细状况。点击订单号即可查看。执行情况这里有媒体主返回的链接,如无异议退出即可,如有异议请联系客服。
第三步:充值1.进入用户中心点在线充值。
2.输入充值金额,勾选支付宝(只支持支付宝充值,若其余充值请联系客服)点充值。
3.用你的支付宝扫一扫二维码,然后输入支付密码直至付款成功,付款成功后,平台会自动在你的账户里增加相对应的金额即充值成功。
火了媒是新媒体资源广告交易首选平台
火了媒作为国内领先的新媒体广告交易平台,是各位自媒体号主的首选挣钱平台.很多新朋友不知道如何加入火了媒来挣钱,现在就由小编来告诉你怎么来使用火了媒。这里值得注意的是:火了媒是广告交易平台,是由广告主自行选择账号派单,媒体主选择接单挣钱。 工具/原料 一台电脑第一步:注册账号...
聚来宝官网注册推荐好友ID号:wang75 也可以直接点开聚来宝免费注册地址 http://www.julaibao.com/member/reg.aspx?refman=wang75 咨询QQ: 学习QQ群: 新会员如何起步 1.先完善...
___________________________________________________________________________ 一、小额融资聚集人气
一个网站,要想能够发展起来,必须具备三大因素:定位,流量,资金。三者缺一不可,有创新的定位,足够...
用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金 Cover 有什么料? 从这篇文章中你能获得这些料: 知道setContentView()之后发生了什么? ... Android 获取 View 宽高的常用正确方式,避免为零 - 掘金 相信有很多...
用两张图告诉你,为什么你的 App 会卡顿? - Android - 掘金Cover 有什么料? 从这篇文章中你能获得这些料: 知道setContentView()之后发生了什么? ... Android 获取 View 宽高的常用正确方式,避免为零 - 掘金相信有很多朋友...
所有的故事,开始和结束都没有电影里那般惊天动地,也不似小说里那般不期而遇。只是在某一个契合的时间点与一个对的人开始了青春的一场盛宴,只是谁也没有料想故事的原委竟是这般狼狈不堪。故事结束了,那些幻境中的东西若隐若现,无数次闪过脑海里。纠缠着我们在现实和回忆里千回百转。 已是1...
姓名:何飞 性别:男 身高:173 笔名:千秋梦 家乡:云南昭通 学校:云南大学 专业:缅甸语 现居地址:缅甸曼德勒外国语大学 爱好:骑行,拍照,书法,听歌(古风),写文章,看书,旅游,对古诗词和现代诗情有独钟……… 微信:hy 希望遇到志趣相投的朋友,无...
1. 我需要改变 我问问自己:&毕业5年,从制药行业转到物流,再到现在的电商,是否满足现状?” 回顾5年来走的路,虽然遇到不少坑,走到今天也不容易,情感上给自己一个及格分,但是我明白我的潜力不止于此。我的学习能力并不差,甚至有点“小聪明”,大脑就像一台机器,你不用它就会慢慢...
哈罗,《繁星似水》是我的第一部中篇小说连载,讲述一个性格软弱,从小被严厉的家教束缚性格和情感的女孩林雪,她的青春,她的爱情,她的婚姻,最终她是否能与深爱着她的江左修成正果呢? 以下是目录~ 繁星似水(一)离开(上) 繁星似水(二)离开(下) 繁星似水(三)初识舍友 繁星似水...腾讯qq群成员怎么排序的,为什么我每个群都是,我排群主的前面呀?_百度知道
腾讯qq群成员怎么排序的,为什么我每个群都是,我排群主的前面呀?
我有更好的答案
如果大家同时在线或者离线,默认是按照群昵称的拼音首字母进行排序,数字字母优先于汉字;QQ会员排在同类别成员的最前面,会员之间也是按照群昵称的首字母排序。管理员排在群员前面,群主排在管理员前面。如果你不是会员,但是你在线,其他人都不在线(离线或隐身),无论那些离线的人是会员、群主还是管理员,都会排在你后面。也就是说,离线的人永远排在在线的人后面。所以我猜测你的QQ群里面都没多少人上线吧。你可以在上线的人比较多的时候再看看你在群成员列表内的排位。
如果大家同时在线或者离线,默认是按照群昵称的拼音首字母进行排序,数字字母优先于汉字;QQ会员排在同类别成员的最前面,会员之间也是按照群昵称的首字母排序。管理员排在群员前面,群主排在管理员前面。 如果你不是会员,但是你在线,其他人都不在线(离线或隐身),无论那些离线的人是会员、群主还是管理员,都会排在你后面。也就是说,离线的人永远排在在线的人后面。 所以我猜测你的QQ群里面都没多少人上线吧。你可以在上线的人比较多的时候再看看你在群成员列表内的排位。
本回答被提问者和网友采纳
如果大家同时在线或者离线,默认是按照群昵称的拼音首字母进行排序,数字/字母优先于汉字;QQ会员排在同类别成员的最前面,会员之间也是按照群昵称的首字母排序。管理员排在群员前面,群主排在管理员前面.如果你不是会员,但是你在线,其他人都不在线(离线或隐身),无论那些离线的人是会员、群主还是管理员,都会排在你后面。也就是说,离线的人永远排在在线的人后面。
不能排序,正常情况下群主、管理员在前,如果状态Q我吧,然后会员VIP7、6、5、4、3、2、1排,
其他1条回答
为您推荐:
其他类似问题
您可能关注的内容
腾讯qq的相关知识
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。查看: 9412|回复: 1
大数据:Spark排序算子sortByKey来看大数据平台下如何做排序
主题帖子积分
问题导读:
1. 排序算子是如何做排序的?
2. 完整的排序流程是?
解决方案:
在前面一系列博客中,特别在Shuffle博客系列中,曾描述过在生成ShuffleWrite的文件的时候,对每个partition会先进行排序并spill到文件中,最后合并成ShuffleWrite的文件,也就是每个Partition里的内容已经进行了排序,在最后的action操作的时候需要对每个executor生成的shuffle文件相同的Partition进行合并,完成Action的操作。
排序算子和常见的reduce算子算法有何区别?
常见的一些聚合、reduce算子,不需要排序
将相同的hashcode分配到同一个partition,哪怕是不同的executor在做最后的合并的时候,只需要合并不同的executor里相同的partition就可以了对每个partition进行排序,考虑内存因数,解决相同的Partition多文件合并的问题,使用外排序进行相同的key合并
下面是一个常见的排序的小例子:
[Scala] 纯文本查看 复制代码
package spark.sort
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object sortsample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(&sortsample&)
val sc = new SparkContext(conf)
var pairs = sc.parallelize(Array((&a&,0),(&b&,0),(&c&,3),(&d&,6),(&e&,0),(&f&,0),(&g&,3),(&h&,6)), 2);
pairs.sortByKey(true, 3).collect().foreach(println);
核心代码:OrderedRDDFunctions.scala
会很奇怪么?RDD里面并没有sortByKey的方法?在这里和前面博客里提到的PairRDDFunctions一样,隐式转换:
[Scala] 纯文本查看 复制代码
implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
: OrderedRDDFunctions[K, V, (K, V)] = {
new OrderedRDDFunctions[K, V, (K, V)](rdd)
调用的是OrderedRDDFunctions.scala里的方法
[Scala] 纯文本查看 复制代码 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
对Partition采用了范围分配的策略,为何要使用范围分配的策略?
对其它非排序类型的算子,使用散列算法,只要保证相同的key是分配在相同的partition就可以了,并不会影响相同的key的合并,计算。对排序来说,如果只是保证相同的key在相同的Partition并不足够,最后还是需要合并所有的Partition进行排序合并,如果这发生在Driver端做这件事,将会非常可怕,那么我们可以做一些策略改变,制定一些Range,使排序相近的key分配到同一个Range上,在把Range扩大化,比如:一个Partition管理一个Range
_104124.png (24.68 KB, 下载次数: 0)
10:42 上传
2.1 分配Range
Range的分配不合理,会影响数据的不均衡,导致executor在做同Partition排序的时候会不均衡,并行计算的整体性能往往会被单个最糟糕的运行节点所拖累,如果提高运算的速度,需要考虑数据分配的均衡性。
2.1.1 每个区块采样大小
获取所有的key,依据所有的Key制定区间,这显然是不明智的,后果变成一个全量数据的排序。我们可以采用部分采样的策略,基于采样数据进行区间划分,首先我们需要评估一个简单的采样大小的阈值。
Partitioner.scala rangeBounds
代码如下:
[Scala] 纯文本查看 复制代码val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
partitions: 参数在指定sortByKey的时候设置的区块大小:3
[Scala] 纯文本查看 复制代码pairs.sortByKey(true, 3)
rdd.partitions: 指的是在数据的分区块大小:2
[Scala] 纯文本查看 复制代码sc.parallelize(Array((&a&,0),(&b&,0),(&c&,3),(&d&,6),(&e&,0),(&f&,0),(&g&,3),(&h&,6)), 2)
每个区块需要采样的数量是通过几个固定参数来计算
[Scala] 纯文本查看 复制代码val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
2.1.2 Sketch采样(蓄水池采样法)
[Scala] 纯文本查看 复制代码
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =&
val seed = byteswap32(idx ^ (shift && 16))
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
mapPartitionsWithIndex, collection 这些都是RDD ,都是需要在提交job进行运算的,也就是采样的过程中,是通过executor执行了一次job
[Scala] 纯文本查看 复制代码
def reservoirSampleAndCount[T: ClassTag](
input: Iterator[T],
seed: Long = Random.nextLong())
: (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
while (i & k && input.hasNext) {
val item = input.next()
reservoir(i) = item
// If we have consumed all the elements, return them. Otherwise do the replacement.
if (i & k) {
// If input size & k, trim the array to return only an array of input size.
val trimReservoir = new Array[T](i)
System.arraycopy(reservoir, 0, trimReservoir, 0, i)
(trimReservoir, i)
// If input size & k, continue the sampling process.
var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
// There are k elements in the reservoir, and the l-th element has been
// consumed. It should be chosen with probability k/l. The expression
// below is a random long chosen uniformly from [0,l)
val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex & k) {
reservoir(replacementIndex.toInt) = item
(reservoir, l)
函数reservoirSampleAndCount采样
当数据小于要采样的集合的时候,可以使用数据为样本当数据集合超过需要采样数目的时候会继续遍历整个数据集合,通过随机数进行位置的随机替换,保证采样数据的随机性
返回的结果里包含了总数据集,区块编号,区块的数量,每个区块的采样集
2.1.3 重新采样
为了避免某些区块的数据量过大,设置了一个阈值:
[Scala] 纯文本查看 复制代码val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
阈值=采样数除于总数据量,当某个区块的数据量*阈值大于每个区的采样率的时候,认为这个区块的采样率是不足的,需要重新采样
[Scala] 纯文本查看 复制代码val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates ++= reSampled.map(x =& (x, weight))
2.1.4 采样集key的权重
我们在前面对每个区进行了相同数量的采样(不包含重新采样),但是每个区的数量有可能是不均衡的,为了避免不均衡性需要对每个区采样的key进行权重设置,尽量分配高权重给数据量多的区
权重因子:
[Scala] 纯文本查看 复制代码val weight = (n.toDouble / sample.length).toFloat
n 是区的数据数量
sample 是采样的数量
这里权重的最小值是1,因为采样的数量肯定是小于等于数据
当数据量大于采样数量的时候,每个区的采样数量是相同的,那么意味着区的数据量越大,该区块的key的权重也就越大
2.1.5 分配每个区块的range
样本已经采集好了,现在需要对依据样本进行区块的range进行分配
先对样本进行排序依据每个样本的权重计算每个区块平均所分配的权重最后通过每个区分配的权重按照顺序来决定获取哪些样本用作range,一个区分配一个样本区间
[Scala] 纯文本查看 复制代码
def determineBounds[K : Ordering : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
val ordered = candidates.sortBy(_._1)
val numCandidates = ordered.size
val sumWeights = ordered.map(_._2.toDouble).sum
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
val bounds = ArrayBuffer.empty[K]
var previousBound = Option.empty[K]
while ((i & numCandidates) && (j & partitions - 1)) {
val (key, weight) = ordered(i)
cumWeight += weight
if (cumWeight &= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
previousBound = Some(key)
bounds.toArray
2.2 ShuffleWriter
在以前的博客里介绍了SortShuffleWrite,在sortByKey的排序情况下使用了BypassMergeSortShuffleWriter,把焦点聚焦到key如何分配到Partitioner和每个Partition的文件将会如何写入key,value生成Shuffle文件,在这两点上BypassMergeSortShuffleWriter将明显的不同于SortShuffleWrite
[Scala] 纯文本查看 复制代码while (records.hasNext()) {
final Product2&K, V& record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
2.2.1 分配key到Partition
在函数调用了partitioner.getPartition方法,还是回到RangePartitioner类中
[Scala] 纯文本查看 复制代码 def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
if (rangeBounds.length &= 128) {
// If we have less than 128 partitions naive search
while (partition & rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 1
// Determine which binary search method to use only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition & 0) {
partition = -partition-1
if (partition & rangeBounds.length) {
partition = rangeBounds.length
if (ascending) {
rangeBounds.length - partition
当Partition的分配数小于128的时候,轮训的查找每个Partition当Partition大于128的时候,使用二分法查找Partition
2.2.2 生成shuffle文件
基于前面对key进行排序的partition的分配,写到对应的partition文件中合并Partition文件生成index和data文件(shuffle_shuffleid_mapid_0.index)(shuffle_shuffleid_mapid_0.data)因为Partition已经合并了,最后一位reduceID都是为0
_104930.png (32.21 KB, 下载次数: 0)
10:51 上传
注意:在这里并没有象SortShuffleWrite 对每个Partition进行排序,Spill 文件,最后合并文件,而是直接写到了Partition文件中。
2.3 Shuffle Read读取Shuffle文件
在BlockStoreShuffleReader的read函数里
[Scala] 纯文本查看 复制代码
dep.keyOrdering match {
case Some(keyOrd: Ordering[K]) =&
// Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
// the ExternalSorter won't spill to disk.
val sorter =
new ExternalSorter[K, C, C](context, ordering = Some(keyOrd), serializer = dep.serializer)
sorter.insertAll(aggregatedIter)
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
case None =&
aggregatedIter
ExternalSorter.insertAll函数
[Scala] 纯文本查看 复制代码 while (records.hasNext) {
addElementsRead()
val kv = records.next()
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
maybeSpillCollection(usingMap = false)
ExternalSorter函数,这个函数在前面的这篇博客里介绍的比较清楚,这里使用了buffer结构体
[Scala] 纯文本查看 复制代码 @volatile private var map = new PartitionedAppendOnlyMap[K, C]
@volatile private var buffer = new PartitionedPairBuffer[K, C]
在reduceByKey的这些算子相同的Key是需要合并的,所以需要使用Map结构处理相同的Key的值的合并问题,而对排序来说,并不需要相同的值合并,使用Array结构就可以了。
注:在Spark上实现Map、Array都使用了数组的结构,并没有用链表结构
_105134.png (47.56 KB, 下载次数: 0)
10:53 上传
在上图的PartitionPairBuffer结构中,有以下几点要注意:
插入KV结构的时候,不进行排序,也就是在处理相同的Partition的时候直接读取插入Array
会存在当内存不够Spill到磁盘的情况,关于Spill请具体参考博客链接
2.3.1 排序
当ExternalSorter.insertAll函数完成后,才会构建一个排序的迭代器
[Scala] 纯文本查看 复制代码
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
val usingMap = aggregator.isDefined
if (spills.isEmpty) {
// Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
// we don't even need to sort by anything other than partition ID
if (!ordering.isDefined) {
// The user hasn't requested sorted keys, so only sort by partition ID, not key
groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
// We do need to sort by both partition ID and key
groupByPartition(destructiveIterator(
collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
// Merge spilled and in-memory data
merge(spills, destructiveIterator(
collection.partitionedDestructiveSortedIterator(comparator)))
这里分成两种情况:
还在内存里没有Spill到文件中去,这时候构建一个内存里的PartitionedDestructiveSortedIterator迭代器,在迭代器中已经排序好了PartitionPairBuffer里的内容
[Scala] 纯文本查看 复制代码
/** Iterate through the data in a given order. For this class this is not really destructive. */
override def partitionedDestructiveSortedIterator(keyComparator: Option[Comparator[K]])
: Iterator[((Int, K), V)] = {
val comparator = keyComparator.map(partitionKeyComparator).getOrElse(partitionComparator)
new Sorter(new KVArraySortDataFormat[(Int, K), AnyRef]).sort(data, 0, curSize, comparator)
Spill到文件里的,文件里的已经排好序了,需要对内存里的PartitionPairBuffer进行排序(和前面一种情况相同的处理),最后对文件和内存进行外排序(外排序可参考博客)
2.4 最后的归并
在Driver端Dag-scheduler-event-loop 线程中会处理每个executor返回的结果(刚才Partition排序后的结果)
[Scala] 纯文本查看 复制代码
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
case Success =&
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =&
// Cast to ResultStage here because it's part of the ResultTask
// TODO Refactor this out to a function that accepts a ResultStage
val resultStage = stage.asInstanceOf[ResultStage]
resultStage.activeJob match {
case Some(job) =&
if (!job.finished(rt.outputId)) {
updateAccumulators(event)
job.finished(rt.outputId) = true
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
// taskSucceeded runs some user code that might throw an exception. Make sure
// we are resilient against that.
job.listener.taskSucceeded(rt.outputId, event.result)
case e: Exception =&
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
通过方法taskSucceeded的方法进行不同的Partition的合并
[Scala] 纯文本查看 复制代码job.listener.taskSucceeded(rt.outputId, event.result)
[Scala] 纯文本查看 复制代码
override def taskSucceeded(index: Int, result: Any): Unit = {
// resultHandler call must be synchronized in case resultHandler itself is not thread safe.
synchronized {
resultHandler(index, result.asInstanceOf[T])
if (finishedTasks.incrementAndGet() == totalTasks) {
jobPromise.success(())
实际上是调用了resultHandler方法,我们来看看resultHandler是怎样定义的
[Scala] 纯文本查看 复制代码
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) =& U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) =& results(index) = res)
在runJob的方法里
[Scala] 纯文本查看 复制代码
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) =& U,
partitions: Seq[Int],
resultHandler: (Int, U) =& Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException(&SparkContext has been shutdown&)
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo(&Starting job: & + callSite.shortForm)
if (conf.getBoolean(&spark.logLineage&, false)) {
logInfo(&RDD's recursive dependencies:\n& + rdd.toDebugString)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
[Scala] 纯文本查看 复制代码(index, res) =& results(index) = res)
构建了一个数组result,将每个Partition的数值保存到result的数组里
result[0]=partition[0] =array(tuple&k,v&,tuple&k,v&.....)
什么时候对所有的Partition最后合并呢?
来看RDD的collect算子
[Scala] 纯文本查看 复制代码
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) =& iter.toArray)
Array.concat(results: _*)
runJob返回的是result的数组,每个Partition是管理不同的范围,最后的合并只要简单的将不同的Partition合并就可以了
3. 排序完整的流程
Driver 提交一个采样任务,需要Executor对每个Partition进行数据采样,数据采样是一次全数据的扫描Driver 获取采样数据,每个Partition的数据量,依据数据量的权重,进行Range的分配Driver 开始进行排序,先提交ShuffleMapTask ,Executor对分配到自己的数据基于Range进行Partition的分配,直接写入Shuffle文件中Driver 提交ResultTask,Executor读取Shuffle文件中相同的Partition进行合并(相同的key不做值的合并)、排序Driver 接收到ResultTask的值后,最后进行不同的Partition数据合并
转自:csdn
作者:raintungli
本帖被以下淘专辑推荐:
& |主题: 258, 订阅: 6
主题帖子积分
高级会员, 积分 1527, 距离下一级还需 3473 积分
高级会员, 积分 1527, 距离下一级还需 3473 积分
站长推荐 /4
会员注册不成功的原因
新手获取积分方法
hadoop3.0学习:零基础安装部署hadoop集群
about云课程:大数据日志实时分析
Powered by

我要回帖

更多关于 媒氣火搶 的文章

 

随机推荐