博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
MapReduce的NLineInputFormat使用
阅读量:5039 次
发布时间:2019-06-12

本文共 6146 字,大约阅读时间需要 20 分钟。

1 package com.mengyao.hadoop.mapreduce;  2   3 import java.io.IOException;  4   5 import org.apache.hadoop.conf.Configuration;  6 import org.apache.hadoop.conf.Configured;  7 import org.apache.hadoop.fs.Path;  8 import org.apache.hadoop.io.LongWritable;  9 import org.apache.hadoop.io.NullWritable; 10 import org.apache.hadoop.io.Text; 11 import org.apache.hadoop.mapreduce.Job; 12 import org.apache.hadoop.mapreduce.Mapper; 13 import org.apache.hadoop.mapreduce.Reducer; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat; 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 17 import org.apache.hadoop.util.Tool; 18 import org.apache.hadoop.util.ToolRunner; 19  20 /** 21  * NLineInputFormat前面的N表示每个Mapper收到输入的行数,N的默认输入行数是1。mapreduce.input.lineinputformat.linespermap属性实现N值的设定。如果希望使Mapper收到固定行数的输入可以使用该类实现。 22  * 与TextInputFormat相同,key是文件中行的字节偏移量,value是行本身。 23  * 通常情况下,对少量的输入行执行map任务是比较低效的(任务初始化的额外开销导致)。 24  * 25  * 使用NLineInputFormat设置Mapper任务每次输入处理4行,此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下: 26  *     第1章    PostgresQL服务器简介 27  *         1.1    为什么在服务器中进行程序设计 28  *         1.2    关于本书的代码示例 29  *         1.3    超越简单函数 30  *         1.4    使用触发器管理相关数据 31  *         1.5    审核更改 32  *         1.6    数据清洗 33  *         1.7    定制排序方法 34  *         1.8    程序设计最佳实践 35  *         1.8.1    KISS——尽量简单(keep it simple stupid) 36  *         1.8.2    DRY——不要写重复的代码(don't repeat yourself) 37  *         1.8.3    YAGNI——你并不需要它(you ain'tgonnaneedit) 38  *         1.8.4    SOA——服务导向架构(service-oriented architecture) 39  *         1.8.5    类型的扩展 40  *         1.9    关于缓存 41  *         1.10    总结——为什么在服务器中进行程序设计 42  *         1.10.1    性能 43  *         1.10.2    易于维护 44  *         1.10.3    保证安全的简单方法 45  *         1.11    小结 46  *     第2章    服务器程序设计环境 47  *         2.1    购置成本 48  *         2.2    开发者的可用性 49  *         2.3    许可证书 50  *         2.4    可预测性 51  *         2.5    社区 52  *         2.6    过程化语言 53  *         2.6.1    平台兼容性 54  *         2.6.2    应用程序设计 55  *         2.6.3    更多基础 56  *         2.7    小结 57  *     第3章    第一个PL/pgsQL函数 58  *         3.1    为什么是PL/pgSQL 59  *         3.2    PL/pgSQL函数的结构 60  *         ... 61  * 62  * 输出到HDFS目录下的文件/mapreduces/nlineinputformat/part-r-00000,内容如下: 63  *        第1章    PostgresQL服务器简介 64  *        第2章    服务器程序设计环境 65  *        第3章    第一个PL/pgsQL函数 66  * 67  * @author mengyao 68  * 69  */ 70 public class NLineInputFormatApp extends Configured implements Tool { 71  72     static class NLineInputFormatMapper extends Mapper
{ 73 74 private Text outputValue; 75 76 @Override 77 protected void setup(Context context) 78 throws IOException, InterruptedException { 79 this.outputValue = new Text(); 80 } 81 82 @Override 83 protected void map(LongWritable key, Text value, Context context) 84 throws IOException, InterruptedException { 85 final String line = value.toString(); 86 //如果行第一个字是“第”则认为是一级索引 87 if (line.startsWith("第")) { 88 outputValue.set(line); 89 context.write(key, this.outputValue); 90 } 91 } 92 } 93 94 static class NLineInputFormatReducer extends Reducer
{ 95 96 private Text outputKey; 97 private NullWritable outputValue; 98 99 @Override100 protected void setup(Context context)101 throws IOException, InterruptedException {102 this.outputKey = new Text();103 this.outputValue = NullWritable.get();104 }105 106 @Override107 protected void reduce(LongWritable key, Iterable
value, Context context)108 throws IOException, InterruptedException {109 outputKey.set(value.iterator().next());110 context.write(this.outputKey, outputValue);111 }112 }113 114 @Override115 public int run(String[] args) throws Exception {116 Job job = Job.getInstance(getConf(), NLineInputFormatApp.class.getSimpleName());117 job.setJarByClass(NLineInputFormatApp.class);118 119 NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0]));120 job.setInputFormatClass(NLineInputFormat.class);121 FileInputFormat.addInputPath(job, new Path(args[1]));122 FileOutputFormat.setOutputPath(job, new Path(args[2]));123 124 job.setMapperClass(NLineInputFormatMapper.class);125 job.setMapOutputKeyClass(LongWritable.class);126 job.setMapOutputValueClass(Text.class);127 128 job.setReducerClass(NLineInputFormatReducer.class);129 job.setOutputKeyClass(Text.class);130 job.setOutputValueClass(NullWritable.class);131 132 return job.waitForCompletion(true)?0:1;133 }134 135 public static int createJob(String[] args) {136 Configuration conf = new Configuration();137 conf.set("dfs.datanode.socket.write.timeout", "7200000");138 conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");139 conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");140 int status = 0;141 142 try {143 status = ToolRunner.run(conf, new NLineInputFormatApp(), args);144 } catch (Exception e) {145 e.printStackTrace();146 }147 148 return status;149 }150 151 public static void main(String[] args) {152 args = new String[]{"4", "/mapreduces/bookOutline.txt", "/mapreduces/nlineinputformat"};153 if (args.length!=3) {154 System.out.println("Usage: "+NLineInputFormatApp.class.getName()+" Input paramters
");155 } else {156 int status = createJob(args);157 System.exit(status);158 }159 }160 161 }

 

转载于:https://www.cnblogs.com/mengyao/archive/2013/02/06/4865579.html

你可能感兴趣的文章
TensorFlow2.0矩阵与向量的加减乘
查看>>
NOIP 2010题解
查看>>
javascript中的each遍历
查看>>
String中各方法多数情况下返回新的String对象
查看>>
浅谈tcp粘包问题
查看>>
UVA11524构造系数数组+高斯消元解异或方程组
查看>>
排序系列之——冒泡排序、插入排序、选择排序
查看>>
爬虫基础
查看>>
jquery.lazyload延迟加载图片第一屏问题
查看>>
HDU 1011 Starship Troopers (树形DP)
查看>>
手把手教你写DI_1_DI框架有什么?
查看>>
.net常见的一些面试题
查看>>
OGRE 源码编译方法
查看>>
上周热点回顾(10.20-10.26)
查看>>
C#正则表达式引发的CPU跑高问题以及解决方法
查看>>
云计算之路-阿里云上:“黑色30秒”走了,“黑色1秒”来了,真相也许大白了...
查看>>
APScheduler调度器
查看>>
设计模式——原型模式
查看>>
【jQuery UI 1.8 The User Interface Library for jQuery】.学习笔记.1.CSS框架和其他功能
查看>>
如何一个pdf文件拆分为若干个pdf文件
查看>>