本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
编程思想:如何设计一个好的通信网络协议
追忆似水年华 · 437浏览 · 发布于2020-03-30 +关注

阅读目录

  • 网络协议的设计

  • RocketMQ 通信网络协议的实现

  • 小结

当网络中两个进程需要通信时,我们往往会使用 Socket 来实现。Socket 都不陌生。当三次握手成功后,客户端与服务端就能通信,并且,彼此之间通信的数据包格式都是二进制,由TCP/IP 协议负责传输。

当客户端和服务端取得了二进制数据包后,我们往往需要『萃取』出想要的数据,这样才能更好的执行业务逻辑。所以,我们需要定义好数据结构来描述这些二进制数据的格式,这就是通信网络协议。简单讲,就是需要约定好二进制数据包中每一段字节的含义,比如从第 n 字节开始的 m 长度是核心数据,有了这样的约定后,我们就能解码出想要的数据,执行业务逻辑,这样我们就能畅通无阻的通信了。

回到顶部

网络协议的设计

概要划分

一个最基本的网络协议必须包含

  • 数据的长度

  • 数据

了解 TCP 协议的同学一定听说过粘包、拆包 这两个术语。因为TCP协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现粘包,拆包 现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的 int 类型来表示数据的大小。比如,Netty 就为我们提供了 LengthFieldBasedFrameDecoder 解码器,它可以有效的使用自定义长度帧来解决上述问题。

同时一个好的网络协议,还会将动作和业务数据分离。试想一下, HTTP 协议的分为请求头,请求体——

  • 请求头:定义了接口地址、Http Method、HTTP 版本

  • 请求体:定义了需要传递的数据

这就是一种分离关注点的思想。所以自定义的网络协议也可以包含:

  • 动作指令:比如定义 code 来分门别类的代表不同的业务逻辑

  • 序列化算法:描述了 JAVA 对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如 json、protobuf 等等,甚至是自定义算法。比如:rocketmq 等等。

同时,协议的开头可以定义一个约定的魔数。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用 telnet 发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前4个字节与固定的魔数对比,如果是非法的格式,直接关闭连接,不继续解码。

网络协议结构如下所示:

+--------------+-----------+------------+-----------+----------+
| 魔数(4)       | code(1)   |序列化算法(1) |数据长度(4) |数据(n)   |
+--------------+-----------+------------+-----------+----------+

回到顶部

RocketMQ 通信网络协议的实现

RocketMQ 网络协议

这一小节,我们从RocketMQ 中,分析优秀通信网络协议的实现。RocketMQ 项目中,客户端和服务端的通信是基于 Netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。

RocketMQ 的网络协议,从数据分类的角度上看,可分为两大类

  • 消息头数据(Header Data)

  • 消息体数据(Body Data)

从左到右

  • 第一段:4 个字节整数,等于2、3、4 长度总和

  • 第二段:4 个字节整数,等于3 的长度。特别的 byte[0] 代表序列化算法,byte[1~3]才是真正的长度

  • 第三段:代表消息头数据,结构如下

{    "code":0,    "language":"JAVA",    "version":0,    "opaque":0,    "flag":1,    "remark":"hello, I am respponse /127.0.0.1:27603",    "extFields":{        "count":"0",        "messageTitle":"HelloMessageTitle"
    }
}
  • 第四段:代表消息体数据

RocketMQ 消息头协议详细如下:

Header 字段名类型RequestResponse
code整数请求操作代码,请求接收方根据不同的代码做不同的操作应答结果代码,0表示成功,非0表示各种错误代码
language字符串请求发起方实现语言,默认JAVA应答接收方实现语言
version整数请求发起方程序版本应答接收方程序版本
opaque整数请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用应答方不做修改,直接返回
flag整数通信层的标志位通信层的标志位
remark字符串传输自定义文本信息错误详细描述信息
extFieldsHashMap<String,String>请求自定义字段应答自定义字段

编码过程

RocketMQ 的通信模块是基于 Netty的。通过定义 NettyEncoder 来实现对每一个 Channel的 出栈数据进行编码,如下所示:

@ChannelHandler.Sharablepublic class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {        try {
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);            byte[] body = remotingCommand.getBody();            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
           ...
        }
    }
}

其中,核心的编码过程位于 RemotingCommand 对象中,encodeHeader 阶段,需要统计出消息总长度,即:

  • 定义消息头长度,一个整数表示:占4个字节

  • 定义消息头数据,并计算其长度

  • 定义消息体数据,并计算其长度

  • 额外再加 4是因为需要加入消息总长度,一个整数表示:占4个字节

