由此简单瘦身,MapReduce常见难点解答

原标题:通过不难瘦身,解决Dataworks 10M文本限制难点

摘要:大数额测算服务(马克斯Compute)的机能详解和选取体验

马克斯Compute(原ODPS) MapReduce常见难点解答,odpsmaxcompute分区

摘要:
用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JA卡宴和能源文件无法上传到Dataworks,导致力不从心使用调度去定期执行MapReduce作业。
消除方案: jar -resources test_mr.

点此查看原版的书文:http://click.aliyun.com/m/41384/

本文用到的

Ali云数加-大多少测算服务马克斯Compute产品地址:


用户在DataWorks上执行MapReduce作业的时候,文件大于10M的JA奥迪Q5和能源文件不可能上传到Dataworks,导致无法使用调度去定期执行MapReduce作业。

前言

1. 作业应运而生ClassNotFoundException和NoClassDefFoundError相当失利?

A:
对于ClassNotFoundException格外,一般是凭借的class不在你的jar包中,需求把正视的库打到作业jar包中可能独立上传,并在-resources中钦赐;
对此NoClassDefFoundError非凡,先看看依赖class是否存在于你的jar包,很多景况下是出于版本争持造成的,或然您依靠的库和服务端自带的jar有争辩。


化解方案:

MapReduce已经有文档,用户可以参考文书档案使用。本文是在文书档案的底子上做一些好像注脚及细节解释上的做事。

2. M凯雷德提交命令中-resources和-classpath的敞亮?

A:
在MaxCompute中就好像M奥德赛那类分布式数据处理框架,用户的代码一般在偏下八个地方执行:

  • 运行客户端的长河/子进度:那里的用户代码负责准备实施环境、配置职责参数、提交任务,入口常常是main
    class。它不受沙箱限制,执行逻辑由用户代码驱动。同样的,这里的classpath由用户配置,或在console中选用-classpath选项添加注重的classpath路径。
  • 长距离执行的worker进度:那里的代码负责履行多少处理逻辑,入口是mapper/reducer
    class。它受限沙箱限制,且执行逻辑由MaxCompute框架驱动。用户在命令行配置的-classpath在此间不算(分明,远程机器的路线和客户端机器的路子不能够确定保证同一),任何第③方重视必须作为resource提前上传至马克斯Compute,并在提交任务时利用-resources选项或JobConf.setResources(String[])来设定。

首先步:大于10M的resources通过马克斯Compute CLI客户端上传,

功能介绍

3. Mapper数目怎么着设置?

A:假使没有输入表是能够平素钦定map数目setNumMapTasks
   
有输入表的话,setNumMapTasks不奏效,须要通过setSplitSize来控制map数,默认是256M。


客户端下载地址:

MapReduce

4. Reducer数目怎么着设置?

A: 通过JobConf的接口setNumReduceTasks能够设置。
对此pipeline作业,Jobconf的接口同样可以安装,只可是设置后有着reduce阶段的个数都是均等的值。
只要要分等级设置,设置方法如下:
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)

.addReducer(SumReducer.class).setNumTasks(5)

.addReducer(IdentityReducer.class).setNumTasks(1).createPipeline();


客户端配置AK、EndPoint:

图片 1

5. 报错java.lang.OutOfMemoryError: Java heap space,M中华V的内部存款和储蓄器设置难点?

A:mapper或reducer的内存由两部分构成,JVM的heap memory和JVM
之外的框架相关内部存款和储蓄器。
   
设置JVM内部存款和储蓄器的接口是(都以Java逻辑的话,调节内存是用上边五个接口):
    setMemoryForMapperJVMsetMemoryForReducerJVM (默认是1024
单位MB)
    设置框架内部存款和储蓄器(c++部分的)的接口是(一般不须要设置):
    setMemoryForMapTasksetMemoryForReduceTask(默认是2048 单位MB)


add jar C:\test_mr\test_mr.jar -f;//添加财富

说起MapReduce就少不了WordCount,作者专门喜欢文书档案里的这么些图形。

6. mr 输出到表或有些分区里时,输出的形式时扩充仍旧覆盖 ?

A: 会覆盖输出表或分区在此之前的剧情


其次步:近来透过马克斯Compute
CLI上传的能源,在Dataworks右边财富列表是找不到的,只可以通过list
resources查看确认财富;

