第一章 Hadoop入门

常用端口号说明

端口名称                      Hadoop 2.x      Hadoop 3.x
NameNode 内部通信端口          8020 / 9000     8020 / 9000 / 9820
NameNode HTTP UI              50070           9870
MapReduce 查看执行任务端口      8088            8088
历史服务器通信端口              19888           19888

HDFS的API操作

客户端环境准备

  • 在IDEA中创建一个Maven工程HDFSClient3.x,并导入相应的依赖坐标和日志配置
    <dependencies>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
    </dependency>
    </dependencies>
  • 在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
    log4j.rootLogger=INFO, stdout  
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

客户端测试创建文件夹

package cn.jermyn.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
* TODO 1. 获取一个客户端对象 FileSystem.get(URI uri, Configuration conf, String user)
* TODO 2. 执行相关命令操作
* TODO 3. 关闭资源
*/
public class HdfsClient {
private FileSystem fs;

/**
* 执行test之前做的操作
*
* @throws URISyntaxException
* @throws IOException
* @throws InterruptedException
*/
@Before
public void init() throws URISyntaxException, IOException, InterruptedException {
// 1 获取文件系统
Configuration configuration = new Configuration();

fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration, "Jermyn");

}

/**
* 执行test之后做的操作
* @throws IOException
*/
@After
public void close() throws IOException {
// 3 关闭资源
fs.close();
}

/**
* 创建一个文件夹
* @throws IOException
* @throws URISyntaxException
* @throws InterruptedException
*/
@Test
public void testMkdirs() throws IOException, URISyntaxException, InterruptedException {

// 2 创建目录
fs.mkdirs(new Path("/demo"));
}
}
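
在同一个HdfsClient测试类中,还可以按同样的方式验证其他API,例如文件上传。下面是一个简化的示意(本地路径为假设值,fs 即上面 init() 中获取的 FileSystem 对象):

/**
 * 上传本地文件到HDFS(示意)
 * @throws IOException
 */
@Test
public void testCopyFromLocalFile() throws IOException {
    // 参数依次为:是否删除本地源文件、是否允许覆盖目标、本地源路径、HDFS目标路径
    fs.copyFromLocalFile(false, true,
            new Path("D:\\demo\\hello.txt"), new Path("/demo"));
}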

第二章 MapReduce概述

WordCount案例实操

环境准备

  1. 创建maven工程,MapReduce3.x
  2. 在pom.xml文件中添加如下依赖
    <dependencies>
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.30</version>
    </dependency>
    </dependencies>
  3. 在项目的src/main/resources目录下,新建一个文件,命名为“log4j.properties”,在文件中填入
    log4j.rootLogger=INFO, stdout  
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
  4. 创建包名:cn.Jermyn.mapreduce.wordcount

编写程序

package cn.Jermyn.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN :map阶段输入的key的数据类型 LongWritable
* VALUEIN :map阶段输入的value的数据类型 Text
* KEYOUT:map阶段输出的key的数据类型 Text
* VALUEOUT :map阶段输出的value的数据类型 IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {


/**
* TODO 1. 获取一行 line = value.toString()
* TODO 2. 切割 line.split(" ")
* TODO 3. 循环遍历封装KV outK.set(); outV.set();
* TODO 4. 写出KV
*
* @param key 偏移量
* @param value 自该偏移量起的一行数据
* @param context 允许任务输入和输出的上下文对象。它只提供给Mapper或Reducer
* @throws IOException
* @throws InterruptedException
*/
Text outK = new Text();
IntWritable outV = new IntWritable(1);

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {

// 1. 获取一行
String line = value.toString();

// 2. 切割
String[] words = line.split(" ");

// 3. 循环遍历封装KV
for (String word : words) {

// 封装K
outK.set(word);

// 写出
context.write(outK, outV);
}
}
}

package cn.Jermyn.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
* Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
* KEYIN :reduce阶段输入的key的数据类型 Text
* VALUEIN :reduce阶段输入的value的数据类型 IntWritable
* KEYOUT:reduce阶段输出的key的数据类型 Text
* VALUEOUT :reduce阶段输出的value的数据类型 IntWritable
*/
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

/**
*
* TODO 1. 累加求和
* TODO 2. 输出(封装)
*
* @param key map阶段输出的key
* @param values 相同key对应的value集合
* @param context 允许任务输入和输出的上下文对象。它只提供给Mapper或Reducer
* @throws IOException
* @throws InterruptedException
*/
int sum;
IntWritable outV = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

// 1 累加求和
sum = 0;
for (IntWritable value : values) {
sum += value.get();
}

// 2 输出
outV.set(sum);
context.write(key, outV);

}
}



package cn.Jermyn.mapreduce.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* mapreduce的驱动类
*/
public class WordCountDriver {

/**
* TODO 0 直接设置输入输出的路径
* TODO 1 获取配置信息以及获取job对象
* TODO 2 关联本Driver程序的jar
* TODO 3 关联Mapper和Reducer的jar
* TODO 4 设置Mapper输出的kv类型
* TODO 5 设置最终输出kv类型
* TODO 6 设置输入和输出路径
* TODO 7 提交job
*
* @param args 需要处理的文件和处理后数据存放的位置
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

// 0 直接设置输入输出的路径
args = new String[]{"C:\\Users\\Administrator\\Desktop\\input\\wordcount\\data.txt",
"C:\\Users\\Administrator\\Desktop\\output\\wordcount"};

// 1 获取配置信息以及获取job对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

// 2 关联本Driver程序的jar
job.setJarByClass(WordCountDriver.class);

// 3 关联Mapper和Reducer的jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

// 4 设置Mapper阶段输出的kv类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

// 5 设置最终输出kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

第三章 Hadoop序列化

自定义bean对象实现序列化接口(Writable)

在企业开发中,常用的基本序列化类型往往不能满足所有需求。比如要在Hadoop框架内部传递一个bean对象,那么该对象就需要实现序列化接口。

实现bean对象序列化步骤

第1步

必须实现Writable接口

第2步

反序列化时,需要反射调用空参构造函数,所以必须有空参构造

public FlowBean() {
super();
}

第3步

重写序列化方法:

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

第4步

重写反序列化方法:

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}

第5步

注意反序列化的顺序和序列化的顺序完全一致

第6步

要想把结果显示在文件中,需要重写toString(),字段间可用"\t"分开,方便后续使用。
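
例如,一个按"\t"分隔字段的toString()写法(示意,字段名以实际bean中的属性为准):

@Override
public String toString() {
    // 字段之间用"\t"分隔,便于后续程序按制表符切分
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
}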

第7步

如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框架中的Shuffle过程要求对key必须能排序。

@Override
public int compareTo(FlowBean o) {
// 倒序排列,从大到小;相等时返回0,满足compareTo约定
if (this.sumFlow == o.getSumFlow()) {
return 0;
}
return this.sumFlow > o.getSumFlow() ? -1 : 1;
}

序列化案例实操

  1. 需求:统计每一个手机号耗费的总上行流量、下行流量、总流量
  2. 需求分析
  3. 编写MapReduce程序
    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements Writable {


    private long upFlow; // 上行流量
    private long downFlow; // 下行流量
    private long sumFlow; // 总流量

    public FlowBean(long upFlow, long downFlow) {
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    sumFlow = upFlow + downFlow;
    }

    public FlowBean() {
    }

    public long getUpFlow() {
    return upFlow;
    }

    public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
    }

    public long getDownFlow() {
    return downFlow;
    }

    public void setDownFlow(long downFlow) {
    this.downFlow = downFlow;
    }

    public long getSumFlow() {
    return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
    }

    // 序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeLong(upFlow);
    dataOutput.writeLong(downFlow);
    dataOutput.writeLong(sumFlow);
    }

    // 反序列方法
    @Override
    public void readFields(DataInput dataInput) throws IOException {
    // 必须和序列化顺序一致
    upFlow = dataInput.readLong();
    downFlow = dataInput.readLong();
    sumFlow = dataInput.readLong();
    }


    @Override
    public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    public void set(long upFlow1, long downFlow1) {
    upFlow = upFlow1;
    downFlow = downFlow1;
    sumFlow = upFlow1 + downFlow1;
    }
    }

    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    /**
    * map 阶段
    * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
    * KEYIN 输入数据的key
    * VALUEIN 输入数据的value
    * KEYOUT 输出数据的key的类型
    * VALUEOUT 输出数据的value类型
    */
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {


    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

    // 1.获取一行
    String line = value.toString();

    // 2.按"\t"切割
    String[] keys = line.split("\t");

    // 3.封装对象
    k.set(keys[1]);
    long upFlow = Long.parseLong(keys[keys.length - 3]);
    long downFlow = Long.parseLong(keys[keys.length - 2]);


    v.set(upFlow,downFlow);
    // 4. 写出
    context.write(k, v);
    }
    }

    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException {

    //1.累加求和
    long sumUpFlow = 0;
    long sumDownFlow = 0;
    for (FlowBean flowbean : values) {
    sumUpFlow += flowbean.getUpFlow();
    sumDownFlow += flowbean.getDownFlow();
    }
    v.set(sumUpFlow, sumDownFlow);

    //2.写出
    context.write(key, v);
    }
    }

    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowSumDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    args = new String[]{"C:\\Users\\Administrator\\Desktop\\phone.txt", "C:\\Users\\Administrator\\Desktop\\output"};

    Configuration conf = new Configuration();
    // 1.获取Job对象
    Job job = Job.getInstance(conf);

    // 2.设置jar的路径
    job.setJarByClass(FlowSumDriver.class);

    // 3.关联Map和Reduce类
    job.setMapperClass(FlowCountMapper.class);
    job.setReducerClass(FlowCountReducer.class);

    // 4.设置Mapper阶段输出的key和value的数据类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);


    // 5.设置最终输出的key和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // 6.设置输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 7.提交job
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
    }
    }
    运行结果:
    PS C:\Users\Administrator\Desktop\output> ls


    目录: C:\Users\Administrator\Desktop\output


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/5/31 11:05 16 .part-r-00000.crc
    -a---- 2023/5/31 11:05 8 ._SUCCESS.crc
    -a---- 2023/5/31 11:05 550 part-r-00000
    -a---- 2023/5/31 11:05 0 _SUCCESS


    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00000
    13470253144 180 180 360
    13509468723 7335 110349 117684
    13560439638 918 4938 5856
    13568436656 3597 25635 29232
    13590439668 1116 954 2070
    13630577991 6960 690 7650
    13682846555 1938 2910 4848
    13729199489 240 0 240
    13736230513 2481 24681 27162
    13768778790 120 120 240
    13846544121 264 0 264
    13956435636 132 1512 1644
    13966251146 240 0 240
    13975057813 11058 48243 59301
    13992314666 3008 3720 6728
    15043685818 3659 3538 7197
    15910133277 3156 2936 6092
    15959002129 1938 180 2118
    18271575951 1527 2106 3633
    18390173782 9531 2412 11943
    84188413 4116 1432 5548
    PS C:\Users\Administrator\Desktop\output>

第四章 MapReduce框架原理

InputFormat数据输入

MapTask并行度决定机制

数据块:Block是HDFS在物理上把数据切分成的一块一块,是HDFS存储数据的单位。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片存储。数据切片是MapReduce程序计算输入数据的单位,一个切片对应启动一个MapTask。

  1. 一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
  2. 每一个Split切片分配一个MapTask并行实例处理
  3. 默认情况下,切片大小=BlockSize
  4. 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

Job提交流程源码和切片源码详解

waitForCompletion()

submit();

// 1建立连接
connect();
// 1)创建提交Job的代理
new Cluster(getConfiguration());
// (1)判断是本地运行环境还是yarn集群运行环境
initialize(jobTrackAddr, conf);

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)

// 1)创建给集群提交数据的Stag路径
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

// 2)获取jobid ,并创建Job路径
JobID jobId = submitClient.getNewJobID();

// 3)拷贝jar包到集群
copyAndConfigureFiles(job, submitJobDir);
rUploader.uploadFiles(job, jobSubmitDir);

// 4)计算切片,生成切片规划文件
writeSplits(job, submitJobDir);
maps = writeNewSplits(job, jobSubmitDir);
input.getSplits(job);

// 5)向Stag路径写XML配置文件
writeConf(conf, submitJobFile);
conf.writeXml(out);

// 6)提交Job,返回提交状态
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

FileInputFormat切片源码解析(input.getSplits(job))

FileInputFormat切片机制

  1. 切片机制
    • 简单地按照文件的内容长度进行切片
    • 切片大小,默认等于Block大小
    • 切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
  2. 案例分析
    • 输入数据有两个文件:
         file1.txt    320M   
         file2.txt    10M
      
    • 经过FileInputFormat的切片机制运算后,形成的切片信息如下:
         file1.txt.split1--  0~128M
         file1.txt.split2--  128M~256M
         file1.txt.split3--  256M~320M
         file2.txt.split1--  0~10M
      
  • 源码中计算切片大小的公式
      Math.max(minSize, Math.min(maxSize, blockSize));
      mapreduce.input.fileinputformat.split.minsize=1 默认值为1
      mapreduce.input.fileinputformat.split.maxsize=Long.MAX_VALUE 默认值为Long.MAX_VALUE
      因此,默认情况下,切片大小=blockSize。
    
  • 切片大小设置(调整方式见本列表末尾的示意代码)
    maxsize(切片最大值):如果把该参数调得比blockSize小,切片就会变小,且切片大小等于配置的这个参数值。
    minsize(切片最小值):如果把该参数调得比blockSize大,则可以让切片变得比blockSize还大。
  • 获取切片信息API
    // 根据文件类型获取切片信息
    FileSplit inputSplit = (FileSplit) context.getInputSplit();

    // 获取切片的文件名称
    String name = inputSplit.getPath().getName();
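
  • 切片大小调整示意:下面是在Driver中通过FileInputFormat调整切片大小的示意写法(64MB、256MB仅为举例值,job为Driver中已获取的Job对象):

    // 把切片最大值调小到64MB:当maxsize比blockSize小时,切片大小等于maxsize
    FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024L);

    // 把切片最小值调大到256MB:当minsize比blockSize大时,切片可以比blockSize还大
    // FileInputFormat.setMinInputSplitSize(job, 256 * 1024 * 1024L);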

Shuffle机制

Partition分区

  1. 问题引出
    要求将统计结果按照条件输出到不同文件中(分区)。比如:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
  2. 默认Partitioner分区
    public class HashPartitioner<K, V> extends Partitioner<K, V> {

    public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
    }
    默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。
  3. 自定义Partitioner步骤
    1. 自定义类继承Partitioner,重写getPartition()方法
       public class CustomPartitioner extends Partitioner<Text, FlowBean> {
        @Override
      public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 控制分区代码逻辑
      … …
      return partition;
      }
      }
    2. 在Job驱动中,设置自定义Partitioner
      job.setPartitionerClass(CustomPartitioner.class);
    3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
      job.setNumReduceTasks(5);
  4. 分区总结
    1. 如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
    2. 如果1 < ReduceTask的数量 < getPartition的结果数,则有一部分分区数据无处安放,会抛出Exception;
    3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件 part-r-00000
    4. 分区号必须从零开始,逐一累加。
  5. 案例分析:
    例如:假设自定义分区数为5,则
    job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件
    job.setNumReduceTasks(2); 会报错
    job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

Partition分区案例实操

  1. 需求:将统计结果按照手机归属地不同省份输出到不同文件中(分区)
  2. 期望输出数据:手机号以136、137、138、139开头的分别放到4个独立的文件中,其他开头的放到一个文件中。
  3. 需求分析
  4. 编写代码

    在案例3.2的基础上,增加一个分区类

    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    // key是手机号
    // value 是流量信息
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

    int partition = 4;
    // 获取手机号前3位
    String pre3OfPhoneNum = text.toString().substring(0, 3);
    if ("136".equals(pre3OfPhoneNum)) {
    partition = 0;
    } else if ("137".equals(pre3OfPhoneNum)) {
    partition = 1;
    } else if ("138".equals(pre3OfPhoneNum)) {
    partition = 2;
    } else if ("139".equals(pre3OfPhoneNum)) {
    partition = 3;
    }
    return partition;
    }
    }
    package cn.jermyn.mr.flowsum;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowSumDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    args = new String[]{"C:\\Users\\Administrator\\Desktop\\input\\phone.txt", "C:\\Users\\Administrator\\Desktop\\output"};

    Configuration conf = new Configuration();
    // 1.获取Job对象
    Job job = Job.getInstance(conf);

    //指定自定义数据分区
    job.setPartitionerClass(ProvincePartitioner.class);

    //同时指定相应数量的reduce task
    job.setNumReduceTasks(5);

    // 2.设置jar的路径
    job.setJarByClass(FlowSumDriver.class);

    // 3.关联Map和Reduce类
    job.setMapperClass(FlowCountMapper.class);
    job.setReducerClass(FlowCountReducer.class);

    // 4.设置Mapper阶段输出的key和value的数据类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(FlowBean.class);


    // 5.设置最终输出的key和value类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // 6.设置输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 7.提交job
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
    }
    }
    运行结果:
    PS C:\Users\Administrator\Desktop\output> ls


    目录: C:\Users\Administrator\Desktop\output


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/6/3 17:12 12 .part-r-00000.crc
    -a---- 2023/6/3 17:12 12 .part-r-00001.crc
    -a---- 2023/6/3 17:12 12 .part-r-00002.crc
    -a---- 2023/6/3 17:12 12 .part-r-00003.crc
    -a---- 2023/6/3 17:12 12 .part-r-00004.crc
    -a---- 2023/6/3 17:12 8 ._SUCCESS.crc
    -a---- 2023/6/3 17:12 53 part-r-00000
    -a---- 2023/6/3 17:12 75 part-r-00001
    -a---- 2023/6/3 17:12 22 part-r-00002
    -a---- 2023/6/3 17:12 105 part-r-00003
    -a---- 2023/6/3 17:12 295 part-r-00004
    -a---- 2023/6/3 17:12 0 _SUCCESS


    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00000
    13630577991 6960 690 7650
    13682846555 1938 2910 4848
    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00001
    13729199489 240 0 240
    13736230513 2481 24681 27162
    13768778790 120 120 240
    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00002
    13846544121 264 0 264
    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00003
    13956435636 132 1512 1644
    13966251146 240 0 240
    13975057813 11058 48243 59301
    13992314666 3008 3720 6728
    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00004
    13470253144 180 180 360
    13509468723 7335 110349 117684
    13560439638 918 4938 5856
    13568436656 3597 25635 29232
    13590439668 1116 954 2070
    15043685818 3659 3538 7197
    15910133277 3156 2936 6092
    15959002129 1938 180 2118
    18271575951 1527 2106 3633
    18390173782 9531 2412 11943
    84188413 4116 1432 5548
    PS C:\Users\Administrator\Desktop\output>

WritableComparable排序案例实操(全排序)

  1. 需求:根据案例3.2产生的结果再次对总流量进行排序。
  2. 需求分析
  3. 代码实现
    package cn.jermyn.mr.writablecomparable;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    this.sumFlow = sumFlow;
    }

    public long getUpFlow() {
    return upFlow;
    }

    public void setUpFlow(long upFlow) {
    this.upFlow = upFlow;
    }

    public long getDownFlow() {
    return downFlow;
    }

    public void setDownFlow(long downFlow) {
    this.downFlow = downFlow;
    }

    public long getSumFlow() {
    return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
    this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
    return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }


    @Override
    public int compareTo(FlowBean o) {
    int result;
    if (this.sumFlow > o.sumFlow) {
    result = -1;
    } else if (this.sumFlow < o.sumFlow) {
    result = 1;
    } else {
    result = 0;
    }
    return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
    out.writeLong(upFlow);
    out.writeLong(downFlow);
    out.writeLong(sumFlow);
    }

    /**
    * 反序列化方法 注意反序列化的顺序和序列化的顺序完全一致
    *
    * @param in
    * @throws IOException
    */
    @Override
    public void readFields(DataInput in) throws IOException {
    upFlow = in.readLong();
    downFlow = in.readLong();
    sumFlow = in.readLong();
    }

    public void set(long upFlow, long downFlow) {
    this.upFlow = upFlow;
    this.downFlow = downFlow;
    this.sumFlow = upFlow + downFlow;
    }
    }
    package cn.jermyn.mr.writablecomparable;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean bean = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    // 1 获取一行
    String line = value.toString();

    // 2 截取
    String[] fields = line.split("\t");

    // 3 封装对象
    String phoneNbr = fields[0];
    long upFlow = Long.parseLong(fields[1]);
    long downFlow = Long.parseLong(fields[2]);

    bean.set(upFlow, downFlow);
    v.set(phoneNbr);

    // 4 输出
    context.write(bean, v);
    }
    }
    package cn.jermyn.mr.writablecomparable;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

    for (Text value : values) {
    context.write(value, key);
    }
    }
    }

    package cn.jermyn.mr.writablecomparable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    args = new String[]{"C:\\Users\\Administrator\\Desktop\\output\\part-r-00000",
    "C:\\Users\\Administrator\\Desktop\\output1"};

    Configuration configuration = new Configuration();
    // 获取一个job
    Job job = Job.getInstance(configuration);

    // 设置jar包的位置
    job.setJarByClass(FlowCountSortDriver.class);

    // 关联map和reduce类
    job.setMapperClass(FlowCountSortMapper.class);
    job.setReducerClass(FlowCountSortReducer.class);

    // 设置Map阶段输出的kv
    job.setMapOutputKeyClass(FlowBean.class);
    job.setMapOutputValueClass(Text.class);

    // 设置最终的输出的kv
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FlowBean.class);

    // 设置文件输入输入的路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 提交job
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
    }
    }

    运行结果:
    PS C:\Users\Administrator\Desktop\output1> ls


    目录: C:\Users\Administrator\Desktop\output1


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/6/3 21:16 16 .part-r-00000.crc
    -a---- 2023/6/3 21:16 8 ._SUCCESS.crc
    -a---- 2023/6/3 21:16 550 part-r-00000
    -a---- 2023/6/3 21:16 0 _SUCCESS


    PS C:\Users\Administrator\Desktop\output1> cat .\part-r-00000
    13509468723 7335 110349 117684
    13975057813 11058 48243 59301
    13568436656 3597 25635 29232
    13736230513 2481 24681 27162
    18390173782 9531 2412 11943
    13630577991 6960 690 7650
    15043685818 3659 3538 7197
    13992314666 3008 3720 6728
    15910133277 3156 2936 6092
    13560439638 918 4938 5856
    84188413 4116 1432 5548
    13682846555 1938 2910 4848
    18271575951 1527 2106 3633
    15959002129 1938 180 2118
    13590439668 1116 954 2070
    13956435636 132 1512 1644
    13470253144 180 180 360
    13846544121 264 0 264
    13729199489 240 0 240
    13768778790 120 120 240
    13966251146 240 0 240
    PS C:\Users\Administrator\Desktop\output1>

OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有MapReduce输出的实现类都实现了OutputFormat接口。

  1. 文本输出TextOutputFormat
    默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
  2. SequenceFileOutputFormat
    如果要将输出作为后续MapReduce任务的输入,SequenceFileOutputFormat是一种很好的输出格式,因为它的格式紧凑,很容易被压缩(用法见本列表后的示意片段)。
  3. 自定义OutputFormat
    根据用户需求,自定义实现输出。
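
例如,若希望把结果写成SequenceFile格式,只需在Driver中替换输出格式类即可,下面是一个示意片段(假设 job 为已获取的 Job 对象):

// 把默认的TextOutputFormat替换为SequenceFileOutputFormat
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);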

自定义OutputFormat

  1. 使用场景
    为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。
    例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。
  2. 自定义OutputFormat步骤
    1. 自定义一个类继承FileOutputFormat。
    2. 改写RecordWriter,具体改写输出数据的方法write()。

自定义OutputFormat案例实操

  1. 需求:过滤输入的log日志,包含jermyn的网站输出到C:\Users\Administrator\Desktop\output\jermyn,不包含jermyn的网站输出到C:\Users\Administrator\Desktop\output\other。
  2. 需求分析
  3. 案例实操
    package cn.jermyn.mr.outputformat;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    import java.io.IOException;

    public class FRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream fosAtJermyn;
    FSDataOutputStream fosAtOther;

    public FRecordWriter(TaskAttemptContext job) {

    try {
    // 1 获取文件系统
    FileSystem fs = FileSystem.get(job.getConfiguration());

    // 2 创建输出到jermyn.log的输出流

    fosAtJermyn = fs.create(
    new Path("C:\\Users\\Administrator\\Desktop\\output\\jermyn\\jermyn.log"));

    // 3 创建输出到other.log的输出流
    fosAtOther = fs.create(
    new Path("C:\\Users\\Administrator\\Desktop\\output\\other\\other.log"));

    } catch (IOException e) {
    throw new RuntimeException(e);
    }

    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
    // 判断key中是否有jermyn,有的话写出到jermyn.log 没有的话就写到other.log
    if (key.toString().contains("jermyn")) {
    //jermyn输出流
    fosAtJermyn.write(key.toString().getBytes());
    }else {
    //other输出流
    fosAtOther.write(key.toString().getBytes());
    }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
    IOUtils.closeStream(fosAtJermyn);
    IOUtils.closeStream(fosAtOther);
    }
    }

    package cn.jermyn.mr.outputformat;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class OutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {


    return new FRecordWriter(job);
    }
    }
    package cn.jermyn.mr.outputformat;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {

    String string = key.toString();
    string = string + "\n";

    for (NullWritable nu : values) {
    k.set(string);
    context.write(k, NullWritable.get());
    }
    }
    }

    package cn.jermyn.mr.outputformat;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

    context.write(value,NullWritable.get());
    }
    }

    package cn.jermyn.mr.outputformat;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;

    public class FilterDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

    args = new String[]{"C:\\Users\\Administrator\\Desktop\\input\\log.txt",
    "C:\\Users\\Administrator\\Desktop\\output"};
    Configuration entries = new Configuration();
    // 获取一个job
    Job job = Job.getInstance(entries);

    // 设置jar的路径
    job.setJarByClass(FilterDriver.class);

    // 关联mapper和reducer
    job.setMapperClass(FilterMapper.class);
    job.setReducerClass(FilterReducer.class);

    // 要将自定义的输出格式组件设置到job中
    job.setOutputFormatClass(OutputFormat.class);

    // 设置map阶段输出的kv数据类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);

    // 设置最终阶段输出的kv数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    // 设置文件的输入输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 提交job
    boolean b = job.waitForCompletion(true);
    System.exit(b ? 0 : 1);
    }
    }

    运行结果:
    PS C:\Users\Administrator\Desktop\output\jermyn> ls
    目录: C:\Users\Administrator\Desktop\output\jermyn


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/6/4 16:56 12 .jermyn.log.crc
    -a---- 2023/6/4 16:56 43 jermyn.log


    PS C:\Users\Administrator\Desktop\output\jermyn> cat .\jermyn.log
    http://www.jermyn.cn
    PS C:\Users\Administrator\Desktop\output\jermyn> cd ..
    PS C:\Users\Administrator\Desktop\output> ls


    目录: C:\Users\Administrator\Desktop\output


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    d----- 2023/6/4 16:56 jermyn
    d----- 2023/6/4 16:56 other
    -a---- 2023/6/4 16:56 8 ._SUCCESS.crc
    -a---- 2023/6/4 16:56 0 _SUCCESS


    PS C:\Users\Administrator\Desktop\output> cd .\other\
    PS C:\Users\Administrator\Desktop\output\other> ls


    目录: C:\Users\Administrator\Desktop\output\other


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/6/4 16:56 12 .other.log.crc
    -a---- 2023/6/4 16:56 171 other.log


    PS C:\Users\Administrator\Desktop\output\other> cat .\other.log
    http://cn.bing.com
    http://www.baidu.com
    http://www.google.com
    http://www.sin2a.com
    http://www.sin2desa.com
    http://www.sina.com
    http://www.sindsafa.com
    http://www.sohu.com
    PS C:\Users\Administrator\Desktop\output\other>

Join多种应用

Reduce Join

  • Reduce Join工作原理
    • Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
    • Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

Reduce Join案例实操

  1. 需求
    订单数据表t_order
    id pid amount
    1001 01 1
    1002 02 2
    1003 03 3
    1004 01 4
    1005 02 5
    1006 03 6
    商品信息表t_product
    pid pname
    01 小米
    02 华为
    03 格力
    将商品信息表中数据根据商品pid合并到订单数据表中。
    最终数据形式
    id pname amount
    1001 小米 1
    1004 小米 4
    1002 华为 2
    1005 华为 5
    1003 格力 3
    1006 格力 6
  2. 需求分析
    通过将关联条件作为Map输出的key,将两表中满足Join条件的数据连同其来源文件信息一起发往同一个ReduceTask,在Reduce中完成数据的串联。
  3. 代码实现
    package cn.jermyn.mr.table;

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class TableBean implements Writable {

    private String id; // 订单号
    private String pid; // 产品号
    private int amount; // 产品数量
    private String pName; // 产品名称
    private String flag; // 表的名称

    public TableBean() {
    }

    public TableBean(String id, String pid, int amount, String pName, String flag) {
    this.id = id;
    this.pid = pid;
    this.amount = amount;
    this.pName = pName;
    this.flag = flag;
    }


    /**
    * 序列化
    * @param out <code>DataOuput</code> to serialize this object into.
    * @throws IOException
    */
    @Override
    public void write(DataOutput out) throws IOException {
    out.writeUTF(id);
    out.writeUTF(pid);
    out.writeInt(amount);
    out.writeUTF(pName);
    out.writeUTF(flag);
    }

    /**
    * 反序列化,注意和序列的顺序一致
    * @param in <code>DataInput</code> to deseriablize this object from.
    * @throws IOException
    */
    @Override
    public void readFields(DataInput in) throws IOException {
    id=in.readUTF();
    pid=in.readUTF();
    amount=in.readInt();
    pName=in.readUTF();
    flag=in.readUTF();
    }

    public String getId() {
    return id;
    }

    public void setId(String id) {
    this.id = id;
    }

    public String getPid() {
    return pid;
    }

    public void setPid(String pid) {
    this.pid = pid;
    }

    public int getAmount() {
    return amount;
    }

    public void setAmount(int amount) {
    this.amount = amount;
    }

    public String getpName() {
    return pName;
    }

    public void setpName(String pName) {
    this.pName = pName;
    }

    public String getFlag() {
    return flag;
    }

    public void setFlag(String flag) {
    this.flag = flag;
    }

    @Override
    public String toString() {
    return id + "\t" + pName + "\t" + amount + "\t" ;
    }

    }

    package cn.jermyn.mr.table;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;

    import java.io.File;
    import java.io.IOException;

    public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean> {
    String name;
    TableBean tableBean = new TableBean();
    Text k = new Text();

    /**
    * 启动 maptask 会先执行setup加载配置信息
    * @param context
    * @throws IOException
    * @throws InterruptedException
    */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {

    // 因为我们有两个文件,默认情况下会分两个切片,也是两个maptask,
    // 可以通过切片获取到文件的输入路径,进而得到文件名,用于区分数据来源的表
    FileSplit inputSplit = (FileSplit) context.getInputSplit();
    name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, TableBean>.Context context) throws IOException, InterruptedException {

    // 获取一行
    String line = value.toString();

    // 得到文件名后与"order"、"pd"前缀匹配,
    // 目的是按数据来源把相应字段封装到TableBean实例中
    if (name.startsWith("order")) {
    String[] fields = line.split("\t");

    tableBean.setId(fields[0]);
    tableBean.setPid(fields[1]);
    tableBean.setAmount(Integer.parseInt(fields[2]));
    tableBean.setpName("");
    tableBean.setFlag("order");

    // 根据商品pid合并到订单数据表中,所以以pid为key,TableBean实例为value
    k.set(fields[1]);
    } else {
    String[] fields = line.split("\t");

    tableBean.setId("");
    tableBean.setPid(fields[0]);
    tableBean.setAmount(0);
    tableBean.setpName(fields[1]);
    tableBean.setFlag("pd");

    // 根据商品pid合并到订单数据表中,所以以pid为key,TableBean实例为value
    k.set(fields[0]);
    }

    // 封装
    context.write(k, tableBean);
    }
    }


    package cn.jermyn.mr.table;

    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.util.ArrayList;

    public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Reducer<Text, TableBean, TableBean, NullWritable>.Context context) throws IOException, InterruptedException {

    // 对于一个商品信息表可以有多个订单表,所以封装TableBean实例封装数组
    ArrayList<TableBean> orderBeans = new ArrayList<>();

    // 商品信息表只有一个
    TableBean pdBean = new TableBean();

    // 此时的values中是同一pid对应的多个TableBean对象,来源包含order和pd
    for (TableBean tableBean : values) {
    // 如果实例对象的flag是order,需要将其添加到orderBeans中
    if ("order".equals(tableBean.getFlag())) {

    // 在迭代器中的value存储的地址,不可以直接添加到数组
    TableBean tmpBean = new TableBean();
    try {
    // 通过工具类来进行拷贝
    BeanUtils.copyProperties(tmpBean, tableBean);
    } catch (IllegalAccessException | InvocationTargetException e) {
    throw new RuntimeException(e);
    }
    // 添加到数组中
    orderBeans.add(tmpBean);

    // 如果实例对象的flag是pd
    } else {
    try {
    // 直接进行拷贝
    BeanUtils.copyProperties(pdBean, tableBean);
    } catch (IllegalAccessException | InvocationTargetException e) {
    throw new RuntimeException(e);
    }
    }
    }


    // 遍历订单表构成的数组,把每个orderBean实例的pname属性,设置为相同pid的name
    for (TableBean orderBean : orderBeans) {
    orderBean.setpName(pdBean.getpName());

    // 封装
    context.write(orderBean, NullWritable.get());
    }
    }
    }

    package cn.jermyn.mr.table;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


    public class TableDriver {

    public static void main(String[] args) throws Exception {

    // 0 根据自己电脑路径重新配置
    args = new String[]{"C:\\Users\\Administrator\\Desktop\\input",
    "C:\\Users\\Administrator\\Desktop\\output"};

    // 1 获取配置信息,或者job对象实例
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    // 2 指定本程序的jar包所在的本地路径
    job.setJarByClass(TableDriver.class);

    // 3 指定本业务job要使用的Mapper/Reducer业务类
    job.setMapperClass(TableMapper.class);
    job.setReducerClass(TableReducer.class);

    // 4 指定Mapper输出数据的kv类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(TableBean.class);

    // 5 指定最终输出的数据的kv类型
    job.setOutputKeyClass(TableBean.class);
    job.setOutputValueClass(NullWritable.class);

    // 6 指定job的输入原始文件所在目录
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
    }
    }
    运行结果:
    PS C:\Users\Administrator\Desktop\output> ls


    目录: C:\Users\Administrator\Desktop\output


    Mode LastWriteTime Length Name
    ---- ------------- ------ ----
    -a---- 2023/6/4 21:39 12 .part-r-00000.crc
    -a---- 2023/6/4 21:39 8 ._SUCCESS.crc
    -a---- 2023/6/4 21:39 90 part-r-00000
    -a---- 2023/6/4 21:39 0 _SUCCESS


    PS C:\Users\Administrator\Desktop\output> cat .\part-r-00000
    1004 小米 4
    1001 小米 1
    1005 华为 5
    1002 华为 2
    1006 格力 6
    1003 格力 3
    PS C:\Users\Administrator\Desktop\output>
  4. 总结
    缺点:这种方式中,合并的操作是在Reduce阶段完成,Reduce端的处理压力太大,Map节点的运算负载则很低,资源利用率不高,且在Reduce阶段极易产生数据倾斜。
    解决方案:Map端实现数据合并

Map Join

  1. 使用场景:Map Join适用于一张表十分小、一张表很大的场景。
  2. 优点
    思考:在Reduce端处理过多的表,非常容易产生数据倾斜,怎么办?
    在Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能地减少数据倾斜。
  3. 具体办法:采用DistributedCache
    • 在Mapper的setup阶段,将文件读取到缓存集合中。
    • 在驱动函数中加载缓存。
      // 缓存普通文件到Task运行节点。
      job.addCacheFile(new URI("file://e:/cache/pd.txt"));

Map Join案例实操

  1. 需求
    订单数据表t_order
    id pid amount
    1001 01 1
    1002 02 2
    1003 03 3
    1004 01 4
    1005 02 5
    1006 03 6
    商品信息表t_product
    pid pname
    01 小米
    02 华为
    03 格力
    将商品信息表中数据根据商品pid合并到订单数据表中。
    最终数据形式
    id pname amount
    1001 小米 1
    1004 小米 4
    1002 华为 2
    1005 华为 5
    1003 格力 3
    1006 格力 6
  2. 需求分析:MapJoin适用于关联表中有小表的情形
  3. 实现代码
    package cn.jermyn.mr.table;

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URI;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class DistributedCacheMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    Map<String, String> pdMap = new HashMap<>();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

    // 1 获取缓存的文件
    URI[] cacheFiles = context.getCacheFiles();
    String path = cacheFiles[0].getPath().toString();

    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

    String line;
    while (StringUtils.isNotEmpty(line = reader.readLine())) {

    // 2 切割
    String[] fields = line.split("\t");

    // 3 缓存数据到集合
    pdMap.put(fields[0], fields[1]);
    }

    // 4 关流
    reader.close();
    }

    Text k = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    // 1 获取一行
    String line = value.toString();

    // 2 截取
    String[] fields = line.split("\t");

    // 3 获取产品id
    String pId = fields[1];

    // 4 获取商品名称
    String pdName = pdMap.get(pId);

    // 5 拼接
    k.set(line + "\t" + pdName);

    // 6 写出
    context.write(k, NullWritable.get());
    }
    }

    package cn.jermyn.mr.table;

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class DistributedCacheDriver {

    public static void main(String[] args) throws Exception {

    // 0 根据自己电脑路径重新配置
    args = new String[]{"C:\\Users\\Administrator\\Desktop\\input\\order",
    "C:\\Users\\Administrator\\Desktop\\output"};

    // 1 获取job信息
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration);

    // 2 设置加载jar包路径
    job.setJarByClass(DistributedCacheDriver.class);

    // 3 关联map
    job.setMapperClass(DistributedCacheMapper.class);

    // 4 设置最终输出数据类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    // 5 设置输入输出路径
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // 6 加载缓存数据
    job.addCacheFile(new URI("file:///C:/Users/Administrator/Desktop/input/pd"));

    // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
    job.setNumReduceTasks(0);

    // 8 提交
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
    }
    }