public ByteBuffer encodeHeader(final int bodyLength) {    // 1> 消息头长度,一个整数表示:占4个字节
    int length = 4;    // 2> 消息头数据
    byte[] headerData;
    headerData = this.headerEncode();    // 再加消息头数据长度
    length += headerData.length;    // 3> 再加消息体数据长度
    length += bodyLength;    // 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节
    ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);    // 5> 将消息总长度加入 ByteBuffer
    result.putInt(length);    // 6> 将消息的头长度加入 ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));    // 7> 将消息头数据加入 ByteBuffer
    result.put(headerData);

    result.flip();    return result;
}

其中,encode 阶段会将 CommandCustomHeader 数据转换 HashMap<String,String>,方便序列化

public void makeCustomHeaderToNet() {    if (this.customHeader != null) {
        Field[] fields = getClazzFields(customHeader.getClass());        if (null == this.extFields) {            this.extFields = new HashMap<String, String>();
        }        for (Field field : fields) {            if (!Modifier.isStatic(field.getModifiers())) {
                String name = field.getName();                if (!name.startsWith("this")) {
                    Object value = null;                    try {
                        field.setAccessible(true);
                        value = field.get(this.customHeader);
                    } catch (Exception e) {
                        log.error("Failed to access field [{}]", name, e);
                    }                    if (value != null) {                        this.extFields.put(name, value.toString());
                    }
                }
            }
        }
    }
}

特别的,消息头序列化支持两种算法:

  • JSON

  • RocketMQ

private byte[] headerEncode() {    this.makeCustomHeaderToNet();    if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {        return RocketMQSerializable.rocketMQProtocolEncode(this);
    } else {        return RemotingSerializable.encode(this);
    }
}

这儿需要值得注意的是,encode阶段将当前 RPC 类型和 headerData长度编码到一个 byte[4] 数组中,byte[0] 位序列化类型。

public static byte[] markProtocolType(int source, SerializeType type) {    byte[] result = new byte[4];

    result[0] = type.getCode();
    result[1] = (byte) ((source >> 16) & 0xFF);
    result[2] = (byte) ((source >> 8) & 0xFF);
    result[3] = (byte) (source & 0xFF);    return result;
}

其中,通过与运算 & 0xFF 取低八位数据。

所以, 最终 length 长度等于序列化类型 + header length + header data + body data 的字节的长度。

解码过程

RocketMQ 解码通过NettyDecoder来实现,它继承自 LengthFieldBasedFrameDecoder,其中调用了父类LengthFieldBasedFrameDecoder的构造函数

super(FRAME_MAX_LENGTH, 0, 4, 0, 4);

这些参数设置4个字节代表 length总长度,同时解码时跳过最开始的4个字节:

frame = (ByteBuf) super.decode(ctx, in);

所以,得到的 frame= 序列化类型 + header length + header data + body data 。解码如下所示:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {    //总长度
    int length = byteBuffer.limit();    //原始的 header length,4位
    int oriHeaderLen = byteBuffer.getInt();    //真正的 header data 长度。忽略 byte[0]的 serializeType
    int headerLength = getHeaderLength(oriHeaderLen);    byte[] headerData = new byte[headerLength];
    byteBuffer.get(headerData);

    RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));    int bodyLength = length - 4 - headerLength;    byte[] bodyData = null;    if (bodyLength > 0) {
        bodyData = new byte[bodyLength];
        byteBuffer.get(bodyData);
    }
    cmd.body = bodyData;    return cmd;
}private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {    switch (type) {        case JSON:
            RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
            resultJson.setSerializeTypeCurrentRPC(type);            return resultJson;        case ROCKETMQ:
            RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
            resultRMQ.setSerializeTypeCurrentRPC(type);            return resultRMQ;        default:            break;
    }    return null;
}

其中,getProtocolType,右移 24位,拿到 serializeType:

public static SerializeType getProtocolType(int source) {    return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}

getHeaderLength 拿到 0-24 位代表的 headerData length:

public static int getHeaderLength(int length) {    return length & 0xFFFFFF;
}

回到顶部

小结

对于诸多中间件而言,底层的网络通信模块往往会使用 Netty。Netty 提供了诸多的编解码器,可以快速方便的上手。本文从如何设计一个网络协议入手,最终切入到 RocketMQ 底层网络协议的实现。可以看到,它并不复杂。仔细研读几遍变能理解其奥义。具体参考类NettyEncoder、NettyDecoder、RemotingCommand。

相关推荐

Git代码防丢指南

· 734浏览 · 2019-06-06 11:39:44
页面防重提交技术之令牌机制

iamitnan · 1305浏览 · 2019-06-13 10:22:29
8款主流Scrum敏捷开发工具评测

吴振华 · 2069浏览 · 2019-07-12 11:24:29
轻松解决github图片不显示的问题

manongba · 2693浏览 · 2020-02-17 11:20:05
加载中

0评论

评论
分类专栏
小鸟云服务器
扫码进入手机网页