Android WebSocket状态管理框架 - WebSocketGo

阅读本文需要 Integer.MAX_VALUE 分钟。

故事背景

笔者所在的公司主营业务是智能家居,笔者在公司负责的Android端App的开发。关于智能家居,估计现在百分之八九十的童鞋都听过,但真正了解或者使用过的估计就不占多数了。本文不谈行业前景,只谈技术。

为了方便大家更加了解故事的背景,顺便科普一下智能家居,想直奔WebSocket主题的童鞋可以直接第二章节

智能家居,算是物联网的一个典型应用场景。什么是物联网呢,字面意思就是把众多物体连接起来组成一个网络,英文是Internet of things(IoT)。小到一个你的手机跟蓝牙耳机,大到一个城市的各个角落,究极形态就是“万物互联”。(为什么突然想起万佛朝宗 -_-!)

IoT其实不在意网络的协议,也不在意连接的到底是什么东东,就这种形态本身而言,就是IoT。其实这个概念其实很早就有了,在笔者上大学那会,不怕暴露年龄的说是在09到10年左右就听说过物联网这个词。还记得实验课的GPRS智能抄表系统么,你可能没有觉得多么高大上,不就是水表上插个SIM卡,定期把值发到客户端上么,再也不怕被人上门查水表啦~ 没错,这也是IoT的一种体现。

但这么多年,物联网但一直不温不火。至于为什么最近几年又被提出来呢,智能家居在其中扮演了很重要的作用。另外AI这把火也起到了推波助澜的作用,任何产品都要加上智能二字。所以有了AI + IoT,也就是AIOT。具体就不详述了,以后可以再单开一文详细介绍下物联网。

说回智能家居,智能家居是家庭为单位的物联网,简单说就是你可以通过家里中控或者App控制制家里的灯、插座、监控、电器等等。

文字算个球,一图解千愁,上架构图

简单解释一下,网关的作用是负责连接屋内的所有设备,网关和设备之间一般不会采用HTTP(个别单品除外),而会采用比如zigbee、lora等近场通信协议(或者直接用有线方式更稳定),因为功耗低,比较省电,你想家里如果装了几十个开关面板,总归每个月也能节约十几二十块钱的电费吧。当然最重要的原因还是因为方案也比较成熟。所以使用这类设备之前需要有个“组网”的操作,把设备组到网关上。如果断开了,网关就认为设备离线了。

网关会将设备的状态上报给云平台,云平台就将状态下发给客户端了。同样,客户端控制设备就是个反向的过程。当然如果以后5G普及了,设备直接通过5G网络连接云平台,因为5G功耗低、延迟低、速度快,就可以不需要网关了。

整体的架构还算是比较简单的,当然中间也有很多复杂的逻辑,涉及到比如设备状态、人员、权限、房屋的管理等等这里就不关注了。

在服务器下发设备状态的时候就涉及到了推送。因为智能家居有情景模式的概念,比如回家模式,执行一个请求,咔咔打开十几个灯,所以靠请求的返回值判断状态是不太现实的,只能依靠推送。

由于业务的特殊性,智能家居的推送需要有较高的实时性,比如用户打开了灯(无论在app上打开或者直接打开),app上的灯的状态都需要立即变成开。如果等个3、5秒状态再发生改变,这样的用户体验很不好。

我们最早使用的是某光推送,发现延迟严重,有时需要等十几秒才收到推送,毕竟适用场景毕竟不一样。所以决定自建长连接当推送。经过选型,决定采用本文的主角,WebSocket。

WebSocket状态管理现存的问题

市面上有很多现成的WebSocket连接库,比较著名的有Java-WebSocket,OkHttp也自带WebSocket支持。

最初因为项目内已经接入了OkHttp,所以直接使用了OkHttp。使用的方式很简单,熟悉OkHttp的童鞋应该很懂,

OkHttpClient client = OkHttpClient.Builder().build();
Request request = new Request.Builder().build();
client.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(okhttp3.WebSocket webSocket, Response response) {}

    @Override
    public void onMessage(okhttp3.WebSocket webSocket, String text) {}

    @Override
    public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {}

    @Override
    public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {}
});
复制代码 

调用方便,回调状态也很清晰。Java-Websocket也差不多类似,但总体来说有以下几个问题:

1. 心跳机制

无论OkHttp还是Java-WebSocket,都是有心跳机制的。但OkHttp的心跳间隔,也就是pingInterval,在创建client时候就已经固定了,是不支持中途调整的。(Jake大神还在github上回复过相关问题,意思是调用者不需要关注这些。但往往确实有这种需求的啊😢😢😢)

