"use strict"; Object.defineProperty(exports, "__esModule", { value: true }); const lodash_set_1 = require("lodash.set"); const lodash_unset_1 = require("lodash.unset"); const lodash_clonedeep_1 = require("lodash.clonedeep"); const message_1 = require("./message"); const error_1 = require("../utils/error"); const error_config_1 = require("../config/error.config"); const utils_1 = require("../utils/utils"); const listener_1 = require("./listener"); const snapshot_1 = require("./snapshot"); const error_2 = require("./error"); var WATCH_STATUS; (function (WATCH_STATUS) { WATCH_STATUS["LOGGINGIN"] = "LOGGINGIN"; WATCH_STATUS["INITING"] = "INITING"; WATCH_STATUS["REBUILDING"] = "REBUILDING"; WATCH_STATUS["ACTIVE"] = "ACTIVE"; WATCH_STATUS["ERRORED"] = "ERRORED"; WATCH_STATUS["CLOSING"] = "CLOSING"; WATCH_STATUS["CLOSED"] = "CLOSED"; WATCH_STATUS["PAUSED"] = "PAUSED"; WATCH_STATUS["RESUMING"] = "RESUMING"; })(WATCH_STATUS || (WATCH_STATUS = {})); const DEFAULT_WAIT_TIME_ON_UNKNOWN_ERROR = 100; const DEFAULT_MAX_AUTO_RETRY_ON_ERROR = 2; const DEFAULT_MAX_SEND_ACK_AUTO_RETRY_ON_ERROR = 2; const DEFAULT_SEND_ACK_DEBOUNCE_TIMEOUT = 10 * 1000; const DEFAULT_INIT_WATCH_TIMEOUT = 10 * 1000; const DEFAULT_REBUILD_WATCH_TIMEOUT = 10 * 1000; class VirtualWebSocketClient { constructor(options) { this.watchStatus = WATCH_STATUS.INITING; this._login = async (envId, refresh) => { this.watchStatus = WATCH_STATUS.LOGGINGIN; const loginResult = await this.login(envId, refresh); if (!this.envId) { this.envId = loginResult.envId; } return loginResult; }; this.initWatch = async (forceRefreshLogin) => { if (this._initWatchPromise) { return this._initWatchPromise; } this._initWatchPromise = new Promise(async (resolve, reject) => { try { if (this.watchStatus === WATCH_STATUS.PAUSED) { console.log('[realtime] initWatch cancelled on pause'); return resolve(); } const { envId } = await this._login(this.envId, forceRefreshLogin); if (this.watchStatus === WATCH_STATUS.PAUSED) { console.log('[realtime] initWatch cancelled on pause'); return resolve(); } this.watchStatus = WATCH_STATUS.INITING; const initWatchMsg = { watchId: this.watchId, requestId: message_1.genRequestId(), msgType: 'INIT_WATCH', msgData: { envId, collName: this.collectionName, query: this.query, limit: this.limit, orderBy: this.orderBy } }; const initEventMsg = await this.send({ msg: initWatchMsg, waitResponse: true, skipOnMessage: true, timeout: DEFAULT_INIT_WATCH_TIMEOUT }); const { events, currEvent } = initEventMsg.msgData; this.sessionInfo = { queryID: initEventMsg.msgData.queryID, currentEventId: currEvent - 1, currentDocs: [] }; if (events.length > 0) { for (const e of events) { e.ID = currEvent; } this.handleServerEvents(initEventMsg); } else { this.sessionInfo.currentEventId = currEvent; const snapshot = new snapshot_1.Snapshot({ id: currEvent, docChanges: [], docs: [], type: 'init' }); this.listener.onChange(snapshot); this.scheduleSendACK(); } this.onWatchStart(this, this.sessionInfo.queryID); this.watchStatus = WATCH_STATUS.ACTIVE; this._availableRetries.INIT_WATCH = DEFAULT_MAX_AUTO_RETRY_ON_ERROR; resolve(); } catch (e) { this.handleWatchEstablishmentError(e, { operationName: 'INIT_WATCH', resolve, reject }); } }); let success = false; try { await this._initWatchPromise; success = true; } finally { this._initWatchPromise = undefined; } console.log(`[realtime] initWatch ${success ? 'success' : 'fail'}`); }; this.rebuildWatch = async (forceRefreshLogin) => { if (this._rebuildWatchPromise) { return this._rebuildWatchPromise; } this._rebuildWatchPromise = new Promise(async (resolve, reject) => { try { if (this.watchStatus === WATCH_STATUS.PAUSED) { console.log('[realtime] rebuildWatch cancelled on pause'); return resolve(); } const { envId } = await this._login(this.envId, forceRefreshLogin); if (!this.sessionInfo) { throw new Error('can not rebuildWatch without a successful initWatch (lack of sessionInfo)'); } if (this.watchStatus === WATCH_STATUS.PAUSED) { console.log('[realtime] rebuildWatch cancelled on pause'); return resolve(); } this.watchStatus = WATCH_STATUS.REBUILDING; const rebuildWatchMsg = { watchId: this.watchId, requestId: message_1.genRequestId(), msgType: 'REBUILD_WATCH', msgData: { envId, collName: this.collectionName, queryID: this.sessionInfo.queryID, eventID: this.sessionInfo.currentEventId } }; const nextEventMsg = await this.send({ msg: rebuildWatchMsg, waitResponse: true, skipOnMessage: false, timeout: DEFAULT_REBUILD_WATCH_TIMEOUT }); this.handleServerEvents(nextEventMsg); this.watchStatus = WATCH_STATUS.ACTIVE; this._availableRetries.REBUILD_WATCH = DEFAULT_MAX_AUTO_RETRY_ON_ERROR; resolve(); } catch (e) { this.handleWatchEstablishmentError(e, { operationName: 'REBUILD_WATCH', resolve, reject }); } }); let success = false; try { await this._rebuildWatchPromise; success = true; } finally { this._rebuildWatchPromise = undefined; } console.log(`[realtime] rebuildWatch ${success ? 'success' : 'fail'}`); }; this.handleWatchEstablishmentError = async (e, options) => { const isInitWatch = options.operationName === 'INIT_WATCH'; const abortWatch = () => { this.closeWithError(new error_1.CloudSDKError({ errCode: isInitWatch ? error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_INIT_WATCH_FAIL : error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_REBUILD_WATCH_FAIL, errMsg: e })); options.reject(e); }; const retry = (refreshLogin) => { if (this.useRetryTicket(options.operationName)) { if (isInitWatch) { this._initWatchPromise = undefined; options.resolve(this.initWatch(refreshLogin)); } else { this._rebuildWatchPromise = undefined; options.resolve(this.rebuildWatch(refreshLogin)); } } else { abortWatch(); } }; this.handleCommonError(e, { onSignError: () => retry(true), onTimeoutError: () => retry(false), onNotRetryableError: abortWatch, onCancelledError: options.reject, onUnknownError: async () => { try { const onWSDisconnected = async () => { this.pause(); await this.onceWSConnected(); retry(true); }; if (!this.isWSConnected()) { await onWSDisconnected(); } else { await utils_1.sleep(DEFAULT_WAIT_TIME_ON_UNKNOWN_ERROR); if (this.watchStatus === WATCH_STATUS.PAUSED) { options.reject(new error_1.CancelledError(`${options.operationName} cancelled due to pause after unknownError`)); } else if (!this.isWSConnected()) { await onWSDisconnected(); } else { retry(false); } } } catch (e) { retry(true); } } }); }; this.closeWatch = async () => { const queryId = this.sessionInfo ? this.sessionInfo.queryID : ''; if (this.watchStatus !== WATCH_STATUS.ACTIVE) { this.watchStatus = WATCH_STATUS.CLOSED; this.onWatchClose(this, queryId); return; } try { this.watchStatus = WATCH_STATUS.CLOSING; const closeWatchMsg = { watchId: this.watchId, requestId: message_1.genRequestId(), msgType: 'CLOSE_WATCH', msgData: null }; await this.send({ msg: closeWatchMsg }); this.sessionInfo = undefined; this.watchStatus = WATCH_STATUS.CLOSED; } catch (e) { this.closeWithError(new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_CLOSE_WATCH_FAIL, errMsg: e })); } finally { this.onWatchClose(this, queryId); } }; this.scheduleSendACK = () => { this.clearACKSchedule(); this._ackTimeoutId = setTimeout(() => { if (this._waitExpectedTimeoutId) { this.scheduleSendACK(); } else { this.sendACK(); } }, DEFAULT_SEND_ACK_DEBOUNCE_TIMEOUT); }; this.clearACKSchedule = () => { if (this._ackTimeoutId) { clearTimeout(this._ackTimeoutId); } }; this.sendACK = async () => { try { if (this.watchStatus !== WATCH_STATUS.ACTIVE) { this.scheduleSendACK(); return; } if (!this.sessionInfo) { console.warn('[realtime listener] can not send ack without a successful initWatch (lack of sessionInfo)'); return; } const ackMsg = { watchId: this.watchId, requestId: message_1.genRequestId(), msgType: 'CHECK_LAST', msgData: { queryID: this.sessionInfo.queryID, eventID: this.sessionInfo.currentEventId } }; await this.send({ msg: ackMsg }); this.scheduleSendACK(); } catch (e) { if (error_2.isRealtimeErrorMessageError(e)) { const msg = e.payload; switch (msg.msgData.code) { case 'CHECK_LOGIN_FAILED': case 'SIGN_EXPIRED_ERROR': case 'SIGN_INVALID_ERROR': case 'SIGN_PARAM_INVALID': { this.rebuildWatch(); return; } case 'QUERYID_INVALID_ERROR': case 'SYS_ERR': case 'INVALIID_ENV': case 'COLLECTION_PERMISSION_DENIED': { this.closeWithError(new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_CHECK_LAST_FAIL, errMsg: msg.msgData.code })); return; } default: { break; } } } if (this._availableRetries.CHECK_LAST && this._availableRetries.CHECK_LAST > 0) { this._availableRetries.CHECK_LAST--; this.scheduleSendACK(); } else { this.closeWithError(new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_CHECK_LAST_FAIL, errMsg: e })); } } }; this.handleCommonError = (e, options) => { if (error_2.isRealtimeErrorMessageError(e)) { const msg = e.payload; switch (msg.msgData.code) { case 'CHECK_LOGIN_FAILED': case 'SIGN_EXPIRED_ERROR': case 'SIGN_INVALID_ERROR': case 'SIGN_PARAM_INVALID': { options.onSignError(e); return; } case 'QUERYID_INVALID_ERROR': case 'SYS_ERR': case 'INVALIID_ENV': case 'COLLECTION_PERMISSION_DENIED': { options.onNotRetryableError(e); return; } default: { options.onNotRetryableError(e); return; } } } else if (error_1.isTimeoutError(e)) { options.onTimeoutError(e); return; } else if (error_1.isCancelledError(e)) { options.onCancelledError(e); return; } options.onUnknownError(e); }; this.watchId = `watchid_${+new Date()}_${Math.random()}`; this.envId = options.envId; this.collectionName = options.collectionName; this.query = options.query; this.limit = options.limit; this.orderBy = options.orderBy; this.send = options.send; this.login = options.login; this.isWSConnected = options.isWSConnected; this.onceWSConnected = options.onceWSConnected; this.getWaitExpectedTimeoutLength = options.getWaitExpectedTimeoutLength; this.onWatchStart = options.onWatchStart; this.onWatchClose = options.onWatchClose; this.debug = options.debug; this._availableRetries = { INIT_WATCH: DEFAULT_MAX_AUTO_RETRY_ON_ERROR, REBUILD_WATCH: DEFAULT_MAX_AUTO_RETRY_ON_ERROR, CHECK_LAST: DEFAULT_MAX_SEND_ACK_AUTO_RETRY_ON_ERROR }; this.listener = new listener_1.RealtimeListener({ close: this.closeWatch, onChange: options.onChange, onError: options.onError, debug: this.debug, virtualClient: this }); this.initWatch(); } useRetryTicket(operationName) { if (this._availableRetries[operationName] && this._availableRetries[operationName] > 0) { this._availableRetries[operationName]--; console.log(`[realtime] ${operationName} use a retry ticket, now only ${this._availableRetries[operationName]} retry left`); return true; } return false; } async handleServerEvents(msg) { try { this.scheduleSendACK(); await this._handleServerEvents(msg); this._postHandleServerEventsValidityCheck(msg); } catch (e) { console.error('[realtime listener] internal non-fatal error: handle server events failed with error: ', e); throw e; } } async _handleServerEvents(msg) { const { requestId } = msg; const { events } = msg.msgData; const { msgType } = msg; if (!events.length || !this.sessionInfo) { return; } const sessionInfo = this.sessionInfo; let allChangeEvents; try { allChangeEvents = events.map(getPublicEvent); } catch (e) { this.closeWithError(new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_RECEIVE_INVALID_SERVER_DATA, errMsg: e })); return; } let docs = [...sessionInfo.currentDocs]; let initEncountered = false; for (let i = 0, len = allChangeEvents.length; i < len; i++) { const change = allChangeEvents[i]; if (sessionInfo.currentEventId >= change.id) { if (!allChangeEvents[i - 1] || change.id > allChangeEvents[i - 1].id) { console.warn(`[realtime] duplicate event received, cur ${sessionInfo.currentEventId} but got ${change.id}`); } else { console.error(`[realtime listener] server non-fatal error: events out of order (the latter event's id is smaller than that of the former) (requestId ${requestId})`); } continue; } else if (sessionInfo.currentEventId === change.id - 1) { switch (change.dataType) { case 'update': { if (!change.doc) { switch (change.queueType) { case 'update': case 'dequeue': { const localDoc = docs.find(doc => doc._id === change.docId); if (localDoc) { const doc = lodash_clonedeep_1.default(localDoc); if (change.updatedFields) { for (const fieldPath in change.updatedFields) { lodash_set_1.default(doc, fieldPath, change.updatedFields[fieldPath]); } } if (change.removedFields) { for (const fieldPath of change.removedFields) { lodash_unset_1.default(doc, fieldPath); } } change.doc = doc; } else { console.error('[realtime listener] internal non-fatal server error: unexpected update dataType event where no doc is associated.'); } break; } case 'enqueue': { const err = new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_UNEXPECTED_FATAL_ERROR, errMsg: `HandleServerEvents: full doc is not provided with dataType="update" and queueType="enqueue" (requestId ${msg.requestId})` }); this.closeWithError(err); throw err; } default: { break; } } } break; } case 'replace': { if (!change.doc) { const err = new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_UNEXPECTED_FATAL_ERROR, errMsg: `HandleServerEvents: full doc is not provided with dataType="replace" (requestId ${msg.requestId})` }); this.closeWithError(err); throw err; } break; } case 'remove': { const doc = docs.find(doc => doc._id === change.docId); if (doc) { change.doc = doc; } else { console.error('[realtime listener] internal non-fatal server error: unexpected remove event where no doc is associated.'); } break; } case 'limit': { if (!change.doc) { switch (change.queueType) { case 'dequeue': { const doc = docs.find(doc => doc._id === change.docId); if (doc) { change.doc = doc; } else { console.error('[realtime listener] internal non-fatal server error: unexpected limit dataType event where no doc is associated.'); } break; } case 'enqueue': { const err = new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_UNEXPECTED_FATAL_ERROR, errMsg: `HandleServerEvents: full doc is not provided with dataType="limit" and queueType="enqueue" (requestId ${msg.requestId})` }); this.closeWithError(err); throw err; } default: { break; } } } break; } } switch (change.queueType) { case 'init': { if (!initEncountered) { initEncountered = true; docs = [change.doc]; } else { docs.push(change.doc); } break; } case 'enqueue': { docs.push(change.doc); break; } case 'dequeue': { const ind = docs.findIndex(doc => doc._id === change.docId); if (ind > -1) { docs.splice(ind, 1); } else { console.error('[realtime listener] internal non-fatal server error: unexpected dequeue event where no doc is associated.'); } break; } case 'update': { const ind = docs.findIndex(doc => doc._id === change.docId); if (ind > -1) { docs[ind] = change.doc; } else { console.error('[realtime listener] internal non-fatal server error: unexpected queueType update event where no doc is associated.'); } break; } } if (i === len - 1 || (allChangeEvents[i + 1] && allChangeEvents[i + 1].id !== change.id)) { const docsSnapshot = [...docs]; const docChanges = allChangeEvents .slice(0, i + 1) .filter(c => c.id === change.id); this.sessionInfo.currentEventId = change.id; this.sessionInfo.currentDocs = docs; const snapshot = new snapshot_1.Snapshot({ id: change.id, docChanges, docs: docsSnapshot, msgType }); this.listener.onChange(snapshot); } } else { console.warn(`[realtime listener] event received is out of order, cur ${this.sessionInfo.currentEventId} but got ${change.id}`); await this.rebuildWatch(); return; } } } _postHandleServerEventsValidityCheck(msg) { if (!this.sessionInfo) { console.error('[realtime listener] internal non-fatal error: sessionInfo lost after server event handling, this should never occur'); return; } if (this.sessionInfo.expectEventId && this.sessionInfo.currentEventId >= this.sessionInfo.expectEventId) { this.clearWaitExpectedEvent(); } if (this.sessionInfo.currentEventId < msg.msgData.currEvent) { console.warn('[realtime listener] internal non-fatal error: client eventId does not match with server event id after server event handling'); return; } } clearWaitExpectedEvent() { if (this._waitExpectedTimeoutId) { clearTimeout(this._waitExpectedTimeoutId); this._waitExpectedTimeoutId = undefined; } } onMessage(msg) { switch (this.watchStatus) { case WATCH_STATUS.PAUSED: { if (msg.msgType !== 'ERROR') { return; } break; } case WATCH_STATUS.LOGGINGIN: case WATCH_STATUS.INITING: case WATCH_STATUS.REBUILDING: { console.warn(`[realtime listener] internal non-fatal error: unexpected message received while ${this.watchStatus}`); return; } case WATCH_STATUS.CLOSED: { console.warn('[realtime listener] internal non-fatal error: unexpected message received when the watch has closed'); return; } case WATCH_STATUS.ERRORED: { console.warn('[realtime listener] internal non-fatal error: unexpected message received when the watch has ended with error'); return; } } if (!this.sessionInfo) { console.warn('[realtime listener] internal non-fatal error: sessionInfo not found while message is received.'); return; } this.scheduleSendACK(); switch (msg.msgType) { case 'NEXT_EVENT': { console.warn(`nextevent ${msg.msgData.currEvent} ignored`, msg); this.handleServerEvents(msg); break; } case 'CHECK_EVENT': { if (this.sessionInfo.currentEventId < msg.msgData.currEvent) { this.sessionInfo.expectEventId = msg.msgData.currEvent; this.clearWaitExpectedEvent(); this._waitExpectedTimeoutId = setTimeout(() => { this.rebuildWatch(); }, this.getWaitExpectedTimeoutLength()); console.log(`[realtime] waitExpectedTimeoutLength ${this.getWaitExpectedTimeoutLength()}`); } break; } case 'ERROR': { this.closeWithError(new error_1.CloudSDKError({ errCode: error_config_1.ERR_CODE.SDK_DATABASE_REALTIME_LISTENER_SERVER_ERROR_MSG, errMsg: `${msg.msgData.code} - ${msg.msgData.message}` })); break; } default: { console.warn(`[realtime listener] virtual client receive unexpected msg ${msg.msgType}: `, msg); break; } } } closeWithError(error) { this.watchStatus = WATCH_STATUS.ERRORED; this.clearACKSchedule(); this.listener.onError(error); this.onWatchClose(this, (this.sessionInfo && this.sessionInfo.queryID) || ''); console.log(`[realtime] client closed (${this.collectionName} ${this.query}) (watchId ${this.watchId})`); } pause() { this.watchStatus = WATCH_STATUS.PAUSED; console.log(`[realtime] client paused (${this.collectionName} ${this.query}) (watchId ${this.watchId})`); } async resume() { this.watchStatus = WATCH_STATUS.RESUMING; console.log(`[realtime] client resuming with ${this.sessionInfo ? 'REBUILD_WATCH' : 'INIT_WATCH'} (${this.collectionName} ${this.query}) (${this.watchId})`); try { await (this.sessionInfo ? this.rebuildWatch() : this.initWatch()); console.log(`[realtime] client successfully resumed (${this.collectionName} ${this.query}) (${this.watchId})`); } catch (e) { console.error(`[realtime] client resume failed (${this.collectionName} ${this.query}) (${this.watchId})`, e); } } } exports.VirtualWebSocketClient = VirtualWebSocketClient; function getPublicEvent(event) { const e = { id: event.ID, dataType: event.DataType, queueType: event.QueueType, docId: event.DocID, doc: event.Doc && event.Doc !== '{}' ? JSON.parse(event.Doc) : undefined }; if (event.DataType === 'update') { if (event.UpdatedFields) { e.updatedFields = JSON.parse(event.UpdatedFields); } if (event.removedFields || event.RemovedFields) { e.removedFields = JSON.parse(event.removedFields); } } return e; }