亚欧色一区w666天堂,色情一区二区三区免费看,少妇特黄A片一区二区三区,亚洲人成网站999久久久综合,国产av熟女一区二区三区

  • 發布文章
  • 消息中心
點贊
收藏
評論
分享
原創

線程池原理

2023-04-11 01:21:35
74
0

author: 覃育龍
update:2023/3/16

本文主要講移植入mysql中的線程池的原理

MySQL中的連接管理

連接和線程管理相關的類

mysql中連接和線程管理相關的代碼主要在mysql/sql/conn_handler中,其中主要涉及的類包括:

  • Connection_handler_manager:單例類,用來管理連接處理器;
  • Connection_handler:連接處理的抽象類,具體實現由其子類實現;
  • Per_thread_connection_handler:繼承了Connection_handler,每一個連接用單獨的線程處理,默認為該實現,通過thread_handling參數可以設置;
  • One_thread_connection_handler:繼承了Connection_handler,所有連接用同一個線程處理;
  • Plugin_connection_handler:繼承了Connection_handler,支持由Plugin具體實現的handler,例如線程池;
  • Connection_acceptor:一個模版類,以模版方式支持不同的監聽實現(三個listener)。并且提供一個死循環,用以監聽連接;
  • Mysqld_socket_listener:實現以Socket的方式監聽客戶端的連接事件,并且支持TCP socket和Unix socket,即對應的TCP_socket和Unix_socket;
  • Shared_mem_listener:通過共享內存的監聽方式(windows下);
  • Named_pipe_listener:通過命名管道來監聽和接收客戶請求(windows下);
  • Channel_info:連接信道的抽象類,具體實現有Channel_info_local_socket和Channel_info_tcpip_socket,
  • Channel_info_local_socket:與本地方式與服務器進行交互;
  • Channel_info_tcpip_socket:以TCP/IP方式與服務器進行交互;
  • TCP_socket:TCP socket,與MySQL服務不同機器的連接訪問;
  • Unix_socket:Unix socket,與MySQL服務相同主機的連接訪問;

其類圖關系如下:

其中,三個Listener將作為Connection_acceptor模版類作為具體的Listener。
簡單來說就是通過listener監聽請求,并創建連接Channel_Info的具體類,然后通過單例Connection_handler_manager指派給具體的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)進行調度。

連接的流程

連接管理器的初始化(Connection_handler_manager)

Connection_handler_manager這個單例類初始化時會根據server啟動參數或配置文件來初始化,根據thread_handling來初始化connection_handler
其中Per_thread_connection_handler無論mysqld使用哪種方式均會被初始化

> mysqld_main()
  > init_common_variables() // 初始化變量
    > get_options() // 獲取server啟動參數
      > Connection_handler_manager::init() {// 初始化連接管理的單例類
        Per_thread_connection_handler::init()
        Connection_handler *connection_handler = nullptr;
        switch (Connection_handler_manager::thread_handling) {
          case SCHEDULER_ONE_THREAD_PER_CONNECTION:
            connection_handler = new (std::nothrow) Per_thread_connection_handler();
            break;
          case SCHEDULER_NO_THREADS:
            connection_handler = new (std::nothrow) One_thread_connection_handler();
            break;
          case SCHEDULER_THREAD_POOL:
            connection_handler = new (std::nothrow) Thread_pool_connection_handler();
            break;
          default:
            assert(false);
        }

        m_instance =
            new (std::nothrow) Connection_handler_manager(connection_handler);
      }

監聽處理循環

mysqld主函數開啟后,進入Connection_acceptor::connection_event_loop,在windows系統上是由三個線程分別調用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一個模板類,分別有3種實現)

//sql/conn_handler/connection_acceptor.h
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != nullptr) mgr->process_new_connection(channel_info);
    }
  }

該函數作用就是通過監聽客戶請求listen_for_connection_event(),然后處理請求m_listener->process_new_connection()
其中通過connection_events_loop_aborted()來判斷是否被終止

Mysqld_socket_listener的處理

初始化

通過Mysqld_socket_listener::setup_listener()來進行初始化工作。
Mysqld_socket_listener具體監聽的列表存儲在m_socket_vector中,由setup_listener()來完成初始化。
在m_socket_vector插入的順序為: admin socket-> tcp socket-> unix socket -> event socket。 所以說linster會監聽多個socket,最后才是普通的socket。

獲取請求
Mysqld_socket_listener::listen_for_connection_event() {
  #ifdef HAVE_POLL
    int retval = poll();
  #else
    int retval = select();
  // 獲取一個新的連接請求
  const Listen_socket *listen_socket = get_listen_socket();
  ...
  // 接受請求
  accept_connection(listen_socket->m_socket, &onnect_sock)
  // 構造Channel_info
  Channel_info *channel_info = nullptr;
  if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
    channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
  else
    channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
  return channel_info;
}

通過Mysqld_socket_listener::listen_for_connection_event()來監聽請求,有poll與select兩種方式。

在監聽到有新請求之后,調用Mysqld_socket_listener::get_listen_socket()獲取監聽到新請求的socket,具體邏輯如下:

  • 首先檢查是否有admin的請求,有則返回admin的socket(首先處理admin socket)
  • 返回第一個有請求的socket
接受請求