比如应用在前台的时候心跳可能稍微频繁点,但应用出于后台时出于省电的优化,心跳间隔可以设置的稍微长一点。如果需要调整心跳,需要创建新的client,断掉旧的,重新连接。有两种策略:

  • 先断旧的,再连新的。如果服务端的消息没有做缓存策略,在断掉和新连接的间隔可能会漏掉消息。
  • 先连新的,再断旧的。这要在一瞬时就会有两个连接连上服务端,服务端对于消息的状态可能会发生异常。

以上两种策略都需要客户端和服务端先约定好,服务端做额外处理,增加研发成本不说,也有增加出错概率。

2. 断掉重连

通常连接断掉有以下这么几种情况:

  • 客户端主动断(无需重连)
  • 服务端主动断(无需重连)
  • 客户端因为网络原因,比如手机断网,非主动段(需要重连)
  • 运营商因连接空闲导致的连接断开(需要重连)

对于重连,还需要考虑的问题是重连的时间间隔,如果手机网断了,不停地重连同样是失败,所以需要自己制定一个重连次数和时间间隔关系。

总之,重连的逻辑和策略是需要我们自己维护的。

3. 多线程

OkHttp中关于状态的回调发生在OkHttp创建的子线程中,根据状态我们需要发起重连。如果这时候我们的应用发生了前后台切换又要创建新连接,各种连接(重连)交织在一起,那真是一团乱麻。需要考虑的问题有:

  • 同时发起多个连接的情况,需要等上一个连接请求执行完,再根据请求发起下一个连接请求。上一个连接如果连上了,那下一个等待的连接请求就可以直接跳过了。
  • 连接请求的发生在不同线程中,状态的同步

解决方案

因为笔者项目内用的是OkHttp,所以最初的解决方案都是针对于OkHttp来做的。大致思路如下:

1. 心跳机制

1.1 OkHttp WebSocket心跳分析

前面说到过,OkHttp是不支持动态调整心跳的,那OkHttp是怎么维护心跳的呢,我们来分析一下它的源码:

分析源码,我们从调用入口client.newWebSocket开始:

// newWebSocket@OkHttpClient.java

/**
 * 内部使用RealWebSocket类来发起连接
 */
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    // 注意这里的pingInterval,所以在创建RealWebSocket时就已经固定,没法再修改了。
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
    webSocket.connect(this);
    return webSocket;
}
复制代码

// RealWebSocket@RealWebSocket.java

public RealWebSocket(Request request, WebSocketListener listener, Random random, long pingIntervalMillis) {
    this.originalRequest = request;
    this.listener = listener;
    this.random = random;
    this.pingIntervalMillis = pingIntervalMillis;
    // ...
    // 这里的writerRunnable是用来向服务端发送消息的,在后面详述
    this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }
  
  public void connect(OkHttpClient client) {
    // WebSocket协议相关的header
    client = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build();
    final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();
    call = Internal.instance.newWebSocketCall(client, request);
    call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

        // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
        streamAllocation.noNewStreams(); // Prevent connection pooling!
        Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

        // Process all web socket messages.
        try {
          // 连接成功,调用listern的onOpen
          listener.onOpen(RealWebSocket.this, response);
          String name = "OkHttp WebSocket " + request.url().redact();
          // 重点1:创建reader和writer,见下文
          initReaderAndWriter(name, streams);
          streamAllocation.connection().socket().setSoTimeout(0);
          // 重点2:轮询读
          loopReader();
        } catch (Exception e) {
          failWebSocket(e, null);
        }
      }

      @Override public void onFailure(Call call, IOException e) {
        failWebSocket(e, null);
      }
    });
  }
  
   public void initReaderAndWriter(String name, Streams streams) throws IOException {
    synchronized (this) {
      this.streams = streams;
      this.writer = new WebSocketWriter(streams.client, streams.sink, random);
      // 创建Scheduled线程池
      this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
      if (pingIntervalMillis != 0) {
        // 利用线程池定期运行PingRunnable,越来越接近真像了
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }
      if (!messageAndCloseQueue.isEmpty()) {
        runWriter(); // Send messages that were enqueued before we were connected.
      }
    }
    // 创建reader,用来读取消息
    reader = new WebSocketReader(streams.client, streams.source, this);
  }
复制代码 

重点看一下PingRunnable都做了什么

