Rust 网络编程

原书英文版 地址

Building

git clone https://github.com/mjyi/network-programming-with-rust-zh.git
cd network-programming-with-rust-zh
cargo install mdbook
mdbook serve --open

示例代码

code 目录下。代码与原代码有更新。

原代码地址:PacktPublishing/Network-Programming-with-Rust

测试运行环境:

  • Platform: macOS
  • Rust: 1.34 stable

第一章: C/S 网络介绍

本书介绍了在 Rust 中编写网络应用程序。标题提出了两个问题: 我们为什么要关于网络?为什么要使用 Rust 进行网络编程? 我们试图在本章回答第一个问题,我们将在后续章节中介绍 Rust 和 Rust 网络编程。 首先,在本章中,我们将从一些历史开始,并尝试理解网络架构在过去的一百年中是如何演变的。 在后面的章节,我们将了解现代网络如何分层和如何寻址。然后,我们将描述网络中使用的常见服务模型。我们将在最后一节总结 Linux 公开的与网络相关的编程接口。 请注意,本书故意忽略其他操作系统中的网络编程,并且为了简单起见仅关注 Linux 。虽然 Rust 编译器与平台无关,但在某些情况下,与 Linux 相比,其他平台的某些内容会有所不同。随着我们的进步,我们将指出这些差异。

在本章中,我们将介绍以下主题:

  • 网络历史:网络为何以及是如何投入使用,网络是如何发展的
  • 网络分层:分层和封装的工作原理
  • 寻址:如何在互联网上唯一标识网络和单个主机
  • IP 路由是如何工作的
  • DNS 的工作原理
  • Linux 网络编程接口

网络简史

现代互联网彻底改变了我们彼此之间的沟通方式。然后,它在维多利亚时代的赴却是很简陋的。 互联网最早的前身之一是电报,早在 1850 年就开始运作了。在那之前需要 10 天才能通过海路从欧洲向北美洲发送信息。电报网把它减少到了 17 小时。到了 19 世纪末,电报是一种非常成功的通信技术,在再次世界大战中被广泛使用。从那时起,人们开始构建计算机以帮助破解敌人的代码。 与我们现代的移动电话和笔记本不同,这些计算机通常非常庞大,需要专门的环境才能运行。当操作员坐在终端上时,有必要将它们放在特殊位置。终端需要在短距离内与计算机通信。许多局域网实现了这一点,其中最突出的就是以太网。随着时间的推移,这些网络不断发展,到了 20 世纪 60 年代,这些网络中的一些网络相互连接,形成了一个更大的网络。Advanced Research Projects Agency Network (ARPANET) 成立于 1969 年,它成为第一个类似于现代互联网的互联网络。 最终协议被标准化,以便网络可以无缝地相互通信。所有这些网络后来合并成了今天的互联网。

由于网络在世界各地的孤岛中演变,因此它们通常根据地理位置进行组织。局域网 Local Area Network(LAN) 是一小部分邻近的主机的集合,如建筑物或小区域。广域网 Wide Area Network(WAN) 是连接多个邻域的网络;全球互联网位于层次结构的顶端。下图显示了1977年的ARPANET地图。此地图中的每个节点都是一台计算机(服务器,按照今天的说法)。 其中大部分位于斯坦福大学等大学或劳伦斯伯克利等国家实验室。

在网络中,Request For Comment(RFC)是描述提议的系统应如何工作的文档。这些是标准化协议或系统的第一步。互联网这个术语最初是在 RFC 675 中使用的,它提出了TCP的标准。

Arpanet_logical_map,_march_1977

网络分层

计算机科学通常专注于将问题折分成较小的问题,希望独立出可以被单独解决的组件。完成后,所需要的只是一组关于这些组件应如何通信以获得更大问题的解决方案的规则。这组规则以及预先商定的数据格式称为 *协议(protocol)。因此,这些层中的每一个都运行一个或多个协议,形成协议栈。网络被分成几层,每一层具有其特定的目的。在网络的早期,不同的人以不同的方式实现他们的网络。在构思互联网时,需要使这些网络无缝通信。 由于它们的构造不同,结果很难。

显然需要就标准协议和接口达成一致,以使互联网正常工作。 标准化网络协议的第一次尝试是在 1977 年,这产生了 OSI 模型。 该模型具有下面几层:

  • 物理层(Physical layer):它定义了数据在物理介质中的电气和物理特性的传输方式。这可以是有线,光纤或无线介质。
  • 数据链路层(Data link layer):它定义了如何在物理介质连接的两个节点之间传输数据。该层处理试图同时访问线路的多方之间的优先级。该层的另一个重要功能是在发送的比特中包括一些冗余,以最小化传输期间的错误。这被称为编码。
  • 网络层(Network layer):它定义了如何在网络之间传输数据包(由多个数据单元组成)。因此,该层需要定义如何唯一地标识主机和网络。
  • 传输层(Transport layer):它定义了可靠地向主机(在相同或不同网络中)传递可变长度消息的机制。该层定义了接收器可以监听的数据包流。
  • 会话层(Session layer):它定义了在主机上运行的应用程序应该如何通信。该层需要区分在同一主机上运行的应用程序并将数据包传递给它们。
  • 表示层(Presentation layer):它定义了数据表示的通用格式,以便不同的应用程序可以无缝链接。在某些情况下,该层还负责安全性。
  • 应用程序层(Application layer):它定义了以用户为中心的应用程序应如何发送和接收数据。如使用 HTTP(应用程序层协议)与 Web 服务器通信的 Web 浏览器(以用户为中心的应用程序)。

下图是这种模型的直观展示。它同时也展示了两个垂直分类,即运行网络堆栈的主机和物理介质(包括电线和网络设备)。每个层都有自己的数据单元,它所处理的信息的表示,并且由于每个层都封装了它下面的数据单元,因此数据单元也被封装。许多位形成一个帧,多个帧形成一个数据包,依此类推,到顶部:

The OSI model and its layers

虽然 OSI 正致力于标准化该模型,但是 Defense Advanced Research Projects Agency(DARPA) 提出了更简单的 TCP/IP 模型(也称为 IP (互联网协议)套件)的完整实现。此模型具有以下层级,从最接近物理介质到最远:

  • 硬件接口层:这是 OSI 模型的第一层和第二层的组合。该层负责管理媒体访问控制,处理比特的传输和接收,重传和编码(网络上的一些文本区分硬件接口层和链路层。这导致五层模型而不是四层。 但实际上很重要。)
  • IP层:该层对应于 OSI 堆栈的第三层。因此,该层负责两个主要任务:寻址主机和网络,以便可以唯一地识别它们并给出源和目标地址,并计算给定一系列约束(路由)的路径之间的路径。
  • 传输层:该层对应于OSI堆栈的第四层。该层将原始数据包转换为数据包流,并提供一些保证:按顺序传送(对于TCP)和随机排序传送(对于UDP)。
  • 应用程序层:该层组合OSI堆栈的五到七层,负责识别进程,数据格式化以及与所有用户级应用程序的接口。

请注意,当我们从一个层移动到另一个层时,特定层处理的定义会发生变化。硬件接口层处理主机发送的比特和字节的集合,IP 层处理数据包(主机以特定格式发送的多个字节的集合),传输层将来自主机上给定进程的数据包聚集在一起到另一个主机上的另一个进程以形成一个段(用于TCP)或数据报(用于UDP),并且应用层构造来自底层流的应用程序特定表示。对于这些层中的每一层,它们处理的数据的表示称为该层的协议数据单元(PDU)。作为此分层的结果,当在主机上运行的进程想要将数据发送到另一个主机时,必须将数据分成单独的块。当块从一层移动到另一层时,每层向块添加一个头(有时是一个尾部),形成该层的PDU。此过程称为封装(encapsulation)。因此,每个层向其上方的层提供一组服务,以协议的形式指定。

现代互联网呈现出一种地理层次结构。想象一下由许多 互联网服务提供商(ISP) 提供服务的许多家庭。这些家庭中的每一个都在 LAN 中(通过以太网,或更常见的是 Wi-Fi)。 ISP连接 WAN 中的许多此类 LAN。每个 ISP 都有一个或多个 WAN ,它们连接起来形成自己的网络。这些由单个业务实体控制的跨越城市的大型网络称为 管理系统(AS)。多个 ISP 之间的路由通常比常规IP路由更复杂,因为它们必须考虑交易协议等事项。这由边界网关协议(BGP) 等专用协议处理。

如前所述,最早和最成功的网络技术之一是以太网。它于 1974 年首次推出,由于其低成本和相对易于维护,它很快成为局域网和广域网的主流技术。 以太网是一种共享媒体协议,所有主机必须使用相同的物理介质来发送和接收帧。帧将传送到所有主机,这将检查目标MAC地址(这些地址将在下一节中描述)是否与其自己的地址匹配。如果是,则接受该帧,否则丢弃该帧。 由于物理介质在任何给定时刻只能携带一个信号,因此帧可能在传输过程中发生碰撞。如果确实发生了这种情况,发送方可以通过在发送帧时感知来自其他主机的传输来感知冲突。然后它中止传输并发送一个阻塞信号,让其他主机知道冲突。

该方案被称为 带有冲突检测的载波侦听多路访问(CSMA/CD)。 以太网的一个问题是它的范围相对较短。根据所使用的物理布线技术,以太网段的最大长度在 100 米到 500 米之间变化。因此,必须连接多个段以形成更大的网络。最常见的方法是在两个相邻的以太网段之间使用第二层交换机。 这些交换机的每个端口形成不同的冲突域,从而降低了冲突的总体概率。 这些交换机还可以监控流量,以了解哪些 MAC 地址在哪些端口上,这样最终,它们将仅在该端口上发送该 MAC 地址的帧(称为学习交换机)。 在现代家庭中,与以太网相比,Wi-Fi 通常是主要的 LAN 技术。

网络寻址

我们已经看到为什么必须唯一地识别主机和网络以便能够可靠地传递数据包。根据规模,主要有三种方式可以做到这一点;我们将在本节讨论全部三种。IP 路由的端到端过程将在下一节中讨论。一个值得注意的有趣的事是,对于这些寻址模式中的每一种,保留一个或者多个地址用于特殊用途。通常,这些是以知模式打开或关闭的已知位组标记的:

  • 以太网地址(Ethernet address): 这也称为 媒体访问控制 Media Access Control(MAC) 地址。 它是分配给网络设备(通常存储在卡上)的 48 位长的唯一标识符,用于在网段中识别它。通常,这些是由网卡制造商编程的,但所有现代操作系统都允许人们对其进行修改。写入以太网地址的标准方法是六组,每组两个十六进制数字(01-23-45-67-89-ab-cd-ef)。另一种常见方法是使用冒号分隔数字(01:23:45:67:89:ab:cd:ef)。保留一些特殊的比特序列用于寻址特殊情况:发送方可以通过将第一个八位字节的最低有效位设置为1来请求该段中的所有主机应该接收以太网帧。这称为 多播 。如果该特定位设置为 0,则帧应仅传送到一个接收器。今天,它们被广泛用于 以太网 和 Wi-Fi 。
  • IP 地址: 这是分配给 IP 网络中的每个设备的地址。原始 IP 地址标准(IPv4)在 1980 年定义了 32 位地址。 然而,到 1995 年显然互联网上的可用地址总数不足以覆盖所有设备。这导致了 IPV6 的发展,将地址空间扩展到 128 位。处理一级 IP 地址的标准方法是使用 CIDR 表示法,例如 192.168.100.1/26(IPv4)。斜杠后面的十进制数计算网络掩码中前导 1 的数量。因此,在这种特定情况下,网络中有 2^(32-26) = 64 个地址,从192.168.100.0 开始到 192.168.100.63 。互联网号码分配机构 Internet Assigned Numbers Authority(IANA) 为组织分配可公共路由的 IP 地址块。许多 IP v4 和 v6 地址被保留用于各种目的,例如在专用网络中寻址等。在家庭网络(将始终使用特殊的私有范围地址)中,这些由 Wi-Fi 路由器通过 动态主机配置协议 Dynamic Host Configuration Protocol(DHCP) 分配。
  • 自治系统编号 Autonomous system number: 这是一个 32 位数字,用于唯一标识自治系统。 与 IP 地址一样,这些由 IANA 分配和维护。

除此之外,主机之间的通信通常使用端口号来区分进程。当 OS 为进程分配特定端口时,它会更新其进程标识符和端口号之间映射的数据库。 因此,当它在该端口上接收传入数据包时,它知将这些数据包传递到哪个进程。 如果该进程在此时退出,操作系统将丢弃数据包,如果是 TCP , 则启动连接关闭。在后面的部分中,我们将看到 TCP 在实践中是如何工作的。

0 到 1024 之间的端口号范围由操作系统为公共服务保留。其他应用程序可以自由请求 1024 以上的任何端口。

IP 路由工作原理

要明白 IP 路由是如何工作的,我们必须从 IPv4 地址的结构开始。如前一节所述,IPv4 的长度是 32 位。它们以十进制计数法以 4 字节为一组(例如 192。168.122.5 )写成。该网络前缀中的给定数量的位用于标识应该递送分组的网络,并且其余的位用于标识特定的主机。因为同一网络的所有主机必须具有相同的前缀。通常,前缀在 CIDR 符号中描述,地址的网络部分的起始地址和位数由斜杠分隔(192.168.122.0/30)。然后可以使用该数字来找出网络中主机可用的地址数(在这种情况下,2 ^(32-30)= 4)。给定IP地址和前缀,可以通过在网络部分中使用全1的掩码对该地址进行逐位与运算来提取网络地址。计算主机地址恰恰相反;我们将需要AND与网络掩码的逻辑否定(主机掩码),其在网络部分中具有全0并且在主机部分中具有全 1。

因此,对于给定的 CIDR ,网络地址为 192.168.122.0 ,主机地址为 0.0.0.5 :

如前所述,每个 IP 网络都将有一个保留的广播地址,该地址可用于主机向该网络中的所有主机发送消息。这可以通过使用主机掩码进行 ORing 来计算。在我们的示例中,结果是 192.168.122.31 。请注意,网络地址不能是有效的主机地址。

IP 地址有两大类;一些地址块可以在公共互联网上路由,这些地址称为公共 IP 地址。其他一些块只能用于不直接与互联网连接的专用网络,这些被称为私有地址。如果 Internet 上的路由器收到发往私有 IP 地址的数据包,则必须丢弃该数据包。除了这两个,IP 地址也分类在各种参数上:一些仅保留用于文档(192.0.2.0/24),一些保留用于两个主机(169.254.0.0/16)之间的点对点通信,依此类推。 Rust 标准库具有根据类型对IP地址进行分类的便捷方法。

所有路由器都维护一个路由表,该路由表将前缀映射到路由器的出接口(而路由器管理员可能决定存储单个地址而不是前缀,这将很快导致繁忙路由器中的大型路由表)。表中的条目基本上表示如果数据包需要进入该网络,则应该在该接口上发送。接收数据包的下一个主机可能是另一个路由器或目标主机。路由器如何找出这张表?多个路由器在计算这些表的那些之间运行路由协议。一些常见的例子是 OSPF,RIP 和 BGP。给定这些原语,实际的路由机制相当简单,如下图所示。

IP 的一个有趣方面是使用 生存时间 Time To Live(TTL) 字段,这也称为跃点限制。 主机发送固定值为TTL(通常为64)的数据包。数据包交叉的每个路由器都会降低TTL。当它达到0时,数据包被丢弃。此机制确保数据包不会卡在路由器之间的无限循环中:

网络控制消息协议Internet Comtrol Message Protocol(ICMP) 用于在网络设备之前交换操作信息。在前面的示例中,如果配置为执行此操作,则一个或多个跌幅器可能决定发回 ICMP 错误。

请注意,尝试将前缀与路由表中的路由匹配时,多个路由可能匹配。如果发生这种情况,路由器必须选择具体的匹配并将其用于转发。 由于最具体的路由将具有前导 1 的最大数量因此具有最大前缀,这称为最长前缀匹配。假设我们的路由器具有以下路由表,如图所示。en2en1en3 是连接到我们路由器的三个网络接口,每个接口在不同的网络中具有不同的 IP 地址:

此时,如果我们的设备得到一个目标地址设置为 192.168.1.33 的数据包,那么这三个前缀都有这个地址,但最后一个是这个三个前缀中的最大的一个。所以,包将通过 eh3 流出。

到目前为止,我们描述的很多关于IPv4 地址的内容并没有因 IPv6 而改变。当然,它有更大的 128 位地址空间。在这种情况下,网络掩码和主机的长度取决于地址类型。

有人可能想知道,路由器如何构建路由表?与往常一样,有协议可以帮助解决这个问题。路由协议有两种主要类型:内部网关协议,用于在自治系统内部路由;以及外部网关协议,用于自治系统之间的路由;后者的一个例子是BGP。内部网关协议可以有两种类型,具体取决于它们对整个网络的看法。在链路状态路由中,参与协议的每个路由器维护整个网络拓扑的视图。在距离矢量路由中,每个路由器仅知道其一跳邻居。前者的示例是 路由信息协议 Routing Information Protocol(RIP),后者是 开放最短路径优先 Open Shortest Path First(OSPF)。有关这些的详细信息超出了本书的范围。但是,我们可以注意到,所有路由协议的共同主题是它们通过在路由器之间交换信息来工作。因此,他们有自己的数据包格式来封装该信息。

DNS 工作原理

请注意,任何人都无法记住互联网上每项服务的 IP 地址。幸运的是,有一个协议!域名服务 Domain Name Server(DNS) 通过将人类可读分层名称的映射维护到分布式数据库中的服务的IP地址来解决此问题。因此,当用户输入 http://www.google.com 在浏览器中点击 Enter 键,第一步是使用 DNS 查找名称 www.google.com 的 IP 地址。下图显示了此类查询所需的步骤。在本讨论中,我们将交替使用名称本地 DNS 解析程序本地 DNS 服务器本地 DNS 名称服务器

需要解析名称的应用程序将系统调用 getaddrinfo。这基本上要求操作系统继续执行并解析名称。图中未显示此步骤。接下来的步骤如下:

  1. 通常,网络中的每台计算机都将在文件 /etc/resolv.conf 中配置本地 DNS 服务器。在大多数情况下,这指向 ISP 的 DNS 服务器。也可能是指向家庭 Wi-fi 路由器的 DNS 服务器。在这种情况下,DNS 将透明地代理对 ISP 的DNS 服务器的请求。 然后操作系统将查询该服务器,询问给定的名称 www.google.com 的 IP。
  2. 反过来,本地 DNS 服务器将向预先填充的根名称服务器列表提出相同的问题。这些服务器由 ICANN 维护,其地址众所周知。它们维护顶级域名服务器的地址。这意味着他们知道 .com 域名的名称服务器的地址。
  3. 在此步骤中,根名称服务器将回复. com 域的 TLD 名称服务器的地址。这些服务器维护其自己域中名称服务器的地址列表。
  4. 然后,本地 DNS 服务器联系其中一个并询问相同的问题。
  5. 顶级域名服务器使用 google.com 域中的服务器地址回复。 google.com 域的管理员为该域维护了一堆域名服务器。这些域名服务器对该域中的所有记录具有完全权限,并且每个记录都标记为权威性以指示该域。
  6. 然后本地DNS服务器询问其中一个相同的问题。
  7. (希望)服务器确实知道 www.google.com 的地址。 如果是,则准备响应,将其标记为权威,并将其发送回本地DNS服务器。 答案还可以有一段时间与之关联,以便本地DNS服务器可以将其缓存以供将来使用,并在给定时间结束后将其逐出。如果没有,名称解析将失败,它将发回一个名为 NXDOMAIN 的特殊响应。
  8. 然后,本地 DNS 服务器将相同的响应发送回操作系统,操作系统将其传递给应用程序。本地服务器将响应标记为非授权,表明它从其他地方获得了该答案。

DNS 支持各种记录类型。 A 记录将名称映射到IPv4地址,AAAA 记录将名称映射到IPv6地址,依此类推。使用 PTR 记录支持反向查找。

常见的服务模式

对于两台通过网络进行通信的主机,他们需要相互发送消息。有两种交换消息的模型,每种模型都有特定的用例,它们最有效。在本节中,我们将探讨这些。请注意,服务模型是协议的属性,它们根据消费者对它们的期望来设定期望值。

面向连接的服务(TCP)

协议向其消费者提供的服务是面向连接的,当每一方在发送实际数据之前协商一个虚拟连接时。在设置过程中,必须商定一些有关连接的参数。这类似于旧的有线电话系统,在两台主机之间建立专用连接。在现代网络中,TCP 就是一个例子。TCP 的 PDU 是一个段,它由一个头和一个数据段组成。头部有几个字段用于在协议状态机的状态之间转换。下一个图显示了 TCP 头在实践中的样子。图中的每一行都是 32 位的(因此,每一行是两个八位字节),其中一些被分成多个段:

我们将看到一些用于操纵主机之前连接的方法:

  • 控制位(标志)是一组用于各种目的的 9 位。这里感兴趣的标志是 SYN、ACK、FIN 和 RST。SYN 触发序列号同步。ACK 标志表示接收器应该关心相应的确认号。FIN 标志启动断开连接的过程。RST 标志在出现错误时重置连接。
  • 序列号是一个 32 位字段,用于对接收端的消息重新排序。当设置 SYN 标志时(这种情况只适用于连接中的第一个数据包),序列号是初始序列号;否则,它是迄今为止累积的序列号。
  • 确认号是一个32位字段,用于实现可靠的消息传递。如果设置了 ACK 标志,则该值是发送方期望的下一个序列号。

在两个主机运行 TCP 交换数据之前,它们必须通过三次握手灭建立连接。这样的工作方式如下:第一次握手:建立连接时,客户端发送syn包(syn=j)到服务器,等待服务器确认。第二次握手:服务器收到 syn 包,必须确认客户的SYN(ack=j+1),同时自己也发送一个 SYN 包(syn=k),即SYN+ACK包。第三次握手:客户端收到服务器的 SYN+ACK 包,向服务器发送确认包ACK(ack=k+1),此包发送完毕,客户端和服务器进入ESTABLISHED(TCP连接成功)状态,完成三次握手。成功完成后,客户端和服务器都已同意序列号和确认号。该模型的优点是具有 可靠 的连接,其中发送方和接收方都知道期望什么。发送方可以调整发送数据的速率,具体取决于接收方的速度或速度,以及网络的拥塞程度。这里的缺点是更高的连接设置成本。假设将数据包发送到另一个大陆的主机需要 100 毫秒,我们将需要在开始发送数据之前交换至少 3 个数据包。这相当于 300 毫秒的延迟。虽然这可能看起来不是很多,但请记住,在任何给定的点上,用于访问 Facebook 的笔记本电脑可能会有数千个连接向全世界的服务器开放。面向连接的服务模型适用于大量用例,但在某些情况下,开销很大或不必要。一个例子是视频流。在这种情况下,一些丢失的数据包不会引起很大的问题,因为没有人注意到视频中的少量未对齐像素。这些应用程序更喜欢无连接模型,如下所述。

