利用NIO,ByteBuffer处理TCP长连接数据粘包的问题
问题:TCP 长连接会发生数据粘包
原因:IP层对数据进行了分包发送(就是说这个与发送端无关,接收端必须处理)
TCP长连接发送数据是以数据流的方式传输。当然也并不是将整个数据一次发送。当然程序里面就是write一次就足够了。然而这个会被IP层进行分包发送,这样意味着数据并不是一次能接受完的。如果数据量不大(1.3kb~1.4kb)就有可能一次收完,能否一次接受完毕主要看当时网络的MTU
值。如果一次不能读完这条数据,就尝试去读取下一条数据,这样读到的数据就会错乱。比如
发送端发送了 112233 ,332211
2条数据,而接受端可能读到的数据会是 1122,33332211
这样我们的程序就会无法处理这样的消息,这就是所谓的数据粘包
如何处理这样的问题
- 1.改用短连接,短连接不存在一个连接中多次调用write 就意味可以通过
inputStream.read(new byte[1024])==-1
的方式来完整的读取整个数据。这个明显不是处理问题的方式。
- 2.处理数据数据粘包,既然要处理那么就得告诉接受方这条数据是咋个样的。
- 1.设置包结束标识符,读到特定的标识符意味则这是一条完整的数据
- 2.定长的方式,先告诉这个包多长。接受方收到这么多数据就以为则数据完整(主要针对字符流,个人比较喜欢这种)
原生Socket处理方式(不通过)
通过观察原生的 Socket 并没有提供的直接处理的方法,于是这个就得自己程序实现了。既然是读取流那我来看看 socket.getInputStream()
这里的数据倒是有 in.available();
(返回当前有多少数据) 这个方法。 发现也不能处理这个问题。多数是说是网络传输的原因导致这个方法的返回值不准确
NIO 终极处理方式
由于文章主要讲数据的粘包的处理,就不介绍NIO的Selector的机制了,直接上Buffer
考虑到了缓存流 ByteBuffer
ByteBuffer 能做哪些事呢
简单的说下 ByteBuffer 大慨是,使用的候先分配多大的大小,使用put的方式进行填充数据。当ByteBuffer 满的情况时候可以通过hasRemaining()
,buffer.remaining()==0
查看。通过这种方法我们就可以得到一个准确的结果
还有其他的Buffer,如CharBuffer
ByteBuffer buffer=ByteBuffer.allocate(1024);
buffer.put(new byte[512]);
System.out.println("hasRemaining "+ buffer.hasRemaining());
buffer.put(new byte[512]);
System.out.println("hasRemaining "+ buffer.hasRemaining());
打印
hasRemaining true
hasRemaining false
当然有ByteBuffer 我们就可能正确的知道数据是否满了。
说完了原理来看看代码该如何实现
2.简单定义一个通信格式 4 位的数据长度 (数据域的大小)+ 数据
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOClient {
public static void main(String[] args) {
InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("192.168.2.200", 8811);
String sendSome="7777777777777";
int count=-1;
try{
// 打开socket通道
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞方式
socketChannel.configureBlocking(false);
// 打开选择器
Selector selector = Selector.open();
// 注册连接服务端socket动作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 连接
socketChannel.connect(SERVER_ADDRESS);
// 分配缓冲区大小内存
SetselectionKeys;
Iteratoriterator;
SelectionKey selectionKey;
SocketChannel client;
ByteBuffer input = ByteBuffer.allocate(1024*100);
ByteBuffer header=ByteBuffer.allocate(4);
ByteBuffer sendbuffer = ByteBuffer.allocate(sendSome.getBytes().length);
while (true) {
//选择一组键,其相应的通道已为 I/O 操作准备就绪。
//此方法执行处于阻塞模式的选择操作。
selector.select();
//返回此选择器的已选择键集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
client = (SocketChannel) selectionKey.channel();
if (selectionKey.isConnectable()) {
// 判断此通道上是否正在进行连接操作。
// 完成套接字通道的连接过程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("connect finished");
sendbuffer.clear();
sendbuffer.put((sendSome).getBytes());
//将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
if(count==-1){
client.read(header);
//并且缓存区的长度大于4(包头部分已经接受完毕)
if(!header.hasRemaining()){
count=byteArrayToInt(header.array());
System.out.println("dataSize ......"+count);
header.clear();
input = ByteBuffer.allocate(count);
client.register(selector, SelectionKey.OP_READ);
}
}
else{
System.out.println("wait......"+input.remaining());
// 尝试读取数据区域
client.read(input);
//input.mark();
if(!input.hasRemaining()){
// 这个时候可以解析数据
System.out.println("data full");
input.clear();
client.register(selector, SelectionKey.OP_WRITE);
}
else{
// 数据还没有填充满,继续接受数据
client.register(selector, SelectionKey.OP_READ);
}
}
}
else if(selectionKey.isWritable()){
client = (SocketChannel) selectionKey.channel();
count=-1;
sendbuffer.clear();
sendbuffer.put(sendSome.getBytes());
sendbuffer.flip();
client.write(sendbuffer);
System.out.println("send Message "+sendSome);
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}
catch(Exception e){
e.printStackTrace();
}
}
public static int byteArrayToInt(byte[] b) {
return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16
| (b[0] & 0xFF) << 24;
}
}
大致是这样了。
对于通过特定标识符的方式也类似。 比如以#### 那么程序只需每次 read 后判断是否有这个标识,存在认为是一个包。区别在于需要自己组装
程序读到这样的2个包111#####111,111####
实际需要程序处理为 111,111111
PS: 如果感觉这样处理效率不高 MINA,NETTY 都对粘包的都处理。
End
之前数据掉了,无意间发现还有个存根,2015-06-25 23:15:03 星期四 重新整理
转载请注明作者和出处,并添加本页链接。
原文链接:
//xiaochun.zrlog.com/nio-deal-data-stick-package.html