C#网络编程(异步传输字符串) - Part.3
消息发送时的问题
这个问题就是:客户端分两次向流中写入数据(比如字符串)时,我们主观上将这两次写入视为两次请求;然而服务端有可能将这两次合起来视为一条请求,这在两个请求间隔时间比较短的情况下尤其如此。同样,也有可能客户端发出一条请求,但是服务端将其视为两条请求处理。下面列出了可能的情况,假设我们在客户端连续发送两条“Welcome to Tracefact.net!”,则数据到达服务端时可能有这样三种情况:
NOTE:在这里我们假设采用ASCII编码方式,因为此时上面的一个方框正好代表一个字节,而字符串到达末尾后为持续的0(因为byte是值类型,且最小为0)。
上面的第一种情况是最理想的情况,此时两条消息被视为两个独立请求由服务端完整地接收。第二种情况的示意图如下,此时一条消息被当作两条消息接收了:
而对于第三种情况,则是两条消息被合并成了一条接收:
如果你下载了上一篇文章所附带的源码,那么将Client2.cs进行一下修改,不通过用户输入,而是使用一个for循环连续的发送三个请求过去,这样会使请求的间隔时间更短,下面是关键代码:
string msg = "Welcome to TraceFact.Net!";
for (int i = 0; i <= 2; i ) {
byte[] buffer = Encoding.Unicode.GetBytes(msg); // 获得缓存
try {
streamToServer.Write(buffer, 0, buffer.Length); // 发往服务器
Console.WriteLine("Sent: {0}", msg);
} catch (Exception ex) {
Console.WriteLine(ex.Message);
break;
}
}
运行服务端,然后再运行这个客户端,你可能会看到这样的结果:
可以看到,尽管上面将消息分成了三条单独发送,但是服务端却将后两条合并成了一条。对于这些情况,我们可以这样处理:就好像HTTP协议一样,在实际的请求和应答内容之前包含了HTTP头,其中是一些与请求相关的信息。我们也可以订立自己的协议,来解决这个问题,比如说,对于上面的情况,我们就可以定义这样一个协议:
[length=XXX]:其中xxx是实际发送的字符串长度(注意不是字节数组buffer的长度),那么对于上面的请求,则我们发送的数据为:“[length=25]Welcome to TraceFact.Net!”。而服务端接收字符串之后,首先读取这个“元数据”的内容,然后再根据“元数据”内容来读取实际的数据,它可能有下面这样两种情况:
NOTE:我觉得这里借用“元数据”这个术语还算比较恰当,因为“元数据”就是用来描述数据的数据。
“[“”]”中括号是完整的,可以读取到length的字节数。然后根据这个数值与后面的字符串长度相比,如果相等,则说明发来了一条完整信息;如果多了,那么说明接收的字节数多了,取出合适的长度,并将剩余的进行缓存;如果少了,说明接收的不够,那么将收到的进行一个缓存,等待下次请求,然后将两条合并。
“[”“]”中括号本身就不完整,此时读不到length的值,因为中括号里的内容被截断了,那么将读到的数据进行缓存,等待读取下次发送来的数据,然后将两次合并之后再按上面的方式进行处理。
接下来我们来看下如何来进行实际的操作,实际上,这个问题已经不属于C#网络编程的内容了,而完全是对字符串的处理。所以我们不再编写服务端/客户端代码,直接编写处理这几种情况的方法:
public class RequestHandler {
private string temp = string.Empty;
public string[] GetActualString(string input) {
return GetActualString(input, null);
}
private string[] GetActualString(string input, List<string> outputList) {
if (outputList == null)
outputList = new List<string>();
if (!String.IsNullOrEmpty(temp))
input = temp input;
string output = "";
string pattern = @"(?<=^[length=)(d )(?=])";
int length;
if (Regex.IsMatch(input, pattern)) {
Match m = Regex.Match(input, pattern);
// 获取消息字符串实际应有的长度
length = Convert.ToInt32(m.Groups[0].Value);
// 获取需要进行截取的位置
int startIndex = input.IndexOf(']') 1;
// 获取从此位置开始后所有字符的长度
output = input.Substring(startIndex);
if (output.Length == length) {
// 如果output的长度与消息字符串的应有长度相等
// 说明刚好是完整的一条信息
outputList.Add(output);
temp = "";
} else if (output.Length < length) {
// 如果之后的长度小于应有的长度,
// 说明没有发完整,则应将整条信息,包括元数据,全部缓存
// 与下一条数据合并起来再进行处理
temp = input;
// 此时程序应该退出,因为需要等待下一条数据到来才能继续处理
} else if (output.Length > length) {
// 如果之后的长度大于应有的长度,
// 说明消息发完整了,但是有多余的数据
// 多余的数据可能是截断消息,也可能是多条完整消息
// 截取字符串
output = output.Substring(0, length);
outputList.Add(output);
temp = "";
// 缩短input的长度
input = input.Substring(startIndex length);
// 递归调用
GetActualString(input, outputList);
}
} else { // 说明“[”,“]”就不完整
temp = input;
}
return outputList.ToArray();
}
}
这个方法接收一个满足协议格式要求的输入字符串,然后返回一个数组,这是因为如果出现多次请求合并成一个发送过来的情况,那么就将它们全部返回。随后简单起见,我在这个类中添加了一个静态的Test()方法和PrintOutput()帮助方法,进行了一个简单的测试,注意我直接输入了length=13,这个是我提前计算好的。
public static void Test() {
RequestHandler handler = new RequestHandler();
string input;
// 第一种情况测试 - 一条消息完整发送
input = "[length=13]明天中秋,祝大家节日快乐!";
handler.PrintOutput(input);
// 第二种情况测试 - 两条完整消息一次发送
input = "明天中秋,祝大家节日快乐!";
input = String.Format
("[length=13]{0}[length=13]{0}", input);
handler.PrintOutput(input);
// 第三种情况测试A - 两条消息不完整发送
input = "[length=13]明天中秋,祝大家节日快乐![length=13]明天中秋";
handler.PrintOutput(input);
input = ",祝大家节日快乐!";
handler.PrintOutput(input);
// 第三种情况测试B - 两条消息不完整发送
input = "[length=13]明天中秋,祝大家";
handler.PrintOutput(input);
input = "节日快乐![length=13]明天中秋,祝大家节日快乐!";
handler.PrintOutput(input);
// 第四种情况测试 - 元数据不完整
input = "[leng";
handler.PrintOutput(input); // 不会有输出
input = "th=13]明天中秋,祝大家节日快乐!";
handler.PrintOutput(input);
}
// 用于测试输出
private void PrintOutput(string input) {
Console.WriteLine(input);
string[] outputArray = GetActualString(input);
foreach (string output in outputArray) {
Console.WriteLine(output);
}
Console.WriteLine();
}
运行上面的程序,可以得到如下的输出:
OK,从上面的输出可以看到,这个方法能够满足我们的要求。对于这篇文章最开始提出的问题,可以很轻松地通过加入这个方法来解决,这里就不再演示了,但在本文所附带的源代码含有修改过的程序。在这里花费了很长的时间,接下来让我们回到正题,看下如何使用异步方式完成上一篇中的程序吧。
异步传输字符串
在上一篇中,我们由简到繁,提到了服务端的四种方式:服务一个客户端的一个请求、服务一个客户端的多个请求、服务多个客户端的一个请求、服务多个客户端的多个请求。我们说到可以将里层的while循环交给一个新建的线程去让它来完成。除了这种方式以外,我们还可以使用一种更好的方式――使用线程池中的线程来完成。我们可以使用BeginRead()、BeginWrite()等异步方法,同时让这BeginRead()方法和它的回调方法形成一个类似于while的无限循环:首先在第一层循环中,接收到一个客户端后,调用BeginRead(),然后为该方法提供一个读取完成后的回调方法,然后在回调方法中对收到的字符进行处理,随后在回调方法中接着调用BeginRead()方法,并传入回调方法本身。
由于程序实现功能和上一篇完全相同,我就不再细述了。而关于异步调用方法更多详细内容,可以参见 C#中的委托和事件(续)。
1.服务端的实现
当程序越来越复杂的时候,就需要越来越高的抽象,所以从现在起我们不再把所有的代码全部都扔进Main()里,这次我创建了一个RemoteClient类,它对于服务端获取到的TcpClient进行了一个包装:
public class RemoteClient {
private TcpClient client;
private NetworkStream streamToClient;
private const int BufferSize = 8192;
private byte[] buffer;
private RequestHandler handler;
public RemoteClient(TcpClient client) {
this.client = client;
// 打印连接到的客户端信息
Console.WriteLine("nClient Connected!{0} <-- {1}",
client.Client.LocalEndPoint, client.Client.RemoteEndPoint);
// 获得流
streamToClient = client.GetStream();
buffer = new byte[BufferSize];
// 设置RequestHandler
handler = new RequestHandler();
// 在构造函数中就开始准备读取
AsyncCallback callBack = new AsyncCallback(ReadComplete);
streamToClient.BeginRead(buffer, 0, BufferSize, callBack, null);
}
// 再读取完成时进行回调
private void ReadComplete(IAsyncResult ar) {
int bytesRead = 0;
try {
lock (streamToClient) {
bytesRead = streamToClient.EndRead(ar);
Console.WriteLine("
Reading data, {0} bytes ...", bytesRead);
}
if (bytesRead == 0) throw new Exception("读取到0字节");
string msg = Encoding.Unicode.GetString(buffer, 0, bytesRead);
Array.Clear(buffer,0,buffer.Length); // 清空缓存,避免脏读
string[] msgArray = handler.GetActualString(msg); // 获取实际的字符串
// 遍历获得到的字符串
foreach (string m in msgArray) {
Console.WriteLine("Received: {0}", m);
string back = m.ToUpper();
// 将得到的字符串改为大写并重新发送
byte[] temp = Encoding.Unicode.GetBytes(back);
streamToClient.Write(temp, 0, temp.Length);
streamToClient.Flush();
Console.WriteLine("Sent: {0}", back);
}
// 再次调用BeginRead(),完成时调用自身,形成无限循环
lock (streamToClient) {
AsyncCallback callBack = new AsyncCallback(ReadComplete);
streamToClient.BeginRead(buffer, 0, BufferSize, callBack, null);
}
} catch(Exception ex) {
if(streamToClient!=null)
streamToClient.Dispose();
client.Close();
Console.WriteLine(ex.Message); // 捕获异常时退出程序
}
}
}
随后,我们在主程序中仅仅创建TcpListener类型实例,由于RemoteClient类在构造函数中已经完成了初始化的工作,所以我们在下面的while循环中我们甚至不需要调用任何方法:
class Server {
static void
Main(string[] args) {
Console.WriteLine("Server is running ... ");
IPAddress ip = new IPAddress(new byte[] { 127, 0, 0, 1 });
TcpListener listener = new TcpListener(ip, 8500);
listener.Start(); // 开始侦听
Console.WriteLine("Start Listening ...");
while (true) {
// 获取一个连接,同步方法,在此处中断
TcpClient client = listener.AcceptTcpClient();
RemoteClient wapper = new RemoteClient(client);
}
}
}
好了,服务端的实现现在就完成了,接下来我们再看一下客户端的实现:
2.客户端的实现
与服务端类似,我们首先对TcpClient进行一个简单的包装,使它的使用更加方便一些,因为它是服务端的客户,所以我们将类的名称命名为ServerClient:
public class ServerClient {
private const int BufferSize = 8192;
private byte[] buffer;
private TcpClient client;
private NetworkStream streamToServer;
private string msg = "Welcome to TraceFact.Net!";
public ServerClient() {
try {
client = new TcpClient();
client.Connect("localhost", 8500); // 与服务器连接
} catch (Exception ex) {
Console.WriteLine(ex.Message);
return;
}
buffer = new byte[BufferSize];
// 打印连接到的服务端信息
Console.WriteLine("Server Connected!{0} --> {1}",
client.Client.LocalEndPoint, client.Client.RemoteEndPoint);
streamToServer = client.GetStream();
}
// 连续发送三条消息到服务端
public void SendMessage(string msg) {
msg = String.Format("[length={0}]{1}", msg.Length, msg);
for (int i = 0; i <= 2; i ) {
byte[] temp = Encoding.Unicode.GetBytes(msg); // 获得缓存
try {
streamToServer.Write(temp, 0, temp.Length); // 发往服务器
Console.WriteLine("Sent: {0}", msg);
} catch (Exception ex) {
Console.WriteLine(ex.Message);
break;
}
}
lock (streamToServer) {
AsyncCallback callBack = new AsyncCallback(ReadComplete);
streamToServer.BeginRead(buffer, 0, BufferSize, callBack, null);
}
}
public void SendMessage() {
SendMessage(this.msg);
}
// 读取完成时的回调方法
private void ReadComplete(IAsyncResult ar) {
int bytesRead;
try {
lock (streamToServer) {
bytesRead = streamToServer.EndRead(ar);
}
if (bytesRead == 0) throw new Exception("读取到0字节");
string msg = Encoding.Unicode.GetString(buffer, 0, bytesRead);
Console.WriteLine("Received: {0}", msg);
Array.Clear(buffer, 0, buffer.Length); // 清空缓存,避免脏读
lock (streamToServer) {
AsyncCallback callBack = new AsyncCallback(ReadComplete);
streamToServer.BeginRead(buffer, 0, BufferSize, callBack, null);
}
} catch (Exception ex) {
if(streamToServer!=null)
streamToServer.Dispose();
client.Close();
Console.WriteLine(ex.Message);
}
}
}
在上面的SendMessage()方法中,我们让它连续发送了三条同样的消息,这么仅仅是为了测试,因为异步操作同样会出现上面说过的:服务器将客户端的请求拆开了的情况。最后我们在Main()方法中创建这个类型的实例,然后调用SendMessage()方法进行测试:
class Client {
static void
Main(string[] args) {
ConsoleKey key;
ServerClient client = new ServerClient();
client.SendMessage();
Console.WriteLine("nn输入"Q"键退出。");
do {
key = Console.ReadKey(true).Key;
} while (key != ConsoleKey.Q);
}
}
是不是感觉很清爽?因为良好的代码重构,使得程序在复杂程度提高的情况下依然可以在一定程度上保持良好的阅读性。
3.程序测试
最后一步,我们先运行服务端,接着连续运行两个客户端,看看它们的输出分别是什么:
大家可以看到,在服务端,我们可以连接多个客户端,同时为它们服务;除此以外,由接收的字节数发现,两个客户端均有两个请求被服务端合并成了一条请求,因为我们在其中加入了特殊的协议,所以在服务端可以对这种情况进行良好的处理。
在客户端,我们没有采取类似的处理,所以当客户端收到应答时,仍然会发生请求合并的情况。对于这种情况,我想大家已经知道该如何处理了,就不再多费口舌了。
使用这种定义协议的方式有它的优点,但缺点也很明显,如果客户知道了这个协议,有意地输入[length=xxx],但是后面的长度却不匹配,此时程序就会出错。可选的解决办法是对“[”和“]”进行编码,当客户端有意输入这两个字符时,我们将它替换成“[”和“]”或者别的字符,在读取后再将它还原。
关于这个范例就到此结束了,剩下的两个范例都将采用异步传输的方式,并且会加入更多的协议内容。下一篇我们将介绍如何向服务端发送或接收文件。
继续浏览系列文章