Socket.IO 协议分析及 Server端最小化实现(Netty)

  • lovebing 
  • 未分类

一、简介

Socket.IO是一个基于事件的实时Web双向通信框架 Azithromycin,兼容所有平台和浏览器。根据浏览器的特性,客户端可以选择不同的方式(Websocket、Ajax长轮询、Jsonp长轮询)连接服务端。

Socket.IO 由两部分组成:

  1. 用 Node.js 实现的服务端。
  2. 在浏览器端运行的 Javascript 客户端。

Socket.IO 主要特性:

  1. 可靠
  2. 自动重连
  3. 掉线检测
  4. 二进制消息
  5. 多路复用
  6. 房间:可实现消息群发

更多详情的信息可以访问 https://socket.io/docs

二、背景

使用 Netty 实现 Socket.IO server,已经有一个成熟的开源项目 netty-socketio

但大多数场景,并不需要用到那么多功能,很多时候只需要实现服务端向客户发送即时消息的功能,如邮箱通知、日程提醒等。

经过笔者的深度使用,发生 Netty-SocketIO 存在以下三个问题:

  1. 当服务端重启时,有时候会发生端口被占用的情况,通过 netstat 查看,发现端口处理于 FIND_WAIT1 的状态,需要过一段时间才释放。(原因未知,Netty-SocketIO 把 SO_REUSEADDR 默认设为 false,将其设为 true 可避免 BindException)
  2. 根据消息的类型决定发送普通消息还是二进制消息,如果是 byte[] 类型,就发送二进制消息,否则发送文档消息;不够灵活。
  3. 发送消息时对消息内容进行不必要的 json 序列化操作。例如从消息队列消费时,取得消息内容的 byte[],如果要发送文本消息,需要先把 byte[] 转换成 bean,再进行 json 序列化,然后再写到 ByteBuf,对性能一有定的影响。

基于『服务端向客户端推送消息』的简单需求,和 Netty-SocketIO 的一些缺陷。通过对 Netty-SocketIO 和 Socket.IO-client-java 的调试和观察,本文将对 Socket.IO 的一些逻辑进行分析,并把重要的逻辑定位到原版的 socket.io 项目上, 然后使用 Netty 实现一个最小化的 Socket.IO Server。

由于不在需求范围内,最化小Socket.IO Server不支持以下特性:

  1. Namespace
  2. Room
  3. Ack

三、客户端主要的连接方式:

  1. Websocket:使用 Websocket 连接,针对主流浏览器(Chrome、Firefox、Opera、Safari、Edge、IE 10+)
  2. Polling: 使用 Ajax 长轮询,针对老的浏览器(IE 8、IE 9)
  3. Upgrade: 先发送一个 Ajax Polling 请求,然后升级到 Websocket 连接

四、Socket.IO 协议(数据格式)

Socket.IO 每个交互的消息称为 packet,packet 有两种类型:

  1. 不带附件的 packet,为文本消息。
  2. 带附件的 packet,分为两个名多个部分,每一部分为文本消息,其余的为附件的二进制消息。使用 Websocket 时,会发送两个或多个帧 (frame)。

交互过程:

  1. client 发起连接。
  2. server 回复类型为 open 的消息:0{“sid”: “xxx”, upgrades: [“websocket”], “pingInterval”: 10000, “pingTimeout”: 10000}
  3. server 发送连接消息:40
  4. server 发送没有附件的消息:42[“event name”, “xx”, ..]
  5. server 发送有附件的消息:451-[“event name”, “text message”, {“_placehoder”: true, “num”: 0}]
  6. client 发送 ping: 2 (如果是 upgrade,发送 2probe)
  7. server 回复 pong: 3 (如果是 upgrade,发送 3probe)
  8. client 关闭连接,发送 41 和 1

4.1 packet 格式:

<Engin.IO packet type> [<Socket.IO packet type>] [<attachment size>-] [<data>]

4.1.1 Engine.IO packet type

types = {
    open:     0    // non-ws
  , close:    1    // non-ws
  , ping:     2
  , pong:     3
  , message:  4
  , upgrade:  5
  , noop:     6
}

4.1.2 Socket.IO packet type (可选)

types = [
  'CONNECT', // 0
  'DISCONNECT', // 1
  'EVENT', // 2
  'ACK', // 3
  'ERROR', // 4
  'BINARY_EVENT', // 5
  'BINARY_ACK' // 6
]

4.1.3 attachment size (可选)

