亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

ThreadPoolExecutor實現分析

2023-08-01 12:32:49
6
0

ThreadPoolExecutor是一個ExecutorService,實現了Executor接口,是一個線程池服務的實現。通常通過Executors類的工廠方法來創建。

一、實現原理

1、關鍵要素

要理解線程池的實現原理,主要理解以下的關鍵要素:

• 線程池的狀態:提供了線程池生命周期中所處的不同狀態,分別為以下狀態(分別用0,1,2,3表示):
①RUNNING: 可以接收新任務和可以處理隊列中的任務;
②SHUTDOWN: 不接收新任務,可以繼續處理隊列中的任務;
③STOP: 不接收新任務,也不處理隊列任務,中斷在執行的任務;
④TERMINATE: 類似STOP,所有的線程都會停止。
• 線程數量控制參數:用于控制何時創建線程,何時不能創建線程和任務邏輯控制。
①corePoolSize: 池中核心線程的數量;
②maximunPoolSize: 池中最大的線程數量;
③poolSize: 當前池中的線程數量;
• 工作者線程和工作隊列:工作者線程從工作隊列中獲取任務并執行,如果設置了超時參數,工作者線程如果沒有任務時,會被回收。工作隊列使用的都是阻塞BlockingQueue。
• 線程工廠:ThreadFactory,用于創建新線程,可以自定義線程的一些屬性,比如名稱等,如果在初始化是沒有配置,就會使用默認的ThreadFactory。
• 保持活動的時間:keepAliveTime,當線程池中有多于corePoolSize的線程時,多出的線程空閑時間超過keepAliveTime時將會被清理。
• 拒絕策略:當線程池已經關閉或線程數量達到最大數或工作隊列已經飽和時,將拒絕執行新的任務, 主要實現了以下4種拒絕策略:
①AbortPolicy:中斷策略,默認的策略,拒絕任務并拋出運行時異常RejectedExecutionException;
②CallerRunsPolicy:使用調用者線程執行任務,除非線程池已經shutdown。
③DiscardPolicy:不能執行的任務將刪除;
④DiscardOldestPolicy:如果執行程序未關閉,位于隊列頭的任務被刪除,然后重試執行程序(失敗則重復此過程)。

以上的線程池關鍵要素都會在構造方法中初始化,并根據初始化配置的不同,將產生不同特點的線程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。

2、執行任務

怎么執行提交到池中的任務是線程池的核心,是由execute方法實現的,執行的過程如下:
①判斷核心線程是否都在執行任務,如果不是,則創建新線程來執行任務,如果核心線程都在執行任務,則進入②;
②判斷工作隊列是否已經滿,如果不滿,則把任務加入工作隊列,如果已經滿,則進入步驟③;
③判斷線程池的線程是否都處于工作狀態,如果沒有,則創建一個新線程來執行任務,否則執行拒絕策略。

核心的execute代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果核心線程數小于當前池中的線程數,則創建新線程執行任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果當前池中的線程數大于等于核心線程數,或者創建線程失敗,則加入工作隊列
if (runState == RUNNING && workQueue.offer(command)) {
//如果線程池不再運行或為第一個核心線程執行時
if (runState != RUNNING || poolSize == 0)
 ensureQueuedTaskHandled(command);
}
//如果線程池不在運行狀態或者加入工作隊列失敗
//則嘗試在最大線程數下創建新線程執行任務
else if (!addIfUnderMaximumPoolSize(command))
//再失敗,則拒絕任務
reject(command); // is shutdown or saturated
}
}

從代碼看出,可以分為以下情況:
①當前的線程數小于核心線程數,則可以繼續創建新線程來執行任務;
②如果運行的線程多于corePoolSize,則把任務加入workQueue隊列;
③如果無法將任務加入隊列(隊列已滿),則創建新線程執行任務;
④如果創建新線程后的線程數大于maximunPoolSize,則創建失敗,并通過拒絕策略拒絕執行新任務;