獲取到有效socket后,進入accept_connection()->mysql_socket_accept調用accept接收請求。
然后構造一個Channel_info,Channel_info_local_socket(localhost登錄)或Channel_info_tcpip_socket(ip登錄),并作為listen_for_connection_event()的返回值,所有與連接的相關的信息都保存在Channel_info中。

處理新連接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}

1.首先判斷是否超過連接上限
其代碼邏輯如下:

  • 首先檢查服務是否已經停止,然后check_and_incr_conn_count增加連接計數,如果超過連接上限,那么將拒絕連接,并關閉連接;
  • 將連接傳入Connection_handler::add_connection()中

實際上mysql會運行max_connections + 1個連接,當連接數滿時,為super用戶保留最后一個連接

2.執行add_connection()
最終add_connetion()的執行mysql中配置的線程處理方式決定,有三種:

  • one-thread-per-connection: 每個連接一個線程,線程關閉后線程會等待,直到下次新的連接后復用該線程。
  • no-threads:在一個工作線程中處理的所有連接。
  • pool-of-threads: 線程池方式
one-thread-per-connection方式

先查看是否有空閑線程,如果有則直接用空閑線程處理;否則將創建一個新的線程來處理新連接。

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
  ...
  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

check_idle_thread_and_enqueue_connection 做的事情就是檢查當前是否有空閑狀態的線程;有的話,則將channel_info加入到隊列中,并向空閑線程發送信號;否則創建新線程,并執行handle_connection函數。

handle_connection函數的邏輯如下:

  • 首先是初始化線程需要的內存;
  • 然后創建一個THD對象init_new_thd;
  • 創建/或重用psi對象,并加到thd對象;
  • 將thd對象加到thd manager中;
  • 調用thd_prepare_connection做驗證;
  • do_command(thd)處理query
  • end_connection,關閉一個已經建立的連接,
  • close_connection,disconnect關閉當前會話的vio;
  • 釋放資源thd中的資源
  • 阻塞等待下一個連接,block_until_new_connection(),監聽接收前面提到的check_idle_thread_and_enqueue_connection發過來的信號

線程池

線程池的總覽

主要數據結構

connection_t

連接結構體,一個連接/事件對應一個connection_t,一個connection_t對應一個THD

struct connection_t {
  THD *thd;                       // THD
  thread_group_t *thread_group;   // 該連接所在的線程組
  connection_t *next_in_queue;    // 用于connection_queue_t
  connection_t **prev_in_queue;   // 用于connection_queue_t
  ulonglong abs_wait_timeout;     // 連接時間,用于超時關閉
  bool logged_in;                 // 是否登入
  bool bound_to_poll_descriptor;  // 是否獲取到poolfd
  bool waiting;                   // 是否在等待
  uint tickets;                   // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高優先級隊列中的票數
};

thread_group_t

線程組結構體,在該線程池的框架中,最多有128(MAX_THREAD_GROUPS)個線程組

struct alignas(128) thread_group_t {
  mysql_mutex_t mutex;
  connection_queue_t queue;           // 待處理連接隊列
  connection_queue_t high_prio_queue; // 高優先級待處理連接隊列
  worker_list_t waiting_threads;      // 等待線程隊列,線程每次執行完后會進入睡眠狀態等待下次喚醒工作
  worker_thread_t *listener;          // 監聽線程,負責監聽命令事件,事件隊列為空時也會充當工作線程
  pthread_attr_t *pthread_attr;       // 線程屬性
  int pollfd;                         // poll文件描述符
  int thread_count;                   // 線程數
  int active_thread_count;            // 活躍線程數
  int connection_count;               // 連接數
  int waiting_thread_count;           // 活躍連接數
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 // io事件數
  int queue_event_count;              // 隊列中已處理事件數量
  ulonglong last_thread_creation_time;// 上一個線程創建的時間
  int shutdown_pipe[2];               // 管道通信
  bool shutdown;                      // 關閉標志
  bool stalled;                       // 停滯標志
  char padding[328];                  // 填充
};

waiting_threads: 等待線程隊列,線程每次執行完后會將線程添加到等待列表的頭部并等待。重要的是要把線加在頭部而不是尾部,因為它確保了LIFO喚醒順序(熱緩存,工作不活動超時)

worker_thread_t

工作線程結構體,一個結構體對應一個worker

struct worker_thread_t {
  ulonglong event_count; /* number of request handled by this thread */
  thread_group_t *thread_group;   // 所在線程組
  worker_thread_t *next_in_list;  // 用于worker_list_t
  worker_thread_t **prev_in_list; // 用于worker_list_t
  mysql_cond_t cond;              // 用于喚醒線程的信號量
  bool woken;                     // 是否被喚醒
};

pool_timer_t

用于所有線程組的一個全局計時器

struct pool_timer_t {
  mysql_mutex_t mutex;                    // 鎖
  mysql_cond_t cond;                      // 信號量
  std::atomic<uint64> current_microtime;  // 當前時間,微秒
  std::atomic<uint64> next_timeout_check; // 下一次檢查的事件
  int tick_interval;                      // 節拍
  bool shutdown;                          // 關閉標志
};

線程池初始化

線程池的初始化在tp_init()中

