ThreadPoolExecutor是一個ExecutorService,實現了Executor接口,是一個線程池服務的實現。通常通過Executors類的工廠方法來創建。
一、實現原理
1、關鍵要素
要理解線程池的實現原理,主要理解以下的關鍵要素:
• 線程池的狀態:提供了線程池生命周期中所處的不同狀態,分別為以下狀態(分別用0,1,2,3表示):
①RUNNING: 可以接收新任務和可以處理隊列中的任務;
②SHUTDOWN: 不接收新任務,可以繼續處理隊列中的任務;
③STOP: 不接收新任務,也不處理隊列任務,中斷在執行的任務;
④TERMINATE: 類似STOP,所有的線程都會停止。
• 線程數量控制參數:用于控制何時創建線程,何時不能創建線程和任務邏輯控制。
①corePoolSize: 池中核心線程的數量;
②maximunPoolSize: 池中最大的線程數量;
③poolSize: 當前池中的線程數量;
• 工作者線程和工作隊列:工作者線程從工作隊列中獲取任務并執行,如果設置了超時參數,工作者線程如果沒有任務時,會被回收。工作隊列使用的都是阻塞BlockingQueue。
• 線程工廠:ThreadFactory,用于創建新線程,可以自定義線程的一些屬性,比如名稱等,如果在初始化是沒有配置,就會使用默認的ThreadFactory。
• 保持活動的時間:keepAliveTime,當線程池中有多于corePoolSize的線程時,多出的線程空閑時間超過keepAliveTime時將會被清理。
• 拒絕策略:當線程池已經關閉或線程數量達到最大數或工作隊列已經飽和時,將拒絕執行新的任務, 主要實現了以下4種拒絕策略:
①AbortPolicy:中斷策略,默認的策略,拒絕任務并拋出運行時異常RejectedExecutionException;
②CallerRunsPolicy:使用調用者線程執行任務,除非線程池已經shutdown。
③DiscardPolicy:不能執行的任務將刪除;
④DiscardOldestPolicy:如果執行程序未關閉,位于隊列頭的任務被刪除,然后重試執行程序(失敗則重復此過程)。
以上的線程池關鍵要素都會在構造方法中初始化,并根據初始化配置的不同,將產生不同特點的線程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。
2、執行任務
怎么執行提交到池中的任務是線程池的核心,是由execute方法實現的,執行的過程如下:
①判斷核心線程是否都在執行任務,如果不是,則創建新線程來執行任務,如果核心線程都在執行任務,則進入②;
②判斷工作隊列是否已經滿,如果不滿,則把任務加入工作隊列,如果已經滿,則進入步驟③;
③判斷線程池的線程是否都處于工作狀態,如果沒有,則創建一個新線程來執行任務,否則執行拒絕策略。
核心的execute代碼如下:
1 |
public void execute(Runnable command) { |
從代碼看出,可以分為以下情況:
①當前的線程數小于核心線程數,則可以繼續創建新線程來執行任務;
②如果運行的線程多于corePoolSize,則把任務加入workQueue隊列;
③如果無法將任務加入隊列(隊列已滿),則創建新線程執行任務;
④如果創建新線程后的線程數大于maximunPoolSize,則創建失敗,并通過拒絕策略拒絕執行新任務;
把任務交給worker線程處理,addThread方法實現了這個邏輯:
1 |
private Thread addThread(Runnable firstTask) { |
3、worker線程實現
worker線程是真正實現執行任務的線程,是通過內部類Worker來實現的,Worker通過在run方法中循環從
workQueue隊列中獲取任務來執行:
1 |
public void run() { |
run方法通過getTask()方法不斷從阻塞隊列中獲取任務,getTask實現過程如下:
1 |
Runnable getTask() { |
Worker類中的runTask方法是實際執行任務的方法,其實現過程如下:
1 |
private void runTask(Runnable task) { |
可以看到,在任務執行的前后都可以插入一些代碼,由子類來實現監控或其它功能。run方法退出前會執行workDown方法來維護線程池的一些狀態信息:
1 |
void workerDone(Worker w) { |
至此,線程池執行任務的相關邏輯結束。
4、關閉線程池
關閉線程池可以通過shutdown()方法或shutdownNow方法來實現,在關閉前都需要進行安全檢查,以防惡意代碼執行。兩個方法不同的地方在于中斷worker線程不一樣,shutdown調用的是interruptIfIdle方法,而shutdownNow比較粗暴,直接interrupt線程。但shutdownNow會把當前隊列中的任務拿出來返回。
1 |
public void shutdown() { |
1 |
void interruptIfIdle() { |
5、并發控制
ThreadPoolExecutor在實現中使用到了ReentrantLock來進行并發控制。核心的表示線程數量的int變量都使用了volatile來保證它的可見性。在Worker實現中也用到了ReentrantLock來控制任務的執行。ThreadPoolExecutor的主要方法都是在lock的控制下進行,比如創建線程時,關閉線程池時。所以,為了避免鎖帶來的問題,在execute方法中,有這樣一個判斷:如果當前線程數大于核心線程時,將會把任務加入隊列中,這個過程是不需要加鎖的,這樣就提升了線程池的性能。當線程池預熱創建線程數達到核心線程數后都將執行此邏輯。
二、ThreadPoolExecutor的使用
1、創建線程池
一般不推薦直接通過構造函數來創建,應該通過Executors類的工廠方法來創建線程池。因為如果配置的參數不當,線程池的性能可能會受到很大影響。
創建線程池需要指定如下參數:
• corePoolSize: 核心線程數;可以通過prestartAllCoreThreads方法來預熱創建核心線程;
• runnableTaskQueue: 任務隊列,是阻塞隊列blockingQueue的各種實現類。
• maximunPoolSize:池中允許的最大線程數。
• threadFactory: 線程工廠類
• RejectExecutionHandler:拒絕策略處理類。
2、提交任務
提交任務可以使用execute方法或者submit方法,后者會返回一個異步計算結果。
3、關閉線程池
可以通過調用shutdown或shutdownNow方法來關閉線程池。
4、根據任務來配置線程池
任務性質:CPU密集型 、IO密集型、混合型
任務的優先級:高、中、低
任務的執行時間:長、中、短
任務的依賴性:是否依賴外部資源,如數據庫。
CPU密集型任務可以配置少些線程,IO密集型可以配置多些線程,優先級不同可以用PriorityBlockingQueue來作為工作隊列。對于依賴外部資源的任務,因為空閑線程可能較多,可以配置多些線程來利用CPU資源 。