从零实现一个基于cpp协程的tiny webserver
Seele Lv1

webserver 在 cpper 眼里好像已经是烂大街的存在了,但是没有搓过的我也不敢随意评价,于是这一个月上手搓了一个小玩具。

实际操作起来还是有许多有点意思的地方的(当你想做到高性能的时候)。所以也来记录分享一下这一个月的想法和收获。

技术栈的选择

最开始做构思的时候,上网上查了一下,发现多数都是做的 reactor + epoll。那我就想搞点特殊的,搞个 proactor + io_uring。

而后发现协程天生适合 io_uring ,最终选择的就是 proactor + io_uring + cpp coroutine 作为技术栈。

最终的成品

30 行搭建一个 GET 服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <print>
#include "http.h"
#include "log.h"
#include "server.h"
using namespace seele;

int main() {
log::logger().set_output_file("web_server.log");
auto tiny_app = [](const http::query_t& query, const http::header_t& header) {
std::println("Received GET request for /tiny_app with query: {}", query);
if (query != "hello!"){
return web::send_http_error(http::status_code::bad_request);
}
std::println("Headers:");
for (const auto& [key, value] : header) {
std::println(" {}: {}", key, value);
}

return web::send_msg(
{
http::status_code::ok,
{
{"Content-Type", "text/plain; charset=utf-8"}
},
"Hello from tiny_get_app!"
}
);
};


app().set_addr("127.0.0.1:80")
.set_root_path("/home/seele/webserver/static")
.GET("/tiny_app.so", tiny_app)
.run();
return 0;
}

目前我也用这个 webserver 搭建了属于本人的静态 blog 网站 seeleofficial.blog

性能方面的话,已实现的功能性能……可能……好像……大概……已经…………与 Nginx 持平

因为 WSL 内不知道搞了什么东西,限制端口范围只有 4096 个,捣鼓半天也不知道怎么调整限制,所以最终测试的时候只选择了 1500 的连接数来测试。为了测试的公平性,我实现的是线程模型,而 Nginx 是进程模型,所以线程和进程应该要保证一致,同样双方也都打开 logger。

这里我耍了点小流氓,进程模型锁的竞争几乎没有,而我的实现里有一小部分锁的使用,于是挑了 7 这么个不高不低的数字

最终测试的方法:

  • 环境: WSL Ubuntu 20
  • 方法:wrk -t10 -c1500 -d15s -T2s --latency 请求目录下同一文件

结果:

  • Nginx:
1
wrk -t10 -c1500 -d15s -T2s --latency http://127.0.0.1:8080/index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
Running 15s test @ http://127.0.0.1:8080/index.html
10 threads and 1500 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 33.74ms 19.60ms 241.24ms 74.16%
Req/Sec 4.51k 721.49 7.64k 71.95%
Latency Distribution
50% 30.17ms
75% 42.94ms
90% 58.96ms
99% 98.43ms
670992 requests in 15.10s, 1.48GB read
Requests/sec: 44447.54
Transfer/sec: 100.63MB
  • Tiny server:
1
wrk -t10 -c1500 -d15s -T2s --latency http://127.0.0.1:8081/index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
Running 15s test @ http://127.0.0.1:8081/index.html
10 threads and 1500 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 33.47ms 12.17ms 330.65ms 77.40%
Req/Sec 4.50k 1.32k 7.57k 50.47%
Latency Distribution
50% 29.32ms
75% 39.41ms
90% 51.92ms
99% 65.89ms
670373 requests in 15.09s, 1.39GB read
Requests/sec: 44422.65
Transfer/sec: 94.26MB

主要的一些收获点

io_uring