诸如有一张不小的表。表里有个String字段记录的是用空格分割开单词。最终索要总计全数记录中,每一种单词出现的次数是稍微。那完全的计量流程是

7. 二遍排序作用,MXC60相关配置解释,setMapOutputKeySchema? setOutputKeySortColumns? setPartitionColumns? setOutputGroupingColumns?

A:
平常情状下,GroupingColumns包罗在KeySortColumns中,KeySortColumns和PartitionColumns要含有在Key
schema中。

  • 在Map端,Mapper输出的Record会遵照设置的PartitionColumns总计哈希值,决定分配到哪个Reducer,会基于KeySortColumns对Record进行排序。
  • 在Reduce端,输入Records在依据KeySortColumns排序好后,会依据GroupingColumns内定的列对输入的Records实行分组,即会相继遍历输入的Records,把GroupingColumns所钦命列相同的Records作为一回reduce函数调用的输入。

list resources;//查看财富

输入阶段:依照工作量,生成多少个Mapper,把那么些表的数额分配给这一个Mapper。各种Mapper分配到表里的一有的记录。

8. 请问mr job的map恐怕reduce假诺想提前终止job, 执行如何代码?

A:
抛非凡就能够,例如throw new RuntimeException("XXX"); 会导致job失利,job也就终止了。


其三步:瘦身Jar,因为Dataworks执行M宝马7系作业的时候,一定要当地执行,所以保留个main就足以;

Map阶段:各样Mapper针对每条数据,解析当中的字符串,用空格切开字符串,获得一组单词。针对内部每种单词,写一条记下

9. 请问map阶段有时候为啥会有interrupted,不过map 最后照旧达成了?

A:因为有backup instance在跑,产生backup instance一般是因为有少数map
instances鲜明慢于任何的,就会在别的机器上运行二个如出一辙的worker来跑,那几个意义相近于hadoop的前瞻执行,只要在那之中某些成功跑完,别的的就能够停掉了(变为interrupted)


图片 2

Shuffle阶段-合并排序:也是发生在Mapper上。会先对数据开始展览排序。比如WordCount的例子,会依照单词举办排序。排序后的联结,又称Combiner阶段,因为前边已经依照单词排序过了,相同的单词都是连在一起的。那能够把1个相邻的集合成3个。Combiner能够减去在继续Reduce端的总结量,也能够削减Mapper往Reducer的数量传输的工作量。

10. mr如何获得输入表的信息?

A:
参考:
使用Mapper.TaskContext的接口getInputTableInfo(),会博得输入表的TableInfo对象
每一种map
worker只会处理来自单一表或分区的数码,在mapper的setup阶段得到该信息即可。


透过上述措施,大家可以在Dataworks上跑大于10M的M奇骏作业。

Shuffle阶段-分配Reducer:把Mapper输出的单词分发给Reducer。Reducer得到数码后,再做三次排序。因为Reducer得到的数据现已在Mapper里已经是排序过的了,所以那里的排序只是指向排序过的数目做联合排序。

11. 怎么样运用自定义partitioner ?

A: 参考如下代码:

import com.aliyun.odps.mapred.Partitioner;

...

public static class MyPartitioner extends Partitioner {

@Override
public int getPartition(Record key, Record value, int numPartitions) {
  // numPartitions即对应reducer的个数
  // 通过该函数决定map输出的key value去往哪个reducer
  String k = key.get(0).toString();
  return k.length() % numPartitions;
}
}

在jobconf里展开安装:jobconf.setPartitionerClass(MyPartitioner.class)
其它部供给要在jobconf里显著钦命reducer的个数:jobconf.setNumReduceTasks(num)


作者:隐林

Reduce阶段:Reducer拿前面已经排序好的输入,相同的单词的有着输入进去同二个Redue循环,在循环里,做个数的增加。

12. 怎么着设置Key排体系的一一(ASC or DESC)?

A: 类似如下: 
//key按那几个列排序
job.setOutputKeySortColumns(new String[] { "custid", "msgtype","amount" });
//设置每种列正序如故倒序
job.setOutputKeySortOrder(new SortOrder[]{SortOrder.ASC,SortOrder.ASC,SortOrder.DESC});


​本文为云栖社区原创内容,未经同意不得转发。回来腾讯网,查看更多

输出阶段:输出Reduce的乘除结果,写入到表里也许重回给客户端。

