使用 Mio 编写底层 TCP 服务器

原文

是时候认识 (acquainted) 一下 Metal IO 了,它是在 epoll/kqueue 之上用 Rust 编写的跨平台抽象。

在本文中,我们将会展示并解释如何编写一个单线程异步 TCP 服务器,用它模拟 HTTP 协议,然后使用ab/wrk对其进行 benchmark。结果将会令人印象深刻。

Getting started

我使用的是mio = "0.6"

首先,需要 TCP listener。


#![allow(unused)]
fn main() {
let address = "0.0.0.0:8080";
let listener = TcpListener::bind(&address.parse().unwrap()).unwrap();
}

然后创建Poll对象并将 listener 注册到Token(0)中用于可读事件 (readable events),由 edge (而不是 level) 激活。更多内容请参阅 edge vs level


#![allow(unused)]
fn main() {
let poll = Poll::new().unwrap();
poll.register(
    &listener, 
    Token(0),
    Ready::readable(),
    PollOpt::edge()).unwrap();
}

下一步我们要做的就是根据给定的容量创建Events对象以及主循环(本例中是无限循环)。在循环中,事件被一一轮询并处理。


#![allow(unused)]
fn main() {
let mut events = Events::with_capacity(1024);
loop {
    poll.poll(&mut events, None).unwrap();
    for event in &events {
        // handle the event
    }
}
}

Accepting connections (and dropping them)

事件可以是以下其中一种:

  • listener 上的可读事件意味着有要准备接入的连接。
  • 已连接的 socket 上的事件
  • readable - socket 有数据可以读取
  • writable - socket 已经写数据就绪

listener 以及 socket 事件可以被 token 区分,对于 listener token 它总是 0,因为它已在Poll中注册。

以下代码是最简单的事件处理方式,在循环中接受所有的传入连接,并且对于每个连接 - 只需删除 socket。它将会关闭连接。在你的服务中抛弃协议


#![allow(unused)]
fn main() {
// handle the event
match event.token() {
    Token(0) => {
        loop {
            match listener.accept() {
                Ok((socket, address)) => {
                    // What to do with the connection?
                    // One option is to simply drop it!
                    println!("Got connection from {}", address);
                },
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock =>
                    // No more connections ready to be accepted 
                    break,
                Err(e) => 
                    panic!("Unexpected error: {}", e)
            }
        }
    },
    _ => () // Ignore all other tokens
}
}

listener 的.accept()方法返回std::io::Result<(TcpStream, SocketAddr)>(见 accept),因此我需要匹配并处理成功的响应或者错误。这里有一个特定的错误类型 io::ErrorKind::WouldBlock,它表示“我将等待(阻塞)以取得任何进展”。这是非阻塞 (non-blocking) 行为的本质 - 关键是不要阻塞(而是返回相应的错误)!遇到此类错误时,意味着此时没有更多的传入连接等待接入,因此循环中断,并处理下一个事件。

现在如果我运行服务器并尝试和它建立连接,我可以看到正在抛弃协议!是不是很神奇?

<span class="katex"><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.64444em;vertical-align:0em;"></span><span class="mord mathnormal">n</span><span class="mord mathnormal">c</span><span class="mord">127.0.0.18080</span></span></span></span> 

Registering connections for events

接着说下一个事件。为了发生下一个事件,首先必须使用Poll注册 token-socket 对。在底层 (under the hook),Poll将会跟踪哪一个 token 对应哪一个 socket,但是客户端代码只能访问 token。这意味着如果服务器打算与客户端进行实际通信(我很确信大多数服务器都这样做),就必须以某种方式存储 token-socket 对。在本例中,我使用了简单的HashMap<Token, TcpStream>,但是使用 slab 可能会更加高效。

token 只是usize的一个封装器,因此简单的计数器就足以提供递增的 token 序列。一旦使用相应的 token 注册了 socket,它就会被插入到HashMap中。


