利用NIO,ByteBuffer处理TCP长连接数据粘包的问题

Category:

/

Date:

问题:TCP 长连接会发生数据粘包

原因:IP层对数据进行了分包发送(就是说这个与发送端无关,接收端必须处理)

TCP长连接发送数据是以数据流的方式传输。当然也并不是将整个数据一次发送。当然程序里面就是write一次就足够了。然而这个会被IP层进行分包发送,这样意味着数据并不是一次能接受完的。如果数据量不大(1.3kb~1.4kb)就有可能一次收完,能否一次接受完毕主要看当时网络的MTU值。如果一次不能读完这条数据,就尝试去读取下一条数据,这样读到的数据就会错乱。比如

发送端发送了 112233 ,332211 2条数据,而接受端可能读到的数据会是 1122,33332211 这样我们的程序就会无法处理这样的消息,这就是所谓的数据粘包

如何处理这样的问题

  • 1.改用短连接,短连接不存在一个连接中多次调用write 就意味可以通过

inputStream.read(new byte[1024])==-1的方式来完整的读取整个数据。这个明显不是处理问题的方式。

  • 2.处理数据数据粘包,既然要处理那么就得告诉接受方这条数据是咋个样的。
    • 1.设置包结束标识符,读到特定的标识符意味则这是一条完整的数据
    • 2.定长的方式,先告诉这个包多长。接受方收到这么多数据就以为则数据完整(主要针对字符流,个人比较喜欢这种)

原生Socket处理方式(不通过)

通过观察原生的 Socket 并没有提供的直接处理的方法,于是这个就得自己程序实现了。既然是读取流那我来看看 socket.getInputStream() 这里的数据倒是有 in.available(); (返回当前有多少数据) 这个方法。 发现也不能处理这个问题。多数是说是网络传输的原因导致这个方法的返回值不准确

NIO 终极处理方式

由于文章主要讲数据的粘包的处理,就不介绍NIO的Selector的机制了,直接上Buffer

考虑到了缓存流 ByteBuffer

ByteBuffer 能做哪些事呢

简单的说下 ByteBuffer 大慨是,使用的候先分配多大的大小,使用put的方式进行填充数据。当ByteBuffer 满的情况时候可以通过hasRemaining()buffer.remaining()==0 查看。通过这种方法我们就可以得到一个准确的结果
还有其他的Buffer,如CharBuffer

  1. ByteBuffer buffer=ByteBuffer.allocate(1024);
  2. buffer.put(new byte[512]);
  3. System.out.println("hasRemaining "+ buffer.hasRemaining());
  4. buffer.put(new byte[512]);
  5. System.out.println("hasRemaining "+ buffer.hasRemaining());

打印

  1. hasRemaining true
  2. hasRemaining false

当然有ByteBuffer 我们就可能正确的知道数据是否满了。
说完了原理来看看代码该如何实现