13. 报错kInstanceMonitorTimeout, usually caused by bad udf performance,怎么化解?

A:
报那一个错的原由是mapper也许reducer有逻辑执行时间专门长,且并未从输入表的读数据或然写出多少,超越暗中同意10min后,会报那些万分;有二种缓解办法:

  • 将过期的时辰调的更长一些,能够安装参数odps.function.timeout照旧安装JobConf#setFunctionTimeout,最长能够安装为3600,即三个时辰。
  • 定期向框架汇报心跳 TaskContext#progress(),注意progress不要调用过于频仍,否则有质量难点,能担保五次调用之间的时间低于设置的timeout时间即可。

主编:

拓展MapReduce

14. 框架map恐怕reduce接口里的Record对象是复用的?

A:是的,为了减小对象的支付,框架对于map,
reduce接口里的Record对象是复用的,也便是说每便map也许reduce的历次迭代,Record对象没有变,只是其中的多寡变化了。如若要保存上一回的Record要求toArray()得到个中的数码对象开展封存。具体能够参照:


借使Reduce后边还亟需做进一步的Reduce计算,能够用拓展MapReduce模型(简称M奥迪Q3酷路泽)。M安德拉逍客其实就是Reduce阶段甘休后,不间接出口结果,而是再一次通过Shuffle后接别的3个Reduce。

15. 写完一条记下后,想把outputRecord里面的数目清空,那么些要怎么弄,要不然,再写下一条记下的时候,假如某些字段没有值,就会用原来的笔录填充?

   
A:假诺写的Record对象是复用的,如若某些值没有新set,则仍然封存着后面的值对象。最近不曾直接能够清空的api能够用,能够经过Record.getColumnCount获得column
count,用四个for 循环去一一set null即可。


Q:如何贯彻M->奥迪Q5->M->奥德赛这种逻辑吗

16. M奥迪Q3协助多路输入输出,应该怎么写这么的程序?

    A:参考:多路输入输出示例
对于多路输入,每种输入源对应单独的三个Map阶段,即三个map
task只会读取四个输入表的数量。能够钦命一个表的多级分区列来作为三个输入,例如a,
b, c三分区列,钦定分区时能够内定a=1/b=1/c=2类似那样。
   
如若一致级其余八个分区,则供给各自作为独立的分区输入,例如多少个表的a=1和a=3分区作为多路输入的俩不比的输入,需求各自钦点。
    maponly的作业也同样协助多路输入输出,完毕方式类似。


A:在Reduce代码里直接嵌套上Map的逻辑就足以了,把第3个M的工作在前二个昂科威里完毕,而不是作为计量引擎调度规模上的多个单独步骤,比如

17. sdk如何通过instance获取logview url?

A: 能够行使如下的法门获得logview的url

RunningJob rj = JobClient.runJob(job);
com.aliyun.odps.Instance instance = SessionState.get().getOdps().instances().get(rj.getInstanceID());
String logview = SessionState.get().getOdps().logview().generateLogView(instance, 7 * 24);
System.out.println(logview);

reduce(){

18.  M奥迪Q7作业怎么着钦命输入表的Project名字?

A: 能够按如下的格局钦赐:

InputUtils.addTable(TableInfo.builder().projectName("test_project_name").tableName("test_table_name").build(), job);

通过TableInfo.builder()projectName接口来钦定,如若不钦命,暗许值是在运维MENVISION作业的老大project.


    …

19. 差别的Mapper可能Reducer怎么样获得可分别的ID?

A:
某些业务场景供给区分差异的Mapper或Reducer,能够经过TaskContextgetTaskID接口获取到二个Mapper/Reducer独有的id。

String id = context.getTaskID().toString();

    map();

20. MR代码里有JNI的调用该怎么写?

A:首先project要开通jni的有关权限,在编译准备好so文件后,须要将so以file类型的款式丰富为Resource,并在MTiggo作业提交的时候-resources参数里内定,例如:

add file libtestjni.so as libtestjni.so -f;
jar -resources testmr.jar,libtestjni.so -classpath testmr.jar Test.MRDriver xxx xxx;

在M福特Explorer的java代码应用jni的时候要小心,使用方式如下:

System.loadLibrary("testjni");    // 这里不要写成libtestjni.so,否则会报错,原因是java会自动添加lib前缀和.so后缀的

jni的行使办法能够参照:


}

21. M奥迪Q5作业读取表财富,Archive能源应该什么操作?