private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      // 发送ping frame
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    int failedPing;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
      // 判断是否在等待pong,如果在等待failedPing置为sentPingCount,否则置为-1
      failedPing = awaitingPong ? sentPingCount : -1;
      sentPingCount++;
      awaitingPong = true;
    }

    if (failedPing != -1) {
      // 运行到此处的条件是上一个pong还没有返回,报错退出
      failWebSocket(new SocketTimeoutException("sent ping but didn't receive pong within "
          + pingIntervalMillis + "ms (after " + (failedPing - 1) + " successful ping/pongs)"),
          null);
      return;
    }

    try {
      // 发送一个ping消息
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }
复制代码 

那么怎么判断有没有收到pong了,就需要用到reader了。我们来看一下reader里做了什么。

还记得前文重点2标记的loopReader么?

// loopReader@RealWebSocket.java

/** Receive frames until there are no more. Invoked only by the reader thread. */
public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
        // This method call results in one or more onRead* methods being called on this thread.
        // 处理收到的frame
        reader.processNextFrame();
    }
}

/**
 * 根据header判断是消息帧还是控制帧,心跳pong属于控制帧
 */
void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }

/**
 * 解析opcode
 */
private void readControlFrame() throws IOException {
    // ...
    switch (opcode) {
      case OPCODE_CONTROL_PING:
        frameCallback.onReadPing(controlFrameBuffer.readByteString());
        break;
      case OPCODE_CONTROL_PONG:
        // 注意:收到pong,触发onReadPing
        frameCallback.onReadPong(controlFrameBuffer.readByteString());
        break;
      case OPCODE_CONTROL_CLOSE:
        // ...
        frameCallback.onReadClose(code, reason);
        closed = true;
        break;
      default:
        throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
    }
  }
  
  /**
    * 计数器自增,将awaitingPong状态置为false
    */
  @Override public synchronized void onReadPong(ByteString buffer) {
    // This API doesn't expose pings.
    receivedPongCount++;
    awaitingPong = false;
  }
复制代码 

一路追下来发现,其实收到pong之后,okhttp就是简单地改变awaitingPong标记位的状态,就代表收到了上一个pong消息。

总结一下,okhttp的心跳流程,

  1. ReadWebSocket连接的时候创建schedule线程池,定期地执行PingRunnable发送ping消息;(发送其他的请求消息在WriteRunnable中)
  2. 同时创建一个reader不断接受消息;
  3. 发送ping之前,通过awaitingPong标记,判断上一个pong有没有回来;
  4. 对于收到的消息,进行解析,如果收到的pong,改变awaitingPong状态。

1.2 动态调整okhttp心跳

我们发现,只要改变线程池执行PingRunable的delayTime,就能达到在不重新创建OkHttpClient的情况下,动态改变ping的速率。

所以可以用反射的方式,shutDown原来的线程池,创建新的线程池,用新的pingInterval执行pingRunnable。

Class clazz = Class.forName("okhttp3.internal.ws.RealWebSocket");
Field field = clazz.getDeclaredField("executor");
field.setAccessible(true);
ScheduledExecutorService oldService = (ScheduledExecutorService) field.get(mWebSocket);

Class[] innerClasses = Class.forName("okhttp3.internal.ws.RealWebSocket").getDeclaredClasses();
for (Class innerClass : innerClasses) {
    if ("PingRunnable".equals(innerClass.getSimpleName())) {
        // 创建新的PingRunnable实例
        Constructor constructor = innerClass.getDeclaredConstructor(RealWebSocket.class);
        constructor.setAccessible(true);
        Object pingRunnable = constructor.newInstance(mWebSocket);

        // 创建新的线程池
        ScheduledThreadPoolExecutor newService = new ScheduledThreadPoolExecutor(1, Util.threadFactory("ws-ping", false));
        newService.scheduleAtFixedRate((Runnable) pingRunnable, interval, interval, unit);
        field.set(mWebSocket, newService);

        // shutdown旧的线程池
        oldService.shutdown();
    }
}
复制代码 

这样,我们就达成了动态调整心跳的目的。

Java-WebSocket的ping是通过外部调用sendPing()来达到发送ping的目的的,内部没有ping/pong状态机制,所以需要我们自己去维护这个关系。其实可以仿照OkHttp那样,利用定时消息去发送ping,然后解析pong来维护心跳状态。这里就不过多阐述了。

2. 断掉重连

正如前面提过的,断掉重连只要处理好几种断开状态的逻辑即可。好在绝大多数websocket库都分close、error回调,可以区分异常断开和主动断开。

需要自己维护的也就是重连的间隔和重试次数的关系,类似指数退避算法。

3. 多线程

针对上文说到的多线程问题,主要的问题点就是发生在同时发生多个连接请求的时候,与其对多线程加各种同步,不如并行该串行。采用类似消息队列的方式,消息的处理放在单独的线程中取做。这样就可以省却了线程的同步,同时也能保证状态的有序性。

