自己學(xué)習(xí)排序和二次排序的知識(shí)整理如下。
1.Hadoop的序列化格式介紹:Writable
2.Hadoop的key排序邏輯
3.全排序
4.如何自定義自己的Writable類型
5.如何實(shí)現(xiàn)二次排序
1.Hadoop的序列化格式介紹:Writable
要了解和編寫MR實(shí)現(xiàn)排序必須要知道的第一個(gè)知識(shí)點(diǎn)就是Writable相關(guān)的接口和類,這些是HADOOP自己的序列化格式。更多的可能是要關(guān)注他的Subinterfaces:WritableComparable<T>。他是繼承Writable和Comparable<T>接口,繼而WritableComparable<T>的實(shí)現(xiàn)除了具有序列化特性,更重要的是具有了比較的特性,而比較的特性在MapReduce里是很重要的,因?yàn)镸R中有個(gè)基于鍵的排序過程,所以可以作為鍵的類型必須具有Comparable<T>的特性。
除了WritableComparable接口外,還有一個(gè)接口RawComparaotor。
WritableComparable和RawComparator兩個(gè)接口的區(qū)別是:
WritableComparable是需要把數(shù)據(jù)流反序列化為對(duì)象后,然后做對(duì)象之間的比較,而RawComparator是直接比較數(shù)據(jù)流的數(shù)據(jù),不需要數(shù)據(jù)流反序列化成對(duì)象,省去了新建對(duì)象的開銷。
Hadoop本身Key的數(shù)據(jù)類型的排序邏輯其實(shí)就是依賴于Hadoop本身的繼承與WritableComparable<T>的基本數(shù)據(jù)類型和其他類型(相關(guān)類型可參考《Hadoop權(quán)威指南》第二版的90頁)的compareTo方法的定義。
Key排序的規(guī)則:
1.如果調(diào)用jobconf的setOutputKeyComparatorClass()設(shè)置mapred.output.key.comparator.class
2.否則,使用key已經(jīng)登記的comparator
3.否則,實(shí)現(xiàn)接口WritableComparable的compareTo()函數(shù)來操作
例如IntWritable的比較算法如下:
-
public
int
compareTo(Objecto){
-
int
thisValue=
this
.value;
-
int
thatValue=((IntWritable)o).value;
-
return
(thisValue<thatValue?-
1
:(thisValue==thatValue?
0
:
1
));
-
}
可以修改compareTo來實(shí)現(xiàn)自己所需的比較算法。
雖然我們知道是compareTo這個(gè)方法實(shí)現(xiàn)Key的排序,但其實(shí)我們?cè)谑褂肏adoop的基本數(shù)據(jù)類型時(shí)不需要關(guān)注這個(gè)排序如何實(shí)現(xiàn),因?yàn)镠adoop的框架會(huì)自動(dòng)調(diào)用compareTo這個(gè)方法實(shí)現(xiàn)key的排序。但是這個(gè)排序只是局限在map或者reduce內(nèi)部。針對(duì)于map與map,reduce與reduce之間的排序compareTo就管不著了,雖然這種情況不常出現(xiàn),但是確實(shí)存在這種問題的,而且確實(shí)有適用場(chǎng)景,比如說全排序。
3.全排序
這里就需要關(guān)注Partition這個(gè)階段,Partition階段是針對(duì)每個(gè)Reduce,需要?jiǎng)?chuàng)建一個(gè)分區(qū),然后把Map的輸出結(jié)果映射到特定的分區(qū)中。這個(gè)分區(qū)中可能會(huì)有N個(gè)Key對(duì)應(yīng)的數(shù)據(jù),但是一個(gè)Key的所有數(shù)據(jù)只能在一個(gè)分區(qū)中。在實(shí)現(xiàn)全排序的過程中,如果只有一個(gè)reduce,也就是只有一個(gè)Partition,那么所有Map的輸出都會(huì)經(jīng)過一個(gè)Partition到一個(gè)reduce里,在一個(gè)reduce里可以根據(jù)compareTo(也可以采用其他比較算法)來排序,實(shí)現(xiàn)全排序。但是這種情況就讓MapReduce失去了分布式計(jì)算的光環(huán)。
所以全排序的大概思路為:確保Partition之間是有序的就OK了,即保證Partition1的最大值小于Partition2的最小值就OK了,即便這樣做也還是有個(gè)問題:Partition的分布不均,可能導(dǎo)致某些Partition處理的數(shù)據(jù)量遠(yuǎn)大于其他Partition處理的數(shù)據(jù)量。而實(shí)現(xiàn)全排序的核心步驟為:取樣和Partition。
先“取樣”,保證Partition得更均勻:
1) 對(duì)Math.min(10, splits.length)個(gè)split(輸入分片)進(jìn)行隨機(jī)取樣,對(duì)每個(gè)split取10000個(gè)樣,總共10萬個(gè)樣
2) 10萬個(gè)樣排序,根據(jù)reducer的數(shù)量(n),取出間隔平均的n-1個(gè)樣
3) 將這個(gè)n-1個(gè)樣寫入partitionFile(_partition.lst,是一個(gè)SequenceFile),key是取的樣,值是nullValue
4) 將partitionFile寫入DistributedCache
4.如何自定義自己的Writable類型
自定義自己的Writable類型的場(chǎng)景應(yīng)該很簡(jiǎn)單:Hadoop自帶的數(shù)據(jù)類型要么在功能上不能滿足需求,要么在性能上滿足需求,畢竟Hadoop還在發(fā)展,不是所有情況都考慮的,但是他提供了自主的框架實(shí)現(xiàn)我們想要的功能。
定義自己的Writable類型需要實(shí)現(xiàn):
a.重載構(gòu)造函數(shù)
b.實(shí)現(xiàn)set和get方法
c.實(shí)現(xiàn)接口的方法:write()、readFields()、compareTo()
d.(可選)相當(dāng)于JAVA構(gòu)造的對(duì)象,重寫java.lang.Object的hashCode()、equals()、toString()。Partition階段默認(rèn)的hashpartitioner會(huì)根據(jù)hashCode()來選擇分區(qū),如果不要對(duì)自定義類型做key進(jìn)行分區(qū),hashCode()可不實(shí)現(xiàn)
具體例子可參考hadoop的基本類型IntWritable的實(shí)現(xiàn)
-
public
class
IntWritable
implements
WritableComparable{
-
private
int
value;
-
-
public
IntWritable(){}
-
-
public
IntWritable(
int
value){set(value);}
-
-
-
public
void
set(
int
value){
this
.value=value;}
-
-
-
public
int
get(){
return
value;}
-
-
public
void
readFields(DataInputin)
throws
IOException{
-
value=in.readInt();
-
}
-
-
public
void
write(DataOutputout)
throws
IOException{
-
out.writeInt(value);
-
}
-
-
-
public
boolean
equals(Objecto){
-
if
(!(o
instanceof
IntWritable))
-
return
false
;
-
IntWritableother=(IntWritable)o;
-
return
this
.value==other.value;
-
}
-
-
public
int
hashCode(){
-
return
value;
-
}
-
-
-
public
int
compareTo(Objecto){
-
int
thisValue=
this
.value;
-
int
thatValue=((IntWritable)o).value;
-
return
(thisValue<thatValue?-
1
:(thisValue==thatValue?
0
:
1
));
-
}
-
-
public
StringtoString(){
-
return
Integer.toString(value);
-
}
-
}
5.如何實(shí)現(xiàn)二次排序
二次排序的工作原理涉及到如下幾方面:
a.創(chuàng)建key的數(shù)據(jù)類型,key要包括兩次排序的元素
b.setPartitionerClass(Class<? extends Partitioner> theClass)
hadoop0.20.0以后的函數(shù)為setPartitionerClass
c.setOutputKeyComparatorClass(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函數(shù)為
setSortComparatorClass
d.setOutputValueGroupingComparator(Class<? extends RawComparator> theClass)
hadoop0.20.0以后的函數(shù)為
setGroupingComparatorClass
根據(jù)hadoop自己提供的example:org.apache.hadoop.examplesSecondarySort來說明二次排序具體是如何實(shí)現(xiàn)的.
SecondarySort實(shí)現(xiàn)IntPair、FirstPartitioner、FirstGroupingComparator、MapClass、Reduce這幾個(gè)內(nèi)部類,然后在main函數(shù)中調(diào)用。先說明下main函數(shù)中有哪些地方和普通的MR代碼不同。
不同點(diǎn)是多了這兩個(gè)set:
job.setPartitionerClass(FirstPartitioner.class);
設(shè)置自定義的Partition操作,在此是調(diào)用我們自定義的內(nèi)部類
FirstPartitioner
job.setGroupingComparatorClass(FirstGroupingComparator.class);
設(shè)置哪些value進(jìn)入哪些key的迭代器中,在此是調(diào)用自定義的內(nèi)部類
FirstGroupingComparator
具體的操作邏輯為:
a.定義一個(gè)作為key的類型IntPair,在IntPair中有兩個(gè)變量first、second,SecondarySort就是在對(duì)first排序后再對(duì)second再排序處理
b.定義分區(qū)函數(shù)類FirstPartitioner,Key的第一次排序。在FirstPartitioner實(shí)現(xiàn)如何處理key的first,把key對(duì)應(yīng)的數(shù)據(jù)劃分到不同的分區(qū)中。這樣key中first相同的value會(huì)被放在同一個(gè)reduce中,在reduce中再做第二次排序
c(代碼沒有實(shí)現(xiàn),其實(shí)內(nèi)部是有處理).key比較函數(shù)類,key的第二次排序,是繼承WritableComparator的一個(gè)比較器。
setSortComparatorClass可以實(shí)現(xiàn)。
為什么沒有使用
setSortComparatorClass()是因?yàn)閔adoop對(duì)key排序的規(guī)則(參看
2.Hadoop的key排序邏輯
)決定的。由于我們?cè)贗ntPair中已經(jīng)定義了compareTo()函數(shù)。
d.定義分組函數(shù)類
FirstGroupingComparator,
保證只要key的的第一部分相同,value就進(jìn)入key的value迭代器中。