//sql/threadpool_unix.cc
bool tp_init() {
  DBUG_ENTER("tp_init");
  threadpool_started = true;

  for (uint i = 0; i < array_elements(all_groups); i++) {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  tp_set_threadpool_size(threadpool_size);
  ...

  pool_timer.tick_interval = threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(false);
}

在tp_init()主要初始化工作:

  1. 初始化所有(MAX_THREAD_GROUPS 128)的線程組, 初始化每個線程組的屬性和狀態等信息
  2. 初始化threadpool_size個線程組的pollfd
  3. 啟動start_timer定時器線程

處理新連接

將新連接加入線程池

下面是線程池從add_connetion開始,將新連接加入到線程池中的過程

> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
  THD *const thd = channel_info->create_thd(); // 創建THD
  connection_t *const connection = alloc_connection(thd); // 初始化connection_t
  ...
  // 初始化thd的一些信息
  thd->set_new_thread_id();
  thd->start_utime = my_micro_time();
  thd->scheduler = &tp_event_functions;
  Global_THD_manager::get_instance()->add_thd(thd);
  thd->event_scheduler.data = connection;
  // 分配一個線程組
  thread_group_t *group = &all_groups[thd->thread_id() % group_count];
  // 將new connetion添加到工作隊列,實際登錄操作由worker完成
  queue_put(group, connection);
  > queue_put(thread_group_t *thread_group, connection_t *connection) {
    connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
    thread_group->queue.push_back(connection); // 加入工作隊列
    // 如果活躍線程為0則創建或喚醒一個工作線程
    wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
    > wake_or_create_thread() {
      // 若喚醒成功則退出
      // 喚醒過程就是在waiting_threads中找第一個線程,然后給它發送信號量
      if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
      // 1. 當前線程組線程數大于連接數,則不用創建,返回
      // 2. 如果活躍線程為0或為admin_connection連接,則創建直接工作線程
      // 3. 如果time_since_last_thread_created大于限制創建的時間則創建
      create_worker(thread_group, admin_connection) {
        // 創建工作線程 worker_main
       > err = mysql_thread_create(key_worker_thread, &thread_id,
                thread_group->pthread_attr, worker_main,
                thread_group);
      }
    }
  }
}

工作線程(worker_main)

worker_main主要工作內容:

  1. get_event: 阻塞的從當前線程組中獲取一個的待處理事件(connection with pending event)
  2. handle_event:處理該連接
worker_main()
static void *worker_main(void *param) {
  /* Init per-thread structure */
  worker_thread_t this_thread;
  mysql_cond_init(key_worker_cond, &this_thread.cond);
  this_thread.thread_group = thread_group;
  this_thread.event_count = 0;
  /* Run event loop */
  for (;;) {
    connection_t *connection;
    struct timespec ts;
    set_timespec(&ts, threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection) break; // 如果超時已過期或關閉,則返回NULL,即退出線程
    this_thread.event_count++;
    handle_event(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  ...
  my_thread_end();
  return nullptr;
}

工作線程的退出的情況(get_event返回空):

  • 線程組被關閉
  • 檢測到too_many_busy_threads,與參數thread_pool_oversubscribe有關(限制其小于1 + thread_pool_oversubscribe)
  • 睡眠時mysql_cond_timedwait超時或出錯
  • 睡眠時mysql_cond_wait出錯
get_event():從隊列或socket拿取一個事件
  1. oversubscribed表示是否系統CUP負載太高,已防止同時運行太多線程;與參數threadpool_oversubscribe有關
  2. 高優先級事件:與參數thread_pool_high_prio_mode有關
  3. 工作線程的睡眠狀態:在get_event()中,停在current_thread->cond信號上(mysql_cond_timedwait/mysql_cond_wait)

get_event()主要流程概述:

  1. 非oversubscribed時
    • 從隊列中取一個連接/事件并返回,沒取到則繼續
    • 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
    • 從連接線程組socket中取一個連接或事件
      • 沒取到連接,進入睡眠狀態
      • 拿到連接,如果是高優先級事件則直接返回
      • 否則判斷是否有太多忙線程,忙將事件放入隊列并進入睡眠狀態;否則返回連接
  1. oversubscribed時
    • 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
    • 進入睡眠狀態
listener():監聽線程

監聽線程主要工作:

  • (優先)隊列中有事件時,常駐;負責獲取socket中的事件并將它們分發給工作線程(推入隊列中)
  • (優先)隊列中無事件時,即空閑時:講N-1個事件推入隊列,返回第一個事件,最終get_event()會返回并由handle_event()處理
handle_event():實際處理連接

handle_event()主要工作:

  1. 未登錄狀態
    • 進行threadpool_add_connection,進行一些初始化、登錄等工作(thd_prepare_connection)
    • 處理的是從add_conntion被放入隊列中的新連接
  2. 已登錄狀態
    • 進行threadpool_process_request,最后進入do_command進行處理事務
    • 處理的是get_event或listener(隊列為空時)從socket獲取的事件
static void handle_event(connection_t *connection) {
  ...
  if (!connection->logged_in) {
    err = threadpool_add_connection(connection->thd);
    connection->logged_in = true;
  } else {
    err = threadpool_process_request(connection->thd);
  }
  ...
  set_wait_timeout(connection);
}

對于threadpool_process_request函數:
函數中的循環與thread-per-connections的邏輯類似,但是這個函數的目標是執行單個QUERY,所以正常情況下,下面的循環只會執行一次
對于SSL的連接,可能會執行多次,由vio->has_data()判斷數據是否結束

/**
 Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
  thread_attach(thd);
  ...
  for (;;) {
    Vio *vio;
    thd_set_net_read_write(thd, 0);

    if ((retval = do_command(thd)) != 0) goto end;

    if (!thd_connection_alive(thd)) {
      retval = 1;
      goto end;
    }

    vio = thd->get_protocol_classic()->get_vio();
    if (!vio->has_data(vio)) {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      thd_set_net_read_write(thd, 1);
      goto end;
    }
    if (!thd->m_server_idle) {
      MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
      MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
      thd->m_server_idle = true;
    }
  }

end:
  ...
  return retval;
}

計時器線程(timer_thread)

計時器線程主要工作:

  1. 檢查每個線程組是否停滯(stall)
  2. 檢查每個連接(THD)是否超時,超時則kill掉
static void *timer_thread(void *param) noexcept {
  ...
  for (;;) {
    ...
    if (err == ETIMEDOUT) {
      timer->current_microtime.store(my_microsecond_getsystime(),
                                     std::memory_order_relaxed);

      /* Check stalls in thread groups */
      for (size_t i = 0; i < array_elements(all_groups); i++) {
        if (all_groups[i].connection_count) check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
          timer->current_microtime.load(std::memory_order_relaxed))
        timeout_check(timer);
    }
  }
  ...
}