其实Android中,利用HandlerThread就可以完美地满足上述两点,最初笔者也确实是利用HandlerThread来做的。后来觉得可以独立于平台,也可以运行在纯Java的平台上,所以还是自己手动实现了。同时自己维护消息队列,也可以更好地自己处理优先级、延迟等。

WebSocketGo

之前也说过,笔者的项目是基于OkHttp的,但发现状态管理这部分可以单独抽象出来,所以才会有了本文的标题WebSocketGo(以下简称WsGo)。

数据流如下:

  1. Dispatcher就是前文提到的消息队列,就是生产者-消费者模式,维护两个队列,发送command,接收event。

    • command主要有:CONNECT、DISCONNECT、RECONNECT、CHANGE_PING、SEND

    • event主要有:OnConnect、onMessage、onSend、onRetry、onDisConnect、onClose

  2. Channel Manager主要负责状态管理。

  3. WebSocket interface可以理解为适配层,负责调用WebSocket库。

  4. Event Listener就是前台的状态回调。

接入WsGo

implementation 'com.gnepux:wsgo:1.0.2'
// use okhttp
implementation 'com.gnepux:wsgo-okwebsocket:1.0.1'
// use java websocket
implementation 'com.gnepux:wsgo-jwebsocket:1.0.1'
复制代码 

初始化

WsConfig config = new WsConfig.Builder()
        .debugMode(true)    // true to print log
        .setUrl(pushUrl)    // ws url
        .setHttpHeaders(headerMap)  // http headers
        .setConnectTimeout(10 * 1000L)  // connect timeout
        .setReadTimeout(10 * 1000L)     // read timeout
        .setWriteTimeout(10 * 1000L)    // write timeout
        .setPingInterval(10 * 1000L)    // initial ping interval
        .setWebSocket(OkWebSocket.create()) // websocket client
        .setRetryStrategy(retryStrategy)    // retry count and delay time strategy
        .setEventListener(eventListener)    // event listener
        .build();
WsGo.init(config);
复制代码 

Go

// 连接
WsGo.getInstance().connect();
// 发送消息
WsGo.getInstance().send("hello from WsGo");
// 断开
WsGo.getInstance().disconnect(1000, "close");
WsGo.getInstance().disconnectNormal("close");
// 改变心跳
WsGo.getInstance().changePingInterval(10, TimeUnit.SECONDS);
// 释放
WsGo.getInstance().destroyInstance();
复制代码 

WsConfig的更多配置

setWebSocket(WebSocket socket)

WsGo 已经支持OkHttp and Java WebSocket

// for OkHttp (wsgo-okwebsocket)
setWebSocket(OkWebSocket.create());
// for Java WebSocket (wsgo-jwebsocket)
setWebSocket(JWebSocket.create());
复制代码 

如果你需要使用其他的WebSocket库或自定义客户端,只需要实现一个WebSocket接口,将对应结果传递给ChannelCallback即可。剩下的连接管理,WsGo会帮你完成。

public interface WebSocket {
    void connect(WsConfig config, ChannelCallback callback);
    void reconnect(WsConfig config, ChannelCallback callback);
    boolean disconnect(int code, String reason);
    void changePingInterval(long interval, TimeUnit unit);
    boolean send(String msg);
}
复制代码 

setRetryStrategy(RetryStrategy retryStrategy)

对于非正常断开,WsGo会自动重连。RetryStrategy指的是重连次数和延时的关系。

WsGo默认有一个DefaultRetryStrategy,如果你需要自己调整,实现RetryStrategy接口里的onRetry方法即可。

public interface RetryStrategy {
    /**
     * 重试次数和延迟的关系
     *
     * @param retryCount 已经重试的次数
     * @return 延时的时间
     */
    long onRetry(long retryCount);
}
复制代码 

setEventListener(EventListener eventListener)

添加事件回调。需要注意,回调在WsGo自己创建的一个线程中运行,不在调用线程中。如有必要,需要在会调用手动切换线程。

public interface EventListener {
    void onConnect();
    void onDisConnect(Throwable throwable);
    void onClose(int code, String reason);
    void onMessage(String text);
    void onReconnect(long retryCount, long delayMillSec);
    void onSend(String text, boolean success);
}
复制代码 

PS

目前WsGo没有与手机网络的变化通知关联(即断网可以收到回调,网络恢复不会自动重连),因为笔者觉得这部分不属于WebSocket自身的状态管理范畴,而且已经不属于平台无关了。如有有这方便需求,需要自己从外部发起连接。在项目中简单地封装一下就ok啦~

