
Group Discussion Digest, April 2018 (Part 1)

Notes

The entries below are questions that came up in the group and the conclusions reached in the discussions.
QQ group: 432600958
WeChat group: add the WeChat account w3aboutyun and mention that you are an About云 (aboutyun) fan.

2018.04.13_01

Problem

Sharing interview question resources.

Materials

Top 25 most common Hadoop interview questions and answers
Spark interview question collection (extraction password: bcpc)

2018.04.12_01

Problem

How can a model trained with sklearn be used in Spark Streaming?

Approach

Export the model to PMML.
Even though some model types are not directly supported on the Spark Streaming side,
the PMML model can be imported through Java and invoked by calling the Java classes.
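
A minimal sketch of this approach, assuming the sklearn model has been exported to PMML (for example with sklearn2pmml) and that the JPMML-Evaluator library is on the classpath. The class and method names follow the JPMML-Evaluator 1.4.x API and may differ in newer versions; the file path, feature map, and the scoreRecord helper are hypothetical:

import java.io.FileInputStream
import scala.collection.JavaConverters._
import org.dmg.pmml.FieldName
import org.jpmml.evaluator.ModelEvaluatorFactory
import org.jpmml.model.PMMLUtil

// Load the PMML exported from sklearn and build an evaluator
val pmml = PMMLUtil.unmarshal(new FileInputStream("/path/to/model.pmml"))
val evaluator = ModelEvaluatorFactory.newInstance().newModelEvaluator(pmml)
evaluator.verify()

// Score one record given its raw feature values keyed by field name;
// returns the raw result map (target and output fields)
def scoreRecord(features: Map[String, Any]): java.util.Map[FieldName, _] = {
  val args = evaluator.getInputFields.asScala.map { field =>
    field.getName -> field.prepare(features(field.getName.getValue))
  }.toMap.asJava
  evaluator.evaluate(args)
}

Inside a DStream this would typically be called from mapPartitions, or the evaluator kept in a lazily initialized singleton object, so that the model is loaded once per executor rather than once per record.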

2018.04.11_01

Problem

A Hive table has already been created, and spark-streaming writes data into the corresponding HDFS directory, but the data cannot be SELECTed from the table.

Analysis

The Hive table's directory is
/user/hive/warehouse/testdb.db/cljwifi

If a data file, say file1, sits directly in that directory, i.e.
/user/hive/warehouse/testdb.db/cljwifi/file1
the data can be queried normally.

spark-streaming, however, throws an error if its output directory already exists, so every batch has to be written to a new sub-directory, for example
/user/hive/warehouse/testdb.db/cljwifi/dir1/file1
and data nested in sub-directories like this cannot be queried.

Approach

1 Create a partitioned table

The directory spark-streaming writes to then carries the partition column, e.g.
/user/hive/warehouse/testdb.db/cljwifi/day=20180304
(see the sketch after this list)

2 Move the data

After each spark-streaming batch has been written, a scheduled script moves the files up into the table directory.
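
A minimal sketch of option 1, assuming a SparkSession with Hive support is available as spark; the column names are placeholders, only the partition column and the resulting directory layout matter, and the table would have to be (re)created as a partitioned table:

// Hypothetical day-partitioned table whose layout matches the directory above
spark.sql("""
  CREATE TABLE IF NOT EXISTS testdb.cljwifi (mac STRING, payload STRING)
  PARTITIONED BY (day STRING)
""")

// After spark-streaming has written the day=20180304 directory, the partition
// still has to be registered before the data is visible to queries
spark.sql("ALTER TABLE testdb.cljwifi ADD IF NOT EXISTS PARTITION (day='20180304')")

Option 2 amounts to the same idea done by hand: once a batch directory is complete, a scheduled job moves its files up into the table directory (e.g. with hdfs dfs -mv) so that they sit directly under /user/hive/warehouse/testdb.db/cljwifi.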

2018.04.10_01

Problem

The appName is set in code:
val sparkConf = new SparkConf().setAppName("name1")
and the --name option is also passed when submitting:
spark-submit --master yarn-cluster --name name2
yet the name finally shown in the YARN UI is name2.

Explanation

setMaster in the code always takes effect and has higher priority than --master.
setAppName in the code has higher priority than --name in yarn-client mode, but lower priority than --name in yarn-cluster mode.
A likely reason is that in yarn-cluster mode the application is registered with YARN (under the submit-time name) before the user code, and thus setAppName, ever runs in the ApplicationMaster; the exact behaviour still needs to be confirmed against the source code.

2018.04.10_02

Problem

What does the "Reserved" value shown in the YARN monitoring UI mean?

解释

An implementation detail of this change that prevents applications from starving under this new flexibility is the notion of reserved containers. Imagine two jobs are running that each have enough tasks to saturate more than the entire cluster. One job wants each of its mappers to get 1GB, and another job wants its mappers to get 2GB. Suppose the first job starts and fills up the entire cluster. Whenever one of its tasks finishes, it will leave open a 1GB slot. Even though the second job deserves the space, a naive policy will give it to the first one because it’s the only job with tasks that fit. This could cause the second job to be starved indefinitely.

To prevent this unfortunate situation, when space on a node is offered to an application, if the application cannot immediately use it, it reserves it, and no other application can be allocated a container on that node until the reservation is fulfilled. Each node may have only one reserved container. The total reserved memory amount is reported in the ResourceManager UI. A high number means that it may take longer for new jobs to get space.

In short: while an application is running, it declares in advance the additional resources it expects to need, effectively claiming them so that other applications cannot use them; these claimed-but-not-yet-used resources are what is reported as reserved.

When a NodeManager does not currently have enough free resources to satisfy a request from an application, yet the request still has to be served on that node, the node reserves resources for the application and keeps accumulating freed-up resources until the request can be met; only then are the resources packaged into a Container and handed to the ApplicationMaster. A Container that has been created and is still in this accumulation phase is in the RESERVED state of the container state machine. Once the Container has been allocated to the ApplicationMaster but the ApplicationMaster has not yet acknowledged receiving the resources, the Container is in the ALLOCATED state; after the ApplicationMaster notifies the ResourceManager that it has obtained the resources, the state changes to ACQUIRED.

ref:
Improvements in the Hadoop YARN Fair Scheduler
Yarn中的几种状态机 (state machines in YARN)

2018.04.04_01

Problem

val rdd = sc.parallelize(Array("a","ab","abc","abcd",""),2)
val rdd3 = rdd.aggregate("")((x,y)=>(x.length.toString + y.length.toString),(x,y)=>(x.length.toString + y.length.toString))
Why is the result of rdd3 "22"?

Walkthrough

aggregate first applies seqOp within each of the two partitions.

Partition 1: "a", "ab"
"".length.toString + "a".length.toString = "0"+"1" = "01"
"01".length.toString + "ab".length.toString = "2"+"2" = "22"

Partition 2: "abc", "abcd", ""
"".length.toString + "abc".length.toString = "0"+"3" = "03"
"03".length.toString + "abcd".length.toString = "2"+"4" = "24"
"24".length.toString + "".length.toString = "2"+"0" = "20"

Then combOp merges the partition results "22" and "20" into the zero value:
"".length.toString + "22".length.toString = "0"+"2" = "02"
"02".length.toString + "20".length.toString = "2"+"2" = "22"

Further notes
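
A sketch of what aggregate computes, making the two phases above explicit; this illustrates the semantics only, not Spark's actual implementation, and it ignores the fact that partition results may reach the driver in any order:

// the two functions from the question
val seqOp  = (x: String, y: String) => x.length.toString + y.length.toString
val combOp = (x: String, y: String) => x.length.toString + y.length.toString

// aggregate ~ fold each partition with seqOp, then fold the partition results with combOp
val partitions = Seq(Seq("a", "ab"), Seq("abc", "abcd", ""))
val partitionResults = partitions.map(_.foldLeft("")(seqOp))   // Seq("22", "20")
val result = partitionResults.foldLeft("")(combOp)              // "22"

Because seqOp and combOp here are neither associative nor commutative, the intermediate strings depend on how the data is split into partitions and on the order in which partition results are combined; aggregate is only guaranteed to be deterministic when the supplied functions satisfy the usual associativity (and, for combOp, commutativity) assumptions.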

2018.04.04_02

Problem

A question for the group:
A directory contains 100 files that should all be SequenceFiles, but one of them is empty, with size 0 (and it cannot be deleted due to cluster restrictions).
Spark fails at runtime with "not a SequenceFile".
(The file names are irregular, so the empty file cannot be filtered out with a pattern in the submit script.)
How should this be handled?

Approach

Before calling sc.sequenceFile[LongWritable,Text], list the input directory once, collect the non-empty files into an array input2, and then pass input2 to sc.sequenceFile[LongWritable,Text].

val file_rdd = sc.sequenceFile[LongWritable,Text](inputs)

is changed to:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}

val conf = new Configuration()
val fileSystem = FileSystem.get(conf)
// list the input directory and keep only the non-empty files
val children = fileSystem.listStatus(new Path(inputs))
val input2 = for (status <- children if status.getLen > 0) yield status.getPath.toString
val file_rdd = sc.sequenceFile[LongWritable, Text](input2.mkString(","))