一般情况下,如果要实现聊天即时通讯,都要借助公网服务器作为中继节点对消息进行转发。
例如用户A和用户B进行即时通讯的具体步骤如下所示
首先用户A和B需要和公网服务器建立长连接
ClientA ====> (建立长连接) ===> 公网服务器
`ClientB ====> (建立长连接) ===> 公网服务器
紧接着用户A如果想发送消息给用户B,就会采用转发的形式
ClientA => 公网服务器(消息转发) => ClientB
但是我们从中可以看到,如果用户之间进行的是语音视频通话,所有流量将会从中继服务器中经过。这将会给中继服务器带来巨大挑战。
那么是否可以存在一种方式可以抛除中继服务器的存在,让用户A和用户B进行直连通信呢?
我们知道用户A和用户B都在各自的内网下,双方都不知道彼此的地址,那么如何进行通信成了问题。
二、P2P 通信与NAT类型
紧接上文,其实用户A在给中继服务器发送长连接请求后,中继服务器就能获取到运营商给用户A开放的公网IP和端口。
那么如果用户B知道了用户A所在的公网IP和端口,是否就能脱离中继服务器的限制,直接发送请求给用户A所在的IP和端口呢?
答案是,在一定情况下是可以的。这要求用户A所在的 NAT 是完全锥形。
NAT 的作用是会将内网主机的IP地址映射为一个公网IP,由于 IPV4 地址池不够用的情况下,运营商不会给每个接入互联网的用户分配公网 IP ,而是多个用户,或者一整个小区公用一个公网 IP 出口。
当用户发送网络请求时, NAT 会将用户的内网 IP 转换为公网 IP,并且分配一个公网端口。当用户的请求结束,一段时间后该这些公共资源将会被回收。
Server S1 Server S218.181.0.31:1235 138.76.29.7:1235 | | | | +----------------------+----------------------+ | ^ Session 1 (A-S1) ^ | ^ Session 2 (A-S2) ^ | 18.181.0.31:1235 | | | 138.76.29.7:1235 | v 155.99.25.11:62000 v | v 155.99.25.11:62000 v | Cone NAT 155.99.25.11 | ^ Session 1 (A-S1) ^ | ^ Session 2 (A-S2) ^ | 18.181.0.31:1235 | | | 138.76.29.7:1235 | v 10.0.0.1:1234 v | v 10.0.0.1:1234 v | 用户内网 10.0.0.1:1234
基于这种特性,NAT一般情况被分为 4 类
- 完全圆锥型NAT (Full Cone NAT)把一个来自内部IP地址和端口的所有请求,始终映射到相同的外网IP地址和端口;同时,任意外部主机向该映射的外网IP地址和端口发送报文,都可以实现和内网主机进行通信,就像一个向外开口的圆锥形一样,故得名。
- 地址限制式锥形NAT(Address Restricted Cone NAT)地址限制式圆锥形NAT同样把一个来自内部IP地址和端口的所有请求,始终映射到相同的外网IP地址和端口;与完全圆锥型NAT不同的是,当内网主机向某公网主机发送过报文后,只有该公网主机才能向内网主机发送报文,故得名。相比完全锥形,增加了地址限制,也就是IP受限,而端口不受限。
- 端口限制式锥形NAT(Port Restricted Cone NAT)端口限制式圆锥形NAT更加严格,在上述条件下,只有该公网主机该端口才能向内网主机发送报文,故得名。相比地址限制锥形又增加了端口限制,也就是说IP、端口都受限。
- 对称式NAT(Symmetric NAT)对称式NAT把内网IP和端口到相同目的地址和端口的所有请求,都映射到同一个公网地址和端口;同一个内网主机,用相同的内网IP和端口向另外一个目的地址发送报文,则会用不同的映射(比如映射到不同的端口)。和端口限制式NAT不同的是,端口限制式NAT是所有请求映射到相同的公网IP地址和端口,而对称式NAT是为不同的请求建立不同的映射。它具有端口受限锥型的受限特性,内部地址每一次请求一个特定的外部地址,都可能会绑定到一个新的端口号。也就是请求不同的外部地址映射的端口号是可能不同的。这种类型基本上就告别 P2P 了。
一般情况下,家用 NAT 是NAT3,也就是 端口限制式锥形NAT。我们基于这一特性可以尝试让两台主机进行内网端对端直连。
请注意,P2P通信不意味着全程不需要服务器的介入。服务器的介入只是为了让双方节点都获取到各自穿透的公网 IP和端口,实现的具体流程请方法下图。
请注意这里使用到了端口复用技术。因为我们的端口不仅要监听一个服务,并且这个端口还能进行复用发送网络请求。
具体代码示例如下:
代码我把它托管到了 Github 上,并且有完整说明,链接如下
https://github.com/xhyonline/p2p-demo
server.go
代码其实很简单,server.go 只做一件事,交换两个内网节点临时生成的公网 IP 和端口
package mainimport ( "encoding/json" "fmt" "github.com/go-basic/uuid" "github.com/libp2p/go-reuseport" "net" "time")type Client struct { UID string Conn net.Conn Address string}type Handler struct { // 服务端句柄 Listener net.Listener // 客户端句柄池 ClientPool map[string]*Client}func (s *Handler) Handle() { for { conn, err := s.Listener.Accept() if err != nil { fmt.Println("获取连接句柄失败", err.Error()) continue } id := uuid.New() s.ClientPool[id] = &Client{ UID: id, Conn: conn, Address: conn.RemoteAddr().String(), } fmt.Println("一个客户端连接进去了,他的公网IP是", conn.RemoteAddr().String()) // 暂时只接受两个客户端,多余的不处理 if len(s.ClientPool) == 2 { // 交换双方的公网地址 s.ExchangeAddress() break } }}// ExchangeAddress 交换地址func (s *Handler) ExchangeAddress() { for uid, client := range s.ClientPool { for id, c := range s.ClientPool { // 自己不交换 if uid == id { continue } var data = make(map[string]string) data["dst_uid"] = client.UID // 对方的 UID data["address"] = client.Address // 对方的公网地址 body, _ := json.Marshal(data) if _, err := c.Conn.Write(body); err != nil { fmt.Println("交换地址时出现了错误", err.Error()) } } }}func main() { address := fmt.Sprintf("0.0.0.0:6999") listener, err := reuseport.Listen("tcp", address) if err != nil { panic("服务端监听失败" + err.Error()) } h := &Handler{ Listener: listener, ClientPool: make(map[string]*Client)} // 监听内网节点连接,交换彼此的公网 IP 和端口 h.Handle() time.Sleep(time.Hour) // 防止主线程退出}
client.go
客户端得到对方的临时生成的公网IP和端口后,尝试进行连接,并不停发送数据
package mainimport ( "crypto/rand" "encoding/json" "fmt" "github.com/libp2p/go-reuseport" "math" "math/big" "net" "time")type Handler struct { // 中继服务器的连接句柄 ServerConn net.Conn // p2p 连接 P2PConn net.Conn // 端口复用 LocalPort int}// WaitNotify 等待远程服务器发送通知告知我们另一个用户的公网IPfunc (s *Handler) WaitNotify() { buffer := make([]byte, 1024) n, err := s.ServerConn.Read(buffer) if err != nil { panic("从服务器获取用户地址失败" + err.Error()) } data := make(map[string]string) if err := json.Unmarshal(buffer[:n], &data); err != nil { panic("获取用户信息失败" + err.Error()) } fmt.Println("客户端获取到了对方的地址:", data["address"]) // 断开服务器连接 defer s.ServerConn.Close() // 请求用户的临时公网 IP go s.DailP2PAndSayHello(data["address"], data["dst_uid"])}// DailP2PAndSayHello 连接对方临时的公网地址,并且不停的发送数据func (s *Handler) DailP2PAndSayHello(address, uid string) { var errCount = 1 var conn net.Conn var err error for { // 重试三次 if errCount > 3 { break } time.Sleep(time.Second) conn, err = reuseport.Dial("tcp", fmt.Sprintf(":%d", s.LocalPort), address) if err != nil { fmt.Println("请求第", errCount, "次地址失败,用户地址:", address) errCount++ continue } break } if errCount > 3 { panic("客户端连接失败") } s.P2PConn = conn go s.P2PRead() go s.P2PWrite()}// P2PRead 读取 P2P 节点的数据func (s *Handler) P2PRead() { for { buffer := make([]byte, 1024) n, err := s.P2PConn.Read(buffer) if err != nil { fmt.Println("读取失败", err.Error()) time.Sleep(time.Second) continue } body := string(buffer[:n]) fmt.Println("读取到的内容是:", body) fmt.Println("来自地址", s.P2PConn.RemoteAddr()) fmt.Println("=============") }}// P2PWrite 向远程 P2P 节点写入数据func (s *Handler) P2PWrite() { for { if _, err := s.P2PConn.Write([]byte("你好呀~")); err != nil { fmt.Println("客户端写入错误") } time.Sleep(time.Second) }}func main() { // 指定本地端口 localPort := RandPort(10000, 50000) // 向 P2P 转发服务器注册自己的临时生成的公网 IP (请注意,Dial 这里拨号指定了自己临时生成的本地端口) serverConn, err := reuseport.Dial("tcp", fmt.Sprintf(":%d", localPort), "你自己的公网服务器IP:6999") if err != nil { panic("请求远程服务器失败" + err.Error()) } h := &Handler{ ServerConn: serverConn, LocalPort: int(localPort)} h.WaitNotify() time.Sleep(time.Hour)}// RandPort 生成区间范围内的随机端口func RandPort(min, max int64) int64 { if min > max { panic("the min is greater than max!") } if min < 0 { f64Min := math.Abs(float64(min)) i64Min := int64(f64Min) result, _ := rand.Int(rand.Reader, big.NewInt(max+1+i64Min)) return result.Int64() - i64Min } result, _ := rand.Int(rand.Reader, big.NewInt(max-min+1)) return min + result.Int64()}