本文共 2725 字,大约阅读时间需要 9 分钟。
package MapReduce;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.util.StringUtils;import java.io.IOException;// 四个泛型,前两个是输入数据的类型,后两个是输出数据的类型public class Map extends Mapper{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 将数据转换为String类型 String line = value.toString(); // 分割字符串 String[] words = StringUtils.split(line,' '); // 遍历这个数组 for (String word:words){ context.write(new Text(word),new LongWritable(1)); } }}
package MapReduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Word-count reducer: sums all counts received for a word and emits a
 * single {@code (word, total)} pair.
 *
 * <p>The four type arguments are, in order: input key, input value,
 * output key, output value. Without them the {@code reduce} override and
 * the typed iteration over {@code values} do not compile.
 */
public class Red extends Reducer<Text, LongWritable, Text, LongWritable> {

    /**
     * Adds up every value associated with {@code key} and writes the total.
     *
     * @param key     the word being counted
     * @param values  the individual counts received for this word
     * @param context sink that receives the {@code (word, total)} pair
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
package MapReduce;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class Run { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf =new Configuration(); Job job = Job.getInstance(conf); // 设置job所用的那些类在那个jar包 job.setJarByClass(Run.class); //设置使用的Map和Reduced的类 job.setMapperClass(Map.class); job.setReducerClass(Red.class); // 设置Reduce输出的数据类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //设置map输出数据的类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 设置原始数据存放路径 FileInputFormat.setInputPaths(job,new Path("/input/")); //设置处理结果输出路径 FileOutputFormat.setOutputPath(job,new Path("/output/")); // 将job提交给集群运行 job.waitForCompletion(true); }}
创建input目录
hadoop fs -mkdir /input
上传本地文件到 HDFS 的 /input 目录
hadoop fs -put 文件路径 /input
运行(运行前需确保 HDFS 上的 /output 目录不存在,否则作业会因输出目录已存在而失败)
hadoop jar MapReduce.jar MapReduce.Run
转载地址:http://cisnb.baihongyu.com/