1 package com.mengyao.hadoop.mapreduce; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.conf.Configured; 7 import org.apache.hadoop.fs.Path; 8 import org.apache.hadoop.io.LongWritable; 9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19 20 /** 21 * NLineInputFormat前面的N表示每个Mapper收到输入的行数,N的默认输入行数是1。mapreduce.input.lineinputformat.linespermap属性实现N值的设定。如果希望使Mapper收到固定行数的输入可以使用该类实现。 22 * 与TextInputFormat相同,key是文件中行的字节偏移量,value是行本身。 23 * 通常情况下,对少量的输入行执行map任务是比较低效的(任务初始化的额外开销导致)。 24 * 25 * 使用NLineInputFormat设置Mapper任务每次输入处理4行,此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下: 26 * 第1章 PostgresQL服务器简介 27 * 1.1 为什么在服务器中进行程序设计 28 * 1.2 关于本书的代码示例 29 * 1.3 超越简单函数 30 * 1.4 使用触发器管理相关数据 31 * 1.5 审核更改 32 * 1.6 数据清洗 33 * 1.7 定制排序方法 34 * 1.8 程序设计最佳实践 35 * 1.8.1 KISS——尽量简单(keep it simple stupid) 36 * 1.8.2 DRY——不要写重复的代码(don't repeat yourself) 37 * 1.8.3 YAGNI——你并不需要它(you ain'tgonnaneedit) 38 * 1.8.4 SOA——服务导向架构(service-oriented architecture) 39 * 1.8.5 类型的扩展 40 * 1.9 关于缓存 41 * 1.10 总结——为什么在服务器中进行程序设计 42 * 1.10.1 性能 43 * 1.10.2 易于维护 44 * 1.10.3 保证安全的简单方法 45 * 1.11 小结 46 * 第2章 服务器程序设计环境 47 * 2.1 购置成本 48 * 2.2 开发者的可用性 49 * 2.3 许可证书 50 * 2.4 可预测性 51 * 2.5 社区 52 * 2.6 过程化语言 53 * 2.6.1 平台兼容性 54 * 2.6.2 应用程序设计 55 * 2.6.3 更多基础 56 * 2.7 小结 57 * 第3章 第一个PL/pgsQL函数 58 * 3.1 为什么是PL/pgSQL 59 * 3.2 PL/pgSQL函数的结构 60 * ... 61 * 62 * 输出到HDFS目录下的文件/mapreduces/nlineinputformat/part-r-00000,内容如下: 63 * 第1章 PostgresQL服务器简介 64 * 第2章 服务器程序设计环境 65 * 第3章 第一个PL/pgsQL函数 66 * 67 * @author mengyao 68 * 69 */ 70 public class NLineInputFormatApp extends Configured implements Tool { 71 72 static class NLineInputFormatMapper extends Mapper{ 73 74 private Text outputValue; 75 76 @Override 77 protected void setup(Context context) 78 throws IOException, InterruptedException { 79 this.outputValue = new Text(); 80 } 81 82 @Override 83 protected void map(LongWritable key, Text value, Context context) 84 throws IOException, InterruptedException { 85 final String line = value.toString(); 86 //如果行第一个字是“第”则认为是一级索引 87 if (line.startsWith("第")) { 88 outputValue.set(line); 89 context.write(key, this.outputValue); 90 } 91 } 92 } 93 94 static class NLineInputFormatReducer extends Reducer { 95 96 private Text outputKey; 97 private NullWritable outputValue; 98 99 @Override100 protected void setup(Context context)101 throws IOException, InterruptedException {102 this.outputKey = new Text();103 this.outputValue = NullWritable.get();104 }105 106 @Override107 protected void reduce(LongWritable key, Iterable value, Context context)108 throws IOException, InterruptedException {109 outputKey.set(value.iterator().next());110 context.write(this.outputKey, outputValue);111 }112 }113 114 @Override115 public int run(String[] args) throws Exception {116 Job job = Job.getInstance(getConf(), NLineInputFormatApp.class.getSimpleName());117 job.setJarByClass(NLineInputFormatApp.class);118 119 NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0]));120 job.setInputFormatClass(NLineInputFormat.class);121 FileInputFormat.addInputPath(job, new Path(args[1]));122 FileOutputFormat.setOutputPath(job, new Path(args[2]));123 124 job.setMapperClass(NLineInputFormatMapper.class);125 job.setMapOutputKeyClass(LongWritable.class);126 job.setMapOutputValueClass(Text.class);127 128 job.setReducerClass(NLineInputFormatReducer.class);129 job.setOutputKeyClass(Text.class);130 job.setOutputValueClass(NullWritable.class);131 132 return job.waitForCompletion(true)?0:1;133 }134 135 public static int createJob(String[] args) {136 Configuration conf = new Configuration();137 conf.set("dfs.datanode.socket.write.timeout", "7200000");138 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");139 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");140 int status = 0;141 142 try {143 status = ToolRunner.run(conf, new NLineInputFormatApp(), args);144 } catch (Exception e) {145 e.printStackTrace();146 }147 148 return status;149 }150 151 public static void main(String[] args) {152 args = new String[]{"4", "/mapreduces/bookOutline.txt", "/mapreduces/nlineinputformat"};153 if (args.length!=3) {154 System.out.println("Usage: "+NLineInputFormatApp.class.getName()+" Input paramters ");155 } else {156 int status = createJob(args);157 System.exit(status);158 }159 }160 161 }