A: 马克斯Compute上的能源(file, table,
archive等)能够类比于Hadoop的DistributedCache来明白,同样是会散发到每一种计算节点上去,worker再从本地来读取,因此财富文件不可能过大,不然分发能源就是贰个瓶颈,近日默许有2G的总能源大小限制。
读取财富表,Archive财富总体上来说和读取file类型财富是相近的,只是利用的接口不一样。读取财富文件的办法能够参考文书档案:使用财富示例

对此表财富:
将表添加为能源表: add table xxx as xxx -f;
读能源表的接口为:TaskContext#readResourceTable

对于Archive资源:
将当地archive(.tar, .zip等archive文件)上传为财富: add archive as xxx
-f;
读archive财富的接口为:TaskContext#readResourceArchiveAsStream


…不断更新中…

MapReduce常见难点解答,odpsmaxcompute分区 本文用到的
阿里云数加-大数据测算服务MaxCompute产品地址:…

立刻开端

运维环境

工欲善其事,必先利其器。M宝马X3的开支提供了依照IDEA和Eclipse的插件。在那之中相比推荐用IDEA的插件,因为IDEA大家还在不断做迭代,而Eclipse已经终止做立异了。而且IDEA的功能也相比较丰盛。

具体的插件的装置方式步骤能够参考文档,本文不在赘言。

别的后续还需求用到客户端,能够参照文档安装。

后续为了进一步明亮地表明难点,笔者会尽量地在客户端上操作,而不用IDEA里已经合并的不二法门。

线上运转

以WordCount为例,文书档案可以参照这里

步骤为

做多少准备,包涵创造表和选择Tunnel命令行工具导入数据

将代码拷贝到IDE里,编写翻译打包成mapreduce-examples.jar

在odpscmd里执行add jar命令:

add jar /JarPath/mapreduce-examples.jar -f;

此地的/JarPath/mapreduce-examples.jar的路子要替换花费地实际的公文路径。那几个命令能把地面包车型客车jar包传到服务器上,-f是借使已经有同名的jar包就覆盖,实际应用中对此是报错照旧覆盖须要审慎考虑。

在odpscmd里执行

`jar -resources mapreduce-examples.jar -classpath
mapreduce-examples.jar

com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out`

等候作业执行成功后,能够在SQL通过查询wc_out表的数码,看到实行的结果

作用解读

任务交给

职务的是在马克斯Comput(ODPS)上运维的,客户端通过jar命令发起呼吁。

比较前边的神速开首,能够见见除了数据准备阶段,和MKuga相关的,有财富的上传(add
jar步骤)和jar命令运行MCR-V作业两步。

客户端发起add jar/add
file等能源操作,把在客户端的机器(比如笔者测试的时候是从我的台式机)上,运行职务涉及的能源文件传到服务器上。那样后边运维职分的时候,服务器上才能有相应的代码和文件能够用。假诺原先曾经传过了,这一步能够省略。

jar -resources mapreduce-examples.jar -classpath mapreduce-examples.jar
com.aliyun.odps.mapred.open.example.WordCount wc_in wc_out

本条命令发起作业。MapReduce的任务是运维在马克斯Compute集群上的,客户端须求经过那一个命令把任务运营相关的音信报告集群。

客户端先解析-classpath参数,找到main方法有关的jar包的岗位

依据com.aliyun.odps.mapred.open.example.WordCount,找到main方法所在类的门道和名字

wc_in wc_out是传给main方法的参数,通过解析main方法传入参数String[]
args获得这几个参数

-resources告诉服务器,在运营职务的时候,要求采纳的财富有何。

JobConfig

JobConf定义了那几个职责的细节,依然那个图,解释一下JobConf的别样设置项的用法。

输入数据

InputUtils.addTable(TableInfo table, JobConf conf)设置了输入的表。

setSplitSize(long size)通过调整分片大小来调整Mapper个数,单位
MB,暗中同意256。Mapper个数不通过void setNumMapTasks(int n)设置。

setMemoryForJVM(int mem)设置 JVM虚拟机的内部存款和储蓄器资源,单位:MB,私下认可值 1024.

Map阶段

setMapperClass(Class theClass)设置Mapper使用的Java类。

setMapOutputKeySchema(Column[] schema)设置 Mapper 输出到 Reducer 的
Key 行属性。

