MINA 学习记录(三)

/

Mina 使用 ProtocolCodecFactory,CumulativeProtocolDecoder 处理TCP数据粘包

关于数据粘包的处理可以查看这篇文章 https://xiaochun.zrlog.com/nio-deal-data-stick-package.html

本文将讲述使用MINA 处理TCP数据粘包的处理。
处理数据粘包的核心思想就是然让对端知道这一次发送了那些数据(数据的长度),对于原生NIO的处理方式可以看上面的这篇文章。

再来看看这张图,MINA 一般情况是不需要关心 IoService,IoProcessor 所以处理数据粘包的就放在 IoFilter 里面就好了。

建立与通信格式相同Bean 比如通信格式为 5E+ 4位的数据长度 (数据域的大小)+ 数据

import java.io.Serializable;

public class WebDataPacket implements Serializable{

	/**
	 *
	 */
	private static final long serialVersionUID = -7088876817150682353L;
	private static final int start=94;
	public static int getStart() {
		return start;
	}
	private int dataLength;
	private String data;
	
	public int getDataLength() {
		return dataLength;
	}
	public void setDataLength(int dataLength) {
		this.dataLength = dataLength;
	}
	public String getData() {
		return data;
	}
	public void setData(String data) {
		this.data = data;
	}
}

ProtocolCodecFactory创建一个静态的编码解码工厂

import java.nio.charset.Charset;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFactory;
import org.apache.mina.filter.codec.ProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolEncoder;

public class WebPakcetCodecFactory implements ProtocolCodecFactory{

	private final WebPacketEncoder encoder;
	private final WebPacketDecoder decoder;

	public WebPakcetCodecFactory(){
		this(Charset.defaultCharset());
	}

	public WebPakcetCodecFactory(Charset charset){
		this.encoder = new WebPacketEncoder();
		this.decoder = new WebPacketDecoder();
	}

	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception{
		return decoder;
	}

	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception{
		return encoder;
	}

}

核心的编码解码(接受数据) 将字节流封装为我们需要的Bean doDecode方法会根据返回的值来确定是否需要继续调用doDecode 当为false的时候会停止调用(直到下一次数据到来),当为true会马上进行解码数据(通常为一个数据包读取完毕时返回)。

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import com.fzb.rc.util.HexaConversionUtil;

public class WebPacketDecoder extends CumulativeProtocolDecoder {

	@Override
	protected boolean doDecode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput out) {
		int remainLen = buffer.remaining();
		// 当buffer长度没有足够到包头+包长度的继续等待数据到来
		if (remainLen < 5) {
			return false;
		}
		if (remainLen > 1) {
			buffer.order(ByteOrder.LITTLE_ENDIAN);
			byte[] ddStart = new byte[1];
			buffer.get(ddStart);
			byte dStart = ddStart[0];
			// 对数据包头的检查。
			if (dStart != WebDataPacket.getStart()) {
				return false;
			}
			byte[] dataLengthByte = new byte[4];
			buffer.get(dataLengthByte);
			int dataLength = HexaConversionUtil.byteArrayToIntL(dataLengthByte);
			buffer.mark();
			// 数据没有完全收到,需要在等一等
			if (remainLen - 5 < dataLength) {
				buffer.reset();
				return false;
			}
			
			byte[] bData = new byte[dataLength];
			buffer.get(bData);
			WebDataPacket wdp = new WebDataPacket();
			wdp.setDataLength(bData.length);
			wdp.setData(new String(bData));
			
			out.write(wdp);
			return true;
		}
		return false;
	}
}

编码工作(将Bean按照数据格式依次写(发)出去)

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
import org.apache.mina.filter.codec.ProtocolEncoderOutput;

public class WebPacketEncoder extends ProtocolEncoderAdapter{

	@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{
		
		WebDataPacket wdp = (WebDataPacket)message;
		IoBuffer buffer = IoBuffer.allocate(wdp.getData().getBytes().length+5).setAutoExpand(true);
		buffer.put((byte)wdp.getStart());
		buffer.putInt(wdp.getDataLength());
		buffer.put(wdp.getData().getBytes());
		if(message!=null){
			buffer.flip();
			out.write(buffer);
		}
		
	}
}

然后添加到IoFilter 中就可以了

IoFilter filter = new ProtocolCodecFilter(new WebPakcetCodecFactory());
acceptor.getFilterChain().addLast("ioFitler", filter);

这样就完成了MINA 对数据粘包的处理了。

转载请注明作者和出处,并添加本页链接。
原文链接: //xiaochun.zrlog.com/238.html