#![allow(unused)]
fn main() {
let mut counter: usize = 0;
let mut sockets: HashMap<Token, TcpStream> = HashMap::new();

// handle the event
match event.token() {
    Token(0) => {
        loop {
            match listener.accept() {
                Ok((socket, _)) => {
                    counter += 1;
                    let token = Token(counter);

                    // Register for readable events
                    poll.register(&socket, token
                        Ready::readable(),
                        PollOpt::edge()).unwrap();

                    sockets.insert(token, socket);                    
                },
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock =>
                    // No more connections ready to be accepted 
                    break,
                Err(e) => 
                    panic!("Unexpected error: {}", e)
            }
        }
    },
    token if event.readiness().is_readable() => {
        // Socket associated with token is ready for reading data from it
    }
}
}

Reading data from client

当给定 token 发生可读事件时,意味着数据在相应的 socket 中读就绪。我将只使用字节数组作为读取数据的缓冲区。

在循环中执行读取操作,直到返回已知的WouldBlock错误。每次调用read将返回(如果成功的话)实际读取的字节数,当读取的字节数为 0 时 - 意味着客户端已经断开连接,此后保持 socket (或继续循环读取)没有意义。


#![allow(unused)]
fn main() {
// Fixed size buffer for reading/writing to/from sockets
let mut buffer = [0 as u8; 1024];
...
token if event.readiness().is_readable() => {
    loop {
        let read = sockets.get_mut(token).unwrap().read(&mut buffer);
        match read {
            Ok(0) => {
                // Successful read of zero bytes means connection is closed
                sockets.remove(token);
                break;
            },
            Ok(len) => {
                // Now do something with &buffer[0..len]
                println!("Read {} bytes for token {}", len, token.0);
            },
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => break,
            Err(e) => panic!("Unexpected error: {}", e)
        }
    }
}
...
}

Writing data to the client

对于接收可写事件的 token,它必须先在Poll中注册。oneshot选项对于安排可写事件可能很有用,该选项确保感兴趣的 (interest) 事件只被触发一次。


#![allow(unused)]
fn main() {
poll.register(&socket, token
    Ready::writable(),
    PollOpt::edge() | PollOpt::oneshot()).unwrap();
}

向客户端 socket 写入数据与之类似,也是通过缓冲区完成的,但是不需要显式循环,因为已经有一个方法在执行循环:write_all()

如果我想让协议返回接收到的字节数,我将需要写入的实际字节数(HashMap将会做这件事),在发生可读事件时计算字节数,然后安排一次可写事件,以及何时发生可写事件 - 然后发送响应并断开连接。


#![allow(unused)]
fn main() {
let mut response: HashMap<Token, usize> = HashMap::new();
...
token if event.readiness().is_readable() => {
    let mut bytes_read: usize = 0;
    loop {
        ... // sum up number of bytes received
    }
    response.insert(token, bytes_read);
    // re-register for one-shot writable event
}
...
token if event.readiness().is_writable() => {
    let n_bytes = response[&token];
    let message = format!("Received {} bytes\n", n_bytes);
    sockets.get_mut(&token).unwrap().write_all(message.as_bytes()).unwrap();
    response.remove(&token);
    sockets.remove(&token); // Drop the connection
},
}

What happens between reading and writing data?

此时我已经从 socket 上读取了数据,并且将数据写入 socket 中。但是写入事件永远也不会发生,因为没有为可写事件注册 token!

我应该什么时候为可写事件注册 token?好吧,当它有东西要写入的时候(进行注册)!听起来很简单,不是吗?在实践中,这意味着要真正实现一些协议了。

How do I implement a protocol?

我只想发回文本(或 JSON),而 TCP 是一种协议,一种传输级的传输控制协议。TCP 关心接收方以发送方发送的确切顺序来接收确切数量的字节!所以在传输级别,我必须处理两个字节流:一个从客户端到服务端,另一个直接返回。

与服务器打交道时应用层协议会很有用(如 HTTP)。应用层协议可以定义实体,如request - 服务器从客户端接收,以及response - 客户端从服务器接收回来。

值得一提的是,正确实现 HTTP 并不像听起来那么容易。但是已经有现成的 HTTP 库可供使用(如 hyper)。在这里,我不会为如何实现 HTTP 而烦恼,我要做的是让我的服务器表现的好像它真的理解 GET 请求,但总会用包含 6 个字节的响应来应答这样的请求:b"hello \n"