2.简单定义一个通信格式 4 位的数据长度 (数据域的大小)+ 数据

  1. import java.net.InetSocketAddress;
  2. import java.nio.ByteBuffer;
  3. import java.nio.channels.SelectionKey;
  4. import java.nio.channels.Selector;
  5. import java.nio.channels.SocketChannel;
  6. import java.util.Iterator;
  7. import java.util.Set;
  8. public class NIOClient {
  9. public static void main(String[] args) {
  10. InetSocketAddress SERVER_ADDRESS = new InetSocketAddress("192.168.2.200", 8811);
  11. String sendSome="7777777777777";
  12. int count=-1;
  13. try{
  14. // 打开socket通道
  15. SocketChannel socketChannel = SocketChannel.open();
  16. // 设置为非阻塞方式
  17. socketChannel.configureBlocking(false);
  18. // 打开选择器
  19. Selector selector = Selector.open();
  20. // 注册连接服务端socket动作
  21. socketChannel.register(selector, SelectionKey.OP_CONNECT);
  22. // 连接
  23. socketChannel.connect(SERVER_ADDRESS);
  24. // 分配缓冲区大小内存
  25. SetselectionKeys;
  26. Iteratoriterator;
  27. SelectionKey selectionKey;
  28. SocketChannel client;
  29. ByteBuffer input = ByteBuffer.allocate(1024*100);
  30. ByteBuffer header=ByteBuffer.allocate(4);
  31. ByteBuffer sendbuffer = ByteBuffer.allocate(sendSome.getBytes().length);
  32. while (true) {
  33. //选择一组键,其相应的通道已为 I/O 操作准备就绪。
  34. //此方法执行处于阻塞模式的选择操作。
  35. selector.select();
  36. //返回此选择器的已选择键集。
  37. selectionKeys = selector.selectedKeys();
  38. //System.out.println(selectionKeys.size());
  39. iterator = selectionKeys.iterator();
  40. while (iterator.hasNext()) {
  41. selectionKey = iterator.next();
  42. client = (SocketChannel) selectionKey.channel();
  43. if (selectionKey.isConnectable()) {
  44. // 判断此通道上是否正在进行连接操作。
  45. // 完成套接字通道的连接过程。
  46. if (client.isConnectionPending()) {
  47. client.finishConnect();
  48. System.out.println("connect finished");
  49. sendbuffer.clear();
  50. sendbuffer.put((sendSome).getBytes());
  51. //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
  52. sendbuffer.flip();
  53. client.write(sendbuffer);
  54. }
  55. client.register(selector, SelectionKey.OP_READ);
  56. } else if (selectionKey.isReadable()) {
  57. if(count==-1){
  58. client.read(header);
  59. //并且缓存区的长度大于4(包头部分已经接受完毕)
  60. if(!header.hasRemaining()){
  61. count=byteArrayToInt(header.array());
  62. System.out.println("dataSize ......"+count);
  63. header.clear();
  64. input = ByteBuffer.allocate(count);
  65. client.register(selector, SelectionKey.OP_READ);
  66. }
  67. }
  68. else{
  69. System.out.println("wait......"+input.remaining());
  70. // 尝试读取数据区域
  71. client.read(input);
  72. //input.mark();
  73. if(!input.hasRemaining()){
  74. // 这个时候可以解析数据
  75. System.out.println("data full");
  76. input.clear();
  77. client.register(selector, SelectionKey.OP_WRITE);
  78. }
  79. else{
  80. // 数据还没有填充满,继续接受数据
  81. client.register(selector, SelectionKey.OP_READ);
  82. }
  83. }
  84. }
  85. else if(selectionKey.isWritable()){
  86. client = (SocketChannel) selectionKey.channel();
  87. count=-1;
  88. sendbuffer.clear();
  89. sendbuffer.put(sendSome.getBytes());
  90. sendbuffer.flip();
  91. client.write(sendbuffer);
  92. System.out.println("send Message "+sendSome);
  93. client.register(selector, SelectionKey.OP_READ);
  94. }
  95. }
  96. selectionKeys.clear();
  97. }
  98. }
  99. catch(Exception e){
  100. e.printStackTrace();
  101. }
  102. }
  103. public static int byteArrayToInt(byte[] b) {
  104. return b[3] & 0xFF | (b[2] & 0xFF) << 8 | (b[1] & 0xFF) << 16
  105. | (b[0] & 0xFF) << 24;
  106. }
  107. }

大致是这样了。
对于通过特定标识符的方式也类似。 比如以#### 那么程序只需每次 read 后判断是否有这个标识,存在认为是一个包。区别在于需要自己组装

程序读到这样的2个包111#####111,111####实际需要程序处理为 111,111111

PS: 如果感觉这样处理效率不高 MINA,NETTY 都对粘包的都处理。

End

之前数据掉了,无意间发现还有个存根,2015-06-25 23:15:03 星期四 重新整理

最后更新:2015-02-06 07:17:46.0

—— 原创内容,转载请注明:[ 文章转载自:小春 ' s blog   / ] ——