十九、CombineTextInputFormat切片机制源码分析
一、CombineTextInputFormat切片机制
1、思考?
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
2、CombineTextInputFormat应用场景
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理
3、虚拟存储切片最大值设置
它决定了切块与切片:
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虚拟存储切片最大值设置最好根据实际的小文件大小情况来设置具体的值。
4、切片机制过程
生成切片过程包括:虚拟存储过程和切片过程二部分。
(1)虚拟存储过程:将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。
(2)切片过程:
a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最终会形成3个切片,大小分别为:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
二、代码实操
1、只是在简单mr中设置如下两个步骤,就不在这里放出代码了,给出代码地址。主要是演示文件
(a)驱动类中添加代码如下:
// 如果不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
//虚拟存储切片最大值设置4m
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
三、源码阅读注释
1、CombineTextInputFormat重写了切片函数,在切片之前先进行了切块操作
//必须设置如下参数
//job.setInputFormatClass(CombineTextInputFormat.class);
//CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
2、进入CombineTextInputFormat 类中发现没有太多代码,需要进去父类CombineFileInputFormat中,这里重写了切片机制
//1.进入切片函数,省略非部分非主要代码
public List<InputSplit> getSplits(JobContext job) throws IOException {
long maxSize = 0L;
Configuration conf = job.getConfiguration();
if (this.maxSplitSize != 0L) {
maxSize = this.maxSplitSize;
} else {
//2.获取分块界限值
maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0L);
}
if (minSizeNode != 0L && maxSize != 0L && minSizeNode > maxSize) {
throw new IOException("Minimum split size pernode " + minSizeNode + " cannot be larger than maximum split size " + maxSize);
} else if (minSizeRack != 0L && maxSize != 0L && minSizeRack > maxSize) {
//略
} else {
//3.获取文件信息
List<FileStatus> stats = this.listStatus(job);
//4.创建切片容器
List<InputSplit> splits = new ArrayList();
if (stats.size() == 0) {
return splits;
} else {
//省略非主要逻辑代码
}
//5.进入返回切片执行函数,以上主要工作是获取分块大小,获取文件个数与所在位置
this.getMoreSplits(job, stats, maxSize, minSizeNode, minSizeRack, splits);
this.rackToNodes.clear();
return splits;
}
}
}
3、进入getMoreSplits,主要有两步:切块,分片
//6.进入更多切片函数,这里主要由两个步骤:虚拟切块,实际切片
private void getMoreSplits(JobContext job, List<FileStatus> stats, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) throws IOException {
Configuration conf = job.getConfiguration();
//数据块容器初始化
HashMap<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks = new HashMap();
HashMap<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes = new HashMap();
HashMap<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks = new HashMap();
//7.单个文件信息对象数组,几个文件就有几个vo
CombineFileInputFormat.OneFileInfo[] files = new CombineFileInputFormat.OneFileInfo[stats.size()];
//8.校验,文件个数不为0时开始分块
if (stats.size() != 0) {
//记录总数据量大小
long totLength = 0L;
//
int i = 0;
//9.遍历每个文件,开始切块
for(Iterator var18 = stats.iterator(); var18.hasNext(); totLength += files[i].getLength()) {
//文件信息:大小、所属人、修改时间、地址。。。。
FileStatus stat = (FileStatus)var18.next();
//10,封装文件对象,分块逻辑在此类中完成
files[i] = new CombineFileInputFormat.OneFileInfo(stat, conf, this.isSplitable(job, stat.getPath()), rackToBlocks, blockToNodes, nodeToBlocks, this.rackToNodes, maxSize);
}
//调用数据切片函数
this.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, maxSize, minSizeNode, minSizeRack, splits);
}
}
4、进入OneFileInfo,查看切块逻辑
//单个文件信息
@VisibleForTesting
static class OneFileInfo {
private long fileSize = 0L;//文件大小
//此文件所分的块容器
private CombineFileInputFormat.OneBlockInfo[] blocks;
//11.切片工作主要有此构造函数完成
OneFileInfo(FileStatus stat, Configuration conf,
boolean isSplitable,
HashMap<String, List<OneBlockInfo>> rackToBlocks,
HashMap<OneBlockInfo, String[]> blockToNodes,
HashMap<String, Set<OneBlockInfo>> nodeToBlocks,
HashMap<String, Set<String>> rackToNodes,
long maxSize)
throws IOException {
BlockLocation[] locations;
//12.判断文件坐落位置,也就是文件类型,这里是本地文件
if (stat instanceof LocatedFileStatus) {
//获取文件信息:大小、host,元素个数与文件个数对应
locations = ((LocatedFileStatus)stat).getBlockLocations();
} else {
FileSystem fs = stat.getPath().getFileSystem(conf);
locations = fs.getFileBlockLocations(stat, 0L, stat.getLen());
}
//文件位置为null则返回空的数据块数组,表示数据为空
if (locations == null) {
this.blocks = new CombineFileInputFormat.OneBlockInfo[0];
} else {
//判读是否有文件,是否是目录
if (locations.length == 0 && !stat.isDirectory()) {
locations = new BlockLocation[]{new BlockLocation()};
}
//13.是否需要切分
if (!isSplitable) {
//小文件直接返回
this.blocks = new CombineFileInputFormat.OneBlockInfo[1];
this.fileSize = stat.getLen();
this.blocks[0] = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), 0L, this.fileSize, locations[0].getHosts(), locations[0].getTopologyPaths());
} else {
//14.开始准备切块,创建数据块容器
ArrayList<CombineFileInputFormat.OneBlockInfo> blocksList = new ArrayList(locations.length);
int i = 0;
while(true) {
if (i >= locations.length) {
this.blocks = (CombineFileInputFormat.OneBlockInfo[])blocksList.toArray(new CombineFileInputFormat.OneBlockInfo[blocksList.size()]);
break;
}
//15.初始化基本参数
//文件大小
this.fileSize += locations[i].getLength();
//待切块数据大小
long left = locations[i].getLength();
//数据块开始偏移量坐标
long myOffset = locations[i].getOffset();
//数据块大小
long myLength = 0L;
do {
//16.获取切块大小,这是一个循环操作,直到最后数据不满足切块条件
//未设置切块大小,则默认文件大小为块大小
if (maxSize == 0L) {
myLength = left;
//文件大小在设置参数 1.00000001 - 2 倍之间则平均切为两块
} else if (left > maxSize && left < 2L * maxSize) {
myLength = left / 2L;
} else {
//文件大于参数两倍以上则按照参数大小切块
//文件小于参数则默认为一块
myLength = Math.min(maxSize, left);
}
//17. 封装数据块信息
CombineFileInputFormat.OneBlockInfo oneblock = new CombineFileInputFormat.OneBlockInfo(stat.getPath(), myOffset, myLength, locations[i].getHosts(), locations[i].getTopologyPaths());
//减去以切块部分数据
left -= myLength;
//重制下块数据的偏移量坐标
myOffset += myLength;
//将切块信息对象添加到容器中
blocksList.add(oneblock);
} while(left > 0L);
++i;
}
}
//18.合并所有的数据块
populateBlockInfo(this.blocks, rackToBlocks, blockToNodes, nodeToBlocks, rackToNodes);
}
}
5、数据块对象介绍,主要就是起到封装数据块信息
//------------------------------数据块信息
/**虚拟切块信息封装类:
path::文件路径
offset:此数据块开始偏移量位置
len:数据块大小
hosts:如果是hdfs中表示数据来源于哪台服务器,这里是本地localhost
*/
@VisibleForTesting
static class OneBlockInfo {
OneBlockInfo(Path path, long offset, long len, String[] hosts, String[] topologyPaths) {
//将文件信息块位置,偏移量划分赋值
this.onepath = path;
this.offset = offset;
this.hosts = hosts;
this.length = len;
//略
}
}
6、populateBlockInfo-数据块合并到一个容器中
/**数据块合并到同一个容器中
blocks:新切分的块,待合并数据
rackToBlocks:从字面意思应该是同一机架上多上数据块为一条数据,机架为key,机架上节点所有数据块集合为value,这里本地所以只有一条数据。/default-rack为key,所有数据块集合为value
blockToNodes:数据块到节点,也就是每个数据块为key,服务器信息为value
nodeToBlocks:每台服务器上几个数据块为一条数据,host为key,数据块集合为value
rackToNodes:机架到节点的数据,看数据格式猜测应该是每个机架上有哪些机器。由于是本地环境所以此值为空
*/
@VisibleForTesting
static void populateBlockInfo(CombineFileInputFormat.OneBlockInfo[] blocks, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<String, Set<String>> rackToNodes) {
//19.需要合并数据块数组赋值给 var5
CombineFileInputFormat.OneBlockInfo[] var5 = blocks;
int var6 = blocks.length;
//20.遍历取出需要合并的块
for(int var7 = 0; var7 < var6; ++var7) {
//取出数据块
CombineFileInputFormat.OneBlockInfo oneblock = var5[var7];
//21.数据块到主机节点-数据块为key,主机地址为value
blockToNodes.put(oneblock, oneblock.hosts);
for(j = 0; j < oneblock.hosts.length; ++j) {
node = oneblock.hosts[j];
blklist = (Set)nodeToBlocks.get(node);
if (blklist == null) {
blklist = new LinkedHashSet();
//22.每个主机节点上有多少数据块-合并数据块
nodeToBlocks.put(node, blklist);
}
((Set)blklist).add(oneblock);
}
}
}
//后面做合块切片主要用这两个集合,由于又是本地环境所以其它的省略了
7、createSplits创建分片信息
void createSplits(Map<String, Set<CombineFileInputFormat.OneBlockInfo>> nodeToBlocks, Map<CombineFileInputFormat.OneBlockInfo, String[]> blockToNodes, Map<String, List<CombineFileInputFormat.OneBlockInfo>> rackToBlocks, long totLength, long maxSize, long minSizeNode, long minSizeRack, List<InputSplit> splits) {
// 23.初始化相关参数
//切片容器
ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks = new ArrayList();
//当前片大小
long curSplitSize = 0L;
int totalNodes = nodeToBlocks.size();
//总数据大小
long totalLength = totLength;
//完成切片的数据块集合
Multiset<String> splitsPerNode = HashMultiset.create();
do {
//24.迭代每台机器上的所有数据块,这里本地环境所以数据块都在一台机器上
Iterator iter = nodeToBlocks.entrySet().iterator();
while(true) {
while(true) {
Entry one;
String node;
do {
//没有下一个数据块则跳出此内循环
if (!iter.hasNext()) {
continue label170;
}
//25.获取数据所有数据块 (localhost,blocks 9 )此机器上的9个数据块
one = (Entry)iter.next();
//获取key,也就是机器位置:localhost
node = (String)one.getKey();
//判断是否已经取出过
} while(completedNodes.contains(node));
//26.获取数据块信息集合
Set<CombineFileInputFormat.OneBlockInfo> blocksInCurrentNode = (Set)one.getValue();
//获取迭代器
Iterator oneBlockIter = blocksInCurrentNode.iterator();
//27.迭代数据块
while(oneBlockIter.hasNext()) {
//获取数据块
CombineFileInputFormat.OneBlockInfo oneblock = (CombineFileInputFormat.OneBlockInfo)oneBlockIter.next();
//27.判断数据块是否是已经完成切片
//blockToNodes:所有数据块容器
if (!blockToNodes.containsKey(oneblock)) {
//移除切片完成的数据块
oneBlockIter.remove();
} else {
//28.添加到需要切片的容器中
validBlocks.add(oneblock);
//移除检验通过数据块
blockToNodes.remove(oneblock);
//29.切片数据大小
curSplitSize += oneblock.length;
/**判断切片数据是否符合要求
1)正好是切块参数大小,则切片
2)小于切片则与下一个数据块合并,直至不小于maxSize时进行切片,也就是小数据块合并切片
3)小数据块,等待与下个数据块合并满足条件后切片
*/
if (maxSize != 0L && curSplitSize >= maxSize) {
//39.满足切片条件,进行切片
this.addCreatedSplit(splits, Collections.singleton(node), validBlocks);
//切片完成减去已经切片的数据大小
totalLength -= curSplitSize;
//重置切片大小
curSplitSize = 0L;
//已经切片的数据块集合
splitsPerNode.add(node);
//移除完成切片数据
blocksInCurrentNode.removeAll(validBlocks);
//清空校验切片容器
validBlocks.clear();
break;
}
}
}
}
}
} while(completedNodes.size() != totalNodes && totalLength != 0L);
//以下非主要逻辑代码省略
}
7、addCreatedSplit-满足切片条件,创建切片信息
private void addCreatedSplit(List<InputSplit> splitList, Collection<String> locations, ArrayList<CombineFileInputFormat.OneBlockInfo> validBlocks) {
Path[] fl = new Path[validBlocks.size()];
long[] offset = new long[validBlocks.size()];
long[] length = new long[validBlocks.size()];
//40.取出需要切片的数据块信息
//因为存在多个小数据块在同一个切片中所以需要保存多个数据块的信息
for(int i = 0; i < validBlocks.size(); ++i) {
fl[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).onepath;
offset[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).offset;
length[i] = ((CombineFileInputFormat.OneBlockInfo)validBlocks.get(i)).length;
}
//41.封装切片信息对象
CombineFileSplit thissplit = new CombineFileSplit(fl, offset, length, (String[])locations.toArray(new String[0]));
splitList.add(thissplit);
}
8、CombineFileSplit切片信息类查看
//42. 切片对象构造器,初始化切片信息系
public CombineFileSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
//43.实际初始化函数
initSplit(files, start, lengths, locations);
}
/**43.数据片信息封装
初始化切片信息实际工作函数
files:文件位置
start:切片开始偏移量
lengths:数据块大小,也就是需要从文件上切多大数据
locations:服务器位置
以上参数之所以是数组接收,是因为有两个数据块信息切分到一个片中
*/
private void initSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for(long length : lengths) {
totLength += length;
}
}
9、执行过程
先将所有文件切块,合并数据块到一个容器中,在对数据块检验并切片,最后返回切片信息集合
10、日志查看
测试文件:
c2.txt 2.4MB
c7.txt 7.2MB
c14.txt 22.9MB
按照4MB大小切块:2.4、3.6、3.6、4、4、4、4、4、2.9
合并切分数据片:(2.4+4)、4、4、4、(3.6+3.6),(2.9+4)
如下图片
运行日志:
2020-05-12 00:16:47,895 INFO [org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat] - DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 0
2020-05-12 00:23:14,387 INFO [org.apache.hadoop.mapreduce.JobSubmitter] - number of splits:6