无连接服务(UDP)

这里的第二种情况是无连接服务。当多个消息彼此之间没有关系时使用此方法,因此这些协议在发送任何数据之前不需要任何连接协商步骤。这方面的一个例子是UDP,它不保证传输消息的顺序或可靠性(但是,它有一个校验和字段来保证数据报的正确性)。应该注意的是,如果需要的话,在UDP之上运行的协议总是可以自由实现可靠性的。有趣的是,IP路由也是一种无连接的服务。UDP头显示如下:

很容易看到这里的头远远小于 TCP 头。它还缺少一些字段,TCP 使用这些字段来管理连接,并根据网络拥塞等进行调整。由于 UDP 没有这些字段,因此它无法提供这些保证。

Linux 中的网络编程接口

在本节中,我们将看到 Linux(以及许多其他Unix家族成员)如何实现常见的网络模式,以及用户在编写网络应用程序时如何与这些模式进行交互。本节中的所有讨论都将严格基于类似 Linux 的操作系统和标准 C 库(glibc)。Portable OS Interface(POSIX) 标准括所有这些,使它们可以移植到任何符合 POSIX 的操作系统。这里的所有函数和数据结构都遵循 C(和C++)编码约定,但我们稍后会看到,其中一些可以通过LIBC绑定在 Rust 中获得。

操作系统提供的最重要的网络原语是 套接字(socket) 。现在,什么是套接字?套接字是一个美化的文件描述符,一个唯一的ID,分配给类Unix操作系统中的每个文件。这遵循Unix哲学,即一切都应该是一个文件;通过网络将两台主机之间的连接视为文件,使操作系统可以将其作为文件描述符公开。然后,程序员可以自由地使用传统的 I/O 相关系统调用来从该文件进行写入和接收。

显然套接字需要保存比常规文件描述符更多的数据。例如,它需要跟踪远程IP和端口(以及本地IP和端口)。因此,套接字是两个主机之间连接的逻辑抽象,以及在这些主机之间传输数据所需的所有信息。

有两种主要类型的套接字:用于与同一主机上的进程通信的UNIX套接字,以及用于通过IP网络进行通信的Internet套接字

标准库还提供了一些与套接字交互的系统调用。其中一些是特定于套接字的,其中一些是支持写入文件描述符的通用 I/O 系统调用。由于套接字基本上是文件描述符,因此可以使用它们与套接字进行交互。其中一些在下图中描述。请注意,并非所有应用程序都需要使用所有这些系统调用。例如,服务器在创建套接字后需要调用listen 来开始侦听传入连接。它不需要为同一个连接调用 connect

任何类Unix操作系统都会在联机帮助页中为每个系统调用提供详细的文档。例如,可以使用命令 man socket 访问套接字系统调用的文档。 man 命令的第二个参数是联机帮助页的一部分。

让我们更详细地看五这些系统调用的签名。除非另有说明否则所有这些都会在成功返回 0 或者在失败时返回 -1 ,并相应地设置 errno 值

int socket(int domain, int type, int protocol);

socket syscall的第一个参数告诉它将使用哪种通信套接字。常见类型是 IPv4 的 AF_INET,IPv6 的 AF_INET6,IPC 的 AF_UNIX 等,第二个参数告诉它应该创建什么类型的套接字,常见值是 TCP 套接字的 SOCK_STREAM,UDP 套接字的 SOCK_DGRAM,原始套接字的 SOCK_RAW,它提供对数据包级别的网络硬件的直接访问,等等。隍一个参数表示要使用的第 3 层协议。在我们的例子中,只是 IP 。文件 /etc/protocols 中提供了支持的协议的完整列表。

成功时,将返回内核分配给创建的套接字的新文件描述符。

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

bind 的第一个参数是文件描述符,通常是 socket 系统调用返回的。第二个参数是要分配给给定套接字的地址,作为指向结构的指针传递。第三个参数是给定地址的长度。

int listen(int sockfd, int backlog);

listen 是一个接收套接字文件描述符的函数。请注意,当应用程序正在侦听

套接字上的传入连接时,它可能无法像数据包到达那样快速地从中读取。为了处理这样的情况,内核为每个套接字维护一个数据包队列。这里的第二个参数是给定套接字的队列的最大长度。如果有更多客户端尝试在此处给定的号码后连接,则连接将被关闭并返回一个拒绝连接错误。

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

此调用用于接受 TCP 套接字上的连接。它接受给定套接字的队列连接,创建一个新套接字,并将新套接字的文件描述符返回给调用者。第二个参数是一个指向套接字地址结构的指针,该结构填充了新套接字的信息。第三个参数是它的长度。

int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

此函数将第一个参数给出的套接字连接到第二个参数中指定的地址(第三个参数是地址结构的长度)。

ssize_t send(int sockfd, const void *buf, size_t len, int flags);

这用于通过套接字发送数据。第一个参数告诉它使用哪个套接字。第二个参数是指向要发送的数据的指针,第三个参数是其长度。最后一个参数是多个选项的按位 OR ,它指示如何在此连接中传递数据包。

系统调用返回成功发送的字节数。

ssize_t recv(int sockfd, void *buf, size_t len, int flags);

这个是发送的对应函数。和之前一样,第一个参数告诉它要读取哪个套接字。第二个参数是指向已分配空间的指针,它应该写入它读取的数据,第三个参数是其长度。这里的 flags 与 send 函数的含义相同。

此函数返回成功时收到的字节数。

int shutdown(int sockfd, int how);

这个函数用来关闭 socket。第一个参数告诉它要关闭哪个套接字。第二个参数指示在套接字关闭之前是否应允许进一步的传输或接收。

int close(int fd);

此系统调用用于销毁文件描述符。因此,在给定文件描述符编号的情况下,这也可用于关闭和清理套接字。关闭允许套接字接收挂起数据而不接受新连接,关闭将丢弃所有现有连接和清理资源。

除了上面提到的那些之外,主机还需要使用 DNS 来解析远程主机的 IP 。 getaddrinfo 系统调用就是这么做的。还有一些其他系统调用为编写应用程序提供了各种有用的信息:gethostname 返回当前计算机的主机名,setsockopt 在套接字上设置各种控制选项,等等。

请注意,上面描述的很多系统调用都阻塞的,这意味着它们会阻止在等待给定操作完成时调用它们的线程。例如,如果没有足够的数据来填充提供的缓冲区,则 read 调用将阻塞套接字。通常情况下,这是不可取的,尤其是在现代多线程环境中,阻塞调用将无法充分利用可用的计算能力,因为线程将循环执行无用的操作。

Unix 提供了一些系统调用,可以使用标准 C 库启用异步,非阻塞应用程序。有两种标准方法:

  • 使用 select 系统调用:此系统调用监视给定套接字的列表,并让调用者知道是否有任何数据要读取。然后,调用者可以使用一些特殊的宏检索这些文件描述符并从中读取。

  • 使用 poll 系统调用:这里的高级语义类似于 select:它接收套接字文件描述符列表和超时。它会针对给定的超时异步监视这些超时,如果其中任何一个有一些数据,它会让调用者知道。与 select 不同, select 检查所有文件描述符的所有条件(可读性,可写性和错误),poll 仅关注它接收的文件描述符和条件列表。这使得 poll 更容易使用,并且比 select 更快。

但实际上,对于可能需要监视大量连接套接字的应用程序, selectpoll 都非常慢。对于这样的应用程序, epoll 或基于事件的网络库(如 libeventlibev)可能更适合。性能的提高是以可移植性为代价的;这些库并非在所有系统中都可用,因为它们不是标准库的一部分。另一个成本是编写和维护基于外部库的应用程序的复杂性。

在接下来的章节中,我们将介绍通过网络进行通信的 TCP 服务器和客户端的状态转换。为简单起见,这里有一些理想的假设:我们假设没有任何类型的中间错误或延迟,服务器和客户端可以以相同的速率处理数据,并且服务器和客户端都不会崩溃通信。我们还假设客户端启动连接(Active open)并将其关闭(Active close)。我们没有显示状态机的所有可能状态,因为这样做太麻烦了:

服务器和客户端都从 CLOSED 状态开始。假设服务器首先启动,它将首先获取套接字,将地址绑定到它,然后开始监听它。客户端启动并调用连接到服务器的地址和端口。当服务器看到连接时,它会调用 accept 。该调用返回一个新的套接字,服务器可以从中读取数据。但在实际数据传输发生之前,服务器和客户端必须进行三次握手。客户端通过发送 SYN 来启动,服务器读取该消息,响应 SYN + ACK 消息,然后进入 SYN_RCVD 状态。客户端进入 SYN_SEND 状态。

当客户端获得 SYN + ACK 时,它会发出最终的 ACK 并进入 ESTABLISHED 状态。服务器在获得最终 ACK 时进入 ESTABLISHED 。仅当双方都处理 ESTABLISHED 状态时才 建立实际连接。此时服务器和客户端都可以发送和接收数据。这些操作不会导致状态改变。一段时间后,客户端可能想关闭连接。为此,它发送一个 FIN 数据包,并进入 FIN_WAIT_1 状态。服务器接收到该消息,发送 ACK 并进入 CLOSE_WAIT 状态。当客户端获得该状态时,它将进入 FIN_WAIT_2 状态。第一轮连接终止到此结束。然后服务器调用 close ,发送 FIN ,然后进入 LAST_ACK 状态。当客户端得到它时,它发出一个 ACK 并进入 TIME_WAIT 状态。当服务器收到最终的 ACK 时,它会返回到 CLOSED 状态。在此之后,将释放此连接的所有服务器资源。但是,客户端在进入 CLOSED 状态之前等待超时,然后释放所有客户端资源。

我们在这里的假设是非常基本各理想主义的。在现实世界中,沟通往往更加复杂。例如,服务器可能想要推送数据,然后它必须启动连接。数据包在传输过程中可能已损坏,导致任何一方请求重新传输,依次类推。

最大段寿命 Maximum Segment Lifetime(MSL) 定义为 TCP 段在网络中可能存在的最长时间。在大多数现代系统中,它设置为 60s 。

总结

本章首先介绍了在现代世界中编写网络应用程序的动机。我们还研究了网络是如何演变的。我们研究了常见的网络技术和想法,并研究了它们如何协同工作;从简单的 IP 路由和 DNS 到 TCP 和 UDP 。然后我们研究了 Linux(和POSIX)如何支持同步和异步网络编程。

在下一章中,我们将介绍 Rust 并尝试了解它在现有平台上的优势。在激励网络和 Rust 之后,我们将继续使用 Rust 进行网络编程。

第二章:Rust 及其生态系统简介

Rust 编程语言由 Mozilla 赞助, 并得到来自全球各地的开发人员社区的支持。Rust 被提升为一种系统编程语言, 它支持自动内存管理, 而无需运行时或垃圾收集器的开销, 不需要编译器强制执行的数据竞争, 以及零成本抽象和泛型。在后面的部分中, 我们将更详细地讨论这些功能。Rust 是静态类型化的, 并借用了许多函数编程的想法。Rust 的一个迷人方面是使用类型系统来保证内存安全, 而无需使用运行时。这使得 Rust 特别适用于低资源嵌入式设备和实时系统, 这需要围绕代码正确性提供强有力的保证。反过来说, 这通常意味着编译器必须做更多的工作来确保语法的正确性, 然后转换源代码, 从而获得更高的生成时间。虽然社区正在研究尽可能缩短编译时间的方法, 但这仍然是很多开发者遇到的一个重要问题。

The Low Level Virtual Machine(LLVM) 项目最初是一个大学研究项目, 旨在开发一组用于构建编译器的工具, 这些编译器可以为一系列 CPU 体系结构生成计算机代码。这是使用 LLVM intermediate representation (LLVM IR) 实现的。工具链可以将任何更高级别的语言编译为 LLVM IR, 然后将其定位到给定的 CPU 。Rust 编译器在很大程度上依赖于 LLVM 项目的互操作性, 将其用作后端。它实际上将 Rust 代码转换为 LLVM 的中间表示形式, 并根据需要对其进行优化。然后 LLVM 将其转换为特定平台的计算机代码, 该代码在 CPU 上运行。

在本章中, 我们将介绍以下主题:

  • 介绍生态系统和 Rust 是如何工作的
  • 安装 Rust 和设置工具链
  • 介绍了它的主要特点, 从借用检查器开始, 所有权是如何工作的
  • 泛型和特征系统如何与所有权模型一起工作
  • 错误处理和宏系统
  • 简单的并发
  • 简单的测试

请注意, 本章是对语言及其一些最显著特征的非常高级的概述, 而不是深入的介绍。

Rust 生态系统

源项目的成败往往取决于它周围社区的实力。拥有一个连贯的生态系统有助于建立一个强大的社区。由于 Rust 主要由 Mozilla 驱动, 它们能够在其周围建立一个强大的生态系统, 主要组件是:

  • 源代码: Rust 承载 GitHub 中的所有源代码。鼓励开发人员报告错误并在那里提交拉请求。在编写本文时, GitHub 上的 Rust 存储库有 1, 868 个独特的贡献者、2, 700多份开放 bug 报告和90个打开的请求。核心 Rust 团队由 Mozilla 员工和其他组织 (如 Google、百度等) 的贡献者组成。该团队使用 GitHub 进行所有协作;即使是对任何组件的重大更改, 也必须首先通过编写征求 意见请求 Request For Comments(RFC) 来提出。这样, 每个人都有机会去看看, 合作改进它。一旦获得批准, 实际的变化就可以实现。
  • 编译器: Rust 编译器被命名为 rustc。由于 Rust 遵循编译器版本的语义版本控制, 因此在次要版本之间不可能有任何向后不兼容的中断更改。在编写本书时, 编译器已经达到1.0 版, 因此可以假定在2.0 版之前不会有任何重大更改。请注意, 重大更改确实会偶尔滑倒一次。但在所有这些情况下, 它们都会被当作 Bug 对待, 并尽快修复。

为了在不破坏现有依赖库的情况下添加新的编译器功能, Rust 分阶段发布新的编译器版本。在任何时候, 都会保留三个不同版本的编译器 (和标准库)。

  • 第一个叫 nightly。顾名思义, 它每晚都会从源树的顶端构建。由于这只经过单元测试和集成测试, 因此此版本在现实世界中往往有更多的 bug。
  • 第二阶段是 beta , 这是一个计划中的版本。到每晚达到此阶段时, 它已经经历了多轮单元、集成和回归测试。此外, 社区还有时间在实际项目中使用它, 并与 Rust 团队分享反馈。
  • 一旦每个人都对发布充满信心, 就会将其标记为 stable 的发布, 并推出。由于编译器支持各种平台 (从 Windows 到氧化还原) 和体系结构 (amd64), 因此每个版本都为所有平台和体系结构的组合提供了预构建的二进制二进制文件。
  • 安装机制: 社区支持的安装机制是通过一个名为 rustup 的工具进行的。此工具可以安装给定版本的 Rust 以及使用它所需的所有内容 (包括编译器、标准库、包管理器等)。
  • 包管理器: Rust 的包管理器称为 Cargo, 而单个包裹称为 crate 。所有外部库和应用程序都可以打包在 crate 中, 并使用 Cargo CLI 工具发布。然后, 用户可以使用它来搜索和安装包。所有 crates 均可使用以下网站进行搜索: https://crates.io/ 。对于在 crates.io 上托管的所有包, 相应的文档可在: https://docs.rs/ 获得。

Rust 入门

Rust 工具链安装程序在 https://www.rustup.rs/ 。 以下命令将在系统上安装工具链的所有三个版本。对于本书中的示例, 我们将使用运行 Ubuntu 16.04 的 Linux 计算机。虽然大多数 Rust 不应依赖于操作系统, 但可能会有一些细微的差异。

我们将指出操作系统上的任何严格依赖关系:

curl https://sh.rustup.rs -sSf | sh
source $HOME/.cargo/env
rustup install nightly beta

我们需要将 Cargo 的 bin 目录添加到 系统环境变量 PATH 中,通过编辑 .bashrc 或者:

echo "export PATH=$HOME/.cargo/bin:$PATH" >> ~/.bashrc

Rust 安装附带了大量的文档;它们可以通过运行以下命令进行访问。这应该会在浏览器窗口中打开文档:

rustup doc

下一步创建一个 Rust 项目并运行它,使用 cargo:


# #![allow(unused_variables)]
#fn main() {
cargo new --bin hello-rust
#}

它告诉 Cargo 在当前目录下建立一个名为 hello-rust 的项目。cargo 将创建该名称的目录,并设置基本结构。由于项目的类型设置为二进制, cargo 将生成一个名为 main.rs 的文件,该文件有一个空的 main 函数,即整个应用程序的入口点。这里的另一个(默认)选择是库,在这种情况下,将生成一个 lib.rs 的文件。名为 cargo.toml 的文件有一堆当前项目的元数据,由 Cargo 使用。所有源代码在 src 目录下:

# tree hello-rust/
hello-rust/
├── Cargo.toml
└── src
└── main.rs

1 directory, 2 files

然后,可以使用以下命令生成和运行该项目。请注意,此命令应该从 Cargo 创建的 hello-rust 目录中运行:

cargo run

有趣的是, 此命令对目录进行了相当大的修改。 target 目录包含编译项目。它的结构在很大程度上依赖于平台, 但始终包括在给定的生成模式下运行应用程序所需的一切。默认生成模式是 debug , 其中包括调试信息和要与调试器一起使用的符号:

# tree hello-rust/
hello-rust/
├── Cargo.lock
├── Cargo.toml
├── src
│   └── main.rs
└── target
└── debug
├── build
├── deps
│ └── hello_rust-392ba379262c5523
├── examples
├── hello-rust
├── hello-rust.d
├── incremental
└── native

8 directories, 6 files

打包的 Rust 应用叫做 crate 。Rust crate 发布到 crate.io 。 发布后,任何人都可以使用 web 界面或者 cargo CLI 查找 crates。如下面代码所示。这个操作需要连网以便 cargo 与 crates.io 通信:

# cargo search term
Updating registry `https://github.com/rust-lang/crates.io-index`
term = "0.4.6" # A terminal formatting library
ansi_term = "0.9.0" # Library for ANSI terminal colours and styles (bold, underline)
term-painter = "0.2.4" # Coloring and formatting terminal output
term_size = "0.3.0" # functions for determining terminal sizes and dimensions
rust_erl_ext = "0.2.1" # Erlang external term format codec.

slog-term = "2.2.0" # Unix terminal drain and formatter for slog-rs
colored = "1.5.2" # The most simple way to add colors in your terminal
term_grid = "0.1.6" # Library for formatting strings into a grid layout
rust-tfidf = "1.0.4" # Library to calculate TF-IDF (Term Frequency Inverse Document Frequency) for generic documents
aterm = "0.20.0" # Implementation of the Annotated Terms data structure ... and 1147 crates more (use --limit N to see more)

现在, 说我们要在我们的应用程序中使用 term ;我们将需要编辑 Cargo. toml 文件, 并将其包w含在 [dependencies] 部分, 如下面的代码段所示。这个特殊的 crate 有助于格式化终端颜色。使用这个 crate , 我们想打印的单词 helloworld ,用绿色和红色分别显示:

[package]
name = "hello-rust"
version = "0.1.0"
authors = ["Foo Bar <foo.bar@foobar.com>"]

[dependencies]
term = "0.4.6"

要在程序中使用 term ,我们需要编辑 main.rs 如下面代码段所示。我们将在后面的章节中详细讨论语言, 但这里有一个小概述。下面的示例是一个简单的程序在屏幕上打印 hello world! 绿色的 hello , 红色的 world!

// chapter2/hello-rust/src/main.rs

extern crate term;

fn main() {

    let mut t = term::stdout().unwrap(); 
    t.fg(term::color::GREEN).unwrap(); write!(t, "hello, ").unwrap();

    t.fg(term::color::RED).unwrap(); writeln!(t, "world!").unwrap();

    t.reset().unwrap();

}

在 Rust 中, 每个应用程序都必须有一个称为 main 的入口点, 该入口点应定义为不接受参数的函数。函数是使用 fn 关键字定义的。短语 extern crate 术语告诉工具链, 我们要使用外部 crate 作为我们当前应用程序的依赖项。

现在, 我们可以使用 Cargo 运行它。它自动下载和构建我们需要的库及其所有依赖项。最后, 它调用 Rust 编译器, 以便我们的应用程序与库链接并运行可执行文件。Cargo 还生成一个名为 Cargo.lock 的文件, 该文件包含以一致的方式运行应用程序所需的所有内容的快照。永远不要手动编辑此文件。由于货物在本地缓存所有依赖项, 因此后续调用不需要互联网访问:

$ cargo run 
        Updating registry `https://github.com/rust-lang/crates.io-index` 
    Compiling term v0.4.6 
    Compiling hello-rust v0.1.0 (file:///Users/Abhishek/Desktop/rustbook/src/chapter2/hello-rust) 
    Finished dev [unoptimized + debuginfo] target(s) in 3.20 secs 
    Running `target/debug/hello-rust` 
hello, world!

借用检查

Rust 最重要的是所有权和借用模式。基于严格执行借用规则,编译器可以在没有外部垃圾收集的情况下保证内存安全。这是由借用检查器(编译器的子系统)完成的。根据定义,创建的每个资源都有一个生命周期和一个与之关联的所有者,它根据以下规则运行:

  • 每个资源在任何时间点都只有一个所有者。默认情况下,所有者是创建该资源的变量,其生命周期封闭范围的生命周期。其他人可以在需要时借用或者复制资源。请注意,资源可以是变量或者函数中的任何内容。函数从其调用者获得资源的所有权;从函数返回转回所有权。
  • 当所有者的作用域已完成执行时,将删除其拥有的所有资源。这由编译器静态计算,然后编译器相应地生成机器代码。

以下代码段中显示了这些规则的一些示例:

// code/chapter2/ownership-heap.rs

fn main() {
    let s = String::from("Test");
    heap_example(s);   
}

fn heap_example(input: String) {
    let mystr = input;
    let _otherstr = mystr;
    println!("{}", mystr);
}

在 Rust 中, 变量是使用 let 关键字声明的。默认情况下, 所有变量都是不可变的, 可以使用 mut 关键字使其可变。:: 语法是指给定命名空间中的对象, 在本例中为 from 函数。println! 是编译器提供的内置宏;它用于写入标准输出与尾随换行符。函数是使用 fn 关键字定义的。当我们试图编译它, 得到以下错误:

# rustc ownership-heap.rs 
error[E0382]: use of moved value: `mystr` 
    --> ownership-heap.rs:9:20
    | 
  8 | let _otherstr = mystr;
    | --------- value moved here 
  9 | println!("{}", mystr);
    | ^^^^^ value used here after move
    |
    = note: move occurs because `mystr` has type `std::string::String`, which does not implement the `Copy` trait

error: aborting due to previous error

在这个例子中,在函数 heap_example 中创建了一个由变量 mystr 拥有的字符串资源。因此,它的生命周期与它的范围相同。由于编译器在编译时不知道字符串的长度, 因此必须将其放置在堆上。所有者变量是在堆栈上创建的, 并指向堆上的资源。当我们将该资源分配给新变量时, 该资源现在归新变量所有。此时, rust 将把 mystr 标记为无效, 以防止与资源关联的内存可能被多次释放的情况。因此, 编译不能在这里保证内存安全。我们可以强制编译器复制资源, 并让第二个所有者指向新创建的资源。为此, 我们需要 .clone() 名为 mystr 的资源。下面是它的样子:

// code/chapter2/ownership-heap-fixed.rs

fn main() {
    let s = String::from("Test");
    heap_example(s);
}

fn heap_example(input: String) { 
    let mystr = input; 
    let _otherstr = mystr.clone(); 
    println!("{}", mystr); 
}

如预期的那样, 这不会在编译时引发任何错误, 并在运行时打印给定的字符串 "Test"。请注意, 到目前为止, 我们一直在使用 Cargo 来运行我们的代码。因为在这种情况下, 我们只有一个简单的文件, 没有外部依赖关系, 我们将使用 Rust 编译器直接编译我们的代码, 然后手动运行它:

$ rustc ownership-heap-fixed.rs && ./ownership-heap-fixed 
Test 

请考虑下面的代码, 它显示了资源存储在堆栈上的情况:

// code/chapter2/ownership-stack.rs

fn main() {
    let i = 42;
    stack_example(i);
}

fn stack_example(input: i32) {
    let x = input;
    let _y = x;
    println!("{}", x);
}

有趣的是, 尽管它看起来与以前的代码块完全相同, 但这不会引发编译错误。我们直接从命令行使用 Rust 编译器构建和运行:

# rustc ownership-stack.rs && ./ownership-stack 
42

区别在于变量的类型。在这里, 原始所有者和资源都是在堆栈上创建的。重新分配资源后, 会将其复制到新所有者。之所以能够做到这一点, 只是因为编译器知道整数的大小始终是固定的 (因此可以放在堆栈上)。 Rust 提供了一种特殊的方式来表示, 可以通过 Copy 特征将类型放置在堆栈上。我们的示例之所以有效, 只是因为内置整数 (和其他一些类型) 使用此特性进行标记。我们将在后面的章节中更详细地解释特征系统。

人们可能已经注意到, 将长度未知的资源复制到函数可能会导致内存膨胀。在许多语言中, 调用方将向内存位置传递一个指针, 然后传递到函数。Rust 通过使用引用来执行此操作。这样, 您就可以引用资源, 而不实际拥有资源。当函数收到对资源的引用时, 我们说它借用了该资源。在下面的示例中, 函数 heap_example 借用变量 s 所拥有的资源。由于借用不是绝对所有权, 因此借用变量的范围不会影响释放与资源关联的内存的方式。这也意味着不可能在函数中多次释放借来的资源, 因为函数作用域中没有人实际拥有该资源。因此, 在这种情况下, 早期失败的代码起作用:

// code/chapter2/ownership-borrow.rs

fn main() {
    let s = String::from("Test");
    heap_example(&s);
}

fn heap_example(input: &String) {
    let mystr = input;
    let _otherstr = mystr;
    println!("{}", mystr);
}

借用规则还意味着借用是不可改变的,然而,在有些情况下,我们需要可变借用。为了处理这种情况,Rust 允许可变引用(或借用)。这将使我们回到第一个示例中遇到的编译失败的问题,代码如下:

// code/chapter2/ownership-mut-borrow.rs

fn main() {
    let mut s = String::from("Test");
    heap_example(&mut s);
}

fn heap_example(input: &mut String) {
    let mystr = input;
    let _otherstr = &mystr;
    println!("{}", mystr);
}


请注意, 资源只能在作用域中被可变借用一次。编译器将拒绝编译尝试执行其他操作的代码。虽然这看起来可能是一个令人讨厌的错误, 但您需要记住, 在工作的应用程序中, 这些函数通常会从竞争线程调用。如果由于编程错误而出现同步错误, 我们最终将出现数据争用, 其中多个不同同步的线程竞相修改同一资源。此功能有助于防止这种情况。

与引用密切相关的另一个语言功能是生命周期。一个引用只要在范围内就会存活,因此它的生命周期是一整个封闭作用域。在 Rust 中声明的所有变量都可以有一个生命周期的显式省略,将一个名称置于其生命周期。这对于借用检查器来说很有用,可以解释变量的相对生命周期。通常, 不需要为每个变量都设置显式生存期名称, 因为编译器管理该名称。在某些情况下, 这是必需的, 尤其是在自动生命周期确定无法工作的情况下。让我们看一个发生这种情况的例子:

// code/chapter2/lifetime.rs
fn main() {
    let v1 = vec![1, 2, 3, 4, 5];
    let v2 = vec![1, 2];

    println!("{:?}", longer_vector(&v1, &v2));
}

fn longer_vector<'a>(x: &'a[i32], y: &'a[i32]) -> &'a[i32] {
    if x.len() > y.len() { x } else {y }
}


vec! 宏从给定的对象列表中构造一个向量。请注意,与前面的示例不同,我们在此处的函数需要将值返回给调用方。我们需要使用箭头语法指定返回类型,在这里,我们得到两个向量,我们要打印两个向量中最长的一个。longer_vector 函数完成这个功能。它接受两个向量的引用,计算它们的长度,并返回长度较大的向量的引用。编译失败,错误如下:

# rustc lifetime.rs 
error[E0106]: missing lifetime specifier 
    --> lifetime.rs:8:43
    | 
  8 | fn longer_vector(x: &[i32], y: &[i32]) -> &[i32] {
    | ^ expected lifetime parameter
    |
    = help: this function's return type contains a borrowed value, but the signature does not say whether it is borrowed from `x` or `y`

error: aborting due to previous error

这告诉我们, 编译器无法确定返回的引用是应引用第一个参数还是第二个参数, 因此无法确定它应该存在多长时间。由于我们无法控制输入, 因此无法在编译时确定这一点。这里的一个关键见解是, 我们不需要在编译时知道所有引用的生命周期。我们需要确保以下事情成立:

  • 这两个输入应该具有相同的生存期, 因为我们要在函数中比较它们的长度
  • 返回值应该与输入参数中更长的一个有着相同的生命周期。

考虑到这两个公理, 这两个输入和返回应该具有相同的生存期。我们可以对此进行注释, 如下面的代码段所示:

fn longer_vector<'a>(x: &'a[i32], y: &'a[i32]) -> &'a[i32] { 
    if x.len() > y.len() { x } else { y } 
}

这与预期的工作一样, 因为编译器可以很好地保证代码的正确性。生存期参数也可以附加到结构和方法定义。有一个特殊的生命周期叫做 'static, 它指的是程序的整个持续时间。

Rust 最近接受了一项建议, 即增加一个名为 'fn 的新的指定生命周期, 其生命周期等于最里面的函数或闭包的范围。

泛型和 Trait 系统

Rust 支持编写泛型代码, 这些代码后来与更具体的类型绑定, 无论是在编译时或运行时。熟悉 C++ 中模板的人可能会注意到, 就语法而言, Rust 中的泛型与模板非常相似。下面的示例说明如何使用泛型编程。我们还介绍了一些我们以前没有讨论过的新结构, 我们将在继续讨论的过程中解释这些结构。

与 C 和 C++ 类似, Rust struct 定义了一个用户定义的类型,该类型在一个单无聚合多个逻辑上的连接的资源。这里的结构定义了两个变量的元组。我们定义一个泛型结构,并使用了一个泛型类型参数 <T> 。结构的第一个成员都被定义为该类型。稍后,我们定义一个泛型函数,该函数对元组的两个元素进行求和,让我们来看看一个幼稚的实现:

// code/chapter2/generic-function.rs

struct Tuple<T> {
    first: T,
    second: T,
}

fn main() {
    let tuple_u32: Tuple<u32> = Tuple {
        first: 4u32,
        second: 2u32,
    };
    let tuple_u64: Tuple<u64> = Tuple {
        first: 5u64,
        second: 6u64,
    };
    println!("{}", sum(tuple_u32));
    println!("{}", sum(tuple_u64));

    let tuple: Tuple<String> = Tuple {
        first: "One".to_owned(),
        second: "Two".to_owned(),
    };
    println!("{}", sum(tuple));
}

fn sum<T>(tuple: Tuple<T>) -> T {
    tuple.first + tuple.second
}

编译失败,报错如下:

$ rustc generic-function.rs 
error[E0369]: binary operation `+` cannot be applied to type `T` 
    --> generic-function-error.rs:18:5 
    | 
 18 | tuple.first + tuple.second 
    | ^^^^^^^^^^^^^^^^^^^^^^^^^^ 
    | = note: `T` might need a bound for `std::ops::Add`

error: aborting due to previous error

此错误很重要。编译器告诉我们, 它不知道如何添加 T 类型的两个操作数。它还 (正确地) 猜测 T 类型需要受 Add 特征的约束。这意味着 T 的可能具体类型列表应只具有实现 Add 特征的类型, 这些类型可以添加具体引用。让我们继续前进, 并把特征绑定在和函数中。我们的代码现在应该是这样的:

// code/chapter2/generic-function-fixed.rs

use std::ops::Add;

struct Tuple<T> {
    first: T,
    second: T,
}

fn main() {
    let tuple_u32: Tuple<u32> = Tuple {
        first: 4u32,
        second: 2u32,
    };
    let tuple_u64: Tuple<u64> = Tuple {
        first: 5u64,
        second: 6u64,
    };

    println!("{}", sum(tuple_u32));
    println!("{}", sum(tuple_u64));

    // These lines fail to compile
    let tuple: Tuple<String> = Tuple {
        first: "One".to_owned(),
        second: "Two".to_owned(),
    };
    println!("{}", sum(tuple));
}

// We constrain the possible types of T to those which implement the Add trait

fn sum<T: Add<Output = T>>(tuple: Tuple<T>) -> T {
    tuple.first + tuple.second
}

要实现这一点,元素必须是可求和的;对它们求和应该有逻辑意义。因此,我们将 T 参数可能具有的类型限制为那些实现了添加特性的类型。我们还需要让编译器知道这个函数的输出应该是 T 类型的。根据这些信息,我们可以构造我们的元组并对它们调用 sum 函数,它们的行为将如预期的那样。另外,请注意,字符串的元组将无法编译并出现错误,因为没有为字符串实现 Add 特性。

从上一个例子中,我们可能会注意到,特性对于正确实现泛型是必不可少的。它们帮助编译器解释泛型类型的属性。本质上,特征定义了类型的属性。库为内置类型定义了一系列常用的特性及其实现。对于任何用户定义的类型,都取决于用户通过定义和实现特性来定义这些类型应该具有哪些属性。

// code/chapter2/traits.rs

trait Max<T> {
    fn max(&self) -> T;
}

struct ThreeTuple<T> {
    first: T,
    second: T,
    third: T,
}

impl<T: PartialOrd + Copy> Max<T> for ThreeTuple<T> {
    fn max(&self) -> T {
        if self.first >= self.second && self.first >= self.third {
            self.first
        } else if self.second >= self.first && self.second >= self.third {
            self.second
        } else {
            self.third
        }
    }
}

struct TwoTuple<T> {
    first: T,
    second: T,
}

impl<T: PartialOrd + Copy> Max<T> for TwoTuple<T> {
    fn max(&self) -> T {
        if self.first >= self.second {
            self.first
        } else {
            self.second
        }
    }
}

fn main() {
    let two_tuple: TwoTuple<u32> = TwoTuple {
        first: 4u32,
        second: 2u32,
    };
    let three_tuple: ThreeTuple<u64> = ThreeTuple {
        first: 6u64,
        second: 5u64,
        third: 10u64,
    };

    println!("{}", two_tuple.max());
    println!("{}", three_tuple.max());
}

我们首先定义T类型的一般特性。我们的特性只有一个函数,它将返回实现它的给定类型的最大值。 最重要的是实现细节,在这个阶段并不重要。然后我们定义一个由三个元素组成的元组,每个元素都是相同的泛型类型。稍后,我们为我们定义的类型实现我们的特性。在 rust 中,如果没有显式返回语句,则函数返回最后一个表达式。在社区中使用这种风格被认为是惯用的。我们的 max 函数在 if…else 块中使用这个特性。为了使实现工作,泛型类型必须在它们之间定义一个排序关系,以便我们可以比较它们。在 Rust 中,这是通过将可能的类型约束到实现党派特性的类型来实现的。我们还需要对复制特性设置一个约束,以便编译器可以在从函数返回之前复制自参数。我们继续定义另一个元组,它有两个元素。我们在这里以类似的方式实现相同的特性。当我们在 main 函数中实际使用这些函数时,它们会按预期工作。

Traits 还可用于扩展内置类型和添加新功能。让我们看下面的例子:

// Trait for our behavior

trait Sawtooth {
    fn sawtooth(&self) -> Self;
}

impl Sawtooth for f64 {
    fn sawtooth(&self) -> f64 {
        self - self.floor()
    }
}

fn main() {
    println!("{}", 2.34f64.sawtooth());
}

在这里, 我们要在内置的 f64 类型上实现 sawtooth https://en.wikipedia.org/wiki/Sawtooth_wave 。 此函数在标准库中不可用, 因此我们需要编写一些代码, 以便通过扩展标准库使其正常工作。要将其无缝集成到类型系统中, 我们需要定义一个特性, 并为 f64 类型实现它。这使我们能够使用点符号使用新函数, 就像 f64 类型上的任何其他内置函数一样。

标准库提供了许多内置特性;最常见的是 DisplayDebug 。用于格式化调试输出。Display 对应空格式化输出 {}Debug 用于调试格式化输出。所有的数学运算都被定义为特征, 如 AddDiv 等。如果使用 #[derive] 属性标记用户定义类型, 编译器将尝试为这些类型提供默认实现。但是, 如果需要, 实现可能会选择重写其中的任何一个。下面的代码段中显示了这方面的一个示例:

// code/chapter2/derive.rs

use std::fmt;
use std::fmt::Display;

#[derive(Debug, Hash)]
struct Point<T> {
    x: T,
    y: T,
}

impl<T> fmt::Display for Point<T> where T: Display {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "({}, {})", self.x, self.y)
    }
}

fn main() {
    let p: Point<u32> = Point { x: 4u32, y: 2u32 };

    // uses Display
    println!("{}", p);

    // uses Debug
    println!("{:?}", p);
}


我们用两个字段定义一个 Point 结构。我们让编译器生成一些常见特性的实现。但是,我们必须手工实现Display 特性,因为编译器无法确定显示用户定义类型的最佳方式。我们必须将泛型类型约束到使用 where 子句实现 Display 的类型。这个例子还演示了基于特征约束类型的替代方法。设置好所有这些之后,我们可以使用默认格式化程序显示我们的点。这将产生以下输出:

# rustc derive.rs && ./derive 
(4, 2) 
Point { x: 4, y: 2 }

错误处理

Rust 的主要目标之一是使开发人员能够编写可靠的软件。这方面的一个重要组成部分是高级错误处理。在本节中, 我们将更深入地了解 Rust 是如何进行错误处理的。但在此之前, 让我们绕道看看某种类型的理论。具体来说, 我们对 代数数据类型 algebraic data types (ADT) 感兴趣, 这些类型是通过组合其他类型而形成的。 两种最常见的 ADT 是 sumproduct 类型。Rust 中的 structproduct 类型的一个示例。此名称源于这样一个事实:给定一个结构,其类型的范围本质上是其每个组件范围的笛卡尔积,因为该类型的一个实例具有其所有组成类型的值。相反, sum 类型是当 ADT 可以假设只有一个它的成分的类型。这方面的一个例子是 Rust 中的枚举。虽然与 C 和其他语言中的枚举类似, 但 Rust 枚举提供了许多增强功能: 它们允许变体携带数据。

现在, 回到错误处理。Rust 要求可能导致错误的操作必须返回带有结果的特殊枚举。方便的是, 这个枚举看起来像这样:


# #![allow(unused_variables)]
#fn main() {
enum Result<T, E> { 
    Ok(T), 
    Err(E), 
}
#}

这两个可能的选项称为变体。在这种情况下, 它们分别表示非错误情况和错误情况。请注意, 这是一般定义的, 因此实现可以在这两种情况下自由定义类型。这在想要扩展标准错误类型并实现自定义错误的应用程序中很有用。让我们看一个操作中的例子:

// code/chapter2/custom-errors.rs

use std::error::Error;
use std::fmt;

#[derive(Debug)]
enum OperationsError {
    DivideByZeroError,
}

impl fmt::Display for OperationsError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            OperationsError::DivideByZeroError => f.write_str("Cannot divide by zero"),
        }
    }
}

impl Error for OperationsError {
    fn description(&self) -> &str {
        match *self {
            OperationsError::DivideByZeroError => "Cannot divide by zero",
        }
    }
}

fn divide(dividend: u32, divisor: u32) -> Result<u32, OperationsError> {
    if divisor == 0u32 {
        Err(OperationsError::DivideByZeroError)
    } else {
        Ok(dividend / divisor)
    }
}

fn main() {
    let result1 = divide(100, 0);
    println!("{:?}", result1);

    let result2 = divide(100, 2);
    println!("{:?}", result2.unwrap());
}

在示例中,我们定义了一个函数,该函数在第一个操作数除以第二个操作数时只返回商,当除数为 0 时,此函数必须处理错误情况。我们还希望它向其调用者发出错误信号(如果是这种情况的话)。另外,让我们假设这是库的一部分,它将扩展到包括更多这样的操作。为了让代码可管理,我们为我们的库创建一个错误类,其中的一个元素表示错误的划分。

为了使 Rust 编译器知道枚举是一种错误类型, 我们的枚举必须实现标准库中的错误特性。它还需要手动实现显示特征。在设置了这个样板之后, 我们可以定义我们的除法方法。我们将利用泛型 Result 特征来注释成功时, 它应返回与操作数相同的类型 u32 。失败时, 它应返回类型为 OperationsError 的错误。 在函数中, 如果我们的除数为零, 我们会引发错误。否则, 我们执行除法, 将结果包装在 "确定" 中, 使其成为 "结果枚举" 的变体, 并将其返回。在我们的主要函数中, 我们用零除数来称呼它。结果将是一个错误, 如第一个打印宏所示。在第二次调用中, 我们知道除数不是零。因此, 我们可以安全地展开结果, 将其从 Ok(50) 转换为50。标准库具有许多实用程序函数来处理结果类型, 安全地向调用者报告错误。

下面是最后一个示例的示例运行示例:

$ rustc custom-errors.rs && ./custom-errors 
Err(DivideByZeroError) 
50

标准库中的一个相关习语是 Option 类型, 如下面的代码段所示。这用于指示操作的可空性, 由 None 变量指示。Some 变体处理的情况下, 它保存类型 T :


# #![allow(unused_variables)]
#fn main() {
pub enum Option<T> {
    None, 
    Some(T), 
}
#}

考虑到这种类型, 我们可以这样写我们的 divide 函数:

// code/chapter2/options.rs

fn divide(dividend: u32, divisor: u32) -> Option<u32> {
    if divisor == 0u32 {
        None
    } else {
        Some(dividend / divisor)
    }
}

fn main() {
    let result1 = divide(100, 0);

    match result1 {
        None => println!("Error occurred"),
        Some(result) => println!("The result is {}", result),
    }

    let result2 = divide(100, 2);
    println!("{:?}", result2.unwrap());
}

我们修改函数返回一个 u32 类型的 Option ,在 main 函数中调用。在这种情况下,我们可以匹配返回类型。如果是 None,我们可以知道它并没有成功。这样的话,我们可以打印错误。如果返回 Some 我们提取基础值并打印出来。第二个调用工作正常, 因为我们知道它没有得到零除数。使用 Option 进行错误处理可以更容易地管理, 因为它涉及的样板较少。但是, 在具有自定义错误类型的库中, 这可能有点难以管理, 因为错误不是由类型系统处理的。

请注意, Option 可以表示为给定类型的 Result 和单位类型。 type Option = Result<T, ()>;

到目前为止, 我们为错误处理所描述的内容已经完成了可恢复的错误。不过, 在某些情况下, 如果发生错误, 中止执行可能是明智的。标准库提供 panic! 宏处理此类情况。调用它将停止当前线程的执行,在屏幕上打印消息,并展开调用堆栈。但是,需要谨慎地使用它,因为在许多情况下,更好的选择是正确地处理错误并将错误冒泡到调用方。

在出现错误时, 许多内置方法和函数调用此宏。让我们看下面的例子:

// code/chapter2/panic.rs

fn parse_int(s: String) -> u64 {
    return s.parse::<u64>().expect("Could not parse as integer");
}

fn main() {
    // works fine
    let _ = parse_int("1".to_owned());

    // panics
    let _ = parse_int("abcd".to_owned());
}

运行失败,并报以下问题:

# ./panic 
thread 'main' panicked at 'Could not parse as integer: ParseIntError { kind: InvalidDigit }', src/libcore/result.rs:906:4 note: Run with `RUST_BACKTRACE=1` for a backtrace.

一些引起恐慌的方法是 expect()unwrap()

Rust 支持多年来发展了相当多的宏系统。Rust 宏的一个显著特点是, 保证它们不会意外引用其范围之外的标识符, 因此 rust 中的宏实现是“卫生”的。正如人们所期望的那样, Rust 宏在编译之前被扩展到源代码, 并与翻译单元一起编译。编译器对扩展的宏强制执行范围规则,以使其保持卫生。Rust宏不同于其他构造,因为它们总是以感叹号结尾 !