后记

本文稍显长,从最初的智能家居的场景开始,介绍到websocket状态管理现存的问题,然后给出了笔者的解决方案,最后是通用的状态管理框架WebSocketGo。也算是笔者在工作中的一点思考和总结。其实细想,不光WebSocket,所有的长连接应该都会面临相同的问题。但思考的角度和解决的出发点应该都是一样的。

文末

总结

要想成为架构师,那就不要局限在编码,业务,要会选型、扩展,提升编程思维。此外,良好的职业规划也很重要,学习的习惯很重要,但是最重要的还是要能持之以恒,任何不能坚持落实的计划都是空谈。

如果你没有方向,这里给大家分享一套由阿里高级架构师编写的《Android八大模块进阶笔记》,帮大家将杂乱、零散、碎片化的知识进行体系化的整理,让大家系统而高效地掌握Android开发的各个知识点。
在这里插入图片描述
相对于我们平时看的碎片化内容,这份笔记的知识点更系统化,更容易理解和记忆,是严格按照知识体系编排的。

一、架构师筑基必备技能

1、深入理解Java泛型
2、注解深入浅出
3、并发编程
4、数据传输与序列化
5、Java虚拟机原理
6、高效IO
……

在这里插入图片描述

二、Android百大框架源码解析

1.Retrofit 2.0源码解析
2.Okhttp3源码解析
3.ButterKnife源码解析
4.MPAndroidChart 源码解析
5.Glide源码解析
6.Leakcanary 源码解析
7.Universal-lmage-Loader源码解析
8.EventBus 3.0源码解析
9.zxing源码分析
10.Picasso源码解析
11.LottieAndroid使用详解及源码解析
12.Fresco 源码分析——图片加载流程

在这里插入图片描述

三、Android性能优化实战解析

  • 腾讯Bugly:对字符串匹配算法的一点理解
  • 爱奇艺:安卓APP崩溃捕获方案——xCrash
  • 字节跳动:深入理解Gradle框架之一:Plugin, Extension, buildSrc
  • 百度APP技术:Android H5首屏优化实践
  • 支付宝客户端架构解析:Android 客户端启动速度优化之「垃圾回收」
  • 携程:从智行 Android 项目看组件化架构实践
  • 网易新闻构建优化:如何让你的构建速度“势如闪电”?

在这里插入图片描述

四、高级kotlin强化实战

1、Kotlin入门教程
2、Kotlin 实战避坑指南
3、项目实战《Kotlin Jetpack 实战》

  • 从一个膜拜大神的 Demo 开始

  • Kotlin 写 Gradle 脚本是一种什么体验?

  • Kotlin 编程的三重境界

  • Kotlin 高阶函数

  • Kotlin 泛型

  • Kotlin 扩展

  • Kotlin 委托

  • 协程“不为人知”的调试技巧

  • 图解协程:suspend

在这里插入图片描述

五、Android高级UI开源框架进阶解密

1.SmartRefreshLayout的使用
2.Android之PullToRefresh控件源码解析
3.Android-PullToRefresh下拉刷新库基本用法
4.LoadSir-高效易用的加载反馈页管理框架
5.Android通用LoadingView加载框架详解
6.MPAndroidChart实现LineChart(折线图)
7.hellocharts-android使用指南
8.SmartTable使用指南
9.开源项目android-uitableview介绍
10.ExcelPanel 使用指南
11.Android开源项目SlidingMenu深切解析
12.MaterialDrawer使用指南
在这里插入图片描述

六、NDK模块开发

1、NDK 模块开发
2、JNI 模块
3、Native 开发工具
4、Linux 编程
5、底层图片处理
6、音视频开发
7、机器学习

在这里插入图片描述

七、Flutter技术进阶

1、Flutter跨平台开发概述
2、Windows中Flutter开发环境搭建
3、编写你的第一个Flutter APP
4、Flutter开发环境搭建和调试
5、Dart语法篇之基础语法(一)
6、Dart语法篇之集合的使用与源码解析(二)
7、Dart语法篇之集合操作符函数与源码分析(三)

在这里插入图片描述

八、微信小程序开发

1、小程序概述及入门
2、小程序UI开发
3、API操作
4、购物商场项目实战……

在这里插入图片描述

全套视频资料:

一、面试合集
在这里插入图片描述
二、源码解析合集

在这里插入图片描述
三、开源框架合集

在这里插入图片描述
欢迎大家一键三连支持,若需要文中资料,直接点击文末CSDN官方认证微信卡片免费领取【保证100%免费】↓↓↓
在这里插入图片描述