要了解服務器的工作原理首先需要了解 TCP 協議的工作原理。TCP 是一種面向連接的、可靠的、基于字節流的傳輸層全雙工通信協議,它有 4 個特點:面向連接、可靠、流式、全雙工。下面詳細講解這些特性。
TCP 中的連接是一個虛擬的連接,本質上是主機在內存里記錄了對端的信息,我們可以將連接理解為一個通信的憑證。如下圖所示。
那么如何建立連接呢?TCP 的連接是通過三次握手建立的。
服務器首先需要監聽一個端口。
客戶端主動往服務器監聽的端口發起一個 syn 包(第一次握手)。
當服務器所在操作系統收到一個 syn 包時,會先根據 syn 包里的目的 IP 和端口找到對應的監聽 socket,如果找不到則回復 rst 包,如果找到則發送 ack 給客戶端(第二次握手),接著新建一個通信 socket 并插入到監聽 socket 的連接中隊列(具體的細節會隨著不同版本的操作系統而變化。比如連接中隊列和連接完成隊列是一條隊列還是兩條隊列,再比如是否使用了 syn cookie 技術來防止 syn flood 攻擊,如果使用了,收到 syn 包的時候就不會創建 socket,而是收到第三次握手的包時再創建)。
客戶端收到服務器的 ack 后,再次發送 ack 給服務器,客戶端就完成三次握手進入連接建立狀態了。
當服務器所在操作系統收到客戶端的 ack 時(第三次握手),處于連接中隊列的 socket 就會被移到連接完成隊列中。
當操作系統完成了一個 TCP 連接,操作系統就會通知相應的進程,進程從連接完成隊列中摘下一個已完成連接的 socket 結點,然后生成一個新的 fd,后續就可以在該 fd 上和對端通信。具體的流程如下圖所示。
完成三次握手后,客戶端和服務器就可以進行數據通信了。操作系統收到數據包和收到 syn 包的流程不一樣,操作系統會根據報文中的 IP 和端口找到處理該報文的通信 socket(而不是監聽 socket),然后把數據包(操作系統實現中是一個 skb 結構體)掛到該通信 socket 的數據隊列中。
當應用層調用 read 讀取該 socket 的數據時,操作系統會根據應用層所需大小,從一個或多個 skb 中返回對應的字節數。同樣,寫也是類似的流程,當應用層往 socket 寫入數據時,操作系統不一定會立刻發送出去,而是會保存到寫緩沖區中,然后根據復雜的 TCP 算法發送。
當兩端完成通信后需要關閉連接,否則會浪費內存。TCP 通過四次揮手實現連接的斷開,第一次揮手可以由任意一端發起。前面講過 TCP 是全雙工的,所以除了通過四次揮手完成整個 TCP 連接的斷開外,也可以實現半斷開,比如客戶端關閉寫端表示不會再發送數據,但是仍然可以讀取來自對端發送端數據。四次揮手的流程如下。
TCP 發送數據時會先緩存一份到已發送待確認隊列中,并啟動一個超時重傳計時器,如果一定時間內沒有收到對端到確認 ack,則觸發重傳機制,直到收到 ack 或者重傳次數達到閾值才會結束流程。
建立連接后,應用層就可以調用發送接口源源不斷地發送數據。通常情況下,并不是每次調用發送接口,操作系統就直接把數據發送出去,這些數據的發送是由操作系統按照一定的算法去發送的。對操作系統來說,它看到的是字節流,它會按照 TCP 算法打包出一個個包發送到對端,所以當對端收到數據后,需要處理好數據邊界的問題。
從上圖中可以看到,假設應用層發送了兩個 HTTP 請求,操作系統在打包數據發送時可能的場景是第一個包里包括了 HTTP 請求 1 的全部數據和部分請求 2 的數據,所以當對端收到數據并進行解析時,就需要根據 HTTP 協議準確地解析出第一個 HTTP 請求對應的數據。
因為 TCP 的流式協議,所以基于 TCP 的應用層通常需要定義一個應用層協議,然后按照應用層協議實現對應的解析器,這樣才能完成有效的數據通信,比如常用的 HTTP 協議。對比來說 UDP 是面向數據包的協議,當應用層把數據傳遞給 UDP 時,操作系統會直接打包發送出去(如果數據字節大小超過閾值則會報錯)。
剛才提到 TCP 是全雙工的,全雙工就是通信的兩端都有一個發送隊列和接收隊列,可以同時發送和接收,互不影響。另外也可以選擇關閉讀端或者寫端。
介紹了 TCP 協議的概念后,接著看看如何創建一個 TCP 服務器(偽代碼)。
// 創建一個 socket,拿到一個文件描述符int server_fd = socket();// 綁定地址(IP + 端口)到該 socket 中bind(server_fd, addressInfo);// 修改 socket 為監聽狀態,這樣就可以接收 TCP 連接了listen(server_fd);
執行完以上步驟,一個服務器就啟動了。服務器啟動的時候會監聽一個端口,如果有連接到來,我們可以通過 accept 系統調用拿到這個新連接對應的 socket,那這個 socket 和監聽的 socket 是不是同一個呢?
其實 socket 分為監聽型和通信型。表面上服務器用一個端口實現了多個連接,但是這個端口是用于監聽的,底層用于和客戶端通信的其實是另一個 socket。每當一個連接到來的時候,操作系統會根據請求包的目的地址信息找到對應的監聽 socket,如果找不到就會回復 RST 包,如果找到就會生成一個新的 socket 與之通信(accept 的時候返回的那個)。監聽 socket 里保存了監聽的 IP 和端口,通信 socket 首先從監聽 socket 中復制 IP 和端口,然后把客戶端的 IP 和端口也記錄下來。這樣一來,下次再收到一個數據包,操作系統就會根據四元組從 socket 池子里找到該 socket,完成數據的處理。因此理論上,一個服務器能接受多少連接取決于服務器的硬件配置,比如內存大小。
接下來分析各種處理連接的方式。
串行模式就是服務器逐個處理連接,處理完前面的連接后才能繼續處理后面的連接,邏輯如下。
while(1) { int client_fd = accept(server_fd); read(client_fd); write(client_fd);}
上面的處理方式是最樸素的模型,如果沒有連接,則服務器處于阻塞狀態,如果有連接服務器就會不斷地調用 accept 摘下完成三次握手的連接并處理。假設此時有 n 個請求到來,進程會從 accept 中被喚醒,然后拿到一個新的 socket 用于通信,結構圖如下。
這種處理模式下,如果處理的過程中調用了阻塞 API,比如文件 IO,就會影響后面請求的處理,可想而知,效率是非常低的,而且,并發量比較大的時候,監聽 socket 對應的隊列很快就會被占滿(已完成連接隊列有一個最大長度),導致后面的連接無法完成。這是最簡單的模式,雖然服務器的設計中肯定不會使用這種模式,但是它讓我們了解了一個服務器處理請求的整體過程。
串行模式中,所有請求都在一個進程中排隊被處理,效率非常低下。為了提高效率,我們可以把請求分給多個進程處理。因為在串行處理的模式中,如果有文件 IO 操作就會阻塞進程,繼而阻塞后續請求的處理。在多進程的模式中,即使一個請求阻塞了進程,操作系統還可以調度其它進程繼續執行新的任務。多進程模式分為幾種。
按需 fork 模式是主進程監聽端口,有連接到來時,主進程執行 accept 摘取連接,然后通過 fork 創建子進程處理連接,邏輯如下。
while(1) { int client_fd = accept(socket); // 忽略出錯處理 if (fork() > 0) { continue; // 父進程負責 accept } else { // 子進程負責處理連接 handle(client_fd); exit(); } }
這種模式下,每次來一個請求,就會新建一個進程去處理,比串行模式稍微好了一點,每個請求都被獨立處理。假設 a 請求阻塞在文件 IO,不會影響 b 請求的處理,盡可能地做到了并發。它的缺點是
1. 進程數有限,如果有大量的請求,需要排隊處理。
2. 進程的開銷會很大,對于系統來說是一個負擔。
3. 創建進程需要時間,實時創建會增加處理請求的時間。
pre-fork 模式就是服務器啟動的時候,預先創建一定數量的進程,但是這些進程是 worker 進程,不負責接收連接,只負責處理請求。處理過程為主進程負責接收連接,然后把接收到的連接交給 worker 進程處理,流程如下。
邏輯如下:
let fds = [[], [], [], …進程個數]; let process = []; for (let i = 0 ; i < 進程個數; i++) { // 創建管道用于傳遞文件描述符 socketpair(fds[i]); let pid; if (pid = fork() > 0) { // 父進程 process.push({pid, 其它字段}); } else { const index = i; // 子進程處理請求 while(1) { // 從管道中讀取文件描述符 var client_fd = read(fd[index][1]); // 處理請求 handle(client_fd); } } } // 主進程 acceptfor (;;) { const clientFd = accept(socket); // 找出處理該請求的子進程 const i = findProcess(); // 傳遞文件描述符 write(fds[i][0], clientFd); }
和 fork 模式相比,pre-fork 模式相對比較復雜,因為在前一種模式中,主進程收到一個請求就會實時 fork 一個子進程,這個子進程會繼承主進程中新請求對應的 fd,可以直接處理該 fd 對應的請求。但是在進程池的模式中,子進程是預先創建的,當主進程收到一個請求的時候,子進程中無法拿得到該請求對應的 fd 。這時候就需要主進程使用傳遞文件描述符的技術把這個請求對應的 fd 傳給子進程。
剛才介紹的模式中,是主進程接收連接,然后傳遞給子進程處理,這樣主進程就會成為系統的瓶頸,它可能來不及接收和分發請求給子進程,而子進程卻很空閑。子進程 accept 這種模式也是會預先創建多個進程,區別是多個子進程會調用 accept 共同處理請求,而不需要父進程參與,邏輯如下。
int server_fd = socket();bind(server_fd);for (let i = 0 ; i < 進程個數; i++) { if (fork() > 0) { // 父進程負責監控子進程 } else { // 子進程處理請求 listen(server_fd); while(1) { int client_fd = accept(socket); handle(client_fd); } } }
這種模式下多個子進程都阻塞在 accept,如果這時候有一個請求到來,那么所有的子進程都會被喚醒,但是先被調度的子進程會摘下這個請求節點,后續的進程被喚醒后可能會遇到已經沒有請求可以處理,而又進入睡眠,這種進程被無效喚醒的現象就是著名的驚群現象。這種模式的處理流程如下。
Nginx 中解決了驚群這個問題,它的處理方式是在 accpet 之前先加鎖,拿到鎖的進程才進行 accept,這樣就保證了只有一個進程會阻塞在 accept,不會引起驚群問題,但是新版操作系統已經在內核層面解決了這個問題,每次只會喚醒一個進程。
除了使用多進程外,也可以使用多線程技術處理連接,多線程模式和多進程模式類似,區別是在進程模式中,每個子進程都有自己的 task_struct,這就意味著在 fork 之后,每個進程負責維護自己的數據、資源。線程則不一樣,線程共享進程的數據和資源,所以連接可以在多個線程中共享,不需要通過文件描述符傳遞的方式進行處理,比如如下架構。
上圖中,主線程負責 accept 請求,然后通過互斥的方式插入一個任務到共享隊列中,線程池中的子線程同樣是通過互斥的方式,從共享隊列中摘取節點進行處理。
從之前的處理模式中我們知道,為了應對大量的請求,服務器通常需要大量的進程 / 線程,這是個非常大的開銷。現在很多服務器(Nginx、Nodejs、Redis)都開始使用單進程 + 事件驅動模式去設計,這種模式可以在單個進程中輕松處理成千上萬的請求。
但也正因為單進程模式下,再多的請求也只在一個進程里處理,這樣一個任務會一直在占據 CPU,后續的任務就無法執行了。因此,事件驅動模式不適合 CPU 密集型的場景,更適合 IO 密集的場景(一般都會提供線程 / 線程池,負責處理 CPU 或者阻塞型的任務)。大部分操作系統都提供了事件驅動的 API,但是事件驅動在不同系統中實現不一樣,所以一般都會有一層抽象層抹平這個差異。這里以 Linux 的 epoll 為例子。
// 創建一個 epoll 實例int epoll_fd = epoll_create(); /* 在 epoll 給某個文件描述符注冊感興趣的事件,這里是監聽的 socket,注冊可讀事件,即連接到來 event = { event: 可讀 fd:監聽 socket // 一些上下文 } */ epoll_ctl(epoll_fd , EPOLL_CTL_ADD , socket, event); while(1) { // 阻塞等待事件就緒,events 保存就緒事件的信息,total 是個數 int total= epoll_wait(epoll_fd , 保存就緒事件的結構events, 事件個數, timeout); for (let i = 0; i < total; i++) { if (events[fd] === 監聽 socket) { int client_fd = accpet(socket); // 把新的 socket 也注冊到 epoll,等待可讀,即可讀取客戶端數據 epoll_ctl(epoll_fd , EPOLL_CTL_ADD , client_fd, event); } else { // 從events[i] 中拿到一些上下文,執行相應的回調 } } }
事件驅動模式的處理流程為服務器注冊文件描述符和事件到 epoll 中,然后 epoll 開始阻塞,當有事件觸發時 epoll 就會返回哪些 fd 的哪些事件觸發了,接著服務器遍歷就緒事件并執行對應的回調,在回調里可以再次注冊 / 刪除事件,就這樣不斷驅動著進程的運行。epoll 的原理其實也類似事件驅動,它底層維護用戶注冊的事件和文件描述符,本身也會在文件描述符對應的文件 / socket / 管道處注冊一個回調,等被通知有事件發生的時候,就會把 fd 和事件返回給用戶,大致原理如下。
function epoll_wait() { for 事件個數 // 調用文件系統的函數判斷 if (事件 [i] 中對應的文件描述符中有某個用戶感興趣的事件發生 ?) { 插入就緒事件隊列 } else { /* 在事件 [i] 中的文件描述符所對應的文件 / socke / 管道等資源中注冊回調。 感興趣的事件觸發后回調 epoll,回調 epoll 后,epoll 把該 event[i] 插入 就緒事件隊列返回給用戶 */ } }
新版 Linux 支持 SO_REUSEPORT 特性后,使得服務器性能有了很大的提升。SO_REUSEPORT 之前,一個 socket 是無法綁定到同一個地址的,通常的做法是主進程接收連接然后傳遞給子進程處理,或者主進程 bind 后 fork 子進程,然后子進程執行 listen,但底層是共享同一個 socket,所以連接到來時所有子進程都會被喚醒,但是只有一個連接可以處理這個請求,其他的進程被無效喚醒。SO_REUSEPORT 特性支持多個子進程對應多個監聽 socket,多個 socket 綁定到同一個地址,當連接到來時,操作系統會根據地址信息找到一組 socket,然后根據策略選擇一個 socket 并喚醒阻塞在該 socket 的進程,被 socket 喚醒的進程處理自己的監聽 socket 下的連接就行,架構如下。
除了前面介紹的模式外,還有基于協程的模式,服務器技術繁多,就不一一介紹了。
IO 模型是服務器中非常重要的部分,操作系統通常會提供了多種 IO 模型,常見的如下。
當線程執行一個 IO 操作時,如果不滿足條件,當前線程會被阻塞,然后操作系統會調度其他線程執行。
非阻塞 IO 在不滿足條件的情況下直接返回一個錯誤碼給線程,而不是阻塞線程。
那么這個阻塞是什么意思呢?直接看一段操作系統的代碼。
// 沒有空間可寫了while(!(space = UN_BUF_SPACE(pupd))) { // 非阻塞模式,直接返回錯誤碼 if (nonblock) return(-EAGAIN); // 阻塞模式,進入阻塞狀態 interruptible_sleep_on(sock->wait);}void interruptible_sleep_on(struct wait_queue **p){ // 修改線程狀態為阻塞狀態 __sleep_on(p,TASK_INTERRUPTIBLE);}static inline void __sleep_on(struct wait_queue **p, int state){ unsigned long flags; // current 代表當前執行的線程 struct wait_queue wait = { current, NULL }; // 修改線程狀態為阻塞狀態 current->state = state; // 當前線程加入到資源的阻塞隊列,資源就緒后喚醒線程 add_wait_queue(p, &wait); // 重新調度其他線程執行,即從就緒的線程中選擇一個來執行 schedule();}
通過這段代碼,我們就可以非常明確地了解到阻塞和非阻塞到底是指什么。
在阻塞式 IO 中,我們需要通過阻塞進程來感知 IO 是否就緒,在非阻塞式 IO 中,我們需要通過輪詢來感知 IO 是否就緒,這些都不是合適的方式。為了更好感知 IO 是否就緒,操作系統實現了訂閱發布機制,我們只需要注冊感興趣的 fd 和事件,當事件發生時我們就可以感知到。多路復用 IO 可以同時訂閱多個 fd 的多個事件,是現在高性能服務器的基石。看一個例子。
#include <sys/event.h>#include <fcntl.h>#include <stdio.h>int main(int argc, char **argv){ // 用于注冊事件到 kqueue struct kevent event; // 用于接收從 kqueue 返回到事件,表示哪個 fd 觸發了哪些事件 struct kevent emit_event; int kqueue_fd, file_fd, result; // 打開需要監控的文件,拿到一個 fd file_fd = open(argv[1], O_RDONLY); if (file_fd == -1) { printf("Fail to open %s", argv[1]); return 1; } // 創建 kqueue 實例 kqueue_fd = kqueue(); // 設置需要監聽的事件,文件被寫入時觸發 EV_SET(&event,file_fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_RENAME, 0, NULL); // 注冊到操作系統 result = kevent(kqueue_fd, &event, 1, NULL, 0, NULL); // 不斷阻塞等待,直到文件被寫入 while(1) { // result 返回觸發事件的 fd 個數,這里是一個 result = kevent(kqueue_fd, NULL, 0, &emit_event, 1, NULL); if (result > 0) { printf("%s have been renamed/n", argv[1]); } }}
前面介紹的幾種 IO 模型中,當 IO 就緒時需要自己執行讀寫操作,而異步 IO 是 IO 就緒時,操作系統幫助線程完成 IO 操作,然后再通知線程操作完成了。下面以 io_uring(Linux 中的異步 IO 框架) 為例了解下具體的情況。
uv_loop_t* loop;napi_get_uv_event_loop(env, &loop);struct io_uring_info *io_uring_data = (io_uring_info *)loop->data;// 申請內存struct request *req = (struct request *)malloc(sizeof(*req) + (sizeof(struct iovec) * 1));req->fd = fd;req->offset = offset;// 保存回調napi_create_reference(env, args[2], 1, &req->func);req->env = env;req->nvecs = 1;// 記錄buffer大小req->iovecs[0].iov_len = bufferLength;// 記錄內存地址req->iovecs[0].iov_base = bufferData;// 提交給操作系統,操作系統讀完后通知線程,op 為 IORING_OP_READV 表示讀操作submit_request(op, req, &io_uring_data->ring);
上面的代碼就是我們提交了一個讀請求給操作系統,然后操作系統在文件可讀并且讀完成后通知我們。
Libuv 雖然寫著是異步 IO 庫,但是它并不是真正的異步 IO。它的意思是,你提交一個 IO 請求時,可以注冊一個回調,然后就可以去做其他事情了,等操作完成后它會通知你,它的底層實現是線程池 + 多路復用 IO。
Node.js 服務器的底層是 IO 多路復用 + 非阻塞 IO,所以可以輕松處理成千上萬的請求,但是因為 Node.js 是單線程的,所以更適合處理 IO 密集型的任務。下面看看 Node.js 中服務器是如何實現的。
在 Node.js 中,我們通常使用以下方式創建一個服務器。
// 創建一個 TCP Serverconst server = net.createServer((socket) => { // 處理連接});// 監聽端口,啟動服務器server.listen(8888);
使用 net.createServer 可以創建一個服務器,然后拿到一個 Server 對象,接著調用 Server 對象的 listen 函數就可以啟動一個 TCP 服務器了。下面來看一下具體的實現。
function createServer(options, connectionListener) { return new Server(options, connectionListener); } function Server(options, connectionListener) { EventEmitter.call(this); // 服務器收到的連接數,可以通過 maxConnections 限制并發連接數 this._connections = 0; // C++ 層的對象,真正實現 TCP 功能的地方 this._handle = null; // 服務器下的連接是否允許半關閉 this.allowHalfOpen = options.allowHalfOpen || false; // 有連接時是否注冊可讀事件,如果該 socket 是交給其他進程處理的話可以設置為 true this.pauseOnConnect = !!options.pauseOnConnect; }
createServer 返回的是一個一般的 JS 對象,繼續看一下 listen 函數的邏輯,listen 函數邏輯很繁瑣,但是原理大致是一樣的,所以只講解常用的情況。
Server.prototype.listen = function(...args) { /* 處理入參,listen 可以接收很多種格式的參數, 假設我們這里只傳了 8888 端口號 */ const normalized = normalizeArgs(args); // normalized = [{port: 8888}, null]; const options = normalized[0]; // 監聽成功后的回調 const cb = normalized[1]; // listen 成功后執行的回調 if (cb !== null) { this.once('listening', cb); } listenIncluster(this, null, options.port | 0, 4, ...); return this; };
listen 處理了入參后,接著調用了 listenIncluster。
function listenIncluster(server, address, port, addressType, backlog, fd, exclusive) { exclusive = !!exclusive; if (cluster === null) cluster = require('cluster'); if (cluster.isMaster || exclusive) { server._listen2(address, port, addressType, backlog, fd); return; } }
這里只分析在主進程創建服務器的情況,listenIncluster 中執行了 _listen2,_listen2 對應的函數是 setupListenHandle。
function setupListenHandle(address, port, addressType, backlog, fd) { // 通過 C++ 層導出的 API 創建一個對象,該對象關聯了 C++ 層的 TCPWrap 對象 this._handle = new TCP(TCPConstants.SERVER); // 創建 socket 并綁定地址到 socket 中 this._handle.bind(address, port); // 有完成三次握手的連接時執行的回調 this._handle.onconnection = onconnection; // 互相關聯 this._handle.owner = this; // 執行 C++ 層 listen this._handle.listen(backlog || 511); // 觸發 listen 回調 nextTick(this[async_id_symbol], emitListeningNT, this); }
setupListenHandle 的邏輯如下。
1. 調用 new TCP 創建一個 handle(new TCP 對象關聯了 C++ 層的 TCPWrap 對象)。
2. 保存處理連接的函數 onconnection,當有連接時被執行。
3. 調用了 bind 綁定地址到 socket。
4. 調用 listen 函數修改 socket 狀態為監聽狀態。首先看看 new TCP 做了什么。
void TCPWrap::New(const FunctionCallbackInfo<Value>& args) { new TCPWrap(env, args.This(), ...);}TCPWrap::TCPWrap(Environment* env, Local<Object> object, ProviderType provider) : ConnectionWrap(env, object, provider) { // 初始化一個 tcp handle int r = uv_tcp_init(env->event_loop(), &handle_);}
new TCP 本質上是創建一個 TCP 層的 TCPWrap 對象,并初始化了 Libuv 的數據結構 uv_tcp_t(TCPWrap 是對 Libuv uv_tcp_t 的封裝)。接著看 bind。
template <typename T>void TCPWrap::Bind(...) { // 通過 JS 對象拿到關聯的 C++ TCPWrap 對象 TCPWrap* wrap; ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder(), args.GetReturnValue().Set(UV_EBADF)); // 通過 JS 傳入的地址信息直接調用 Libuv uv_tcp_bind(&wrap->handle_, reinterpret_cast<const sockaddr*>(&addr), flags);}
Bind 函數的邏輯很簡單,直接調用了 Libuv 函數。
int uv_tcp_bind(...) { return uv__tcp_bind(handle, addr, addrlen, flags);}int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, unsigned int flags) { // 創建一個 socket,并把返回的 fd 保存到 tcp 結構體中 maybe_new_socket(tcp, addr->sa_family, 0); on = 1; // 默認設置了 SO_REUSEADDR 屬性,后面具體分析 setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); // 綁定地址信息到 socket bind(tcp->io_watcher.fd, addr, addrlen); return 0;}
uv__tcp_bind 創建了一個 TCP socket 然后把地址信息保存到該 socket 中,執行 bind 綁定了地址信息后就繼續調用 listen 把 socket 變成監聽狀態,C++ 層代碼和 Bind 的差不多,就不再分析,直接看 Libuv 的代碼。
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);}int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { static int single_accept = -1; unsigned long flags; int err; // 已廢棄 if (single_accept == -1) { const char* val = getenv("UV_TCP_SINGLE_ACCEPT"); single_accept = (val != NULL && atoi(val) != 0); } // 有連接時是否連續接收,或者間歇性處理,見后面分析 if (single_accept) tcp->flags |= UV_HANDLE_TCP_SINGLE_ACCEPT; flags = 0; // 設置 flags 到 handle 上,因為已經創建了 socket maybe_new_socket(tcp, AF_INET, flags); listen(tcp->io_watcher.fd, backlog) // 保存回調,有連接到來時被 Libuv 執行 tcp->connection_cb = cb; tcp->flags |= UV_HANDLE_BOUND; // 有連接來時的處理函數,該函數再執行上面的 connection_cb tcp->io_watcher.cb = uv__server_io; // 注冊可讀事件,等待連接到來 uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN); return 0;}
uv_tcp_listen 首先調用了 listen 函數修改 socket 狀態為監聽狀態,這樣才能接收 TCP 連接,接著保存了 C++ 層的回調,并設置 Libuv 層的回調,最后注冊可讀事件等待 TCP 連接的到來。這里需要注意兩個回調函數的執行順序,當有 TCP 連接到來時 Libuv 會執行 uvserver_io,在 uvserver_io 里再執行 C++ 層的回調 cb。至此,服務器就啟動了。其中 uv__io_start 最終會把服務器對應的文件描述符注冊到 IO多路 復用模塊中。
當有三次握手的連接完成時,操作系統會新建一個通信的 socket,并通知 Libuv,Libuv 會執行 uv__server_io。
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv_stream_t* stream; int err; stream = container_of(w, uv_stream_t, io_watcher); uv__io_start(stream->loop, &stream->io_watcher, POLLIN); // 回調了可能關閉了 server,所以需要實時判斷 while (uv__stream_fd(stream) != -1) { // 摘取一個 TCP 連接,成功的話,err 保存了對應的 fd err = uv__accept(uv__stream_fd(stream)); // 保存 fd 在 accepted_fd,等待處理 stream->accepted_fd = err; // 執行回調 stream->connection_cb(stream, 0); // 如果回調里沒有處理該 accepted_fd,則注銷可讀事件、先不處理新的連接 if (stream->accepted_fd != -1) { uv__io_stop(loop, &stream->io_watcher, POLLIN); return; } // 設置了 UV_HANDLE_TCP_SINGLE_ACCEPT 則進入睡眠,讓其他進程有機會參與處理 if (stream->type == UV_TCP && (stream->flags & UV_HANDLE_TCP_SINGLE_ACCEPT)) { struct timespec timeout = { 0, 1 }; nanosleep(&timeout, NULL); } }}
uvserver_io 中通過 uvaccept 從操作系統中摘取一個完成連接的 TCP socket 并拿到一個 fd ,接著保存到 accepted_fd 中并執行 connection_cb 回調。
此外,我們需要注意 UV_HANDLE_TCP_SINGLE_ACCEPT 標記。因為可能有多個進程監聽同一個端口,當多個連接到來時,多個進程可能會競爭處理這些連接(驚群問題)。這樣一來,首先被調度的進程可能會直接處理所有的連接,導致負載不均衡。通過 UV_HANDLE_TCP_SINGLE_ACCEPT 標記,可以在通知進程接收連接時,每接收到一個后先睡眠一段時間,讓其他進程也有機會接收連接,一定程度解決負載不均衡的問題,不過這個邏輯最近被去掉了,Libuv 維護者 bnoordhuis 的理由是,第二次調用 uvaccept 時有 99.9% 的概念會返回 EAGAIN,那就是沒有更多的連接可以處理,這樣額外調用 uvaccept 帶來的系統調用開銷是比較可觀的。
接著看 connection_cb,connection_cb 對應的是 C++ 層的 OnConnection。
// WrapType 為 TCPWrap,UVType 為 uv_tcp_ttemplate <typename WrapType, typename UVType> void ConnectionWrap<WrapType, UVType>::OnConnection(uv_stream_t* handle, int status) { // HandleWrap 中保存了 handle 和 TCPWrap 的關系,這里取出來使用 WrapType* wrap_data = static_cast<WrapType*>(handle->data); Environment* env = wrap_data->env(); Local<Value> argv[] = { Integer::New(env->isolate(), status), Undefined(env->isolate()) }; // 新建一個表示和客戶端通信的對象,和 JS 層執行 new TCP 一樣 Local<Object> client_obj = WrapType::Instantiate(env,wrap_data,WrapType::SOCKET); WrapType* wrap; // 從 client_obj 中取出關聯的 TCPWrap 對象存到 wrap 中 ASSIGN_OR_RETURN_UNWRAP(&wrap, client_obj); // 拿到 TCPWrap 中的 uv_tcp_t 結構體,再轉成 uv_stream_t,因為它們類似父類和子類的關系 uv_stream_t* client_handle = reinterpret_cast<uv_stream_t*>(&wrap->handle_); // 把通信 fd 存儲到 client_handle 中 uv_accept(handle, client_handle); argv[1] = client_obj; // 回調上層的 onconnection 函數 wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv); }
當建立了新連接時,操作系統會新建一個 socket。同樣,在 Node.js 層,也會通過 Instantiate 函數新建一個對應的對象表示和客戶端的通信。結構如下所示。
Instantiate 代碼如下所示。
MaybeLocal<Object> TCPWrap::Instantiate(Environment* env, AsyncWrap* parent, TCPWrap::SocketType type) { // 拿到導出到 JS 層的 TCP 構造函數(緩存在env中) Local<Function> constructor = env->tcp_constructor_template() ->GetFunction(env->context()) .ToLocalChecked(); Local<Value> type_value = Int32::New(env->isolate(), type); // 相當于我們在 JS 層調用 new TCP() 時拿到的對象 return handle_scope.EscapeMaybe( constructor->NewInstance(env->context(), 1, &type_value));}
新建完和對端通信的對象后,接著調用 uv_accept 消費剛才保存在 accepted_fd 中的 fd,并把對應的 fd 保存到 C++ TCPWrap 對象的 uv_tcp_t 結構體中。
int uv_accept(uv_stream_t* server, uv_stream_t* client) { int err; // 把 accepted_fd 保存到 client 中 uv__stream_open(client, server->accepted_fd, UV_HANDLE_READABLE | UV_HANDLE_WRITABLE); // 處理了,重置該字段 server->accepted_fd = -1; // 保證注冊了可讀事件,繼續處理新的連接 uv__io_start(server->loop, &server->io_watcher, POLLIN); return err;}
C++ 層拿到一個新的對象并且保存了 fd 到對象后,接著回調 JS 層的 onconnection。
// clientHandle 代表一個和客戶端建立 TCP 連接的實體 function onconnection(err, clientHandle) { const handle = this; const self = handle.owner; // 新建一個 socket 用于通信 const socket = new Socket({ handle: clientHandle, allowHalfOpen: self.allowHalfOpen, pauseOnCreate: self.pauseOnConnect }); // 服務器的連接數加一 self._connections++; // 觸發用戶層連接事件 self.emit('connection', socket); }
在 JS 層也會封裝一個 Socket 對象用于管理和客戶端的通信,整體的關系如下。
接著觸發 connection 事件,剩下的事情就是應用層處理了,整體流程如下。
接著看看 HTTP 服務器的實現。下面是 Node.js 中創建服務器的例子。
const http = require('http'); http.createServer((req, res) => { res.write('hello'); res.end(); }) .listen(3000);
我們沿著 createServer 開始分析。
function createServer(opts, requestListener) { return new Server(opts, requestListener); }
createServer 中創建了一個 Server 對象,來看看 Server 初始化的邏輯。
function Server(options, requestListener) { // 可以自定義表示請求的對象和響應的對象 this[kIncomingMessage] = options.IncomingMessage || IncomingMessage; this[kServerResponse] = options.ServerResponse || ServerResponse; // HTTP 頭最大字節數 this.maxHeaderSize = options.maxHeaderSize; // 允許半關閉 net.Server.call(this, { allowHalfOpen: true }); // 有請求時的回調 if (requestListener) { this.on('request', requestListener); } // 服務器 socket 讀端關閉時是否允許繼續處理隊列里的響應(TCP 上有多個請求,管道化) this.httpAllowHalfOpen = false; // 有連接時的回調,由 net 模塊觸發 this.on('connection', connectionListener); // 服務器下所有請求和響應的超時時間 this.timeout = 0; // 同一個 TCP 連接上,兩個請求之前最多間隔的時間 this.keepAliveTimeout = 5000; // HTTP 頭的最大個數 this.maxHeadersCount = null; // 解析頭部的最長時間,防止 ddos this.headersTimeout = 60 * 1000; }
Server 中主要做了一些字段的初始化,并且監聽了 connection 和 request 兩個事件,當有連接到來時會觸發 connection 事件,connection 事件的處理函數會調用 HTTP 解析器進行數據的解析,當解析出一個 HTTP 請求時就會觸發 request 事件通知用戶。
創建了 Server 對象后,接著我們調用它的 listen 函數。因為 HTTP Server 繼承于 net.Server,所以執行 HTTP Server 的 listen 函數時,其實是執行了 net.Serve 的 listen 函數,net.Server 的 listen 函數前面已經分析過,就不再分析。當有請求到來時,會觸發 connection 事件,從而執行 connectionListener。
function connectionListener(socket) { defaultTriggerAsyncIdScope( getOrSetAsyncId(socket), connectionListenerInternal, this, socket ); } // socket 表示新連接 function connectionListenerInternal(server, socket) { // socket 所屬 server socket.server = server; // 分配一個 HTTP 解析器 const parser = parsers.alloc(); // 初始化解析器 parser.initialize(HTTPParser.REQUEST, ...); // 關聯起來 parser.socket = socket; socket.parser = parser; const state = { onData: null, // 同一 TCP 連接上,請求和響應的的隊列,線頭阻塞的原理 outgoing: [], incoming: [], }; // 監聽 TCP 上的數據,開始解析 HTTP 報文 state.onData = socketOnData.bind(undefined, server, socket, parser, state); socket.on('data', state.onData); // 解析 HTTP 頭部完成后執行的回調 parser.onIncoming = parserOnIncoming.bind(undefined, server, socket, state); /* 如果 handle 是繼承 StreamBase 的流,則在 C++ 層解析 HTTP 請求報文, 否則使用上面的 socketOnData 函數處理 HTTP 請求報文, TCP 模塊的 isStreamBase 為 true */ if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) { parser._consumed = true; socket._handle._consumed = true; parser.consume(socket._handle); } // 執行 llhttp_execute 時的回調 parser[kOnExecute] = onParserExecute.bind(undefined, server, socket, parser, state); }
上面的 connectionListenerInternal 函數中首先分配了一個 HTTP 解析器,HTTP 解析器由以下代碼管理。
const parsers = new FreeList('parsers', 1000, function parsersCb() { const parser = new HTTPParser(); cleanParser(parser); parser.onIncoming = null; // 各種鉤子毀掉 parser[kOnHeaders] = parserOnHeaders; parser[kOnHeadersComplete] = parserOnHeadersComplete; parser[kOnBody] = parserOnBody; parser[kOnMessageComplete] = parserOnMessageComplete; return parser;});
parsers 用于管理 HTTP 解析器,它負責分配 HTTP 解析器,并且在 HTTP 解析器不再使用時緩存起來給下次使用,而不是每次都創建一個新的解析器。分配完 HTTP 解析器后就開始等待 TCP 上數據的到來,即 HTTP 請求報文。但是這里有一個邏輯需要注意,上面代碼中 Node.js 監聽了 socket 的 data 事件,處理函數為 socketOnData,下面是 socketOnData 的邏輯。
function socketOnData(server, socket, parser, state, d) { // 交給 HTTP 解析器處理,返回已經解析的字節數 const ret = parser.execute(d); }
socketOnData 調用 HTTP 解析器處理數據,這看起來沒什么問題,但是有一個邏輯我們可能會忽略掉,看一下下面的代碼。
if (socket._handle && socket._handle.isStreamBase) { parser.consume(socket._handle); }
上面代碼中,如果 socket._handle.isStreamBase 為 true(TCP handle 的 isStreamBase 為 true),則會執行 parser.consume(socket._handle),這個是做什么的呢?
static void Consume(const FunctionCallbackInfo<Value>& args) { Parser* parser; ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder()); // 解析出 C++ TCPWrap 對象 StreamBase* stream = StreamBase::FromObject(args[0].As<Object>()); // 注冊 parser 成為流的消費者,即 TCP 數據的消費者 stream->PushStreamListener(parser);}
Consume 會注冊 parser 會成為流的消費者,這個邏輯會覆蓋掉剛才的 onData 函數,使得所有的數據直接由 parser 處理,看一下當數據到來時,parser 是如何處理的。
void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override { // 解析 HTTP 協議 Local<Value> ret = Execute(buf.base, nread); // 執行 kOnExecute 回調 Local<Value> cb = object()->Get(env()->context(), kOnExecute).ToLocalChecked(); MakeCallback(cb.As<Function>(), 1, &ret); }
在 OnStreamRead 中會源源不斷地把數據交給 HTTP 解析器處理并執行 kOnExecute 回調,并且在解析的過程中,會不斷觸發對應的鉤子函數。比如解析到 HTTP 頭部時執行 parserOnHeaders。
function parserOnHeaders(headers, url) { // 記錄解析到的 HTTP 頭 if (this.maxHeaderPairs <= 0 || this._headers.length < this.maxHeaderPairs) { this._headers = this._headers.concat(headers); } this._url += url;}
parserOnHeaders 會記錄解析到的 HTTP 頭,當解析完 HTTP 頭 時會調用 parserOnHeadersComplete。
function parserOnHeadersComplete(versionMajor, versionMinor, headers, method, url, statusCode, statusMessage, upgrade, shouldKeepAlive) { const parser = this; const { socket } = parser; // 創建一個對象表示收到的 HTTP 請求 const ParserIncomingMessage = (socket && socket.server && socket.server[kIncomingMessage]) || IncomingMessage; // 新建一個IncomingMessage對象 const incoming = parser.incoming = new ParserIncomingMessage(socket); // 執行回調 return parser.onIncoming(incoming, shouldKeepAlive);}
parserOnHeadersComplete 中創建了一個對象來表示收到的 HTTP 請求,接著執行 onIncoming 函數,對應的是 parserOnIncoming。
function parserOnIncoming(server, socket, state, req, keepAlive) { // 請求入隊(待處理的請求隊列) state.incoming.push(req); // 新建一個表示響應的對象 const res = new server[kServerResponse](req); /* socket 當前已經在處理其它請求的響應,則先排隊, 否則掛載響應對象到 socket,作為當前處理的響應 */ if (socket._httpMessage) { state.outgoing.push(res); } else { res.assignSocket(socket); } // 響應處理完畢后,需要做一些處理 res.on('finish', resOnFinish.bind(undefined, req, res, socket, state, server)); // 觸發 request 事件說明有請求到來 server.emit('request', req, res); }
我們看到這里會觸發 request 事件通知用戶有新請求到來,并傳入request和response作為參數,這樣用戶就可以處理請求了。另外 Node.js 本身是不會處理 HTTP 請求體的數據,當 Node.js 解析到請求體時會執行 kOnBody 鉤子函數,對應的是 parserOnBody 函數。
function parserOnBody(b, start, len) { // IncomingMessage 對象,即 request 對象 const stream = this.incoming; // Pretend this was the result of a stream._read call. if (len > 0 && !stream._dumped) { const slice = b.slice(start, start + len); const ret = stream.push(slice); if (!ret) readStop(this.socket); }}
parserOnBody 會把數據 push 到請求對象 request 中,接著 Node.js 會觸發 data 事件,所以我們可以通過以下方式獲取 body 的數據。
const server= http.createServer((request, response) => { request.on('data', (chunk) => { // 處理body }); request.on('end', () => { // body結束 }); })
雖然 Node.js 是單進程單線程的應用,但是我們可以創建多個進程來共同請求。在創建 HTTP 服務器時會調用 net 模塊的 listen,然后調用 listenIncluster。我們從該函數開始分析。
function listenIncluster(server, address, port, addressType, backlog, fd, exclusive, flags) { const serverQuery = { address: address, port: port, addressType: addressType, fd: fd, flags, }; cluster._getServer(server, serverQuery, listenOnMasterHandle); function listenOnMasterHandle(err, handle) { server._handle = handle; server._listen2(address, port, addressType, backlog, fd, flags); } }
listenIncluster 函數會調用子進程 cluster 模塊的 _getServer 函數。
cluster._getServer = function(obj, options, cb) { let address = options.address; const message = { act: 'queryServer', index, data: null, ...options }; message.address = address; // 給主進程發送消息 send(message, (reply, handle) => { // 根據不同模式做處理 if (handle) shared(reply, handle, indexesKey, cb); else rr(reply, indexesKey, cb); }); };
從上面代碼中可以看到,_getServer 函數會給主進程發送一個 queryServer 的請求并設置了一個回調函數。看一下主進程是如何處理 queryServer 請求的。
function queryServer(worker, message) { const key = `${message.address}:${message.port}:${message.addressType}:${message.fd}:${message.index}`; let handle = handles.get(key); if (handle === undefined) { let address = message.address; let constructor = RoundRobinHandle; // 根據策略選取不同的構造函數,UDP 只能使用共享模式,因為 UDP 不是基于連接的,沒有連接可以分發 if (schedulingPolicy !== SCHED_RR || message.addressType === 'udp4' || message.addressType === 'udp6') { constructor = SharedHandle; } handle = new constructor(key, address, message.port, message.addressType, message.fd, message.flags); handles.set(key, handle); } handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); // 返回結果給子進程 send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); }); }
queryServer 首先根據調度策略選擇構造函數并創建一個對象,然后執行該對象的 add 方法并且傳入一個回調。下面看看不同策略下的處理。
首先看看共享模式的實現,共享模式對應前面分析的主進程管理子進程,多個子進程共同 accept 處理連接這種方式。
function SharedHandle(key, address, port, addressType, fd, flags) { this.key = key; this.workers = []; this.handle = null; this.errno = 0; let rval; if (addressType === 'udp4' || addressType === 'udp6') rval = dgram._createSocketHandle(address, port, addressType, fd, flags); else rval = net._createServerHandle(address, port, addressType, fd, flags); if (typeof rval === 'number') this.errno = rval; else this.handle = rval; }
SharedHandle 是共享模式,即主進程創建好 handle,交給子進程處理,接著看它的 add 函數。
SharedHandle.prototype.add = function(worker, send) { this.workers.push(worker); send(this.errno, null, this.handle); };
SharedHandle 的 add 把 SharedHandle 中創建的 handle 返回給子進程。接著看子進程拿到 handle 后的處理。
function shared(message, handle, indexesKey, cb) { const key = message.key; const close = handle.close; handle.close = function() { send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); // 因為是共享的,可以直接 close 掉而不會影響其它子進程等 return close.apply(handle, arguments); }; handles.set(key, handle); // 執行 net 模塊的回調 cb(message.errno, handle); }
shared 函數把接收到的 handle 再回傳到調用方,即 net 模塊的 listenOnMasterHandle 函數,listenOnMasterHandle 會執行 listen 開始監聽地址。
function setupListenHandle(address, port, addressType, backlog, fd, flags) { // this._handle 即主進程返回的 handle // 連接到來時的回調 this._handle.onconnection = onconnection; this._handle[owner_symbol] = this; const err = this._handle.listen(backlog || 511);}
這樣多個子進程就成功啟動了服務器。共享模式的核心邏輯是主進程在 _createServerHandle 創建 handle 時執行 bind 綁定了地址(但沒有 listen),然后通過文件描述符傳遞的方式傳給子進程,子進程執行 listen 的時候就不會報端口已經被監聽的錯誤了,因為端口被監聽的錯誤是執行 bind 的時候返回的。邏輯如下圖所示。
看一個共享模式的使用例子。
const cluster = require('cluster');const os = require('os');// 設置為共享模式cluster.schedulingPolicy = cluster.SCHED_NONE;// 主進程 fork 多個子進程if (cluster.isMaster) { // 通常根據 CPU 核數創建多個進程 os.cpus().length for (let i = 0; i < 3; i++) { cluster.fork(); }} else { // 子進程創建服務器 const net = require('net'); const server = net.createServer((socket) => { socket.destroy(); console.log(`handled by process: ${process.pid}`); }); server.listen(8080);}
接著看輪詢模式,輪詢模式對應前面的主進程 accept,分發給多個子進程處理這種方式。
function RoundRobinHandle(key, address, port, addressType, fd, flags) { this.key = key; this.all = new Map(); this.free = []; this.handles = []; this.handle = null; this.server = net.createServer(assert.fail); if (fd >= 0) this.server.listen({ fd }); else if (port >= 0) { // 啟動一個服務器 this.server.listen({ port, host: address, ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), }); } else this.server.listen(address); // UNIX socket path. // 監聽成功后,注冊 onconnection 回調,有連接到來時執行 this.server.once('listening', () => { this.handle = this.server._handle; // 分發請求給子進程 this.handle.onconnection = (err, handle) => this.distribute(err, handle); this.server._handle = null; this.server = null; }); }
因為 RoundRobinHandle的 工作模式是主進程負責監聽,收到連接后分發給子進程,所以 RoundRobinHandle 中直接啟動了一個服務器,當收到連接時執行 this.distribute 進行分發。接著看一下RoundRobinHandle 的 add 函數。
RoundRobinHandle.prototype.add = function(worker, send) { this.all.set(worker.id, worker); const done = () => { // send 的第三個參數是 null,說明沒有 handle if (this.handle.getsockname) { const out = {}; this.handle.getsockname(out); send(null, { sockname: out }, null); } else { send(null, null, null); // UNIX socket. } this.handoff(worker); }; // 否則等待 listen 成功后執行回調 this.server.once('listening', done); };
RoundRobinHandle 會在 listen 成功后執行回調。我們回顧一下執行 add 函數時的回調。
handle.add(worker, (errno, reply, handle) => { const { data } = handles.get(key); send(worker, { errno, key, ack: message.seq, data, ...reply }, handle); });
回調函數會把 handle 等信息返回給子進程。但是在 RoundRobinHandle 和 SharedHandle 中返回的 handle 是不一樣的,分別是 null 和 net.createServer 實例,因為前者不需要啟動一個服務器,它只需要接收來自父進程傳遞的連接就行。
接著我們回到子進程的上下文,看子進程是如何處理的,剛才我們講過,不同的調度策略,返回的 handle 是不一樣的,我們看輪詢模式下的處理。
function rr(message, indexesKey, cb) { let key = message.key; // 不需要 listen,空操作 function listen(backlog) { return 0; } function close() { // 因為 handle 是共享的,所以無法直接關閉,需要告訴父進程,引用數減一 if (key === undefined) return; send({ act: 'close', key }); handles.delete(key); indexes.delete(indexesKey); key = undefined; } // 構造假的 handle 給調用方 const handle = { close, listen, ref: noop, unref: noop }; handles.set(key, handle); // 執行 net 模塊的回調 cb(0, handle); }
round-robin 模式下,Node.js 會構造一個假的 handle 返回給 net 模塊,因為調用方會調用 handle 的這些函數。當有請求到來時,round-bobin 模塊會執行 distribute 分發連接給子進程。
RoundRobinHandle.prototype.distribute = function(err, handle) { // 首先保存 handle 到隊列 this.handles.push(handle); // 從空閑隊列獲取一個子進程 const worker = this.free.shift(); // 分發 if (worker) this.handoff(worker); }; RoundRobinHandle.prototype.handoff = function(worker) { // 拿到一個 handle const handle = this.handles.shift(); // 沒有 handle,則子進程重新入隊 if (handle === undefined) { this.free.push(worker); return; } // 通知子進程有新連接 const message = { act: 'newconn', key: this.key }; sendHelper(worker.process, message, handle, (reply) => { // 接收成功 if (reply.accepted) handle.close(); else // 結束失敗,則重新分發 this.distribute(0, handle); // 繼續分發 this.handoff(worker); }); };
可以看到 Node.js 沒用按照嚴格的輪詢,而是哪個進程接收連接快,就繼續給它分發。接著看一下子進程是怎么處理該請求的。
function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle); } function onconnection(message, handle) { const key = message.key; const server = handles.get(key); const accepted = server !== undefined; // 回復接收成功 send({ ack: message.seq, accepted }); if (accepted) // 在 net 模塊設置 server.onconnection(0, handle); }
最終執行 server.onconnection 進行連接的處理。邏輯如下圖所示。
看一下輪詢模式的使用例子。
const cluster = require('cluster');const os = require('os');// 設置為輪詢模式cluster.schedulingPolicy = cluster.SCHED_RR;// 主進程 fork 多個子進程if (cluster.isMaster) { // 通常根據 CPU 核數創建多個進程 os.cpus().length for (let i = 0; i < 3; i++) { cluster.fork(); }} else { // 子進程創建服務器 const net = require('net'); const server = net.createServer((socket) => { socket.destroy(); console.log(`handled by process: ${process.pid}`); }); server.listen(8080);}
實現一個高性能的服務器是非常復雜的,涉及到很多復雜的知識,但是即使不是服務器開發者,了解服務器相關的一些知識也是非常有用的。
本文鏈接:http://www.tebozhan.com/showinfo-26-12149-0.htmlNode.js 是如何處理請求的
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com