多線程在現(xiàn)在工作中出現(xiàn)越來越頻繁、需要我們熟記并且能熟練地使用之、對相關(guān)線程池的一些配置需要我們非常熟悉。
1. 參數(shù)詳解
1.1 corePoolSize
corePoolSize 核心線程數(shù)
– 核心線程會(huì)一直存活,即使沒有任務(wù)需要執(zhí)行- 當(dāng)線程數(shù)小于核心線程數(shù)時(shí),即使有線程空閑,線程池也會(huì)優(yōu)先創(chuàng)建新線程處理- 設(shè)置allowCoreThreadTimeOut=true(默認(rèn)false)時(shí),核心線程會(huì)超時(shí)關(guān)閉
1.2 queueCapacity
queueCapacity 任務(wù)隊(duì)列容量(阻塞隊(duì)列)
- 當(dāng)核心線程數(shù)達(dá)到最大時(shí),新任務(wù)會(huì)放在隊(duì)列中排隊(duì)等待
1.3 maxPoolSize
maxPoolSize 最大線程數(shù)
– 當(dāng)線程數(shù) >= corePoolSize 且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)創(chuàng)建新線程來處理任務(wù)- 當(dāng)線程數(shù) = maxPoolSize且任務(wù)隊(duì)列已滿時(shí),線程池會(huì)拒絕處理任務(wù)而拋出異常
1.4 keepAliveTime
keepAliveTime 線程空閑時(shí)間
– 當(dāng)線程空閑時(shí)間達(dá)到keepAliveTime時(shí),線程會(huì)退出,直到線程數(shù)量=coolPoolSize- 如果allowCoreThreadTimeOut=true時(shí),則會(huì)直到線程數(shù)量=0
1.5 allowCoreThreadTimeOut
allowCoreThreadTimeOut 允許核心線程超時(shí)
1.6 rejectedExecutionHandler
rejectedExecutionHandler 任務(wù)拒絕處理器
- 兩種情況會(huì)拒絕處理任務(wù)
- 當(dāng)線程數(shù)已經(jīng)達(dá)到maxPoolSize且任務(wù)隊(duì)列已滿,就會(huì)拒絕新的任務(wù)
- 當(dāng)線程池被調(diào)運(yùn)shutdown() 然后,會(huì)等待線程池里的任務(wù)執(zhí)行完畢在shutdown。如果在調(diào)運(yùn)shutdown() 和線程池真正shutdown之間提交的任務(wù),會(huì)拒絕新的任務(wù)
- 線程池會(huì)調(diào)運(yùn)rejectedExecutionHandler來處理這個(gè)任務(wù),如果沒有設(shè)置默認(rèn)是AbortPolicy,會(huì)拋出異常
– ThreadPoolExecutor類有幾個(gè)內(nèi)部實(shí)現(xiàn)類來處理這類情況 1. **AbortPolicy** 丟棄任務(wù)、拋運(yùn)行時(shí)異常 1. **CallerRunsPolicy** 執(zhí)行任務(wù) 1. **DisCardPolicy** 忽視,什么都不會(huì)發(fā)生 1. **DisCardOldestPolicy** 從隊(duì)列中提出最先 進(jìn)入隊(duì)列(最后一個(gè)執(zhí)行)的任務(wù)
- 實(shí)現(xiàn)RejectedExecutionHandler接口,可自定義處理器
2. ThreadPoolExecutor 執(zhí)行順序
線程池按照以下行為執(zhí)行任務(wù)
1. 當(dāng)線程數(shù) = 核心線程數(shù),且任務(wù)隊(duì)列未滿時(shí),將任務(wù)放入任務(wù)隊(duì)列。3. 當(dāng)線程數(shù) >= 核心線程數(shù),且任務(wù)隊(duì)列已滿- 若線程數(shù) < 最大線程數(shù),創(chuàng)建線程- 若線程數(shù) = 最大線程數(shù),拋出異常,拒絕任務(wù)
3. 如何設(shè)置參數(shù)
3.1 默認(rèn)值
corePoolSize = 1queueCapacity = Integer.MAX_VALUEmaxPoolSize = Integer.MAX_VALUEkeepAliveTime = 60sallowCoreThreadTimeOut = falserejectedExecutionHandler = AbortPolicy
3.2 如何來設(shè)置
- 需要根據(jù)幾個(gè)值來決定
tasks : 每秒的任務(wù)數(shù),假設(shè)為 500 – 1000taskcost : 每個(gè)任務(wù)花費(fèi)時(shí)間,假設(shè)為0.1sresponsetime : 系統(tǒng)允許容忍的最大響應(yīng)時(shí)間,假設(shè)為1s
- 計(jì)算方式
corePoolSize = 每秒需要多少個(gè)線程處理 * threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 個(gè)線程。corePoolSize設(shè)置應(yīng)該大于50 * 根據(jù)8020原則,如果80%的每秒任務(wù)數(shù)小于800,那么corePoolSize設(shè)置為80即可queueCapacity = (coreSizePool/taskcost)*responsetime * 計(jì)算可得 queueCapacity = 80/0.1*1 = 80。意思是隊(duì)列里的線程可以等待1s,超過了的需要新開線程來執(zhí)行 * 切記不能設(shè)置為Integer.MAX_VALUE,這樣隊(duì)列會(huì)很大,線程數(shù)只會(huì)保持在corePoolSize大小,當(dāng)任務(wù)陡增時(shí),不能新開線程來執(zhí)行,響應(yīng)時(shí)間會(huì)隨之陡增。maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost) * 計(jì)算可得 maxPoolSize = (1000-80)/10 = 92 * (最大任務(wù)數(shù)-隊(duì)列容量)/每個(gè)線程每秒處理能力 = 最大線程數(shù)rejectedExecutionHandler:根據(jù)具體情況來決定,任務(wù)不重要可丟棄,任務(wù)重要?jiǎng)t要利用一些緩沖機(jī)制來處理keepAliveTime和allowCoreThreadTimeout采用默認(rèn)通常能滿足
4. 使用案例
@Componentpublic class KafkaReceiver { @Autowired private BroadbandService broadbandService; private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat(“deal-pool-%d”) .build(); private ExecutorService threadPool = new ThreadPoolExecutor(10, 20, 5, TimeUnit.SECONDS, new LinkedBlockingQueue(10), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); /** * 監(jiān)聽kafka消息受理業(yè)務(wù) */ @KafkaListener(topics = “${kafka.topics[1]}”) public void listenerKafkaProductCommit(ConsumerRecord record) { threadPool.execute(() -> { try { BusinessOrder businessOrder = JsonUtil.jsonToObject(record.value(), BusinessOrder.class); log.info(“消息隊(duì)列正在工作,訂單號(hào): {}”, businessOrder.getOrderNo()); //嘗試多線程處理 broadbandService.asynSchoolBroadbandRemoveService(businessOrder); } catch (Exception e) { log.error(“消息隊(duì)列正在工作異常”, e); } }); } }