Hadoop平臺簡介-肖韜南京大學計算機系.ppt
《Hadoop平臺簡介-肖韜南京大學計算機系.ppt》由會員分享,可在線閱讀,更多相關《Hadoop平臺簡介-肖韜南京大學計算機系.ppt(44頁珍藏版)》請在裝配圖網上搜索。
Hadoop平臺簡介 肖韜南京大學計算機科學與技術系2010 使用Hadoop的JavaAPI接口 在Hadoop文件系統(tǒng)中的文件是由一個HadoopPath對象來表示的 可以把一個Path對象想象成一個Hadoop文件系統(tǒng)的URI 例如hdfs localhost 9000 user xt input text dat 通過2個靜態(tài)工廠方法從抽象的Hadoop文件系統(tǒng)中抽取出一個具體的實現(xiàn)的實例 publicstaticFileSystemget Configurationconf throwsIOException 返回默認的文件系統(tǒng) 在conf core site xml中指定 或者本地的文件系統(tǒng) 如果該文件中沒有指定 publicstaticFileSystemget URIuri Configurationconf throwsIOException 返回由uri決定的文件系統(tǒng) 或者默認的文件系統(tǒng) 如果uri無效 新舊API變化的對比 以0 20 0版本為分水嶺 有一些API在新的版本中被舍棄了 且推薦不使用 而是改為使用新的API下面將以WordCount程序為例進行說明 0 20 0之前的WordCount程序 publicWordCount publicstaticvoidmain String args throwsThrowable JobConfconf newJobConf WordCount class conf setJobName ASampleWordCountExample FileInputFormat addInputPath conf newPath args 0 FileInputFormat setOutputPath conf newPath args 1 conf setMapperClass WordCountMapper class conf setReducerClass WordCountReducer class conf setOutputKeyClass Text class conf setOutputValueClass IntWritable class JobClient runJob conf classWordCountMapperextendsMapReduceBaseimplementsMapper publicvoidmap LongWritableoffset Textline OutputCollectorcollector Reporterreporter throwsIOException StringTokenizertokenzier newStringTokenizer line toString while tokenizer hasMoreTokens collector collect newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsMapReduceBaseimplementsReducer publicvoidreduce Textword Iteratorcounts OutputCollectorcollector Reporterreporter throwsIOException intsum 0 while counts hasNext sum counts next get collector collect word newIntWritable sum 0 20 0之后的WordCount程序 publicclassWordCount publicstaticvoidmain String args throwException Configurationconf newConfiguration Jobjob newJob conf ASampleWordCountExample job setJarByClass WordCount class job setMapperClass WordCountMapper class job setReducerClass WordCountReducer class job setOutputKeyClass Text class job setOutputValueClass IntWritable class FileInputFormat addInputPath job newPath args 0 FileOutputFormat setOutputPath job newPath args 1 job waitForCompletion true classWordCountMapperextendsMapper publicvoidmap LongWritableoffset Textline Contextcontext throwsIOException InterruptedException StringTokenizertokenizer newStringTokenizer line toString while tokenizer hasMoreTokens context write newText tokenizer nextToken newIntWritable 1 classWordCountReducerextendsReducer publicvoidreduce Textword Iteratorcounts Contextcontext throwsIOException InterruptedException intsum 0 while counts hasNext sum counts next get context write word newIntWritable sum ShuffleandSort MapReduce保證每一個reducetask的輸入基于key排序的 MapReducemakestheguaranteethattheinputtoeveryreducerissortedbykey 系統(tǒng)進行排序的過程 包括將map的輸出轉換為reduce的輸入 被稱為shuffle Theprocessbywhichthesystemperformsthesort andtransfersthemapoutputstothereducerasinputs isknownastheshuffle shuffle過程maptask中生成了3個spillfile 每個spillfile中有3個partition shuffle過程 maptaskside 當一個maptask開始產生它的輸出時 輸出并非不經處理被直接就寫到磁盤上去的 每一個maptask都有一個circularmemorybuffer 缺省大小為100MB maptask會將它產生的輸出 key valuepairs 寫入到它的memorybuffer中去 當maptask寫入到memorybuffer的數(shù)據占memorybuffer的大小百分比到達一個閾值 缺省為80 時 一個backgroundthread 記為thread 將開始把memorybuffer中的內容spill到磁盤上去 在thread將memorybuffer中的數(shù)據spill到磁盤中之前 thread首先將這些數(shù)據分成若干partition 每一個partition將被發(fā)送至一個reducer 在每一個partition內 thread將根據key對該partition內的數(shù)據 即key valuepairs 進行in memorysort 如果指定了combinerfunction 那么該combinerfunction將會被作用于上述in memorysort的輸出 每當memorybuffer中的數(shù)據達到一個閾值時 就會產生一個spillfile 所以在maptask輸出了所有的record之后 就會存在多個spillfiles 1個record即1個key valuepair 在maptask結束之前 所有的spillfiles將被merge到一個單獨的outputfile中 該outputfile在結構上由多個partition組成 每一個partition內的數(shù)據都是排好序的 且每一個partition將被送至對應的一個reducetask 如果指定了combinerfunction并且spill的數(shù)量不低于3個 那么在生成outputfile之前 combinerfunction將會作用于將要被寫入到outputfile里的每一個partition內的數(shù)據 reducetaskside maptask的輸出存儲在maptask節(jié)點所在機器的本地文件系統(tǒng)中 reducetask會自己所需的某個partition數(shù)據復制到自己所在的HDFS中 且一個reducetask將會從多個maptask復制其所需要的partition 這些partition都是同一類的 reducer怎樣知道從哪些maptasktracker那里去取自己所需要的partition 亦即maptask的輸出 當maptask成功完成后 它會將狀態(tài)更新通知它所屬的tasktracker 該tasktracker進而又會通知其所屬的jobtracker 這些通知是通過heartbeat通信機制實現(xiàn)的 這樣 對于一個job而言 jobtracker知道m(xù)apoutput與tasktracker之間的映射關系 reducer中的一個線程會周期性地向jobtracker詢問mapoutput所在的位置 直到該reducer接收了所有的mapoutput combinerfunction與partitionerfunction 當存在多個reducer時 maptasks將會對它們的輸出進行partition 每一個masktask都會為每一個reducetask生成一個partition 在每一個partition內都可能會有很多keys 以及相應的values 但是對于任一個key而言 它的records都在一個partition內 partition的過程可以由用戶定義的partitioning函數(shù)來控制 但是一般來說 默認的partitioner函數(shù) 根據key進行hash映射 已經可以令人滿意 存在多個reducetask時的partitioningpartition的數(shù)量與reducer的數(shù)量是一致的 定制個性化的partitioner 自定義的partitionerfunction需要繼承于一個抽象類Partitionercontrolsthepartitioninigofthekeysoftheintermediatemap outputs Thekey orasubsetofkey isusedtoderivethepartition typicallybyahashfunction Thiscontrolswhichofthemreducetaskstheintermediatekey andhencetherecord issentforreduction 實現(xiàn)Partitioner中的getPartition函數(shù)原型abstractintgetPartition KEYkey VALUEvalue intnumPatitions 其中 key和value是mapper輸出的intermediateoutput 例如 在WordCount例子中就分別是word與1 numPartitions是reducers的數(shù)量 返回值是該record將被發(fā)送至的reducer的編號 0 1 m 1 指定多個reducers bin hadoopjarWordCount Dmapred reduce tasks 3inputoutput這樣 在reduce階段會有3個reducetasks運行 speculativeexecution 默認打開 當多個task并行運行時 可能若干個task運行明顯比其他task要慢 這種情況下 Hadoop將會為這些運行較慢的task啟動一個相同的backuptask 稱為speculativeexecution 一個task及其speculativetask不會同時運行 以避免競爭 在一個job的所有task都已經啟動的情況下 對于那些同時滿足1 已經運行了一段時間 至少1分鐘 2 運行的速度明顯慢于其余task的平均速度的task 一個speculativetask才會被啟動 對于originaltask及其speculativetask而言 如果任何一方先運行結束 則另一方將被killed Skippingbadrecords 當一個task失敗時 原因可能是硬件故障 待處理數(shù)據非法等 該task將會被retried 但是如果該task失敗的次數(shù)達到4次 那么該task所屬的整個job就將被標記為failed 當maptask讀到一個badrecord時 可能會因為拋出異常而失敗 進而整個job可能會失敗 有時 第三方的庫可能有bug 導致task因讀取了某個badrecord而失敗 而這個第三方的庫又無法修改 這時 可以使用Hadoop的skipmode 以使得讀取輸入文件使自動地跳過badrecords 在打開了skippingmode之后 task會將其所處理的records報告給tasktracker 當task失敗時 tasktracker會retry該task 并跳過引起失敗的records 為了減少skippingmode帶來的帶寬及記賬信息 bookkeeping 的消耗 當一個task失敗達到2次時 才會開啟skippingmode 如果一個task因為某個badrecord而持續(xù)地失敗 那么tasktracker將會以下列的結果執(zhí)行taskattempts task失敗 task再次失敗 skippingmode被打開 task仍然失敗 但是badrecord被tasktracker記錄下來 skippingmode處于使能狀態(tài) task因為跳過了前面導致失敗的badrecord而成功 skippingmode是默認關閉的 注意 對于每一個taskattempt skippingmode只能發(fā)現(xiàn)一個badrecord Taskside effectfiles 要保證一個task的多個instance不會試圖向同一個文件進行寫操作 1 如果某個task失敗了 失敗前已經向輸出文件中寫了一部分數(shù)據 那么當其再次運行 retry 時 必須先將舊的文件刪掉 2 當speculativeexecution被使能時 某個originaltask與它的speculativetask可能會試圖向同一個文件進行寫操作 Hadoop為每一個taskattempt指定了一個臨時目錄 每一個taskattempt的輸出就會被寫到這個目錄中去 從而避免了上述的問題 這個目錄就是 mapred output dir InputFormat map k1 v1 list k2 v2 combine k2 list v2 list k2 v2 reduce k2 list v2 list k3 v3 可以看出 如果使用combiner 那么它的輸入 輸出格式與reducer是完全一樣的 同時也是Reducer的子類 只不過combiner的輸出是intermediatekey valuepairs 這將是reducer的輸入 Inputtypes由Inputformat決定 例如TextInputFormat決定了輸入的key的類型是LongWritable 首字符在文件中的偏移量 value的類型是Text 一行文本內容 如果希望產生其他類型的輸入 可以顯式地調用JobConf的方法 否則 若不顯式地 setexplicitly 設置 則不論是否使用combiner intermediatetypes默認與最終的輸出類型相同 即LongWritable與Text 所以 若k2和k3相同 則不需要調用setMapKeyOutputClass 因為intermediatekeytype已經被setOutputKeyClass 設置好了 同理 若v2和v3相同 則只需要調用setOutputValueClass 即可 為什么要為intermediatekey valuepairs和最終的output指定類型 似乎通過mapper與reducer就可以確定intermediatekey valuepairs和最終的output的類型了 原因 Java的泛型機制中的typeerasure使得這些類型信息在運行時是不可知的 所以必須顯式地為Hadoop指定這些類型 InputFormatclasshierarchy InputSplit 什么是inputsplit 1個inputsplit是inputfile中的1個chunk 該chunk將被1個單獨的map進行處理 每一個map處理一個inputsplit 每一個split可被劃分為若干records 1個record即1個key valuepair map依次處理每一個record Inputsplit由一個Java抽象類代表 即org apache hadoop mapreduce abstractclassInputSplit InputSplitrepresentsthedatatobeprocessedbyanindividualmapper Typically itpresentsabyte orientedviewontheinputandistheresponsibilityofRecordReaderofthejobtoprocessthisandpresentarecord orientedview 注意 InputSplit并不包含inputdata 而只是指向inputdata的一個reference Map Reduce系統(tǒng)利用getLocations 所得到的storagelocations信息來將maptasks放置在盡可能靠近inputsplit數(shù)據的地方 利用getLength 得到的size信息對splits進行排序 使得最大的spilt先被處理 試圖來最小化job的運行時間 Inputfile inputsplitandrecord inputfile Inputsplit record key valuepair MapReduce應用程序開發(fā)者不需要直接處理InputSplit 因為它是由一個InputFormat生成的 InputFormat負責生成inputsplits 并把它們劃分為records 0 20 0之前的定義如下publicinterfaceInputFormat InputSplit getSplits JobConfjob intnumSplits throwsIOException RecordReadergetRecordReader InputSplitsplit JobConfjob Reporterreporter throwsIOException 其實跟新的還是很類似的 對于舊版InputFormat的解釋 TheJobClientcallsthegetSplits method passingthedesirednumberofmaptasksasthenumSplitsargument Thisnumberistreatedasahint asInputFormatimplementationsarefreetoreturnadifferentnumberofsplitstothenumberspecifiedinnumSplits Havingcalculatedthesplits theclientsendsthemtothejobtracker whichusestheirstoragelocationstoschedulemaptaskstoprocessthemonthetasktrackers Onatasktracker themaptaskpassesthesplittothegetRecordReader methodonInputFormattoobtainaRecordReaderforthatsplit ARecordReaderislittlemorethananiteratoroverrecords andthemaptaskusesonetogeneraterecordkey valuepairs whichitpassestothemapfunction TheabstractInputFormatclass TheMap ReduceframworkreliesontheInputFormatofthejobto 1 Validatetheinput specificationofthejob 2 Split uptheinputfile s intologicalInputSplits eachofwhichisthenassignedtoanindividualMapper ProvidetheRecordReaderimplementationtobeusedtogleaninputrecordsfromlogicalInputSplitforprocessingbyaMapper org apache hadoop mapredInterfaceRecordReader RecordReaderreadspairsfromanInputSplit RecordReader typically convertsthebyte orientedviewoftheinputprovidedbytheInputSplit andpresentsarecord orientedviewfortheMapper Reducertasksforprocessing Itthusassumestheresponsibilityofprocessingboundariesandpresentingthetaskswithkeys values MapRunnable MaptasksarerunbyMapRunner thedefaultimplementationofMapRunnablethatcallstheMapper smap methodsequentiallywitheachrecord NotethatMapRunnerisonlywayofrunningmappers MultithreadedMapRunnerisanotherimplementationoftheMapRunnableinterfacethatrunsmappersconcurrentlyinaconfigurablenumberofthreads setbymapred map multithreadedrunner threads FileInputFormat FileInputFormat提供了 1 對一個job的輸入路徑的定義2 為inputfiles產生splits的實現(xiàn)注意 輸入路徑不應該包含子目錄 而只包含文件 因為InputFormat不會自動解析子目錄 而是將其當作一個文件 對于給定的若干文件 FileInputFormat怎樣將它們變?yōu)閟plits呢 FileInputFormat只對 大文件 進行split 這里的 大 是指比HDFS的一個block還要大 Splitsize通常就等于一個HDFSblock的大小 MapReduce中的所有數(shù)據元素都是不可修改的 AlldataelementsinMapReduceareimmutable meaningthattheycannotbeupdated Ifinamappingtaskyouchangeaninput key value pair itdoesnotgetreflectedbackintheinputfiles Communicationoccursonlybygeneratingnewoutput key value pairswhicharethenforwardedbytheHadoopsystemintothenextphaseofexecution- 配套講稿:
如PPT文件的首頁顯示word圖標,表示該PPT已包含配套word講稿。雙擊word圖標可打開word文檔。
- 特殊限制:
部分文檔作品中含有的國旗、國徽等圖片,僅作為作品整體效果示例展示,禁止商用。設計者僅對作品中獨創(chuàng)性部分享有著作權。
- 關 鍵 詞:
- Hadoop 平臺 簡介 南京大學 計算機系
裝配圖網所有資源均是用戶自行上傳分享,僅供網友學習交流,未經上傳用戶書面授權,請勿作他用。
鏈接地址:http://www.szxfmmzy.com/p-6349373.html