现在的位置: 首页 > 黄专家专栏 > 正文

作业的提交和监控(一)

2014年11月03日 黄专家专栏 ⁄ 共 4672字 ⁄ 字号 评论关闭

整体流程

秒速赛车公式 www.l19l7.cn 简单的代码就可以运行一个作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);

runJob 做的最主要两个过程是提交监控。 提交在函数 submitJobInternal 中执行,监控在函数 monitorAndPrintJob 中执行。我们看看 submitJobInternal 里面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, 
  ClassNotFoundException,
  InterruptedException,
  IOException {
    JobConf jobCopy = job;

    // 指定作业文件的上传路径
    // 默认在 mapreduce.jobtracker.staging.root.dir 中指定
    // 比如 mapreduce.jobtracker.staging.root.dir = /user
    // 后面加上用户名作为文件夹名字
    // 一般是这样 hdfs://host:port/user/${user}/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // 得到一个 jobID
    // jobID 是根据日期和时间生成的
    // 假设是 job_201410312221_0001
    JobID jobId = jobSubmitClient.getNewJobId();

    // 得到目录的路径
    // 可能是这样 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    // 设置配置文件,将 mapreduce.job.dir 设置为作业文件上传目录
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      // 得到 token
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      // copy 文件到上传目录中
      // 支持以下这几种格式
      // -files -libjars -archives
      // 在如下的三种目录里:
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path [] {submitJobDir},
                                          jobCopy);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // 得到 reduces 的个数
      int reduces = jobCopy.getNumReduceTasks();

      // 设置本机 ip 地址
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      // 检查指定的输出目录
      // 如果输出文件夹存在, 那么会抛出异常
      // checkOutputSpecs 在 FileOutputFormat.java 中
      if (reduces == 0 ? jobCopy.getUseNewMapper() : 
        jobCopy.getUseNewReducer()) {
          // 如果 readuce 数目是 0,但是 mapper 的数目不为 0
          // 得到指定的输入类型
          // 默认是 TextOutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      // 计算 job 的输入文件的分片
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      // 设置 map
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 得到 queue 的名字
      String queue = jobCopy.getQueueName();
      // 然后根据这个 queue 名字获得访问控制列表
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs  
      // 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
      FSDataOutputStream out = 
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(jobCopy);

      try {
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      // 提交 job
      // 如果不是 local 模式,那么这个 submitJob 是一个 rpc 调用
      // 调用的是远程机子上的 JobTracker.submitJob
      // 如果是 local 模式, 就是调用 LocalJobRunner.submitJob
      printTokens(jobId, jobCopy.getCredentials());
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
}

主要的流程如下:

  1. 找到 hdfs 上的 Staging 路径
  2. 向 JobTracker 申请一个 job id
  3. 根据 job id 在 hdfs 上生成上传文件的目录
  4. copy 指定文件到上传的目录中去
  5. 得到 reduce 个数,设置本机 ip
  6. 检查输出目录是否存在, 如果不存在,则抛出异常。这其实有一点不太合理的地方,因为是先上传作业文件,再判断是否输入目录存在。如果正常运行完作业,上传的文件是能被清理的,但是,如果输出文件异常,那么这些上传的文件就得不到清理?
  7. 计算 job 的输入文件的分片, 根据这个设置 map 的数量
  8. 得到 queue name 和相关授权
  9. 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
  10. 提交 job

抱歉!评论已关闭.

  • 第十六届中国经济论坛 2019-06-26
  • 拜博口腔医疗集团创始人、董事长黎昌仁获第十二届人民企业社会责任奖年度人物奖 2019-06-26
  • 县名解析晋城高平市地名来历 2019-06-25
  • “网络党课”第二课 杨禹《为美好生活而奋斗》 2019-06-25
  • 自然规律是不可改变的,社会规律是可以改变的。这是自然科学与社会科学的区别之一。 2019-06-25
  • 香港有祖国全面支持<br>港人对未来满怀憧憬 2019-06-25
  • 中央第四环保督察组向江西移交1034件信访问题线索 2019-06-24
  • 第十二届中国(南宁)国际园林博览会吉祥物正式发布 2019-06-24
  • C级总销量迫近A4L 宝马3系乏力 2019-06-24
  • 包车司机借口“学炒股”敲开门 抢钱后杀人抛尸 2019-06-23
  • 临汾“尧王杯”马拉松赛激情开跑 2019-06-23
  • 我们包住内力,在不断变化中寻找契机可出击可借力亦可卸力。 2019-06-23
  • “ONE NIGHT 给小孩”北京站探访周迅刘雯共奏可爱“交响曲” 2019-06-22
  • 爱护民生:什么基金都不能买,即使获利,也不会给分多少红利,只是意思意思。 2019-06-22
  • 三颗迄今最年轻行星现形 2019-06-22
  • 论坛高级特码心水 广东26选5开奖公告 香港赛马会黄大仙特码 福彩中心开机号今天 三肖中特网337888 黑龙江36选7开奖电视台 河南快赢481走势 河南11选5走势图 燕赵风采排列五走势图 体彩20选5开奖结果彩票2元网 北京快乐赛车pk拾 山西快乐10分开奖号码 重庆快乐十分前三遗漏数据 青海十一选五走势图表 秒速时时彩5球技巧玩法