当前位置: 首页 > web开发 > 工具 > 正文

DistributtedShell的container在所有节点上如何仅执行一次

时间:2016-05-31 csdn博客 jiewuyou

问题

在上Hadoop2培训课的时候,老师出了这么一道题

修改Distributedshell的源代码,使得用户提供的命令(由“--shell_command”参数指定)可以在所有节点上仅执行一次。(目前的实现是,如果该命令由N个task同时执行,则这N个task可能位于任意节点上,比如都在node1上。)

修改代码

该问题需要在两个地方对源码进行修改:

用参数指定配置是否生效

让每一个container运行在不同的节点上

博客将主要介绍过程2的实现过程,主要思路是首先获取节点列表,再在申请container时,指定节点。具体过程如下:

打开源码。编译好Hadoop-2.3.0之后,用Eclipse打开工程,DistributedShell的源码的位置在/hadoop-2.3.0-src/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java

获取计算节点列表。定义nodeList用于保存计算节点列表,在ApplicationMaster的init()函数中添加初始化nodeList的代码。初始化完成后,nodeList中保存有计算节点的列表(不包括RM 节点)。

public class ApplicationMaster {  
    // 所有计算节点  
    private static List nodeList = new ArrayList();  
    public boolean init(String[] args) throws ParseException, IOException {  
        //该函数的末尾添加如下代码,用于获取计算节点列表  
        try {  
        YarnClient yarnClient = YarnClient.createYarnClient();  
            yarnClient.init(conf);  
            yarnClient.start();  
            List<NodeReport> clusterNodeReports;  
        clusterNodeReports = yarnClient.getNodeReports(  
                NodeState.RUNNING);  
             for (NodeReport node : clusterNodeReports) {  
             this.nodeList.add(node.getNodeId().getHost());  
                }    
    } catch (YarnException e) {  
        // TODO Auto-generated catch block  
            e.printStackTrace();  
    }  
    return true;  
    }  
}

本栏目更多精彩内容:http://www.bianceng.cn/webkf/tools/

让container运行在不同的节点上。申请资源的时候,会调用函数setupContainerAskForRM,修改该函数即可,函数如下:

private ContainerRequest setupContainerAskForRM() {  
    // setup requirements for hosts  
    // using * as any host will do for the distributed shell app  
    // set the priority for the request  
    Priority pri = Records.newRecord(Priority.class);  
    // TODO - what is the range for priority? how to decide?  
    pri.setPriority(requestPriority);  
    // Set up resource type requirements  
    // For now, memory and CPU are supported so we set memory and cpu  
    // requirements  
    Resource capability = Records.newRecord(Resource.class);  
    capability.setMemory(containerMemory);  
    capability.setVirtualCores(containerVirtualCores);  
    String []nodes=null;  
    if(!nodeList.isEmpty()){  
            nodes=new String[1];  
        nodes[0]=(String) nodeList.get(0);  
        nodeList.remove(0);  
    }  
    ContainerRequest request = new ContainerRequest(capability, nodes, null,  
            pri);//默认的nodes为null  
    LOG.info("Requested container ask: " + request.toString());  
    return request;  
}

改好之后,打成jar包,覆盖${HADOOP_HOME}/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar即可生效;

验证,书写如下脚本并运行。发现3个container运行在不同的节点上,表示改写成功

bin/hadoop jar \
share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar \
org.apache.hadoop.yarn.applications.distributedshell.Client \
--jar share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.3.0.jar \
--shell_command "ls" \
--num_containers 3 \
--container_memory 512 \
--container_vcores 1 \
--master_memory 350 \
--priority 10

总结

在获取计算节点列表时,被卡住了,最后在和别人交流的时候,知道ApplicationMaster通过yarnClient可以从RM中获取计算节点列表。最后将问题解决了。