check_stall流程:

  1. 檢測listener是否存在
    • 如果不存在,檢查是否由io事件被彈出
    • 如果沒有,表明listener被阻塞,創建一個工作線程(而后它會變成listener)并退出
  2. 重置io_event_count
  3. 判斷是否線程組是否stall:
    • thread_group->queue_event_count表示從隊列中被處理事件的數量,每次會被重置
    • 如果自從上一次check_stall到現在仍然為0且隊列和優先隊列不為空,則確定stall
    • 如果確定stall了,則設置thread_group->stalled,喚醒,或創建新工作線程
    • 當一個事件從隊列中出去,則表明恢復正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
  ...
  // 如果listener不存在,則創建一個工作線程(而后它會變成listener)
  if (!thread_group->listener && !thread_group->io_event_count) {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }
  // 重置io_event_count
  thread_group->io_event_count = 0;
  // queue_event_count為0且隊列和優先隊列不為空,確定stall
  if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
    thread_group->stalled = true;
    wake_or_create_thread(thread_group);
  }
  // check完后重置queue_event_count
  thread_group->queue_event_count = 0;

  mysql_mutex_unlock(&thread_group->mutex);
}

線程池退出

調用tp_end(), 回收線程組資源,停止定時器線程,最終會等待所有線程結束后才會退出

線程池相關參數

系統變量

變量

說明

thread_handling

pool-of-threads開啟線程池模式

threadpool_idle_timeout

用于限制空閑線程退出前應該等待的時間

threadpool_high_prio_mode

用于對全局或每個連接的高優先級調度提供更細粒度的控制

threadpool_high_prio_tickets

控制高優先級隊列策略,為每個新連接分配的票數,以進入高優先級隊列。將此變量設置為0將禁用高優先級隊列

threadpool_max_threads

用于限制線程池中的最大線程數

threadpool_oversubscribe

該參數的值越高,可以同時運行的線程就越多,如果該值低于3,則可能導致更多的睡眠和喚醒

threadpool_size

用于定義同時可以使用CPU的線程數

threadpool_stall_limit

認定為stall的時間,當達到此限制時,線程池將喚醒或創建另一個線程

狀態變量

變量

說明

Threadpool_idle_threads

顯示線程池中空閑線程的數量

Threadpool_threads

顯示線程池中的線程數。

參考文檔:

MySQL源碼閱讀2-連接與線程管理
線程池移植原理文檔
percona線程池文檔

0條評論
0 / 1000
qinyl
6文章數
0粉絲數
qinyl
6 文章 | 0 粉絲
qinyl
6文章數
0粉絲數
qinyl
6 文章 | 0 粉絲
原創

線程池原理

2023-04-11 01:21:35
74
0

author: 覃育龍
update:2023/3/16

本文主要講移植入mysql中的線程池的原理

MySQL中的連接管理

連接和線程管理相關的類

mysql中連接和線程管理相關的代碼主要在mysql/sql/conn_handler中,其中主要涉及的類包括:

  • Connection_handler_manager:單例類,用來管理連接處理器;
  • Connection_handler:連接處理的抽象類,具體實現由其子類實現;
  • Per_thread_connection_handler:繼承了Connection_handler,每一個連接用單獨的線程處理,默認為該實現,通過thread_handling參數可以設置;
  • One_thread_connection_handler:繼承了Connection_handler,所有連接用同一個線程處理;
  • Plugin_connection_handler:繼承了Connection_handler,支持由Plugin具體實現的handler,例如線程池;
  • Connection_acceptor:一個模版類,以模版方式支持不同的監聽實現(三個listener)。并且提供一個死循環,用以監聽連接;
  • Mysqld_socket_listener:實現以Socket的方式監聽客戶端的連接事件,并且支持TCP socket和Unix socket,即對應的TCP_socket和Unix_socket;
  • Shared_mem_listener:通過共享內存的監聽方式(windows下);
  • Named_pipe_listener:通過命名管道來監聽和接收客戶請求(windows下);
  • Channel_info:連接信道的抽象類,具體實現有Channel_info_local_socket和Channel_info_tcpip_socket,
  • Channel_info_local_socket:與本地方式與服務器進行交互;
  • Channel_info_tcpip_socket:以TCP/IP方式與服務器進行交互;
  • TCP_socket:TCP socket,與MySQL服務不同機器的連接訪問;
  • Unix_socket:Unix socket,與MySQL服務相同主機的連接訪問;

