Hadoop平臺(tái)簡(jiǎn)介-肖韜南京大學(xué)計(jì)算機(jī)系.ppt
《Hadoop平臺(tái)簡(jiǎn)介-肖韜南京大學(xué)計(jì)算機(jī)系.ppt》由會(huì)員分享,可在線閱讀,更多相關(guān)《Hadoop平臺(tái)簡(jiǎn)介-肖韜南京大學(xué)計(jì)算機(jī)系.ppt(44頁(yè)珍藏版)》請(qǐng)?jiān)谘b配圖網(wǎng)上搜索。
Hadoop平臺(tái)簡(jiǎn)介 肖韜南京大學(xué)計(jì)算機(jī)科學(xué)與技術(shù)系2010 使用Hadoop的JavaAPI接口 在Hadoop文件系統(tǒng)中的文件是由一個(gè)HadoopPath對(duì)象來(lái)表示的 可以把一個(gè)Path對(duì)象想象成一個(gè)Hadoop文件系統(tǒng)的URI 例如hdfs localhost 9000 user xt input text dat 通過(guò)2個(gè)靜態(tài)工廠方法從抽象的Hadoop文件系統(tǒng)中抽取出一個(gè)具體的實(shí)現(xiàn)的實(shí)例 publicstaticFileSystemget Configurationconf throwsIOException 返回默認(rèn)的文件系統(tǒng) 在conf core site xml中指定 或者本地的文件系統(tǒng) 如果該文件中沒(méi)有指定 publicstaticFileSystemget URIuri Configurationconf throwsIOException 返回由uri決定的文件系統(tǒng) 或者默認(rèn)的文件系統(tǒng) 如果uri無(wú)效 新舊API變化的對(duì)比 以0 20 0版本為分水嶺 有一些API在新的版本中被舍棄了 且推薦不使用 而是改為使用新的API下面將以WordCount程序?yàn)槔M(jìn)行說(shuō)明 0 20 0之前的WordCount程序 publicWordCount publicstaticvoidmain String args throwsThrowable JobConfconf newJobConf WordCount class conf setJobName ASampleWordCountExample FileInputFormat addInputPath conf newPath args 0 FileInputFormat setOutputPath conf newPath args 1 conf setMapperClass WordCountMapper class conf setReducerClass WordCountReducer class conf setOutputKeyClass Text class conf setOutputValueClass IntWritable class JobClient runJob conf classWordCountMapperextendsMapReduceBaseimplementsMapper publicvoidmap LongWritableoffset Textline OutputCollectorcollector Reporterreporter throwsIOException StringTokenizertokenzier newStringTokenizer line toString while tokenizer hasMoreTokens collector collect newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsMapReduceBaseimplementsReducer publicvoidreduce Textword Iteratorcounts OutputCollectorcollector Reporterreporter throwsIOException intsum 0 while counts hasNext sum counts next get collector collect word newIntWritable sum 0 20 0之后的WordCount程序 publicclassWordCount publicstaticvoidmain String args throwException Configurationconf newConfiguration Jobjob newJob conf ASampleWordCountExample job setJarByClass WordCount class job setMapperClass WordCountMapper class job setReducerClass WordCountReducer class job setOutputKeyClass Text class job setOutputValueClass IntWritable class FileInputFormat addInputPath job newPath args 0 FileOutputFormat setOutputPath job newPath args 1 job waitForCompletion true classWordCountMapperextendsMapper publicvoidmap LongWritableoffset Textline Contextcontext throwsIOException InterruptedException StringTokenizertokenizer newStringTokenizer line toString while tokenizer hasMoreTokens context write newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsReducer publicvoidreduce Textword Iteratorcounts Contextcontext throwsIOException InterruptedException intsum 0 while counts hasNext sum counts next get context write word newIntWritable sum ShuffleandSort MapReduce保證每一個(gè)reducetask的輸入基于key排序的 MapReducemakestheguaranteethattheinputtoeveryreducerissortedbykey 系統(tǒng)進(jìn)行排序的過(guò)程 包括將map的輸出轉(zhuǎn)換為reduce的輸入 被稱(chēng)為shuffle Theprocessbywhichthesystemperformsthesort andtransfersthemapoutputstothereducerasinputs isknownastheshuffle shuffle過(guò)程maptask中生成了3個(gè)spillfile 每個(gè)spillfile中有3個(gè)partition shuffle過(guò)程 maptaskside 當(dāng)一個(gè)maptask開(kāi)始產(chǎn)生它的輸出時(shí) 輸出并非不經(jīng)處理被直接就寫(xiě)到磁盤(pán)上去的 每一個(gè)maptask都有一個(gè)circularmemorybuffer 缺省大小為100MB maptask會(huì)將它產(chǎn)生的輸出 key valuepairs 寫(xiě)入到它的memorybuffer中去 當(dāng)maptask寫(xiě)入到memorybuffer的數(shù)據(jù)占memorybuffer的大小百分比到達(dá)一個(gè)閾值 缺省為80 時(shí) 一個(gè)backgroundthread 記為thread 將開(kāi)始把memorybuffer中的內(nèi)容spill到磁盤(pán)上去 在thread將memorybuffer中的數(shù)據(jù)spill到磁盤(pán)中之前 thread首先將這些數(shù)據(jù)分成若干partition 每一個(gè)partition將被發(fā)送至一個(gè)reducer 在每一個(gè)partition內(nèi) thread將根據(jù)key對(duì)該partition內(nèi)的數(shù)據(jù) 即key valuepairs 進(jìn)行in memorysort 如果指定了combinerfunction 那么該combinerfunction將會(huì)被作用于上述in memorysort的輸出 每當(dāng)memorybuffer中的數(shù)據(jù)達(dá)到一個(gè)閾值時(shí) 就會(huì)產(chǎn)生一個(gè)spillfile 所以在maptask輸出了所有的record之后 就會(huì)存在多個(gè)spillfiles 1個(gè)record即1個(gè)key valuepair 在maptask結(jié)束之前 所有的spillfiles將被merge到一個(gè)單獨(dú)的outputfile中 該outputfile在結(jié)構(gòu)上由多個(gè)partition組成 每一個(gè)partition內(nèi)的數(shù)據(jù)都是排好序的 且每一個(gè)partition將被送至對(duì)應(yīng)的一個(gè)reducetask 如果指定了combinerfunction并且spill的數(shù)量不低于3個(gè) 那么在生成outputfile之前 combinerfunction將會(huì)作用于將要被寫(xiě)入到outputfile里的每一個(gè)partition內(nèi)的數(shù)據(jù) reducetaskside maptask的輸出存儲(chǔ)在maptask節(jié)點(diǎn)所在機(jī)器的本地文件系統(tǒng)中 reducetask會(huì)自己所需的某個(gè)partition數(shù)據(jù)復(fù)制到自己所在的HDFS中 且一個(gè)reducetask將會(huì)從多個(gè)maptask復(fù)制其所需要的partition 這些partition都是同一類(lèi)的 reducer怎樣知道從哪些maptasktracker那里去取自己所需要的partition 亦即maptask的輸出 當(dāng)maptask成功完成后 它會(huì)將狀態(tài)更新通知它所屬的tasktracker 該tasktracker進(jìn)而又會(huì)通知其所屬的jobtracker 這些通知是通過(guò)heartbeat通信機(jī)制實(shí)現(xiàn)的 這樣 對(duì)于一個(gè)job而言 jobtracker知道m(xù)apoutput與tasktracker之間的映射關(guān)系 reducer中的一個(gè)線程會(huì)周期性地向jobtracker詢(xún)問(wèn)mapoutput所在的位置 直到該reducer接收了所有的mapoutput combinerfunction與partitionerfunction 當(dāng)存在多個(gè)reducer時(shí) maptasks將會(huì)對(duì)它們的輸出進(jìn)行partition 每一個(gè)masktask都會(huì)為每一個(gè)reducetask生成一個(gè)partition 在每一個(gè)partition內(nèi)都可能會(huì)有很多keys 以及相應(yīng)的values 但是對(duì)于任一個(gè)key而言 它的records都在一個(gè)partition內(nèi) partition的過(guò)程可以由用戶(hù)定義的partitioning函數(shù)來(lái)控制 但是一般來(lái)說(shuō) 默認(rèn)的partitioner函數(shù) 根據(jù)key進(jìn)行hash映射 已經(jīng)可以令人滿意 存在多個(gè)reducetask時(shí)的partitioningpartition的數(shù)量與reducer的數(shù)量是一致的 定制個(gè)性化的partitioner 自定義的partitionerfunction需要繼承于一個(gè)抽象類(lèi)Partitionercontrolsthepartitioninigofthekeysoftheintermediatemap outputs Thekey orasubsetofkey isusedtoderivethepartition typicallybyahashfunction Thiscontrolswhichofthemreducetaskstheintermediatekey andhencetherecord issentforreduction 實(shí)現(xiàn)Partitioner中的getPartition函數(shù)原型abstractintgetPartition KEYkey VALUEvalue intnumPatitions 其中 key和value是mapper輸出的intermediateoutput 例如 在WordCount例子中就分別是word與1 numPartitions是reducers的數(shù)量 返回值是該record將被發(fā)送至的reducer的編號(hào) 0 1 m 1 指定多個(gè)reducers bin hadoopjarWordCount Dmapred reduce tasks 3inputoutput這樣 在reduce階段會(huì)有3個(gè)reducetasks運(yùn)行 speculativeexecution 默認(rèn)打開(kāi) 當(dāng)多個(gè)task并行運(yùn)行時(shí) 可能若干個(gè)task運(yùn)行明顯比其他task要慢 這種情況下 Hadoop將會(huì)為這些運(yùn)行較慢的task啟動(dòng)一個(gè)相同的backuptask 稱(chēng)為speculativeexecution 一個(gè)task及其speculativetask不會(huì)同時(shí)運(yùn)行 以避免競(jìng)爭(zhēng) 在一個(gè)job的所有task都已經(jīng)啟動(dòng)的情況下 對(duì)于那些同時(shí)滿足1 已經(jīng)運(yùn)行了一段時(shí)間 至少1分鐘 2 運(yùn)行的速度明顯慢于其余task的平均速度的task 一個(gè)speculativetask才會(huì)被啟動(dòng) 對(duì)于originaltask及其speculativetask而言 如果任何一方先運(yùn)行結(jié)束 則另一方將被killed Skippingbadrecords 當(dāng)一個(gè)task失敗時(shí) 原因可能是硬件故障 待處理數(shù)據(jù)非法等 該task將會(huì)被retried 但是如果該task失敗的次數(shù)達(dá)到4次 那么該task所屬的整個(gè)job就將被標(biāo)記為failed 當(dāng)maptask讀到一個(gè)badrecord時(shí) 可能會(huì)因?yàn)閽伋霎惓6?進(jìn)而整個(gè)job可能會(huì)失敗 有時(shí) 第三方的庫(kù)可能有bug 導(dǎo)致task因讀取了某個(gè)badrecord而失敗 而這個(gè)第三方的庫(kù)又無(wú)法修改 這時(shí) 可以使用Hadoop的skipmode 以使得讀取輸入文件使自動(dòng)地跳過(guò)badrecords 在打開(kāi)了skippingmode之后 task會(huì)將其所處理的records報(bào)告給tasktracker 當(dāng)task失敗時(shí) tasktracker會(huì)retry該task 并跳過(guò)引起失敗的records 為了減少skippingmode帶來(lái)的帶寬及記賬信息 bookkeeping 的消耗 當(dāng)一個(gè)task失敗達(dá)到2次時(shí) 才會(huì)開(kāi)啟skippingmode 如果一個(gè)task因?yàn)槟硞€(gè)badrecord而持續(xù)地失敗 那么tasktracker將會(huì)以下列的結(jié)果執(zhí)行taskattempts task失敗 task再次失敗 skippingmode被打開(kāi) task仍然失敗 但是badrecord被tasktracker記錄下來(lái) skippingmode處于使能狀態(tài) task因?yàn)樘^(guò)了前面導(dǎo)致失敗的badrecord而成功 skippingmode是默認(rèn)關(guān)閉的 注意 對(duì)于每一個(gè)taskattempt skippingmode只能發(fā)現(xiàn)一個(gè)badrecord Taskside effectfiles 要保證一個(gè)task的多個(gè)instance不會(huì)試圖向同一個(gè)文件進(jìn)行寫(xiě)操作 1 如果某個(gè)task失敗了 失敗前已經(jīng)向輸出文件中寫(xiě)了一部分?jǐn)?shù)據(jù) 那么當(dāng)其再次運(yùn)行 retry 時(shí) 必須先將舊的文件刪掉 2 當(dāng)speculativeexecution被使能時(shí) 某個(gè)originaltask與它的speculativetask可能會(huì)試圖向同一個(gè)文件進(jìn)行寫(xiě)操作 Hadoop為每一個(gè)taskattempt指定了一個(gè)臨時(shí)目錄 每一個(gè)taskattempt的輸出就會(huì)被寫(xiě)到這個(gè)目錄中去 從而避免了上述的問(wèn)題 這個(gè)目錄就是 mapred output dir InputFormat map k1 v1 list k2 v2 combine k2 list v2 list k2 v2 reduce k2 list v2 list k3 v3 可以看出 如果使用combiner 那么它的輸入 輸出格式與reducer是完全一樣的 同時(shí)也是Reducer的子類(lèi) 只不過(guò)combiner的輸出是intermediatekey valuepairs 這將是reducer的輸入 Inputtypes由Inputformat決定 例如TextInputFormat決定了輸入的key的類(lèi)型是LongWritable 首字符在文件中的偏移量 value的類(lèi)型是Text 一行文本內(nèi)容 如果希望產(chǎn)生其他類(lèi)型的輸入 可以顯式地調(diào)用JobConf的方法 否則 若不顯式地 setexplicitly 設(shè)置 則不論是否使用combiner intermediatetypes默認(rèn)與最終的輸出類(lèi)型相同 即LongWritable與Text 所以 若k2和k3相同 則不需要調(diào)用setMapKeyOutputClass 因?yàn)閕ntermediatekeytype已經(jīng)被setOutputKeyClass 設(shè)置好了 同理 若v2和v3相同 則只需要調(diào)用setOutputValueClass 即可 為什么要為intermediatekey valuepairs和最終的output指定類(lèi)型 似乎通過(guò)mapper與reducer就可以確定intermediatekey valuepairs和最終的output的類(lèi)型了 原因 Java的泛型機(jī)制中的typeerasure使得這些類(lèi)型信息在運(yùn)行時(shí)是不可知的 所以必須顯式地為Hadoop指定這些類(lèi)型 InputFormatclasshierarchy InputSplit 什么是inputsplit 1個(gè)inputsplit是inputfile中的1個(gè)chunk 該chunk將被1個(gè)單獨(dú)的map進(jìn)行處理 每一個(gè)map處理一個(gè)inputsplit 每一個(gè)split可被劃分為若干records 1個(gè)record即1個(gè)key valuepair map依次處理每一個(gè)record Inputsplit由一個(gè)Java抽象類(lèi)代表 即org apache hadoop mapreduce abstractclassInputSplit InputSplitrepresentsthedatatobeprocessedbyanindividualmapper Typically itpresentsabyte orientedviewontheinputandistheresponsibilityofRecordReaderofthejobtoprocessthisandpresentarecord orientedview 注意 InputSplit并不包含inputdata 而只是指向inputdata的一個(gè)reference Map Reduce系統(tǒng)利用getLocations 所得到的storagelocations信息來(lái)將maptasks放置在盡可能靠近inputsplit數(shù)據(jù)的地方 利用getLength 得到的size信息對(duì)splits進(jìn)行排序 使得最大的spilt先被處理 試圖來(lái)最小化job的運(yùn)行時(shí)間 Inputfile inputsplitandrecord inputfile Inputsplit record key valuepair MapReduce應(yīng)用程序開(kāi)發(fā)者不需要直接處理InputSplit 因?yàn)樗怯梢粋€(gè)InputFormat生成的 InputFormat負(fù)責(zé)生成inputsplits 并把它們劃分為records 0 20 0之前的定義如下publicinterfaceInputFormat InputSplit getSplits JobConfjob intnumSplits throwsIOException RecordReadergetRecordReader InputSplitsplit JobConfjob Reporterreporter throwsIOException 其實(shí)跟新的還是很類(lèi)似的 對(duì)于舊版InputFormat的解釋 TheJobClientcallsthegetSplits method passingthedesirednumberofmaptasksasthenumSplitsargument Thisnumberistreatedasahint asInputFormatimplementationsarefreetoreturnadifferentnumberofsplitstothenumberspecifiedinnumSplits Havingcalculatedthesplits theclientsendsthemtothejobtracker whichusestheirstoragelocationstoschedulemaptaskstoprocessthemonthetasktrackers Onatasktracker themaptaskpassesthesplittothegetRecordReader methodonInputFormattoobtainaRecordReaderforthatsplit ARecordReaderislittlemorethananiteratoroverrecords andthemaptaskusesonetogeneraterecordkey valuepairs whichitpassestothemapfunction TheabstractInputFormatclass TheMap ReduceframworkreliesontheInputFormatofthejobto 1 Validatetheinput specificationofthejob 2 Split uptheinputfile s intologicalInputSplits eachofwhichisthenassignedtoanindividualMapper ProvidetheRecordReaderimplementationtobeusedtogleaninputrecordsfromlogicalInputSplitforprocessingbyaMapper org apache hadoop mapredInterfaceRecordReader RecordReaderreadspairsfromanInputSplit RecordReader typically convertsthebyte orientedviewoftheinputprovidedbytheInputSplit andpresentsarecord orientedviewfortheMapper Reducertasksforprocessing Itthusassumestheresponsibilityofprocessingboundariesandpresentingthetaskswithkeys values MapRunnable MaptasksarerunbyMapRunner thedefaultimplementationofMapRunnablethatcallstheMapper smap methodsequentiallywitheachrecord NotethatMapRunnerisonlywayofrunningmappers MultithreadedMapRunnerisanotherimplementationoftheMapRunnableinterfacethatrunsmappersconcurrentlyinaconfigurablenumberofthreads setbymapred map multithreadedrunner threads FileInputFormat FileInputFormat提供了 1 對(duì)一個(gè)job的輸入路徑的定義2 為inputfiles產(chǎn)生splits的實(shí)現(xiàn)注意 輸入路徑不應(yīng)該包含子目錄 而只包含文件 因?yàn)镮nputFormat不會(huì)自動(dòng)解析子目錄 而是將其當(dāng)作一個(gè)文件 對(duì)于給定的若干文件 FileInputFormat怎樣將它們變?yōu)閟plits呢 FileInputFormat只對(duì) 大文件 進(jìn)行split 這里的 大 是指比HDFS的一個(gè)block還要大 Splitsize通常就等于一個(gè)HDFSblock的大小 MapReduce中的所有數(shù)據(jù)元素都是不可修改的 AlldataelementsinMapReduceareimmutable meaningthattheycannotbeupdated Ifinamappingtaskyouchangeaninput key value pair itdoesnotgetreflectedbackintheinputfiles Communicationoccursonlybygeneratingnewoutput key value pairswhicharethenforwardedbytheHadoopsystemintothenextphaseofexecution- 1.請(qǐng)仔細(xì)閱讀文檔,確保文檔完整性,對(duì)于不預(yù)覽、不比對(duì)內(nèi)容而直接下載帶來(lái)的問(wèn)題本站不予受理。
- 2.下載的文檔,不會(huì)出現(xiàn)我們的網(wǎng)址水印。
- 3、該文檔所得收入(下載+內(nèi)容+預(yù)覽)歸上傳者、原創(chuàng)作者;如果您是本文檔原作者,請(qǐng)點(diǎn)此認(rèn)領(lǐng)!既往收益都?xì)w您。
下載文檔到電腦,查找使用更方便
9.9 積分
下載 |
- 配套講稿:
如PPT文件的首頁(yè)顯示word圖標(biāo),表示該P(yáng)PT已包含配套word講稿。雙擊word圖標(biāo)可打開(kāi)word文檔。
- 特殊限制:
部分文檔作品中含有的國(guó)旗、國(guó)徽等圖片,僅作為作品整體效果示例展示,禁止商用。設(shè)計(jì)者僅對(duì)作品中獨(dú)創(chuàng)性部分享有著作權(quán)。
- 關(guān) 鍵 詞:
- Hadoop 平臺(tái) 簡(jiǎn)介 南京大學(xué) 計(jì)算機(jī)系
鏈接地址:http://www.szxfmmzy.com/p-6349373.html