websocket-client.js 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. import { VirtualWebSocketClient } from './virtual-websocket-client';
  2. import { sleep } from '../utils/utils';
  3. import { genRequestId } from './message';
  4. import { CLOSE_EVENT_CODE, CLOSE_EVENT_CODE_INFO, getWSCloseError } from './ws-event';
  5. import { CloudSDKError, TimeoutError } from '../utils/error';
  6. import { RealtimeErrorMessageError } from './error';
  7. import { ERR_CODE } from '../config/error.config';
  8. import { Db } from '../';
  9. const WS_READY_STATE = {
  10. CONNECTING: 0,
  11. OPEN: 1,
  12. CLOSING: 2,
  13. CLOSED: 3
  14. };
  15. const MAX_RTT_OBSERVED = 3;
  16. const DEFAULT_EXPECTED_EVENT_WAIT_TIME = 5000;
  17. const DEFAULT_UNTRUSTED_RTT_THRESHOLD = 10000;
  18. const DEFAULT_MAX_RECONNECT = 5;
  19. const DEFAULT_WS_RECONNECT_INTERVAL = 10000;
  20. const DEFAULT_PING_FAIL_TOLERANCE = 2;
  21. const DEFAULT_PONG_MISS_TOLERANCE = 2;
  22. const DEFAULT_LOGIN_TIMEOUT = 5000;
  23. export class RealtimeWebSocketClient {
  24. constructor(options) {
  25. this._virtualWSClient = new Set();
  26. this._queryIdClientMap = new Map();
  27. this._watchIdClientMap = new Map();
  28. this._pingFailed = 0;
  29. this._pongMissed = 0;
  30. this._logins = new Map();
  31. this._wsReadySubsribers = [];
  32. this._wsResponseWait = new Map();
  33. this._rttObserved = [];
  34. this.initWebSocketConnection = async (reconnect, availableRetries = this._maxReconnect) => {
  35. if (reconnect && this._reconnectState) {
  36. return;
  37. }
  38. if (reconnect) {
  39. this._reconnectState = true;
  40. }
  41. if (this._wsInitPromise) {
  42. return this._wsInitPromise;
  43. }
  44. if (reconnect) {
  45. this.pauseClients();
  46. }
  47. this.close(CLOSE_EVENT_CODE.ReconnectWebSocket);
  48. this._wsInitPromise = new Promise(async (resolve, reject) => {
  49. try {
  50. const wsSign = await this.getWsSign();
  51. await new Promise(success => {
  52. const url = wsSign.wsUrl || 'wss://tcb-ws.tencentcloudapi.com';
  53. this._ws = Db.wsClass ? new Db.wsClass(url) : new WebSocket(url);
  54. success();
  55. });
  56. if (this._ws.connect) {
  57. await this._ws.connect();
  58. }
  59. await this.initWebSocketEvent();
  60. resolve();
  61. if (reconnect) {
  62. this.resumeClients();
  63. this._reconnectState = false;
  64. }
  65. }
  66. catch (e) {
  67. console.error('[realtime] initWebSocketConnection connect fail', e);
  68. if (availableRetries > 0) {
  69. const isConnected = true;
  70. this._wsInitPromise = undefined;
  71. if (isConnected) {
  72. await sleep(this._reconnectInterval);
  73. if (reconnect) {
  74. this._reconnectState = false;
  75. }
  76. }
  77. resolve(this.initWebSocketConnection(reconnect, availableRetries - 1));
  78. }
  79. else {
  80. reject(e);
  81. if (reconnect) {
  82. this.closeAllClients(new CloudSDKError({
  83. errCode: ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_RECONNECT_WATCH_FAIL,
  84. errMsg: e
  85. }));
  86. }
  87. }
  88. }
  89. });
  90. try {
  91. await this._wsInitPromise;
  92. this._wsReadySubsribers.forEach(({ resolve }) => resolve());
  93. }
  94. catch (e) {
  95. this._wsReadySubsribers.forEach(({ reject }) => reject());
  96. }
  97. finally {
  98. this._wsInitPromise = undefined;
  99. this._wsReadySubsribers = [];
  100. }
  101. };
  102. this.initWebSocketEvent = () => new Promise((resolve, reject) => {
  103. if (!this._ws) {
  104. throw new Error('can not initWebSocketEvent, ws not exists');
  105. }
  106. let wsOpened = false;
  107. this._ws.onopen = event => {
  108. console.warn('[realtime] ws event: open', event);
  109. wsOpened = true;
  110. resolve();
  111. };
  112. this._ws.onerror = event => {
  113. this._logins = new Map();
  114. if (!wsOpened) {
  115. console.error('[realtime] ws open failed with ws event: error', event);
  116. reject(event);
  117. }
  118. else {
  119. console.error('[realtime] ws event: error', event);
  120. this.clearHeartbeat();
  121. this._virtualWSClient.forEach(client => client.closeWithError(new CloudSDKError({
  122. errCode: ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_WEBSOCKET_CONNECTION_ERROR,
  123. errMsg: event
  124. })));
  125. }
  126. };
  127. this._ws.onclose = closeEvent => {
  128. console.warn('[realtime] ws event: close', closeEvent);
  129. this._logins = new Map();
  130. this.clearHeartbeat();
  131. switch (closeEvent.code) {
  132. case CLOSE_EVENT_CODE.ReconnectWebSocket: {
  133. break;
  134. }
  135. case CLOSE_EVENT_CODE.NoRealtimeListeners: {
  136. break;
  137. }
  138. case CLOSE_EVENT_CODE.HeartbeatPingError:
  139. case CLOSE_EVENT_CODE.HeartbeatPongTimeoutError:
  140. case CLOSE_EVENT_CODE.NormalClosure:
  141. case CLOSE_EVENT_CODE.AbnormalClosure: {
  142. if (this._maxReconnect > 0) {
  143. this.initWebSocketConnection(true, this._maxReconnect);
  144. }
  145. else {
  146. this.closeAllClients(getWSCloseError(closeEvent.code));
  147. }
  148. break;
  149. }
  150. case CLOSE_EVENT_CODE.NoAuthentication: {
  151. this.closeAllClients(getWSCloseError(closeEvent.code, closeEvent.reason));
  152. break;
  153. }
  154. default: {
  155. if (this._maxReconnect > 0) {
  156. this.initWebSocketConnection(true, this._maxReconnect);
  157. }
  158. else {
  159. this.closeAllClients(getWSCloseError(closeEvent.code));
  160. }
  161. }
  162. }
  163. };
  164. this._ws.onmessage = res => {
  165. const rawMsg = res.data;
  166. this.heartbeat();
  167. let msg;
  168. try {
  169. msg = JSON.parse(rawMsg);
  170. }
  171. catch (e) {
  172. throw new Error(`[realtime] onMessage parse res.data error: ${e}`);
  173. }
  174. if (msg.msgType === 'ERROR') {
  175. let virtualWatch = null;
  176. this._virtualWSClient.forEach(item => {
  177. if (item.watchId === msg.watchId) {
  178. virtualWatch = item;
  179. }
  180. });
  181. if (virtualWatch) {
  182. virtualWatch.listener.onError(msg);
  183. }
  184. }
  185. const responseWaitSpec = this._wsResponseWait.get(msg.requestId);
  186. if (responseWaitSpec) {
  187. try {
  188. if (msg.msgType === 'ERROR') {
  189. responseWaitSpec.reject(new RealtimeErrorMessageError(msg));
  190. }
  191. else {
  192. responseWaitSpec.resolve(msg);
  193. }
  194. }
  195. catch (e) {
  196. console.error('ws onMessage responseWaitSpec.resolve(msg) errored:', e);
  197. }
  198. finally {
  199. this._wsResponseWait.delete(msg.requestId);
  200. }
  201. if (responseWaitSpec.skipOnMessage) {
  202. return;
  203. }
  204. }
  205. if (msg.msgType === 'PONG') {
  206. if (this._lastPingSendTS) {
  207. const rtt = Date.now() - this._lastPingSendTS;
  208. if (rtt > DEFAULT_UNTRUSTED_RTT_THRESHOLD) {
  209. console.warn(`[realtime] untrusted rtt observed: ${rtt}`);
  210. return;
  211. }
  212. if (this._rttObserved.length >= MAX_RTT_OBSERVED) {
  213. this._rttObserved.splice(0, this._rttObserved.length - MAX_RTT_OBSERVED + 1);
  214. }
  215. this._rttObserved.push(rtt);
  216. }
  217. return;
  218. }
  219. let client = msg.watchId && this._watchIdClientMap.get(msg.watchId);
  220. if (client) {
  221. client.onMessage(msg);
  222. }
  223. else {
  224. console.error(`[realtime] no realtime listener found responsible for watchId ${msg.watchId}: `, msg);
  225. switch (msg.msgType) {
  226. case 'INIT_EVENT':
  227. case 'NEXT_EVENT':
  228. case 'CHECK_EVENT': {
  229. client = this._queryIdClientMap.get(msg.msgData.queryID);
  230. if (client) {
  231. client.onMessage(msg);
  232. }
  233. break;
  234. }
  235. default: {
  236. for (const [, client] of this._watchIdClientMap) {
  237. client.onMessage(msg);
  238. break;
  239. }
  240. }
  241. }
  242. }
  243. };
  244. this.heartbeat();
  245. });
  246. this.isWSConnected = () => {
  247. return Boolean(this._ws && this._ws.readyState === WS_READY_STATE.OPEN);
  248. };
  249. this.onceWSConnected = async () => {
  250. if (this.isWSConnected()) {
  251. return;
  252. }
  253. if (this._wsInitPromise) {
  254. return this._wsInitPromise;
  255. }
  256. return new Promise((resolve, reject) => {
  257. this._wsReadySubsribers.push({
  258. resolve,
  259. reject
  260. });
  261. });
  262. };
  263. this.webLogin = async (envId, refresh) => {
  264. if (!refresh) {
  265. if (envId) {
  266. const loginInfo = this._logins.get(envId);
  267. if (loginInfo) {
  268. if (loginInfo.loggedIn && loginInfo.loginResult) {
  269. return loginInfo.loginResult;
  270. }
  271. else if (loginInfo.loggingInPromise) {
  272. return loginInfo.loggingInPromise;
  273. }
  274. }
  275. }
  276. else {
  277. const emptyEnvLoginInfo = this._logins.get('');
  278. if (emptyEnvLoginInfo && emptyEnvLoginInfo.loggingInPromise) {
  279. return emptyEnvLoginInfo.loggingInPromise;
  280. }
  281. }
  282. }
  283. const promise = new Promise(async (resolve, reject) => {
  284. try {
  285. const wsSign = await this.getWsSign();
  286. const msgData = {
  287. envId: wsSign.envId || '',
  288. accessToken: '',
  289. referrer: 'web',
  290. sdkVersion: '',
  291. dataVersion: Db.dataVersion || ''
  292. };
  293. const loginMsg = {
  294. watchId: undefined,
  295. requestId: genRequestId(),
  296. msgType: 'LOGIN',
  297. msgData,
  298. exMsgData: {
  299. runtime: Db.runtime,
  300. signStr: wsSign.signStr,
  301. secretVersion: wsSign.secretVersion
  302. }
  303. };
  304. const loginResMsg = await this.send({
  305. msg: loginMsg,
  306. waitResponse: true,
  307. skipOnMessage: true,
  308. timeout: DEFAULT_LOGIN_TIMEOUT
  309. });
  310. if (!loginResMsg.msgData.code) {
  311. resolve({
  312. envId: wsSign.envId
  313. });
  314. }
  315. else {
  316. reject(new Error(`${loginResMsg.msgData.code} ${loginResMsg.msgData.message}`));
  317. }
  318. }
  319. catch (e) {
  320. reject(e);
  321. }
  322. });
  323. let loginInfo = envId && this._logins.get(envId);
  324. const loginStartTS = Date.now();
  325. if (loginInfo) {
  326. loginInfo.loggedIn = false;
  327. loginInfo.loggingInPromise = promise;
  328. loginInfo.loginStartTS = loginStartTS;
  329. }
  330. else {
  331. loginInfo = {
  332. loggedIn: false,
  333. loggingInPromise: promise,
  334. loginStartTS
  335. };
  336. this._logins.set(envId || '', loginInfo);
  337. }
  338. try {
  339. const loginResult = await promise;
  340. const curLoginInfo = envId && this._logins.get(envId);
  341. if (curLoginInfo &&
  342. curLoginInfo === loginInfo &&
  343. curLoginInfo.loginStartTS === loginStartTS) {
  344. loginInfo.loggedIn = true;
  345. loginInfo.loggingInPromise = undefined;
  346. loginInfo.loginStartTS = undefined;
  347. loginInfo.loginResult = loginResult;
  348. return loginResult;
  349. }
  350. else if (curLoginInfo) {
  351. if (curLoginInfo.loggedIn && curLoginInfo.loginResult) {
  352. return curLoginInfo.loginResult;
  353. }
  354. else if (curLoginInfo.loggingInPromise) {
  355. return curLoginInfo.loggingInPromise;
  356. }
  357. else {
  358. throw new Error('ws unexpected login info');
  359. }
  360. }
  361. else {
  362. throw new Error('ws login info reset');
  363. }
  364. }
  365. catch (e) {
  366. loginInfo.loggedIn = false;
  367. loginInfo.loggingInPromise = undefined;
  368. loginInfo.loginStartTS = undefined;
  369. loginInfo.loginResult = undefined;
  370. throw e;
  371. }
  372. };
  373. this.getWsSign = async () => {
  374. if (this._wsSign && this._wsSign.expiredTs > Date.now()) {
  375. return this._wsSign;
  376. }
  377. const expiredTs = Date.now() + 60000;
  378. const res = await this._context.appConfig.request.send('auth.wsWebSign', { runtime: Db.runtime });
  379. if (res.code) {
  380. throw new Error(`[tcb-js-sdk] 获取实时数据推送登录票据失败: ${res.code}`);
  381. }
  382. if (res.data) {
  383. const { signStr, wsUrl, secretVersion, envId } = res.data;
  384. return {
  385. signStr,
  386. wsUrl,
  387. secretVersion,
  388. envId,
  389. expiredTs
  390. };
  391. }
  392. else {
  393. throw new Error('[tcb-js-sdk] 获取实时数据推送登录票据失败');
  394. }
  395. };
  396. this.getWaitExpectedTimeoutLength = () => {
  397. if (!this._rttObserved.length) {
  398. return DEFAULT_EXPECTED_EVENT_WAIT_TIME;
  399. }
  400. return ((this._rttObserved.reduce((acc, cur) => acc + cur) /
  401. this._rttObserved.length) *
  402. 1.5);
  403. };
  404. this.ping = async () => {
  405. const msg = {
  406. watchId: undefined,
  407. requestId: genRequestId(),
  408. msgType: 'PING',
  409. msgData: null
  410. };
  411. await this.send({
  412. msg
  413. });
  414. };
  415. this.send = async (opts) => new Promise(async (_resolve, _reject) => {
  416. let timeoutId;
  417. let _hasResolved = false;
  418. let _hasRejected = false;
  419. const resolve = (value) => {
  420. _hasResolved = true;
  421. timeoutId && clearTimeout(timeoutId);
  422. _resolve(value);
  423. };
  424. const reject = (error) => {
  425. _hasRejected = true;
  426. timeoutId && clearTimeout(timeoutId);
  427. _reject(error);
  428. };
  429. if (opts.timeout) {
  430. timeoutId = setTimeout(async () => {
  431. if (!_hasResolved || !_hasRejected) {
  432. await sleep(0);
  433. if (!_hasResolved || !_hasRejected) {
  434. reject(new TimeoutError('wsclient.send timedout'));
  435. }
  436. }
  437. }, opts.timeout);
  438. }
  439. try {
  440. if (this._wsInitPromise) {
  441. await this._wsInitPromise;
  442. }
  443. if (!this._ws) {
  444. reject(new Error('invalid state: ws connection not exists, can not send message'));
  445. return;
  446. }
  447. if (this._ws.readyState !== WS_READY_STATE.OPEN) {
  448. reject(new Error(`ws readyState invalid: ${this._ws.readyState}, can not send message`));
  449. return;
  450. }
  451. if (opts.waitResponse) {
  452. this._wsResponseWait.set(opts.msg.requestId, {
  453. resolve,
  454. reject,
  455. skipOnMessage: opts.skipOnMessage
  456. });
  457. }
  458. try {
  459. await this._ws.send(JSON.stringify(opts.msg));
  460. if (!opts.waitResponse) {
  461. resolve();
  462. }
  463. }
  464. catch (err) {
  465. if (err) {
  466. reject(err);
  467. if (opts.waitResponse) {
  468. this._wsResponseWait.delete(opts.msg.requestId);
  469. }
  470. }
  471. }
  472. }
  473. catch (e) {
  474. reject(e);
  475. }
  476. });
  477. this.closeAllClients = (error) => {
  478. this._virtualWSClient.forEach(client => {
  479. client.closeWithError(error);
  480. });
  481. };
  482. this.pauseClients = (clients) => {
  483. ;
  484. (clients || this._virtualWSClient).forEach(client => {
  485. client.pause();
  486. });
  487. };
  488. this.resumeClients = (clients) => {
  489. ;
  490. (clients || this._virtualWSClient).forEach(client => {
  491. client.resume();
  492. });
  493. };
  494. this.onWatchStart = (client, queryID) => {
  495. this._queryIdClientMap.set(queryID, client);
  496. };
  497. this.onWatchClose = (client, queryID) => {
  498. if (queryID) {
  499. this._queryIdClientMap.delete(queryID);
  500. }
  501. this._watchIdClientMap.delete(client.watchId);
  502. this._virtualWSClient.delete(client);
  503. if (!this._virtualWSClient.size) {
  504. this.close(CLOSE_EVENT_CODE.NoRealtimeListeners);
  505. }
  506. };
  507. this._maxReconnect = options.maxReconnect || DEFAULT_MAX_RECONNECT;
  508. this._reconnectInterval =
  509. options.reconnectInterval || DEFAULT_WS_RECONNECT_INTERVAL;
  510. this._context = options.context;
  511. }
  512. heartbeat(immediate) {
  513. this.clearHeartbeat();
  514. this._pingTimeoutId = setTimeout(async () => {
  515. try {
  516. if (!this._ws || this._ws.readyState !== WS_READY_STATE.OPEN) {
  517. return;
  518. }
  519. this._lastPingSendTS = Date.now();
  520. await this.ping();
  521. this._pingFailed = 0;
  522. this._pongTimeoutId = setTimeout(() => {
  523. console.error('pong timed out');
  524. if (this._pongMissed < DEFAULT_PONG_MISS_TOLERANCE) {
  525. this._pongMissed++;
  526. this.heartbeat(true);
  527. }
  528. else {
  529. this.initWebSocketConnection(true);
  530. }
  531. }, this._context.appConfig.realtimePongWaitTimeout);
  532. }
  533. catch (e) {
  534. if (this._pingFailed < DEFAULT_PING_FAIL_TOLERANCE) {
  535. this._pingFailed++;
  536. this.heartbeat();
  537. }
  538. else {
  539. this.close(CLOSE_EVENT_CODE.HeartbeatPingError);
  540. }
  541. }
  542. }, immediate ? 0 : this._context.appConfig.realtimePingInterval);
  543. }
  544. clearHeartbeat() {
  545. this._pingTimeoutId && clearTimeout(this._pingTimeoutId);
  546. this._pongTimeoutId && clearTimeout(this._pongTimeoutId);
  547. }
  548. close(code) {
  549. this.clearHeartbeat();
  550. if (this._ws) {
  551. this._ws.close(code, CLOSE_EVENT_CODE_INFO[code].name);
  552. this._ws = undefined;
  553. }
  554. }
  555. watch(options) {
  556. if (!this._ws && !this._wsInitPromise) {
  557. this.initWebSocketConnection(false);
  558. }
  559. const virtualClient = new VirtualWebSocketClient(Object.assign(Object.assign({}, options), { send: this.send, login: this.webLogin, isWSConnected: this.isWSConnected, onceWSConnected: this.onceWSConnected, getWaitExpectedTimeoutLength: this.getWaitExpectedTimeoutLength, onWatchStart: this.onWatchStart, onWatchClose: this.onWatchClose, debug: true }));
  560. this._virtualWSClient.add(virtualClient);
  561. this._watchIdClientMap.set(virtualClient.watchId, virtualClient);
  562. return virtualClient.listener;
  563. }
  564. }