其類圖關系如下:

其中,三個Listener將作為Connection_acceptor模版類作為具體的Listener。
簡單來說就是通過listener監聽請求,并創建連接Channel_Info的具體類,然后通過單例Connection_handler_manager指派給具體的Connection_Handler(Per_thread_connection_handler/One_thread_connection_handler/Thread_pool_connection_handler)進行調度。

連接的流程

連接管理器的初始化(Connection_handler_manager)

Connection_handler_manager這個單例類初始化時會根據server啟動參數或配置文件來初始化,根據thread_handling來初始化connection_handler
其中Per_thread_connection_handler無論mysqld使用哪種方式均會被初始化

> mysqld_main()
  > init_common_variables() // 初始化變量
    > get_options() // 獲取server啟動參數
      > Connection_handler_manager::init() {// 初始化連接管理的單例類
        Per_thread_connection_handler::init()
        Connection_handler *connection_handler = nullptr;
        switch (Connection_handler_manager::thread_handling) {
          case SCHEDULER_ONE_THREAD_PER_CONNECTION:
            connection_handler = new (std::nothrow) Per_thread_connection_handler();
            break;
          case SCHEDULER_NO_THREADS:
            connection_handler = new (std::nothrow) One_thread_connection_handler();
            break;
          case SCHEDULER_THREAD_POOL:
            connection_handler = new (std::nothrow) Thread_pool_connection_handler();
            break;
          default:
            assert(false);
        }

        m_instance =
            new (std::nothrow) Connection_handler_manager(connection_handler);
      }

監聽處理循環

mysqld主函數開啟后,進入Connection_acceptor::connection_event_loop,在windows系統上是由三個線程分別調用Socket/NamePipe/SharedMemory的connection_event_loop;(Connection_acceptor是一個模板類,分別有3種實現)

//sql/conn_handler/connection_acceptor.h
  void connection_event_loop() {
    Connection_handler_manager *mgr =
        Connection_handler_manager::get_instance();
    while (!connection_events_loop_aborted()) {
      Channel_info *channel_info = m_listener->listen_for_connection_event();
      if (channel_info != nullptr) mgr->process_new_connection(channel_info);
    }
  }

該函數作用就是通過監聽客戶請求listen_for_connection_event(),然后處理請求m_listener->process_new_connection()
其中通過connection_events_loop_aborted()來判斷是否被終止

Mysqld_socket_listener的處理

初始化

通過Mysqld_socket_listener::setup_listener()來進行初始化工作。
Mysqld_socket_listener具體監聽的列表存儲在m_socket_vector中,由setup_listener()來完成初始化。
在m_socket_vector插入的順序為: admin socket-> tcp socket-> unix socket -> event socket。 所以說linster會監聽多個socket,最后才是普通的socket。

獲取請求
Mysqld_socket_listener::listen_for_connection_event() {
  #ifdef HAVE_POLL
    int retval = poll();
  #else
    int retval = select();
  // 獲取一個新的連接請求
  const Listen_socket *listen_socket = get_listen_socket();
  ...
  // 接受請求
  accept_connection(listen_socket->m_socket, &onnect_sock)
  // 構造Channel_info
  Channel_info *channel_info = nullptr;
  if (listen_socket->m_socket_type == ocket_type::UNIX_SOCKET)
    channel_info = new (std::nothrow) hannel_info_local_socket(connect_sock);
  else
    channel_info = new (std::nothrow) hannel_info_tcpip_socket(...);
  return channel_info;
}

通過Mysqld_socket_listener::listen_for_connection_event()來監聽請求,有poll與select兩種方式。

在監聽到有新請求之后,調用Mysqld_socket_listener::get_listen_socket()獲取監聽到新請求的socket,具體邏輯如下:

  • 首先檢查是否有admin的請求,有則返回admin的socket(首先處理admin socket)
  • 返回第一個有請求的socket
接受請求

獲取到有效socket后,進入accept_connection()->mysql_socket_accept調用accept接收請求。
然后構造一個Channel_info,Channel_info_local_socket(localhost登錄)或Channel_info_tcpip_socket(ip登錄),并作為listen_for_connection_event()的返回值,所有與連接的相關的信息都保存在Channel_info中。

處理新連接(process_new_connection)
// sql/conn_handler/connection_handler_manager.cc
void Connection_handler_manager::process_new_connection(
    Channel_info *channel_info) {
  if (connection_events_loop_aborted() ||
      !check_and_incr_conn_count(channel_info->is_admin_connection())) {
    channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
    sql_print_warning("%s", ER_DEFAULT(ER_CON_COUNT_ERROR));
    delete channel_info;
    return;
  }

  if (m_connection_handler->add_connection(channel_info)) {
    inc_aborted_connects();
    delete channel_info;
  }
}