把任務交給worker線程處理,addThread方法實現了這個邏輯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Thread addThread(Runnable firstTask) {
//構造一個worker任務
Worker w = new Worker(firstTask);
//產生一個新線程,把woker封裝進去
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

3、worker線程實現

worker線程是真正實現執行任務的線程,是通過內部類Worker來實現的,Worker通過在run方法中循環從

workQueue隊列中獲取任務來執行:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//從阻塞隊列中獲取任務執行
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

run方法通過getTask()方法不斷從阻塞隊列中獲取任務,getTask實現過程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Runnable getTask() {
for (;;) {
try {
int state = runState;
//線程池不在運行狀態
if (state > SHUTDOWN)
return null;
Runnable r;
//關閉狀態,把任務poll出
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//超時版本的poll
 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//帶有阻塞的take,如果隊列為空則阻塞
r = workQueue.take();
if (r != null)
return r;
//如果worker線程能夠退出
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
//清理空閑的worker
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

Worker類中的runTask方法是實際執行任務的方法,其實現過程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
//加鎖
runLock.lock();
try {
//檢查線程池狀態和線程狀態
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
//執行之前可以插入的操作
beforeExecute(thread, task);
try {
task.run();
ran = true;
//執行之后
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}

可以看到,在任務執行的前后都可以插入一些代碼,由子類來實現監控或其它功能。run方法退出前會執行workDown方法來維護線程池的一些狀態信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加完成的任務計數
completedTaskCount += w.completedTasks;
//從worker隊列移出
workers.remove(w);
//如果當前的線程數為0,則嘗試停止線程池
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

至此,線程池執行任務的相關邏輯結束。

4、關閉線程池

關閉線程池可以通過shutdown()方法或shutdownNow方法來實現,在關閉前都需要進行安全檢查,以防惡意代碼執行。兩個方法不同的地方在于中斷worker線程不一樣,shutdown調用的是interruptIfIdle方法,而shutdownNow比較粗暴,直接interrupt線程。但shutdownNow會把當前隊列中的任務拿出來返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void shutdown() { 

SecurityManager security = System.getSecurityManager();
//檢查是否有權限
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次檢查調用者是否有權限修改線程
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
//修改線程池狀態
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
//中斷所有空閑worker線程
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
//關閉線程池
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
//嘗試加鎖
if (runLock.tryLock()) {
try {
//如果worker中的線程不是當前調用線程
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}

5、并發控制

ThreadPoolExecutor在實現中使用到了ReentrantLock來進行并發控制。核心的表示線程數量的int變量都使用了volatile來保證它的可見性。在Worker實現中也用到了ReentrantLock來控制任務的執行。ThreadPoolExecutor的主要方法都是在lock的控制下進行,比如創建線程時,關閉線程池時。所以,為了避免鎖帶來的問題,在execute方法中,有這樣一個判斷:如果當前線程數大于核心線程時,將會把任務加入隊列中,這個過程是不需要加鎖的,這樣就提升了線程池的性能。當線程池預熱創建線程數達到核心線程數后都將執行此邏輯。

二、ThreadPoolExecutor的使用

1、創建線程池

一般不推薦直接通過構造函數來創建,應該通過Executors類的工廠方法來創建線程池。因為如果配置的參數不當,線程池的性能可能會受到很大影響。
創建線程池需要指定如下參數:
• corePoolSize: 核心線程數;可以通過prestartAllCoreThreads方法來預熱創建核心線程;
• runnableTaskQueue: 任務隊列,是阻塞隊列blockingQueue的各種實現類。
• maximunPoolSize:池中允許的最大線程數。
• threadFactory: 線程工廠類
• RejectExecutionHandler:拒絕策略處理類。

2、提交任務

提交任務可以使用execute方法或者submit方法,后者會返回一個異步計算結果。

3、關閉線程池

可以通過調用shutdown或shutdownNow方法來關閉線程池。

4、根據任務來配置線程池

任務性質:CPU密集型 、IO密集型、混合型
任務的優先級:高、中、低
任務的執行時間:長、中、短
任務的依賴性:是否依賴外部資源,如數據庫。

CPU密集型任務可以配置少些線程,IO密集型可以配置多些線程,優先級不同可以用PriorityBlockingQueue來作為工作隊列。對于依賴外部資源的任務,因為空閑線程可能較多,可以配置多些線程來利用CPU資源 。

0條評論
作者已關閉評論
chuoo
13文章數
0粉絲數
chuoo
13 文章 | 0 粉絲
原創

ThreadPoolExecutor實現分析

2023-08-01 12:32:49
6
0

ThreadPoolExecutor是一個ExecutorService,實現了Executor接口,是一個線程池服務的實現。通常通過Executors類的工廠方法來創建。

一、實現原理

1、關鍵要素

要理解線程池的實現原理,主要理解以下的關鍵要素:

• 線程池的狀態:提供了線程池生命周期中所處的不同狀態,分別為以下狀態(分別用0,1,2,3表示):
①RUNNING: 可以接收新任務和可以處理隊列中的任務;
②SHUTDOWN: 不接收新任務,可以繼續處理隊列中的任務;
③STOP: 不接收新任務,也不處理隊列任務,中斷在執行的任務;
④TERMINATE: 類似STOP,所有的線程都會停止。
• 線程數量控制參數:用于控制何時創建線程,何時不能創建線程和任務邏輯控制。
①corePoolSize: 池中核心線程的數量;
②maximunPoolSize: 池中最大的線程數量;
③poolSize: 當前池中的線程數量;
• 工作者線程和工作隊列:工作者線程從工作隊列中獲取任務并執行,如果設置了超時參數,工作者線程如果沒有任務時,會被回收。工作隊列使用的都是阻塞BlockingQueue。
• 線程工廠:ThreadFactory,用于創建新線程,可以自定義線程的一些屬性,比如名稱等,如果在初始化是沒有配置,就會使用默認的ThreadFactory。
• 保持活動的時間:keepAliveTime,當線程池中有多于corePoolSize的線程時,多出的線程空閑時間超過keepAliveTime時將會被清理。
• 拒絕策略:當線程池已經關閉或線程數量達到最大數或工作隊列已經飽和時,將拒絕執行新的任務, 主要實現了以下4種拒絕策略:
①AbortPolicy:中斷策略,默認的策略,拒絕任務并拋出運行時異常RejectedExecutionException;
②CallerRunsPolicy:使用調用者線程執行任務,除非線程池已經shutdown。
③DiscardPolicy:不能執行的任務將刪除;
④DiscardOldestPolicy:如果執行程序未關閉,位于隊列頭的任務被刪除,然后重試執行程序(失敗則重復此過程)。

以上的線程池關鍵要素都會在構造方法中初始化,并根據初始化配置的不同,將產生不同特點的線程池。比如
FixedThreadPool,CachedThreadPool,SingleThreadPool等。

2、執行任務

怎么執行提交到池中的任務是線程池的核心,是由execute方法實現的,執行的過程如下:
①判斷核心線程是否都在執行任務,如果不是,則創建新線程來執行任務,如果核心線程都在執行任務,則進入②;
②判斷工作隊列是否已經滿,如果不滿,則把任務加入工作隊列,如果已經滿,則進入步驟③;
③判斷線程池的線程是否都處于工作狀態,如果沒有,則創建一個新線程來執行任務,否則執行拒絕策略。

核心的execute代碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//如果核心線程數小于當前池中的線程數,則創建新線程執行任務
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
//如果當前池中的線程數大于等于核心線程數,或者創建線程失敗,則加入工作隊列
if (runState == RUNNING && workQueue.offer(command)) {
//如果線程池不再運行或為第一個核心線程執行時
if (runState != RUNNING || poolSize == 0)
 ensureQueuedTaskHandled(command);
}
//如果線程池不在運行狀態或者加入工作隊列失敗
//則嘗試在最大線程數下創建新線程執行任務
else if (!addIfUnderMaximumPoolSize(command))
//再失敗,則拒絕任務
reject(command); // is shutdown or saturated
}
}

從代碼看出,可以分為以下情況:
①當前的線程數小于核心線程數,則可以繼續創建新線程來執行任務;
②如果運行的線程多于corePoolSize,則把任務加入workQueue隊列;
③如果無法將任務加入隊列(隊列已滿),則創建新線程執行任務;
④如果創建新線程后的線程數大于maximunPoolSize,則創建失敗,并通過拒絕策略拒絕執行新任務;

把任務交給worker線程處理,addThread方法實現了這個邏輯:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Thread addThread(Runnable firstTask) {
//構造一個worker任務
Worker w = new Worker(firstTask);
//產生一個新線程,把woker封裝進去
Thread t = threadFactory.newThread(w);
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}

3、worker線程實現

worker線程是真正實現執行任務的線程,是通過內部類Worker來實現的,Worker通過在run方法中循環從

workQueue隊列中獲取任務來執行:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
Runnable task = firstTask;
firstTask = null;
//從阻塞隊列中獲取任務執行
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}

run方法通過getTask()方法不斷從阻塞隊列中獲取任務,getTask實現過程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Runnable getTask() {
for (;;) {
try {
int state = runState;
//線程池不在運行狀態
if (state > SHUTDOWN)
return null;
Runnable r;
//關閉狀態,把任務poll出
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
//超時版本的poll
 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
//帶有阻塞的take,如果隊列為空則阻塞
r = workQueue.take();
if (r != null)
return r;
//如果worker線程能夠退出
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
//清理空閑的worker
interruptIdleWorkers();
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}

Worker類中的runTask方法是實際執行任務的方法,其實現過程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
//加鎖
runLock.lock();
try {
//檢查線程池狀態和線程狀態
if (runState < STOP &&
Thread.interrupted() &&
runState >= STOP)
thread.interrupt();
boolean ran = false;
//執行之前可以插入的操作
beforeExecute(thread, task);
try {
task.run();
ran = true;
//執行之后
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}

可以看到,在任務執行的前后都可以插入一些代碼,由子類來實現監控或其它功能。run方法退出前會執行workDown方法來維護線程池的一些狀態信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//增加完成的任務計數
completedTaskCount += w.completedTasks;
//從worker隊列移出
workers.remove(w);
//如果當前的線程數為0,則嘗試停止線程池
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
}

至此,線程池執行任務的相關邏輯結束。

4、關閉線程池

關閉線程池可以通過shutdown()方法或shutdownNow方法來實現,在關閉前都需要進行安全檢查,以防惡意代碼執行。兩個方法不同的地方在于中斷worker線程不一樣,shutdown調用的是interruptIfIdle方法,而shutdownNow比較粗暴,直接interrupt線程。但shutdownNow會把當前隊列中的任務拿出來返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void shutdown() { 

SecurityManager security = System.getSecurityManager();
//檢查是否有權限
if (security != null)
security.checkPermission(shutdownPerm);

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次檢查調用者是否有權限修改線程
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
//修改線程池狀態
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN;

try {
//中斷所有空閑worker線程
for (Worker w : workers) {
w.interruptIfIdle();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
//關閉線程池
tryTerminate(); // Terminate now if pool and queue empty
} finally {
mainLock.unlock();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
//嘗試加鎖
if (runLock.tryLock()) {
try {
//如果worker中的線程不是當前調用線程
if (thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}

5、并發控制

ThreadPoolExecutor在實現中使用到了ReentrantLock來進行并發控制。核心的表示線程數量的int變量都使用了volatile來保證它的可見性。在Worker實現中也用到了ReentrantLock來控制任務的執行。ThreadPoolExecutor的主要方法都是在lock的控制下進行,比如創建線程時,關閉線程池時。所以,為了避免鎖帶來的問題,在execute方法中,有這樣一個判斷:如果當前線程數大于核心線程時,將會把任務加入隊列中,這個過程是不需要加鎖的,這樣就提升了線程池的性能。當線程池預熱創建線程數達到核心線程數后都將執行此邏輯。

二、ThreadPoolExecutor的使用

1、創建線程池

一般不推薦直接通過構造函數來創建,應該通過Executors類的工廠方法來創建線程池。因為如果配置的參數不當,線程池的性能可能會受到很大影響。
創建線程池需要指定如下參數:
• corePoolSize: 核心線程數;可以通過prestartAllCoreThreads方法來預熱創建核心線程;
• runnableTaskQueue: 任務隊列,是阻塞隊列blockingQueue的各種實現類。
• maximunPoolSize:池中允許的最大線程數。
• threadFactory: 線程工廠類
• RejectExecutionHandler:拒絕策略處理類。

2、提交任務

提交任務可以使用execute方法或者submit方法,后者會返回一個異步計算結果。

3、關閉線程池

可以通過調用shutdown或shutdownNow方法來關閉線程池。

4、根據任務來配置線程池

任務性質:CPU密集型 、IO密集型、混合型
任務的優先級:高、中、低
任務的執行時間:長、中、短
任務的依賴性:是否依賴外部資源,如數據庫。

CPU密集型任務可以配置少些線程,IO密集型可以配置多些線程,優先級不同可以用PriorityBlockingQueue來作為工作隊列。對于依賴外部資源的任務,因為空閑線程可能較多,可以配置多些線程來利用CPU資源 。

文章來自個人專欄
文章 | 訂閱
0條評論
作者已關閉評論
作者已關閉評論
0
0