總覺得階段性的總結(jié)是個好習(xí)慣,很多自己做的事情,如果不及時總結(jié)一下,過一段時間就忘記了,當(dāng)要用到時,又需要花費較多的時間去重新熟悉。于是決定抽點時間總結(jié)一下以前對國際搜索離線系統(tǒng)做的一些優(yōu)化(這里說的國際搜索,主要指AE、SC和SC店鋪,AE即Ali
總覺得階段性的總結(jié)是個好習(xí)慣,很多自己做的事情,如果不及時總結(jié)一下,過一段時間就忘記了,當(dāng)要用到時,又需要花費較多的時間去重新熟悉。于是決定抽點時間總結(jié)一下以前對國際搜索離線系統(tǒng)做的一些優(yōu)化(這里說的國際搜索,主要指AE、SC和SC店鋪,AE即AliExpress,SC即Sourcing,這些優(yōu)化對這幾個應(yīng)用都是通用的),不僅起到一個備忘的作用,如果能給讀者帶來一些啟發(fā),想必也是極好的。
既然是搜索離線系統(tǒng)相關(guān),我們就先看一下國際搜索全量流程的幾個主要環(huán)節(jié),如圖1所示。
圖1. 全量流程
1)dump,將數(shù)據(jù)從數(shù)據(jù)庫讀出來,寫入hbase,只有做大全量的時候才會全量dump數(shù)據(jù)庫,一般情況下每天只需跑一次小全量,數(shù)據(jù)庫中數(shù)據(jù)的更新會以增量的方式更新hbase。
2)join,讀取hbase,做多表join,生成一條條doc,一條doc包含了一條產(chǎn)品的全部字段。
3)global sort,即全局排序,按產(chǎn)品全局分global_score對產(chǎn)品進(jìn)行全局排序,生成的單個文件內(nèi)部并不要求有序。
4)abuild,讀取全局排序后生成的文件,構(gòu)建索引,生成的索引會存儲在HDFS上。
5)dispatch,將索引從HDFS上分發(fā)到對應(yīng)的search機(jī)器上。
6)switch,切換索引、程序、配置和算法詞典,新索引上線,對外提供服務(wù)。
這次先總結(jié)一下全局排序優(yōu)化,任何項目或需求都有相應(yīng)的背景,我們的離線計算中為何要做全局排序?
說到這個,又引出了分層檢索,早些時候,國際站搜索引擎對外提供服務(wù)時,在處理每個搜索請求時,都會查詢所有的segment,但其實對于每個請求,都只需返回一定數(shù)量的結(jié)果集,因此,查詢所有的segment并非必要,只會帶來性能上的損失。于是,分層檢索就在千呼萬喚中出來了。
何謂分層檢索,顧名思義,就是只查詢一定數(shù)量的segment,當(dāng)結(jié)果集夠了就不再繼續(xù)查詢,這對搜索引擎查詢性能的優(yōu)化是顯而易見的。
但這里存在一個問題,就是對于賣家發(fā)布的產(chǎn)品,質(zhì)量是良莠不齊的,我們需要把質(zhì)量好的優(yōu)先搜索出來,所以前面segment的產(chǎn)品質(zhì)量要高于后面的segment,否則一些質(zhì)量高的展品就沒有展示機(jī)會了。比如,我們有3個segment,seg_1, seg_2, seg_3,那么seg_1中的產(chǎn)品質(zhì)量就要比seg_2中的產(chǎn)品質(zhì)量高,seg_2中的產(chǎn)品質(zhì)量要比seg_3中的產(chǎn)品質(zhì)量高,在每個segment內(nèi)部并不做要求。
判斷產(chǎn)品質(zhì)量好壞的標(biāo)準(zhǔn)是什么呢?我們引入了一個全局分global_score,每條產(chǎn)品的global_score都是離線計算好的,以此作為分層檢索的依據(jù)。
如圖1所示,在搜索引擎的離線計算中,有個多表join的環(huán)節(jié),在多表join的過程中會有一些業(yè)務(wù)邏輯的計算,global_score就是在這個階段計算出來的。有了global_score,我們就可以對產(chǎn)品做全局排序了。假如排序之后我們生成3個文件,part_1, part_2, part_3,就要求part_1中每條doc的global_score要高于part_2中的每條doc,part_2之于part_3亦如此,但每個part內(nèi)部并不要求有序。在后面建索引的過程中,會有一個保序邏輯,以此保證多個segment之間的有序。
全局排序怎么做呢?由于數(shù)據(jù)量大,我們各個應(yīng)用的離線計算任務(wù)基本上都是運行在hadoop集群上的,全局排序亦如此。要達(dá)到上述的效果,即各個partition之間是按global_score有序的,我們采用的方案是:首先對數(shù)據(jù)進(jìn)行采樣,按global_score進(jìn)行分區(qū),將定義分區(qū)的鍵寫入_partitions文件,再實現(xiàn)自定義的TotalOrderPartitioner(這里實現(xiàn)自定義的TotalOrderPartitioner是為了在輸出的單個文件內(nèi)部將同一家公司的產(chǎn)品聚合在一起,即按company_id聚合,從而大大提高輸出文件的壓縮比,顯著縮短了后面abuild構(gòu)建索引的運行時間),進(jìn)行全局排序。采樣的核心思想是只查看一小部分鍵,獲得鍵的近似分布,并由此構(gòu)建分區(qū)。
這里有必要先提一下列的概念,由于單臺search能承載的索引量有限,所以數(shù)據(jù)量大時,需要對數(shù)據(jù)進(jìn)行分列,使所有數(shù)據(jù)盡量均勻分布到不同的列上。比如SC有19列,采用的做法就是根據(jù)product_id % 19將全部數(shù)據(jù)分布到19列上。在做多表join的之后,數(shù)據(jù)的分列就已經(jīng)做好了。因此全局排序是對多列的數(shù)據(jù)分別進(jìn)行全局排序。
在分層檢索項目上線到SC BT集群(預(yù)發(fā)布環(huán)境)時,全局排序需要80min才能運行完成,經(jīng)分析,大部分的時間耗在采樣上面。看了代碼,發(fā)現(xiàn)每列的全局排序都對應(yīng)一個job,SC有19列數(shù)據(jù),就跑19個job分別對每列數(shù)據(jù)進(jìn)行全局排序。排序之前先采樣,采樣器是在客戶端運行的,因此,分片的下載數(shù)量以加速采樣器的運行就顯得尤為重要。在優(yōu)化之前的代碼實現(xiàn)中,每個job都是讀取對應(yīng)列的數(shù)據(jù),自己采樣的,而且多個job是串行采樣。因此,一個可行的優(yōu)化方案就是多個job并行采樣,但由于我們的產(chǎn)品數(shù)據(jù)是分列存儲的,每一列的數(shù)據(jù)量也足夠大。比如SC現(xiàn)在3.6億的數(shù)據(jù)量,單列的數(shù)據(jù)就接近2千萬,因此其實每一列產(chǎn)品global_score的分布是基本一致的,所以,我們是否可以只對一列數(shù)據(jù)進(jìn)行采樣,然后所有job都共享這一個樣本呢?這樣就不僅能大大縮短采樣時間,而且也不會引入并行的復(fù)雜性。答案是可行的。
簡單的說,全局排序優(yōu)化的基本思想,就是根據(jù)數(shù)據(jù)的分布特點,使多列數(shù)據(jù)的多個全局排序job共享同一個樣本。
下面我們來看一下優(yōu)化后的代碼實現(xiàn):
Vector vecRunningJob = new Vector(build_num); Vector vecJobClient = new Vector(build_num); for (int j = 0; j < build_num; j++) { job.setJobName("Doc Sort job" + String.valueOf(j)); job.setInt("dc.sort.jobindex", j); Vector vecInput = fileGenerator.getInPutFiles(j, build_num); JobConf newjob = makeJob(job, inputPath, vecInput, outputPath + "/" + j, aggregateField); // Make a job for each column JobClient jc = new JobClient(newjob); vecJobClient.add(jc); vecRunningJob.add(jc.submitJob(newjob)); }
其中build_num表示列數(shù),從上面的代碼可以看出,對每列數(shù)據(jù)都會調(diào)用makeJob方法,然后提交任務(wù)進(jìn)行全局排序。注意這里調(diào)用makeJob方法和提交任務(wù)是串行的,不過任務(wù)提交后是并行跑的。
?我們再看一下makeJob方法的實現(xiàn):
private static JobConf makeJob(JobConf basejob, String inputPath, Vector vecInPutFile, String outPutPath, String aggregateField) throws Exception { JobConf conf = new JobConf(basejob); conf.setJarByClass(DCSortMain.class); for (int i = 0; i < vecInPutFile.size(); i++) { FileInputFormat.addInputPath(conf, new Path(vecInPutFile.get(i))); } Path outputDir = new Path(outPutPath); FileOutputFormat.setOutputPath(conf, outputDir); conf.setMapOutputKeyClass(DCText.class); conf.setMapOutputValueClass(DCText.class); conf.setOutputKeyClass(DCText.class); conf.setOutputValueClass(DCText.class); conf.setMapperClass(IdentityMapper.class); conf.setReducerClass(IdentityReducer.class); conf.setInputFormat(DCTextInputFormat.class); conf.setOutputFormat(DCTextOutputFormat.class); conf.set("mapred.output.compress", "true"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec"); conf.setOutputKeyComparatorClass(DCText.AggregateFieldComparator.class); // Sort numbericly by desc conf.setNumReduceTasks(conf.getInt("dc.sort.reduce_num", 1)); sample(conf, inputPath); // Sample before global sort return conf; }
可見,在做好相關(guān)設(shè)置后,makeJob中會調(diào)用sample方法進(jìn)行采樣,也就是說,其實針對每一列的makeJob都會調(diào)用sample方法。
再來看看sample方法的實現(xiàn):
private static void sample(JobConf conf, String inputPath) throws IOException, URISyntaxException { int jobIndex = 0; Path partitionFile = new Path(inputPath, jobIndex + "_partitions"); conf.setPartitionerClass(MyTotalOrderPartitioner.class); conf.set("total.order.partitioner.natural.order", "false"); MyTotalOrderPartitioner.setPartitionFile(conf, partitionFile); if (!sampleDone) { LOG.info("sample start ..."); MyInputSampler.Sampler sampler = new MyInputSampler.RandomSampler(1, 20000, 10); MyInputSampler.writePartitionFile(conf, sampler); LOG.info("sample end ..."); sampleDone = true; } // Add to DistributedCache URI partitionUri = new URI(partitionFile.toString() + "#" + jobIndex + "_partitions"); DistributedCache.addCacheFile(partitionUri, conf); DistributedCache.createSymlink(conf); }
可以看出,我們引入了一個布爾變量sampleDone對采樣進(jìn)行了控制,只在第1次調(diào)用makeJob方法時才執(zhí)行采樣操作,后面的創(chuàng)建的job都不再進(jìn)行采樣,而是與第1個job共享同一個_partitions文件,載入到自己使用的分布式緩存中,供后面的全局排序使用。sampleDone定義如下:
private static boolean sampleDone = false;
順便提一下采樣操作,hadoop內(nèi)置的采樣器有3個:
1)RandomSampler,以指定的采樣率均勻地從一個數(shù)據(jù)集中選擇樣本;
2)SplitSampler,只采樣一個分片中的前n個記錄;
3)IntervalSampler,以一定的間隔定期從劃分中選擇鍵,對于已排好序的數(shù)據(jù)來說是一個更好的選擇。
RandomSampler是優(yōu)秀的通用采樣器,我們最終也是選擇RandomSampler,因為雖然使用另外兩個采用器,采樣時間更短,但最終數(shù)據(jù)分布卻很不均勻,只有RandomSampler才能達(dá)到預(yù)期效果。同時,我們將采樣率設(shè)置為1,最大樣本數(shù)設(shè)置為20000,最大分區(qū)設(shè)置為10。最大樣本數(shù)和最大分區(qū)只需滿足其一,即停止采樣。可以通過調(diào)整RandomSampler的這些參數(shù)達(dá)到不同的采樣效果。
優(yōu)化版本上線SC BT之后,全局排序的運行時間從80min降到了30min,縮短了50min。正式環(huán)境由于hadoop集群更加強(qiáng)大,全局排序的運行時間更短。
原文地址:國際搜索離線系統(tǒng)優(yōu)化之一 —— 全局排序優(yōu)化, 感謝原作者分享。
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com