setMapOutputValueSchema(Column[] schema)设置 Mapper 输出到 Reducer 的
Value 行属性。和上个设置一起定义了Mapper到Reducer的多寡格式。

Shuffle-合并排序

setOutputKeySortColumns(String[] cols)设置 Mapper 输出到 Reducer 的
Key 排序列。

setOutputKeySortOrder(JobConf.SortOrder[] order)设置 Key
排系列的种种。

setCombinerOptimizeEnable(boolean
isCombineOpt)设置是或不是对Combiner实行优化。

setCombinerClass(Class theClass)设置作业的 combiner。

Shuffle-分配Reduce

setNumReduceTasks(int n)设置 Reducer 任务数,暗中同意为 Mapper 职分数的
四分一。假诺是Map
only的天职,要求安装成0。能够参照这里。

setPartitionColumns(String[]
cols)设置作业的分区列,定义了数码分配到Reducer的分红政策。

Reduce阶段

setOutputGroupingColumns(String[]
cols)数据在Reducer里排序好了后,是何等数据进入到同3个reduce方法的,正是看那里的装置。一般的话,设置的和setPartitionColumns(String[]
cols)一样。能够看到一遍排序的用法。

setReducerClass(Class theClass)设置Reducer使用的Java类。

数据输出

setOutputOverwrite(boolean
isOverwrite)设置对输出表是还是不是进行覆盖。类似SQL里的Insert into/overwrite
Talbe的界别。

OutputUtils.addTable(TableInfo table, JobConf
conf)设置了出口的表。多路输入输出能够参考这里。

其他

void setResources(String
resourceNames)有和jar命令的-resources一样的成效,不过优先级高于-resources(也正是说代码里的装置优先级相比高)

末尾通过JobClient.runJob(job);客户端往服务器发起了这些MapReduce作业。

详细的SDK的文档,可以在Maven里下载。这是下载地址。

Map/Reduce

读表

在3个Mapper里,只会读一张表,不一样的表的数据会在区别的Mapper
worker上运维,所以能够用示例里的这么些方法先取得这一个Mapper读的是怎么样表。

资源表/文件

财富表和文书能够让有个别小表/小文件能够方便被读取。鉴于读取数据的限制内需小于陆10次,一般是在setup里读取后缓存起来,具体的例子能够参照这里。

生育及周期调度

职务交给

客户端做的正是给服务器发起职责的调度的授命。在此之前提到的jar命令正是一种艺术。鉴于实际上运营情况的四种性,那里介绍任何的两种常见方法:

odpscmd
-e/-f:odpscmd的-e命令能够在shell脚本里直接运维一个odpscmd里的一声令下,所以能够在shell脚本里运维odpscmd
-e ‘jar -resources
xxxxxx’那样的下令,在shell脚本里调用MapReduce作业。三个完完全全的例子是

odpscmd  -u accessId  -p  accessKey  –project=testproject
–endpoint=  -e “jar -resources
aaa.jar -classpath ./aaa.jar com.XXX.A”

万一在odpscmd的安插文件里早已配备好了,那只须要写-e的一些。

-f和-e一样,只是把命令写到文件里,然后用odpscmd -f
xxx.sql引用那一个文件,那这一个文件里的三个指令都会被实践。

大数据开发套件能够陈设MapReduce作业。

大数额开发套件能够配备Shell作业。能够在Shell作业里参考下面的艺术用odpscmd
-e/-f来调度MapReduce作业。

在JAVA代码里向来调用MapReduce作业,能够由此设置SessionState.setLocalRun(false); 达成,具体能够参考这里。

定时调度

大数目开发套件的定时职责/工作流能够配备调度周期和任务正视,协作前边提到的不二法门里的MapReduce作业/Shell作业,完毕职务的调度。

产品范围

平安沙箱

沙箱是马克斯Compute的一套安全系统,使得在马克斯Compute上运转的学业不可能得到其余用户的音讯,也无能为力获得系统的有的音信。主要不外乎以下几点,完整的列表能够参见文档

不能够访问外部数据源(不可能当爬虫,不可能读凯雷德DS等)

心中无数起四线程/多进度

不帮衬反射/自定义类加载器(所以不匡助部分第2方包)

不一样意读本地文件(比如JSON里就用到了,就要求改用GSON)

不允许JNI调用

此外限制

详见马克斯Compute M本田CR-V限制项汇总