现代 Rust 有两种使用宏的方法;较旧的语法方法和较新的程序宏方式。让我们来看看这些:

语法宏

pre-1.0 发布以来, 宏系统一直作为 Rust 的一部分存在。这些宏是使用 macro_rules! 宏定义的。让我们看一个例子:

// code/chapter2/syntactic-macro.rs

macro_rules! factorial {
    ($x:expr) => {{
        let mut result = 1;
        for i in 1..($x + 1) {
            result = result * i;
        }
        result
    }};
}

fn main() {
    let arg = std::env::args()
        .nth(1)
        .expect("Please provide only one argument");

    println!(
        "{:?}",
        factorial!(arg.parse::<u64>().expect("Could not parse to an integer"))
    );
}

我们从定义阶乘宏开始。由于我们不希望编译器拒绝编译我们的代码, 因为它可能会溢出宏堆栈, 我们将使用非递归实现。Rust中的语法宏是一组规则,其中左侧指示规则应如何与输入匹配,右侧指示规则应扩展到什么。规则通过 => 运算符映射到右侧的表达式。使用 $ 符号声明规则的局部变量。匹配规则是用一种特殊的宏语言来表示的,它有自己的一组保留关键字。 我们的声明说,我们希望采用任何有效的 rust 表达式;在这种特定情况下,它的值应该是整数。我们会把它留给调用者,以确保这是真的。然后,我们在累积结果的同时循环从1到范围内的最后一个整数。完成后,我们使用隐式返回语法返回结果。

我们的调用方是 main 函数, 因为我们使用 std::env 模块从用户处获取输入。如果没有输入, 我们将获取输入列表中的第一个, 并引发错误。然后, 我们打印宏的结果, 并尝试将输入解析为 u64 , 然后再传递给它。我们还处理分析可能失败的情况。

# rustc syntactic-macro.rs && ./syntactic-macro 5 
120

Rust 还提供了一些用于调试宏的工具。人们可能有兴趣看看展开的宏是什么样子。trace_macros! 宏正是这样做的。为了使其工作, 我们将需要启用一个功能门, 如下面的代码段所示 (因为它在 Rust 中还不稳定, 这个代码将只在 Rust nightly 有效):

#![feature(trace_macros)] 
trace_macros!(true);

请注意, 扩展还包括 println! 因为它是标准库中定义的宏。

同样的扩展也可以使用以下命令来调用编译器: rustc -Z unstable-options --pretty expanded syntacticmacro.rs.

过程宏

虽然常规语法宏在许多情况下都很有用, 但某些应用程序需要高级代码生成功能, 而这些功能最好使用编译器操作的 AST 来完成。因此, 有必要扩大宏系统, 将其包括在内。后来决定, 旧的宏系统和这个称为 过程宏 的新系统将共存。随着时间的推移, 这是为了取代语法宏系统。编译器支持从外部 crates 加载插件;这些插件可以在编译器生成后接收AST。有可用的API来修改AST,以根据需要添加新代码。对这个系统的详细讨论超出了本书的范围。

Rust 函数特性

Rust受到 Haskell 和 OCaml 等功能语言的启发。不出意外,Rust 在语言和标准库中都支持函数式编程。在本节中,我们将介绍其中的一些内容。

高阶函数

我们之前已经看到了 Rust 函数如何定义一个孤立的范围,其中所有局部变量都存在。因此,范围之外的变量永远不会泄漏到它中云,除非,它们作为参数显式传递。在某些情况下,这可能不是一个理想的行为。闭包提供类似于匿名函数的机制,该机制可以访问在其定义范围内定义的所有资源。这使得编译器,可以强制执行相同的借用检查规则,同时使代码重用更容易。在 Rust 术语中,典型的闭包借用了其周围范围的所有绑定。可以通过使用move关键字标记闭包来强制关闭它们。 我们来看一些例子:

// closure-borrow.rs


fn main() {
    // closure with two arameters
    let add = |a, b| a + b;
    assert_eq!(add(2, 3), 5);

    // common use ecases are on iterators
    println!("{:?}", (1..10).filter(|x| x % 2 == 0).collect::<Vec<u32>>());

    // using a variable from enclosing scope
    let times = 2;
    println!("{:?}", (1..10).map(|x| x * times).collect::<Vec<i32>>());

}

第一个例子是一个简单的闭包,它添加了两个给它的数字。第二个例子涉及更多,它显示了函数式编程闭包的真实示例。 我们过滤整数列表,只收集其中的偶数。因此,我们从 1 到 10, 返回内置类型 Range 的实例。由于该类型实现了 IntoIterator 特征,因此该类型表现为迭代器。因此,我们可以通过传递一个闭包来过滤它,只有当输入可以除以 2 时才返回 true 。 最后,我们将得到的迭代器收集到 u32 的向量中并将其打印出来。最后一个例子在结构上类似。它从闭包的封闭范围借用变量 times,并使用它来映射到范围的项目。

让我们看看使用关键字 move 的示例:

// code/chapter2/closure-move.rs

fn main() {
    let mut times = 2;
    {
        // This is in a new scope
        let mut borrow = |x| times += x;
        borrow(5);
    }

    assert_eq!(times, 7);

    let mut own = move |x| times += x;

    own(5);
    assert_eq!(times, 7);

}

第一个闭包(borrow)和我们迄今为止讨论的那个之间的区别在于,这会改变它从封闭作用域继承的变量。我们必须将变量和闭包声明为 mut 。我们还需要将闭包放在不同的作用域,以便编译器在我们尝试断言其值时不会抱怨双重借用。断言,闭包 borrow 从其父作用域借用变量,这就是它的原始值变为7的原因。第二个名为 own 的闭包是一个移动闭包,因此它获得变量 times 的副本。为此,变量必须实现 Copy trait,以便编译器可以将其复制到闭包,所有内置类型都可以。由于闭包获得的变量和原始变量不相同,编译器不会抱怨它两次借用它。此外,变量的原始值不会更改。这些类型的闭包在实现多线程时非常重要,我们将在后面的部分中看到。标准库还支持使用许多内置特征接受和返回用户定义函数或方法中的闭包。

Trait name Function
std::ops::Fn 由不接收可变捕获变量的闭包实现。
std::ops::FnMut 由需要改变捕获变量的闭包实现。
std::ops::FnOnce 由所有闭包实现。表示可以只调用一次闭包。

迭代器

另一个重要的功能方面是 lazy iteration 延迟迭代。给定一组类型,应该能够以任何给定的顺序循环这些或子集。在 Rust 中,常见的迭代器是一个具有开始和结束的范围。让我们看看它们是如何工作的:

// code/chapter2/range.rs

#![feature(inclusive_range_syntax)]

fn main() {
    let numbers = 1..5;
    for number in numbers {
        println!("{}", number);
    }

    println!("---------------");
    let inclusive = 1..=5;
    for number in inclusive {
        println!("{}", number);
    }
}

第一个 range 是一个独特的范围,从第一个元素到最后一个元素,不包含最后一个。第二个范围是包含范围,直到最后一个元素。请注意,包含范围是一个可能在将来发生变化的实验性功能。

正如我们所期望,Rust 确实提供了可以迭代用户定义类型的接口。该类型只需要实现 trait std::iterator::Iterator。我们来看一个例子。我们生成 考拉兹 Collatz 序列(https//en.wikipedia.org/wiki/Collatz_conjecture),给定一个整数。给定一个整数,这由下面的递归关系给出:

  • 如果是偶数,则除以 2
  • 如果是奇数,则将其乘以3并加1

根据猜想,这个序列将始终终止于 1 .我们将假设这是真的并定义我们的代码:

// code/chapter2/collatz.rs

// This struct holds state while iterating
//
struct Collatz {
    current: u64,
    end: u64,
}

impl Iterator for Collatz {
    type Item = u64;

    fn next(&mut self) -> Option<u64> {
        if self.current % 2 == 0 {
            self.current = self.current / 2;
        } else {
            self.current = 3 * self.current + 1;
        }

        if self.current == self.end {
            None
        } else {
            Some(self.current)
        }
    }
}

// Utility function to start iteration
fn collatz(start: u64) -> Collatz {
    Collatz {
        current: start,
        end: 1u64,
    }
}

fn main() {
    let input = 10;

    // First 2 items
    for n in collatz(input).take(2) {
        println!("{}", n);
    }

    // Dropping first 2 items
    for n in collatz(input).skip(2) {
        println!("{}", n);
    }
}

在我们的代码中,当前迭代的状态由 Collatz 结构体表示,我们在其上面实现 Iterator 协议。为此,我们需要实现 next 函数,它接收当前状态,并产生下一个状态。当它到达结束状态时,必须返回 None ,以便调用者知道迭代器已经用尽。这由函数的可空返回值表示。考虑到重复性,实现是简单的。在我们的 main 函数中,我们实例化了初始状态,并且可以使用 for 循环进行迭代。迭代器特性自动实现许多有用的函数;take 函数从迭代器中获取给定数量的元素,而 skip 函数跳过给定数量的元素。所有这些对于使用不可重复的集合都非常重要。

以下是我们示例运行的输出:

$ rustc collatz.rs && ./collatz 
5 
16 
8 
4 
2

简单的并发

Rust 的一个承诺是实现无畏并发。很自然的,Rust支持通过许多机制编写并发代码。在本章中,我们将讨论其中的一些。我们已经看到Rust编译器如何使用借用检查来确保编译时程序的正确性。事实证明,这些原语在验证并发代码的正确性方面也很有用。现在,有多种方法可以在一种语言中实现线程。最简单的方法是为平台中创建的每个线程创建一个新的 OS 线程。 这通常称为 1:1 线程。另一方面,许多应用程序线程可以映射到一个OS 线程。这称为 N:1 线程。虽然这种方法很轻松,因为我们最终实际线程较少,但上下文切换的开销较高。中间层称为 M:N 线程,其中多个应用程序线程映射到多个OS级别线程。 这种方法需要最大程度的安全保护,并使用运行时实现,这是 Rust 避免的。因此,Rust使用 1:1 模型。与 Go 等语言相比,Rust中的一个线程对应一个 OS 线程。让我们先看看 Rust 如何编写多线程应用程序:

// code/chapter2/threads.rs

use std::thread;

fn main() {
    for i in 1..10 {
        let handle = thread::spawn(move || {
            println!("Hello from thread number {}", i);
        });
        let _ = handle.join();
    }
}

我们引入线程。在 main 函数中,我们创建一个空向量,用它来存储创建的线程的引用,以便我们可以等待它们退出。线程实际际上是使用 thread::spawn 创建的,我们必须传递一个将每个线程中执行的闭包。因为我们必须在闭包中从封闭作用域(循环索引i)中借用一个变量,所以闭包本身必须是一个 move 闭包。在退出闭包之前,我们调用当前线程句柄的连接,以便所有线程彼此等待。这会产生以下输出:

# rustc threads.rs && ./threads 
Hello from thread number 1 
Hello from thread number 2 
Hello from thread number 3 
Hello from thread number 4 
Hello from thread number 5 
Hello from thread number 6 
Hello from thread number 7 
Hello from thread number 8 
Hello from thread number 9

多线程应用程序的真正强大之处在于线程可以合作进行有意义的工作。为此,有两件重要的事情是必要的。线程需要能够从彼此传递数据,并且应该有方法来协调线程的调度方式,以便它们不会相互跨越。对于第一个问题,Rust提供了一条消息,通过通道传递机制。我们来看下面的例子:

// code/chapter2/channels.rs
use std::sync::mpsc;
use std::thread;

fn main() {
    let rhs = vec![10, 20, 30, 40, 50, 60, 70];
    let lhs = vec![1, 2, 3, 4, 5, 6, 7];
    let (tx, rx) = mpsc::channel();

    assert_eq!(rhs.len(), lhs.len());
    for i in 1..rhs.len() {
        let rhs = rhs.clone();
        let lhs = lhs.clone();
        let tx = tx.clone();
        let handle = thread::spawn(move || {
            let s = format!(
                "Thread {} added {} and {}, result {}",
                i,
                rhs[i],
                lhs[i],
                rhs[i] + lhs[i]
            );
            tx.clone().send(s).unwrap();
        });

        let _ = handle.join().unwrap();
    }

    drop(tx);

    for result in rx {
        println!("{}", result);
    }
}

这个例子很像前一个例子。我们导入必要的模块以便能够使用频道。我们定义了两个向量,我们将为两个向量中的每对元素创建一个线程,以便我们可以添加它们并返回结果。我们创建了通道,它将句柄返回给发送端和接收端。作为安全检查,我们确保两个矢量确实具有相同的长度。然后,我们继续创建我们的线程。由于我们需要在这里访问外部变量,因此线程需要像上一个示例那样接受移动闭包。此外,编译器将尝试使用 Copy trait 将这些变量复制到线程。在这种情况下,由于矢量类型没有实现Copy,因此会失败。

我们需要显式 clone 资源,以便不需要复制它们。我们运行计算并将结果发送到管道的发送端。后来,我们加入了所有的主题。在我们遍历接收端并打印结果之前,我们需要显式删除对发送端的原始句柄的引用,以便在我们开始接收之前销毁所有发送者(当线程退出时,克隆的发送者将被自动销毁) 。这将按预期打印以下内容:

# rustc channels.rs && ./channels 
Thread 1 added 20 and 2, result 22 
Thread 2 added 30 and 3, result 33 
Thread 3 added 40 and 4, result 44 
Thread 4 added 50 and 5, result 55 
Thread 5 added 60 and 6, result 66 
Thread 6 added 70 and 7, result 77

另请注意,mpsc代表多个生产者单一消费者。

在处理多个线程时,另一个常见的习惯用法是在所有线程之间共享一个公共状态。然而,在许多情况下,这可能是一罐蠕虫。调用者需要仔细设置排除机制,以便以 race-free 无种族的方式共享状态。幸运的是,借阅检查器可以帮助确保这更容易。Rust有许多智能指针用于处理共享状态。该库还提供了一个通用的互斥类型,可以在处理多个线程时用作锁。但也许最重要的是发送和同步特性。实现发送特性的任何类型都可以在多个线程之间安全地共享。同步特性表示多个线程对给定数据的访问是安全的。关于这些特征有一些规则:

  • 所有内置类型都实现 SendSync ,除了任何 unsafe 的东西,一些智能指针类型,如 Rc<T>UnsafeCell<T>
  • 复合类型将自动实现两者,只要它没有任何不实现 SendSync 的类型

std::sync 包有很多类型和帮助器来处理并行代码。

在上一段中,我们提到了不安全的 Rust 。让我们绕道走,再仔细看看。Rust 编译器通过使用一个健壮的类型系统为安全编程提供了一些强有力的保证。然而,在某些情况下,这些可能会成为更大的开销。为了处理这种情况,语言提供了一种选择退出这些保证的方法。用不安全关键字标记的代码块可以做 Rust 可以做的所有事情,以及以下内容:

  • 取消引用原始指针类型(*mut 或者 *const T)
  • 调用不安全的函数或者方法
  • 实现标记为不安全的特性。
  • 改变一个静态变量

让我们看一个使用 unsafe 代码块取消引用指针的示例:

// code/chapter2/unsafe.rs

fn main() {
    let num: u32 = 43;
    let p: *const u32 = &num;

    unsafe {
        assert_eq!(*p, num);
    }
}

在这里,我们创建一个变量和一个指向它的指针;如果我们尝试在不使用 unsafe 块的情况下取消引用指针,编译器将拒绝编译。在不安全的块内,我们在取消引用时返回原始值。虽然不安全的代码可能很危险,但它在低级编程(如内核(RedoxOS)和嵌入式系统)中非常有用。

测试

Rust将测试视为一流的构造;生态系统中的所有工具都支持测试。编译器提供了一个内置的配置属性,用于指定要测试的模块。还有一个将函数指定为测试的测试属性。当 Cargo 从头开始生成一个项目时,它建立了这个样板文件。让我们来看一个示例项目;我们将其称为阶乘。它将导出一个宏,该宏计算给定整数的阶乘。因为我们以前已经方便地编写了这样一个宏,所以我们将在这里重新使用该代码。请注意,由于此 crate 将用作库,因此它没有 main 函数:

# cargo new factorial --lib 
# cargo test
    Compiling factorial v0.1.0 (file:///Users/Abhishek/Desktop/rustbook/src/ch2/factorial)
     Finished dev [unoptimized + debuginfo] target(s) in 1.6 secs
      Running target/debug/deps/factorial-364286f171614349 
      
running 1 test 
test tests::it_works ... ok 

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

    Doc-tests factorial 
    
running 0 tests 

test result: ok. 0 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

运行 cargo test 运行 Cargo 为我们生成的存根测试。我们将把 factorial 宏的代码复制到 lib.rs ,它们将如下所示:


# #![allow(unused_variables)]
#fn main() {
// chapter2/factorial/src/lib.rs

#[allow(unused_macros)] 
#[macro_export] 
macro_rules! factorial {
    ($x:expr) => { 
        { 
            let mut result = 1; 
            for i in 1..($x+1) { 
                result = result * i; 
            } 
            result 
        }
    };
}

#[cfg(test)] 
mod tests { 
    #[test] 
    fn test_factorial() { 
        assert_eq!(factorial!(5), 120); 
    }     
}
#}

我们还添加了一个测试,以确保 factorial 实际上像我们所期望的那样工作。#[macro_export] 属性告诉编译器这个宏将在 crate 外部使用。编译器内置的 assert_eq! 宏检查两个参数是否确实相等。 我们还需要放置 #[allow(unused_macros)] 属性,因为如果没有它,编译器会抱怨宏未在非测试代码中使用。如果我们再添加一个这样的测试:


# #![allow(unused_variables)]
#fn main() {
#[test] 
fn test_factorial_fail() { 
    assert_eq!(factorial!(5), 121); 
}
#}

这显然是错误的,正如预期的那样,失败并给我们一个描述性的错误。编译器还支持名为 #[should_panic] 的属性,该属性标记应该发生恐慌的测试。 在这种情况下,只有在出现恐慌时才会通过测试。另一种编写测试的方法是在 Cargo 调用中运行的文档中。

这是一个非常重要的工具,在用工作示例记录代码时,可以保证随着代码库的发展而工作。让我们继续为阶乘宏添加一些“文档测试” :

// chapter2/factorial/src/lib.rs

/// The factorial crate provides a macro to compute factorial of a given 
/// integer /// # Examples
///
/// ``` 
/// # #[macro_use] extern crate factorial; 
/// # fn main() { 
/// assert_eq!(factorial!(0), 1); 
/// assert_eq!(factorial!(6), 720);
/// # } 
/// ``` 
///

#[macro_export] macro_rules! factorial {
    ($x:expr) => { 
        { 
            let mut result = 1; 
            for i in 1..($x+1) { 
                result = result * i; 
            } 
            result 
        }
    };
}

宏的 doctests 与其他所有文档的 doctests 有点不同,其方式如下:

  • 他们必须使用 #[macro_use] 属性来标记此处正在使用宏。请注意,依赖于导出宏的 crate 的外部 crate 也必须使用该属性
  • 它们必须定义 main 函数,并在 doctests 中包含一个 extern crate 指令。对于其他一切,编译器根据需要生成主函数。额外的 # 标记将这些标记隐藏在生成的文档中。

一般来说,测试模块、doctests和 #[test] 属性只能用于单元测试。集成测试应该放在顶级测试目录中。

Rust 团队正在努力增加对测试系统中运行基准点的支持。目前只在夜间提供。

总结

本章是 Rust 语言和生态系统的简短介绍。鉴于 Rust 的这个背景,让我们来看一个常见问题:公司是否应该采用 Rust ?像工程中的很多东西一样,正确的答案是它取决于很多因素。 采用 Rust 的主要原因之一是能够以尽可能少的占用空间编写健壮的代码。因此,Rust 适用于针对嵌入式设备的项目。该区域传统上使用 assembly,C 和 C ++。 Rust 可以在确保代码正确性的同时提供相同的性能保证。 Rust 也适用于从 Python 或 Ruby 卸载性能密集型计算。 Rust 的主要痛点是学习曲线可能很陡峭。因此,试图采用 Rust 的团队可能会花费大量时间与编译器斗争,尝试运行代码。然而,这随着时间的推移而缓和。幸运的是,编译器错误消息通常非常有用。 2017 年,Rust 团队决定将人体工程学作为首要任务。这一推动使新开发人员的入职变得更加容易。 对于大型 Rust 项目,编译时间可能大于 C,C ++ 或 Go。这可能会成为一些团队的问题。有几种方法可以解决此问题,其中之一是增量编译。因此,难以达到一种适合所有解决方案。希望这个简短的介绍将有助于决定是否在新项目中选择 Rust。

在下一章中,我们将通过研究Rust如何处理网络中两个主机之间的 TCP 和UDP 连接来构建本文所研究的内容。

第三章:Rust 操作 TCP 和 UDP

作为一种系统编程语言,Rust 标准库支持与网络堆栈交互。所有与网络相关的功能都位于 std::net 中;对套接字的读写也使用 std::io 的读写特性。这里的一些最重要的结构是 Ipaddr ,它表示可以是 v4 或 v6 的通用IP地址,SocketAddr ,它表示通用套接字地址(主机上的IP和端口的组合),TcplistenerTcpstream 用于通过 TCP 通信,UdpSocket 用于 UDP 等等。目前,标准库不提供任何 API 来处理较低级别的网络堆栈。 虽然这在未来可能会改变,但许多 crate 填补了这一空白。其中最重要的是 libpnet ,它为低级网络提供了一组 API。

关于网络的其他一些重要的 crate 是 net2socket2 。这些是用于可能被移动到标准库的API的孵化器。这里的一些功能被认为是有用且足够稳定的,可以移植到 Rust 核心仓库。不幸的是,这在所有情况下都没有按计划进行。 总的来说,社区现在建议使用 tokio 生态系统来编写高性能网络应用程序,这些应用程序不需要对套接字语义进行细粒度控制。请注意,tokio 不在本章的范围内,我们将在后面的章节中介绍它。

在本章中,我们将讨论以下主题:

  • 简单的多线程TCP客户机和服务器在Rust中是什么样子的?
  • 编写简单的多线程UDP客户端和服务器
  • std::net 中的许多功能
  • 学习使用 net2ipnetwork, 和 libpnet

为了简单起见,本章中的所有代码只处理 IPv4 。将给定的示例扩展到 IPv6 应该是微不足道的。

一个简单的 TCP 服务器和客户端

大多数网络示例都以 echo 服务器开头。那么,让我们继续在 Rust 中编写一个基本的 echo 服务器,看看所有部分是如何组合在一起的。我们将使用标准库中的线程模型来并行处理多个客户端。代码如下:

// code/chapter3/tcp-echo-server.rs

use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;

// Handles a single client
fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
    println!("Incoming connection from: {}", stream.peer_addr()?);

    let mut buf = [0; 512];
    loop {
        let bytes_read = stream.read(&mut buf)?;
        if bytes_read == 0 {
            return Ok(());
        }
        stream.write(&buf[..bytes_read])?;
    }
}

