author: 覃育龍
update:2023/3/16
本文主要講移植入mysql中的線程池的原理
MySQL中的連接管理
連接和線程管理相關的類
mysql中連接和線程管理相關的代碼主要在mysql/sql/conn_handler中,其中主要涉及的類包括:
- Connection_handler_manager:單例類,用來管理連接處理器;
- Connection_handler:連接處理的抽象類,具體實現由其子類實現;
- Per_thread_connection_handler:繼承了Connection_handler,每一個連接用單獨的線程處理,默認為該實現,通過thread_handling參數可以設置;
- One_thread_connection_handler:繼承了Connection_handler,所有連接用同一個線程處理;
- Plugin_connection_handler:繼承了Connection_handler,支持由Plugin具體實現的handler,例如線程池;
- Connection_acceptor:一個模版類,以模版方式支持不同的監聽實現(三個listener)。并且提供一個死循環,用以監聽連接;
- Mysqld_socket_listener:實現以Socket的方式監聽客戶端的連接事件,并且支持TCP socket和Unix socket,即對應的TCP_socket和Unix_socket;
- Shared_mem_listener:通過共享內存的監聽方式(windows下);
- Named_pipe_listener:通過命名管道來監聽和接收客戶請求(windows下);
- Channel_info:連接信道的抽象類,具體實現有Channel_info_local_socket和Channel_info_tcpip_socket,
- Channel_info_local_socket:與本地方式與服務器進行交互;
- Channel_info_tcpip_socket:以TCP/IP方式與服務器進行交互;
- TCP_socket:TCP socket,與MySQL服務不同機器的連接訪問;
- Unix_socket:Unix socket,與MySQL服務相同主機的連接訪問;
其類圖關系如下:
其中,三個Listener將作為Connection_acceptor模版類作為具體的Listener。
簡單來說就是通過listener監聽請求,并創建連接Channel_Info的具體類,然后通過單例Connection_handler_manager指派給具體的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)進行調度。
連接的流程
連接管理器的初始化(Connection_handler_manager)
Connection_handler_manager這個單例類初始化時會根據server啟動參數或配置文件來初始化,根據thread_handling來初始化connection_handler
其中Per_thread_connection_handler無論mysqld使用哪種方式均會被初始化
> mysqld_main()
> init_common_variables() // 初始化變量
> get_options() // 獲取server啟動參數
> Connection_handler_manager::init() {// 初始化連接管理的單例類
Per_thread_connection_handler::init()
Connection_handler *connection_handler = nullptr;
switch (Connection_handler_manager::thread_handling) {
case SCHEDULER_ONE_THREAD_PER_CONNECTION:
connection_handler = new (std::nothrow) Per_thread_connection_handler();
break;
case SCHEDULER_NO_THREADS:
connection_handler = new (std::nothrow) One_thread_connection_handler();
break;
case SCHEDULER_THREAD_POOL:
connection_handler = new (std::nothrow) Thread_pool_connection_handler();
break;
default:
assert(false);
}
m_instance =
new (std::nothrow) Connection_handler_manager(connection_handler);
}
監聽處理循環
mysqld主函數開啟后,進入Connection_acceptor::connection_event_loop,在windows系統上是由三個線程分別調用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一個模板類,分別有3種實現)
//sql/conn_handler/connection_acceptor.h
void connection_event_loop() {
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
while (!connection_events_loop_aborted()) {
Channel_info *channel_info = m_listener->listen_for_connection_event();
if (channel_info != nullptr) mgr->process_new_connection(channel_info);
}
}
該函數作用就是通過監聽客戶請求listen_for_connection_event(),然后處理請求m_listener->process_new_connection()
其中通過connection_events_loop_aborted()來判斷是否被終止
Mysqld_socket_listener的處理
初始化
通過Mysqld_socket_listener::setup_listener()來進行初始化工作。
Mysqld_socket_listener具體監聽的列表存儲在m_socket_vector中,由setup_listener()來完成初始化。
在m_socket_vector插入的順序為: admin socket-> tcp socket-> unix socket -> event socket。 所以說linster會監聽多個socket,最后才是普通的socket。
獲取請求
Mysqld_socket_listener::listen_for_connection_event() {
#ifdef HAVE_POLL
int retval = poll();
#else
int retval = select();
// 獲取一個新的連接請求
const Listen_socket *listen_socket = get_listen_socket();
...
// 接受請求
accept_connection(listen_socket->m_socket, &onnect_sock)
// 構造Channel_info
Channel_info *channel_info = nullptr;
if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
else
channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
return channel_info;
}
通過Mysqld_socket_listener::listen_for_connection_event()來監聽請求,有poll與select兩種方式。
在監聽到有新請求之后,調用Mysqld_socket_listener::get_listen_socket()獲取監聽到新請求的socket,具體邏輯如下:
- 首先檢查是否有admin的請求,有則返回admin的socket(首先處理admin socket)
- 返回第一個有請求的socket
接受請求
獲取到有效socket后,進入accept_connection()->mysql_socket_accept調用accept接收請求。
然后構造一個Channel_info,Channel_info_local_socket(localhost登錄)或Channel_info_tcpip_socket(ip登錄),并作為listen_for_connection_event()的返回值,所有與連接的相關的信息都保存在Channel_info中。
處理新連接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
if (connection_events_loop_aborted() ||
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
delete channel_info;
return;
}
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}
1.首先判斷是否超過連接上限
其代碼邏輯如下:
- 首先檢查服務是否已經停止,然后check_and_incr_conn_count增加連接計數,如果超過連接上限,那么將拒絕連接,并關閉連接;
- 將連接傳入Connection_handler::add_connection()中
實際上mysql會運行max_connections + 1個連接,當連接數滿時,為super用戶保留最后一個連接
2.執行add_connection()
最終add_connetion()的執行mysql中配置的線程處理方式決定,有三種:
- one-thread-per-connection: 每個連接一個線程,線程關閉后線程會等待,直到下次新的連接后復用該線程。
- no-threads:在一個工作線程中處理的所有連接。
- pool-of-threads: 線程池方式
one-thread-per-connection方式
先查看是否有空閑線程,如果有則直接用空閑線程處理;否則將創建一個新的線程來處理新連接。
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
int error = 0;
my_thread_handle id;
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;
/*
There are no idle threads avaliable to take up the new
connection. Create a new thread to handle the connection
*/
channel_info->set_prior_thr_create_utime();
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
...
Global_THD_manager::get_instance()->inc_thread_created();
DBUG_PRINT("info", ("Thread created"));
return false;
}
check_idle_thread_and_enqueue_connection 做的事情就是檢查當前是否有空閑狀態的線程;有的話,則將channel_info加入到隊列中,并向空閑線程發送信號;否則創建新線程,并執行handle_connection函數。
handle_connection函數的邏輯如下:
- 首先是初始化線程需要的內存;
- 然后創建一個THD對象init_new_thd;
- 創建/或重用psi對象,并加到thd對象;
- 將thd對象加到thd manager中;
- 調用thd_prepare_connection做驗證;
- do_command(thd)處理query
- end_connection,關閉一個已經建立的連接,
- close_connection,disconnect關閉當前會話的vio;
- 釋放資源thd中的資源
- 阻塞等待下一個連接,block_until_new_connection(),監聽接收前面提到的check_idle_thread_and_enqueue_connection發過來的信號
線程池
線程池的總覽
主要數據結構
connection_t
連接結構體,一個連接/事件對應一個connection_t,一個connection_t對應一個THD
struct connection_t {
THD *thd; // THD
thread_group_t *thread_group; // 該連接所在的線程組
connection_t *next_in_queue; // 用于connection_queue_t
connection_t **prev_in_queue; // 用于connection_queue_t
ulonglong abs_wait_timeout; // 連接時間,用于超時關閉
bool logged_in; // 是否登入
bool bound_to_poll_descriptor; // 是否獲取到poolfd
bool waiting; // 是否在等待
uint tickets; // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高優先級隊列中的票數
};
thread_group_t
線程組結構體,在該線程池的框架中,最多有128(MAX_THREAD_GROUPS)個線程組
struct alignas(128) thread_group_t {
mysql_mutex_t mutex;
connection_queue_t queue; // 待處理連接隊列
connection_queue_t high_prio_queue; // 高優先級待處理連接隊列
worker_list_t waiting_threads; // 等待線程隊列,線程每次執行完后會進入睡眠狀態等待下次喚醒工作
worker_thread_t *listener; // 監聽線程,負責監聽命令事件,事件隊列為空時也會充當工作線程
pthread_attr_t *pthread_attr; // 線程屬性
int pollfd; // poll文件描述符
int thread_count; // 線程數
int active_thread_count; // 活躍線程數
int connection_count; // 連接數
int waiting_thread_count; // 活躍連接數
/* Stats for the deadlock detection timer routine.*/
int io_event_count; // io事件數
int queue_event_count; // 隊列中已處理事件數量
ulonglong last_thread_creation_time;// 上一個線程創建的時間
int shutdown_pipe[2]; // 管道通信
bool shutdown; // 關閉標志
bool stalled; // 停滯標志
char padding[328]; // 填充
};
waiting_threads: 等待線程隊列,線程每次執行完后會將線程添加到等待列表的頭部并等待。重要的是要把線加在頭部而不是尾部,因為它確保了LIFO喚醒順序(熱緩存,工作不活動超時)
worker_thread_t
工作線程結構體,一個結構體對應一個worker
struct worker_thread_t {
ulonglong event_count; /* number of request handled by this thread */
thread_group_t *thread_group; // 所在線程組
worker_thread_t *next_in_list; // 用于worker_list_t
worker_thread_t **prev_in_list; // 用于worker_list_t
mysql_cond_t cond; // 用于喚醒線程的信號量
bool woken; // 是否被喚醒
};
pool_timer_t
用于所有線程組的一個全局計時器
struct pool_timer_t {
mysql_mutex_t mutex; // 鎖
mysql_cond_t cond; // 信號量
std::atomic<uint64> current_microtime; // 當前時間,微秒
std::atomic<uint64> next_timeout_check; // 下一次檢查的事件
int tick_interval; // 節拍
bool shutdown; // 關閉標志
};
線程池初始化
線程池的初始化在tp_init()中
//sql/threadpool_unix.cc
bool tp_init() {
DBUG_ENTER("tp_init");
threadpool_started = true;
for (uint i = 0; i < array_elements(all_groups); i++) {
thread_group_init(&all_groups[i], get_connection_attrib());
}
tp_set_threadpool_size(threadpool_size);
...
pool_timer.tick_interval = threadpool_stall_limit;
start_timer(&pool_timer);
DBUG_RETURN(false);
}
在tp_init()主要初始化工作:
- 初始化所有(MAX_THREAD_GROUPS 128)的線程組, 初始化每個線程組的屬性和狀態等信息
- 初始化threadpool_size個線程組的pollfd
- 啟動start_timer定時器線程
處理新連接
將新連接加入線程池
下面是線程池從add_connetion開始,將新連接加入到線程池中的過程
> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
THD *const thd = channel_info->create_thd(); // 創建THD
connection_t *const connection = alloc_connection(thd); // 初始化connection_t
...
// 初始化thd的一些信息
thd->set_new_thread_id();
thd->start_utime = my_micro_time();
thd->scheduler = &tp_event_functions;
Global_THD_manager::get_instance()->add_thd(thd);
thd->event_scheduler.data = connection;
// 分配一個線程組
thread_group_t *group = &all_groups[thd->thread_id() % group_count];
// 將new connetion添加到工作隊列,實際登錄操作由worker完成
queue_put(group, connection);
> queue_put(thread_group_t *thread_group, connection_t *connection) {
connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
thread_group->queue.push_back(connection); // 加入工作隊列
// 如果活躍線程為0則創建或喚醒一個工作線程
wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
> wake_or_create_thread() {
// 若喚醒成功則退出
// 喚醒過程就是在waiting_threads中找第一個線程,然后給它發送信號量
if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
// 1. 當前線程組線程數大于連接數,則不用創建,返回
// 2. 如果活躍線程為0或為admin_connection連接,則創建直接工作線程
// 3. 如果time_since_last_thread_created大于限制創建的時間則創建
create_worker(thread_group, admin_connection) {
// 創建工作線程 worker_main
> err = mysql_thread_create(key_worker_thread, &thread_id,
thread_group->pthread_attr, worker_main,
thread_group);
}
}
}
}
工作線程(worker_main)
worker_main主要工作內容:
- get_event: 阻塞的從當前線程組中獲取一個的待處理事件(connection with pending event)
- handle_event:處理該連接
worker_main()
static void *worker_main(void *param) {
/* Init per-thread structure */
worker_thread_t this_thread;
mysql_cond_init(key_worker_cond, &this_thread.cond);
this_thread.thread_group = thread_group;
this_thread.event_count = 0;
/* Run event loop */
for (;;) {
connection_t *connection;
struct timespec ts;
set_timespec(&ts, threadpool_idle_timeout);
connection = get_event(&this_thread, thread_group, &ts);
if (!connection) break; // 如果超時已過期或關閉,則返回NULL,即退出線程
this_thread.event_count++;
handle_event(connection);
}
/* Thread shutdown: cleanup per-worker-thread structure. */
...
my_thread_end();
return nullptr;
}
工作線程的退出的情況(get_event返回空):
- 線程組被關閉
- 檢測到too_many_busy_threads,與參數thread_pool_oversubscribe有關(限制其小于1 + thread_pool_oversubscribe)
- 睡眠時mysql_cond_timedwait超時或出錯
- 睡眠時mysql_cond_wait出錯
get_event():從隊列或socket拿取一個事件
- oversubscribed表示是否系統CUP負載太高,已防止同時運行太多線程;與參數threadpool_oversubscribe有關
- 高優先級事件:與參數thread_pool_high_prio_mode有關
- 工作線程的睡眠狀態:在get_event()中,停在current_thread->cond信號上(mysql_cond_timedwait/mysql_cond_wait)
get_event()主要流程概述:
- 非oversubscribed時
- 從隊列中取一個連接/事件并返回,沒取到則繼續
- 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
- 從連接線程組socket中取一個連接或事件
- 沒取到連接,進入睡眠狀態
- 拿到連接,如果是高優先級事件則直接返回
- 否則判斷是否有太多忙線程,忙將事件放入隊列并進入睡眠狀態;否則返回連接
- oversubscribed時
- 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
- 進入睡眠狀態
listener():監聽線程
監聽線程主要工作:
- (優先)隊列中有事件時,常駐;負責獲取socket中的事件并將它們分發給工作線程(推入隊列中)
- (優先)隊列中無事件時,即空閑時:講N-1個事件推入隊列,返回第一個事件,最終get_event()會返回并由handle_event()處理
handle_event():實際處理連接
handle_event()主要工作:
- 未登錄狀態
- 進行threadpool_add_connection,進行一些初始化、登錄等工作(thd_prepare_connection)
- 處理的是從add_conntion被放入隊列中的新連接
- 已登錄狀態
- 進行threadpool_process_request,最后進入do_command進行處理事務
- 處理的是get_event或listener(隊列為空時)從socket獲取的事件
static void handle_event(connection_t *connection) {
...
if (!connection->logged_in) {
err = threadpool_add_connection(connection->thd);
connection->logged_in = true;
} else {
err = threadpool_process_request(connection->thd);
}
...
set_wait_timeout(connection);
}
對于threadpool_process_request函數:
函數中的循環與thread-per-connections的邏輯類似,但是這個函數的目標是執行單個QUERY,所以正常情況下,下面的循環只會執行一次
對于SSL的連接,可能會執行多次,由vio->has_data()判斷數據是否結束
/**
Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
thread_attach(thd);
...
for (;;) {
Vio *vio;
thd_set_net_read_write(thd, 0);
if ((retval = do_command(thd)) != 0) goto end;
if (!thd_connection_alive(thd)) {
retval = 1;
goto end;
}
vio = thd->get_protocol_classic()->get_vio();
if (!vio->has_data(vio)) {
/* More info on this debug sync is in sql_parse.cc*/
DEBUG_SYNC(thd, "before_do_command_net_read");
thd_set_net_read_write(thd, 1);
goto end;
}
if (!thd->m_server_idle) {
MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
thd->m_server_idle = true;
}
}
end:
...
return retval;
}
計時器線程(timer_thread)
計時器線程主要工作:
- 檢查每個線程組是否停滯(stall)
- 檢查每個連接(THD)是否超時,超時則kill掉
static void *timer_thread(void *param) noexcept {
...
for (;;) {
...
if (err == ETIMEDOUT) {
timer->current_microtime.store(my_microsecond_getsystime(),
std::memory_order_relaxed);
/* Check stalls in thread groups */
for (size_t i = 0; i < array_elements(all_groups); i++) {
if (all_groups[i].connection_count) check_stall(&all_groups[i]);
}
/* Check if any client exceeded wait_timeout */
if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
timer->current_microtime.load(std::memory_order_relaxed))
timeout_check(timer);
}
}
...
}
check_stall流程:
- 檢測listener是否存在
- 如果不存在,檢查是否由io事件被彈出
- 如果沒有,表明listener被阻塞,創建一個工作線程(而后它會變成listener)并退出
- 重置io_event_count
- 判斷是否線程組是否stall:
- thread_group->queue_event_count表示從隊列中被處理事件的數量,每次會被重置
- 如果自從上一次check_stall到現在仍然為0且隊列和優先隊列不為空,則確定stall
- 如果確定stall了,則設置thread_group->stalled,喚醒,或創建新工作線程
- 當一個事件從隊列中出去,則表明恢復正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
...
// 如果listener不存在,則創建一個工作線程(而后它會變成listener)
if (!thread_group->listener && !thread_group->io_event_count) {
wake_or_create_thread(thread_group);
mysql_mutex_unlock(&thread_group->mutex);
return;
}
// 重置io_event_count
thread_group->io_event_count = 0;
// queue_event_count為0且隊列和優先隊列不為空,確定stall
if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
thread_group->stalled = true;
wake_or_create_thread(thread_group);
}
// check完后重置queue_event_count
thread_group->queue_event_count = 0;
mysql_mutex_unlock(&thread_group->mutex);
}
線程池退出
調用tp_end(), 回收線程組資源,停止定時器線程,最終會等待所有線程結束后才會退出
線程池相關參數
系統變量
|
變量 |
說明 |
|
thread_handling |
pool-of-threads開啟線程池模式 |
|
threadpool_idle_timeout |
用于限制空閑線程退出前應該等待的時間 |
|
threadpool_high_prio_mode |
用于對全局或每個連接的高優先級調度提供更細粒度的控制 |
|
threadpool_high_prio_tickets |
控制高優先級隊列策略,為每個新連接分配的票數,以進入高優先級隊列。將此變量設置為0將禁用高優先級隊列 |
|
threadpool_max_threads |
用于限制線程池中的最大線程數 |
|
threadpool_oversubscribe |
該參數的值越高,可以同時運行的線程就越多,如果該值低于3,則可能導致更多的睡眠和喚醒 |
|
threadpool_size |
用于定義同時可以使用CPU的線程數 |
|
threadpool_stall_limit |
認定為stall的時間,當達到此限制時,線程池將喚醒或創建另一個線程 |
狀態變量
|
變量 |
說明 |
|
Threadpool_idle_threads |
顯示線程池中空閑線程的數量 |
|
Threadpool_threads |
顯示線程池中的線程數。 |
參考文檔:
MySQL源碼閱讀2-連接與線程管理
線程池移植原理文檔
percona線程池文檔