在上一章中,我们使用 Tokio 框架来支持异步 Actor 模型。 我们的 Tokio 框架接受基本流量,然后在处理消息后将这些消息发送给参与者。 然而,我们的 TCP 处理是基本的。 如果这本书是您唯一接触过 TCP 的书,那么您不应该在这个基本的 TCP 过程上构建复杂的系统。 在本章中,我们将完全关注如何通过 TCP 连接打包、发送和读取数据。
在本章中,我们将讨论以下主题:
设置 TCP 客户端和 echo 服务器
使用结构体通过 TCP 处理字节
创建帧来分隔 TCP 上的消息
在 TCP 之上构建 HTTP 帧
在本章结束时,您将能够使用一系列不同的方法打包、发送和读取通过 TCP 发送的数据。 您将能够了解如何将数据拆分为可以作为结构处理的框架。 最后,您将能够构建一个 HTTP 框架,该框架具有包含 URL 和方法的标头以及包含数据的正文。 这将使您能够构建通过 TCP 发送数据时所需的任何数据结构。
技术要求
在本章中,我们将纯粹关注如何通过 TCP 连接处理数据。 因此,当我们构建自己的回显服务器时,我们将不会依赖任何以前的代码。
设置 TCP 客户端和服务器
为了探索通过 TCP 发送和处理字节,我们将创建一个基本的 echo 服务器和客户端。 我们将放弃在上一章中构建的任何复杂逻辑,因为在尝试探索有关发送、接收和处理字节的想法时,我们不需要复杂逻辑的干扰。
在一个新目录中,我们应该有两个货物项目——一个用于服务器,另一个用于客户端。 它们可以采用以下文件结构:
├── client
│ ├── Cargo.toml
│ └── src
│ └── main.rs
└── server
├── Cargo.toml
└── src
└── main.rs
这两个项目都将使用 Tokio 的相同依赖项,因此这两个项目都应在其 Cargo.toml 文件中定义以下依赖项:
[dependencies]
tokio = { version = "1", features = ["full"] }
我们现在需要构建回显服务器的基本机制。 这是从客户端向服务器发送消息的地方。 服务器然后处理客户端发送的消息,重新打包消息,并将相同的消息发送回客户端。 我们将从构建我们的服务器开始。
设置我们的 TCP 服务器
我们可以通过使用以下代码最初导入我们需要的所有内容来在 server/src/main.rs 文件中定义服务器:
use tokio::net::TcpListener;
use tokio::io::{BufReader, AsyncBufReadExt, AsyncWriteExt};
这样我们就可以监听传入的 TCP 流量,从该流量中读取字节,并将其写回发送消息的客户端。 然后,我们需要利用 Tokio 运行时来侦听传入流量,如果收到消息则启动线程。 如果您完成了上一章,那么这是您尝试自己执行此操作的好机会,因为我们介绍了创建侦听传入流量的 TCP 服务器所需的概念。
如果您尝试编写 TCP 服务器的基础知识,您的代码应如下所示:
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();
let socket = TcpListener::bind(&addr).await.unwrap();
println!("Listening on: {}", addr);
while let Ok((mut stream, peer)) =
socket.accept().await {
println!("Incoming connection from: {}",
peer.to_string());
tokio::spawn(async move {
. . .
});
}
}
在这里,我们应该熟悉这样的概念:我们创建一个侦听器,将其绑定到一个地址,然后等待传入的消息,在发送消息时创建一个线程。 在线程内部,我们循环传入消息,直到出现包含以下代码的新行:
println!("thread starting {} starting", peer.to_string());
let (reader, mut writer) = stream.split();
let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];
loop {
match buf_reader.read_until(b'\n', &mut buf).await {
Ok(n) => {
if n == 0 {
println!("EOF received");
break;
}
let buf_string = String::from_utf8_lossy(&buf);
writer.write_all(buf_string.as_bytes())
.await.unwrap();
buf.clear();
},
Err(e) => println!("Error receiving message: {}", e)
}
}
println!("thread {} finishing", peer.to_string());
这段代码现在应该不足为奇了。 如果您不熟悉前面的代码所涵盖的任何概念,建议您阅读上一章。
现在我们已经定义了一个基本的 echo 服务器,并且它已准备好运行,这意味着我们可以将注意力转向在 client/src/main.rs 文件中创建客户端代码。 让客户端工作需要相同的结构和特征。 然后,我们需要向 TCP 服务器发送一条标准文本消息。 现在是您自己尝试和实现客户端的好时机,并且在构建客户端之前没有多次涉及过任何内容。
设置我们的 TCP 客户端
如果您尝试自己构建客户端,则应该导入以下结构和特征:
use tokio::net::TcpStream;
use tokio::io::{BufReader, AsyncBufReadExt, AsyncWriteExt};
use std::error::Error;
然后,我们必须建立 TCP 连接,发送消息,等待消息发回,然后使用 Tokio 运行时将其打印出来:
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut stream =
TcpStream::connect("127.0.0.1:8080").await?;
let (reader, mut writer) = stream.split();
println!("stream starting");
writer.write_all(b"this is a test\n").await?;
println!("sent data");
let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];
println!("reading data");
let _ = buf_reader.read_until(b'\n', &mut
buf).await.unwrap();
let message = String::from_utf8_lossy(&buf);
println!("{}", message);
Ok(())
}
我们现在有了一个正常运行的客户端和服务器。 为了测试客户端和服务器是否正常工作,我们必须在一个终端中启动服务器,然后通过运行cargo run在另一个终端中运行客户端。 服务器将有以下打印输出:
stream starting
sent data
reading data
this is a test
我们的服务器打印输出将具有以下打印输出:
Listening on: 127.0.0.1:8080
Incoming connection from: 127.0.0.1:60545
thread starting 127.0.0.1:60545 starting
EOF received
thread 127.0.0.1:60545 finishing
这样,我们就有了一个基本的回显服务器和客户端。 我们现在可以专注于打包、解包和处理字节。 在下一节中,我们将探讨使用结构体标准化处理消息的基本方法。
使用结构体处理字节
在上一章中,我们将字符串发送到服务器。 然而,结果是我们必须将各个值解析为我们需要的类型。 由于字符串解析的结构不佳,其他开发人员不清楚我们的消息的结构。 我们可以通过定义可以通过 TCP 通道发送的结构体来使消息结构更加清晰。 通过 TCP 通道发送结构体可以通过在发送结构体本身之前将消息结构体转换为二进制格式来实现。 这也称为序列化数据。
如果我们要将结构转换为二进制格式,首先,我们需要使用 serde 和 bincode 包。 使用我们的新包,客户端和服务器 Cargo.toml 文件都应包含以下依赖项:
[dependencies]
serde = { version = "1.0.144", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
bincode = "1.3.3"
serde crate 将用于序列化该结构,而 bincode crate 将用于将消息结构转换为二进制格式。 现在我们的依赖关系已经定义,我们可以开始创建消息发送者客户端。
创建消息发送者客户端
我们可以构建 client/src/main.rs 文件以通过 TCP 发送结构。 首先,我们必须导入我们需要的东西:
. . .
use serde::{Serialize, Deserialize};
use bincode;
准备好导入后,我们可以使用以下代码定义 Message 结构:
#[derive(Serialize, Deserialize, Debug)]
struct Message {
pub ticker: String,
pub amount: f32
}
Message 结构体的定义采用与我们用来在 Actix 服务器上处理 HTTP 请求的 JSON 主体的结构体类似的形式。 然而,这一次,我们将不会使用 Actix Web 结构和特征来处理该结构。
我们的消息结构现在可以在我们的主函数中使用。 请记住,在我们的 main 函数中,我们有一个由以下代码创建的 TCP 流:
let mut stream = TcpStream::connect("127.0.0.1:8080").await?;
let (reader, mut writer) = stream.split();
现在我们已经建立了连接,我们可以创建 Message 结构并将 Message 结构转换为二进制格式:
let message = Message{ticker: String::from("BYND"),
amount: 3.2};
let message_bin = bincode::serialize(&message).unwrap();
我们的消息结构现在是二进制格式。 然后,我们必须使用以下代码通过 TCP 流发送消息:
println!("stream starting");
writer.write_all(&message_bin).await?;
writer.write_all(b"\n").await?;
println!("sent data");
请注意,我们已经发送了消息,然后发送了新行。 这是因为我们的服务器将读取直到出现新行。 如果我们不发送新行,那么程序将挂起并且永远不会完成。
现在我们已经发送了消息,我们可以等待,直到再次收到消息。 然后,我们必须从二进制格式构造 Message 结构,并使用以下代码打印出构造的 Message 结构:
let mut buf_reader = BufReader::new(reader);
let mut buf = vec![];
println!("reading data");
let _ = buf_reader.read_until(b'\n',
&mut buf).await.unwrap();
println!("{:?}", bincode::deserialize::<Message>(&buf));
我们的客户端现在已准备好向服务器发送消息。
在服务器中处理消息
当涉及到更新我们的服务器代码时,我们的目标是解压缩消息,打印出消息,然后将消息转换为二进制格式以发送回客户端。 在此阶段,您应该能够自己实施更改。 这是修改我们所涵盖内容的好机会。
如果您确实尝试首先在 server/src/main.rs 文件中实现服务器上消息的处理,则应该导入以下附加要求:
use serde::{Serialize, Deserialize};
use bincode;
然后,您应该定义 Message 结构,如下所示:
#[derive(Serialize, Deserialize, Debug)]
struct Message {
pub ticker: String,
pub amount: f32
}
现在,我们只需要处理消息,打印出消息,然后将消息返回给客户端即可。 我们可以使用线程内循环中的消息来管理所有进程,代码如下:
let message =
bincode::deserialize::<Message>(&buf).unwrap();
println!("{:?}", message);
let message_bin = bincode::serialize(&message).unwrap();
writer.write_all(&message_bin).await.unwrap();
writer.write_all(b"\n").await.unwrap();
buf.clear();
在这里,我们使用与客户端中使用的相同的方法,但反之亦然 - 也就是说,我们首先从二进制格式转换,然后最后转换为二进制格式。
如果我们运行我们的服务器,然后运行我们的客户端,我们的服务器将为我们提供以下打印输出:
Listening on: 127.0.0.1:8080
Incoming connection from: 127.0.0.1:50973
thread starting 127.0.0.1:50973 starting
Message { ticker: "BYND", amount: 3.2 }
EOF received
thread 127.0.0.1:50973 finishing
我们的客户给了我们以下打印输出:
stream starting
sent data
reading data
Ok(Message { ticker: "BYND", amount: 3.2 })
在这里,我们可以看到我们的 Message 结构可以被发送、接收,然后再次发送回来,没有任何妥协。 这使我们的 TCP 流量更加复杂,因为我们可以为消息提供更复杂的结构。 例如,消息的一个字段可以是 HashMap,消息的另一个字段可以是另一个结构的向量(如果向量中的结构实现了 serde 特征)。 我们可以修改和更改 Message 结构的结构,而无需重写用于解包和打包消息的协议。 其他开发人员只需查看我们的 Message 结构并了解通过 TCP 通道发送的内容。 现在我们已经改进了通过 TCP 发送消息的方式,我们可以将流分成带有帧的帧。
利用框架
到目前为止,我们通过 TCP 发送结构并用新行分隔这些消息。 本质上,这是最基本的框架形式。 然而,也有一些缺点。 我们必须记住添加一个分隔符,例如换行符; 否则,我们的程序将无限期地挂起。 我们还面临着通过在消息数据中使用分隔符而过早地将消息分成两条消息的风险。 例如,当我们使用新行分隔符拆分消息时,消息中的一大块文本包含新行或任何特殊字符或字节来表示需要将流分成可序列化的包,这并非不可想象。 为了防止此类问题,我们可以使用 Tokio 提供的内置框架支持。
在本节中,随着消息的发送和接收发生变化,我们将重写客户端和服务器。 如果我们尝试将新方法插入到客户端的现有代码中,很容易导致混乱。 在编写客户端和服务器之前,我们必须更新客户端和服务器的 Cargo.toml 文件中的依赖关系:
[dependencies]
tokio-util = {version = "0.7.4", features = ["full"] }
serde = { version = "1.0.144", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3.24"
bincode = "1.3.3"
bytes = "1.2.1"
在这里,我们使用了更多的板条箱。 当我们完成本节中的其余代码时,我们将满足他们的需求。 为了掌握框架,我们将从一个简单的任务开始,即重写我们的客户端以使其支持框架。
重写我们的客户端,使其支持框架
请记住,我们正在将整个客户端写入 client/src/main.rs 文件中。 首先,我们必须使用以下代码从 Tokio 导入我们需要的内容:
use tokio::net::TcpStream;
use tokio_util::codec::{BytesCodec, Decoder};
TcpStream 用于连接到我们的服务器。 BytesCodec 结构用于通过连接传送原始字节。 我们将使用 BytesCodec 结构来配置帧。 解码器是一种对我们通过连接接受的字节进行解码的特征。 但是,当通过连接发送数据时,我们可以传入结构、字符串或任何其他必须转换为字节的内容。 因此,我们必须通过查看 BytesCodec 的源代码来检查 BytesCodec 结构体的实现内容。 可以通过查看文档或仅按住 Control 键单击或将鼠标悬停在编辑器中的 BytesCodec 结构上来检查源代码。 当我们检查 BytesCodec 结构体的源代码时,我们将看到以下 Encode 实现:
impl Encoder<Bytes> for BytesCodec {
. . .
}
impl Encoder<BytesMut> for BytesCodec {
. . .
}
在这里,我们只能使用 BytesCodec 结构通过连接发送 Bytes 或 BytesMut。 我们可以为 BytesCodec 实现 Encode 来发送其他类型的数据; 然而,对于我们的用例来说,这太过分了,通过我们的连接发送字节才有意义。 然而,在编写更多代码之前,我们不妨检查一下 Bytes 实现,以了解框架的工作原理。 Encode for Bytes 的实现采用以下形式:
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;
fn encode(&mut self, data: Bytes, buf: &mut BytesMut)
-> Result<(), io::Error> {
buf.reserve(data.len());
buf.put(data);
Ok(())
}
}
在这里,我们可以看到正在传递的数据的长度被保留在缓冲区中。 然后数据被放入缓冲区。
现在我们了解了如何使用框架对消息进行编码和解码,我们需要从 futures 和 bytes 包中导入特征,以使我们能够处理消息:
use futures::sink::SinkExt;
use futures::StreamExt;
use bytes::Bytes;
SinkExt 和 StreamExt 特性本质上使我们能够从流中异步接收消息。 Bytes 结构将包装我们要发送的序列化消息。 然后,我们必须导入特征以启用消息序列化并定义我们的消息结构:
use serde::{Serialize, Deserialize};
use bincode;
use std::error::Error;
#[derive(Serialize, Deserialize, Debug)]
struct Message {
pub ticker: String,
pub amount: f32
}
现在我们已经拥有了开始运行时所需的一切。 请记住,我们的主要运行时具有以下概要:
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
. . .
Ok(())
}
在我们的运行时中,我们最初建立一个 TCP 连接并使用以下代码定义帧:
let stream = TcpStream::connect("127.0.0.1:8080").await?;
let mut framed = BytesCodec::new().framed(stream);
然后,我们定义消息,序列化消息,然后使用以下代码将消息包装在字节中:
let message = Message{ticker: String::from("BYND"),
amount: 3.2};
let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
然后,我们可以发送消息,等待消息发回,然后使用以下代码反序列化消息并将其打印出来:
framed.send(sending_message).await.unwrap();
let message = framed.next().await.unwrap().unwrap();
let message =
bincode::deserialize::<Message>(&message).unwrap();
println!("{:?}", message);
至此,我们的客户端就构建完成了。 我们可以看到,我们不必担心新行或任何其他分隔符。 当涉及到通过 TCP 发送和接收消息时,我们的代码干净、简单。 现在我们的客户端已经构建好了,我们可以继续构建我们的服务器,以便它处理框架。
重写我们的服务器以支持分帧
当谈到构建我们的服务器以支持框架时,与我们在上一节中编写的代码有很多重叠。 此时,是尝试自己构建服务器的好时机。 构建服务器需要将我们在上一节中编码的框架逻辑实现到现有服务器代码中。
如果您尝试重写服务器,首先您应该导入以下结构和特征:
use tokio::net::TcpListener;
use tokio_util::codec::{BytesCodec, Decoder};
use futures::StreamExt;
use futures::sink::SinkExt;
use bytes::Bytes;
use serde::{Serialize, Deserialize};
use bincode;
请注意,我们导入的 Decoder 特征允许我们在字节编解码器上调用 .framed。 这里没有什么对你来说是新鲜的。 一旦我们有了必要的导入,我们必须使用以下代码定义相同的 Message 结构:
#[derive(Serialize, Deserialize, Debug)]
struct Message {
pub ticker: String,
pub amount: f32
}
现在,我们必须使用以下代码定义服务器运行时的轮廓:
#[tokio::main]
async fn main() {
let addr = "127.0.0.1:8080".to_string();
let listener = TcpListener::bind(&addr).await.unwrap();
println!("Listening on: {}", addr);
loop {
let (socket, _) = listener.accept().await.unwrap();
tokio::spawn(async move {
. . .
});
}
}
在这里,我们可以看到侦听器正在循环接受流量,并在收到消息时生成线程,就像以前的服务器实现一样。 在我们的线程中,我们使用以下代码读取框架消息:
let mut framed = BytesCodec::new().framed(socket);
let message = framed.next().await.unwrap();
match message {
Ok(bytes) => {
. . .
},
Err(err) => println!("Socket closed with error:
{:?}", err),
}
println!("Socket received FIN packet and closed
connection");
正如我们所看到的,我们不再有 while 循环。 这是因为我们的框架管理消息之间的分割。
一旦我们从连接中提取了字节,我们必须实现与在客户端中执行的相同逻辑,在客户端中处理消息、打印出来、再次处理它,然后将其发送回客户端:
let message =
bincode::deserialize::<Message>(&bytes).unwrap();
println!("{:?}", message);
let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
framed.send(sending_message).await.unwrap();
我们现在有了一个使用框架的工作客户端和服务器。 如果我们启动服务器然后运行客户端,客户端将为我们提供以下打印输出:
Message { ticker: "BYND", amount: 3.2 }
我们的服务器将为我们提供以下打印输出:
Listening on: 127.0.0.1:8080
Message { ticker: "BYND", amount: 3.2 }
Socket received FIN packet and closed connection
我们的服务器和客户端现在支持分帧。 我们已经走了很长一段路。 现在,我们在本章中只剩下一个概念需要探索,那就是使用 TCP 构建 HTTP 帧。
在 TCP 之上构建 HTTP 帧
在探索本书中的 Tokio 框架之前,我们使用 HTTP 向服务器发送和接收数据。 HTTP 协议本质上是建立在 TCP 之上的。 在本节中,虽然我们将创建一个 HTTP 帧,但我们不会完全模仿 HTTP 协议。 相反,为了防止过多的代码,我们将创建一个基本的 HTTP 框架来了解创建 HTTP 框架时使用的机制。 还必须强调,这是出于教育目的。 TCP 对我们的协议很有好处,但如果您想使用 HTTP 处理程序,那么使用 Hyper 等开箱即用的 HTTP 处理程序会更快、更安全且不易出错。 我们将在下一章介绍如何在 Tokio 中使用超 HTTP 处理程序。
当涉及 HTTP 请求时,请求通常具有标头和正文。 当我们发送请求时,标头会告诉我们正在使用什么方法以及与请求关联的 URL。 为了定义我们的 HTTP 框架,我们需要在服务器和客户端上定义框架的相同结构。 因此,client/src/http_frame.rs 和 server/src/http_frame.rs 文件必须具有相同的代码。 首先,我们必须使用以下代码导入所需的序列化特征:
use serde::{Serialize, Deserialize};
然后,我们必须使用以下代码定义 HTTP 框架:
#[derive(Serialize, Deserialize, Debug)]
pub struct HttpFrame {
pub header: Header,
pub body: Body
}
正如我们所看到的,我们在 HttpFrame 结构中定义了标头和正文。 我们使用以下代码定义标头和主体结构:
#[derive(Serialize, Deserialize, Debug)]
pub struct Header {
pub method: String,
pub uri: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Body {
pub ticker: String,
pub amount: f32,
}
现在我们的基本 HTTP 框架已经完成,我们可以使用以下代码将 HTTP 框架导入到客户端和服务器的 main.rs 文件中:
mod http_frame;
use http_frame::{HttpFrame, Header, Body};
我们首先使用以下代码在客户端的 main.rs 文件中发送 HTTP 帧:
let stream = TcpStream::connect("127.0.0.1:8080").await?;
let mut framed = BytesCodec::new().framed(stream);
let message = HttpFrame{
header: Header{
method: "POST".to_string(),
uri: "www.freshcutswags.com/stock/purchase".to_string()
},
body: Body{
ticker: "BYND".to_string(),
amount: 3.2,
}
};
let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
framed.send(sending_message).await.unwrap();
我们可以看到我们的 HTTP 帧开始看起来像我们在 Actix 服务器中接收请求时要处理的 HTTP 请求。 对于我们服务器中的 main.rs 文件,几乎没有变化。 我们所要做的就是使用以下代码重新定义正在反序列化的结构:
let message = bincode::deserialize::<HttpFrame>(&bytes).unwrap();
println!("{:?}", message);
let message_bin = bincode::serialize(&message).unwrap();
let sending_message = Bytes::from(message_bin);
framed.send(sending_message).await.unwrap();
现在我们有了一个基本的 HTTP 框架,可以用来发送信息。 如果我们先运行服务器,然后运行客户端程序,我们将得到以下服务器打印输出:
Listening on: 127.0.0.1:8080
HttpFrame { header: Header {
method: "POST",
uri: "www.freshcutswags.com/stock/purchase"
},
body: Body {
ticker: "BYND",
amount: 3.2
}
}
Socket received FIN packet and closed connection
然后我们的客户端程序将为我们提供以下打印输出:
HttpFrame { header: Header {
method: "POST",
uri: "www.freshcutswags.com/stock/purchase"
},
body: Body {
ticker: "BYND",
amount: 3.2
}
}
我们可以看到我们的数据没有损坏。 我们现在已经涵盖了通过 TCP 打包、发送和读取数据所需的所有核心基本途径和方法。
概括
在本章中,我们构建了一个基本的 TCP 客户端,用于向回显服务器发送和接收数据。 我们首先发送基本字符串数据并用分隔符分隔消息。 然后,我们通过序列化结构增加了通过 TCP 连接发送的数据的复杂性。 这使我们能够拥有更复杂的数据结构。 这种序列化还减少了以我们需要的格式获取消息数据所需的处理。例如,在上一章中,我们在收到消息后将字符串解析为浮点数。 对于结构,没有什么可以阻止我们将浮点数列表作为字段,并且在消息序列化之后,我们将让该字段容纳浮点数列表,而无需任何额外的代码行。
结构的序列化足以让我们处理大多数问题,但我们探索了成帧,以便我们不必依赖分隔符来分隔我们通过 TCP 发送的消息。 通过框架,我们构建了一个基本的 HTTP 框架,以可视化我们可以使用框架做什么以及 HTTP 如何构建在 TCP 之上。 我们必须记住,实现 HTTP 协议比我们在本章中所做的更复杂,建议我们利用 crate 中已建立的 HTTP 处理程序来处理 HTTP 流量。
在下一章中,我们将使用已建立的 Hyper crate 与 Tokio 运行时框架来处理 HTTP 流量。