fn main() {
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");

    for stream in listener.incoming() {
        match stream {
            Err(e) => eprintln!("failed: {}", e),
            Ok(stream) => {
                thread::spawn(move || {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
            }
        }
    }
}

main 中,我们创建一个新的 Tcplistener,它在rust中表示一个tcp套接字,用于监听来自客户机的传入连接。在我们的示例中,我们已经对本地地址和端口进行了硬编码;本地地址设置为 0.0.0.0 会告诉内核将这个套接字绑定到这个主机上所有可用的接口上。 在这里设置一个众所周知的端口很重要,因为我们需要知道从客户机连接。在实际应用程序中,这应该是从 CLI 或配置文件中获取的可配置参数。我们调用本地IP和端口对上的 bind 来创建本地监听套接字。 如前所述,我们给定的 IP 选择将把这个套接字绑定到主机上端口 8888 上的所有可用接口。因此,任何能够连接到该主机的网络的客户端都可以与该主机对话。 正如我们在上一章中看到的,如果没有错误,expect 函数将返回监听器。如果不是这样,它会对给定的消息产生恐慌。 在这里,对未能绑定到端口的恐慌实际上是可以的,因为如果失败了,服务器将无法继续工作。listener 上的 incoming 方法返回已连接到服务器的流上的迭代器。我们遍历它们并检查是否有任何错误。在这种情况下,我们可以打印错误并转到下一个连接的客户端。请注意,在这种情况下恐慌是不合适的,因为如果某些客户端由于某种原因遇到错误,服务器可以正常运行。

现在,我们必须在无限循环中从每个客户端读取数据。但是在主线程中运行无限循环将阻止它,并且没有其他客户端能够连接。 这种行为绝对不可取。因此,我们必须生成一个工作线程来处理每个客户端连接。 从每个流中读取并将其写回的逻辑封装在名为 handle_client 的函数中。每个线程都会收到一个调用此函数的闭包。 此闭包必须是 move 闭包,因为它必须从封闭的作用域读取变量(stream)。 在该函数中,我们打印远程端点地址和端口,然后定义一个临时保存数据的缓冲区。我们还确保将缓冲区清零。然后我们运行一个无限循环,在其中我们读取流中的所有数据。流中的 read 方法返回它已读取的数据的长度。它可以在两种情况下返回零,如果它已到达流的末尾或者给定缓冲区的长度为零。我们肯定知道第二种情况并非如此。因此,当read 方法返回零时,我们会跳出循环(和函数)。在这种情况下,我们返回一个 Ok() 。然后,我们使用切片语法将相同的数据写回流。请注意,我们已经使用了eprintln! 输出错误。此宏将给定字符串写入标准错误,并且最近已稳定。

你可能会注意到在读取和写入流时明显缺乏错误处理。但事实并非如此。我们用过 ? 运算符来处理这些调用中的错误。如果一切正常,该操作员将结果解包为 Ok ;否则,它会将错误提前返回给调用函数。给定此设置,函数的返回类型必须是空类型,以处理成功案例,或者 io::Error 类型,以处理错误情况。 请注意,在这种情况下实现自定义错误并返回这些错误而不是内置错误可能是个好主意。还要注意的是由于 main 函数不返回 Result,因此当前不能在 main 函数中使用 ? 操作。

Rust 最近接受了一个 RFC ,它建议在 main 函数中使用 ? 操作符。

从终端与服务器交互很容易。当我们在 Linux 机器上运行服务器,在另一个终端上运行 nc 时,输入到 nc 的任何文本都应该被回送。注意,如果客户机和服务器运行在同一个节点上,我们可以使用 127.0.0.1 作为服务器地址。

$ nc <server ip> 8888 
test 
test 
foobar 
foobar 
foobarbaz 
foobarbaz 
^C
// code/chapter3/tcp-client.rs

use std::io::{self, BufRead, BufReader, Write};
use std::net::TcpStream;
use std::str;

fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8888").expect("Could not connect to server");
    loop {
        let mut input = String::new();
        let mut buffer: Vec<u8> = Vec::new();
        io::stdin()
            .read_line(&mut input)
            .expect("Failed to read from stdin");
        stream
            .write(input.as_bytes())
            .expect("Failed to write to server");

        let mut reader = BufReader::new(&stream);

        reader
            .read_until(b'\n', &mut buffer)
            .expect("Could not read into buffer");
        print!(
            "{}",
            str::from_utf8(&buffer).expect("Could not write buffer as string")
        );
    }
}

在这种情况下,我们首先导入所有必需的库。然后,我们使用 TcpStrem::connect 建立与服务器的连接,该连接将远程端点地址作为字符串接收。 与所有 TCP 连接一样,客户端需要知道要连接的远程 IP 和端口。 如果设置连接失败,我们将使用错误消息中止我们的程序。 然后我们开始一个无限循环,在其中我们将初始化一个空字符串以在本地读取用户输入,并使用 u8 向量来读取来自服务器的响应。 由于 Rust 中的向量会根据需要增长,因此我们不需要在每次迭代时手动对数据进行分块。 read_line 函数从标准输入读取一行并将其存储在名为 input 的变量中。 然后,它作为字节流写入连接。此时,如果一切按预期工作,服务器应该已发回响应。我们将使用 BufReader 读取它,该 BufReader 负责内部分块数据。这也使得读取更有效,因为不会有超过必要的系统调用。 read_until 方法读取缓冲区中的数据,缓冲区根据需要增长。最后,我们可以将缓冲区打印为字符串,该字符串已使用from_utf8 方法进行转换。

运行客户端很容易,正如预期的那样,行为与nc 完全相同:

$ rustc tcp-client.rs && ./tcp-client 
test 
test 
foobar 
foobar 
foobarbaz 
foobarbaz 
^C

真实世界的应用程序通常比这更复杂。在回送响应之前,服务器可能需要一些时间来处理输入。 让我们通过在 handle_client 函数中随机休眠来模拟它; 主要功能将与前一个示例完全相同。第一步使用 cargo 创建一个项目:

$ cargo new --bin tcp-echo-random

注意,我们将添加 randCargo.toml 中,如下面代码段所示:

[package] 
name = "tcp-echo-random" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
rand = "0.3.17"

设置了依赖项之后,让我们修改 handle_client 函数,以便在发送响应之前暂停一段时间:

// chapter3/tcp-echo-random/src/main.rs

extern crate rand;
use rand::{thread_rng, seq::SliceRandom};
use std::io::{Error, Read, Write};
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::time::Duration;

fn handle_client(mut stream: TcpStream) -> Result<(), Error> {
    let mut buf = [0; 512];
    loop {
        let byte_read = stream.read(&mut buf)?;
        if byte_read == 0 {
            return Ok(());
        }
        let choices = [0, 1, 2, 3, 4, 5];
        let mut rng = thread_rng();
        let sec = choices.choose(&mut rng);
        let sleep = Duration::from_secs(*sec.unwrap());
        println!("Sleeping for {:?} before replay", sleep);
        std::thread::sleep(sleep);
        stream.write(&buf[..byte_read])?;
    }
}

fn main() {
    let listener = TcpListener::bind("127.0.0.1:8888").expect("Could not bind");
    for stream in listener.incoming() {
        match stream {
            Err(e) => eprintln!("failed: {}", e),
            Ok(stream) => {
                thread::spawn(move || {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
            }
        }
    }
}

main 函数中,我们必须声明依赖 rand ,并且使用 extern crate .我们使用 thread_rng 函数支选择一个 0 到 5 的随机整数,然后调用 std::thread::sleep 休眠。在客户端,我们将设置读取和连接超时,因为回复不会立即从服务器回复:

// code/chapter3/tcp-client-timeout.rs

use std::io::{self, BufRead, BufReader, Write};
use std::net::SocketAddr;
use std::net::TcpStream;
use std::str;
use std::time::Duration;

fn main() {
    let remote: SocketAddr = "127.0.0.1:8888".parse().unwrap();
    let mut stream = TcpStream::connect_timeout(&remote, Duration::from_secs(1))
        .expect("Could not connect to server");
    stream
        .set_read_timeout(Some(Duration::from_secs(3)))
        .expect("Could not set a read timeout");
    loop {
        let mut input = String::new();
        let mut buffer: Vec<u8> = Vec::new();
        io::stdin()
            .read_line(&mut input)
            .expect("Failed to read from stdin");
        stream
            .write(input.as_bytes())
            .expect("Failed to write to server");
        let mut reader = BufReader::new(&stream);
        reader
            .read_until(b'\n', &mut buffer)
            .expect("Could not read into buffer");

        print!(
            "{}",
            str::from_utf8(&buffer).expect("Could not write buffer as string")
        );
    }
}

在这里,我们使用 set_read_timeout 将超时设置为三秒。因此,如果服务器睡眠时间超过三秒,客户端将中止连接。 函数很奇怪,因为它使用 Option<Duration> 可以指定 DurationNone 。因此,在传递给此函数之前,我们需要将 Duration 包含在 Some 中。现在,如果我们打开两个会话,使用 cargo 运行服务端 和另一个运行客户器,我们将看到这一点; 服务器打印它接受的每个客户端的睡眠时间。

$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
      Running `target/debug/tcp-echo-random`
Sleeping for Duration { secs: 2, nanos: 0 } before replying 
Sleeping for Duration { secs: 1, nanos: 0 } before replying 
Sleeping for Duration { secs: 1, nanos: 0 } before replying 
Sleeping for Duration { secs: 5, nanos: 0 } before replying

在客户端,我们有一个单文件(不是 Cargo 项目),我们将使用 rustc 构建并在编译后直接运行可执行文件。

$ rustc tcp-client-timeout.rs && ./tcp-client-timeout 
test 
test 
foo 
foo 
bar 
bar 
baz
thread 'main' panicked at 'Could not read into buffer: Error { repr: Os { code: 35, message: "Resource temporarily unavailable" } }', src/libcore/result.rs:906:4
note: Run with `RUST_BACKTRACE=1` for a backtrace.

对于前三个输入,服务器选择的延迟小于三秒。客户端在三秒钟内收到响应并且没有中止连接。 对于最后一条消息,延迟是五秒,这导致客户端中止读取。

一个简单的 UDP 服务器和客户端

UDP 服务器和我们之前写的 TCP 服务器之间存在一些语义差异。 与 TCP 不同,UDP 没有流结构。这源于两个协议之间的语义差异。我们来看看 UDP 服务器的是什么:

// code/chapter3/udp-echo-server.rs

use std::net::UdpSocket;
use std::thread;

fn main() {
   let socket = UdpSocket::bind("0.0.0.0:8888").expect("Could not bind socket");

   loop {
       let mut buf = [0u8; 1500];
       let sock = socket.try_clone().expect("Failed to clone socket");

       match socket.recv_from(&mut buf) {
           Ok((_, src)) => {
               thread::spawn(move || {
                   println!("Handling connection from {}", src);
                   sock.send_to(&buf, &src).expect("Failed to send response");
               });
           }
           Err(e) => eprintln!("couldn't recieve a datagram: {}", e),
       }
   }
}

与 TCP 一样,我们从绑定到给定端口上的本地地址开始,我们处理绑定可能失败的可能性。由于 UDP 是无连接协议,因此我们不需要使用滑动窗口来读取所有数据。因此,我们可以只分配给定大小的静态缓冲区。动态检测底层网卡的 MTU 并将缓冲区大小设置为更好的想法,因为这是每个 UDP 数据包可以具有的最大大小。但是,由于普通 LAN 的 MTU 大约为1,500,因此我们可以在这里分配这个大小的缓冲区。 try_clone 方法克隆给定的套接字并返回一个新的套接字,该套接字被移入闭包中。

然后我们从套接字读取,它返回数据读取的长度和 Ok() 情况下的源。然后我们生成一个新线程,在其中我们将相同的缓冲区写回给定的套接字。对于任何可能失败的事情,我们需要像处理 TCP 服务器一样处理错误。

与此服务器交互与上次使用 nc 完全相同。唯一的区别是,在这种情况下,我们需要传递 -u 来强制 nc 使其仅使用 UDP 。看一下下面的例子:

$ nc -u 127.0.0.1 8888 
test 
test 
test 
test 
^C

现在,让我们编写一个简单的 UDP 客户端来实现相同的结果。正如我们将看到的,TCP 服务器与此之间存在一些细微差别:

// code/chapter3/udp-client.rs

use std::net::UdpSocket;
use std::{io, str};

fn main() {
    let socket = UdpSocket::bind("127.0.0.1:8000").expect("Could not bind client socket");
    socket
        .connect("127.0.0.1:8888")
        .expect("Could not connect to server");

    loop {
        let mut input = String::new();
        let mut buffer = [0u8; 150000];
        io::stdin().read_line(&mut input).expect("Failed to read ");
        socket
            .send(input.as_bytes())
            .expect("Failed to write to server");
        socket
            .recv_from(&mut buffer)
            .expect("Could not read into buffer");
        print!(
            "{}",
            str::from_utf8(&buffer).expect("Could not write buffer as string")
        );
    }
}

这个基本客户端和我们在上一节中看到的 TCP 客户端之间存在重大差异。在这种情况下,在连接到服务器之前首先绑定到客户端套接字是绝对必要的。 完成后,示例的其余部分基本相同。在客户端和服务器端运行它会产生类似于 TCP 情况的类似结果。这是服务器端的会话:

$ rustc udp-echo-server.rs && ./udp-echo-server 
Handling connection from 127.0.0.1:8000 
Handling connection from 127.0.0.1:8000 
Handling connection from 127.0.0.1:8000 
^C

客户端:

$ rustc udp-client.rs && ./udp-client 
test 
test 
foo 
foo 
bar
bar 
^C 

UDP 组播

UdpSocket 中有许多 TCP 没有的方法。其中最有趣的是多播和广播。让我们用一个示例看一下多播如何在服务器和客户端工作的。 对于此示例,我们将客户端和服务器组合在一个文件中。在 main函数中,我们将检查是否已传递CLI参数。如果有,我们将运行客户端;否则,我们将运行服务器。请注意,不会使用参数的值;它将被视为一个布尔值:

// code/chapter3/udp-multicast.rs


use std::{env, str};
use std::net::{UdpSocket, Ipv4Addr};

fn main() {
    let mcast_group: Ipv4Addr = "233.0.0.1".parse().unwrap();
    let port: u16 = 6000;
    let any = "0.0.0.0".parse().unwrap();
    let mut buffer = [0u8; 1600];
    if env::args().count() > 1 {
        // client case
        let socket = UdpSocket::bind((any, port)).expect("Could not bind client socket");
        socket.join_multicast_v4(&mcast_group, &any)
            .expect("Could not join multicast group");
        socket.recv_from(&mut buffer).expect("Failed to write to server");
        print!("{}", str::from_utf8(&buffer).expect("Could not write buffer as string"));

    } else {
        // server case
        let socket = UdpSocket::bind((any, 0))
            .expect("Could not write buffer as string");
        socket.send_to("Hello, world!".as_bytes(), &(mcast_group, port)).expect("Failed to write data");
    }
}

这里的客户端和服务器部分大致类似于我们之前讨论过的内容。一个区别是 join_multicast_v4 调用使当前套接字加入一个传递了地址的多播组。对于服务器和客户端,我们在绑定时不指定单个地址。 相反,我们使用表示任何可用地址的特殊地址 0.0.0.0 。这相当于将 INADDR_ANY 传递给基础 setsockopt 调用。在服务器的情况下,我们将其发送到多播组。运行这个有点棘手。由于无法在标准库中设置 SO_REUSEADDRSO_REUSEPORT ,因此我们需要在多台不同的机器上运行客户机,在另一台机器上运行服务器。为此,所有这些都需要在同一网络中,并且多播组的地址需要是有效的多播地址(前四位应为1110)。 UdpSocket类 型还支持离开多播组,广播等。请注意,广播对 TCP 没有意义,因为根据定义它是两个主机之间的连接。

运行上一个示例很简单;在一台主机上,我们将运行服务器,另一台运行客户端。 鉴于此设置,输出应在服务器端看起来像这样:

$ rustc udp-multicast.rs && ./udp-multicast server 
Hello world!

std::net 中的其它应用程序

标准库中另一个重要的类型是 IpAddr ,它代表一个 IP 地址。毫不奇怪,它是一个包含两个变体的枚举,一个用于 v4 地址,另一个用于v6地址。所有这些类型都有根据类型(全局,环回,多播等)对地址进行分类的方法。请注意,许多这些方法尚未稳定,因此仅在 nightly 编译器中可用。它们位于名为 ip 的功能标志后面,必须包含在 crate 根目录中,以便您可以使用这些方法。密切相关的类型是 SocketAddr ,它是 IP 地址和端口号的组合。因此,这也有两个变体,一个用于v4,一个用于v6。我们来看一些例子:

// code/chapter3/ip-socket-addr.rs

#![feature(ip)]

use std::net::{IpAddr, SocketAddr};

fn main() {
    // construct an IpAddr from a string and check it
    // represents the loopback address
    let local: IpAddr = "127.0.0.1".parse().unwrap();
    assert!(local.is_loopback());

    // construct a globally routable IPv6 address from individual octets
    // and assert it is classified correctly

    let global: IpAddr = IpAddr::from([0, 0, 0x1c9, 0, 0, 0xafc8, 0, 0x1]);
    assert!(global.is_global());

    // construct a SocketAddr from a string an assert that the underlying
    // IP is a IPv4 address
    let local_sa: SocketAddr = "127.0.0.1:80".parse().unwrap();
    assert!(local_sa.is_ipv4());

    // construct a SocketAddr from a IPv6 address and a port, assert that
    // the underlying address is indeed IPv6
    let global_sa = SocketAddr::new(global, 80u16);
    assert!(global_sa.is_ipv6());
}

由于 is_global 尚未稳定,因此必须使用 feature(ip) 声明。这个示例不产生任何输出,因为所有的断言结果都为 true 。

在给定主机名的情况下,常见功能是 DNS 查找。 Rust使用 lookup_host 函数执行此操作,该函数返回 LookupHost 类型,该类型实际上是DNS响应的迭代器。我们来看看如何使用它。这个函数由 looup_host 标志门控,必须包含在 nightly 编译器中使用此函数:

// code/chapter3/lookup-host.rs

#![feature(lookup_host)]

use std::env;
use std::net::lookup_host;

fn main() {
    let args: Vec<_> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Please provide only one host name");
        std::provide::exit(1);
    } else {
        let addresses = lookup_host(&args([1]).unwrap());
        for address in addresses {
            println!("{}", address.ip);
        }
    }
}


在这里,我们读取一个 CLI 参数并退出,如果我们没有给出一个名称来解决。否则,我们使用给定的主机名调用 lookup_host 。我们迭代返回的结果并打印每个的 IP 。 请注意,每个返回的结果都是 SocketAddr 类型;因为我们只对 IP 感兴趣,所以我们使用 ip() 方法提取它。此函数对应于 libc 中的 getaddrinfo 调用,因此它仅返回 A 和 AAAA 记录类型。运行此按预期方式:

$ rustc lookup-host.rs && ./lookup-host 
google.com 2a00:1450:4009:810::200e 
216.58.206.110

目前,标准库中无法进行反向 DNS 查找。在下一节中,我们将讨论生态系统中可用于高级网络功能的一些 crate。例如,trust-dns crate支持更详细地与DNS服务器交互,它还支持查询所有记录类型以及反向 DNS 。

一些相关的库

细心的读者可能已经注意到标准库中缺少许多常见的网络相关功能。例如,没有办法处理 IP 网络(CIDR)。让我们来看看 ipnetwork 库如何帮助它。由于我们将使用外部箱子,因此该示例必须在货物项目中。 我们需要将它添加为 Cargo.toml 的依赖项。让我们从设置项目开始:

$ cargo new --bin ipnetwork-example

这会生成一个 Cargo.toml 文件,我们需要修改它来声明我们的依赖项。一旦我们这样做,它应该是这样的:

[package] 
name = "ipnetwork-example" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
ipnetwork = "0.12.7"

设置完项目后,main 函数如下:

// chapter3/ipnetwork-example/src/main.rs

extern crate ipnetwork;

use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network};
use std::net::Ipv4Addr;

fn main() {
    let net = IpNetwork::new("192.168.122.0".parse().unwrap(), 22)
        .expect("Could not construct a network");
    let str_net: IpNetwork = "192.168.122.0/22".parse().unwrap();

    assert!(net == str_net);
    assert!(net.is_ipv4());

    let net4: Ipv4Network = "192.168.121.0/22".parse().unwrap();
    assert!(net4.size() == 2u32.pow(32 - 22));
    assert!(net4.contains(Ipv4Addr::new(192, 168, 121, 3)));

    let _net6: Ipv6Network = "2001:db8::0/96".parse().unwrap();
    for addr in net4.iter().take(10) {
        println!("{}", addr);
    }
}

前两行显示了两种不同的构造 IPNetwork 实例的方法,要么使用构造函数,要么通过解析字符串。 下一个断言确保它们确实是相同的。之后的断言确保我们创建的网络是一个v4网络。接下来,我们专门创建 IPv4Network 对象,如预期的那样,网络的大小与2^(32 - prefix) 匹配。下一个断言确保 contains 方法对该网络中的IP正确工作。然后我们创建一个IPV6网络,由于所有这些类型都实现了迭代器协议,所以我们可以在网络上迭代,并在for循环中打印各个地址。下面是我们通过运行最后一个示例应该看到的输出:

$ cargo run 
    Compiling ipnetwork v0.12.7 
    Compiling ipnetwork-example v0.1.0 (file:///Users/Abhishek/Desktop/rustbook/src/chapter3/ipnetwork-example) 
    Finished dev [unoptimized + debuginfo] target(s) in 1.18 secs 
     Running `target/debug/ipnetwork-example`

192.168.120.0
192.168.120.1
192.168.120.2
192.168.120.3
192.168.120.4
192.168.120.5
192.168.120.6
192.168.120.7
192.168.120.8
192.168.120.9

标准库也缺乏对套接字和连接的细粒度控制,一个例子就是能够设置 SO_REUSEADDR ,如前所述。主要原因是社区未能就如何在保持清洁 API 的同时最好地公开这些功能达成强烈共识。 此上下文中的一个有用的库是 mio ,它提供了基于线程的并发的替代方法。 mio 基本上运行一个所有各方注册的事件循环。当有事件时,每个监听器都会收到警报,并且可以选择处理该事件。 我们来看下面的例子。像上次一样,我们需要使用 cargo 设置项目:

$ cargo new --bin mio-example

下一步,把 mio 添加到依赖项,Cargo.toml 如下:

[package] 
name = "mio-example"
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
mio = "0.6.11"

像所有其他 cargo 项目一样,我们需要将 mio 声明为 Cargo.toml 中的依赖项并将其固定到特定版本,以便 cargo 可以下载并将其链接到我们的应用程序:

// /chapter3/mio-example/src/main.rs

extern crate mio;

use mio::tcp::TcpListener;
use mio::*;

use std::env;
use std::net::SocketAddr;

const SERVER: Token = Token(0);

struct TCPServer {
    address: SocketAddr,
}

impl TCPServer {
    fn new(port: u32) -> Self {
        let address = format!("0.0.0.0:{}", port).parse::<SocketAddr>().unwrap();

        TCPServer { address }
    }

    fn run(&mut self) {
        let server = TcpListener::bind(&self.address).expect("Could not bind to port");
        let poll = Poll::new().unwrap();
        poll.register(&server, SERVER, Ready::readable(), PollOpt::edge())
            .unwrap();

        let mut events = Events::with_capacity(1024);
        loop {
            poll.poll(&mut events, None).unwrap();

            for event in events.iter() {
                match event.token() {
                    SERVER => {
                        let (_stream, remote) = server.accept().unwrap();
                        println!("Connection from {}", remote);
                    }
                    _ => {
                        unreachable!();
                    }
                }
            }
        }
    }
}

fn main() {
    let args: Vec<String> = env::args().collect();
    if args.len() != 2 {
        eprintln!("Please provide only one port number as argument");
        std::process::exit(1);
    }
    let mut server = TCPServer::new(args[1].parse::<u32>().expect("Could not parse as u32"));
    server.run();
}

与我们之前的示例不同,这是一个 TCP 服务器,它只打印客户端的源 IP 和端口。在 mio 中,事件循环中的每个侦听器都被分配一个令牌,然后可以在事件传递时用于区分侦听器。 我们在其构造函数中为我们的服务器(TCPServer)定义一个结构,然后我们绑定到所有本地地址并返回结构的一个实例。该结构的 run 方法将套接字绑定到给定的套接字地址;然后它使用 Poll 结构来实例化事件循环。

然后它用实例上的令牌注册服务器套接字。我们还指出,当事件准备好读或写时,应该向我们发出警报。最后,我们指出我们只希望边缘触发事件,这意味着当接收到事件时,该事件应该完全被消耗掉,否则同一令牌上的后续调用可能会阻止它。然后我们为我们的活动设置一个空容器。完成所有样板文件后,我们进入一个无限循环,并开始使用刚刚创建的事件容器进行轮询。我们循环遍历事件列表,如果事件的任何令牌与服务器的令牌匹配,我们就知道它是针对服务器的。然后我们可以接受连接并打印远程端的信息。然后我们回到下一个事件,依此类推。在主函数中,我们首先处理 cli 参数,确保将端口号作为整数传递。然后,我们实例化服务器并对其调用 run 方法。

以下是两个客户端连接到服务器时运行服务器的示例会话。请注意,可以使用前面的 nc 或 TCP 客户端连接到此服务器:

$ cargo run 4321 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/mio-example 4321` 
Connection from 127.0.0.1:60955 
Connection from 127.0.0.1:60956 
^C

这里讨论的标准库和 crate 中缺少的其他一些主要内容是与物理网络设备协作的能力,一个更好的 API 来处理和解析数据包,等等。一个有助于处理 libpnet 中较低级别的网络相关事务的 crate 。让我们用它来写一个小的数据包转储程序:

$ cat Cargo.toml 
[package] 
name = "pnet-example" 
version = "0.1.0" 
authors = ["Foo Bar <foo@bar.com>"]

[dependencies] 
pnet = "0.20.0"

我们初始化 Cargo 项目:

$ cargo new --bin pnet-example

添加依赖项,并指定它的版本。然后我们来看代码:

// chapter3/pnet-example/src/main.rs

extern crate pnet;

use pnet::datalink::Channel::Ethernet;
use pnet::datalink::{self, NetworkInterface};
use pnet::packet::ethernet::{EtherTypes, EthernetPacket};
use pnet::packet::ip::IpNextHeaderProtocols;
use pnet::packet::ipv4::Ipv4Packet;
use pnet::packet::tcp::TcpPacket;
use pnet::packet::Packet;

use std::env;

fn handle_packet(ethernet: &EthernetPacket) {
    match ethernet.get_ethertype() {
        EtherTypes::Ipv4 => {
            let header = Ipv4Packet::new(ethernet.payload());
            if let Some(header) = header {
                match header.get_next_level_protocol() {
                    IpNextHeaderProtocols::Tcp => {
                        let tcp = TcpPacket::new(header.payload());
                        if let Some(tcp) = tcp {
                            println!(
                                "Got a TCP packet {}:{} to {}:{}",
                                header.get_source(),
                                tcp.get_source(),
                                header.get_destination(),
                                tcp.get_destination()
                            );
                        }
                    }
                    _ => println!("Ignoring non TCP packet"),
                }
            }
        }
        _ => println!("Ignoring non IPv4 packet"),
    }
}

fn main() {
    let interface_name = env::args().nth(1).unwrap();

    let interfaces = datalink::interfaces();
    let interface = interfaces
        .into_iter()
        .filter(|iface: &NetworkInterface| iface.name == interface_name)
        .next()
        .expect("Error getting interface");

    let (_tx, mut rx) = match datalink::channel(&interface, Default::default()) {
        Ok(Ethernet(tx, rx)) => (tx, rx),
        Ok(_) => panic!("Unhandled channel type"),
        Err(e) => panic!(
            "An error occurred when creating the datalink channel: {}",
            e
        ),
    };

    loop {
        match rx.next() {
            Ok(packet) => {
                let packet = EthernetPacket::new(packet).unwrap();
                handle_packet(&packet);
            }
            Err(e) => {
                panic!("An error occurred while reading: {}", e);
            }
        }
    }
}

和之前一样,我们从把 pnet 做为外部 crate 开始。然后我们导入一些稍后将要使用的东西。 我们将接口的名称作为一个 cli参数进行嗅探。datalink::interfaces() 为我们提供了当前主机中所有可用接口的列表,我们根据给定接口的名称筛选该列表。如果找不到匹配项,我们将抛出一个错误并退出。 datalink::channel() 调用为我们提供了发送和接收数据包的通道。 在这种情况下,我们不关心发送端,因为我们只是对嗅探数据包感兴趣。我们匹配返回的通道类型,以确保只使用以太网。通道的接收端 rx 为我们提供了一个迭代器,它在每次 next() 调用时生成数据包。

然后,这些数据包被传递给 handle_packet 函数,该函数提取相关信息并打印这些信息。对于这个玩具示例,我们只处理基于IPv4 的 TCP 包。一个真正的网络显然会通过 UDP 和 TCP 获得 IPv6 和 ICMP 数据包。这里将忽略所有这些组合。

handle_packet 函数中,我们匹配数据包的 ethertype ,以确保只处理IPv4数据包。由于以太网数据包的整个有效负载是IP数据包(请参阅第1章,客户机/服务器网络简介),因此我们从有效负载构造一个IP数据包。get_next_level_protocol() 调用返回传输协议,如果该协议与 TCP 匹配,我们将从前一层的有效负载构造一个 TCP 包。 此时,我们可以从 TCP 包打印源端口和目标端口。源和目标 IP 将在封闭的 IP 包中。 下面的代码块中显示了一个运行示例。我们需要将接口的名称作为命令行参数传递给我们的程序。下面是如何在Linux中获取接口名称:

$ ip a 
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000 
    link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00 
    inet 127.0.0.1/8 scope host lo 
        valid_lft forever preferred_lft forever 
    inet6 ::1/128 scope host
        valid_lft forever preferred_lft forever 
2: enp1s0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast 
state UP group default qlen 1000 
    link/ether f4:4d:30:ac:88:ee brd ff:ff:ff:ff:ff:ff 
    inet 192.168.5.15/22 brd 192.168.7.255 scope global enp1s0 
        valid_lft forever preferred_lft forever 
    inet6 fe80::58c6:9ccc:e78c:caa6/64 scope link 
        valid_lft forever preferred_lft forever

在这个例子中,我们将忽略环回接口 l0 ,因为它不接收大量的流量,并使用另一个接口 enp1s0 。我们还将使用根权限(使用sudo)运行这个示例,因为它需要直接访问网络设备。

$ cargo build 
$ sudo ./target/debug/pnet-example enp1s0 
Got a TCP packet 192.168.0.2:53041 to 104.82.249.116:443 
Got a TCP packet 104.82.249.116:443 to 192.168.0.2:53041 
Got a TCP packet 192.168.0.2:53064 to 17.142.169.200:443 
Got a TCP packet 192.168.0.2:53064 to 17.142.169.200:443 
Got a TCP packet 17.142.169.200:443 to 192.168.0.2:53064
Got a TCP packet 17.142.169.200:443 to 192.168.0.2:53064 
Got a TCP packet 192.168.0.2:53064 to 17.142.169.200:443 
Got a TCP packet 192.168.0.2:52086 to 52.174.153.60:443 
Ignoring non IPv4 packet 
Got a TCP packet 52.174.153.60:443 to 192.168.0.2:52086 
Got a TCP packet 192.168.0.2:52086 to 52.174.153.60:443 
Ignoring non IPv4 packet 
Ignoring non IPv4 packet 
Ignoring non IPv4 packet 
Ignoring non IPv4 packet

在上一节中,我们看到了标准库中与DNS相关的功能是多么有限。 trust-dns 是广泛受欢迎的与 DNS 相关的东西的一个库。让我们来看一个使用它查询给定名称的示例。让我们从空项目开始:

$ cargo new --bin trust-dns-example

添加指定版本的 crate 到 Cargo.toml 中:

[package] 
name = "trust-dns-example" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
trust-dns-resolver = "0.6.0" 
trust-dns = "0.12.0"

我们的应用程序依赖 trust-dns 来处理与 DNS 相关的事情。像往常一样,在我们的应用程序中使用之前,我们会将其添加到 Cargo.toml中:

我们设置了所有必须的引入库并用 extern 声明。这里,我们希望将名称作为一个 cli 参数来解析,如果一切正常,它应该在 args[1] 中。 这个 crate 支持两种类型的同步DNS解析程序。 Resolver::new 创建了一个同步的 resolver,使用默认选项,它将使用 Google 的公共 DNS 作为上游服务器。 Resolver::from_system_conf 创建一个同步 resolver,配置来自系统的 resolv.conf 。 因此,第二个选项仅在 UNIX 系统上可用。在将查询传递给解析程序之前, 使用 format! 通过追加 . 来格式化名称 。 我们使用 lookup_ip函数传递查询,然后该函数返回一个迭代器,遍历 DNS 问题的答案。一旦我们得到它,我们就可以迭代并打印出每个答案。 顾名思义,lookup_ip 函数只查找 A 和 AAAA 记录。有一个更通用的查找函数可以接受要查询的记录类型。在最后一步中,我们要获取给定名称的所有 NS 记录。一旦我们得到一个答案,我们就循环它并打印结果。

trust-dns 也支持 tokio 的异步 DNS resolver 。

示例会话将如下所示:

$ cargo run google.com 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
      Running `target/debug/trust-dns-example google.com` 
Using the synchronous resolver 
LookupIp(Lookup { rdatas: [AAAA(2a00:1450:4009:811::200e), A(216.58.206.142)] }) 
Using the system resolver LookupIp(Lookup { rdatas: [A(216.58.206.110), AAAA(2a00:1450:4009:810::200e)] }) 
NS records using the synchronous resolver Lookup { rdatas: [NS(Name { is_fqdn: true, labels: ["ns3", "google", "com"] }), NS(Name { is_fqdn: true, labels: ["ns1", "google", "com"] }), NS(Name { is_fqdn: true, labels: ["ns4", "google", "com"] }), NS(Name { is_fqdn: true, labels: ["ns2", "google", "com"] })] }

在此示例中,所有打印都使用结构的调试表示。真正的应用程序需要根据需要格式化这些格式。

总结

本章简要介绍了Rust中的基本网络功能。我们从 std::net 中的给定功能开始,我们使用它们编写了一些 TCP 和 UDP 服务器。然后我们查看了同一命名空间中的其他一些实用程序。最后,我们介绍了一些旨在扩展标准库的网络功能的板条箱示例。请记住,始终可以使用 libc crate 来编写网络代码,该代码基于 POSIX 兼容的网络代码,可以访问对套接字和网络设备的细粒度控制。 这种方法的问题在于代码可能不安全,破坏了 Rust 的安全保障。另一个名为 nix 的crate旨在提供 libc 的功能本机 Rust ,以便它保留编译器提供的所有内存和类型安全保证:对于需要对网络进行非常精细控制的人来说,这可能是一个有用的替代方案。

第四章:数据序列化、反序列化和解析

在前一章中,我们讨论了如何编写 Rust 的简单套接字服务器。传输协议(如TCP和UDP)只提供了传输消息的机制,因此,实际制作和发送这些消息取决于更高级别的协议。 另外,TCP 和 UDP 协议总是处理字节;在将这些字节发送到套接字之前,当我们在字符串上调用 as_bytes 时,我们看到了这一点。 将一段数据转换成可以存储或传输的格式(在联网的情况下是字节流)的过程称为序列化。相反的过程是反序列化,它将原始数据格式转换为数据结构。 任何网络软件都必须处理已接收或即将发送的数据的序列化和反序列化。 对于更复杂的类型,例如用户定义的类型,甚至简单的集合类型,这种简单的转换并不总是可能的。Rust 生态系统有特殊的 crate ,可以在各种情况下处理这些问题。

在本章,我们将讨论以下话题:

  • 使用 Serde 序列化和反序列化。我们将从基本用法开始,然后使用 Serde 编写自定义序列化程序。
  • 使用 nom 解析文本数据
  • 最后一个主题是解析二进制数据,这是一种非常常用的网络技术。

使用 Serde进入序列化和反序列化

Serde 是 Rust 中数据序列化和反序列化的事实标准方法。 Serde 支持许多数据结构,它可以开箱即用地串行化为多种给定的数据格式(包括 JSON 和 TOML ,CSV)。理解 Serde 的最简单方法是将其视为可逆函数,将给定的数据结构转换为字节流。除了标准数据类型之外,Serde 还提供了一些可以在用户定义的数据类型上实现的宏,使它们(可)序列化。

在第2章“Rust及其生态系统简介”中,我们讨论了如何使用过程宏来实现给定数据类型的自定义派生。 Serde 使用该机制提供两个自定义派生,名为 SerializeDeserialize ,可以为由 Serde 支持的数据类型组成的用户定义数据类型实现。让我们看一下这是如何工作的一个小例子。我们首先使用 Cargo 创建空项目:

Cargo.toml 如下所示:

[package] 
name = "serde-basic" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
serde = "1.0" 
serde_derive = "1.0" 
serde_json = "1.0" 
serde_yaml = "0.7.1"

serde crate 是 Serde 生态的核心。serde_derive crate 提供必要的工具,使用过程宏来派生 SerializeDeserialize 。接下来的两个 crate 分别为 JSON 和 YAML 提供特定于 Serde 的功能:

// chapter4/serde-basic/src/main.rs

#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;
extern crate serde_yaml;

// We will serialize and deserialize instances of
// this struct
#[derive(Serialize, Deserialize, Debug)]
struct ServerConfig {
    workers: u64,
    ignore: bool,
    auth_server: Option<String>,
}

fn main() {
    let config = ServerConfig {
        workers: 100,
        ignore: false,
        auth_server: Some("auth.server.io".to_string()),
    };

    {
        println!("To and from YAML");
        let serialized = serde_yaml::to_string(&config).unwrap();
        println!("{}", serialized);
        let deserialized: ServerConfig = serde_yaml::from_str(&serialized).unwrap();
        println!("{:?}", deserialized);
    }
    println!("\n\n");

    {
        println!("To and from JSON");
        let serialized = serde_json::to_string(&config).unwrap();
        println!("{}", serialized);
        let deserialized: ServerConfig = serde_json::from_str(&serialized).unwrap();
        println!("{:?}", deserialized);
    }
}


由于 serde_derive 导出宏,我们需要用 macro_use 声明对其标记;然后我们将所有依赖项声明为外部 crate . 设置完之后,我们就可以自定义数据类型。在这种情况下,我们感兴趣的是具有一堆不同类型参数的服务器的配置。 auth_server 参数是可选的,这就是它包含在 Option 中的原因。我们的结构派生了 Serde 的两个特征,以及编译器提供的 Debug 特性,稍后我们将在反序列化后显示它们。 在我们的 main 函数中,我们实例化我们的类并在其上调用 serde_yaml::to_string 以将其序列化为字符串;与此相反的是 serde_yaml::from_str

运行代码:

$ cargo run 
    Compiling serde-basic v0.1.0 (file:///Users/Abhishek/Desktop/rustbook/src/chapter4/serde-basic)
     Finished dev [unoptimized + debuginfo] target(s) in 1.88 secs
      Running `target/debug/serde-basic` 
To and from YAML 
--
workers: 100 
ignore: false 
auth_server: auth.server.io 
ServerConfig { workers: 100, ignore: false, auth_server: Some("auth.server.io") }

To and from JSON 
{"workers":100,"ignore":false,"auth_server":"auth.server.io"} 
ServerConfig { workers: 100, ignore: false, auth_server: Some("auth.server.io") }

让我们继续讨论通过网络使用 Serde 的更高级示例。在此示例中,我们将设置 TCP 服务器和客户端。这一部分与我们在上一章中所做的完全相同。但这一次,我们的 TCP 服务器将作为一个计算器,在三个空间中使用三个分量的三个分量接收一个点,并在同一参考帧中返回它与原点的距离。让我们像这样建立我们的 Cargo 项目:

$ cargo new --bin serde-server

Cargo.toml 如下:

$ cat Cargo.toml 
[package] 
name = "serde-server" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
serde = "1.0" 
serde_derive = "1.0" 
serde_json = "1.0"

这样,我们就可以继续定义代码了。在本例中,服务器和客户机将在同一个二进制文件中。 应用程序将接受一个标志,该标志指示它应该作为服务器还是客户机运行。 正如我们在上一章中所做的,在服务器的情况下,我们将绑定到已知端口上的所有本地接口,并监听传入的连接。客户机案例将连接到该已知端口上的服务器,并等待控制台上的用户输入。 客户机期望输入为三个整数的逗号分隔列表,每个轴一个。在获取输入时,客户机构造一个给定定义的结构,使用 serde 对其进行序列化,并将字节流发送到服务器。 服务器将流反序列化为同一类型的结构。然后,它计算距离并返回结果,然后客户机显示该结果。代码如下:

// chapter4/serde-server/src/main.rs

#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate serde_json;

use std::net::{TcpListener, TcpStream};
use std::io::{stdin, BufRead, BufReader, Error, Write};
use std::{env, str, thread};

#[derive(Serialize, Deserialize, Debug)]
struct Point3D {
    x: u32,
    y: u32,
    z: u32,
}

fn handle_client(stream: TcpStream) -> Result<(), Error> {
    println!("Incoming connection from: {}", stream.peer_addr()?);
    let mut data = Vec::new();
    let mut stream = BufReader::new(stream);

    loop {
        data.clear();

        let bytes_read = stream.read_until(b'\n', &mut data)?;
        if bytes_read == 0 {
            return Ok(());
        }
        let input: Point3D = serde_json::from_slice(&data)?;
        let value = input.x.pow(2) + input.y.pow(2) + input.z.pow(2);

        write!(stream.get_mut(), "{}", f64::from(value).sqrt())?;
        write!(stream.get_mut(), "{}", "\n")?;

    }
}

fn main() {

    let args: Vec<_> = env::args().collect();

    if args.len() != 2 {
        eprintln!("Please provide --client or --server as argument");
        std::process::exit(1);
    }

    if args[1] == "--server" {
        let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
        for stream in listener.incoming() {
            match stream {
                Err(e) => eprintln!("failed: {}", e),
                Ok(stream) => {
                    thread::spawn(move || {
                        handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                    });
                }
            }
        }
    } else if args[1] == "--client" {
        let mut stream = TcpStream::connect("127.0.0.1:8888").expect("Could not connect to server");
        println!("Please provide a 3D point as three comma separated integers");
        loop {
            let mut input = String::new();
            let mut buffer: Vec<u8> = Vec::new();
            stdin().read_line(&mut input).expect("Failed to read from stdin");
            let parts: Vec<&str> = input.trim_matches('\n').split(',').collect();
            let point = Point3D {
                x: parts[0].parse().unwrap(),
                y: parts[1].parse().unwrap(),
                z: parts[2].parse().unwrap(),

            };

            stream.write_all(serde_json::to_string(&point).unwrap().as_bytes())
                .expect("Failed to write to server");
            stream.write_all(b"\n").expect("Failed to write to server");

            let mut reader = BufReader::new(&stream);
            reader.read_until(b'\n', &mut buffer).expect("Could not read into buffer");
            let input = str::from_utf8(&buffer).expect("Could not write buffer as string");
            if input == "" {
                eprintln!("Empty response from server");
            }
            print!("Response from server {}", input);

        }
    }
}







我们设置 Serde ,就像我们在上一个例子中所做的那样。然后,我们将 3D 点定义为三个元素的结构。 在我们的 main 函数中,我们处理 CLI 参数并分支到客户端或服务器,具体取决于传递的内容。 在这两种情况下,我们通过发送换行符来表示传输结束。客户端从 stdin 读取一行,清除它,并在循环中创建结构的实例。 在这两种情况下,我们将流包装在 BufReader 中以便于处理。我们使用 Cargo 运行代码。服务器上的示例会话如下:

server$ cargo run -- --server 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/serde-server --server` 
Incoming connection from: 127.0.0.1:49630

在客户端,我们看到以下与服务器的交互。正如所料,客户端读取输入,序列化输入,并将其发送到服务器。 然后它等待响应,当它得到响应时,将结果打印到标准输出:

client$ cargo run -- --client 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/serde-server --client` 
Please provide a 3D point as three comma separated integers 
1,2,3 
Response from server 3.7416573867739413 
3,4,5 
Response from server 7.0710678118654755 
4,5,6
Response from server 8.774964387392123

自定义序列化和反序列化

正如我们前面看到的,serde 通过宏为所有原始数据类型和许多复杂数据类型提供内置的序列化和反序列化。然而,在某些情况下,SERDE 可能无法自动实现。对于更复杂的数据类型,可能会发生这种情况。 在这些情况下,您需要手动实现这些功能。这些案例演示了 serde 的高级用法,它还允许重命名输出中的字段。对于日常使用,使用这些高级功能几乎是不必要的。这些对于网络、处理新协议以及其他方面来说可能更为常见。

假设我们有三个字段的结构。我们将假设 Serde 未能对此实现 Serializ e和 Deserialize ,因此我们需要手动实现这些。我们使用 Cargo 初始化我们的项目:

$ cargo new --bin serde-custom

然后声明依赖项,Cargo.toml 看起来是这样子:

[package]
name = "serde-custom"
version = "0.1.0"
authors = ["Foo <foo@bar.com>"]

[dependencies]
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
serde_test = "1.0"

我们的结构体是这样子的:


# #![allow(unused_variables)]
#fn main() {
// chapter4/serde-custom/src/main.rs

// We will implement custom serialization and deserialization // for this struct
#[derive(Debug, PartialEq)]
struct KubeConfig {
port: u8,
healthz_port: u8,
max_pods: u8,
}
#}

我们需要为 Serde 派生 DebugPartialEq 在内部使用。在现实世界,可能还要手动实现这些。现在,我们需要为 kubeconfig 实现 Serialize


# #![allow(unused_variables)]
#fn main() {
pub trait Serialize {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer;
}
#}

序列化结构体的基本流程是,将简单地序列化结构名称,然后序列化每个元素,然后按顺序发出序列化结束的信号。serde 具有可用于所有基本类型的内置序列化方法,因此实现不需要担心处理内置类型。让我们看看如何序列化我们的结构体:


# #![allow(unused_variables)]
#fn main() {
// chapter4/serde-custom/src/main.rs

// Implementing Serialize for our custom struct defines
// how instances of that struct should be serialized.
// In essence, serialization of an object is equal to
// sum of the serializations of it's components
impl Serialize for KubeConfig {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer
{
let mut state = serializer.serialize_struct("KubeConfig", 3)?;
state.serialize_field("port", &self.port)?;
state.serialize_field("healthz_port", &self.healthz_port)?;
state.serialize_field("max_pods", &self.max_pods)?; state.end()
}
}
#}

结构的序列化总是以调用 serialize_struct 开始,结构名称和字段数作为参数(对于其他类型,有类似的命名方法)。然后,我们按照它们出现的顺序序列化每个字段,同时传递将在结果 json 中使用的键名。 完成后,我们将特定的 end 作为信号。

实现反序列化有点复杂,有一些样板代码。相关特征如下:


# #![allow(unused_variables)]
#fn main() {
pub trait Deserialize<'de>: Sized {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de>;

}
#}

为类型实现此操作需要实现访问者模式。 Serde 定义了一个特殊的 Visitor 特征,如下面的示例所示。请注意,这有所有内置类型的 visit_ * 方法,这里没有显示。 此外,在下面的示例中,我们使用符号 ... 来表示此处有更多对我们的讨论不重要的方法。


# #![allow(unused_variables)]
#fn main() {
pub trait Visitor<'de>: Sized {
type Value;
fn expecting(&self, formatter: &mut Formatter) -> Result;
fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E>
where
E: Error,
{ }
...
}
#}

该特征的实现由解析器在内部用于构造结果类型。在我们的例子中,它看起来像这样:


# #![allow(unused_variables)]
#fn main() {
// chapter4/serde-custom/src/main.rs

// Implementing Deserialize for our struct defines how
// an instance of the struct should be created from an
// input stream of bytes

impl<'de> Deserialize<'de> for KubeConfig {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where D: Deserializer<'de>
{
enum Field { Port, HealthzPort, MaxPods };

impl<'de> Deserialize<'de> for Field {

fn deserialize<D>(deserializer: D) -> Result<Field, D::Error>
where D: Deserializer<'de>
{
struct FieldVisitor;

impl<'de> Visitor<'de> for FieldVisitor {
type Value = Field;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("`port` or `healthz_port`or `max_pods`")
}

fn visit_str<E>(self, value: &str) -> Result<Field, E>
where E: de::Error {
match value {
"port" => Ok(Field::Port),
"healthz_port" => Ok(Field::HealthzPort),
"max_pods" => Ok(Field::MaxPods),
_ => Err(de::Error::unknown_field(value, FIELDS)),
}
}
}

deserializer.deserialize_identifier(FieldVisitor)
}
}
}
#}

现在,反序列化器的输入是 json ,可以将其视为映射。因此,我们只需要实现访问者特征中的 visit_map 。如果任何非 json 数据传递给我们的反序列化器,它将在从该特征调用某些其他函数时出错。 以前的大多数实现都是样板。它归结为几个部分:为字段实现 Visitor ,并实现 visit_str (因为我们所有的字段都是字符串)。 此时,我们应该能够反序列化单个字段。第二部分是为整个结构实现 Visitor ,并实现 visit_map 。在所有情况下都必须适当处理错误。 最后,我们可以调用 deserializer.deserialize_struct 并传递结构的名称,字段列表以及整个结构的访问者实现。

具体实现如下:


# #![allow(unused_variables)]
#fn main() {
// chapter4/serde-custom/src/main.rs

impl<'de> Deserialize<'de> for KubeConfig {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 
        where D: Deserializer<'de> 
    {
        struct KubeConfigVisitor;

        impl<'de> Visitor<'de> for KubeConfigVisitor { 
            type Value = KubeConfig;

            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 
                formatter.write_str("struct KubeConfig") 
            }

            fn visit_map<V>(self, mut map: V) -> Result<KubeConfig, V::Error>
                where V: MapAccess<'de> 
            {
                let mut port = None;
                let mut hport = None;
                let mut max = None;

                while let Some(key) = map.next_key()? {
                    match key { 
                        Field::Port => { 
                            if port.is_some() { 
                                return Err(de::Error::duplicate_field("port")); 
                            } 
                            port = Some(map.next_value()?); 
                        } 
                        Field::HealthzPort => { 
                            if hport.is_some() { 
                                return Err(de::Error::duplicate_field ("healthz_port"));
                            } 
                            hport = Some(map.next_value()?); 
                        } 
                        Field::MaxPods => { 
                            if max.is_some() { 
                                return Err(de::Error::duplicate_field ("max_pods"));
                            } 
                            max = Some(map.next_value()?);
                        }
                    }
                } 
                let port = port.ok_or_else(|| de::Error::missing_field("port"))?;
                let hport = hport.ok_or_else(|| de::Error::missing_field("healthz_port"))?; 
                let max = max.ok_or_else(|| de::Error::missing_field("max_pods"))?; 
                Ok(KubeConfig {port: port, healthz_port: hport, max_pods: max})
            }
        }

        const FIELDS: &'static [&'static str] = &["port", "healthz_port", "max_pods"]; 
        deserializer.deserialize_struct("KubeConfig", FIELDS, KubeConfigVisitor)

    }
}
#}

Serde 还提供了一个 crate ,可用于使用类似令牌流的接口对自定义序列化器和反序列化器进行单元测试。要使用它,我们需要将 serde_test 添加到我们的 Cargo.toml 并在主文件中将其声明为 extern crate 。这是对我们的反序列化器的测试:


# #![allow(unused_variables)]
#fn main() {
// chapter4/serde-custom/src/main.rs

#[test] 
fn test_ser_de() { 
    let c = KubeConfig { port: 10, healthz_port: 11, max_pods: 12};

    assert_de_tokens(&c, &[
        Token::Struct { name: "KubeConfig", len: 3 }, 
        Token::Str("port"), Token::U8(10), 
        Token::Str("healthz_port"), 
        Token::U8(11), 
        Token::Str("max_pods"), 
        Token::U8(12), 
        Token::StructEnd,
    ]);
}
#}

assert_de_tokens 调用检查给定的标记流是否反序列化到我们的结构,从而测试我们的反序列化器。 我们还可以添加一个主函数来驱动序列化器,如下所示:

// chapter4/serde-custom/src/main.rs

fn main() { 
    let c = KubeConfig { port: 10, healthz_port: 11, max_pods: 12}; 
    let serialized = serde_json::to_string(&c).unwrap(); 
    println!("{:?}", serialized); 
}

在所有这些都可以使用 Cargo 运行。使用 cargo test 运行我们刚刚编写的测试,该测试应该通过。 货运应该运行 main 函数并打印序列化的json:


# #![allow(unused_variables)]
#fn main() {
$ cargo test
    Compiling serde-custom v0.1.0 (file:///serde-custom) 
     Finished dev [unoptimized + debuginfo] target(s) in 0.61 secs 
      Running target/debug/deps/serde_custom-81ee5105cf257563

running 1 test test 
test_ser_de ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out


$ cargo run
    Compiling serde-custom v0.1.0 (file:///serde-custom) 
     Finished dev [unoptimized + debuginfo] target(s) in 0.54 secs 
      Running `target/debug/serde-custom` 
"{\"port\":10,\"healthz_port\":11,\"max_pods\":12}"
#}

解析文本数据

数据解析是与反序列化密切相差的问题。考虑解析的最常见方式是从正则语法开始并基于此构造解析器。这将导致一个自下而上的解析器,较小的规则解析整个输入的较小组件。 最终的组合规则将给定顺序中的所有较小规则组合在一起形成最终解析器。这种正式定义有限规则集的方式称为 解析表达语法 Parsing Expression Grammar(PEG)。 这确保解析是明确的;如果解析成功,则只有一个有效的解析树,在 Rust 生态系统中,有几种不同的方式可以实现 PEG 。 每种方式都有优点和缺点。第一种方法是使用宏来定义于域的语言以进行解析。

该方法通过新的宏系统与编译器很好地集成,并且可以生成快速代码。 但是,这通常更难以调试和维护。由于此方法不允许重载运算符,因此实现必须定义 DSL ,这可能更多地是学习者的认知负载。 第二种方法是使用 特质系统 。此方法有助于定义自定义运算符,并且通常更易于调试和维护。使用第一种方法的解析器的示例是 nom;使用第二种方法的解析器的例子是 pom 和 pest 。

我们用于解析的用例主要在网络应用程序的上下文中。在这些情况下,有时处理原始字符串(或字节流)并解析所需信息而不是反序列化为复杂数据结构更有用。 常见的情况是任何基于文本的协议,例如 HTTP 。服务器可能会在套接字上接收原始请求作为字节流,并对其进行解析以提取信息。在本节中,我们将研究 Rust 生态系统中的一些常见解析技术。

现在,nom是一个解析器组合框架,这意味着它可以组合较小的解析器来构建更强大的解析器。 这是一种自下而上的方法,通常从编写非常特定的解析器开始,该解析器从输入中解析定义良好的东西。然后,框架提供了将这些小解析器链接到完整的解析器的方法。 这种方法与 lexyacc 情况下的自上而下方法形成对比,其中一种方法可以从定义语法开始。 它可以处理字节流(二进制数据)或字符串,并提供 Rust 的所有常规保证。 让我们从解析一个简单的字符串开始,在这种情况下是一个 HTTP GET 或 POST 请求。像所有货物项目一样,我们将首先建立结构:

$ cargo new --bin nom-http

然后添加依赖项(nom) :

$ cat Cargo.toml 
[package] 
name = "nom-http" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies.nom] 
version = "3.2.1" 
features = ["verbose-errors"]

crate 提供了一些通常对调试有用的额外功能;默认情况下禁用它们,可以通过将列表传递给 features 标志来打开它们,如上例所示。现在,让我们转到我们的主文件:

// chapter4/nom-http/src/main.rs

#[macro_use]
extern crate nom;

use nom::{ErrorKind, IResult};
use std::str;

#[derive(Debug)]
enum Method {
    GET,

    POST,
}

#[derive(Debug)]
struct Request {
    method: Method,

    url: String,

    version: String,
}

// A parser that parses method out of a HTT request
named!(parse_method<&[u8], Method>, return_error!(ErrorKind::Custom(12), alt!(map!(tag!("GET"), |_| Method::GET) | map!(tag!("POST"), |_| Method::POST))));

// A parser that parses the request part
named!(parse_request<&[u8], Request>, ws!(do_parse!( method: parse_method >> url: map_res!(take_until!(" "), str::from_utf8) >> tag!("HTTP/") >> version: map_res!(take_until!("\r"), str::from_utf8) >> (Request { method: method, url: url.into(), version: version.into() }) )));

// Driver function for running the overall parser
fn run_parser(input: &str) {
    match parse_request(input.as_bytes()) {
        IResult::Done(rest, value) => println!("Rest: {:?} Value: {:?}", rest, value),
        IResult::Error(err) => println!("{:?}", err),
        IResult::Incomplete(needed) => println!("{:?}", needed),
    }
}

fn main() {
    let get = "GET /home/ HTTP/1.1\r\n";
    run_parser(get);
    let post = "POST /update/ HTTP/1.1\r\n";
    run_parser(post);
    let wrong = "WRONG /wrong/ HTTP/1.1\r\n";
    run_parser(wrong);
}

很明显, nom 使用了大量的宏来生成代码,最重要的是 named! ,它接受一个函数签名并定义一个基于它的解析器, nom 解析器返回 IResult 类型的实例,它是枚举类型,并有三种变体:

  • Done(rest, value) 表示当前解析器成功的情况,在这种情况下,该值具有当前解析的值,其余的将具有要解析的剩余输入。
  • Error(Err(E)) 表示解析期间的错误。基础错误将包含错误代码,错误位置等,在一个大型的解析树中,这也可以保存指向更多错误的指针。
  • Incomplete(needed) 表示由于某种原因解析不完整的情况。需要的是一个枚举,它又有两个变体;第一个代表不知道需要多少数据的情况。第二个代码所需数据的大小。

我们从 HTTP 方法表示和结构的完整请求开始。在我们的示例中,我们只处理 GET 和 POST 。并忽略其他的所有内容。然后我们为HTTP方法定义一个解析器;我们的解析器将接受一片字节并返回Method枚举。这可以通过读取输入并查找字符串GET或POST来完成。 在每种情况下,基本解析器都是使用 tag! 构造,它解析输入以提取给定的字符串。 而且,如果解析成功的话,我们可以使用 map! 将结果转换为 Method ,将分析器的结果映射到函数。现在,对于解析方法,我们要么有一个post,要么有一个get,但决不能两者都有。我们使用 alt! 宏来表示前面构造的两个分析器的逻辑 或。 alt! 如果宏的任何一个组成宏可以解析给定的输入,那么宏将构造一个解析器来解析输入。最后,所有这些都包含在 return_error ,如果分析在当前分析器中失败,它会提前返回,而不是传递到树中的下一个分析器。

然后,我们通过定义 parse_request 来解析整个请求。我们从使用 ws! 从输入中删除多余的空白。 然后我们调用 do_parse! 链接多个子分析器。这一个不同于其他组合器,因为它允许存储中间分析器的结果。这对于在返回结果时构造结构的实例很有用。 在 do_parse! 我们首先调用 parse_method 并将其结果存储在一个变量中。在从请求中删除了方法之后,我们应该在找到对象的位置之前遇到空白。这是由 take-until!(" ) 处理的,它使用输入直到找到空白空间。使用 map-res! 将结果转换为 str 。 列表中的下一个分析器是使用标记移除序列 HTTPtag! 。接下来,我们通过读取输入来解析 HTTP 版本,直到看到一个 \r ,并将其映射回一个 str 。完成所有解析后,我们构造一个请求对象并返回它。 注意在序列中使用 >> 作为解析器之间的分隔符。

我们还定义了一个名为 run_parser 的助手函数,在给定的输入中运行解析器并打印结果。此函数调用解析器并对结果进行匹配,以显示结果结构或错误。 然后,我们用三个 HTTP 请求定义我们的主函数,前两个是有效的,最后一个是无效的,因为方法是错误的。运行此命令时,输出如下:


# #![allow(unused_variables)]
#fn main() {
$ cargo run 
    Compiling nom-http v0.1.0 (file:///Users/Abhishek/Desktop/rustbook/src/ch4/nom-http)
     Finished dev [unoptimized + debuginfo] target(s) in 0.60 secs
      Running `target/debug/nom-http` 
Rest: [] Value: Request { method: GET, url: "/home/", version: "1.1" } 
Rest: [] Value: Request { method: POST, url: "/update/", version: "1.1" } 
NodePosition(Custom(128), [87, 82, 79, 78, 71, 32, 47, 119, 114, 111, 110, 103, 47, 32, 72, 84, 84, 80, 47, 49, 46, 49, 13, 10], [Position(Alt, [87, 82, 79, 78, 71, 32, 47, 119, 114, 111, 110, 103, 47, 32, 72, 84, 84, 80, 47, 49, 46, 49, 13, 10])])
#}

在前两种情况下,所有内容都按预期进行了解析,然后我们得到了结果。正如预期的那样,在最后一种情况下解析失败并返回自定义错误

正如我们之前讨论的那样,nom 的一个常见问题是调试,因为调试宏要困难得多。 宏也鼓励使用特定的 DSL(比如使用>>分隔符),有些人可能会觉得难以使用。在撰写本文时,来自 nom 的一些错误消息对于找出给定解析器的错误是没有帮助的。 这些肯定会在未来有所改进,但与此同时,nom 提供了一些辅助宏来进行调试。

例如,dbg! 如果底层解析器未返回 Done,则打印结果和输入。 dbg_dump! 宏是类似的,但也打印出输入缓冲区的十六进制转储。 根据我们的经验,可以使用一些技术进行调试:

  • 通过将编译器选项传递给 rustc 来扩展宏。 Cargo 使用以下调用启用此功能:cargo rustc -- -Z unstable-options -pretty=expanded 扩展并漂亮打印给定项目中的所有宏。有人可能会发现扩展宏以跟踪执行和调试很有用。 Cargo中的相关命令, rustc -- -Z trace-macros 仅扩展宏。
  • 独立运行较小的解析器。给定一系列解析器和另一个解析器,可能更容易运行每个子解析器,直到其中一个错误出来。然后,可以继续调试失败的小解析器。这在隔离故障时非常有用。
  • 使用提供的调试宏 dbg!dbg_dump!。这些可以像调试打印语句一样用于跟踪执行。

pretty=expanded 现在是一个不稳定的编译器选项。在将来的某个时候,它将被稳定(或移除)。在这种情况下,不需要传递 -Z unstable-options 标志来使用它。

让我们看一个名为 pom 的另一个解析器组合子的例子。正如我们之前讨论的那样,这个很大程度上依赖于 traits 和 operator-overloading 来实现解析器组合。在撰写本文时,当前版本为 1.1.0 ,我们将在示例项目中使用它。 像往常一样,第一步是设置我们的项目并将pom添加到我们的依赖项:

$ cargo new --bin pom-string

Cargo.toml 如下:

[package] 
name = "pom-string" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
pom = "1.1.0"

在此示例中,我们将解析示例HTTP请求,就像上次一样。这是它的样子:

解析二进制数据

总结

第五章:应用层协议

正如我们在前几章中看到的那样,网络中的两台主机以流或离散数据包交换字节。 通常由更高级别的应用程序将这些字节处理为对应用程序有意义的事物。 这些应用程序在传输层上定义了一个新的协议层,通常称为应用层协议。 在本章中,我们将研究其中的一些协议。

设计应用层协议有许多重要的考虑因素。实现需要至少知道以下细节:

  • 通信广播还是点对点?在第一种情况下,底层传输协议必须是 UDP。在第二种情况下,它可以是 TCP 或 UDP 。
  • 协议是否需要可靠的传输?如果是,TCP 是唯一的选择。否则,UDP 也可能是合适的。
  • 应用程序是否需要字节流(TCP),还是可以逐个数据包(UDP)工作?
  • 是什么标志着各方之间的投入结束?
  • 使用的数据格式和编码是什么?

一些非常常用的应用层协议是 DNS(我们在前面的章节中研究过)和 HTTP(我们将在后续章节中研究)。除此之外,通常用于基于微服务架构的非常重要的应用层工具集是 gRPC。 每个人至少使用过几次的另一个应用层协议是 SMTP,电子邮件提供支持的协议。

在本章中,我们将研究以下主题:

  • RPC如何工作。具体来说,我们将查看gRPC并使用该工具包编写一个小型服务器和客户端。
  • 我们将看一下可以用来以编程方式发送电子邮件的crate caller lettre。
  • 最后一个主题是在Rust中编写一个简单的FTP客户端和TFTP服务器。

RPC 简介

在常规编程中,将常用逻辑封装在函数中通常很有用,这样它就可以在多个地方重用。 随着网络和分布式系统的兴起,有必要通过网络访问一组通用操作,以便经过验证的客户端可以调用它们。这通常称为 Remote Procedure Call (RPC) 。 在第4章,数据序列化,反序列化和解析中,当服务器从原点返回给定点的距离时,我们看到了一个简单的例子。真实世界RPC定义了许多应用层协议,这些协议要复杂得多。 最受欢迎的RPC实现之一是 gRPC ,它最初由 Google 引入,后来转移到开源模型。 gRPC 提供高性能的 RPC over Internet 规模网络,并广泛用于许多项目,包括 Kubernetes 。

在深入研究 gRPC 之前,让我们看看协议缓冲区,一个相关的工具。 它是一组在应用程序之间构建语言和平台中立交换结构化数据的机制。它定义了自己的 接口定义语言 Interface Definition Language (IDL) 来描述数据格式,以及一个编译器,它可以采用该格式并生成代码以进行转换。 IDL 还允许定义抽象服务:输入和输出消息格式,编译器可以使用它来生成给定语言的存根。我们将在后续示例中看到数据格式定义的示例。 编译器具有插件,可以生成大量语言的输出代码,包括 Rust 。在我们的示例中,我们将在构建脚本中使用此类插件来自动生成Rust模块。现在,gRPC使用协议缓冲区来定义基础数据和消息。消息在 TCP/IP 之上通过 HTTP/2 进行交换。这种通信模式在实践中通常更快,因为它可以更好地利用现有连接,并且因为 HTTP/2 支持双向异步连接。作为一个自以为是的系统,gRPC 对我们代表我们在上一节中讨论过的考虑做出了很多假设。大多数这些默认值(如 TCP 上的 HTTP/2 )都被选中,因为它们支持 gRPC 提供的高级功能(如双向流)。 其他一些默认设置(如使用protobuf)可以交换为另一种消息格式实现。

SMTP 简介

FTP 和 TFTP 简介

总结

第六章:HTTP

Hyper 简介

Rocket 简介

reqwest 简介

第七章:使用 Tokio 异步网络编程

在顺序编程模型中,代码总是按照编程语言的语义规定的顺序执行。因此,如果一个操作由于某种原因(等待资源等)而阻塞,则整个执行块只能在该操作完成后继续前进。这通常会导致资源利用率低下,因此主线程将忙于等待琴操作。在 GUI 应用程序中,这也导致用户交互不良,因为负责管理 GUI 的主线程正忙于等待其他东西。这是网络编程的一个主要问题。因为我们通常需要等待数据在套接字上可用。在过去,我们使用多个线程来解决这个问题。在这个模型中,我们将一个代价高昂的操作委托给一个后台线程,使主线程可以自由地进行用户交互或其他任务。相反,异步编程模型规定不应该阻止任何操作。但是我们如何实现这一目标呢?一种简单的方法是在自己的线程中运行每个操作,然后在所有这些线程上连接。实际上,由于大量潜在的线程和它们之间的协调,这很麻烦。

Rust提供了一些使用基于 Future 的事件循环驱动模型支持异步编程的 crate 。我们将在本章详细研究。 以下是我们将在此处讨论的主题:

  • Rust 的 Future 抽象
  • 使用 tokio 堆栈进行异步编程

深入 Future

Rust 的异步编程主干库是 futures。它提供了一个 future 的结构。这实际上是一个表示操作结果的点位符。如你所想,操作的结果可以处理两种状态:操作仍在进行中,旨果尚未完成;或者操作已完成且结果可用。 注意在第二种情况下,可能存在错误,使用结果变得无关紧要。

该库提供了一个名为 Future(以及其他东西)的特征,任何类型都可以实现这种特性。


# #![allow(unused_variables)]
#fn main() {
trait Future {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Self::Item, Self::Error>;
}
#}

Item 表示操作成功后的返回类型,Error 表示操作失败的返回类型。实现必须指定这些并且还实现获取计算当前状态的 poll 方法。如果已经完成,则返回结果。如果没有, future 将注册当前任务对给定操作的结果感兴趣。此函数返回一个 Poll ,如下所示:


# #![allow(unused_variables)]
#fn main() {
type Poll<T, E> = Result<Async<T>, E>;
#}

Poll 被输入到另一个名为 Async(和给定的错误类型)的类型的结果中,该类型接下来定义。


# #![allow(unused_variables)]
#fn main() {
pub enum Async<T> {
    Ready(T),
    NotReady,
}
#}

反过来,Async 是一个可以在 Ready(T)NotReady 中的枚举。 最后两个状态对应于操作的状态。因此,poll 函数可以返回三种可能的状态:

  • Ok(Async::Ready(result)) 当操作成功完成并且结果在名为 result 的内部变量中。
  • OK(Async::NotReady) 当操作还没有完成,结果获取不到。注意,这并不表示错误情况。
  • Err(e) 当操作运行产生一个错误,在这种情况下没有结果可以获取。

很容易注意到,Future 本质上是一个 Result ,可能仍在运行实际生成 Result 的东西。 如果删除结果可能在任何时间点都没有准备好的情况,我们剩下的唯一两个选项是 Ok 和 Err 情况,它们与结果完全对应。

因此, Future 可以代表任何需要花费大量时间才能完成的事情。这可以是网络事件,磁盘读取等。现在,最常见的问题是:我们如何从给定函数返回 future ?有几种方法可以做到这一点。我们来看一个例子吧。项目设置与以往一样。

 cargo new --bin futures-example

在 Cargo 配置里添加一些库:

[package] 
name = "futures-example" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
futures = "0.1.17" 
futures-cpupool = "0.1.7"

main.rs 中,我们像往常一样设置所有内容。我们的目的是找出给定的整数是否为素数,这将代表我们的操作部分,需要一些时间才能完成。我们有两个函数来做这些。 正如我们稍后将看到的,这两种方法使用两种不同的 futures 返回方式。在实践中,这种幼稚的初等性测试方法并没有被证明是一个很好的例子。因此,我们不得不随机睡眠一段时间来模拟慢度。

// ch7/futures-example/src/main.rs

use futures::Future;
use futures_cpupool::CpuPool;
use std::io;

fn check_prime_boxed(n: u64) -> Box<Future<Item = bool, Error = io::Error>> {
    for i in 2..n {
        if n % i == 0 {
            return Box::new(futures::future::ok(false));
        }
    }
    Box::new(futures::future::ok(true))
}

fn check_prime_impl_trait(n: u64) -> impl Future<Item = bool, Error = io::Error> {
    for i in 2..n {
        if n % i == 0 {
            return futures::future::ok(false);
        }
    }
    futures::future::ok(true)
}

fn check_prime(n: u64) -> bool {
    for i in 2..n {
        if n % i == 0 {
            return false;
        }
    }
    true
}

fn main() {

    let input: u64 = 58466453;
    println!("Right before first call");
    let res_one = check_prime_boxed(input);
    println!("Called check_prime_boxed");
    let res_two = check_prime_impl_trait(input);
    // let _r = futures::executor::spawn(res_two);
    println!("Called check_prime_impl_trait");
    println!(
        "Results are {} and {}",
        res_one.wait().unwrap(),
        res_two.wait().unwrap()
    );

    let thread_pool = CpuPool::new(4);
    let res_three = thread_pool.spawn_fn(move || {
        let temp = check_prime(input);
        let result: Result<bool, ()> = Ok(temp);
        result
    });
    println!("Called check_prime in another thread");
    println!("Result from the last call: {}", res_three.wait().unwrap());
}

有几种返回 futures 的方式。第一种是使用 trait 对象,如在 check_prime_boxed 。 Box 是一个指向堆上对象的指针类型。从某种意义上说,它是一个托管指针,当对象超出范围时,它将被自动清理。 函数的返回类型是一个 trait 对象,它可以表示将其 Item 设置为 bool 且 Error 设置为 io::Error 的任何 future 。因此,这代表动态调度。 返回 futures 的第二种方法是使用 impl trait 功能。如 check_prime_impl_trait 中所做的。 我们说该函数返回返回一个实现 Future<Item=bool, Error=io::Error> 的类型。并且任何实现 Future trait 的类型都是 future ,我们的函数返回一个 future。 请注意,在这种情况下,我们不需要在返回结果之前使用 box 。因此,这种方法的一个优点是不需要分配来返回 future 。我们的两个函数都使用 future::ok 函数来表示我们的计算已成功完成给定的结果。 另一种选择是不实际返回 future 并使用基于 future 的线程池箱来创造 future 并管理它。这是 check_prime 只返回一个 bool 的情况。在我们的 main 函数中,我们使用futures-cpupool crate 建立一个线程池,然后运行该池中的最后一个函数。我们回到 future ,我们可以调用 wait 获得结果。 实现相同目标的完全不同的选择是返回实现 Future 特征的自定义类型。这个是最不符合人体工程学的,因为它涉及编写一些额外的代码,但它是最灵活的方法。

在构建了 future 之后,下一个目标就是执行它。有三种方法可以做到这一点:

  • 在当前线程中:这将最终阻塞当前线程,直到将来完成执行。在前面的示例中,res-one和res-two在主线程上执行,从而阻止用户交互。
  • 在线程池中:res_three 就是这种情况,它在名为 thread_pool 的线程池中执行。因此,在这种情况下,调用线程可以自己继续进行自己的处理。
  • 在事件循环中:在某些情况下,上述两种情况都不可能。那么唯一的选择是在事件循环中执行 future 。方便的是,tokio-core crate提供了适合期货的API来使用事件循环。我们将在下一节深入探讨这个模型

在我们的主函数中,我们调用主线程中的前两个函数。因此,它们将阻塞主线程的执行。但是,最后一个线程在另一个线程上运行。在这种情况下,主线程立即可以自由打印出调用了check-prime 的文件。它再次阻止了对未来的等待。请注意,在所有情况下, future 都被懒惰地评估。当我们运行此程序时,应该看到以下内容:

$ cargo run 
    Compiling futures-example v0.1.0 (file:///src/ch7/futures-example) 
     Finished dev [unoptimized + debuginfo] target(s) in 0.77 secs 
      Running `target/debug/futures-example` 
Right before first call 
Called check_prime_boxed 
Called check_prime_impl_trait 
Results are true and true 
Called check_prime in another thread 
Result from the last call: true

将 futures 与常规线程区分开来的是,它们可以按照人体工程学方式链接起来。 这就像是说,下载网页,然后解析HTML,然后提取给定的单词。这些步骤中的每一个都是一个 future ,下一个步骤不能开始,除非第一个步骤已经完成。 整个业务也是一个 future ,由若干组成 future 组成。当这个更大的 Future 被执行时,它被称为任务。 板条箱提供了许多用于与 futures::task 命名空间中的任务交互的 API。该库提供了许多以这种方式处理期货的功能。 当一个给定的类型实现 Future 的特性(实现轮询方法)时,编译器可以为所有这些组合器提供实现。让我们来看一个使用链接实现超时功能的示例。 我们将使用 tokio timer 板条箱来计算 future 的超时时间,在我们的代码中,我们有两个相互竞争的函数,它们可以随机休眠一段时间,然后向调用者返回一个固定的字符串。我们将同时调度所有这些函数,如果我们返回与第一个函数对应的字符串,我们将声明它已经赢了。同样,这也适用于第二个。如果我们也不回来,我们知道 future 的超时已经触发了。让我们从项目设置开始:

cargo new --bin futures-chaining

在 Cargo.toml 中添加依赖:

[package] 
name = "futures-chaining" 
version = "0.1.0" 
authors = ["Foo <foo@bar.com>"]

[dependencies] 
tokio-timer = "0.1.2"
futures = "0.1.17" 
futures-cpupool = "0.1.7" 
rand = "0.3.18"

和上次一样,我们使用一个线程池使用 futures-cpupool 来执行我们的 future 。让我们看看代码:

//  ch7/futures-chaining/src/main.rs

use futures::future::select_ok;
use futures::Future;
use futures_cpupool::CpuPool;
use rand::{thread_rng, Rng};
use std::thread;
use std::time::{Duration, Instant};
use tokio_timer::Timer;

fn player_one() -> &'static str {
    let d: u64 = thread_rng().gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));
    "player_one"
}

fn player_two() -> &'static str {
    let d: u64 = thread_rng().gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));
    "player_two"
}


fn main() {
    let pool = CpuPool::new_num_cpus();
    let timer = Timer::default();

    let timeout = timer.sleep(Duration::from_secs(3))
        .then(|_| Err(()));

    let one = pool.spawn_fn(|| {
        Ok(player_one())
    });

    let two = pool.spawn_fn(|| {
        Ok(player_two())
    });

    let tasks = vec![one, two];
    let winner = select_ok(tasks).select(timeout).map(|(result, _)| result);
    let result = winner.wait().ok();
    match result {
        Some(("player_one", _)) => println!("Player one won"),
        Some(("player_two", _)) => println!("Player two won"),
        Some((_, _)) | None => println!("Timed out"),
    }
}

我们的两个玩家非常相似;他们都会产生一个介于1到5之间的随机数,并在这段时间内睡眠。 之后,它们返回与名称对应的固定字符串。稍后我们将使用这些字符串来唯一地标识它们。 在我们的 main 函数中,我们初始化线程池和计时器。我们使用计时器上的组合器返回一个 3 秒后出错的 future 。然后,我们在线程池中生成这两个参与者,并将这些参与者的结果作为 future 返回。 请注意,这些函数目前并没有真正运行,因为 future 惰性执行。然后,我们将这些 future 放到一个列表中,并使用 select_ok 组合器并行运行这些 future 。此函数接受一个不可重复的 future ,并选择第一个成功的 future;这里唯一的限制是传递给此函数的所有 future 都应该是相同的类型。因此,我们不能在这里通过超时 future 。我们使用 select 组合器将 select_ok 的结果链接到超时 future ,select 组合器接受两个 future ,并等待其中一个完成执行。结果的未来将有一个已经完成的和一个没有完成的,然后我们使用 map 组合器来丢弃第二部分。 最后,我们阻塞了我们的 future ,并用 OK() 来表示链的结束。然后,我们可以将结果与已知字符串进行比较,以确定哪一个 future 获胜,并相应地打印出消息。

这是几次运行后的样子。由于我们的超时小于两个函数中任何一个的最大睡眠周期,我们应该看到一些超时。每当一个函数选择一个小于超时的时间时,它就会获得胜利。

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Player two won 
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Player one won 
$ cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/futures-chaining` 
Timed out

#ustream 和 sinks

futures 库为惰性评估的一系列事件提供了另一个有用的抽象,叫做 Stream 。 如果 Future 对应于 Result ,那么 Stream 对应于 Iterator 迭代器。从语义上讲,它们与 futures 非常相似,它们看起来是这样的:


# #![allow(unused_variables)]
#fn main() {
trait Stream { 
    type Item; 
    type Error; 
    fn poll(& mut self) -> Poll<Option<Self::Item>, Self::Error>; 
    ...
}
#}

这里唯一的区别是返回类型包含在 Option 中,与 Iterator 特征完全相同。因此,此处的 None 表示流已终止。此外,所有流都是 Future ,可以使用 into_future 进行转换。 让我们看一下使用这个结构的例子。我们将部分重用前一章中的collatz 示例。第一步是建立项目:

$ cargo new --bin streams

Cargo.toml 添加依赖:

[package] 
name = "streams" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
futures = "0.1.17" 
rand = "0.3.18"

设置完所有后,我们的主文件将如下所示。在这种情况下,我们有一个名为Collat​​zStream的结构,它有两个字段用于当前状态和结束状态(应始终为1)。我们将在此实现Stream特性,使其表现为流:

// chapter7/streams/src/main.rs

use futures::stream::Stream;
use futures::Future;
use futures::{Async, Poll};
use rand::{thread_rng, Rng};
use std::time::Duration;
use std::{io, thread};

// This struct holds the current state and the end condition
// for the stream

#[derive(Debug)]
struct CollatzStream {
    current: u64,
    end: u64,
}

// A constructor to initialize the struct with defaults
impl CollatzStream {
    fn new(start: u64) -> CollatzStream {
        CollatzStream {
            current: start,
            end: 1,
        }
    }
}

// Implementation of the Stream trait for our struct
impl Stream for CollatzStream {
    type Item = u64;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        let d: u64 = thread_rng().gen_range(1, 5);
        thread::sleep(Duration::from_secs(d));
        if self.current % 2 == 0 {
            self.current = self.current / 2;
        } else {
            self.current = 3 * self.current + 1;
        }

        if self.current == self.end {
            Ok(Async::Ready(None))
        } else {
            Ok(Async::Ready(Some(self.current)))
        }
    }
}

fn main() {
    let stream = CollatzStream::new(10);
    let f = stream.for_each(|num| {
        println!("{}", num);
        Ok(())
    });
    f.wait().ok();
}

我们通过在1到5秒之间的随机时间内模拟返回结果的延迟。 我们对轮询的实现返回Ok(Async :: Ready(None))以在流达到 1 时发出流已完成的信号。否则,它将当前状态返回为 Ok(Async::Ready(None)) 。很容易注意到,除了流语义之外,这个实现与迭代器的实现相同。在我们的main函数中,我们初始化 struct 并使用 for_each 组合器打印出流中的每个项目。这个组合器返回一个我们称之为 wait 的未来,可以阻止并获得所有结果。 以下是我们在运行最后一个示例时看到的内容:

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/streams` 
5 
16 
8 
4 
2

与 Future 特征一样,Stream 特性也支持许多其他用于不同目的的组合器。 Stream 的双重性是 Sink,它是异步事件的接收者。 这在为 Rust 通道,网络套接字,文件描述符等的发送端建模时非常有用。

任何异步系统中的常见模式是同步。这很重要,因为组件通常需要彼此通信以传递数据或协调任务。 我们过去使用 channels 通道解决了这个问题。但是这些结构在这里不适用,因为标准库中的通道实现不是异步的。 因此,期货有自己的通道实施,它提供了您期望从异步系统获得的所有保证。 让我们看一个例子;我们的项目设置应如下所示:

cargo new --bin futures-ping-pong

Cargo.toml :

[package] 
name = "futures-ping-pong" 
version = "0.1.0" 
authors = ["Foo<foo@bar.com>"]

[dependencies] 
futures = "0.1" 
tokio-core = "0.1" 
rand = "0.3.18"

有两个函数,一个等待一段随机的时间,然后随机返回 ping 或者 pong 。这个函数是我们的发送者。如下:

// chapter7/futures-ping-pong/src/main.rs

use futures::sync::mpsc;
use futures::sync::mpsc::Receiver;
use futures::Future;
use futures::{Sink, Stream};
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::fmt::Debug;
use std::thread;
use std::time::Duration;

// Randomly selects a sleep duration between 1 and 5 seconds. Then
// randomly returns either "ping" or "pong"
fn sender() -> &'static str {
    let mut rng = thread_rng();

    let d: u64 = rng.gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));

    let choices = ["ping", "pong"];
    choices.choose(&mut rng).unwrap()
}

// Receives input on the given channel and prints each item
fn receiver<T: Debug>(recv: Receiver<T>) {
    let f = recv.for_each(|item| {
        println!("{:?}", item);
        Ok(())
    });
    f.wait().ok();
}

fn main() {
    let (tx, rx) = mpsc::channel(100);
    let h1 = thread::spawn(|| {
        tx.send(sender()).wait().ok();
    });

    let h2 = thread::spawn(|| {
        receiver::<&str>(rx);
    });

    h1.join().unwrap();
    h2.join().unwrap();
}

futures 提供两种类型的通道:一次性通道只能使用一次发送和接收任何消息,一个常规的 mpsc 通道可以多次使用。 在我们的主要功能中,我们掌握了 channel 的两端,并将我们的发送者作为未来在另一个线程中产生。接收器是在另一个线程中生成的。在这两种情况下,我们都会记录句柄,以便稍后等待它们完成(使用连接)。 请注意,我们的接收器将通道的接收端作为参数。因为 Receiver 实现了Stream,我们可以在其上使用 and_then 组合器来打印出值。最后,在退出接收函数之前,我们在将来调用 wait()ok() 。在 main 函数中,我们连接两个线程句柄来驱动它们完成。

运行最后一个示例将随机打印“ping”或“pong”,具体取决于通过通道发送的内容。请注意,实际打印发生在接收端:

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/futures-ping-pong` 
"ping"

futures 还在 futures::sync::BiLock 中提供了一个锁定机制,它与 std::sync::Mutex 紧密相连。这是一个面向 future 的互斥锁,用于仲裁两个所有者之间的资源共享。请注意,BiLock 仅适用于两个 future ,这是一个恼人的限制。 以下是它的工作原理:我们有兴趣修改我们的最后一个示例,以便在调用 sender 函数时显示计数器。现在我们的计数器需要是线程安全的,以便它可以在消费者之间共享。 使用Cargo设置项目:

$ cargo new --bin future-bilock

Cargo.toml 与上个示例一样,下面是 main.rs :

// chapter7/future-bilock/src/main.rs

use std::fmt::Debug;
use std::thread;
use std::time::Duration;

use futures::sync::mpsc::Receiver;
use futures::sync::{mpsc, BiLock};
use futures::{Async, Future, Sink, Stream};
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};