1.首先判斷是否超過連接上限
其代碼邏輯如下:

  • 首先檢查服務是否已經停止,然后check_and_incr_conn_count增加連接計數,如果超過連接上限,那么將拒絕連接,并關閉連接;
  • 將連接傳入Connection_handler::add_connection()中

實際上mysql會運行max_connections + 1個連接,當連接數滿時,為super用戶保留最后一個連接

2.執行add_connection()
最終add_connetion()的執行mysql中配置的線程處理方式決定,有三種:

  • one-thread-per-connection: 每個連接一個線程,線程關閉后線程會等待,直到下次新的連接后復用該線程。
  • no-threads:在一個工作線程中處理的所有連接。
  • pool-of-threads: 線程池方式
one-thread-per-connection方式

先查看是否有空閑線程,如果有則直接用空閑線程處理;否則將創建一個新的線程來處理新連接。

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads avaliable to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
  ...
  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

check_idle_thread_and_enqueue_connection 做的事情就是檢查當前是否有空閑狀態的線程;有的話,則將channel_info加入到隊列中,并向空閑線程發送信號;否則創建新線程,并執行handle_connection函數。

handle_connection函數的邏輯如下:

  • 首先是初始化線程需要的內存;
  • 然后創建一個THD對象init_new_thd;
  • 創建/或重用psi對象,并加到thd對象;
  • 將thd對象加到thd manager中;
  • 調用thd_prepare_connection做驗證;
  • do_command(thd)處理query
  • end_connection,關閉一個已經建立的連接,
  • close_connection,disconnect關閉當前會話的vio;
  • 釋放資源thd中的資源
  • 阻塞等待下一個連接,block_until_new_connection(),監聽接收前面提到的check_idle_thread_and_enqueue_connection發過來的信號

線程池

線程池的總覽

主要數據結構

connection_t

連接結構體,一個連接/事件對應一個connection_t,一個connection_t對應一個THD

struct connection_t {
  THD *thd;                       // THD
  thread_group_t *thread_group;   // 該連接所在的線程組
  connection_t *next_in_queue;    // 用于connection_queue_t
  connection_t **prev_in_queue;   // 用于connection_queue_t
  ulonglong abs_wait_timeout;     // 連接時間,用于超時關閉
  bool logged_in;                 // 是否登入
  bool bound_to_poll_descriptor;  // 是否獲取到poolfd
  bool waiting;                   // 是否在等待
  uint tickets;                   // threadpool_high_prio_mode=TP_HIGH_PRIO_MODE_TRANSACTIONS下,在高優先級隊列中的票數
};

thread_group_t

線程組結構體,在該線程池的框架中,最多有128(MAX_THREAD_GROUPS)個線程組

struct alignas(128) thread_group_t {
  mysql_mutex_t mutex;
  connection_queue_t queue;           // 待處理連接隊列
  connection_queue_t high_prio_queue; // 高優先級待處理連接隊列
  worker_list_t waiting_threads;      // 等待線程隊列,線程每次執行完后會進入睡眠狀態等待下次喚醒工作
  worker_thread_t *listener;          // 監聽線程,負責監聽命令事件,事件隊列為空時也會充當工作線程
  pthread_attr_t *pthread_attr;       // 線程屬性
  int pollfd;                         // poll文件描述符
  int thread_count;                   // 線程數
  int active_thread_count;            // 活躍線程數
  int connection_count;               // 連接數
  int waiting_thread_count;           // 活躍連接數
  /* Stats for the deadlock detection timer routine.*/
  int io_event_count;                 // io事件數
  int queue_event_count;              // 隊列中已處理事件數量
  ulonglong last_thread_creation_time;// 上一個線程創建的時間
  int shutdown_pipe[2];               // 管道通信
  bool shutdown;                      // 關閉標志
  bool stalled;                       // 停滯標志
  char padding[328];                  // 填充
};

waiting_threads: 等待線程隊列,線程每次執行完后會將線程添加到等待列表的頭部并等待。重要的是要把線加在頭部而不是尾部,因為它確保了LIFO喚醒順序(熱緩存,工作不活動超時)

worker_thread_t

工作線程結構體,一個結構體對應一個worker

struct worker_thread_t {
  ulonglong event_count; /* number of request handled by this thread */
  thread_group_t *thread_group;   // 所在線程組
  worker_thread_t *next_in_list;  // 用于worker_list_t
  worker_thread_t **prev_in_list; // 用于worker_list_t
  mysql_cond_t cond;              // 用于喚醒線程的信號量
  bool woken;                     // 是否被喚醒
};

pool_timer_t

用于所有線程組的一個全局計時器

struct pool_timer_t {
  mysql_mutex_t mutex;                    // 鎖
  mysql_cond_t cond;                      // 信號量
  std::atomic<uint64> current_microtime;  // 當前時間,微秒
  std::atomic<uint64> next_timeout_check; // 下一次檢查的事件
  int tick_interval;                      // 節拍
  bool shutdown;                          // 關閉標志
};

線程池初始化

線程池的初始化在tp_init()中

//sql/threadpool_unix.cc
bool tp_init() {
  DBUG_ENTER("tp_init");
  threadpool_started = true;

  for (uint i = 0; i < array_elements(all_groups); i++) {
    thread_group_init(&all_groups[i], get_connection_attrib());
  }
  tp_set_threadpool_size(threadpool_size);
  ...

  pool_timer.tick_interval = threadpool_stall_limit;
  start_timer(&pool_timer);
  DBUG_RETURN(false);
}