Mocking HTTP

对于本文而言,mock HTTP 已经绰绰有余。我将把 HTTP 请求头与请求体(如果有的话)用 4 个字节b"\r\n\r\n"进行分割。因此,如果我跟踪当前客户端发送的内容,并且在任何时候那里都有 4 个字节,我就可以使用预定义的 HTTP 响应进行应答:

HTTP/1.1 200 OK
Content-Type: text/html
Connection: keep-alive
Content-Length: 6

hello

HashMap就已经足够用于跟踪所有接收到的字节。


#![allow(unused)]
fn main() {
let mut requests: HashMap<Token, Vec<u8>> = HashMap::new();
}

一旦读取结束,就需要检查请求是否已就绪:


#![allow(unused)]
fn main() {
fn is_double_crnl(window: &[u8]) -> bool {  /* trivial */ }

let ready = requests.get(&token).unwrap()
    .windows(4)
    .find(|window| is_double_crnl(*window))
    .is_some();
}

如果已就绪,则可以安排一些数据写入!


#![allow(unused)]
fn main() {
if ready {
    let socket = sockets.get(&token).unwrap();
    poll.reregister(
        socket,
        token,
        Ready::writable(),
        PollOpt::edge() | PollOpt::oneshot()).unwrap();
}
}

写入完成之后,重要的是要保持连接打开,并重新注册 socket 以再次读取。


#![allow(unused)]
fn main() {
poll.reregister(
    sockets.get(&token).unwrap(),
    token,
    Ready::readable(),
    PollOpt::edge()).unwrap();
}

服务器已就绪!