// Increments the shared counter if it can acquire a lock, then
// sleeps for a random duration between 1 and 5 seconds, then
// randomly returns either "ping" or "pong"
fn sender(send: &BiLock<u64>) -> &'static str {
    match send.poll_lock() {
        Async::Ready(mut lock) => *lock += 1,
        Async::NotReady => (),
    }
    let mut rng = thread_rng();
    let d: u64 = rng.gen_range(1, 5);
    thread::sleep(Duration::from_secs(d));

    let choices = ["ping", "pong"];
    choices.choose(&mut rng).unwrap()
}

// Tries to acquire a lock on the shared variable and prints it's
// value if it got the lock. Then prints each item in the given
// stream
fn receiver<T: Debug>(recv: Receiver<T>, recv_lock: BiLock<u64>) {
    match recv_lock.poll_lock() {
        Async::Ready(lock) => println!("Value if lock {}", *lock),
        Async::NotReady => (),
    }

    let f = recv.for_each(|item| {
        println!("{:?}", item);
        Ok(())
    });
    f.wait().ok();
}

fn main() {
    let counter = 0;
    let (send, recv) = BiLock::new(counter);
    let (tx, rx) = mpsc::channel(100);
    let h1 = thread::spawn(move || {
        tx.send(sender(&send)).wait().ok();
    });
    let h2 = thread::spawn(|| {
        receiver::<&str>(rx, recv);
    });
    h1.join().unwrap();
    h2.join().unwrap();

}


