0%
2.7k 字 10 分钟

群内2018_3月讨论整理

说明

以下内容来自群中出现的问题,大家讨论的结果
Q群:432600958
微信群:加微信w3aboutyun,附上about云铁粉

2018.03.29_01

问题描述

如何成为技术大牛

根据阿里的分享

do more
do better
do exercise
如何成为技术大牛

2018.03.29_02

问题描述

淘宝如何保持宝贝数量的一致性

思路

淘宝宝贝数量先减去购卖数,不为0就可以同步处理,处理失败再加回来。如果宝贝数量减到一,则竞争。

2018.03.28_01

问题描述

worker 与 executor 线程和进程的相关理解,以及引申为spark 分区等概念

相关思路讨论

MR的多进程 Vs Spark的多线程

executor 是进程,在其中执行的task 是线程,spark 所谓的多线程,是一个executor 中可以多个task 执行。  
MR 采用了多进程模型,Spark采用了多线程模式,这里指的是同一个节点上多个任务的运行模式。因为无论MR 和 Spark 整体上看,都是多进程:MR是由多个独立的Task 进程组成,Spark 应用程序的运行环境是由多个独立的Executor 进程构建的临时资源池构成的。
多进程模型便于细粒度控制每个任务占用的资源,但会消耗较多的启动时间,不适合运行低延迟类型的作业,这是MapReduce广为诟病的原因之一。而多线程模型则相反,该模型使得Spark很适合运行低延迟类型的作业。总之,Spark同节点上的任务以多线程的方式运行在一个JVM进程中,可带来以下好处:
1)任务启动速度快,与之相反的是MapReduce Task进程的慢启动速度,通常需要1s左右;
2)同节点上所有任务运行在一个进程中,有利于共享内存。这非常适合内存密集型任务,尤其对于那些需要加载大量词典的应用程序,可大大节省内存。
3)同节点上所有任务可运行在一个JVM进程(Executor)中,且Executor所占资源可连续被多批任务使用,不会在运行部分任务后释放掉,这避免了每个任务重复申请资源带来的时间开销,对于任务数目非常多的应用,可大大降低运行时间。与之对比的是MapReduce中的Task:每个Task单独申请资源,用完后马上释放,不能被其他任务重用,尽管1.0支持JVM重用在一定程度上弥补了该问题,但2.0尚未支持该功能。
尽管Spark的过线程模型带来了很多好处,但同样存在不足,主要有:
1)由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源。与之相反的是MapReduce,它允许用户单独为Map Task和Reduce Task设置不同的资源,进而细粒度控制任务占用资源量,有利于大作业的正常平稳运行
ref: 董西城的解释

spark分区数,task数目,core数,worker节点个数,excutor数量梳理

输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block。
当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。
随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。
随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  • 每个节点可以起一个或多个Executor。
  • 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
  • 每个Task执行的结果就是生成了目标RDD的一个partiton。
    这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。

至于partition的数目:

  • 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
  • 在Map阶段partition数目保持不变。
  • 在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

ref 梳理

扩展

一种理解:

executor 不是越多越好,假设你给spark 分配总的资源 为48 个vcore
(这个不是物理的CPU核,一个物理CPU核可以划分成多个vcore,是为了更细粒度的资源控制,
这样对应小任务较多情况,能提升资源利用率,打个比方,我一个CPU能分成4个vcore 和 1 个vcore,
我任务1,2,3,4 其实都只要使用1个vcore 即1/4 个物理CPU,在分成4个vcore 的时候,
一个CPU能并行处理4个任务,在分1个vcore时,只能串行)
然后96G 的内存
及 48c 96g
假设你的参数如下:
num-executor 8
executor-cores 8

8*8 =64 > 48 了,这个时候,只会给你 48/8 =6 个executor

同理对于
num-executor 8
executor-menory 18
8*18=144 > 96 了, 这个时候只会给你 96/8 =12 个executor

当 cores 和 menory 都超的时候,取小的,总之资源不能超总的。

在每个stage的时候,会根据partition的数据,划分出task的数量,一个vcore 同一时间只能处理一个task
假设有4个executor ,每个executor 有4个vcore,即同时处理的task数量为16 (这是你集群的处理能力)
那么当你的分区,只有4个的时候,即只有4个task,就意味着你的集群资源是空着的,没有利用满。(最好reparation)

当你的分区有32个的时候, 意味着需要 32/16 2轮能处理完。这时候,你再调大并行度,也没有用,集群资源就那么多。

还有关于并行度的提高,也并不是越大越好。
1是小文件的问题
2是任务太小的话,启动任务的时间占比 相对任务的处理时间占比也会很高,这样得不偿失。