io_uring 设计理念上是提供一个异步的 io 接口。简单来说就是提交需要的 io 操作到提交队列,然后从完成队列获取 io 操作的结果。

  • 通过 SQE (Submission Queue Entry) 提交所需的系统调用信息,是一个非常复杂的结构体。不过绝大多数时候不需要手动去操作这么复杂的结构体, liburing 库提供了非常方便的 C 函数来。如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    /*
    * IO submission data structure (Submission Queue Entry)
    */
    struct io_uring_sqe {
    __u8 opcode; /* type of operation for this sqe */
    __u8 flags; /* IOSQE_ flags */
    __u16 ioprio; /* ioprio for the request */
    __s32 fd; /* file descriptor to do IO on */
    union {
    __u64 off; /* offset into file */
    __u64 addr2;
    struct {
    __u32 cmd_op;
    __u32 __pad1;
    };
    };
    union {
    __u64 addr; /* pointer to buffer or iovecs */
    __u64 splice_off_in;
    };
    __u32 len; /* buffer size or number of iovecs */
    union {
    __kernel_rwf_t rw_flags;
    __u32 fsync_flags;
    __u16 poll_events; /* compatibility */
    __u32 poll32_events; /* word-reversed for BE */
    __u32 sync_range_flags;
    __u32 msg_flags;
    __u32 timeout_flags;
    __u32 accept_flags;
    __u32 cancel_flags;
    __u32 open_flags;
    __u32 statx_flags;
    __u32 fadvise_advice;
    __u32 splice_flags;
    __u32 rename_flags;
    __u32 unlink_flags;
    __u32 hardlink_flags;
    __u32 xattr_flags;
    __u32 msg_ring_flags;
    __u32 uring_cmd_flags;
    };
    __u64 user_data; /* data to be passed back at completion time */
    /* pack this to avoid bogus arm OABI complaints */
    union {
    /* index into fixed buffers, if used */
    __u16 buf_index;
    /* for grouped buffer selection */
    __u16 buf_group;
    } __attribute__((packed));
    /* personality to use, if used */
    __u16 personality;
    union {
    __s32 splice_fd_in;
    __u32 file_index;
    struct {
    __u16 addr_len;
    __u16 __pad3[1];
    };
    };
    union {
    struct {
    __u64 addr3;
    __u64 __pad2[1];
    };
    /*
    * If the ring is initialized with IORING_SETUP_SQE128, then
    * this field is used for 80 bytes of arbitrary command data
    */
    __u8 cmd[0];
    };
    };
  • 通过 CQE (Completion Queue Entry) 获取系统调用的结构,这个结构体就没有这么复杂了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /*
    * IO completion data structure (Completion Queue Entry)
    */
    struct io_uring_cqe {
    __u64 user_data; /* sqe->data submission passed back */
    __s32 res; /* result code for this event */
    __u32 flags;

    /*
    * If the ring is initialized with IORING_SETUP_CQE32, then this field
    * contains 16-bytes of padding, doubling the size of the CQE.
    */
    __u64 big_cqe[];
    };

因为操作是完全异步的,提交顺序与完成顺序不会保持一致,而是先完成的先返回。为了能够辨认不同 CQE 对应提交的 SQE 是哪个,io_uring 提供了 user_data 字段来辨认。提交的 SQE user_data 字段会写入到对应的 CQE user_data 字段。

更详细的 io_uring 教程可以看这篇文章 io_uring By Example: An Article Series

cpp coroutine

一般来说使用 io_uring 都是基于回调的,比如将 user_data 设置为回调函数的地址。但是调用链一长很容易就会陷入回调地狱:

1
2
3
4
5
6
// c 的回调地狱
callback(callback(callback));

// cpp 封装一下稍微好点
task.then([](){}).then([](){}).then([](){})

协程的设计刚刚好可以解决回调地狱的问题,将异步回调的写法变成同步的写法:

1
2
3
4
5
6
7
8
9
// int32_t fd = ...;

sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);

int32_t ret = co_await
coro_io::awaiter::accept{fd, (sockaddr *)&client_addr, &client_addr_len};

// handle ret ...

简单来说就是通过 co_await 挂起当前协程,直到数据准备完毕再通过协程的 handle 去 resume 当前协程。

但是这样会面临一个问题,resume 的协程会堵塞当前线程,可是下一个 CQE 已经到达,为了效率你必须处理它。

那么线程池的需求呼之欲出了。

thread pool 线程池

不需要任何花里胡哨的功能,只供协程恢复使用。直接信号量消费者生产者队列一把梭,有效代码甚至50行都没有。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
namespace seele::coro::thread {

class pool{
public:
static pool& get_instance();

auto submit(std::coroutine_handle<> h){
tasks.emplace_back(h);
sem.release();
}

pool(const pool&) = delete;
pool(pool&&) = delete;
pool& operator=(const pool&) = delete;
pool& operator=(pool&&) = delete;
private:

void worker(std::stop_token st);

pool(size_t worker_count);
~pool();

structs::msc_queue<std::coroutine_handle<>> tasks;
std::vector<std::jthread> workers;
std::counting_semaphore<> sem;
};
pool& pool::get_instance(){
static pool instance{5};
return instance;
}


void pool::worker(std::stop_token st){
while(sem.acquire(), !st.stop_requested()){

auto h = tasks.pop_front();

h->resume();
}
}
pool::pool(size_t worker_count) : sem{0} {
workers.reserve(worker_count);
for(size_t i = 0; i < worker_count; ++i){
workers.emplace_back([this](std::stop_token st){
this->worker(st);
});
}
}

pool::~pool() {
for (auto& worker : workers) {
worker.request_stop();
}
sem.release(workers.size());
}

inline auto dispatch(std::coroutine_handle<> handle) {
pool::get_instance().submit(handle);
}

struct dispatch_awaiter{
bool await_ready() { return false; }

void await_suspend(std::coroutine_handle<> handle) {
dispatch(handle);
}

void await_resume() {}

explicit dispatch_awaiter(){}
};
}

