概述
在使用 HTTP 的过程中,无论是浏览器还是 SDK 的 http 客户端,我们可能都会有需求希望能够复用 HTTP 连接,也就是在同一条 HTTP 连接上请求和响应多次 HTTP 请求,这也是我们常说的 HTTP 长连接。
那么本文就将从 HTTP 长连接的协议支持已经实际 Go 实现上来介绍一下长连接的相关知识。
协议支持
在 HTTP 1.1 的 RFC 中,特别对长连接单开一章节进行讨论。在开篇就先描述了一波使用持久连接的好处:
- 降低所有环节设备的资源使用量(客户端、服务器、路由器等的 CPU 和内存使用);
- 多个请求和响应可以在同一个连接里面 pipeline,提高连接利用率;
- 因为减少了新建和关闭 TCP 连接的过程,所以网络上的数据包会变少,减少了网络拥塞的情况;
- 请求的响应延迟会降低,因为节省了连接建立的时间;
- 错误的处理可以更加友好,短连接只能用断开连接来表示异常,持久连接加入了错误报告;
在 HTTP 1.1 的协议 RFC 中说明了,默认情况下 HTTP 1.1 都是长连接,除非客户端或者服务端发送了 Connection:Close 的 Header 的情况下才会认为这是一个短连接。
如果是长连接,如果在发送过程中连接异常断开了,那么客户端对于幂等的请求,应该重试,对于非幂等的请求,不能重试;在 HTTP 1.1 的 RFC 中,幂等的请求方法有 GET、HEAD、PUT 和 DELETE。
Go 实现
接下来,我们就看看 Go 1.22 是如何针对 HTTP 1.1 的协议进行处理的,我们会分别针对 Client 和 Server 进行不同的讨论。
客户端
在 Go 中,对于连接的管理是通过 Transport 来管理的,而 Transport 的类型为 RoundTripper,接口为:
[root@liqiang.io]# cat client.gotype RoundTripper interface {RoundTrip(*Request) (*Response, error)}
RoundTripper 是一个 interface,所以需要我们提供实现,但是,默认情况下,HTTP 客户端会使用 Default 的 RoundTripper:
[root@liqiang.io]# cat transport.govar DefaultTransport RoundTripper = &Transport{Proxy: ProxyFromEnvironment,DialContext: defaultTransportDialContext(&net.Dialer{Timeout: 30 * time.Second,KeepAlive: 30 * time.Second,}),ForceAttemptHTTP2: true,MaxIdleConns: 100,IdleConnTimeout: 90 * time.Second,TLSHandshakeTimeout: 10 * time.Second,ExpectContinueTimeout: 1 * time.Second,}func (t *Transport) roundTrip(req *Request) (*Response, error) {... ...for {select {case <-ctx.Done():req.closeBody()return nil, ctx.Err()default:}treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}cm, err := t.connectMethodForRequest(treq)... ...pconn, err := t.getConn(treq, cm)... ...resp, err = pconn.roundTrip(treq)... ...} else if !pconn.shouldRetryRequest(req, err) {... ...return nil, err}... ...// Rewind the body if we're able to.req, err = rewindBody(req)if err != nil {return nil, err}}}
从这个简单的实现中可以看到,Go 的 HTTP Client 默认情况下是长连接,并且最多允许有 100 条并发的长连接,且默认的连接 idle 时间是 90 秒,并且在一些情况下会进行重试,下面列举了一下可能重试的场景:
- HTTP2 场景下单条连接太多 stream 时;
- 如果请求不包含 body 或者我们可以重新获取 body 的情况;(这里没有区分幂等请求方法)
- 在读取响应的第一个字节之前就获得了非 EOF 的错误;
- 服务端关闭了空闲的连接;
上面这段代码可以简单看出来 Go 的 HTTP 客户端实现了这些特性,那么我们再深入一点看其他的代码逻辑:
[root@liqiang.io]# cat transport.gofunc (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {... ...w := &wantConn{cm: cm,key: cm.key(),ctx: ctx,ready: make(chan struct{}, 1),beforeDial: testHookPrePendingDial,afterDial: testHookPostPendingDial,}if delivered := t.queueForIdleConn(w); delivered {pc := w.pc... ...t.setReqCanceler(treq.cancelKey, func(error) {})return pc, nil}... ...}func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {if t.DisableKeepAlives {return false}t.idleMu.Lock()defer t.idleMu.Unlock()... ...var oldTime time.Timeif t.IdleConnTimeout > 0 {oldTime = time.Now().Add(-t.IdleConnTimeout)}// Look for most recently-used idle connection.if list, ok := t.idleConn[w.key]; ok {stop := falsedelivered := falsefor len(list) > 0 && !stop {pconn := list[len(list)-1]tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)if tooOld {go pconn.closeConnIfStillIdle()}if pconn.isBroken() || tooOld {list = list[:len(list)-1]continue}delivered = w.tryDeliver(pconn, nil)if delivered {if pconn.alt != nil {} else {t.idleLRU.remove(pconn)list = list[:len(list)-1]}}stop = true}if len(list) > 0 {t.idleConn[w.key] = list} else {delete(t.idleConn, w.key)}if stop {return delivered}}if t.idleConnWait == nil {t.idleConnWait = make(map[connectMethodKey]wantConnQueue)}q := t.idleConnWait[w.key]q.cleanFront()q.pushBack(w)t.idleConnWait[w.key] = qreturn false}
从这些代码可以看到,Go 的实现并没有按照 RFC 说的一条连接可以有多个 onGoging 的请求,而是一条连接只有一个正在处理中的请求,然后 Go 自己维护了一个连接池,连接池以 schema、方法加地址作为 key 进行维护。
上面代码中还有一个小细节,就是这里维护了一个 LRU 的数据结构,它是用在如果连接过多时,需要删除掉一些连接的情况下,会通过这个数据结构关闭掉最长空闲时间的连接。
归还连接池
从上面的发送请求逻辑中,我们可以看到连接是通过 func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) 获取的,并且从里面的实现中我们知道连接是放在 idleConn 这个连接池中的,那么既然有借,就必然有还,那这个连接又是什么时候还回去的呢?
首先从获取连接的代码附近我们可以看到 func (t *Transport) tryPutIdleConn(pconn *persistConn) error 这段代码,从实现上来看应该就是还的逻辑了,那么我们回溯一下哪里会调用这段代码即可知道还的代码在哪里了,除了一些异常处理情况之外,我觉得主要需要关注的地方有两个:
- 当处理请求出错的时候会还回去,这里的出错不是指网络出错(这种情况应该关闭连接),而是指一些内部状态的处理异常;
- 当请求完成之后并且response 的 Body 被读取完之后(这里读取完的标志就是 read 操作能获取到 EOF);
我这里比较关注第二个情况,所以我们来走读一下代码:
[root@liqiang.io]# cat transport.goselect {case bodyEOF := <-waitForBodyRead:replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle poolalive = alive &&bodyEOF &&!pc.sawEOF &&pc.wroteRequest() &&replaced && tryPutIdleConn(trace)if bodyEOF {eofc <- struct{}{}}case <-rc.req.Cancel:alive = falsepc.t.cancelRequest(rc.cancelKey, errRequestCanceled)case <-rc.req.Context().Done():alive = falsepc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())case <-pc.closech:alive = false}
从这里可以看到,当收到 bodyEOF 的信号时才可能将连接放回连接池,那么 bodyEOF 又是如何产生的呢?这里往前看可以看到一个包装的 Body 对象:
[root@liqiang.io]# cat transport.gobody := &bodyEOFSignal{body: resp.Body,earlyCloseFn: func() error {waitForBodyRead <- false<-eofc // will be closed by deferred call at the end of the functionreturn nil},fn: func(err error) error {isEOF := err == io.EOFwaitForBodyRead <- isEOFif isEOF {<-eofc // see comment above eofc declaration} else if err != nil {if cerr := pc.canceled(); cerr != nil {return cerr}}return err},}resp.Body = body
然后这个包装的对象就实现了 Read 的方法:
[root@liqiang.io]# cat transport.gofunc (es *bodyEOFSignal) Read(p []byte) (n int, err error) {es.mu.Lock()closed, rerr := es.closed, es.rerres.mu.Unlock()if closed {return 0, errReadOnClosedResBody}if rerr != nil {return 0, rerr}n, err = es.body.Read(p)if err != nil {es.mu.Lock()defer es.mu.Unlock()if es.rerr == nil {es.rerr = err}err = es.condfn(err)}return}
所以我们在自己的业务代码中 Read 的时候,实际上调用到的是这里,然后可以看到如果读到了错误之后会交给前面传递进来的 fn 函数处理,而这个 fn 的逻辑就是判断错误的类型,并且往 channel 里写入结果,然后就被前面提到的 select 收到,然后进行判断是否是 EOF 错误,如果是的话,就会尝试归还连接。
服务端
Go 的服务端实现和客户端相比是相对简单的,从 Goroutine 模型上来看,就是 Per Request Per Goroutine 的模型:
[root@liqiang.io]# cat server.gofunc (srv *Server) Serve(l net.Listener) error {for {rw, err := l.Accept()... ...connCtx := ctx... ...tempDelay = 0c := srv.newConn(rw)c.setState(c.rwc, StateNew, runHooks) // before Serve can returngo c.serve(connCtx)}}func (c *conn) serve(ctx context.Context) {if ra := c.rwc.RemoteAddr(); ra != nil {c.remoteAddr = ra.String()}ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr())var inFlightResponse *response... ...ctx, cancelCtx := context.WithCancel(ctx)c.cancelCtx = cancelCtxdefer cancelCtx()c.r = &connReader{conn: c}c.bufr = newBufioReader(c.r)c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)for {w, err := c.readRequest(ctx)if c.r.remain != c.server.initialReadLimitSize() {// If we read any bytes off the wire, we're active.c.setState(c.rwc, StateActive, runHooks)}... ...req := w.req... ...c.curReq.Store(w)... ...inFlightResponse = wserverHandler{c.server}.ServeHTTP(w, w.req)inFlightResponse = nilw.cancelCtx()if c.hijacked() {return}... ...w.finishRequest()c.rwc.SetWriteDeadline(time.Time{})c.setState(c.rwc, StateIdle, runHooks)c.curReq.Store(nil)if !w.conn.server.doKeepAlives() {return}if d := c.server.idleTimeout(); d != 0 {c.rwc.SetReadDeadline(time.Now().Add(d))} else {c.rwc.SetReadDeadline(time.Time{})}if _, err := c.bufr.Peek(4); err != nil {return}c.rwc.SetReadDeadline(time.Time{})}
从这段代码的实现中可以看到,服务端的连接也是复用的,同时也是一样的每个连接最多只有一个正在进行中的请求。但是,从代码中也可以看到,并不是所有的连接都是直接复用的(shouldReuseConnection),这里列举不能复用的场景:
- 连接过大或者客户端指定了
Connection: Close的场景; - 响应没有写完的场景(一般是写出错了);
- 连接出现了写错误;
- 连接的读请求有异常;
最后再补一下,实际上 http client 并没有真实从连接中读取 response 的 body,只有当我们 read 的时候才会继续读取,这也就是说,如果我们忘记关闭 Response 的 Body 的时候,就会出现内存泄露和连接泄露。
实践
代码看得差不多了,那么是时候用例子来验证一下我们的学习成果了,我已经将这些代码都放在一个专属的仓库了:https://github.com/liuliqiang/blog_codes/tree/master/golang/http/simple ,感兴趣的话可以自行下载尝试。
[root@liqiang.io]# cat main2.gofunc main() {for i := 0; i < 101; i++ {func() {resp, err := http.Get("https://gobyexample.com")if err != nil {panic(err)}defer resp.Body.Close()fmt.Println("Response status:", resp.Status)fmt.Println("Connection Header:", resp.Header.Get("Connection"))_, _ = io.ReadAll(resp.Body)}()time.Sleep(time.Millisecond * 500)}}
我们运行这段程序,然后打开另外一个终端查看连接数量,我们会发现只有有一条连接:
[root@liqiang.io]# ss -antp | grep "13.35.238"ESTAB 0 0 192.168.121.195:44664 13.35.238.63:443 users:(("main",pid=87455,fd=6))
所以这里有个有趣的事实就是我们多次使用 http.Get,实际上底层它是复用了一个 DefaultClient,所以它会复用长连接,因此默认情况下,Go 的 http 客户端是使用长连接的。
但是,如果你忘记 close response 的 body 了,那么可能情况就不一样了,例如这段代码你看看能不能找到什么问题:
[root@liqiang.io]# cat main.gofunc main() {for i := 0; i < 101; i++ {func() {resp, err := http.Get("https://gobyexample.com")if err != nil {panic(err)}defer resp.Body.Close()fmt.Println("Response status:", resp.Status)scanner := bufio.NewScanner(resp.Body)for i := 0; scanner.Scan() && i < 5; i++ {fmt.Println(scanner.Text())}if err := scanner.Err(); err != nil {panic(err)}}()time.Sleep(time.Millisecond * 500)}}
我们运行这段程序,然后再查看连接的情况就会发现每次都会新建一条连接:
[root@liqiang.io]# ss -antp | grep "13.35.238"TIME-WAIT 0 0 192.168.121.195:59538 13.35.238.114:443TIME-WAIT 0 0 192.168.121.195:52688 13.35.238.70:443TIME-WAIT 0 0 192.168.121.195:44890 13.35.238.63:443TIME-WAIT 0 0 192.168.121.195:54294 13.35.238.22:443TIME-WAIT 0 0 192.168.121.195:44916 13.35.238.63:443TIME-WAIT 0 0 192.168.121.195:54306 13.35.238.22:443TIME-WAIT 0 0 192.168.121.195:44906 13.35.238.63:443
那么问题是什么呢?你重复一下上面的代码分析就可以很快发现问题所在了,那就是我们说归还长连接的场景,第二个就提到了只有 response 的 Body 被完全读取完之后,遇到了 EOF 之后才会归还,所以这里的问题就是没有完全读取完 Body 之后就 Close 了,所以连接并不能复用,这也给我们写代码一些提醒,即使我们不需要全部的 Body 也最好读取完,这样才可以复用连接。
中间件
在日常的使用中,我们大多数情况下都不是直接对外暴露 Go 的 HTTP 客户端,往往都是中间会加一些中间件作为负载均衡,例如 Nginx 之类的,那么当我们使用长连接时,也需要考虑到中间件的能力,这里就以 Nginx 为例做一个介绍。
Nginx
Nginx 的 keep alive 参数需要分为两部分来讨论,分别是 Nginx <-> Upstream 和 Client <-> Nginx 这两段链路,在 Nginx 的文档中,关于 Keepalive 的参数有这么几个:
| 链路 | 配置 | 语法 | 默认值 | 描述 |
|---|---|---|---|---|
| 客户端 | keepalive_disable |
keepalive_disable none | msie | safari ...; |
keepalive_disable msie6; |
针对某些浏览器禁用长连接 |
| 客户端 | keepalive_requests |
keepalive_requests number; |
keepalive_requests 1000; |
一条连接最多可以承载多少次的请求 |
| 客户端 | keepalive_time |
keepalive_time time; |
keepalive_time 1h; |
一条长连接的最长生命周期是多久,到达这个时间之后这条连接将不接受新的请求并关闭 |
| 客户端 | keepalive_timeout |
keepalive_timeout timeout [header_timeout]; |
keepalive_timeout 75s; |
一条长连接的最长空闲时间,有些浏览器支持 Keep-Alive: timeout=time 的用法 |
| 服务端 | keepalive |
keepalive connections; |
- |
每个 Worker 线程最多可以和 Upstream 建立多少条连接 |
| 服务端 | keepalive_requests |
keepalive_requests number; |
keepalive_requests 1000; |
每条连接最多可以处理多少个请求 |
| 服务端 | keepalive_time |
keepalive_time time; |
keepalive_time 1h; |
每条连接的最长生命周期是多久 |
| 服务端 | keepalive_timeout |
keepalive_timeout timeout; |
keepalive_timeout 60s; |
连接的空闲时间 |
Ref
- HTTP 协议比较重要的一些 RFCs
- HTTP 1.0:https://www.rfc-editor.org/rfc/rfc1945
- HTTP 1.1 的标准提案:https://www.rfc-editor.org/rfc/rfc2068
- HTTP 1.1 的标准草案:https://www.rfc-editor.org/rfc/rfc2616
- HTTP 2.0:https://datatracker.ietf.org/doc/html/rfc7540
- HTTP 3.0:https://datatracker.ietf.org/doc/html/rfc9114
- Nginx