在tp_init()主要初始化工作:

  1. 初始化所有(MAX_THREAD_GROUPS 128)的線程組, 初始化每個線程組的屬性和狀態等信息
  2. 初始化threadpool_size個線程組的pollfd
  3. 啟動start_timer定時器線程

處理新連接

將新連接加入線程池

下面是線程池從add_connetion開始,將新連接加入到線程池中的過程

> Thread_pool_connection_handler::add_connection(Channel_info *channel_info) {
  THD *const thd = channel_info->create_thd(); // 創建THD
  connection_t *const connection = alloc_connection(thd); // 初始化connection_t
  ...
  // 初始化thd的一些信息
  thd->set_new_thread_id();
  thd->start_utime = my_micro_time();
  thd->scheduler = &tp_event_functions;
  Global_THD_manager::get_instance()->add_thd(thd);
  thd->event_scheduler.data = connection;
  // 分配一個線程組
  thread_group_t *group = &all_groups[thd->thread_id() % group_count];
  // 將new connetion添加到工作隊列,實際登錄操作由worker完成
  queue_put(group, connection);
  > queue_put(thread_group_t *thread_group, connection_t *connection) {
    connection->tickets = connection->thd->variables.threadpool_high_prio_tickets; // 取票
    thread_group->queue.push_back(connection); // 加入工作隊列
    // 如果活躍線程為0則創建或喚醒一個工作線程
    wake_or_create_thread(thread_group, connection->thd->is_admin_connection());
    > wake_or_create_thread() {
      // 若喚醒成功則退出
      // 喚醒過程就是在waiting_threads中找第一個線程,然后給它發送信號量
      if (wake_thread(thread_group) == 0) DBUG_RETURN(0);
      // 1. 當前線程組線程數大于連接數,則不用創建,返回
      // 2. 如果活躍線程為0或為admin_connection連接,則創建直接工作線程
      // 3. 如果time_since_last_thread_created大于限制創建的時間則創建
      create_worker(thread_group, admin_connection) {
        // 創建工作線程 worker_main
       > err = mysql_thread_create(key_worker_thread, &thread_id,
                thread_group->pthread_attr, worker_main,
                thread_group);
      }
    }
  }
}

工作線程(worker_main)

worker_main主要工作內容:

  1. get_event: 阻塞的從當前線程組中獲取一個的待處理事件(connection with pending event)
  2. handle_event:處理該連接
worker_main()
static void *worker_main(void *param) {
  /* Init per-thread structure */
  worker_thread_t this_thread;
  mysql_cond_init(key_worker_cond, &this_thread.cond);
  this_thread.thread_group = thread_group;
  this_thread.event_count = 0;
  /* Run event loop */
  for (;;) {
    connection_t *connection;
    struct timespec ts;
    set_timespec(&ts, threadpool_idle_timeout);
    connection = get_event(&this_thread, thread_group, &ts);
    if (!connection) break; // 如果超時已過期或關閉,則返回NULL,即退出線程
    this_thread.event_count++;
    handle_event(connection);
  }

  /* Thread shutdown: cleanup per-worker-thread structure. */
  ...
  my_thread_end();
  return nullptr;
}

工作線程的退出的情況(get_event返回空):

  • 線程組被關閉
  • 檢測到too_many_busy_threads,與參數thread_pool_oversubscribe有關(限制其小于1 + thread_pool_oversubscribe)
  • 睡眠時mysql_cond_timedwait超時或出錯
  • 睡眠時mysql_cond_wait出錯
get_event():從隊列或socket拿取一個事件
  1. oversubscribed表示是否系統CUP負載太高,已防止同時運行太多線程;與參數threadpool_oversubscribe有關
  2. 高優先級事件:與參數thread_pool_high_prio_mode有關
  3. 工作線程的睡眠狀態:在get_event()中,停在current_thread->cond信號上(mysql_cond_timedwait/mysql_cond_wait)

get_event()主要流程概述:

  1. 非oversubscribed時
    • 從隊列中取一個連接/事件并返回,沒取到則繼續
    • 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
    • 從連接線程組socket中取一個連接或事件
      • 沒取到連接,進入睡眠狀態
      • 拿到連接,如果是高優先級事件則直接返回
      • 否則判斷是否有太多忙線程,忙將事件放入隊列并進入睡眠狀態;否則返回連接
  1. oversubscribed時
    • 檢查是否有監聽線程,沒有則變成監聽線程;有則繼續
    • 進入睡眠狀態
listener():監聽線程

監聽線程主要工作:

  • (優先)隊列中有事件時,常駐;負責獲取socket中的事件并將它們分發給工作線程(推入隊列中)
  • (優先)隊列中無事件時,即空閑時:講N-1個事件推入隊列,返回第一個事件,最終get_event()會返回并由handle_event()處理
handle_event():實際處理連接

handle_event()主要工作:

  1. 未登錄狀態
    • 進行threadpool_add_connection,進行一些初始化、登錄等工作(thd_prepare_connection)
    • 處理的是從add_conntion被放入隊列中的新連接
  2. 已登錄狀態
    • 進行threadpool_process_request,最后進入do_command進行處理事務
    • 處理的是get_event或listener(隊列為空時)從socket獲取的事件
