现在的位置: 首页 > 黄专家专栏 > 正文

作业的提交和监控(一)

2014年11月03日 黄专家专栏 ⁄ 共 4672字 ⁄ 字号 评论关闭

整体流程

秒速赛车公式 www.l19l7.cn 简单的代码就可以运行一个作业

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Create a new JobConf
JobConf job = new JobConf(new Configuration(), MyJob.class);

// Specify various job-specific parameters     
job.setJobName("myjob");

job.setInputPath(new Path("in"));
job.setOutputPath(new Path("out"));

job.setMapperClass(MyJob.MyMapper.class);
job.setReducerClass(MyJob.MyReducer.class);

// Submit the job, then poll for progress until the job is complete
JobClient.runJob(job);

runJob 做的最主要两个过程是提交监控。 提交在函数 submitJobInternal 中执行,监控在函数 monitorAndPrintJob 中执行。我们看看 submitJobInternal 里面的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
return ugi.doAs(new PrivilegedExceptionAction<RunningJob>() {
  public RunningJob run() throws FileNotFoundException, 
  ClassNotFoundException,
  InterruptedException,
  IOException {
    JobConf jobCopy = job;

    // 指定作业文件的上传路径
    // 默认在 mapreduce.jobtracker.staging.root.dir 中指定
    // 比如 mapreduce.jobtracker.staging.root.dir = /user
    // 后面加上用户名作为文件夹名字
    // 一般是这样 hdfs://host:port/user/${user}/.staging
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(JobClient.this,
        jobCopy);

    // 得到一个 jobID
    // jobID 是根据日期和时间生成的
    // 假设是 job_201410312221_0001
    JobID jobId = jobSubmitClient.getNewJobId();

    // 得到目录的路径
    // 可能是这样 hdfs://host:port/user/${user}/.staging/job_201410312221_0001
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());

    // 设置配置文件,将 mapreduce.job.dir 设置为作业文件上传目录
    jobCopy.set("mapreduce.job.dir", submitJobDir.toString());
    JobStatus status = null;
    try {
      // 得到 token
      populateTokenCache(jobCopy, jobCopy.getCredentials());

      // copy 文件到上传目录中
      // 支持以下这几种格式
      // -files -libjars -archives
      // 在如下的三种目录里:
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/files
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/archives
      // hdfs://host:port/user/${user}/.staging/job_201410312221_0001/libjars
      copyAndConfigureFiles(jobCopy, submitJobDir);

      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(jobCopy.getCredentials(),
                                          new Path [] {submitJobDir},
                                          jobCopy);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
          // 得到 reduces 的个数
      int reduces = jobCopy.getNumReduceTasks();

      // 设置本机 ip 地址
      InetAddress ip = InetAddress.getLocalHost();
      if (ip != null) {
        job.setJobSubmitHostAddress(ip.getHostAddress());
        job.setJobSubmitHostName(ip.getHostName());
      }
      JobContext context = new JobContext(jobCopy, jobId);

      jobCopy = (JobConf)context.getConfiguration();

      // Check the output specification
      // 检查指定的输出目录
      // 如果输出文件夹存在, 那么会抛出异常
      // checkOutputSpecs 在 FileOutputFormat.java 中
      if (reduces == 0 ? jobCopy.getUseNewMapper() : 
        jobCopy.getUseNewReducer()) {
          // 如果 readuce 数目是 0,但是 mapper 的数目不为 0
          // 得到指定的输入类型
          // 默认是 TextOutputFormat
        org.apache.hadoop.mapreduce.OutputFormat<?,?> output =
          ReflectionUtils.newInstance(context.getOutputFormatClass(),
              jobCopy);
        output.checkOutputSpecs(context);
      } else {
        jobCopy.getOutputFormat().checkOutputSpecs(fs, jobCopy);
      }

      // Create the splits for the job
      // 计算 job 的输入文件的分片
      FileSystem fs = submitJobDir.getFileSystem(jobCopy);
      LOG.debug("Creating splits at " + fs.makeQualified(submitJobDir));
      // 设置 map
      int maps = writeSplits(context, submitJobDir);
      jobCopy.setNumMapTasks(maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      // 得到 queue 的名字
      String queue = jobCopy.getQueueName();
      // 然后根据这个 queue 名字获得访问控制列表
      AccessControlList acl = jobSubmitClient.getQueueAdmins(queue);
      jobCopy.set(QueueManager.toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getACLString());

      // Write job file to JobTracker's fs  
      // 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
      FSDataOutputStream out = 
        FileSystem.create(fs, submitJobFile,
            new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(jobCopy);

      try {
        jobCopy.writeXml(out);
      } finally {
        out.close();
      }
      //
      // Now, actually submit the job (using the submit name)
      //
      // 提交 job
      // 如果不是 local 模式,那么这个 submitJob 是一个 rpc 调用
      // 调用的是远程机子上的 JobTracker.submitJob
      // 如果是 local 模式, 就是调用 LocalJobRunner.submitJob
      printTokens(jobId, jobCopy.getCredentials());
      status = jobSubmitClient.submitJob(
          jobId, submitJobDir.toString(), jobCopy.getCredentials());
      if (status != null) {
        return new NetworkedJob(status);
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (fs != null && submitJobDir != null)
          fs.delete(submitJobDir, true);
      }
    }
  }
}

主要的流程如下:

  1. 找到 hdfs 上的 Staging 路径
  2. 向 JobTracker 申请一个 job id
  3. 根据 job id 在 hdfs 上生成上传文件的目录
  4. copy 指定文件到上传的目录中去
  5. 得到 reduce 个数,设置本机 ip
  6. 检查输出目录是否存在, 如果不存在,则抛出异常。这其实有一点不太合理的地方,因为是先上传作业文件,再判断是否输入目录存在。如果正常运行完作业,上传的文件是能被清理的,但是,如果输出文件异常,那么这些上传的文件就得不到清理?
  7. 计算 job 的输入文件的分片, 根据这个设置 map 的数量
  8. 得到 queue name 和相关授权
  9. 将重新配置过的 JobConf 写入到 submitJobDir/job.xml 文件
  10. 提交 job

抱歉!评论已关闭.

  • 马上背!十九大报告中的四个“新” 2019-02-16
  • 蒲县工商质监局非公党委举办2018元旦文艺会 2019-02-16
  • 人民网评:建设数字中国时不我待 2019-02-16
  • 618史上最壕“买家”现身 Google以 5.5亿美元投资京东 2019-02-15
  • 雍正官窑:朕就是这样的品味(图) 2019-02-15
  • 西安司法考试将试点机考 2019-02-15
  • 人民日报新媒体矩阵聚焦十九大 融媒报道"给你好看" 2019-02-14
  • 社会主义是过渡阶段,最终实现共产主义才是其目的。社会主义是在消灭私有制,建立公有制直至无私,实现共产主义。 2019-02-14
  • 四轮电动车销售火爆存安全隐患 专家:需建国家标准 2019-02-14
  • 看懂汽车三元催化器工作原理后还能当金子卖?难为非洲兄弟了! 2019-02-14
  • 周杰伦昆凌为儿子庆生 小小周帅气入镜 2019-02-13
  • 都以为机器人普及了,一切都不是问题了?机器人不需要不断升级?机器人生产啥?不需要人设计? 2019-02-13
  • 价值-热门标签-华商生活 2019-02-13
  • 上合组织引领发展 吉中合作稳步前行——访吉尔吉斯斯坦总统热恩别科夫 2019-02-13
  • 互联网金融协会提示:防范变相“现金贷”业务风险 2019-02-12
  • 352| 192| 585| 537| 630| 733| 538| 582| 513| 40|