与上一个示例基本相同,但存在一些差异。在 main 函数中,把 counter 设置为 0 。然后我们用 counter 创建一个 BiLock 。构建函数返回两个句柄,就像是一个通道,然后我们可以传递它。然后我们创建我们的频道并产生发送者。现在,如果我们查看发件人,则会对其进行修改以接受对BiLock的引用。在函数中,我们尝试使用poll_lock获取锁,如果可行,我们递增计数器。否则,我们什么都不做。然后,我们继续我们通常的返回“ping”或“pong”的业务。接收器已经过修改,也可以使用BiLock。在那里,我们尝试获取一个锁,如果成功,我们打印出被锁定数据的值。在我们的主函数中,我们使用线程生成这些期货并加入它们以等待它们完成。

以下是在不成功的运行中发生的情况,当双方都无法获得锁定时。在一个真实的例子中,我们希望优雅地处理错误并重试。为简洁起见,我们省略了这一部分:

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/futures-bilock` 
thread '<unnamed>' panicked at 'no Task is currently running', libcore/option.rs:917:5 
note: Run with `RUST_BACKTRACE=1` for a backtrace.
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:945:5

下面是运行正常的样子:

$ cargo run 
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs 
     Running `target/debug/futures-bilock` 
Value of lock 1 "pong"

Tokio

Tokio 中的套接字复用

编写流媒体协议

更大的 tokio 生态

结尾

第八章:安全

保护网络

Letsencrypt using Rust

OpenSSL using Rust

保护 tokio 应用安全

Cryptography using ring

总结

第九章:附录

协程和生成器简介

May 是怎么处理协程

Awaiting the future

数据并行性

使用 Pest 解析

Miscellaneous utilities

总结