但是这是一个高竞争的场所,锁的使用会引起极大的开销,那就需要引入无锁数据结构了。上述代码中的结构体 msc_queue 就是一个无锁队列。

lock free queue 无锁队列

无锁数据结构一直都是程序员眼前的一座大山。可作为一名 cpper 怎么可能败倒在无锁数据结构的淫威下呢?

经过一段时间学习后,搜寻到一个又简单又有性能的实现,也就是 Michael & Scott Queue。核心的思想就是通过一个空节点,避免同时首尾操作同一个节点。

刷刷刷一百行多行就实现了

点击运行:
————UAF(Use after free)————

我明明是按着原理实现的为什么会 UAF 呢?

说实话我真的是百思不得其解,我一度怀疑是我理解错了。因为按照原理来说 dequeue 出来的节点,使用完就应该释放了,但是这个时候可能还有线程在引用这个节点,在原理中并没有说明该如何处理。后来问了一下 gpt 才知道,这个问题叫做 ABA 问题。无锁数据结构的一大核心问题就是处理 ABA, 一般的解决办法是设计垃圾回收。但是这是 cpp,即使 GC 在十几年前就引入了,但是从未有编译器真正实现。在 cpp 23 也移除这个 GC,cpp26 某种意义上用 Hazard Point 来替代了。Hazard Point 也是多数无锁数据结构的解决 ABA 问题多数使用的方案。

最后我选择了 Hazard Point,又花了数百行来实现……

写完了程序,自然是得做 benchmark。与 std::queue + std::mutex 做对比:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Threads: 16
Operations per thread: 1000000
Total operations: 16000000
------------------------------------------
Testing std::queue + std::mutex...
Duration: 1.83 seconds
Throughput: 8.75 M ops/sec
Successful Pops: 8000000
------------------------------------------
Testing ms_queue (Lock-Free, node-based)...
Duration: 2.10 seconds
Throughput: 7.61 M ops/sec
Successful Pops: 8000000
------------------------------------------

一顿操作猛如虎,怎么比标准库还慢!!??

于是去观摩了 std::queue 底层 std::deque 的实现,原来是因为其中使用了块分配优化。那相应的我也应该实现,不过这里块的实现也需要无锁的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Threads: 16
Operations per thread: 1000000
Total operations: 16000000
------------------------------------------
Testing std::queue + std::mutex...
Duration: 1.73 seconds
Throughput: 9.26 M ops/sec
Successful Pops: 8000000
------------------------------------------
Testing ms_queue (Lock-Free, node-based)...
Duration: 2.11 seconds
Throughput: 7.57 M ops/sec
Successful Pops: 8000000
------------------------------------------
Testing msc_queue (Lock-Free, chunked)...
Duration: 1.26 seconds
Throughput: 12.70 M ops/sec
Successful Pops: 8000000
------------------------------------------

emm…,最后好像确实是提升了一些。虽然不多,不过不管怎样也是比标准库强了

还有很多的优化策略,比如说按 thread id 分配块,将 MPMC 的问题转换为多个 SPMC 的问题。即使我还想继续提升他,但是作为一个玩具,它已经走完它应有的生命周期了。

在实现中也存在着部分 UB,不过我的一位大佬朋友说:

  • 你甭管 UB 不 UB,只要他 B 的对就是好 Behavior

我:

  • ✍✍✍😭🙌🙌

batch 批处理

在 linux 中,因为高贵的上下文切换,syscall 的开销都是以 ms 为单位的。所以说做 linux 优化的一大核心点就是如何减少 syscall。

而 io_uring 的一大优势点就是可以攒齐多个 syscall 一并提交,将多个系统调用合并成一个系统调用。不过要注意,对 SQE 处理是非原子的,随之带来的并发问题也需要正确处理。

我选择了用户态轮询方案:

  • 通过 MPSC 队列,用一个专门的 consumer 线程来消费 SQE
  • 当 SQE 达到一个阈值时 consumer 就刷新 SQE ring buffer

这里也同样应该避免锁的使用,拿原来的 MPMC 无锁队列魔改一下。简化后的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// producer
template<typename derived>
struct base {
std::atomic<int32_t> io_ret;
std::coroutine_handle<> handle;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> handle) {
this->handle = handle;
ctx::get_instance().submit(
this,
[](void* helper_ptr, io_uring* ring) {
return static_cast<decltype(this)>(helper_ptr)->init(ring);
}
);
}
int32_t await_resume() {
//......
}

