在线不卡日本ⅴ一区v二区_精品一区二区中文字幕_天堂v在线视频_亚洲五月天婷婷中文网站

  • <menu id="lky3g"></menu>
  • <style id="lky3g"></style>
    <pre id="lky3g"><tt id="lky3g"></tt></pre>

    flink流式增量查詢hudi表流程分析

    flink流式增量查詢hudi表流程分析

    環(huán)境

    • flink 1.13.6
    • hudi 0.11.0
    • merge on read 表

    代碼示例

    tEnv.executeSql(“CREATE TABLE tb_person_hudi ( id BIGINT, age INT, name STRING,create_time TIMESTAMP ( 3 ), time_stamp TIMESTAMP(3),PRIMARY KEY ( id ) NOT ENFORCED ) WITH (” + “‘connector’ = ‘hudi’,” + “‘table.type’ = ‘MERGE_ON_READ’,” + “‘path’ = ‘file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi’,” + “‘read.start-commit’ = ‘20220722103000’,” + // “‘read.end-commit’ = ‘20220722104000’,” + “‘read.task’ = ‘1’,” + “‘read.streaming.enabled’ = ‘true’,” + “‘read.streaming.check-interval’ = ’30’ ” + “)”);Table table = tEnv.sqlQuery(“select * from tb_person_hudi “);tEnv.toChangelogStream(table).print().setParallelism(1);env.execute(“test”);

    流程分析

    hudi源入口(HoodieTableSource)

    HoodieTableSource實(shí)現(xiàn)ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4個(gè)接口主要是支持對(duì)查詢計(jì)劃的優(yōu)化。ScanTableSource則提供了讀取hudi表的具體實(shí)現(xiàn),核心方法為org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider:

    if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { //開啟了流式讀(read.streaming.enabled) StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName(“split_monitor”)) .setParallelism(1) .transform(“split_reader”, typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource(source);}

    上面代碼在流環(huán)境中創(chuàng)建了一個(gè)SourceFunction(StreamReadMonitoringFunction)和一個(gè)自定義的轉(zhuǎn)換(StreamReadOperator)

    • StreamReadMonitoringFunction: 監(jiān)控hudi表元數(shù)據(jù)目錄(.hoodie)獲取需要被讀取的文件分片(MergeOnReadInputSplit,一個(gè)base parquet文件和一組log文件),然后把分片遞給下游的轉(zhuǎn)換算子StreamReadOperator進(jìn)行文件讀??;固定一個(gè)線程去監(jiān)控,名稱為split_monitorxxxxx.
    • StreamReadOperator:將按timeline升序收到的MergeOnReadInputSplit一個(gè)一個(gè)地讀取分片數(shù)據(jù);算子名稱為split_reader->xxxxx,可以通過設(shè)置read.tasks進(jìn)行設(shè)置并行度

    定時(shí)監(jiān)控元數(shù)據(jù)獲得增量分片(StreamReadMonitoringFunction)

    StreamReadMonitoringFunction負(fù)責(zé)定時(shí)(read.streaming.check-interval)掃描hudi表的元數(shù)據(jù)目錄.hoodie,如果發(fā)現(xiàn)在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],從這些instant信息中可以知道數(shù)據(jù)變更寫到了哪些文件(parquet,log),然后構(gòu)建成分片對(duì)象(MergeOnReadInputSplit)。

    • 核心屬性:issuedInstant,這個(gè)是增量查詢的依據(jù),記錄著當(dāng)前已經(jīng)消費(fèi)的數(shù)據(jù)的最新instant,類似于kafka的offset,但是hudi是基于timeline.該值是有狀態(tài)的,維護(hù)在ListState中,所以flink job重啟依然可以做到增量。
    • 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很簡單,就做了兩件事,調(diào)用IncrementalInputSplits#inputSplits獲取到增量分片(有序),然后傳遞給下游的算子(StreamReadOperator)

    public void monitorDirAndForwardSplits(SourceContext context) { HoodieTableMetaClient metaClient = getOrCreateMetaClient(); IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant); for (MergeOnReadInputSplit split : result.getInputSplits()) { context.collect(split); }}

    獲取增量分片(IncrementalInputSplits)

    主要邏輯在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi關(guān)于timeline和instant的一些基本概念,詳細(xì)的流程如下圖所示:

    如果flink job首次運(yùn)行指定了read.start-commit和read.end-commit,但是該范圍是比較久以前,instant已經(jīng)被歸檔,那么流作業(yè)將永遠(yuǎn)不能消費(fèi)到數(shù)據(jù)

    https://github.com/apache/hudi/issues/6167

    讀取數(shù)據(jù)文件(StreamReadOperator)

    StreamReadOperator算子接收分片后會(huì)緩存在隊(duì)列Queue splits,然后不停從隊(duì)列中poll分片放到線程池中執(zhí)行

    private void processSplits() throws IOException { format.open(split); consumeAsMiniBatch(split); enqueueProcessSplits(); }

    主要有三個(gè)步驟

  • 從隊(duì)列中peek分片,調(diào)用MergeOnReadInputFormat.open構(gòu)建迭代器,迭代器是用來進(jìn)行文件的數(shù)據(jù)讀取,一個(gè)迭代器對(duì)應(yīng)一個(gè)分片(多個(gè)物理文件,base+log),對(duì)應(yīng)不同讀取的場景,有幾種迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator
  • 微批量消費(fèi),每批只讀2048記錄,將把記錄傳遞給下游的算子消費(fèi)同時(shí)標(biāo)記消費(fèi)的總數(shù),如果該分片讀到了尾,則將該分片從隊(duì)列中彈出,并關(guān)閉MergeOnReadInputFormat
  • 繼續(xù)處理隊(duì)列中的分片,回到步驟1,如果上一次的分片沒消費(fèi)完,那么本次循環(huán)將繼續(xù)消費(fèi),只不過是由另一個(gè)線程處理。
  • 鄭重聲明:本文內(nèi)容及圖片均整理自互聯(lián)網(wǎng),不代表本站立場,版權(quán)歸原作者所有,如有侵權(quán)請(qǐng)聯(lián)系管理員(admin#wlmqw.com)刪除。
    用戶投稿
    上一篇 2022年7月24日 18:43
    下一篇 2022年7月24日 18:43

    相關(guān)推薦

    • 分享4條發(fā)微商朋友圈的方法(微商朋友圈應(yīng)該怎么發(fā))

      對(duì)于微商朋友來說,朋友圈的重要性不言而喻了。 那么微商的朋友圈到底該怎么發(fā)呢? 為什么同樣是經(jīng)營一個(gè)朋友圈,有的微商看起來逼格滿滿,實(shí)際效果也不錯(cuò);而有的卻動(dòng)都不動(dòng)就被屏蔽甚至拉黑…

      2022年11月27日
    • 奶茶的做法和配方(草莓奶茶的做法和配方)

      今天小編給各位分享奶茶的做法和配方的知識(shí),其中也會(huì)對(duì)草莓奶茶的做法和配方進(jìn)行解釋,如果能碰巧解決你現(xiàn)在面臨的問題,別忘了關(guān)注本站,現(xiàn)在開始吧! 奶茶的怎么做法 主料 牛奶200ml…

      2022年11月26日
    • 怎么轉(zhuǎn)行總結(jié)出成功轉(zhuǎn)行的3個(gè)步驟

      01 前段時(shí)間,由麥可思研究院發(fā)布的《就業(yè)藍(lán)皮書:2019年中國大學(xué)生就業(yè)報(bào)告》顯示,2018大學(xué)畢業(yè)生半年內(nèi)的離職率為33%,主動(dòng)離職的主要原因是“個(gè)人發(fā)展空間不夠”和“薪資福利…

      2022年11月26日
    • 《寶可夢朱紫》夢特性怎么獲得?隱藏特性獲取方法推薦

      寶可夢朱紫里有很多寶可夢都是擁有夢特性會(huì)變強(qiáng)的寶可夢,很多玩家不知道夢特性怎么獲得,下面就給大家?guī)韺毧蓧糁熳想[藏特性獲取方法推薦,感興趣的小伙伴一起來看看吧,希望能幫助到大家。 …

      2022年11月25日
    • 《寶可夢朱紫》奇魯莉安怎么進(jìn)化?奇魯莉安進(jìn)化方法分享

      寶可夢朱紫中的奇魯莉安要怎么進(jìn)化呢?很多玩家都不知道,下面就給大家?guī)韺毧蓧糁熳掀骠斃虬策M(jìn)化方法分享,感興趣的小伙伴一起來看看吧,希望能幫助到大家。 奇魯莉安進(jìn)化方法分享 奇魯莉安…

      2022年11月25日
    • 客服的崗位職責(zé)怎么寫(客服工作內(nèi)容及職責(zé))

      各位小伙伴們大家周一好,又到了每周一給大家分享干貨內(nèi)容的時(shí)候啦~ 本期來跟大家分享一下客服工作管理流程以及客服崗位里面的每項(xiàng)職能崗位的核心細(xì)則,也是干貨滿滿推薦收藏~ 一.補(bǔ)償流程…

      2022年11月25日
    • 什么是推廣cpa一篇文章帶你看懂CPA推廣渠道

      CPA渠道 CPA指的是按照指定的行為結(jié)算,可以是搜索,可以是注冊(cè),可以是激活,可以是搜索下載激活,可以是綁卡,實(shí)名認(rèn)證,可以是付費(fèi),可以是瀏覽等等。甲乙雙方可以根據(jù)自己的情況來定…

      2022年11月25日
    • 抖音直播帶貨有哪些方法技巧(抖音直播帶貨有哪些痛點(diǎn))

      如今抖音這個(gè)短視頻的變現(xiàn)能力越來越突顯了,尤其是在平臺(tái)上開通直播,更具有超強(qiáng)的帶貨屬性,已經(jīng)有越來越多的普通人加入到其中了。不過直播帶貨雖然很火,但是也不是每個(gè)人都能做好的,那么在…

      2022年11月24日
    • 《原神》3.2無相交響詩第一天無相之冰怎么打?無相交響詩攻略

      原神3.2無相交響詩第一天無相之冰怎么打?最近新版本3.2版本的無相交響詩活動(dòng)又開啟了,不少玩家還不清楚具體的玩法,下面一起來看一下原神被隱去的原神3.2無相交響詩第一天無相之冰打…

      2022年11月24日
    • 科比19歲女兒遭自稱與她生“科比式孩子”男子跟蹤騷擾

      極目新聞?dòng)浾咄趿亮咙S佳琪 據(jù)??怂剐侣劸W(wǎng)報(bào)道,當(dāng)?shù)貢r(shí)間11月21日,已故籃球巨星科比·布萊恩特的長女娜塔莉亞·布萊恩特21日向法院提交臨時(shí)限制令,聲稱這位32歲的前科從十幾歲起就騷…

      2022年11月24日

    聯(lián)系我們

    聯(lián)系郵箱:admin#wlmqw.com
    工作時(shí)間:周一至周五,10:30-18:30,節(jié)假日休息