博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce源代码分析之JobSubmitter(一)
阅读量:5934 次
发布时间:2019-06-19

本文共 11988 字,大约阅读时间需要 39 分钟。

        JobSubmitter。顾名思义,它是MapReduce中作业提交者,而实际上JobSubmitter除了构造方法外。对外提供的唯一一个非private成员变量或方法就是submitJobInternal()方法,它是提交Job的内部方法,实现了提交Job的全部业务逻辑。

本文,我们将深入研究MapReduce中用于提交Job的组件JobSubmitter。

        首先,我们先看下JobSubmitter的类成员变量。例如以下:

// 文件系统FileSystem实例  private FileSystem jtFs;  // client通信协议ClientProtocol实例  private ClientProtocol submitClient;  // 提交作业的主机名  private String submitHostName;  // 提交作业的主机地址  private String submitHostAddress;
        它一共同拥有四个类成员变量,分别为:

        1、文件系统FileSystem实例jtFs:用于操作作业执行须要的各种文件等。

        2、client通信协议ClientProtocol实例submitClient:用于与集群交互。完毕作业提交、作业状态查询等;

        3、提交作业的主机名submitHostName。

        4、提交作业的主机地址submitHostAddress。

        当中,client通信协议ClientProtocol实例submitClient是通过Cluster的client通信协议ClientProtocol实例client来赋值的,我们在一文中以前提到过。它依据MapReduce中參数mapreduce.framework.name的配置为yarn或local。有Yarn模式的YARNRunner和Local模式的LocalJobRunner两种情况。

        接下来,我们再看下JobSubmitter的构造函数,例如以下:

JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)   throws IOException {	  	// 依据入參赋值成员变量submitClient、jtFs    this.submitClient = submitClient;    this.jtFs = submitFs;  }
        非常easy,依据入參赋值成员变量submitClient、jtFs而已。

        关键的来了,我们看下JobSubmitter唯一的对外核心功能方法submitJobInternal(),它被用于提交作业至集群,代码例如以下:

/**   * Internal method for submitting jobs to the system.   *    * 

The job submission process involves: *

    *
  1. * Checking the input and output specifications of the job. *
  2. *
  3. * Computing the {@link InputSplit}s for the job. *
  4. *
  5. * Setup the requisite accounting information for the * {@link DistributedCache} of the job, if necessary. *
  6. *
  7. * Copying the job's jar and configuration to the map-reduce system * directory on the distributed file-system. *
  8. *
  9. * Submitting the job to the JobTracker and optionally * monitoring it's status. *
  10. *

* @param job the configuration to submit * @param cluster the handle to the Cluster * @throws ClassNotFoundException * @throws InterruptedException * @throws IOException */ JobStatus submitJobInternal(Job job, Cluster cluster) throws ClassNotFoundException, InterruptedException, IOException { //validate the jobs output specs // 调用checkSpecs()方法,校验作业输出路径是否配置,且是否已存在, // 正确的情况应该是已配置且未存在,输出路径配置參数为mapreduce.output.fileoutputformat.outputdir, // 之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output checkSpecs(job); // 从作业job中获取配置信息conf Configuration conf = job.getConfiguration(); // 调用addMRFrameworkToDistributedCache()方法加入应用框架路径到分布式缓存中 addMRFrameworkToDistributedCache(conf); // 通过JobSubmissionFiles的getStagingDir()静态方法获取作业运行时阶段区域路径jobStagingArea // 取參数yarn.app.mapreduce.am.staging-dir。參数未配置默觉得/tmp/hadoop-yarn/staging // 然后后面是/提交作业username/.staging // 通过之前的WordCount任务的运行,我们查看历史记录,得知參数yarn.app.mapreduce.am.staging-dir配置的为/user。 // 而提交作业username为hdfs,所以完整的路径应该为/user/hdfs/.staging Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //configure the command line options correctly on the submitting dfs // 获取当前本机地址 InetAddress ip = InetAddress.getLocalHost(); // 确定提交作业的主机地址、主机名,并设置入配置信息conf,相应參数分别为 // mapreduce.job.submithostname // mapreduce.job.submithostaddress if (ip != null) { submitHostAddress = ip.getHostAddress(); submitHostName = ip.getHostName(); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } // 生成作业ID。即JobID实例jobId JobID jobId = submitClient.getNewJobID(); // 将jobId设置入job job.setJobID(jobId); // 构造提交作业路径Path实例submitJobDir,jobStagingArea后接/jobId。比方/job_1459913635503_0005 // 之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005 Path submitJobDir = new Path(jobStagingArea, jobId.toString()); JobStatus status = null; // 设置作业一些參数: try { // 设置mapreduce.job.user.name为当前用户。之前的WordCount演示样例配置的为hdfs用户 conf.set(MRJobConfig.USER_NAME, UserGroupInformation.getCurrentUser().getShortUserName()); // 设置hadoop.http.filter.initializers为AmFilterInitializer conf.set("hadoop.http.filter.initializers", "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); // 设置mapreduce.job.dir为submitJobDir,比方/user/hdfs/.staging/job_1459913635503_0005 conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir // 获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法 TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); // 获取密钥和令牌。并将它们存储到令牌缓存TokenCache中 populateTokenCache(conf, job.getCredentials()); // generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) { KeyGenerator keyGen; try { int keyLen = CryptoUtils.isShuffleEncrypted(conf) ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS) : SHUFFLE_KEY_LENGTH; keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(keyLen); } catch (NoSuchAlgorithmException e) { throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey = keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } // 复制而且配置相关文件 copyAndConfigureFiles(job, submitJobDir); // 获取配置文件路径:job.xml Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir)); // 调用writeSplits()方法,写分片数据文件job.split和分片元数据文件job.splitmetainfo, // 并获得计算得到的map任务数目maps int maps = writeSplits(job, submitJobDir); // 配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps conf.setInt(MRJobConfig.NUM_MAPS, maps); LOG.info("number of splits:" + maps); // write "queue admins of the queue to which job is being submitted" // to job file. // 获取作业队列名queue,取參数mapreduce.job.queuename,參数未配置默觉得default, // 之前的WordCount任务演示样例中,作业队列名queue就为default String queue = conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); // 获取队列的訪问权限控制列表AccessControlList实例acl,通过client通信协议ClientProtocol实例submitClient的getQueueAdmins()方法。传入队列名queue。 // 实际上之前的WordCount任务演示样例中,这里获取的是* AccessControlList acl = submitClient.getQueueAdmins(queue); // 配置信息中设置队列參数mapred.queue.default.acl-administer-jobs // 之前的WordCount任务演示样例中。该參数被设置成为* conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); // removing jobtoken referrals before copying the jobconf to HDFS // as the tasks don't need this setting, actually they may break // because of it if present as the referral will point to a // different job. // 清空缓存的令牌 TokenCache.cleanUpTokenReferral(conf); // 依据參数确定是否须要追踪令牌ID // 取參数mapreduce.job.token.tracking.ids.enabled,參数未配置默觉得false if (conf.getBoolean( MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) { // 通过job获取令牌ID。并存储到trackingIds列表中 // Add HDFS tracking ids ArrayList
trackingIds = new ArrayList
(); for (Token
t : job.getCredentials().getAllTokens()) { trackingIds.add(t.decodeIdentifier().getTrackingId()); } // 将trackingIds列表中的内容设置到參数mapreduce.job.token.tracking.ids中 conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS, trackingIds.toArray(new String[trackingIds.size()])); } // Set reservation info if it exists // 如有必要,设置存在的预订信息 // 參数为mapreduce.job.reservation.id ReservationId reservationId = job.getReservationId(); if (reservationId != null) { conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString()); } // Write job file to submit dir // 调用writeConf()方法,写入作业配置信息至文件job.xml writeConf(conf, submitJobFile); // // Now, actually submit the job (using the submit name) // 调用printTokens()方法打印令牌信息到Log文件 printTokens(jobId, job.getCredentials()); // 通过client通信协议ClientProtocol实例submitClient的submitJob()方法提交作业, // 并获取作业状态JobStatus实例status // 由集群连接一文的分析我们能够知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象, // 终于调用的是二者的submitJob()方法。我们留待以后分析 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); // 假设作业状态JobStatus实例status不为null。直接返回,否则抛出无法载入作业的IO异常 if (status != null) { return status; } else { throw new IOException("Could not launch job"); } } finally { // 终于。抛出无法载入作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法。 // 删除作业提交的相关文件夹或文件submitJobDir if (status == null) { LOG.info("Cleaning up the staging area " + submitJobDir); if (jtFs != null && submitJobDir != null) jtFs.delete(submitJobDir, true); } } }
        submitJobInternal()方法篇幅比較长,逻辑也非常复杂,本文先介绍下它的大体逻辑。兴许分文会介绍各个环节的具体内容,且以下涉及到的之前WordCount作业演示样例在 及其姊妹篇中。敬请注意!submitJobInternal()方法大体逻辑例如以下:

        1、调用checkSpecs()方法,校验作业输出路径是否配置。且是否已存在:

              正确的情况应该是已配置且未存在。输出路径配置參数为mapreduce.output.fileoutputformat.outputdir,之前WordCount作业的输出路径配置为hdfs://nameservice1/output/output。

        2、从作业job中获取配置信息conf;

        3、调用addMRFrameworkToDistributedCache()方法加入应用框架路径到分布式缓存中;

        4、通过JobSubmissionFiles的getStagingDir()静态方法获取作业运行时阶段区域路径jobStagingArea:

              取參数yarn.app.mapreduce.am.staging-dir,參数未配置默觉得/tmp/hadoop-yarn/staging,然后后面是/提交作业username/.staging,通过之前的WordCount任务的运行,我们查看历史记录,得知參数yarn.app.mapreduce.am.staging-dir配置的为/user,而提交作业username为hdfs,所以完整的路径应该为/user/hdfs/.staging;

        5、获取当前本机地址ip。

        6、确定提交作业的主机地址、主机名。并设置入配置信息conf,相应參数分别为mapreduce.job.submithostname、mapreduce.job.submithostaddress;

        7、生成作业ID。即JobID实例jobId:

              通过client通信协议ClientProtocol实例submitClient的getNewJobID()方法生成作业ID。即JobID实例jobId;

        8、 将jobId设置入job;

        9、构造提交作业路径Path实例submitJobDir:

               jobStagingArea后接/jobId。比方/job_1459913635503_0005,之前WordCount作业的完整路径为/user/hdfs/.staging/job_1459913635503_0005;

        10、设置作业一些參数:

                 10.1、设置mapreduce.job.user.name为当前用户。之前的WordCount演示样例配置的为hdfs用户。

                 10.2、设置hadoop.http.filter.initializers为AmFilterInitializer;

                 10.3、设置mapreduce.job.dir为submitJobDir,比方/user/hdfs/.staging/job_1459913635503_0005。

        11、获取路径的授权令牌:调用TokenCache的obtainTokensForNamenodes()静态方法;

        12、通过populateTokenCache()方法获取密钥和令牌。并将它们存储到令牌缓存TokenCache中;

        14、复制而且配置相关文件:通过copyAndConfigureFiles()方法实现;

        15、获取配置文件路径:job.xml;

        16、调用writeSplits()方法。写分片数据文件job.split和分片元数据文件job.splitmetainfo,并获得计算得到的map任务数目maps;

        17、配置信息中设置map任务数目mapreduce.job.maps为上面得到的maps。

        18、获取作业队列名queue。取參数mapreduce.job.queuename。參数未配置默觉得default。之前的WordCount任务演示样例中。作业队列名queue就为default;

        19、获取队列的訪问权限控制列表AccessControlList实例acl:

                通过client通信协议ClientProtocol实例submitClient的getQueueAdmins()方法,传入队列名queue。实际上之前的WordCount任务演示样例中,这里获取的是*。

        20、配置信息中设置队列參数mapred.queue.default.acl-administer-jobs,之前的WordCount任务演示样例中,该參数被设置成为*。

        21、清空缓存的令牌:通过TokenCache的cleanUpTokenReferral()方法实现;

        22、依据參数确定是否须要追踪令牌ID,假设须要的话:

                取參数mapreduce.job.token.tracking.ids.enabled,參数未配置默觉得false。通过job获取令牌ID,并存储到trackingIds列表中。将trackingIds列表中的内容设置到參数mapreduce.job.token.tracking.ids中;

        23、如有必要。设置存在的预订信息:參数为mapreduce.job.reservation.id;

        24、调用writeConf()方法,写入作业配置信息至文件job.xml;

        25、调用printTokens()方法打印令牌信息到Log文件;

        26、通过client通信协议ClientProtocol实例submitClient的submitJob()方法提交作业。并获取作业状态JobStatus实例status:

                由集群连接一文的分析我们能够知道,这个submitClient实际上是YARNRunner或LocalJobRunner对象。终于调用的是二者的submitJob()方法。我们留待以后分析。

        27、假设作业状态JobStatus实例status不为null,直接返回,否则抛出无法载入作业的IO异常:

                终于,抛出无法载入作业的IO异常前,调用文件系统FileSystem实例jtFs的delete()方法,删除作业提交的相关文件夹或文件submitJobDir。

        总体流程如上,对于关键步骤的主要细节。限于篇幅。敬请关注《MapReduce源代码分析之JobSubmitter(二)》!

转载地址:http://ilctx.baihongyu.com/

你可能感兴趣的文章
symantec sep 11卸载工具
查看>>
ASP.NET中常用输出JS脚本的类(改进版)
查看>>
Windows数据类型探幽——千回百转你是谁?(2)
查看>>
JavaScript 开发人员需要知道的简写技巧
查看>>
VS2010工具箱变灰解决方案
查看>>
漏洞门 国产x86处理器纷纷表态不受影响
查看>>
Postfix 电子邮件系统精要
查看>>
Android GIS开发系列-- 入门季(14)FeatureLayer之范围查询
查看>>
潮汕冬至吃甜丸
查看>>
ftrace的使用【转】
查看>>
float数据类型研究,发现其能显示的有效数字极为有限
查看>>
内核工具 – Sparse 简介
查看>>
Unity3D默认的快捷键
查看>>
JQuery 选择器
查看>>
重磅译制 | 更新:MIT 6.S094自动驾驶课程第1讲(3)深度学习模型应用
查看>>
Alpha混合
查看>>
大话搜索 之 最难的搜索问题
查看>>
非唯一聚集索引上的唯一和非唯一非聚集索引
查看>>
Linux中断处理驱动程序编写【转】
查看>>
几款 .Net Reflector 的替代品
查看>>