用于带附件的文本消息,见 function encodeAsString(obj)

4.1.4 data (可选)

data 主要有两种:连接成功后下发的配置、事件消息,均为 json 的形式。

配置 data 是一个 map 结构

{"sid": "36 位 uuid", upgrades: ["websocket"], "pingInterval": 120000, "pingTimeout", 120000}

事件 data 是一个 array 结构:

["event name", "msg1", ...]

第一个元素为event name,可以发送一个或多个 message。

如果要发送二进制数据,需要在相应的位置上使用 {“_placeholder”: true, “num”: 0} 替换要发送的二进制消息(即附件,attachment),num 为二进制消息的序号,从0开始。(详情可参考 function _deconstructPacket(data, buffers)

二进制消息的格式为:

<attachment size>-["event name", "text message", {"_placeholder": true, "num": 0}]
Binary Message

可以同时发文本消息和附件(一个或多个),也可以只发附件

4.2 packet encode (加前缀 )

某些情况下,packet 需要 encode(加前缀)。有两种情况:

  1. data 需要 encode(Websocket 和 Polling):取相应的 Engine.IO packet type 的 byte 作为前缀。详见 encodeBuffer(packet, supportsBinary, callback)
  2. 整个 packet 需要 encode(Polling):engine.io packet type 加前缀。

格式为:

<0/1><packet 长度><0xff>

第1位声明格式:0 为 string,1 为 binary,最后一位固定为 0xff,表示分隔。假如文本消息的 packet 的长度为 114 个字节,那么前缀为:

0x00 0x01 0x01 0x04 0xff

packet 的 encode 详见 function encodeOneBinaryPacket(p, doneCallback)

4.2.1 Websocket 消息前缀

二进制消息的 attachment 需要 encode,前缀为0x04。

4.2.2 长轮询消息前缀

所有消息都需要在engine.io packet type 加前缀。对二进制消息,在 Websocket 消息格式的基础上,attachment 添加额外的前缀。

4.3 主要的消息示例:

4.3.1 Transport 为 Websocket 的消息


Websocket连接

4.3.2 Polling 升级为 Websocket 的消息


连接


连接成功


升级为 Websocket 后

4.3.3 Transport 为 Polling 的消息

连接过程跟 Upgrade方式的 Polling 部分一样


文本消息


二进制消息


客户端发送断开

五、Netty 实现

5.1 相关的类

  1. SocketIoServer,server初始化、启动和停止。
  2. SocketIoRequest 解析来自客户端的 Socket.IO http 请求,对 Websocket 和 Polling 进行区分。
  3. SocketIoMessage,向客户端发送消息。
  4. MessageType,定义了三种:AUTO(byte[]类型作为附件发送),BIN(所有数据类型作为附件的发送),TEXT(所有类型作为文本发送),这个跟 Netty-SocketIO 的逻辑有所不同。TextWebSocketFrame 跟 BinaryWebSocketFrame 的区别,仅仅 opcode 不同,所以发送消息时,可以指定发文本或二进制比较好。
  5. Transport,包括 WEBSOCKET 和 POLLING。
  6. Packet,消息体。
  7. EncoderHandler,处理 SocketIoMessage:transport=WEBSOCKET 时,转为 TextWebSocketFrame 或 BinaryWebSocketFrame;transport=POLLING,转为 FullHttpResponse。

5.2 几个核心类的实现

以下为几个核心类的具体实现,其余的比较简单,就不贴出来了。

SocketIoServer

public class SocketIoServer {

    private ChannelFuture channelFuture;

    private final EventLoopGroup bossGroup;
    private final EventLoopGroup workerGroup;
    private final ServerBootstrap server;
    private final int port;

    public SocketIoServer(int port, SocketIoServerInitializer socketIoServerInitializer) {
        this.port = port;
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(socketIoServerInitializer);
    }

    public void start() {
        try {
            channelFuture = server.bind(port).sync();
        } catch (InterruptedException e) {
            stop();
        }
    }
    
    public void stop() {
        channelFuture.channel().closeFuture();
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

SocketIoServerInitializer

public class SocketIoServerInitializer extends ChannelInitializer<SocketChannel> {

    private final Logger log = LoggerFactory.getLogger(SocketIoServerInitializer.class);

    private final SocketIoServerProtocolHandler socketIoServerProtocolHandler;
    private final AuthHandler authHandler;
    private final PollingHandler pollingHandler;
    private final WebSocketHandler webSocketHandler;
    private final NonWebsocketHandler nonWebsocketHandler;
    private final EncoderHandler encoderHandler;

    @Value("${server.ssl.key-store}")
    private String keyStore;
    @Value("${server.ssl.key-store-password}")
    private String keyStorePassword;

    public SocketIoServerInitializer(SocketIoServerProtocolHandler socketIoServerProtocolHandler,
                                     AuthHandler authHandler,
                                     WebSocketHandler webSocketHandler,
                                     PollingHandler pollingHandler,
                                     NonWebsocketHandler nonWebsocketHandler,
                                     EncoderHandler encoderHandler) {
        this.socketIoServerProtocolHandler = socketIoServerProtocolHandler;
        this.authHandler = authHandler;
        this.webSocketHandler = webSocketHandler;
        this.pollingHandler = pollingHandler;
        this.nonWebsocketHandler = nonWebsocketHandler;
        this.encoderHandler = encoderHandler;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("SslHandler", CustomSslHandler.createHandler(keyStore, keyStorePassword));
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());
        pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(65536));
        pipeline.addLast("SocketIoServerProtocolHandler", socketIoServerProtocolHandler);
        // 根据 cookie 鉴权
        pipeline.addLast("AuthHandler", authHandler);
        // 处理 polling 请求
        pipeline.addLast("PollingHandler", pollingHandler);
        // websocket 支持
        pipeline.addLast("WebSocketServerProtocolHandler", new WebSocketServerProtocolHandler(Constants.WEBSOCKET_PATH));
        // 处理 websocket 请求
        pipeline.addLast("TextWebSocketHandler", webSocketHandler);
        // 其它请求
        pipeline.addLast("NonWebsocketHandler", nonWebsocketHandler);
        // 根据 transport 发送 websocket 或 http 消息
        pipeline.addLast("EncoderHandler", encoderHandler);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        log.warn(cause.getMessage());
    }
}

SocketIoServerProtocolHandler

Netty Websocket 不支持处理带 query 参数的 uri,需要做一下特殊的处理

public class SocketIoServerProtocolHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            FullHttpRequest request = (FullHttpRequest) msg;
            if (request.uri().startsWith(Constants.WEBSOCKET_PATH)) {
                handleSocketIoRequest(ctx, request);
                return;
            }
        }
        super.channelRead(ctx, msg);
    }
    
    private void handleSocketIoRequest(ChannelHandlerContext ctx, FullHttpRequest httpRequest) {
        SocketIoRequest socketIoRequest = new SocketIoRequest(httpRequest);
        if (httpRequest.uri().contains(Transport.POLLING.value())) {
            socketIoRequest.setTransport(Transport.POLLING);
        } else {
            socketIoRequest.setTransport(Transport.WEBSOCKET);
            socketIoRequest.setUri(Constants.WEBSOCKET_PATH);
        }
        socketIoRequest.setCookie(httpRequest.headers().get(HttpHeaders.COOKIE));
        ctx.fireChannelRead(socketIoRequest);
    }
}

