Browse Source

1.添加websocket的基础service

master
wangwanlei 11 months ago
parent
commit
a4cf260a14
1 changed files with 379 additions and 0 deletions
  1. 379
    0
      websocket/src/main/java/com/xhly/websocket/service/LongConnService.java

+ 379
- 0
websocket/src/main/java/com/xhly/websocket/service/LongConnService.java View File

@@ -0,0 +1,379 @@
1
+package com.xhly.websocket.service;
2
+
3
+import android.app.Service;
4
+import android.content.Intent;
5
+import android.os.Handler;
6
+import android.os.IBinder;
7
+import android.text.TextUtils;
8
+import android.util.Log;
9
+import android.widget.Toast;
10
+
11
+import androidx.annotation.Nullable;
12
+
13
+import com.neovisionaries.ws.client.WebSocket;
14
+import com.neovisionaries.ws.client.WebSocketAdapter;
15
+import com.neovisionaries.ws.client.WebSocketException;
16
+import com.neovisionaries.ws.client.WebSocketFactory;
17
+import com.neovisionaries.ws.client.WebSocketFrame;
18
+import com.xhly.corelib.utils.NetworkUtils;
19
+import com.xhly.websocket.utils.WebSocketUtils;
20
+
21
+import java.io.IOException;
22
+import java.util.List;
23
+import java.util.Map;
24
+import java.util.concurrent.TimeUnit;
25
+
26
+import io.reactivex.Observable;
27
+import io.reactivex.Observer;
28
+import io.reactivex.android.schedulers.AndroidSchedulers;
29
+import io.reactivex.disposables.Disposable;
30
+import io.reactivex.schedulers.Schedulers;
31
+
32
+public abstract class LongConnService extends Service {
33
+    /**
34
+     * 聊天
35
+     */
36
+    protected static final int SOCKET_TYPE = 1;
37
+    /**
38
+     * WebSocket config
39
+     */
40
+    private static final int FRAME_QUEUE_SIZE = 5;
41
+    /**
42
+     * 心跳时间间隔单位是s
43
+     */
44
+    private static final long HEART_INTERVAL = 10;
45
+    /**
46
+     * 未收到心跳响应的次数
47
+     */
48
+    public static int HEART_NOT_RECEIVE_TIME = 0;
49
+    /**
50
+     * 是否收到心跳响应
51
+     */
52
+    public static boolean HAS_HEART_RECEIVE = false;
53
+    /**
54
+     * 连接超时时间,单位毫秒
55
+     */
56
+    public static int CONNECT_TIMEOUT = 20000;
57
+    public static boolean closeWebsocket = false;
58
+    public String TAG = LongConnService.class.getSimpleName();
59
+    protected int type = 1;
60
+    protected String connectUrl;
61
+    /**
62
+     * 重连最小时间间隔,单位毫秒
63
+     */
64
+    private long minInterval = 10000;
65
+    /**
66
+     * 重连最大时间间隔,单位毫秒
67
+     */
68
+    private long maxInterval = 60000;
69
+    private Handler mHandler = new Handler();
70
+    /**
71
+     * 重连次数
72
+     */
73
+    private int reconnectCount = 0;
74
+    /**
75
+     * 心跳subscription
76
+     */
77
+    private Disposable heartSubscription;
78
+    /**
79
+     * 是否连接状态
80
+     */
81
+    private boolean connected = false;
82
+    /**
83
+     * 是否允许重连
84
+     */
85
+    private boolean isReConnect = true;
86
+    private WsStatus mStatus;
87
+    private WebSocket webSocket;
88
+    private WsListener mListener;
89
+
90
+    /**
91
+     * 设置已收到心跳
92
+     */
93
+    public static void hasReceiveHeart() {
94
+        HEART_NOT_RECEIVE_TIME = 0;
95
+        HAS_HEART_RECEIVE = true;
96
+    }
97
+
98
+    @Override
99
+    public void onCreate() {
100
+        super.onCreate();
101
+        Log.i(TAG, "onCreate()............");
102
+        mListener = new WsListener();
103
+    }
104
+
105
+    @Override
106
+    public int onStartCommand(Intent intent, int flags, int startId) {
107
+        Log.i(TAG, "onStartCommand()............");
108
+        closeWebsocket = false;
109
+        if (mListener == null) {
110
+            mListener = new WsListener();
111
+        }
112
+        startConnect();
113
+        return START_STICKY;
114
+    }
115
+
116
+    @Override
117
+    public void onDestroy() {
118
+        Log.i(TAG, "onDestroy()............");
119
+        if (heartSubscription != null) {
120
+            heartSubscription.dispose();
121
+        }
122
+        disConnect();
123
+        closeWebsocket = true;
124
+        super.onDestroy();
125
+    }
126
+
127
+    @Nullable
128
+    @Override
129
+    public IBinder onBind(Intent intent) {
130
+        throw new UnsupportedOperationException("Not yet implemented");
131
+    }
132
+
133
+    /**
134
+     * 断开连接
135
+     */
136
+    private void disConnect() {
137
+        if (mHandler != null) {
138
+            mHandler.removeCallbacksAndMessages(null);
139
+        }
140
+        isReConnect = false;
141
+        connected = false;
142
+        if (webSocket != null) {
143
+            setStatus(WsStatus.DISCONNECT);
144
+            cancelReconnect();
145
+            webSocket.disconnect();
146
+            webSocket.clearListeners();  // 清除回调,避免多次链接导致上个链接的消息接收问题
147
+            webSocket = null;
148
+        }
149
+    }
150
+
151
+    /**
152
+     * 取消重连
153
+     */
154
+    private void cancelReconnect() {
155
+        reconnectCount = 0;
156
+    }
157
+
158
+
159
+    /**
160
+     * 链接成功,子类可以拓展
161
+     */
162
+    public void onConnectedSuccess() {
163
+
164
+    }
165
+
166
+    /**
167
+     * 连接长链接
168
+     */
169
+    private synchronized void startConnect() {
170
+        if (TextUtils.isEmpty("用户ID")) {
171
+            setStatus(WsStatus.DISCONNECT);
172
+            return;
173
+        }
174
+        if (!NetworkUtils.isAvailable(this)) {
175
+            Toast.makeText(this, "网络不可用", Toast.LENGTH_SHORT);
176
+            return;
177
+        }
178
+
179
+        isReConnect = true;
180
+      /*
181
+      //存在!=null 并且open时断联的情况,所以注释
182
+      if (webSocket != null && webSocket.isOpen()) {
183
+            setStatus(WsStatus.CONNECT_SUCCESS);
184
+            return;
185
+        }*/
186
+        if (getStatus() == WsStatus.CONNECTING) {
187
+            return;
188
+        }
189
+
190
+        if (!connected) {
191
+            if (webSocket != null) {
192
+                webSocket.disconnect();
193
+                webSocket = null;
194
+            }
195
+            try {
196
+                Log.i(TAG, "第一次连接: " + connectUrl);
197
+                // 异步连接
198
+                webSocket = createWebSocket();
199
+                setStatus(WsStatus.CONNECTING);
200
+//                checkConnectState();
201
+            } catch (IOException e) {
202
+                e.printStackTrace();
203
+            }
204
+        }
205
+    }
206
+
207
+    public WsStatus getStatus() {
208
+        return mStatus;
209
+    }
210
+
211
+    private void setStatus(WsStatus status) {
212
+        this.mStatus = status;
213
+    }
214
+
215
+    private WebSocket createWebSocket() throws IOException {
216
+        Log.i(TAG, "创建连接:createWebSocket()");
217
+        return new WebSocketFactory().createSocket(connectUrl, CONNECT_TIMEOUT)
218
+                //设置帧队列最大值为5
219
+                .setFrameQueueSize(FRAME_QUEUE_SIZE)
220
+                //设置不允许服务端关闭连接却未发送关闭帧
221
+                .setMissingCloseFrameAllowed(false)
222
+                //添加回调监听
223
+                .addListener(mListener).connectAsynchronously();
224
+    }
225
+
226
+    /**
227
+     * 重新连接
228
+     */
229
+    private void reConnect() {
230
+        if (!NetworkUtils.isAvailable(this)) {
231
+            reconnectCount = 0;
232
+            isReConnect = false;
233
+            closeWebsocket = true;
234
+            Log.i(TAG, "重连失败网络不可用,当前closeWebsocket==" + closeWebsocket);
235
+            return;
236
+        }
237
+        if (HAS_HEART_RECEIVE) {
238
+            return;
239
+        }
240
+        if (webSocket != null && !webSocket.isOpen() &&//当前连接断开了
241
+                getStatus() != WsStatus.CONNECTING && !connected) { //不是正在重连状态
242
+            setStatus(WsStatus.CONNECTING);
243
+            long reconnectTime = minInterval;
244
+            if (reconnectCount > 3) {
245
+                long temp = minInterval * (reconnectCount - 2);
246
+                reconnectTime = temp > maxInterval ? maxInterval : temp;
247
+            }
248
+            mHandler.postDelayed(new Runnable() {
249
+                @Override
250
+                public void run() {
251
+                    try {
252
+                        if (webSocket.isOpen() && isConnect()) {
253
+                            return;
254
+                        }
255
+                        if (webSocket != null) {
256
+                            webSocket.disconnect();
257
+                            webSocket = null;
258
+                        }
259
+                        webSocket = createWebSocket();
260
+                        reconnectCount++;
261
+                    } catch (IOException e) {
262
+                        e.printStackTrace();
263
+                    }
264
+                }
265
+            }, reconnectTime);
266
+        }
267
+    }
268
+
269
+    /**
270
+     * 是否连接
271
+     */
272
+    private boolean isConnect() {
273
+        return connected;
274
+    }
275
+
276
+    /**
277
+     * 开始socket心跳
278
+     */
279
+    private void startHeartInterval(final WebSocket websocket) {
280
+        if (HEART_NOT_RECEIVE_TIME > 3) {
281
+            // 有3次心跳没有收到服务器响应,则认为是已经断开连接,则重新连接
282
+            connected = false;
283
+            startConnect();
284
+            return;
285
+        }
286
+        // 在客户端上增加重试的逻辑。先发送心跳包,在N秒内若没有收到回复,则再发送一次。重试M次。
287
+        WebSocketUtils.hearBeat(webSocket);
288
+        HEART_NOT_RECEIVE_TIME++;
289
+        HAS_HEART_RECEIVE = false;
290
+    }
291
+
292
+    /**
293
+     * 处理socket响应消息
294
+     *
295
+     * @param text 要处理的文本
296
+     */
297
+    public abstract void processTextMessage(String text);
298
+
299
+    /**
300
+     * 继承默认的监听空实现WebSocketAdapter,重写我们需要的方法
301
+     * onTextMessage 收到文字信息
302
+     * onConnected 连接成功
303
+     * onConnectError 连接失败
304
+     * onDisconnected 连接关闭
305
+     */
306
+    private class WsListener extends WebSocketAdapter {
307
+        @Override
308
+        public void onConnected(final WebSocket websocket, Map<String, List<String>> headers) throws Exception {
309
+            super.onConnected(websocket, headers);
310
+            Log.i(TAG, "连接成功:" + headers.toString());
311
+            connected = true;
312
+            setStatus(WsStatus.CONNECT_SUCCESS);
313
+            cancelReconnect();// 连接成功的时候取消重连,初始化连接次数
314
+            if (type == SOCKET_TYPE) {
315
+                WebSocketHelper.getInstance().build(websocket);
316
+                //调用成功方法
317
+                onConnectedSuccess();
318
+                Observable.interval(HEART_INTERVAL, TimeUnit.SECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Long>() {
319
+                    @Override
320
+                    public void onSubscribe(Disposable d) {
321
+                        Log.d(TAG, "订阅开始");
322
+                        heartSubscription = d;
323
+                    }
324
+
325
+                    @Override
326
+                    public void onNext(Long aLong) {
327
+                        Log.v(TAG, "发送心跳");
328
+                        startHeartInterval(websocket);
329
+                    }
330
+
331
+                    @Override
332
+                    public void onError(Throwable e) {
333
+
334
+                    }
335
+
336
+                    @Override
337
+                    public void onComplete() {
338
+
339
+                    }
340
+                });
341
+            }
342
+        }
343
+
344
+        @Override
345
+        public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
346
+            super.onConnectError(websocket, exception);
347
+            exception.printStackTrace();
348
+            Log.i(TAG, "连接错误: " + exception.getMessage());
349
+            HAS_HEART_RECEIVE = false;
350
+            setStatus(WsStatus.CONNECT_FAIL);
351
+            connected = false;
352
+            if (isReConnect) {
353
+                reConnect();//连接断开的时候调用重连方法
354
+            }
355
+        }
356
+
357
+        @Override
358
+        public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception {
359
+            super.onDisconnected(websocket, serverCloseFrame, clientCloseFrame, closedByServer);
360
+            Log.i(TAG, "断开连接,是否重连" + isReConnect + ",当前closeWebsocket==" + closeWebsocket);
361
+            HAS_HEART_RECEIVE = false;
362
+            connected = false;
363
+            if (heartSubscription != null) {
364
+                heartSubscription.dispose();
365
+            }
366
+            setStatus(WsStatus.CONNECT_FAIL);
367
+            if (isReConnect) {
368
+                reConnect();//连接断开的时候调用重连方法
369
+            }
370
+        }
371
+
372
+        @Override
373
+        public void onTextMessage(WebSocket websocket, String text) throws Exception {
374
+            super.onTextMessage(websocket, text);
375
+            hasReceiveHeart();
376
+            processTextMessage(text);
377
+        }
378
+    }
379
+}

Loading…
Cancel
Save