I. MapReduce Basic Principles

1. Basic Overview of MapReduce

1. Definition

MapReduce is a programming framework for distributed computing programs. Its core function is to integrate the business logic code written by the user with the framework's built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.
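To make this concrete, here is a minimal WordCount-style sketch of the "business logic" a user writes; the framework supplies everything else (input splitting, shuffle, sort, scheduling). The class names are illustrative and not from the original article:

// A minimal sketch: the user only writes map() and reduce();
// splitting, shuffling, sorting and scheduling are handled by the framework.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final Text word = new Text();
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit <word, 1> for every word in the line
        for (String w : value.toString().split("\\s+")) {
            if (!w.isEmpty()) {
                word.set(w);
                context.write(word, ONE);
            }
        }
    }
}

class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum the counts for each word
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}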

2. Advantages and Disadvantages

(1) Advantages
1> Easy to program: using the interfaces provided by MapReduce together with an ordinary programming style, a distributed program can be written quickly.
2> Good scalability: when computing resources are insufficient, computing power can be expanded simply by adding machines.
3> High fault tolerance: if the node where a task is running fails, the task can be automatically transferred to another node for execution, i.e. automatic failover. This process is completed internally without manual intervention.
4> Suitable for offline processing of data at the PB level and above.

(2) Disadvantages
1> Real-time computation: it cannot return results within milliseconds or seconds the way MySQL can.
2> Streaming computation: the input data of streaming computation is dynamic, while MapReduce requires its input data to be static and already persisted in storage.
3> DAG (directed acyclic graph) computation: when multiple applications have dependencies and the input of each application is the output of the previous one, MapReduce performs poorly, because the output of every MapReduce stage is first written to disk, and the large amount of disk IO causes a sharp drop in performance.

3. Core Ideas of MapReduce

1. Basic Principles of MapReduce

From this we can see that the default class for processing input data is TextInputFormat, but this class does not implement the slicing method itself; the slicing method is implemented in its parent class FileInputFormat:

/*
FileInputFormat.java
*/
public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    //This is the list that stores slice information
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //Get all files in the input path
    List<FileStatus> files = this.listStatus(job);
    Iterator<FileStatus> i$ = files.iterator();

    while(true) {
        while(true) {
            while(i$.hasNext()) {
                FileStatus file = (FileStatus)i$.next();
                Path path = file.getPath();
                long length = file.getLen();
                if (length != 0L) {
                    BlockLocation[] blkLocations;
                    if (file instanceof LocatedFileStatus) {
                        //Get file block information
                        blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                    } else {
                        FileSystem fs = path.getFileSystem(job.getConfiguration());
                        blkLocations = fs.getFileBlockLocations(file, 0L, length);
                    }

                    //The actual slicing starts from here
                    if (this.isSplitable(job, path)) {
                        long blockSize = file.getBlockSize();
                        //Get the slice size
                        long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

                        long bytesRemaining;
                        int blkIndex;
                        //Slice the file in a loop; note that the loop continues only while the
                        //remaining part of the file is greater than 1.1 times the slice size
                        for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                            blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                            //Record the file, the start offset and size of the slice, and the hosts
                            //holding the slice's block into the slice list as slice information
                            splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                        }

                        //Add the remaining content of the file as the last slice to the slice plan
                        if (bytesRemaining != 0L) {
                            blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                            splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                        }
                    } else {
                        splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                    }
                } else {
                    splits.add(this.makeSplit(path, 0L, length, new String[0]));
                }
            }

            job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
            sw.stop();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
            }

            return splits;
        }
    }
}

/*
This method determines the slice size. Simply put, it is mainly decided by maxSize, minSize and blockSize:
if maxSize > blockSize, then splitSize = blockSize
if maxSize < blockSize, then splitSize = maxSize
if minSize > blockSize, then splitSize = minSize
if minSize < blockSize, then splitSize = blockSize (the default case)
Of course, note that maxSize must always be greater than minSize.
*/
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
}
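To make the rule above concrete, here is a small stand-alone sketch (not Hadoop source code; the 128 MB block size and the other values are illustrative) that evaluates the same max/min expression for a few cases:

//A stand-alone illustration of the splitSize rule; all values are hypothetical
public class SplitSizeDemo {
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long blockSize = 128L * 1024 * 1024;                               // 128 MB block
        //Defaults: minSize = 1, maxSize = Long.MAX_VALUE -> splitSize = blockSize (128 MB)
        System.out.println(computeSplitSize(blockSize, 1L, Long.MAX_VALUE));
        //maxSize lowered below blockSize (64 MB) -> splitSize = maxSize (64 MB)
        System.out.println(computeSplitSize(blockSize, 1L, 64L * 1024 * 1024));
        //minSize raised above blockSize (256 MB) -> splitSize = minSize (256 MB)
        System.out.println(computeSplitSize(blockSize, 256L * 1024 * 1024, Long.MAX_VALUE));
    }
}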

(2) Slicing calculation summary - FileInputFormat method

The slices above are only a plan; no real slicing happens at this point. Only when the job is submitted to YARN for execution is the data actually read according to the slice plan. The characteristics of this slicing can be summarized as follows:
1) Slicing is based on the length of the file content.
2) Each file is sliced independently; the files are not sliced together as a whole, which has disadvantages (discussed later).
3) Slice size: the default is blockSize; the calculation method is described above and is not repeated here.

FileInputFormat.setMaxInputSplitSize();  // maxSize
FileInputFormat.setMinInputSplitSize();  // minSize

The slice size can be changed by setting these two values.
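For reference, a minimal sketch of how these two settings could be applied in a driver; the job variable and the 64 MB / 32 MB values are illustrative:

//Illustrative values: constrain splits to between 32 MB and 64 MB instead of the block size
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);   // maxSize = 64 MB
FileInputFormat.setMinInputSplitSize(job, 32L * 1024 * 1024);   // minSize = 32 MB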

4) The slicing method: as the source code shows, before each slice is cut the framework checks whether the remaining part of the file is larger than 1.1 times splitSize; if it is not, slicing stops and the remaining part becomes the last slice.
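As a quick worked example of the 1.1 factor, here is a stand-alone sketch (not Hadoop code; it assumes a 128 MB splitSize and mimics only the length arithmetic of the loop above):

//Simulates only the length arithmetic; real slicing also records offsets, hosts, etc.
public class SplitPlanDemo {
    public static void main(String[] args) {
        long splitSize = 128L * 1024 * 1024;                              // assume splitSize = 128 MB
        long[] fileSizes = {129L * 1024 * 1024, 300L * 1024 * 1024};      // a 129 MB and a 300 MB file
        for (long length : fileSizes) {
            long bytesRemaining = length;
            int slices = 0;
            while ((double) bytesRemaining / (double) splitSize > 1.1D) {
                bytesRemaining -= splitSize;
                slices++;
            }
            if (bytesRemaining != 0L) {
                slices++;                                                 // the remainder becomes the last slice
            }
            //129 MB -> 1 slice (129/128 ≈ 1.008 < 1.1); 300 MB -> 3 slices (128 + 128 + 44 MB)
            System.out.println(length / (1024 * 1024) + " MB -> " + slices + " slice(s)");
        }
    }
}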

(3) Slicing optimization for a large number of small files - CombineTextInputFormat method

From (2) we know that TextInputFormat (FileInputFormat) slices file by file, which means each file produces at least one slice no matter how small it is. If there are many small files, many MapTasks will be generated and processing efficiency will be very low. In this case the solutions are:
1) Solve it at the data source: merge the data before uploading it to HDFS, so that a large number of small files is never generated.
2) If a large number of small files must be processed, use CombineTextInputFormat for slicing.

The slicing logic is as follows (the source code is quite long, so I will just describe the result of studying it):
First of all, CombineTextInputFormat does not implement the getSplits() method itself; it is implemented by its parent class CombineFileInputFormat. It slices all the files in a directory together as a single data source. The slice size depends on the maximum slice size set through MaxSplitSize, in bytes. The slicing logic is:

totalSize <= 1.5 * MaxSplitSize: 1 slice, splitSize = totalSize
1.5 * MaxSplitSize < totalSize <= 2 * MaxSplitSize: 2 slices
totalSize > 2 * MaxSplitSize: n slices, splitSize = MaxSplitSize

It should be noted that:
If the total data size is much larger than MaxSplitSize, then when the last slice is cut it is checked whether the part remaining after the slice is larger than 2 times MaxSplitSize: if it is not, it is kept as one slice; if it is, it is cut into two slices.

Using CombineTextInputFormat as the InputFormat class:

//Set the InputFormat class to CombineTextInputFormat
job.setInputFormatClass(CombineTextInputFormat.class);
//Set the maximum and minimum slice sizes respectively
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);   // 4 MB
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);   // 2 MB

3. Partitioning mechanism

As mentioned earlier, the number of map tasks is determined by the number of slices, so what determines the number of reduce tasks? It is determined by the number of partitions.

(1) Basic partition mechanism

1) First, define a custom partition class that extends Partitioner
2) Override the public int getPartition() method, which returns the partition number
3) Set the custom class as the partition class in the job; otherwise the default partition class HashPartitioner is used

job.setPartitionerClass(CustomPartitioner.class);

4) Set the number of reduce tasks, generally the same as the number of partitions:

job.setNumReduceTasks(N);

Note: the relationship between the number of partitions and the number of reduceTasks:
If the number of reduceTasks > the number of results of getPartition, a few extra empty output files part-r-000xx will be produced;
If 1 < the number of reduceTasks < the number of results of getPartition, some partition data will have nowhere to go and an Exception will be thrown;
If the number of reduceTasks = 1, then no matter how many partition files the map side outputs, the final result is handed to this one reduceTask, and only one result file part-r-00000 is produced.

(2) Partition example

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 Get the first three digits of the phone number
        String preNum = key.toString().substring(0, 3);

        //The default partition number; if none of the conditions below match, the KV pair goes to this partition
        int partition = 4;

        // 2 Determine which province it belongs to
        if ("136".equals(preNum)) {
            partition = 0;
        } else if ("137".equals(preNum)) {
            partition = 1;
        } else if ("138".equals(preNum)) {
            partition = 2;
        } else if ("139".equals(preNum)) {
            partition = 3;
        }

        return partition;
    }
}
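For completeness, a minimal sketch of how this partitioner could be wired into the driver (assuming the job object and the FlowBean type from the example above); with five partitions (0 to 4), five reduce tasks are normally configured:

//Register the custom partitioner and match the number of reduce tasks to the 5 partitions (0..4)
job.setPartitionerClass(ProvincePartitioner.class);
job.setNumReduceTasks(5);
//Map output types must match the key/value types the partitioner expects
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);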