EncoderHandler

public class EncoderHandler extends ChannelOutboundHandlerAdapter {

    private final ObjectMapper objectMapper = new ObjectMapper();

    private static final String QUOTES = "\"";
    private static final String LEFT_BRACKET = "[";
    private static final String LEFT_BRACE = "{";
    private static final byte[] LEFT_BRACKET_BYTES = LEFT_BRACKET.getBytes();
    private static final byte[] RIGHT_BRACKET_BYTES = "]".getBytes();
    private static final byte[] QUOTES_BYTES = QUOTES.getBytes();
    private static final byte[] COMMA_BYTES = ",".getBytes();
    private static final byte[] CONNECT_LINE_BYTES = "-".getBytes();
    private static final Map<Integer, Object> PLACEHOLDER_BYTES_MAP = new HashMap<>();
    
    private static final String PLACEHOLDER_FORMAT = "{\"_placeholder\":true,\"num\":%d}";
    static {
        PLACEHOLDER_BYTES_MAP.put(0, "{\"_placeholder\":true,\"num\":0}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(1, "{\"_placeholder\":true,\"num\":1}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(2, "{\"_placeholder\":true,\"num\":2}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(3, "{\"_placeholder\":true,\"num\":3}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(4, "{\"_placeholder\":true,\"num\":4}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(5, "{\"_placeholder\":true,\"num\":5}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(6, "{\"_placeholder\":true,\"num\":6}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(7, "{\"_placeholder\":true,\"num\":7}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(8, "{\"_placeholder\":true,\"num\":8}".getBytes());
        PLACEHOLDER_BYTES_MAP.put(9, "{\"_placeholder\":true,\"num\":9}".getBytes());
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof SocketIoMessage) {
            SocketIoMessage message = (SocketIoMessage) msg;
            if (!message.getMessageType().equals(MessageType.TEXT)) {
                updateAttachment(message.getPacket(), message.getMessageType().equals(MessageType.BIN));
            }
            if (message.getTransport().equals(Transport.POLLING)) {
                writeHttp(ctx, message);
            } else {
                writeWebsocket(ctx, message);
            }
        } else {
            super.write(ctx, msg, promise);
        }
    }

    /**
     * 发送 http 消息
     * @param ctx
     * @param message
     */
    private void writeHttp(ChannelHandlerContext ctx, SocketIoMessage message) {
        Packet packet = message.getPacket();
        ByteBuf srcBuf = ctx.alloc().ioBuffer();
        ByteBuf byteBuf = ctx.alloc().ioBuffer();
        try {
            log.info("message.getPacket().getType()={}", message.getPacket().getType());
            writePacketTextParts(srcBuf, message.getPacket());
            byteBuf.writeByte(0x00);
            byteBuf.writeBytes(PacketUtils.numberToBin(srcBuf.readableBytes()));
            byteBuf.writeByte(0xff);
            byteBuf.writeBytes(srcBuf);
            srcBuf.release();

            if (hasAttachments(message.getPacket())) {
                for (int i = 0; i < packet.getAttachments().length; i++) {
                    ByteBuf tmp = ctx.alloc().ioBuffer();
                    writePacketAttachmentItem(tmp, packet.getType(), packet.getAttachments()[i]);
                    byteBuf.writeByte(0x01);
                    byteBuf.writeBytes(PacketUtils.numberToBin(tmp.readableBytes()));
                    byteBuf.writeByte(0xff);
                    byteBuf.writeBytes(tmp);
                    tmp.release();
                }
            }
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
            if (message.getOrigin() != null) {
                response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN, message.getOrigin());
                response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
                response.headers().set(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS, "GET");
                response.headers().set(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
            }
            ctx.writeAndFlush(response);
        } finally {
            log.info("byteBuf refCnt = {}", byteBuf.refCnt());
            log.info("srcBuf refCnt = {}", srcBuf.refCnt());
            ctx.channel().close();
        }
    }


    /**
     * 发送 websocket 消息
     * @param ctx
     * @param socketIoMessage
     */
    private void writeWebsocket(ChannelHandlerContext ctx, SocketIoMessage socketIoMessage) {
        log.debug("writeWebsocket");
        Packet packet = socketIoMessage.getPacket();
        ByteBuf byteBuf = ctx.alloc().ioBuffer();
        writePacketTextParts(byteBuf, packet);
        ctx.write(new TextWebSocketFrame(byteBuf));
        if (hasAttachments(packet)) {
            for (int i = 0; i < packet.getAttachments().length; i++) {
                ByteBuf item = ctx.alloc().ioBuffer();
                writePacketAttachmentItem(item, packet.getType(), packet.getAttachments()[i]);
                ctx.write(new BinaryWebSocketFrame(item));
            }
        }
    }

    /**
     * 文本消息写到 byteBuf
     * @param byteBuf
     * @param packet
     */
    private void writePacketTextParts(ByteBuf byteBuf, Packet packet) {
        byteBuf.writeBytes(PacketUtils.getBytes(packet.getType().value()));
        if (packet.getType().equals(Packet.Type.OPEN)) {
            writePacketDataItem(byteBuf, packet.getData()[0], false);
        } else if (packet.getType().equals(Packet.Type.MESSAGE)) {
            switch (packet.getSubType()) {
                case CONNECT:
                    byteBuf.writeBytes(PacketUtils.getBytes(Packet.SubType.CONNECT.value()));
                    break;
                case DISCONNECT:
                    byteBuf.writeBytes(PacketUtils.getBytes(Packet.SubType.DISCONNECT.value()));
                    break;
                case EVENT:
                    // 没有 attachment 为纯文本消息
                    if (packet.getAttachments() == null || packet.getAttachments().length == 0) {
                        byteBuf.writeBytes(PacketUtils.getBytes(Packet.SubType.EVENT.value()));
                    } else {
                        byteBuf.writeBytes(PacketUtils.getBytes(Packet.SubType.BINARY_EVENT.value()));
                        byteBuf.writeBytes(PacketUtils.getBytes(packet.getAttachments().length));
                        byteBuf.writeBytes(CONNECT_LINE_BYTES);
                    }
                    byteBuf.writeBytes(LEFT_BRACKET_BYTES);
                    byteBuf.writeBytes(QUOTES_BYTES);
                    if (packet.getName() != null) {
                        if (packet.getName() instanceof byte[]) {
                            byteBuf.writeBytes((byte[]) packet.getName());
                        } else if(packet.getName() instanceof String) {
                            byteBuf.writeBytes(((String) packet.getName()).getBytes(Charsets.UTF_8));
                        }
                    }
                    byteBuf.writeBytes(QUOTES_BYTES);
                    byteBuf.writeBytes(COMMA_BYTES);
                    for (int i = 0; i < packet.getData().length; i++) {
                        writePacketDataItem(byteBuf, packet.getData()[i], false);
                        if (i < packet.getData().length - 1) {
                            byteBuf.writeBytes(COMMA_BYTES);
                        }
                    }
                    byteBuf.writeBytes(RIGHT_BRACKET_BYTES);
                default:
                    break;
            }
        }
    }
    
    private void writePacketAttachmentItem(ByteBuf byteBuf, Packet.Type type, Object attachment) {
        byteBuf.writeByte(type.value());
        writePacketDataItem(byteBuf, attachment, false);
    }

    /**
     * 单项的消息写到 byteBuf
     * @param byteBuf
     * @param data
     * @param wrapBytes
     * @return
     */
    private boolean writePacketDataItem(ByteBuf byteBuf, Object data, boolean wrapBytes) {
        int dataCount = 0;
        if (data instanceof Number) {
            byteBuf.writeBytes(data.toString().getBytes(Charsets.UTF_8));
        } else if (data instanceof String) {
            boolean isWrapped = data.toString().startsWith(QUOTES)
                    || data.toString().startsWith(LEFT_BRACKET)
                    || data.toString().startsWith(LEFT_BRACE);
            if (!isWrapped) {
                byteBuf.writeBytes(QUOTES_BYTES);
            }
            byteBuf.writeBytes(data.toString().getBytes(Charsets.UTF_8));
            if (!isWrapped) {
                byteBuf.writeBytes(QUOTES_BYTES);
            }
            dataCount++;
        } else if (data instanceof byte[]) {
            if (wrapBytes) {
                byteBuf.writeBytes(QUOTES_BYTES);
            }
            byteBuf.writeBytes((byte[]) data);
            if (wrapBytes) {
                byteBuf.writeBytes(QUOTES_BYTES);
            }
            dataCount++;
        } else {
            try {
                byteBuf.writeBytes(objectMapper.writeValueAsBytes(data));
                dataCount++;
            } catch (Exception e) {
                log.warn("writePacketTextParts error|{}", e.getMessage());
            }
        }
        return dataCount > 0;
    }

    /**
     * 根据 packet data 的类型和 force 参数 来更新 attachment
     * @param packet
     * @param force force=false:仅 byte[] 类型转换为 attachment;force=true:所有 data 转换为 attachment
     */
    private void updateAttachment(Packet packet, boolean force) {
        if (packet.getSubType() == null || packet.getData() == null) {
            log.debug("updateAttachment|subType or data is null");
            return;
        }
        if (!packet.getType().equals(Packet.Type.MESSAGE) 
                || !packet.getSubType().equals(Packet.SubType.EVENT)) {
            log.debug("updateAttachment|invalid type");
            return;
        }
        List<Object> attachments = new ArrayList<>();
        Object[] data = packet.getData();
        int placeHolderIndex = 0;
        for (int i = 0; i < data.length; i++) {
            Object item = data[i];
            if (force || item instanceof byte[]) {
                attachments.add(item);
                data[i] = getPlaceHolderBytes(placeHolderIndex);
                placeHolderIndex++;
            }
        }
        if (!attachments.isEmpty()) {
            packet.setAttachments(attachments.toArray());
        }
    }
    
    private byte[] getPlaceHolderBytes(int index) {
        if (PLACEHOLDER_BYTES_MAP.containsKey(index)) {
            return (byte[]) PLACEHOLDER_BYTES_MAP.get(index);
        }
        return String.format(PLACEHOLDER_FORMAT, index).getBytes();
    }
    
    private boolean hasAttachments(Packet packet) {
        return packet.getAttachments() != null && packet.getAttachments().length > 0;
    }
}