现在的位置: 首页 > 黄专家专栏 > 正文

作业的提交和监控(一)

2014年11月03日 黄专家专栏 ⁄ 共 4672字 ⁄ 字号 评论关闭

整体流程

秒速赛车公式 www.l19l7.cn 简单的代码就可以运行一个作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);

runJob 做的最主要两个过程是提交监控。 提交在函数 submitJobInternal 中执行,监控在函数 monitorAndPrintJob 中执行。我们看看 submitJobInternal 里面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, 
  ClassNotFoundException,
  InterruptedException,
  IOException {
    JobConf jobCopy = job;

    // 指定作业文件的上传路径
    // 默认在 mapreduce.jobtracker.staging.root.dir 中指定
    // 比如 mapreduce.jobtracker.staging.root.dir = /user
    // 后面加上用户名作为文件夹名字
    // 一般是这样 hdfs://host:port/user/${user}/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // 得到一个 jobID
    // jobID 是根据日期和时间生成的
    // 假设是 job_201410312221_0001
    JobID jobId = jobSubmitClient.getNewJobId();

    // 得到目录的路径
    // 可能是这样 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    // 设置配置文件,将 mapreduce.job.dir 设置为作业文件上传目录
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      // 得到 token
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      // copy 文件到上传目录中
      // 支持以下这几种格式
      // -files -libjars -archives
      // 在如下的三种目录里:
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path [] {submitJobDir},
                                          jobCopy);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // 得到 reduces 的个数
      int reduces = jobCopy.getNumReduceTasks();

      // 设置本机 ip 地址
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      // 检查指定的输出目录
      // 如果输出文件夹存在, 那么会抛出异常
      // checkOutputSpecs 在 FileOutputFormat.java 中
      if (reduces == 0 ? jobCopy.getUseNewMapper() : 
        jobCopy.getUseNewReducer()) {
          // 如果 readuce 数目是 0,但是 mapper 的数目不为 0
          // 得到指定的输入类型
          // 默认是 TextOutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      // 计算 job 的输入文件的分片
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      // 设置 map
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 得到 queue 的名字
      String queue = jobCopy.getQueueName();
      // 然后根据这个 queue 名字获得访问控制列表
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs  
      // 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
      FSDataOutputStream out = 
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(jobCopy);

      try {
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      // 提交 job
      // 如果不是 local 模式,那么这个 submitJob 是一个 rpc 调用
      // 调用的是远程机子上的 JobTracker.submitJob
      // 如果是 local 模式, 就是调用 LocalJobRunner.submitJob
      printTokens(jobId, jobCopy.getCredentials());
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
}

主要的流程如下:

  1. 找到 hdfs 上的 Staging 路径
  2. 向 JobTracker 申请一个 job id
  3. 根据 job id 在 hdfs 上生成上传文件的目录
  4. copy 指定文件到上传的目录中去
  5. 得到 reduce 个数,设置本机 ip
  6. 检查输出目录是否存在, 如果不存在,则抛出异常。这其实有一点不太合理的地方,因为是先上传作业文件,再判断是否输入目录存在。如果正常运行完作业,上传的文件是能被清理的,但是,如果输出文件异常,那么这些上传的文件就得不到清理?
  7. 计算 job 的输入文件的分片, 根据这个设置 map 的数量
  8. 得到 queue name 和相关授权
  9. 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
  10. 提交 job

抱歉!评论已关闭.

  • 倒着走能治腰颈椎痛?假的! 2019-04-19
  • 长效机制加速推进 楼市下半年或持续降温 2019-04-19
  • 树立文化自信 创新节庆模式 2019-04-19
  • 朝韩将军级会谈时隔11年后在板门店重启 2019-04-19
  • 经济日报多媒体数字报刊 2019-04-18
  • 搞好公有制就是好,故得出结论:计划经济好。 2019-04-18
  • 云南理发店老板涉嫌杀害女演员因办卡纠纷起杀心 2019-04-18
  • 南海网-海南新闻网-权威媒体 海南门户 2019-04-17
  • 海底捞回应侵犯音乐人林海著作权:已停止播放 2019-04-17
  • 自然型社会和规则性社会,是会随着科技的改变而发生改变的,当然只有规矩也就是制度才能规范人的行为,所以国家是不会灭亡的,但国家的形式是会发生改变的。 2019-04-17
  • 惊艳!上外学子英译60首热门中文歌  让世界倾听中国 2019-04-16
  • 西安,给盲人朋友留一条路吧…无障碍设施盲道-编辑整合 2019-04-16
  • 的确如此。报刊亭取消的确是短视行为。把报刊亭设计的现代化一些,与城市绿化衔接起来,相得益彰,成为文化一景多好。 2019-04-16
  • 让更多企业和劳动者尝到协商的“甜头” 2019-04-16
  • 2014金家岭财富论坛嘉宾云集(二) 2019-04-15
  • 897| 101| 886| 515| 238| 901| 663| 588| 964| 968|