现在的位置: 首页 > 搜索技术 > 黄专家专栏 > 正文

Hadoop I/O 上 SequenceFile 类在 Pipes 上的应用

2014年10月30日 搜索技术, 黄专家专栏 ⁄ 共 3830字 ⁄ 字号 评论关闭

秒速赛车公式 www.l19l7.cn 有的时候,我们在 hadoop 上的输入可能不是一些基于行的文本,是希望自定义一些结构化的数据。这种情况,一般会选用工具将结构化的数据序列化成字节流,存储在磁盘上。然后在 maper 中读取进来,反序列化即可得到原来的数据。

我们使用 google protobuf 作为这种结构化的信息传递的工具。

首先可以先定义 person.proto 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
option java_package = "com.hackerlight.proto";
option java_outer_classname = "ProtobufMessage";

message Person {
  required string name = 1;
  required int32 id = 2;
  optional string email = 3;

  enum PhoneType {
    MOBILE = 0;
    HOME = 1;
    WORK = 2;
  }

  message PhoneNumber {
    required string number = 1;
    optional PhoneType type = 2 [default = HOME];
  }

  repeated PhoneNumber phone = 4;

  message CountryInfo {
          required string name = 1;
          required string code = 2;
          optional int32 number = 3;
  }
}

生成 protobuf 的 java 代码

1
protoc --java_out=./ test.proto

java 代码会生成在 当前目录下的 com/hackerlight/proto 下.

生成 SequenceFile 结构的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.hackerlight.writer;

import java.io.FileOutputStream;
import java.net.URI;

import com.hackerlight.proto.ProtobufMessage;
import com.hackerlight.proto.ProtobufMessage.Person;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;

public class Writer {
public static void main(String[] args) throws Exception {
  String uri = args[0];

  org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
  FileSystem fs = FileSystem.get(URI.create(uri), conf);
  Path path = new Path(uri);

  Person.Builder builder = Person.newBuilder();
  builder.setEmail("[email protected]");
  builder.setId(1024);
  builder.setName("Moon");

  Person.PhoneNumber.Builder p = Person.PhoneNumber.newBuilder();
  p.setNumber("18900000000");
  builder.addPhone(p);

  byte[] serialize_string = builder.build().toByteArray();
  IntWritable key = new IntWritable();
  BytesWritable value = new BytesWritable();
  SequenceFile.Writer write = null;
  try {
    write = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
    for (int i = 0; i < 100; ++i) {
      key.set(i);
      value.set(serialize_string, 0, serialize_string.length);
      write.append(key, value);
    }
  } finally {
    IOUtils.closeStream(write);
  }
}
}

可以写一个读取的代码反序列化写入的结构数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.hackerlight.reader;

import java.io.FileOutputStream;
import java.net.URI;

import com.hackerlight.proto.ProtobufMessage;
import com.hackerlight.proto.ProtobufMessage.Person;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class Reader {
public static void main(String[] args) throws Exception {
  String uri = args[0];
  org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
  FileSystem fs = FileSystem.get(URI.create(uri), conf);
  Path path = new Path(uri);
  SequenceFile.Reader reader = null;
  try {
    reader = new SequenceFile.Reader(fs, path, conf);
    Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    BytesWritable value = (BytesWritable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
      System.out.println("key : " + key);
      Person new_person = Person.PARSER.parseFrom(value.getBytes(), 0, value.getLength());
      System.out.println("name  : " + new_person.getName());
      System.out.println("email : " + new_person.getEmail());
    }
  } finally {
    IOUtils.closeStream(reader);
  }
}
}

在 Pipes 上可以这样读取

1
2
3
4
5
6
7
8
9
10
class MyMap: public HadoopPipes::Mapper {
 public:
   WordCountMap(HadoopPipes::TaskContext& context) {}
   void map(HadoopPipes::MapContext& context) {
     const std::string& key = context.getInputKey();
     const std::string& value = context.getInputValue();
     Person person;
     assert(person.ParseFromString(value));
   }
};

执行 hadoop 命令的时候一定要记得加上参数 -inputformat org.apache.hadoop.mapred.SequenceFileInputFormat

抱歉!评论已关闭.

  • 第十六届中国经济论坛 2019-06-26
  • 拜博口腔医疗集团创始人、董事长黎昌仁获第十二届人民企业社会责任奖年度人物奖 2019-06-26
  • 县名解析晋城高平市地名来历 2019-06-25
  • “网络党课”第二课 杨禹《为美好生活而奋斗》 2019-06-25
  • 自然规律是不可改变的,社会规律是可以改变的。这是自然科学与社会科学的区别之一。 2019-06-25
  • 香港有祖国全面支持<br>港人对未来满怀憧憬 2019-06-25
  • 中央第四环保督察组向江西移交1034件信访问题线索 2019-06-24
  • 第十二届中国(南宁)国际园林博览会吉祥物正式发布 2019-06-24
  • C级总销量迫近A4L 宝马3系乏力 2019-06-24
  • 包车司机借口“学炒股”敲开门 抢钱后杀人抛尸 2019-06-23
  • 临汾“尧王杯”马拉松赛激情开跑 2019-06-23
  • 我们包住内力,在不断变化中寻找契机可出击可借力亦可卸力。 2019-06-23
  • “ONE NIGHT 给小孩”北京站探访周迅刘雯共奏可爱“交响曲” 2019-06-22
  • 爱护民生:什么基金都不能买,即使获利,也不会给分多少红利,只是意思意思。 2019-06-22
  • 三颗迄今最年轻行星现形 2019-06-22
  • 山东11选5软件破解 天下精英一波中特 辛运28 福建十三水游戏下载 北京pk计划软件手机版 七乐彩30期开奖结果 双色球基本走势图体彩 欢乐斗地主欢乐豆不够 山东群英会图表走势图 5张牌梭哈看牌器 吉林十一选五手机版走势图连线 香港内部透码保正板彩票 足彩半全场推荐 体彩甘肃十一选五开奖结果 北京赛车pk10连续出双