MINA 学习记录(三)
Mina 使用 ProtocolCodecFactory,CumulativeProtocolDecoder 处理TCP数据粘包
关于数据粘包的处理可以查看这篇文章 http://blog.94fzb.com/post/nio-deal-data-stick-package
本文将讲述使用MINA 处理TCP数据粘包的处理。
处理数据粘包的核心思想就是然让对端知道这一次发送了那些数据(数据的长度),对于原生NIO的处理方式可以看上面的这篇文章。
再来看看这张图,MINA 一般情况是不需要关心 IoService,IoProcessor 所以处理数据粘包的就放在 IoFilter 里面就好了。
建立与通信格式相同Bean 比如通信格式为 5E+ 4位的数据长度 (数据域的大小)+ 数据
import java.io.Serializable;
public class WebDataPacket implements Serializable{
/**
*
*/
private static final long serialVersionUID = -7088876817150682353L;
private static final int start=94;
public static int getStart() {
return start;
}
private int dataLength;
private String data;
public int getDataLength() {
return dataLength;
}
public void setDataLength(int dataLength) {
this.dataLength = dataLength;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
ProtocolCodecFactory
创建一个静态的编码解码工厂
import java.nio.charset.Charset;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;
public class WebPakcetCodecFactory implements ProtocolCodecFactory{
private final WebPacketEncoder encoder;
private final WebPacketDecoder decoder;
public WebPakcetCodecFactory(){
this(Charset.defaultCharset());
}
public WebPakcetCodecFactory(Charset charset){
this.encoder = new WebPacketEncoder();
this.decoder = new WebPacketDecoder();
}
@Override
public ProtocolDecoder getDecoder(IoSession session) throws Exception{
return decoder;
}
@Override
public ProtocolEncoder getEncoder(IoSession session) throws Exception{
return encoder;
}
}
核心的编码解码(接受数据) 将字节流封装为我们需要的Bean doDecode方法会根据返回的值来确定是否需要继续调用doDecode 当为false的时候会停止调用(直到下一次数据到来),当为true会马上进行解码数据(通常为一个数据包读取完毕时返回)。
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import com.fzb.rc.util.HexaConversionUtil;
public class WebPacketDecoder extends CumulativeProtocolDecoder {
@Override
protected boolean doDecode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput out) {
int remainLen = buffer.remaining();
// 当buffer长度没有足够到包头+包长度的继续等待数据到来
if (remainLen < 5) {
return false;
}
if (remainLen > 1) {
buffer.order(ByteOrder.LITTLE_ENDIAN);
byte[] ddStart = new byte[1];
buffer.get(ddStart);
byte dStart = ddStart[0];
// 对数据包头的检查。
if (dStart != WebDataPacket.getStart()) {
return false;
}
byte[] dataLengthByte = new byte[4];
buffer.get(dataLengthByte);
int dataLength = HexaConversionUtil.byteArrayToIntL(dataLengthByte);
buffer.mark();
// 数据没有完全收到,需要在等一等
if (remainLen - 5 < dataLength) {
buffer.reset();
return false;
}
byte[] bData = new byte[dataLength];
buffer.get(bData);
WebDataPacket wdp = new WebDataPacket();
wdp.setDataLength(bData.length);
wdp.setData(new String(bData));
out.write(wdp);
return true;
}
return false;
}
}
编码工作(将Bean按照数据格式依次写(发)出去)
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;
public class WebPacketEncoder extends ProtocolEncoderAdapter{
@Override
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{
WebDataPacket wdp = (WebDataPacket)message;
IoBuffer buffer = IoBuffer.allocate(wdp.getData().getBytes().length+5).setAutoExpand(true);
buffer.put((byte)wdp.getStart());
buffer.putInt(wdp.getDataLength());
buffer.put(wdp.getData().getBytes());
if(message!=null){
buffer.flip();
out.write(buffer);
}
}
}
然后添加到IoFilter 中就可以了
IoFilter filter = new ProtocolCodecFilter(new WebPakcetCodecFactory());
acceptor.getFilterChain().addLast("ioFitler", filter);
这样就完成了MINA 对数据粘包的处理了。
转载请注明作者和出处,并添加本页链接。
原文链接:
//xiaochun.zrlog.com/238.html