MINA 学习记录(三)

/

Mina 使用 ProtocolCodecFactory,CumulativeProtocolDecoder 处理TCP数据粘包

关于数据粘包的处理可以查看这篇文章 http://blog.94fzb.com/post/nio-deal-data-stick-package

本文将讲述使用MINA 处理TCP数据粘包的处理。
处理数据粘包的核心思想就是然让对端知道这一次发送了那些数据(数据的长度),对于原生NIO的处理方式可以看上面的这篇文章。

再来看看这张图,MINA 一般情况是不需要关心 IoService,IoProcessor 所以处理数据粘包的就放在 IoFilter 里面就好了。

建立与通信格式相同Bean 比如通信格式为 5E+ 4位的数据长度 (数据域的大小)+ 数据

  1. import java.io.Serializable;
  2. public class WebDataPacket implements Serializable{
  3. /**
  4. *
  5. */
  6. private static final long serialVersionUID = -7088876817150682353L;
  7. private static final int start=94;
  8. public static int getStart() {
  9. return start;
  10. }
  11. private int dataLength;
  12. private String data;
  13. public int getDataLength() {
  14. return dataLength;
  15. }
  16. public void setDataLength(int dataLength) {
  17. this.dataLength = dataLength;
  18. }
  19. public String getData() {
  20. return data;
  21. }
  22. public void setData(String data) {
  23. this.data = data;
  24. }
  25. }

ProtocolCodecFactory创建一个静态的编码解码工厂

  1. import java.nio.charset.Charset;
  2. import org.apache.mina.core.session.IoSession;
  3. import org.apache.mina.filter.codec.ProtocolCodecFactory;
  4. import org.apache.mina.filter.codec.ProtocolDecoder;
  5. import org.apache.mina.filter.codec.ProtocolEncoder;
  6. public class WebPakcetCodecFactory implements ProtocolCodecFactory{
  7. private final WebPacketEncoder encoder;
  8. private final WebPacketDecoder decoder;
  9. public WebPakcetCodecFactory(){
  10. this(Charset.defaultCharset());
  11. }
  12. public WebPakcetCodecFactory(Charset charset){
  13. this.encoder = new WebPacketEncoder();
  14. this.decoder = new WebPacketDecoder();
  15. }
  16. @Override
  17. public ProtocolDecoder getDecoder(IoSession session) throws Exception{
  18. return decoder;
  19. }
  20. @Override
  21. public ProtocolEncoder getEncoder(IoSession session) throws Exception{
  22. return encoder;
  23. }
  24. }

核心的编码解码(接受数据) 将字节流封装为我们需要的Bean doDecode方法会根据返回的值来确定是否需要继续调用doDecode 当为false的时候会停止调用(直到下一次数据到来),当为true会马上进行解码数据(通常为一个数据包读取完毕时返回)。

  1. import org.apache.mina.core.buffer.IoBuffer;
  2. import org.apache.mina.core.session.IoSession;
  3. import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
  4. import org.apache.mina.filter.codec.ProtocolDecoderOutput;
  5. import com.fzb.rc.util.HexaConversionUtil;
  6. public class WebPacketDecoder extends CumulativeProtocolDecoder {
  7. @Override
  8. protected boolean doDecode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput out) {
  9. int remainLen = buffer.remaining();
  10. // 当buffer长度没有足够到包头+包长度的继续等待数据到来
  11. if (remainLen < 5) {
  12. return false;
  13. }
  14. if (remainLen > 1) {
  15. buffer.order(ByteOrder.LITTLE_ENDIAN);
  16. byte[] ddStart = new byte[1];
  17. buffer.get(ddStart);
  18. byte dStart = ddStart[0];
  19. // 对数据包头的检查。
  20. if (dStart != WebDataPacket.getStart()) {
  21. return false;
  22. }
  23. byte[] dataLengthByte = new byte[4];
  24. buffer.get(dataLengthByte);
  25. int dataLength = HexaConversionUtil.byteArrayToIntL(dataLengthByte);
  26. buffer.mark();
  27. // 数据没有完全收到,需要在等一等
  28. if (remainLen - 5 < dataLength) {
  29. buffer.reset();
  30. return false;
  31. }
  32. byte[] bData = new byte[dataLength];
  33. buffer.get(bData);
  34. WebDataPacket wdp = new WebDataPacket();
  35. wdp.setDataLength(bData.length);
  36. wdp.setData(new String(bData));
  37. out.write(wdp);
  38. return true;
  39. }
  40. return false;
  41. }
  42. }

编码工作(将Bean按照数据格式依次写(发)出去)

  1. import org.apache.mina.core.buffer.IoBuffer;
  2. import org.apache.mina.core.session.IoSession;
  3. import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
  4. import org.apache.mina.filter.codec.ProtocolEncoderOutput;
  5. public class WebPacketEncoder extends ProtocolEncoderAdapter{
  6. @Override
  7. public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{
  8. WebDataPacket wdp = (WebDataPacket)message;
  9. IoBuffer buffer = IoBuffer.allocate(wdp.getData().getBytes().length+5).setAutoExpand(true);
  10. buffer.put((byte)wdp.getStart());
  11. buffer.putInt(wdp.getDataLength());
  12. buffer.put(wdp.getData().getBytes());
  13. if(message!=null){
  14. buffer.flip();
  15. out.write(buffer);
  16. }
  17. }
  18. }

然后添加到IoFilter 中就可以了

  1. IoFilter filter = new ProtocolCodecFilter(new WebPakcetCodecFactory());
  2. acceptor.getFilterChain().addLast("ioFitler", filter);

这样就完成了MINA 对数据粘包的处理了。

转载请注明作者和出处,并添加本页链接。
原文链接: //xiaochun.zrlog.com/238.html