另一种思路

一般每个partition对应一个task。在我的测试过程中,如果没有设置spark.default.parallelism参数,spark计算出来的partition非常巨大,与我的cores非常不搭。我在两台机器上(8cores 2 +6g 2)上,spark计算出来的partition达到2.8万个,也就是2.9万个tasks,每个task完成时间都是几毫秒或者零点几毫秒,执行起来非常缓慢。在我尝试设置了 spark.default.parallelism 后,任务数减少到10,执行一次计算过程从minute降到20second

spark.default.parallelism 的说明见 说明
一句话,就是触发shuffle 操作之后的默认分区数,相当于手动reparation

2018.03.28_02

问题描述

两个巨大的表,默认都要用reduce join. 且其中一个表中join依赖的相同key的数据大量重复.考虑到数据倾斜,这个partitioner怎么实现?

思路

这是一个半连接的问题
先把重复比较厉害的key 过滤了,(合并了) 再做map侧的join

ref:
如何利用spark快速计算笛卡尔积?
spark千万数据join问题?
MapReduce表连接之半连接SemiJoin

2018.03.28_03

问题描述

spark sql 的partition 数目

思路

我是这么理解Spark 200G 内存处理2T数据的。
打个比方,内存为100G 有2000G 的数据,一共10000个partition,每个partition就是200M (方便起见按1G=1000M 计算)
假设触发shuffle的stage1,每个partition处理的约为200M数据。
默认的配置,一个executor 1C2G。 2G内存处理200M数据,还是没问题的。
这样就有100/2=50 个executor,处理10000个task 一共需要10000/50 = 200轮。

stage1的数据处理的结果,你可以选择cache 到内存,也可以选择到disk。这样再进行stage2的操作。
(假设处理结果为1000G 还是10000个partition, 这样只能到disk 了)
那stage2处理方式一样,从disk读数据,然后进行10000/50 = 200轮 的操作。每个executor 有2G,处理100M 还是可以的。

hive的分区数目和Spark 的partition 数无关
你可以去hdfs上查看,hive的分区目录下也有分片的文件如:(假设分区字段为时间)
table/20180101/part-00000
~
table/20180101/part-00010
那这个分区下的文件,对应Spark 中10个partition
然后通过spark.sql.shuffle.partitions 参数来调节执行sql中shuffle 时的task数量

2017.03.27_01

问题描述:

如这样的 rdd(k,v):RDD[(String, String)]
需要对第一个字段 k进行归并,然后统计v去重之后出现次数最多的字符(分组 top1)。

例如
a,A
a,B
a,A
b,C
b,D
c,D
c,D

结果为 (a,A) (b,C) (c,D)

思路

1
2
3
4
5
6
7
8
9
10
11
12
13
rdd.map(x=>(x._1 + "|" +  x._2, 1))
.reduceByKey(_ + _)
.map(x=>(x._1.split("\\|", -1)(0), (x._1.split("\\|", -1)(1),x._2)))
.reduceByKey((x,y) =>{
val re = if (x._2 > y._2){
x
}else {
y
}
re
})
.map(x=>(x._1,x._2._1))

扩展

  • topN 的问题该如何处理? 除了groupBy 还有别的思路么? 效率?
  • 次数相同该如何处理?

2018.03.27_02

问题描述:

如这样的 rdd(k,v):RDD[(String, String)]
需要对第一个字段 k进行归并,然后统计v去重之后出现的次数。

例如
a,A
a,B
a,A
b,C
b,D
c,D
c,D

结果为 (a,2) (b,2) (c,1)

目前整理的几种思路是:

先进行reduceBy,将v合并为字符串,之后再拆分、去重,统计出现次数

1
2
3
map(a,b).reduceByKey((x1,x2)=>{
val sum =x1+"\t"+x2
sum}).map{x._2.split("\t").distinct.size}

将k,v 拼接为一个字符串,去重,之后再差分,然后再进行reduceBy

1
rdd.map(x=>(x._1 +"|" x._2)).distinct().map((_.split("|")(0),1)).reduceByKey(_+_)

将v 改为Set,reduce 之后再统计size

1
rdd.map(x=>(x._1, Set(x._2)).reduceByKey(_ ++ _).map(x=>(x._1, x._2.size))

使用dropDuplicates()

需要spark 2.x ,将rdd转为DataSet 之后再进行操作。

扩展

  • 四种思路比较? 是否有更好的解决方案?
  • 当v 也是多个字段该如何处理?
  • Set 和 String的序列化效率差异?即 String 合并之后再拆分的开销,和使用Set 序列化增长的开销 哪个比较大?