深入理解 IPFS - DHT 网络(1)

在前面的文章中,我们说过,DHT(分布式哈希表)是 IPFS 一个重要的组成部分,接下来我们会分别从应用,原理两个方面来分析 DHT。

IPFS 的网络层源码在 libp2p 中,本文用 go-libp2p 做分析。

我们假设一个场景应用,有两个节点名字分别叫 earth 和 mars,然后他们分别加入了 DHT 网络,接下来他们需要找到对方,并能够互相发送消息。

(一)初始化节点

首先我们需要初始化节点

1
2
3
4
ctx := context.Background()
listenAddresses, _ := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/8004")
host, _ := libp2p.New(ctx, libp2p.ListenAddrs([]multiaddr.Multiaddr{listenAddresses}...))
fmt.Println("host->", host.ID())

其实初始化就一行 libp2p.New(),可自定义参数,比如上面我们定义了监听地址和端口 /ip4/127.0.0.1/tcp/8004, 等同于 127.0.0.1:8004 不过自解释性更强。

再举个例子,/ip4/1.2.3.4/tcp/4321/p2p/QmcEPrat8ShnCph8WjkREzt5CPXF2RwhYxYBALDcLC1iV6 结尾有个 PeerId QmcEPrat8ShnCph8WjkREzt5CPXF2RwhYxYBALDcLC1iV6

那么不仅仅可以通过 ip + port 寻址,通过 PeerId 也可以直接定位到节点。

初始化后我们生成了一个节点,节点 ID 以 btc58encode 编码:QmcEPrat8ShnCph8WjkREzt5CPXF2RwhYxYBALDcLC1iV6,也就是上文的 PeerID。

接下来我们需要给 8004 监听的端口配置 handler

1
2
3
4
5
func handleStream(stream network.Stream) {
log.Println("Got a new stream!", stream)
}

host.SetStreamHandler(protocol.ID('/chat/1.0'), handleStream)

handleStream 这个函数的逻辑跟普通的 socket 编程一样,拿到 stream 往里读写数据就行,这里不细讲。

(二)加入 DHT 网络

节点建立完成后,接下来就需要加入 DHT 网络了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 加入 dht 网络
kademliaDHT, err := dht.New(ctx, host)
if err != nil {
panic(err)
}
// 设置状态为 bootstrap 模式
if err = kademliaDHT.Bootstrap(ctx); err != nil {
panic(err)
}

var wg sync.WaitGroup
// connect 到 bootstrap 节点
for _, peerAddr := range dht.DefaultBootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := host.Connect(ctx, *peerinfo); err !=nil {
log.Println(err)
} else {
log.Println("Connection established with bootstrap node:", *peerinfo)
}
}()
}
wg.Wait()

不管是比特币,以太坊,还是早前的 BT 网络,任何新节点加入网络都需要种子 (bootstrap) 节点作为起点,然后扩展自己的路由表,完成初始化动作。

(三)广而告之
1
2
3
4
5
6
// 广而告之
nodeName := "mars"
log.Println("Announcing ourselves...")
routingDiscovery := discovery.NewRoutingDiscovery(kademliaDHT)
discovery.Advertise(ctx, routingDiscovery, nodeName)
log.Println("Successfully announced!")

回到我们开头的场景,假设我们初始化一个节点名叫 mars,我们加入 DHT 网络后,需要让所有节点都知道我是 mars 节点 。

这里先简单介绍下,原理下篇文章再分析。nodeName 其实最后被转换为一个内容的 hash,节点通过 Advertise 这个方法告诉其他节点,它拥有这个 hash,然后其他节点就会记住,更新自己的路由表。等到有请求去做这个内容的寻址时,就会告诉对方谁有这个内容,或者谁和这个内容更接近。

(四)寻找节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
findName := "earth"
log.Println("Searching for other peers...")
peerChan, err := routingDiscovery.FindPeers(ctx, findName)
if err != nil {
panic(err)
}
for peer := range peerChan {
fmt.Println("found peer", peer.ID)
if peer.ID == host.ID() {
continue
}

stream, _ := host.NewStream(ctx, peer.ID, protocol.ID("/chat/1.0.0"))
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
go readData(rw)
go writeData(rw)
}

FindPeers 内在实现逻辑其实是找 earth 这个 hash 的地址,找到就和他建立一个双工的连接,正好和前面 handleStream 实现了服务端和客户端的通信。

(五)演示

(六) 完善

上面的例子有个问题是,谁都可以宣称自己是 mars 节点,通信双方没法信任,所以这种模式适用聊天室的 channel 场景。通过将内容寻址改成节点寻址,就可找到可信通信方,当然前提是你知道你要通信的节点 ID。

代码如下:

1
2
findID, err := peer.Decode("QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dw1")
kademliaDHT.FindPeer(ctx, findID)