int init(io_uring* ring) {
auto* sqe = io_uring_get_sqe(ring);
static_cast<derived*>(this)->setup(sqe);

// ......


return 1;// Return the number of sqe written
}

void setup(io_uring_sqe* sqe) { std::terminate();} // Default setup, can be overridden by derived classes
};


// consumer
void ctx::worker(std::stop_token st){

while (!st.stop_requested()) {
size_t submit_count = 0;
if (unp_sem.try_acquire_for(25ms)) {
auto req = unprocessed_requests.pop_front();
auto& [helper_ptr, ring_handle] = req.value();

submit_count += ring_handle(helper_ptr, &ring);
if (submit_count >= submit_threshold) {
io_uring_submit(&ring);
submit_count = 0;
}
} else {
if (submit_count) {
io_uring_submit(&ring);
submit_count = 0;
}
}
}

}

而 io_uring 也提供了一个方案 SQPOLL,这种方案甚至可以避免系统调用

  • 内核提供一个专门的线程来轮询是否出现了新的 SQE
  • 如果出现了新的 SQE 就处理并消费它

zero copy 零拷贝

在传统 io 中,如果要将磁盘数据发送至网络端。一般的操作为

1
2
3
4
5
6
7
8
std::byte buffer[1024];

int file_fd = open(file_path, O_RDONLY);

int read_bytes = read(fd, buf, file_size);

int ret = write(socket_fd, buffer, read_bytes)

这经过了四次拷贝:

  • 磁盘 -> 内核 -> 用户态 -> 内核 -> 网卡

很明显 内核 -> 用户态 -> 内核 这一段完全是多余的。

早在 linux 2.1 版本就有了 sendfile 来去除中间这一段多余拷贝,将四次拷贝变为俩次甚至一次。同样 mmap + write 也可以实现零拷贝。而 io_uring 也提供了 IORING_OP_SPLICE 来实现零拷贝。

但是呢我们这是 webserver,写入到网卡的内容有俩段:

  • http 头部
  • 文件内容

因为要保证头部和内容的顺序,所以必须 write 再 splice,而这就是俩次调用。而 writev 正好可以将多个数据向量按顺序一起提交,所以最终是 mmap + writev。

到这里我们只实现了文件的零拷贝,对于头部或者其他类型的 http 消息却没有实现。不过 io_uring 提供了 IORING_OP_SENDMSG_ZC IORING_OP_SEND_ZC 一系列操作来支持。但是要保证极致的性能就需要自己管理 buffer 的注册,这又是一个复杂的工程,所以我选择跳过人生。

一些不知所谓的东西

  • 很多时候总是想既要又要,想到一个完美的办法,但是这里空太小写不下(bushi 其实是发现好复杂写下来就是一堆脑细胞)。虽然这妥协那妥协的,也写了 4500 多行了,也是自己完成的最大的玩具项目了。

  • 虽然说是一个 webserver,但是更重点的是 server 而非 web。4500 多行里实际上真正处理 http 报文的只有 500 行不到。server 确实比 web 稍微有趣点,写 parser 真的是一件很枯燥的事情,基本就是对着 RFC 上定义照猫画虎。

  • 有位大佬说:不做 benchmark 的优化毫无意义。确实,不做 benchmark,真不知道自己几斤几两。最开始一顿写下来,感觉这是我有史以来写过的最完美的代码,一看 qps 一万出头。

  • 在 benchmark 过程中我一直有一件非常苦恼的事情。就是并发连接数一多,在启动阶段始会出现 timeout。我想了好几天也没有想明白我写错在哪里,一度怀疑被资本做局了。直到某个摆烂的一天晚上,晚上洗完澡上床打算玩玩星穹铁道,等待的过程中不由自主的开始回顾这几天写的代码。一下子就抓住了关键点,原来是负责 accept 的协程只有一个。算一下,平均一个 40 ms,1000 * 40 ms = 40000 ms = 40s,基本全军覆没。解决办法也很简单,通过 accept 设置 SO_REUSEPORT 标志,然后多个协程同时启动监听。还有更好的办法就是 multishot accept,不过那样要重新设计一个基于回调的线程池了,还要做类型擦除什么的太麻烦,我就没实现了。所以说查不出来的 bug 就随他去吧,指不定吃个饭洗个澡之后回来想想就查出来了。

  • 有遗憾的就是没写单测这种东西,不过我是包管理苦手,引入外部库对我来说还是太复杂了。可能哪天一时兴起就给加个 Google test 吧。

Powered by Hexo & Theme Keep
Unique Visitor Page View