static void handle_event(connection_t *connection) {
  ...
  if (!connection->logged_in) {
    err = threadpool_add_connection(connection->thd);
    connection->logged_in = true;
  } else {
    err = threadpool_process_request(connection->thd);
  }
  ...
  set_wait_timeout(connection);
}

對于threadpool_process_request函數:
函數中的循環與thread-per-connections的邏輯類似,但是這個函數的目標是執行單個QUERY,所以正常情況下,下面的循環只會執行一次
對于SSL的連接,可能會執行多次,由vio->has_data()判斷數據是否結束

/**
 Process a single client request or a single batch.
*/
int threadpool_process_request(THD *thd) {
  thread_attach(thd);
  ...
  for (;;) {
    Vio *vio;
    thd_set_net_read_write(thd, 0);

    if ((retval = do_command(thd)) != 0) goto end;

    if (!thd_connection_alive(thd)) {
      retval = 1;
      goto end;
    }

    vio = thd->get_protocol_classic()->get_vio();
    if (!vio->has_data(vio)) {
      /* More info on this debug sync is in sql_parse.cc*/
      DEBUG_SYNC(thd, "before_do_command_net_read");
      thd_set_net_read_write(thd, 1);
      goto end;
    }
    if (!thd->m_server_idle) {
      MYSQL_SOCKET_SET_STATE(vio->mysql_socket, PSI_SOCKET_STATE_IDLE);
      MYSQL_START_IDLE_WAIT(thd->m_idle_psi, &thd->m_idle_state);
      thd->m_server_idle = true;
    }
  }

end:
  ...
  return retval;
}

計時器線程(timer_thread)

計時器線程主要工作:

  1. 檢查每個線程組是否停滯(stall)
  2. 檢查每個連接(THD)是否超時,超時則kill掉
static void *timer_thread(void *param) noexcept {
  ...
  for (;;) {
    ...
    if (err == ETIMEDOUT) {
      timer->current_microtime.store(my_microsecond_getsystime(),
                                     std::memory_order_relaxed);

      /* Check stalls in thread groups */
      for (size_t i = 0; i < array_elements(all_groups); i++) {
        if (all_groups[i].connection_count) check_stall(&all_groups[i]);
      }

      /* Check if any client exceeded wait_timeout */
      if (timer->next_timeout_check.load(std::memory_order_relaxed) <=
          timer->current_microtime.load(std::memory_order_relaxed))
        timeout_check(timer);
    }
  }
  ...
}

check_stall流程:

  1. 檢測listener是否存在
    • 如果不存在,檢查是否由io事件被彈出
    • 如果沒有,表明listener被阻塞,創建一個工作線程(而后它會變成listener)并退出
  2. 重置io_event_count
  3. 判斷是否線程組是否stall:
    • thread_group->queue_event_count表示從隊列中被處理事件的數量,每次會被重置
    • 如果自從上一次check_stall到現在仍然為0且隊列和優先隊列不為空,則確定stall
    • 如果確定stall了,則設置thread_group->stalled,喚醒,或創建新工作線程
    • 當一個事件從隊列中出去,則表明恢復正常,重置thread_group->stalled
static void check_stall(thread_group_t *thread_group) {
  ...
  // 如果listener不存在,則創建一個工作線程(而后它會變成listener)
  if (!thread_group->listener && !thread_group->io_event_count) {
    wake_or_create_thread(thread_group);
    mysql_mutex_unlock(&thread_group->mutex);
    return;
  }
  // 重置io_event_count
  thread_group->io_event_count = 0;
  // queue_event_count為0且隊列和優先隊列不為空,確定stall
  if (!thread_group->queue_event_count && !queues_are_empty(*thread_group)) {
    thread_group->stalled = true;
    wake_or_create_thread(thread_group);
  }
  // check完后重置queue_event_count
  thread_group->queue_event_count = 0;

  mysql_mutex_unlock(&thread_group->mutex);
}

線程池退出

調用tp_end(), 回收線程組資源,停止定時器線程,最終會等待所有線程結束后才會退出

線程池相關參數

系統變量

變量

說明

thread_handling

pool-of-threads開啟線程池模式

threadpool_idle_timeout

用于限制空閑線程退出前應該等待的時間

threadpool_high_prio_mode

用于對全局或每個連接的高優先級調度提供更細粒度的控制

threadpool_high_prio_tickets

控制高優先級隊列策略,為每個新連接分配的票數,以進入高優先級隊列。將此變量設置為0將禁用高優先級隊列

threadpool_max_threads

用于限制線程池中的最大線程數

threadpool_oversubscribe

該參數的值越高,可以同時運行的線程就越多,如果該值低于3,則可能導致更多的睡眠和喚醒

threadpool_size

用于定義同時可以使用CPU的線程數

threadpool_stall_limit

認定為stall的時間,當達到此限制時,線程池將喚醒或創建另一個線程

狀態變量

變量

說明

Threadpool_idle_threads

顯示線程池中空閑線程的數量

Threadpool_threads

顯示線程池中的線程數。

參考文檔:

MySQL源碼閱讀2-連接與線程管理
線程池移植原理文檔
percona線程池文檔

文章來自個人專欄
文章 | 訂閱
0條評論
0 / 1000
請輸入你的評論
1
0