ReverseProxy是golang自带的简单网络Daili工具,仅适合自己测试用,不过麻雀虽小五脏俱全,功能还是挺多的。今天我们一起分析下这个工具的源码。
功能支持
- 支持自定义修改响应内容
- 支持连接池
- 支持错误信息自定义处理
- 支持 websocket 服务
- 支持自定义负载均衡
- 支持 https Daili
- 支持 url 重写
简单使用
最简单使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
//Daili服务器ip
addr := "127.0.0.1:4001"
//后端真实服务器ip
rs1 := "http://127.0.0.1:2000"
url1, err1 := url.Parse(rs1)
if err1 != nil {
log.Println(er
r1)
}
//简单实例化
proxy := httputil.NewSingleHostReverseProxy(url1)
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
|
这样就开启了一个Daili服务器,通过房屋4001端口,就能无缝转发到真实的2000端口上的服务器了。
url重写
reverseproxy默认为我们提供了个简单的包装函数 NewSingleHostReverseProxy
它在该函数里实现了director逻辑,director逻辑是实现请求转发的核心逻辑。我们可以看下它实现的源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
//url重写
director := func(req *http.Request) {
//url_rewrite
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2003/base/abc ??
//127.0.0.1:2002/dir/abc ==> 127.0.0.1:2002/abc
//127.0.0.1:2002/abc ==> 127.0.0.1:2003/base/abc
re, _ := regexp.Compile("^/dir(.*)")
req.URL.Path = re.ReplaceAllString(req.URL.Path, "$1")
//协议重写 主机赋值
req.URL.Scheme = url1.Scheme
req.URL.Host = url1.Host
//target.Path : /base
//req.URL.Path : /dir
req.URL.Path = singleJoiningSlash(url1.Path, req.URL.Path)
if targetQuery == "" || req.URL.RawQuery == "" {
req.URL.RawQuery = targetQuery + req.URL.RawQuery
} else {
req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
}
if _, ok := req.Header["User-Agent"]; !ok {
req.Header.Set("User-Agent", "")
}
}
|
更改返回内容
ReverseProxy还支持我们修改返回内容,我们可以通过ModfyResponse 属性传递一个匿名方法,该方法里可以编写我们需要修改返回内容的逻辑。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
//更改内容 需要传递一个匿名函数
modifyFunc := func(resp *http.Response) error {
if resp.StatusCode != 200 {
oldPayload, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
newPayload := []byte("ProxyStatusCode error:" + string(oldPayload))
resp.Body = ioutil.NopCloser(bytes.NewBuffer(newPayload))
resp.ContentLength = int64(len(newPayload))
resp.Header.Set("Content-Length", strconv.FormatInt(int64(len(newPayload)), 10))
}
return nil
}
proxy.ModifyResponse = modifyFunc
|
我们也可以不使用它提供的NewSingleHostReverseProxy
方法,而是自己实现相应的逻辑,最后我们通过实例化一个ReverseProxy
对象并注入到http服务器内。
1
2
3
|
proxy := &httputil.ReverseProxy{Director: director, ModifyResponse: modifyFunc}
log.Println("Starting httpserver at " + addr)
log.Fatal(http.ListenAndServe(addr, proxy))
|
请求转发核心逻辑
大家看过golang http包源码的都知道,传入http包的handler主要是实现了 ServeHTTP
这个方法。那么我们一起看下ReverseProxy 是怎么实现ServeHTTP
的。
拿到连接池
第一步先验证结构体内是否有传入连接池,如果没有就用自己默认的连接池
1
2
3
4
|
transport := p.Transport
if transport == nil {
transport = http.DefaultTransport
}
|
循环判断请求是否终止
接下来就是验证该请求是否终止,我们拿到当前请求的http.ResponseWriter
然后向上转型为http.CloseNotifier
接口。接着我们拿到 cn.CloseNotify()
然后就一直开一个协程,判断notifyChan 是否有消息,如果有消息就直接出发cancel() 方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
ctx := req.Context()
if cn, ok := rw.(http.CloseNotifier); ok {
var cancel context.CancelFunc
ctx, cancel = context.WithCancel(ctx)
defer cancel()
notifyChan := cn.CloseNotify()
go func() {
select {
case <-notifyChan:
cancel()
case <-ctx.Done():
}
}()
}
|
深度拷贝ctx信息
接下来会用上游的ctx做个深度拷贝,然后对ContentLength
、Body
、 Header
做一些校验
1
2
3
4
5
6
7
8
9
10
|
outreq := req.Clone(ctx)
if req.ContentLength == 0 {
outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
}
if outreq.Body != nil {
defer outreq.Body.Close()
}
if outreq.Header == nil {
outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
}
|
修改request
调用我们设置的Director方法修改最终request, 同时把请求头的Close置为false。也就是说这个连接是可以被复用的。
1
2
|
p.Director(outreq)
outreq.Close = false
|
Upgrade头特殊处理
从header里面判断是否有Upgrade 如果有的话就返回Upgrade,否则返回空,接着把header头内的Connection信息清除掉
1
2
|
reqUpType := upgradeType(outreq.Header)
removeConnectionHeaders(outreq.Header)
|
删除请求里面的逐跳标题
因为我们需要一个持久的连接,而不管客户端发送给我们什么,但是针对Te,和trailers仍然做保留
1
2
3
4
5
6
|
for _, h := range hopHeaders {
outreq.Header.Del(h)
}
if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
outreq.Header.Set("Te", "trailers")
}
|
判断请求升级
如果upgrade不为空,那么就设置进去
1
2
3
4
|
if reqUpType != "" {
outreq.Header.Set("Connection", "Upgrade")
outreq.Header.Set("Upgrade", reqUpType)
}
|
追加clientIP信息
通过req请求头里面的RemoteAddr信息,追加到请求头的X-Forwarded-For信息中。
1
2
3
4
5
6
7
8
9
10
|
if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
prior, ok := outreq.Header["X-Forwarded-For"]
omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
if len(prior) > 0 {
clientIP = strings.Join(prior, ", ") + ", " + clientIP
}
if !omit {
outreq.Header.Set("X-Forwarded-For", clientIP)
}
}
|
下游请求数据
通过连接池,直接请求下游数据。如果请求失败,抛出一个ErrorHandler
方法并执行。
1
2
3
4
5
|
res, err := transport.RoundTrip(outreq)
if err != nil {
p.getErrorHandler()(rw, outreq, err)
return
}
|
处理请求升级
如果状态码是101,那么表示要进行请求升级(websocket,h2c,etc)接着执行modifyResponse
这时候使用者可以针对101情况做一些特殊的处理。(有兴趣可以看下handleUpgradeResponse
方法里的代码,再函数体内部劫持到原始的tcp连接,并且开2个协程持续交换数据直到一方关闭)。最后执行特殊的请求升级的返回。
1
2
3
4
5
6
7
|
if res.StatusCode == http.StatusSwitchingProtocols {
if !p.modifyResponse(rw, res, outreq) {
return
}
p.handleUpgradeResponse(rw, outreq, res)
return
}
|
移除一些无用的header头
1
2
3
4
|
removeConnectionHeaders(res.Header)
for _, h := range hopHeaders {
res.Header.Del(h)
}
|
修改返回内容
这一步就是调用我们之前定义的modifyResponse 函数了。并且拷贝下游返回的header头信息。另外也会处理一些Trailer头部信息。
1
2
3
4
|
if !p.modifyResponse(rw, res, outreq) {
return
}
copyHeader(rw.Header(), res.Header) }
|
写入状态码和刷新response
接下来就写入返回的状态码,和周期性刷新内容daoresponse中。
1
2
|
rw.WriteHeader(res.StatusCode)
err = p.copyResponse(rw, res.Body, p.flushInterval(res))
|
关闭body,处理Trailer信息
等数据全部拷贝完成后会关闭body,最后处理下trailer信息。
1
2
3
4
5
6
7
8
|
res.Body.Close()
...
for k, vv := range res.Trailer {
k = http.TrailerPrefix + k
for _, v := range vv {
rw.Header().Add(k, v)
}
}
|