<span class="katex"><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.69444em;vertical-align:0em;"></span><span class="mord mathnormal">c</span><span class="mord mathnormal">u</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.01968em;">ll</span><span class="mord mathnormal">oc</span><span class="mord mathnormal">a</span><span class="mord mathnormal" style="margin-right:0.01968em;">l</span><span class="mord mathnormal">h</span><span class="mord mathnormal">os</span><span class="mord mathnormal">t</span><span class="mspace" style="margin-right:0.2777777777777778em;"></span><span class="mrel">:</span><span class="mspace" style="margin-right:0.2777777777777778em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord">8080</span><span class="mord mathnormal">h</span><span class="mord mathnormal">e</span><span class="mord mathnormal" style="margin-right:0.01968em;">ll</span><span class="mord mathnormal">o</span><span class="mord">‘‘‘</span><span class="mord cjk_fallback">好戏开始了</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord cjk_fallback">让我们看看这个单线程服务器表现如何。我将会使用常用的工具:</span><span class="mord">‘</span><span class="mord mathnormal">ab</span><span class="mord">‘</span><span class="mord cjk_fallback">和</span><span class="mord">‘</span><span class="mord mathnormal" style="margin-right:0.02691em;">w</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord">‘</span><span class="mord cjk_fallback">。</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord">‘</span><span class="mord mathnormal">ab</span><span class="mord">‘</span><span class="mord cjk_fallback">需要使用</span><span class="mord">‘</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.8888799999999999em;vertical-align:-0.19444em;"></span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord">‘</span><span class="mord cjk_fallback">选项以使用</span><span class="mord">‘</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord mathnormal">ee</span><span class="mord mathnormal">p</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord mathnormal">a</span><span class="mord mathnormal" style="margin-right:0.01968em;">l</span><span class="mord mathnormal">i</span><span class="mord mathnormal" style="margin-right:0.03588em;">v</span><span class="mord mathnormal">e</span><span class="mord">‘</span><span class="mord cjk_fallback">并重用已有连接。</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord">‘</span><span class="mord mathnormal" style="margin-right:0.02691em;">w</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord">2‘</span><span class="mord cjk_fallback">实际与</span><span class="mord">‘</span><span class="mord mathnormal" style="margin-right:0.02691em;">w</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord">‘</span><span class="mord cjk_fallback">用法相同,因此需要</span><span class="mord">‘</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord">−</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal">a</span><span class="mord mathnormal">t</span><span class="mord mathnormal">e</span><span class="mord">‘</span><span class="mord cjk_fallback">参数。</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:1em;vertical-align:-0.25em;"></span><span class="mord">‘</span><span class="mord mathnormal">ab</span><span class="mord">/</span><span class="mord mathnormal" style="margin-right:0.02691em;">w</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord">‘</span><span class="mord cjk_fallback">运行在不同的</span><span class="mord mathnormal" style="margin-right:0.22222em;">V</span><span class="mord mathnormal" style="margin-right:0.10903em;">M</span><span class="mord cjk_fallback">上而不是在服务器上</span><span class="mopen">(</span><span class="mord cjk_fallback">但是在相同的</span><span class="mord mathnormal">re</span><span class="mord mathnormal" style="margin-right:0.03588em;">g</span><span class="mord mathnormal">i</span><span class="mord mathnormal">o</span><span class="mord mathnormal">n</span><span class="mord cjk_fallback">中</span><span class="mclose">)</span><span class="mord cjk_fallback">。以下是我在某个云提供商的实例</span><span class="mord">‘</span><span class="mord mathnormal">n</span><span class="mord">1</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:0.77777em;vertical-align:-0.08333em;"></span><span class="mord mathnormal">s</span><span class="mord mathnormal">t</span><span class="mord mathnormal">an</span><span class="mord mathnormal">d</span><span class="mord mathnormal">a</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal">d</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span><span class="mbin">−</span><span class="mspace" style="margin-right:0.2222222222222222em;"></span></span><span class="base"><span class="strut" style="height:1em;vertical-align:-0.25em;"></span><span class="mord">8</span><span class="mopen">(</span><span class="mord">8</span><span class="mord mathnormal" style="margin-right:0.03588em;">v</span><span class="mord mathnormal" style="margin-right:0.13889em;">CP</span><span class="mord mathnormal" style="margin-right:0.10903em;">U</span><span class="mord mathnormal">s</span><span class="mpunct">,</span><span class="mspace" style="margin-right:0.16666666666666666em;"></span><span class="mord">30</span><span class="mord mathnormal" style="margin-right:0.05017em;">GB</span><span class="mord mathnormal">m</span><span class="mord mathnormal">e</span><span class="mord mathnormal">m</span><span class="mord mathnormal" style="margin-right:0.03588em;">ory</span><span class="mclose">)</span><span class="mord">‘</span><span class="mord cjk_fallback">上尝试对服务器进行</span><span class="mord mathnormal">b</span><span class="mord mathnormal">e</span><span class="mord mathnormal">n</span><span class="mord mathnormal">c</span><span class="mord mathnormal">hma</span><span class="mord mathnormal" style="margin-right:0.02778em;">r</span><span class="mord mathnormal" style="margin-right:0.03148em;">k</span><span class="mord cjk_fallback">时得到的数字:</span><span class="mord">‘‘‘</span><span class="mord mathnormal">co</span><span class="mord mathnormal">n</span><span class="mord mathnormal">so</span><span class="mord mathnormal" style="margin-right:0.01968em;">l</span><span class="mord mathnormal">e</span></span></span></span> ab -n 1000000 -c 128 -k http://instance-1:8080/
<snip>
Requests per second:    105838.76 [#/sec] (mean)
Transfer rate:          9095.52 [Kbytes/sec] received
 wrk -d 60s -t 8 -c 128 --rate 150k http://instance-1:8080/
<snip>
Requests/sec: 120596.75
Transfer/sec: 10.12MB

对于单线程来说,105k 与 120k 的 rps 不算太差。

当然,这次可以当作是作弊,但只要涉及真实网络(即使在同一区域内),这就是负载下的真实服务器,这可能(或多或少)是使用单线程完成此网络速度的重要底线。

完成可运行的代码地址是:github,每一个 pull-request 由一个逻辑章节组成:

  • 初始化项目:PR#1
  • accept & discard: PR#2
  • read from socket:PR#3
  • writing to socket:PR#4
  • mocking HTTP:PR#5

Where to go from here

扩展到多线程:从这里开始。