diff --git a/libtextsecure/test/fake_web_api.js b/libtextsecure/test/fake_web_api.js index 0d4b20270..001fae403 100644 --- a/libtextsecure/test/fake_web_api.js +++ b/libtextsecure/test/fake_web_api.js @@ -14,15 +14,9 @@ const fakeAPI = { getAvatar: fakeCall, getDevices: fakeCall, // getKeysForIdentifier : fakeCall, - getMessageSocket: async () => ({ - on() {}, - removeListener() {}, - close() {}, - sendBytes() {}, - }), getMyKeys: fakeCall, getProfile: fakeCall, - getProvisioningSocket: fakeCall, + getProvisioningResource: fakeCall, putAttachment: fakeCall, registerKeys: fakeCall, requestVerificationSMS: fakeCall, diff --git a/libtextsecure/test/task_with_timeout_test.js b/libtextsecure/test/task_with_timeout_test.js deleted file mode 100644 index a5e4b5f8e..000000000 --- a/libtextsecure/test/task_with_timeout_test.js +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright 2017-2020 Signal Messenger, LLC -// SPDX-License-Identifier: AGPL-3.0-only - -/* global textsecure */ - -describe('createTaskWithTimeout', () => { - it('resolves when promise resolves', () => { - const task = () => Promise.resolve('hi!'); - const taskWithTimeout = textsecure.createTaskWithTimeout(task); - - return taskWithTimeout().then(result => { - assert.strictEqual(result, 'hi!'); - }); - }); - it('flows error from promise back', () => { - const error = new Error('original'); - const task = () => Promise.reject(error); - const taskWithTimeout = textsecure.createTaskWithTimeout(task); - - return taskWithTimeout().catch(flowedError => { - assert.strictEqual(error, flowedError); - }); - }); - it('rejects if promise takes too long (this one logs error to console)', () => { - let complete = false; - const task = () => - new Promise(resolve => { - setTimeout(() => { - complete = true; - resolve(); - }, 3000); - }); - const taskWithTimeout = textsecure.createTaskWithTimeout(task, this.name, { - timeout: 10, - }); - - return taskWithTimeout().then( - () => { - throw new Error('it was not supposed to resolve!'); - }, - () => { - assert.strictEqual(complete, false); - } - ); - }); - it('resolves if task returns something falsey', () => { - const task = () => {}; - const taskWithTimeout = textsecure.createTaskWithTimeout(task); - return taskWithTimeout(); - }); - it('resolves if task returns a non-promise', () => { - const task = () => 'hi!'; - const taskWithTimeout = textsecure.createTaskWithTimeout(task); - return taskWithTimeout().then(result => { - assert.strictEqual(result, 'hi!'); - }); - }); - it('rejects if task throws (and does not log about taking too long)', () => { - const error = new Error('Task is throwing!'); - const task = () => { - throw error; - }; - const taskWithTimeout = textsecure.createTaskWithTimeout(task, this.name, { - timeout: 10, - }); - return taskWithTimeout().then( - () => { - throw new Error('Overall task should reject!'); - }, - flowedError => { - assert.strictEqual(flowedError, error); - } - ); - }); -}); diff --git a/preload.js b/preload.js index 6e93efcc1..cee9803e8 100644 --- a/preload.js +++ b/preload.js @@ -107,7 +107,7 @@ try { let connectStartTime = 0; - window.logMessageReceiverConnect = () => { + window.logAuthenticatedConnect = () => { if (connectStartTime === 0) { connectStartTime = Date.now(); } diff --git a/ts/background.ts b/ts/background.ts index 58daf9ad6..8b9bec1c4 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -135,10 +135,12 @@ export async function startApp(): Promise { // Initialize WebAPI as early as possible let server: WebAPIType | undefined; + let messageReceiver: MessageReceiver | undefined; window.storage.onready(() => { server = window.WebAPI.connect( window.textsecure.storage.user.getWebAPICredentials() ); + window.textsecure.server = server; window.textsecure.storage.user.on('credentialsChange', async () => { strictAssert(server !== undefined, 'WebAPI not ready'); @@ -150,6 +152,122 @@ export async function startApp(): Promise { initializeAllJobQueues({ server, }); + + window.log.info('Initializing MessageReceiver'); + messageReceiver = new MessageReceiver({ + server, + storage: window.storage, + serverTrustRoot: window.getServerTrustRoot(), + }); + + // eslint-disable-next-line no-inner-declarations + function queuedEventListener>( + handler: (...args: Args) => Promise | void, + track = true + ): (...args: Args) => void { + return (...args: Args): void => { + eventHandlerQueue.add(async () => { + try { + await handler(...args); + } finally { + // message/sent: Message.handleDataMessage has its own queue and will + // trigger this event itself when complete. + // error: Error processing (below) also has its own queue and self-trigger. + if (track) { + window.Whisper.events.trigger('incrementProgress'); + } + } + }); + }; + } + + messageReceiver.addEventListener( + 'message', + queuedEventListener(onMessageReceived, false) + ); + messageReceiver.addEventListener( + 'delivery', + queuedEventListener(onDeliveryReceipt) + ); + messageReceiver.addEventListener( + 'contact', + queuedEventListener(onContactReceived) + ); + messageReceiver.addEventListener( + 'contactSync', + queuedEventListener(onContactSyncComplete) + ); + messageReceiver.addEventListener( + 'group', + queuedEventListener(onGroupReceived) + ); + messageReceiver.addEventListener( + 'groupSync', + queuedEventListener(onGroupSyncComplete) + ); + messageReceiver.addEventListener( + 'sent', + queuedEventListener(onSentMessage, false) + ); + messageReceiver.addEventListener( + 'readSync', + queuedEventListener(onReadSync) + ); + messageReceiver.addEventListener( + 'read', + queuedEventListener(onReadReceipt) + ); + messageReceiver.addEventListener( + 'view', + queuedEventListener(onViewReceipt) + ); + messageReceiver.addEventListener( + 'verified', + queuedEventListener(onVerified) + ); + messageReceiver.addEventListener( + 'error', + queuedEventListener(onError, false) + ); + messageReceiver.addEventListener( + 'decryption-error', + queuedEventListener(onDecryptionError) + ); + messageReceiver.addEventListener( + 'retry-request', + queuedEventListener(onRetryRequest) + ); + messageReceiver.addEventListener('empty', queuedEventListener(onEmpty)); + messageReceiver.addEventListener( + 'reconnect', + queuedEventListener(onReconnect) + ); + messageReceiver.addEventListener( + 'configuration', + queuedEventListener(onConfiguration) + ); + messageReceiver.addEventListener('typing', queuedEventListener(onTyping)); + messageReceiver.addEventListener( + 'sticker-pack', + queuedEventListener(onStickerPack) + ); + messageReceiver.addEventListener( + 'viewOnceOpenSync', + queuedEventListener(onViewOnceOpenSync) + ); + messageReceiver.addEventListener( + 'messageRequestResponse', + queuedEventListener(onMessageRequestResponse) + ); + messageReceiver.addEventListener( + 'profileKeyUpdate', + queuedEventListener(onProfileKeyUpdate) + ); + messageReceiver.addEventListener( + 'fetchLatest', + queuedEventListener(onFetchLatestSync) + ); + messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync)); }); ourProfileKeyService.initialize(window.storage); @@ -378,16 +496,11 @@ export async function startApp(): Promise { window.getAccountManager().refreshPreKeys(); }); - let messageReceiver: MessageReceiver | undefined; - let preMessageReceiverStatus: SocketStatus | undefined; window.getSocketStatus = () => { - if (messageReceiver) { - return messageReceiver.getStatus(); + if (server === undefined) { + return SocketStatus.CLOSED; } - if (preMessageReceiverStatus) { - return preMessageReceiverStatus; - } - return SocketStatus.CLOSED; + return server.getSocketStatus(); }; let accountManager: typeof window.textsecure.AccountManager; window.getAccountManager = () => { @@ -638,15 +751,15 @@ export async function startApp(): Promise { // Stop processing incoming messages if (messageReceiver) { + strictAssert( + server !== undefined, + 'WebAPI should be initialized together with MessageReceiver' + ); + server.unregisterRequestHandler(messageReceiver); await messageReceiver.stopProcessing(); await window.waitForAllBatchers(); } - if (messageReceiver) { - messageReceiver.unregisterBatchers(); - messageReceiver = undefined; - } - // A number of still-to-queue database queries might be waiting inside batchers. // We wait for these to empty first, and then shut down the data interface. await Promise.all([ @@ -1658,27 +1771,21 @@ export async function startApp(): Promise { window.Whisper.events.on('powerMonitorResume', () => { window.log.info('powerMonitor: resume'); - if (!messageReceiver) { - return; - } - - messageReceiver.checkSocket(); + server?.checkSockets(); }); const reconnectToWebSocketQueue = new LatestQueue(); const enqueueReconnectToWebSocket = () => { reconnectToWebSocketQueue.add(async () => { - if (!messageReceiver) { - window.log.info( - 'reconnectToWebSocket: No messageReceiver. Early return.' - ); + if (!server) { + window.log.info('reconnectToWebSocket: No server. Early return.'); return; } window.log.info('reconnectToWebSocket starting...'); - await disconnect(); - connect(); + await server.onOffline(); + await server.onOnline(); window.log.info('reconnectToWebSocket complete.'); }); }; @@ -1688,6 +1795,8 @@ export async function startApp(): Promise { window._.debounce(enqueueReconnectToWebSocket, 1000, { maxWait: 5000 }) ); + window.Whisper.events.on('unlinkAndDisconnect', unlinkAndDisconnect); + function runStorageService() { window.Signal.Services.enableStorageService(); @@ -2011,8 +2120,13 @@ export async function startApp(): Promise { disconnectTimer = undefined; AttachmentDownloads.stop(); - if (messageReceiver) { - await messageReceiver.close(); + if (server !== undefined) { + strictAssert( + messageReceiver !== undefined, + 'WebAPI should be initialized together with MessageReceiver' + ); + await server.onOffline(); + await messageReceiver.drain(); } } @@ -2056,19 +2170,6 @@ export async function startApp(): Promise { return; } - preMessageReceiverStatus = SocketStatus.CONNECTING; - - if (messageReceiver) { - await messageReceiver.stopProcessing(); - - await window.waitForAllBatchers(); - } - - if (messageReceiver) { - messageReceiver.unregisterBatchers(); - messageReceiver = undefined; - } - window.textsecure.messaging = new window.textsecure.MessageSender(server); if (connectCount === 0) { @@ -2115,132 +2216,20 @@ export async function startApp(): Promise { window.Whisper.deliveryReceiptQueue.pause(); window.Whisper.Notifications.disable(); - // initialize the socket and start listening for messages - window.log.info('Initializing socket and listening for messages'); - const messageReceiverOptions = { - serverTrustRoot: window.getServerTrustRoot(), - }; - messageReceiver = new window.textsecure.MessageReceiver( - server, - messageReceiverOptions - ); - window.textsecure.messageReceiver = messageReceiver; - window.Signal.Services.initializeGroupCredentialFetcher(); - preMessageReceiverStatus = undefined; + strictAssert(server !== undefined, 'WebAPI not initialized'); + strictAssert( + messageReceiver !== undefined, + 'MessageReceiver not initialized' + ); + messageReceiver.reset(); + server.registerRequestHandler(messageReceiver); - // eslint-disable-next-line no-inner-declarations - function queuedEventListener>( - handler: (...args: Args) => Promise | void, - track = true - ): (...args: Args) => void { - return (...args: Args): void => { - eventHandlerQueue.add(async () => { - try { - await handler(...args); - } finally { - // message/sent: Message.handleDataMessage has its own queue and will - // trigger this event itself when complete. - // error: Error processing (below) also has its own queue and self-trigger. - if (track) { - window.Whisper.events.trigger('incrementProgress'); - } - } - }); - }; - } - - messageReceiver.addEventListener( - 'message', - queuedEventListener(onMessageReceived, false) - ); - messageReceiver.addEventListener( - 'delivery', - queuedEventListener(onDeliveryReceipt) - ); - messageReceiver.addEventListener( - 'contact', - queuedEventListener(onContactReceived) - ); - messageReceiver.addEventListener( - 'contactSync', - queuedEventListener(onContactSyncComplete) - ); - messageReceiver.addEventListener( - 'group', - queuedEventListener(onGroupReceived) - ); - messageReceiver.addEventListener( - 'groupSync', - queuedEventListener(onGroupSyncComplete) - ); - messageReceiver.addEventListener( - 'sent', - queuedEventListener(onSentMessage, false) - ); - messageReceiver.addEventListener( - 'readSync', - queuedEventListener(onReadSync) - ); - messageReceiver.addEventListener( - 'read', - queuedEventListener(onReadReceipt) - ); - messageReceiver.addEventListener( - 'view', - queuedEventListener(onViewReceipt) - ); - messageReceiver.addEventListener( - 'verified', - queuedEventListener(onVerified) - ); - messageReceiver.addEventListener( - 'error', - queuedEventListener(onError, false) - ); - messageReceiver.addEventListener( - 'decryption-error', - queuedEventListener(onDecryptionError) - ); - messageReceiver.addEventListener( - 'retry-request', - queuedEventListener(onRetryRequest) - ); - messageReceiver.addEventListener('empty', queuedEventListener(onEmpty)); - messageReceiver.addEventListener( - 'reconnect', - queuedEventListener(onReconnect) - ); - messageReceiver.addEventListener( - 'configuration', - queuedEventListener(onConfiguration) - ); - messageReceiver.addEventListener('typing', queuedEventListener(onTyping)); - messageReceiver.addEventListener( - 'sticker-pack', - queuedEventListener(onStickerPack) - ); - messageReceiver.addEventListener( - 'viewOnceOpenSync', - queuedEventListener(onViewOnceOpenSync) - ); - messageReceiver.addEventListener( - 'messageRequestResponse', - queuedEventListener(onMessageRequestResponse) - ); - messageReceiver.addEventListener( - 'profileKeyUpdate', - queuedEventListener(onProfileKeyUpdate) - ); - messageReceiver.addEventListener( - 'fetchLatest', - queuedEventListener(onFetchLatestSync) - ); - messageReceiver.addEventListener('keys', queuedEventListener(onKeysSync)); + // If coming here after `offline` event - connect again. + await server.onOnline(); AttachmentDownloads.start({ - getMessageReceiver: () => messageReceiver, logger: window.log, }); @@ -3511,16 +3500,12 @@ export async function startApp(): Promise { window.Whisper.events.trigger('unauthorized'); if (messageReceiver) { + strictAssert(server !== undefined, 'WebAPI not initialized'); + server.unregisterRequestHandler(messageReceiver); await messageReceiver.stopProcessing(); - await window.waitForAllBatchers(); } - if (messageReceiver) { - messageReceiver.unregisterBatchers(); - messageReceiver = undefined; - } - onEmpty(); window.log.warn( diff --git a/ts/groups.ts b/ts/groups.ts index 71758bdad..965a94a5a 100644 --- a/ts/groups.ts +++ b/ts/groups.ts @@ -1689,8 +1689,8 @@ export async function createGroupV2({ } ); - await conversation.queueJob('storageServiceUploadJob', () => { - window.Signal.Services.storageServiceUploadJob(); + await conversation.queueJob('storageServiceUploadJob', async () => { + await window.Signal.Services.storageServiceUploadJob(); }); const timestamp = Date.now(); diff --git a/ts/groups/joinViaLink.ts b/ts/groups/joinViaLink.ts index ff72c83a5..42193779e 100644 --- a/ts/groups/joinViaLink.ts +++ b/ts/groups/joinViaLink.ts @@ -14,6 +14,7 @@ import { import * as Bytes from '../Bytes'; import { longRunningTaskWrapper } from '../util/longRunningTaskWrapper'; import { isGroupV1 } from '../util/whatTypeOfConversation'; +import { explodePromise } from '../util/explodePromise'; import type { ConversationAttributesType } from '../model-types.d'; import type { ConversationModel } from '../models/conversations'; @@ -175,7 +176,7 @@ export async function joinViaLink(hash: string): Promise { }; // Explode a promise so we know when this whole join process is complete - const { promise, resolve, reject } = explodePromise(); + const { promise, resolve, reject } = explodePromise(); const closeDialog = async () => { try { @@ -405,26 +406,3 @@ function showErrorDialog(description: string, title: string) { }, }); } - -function explodePromise(): { - promise: Promise; - resolve: () => void; - reject: (error: Error) => void; -} { - let resolve: () => void; - let reject: (error: Error) => void; - - const promise = new Promise((innerResolve, innerReject) => { - resolve = innerResolve; - reject = innerReject; - }); - - return { - promise, - // Typescript thinks that resolve and reject can be undefined here. - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - resolve: resolve!, - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - reject: reject!, - }; -} diff --git a/ts/messageModifiers/AttachmentDownloads.ts b/ts/messageModifiers/AttachmentDownloads.ts index 0a0f5bc26..2f0680703 100644 --- a/ts/messageModifiers/AttachmentDownloads.ts +++ b/ts/messageModifiers/AttachmentDownloads.ts @@ -1,13 +1,12 @@ // Copyright 2019-2021 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -import { isFunction, isNumber, omit } from 'lodash'; +import { isNumber, omit } from 'lodash'; import { v4 as getGuid } from 'uuid'; import dataInterface from '../sql/Client'; import { downloadAttachment } from '../util/downloadAttachment'; import { stringFromBytes } from '../Crypto'; -import MessageReceiver from '../textsecure/MessageReceiver'; import { AttachmentDownloadJobType, AttachmentDownloadJobTypeType, @@ -42,7 +41,6 @@ const RETRY_BACKOFF: Record = { let enabled = false; let timeout: NodeJS.Timeout | null; -let getMessageReceiver: () => MessageReceiver | undefined; let logger: LoggerType; const _activeAttachmentDownloadJobs: Record< string, @@ -50,17 +48,11 @@ const _activeAttachmentDownloadJobs: Record< > = {}; type StartOptionsType = { - getMessageReceiver: () => MessageReceiver | undefined; logger: LoggerType; }; export async function start(options: StartOptionsType): Promise { - ({ getMessageReceiver, logger } = options); - if (!isFunction(getMessageReceiver)) { - throw new Error( - 'attachment_downloads/start: getMessageReceiver must be a function' - ); - } + ({ logger } = options); if (!logger) { throw new Error('attachment_downloads/start: logger must be provided!'); } @@ -220,11 +212,6 @@ async function _runJob(job?: AttachmentDownloadJobType): Promise { const pending = true; await setAttachmentDownloadJobPending(id, pending); - const messageReceiver = getMessageReceiver(); - if (!messageReceiver) { - throw new Error('_runJob: messageReceiver not found'); - } - const downloaded = await downloadAttachment(attachment); if (!downloaded) { diff --git a/ts/messageModifiers/Reactions.ts b/ts/messageModifiers/Reactions.ts index 6f815af83..80a9c813a 100644 --- a/ts/messageModifiers/Reactions.ts +++ b/ts/messageModifiers/Reactions.ts @@ -6,17 +6,9 @@ import { Collection, Model } from 'backbone'; import { MessageModel } from '../models/messages'; import { isOutgoing } from '../state/selectors/message'; +import { ReactionAttributesType } from '../model-types.d'; -type ReactionsAttributesType = { - emoji: string; - remove: boolean; - targetAuthorUuid: string; - targetTimestamp: number; - timestamp: number; - fromId: string; -}; - -export class ReactionModel extends Model {} +export class ReactionModel extends Model {} let singleton: Reactions | undefined; @@ -63,7 +55,7 @@ export class Reactions extends Collection { async onReaction( reaction: ReactionModel - ): Promise { + ): Promise { try { // The conversation the target message was in; we have to find it in the database // to to figure that out. diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index 5b4e5b9a5..e743dbafc 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -16,6 +16,7 @@ import { AttachmentType } from '../types/Attachment'; import { CallMode, CallHistoryDetailsType } from '../types/Calling'; import * as Stickers from '../types/Stickers'; import { GroupV2InfoType } from '../textsecure/SendMessage'; +import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; import { CallbackResultType } from '../textsecure/Types.d'; import { ConversationType } from '../state/ducks/conversations'; import { @@ -1058,7 +1059,7 @@ export class ConversationModel extends window.Backbone this.setRegistered(); // If we couldn't apply universal timer before - try it again. - this.queueJob('maybeSetPendingUniversalTimer', () => + this.queueJob('maybeSetPendingUniversalTimer', async () => this.maybeSetPendingUniversalTimer() ); } @@ -2893,13 +2894,10 @@ export class ConversationModel extends window.Backbone return null; } - queueJob( - name: string, - callback: () => unknown | Promise - ): Promise { + queueJob(name: string, callback: () => Promise): Promise { this.jobQueue = this.jobQueue || new window.PQueue({ concurrency: 1 }); - const taskWithTimeout = window.textsecure.createTaskWithTimeout( + const taskWithTimeout = createTaskWithTimeout( callback, `conversation ${this.idForLogging()}` ); @@ -3231,7 +3229,7 @@ export class ConversationModel extends window.Backbone // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const destination = this.getSendTarget()!; - return this.queueJob('sendDeleteForEveryone', async () => { + await this.queueJob('sendDeleteForEveryone', async () => { window.log.info( 'Sending deleteForEveryone to conversation', this.idForLogging(), @@ -3782,7 +3780,7 @@ export class ConversationModel extends window.Backbone return; } - this.queueJob('maybeSetPendingUniversalTimer', () => + this.queueJob('maybeSetPendingUniversalTimer', async () => this.maybeSetPendingUniversalTimer() ); @@ -4804,7 +4802,7 @@ export class ConversationModel extends window.Backbone ); this.set({ needsStorageServiceSync: true }); - this.queueJob('captureChange', () => { + this.queueJob('captureChange', async () => { Services.storageServiceUploadJob(); }); } diff --git a/ts/test-both/TaskWithTimeout_test.ts b/ts/test-both/TaskWithTimeout_test.ts new file mode 100644 index 000000000..802255ee2 --- /dev/null +++ b/ts/test-both/TaskWithTimeout_test.ts @@ -0,0 +1,47 @@ +// Copyright 2017-2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { sleep } from '../util/sleep'; +import createTaskWithTimeout from '../textsecure/TaskWithTimeout'; + +describe('createTaskWithTimeout', () => { + it('resolves when promise resolves', async () => { + const task = () => Promise.resolve('hi!'); + const taskWithTimeout = createTaskWithTimeout(task, 'test'); + + const result = await taskWithTimeout(); + assert.strictEqual(result, 'hi!'); + }); + + it('flows error from promise back', async () => { + const error = new Error('original'); + const task = () => Promise.reject(error); + const taskWithTimeout = createTaskWithTimeout(task, 'test'); + + await assert.isRejected(taskWithTimeout(), 'original'); + }); + + it('rejects if promise takes too long (this one logs error to console)', async () => { + const task = async () => { + await sleep(3000); + }; + const taskWithTimeout = createTaskWithTimeout(task, 'test', { + timeout: 10, + }); + + await assert.isRejected(taskWithTimeout()); + }); + + it('rejects if task throws (and does not log about taking too long)', async () => { + const error = new Error('Task is throwing!'); + const task = () => { + throw error; + }; + const taskWithTimeout = createTaskWithTimeout(task, 'test', { + timeout: 10, + }); + await assert.isRejected(taskWithTimeout(), 'Task is throwing!'); + }); +}); diff --git a/ts/test-both/util/AbortableProcess_test.ts b/ts/test-both/util/AbortableProcess_test.ts new file mode 100644 index 000000000..6492fc64b --- /dev/null +++ b/ts/test-both/util/AbortableProcess_test.ts @@ -0,0 +1,46 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import { noop } from 'lodash'; + +import { AbortableProcess } from '../../util/AbortableProcess'; + +describe('AbortableProcess', () => { + it('resolves the result normally', async () => { + const process = new AbortableProcess( + 'process', + { abort: noop }, + Promise.resolve(42) + ); + + assert.strictEqual(await process.getResult(), 42); + }); + + it('rejects normally', async () => { + const process = new AbortableProcess( + 'process', + { abort: noop }, + Promise.reject(new Error('rejected')) + ); + + await assert.isRejected(process.getResult(), 'rejected'); + }); + + it('rejects on abort', async () => { + let calledAbort = false; + const process = new AbortableProcess( + 'A', + { + abort() { + calledAbort = true; + }, + }, + new Promise(noop) + ); + + process.abort(); + await assert.isRejected(process.getResult(), 'Process "A" was aborted'); + assert.isTrue(calledAbort); + }); +}); diff --git a/ts/test-both/util/explodePromise_test.ts b/ts/test-both/util/explodePromise_test.ts new file mode 100644 index 000000000..151fa0971 --- /dev/null +++ b/ts/test-both/util/explodePromise_test.ts @@ -0,0 +1,26 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable no-restricted-syntax */ + +import { assert } from 'chai'; + +import { explodePromise } from '../../util/explodePromise'; + +describe('explodePromise', () => { + it('resolves the promise', async () => { + const { promise, resolve } = explodePromise(); + + resolve(42); + + assert.strictEqual(await promise, 42); + }); + + it('rejects the promise', async () => { + const { promise, reject } = explodePromise(); + + reject(new Error('rejected')); + + await assert.isRejected(promise, 'rejected'); + }); +}); diff --git a/ts/test-electron/MessageReceiver_test.ts b/ts/test-electron/MessageReceiver_test.ts index 1b30a2282..cd02fac80 100644 --- a/ts/test-electron/MessageReceiver_test.ts +++ b/ts/test-electron/MessageReceiver_test.ts @@ -6,12 +6,11 @@ */ import { assert } from 'chai'; -import EventEmitter from 'events'; -import { connection as WebSocket } from 'websocket'; import MessageReceiver from '../textsecure/MessageReceiver'; -import { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents'; +import { IncomingWebSocketRequest } from '../textsecure/WebsocketResources'; import { WebAPIType } from '../textsecure/WebAPI'; +import { DecryptionErrorEvent } from '../textsecure/messageReceiverEvents'; import { SignalService as Proto } from '../protobuf'; import * as Crypto from '../Crypto'; @@ -19,23 +18,16 @@ import * as Crypto from '../Crypto'; const FIXMEU8 = Uint8Array; describe('MessageReceiver', () => { - class FakeSocket extends EventEmitter { - public sendBytes(_: Uint8Array) {} - - public close() {} - } - const number = '+19999999999'; const uuid = 'aaaaaaaa-bbbb-4ccc-9ddd-eeeeeeeeeeee'; const deviceId = 1; describe('connecting', () => { it('generates decryption-error event when it cannot decrypt', done => { - const socket = new FakeSocket(); - - const messageReceiver = new MessageReceiver({} as WebAPIType, { + const messageReceiver = new MessageReceiver({ + server: {} as WebAPIType, + storage: window.storage, serverTrustRoot: 'AAAAAAAA', - socket: socket as WebSocket, }); const body = Proto.Envelope.encode({ @@ -47,15 +39,18 @@ describe('MessageReceiver', () => { content: new FIXMEU8(Crypto.getRandomBytes(200)), }).finish(); - const message = Proto.WebSocketMessage.encode({ - type: Proto.WebSocketMessage.Type.REQUEST, - request: { id: 1, verb: 'PUT', path: '/api/v1/message', body }, - }).finish(); - - socket.emit('message', { - type: 'binary', - binaryData: message, - }); + messageReceiver.handleRequest( + new IncomingWebSocketRequest( + { + id: 1, + verb: 'PUT', + path: '/api/v1/message', + body, + headers: [], + }, + (_: Buffer): void => {} + ) + ); messageReceiver.addEventListener( 'decryption-error', diff --git a/ts/test-electron/WebsocketResources_test.ts b/ts/test-electron/WebsocketResources_test.ts index fb7d0d27b..a95d74785 100644 --- a/ts/test-electron/WebsocketResources_test.ts +++ b/ts/test-electron/WebsocketResources_test.ts @@ -85,7 +85,7 @@ describe('WebSocket-Resource', () => { }); }); - it('sends requests and receives responses', done => { + it('sends requests and receives responses', async () => { // mock socket and request handler let requestId: number | Long | undefined; const socket = new FakeSocket(); @@ -101,16 +101,10 @@ describe('WebSocket-Resource', () => { // actual test const resource = new WebSocketResource(socket as WebSocket); - resource.sendRequest({ + const promise = resource.sendRequest({ verb: 'PUT', path: '/some/path', body: new Uint8Array([1, 2, 3]), - error: done, - success(message: string, status: number) { - assert.strictEqual(message, 'OK'); - assert.strictEqual(status, 200); - done(); - }, }); // mock socket response @@ -121,6 +115,10 @@ describe('WebSocket-Resource', () => { response: { id: requestId, message: 'OK', status: 200 }, }).finish(), }); + + const { status, message } = await promise; + assert.strictEqual(message, 'OK'); + assert.strictEqual(status, 200); }); }); diff --git a/ts/textsecure.d.ts b/ts/textsecure.d.ts index b81401db2..5417441ac 100644 --- a/ts/textsecure.d.ts +++ b/ts/textsecure.d.ts @@ -4,7 +4,6 @@ import { UnidentifiedSenderMessageContent } from '@signalapp/signal-client'; import Crypto from './textsecure/Crypto'; -import MessageReceiver from './textsecure/MessageReceiver'; import MessageSender from './textsecure/SendMessage'; import SyncRequest from './textsecure/SyncRequest'; import EventTarget from './textsecure/EventTarget'; @@ -38,20 +37,14 @@ export type UnprocessedType = { export { StorageServiceCallOptionsType, StorageServiceCredentials }; export type TextSecureType = { - createTaskWithTimeout: ( - task: () => Promise | any, - id?: string, - options?: { timeout?: number } - ) => () => Promise; crypto: typeof Crypto; storage: Storage; - messageReceiver: MessageReceiver; + server: WebAPIType; messageSender: MessageSender; messaging: SendMessage; utils: typeof utils; EventTarget: typeof EventTarget; - MessageReceiver: typeof MessageReceiver; AccountManager: WhatIsThis; MessageSender: typeof MessageSender; SyncRequest: typeof SyncRequest; diff --git a/ts/textsecure/AccountManager.ts b/ts/textsecure/AccountManager.ts index 0922d31c8..c2cc0a7d2 100644 --- a/ts/textsecure/AccountManager.ts +++ b/ts/textsecure/AccountManager.ts @@ -13,9 +13,8 @@ import { WebAPIType } from './WebAPI'; import { KeyPairType, CompatSignedPreKeyType } from './Types.d'; import utils from './Helpers'; import ProvisioningCipher from './ProvisioningCipher'; -import WebSocketResource, { - IncomingWebSocketRequest, -} from './WebsocketResources'; +import { IncomingWebSocketRequest } from './WebsocketResources'; +import createTaskWithTimeout from './TaskWithTimeout'; import * as Bytes from '../Bytes'; import { deriveAccessKey, @@ -192,112 +191,102 @@ export default class AccountManager extends EventTarget { SIGNED_KEY_GEN_BATCH_SIZE, progressCallback ); - const confirmKeys = this.confirmKeys.bind(this); - const registrationDone = this.registrationDone.bind(this); - const registerKeys = this.server.registerKeys.bind(this.server); - const getSocket = this.server.getProvisioningSocket.bind(this.server); - const queueTask = this.queueTask.bind(this); const provisioningCipher = new ProvisioningCipher(); - let gotProvisionEnvelope = false; const pubKey = await provisioningCipher.getPublicKey(); - const socket = await getSocket(); + let envelopeCallbacks: + | { + resolve(data: Proto.ProvisionEnvelope): void; + reject(error: Error): void; + } + | undefined; + const envelopePromise = new Promise( + (resolve, reject) => { + envelopeCallbacks = { resolve, reject }; + } + ); + + const wsr = await this.server.getProvisioningResource({ + handleRequest(request: IncomingWebSocketRequest) { + if ( + request.path === '/v1/address' && + request.verb === 'PUT' && + request.body + ) { + const proto = Proto.ProvisioningUuid.decode(request.body); + const { uuid } = proto; + if (!uuid) { + throw new Error('registerSecondDevice: expected a UUID'); + } + const url = getProvisioningUrl(uuid, pubKey); + + if (window.CI) { + window.CI.setProvisioningURL(url); + } + + setProvisioningUrl(url); + request.respond(200, 'OK'); + } else if ( + request.path === '/v1/message' && + request.verb === 'PUT' && + request.body + ) { + const envelope = Proto.ProvisionEnvelope.decode(request.body); + request.respond(200, 'OK'); + wsr.close(); + envelopeCallbacks?.resolve(envelope); + } else { + window.log.error('Unknown websocket message', request.path); + } + }, + }); window.log.info('provisioning socket open'); - return new Promise((resolve, reject) => { - socket.on('close', (code, reason) => { - window.log.info( - `provisioning socket closed. Code: ${code} Reason: ${reason}` + wsr.addEventListener('close', ({ code, reason }) => { + window.log.info( + `provisioning socket closed. Code: ${code} Reason: ${reason}` + ); + + // Note: if we have resolved the envelope already - this has no effect + envelopeCallbacks?.reject(new Error('websocket closed')); + }); + + const envelope = await envelopePromise; + const provisionMessage = await provisioningCipher.decrypt(envelope); + + await this.queueTask(async () => { + const deviceName = await confirmNumber(provisionMessage.number); + if (typeof deviceName !== 'string' || deviceName.length === 0) { + throw new Error( + 'AccountManager.registerSecondDevice: Invalid device name' ); - if (!gotProvisionEnvelope) { - reject(new Error('websocket closed')); - } - }); + } + if ( + !provisionMessage.number || + !provisionMessage.provisioningCode || + !provisionMessage.identityKeyPair + ) { + throw new Error( + 'AccountManager.registerSecondDevice: Provision message was missing key data' + ); + } - const wsr = new WebSocketResource(socket, { - keepalive: { path: '/v1/keepalive/provisioning' }, - handleRequest(request: IncomingWebSocketRequest) { - if ( - request.path === '/v1/address' && - request.verb === 'PUT' && - request.body - ) { - const proto = Proto.ProvisioningUuid.decode(request.body); - const { uuid } = proto; - if (!uuid) { - throw new Error('registerSecondDevice: expected a UUID'); - } - const url = getProvisioningUrl(uuid, pubKey); - - if (window.CI) { - window.CI.setProvisioningURL(url); - } - - setProvisioningUrl(url); - request.respond(200, 'OK'); - } else if ( - request.path === '/v1/message' && - request.verb === 'PUT' && - request.body - ) { - const envelope = Proto.ProvisionEnvelope.decode(request.body); - request.respond(200, 'OK'); - gotProvisionEnvelope = true; - wsr.close(); - resolve( - provisioningCipher - .decrypt(envelope) - .then(async provisionMessage => - queueTask(async () => - confirmNumber(provisionMessage.number).then( - async deviceName => { - if ( - typeof deviceName !== 'string' || - deviceName.length === 0 - ) { - throw new Error( - 'AccountManager.registerSecondDevice: Invalid device name' - ); - } - if ( - !provisionMessage.number || - !provisionMessage.provisioningCode || - !provisionMessage.identityKeyPair - ) { - throw new Error( - 'AccountManager.registerSecondDevice: Provision message was missing key data' - ); - } - - return createAccount( - provisionMessage.number, - provisionMessage.provisioningCode, - provisionMessage.identityKeyPair, - provisionMessage.profileKey, - deviceName, - provisionMessage.userAgent, - provisionMessage.readReceipts, - { uuid: provisionMessage.uuid } - ) - .then(clearSessionsAndPreKeys) - .then(generateKeys) - .then(async (keys: GeneratedKeysType) => - registerKeys(keys).then(async () => - confirmKeys(keys) - ) - ) - .then(registrationDone); - } - ) - ) - ) - ); - } else { - window.log.error('Unknown websocket message', request.path); - } - }, - }); + await createAccount( + provisionMessage.number, + provisionMessage.provisioningCode, + provisionMessage.identityKeyPair, + provisionMessage.profileKey, + deviceName, + provisionMessage.userAgent, + provisionMessage.readReceipts, + { uuid: provisionMessage.uuid } + ); + await clearSessionsAndPreKeys(); + const keys = await generateKeys(); + await this.server.registerKeys(keys); + await this.confirmKeys(keys); + await this.registrationDone(); }); } @@ -416,7 +405,7 @@ export default class AccountManager extends EventTarget { async queueTask(task: () => Promise) { this.pendingQueue = this.pendingQueue || new PQueue({ concurrency: 1 }); - const taskWithTimeout = window.textsecure.createTaskWithTimeout(task); + const taskWithTimeout = createTaskWithTimeout(task, 'AccountManager task'); return this.pendingQueue.add(taskWithTimeout); } @@ -633,6 +622,11 @@ export default class AccountManager extends EventTarget { ); await window.textsecure.storage.put('regionCode', regionCode); await window.textsecure.storage.protocol.hydrateCaches(); + + // We are finally ready to reconnect + window.textsecure.storage.user.emitCredentialsChanged( + 'AccountManager.createAccount' + ); } async clearSessionsAndPreKeys() { diff --git a/ts/textsecure/Errors.ts b/ts/textsecure/Errors.ts index 408671983..bbc7e1049 100644 --- a/ts/textsecure/Errors.ts +++ b/ts/textsecure/Errors.ts @@ -13,6 +13,40 @@ function appendStack(newError: Error, originalError: Error) { newError.stack += `\nOriginal stack:\n${originalError.stack}`; } +export type HTTPErrorHeadersType = { + [name: string]: string | ReadonlyArray; +}; + +export class HTTPError extends Error { + public readonly name = 'HTTPError'; + + public readonly code: number; + + public readonly responseHeaders: HTTPErrorHeadersType; + + public readonly response: unknown; + + constructor( + message: string, + options: { + code: number; + headers: HTTPErrorHeadersType; + response?: unknown; + stack?: string; + } + ) { + super(`${message}; code: ${options.code}`); + + const { code: providedCode, headers, response, stack } = options; + + this.code = providedCode > 999 || providedCode < 100 ? -1 : providedCode; + this.responseHeaders = headers; + + this.stack += `\nOriginal stack:\n${stack}`; + this.response = response; + } +} + export class ReplayableError extends Error { name: string; diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 8c7e85ea8..0425ccb17 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -6,10 +6,9 @@ /* eslint-disable camelcase */ /* eslint-disable no-restricted-syntax */ -import { isNumber, map, omit } from 'lodash'; +import { isNumber, map } from 'lodash'; import PQueue from 'p-queue'; import { v4 as getGuid } from 'uuid'; -import { connection as WebSocket } from 'websocket'; import { DecryptionErrorMessage, @@ -38,42 +37,37 @@ import { SignedPreKeys, } from '../LibSignalStores'; import { verifySignature } from '../Curve'; -import { BackOff, FIBONACCI_TIMEOUTS } from '../util/BackOff'; import { strictAssert } from '../util/assert'; import { BatcherType, createBatcher } from '../util/batcher'; import { dropNull } from '../util/dropNull'; import { normalizeUuid } from '../util/normalizeUuid'; import { normalizeNumber } from '../util/normalizeNumber'; -import { sleep } from '../util/sleep'; import { parseIntOrThrow } from '../util/parseIntOrThrow'; import { Zone } from '../util/Zone'; +import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto'; +import { DownloadedAttachmentType } from '../types/Attachment'; +import * as Errors from '../types/errors'; + +import { SignalService as Proto } from '../protobuf'; +import { UnprocessedType } from '../textsecure.d'; +import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups'; + +import createTaskWithTimeout from './TaskWithTimeout'; import { processAttachment, processDataMessage } from './processDataMessage'; import { processSyncMessage } from './processSyncMessage'; import EventTarget, { EventHandler } from './EventTarget'; -import { WebAPIType } from './WebAPI'; -import WebSocketResource, { - IncomingWebSocketRequest, - CloseEvent, -} from './WebsocketResources'; -import { ConnectTimeoutError } from './Errors'; -import * as Bytes from '../Bytes'; -import Crypto from './Crypto'; -import { deriveMasterKeyFromGroupV1, typedArrayToArrayBuffer } from '../Crypto'; +import { downloadAttachment } from './downloadAttachment'; +import { IncomingWebSocketRequest } from './WebsocketResources'; import { ContactBuffer, GroupBuffer } from './ContactsParser'; -import { DownloadedAttachmentType } from '../types/Attachment'; -import * as Errors from '../types/errors'; -import * as MIME from '../types/MIME'; -import { SocketStatus } from '../types/SocketStatus'; - -import { SignalService as Proto } from '../protobuf'; - -import { UnprocessedType } from '../textsecure.d'; +import type { WebAPIType } from './WebAPI'; +import type { Storage } from './Storage'; +import * as Bytes from '../Bytes'; import { - ProcessedAttachment, ProcessedDataMessage, ProcessedSyncMessage, ProcessedSent, ProcessedEnvelope, + IRequestHandler, } from './Types.d'; import { ReconnectEvent, @@ -103,8 +97,6 @@ import { GroupSyncEvent, } from './messageReceiverEvents'; -import { deriveGroupFields, MASTER_KEY_LENGTH } from '../groups'; - // TODO: remove once we move away from ArrayBuffers const FIXMEU8 = Uint8Array; @@ -150,12 +142,18 @@ enum TaskType { Decrypted = 'Decrypted', } -export default class MessageReceiver extends EventTarget { - private _onClose?: (code: number, reason: string) => Promise; +export type MessageReceiverOptions = { + server: WebAPIType; + storage: Storage; + serverTrustRoot: string; +}; - private _onWSRClose?: (event: CloseEvent) => void; +export default class MessageReceiver + extends EventTarget + implements IRequestHandler { + private server: WebAPIType; - private _onError?: (error: Error) => Promise; + private storage: Storage; private appQueue: PQueue; @@ -163,22 +161,14 @@ export default class MessageReceiver extends EventTarget { private cacheRemoveBatcher: BatcherType; - private calledClose?: boolean; - private count: number; private processedCount: number; - private deviceId?: number; - - private hasConnected = false; - private incomingQueue: PQueue; private isEmptied?: boolean; - private number_id?: string; - private encryptedQueue: PQueue; private decryptedQueue: PQueue; @@ -187,38 +177,21 @@ export default class MessageReceiver extends EventTarget { private serverTrustRoot: Uint8Array; - private socket?: WebSocket; - - private socketStatus = SocketStatus.CLOSED; - private stoppingProcessing?: boolean; - private uuid_id?: string; - - private wsr?: WebSocketResource; - - private readonly reconnectBackOff = new BackOff(FIBONACCI_TIMEOUTS); - - constructor( - public readonly server: WebAPIType, - options: { - serverTrustRoot: string; - socket?: WebSocket; - } - ) { + constructor({ server, storage, serverTrustRoot }: MessageReceiverOptions) { super(); + this.server = server; + this.storage = storage; + this.count = 0; this.processedCount = 0; - if (!options.serverTrustRoot) { + if (!serverTrustRoot) { throw new Error('Server trust root is required!'); } - this.serverTrustRoot = Bytes.fromBase64(options.serverTrustRoot); - - this.number_id = window.textsecure.storage.user.getNumber(); - this.uuid_id = window.textsecure.storage.user.getUuid(); - this.deviceId = window.textsecure.storage.user.getDeviceId(); + this.serverTrustRoot = Bytes.fromBase64(serverTrustRoot); this.incomingQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); this.appQueue = new PQueue({ concurrency: 1, timeout: 1000 * 60 * 2 }); @@ -249,174 +222,124 @@ export default class MessageReceiver extends EventTarget { maxSize: 30, processBatch: this.cacheRemoveBatch.bind(this), }); - - this.connect(options.socket); - } - - public async stopProcessing(): Promise { - window.log.info('MessageReceiver: stopProcessing requested'); - this.stoppingProcessing = true; - return this.close(); - } - - public unregisterBatchers(): void { - window.log.info('MessageReceiver: unregister batchers'); - this.decryptAndCacheBatcher.unregister(); - this.cacheRemoveBatcher.unregister(); } public getProcessedCount(): number { return this.processedCount; } - private async connect(socket?: WebSocket): Promise { - if (this.calledClose) { + public handleRequest(request: IncomingWebSocketRequest): void { + // We do the message decryption here, instead of in the ordered pending queue, + // to avoid exposing the time it took us to process messages through the time-to-ack. + window.log.info('MessageReceiver: got request', request.verb, request.path); + if (request.path !== '/api/v1/message') { + request.respond(200, 'OK'); + + if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') { + this.incomingQueue.add(() => { + this.onEmpty(); + }); + } return; } + const job = async () => { + const headers = request.headers || []; + + if (!request.body) { + throw new Error( + 'MessageReceiver.handleRequest: request.body was falsey!' + ); + } + + const plaintext = request.body; + + try { + const decoded = Proto.Envelope.decode(plaintext); + const serverTimestamp = normalizeNumber(decoded.serverTimestamp); + + const envelope: ProcessedEnvelope = { + // Make non-private envelope IDs dashless so they don't get redacted + // from logs + id: getGuid().replace(/-/g, ''), + receivedAtCounter: window.Signal.Util.incrementMessageCounter(), + receivedAtDate: Date.now(), + // Calculate the message age (time on server). + messageAgeSec: this.calculateMessageAge(headers, serverTimestamp), + + // Proto.Envelope fields + type: decoded.type, + source: decoded.source, + sourceUuid: decoded.sourceUuid + ? normalizeUuid( + decoded.sourceUuid, + 'MessageReceiver.handleRequest.sourceUuid' + ) + : undefined, + sourceDevice: decoded.sourceDevice, + timestamp: normalizeNumber(decoded.timestamp), + legacyMessage: dropNull(decoded.legacyMessage), + content: dropNull(decoded.content), + serverGuid: decoded.serverGuid, + serverTimestamp, + }; + + // After this point, decoding errors are not the server's + // fault, and we should handle them gracefully and tell the + // user they received an invalid message + + if (envelope.source && this.isBlocked(envelope.source)) { + request.respond(200, 'OK'); + return; + } + + if (envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid)) { + request.respond(200, 'OK'); + return; + } + + this.decryptAndCache(envelope, plaintext, request); + this.processedCount += 1; + } catch (e) { + request.respond(500, 'Bad encrypted websocket message'); + window.log.error( + 'Error handling incoming message:', + Errors.toLogFormat(e) + ); + await this.dispatchAndWait(new ErrorEvent(e)); + } + }; + + this.incomingQueue.add(job); + } + + public reset(): void { // We always process our cache before processing a new websocket message this.incomingQueue.add(async () => this.queueAllCached()); this.count = 0; - if (this.hasConnected) { - this.dispatchEvent(new ReconnectEvent()); - } - this.isEmptied = false; - - this.hasConnected = true; - - if (this.socket && this.socket.connected) { - this.socket.close(); - this.socket = undefined; - if (this.wsr) { - this.wsr.close(); - this.wsr = undefined; - } - } - this.socketStatus = SocketStatus.CONNECTING; - - // initialize the socket and start listening for messages - try { - this.socket = socket || (await this.server.getMessageSocket()); - } catch (error) { - this.socketStatus = SocketStatus.CLOSED; - - if (error instanceof ConnectTimeoutError) { - await this.onclose(-1, 'Connection timed out'); - return; - } - - await this.dispatchAndWait(new ErrorEvent(error)); - return; - } - - this.socketStatus = SocketStatus.OPEN; - - window.log.info('websocket open'); - window.logMessageReceiverConnect(); - - if (!this._onClose) { - this._onClose = this.onclose.bind(this); - } - if (!this._onWSRClose) { - this._onWSRClose = ({ code, reason }: CloseEvent): void => { - this.onclose(code, reason); - }; - } - if (!this._onError) { - this._onError = this.onerror.bind(this); - } - - this.socket.on('close', this._onClose); - this.socket.on('error', this._onError); - - this.wsr = new WebSocketResource(this.socket, { - handleRequest: this.handleRequest.bind(this), - keepalive: { - path: '/v1/keepalive', - disconnect: true, - }, - }); - - // Because sometimes the socket doesn't properly emit its close event - if (this._onWSRClose) { - this.wsr.addEventListener('close', this._onWSRClose); - } + this.stoppingProcessing = false; } - public async close(): Promise { - window.log.info('MessageReceiver.close()'); - this.calledClose = true; - this.socketStatus = SocketStatus.CLOSING; - - // Our WebSocketResource instance will close the socket and emit a 'close' event - // if the socket doesn't emit one quickly enough. - if (this.wsr) { - this.wsr.close(3000, 'called close'); - } - - this.clearRetryTimeout(); - - return this.drain(); + public stopProcessing(): void { + this.stoppingProcessing = true; } - public checkSocket(): void { - if (this.wsr) { - this.wsr.forceKeepAlive(); - } + public hasEmptied(): boolean { + return Boolean(this.isEmptied); } - public getStatus(): SocketStatus { - return this.socketStatus; - } + public async drain(): Promise { + const waitForEncryptedQueue = async () => + this.addToQueue(async () => { + window.log.info('drained'); + }, TaskType.Decrypted); - public async downloadAttachment( - attachment: ProcessedAttachment - ): Promise { - const cdnId = attachment.cdnId || attachment.cdnKey; - const { cdnNumber } = attachment; + const waitForIncomingQueue = async () => + this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); - if (!cdnId) { - throw new Error('downloadAttachment: Attachment was missing cdnId!'); - } - - strictAssert(cdnId, 'attachment without cdnId'); - const encrypted = await this.server.getAttachment( - cdnId, - dropNull(cdnNumber) - ); - const { key, digest, size, contentType } = attachment; - - if (!digest) { - throw new Error('Failure: Ask sender to update Signal and resend.'); - } - - strictAssert(key, 'attachment has no key'); - strictAssert(digest, 'attachment has no digest'); - - const paddedData = await Crypto.decryptAttachment( - encrypted, - typedArrayToArrayBuffer(Bytes.fromBase64(key)), - typedArrayToArrayBuffer(Bytes.fromBase64(digest)) - ); - - if (!isNumber(size)) { - throw new Error( - `downloadAttachment: Size was not provided, actual size was ${paddedData.byteLength}` - ); - } - - const data = window.Signal.Crypto.getFirstBytes(paddedData, size); - - return { - ...omit(attachment, 'digest', 'key'), - - contentType: contentType - ? MIME.fromString(contentType) - : MIME.APPLICATION_OCTET_STREAM, - data, - }; + return this.incomingQueue.add(waitForIncomingQueue); } // @@ -548,157 +471,10 @@ export default class MessageReceiver extends EventTarget { // Private // - private shutdown(): void { - if (this.socket) { - if (this._onClose) { - this.socket.removeListener('close', this._onClose); - } - if (this._onError) { - this.socket.removeListener('error', this._onError); - } - - this.socket = undefined; - } - - if (this.wsr) { - if (this._onWSRClose) { - this.wsr.removeEventListener('close', this._onWSRClose); - } - this.wsr = undefined; - } - } - - private async onerror(error: Error): Promise { - window.log.error('websocket error', error); - } - private async dispatchAndWait(event: Event): Promise { this.appQueue.add(async () => Promise.all(this.dispatchEvent(event))); } - private async onclose(code: number, reason: string): Promise { - window.log.info( - 'MessageReceiver: websocket closed', - code, - reason || '', - 'calledClose:', - this.calledClose - ); - - this.socketStatus = SocketStatus.CLOSED; - - this.shutdown(); - - if (this.calledClose) { - return; - } - if (code === 3000) { - return; - } - if (code === 3001) { - this.onEmpty(); - } - - const timeout = this.reconnectBackOff.getAndIncrement(); - - window.log.info(`MessageReceiver: reconnecting after ${timeout}ms`); - await sleep(timeout); - - // Try to reconnect (if there is an HTTP error - we'll get an - // `error` event from `connect()` and hit the secondary retry backoff - // logic in `ts/background.ts`) - await this.connect(); - - // Successfull reconnect, reset the backoff timeouts - this.reconnectBackOff.reset(); - } - - private handleRequest(request: IncomingWebSocketRequest): void { - // We do the message decryption here, instead of in the ordered pending queue, - // to avoid exposing the time it took us to process messages through the time-to-ack. - - if (request.path !== '/api/v1/message') { - window.log.info('got request', request.verb, request.path); - request.respond(200, 'OK'); - - if (request.verb === 'PUT' && request.path === '/api/v1/queue/empty') { - this.incomingQueue.add(() => { - this.onEmpty(); - }); - } - return; - } - - const job = async () => { - const headers = request.headers || []; - - if (!request.body) { - throw new Error( - 'MessageReceiver.handleRequest: request.body was falsey!' - ); - } - - const plaintext = request.body; - - try { - const decoded = Proto.Envelope.decode(plaintext); - const serverTimestamp = normalizeNumber(decoded.serverTimestamp); - - const envelope: ProcessedEnvelope = { - // Make non-private envelope IDs dashless so they don't get redacted - // from logs - id: getGuid().replace(/-/g, ''), - receivedAtCounter: window.Signal.Util.incrementMessageCounter(), - receivedAtDate: Date.now(), - // Calculate the message age (time on server). - messageAgeSec: this.calculateMessageAge(headers, serverTimestamp), - - // Proto.Envelope fields - type: decoded.type, - source: decoded.source, - sourceUuid: decoded.sourceUuid - ? normalizeUuid( - decoded.sourceUuid, - 'MessageReceiver.handleRequest.sourceUuid' - ) - : undefined, - sourceDevice: decoded.sourceDevice, - timestamp: normalizeNumber(decoded.timestamp), - legacyMessage: dropNull(decoded.legacyMessage), - content: dropNull(decoded.content), - serverGuid: decoded.serverGuid, - serverTimestamp, - }; - - // After this point, decoding errors are not the server's - // fault, and we should handle them gracefully and tell the - // user they received an invalid message - - if (envelope.source && this.isBlocked(envelope.source)) { - request.respond(200, 'OK'); - return; - } - - if (envelope.sourceUuid && this.isUuidBlocked(envelope.sourceUuid)) { - request.respond(200, 'OK'); - return; - } - - this.decryptAndCache(envelope, plaintext, request); - this.processedCount += 1; - } catch (e) { - request.respond(500, 'Bad encrypted websocket message'); - window.log.error( - 'Error handling incoming message:', - Errors.toLogFormat(e) - ); - await this.dispatchAndWait(new ErrorEvent(e)); - } - }; - - this.incomingQueue.add(job); - } - private calculateMessageAge( headers: ReadonlyArray, serverTimestamp?: number @@ -748,10 +524,6 @@ export default class MessageReceiver extends EventTarget { } } - public hasEmptied(): boolean { - return Boolean(this.isEmptied); - } - private onEmpty(): void { const emitEmpty = async () => { await Promise.all([ @@ -795,18 +567,6 @@ export default class MessageReceiver extends EventTarget { waitForCacheAddBatcher(); } - private async drain(): Promise { - const waitForEncryptedQueue = async () => - this.addToQueue(async () => { - window.log.info('drained'); - }, TaskType.Decrypted); - - const waitForIncomingQueue = async () => - this.addToQueue(waitForEncryptedQueue, TaskType.Encrypted); - - return this.incomingQueue.add(waitForIncomingQueue); - } - private updateProgress(count: number): void { // count by 10s if (count % 10 !== 0) { @@ -890,7 +650,7 @@ export default class MessageReceiver extends EventTarget { try { const { id } = item; - await window.textsecure.storage.protocol.removeUnprocessed(id); + await this.storage.protocol.removeUnprocessed(id); } catch (deleteError) { window.log.error( 'queueCached error deleting item', @@ -931,17 +691,17 @@ export default class MessageReceiver extends EventTarget { private async getAllFromCache(): Promise> { window.log.info('getAllFromCache'); - const count = await window.textsecure.storage.protocol.getUnprocessedCount(); + const count = await this.storage.protocol.getUnprocessedCount(); if (count > 1500) { - await window.textsecure.storage.protocol.removeAllUnprocessed(); + await this.storage.protocol.removeAllUnprocessed(); window.log.warn( `There were ${count} messages in cache. Deleted all instead of reprocessing` ); return []; } - const items = await window.textsecure.storage.protocol.getAllUnprocessed(); + const items = await this.storage.protocol.getAllUnprocessed(); window.log.info('getAllFromCache loaded', items.length, 'saved envelopes'); return Promise.all( @@ -954,9 +714,9 @@ export default class MessageReceiver extends EventTarget { 'getAllFromCache final attempt for envelope', item.id ); - await window.textsecure.storage.protocol.removeUnprocessed(item.id); + await this.storage.protocol.removeUnprocessed(item.id); } else { - await window.textsecure.storage.protocol.updateUnprocessedAttempts( + await this.storage.protocol.updateUnprocessedAttempts( item.id, attempts ); @@ -986,7 +746,7 @@ export default class MessageReceiver extends EventTarget { }> > = []; - const storageProtocol = window.textsecure.storage.protocol; + const storageProtocol = this.storage.protocol; try { const zone = new Zone('decryptAndCacheBatch', { @@ -1124,7 +884,7 @@ export default class MessageReceiver extends EventTarget { } private async cacheRemoveBatch(items: Array): Promise { - await window.textsecure.storage.protocol.removeUnprocessed(items); + await this.storage.protocol.removeUnprocessed(items); } private removeFromCache(envelope: ProcessedEnvelope): void { @@ -1140,7 +900,7 @@ export default class MessageReceiver extends EventTarget { window.log.info('queueing decrypted envelope', id); const task = this.handleDecryptedEnvelope.bind(this, envelope, plaintext); - const taskWithTimeout = window.textsecure.createTaskWithTimeout( + const taskWithTimeout = createTaskWithTimeout( task, `queueDecryptedEnvelope ${id}` ); @@ -1163,7 +923,7 @@ export default class MessageReceiver extends EventTarget { window.log.info('queueing envelope', id); const task = this.decryptEnvelope.bind(this, stores, envelope); - const taskWithTimeout = window.textsecure.createTaskWithTimeout( + const taskWithTimeout = createTaskWithTimeout( task, `queueEncryptedEnvelope ${id}` ); @@ -1456,10 +1216,10 @@ export default class MessageReceiver extends EventTarget { envelope: UnsealedEnvelope, ciphertext: Uint8Array ): Promise { - const localE164 = window.textsecure.storage.user.getNumber(); - const localUuid = window.textsecure.storage.user.getUuid(); + const localE164 = this.storage.user.getNumber(); + const localUuid = this.storage.user.getUuid(); const localDeviceId = parseIntOrThrow( - window.textsecure.storage.user.getDeviceId(), + this.storage.user.getDeviceId(), 'MessageReceiver.decryptSealedSender: localDeviceId' ); @@ -1513,7 +1273,7 @@ export default class MessageReceiver extends EventTarget { const address = `${sealedSenderIdentifier}.${sealedSenderSourceDevice}`; - const plaintext = await window.textsecure.storage.protocol.enqueueSenderKeyJob( + const plaintext = await this.storage.protocol.enqueueSenderKeyJob( address, () => groupDecrypt( @@ -1539,7 +1299,7 @@ export default class MessageReceiver extends EventTarget { const sealedSenderIdentifier = envelope.sourceUuid || envelope.source; const address = `${sealedSenderIdentifier}.${envelope.sourceDevice}`; - const unsealedPlaintext = await window.textsecure.storage.protocol.enqueueSessionJob( + const unsealedPlaintext = await this.storage.protocol.enqueueSessionJob( address, () => sealedSenderDecryptMessage( @@ -1598,7 +1358,7 @@ export default class MessageReceiver extends EventTarget { const signalMessage = SignalMessage.deserialize(Buffer.from(ciphertext)); const address = `${identifier}.${sourceDevice}`; - const plaintext = await window.textsecure.storage.protocol.enqueueSessionJob( + const plaintext = await this.storage.protocol.enqueueSessionJob( address, async () => this.unpad( @@ -1630,7 +1390,7 @@ export default class MessageReceiver extends EventTarget { ); const address = `${identifier}.${sourceDevice}`; - const plaintext = await window.textsecure.storage.protocol.enqueueSessionJob( + const plaintext = await this.storage.protocol.enqueueSessionJob( address, async () => this.unpad( @@ -1780,8 +1540,8 @@ export default class MessageReceiver extends EventTarget { const groupId = this.getProcessedGroupId(message); const isBlocked = groupId ? this.isGroupBlocked(groupId) : false; const { source, sourceUuid } = envelope; - const ourE164 = window.textsecure.storage.user.getNumber(); - const ourUuid = window.textsecure.storage.user.getUuid(); + const ourE164 = this.storage.user.getNumber(); + const ourUuid = this.storage.user.getUuid(); const isMe = (source && ourE164 && source === ourE164) || (sourceUuid && ourUuid && sourceUuid === ourUuid); @@ -1869,8 +1629,8 @@ export default class MessageReceiver extends EventTarget { const groupId = this.getProcessedGroupId(message); const isBlocked = groupId ? this.isGroupBlocked(groupId) : false; const { source, sourceUuid } = envelope; - const ourE164 = window.textsecure.storage.user.getNumber(); - const ourUuid = window.textsecure.storage.user.getUuid(); + const ourE164 = this.storage.user.getNumber(); + const ourUuid = this.storage.user.getUuid(); const isMe = (source && ourE164 && source === ourE164) || (sourceUuid && ourUuid && sourceUuid === ourUuid); @@ -2086,7 +1846,7 @@ export default class MessageReceiver extends EventTarget { const senderKeyStore = new SenderKeys(); const address = `${identifier}.${sourceDevice}`; - await window.textsecure.storage.protocol.enqueueSenderKeyJob( + await this.storage.protocol.enqueueSenderKeyJob( address, () => processSenderKeyDistributionMessage( @@ -2326,15 +2086,19 @@ export default class MessageReceiver extends EventTarget { envelope: ProcessedEnvelope, syncMessage: ProcessedSyncMessage ): Promise { - const fromSelfSource = - envelope.source && envelope.source === this.number_id; + const ourNumber = this.storage.user.getNumber(); + const ourUuid = this.storage.user.getUuid(); + + const fromSelfSource = envelope.source && envelope.source === ourNumber; const fromSelfSourceUuid = - envelope.sourceUuid && envelope.sourceUuid === this.uuid_id; + envelope.sourceUuid && envelope.sourceUuid === ourUuid; if (!fromSelfSource && !fromSelfSourceUuid) { throw new Error('Received sync message from another number'); } + + const ourDeviceId = this.storage.user.getDeviceId(); // eslint-disable-next-line eqeqeq - if (envelope.sourceDevice == this.deviceId) { + if (envelope.sourceDevice == ourDeviceId) { throw new Error('Received sync message from our own device'); } if (syncMessage.sent) { @@ -2681,14 +2445,14 @@ export default class MessageReceiver extends EventTarget { ): Promise { window.log.info('Setting these numbers as blocked:', blocked.numbers); if (blocked.numbers) { - await window.textsecure.storage.put('blocked', blocked.numbers); + await this.storage.put('blocked', blocked.numbers); } if (blocked.uuids) { const uuids = blocked.uuids.map((uuid, index) => { return normalizeUuid(uuid, `handleBlocked.uuids.${index}`); }); window.log.info('Setting these uuids as blocked:', uuids); - await window.textsecure.storage.put('blocked-uuids', uuids); + await this.storage.put('blocked-uuids', uuids); } const groupIds = map(blocked.groupIds, groupId => Bytes.toBinary(groupId)); @@ -2696,33 +2460,33 @@ export default class MessageReceiver extends EventTarget { 'Setting these groups as blocked:', groupIds.map(groupId => `group(${groupId})`) ); - await window.textsecure.storage.put('blocked-groups', groupIds); + await this.storage.put('blocked-groups', groupIds); this.removeFromCache(envelope); } private isBlocked(number: string): boolean { - return window.textsecure.storage.blocked.isBlocked(number); + return this.storage.blocked.isBlocked(number); } private isUuidBlocked(uuid: string): boolean { - return window.textsecure.storage.blocked.isUuidBlocked(uuid); + return this.storage.blocked.isUuidBlocked(uuid); } private isGroupBlocked(groupId: string): boolean { - return window.textsecure.storage.blocked.isGroupBlocked(groupId); + return this.storage.blocked.isGroupBlocked(groupId); } private async handleAttachment( attachment: Proto.IAttachmentPointer ): Promise { const cleaned = processAttachment(attachment); - return this.downloadAttachment(cleaned); + return downloadAttachment(this.server, cleaned); } private async handleEndSession(identifier: string): Promise { window.log.info(`handleEndSession: closing sessions for ${identifier}`); - await window.textsecure.storage.protocol.archiveAllSessions(identifier); + await this.storage.protocol.archiveAllSessions(identifier); } private async processDecrypted( diff --git a/ts/textsecure/SocketManager.ts b/ts/textsecure/SocketManager.ts new file mode 100644 index 000000000..79b644f04 --- /dev/null +++ b/ts/textsecure/SocketManager.ts @@ -0,0 +1,627 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable no-restricted-syntax */ + +import URL from 'url'; +import ProxyAgent from 'proxy-agent'; +import { RequestInit, Response, Headers } from 'node-fetch'; +import { client as WebSocketClient } from 'websocket'; +import qs from 'querystring'; +import EventListener from 'events'; + +import { AbortableProcess } from '../util/AbortableProcess'; +import { strictAssert } from '../util/assert'; +import { explodePromise } from '../util/explodePromise'; +import { BackOff, FIBONACCI_TIMEOUTS } from '../util/BackOff'; +import { getUserAgent } from '../util/getUserAgent'; +import { sleep } from '../util/sleep'; +import { SocketStatus } from '../types/SocketStatus'; +import * as Errors from '../types/errors'; +import * as Bytes from '../Bytes'; + +import WebSocketResource, { + WebSocketResourceOptions, + IncomingWebSocketRequest, +} from './WebsocketResources'; +import { ConnectTimeoutError, HTTPError } from './Errors'; +import { handleStatusCode, translateError } from './Utils'; +import { WebAPICredentials, IRequestHandler } from './Types.d'; + +// TODO: remove once we move away from ArrayBuffers +const FIXMEU8 = Uint8Array; + +const TEN_SECONDS = 1000 * 10; + +const FIVE_MINUTES = 5 * 60 * 1000; + +export type SocketManagerOptions = Readonly<{ + url: string; + certificateAuthority: string; + version: string; + proxyUrl?: string; +}>; + +// This class manages two websocket resource: +// +// - Authenticated WebSocketResource which uses supplied WebAPICredentials and +// automatically reconnects on closed socket (using back off) +// - Unauthenticated WebSocketResource that is created on demand and reconnected +// every 5 minutes. +// +// Incoming requests on authenticated resource are funneled into the registered +// request handlers (`registerRequestHandler`) or queued internally until at +// least one such request handler becomes available. +// +// Incoming requests on unauthenticated resource are not currently supported. +// WebSocketResource is responsible for their immediate termination. +export class SocketManager extends EventListener { + private backOff = new BackOff(FIBONACCI_TIMEOUTS); + + private authenticated?: AbortableProcess; + + private unauthenticated?: AbortableProcess; + + private credentials?: WebAPICredentials; + + private readonly proxyAgent?: ProxyAgent; + + private status = SocketStatus.CLOSED; + + private requestHandlers = new Set(); + + private incomingRequestQueue = new Array(); + + private isOffline = false; + + constructor(private readonly options: SocketManagerOptions) { + super(); + + if (options.proxyUrl) { + this.proxyAgent = new ProxyAgent(options.proxyUrl); + } + } + + public getStatus(): SocketStatus { + return this.status; + } + + // Update WebAPICredentials and reconnect authenticated resource if + // credentials changed + public async authenticate(credentials: WebAPICredentials): Promise { + if (this.isOffline) { + throw new HTTPError('SocketManager offline', { + code: 0, + headers: {}, + stack: new Error().stack, + }); + } + + const { username, password } = credentials; + if (!username && !password) { + window.log.warn( + 'SocketManager authenticate was called without credentials' + ); + return; + } + + if ( + this.credentials && + this.credentials.username === username && + this.credentials.password === password && + this.authenticated + ) { + try { + await this.authenticated.getResult(); + } catch (error) { + window.log.warn( + 'SocketManager: failed to wait for existing authenticated socket ' + + ` due to error: ${Errors.toLogFormat(error)}` + ); + } + return; + } + + this.credentials = credentials; + + window.log.info('SocketManager: connecting authenticated socket'); + + this.status = SocketStatus.CONNECTING; + + const process = this.connectResource({ + path: '/v1/websocket/', + query: { login: username, password }, + resourceOptions: { + keepalive: { path: '/v1/keepalive' }, + handleRequest: (req: IncomingWebSocketRequest): void => { + this.queueOrHandleRequest(req); + }, + }, + }); + + // Cancel previous connect attempt or close socket + this.authenticated?.abort(); + + this.authenticated = process; + + const reconnect = async (): Promise => { + const timeout = this.backOff.getAndIncrement(); + + window.log.info( + 'SocketManager: reconnecting authenticated socket ' + + `after ${timeout}ms` + ); + + await sleep(timeout); + if (this.isOffline) { + window.log.info( + 'SocketManager: cancelled reconnect because we are offline' + ); + return; + } + + if (this.authenticated) { + window.log.info( + 'SocketManager: authenticated socket already reconnected' + ); + return; + } + + strictAssert(this.credentials !== undefined, 'Missing credentials'); + + try { + await this.authenticate(this.credentials); + } catch (error) { + window.log.info( + 'SocketManager: authenticated socket failed to reconect ' + + `due to error ${Errors.toLogFormat(error)}` + ); + return reconnect(); + } + }; + + let authenticated: WebSocketResource; + try { + authenticated = await process.getResult(); + this.status = SocketStatus.OPEN; + } catch (error) { + strictAssert(this.authenticated === process, 'Someone stole our socket'); + this.dropAuthenticated(process); + + window.log.warn( + 'SocketManager: authenticated socket connection failed with ' + + `error: ${Errors.toLogFormat(error)}` + ); + + if (error instanceof HTTPError) { + const { code } = error; + + if (code === 401 || code === 403) { + this.emit('authError', error); + return; + } + + if (code !== 500 && code !== -1) { + // No reconnect attempt should be made + return; + } + } + + reconnect(); + return; + } + + window.log.info('SocketManager: connected authenticated socket'); + + window.logAuthenticatedConnect(); + this.backOff.reset(); + + authenticated.addEventListener('close', ({ code, reason }): void => { + if (this.authenticated !== process) { + return; + } + + window.log.warn( + 'SocketManager: authenticated socket closed ' + + `with code=${code} and reason=${reason}` + ); + this.dropAuthenticated(process); + + if (code === 3000) { + // Intentional disconnect + return; + } + + reconnect(); + }); + } + + // Either returns currently connecting/active authenticated + // WebSocketResource or connects a fresh one. + public async getAuthenticatedResource(): Promise { + if (!this.authenticated) { + strictAssert(this.credentials !== undefined, 'Missing credentials'); + await this.authenticate(this.credentials); + } + + strictAssert(this.authenticated !== undefined, 'Authentication failed'); + return this.authenticated.getResult(); + } + + // Creates new WebSocketResource for AccountManager's provisioning + public async getProvisioningResource( + handler: IRequestHandler + ): Promise { + return this.connectResource({ + path: '/v1/websocket/provisioning/', + resourceOptions: { + handleRequest: (req: IncomingWebSocketRequest): void => { + handler.handleRequest(req); + }, + keepalive: { path: '/v1/keepalive/provisioning' }, + }, + }).getResult(); + } + + // Fetch-compatible wrapper around underlying unauthenticated/authenticated + // websocket resources. This wrapper supports only limited number of features + // of node-fetch despite being API compatible. + public async fetch(url: string, init: RequestInit): Promise { + const headers = new Headers(init.headers); + + const isAuthenticated = headers.has('Authorization'); + + let resource: WebSocketResource; + if (isAuthenticated) { + resource = await this.getAuthenticatedResource(); + } else { + resource = await this.getUnauthenticatedResource(); + } + + const { path } = URL.parse(url); + strictAssert(path, "Fetch can't have empty path"); + + const { method = 'GET', body, timeout } = init; + + let bodyBytes: Uint8Array | undefined; + if (body === undefined) { + bodyBytes = undefined; + } else if (body instanceof Uint8Array) { + bodyBytes = body; + } else if (body instanceof ArrayBuffer) { + bodyBytes = new FIXMEU8(body); + } else if (typeof body === 'string') { + bodyBytes = Bytes.fromString(body); + } else { + throw new Error(`Unsupported body type: ${typeof body}`); + } + + const { + status, + message: statusText, + response, + headers: flatResponseHeaders, + } = await resource.sendRequest({ + verb: method, + path, + body: bodyBytes, + headers: Array.from(headers.entries()).map(([key, value]) => { + return `${key}:${value}`; + }), + timeout, + }); + + const responseHeaders: Array<[string, string]> = flatResponseHeaders.map( + header => { + const [key, value] = header.split(':', 2); + strictAssert(value !== undefined, 'Invalid header!'); + return [key, value]; + } + ); + + return new Response(response, { + status, + statusText, + headers: responseHeaders, + }); + } + + public registerRequestHandler(handler: IRequestHandler): void { + this.requestHandlers.add(handler); + + const queue = this.incomingRequestQueue; + if (queue.length === 0) { + return; + } + + window.log.info( + `SocketManager: processing ${queue.length} queued incoming requests` + ); + this.incomingRequestQueue = []; + for (const req of queue) { + this.queueOrHandleRequest(req); + } + } + + public unregisterRequestHandler(handler: IRequestHandler): void { + this.requestHandlers.delete(handler); + } + + // Force keep-alive checks on WebSocketResources + public async check(): Promise { + if (this.isOffline) { + return; + } + + window.log.info('SocketManager.check'); + await Promise.all([ + SocketManager.checkResource(this.authenticated), + SocketManager.checkResource(this.unauthenticated), + ]); + } + + // Puts SocketManager into "online" state and reconnects the authenticated + // WebSocketResource (if there are valid credentials) + public async onOnline(): Promise { + window.log.info('SocketManager.onOnline'); + this.isOffline = false; + + if (this.credentials) { + await this.authenticate(this.credentials); + } + } + + // Puts SocketManager into "offline" state and gracefully disconnects both + // unauthenticated and authenticated resources. + public async onOffline(): Promise { + window.log.info('SocketManager.onOffline'); + this.isOffline = true; + + this.authenticated?.abort(); + this.unauthenticated?.abort(); + } + + // + // Private + // + + private async getUnauthenticatedResource(): Promise { + if (this.isOffline) { + throw new HTTPError('SocketManager offline', { + code: 0, + headers: {}, + stack: new Error().stack, + }); + } + + if (this.unauthenticated) { + return this.unauthenticated.getResult(); + } + + window.log.info('SocketManager: connecting unauthenticated socket'); + + const process = this.connectResource({ + path: '/v1/websocket/', + resourceOptions: { + keepalive: { path: '/v1/keepalive' }, + }, + }); + this.unauthenticated = process; + + const unauthenticated = await this.unauthenticated.getResult(); + + window.log.info('SocketManager: connected unauthenticated socket'); + + let timer: NodeJS.Timeout | undefined = setTimeout(() => { + window.log.info( + 'SocketManager: shutting down unauthenticated socket after timeout' + ); + timer = undefined; + unauthenticated.shutdown(); + this.dropUnauthenticated(process); + }, FIVE_MINUTES); + + unauthenticated.addEventListener('close', ({ code, reason }): void => { + if (timer !== undefined) { + clearTimeout(timer); + timer = undefined; + } + + if (this.unauthenticated !== process) { + return; + } + + window.log.warn( + 'SocketManager: unauthenticated socket closed ' + + `with code=${code} and reason=${reason}` + ); + + this.dropUnauthenticated(process); + }); + + return this.unauthenticated.getResult(); + } + + private connectResource({ + path, + resourceOptions, + query = {}, + timeout = TEN_SECONDS, + }: { + path: string; + resourceOptions: WebSocketResourceOptions; + query?: Record; + timeout?: number; + }): AbortableProcess { + const fixedScheme = this.options.url + .replace('https://', 'wss://') + .replace('http://', 'ws://'); + + const headers = { + 'User-Agent': getUserAgent(this.options.version), + }; + const client = new WebSocketClient({ + tlsOptions: { + ca: this.options.certificateAuthority, + agent: this.proxyAgent, + }, + maxReceivedFrameSize: 0x210000, + }); + + const queryWithDefaults = { + agent: 'OWD', + version: this.options.version, + ...query, + }; + + client.connect( + `${fixedScheme}${path}?${qs.encode(queryWithDefaults)}`, + undefined, + undefined, + headers + ); + + const { stack } = new Error(); + + const { promise, resolve, reject } = explodePromise(); + + const timer = setTimeout(() => { + reject(new ConnectTimeoutError('Connection timed out')); + + client.abort(); + }, timeout); + + let resource: WebSocketResource | undefined; + client.on('connect', socket => { + clearTimeout(timer); + + resource = new WebSocketResource(socket, resourceOptions); + resolve(resource); + }); + + client.on('httpResponse', async response => { + clearTimeout(timer); + + const statusCode = response.statusCode || -1; + await handleStatusCode(statusCode); + + const error = new HTTPError( + 'connectResource: invalid websocket response', + { + code: statusCode || -1, + headers: {}, + stack, + } + ); + + const translatedError = translateError(error); + strictAssert( + translatedError, + '`httpResponse` event cannot be emitted with 200 status code' + ); + + reject(translatedError); + }); + + client.on('connectFailed', e => { + clearTimeout(timer); + + reject( + new HTTPError('connectResource: connectFailed', { + code: -1, + headers: {}, + response: e.toString(), + stack, + }) + ); + }); + + return new AbortableProcess( + `SocketManager.connectResource(${path})`, + { + abort() { + if (resource) { + resource.close(3000, 'aborted'); + } else { + clearTimeout(timer); + client.abort(); + } + }, + }, + promise + ); + } + + private static async checkResource( + process?: AbortableProcess + ): Promise { + if (!process) { + return; + } + + const resource = await process.getResult(); + resource.forceKeepAlive(); + } + + private dropAuthenticated( + process: AbortableProcess + ): void { + strictAssert( + this.authenticated === process, + 'Authenticated resource mismatch' + ); + + this.incomingRequestQueue = []; + this.authenticated = undefined; + this.status = SocketStatus.CLOSED; + } + + private dropUnauthenticated( + process: AbortableProcess + ): void { + strictAssert( + this.unauthenticated === process, + 'Unauthenticated resource mismatch' + ); + this.unauthenticated = undefined; + } + + private queueOrHandleRequest(req: IncomingWebSocketRequest): void { + if (this.requestHandlers.size === 0) { + this.incomingRequestQueue.push(req); + window.log.info( + 'SocketManager: request handler unavailable, ' + + `queued request. Queue size: ${this.incomingRequestQueue.length}` + ); + return; + } + for (const handlers of this.requestHandlers) { + try { + handlers.handleRequest(req); + } catch (error) { + window.log.warn( + 'SocketManager: got exception while handling incoming request, ' + + `error: ${Errors.toLogFormat(error)}` + ); + } + } + } + + // EventEmitter types + + public on(type: 'authError', callback: (error: HTTPError) => void): this; + + public on( + type: string | symbol, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + listener: (...args: Array) => void + ): this { + return super.on(type, listener); + } + + public emit(type: 'authError', error: HTTPError): boolean; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + public emit(type: string | symbol, ...args: Array): boolean { + return super.emit(type, ...args); + } +} diff --git a/ts/textsecure/Types.d.ts b/ts/textsecure/Types.d.ts index 74f46f31d..dfa357b5e 100644 --- a/ts/textsecure/Types.d.ts +++ b/ts/textsecure/Types.d.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: AGPL-3.0-only import type { SignalService as Proto } from '../protobuf'; +import type { IncomingWebSocketRequest } from './WebsocketResources'; export { IdentityKeyType, @@ -232,3 +233,7 @@ export interface CallbackResultType { timestamp?: number; recipients?: Record>; } + +export interface IRequestHandler { + handleRequest(request: IncomingWebSocketRequest): void; +} diff --git a/ts/textsecure/Utils.ts b/ts/textsecure/Utils.ts new file mode 100644 index 000000000..8880ce31e --- /dev/null +++ b/ts/textsecure/Utils.ts @@ -0,0 +1,46 @@ +// Copyright 2020-2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export async function handleStatusCode(status: number): Promise { + if (status === 499) { + window.log.error('Got 499 from Signal Server. Build is expired.'); + await window.storage.put('remoteBuildExpiration', Date.now()); + window.reduxActions.expiration.hydrateExpirationStatus(true); + } +} + +export function translateError(error: Error): Error | undefined { + const { code } = error; + if (code === 200) { + // Happens sometimes when we get no response. Might be nice to get 204 instead. + return undefined; + } + let message: string; + switch (code) { + case -1: + message = + 'Failed to connect to the server, please check your network connection.'; + break; + case 413: + message = 'Rate limit exceeded, please try again later.'; + break; + case 403: + message = 'Invalid code, please try again.'; + break; + case 417: + message = 'Number already registered.'; + break; + case 401: + message = + 'Invalid authentication, most likely someone re-registered and invalidated our registration.'; + break; + case 404: + message = 'Number is not registered.'; + break; + default: + message = 'The server rejected our query, please file a bug report.'; + } + // eslint-disable-next-line no-param-reassign + error.message = `${message} (original: ${error.message})`; + return error; +} diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index 8f01cc79d..c4a913f93 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -11,7 +11,7 @@ import fetch, { Response } from 'node-fetch'; import ProxyAgent from 'proxy-agent'; -import { Agent, RequestOptions } from 'https'; +import { Agent } from 'https'; import pProps from 'p-props'; import { compact, @@ -26,13 +26,13 @@ import { pki } from 'node-forge'; import is from '@sindresorhus/is'; import PQueue from 'p-queue'; import { v4 as getGuid } from 'uuid'; -import { client as WebSocketClient, connection as WebSocket } from 'websocket'; import { z } from 'zod'; import Long from 'long'; import { assert } from '../util/assert'; import { getUserAgent } from '../util/getUserAgent'; import { toWebSafeBase64 } from '../util/webSafeBase64'; +import { SocketStatus } from '../types/SocketStatus'; import { isPackIdValid, redactPackId } from '../types/Stickers'; import * as Bytes from '../Bytes'; import { @@ -55,11 +55,14 @@ import { StorageServiceCallOptionsType, StorageServiceCredentials, } from '../textsecure.d'; +import { SocketManager } from './SocketManager'; +import WebSocketResource from './WebsocketResources'; import { SignalService as Proto } from '../protobuf'; -import { ConnectTimeoutError } from './Errors'; +import { HTTPError } from './Errors'; import MessageSender from './SendMessage'; -import { WebAPICredentials } from './Types.d'; +import { WebAPICredentials, IRequestHandler } from './Types.d'; +import { handleStatusCode, translateError } from './Utils'; // TODO: remove once we move away from ArrayBuffers const FIXMEU8 = Uint8Array; @@ -263,96 +266,6 @@ function _validateResponse(response: any, schema: any) { return true; } -export type ConnectSocketOptions = Readonly<{ - certificateAuthority: string; - proxyUrl?: string; - version: string; - timeout?: number; -}>; - -const TEN_SECONDS = 1000 * 10; - -async function _connectSocket( - url: string, - { - certificateAuthority, - proxyUrl, - version, - timeout = TEN_SECONDS, - }: ConnectSocketOptions -): Promise { - let tlsOptions: RequestOptions = { - ca: certificateAuthority, - }; - if (proxyUrl) { - tlsOptions = { - ...tlsOptions, - agent: new ProxyAgent(proxyUrl), - }; - } - - const headers = { - 'User-Agent': getUserAgent(version), - }; - const client = new WebSocketClient({ - tlsOptions, - maxReceivedFrameSize: 0x210000, - }); - - client.connect(url, undefined, undefined, headers); - - const { stack } = new Error(); - - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - reject(new ConnectTimeoutError('Connection timed out')); - - client.abort(); - }, timeout); - - client.on('connect', socket => { - clearTimeout(timer); - resolve(socket); - }); - - client.on('httpResponse', async response => { - clearTimeout(timer); - - const statusCode = response.statusCode || -1; - await _handleStatusCode(statusCode); - - const error = makeHTTPError( - '_connectSocket: invalid websocket response', - statusCode || -1, - {}, // headers - undefined, - stack - ); - - const translatedError = _translateError(error); - assert( - translatedError, - '`httpResponse` event cannot be emitted with 200 status code' - ); - - reject(translatedError); - }); - client.on('connectFailed', e => { - clearTimeout(timer); - - reject( - makeHTTPError( - '_connectSocket: connectFailed', - -1, - {}, - e.toString(), - stack - ) - ); - }); - }); -} - const FIVE_MINUTES = 1000 * 60 * 5; type AgentCacheType = { @@ -378,6 +291,7 @@ type HTTPCodeType = 'GET' | 'POST' | 'PUT' | 'DELETE' | 'PATCH'; type RedactUrl = (url: string) => string; type PromiseAjaxOptionsType = { + socketManager?: SocketManager; accessKey?: string; basicAuth?: string; certificateAuthority?: string; @@ -468,284 +382,218 @@ function getHostname(url: string): string { return urlObject.hostname; } -async function _handleStatusCode( - status: number, - unauthenticated = false -): Promise { - if (status === 499) { - window.log.error('Got 499 from Signal Server. Build is expired.'); - await window.storage.put('remoteBuildExpiration', Date.now()); - window.reduxActions.expiration.hydrateExpirationStatus(true); - } - if (!unauthenticated && status === 401) { - window.log.error('Got 401 from Signal Server. We might be unlinked.'); - window.Whisper.events.trigger('mightBeUnlinked'); - } -} - -function _translateError(error: Error): Error | undefined { - const { code } = error; - if (code === 200) { - // Happens sometimes when we get no response. Might be nice to get 204 instead. - return undefined; - } - let message: string; - switch (code) { - case -1: - message = - 'Failed to connect to the server, please check your network connection.'; - break; - case 413: - message = 'Rate limit exceeded, please try again later.'; - break; - case 403: - message = 'Invalid code, please try again.'; - break; - case 417: - message = 'Number already registered.'; - break; - case 401: - message = - 'Invalid authentication, most likely someone re-registered and invalidated our registration.'; - break; - case 404: - message = 'Number is not registered.'; - break; - default: - message = 'The server rejected our query, please file a bug report.'; - } - error.message = `${message} (original: ${error.message})`; - return error; -} - async function _promiseAjax( providedUrl: string | null, options: PromiseAjaxOptionsType -): Promise { - return new Promise((resolve, reject) => { - const url = providedUrl || `${options.host}/${options.path}`; +): Promise< + | string + | ArrayBuffer + | unknown + | JSONWithDetailsType + | ArrayBufferWithDetailsType +> { + const url = providedUrl || `${options.host}/${options.path}`; - const unauthLabel = options.unauthenticated ? ' (unauth)' : ''; + const unauthLabel = options.unauthenticated ? ' (unauth)' : ''; + if (options.redactUrl) { + window.log.info(`${options.type} ${options.redactUrl(url)}${unauthLabel}`); + } else { + window.log.info(`${options.type} ${url}${unauthLabel}`); + } + + const timeout = typeof options.timeout === 'number' ? options.timeout : 10000; + + const { proxyUrl, socketManager } = options; + const agentType = options.unauthenticated ? 'unauth' : 'auth'; + const cacheKey = `${proxyUrl}-${agentType}`; + + const { timestamp } = agents[cacheKey] || { timestamp: null }; + if (!timestamp || timestamp + FIVE_MINUTES < Date.now()) { + if (timestamp) { + window.log.info(`Cycling agent for type ${cacheKey}`); + } + agents[cacheKey] = { + agent: proxyUrl + ? new ProxyAgent(proxyUrl) + : new Agent({ keepAlive: true }), + timestamp: Date.now(), + }; + } + const { agent } = agents[cacheKey]; + + const fetchOptions = { + method: options.type, + body: options.data, + headers: { + 'User-Agent': getUserAgent(options.version), + 'X-Signal-Agent': 'OWD', + ...options.headers, + } as FetchHeaderListType, + redirect: options.redirect, + agent, + ca: options.certificateAuthority, + timeout, + }; + + if (fetchOptions.body instanceof ArrayBuffer) { + // node-fetch doesn't support ArrayBuffer, only node Buffer + const contentLength = fetchOptions.body.byteLength; + fetchOptions.body = Buffer.from(fetchOptions.body); + + // node-fetch doesn't set content-length like S3 requires + fetchOptions.headers['Content-Length'] = contentLength.toString(); + } + + const { accessKey, basicAuth, unauthenticated } = options; + if (basicAuth) { + fetchOptions.headers.Authorization = `Basic ${basicAuth}`; + } else if (unauthenticated) { + if (!accessKey) { + throw new Error( + '_promiseAjax: mode is unauthenticated, but accessKey was not provided' + ); + } + // Access key is already a Base64 string + fetchOptions.headers['Unidentified-Access-Key'] = accessKey; + } else if (options.user && options.password) { + const user = _getString(options.user); + const password = _getString(options.password); + const auth = _btoa(`${user}:${password}`); + fetchOptions.headers.Authorization = `Basic ${auth}`; + } + + if (options.contentType) { + fetchOptions.headers['Content-Type'] = options.contentType; + } + + let response: Response; + let result: string | ArrayBuffer | unknown; + try { + response = socketManager + ? await socketManager.fetch(url, fetchOptions) + : await fetch(url, fetchOptions); + + if ( + options.serverUrl && + getHostname(options.serverUrl) === getHostname(url) + ) { + await handleStatusCode(response.status); + + if (!unauthenticated && response.status === 401) { + window.log.error('Got 401 from Signal Server. We might be unlinked.'); + window.Whisper.events.trigger('mightBeUnlinked'); + } + } + + if (DEBUG && !isSuccess(response.status)) { + result = await response.text(); + } else if ( + (options.responseType === 'json' || + options.responseType === 'jsonwithdetails') && + /^application\/json(;.*)?$/.test( + response.headers.get('Content-Type') || '' + ) + ) { + result = await response.json(); + } else if ( + options.responseType === 'arraybuffer' || + options.responseType === 'arraybufferwithdetails' + ) { + result = await response.arrayBuffer(); + } else { + result = await response.textConverted(); + } + } catch (e) { + if (options.redactUrl) { + window.log.error(options.type, options.redactUrl(url), 0, 'Error'); + } else { + window.log.error(options.type, url, 0, 'Error'); + } + const stack = `${e.stack}\nInitial stack:\n${options.stack}`; + throw makeHTTPError('promiseAjax catch', 0, {}, e.toString(), stack); + } + + if (!isSuccess(response.status)) { if (options.redactUrl) { window.log.info( - `${options.type} ${options.redactUrl(url)}${unauthLabel}` + options.type, + options.redactUrl(url), + response.status, + 'Error' ); } else { - window.log.info(`${options.type} ${url}${unauthLabel}`); + window.log.error(options.type, url, response.status, 'Error'); } - const timeout = - typeof options.timeout === 'number' ? options.timeout : 10000; + throw makeHTTPError( + 'promiseAjax: error response', + response.status, + response.headers.raw(), + result, + options.stack + ); + } - const { proxyUrl } = options; - const agentType = options.unauthenticated ? 'unauth' : 'auth'; - const cacheKey = `${proxyUrl}-${agentType}`; - - const { timestamp } = agents[cacheKey] || { timestamp: null }; - if (!timestamp || timestamp + FIVE_MINUTES < Date.now()) { - if (timestamp) { - window.log.info(`Cycling agent for type ${cacheKey}`); - } - agents[cacheKey] = { - agent: proxyUrl - ? new ProxyAgent(proxyUrl) - : new Agent({ keepAlive: true }), - timestamp: Date.now(), - }; - } - const { agent } = agents[cacheKey]; - - const fetchOptions = { - method: options.type, - body: options.data, - headers: { - 'User-Agent': getUserAgent(options.version), - 'X-Signal-Agent': 'OWD', - ...options.headers, - } as FetchHeaderListType, - redirect: options.redirect, - agent, - ca: options.certificateAuthority, - timeout, - }; - - if (fetchOptions.body instanceof ArrayBuffer) { - // node-fetch doesn't support ArrayBuffer, only node Buffer - const contentLength = fetchOptions.body.byteLength; - fetchOptions.body = Buffer.from(fetchOptions.body); - - // node-fetch doesn't set content-length like S3 requires - fetchOptions.headers['Content-Length'] = contentLength.toString(); - } - - const { accessKey, basicAuth, unauthenticated } = options; - if (basicAuth) { - fetchOptions.headers.Authorization = `Basic ${basicAuth}`; - } else if (unauthenticated) { - if (!accessKey) { - throw new Error( - '_promiseAjax: mode is unauthenticated, but accessKey was not provided' + if ( + options.responseType === 'json' || + options.responseType === 'jsonwithdetails' + ) { + if (options.validateResponse) { + if (!_validateResponse(result, options.validateResponse)) { + if (options.redactUrl) { + window.log.info( + options.type, + options.redactUrl(url), + response.status, + 'Error' + ); + } else { + window.log.error(options.type, url, response.status, 'Error'); + } + throw makeHTTPError( + 'promiseAjax: invalid response', + response.status, + response.headers.raw(), + result, + options.stack ); } - // Access key is already a Base64 string - fetchOptions.headers['Unidentified-Access-Key'] = accessKey; - } else if (options.user && options.password) { - const user = _getString(options.user); - const password = _getString(options.password); - const auth = _btoa(`${user}:${password}`); - fetchOptions.headers.Authorization = `Basic ${auth}`; } + } - if (options.contentType) { - fetchOptions.headers['Content-Type'] = options.contentType; - } + if (options.redactUrl) { + window.log.info( + options.type, + options.redactUrl(url), + response.status, + 'Success' + ); + } else { + window.log.info(options.type, url, response.status, 'Success'); + } - fetch(url, fetchOptions) - .then(async response => { - if ( - options.serverUrl && - getHostname(options.serverUrl) === getHostname(url) - ) { - await _handleStatusCode(response.status, unauthenticated); - } + if (options.responseType === 'arraybufferwithdetails') { + assert(result instanceof ArrayBuffer, 'Expected ArrayBuffer result'); + const fullResult: ArrayBufferWithDetailsType = { + data: result, + contentType: getContentType(response), + response, + }; - let resultPromise; - if (DEBUG && !isSuccess(response.status)) { - resultPromise = response.text(); - } else if ( - (options.responseType === 'json' || - options.responseType === 'jsonwithdetails') && - /^application\/json(;.*)?$/.test( - response.headers.get('Content-Type') || '' - ) - ) { - resultPromise = response.json(); - } else if ( - options.responseType === 'arraybuffer' || - options.responseType === 'arraybufferwithdetails' - ) { - resultPromise = response.buffer(); - } else { - resultPromise = response.textConverted(); - } + return fullResult; + } - return resultPromise.then(result => { - if (isSuccess(response.status)) { - if ( - options.responseType === 'arraybuffer' || - options.responseType === 'arraybufferwithdetails' - ) { - result = result.buffer.slice( - result.byteOffset, - result.byteOffset + result.byteLength - ); - } - if ( - options.responseType === 'json' || - options.responseType === 'jsonwithdetails' - ) { - if (options.validateResponse) { - if (!_validateResponse(result, options.validateResponse)) { - if (options.redactUrl) { - window.log.info( - options.type, - options.redactUrl(url), - response.status, - 'Error' - ); - } else { - window.log.error( - options.type, - url, - response.status, - 'Error' - ); - } - reject( - makeHTTPError( - 'promiseAjax: invalid response', - response.status, - response.headers.raw(), - result, - options.stack - ) - ); + if (options.responseType === 'jsonwithdetails') { + const fullResult: JSONWithDetailsType = { + data: result, + contentType: getContentType(response), + response, + }; - return; - } - } - } + return fullResult; + } - if (options.redactUrl) { - window.log.info( - options.type, - options.redactUrl(url), - response.status, - 'Success' - ); - } else { - window.log.info(options.type, url, response.status, 'Success'); - } - if (options.responseType === 'arraybufferwithdetails') { - const fullResult: ArrayBufferWithDetailsType = { - data: result, - contentType: getContentType(response), - response, - }; - - resolve(fullResult); - - return; - } - if (options.responseType === 'jsonwithdetails') { - const fullResult: JSONWithDetailsType = { - data: result, - contentType: getContentType(response), - response, - }; - - resolve(fullResult); - - return; - } - - resolve(result); - - return; - } - - if (options.redactUrl) { - window.log.info( - options.type, - options.redactUrl(url), - response.status, - 'Error' - ); - } else { - window.log.error(options.type, url, response.status, 'Error'); - } - - reject( - makeHTTPError( - 'promiseAjax: error response', - response.status, - response.headers.raw(), - result, - options.stack - ) - ); - }); - }) - .catch(e => { - if (options.redactUrl) { - window.log.error(options.type, options.redactUrl(url), 0, 'Error'); - } else { - window.log.error(options.type, url, 0, 'Error'); - } - const stack = `${e.stack}\nInitial stack:\n${options.stack}`; - reject(makeHTTPError('promiseAjax catch', 0, {}, e.toString(), stack)); - }); - }); + return result; } async function _retryAjax( @@ -793,21 +641,12 @@ function makeHTTPError( response: any, stack?: string ) { - const code = providedCode > 999 || providedCode < 100 ? -1 : providedCode; - const e = new Error(`${message}; code: ${code}`); - e.name = 'HTTPError'; - e.code = code; - e.responseHeaders = headers; - if (DEBUG && response) { - e.stack += `\nresponse: ${response}`; - } - - e.stack += `\nOriginal stack:\n${stack}`; - if (response) { - e.response = response; - } - - return e; + return new HTTPError(message, { + code: providedCode, + headers, + response, + stack, + }); } const URL_CALLS = { @@ -844,6 +683,21 @@ const URL_CALLS = { challenge: 'v1/challenge', }; +const WEBSOCKET_CALLS = new Set([ + // MessageController + 'messages', + 'reportMessage', + + // ProfileController + 'profile', + + // AttachmentControllerV2 + 'attachmentId', + + // RemoteConfigController + 'config', +]); + type InitializeOptionsType = { url: string; storageUrl: string; @@ -983,7 +837,6 @@ export type WebAPIType = { deviceId?: number, options?: { accessKey?: string } ) => Promise; - getMessageSocket: () => Promise; getMyKeys: () => Promise; getProfile: ( identifier: string, @@ -1000,7 +853,9 @@ export type WebAPIType = { profileKeyCredentialRequest?: string; } ) => Promise; - getProvisioningSocket: () => Promise; + getProvisioningResource: ( + handler: IRequestHandler + ) => Promise; getSenderCertificate: ( withUuid?: boolean ) => Promise<{ certificate: string }>; @@ -1086,6 +941,12 @@ export type WebAPIType = { Array<{ name: string; enabled: boolean; value: string | null }> >; authenticate: (credentials: WebAPICredentials) => Promise; + getSocketStatus: () => SocketStatus; + registerRequestHandler: (handler: IRequestHandler) => void; + unregisterRequestHandler: (handler: IRequestHandler) => void; + checkSockets: () => void; + onOnline: () => Promise; + onOffline: () => Promise; }; export type SignedPreKeyType = { @@ -1200,8 +1061,27 @@ export function initialize({ const PARSE_RANGE_HEADER = /\/(\d+)$/; const PARSE_GROUP_LOG_RANGE_HEADER = /$versions (\d{1,10})-(\d{1,10})\/(d{1,10})/; + const socketManager = new SocketManager({ + url, + certificateAuthority, + version, + proxyUrl, + }); + + socketManager.on('authError', () => { + window.Whisper.events.trigger('unlinkAndDisconnect'); + }); + + socketManager.authenticate({ username, password }); + // Thanks, function hoisting! return { + getSocketStatus, + checkSockets, + onOnline, + onOffline, + registerRequestHandler, + unregisterRequestHandler, authenticate, confirmCode, createGroup, @@ -1220,11 +1100,10 @@ export function initialize({ getIceServers, getKeysForIdentifier, getKeysForIdentifierUnauth, - getMessageSocket, getMyKeys, getProfile, getProfileUnauth, - getProvisioningSocket, + getProvisioningResource, getSenderCertificate, getSticker, getStickerPackManifest, @@ -1261,7 +1140,10 @@ export function initialize({ param.urlParameters = ''; } + const useWebSocket = WEBSOCKET_CALLS.has(param.call); + return _outerAjax(null, { + socketManager: useWebSocket ? socketManager : undefined, basicAuth: param.basicAuth, certificateAuthority, contentType: param.contentType || 'application/json; charset=utf-8', @@ -1282,7 +1164,7 @@ export function initialize({ unauthenticated: param.unauthenticated, accessKey: param.accessKey, }).catch((e: Error) => { - const translatedError = _translateError(e); + const translatedError = translateError(e); if (translatedError) { throw translatedError; } @@ -1311,6 +1193,33 @@ export function initialize({ }: WebAPICredentials) { username = newUsername; password = newPassword; + + await socketManager.authenticate({ username, password }); + } + + function getSocketStatus(): SocketStatus { + return socketManager.getStatus(); + } + + function checkSockets(): void { + // Intentionally not awaiting + socketManager.check(); + } + + async function onOnline(): Promise { + await socketManager.onOnline(); + } + + async function onOffline(): Promise { + await socketManager.onOffline(); + } + + function registerRequestHandler(handler: IRequestHandler): void { + socketManager.registerRequestHandler(handler); + } + + function unregisterRequestHandler(handler: IRequestHandler): void { + socketManager.unregisterRequestHandler(handler); } async function getConfig() { @@ -1589,8 +1498,7 @@ export function initialize({ const urlPrefix = deviceName ? '/' : '/code/'; // We update our saved username and password, since we're creating a new account - username = number; - password = newPassword; + await authenticate({ username: number, password: newPassword }); const response = await _ajax({ call, @@ -1601,7 +1509,10 @@ export function initialize({ }); // From here on out, our username will be our UUID or E164 combined with device - username = `${response.uuid || number}.${response.deviceId || 1}`; + await authenticate({ + username: `${response.uuid || number}.${response.deviceId || 1}`, + password, + }); return response; } @@ -2157,7 +2068,7 @@ export function initialize({ timeout: 0, type, version, - }); + }) as Promise; } // Groups @@ -2312,7 +2223,7 @@ export function initialize({ timeout: 0, type: 'GET', version, - }); + }) as Promise; } async function createGroup( @@ -2461,32 +2372,10 @@ export function initialize({ }; } - function getMessageSocket(): Promise { - window.log.info('opening message socket', url); - const fixedScheme = url - .replace('https://', 'wss://') - .replace('http://', 'ws://'); - const login = encodeURIComponent(username); - const pass = encodeURIComponent(password); - const clientVersion = encodeURIComponent(version); - - return _connectSocket( - `${fixedScheme}/v1/websocket/?login=${login}&password=${pass}&agent=OWD&version=${clientVersion}`, - { certificateAuthority, proxyUrl, version } - ); - } - - function getProvisioningSocket(): Promise { - window.log.info('opening provisioning socket', url); - const fixedScheme = url - .replace('https://', 'wss://') - .replace('http://', 'ws://'); - const clientVersion = encodeURIComponent(version); - - return _connectSocket( - `${fixedScheme}/v1/websocket/provisioning/?agent=OWD&version=${clientVersion}`, - { certificateAuthority, proxyUrl, version } - ); + function getProvisioningResource( + handler: IRequestHandler + ): Promise { + return socketManager.getProvisioningResource(handler); } async function getDirectoryAuth(): Promise<{ @@ -2688,7 +2577,7 @@ export function initialize({ const pubKeyBase64 = arrayBufferToBase64(slicedPubKey); // Do request const data = JSON.stringify({ clientPublic: pubKeyBase64 }); - const result: JSONWithDetailsType = await _outerAjax(null, { + const result: JSONWithDetailsType = (await _outerAjax(null, { certificateAuthority, type: 'PUT', contentType: 'application/json; charset=utf-8', @@ -2700,7 +2589,7 @@ export function initialize({ data, timeout: 30000, version, - }); + })) as JSONWithDetailsType; const { data: responseBody, response } = result; @@ -2806,7 +2695,7 @@ export function initialize({ iv: string; data: string; mac: string; - } = await _outerAjax(null, { + } = (await _outerAjax(null, { certificateAuthority, type: 'PUT', headers: cookie @@ -2823,7 +2712,7 @@ export function initialize({ timeout: 30000, data: JSON.stringify(data), version, - }); + })) as any; // Decode discovery request response const decodedDiscoveryResponse: { diff --git a/ts/textsecure/WebsocketResources.ts b/ts/textsecure/WebsocketResources.ts index 59a8bb103..6edfd8c29 100644 --- a/ts/textsecure/WebsocketResources.ts +++ b/ts/textsecure/WebsocketResources.ts @@ -1,7 +1,7 @@ // Copyright 2020 Signal Messenger, LLC // SPDX-License-Identifier: AGPL-3.0-only -/* eslint-disable max-classes-per-file */ +/* eslint-disable max-classes-per-file, no-restricted-syntax */ /* * WebSocket-Resources * @@ -12,12 +12,11 @@ * request.respond(200, 'OK'); * }); * - * client.sendRequest({ + * const { response, status } = await client.sendRequest({ * verb: 'PUT', * path: '/v1/messages', - * body: '{ some: "json" }', - * success: function(message, status, request) {...}, - * error: function(message, status, request) {...} + * headers: ['content-type:application/json'], + * body: Buffer.from('{ some: "json" }'), * }); * * 1. https://github.com/signalapp/WebSocket-Resources @@ -32,13 +31,10 @@ import { dropNull } from '../util/dropNull'; import { isOlderThan } from '../util/timestamp'; import { strictAssert } from '../util/assert'; import { normalizeNumber } from '../util/normalizeNumber'; +import * as Errors from '../types/errors'; import { SignalService as Proto } from '../protobuf'; -type Callback = ( - message: string, - status: number, - request: OutgoingWebSocketRequest -) => void; +const THIRTY_SECONDS = 30 * 1000; export class IncomingWebSocketRequest { private readonly id: Long | number; @@ -53,7 +49,7 @@ export class IncomingWebSocketRequest { constructor( request: Proto.IWebSocketRequestMessage, - private readonly socket: WebSocket + private readonly sendBytes: (bytes: Buffer) => void ) { strictAssert(request.id, 'request without id'); strictAssert(request.verb, 'request without verb'); @@ -64,7 +60,6 @@ export class IncomingWebSocketRequest { this.path = request.path; this.body = dropNull(request.body); this.headers = request.headers || []; - this.socket = socket; } public respond(status: number, message: string): void { @@ -73,47 +68,24 @@ export class IncomingWebSocketRequest { response: { id: this.id, message, status }, }).finish(); - this.socket.sendBytes(Buffer.from(bytes)); + this.sendBytes(Buffer.from(bytes)); } } -export type OutgoingWebSocketRequestOptions = Readonly<{ +export type SendRequestOptions = Readonly<{ verb: string; path: string; body?: Uint8Array; + timeout?: number; headers?: ReadonlyArray; - error?: Callback; - success?: Callback; }>; -export class OutgoingWebSocketRequest { - public readonly error: Callback | undefined; - - public readonly success: Callback | undefined; - - public response: Proto.IWebSocketResponseMessage | undefined; - - constructor( - id: number, - options: OutgoingWebSocketRequestOptions, - socket: WebSocket - ) { - this.error = options.error; - this.success = options.success; - - const bytes = Proto.WebSocketMessage.encode({ - type: Proto.WebSocketMessage.Type.REQUEST, - request: { - verb: options.verb, - path: options.path, - body: options.body, - headers: options.headers ? options.headers.slice() : undefined, - id, - }, - }).finish(); - socket.sendBytes(Buffer.from(bytes)); - } -} +export type SendRequestResult = Readonly<{ + status: number; + message: string; + response?: Uint8Array; + headers: ReadonlyArray; +}>; export type WebSocketResourceOptions = { handleRequest?: (request: IncomingWebSocketRequest) => void; @@ -129,12 +101,21 @@ export class CloseEvent extends Event { export default class WebSocketResource extends EventTarget { private outgoingId = 1; - private closed?: boolean; + private closed = false; - private readonly outgoingMap = new Map(); + private readonly outgoingMap = new Map< + number, + (result: SendRequestResult) => void + >(); private readonly boundOnMessage: (message: IMessage) => void; + private activeRequests = new Set(); + + private shuttingDown = false; + + private shutdownTimer?: NodeJS.Timeout; + // Public for tests public readonly keepalive?: KeepAlive; @@ -158,11 +139,22 @@ export default class WebSocketResource extends EventTarget { keepalive.reset(); socket.on('message', () => keepalive.reset()); socket.on('close', () => keepalive.stop()); + socket.on('error', (error: Error) => { + window.log.warn( + 'WebSocketResource: WebSocket error', + Errors.toLogFormat(error) + ); + }); } - socket.on('close', () => { + socket.on('close', (code, reason) => { this.closed = true; + + window.log.warn('WebSocketResource: Socket closed'); + this.dispatchEvent(new CloseEvent(code, reason || 'normal')); }); + + this.addEventListener('close', () => this.onClose()); } public addEventListener( @@ -174,19 +166,50 @@ export default class WebSocketResource extends EventTarget { return super.addEventListener(name, handler); } - public sendRequest( - options: OutgoingWebSocketRequestOptions - ): OutgoingWebSocketRequest { + public async sendRequest( + options: SendRequestOptions + ): Promise { const id = this.outgoingId; strictAssert(!this.outgoingMap.has(id), 'Duplicate outgoing request'); // eslint-disable-next-line no-bitwise this.outgoingId = Math.max(1, (this.outgoingId + 1) & 0x7fffffff); - const outgoing = new OutgoingWebSocketRequest(id, options, this.socket); - this.outgoingMap.set(id, outgoing); + const bytes = Proto.WebSocketMessage.encode({ + type: Proto.WebSocketMessage.Type.REQUEST, + request: { + verb: options.verb, + path: options.path, + body: options.body, + headers: options.headers ? options.headers.slice() : undefined, + id, + }, + }).finish(); - return outgoing; + strictAssert(!this.shuttingDown, 'Cannot send request, shutting down'); + this.addActive(id); + const promise = new Promise((resolve, reject) => { + let timer = options.timeout + ? setTimeout(() => { + this.removeActive(id); + reject(new Error('Request timed out')); + }, options.timeout) + : undefined; + + this.outgoingMap.set(id, result => { + if (timer !== undefined) { + clearTimeout(timer); + timer = undefined; + } + + this.removeActive(id); + resolve(result); + }); + }); + + this.socket.sendBytes(Buffer.from(bytes)); + + return promise; } public forceKeepAlive(): void { @@ -218,11 +241,37 @@ export default class WebSocketResource extends EventTarget { return; } - window.log.warn('Dispatching our own socket close event'); + window.log.warn( + 'WebSocketResource: Dispatching our own socket close event' + ); this.dispatchEvent(new CloseEvent(code, reason || 'normal')); }, 5000); } + public shutdown(): void { + if (this.closed) { + return; + } + + if (this.activeRequests.size === 0) { + window.log.info('WebSocketResource: no active requests, closing'); + this.close(3000, 'Shutdown'); + return; + } + + this.shuttingDown = true; + + window.log.info('WebSocketResource: shutting down'); + this.shutdownTimer = setTimeout(() => { + if (this.closed) { + return; + } + + window.log.warn('WebSocketResource: Failed to shutdown gracefully'); + this.close(3000, 'Shutdown'); + }, THIRTY_SECONDS); + } + private onMessage({ type, binaryData }: IMessage): void { if (type !== 'binary' || !binaryData) { throw new Error(`Unsupported websocket message type: ${type}`); @@ -236,7 +285,23 @@ export default class WebSocketResource extends EventTarget { const handleRequest = this.options.handleRequest || (request => request.respond(404, 'Not found')); - handleRequest(new IncomingWebSocketRequest(message.request, this.socket)); + + const incomingRequest = new IncomingWebSocketRequest( + message.request, + (bytes: Buffer): void => { + this.removeActive(incomingRequest); + + this.socket.sendBytes(bytes); + } + ); + + if (this.shuttingDown) { + incomingRequest.respond(500, 'Shutting down'); + return; + } + + this.addActive(incomingRequest); + handleRequest(incomingRequest); } else if ( message.type === Proto.WebSocketMessage.Type.RESPONSE && message.response @@ -245,27 +310,62 @@ export default class WebSocketResource extends EventTarget { strictAssert(response.id, 'response without id'); const responseId = normalizeNumber(response.id); - const request = this.outgoingMap.get(responseId); + const resolve = this.outgoingMap.get(responseId); this.outgoingMap.delete(responseId); - if (!request) { + if (!resolve) { throw new Error(`Received response for unknown request ${responseId}`); } - request.response = dropNull(response); - - let callback = request.error; - - const status = response.status ?? -1; - if (status >= 200 && status < 300) { - callback = request.success; - } - - if (typeof callback === 'function') { - callback(response.message ?? '', status, request); - } + resolve({ + status: response.status ?? -1, + message: response.message ?? '', + response: dropNull(response.body), + headers: response.headers ?? [], + }); } } + + private onClose(): void { + const outgoing = new Map(this.outgoingMap); + this.outgoingMap.clear(); + + for (const resolve of outgoing.values()) { + resolve({ + status: 500, + message: 'Connection closed', + response: undefined, + headers: [], + }); + } + } + + private addActive(request: IncomingWebSocketRequest | number): void { + this.activeRequests.add(request); + } + + private removeActive(request: IncomingWebSocketRequest | number): void { + if (!this.activeRequests.has(request)) { + window.log.warn('WebSocketResource: removing unknown request'); + return; + } + + this.activeRequests.delete(request); + if (this.activeRequests.size !== 0) { + return; + } + if (!this.shuttingDown) { + return; + } + + if (this.shutdownTimer) { + clearTimeout(this.shutdownTimer); + this.shutdownTimer = undefined; + } + + window.log.info('WebSocketResource: shutdown complete'); + this.close(3000, 'Shutdown'); + } } export type KeepAliveOptionsType = { @@ -307,7 +407,7 @@ class KeepAlive { this.clearTimers(); } - public send(): void { + public async send(): Promise { this.clearTimers(); if (isOlderThan(this.lastAliveAt, MAX_KEEPALIVE_INTERVAL_MS)) { @@ -332,11 +432,14 @@ class KeepAlive { } window.log.info('WebSocketResources: Sending a keepalive message'); - this.wsr.sendRequest({ + const { status } = await this.wsr.sendRequest({ verb: 'GET', path: this.path, - success: this.reset.bind(this), }); + + if (status >= 200 || status < 300) { + this.reset(); + } } public reset(): void { diff --git a/ts/textsecure/downloadAttachment.ts b/ts/textsecure/downloadAttachment.ts new file mode 100644 index 000000000..5b56de3c5 --- /dev/null +++ b/ts/textsecure/downloadAttachment.ts @@ -0,0 +1,61 @@ +// Copyright 2020-2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { isNumber, omit } from 'lodash'; + +import { strictAssert } from '../util/assert'; +import { dropNull } from '../util/dropNull'; +import { DownloadedAttachmentType } from '../types/Attachment'; +import * as MIME from '../types/MIME'; +import * as Bytes from '../Bytes'; +import { typedArrayToArrayBuffer } from '../Crypto'; + +import Crypto from './Crypto'; +import { ProcessedAttachment } from './Types.d'; +import type { WebAPIType } from './WebAPI'; + +export async function downloadAttachment( + server: WebAPIType, + attachment: ProcessedAttachment +): Promise { + const cdnId = attachment.cdnId || attachment.cdnKey; + const { cdnNumber } = attachment; + + if (!cdnId) { + throw new Error('downloadAttachment: Attachment was missing cdnId!'); + } + + strictAssert(cdnId, 'attachment without cdnId'); + const encrypted = await server.getAttachment(cdnId, dropNull(cdnNumber)); + const { key, digest, size, contentType } = attachment; + + if (!digest) { + throw new Error('Failure: Ask sender to update Signal and resend.'); + } + + strictAssert(key, 'attachment has no key'); + strictAssert(digest, 'attachment has no digest'); + + const paddedData = await Crypto.decryptAttachment( + encrypted, + typedArrayToArrayBuffer(Bytes.fromBase64(key)), + typedArrayToArrayBuffer(Bytes.fromBase64(digest)) + ); + + if (!isNumber(size)) { + throw new Error( + `downloadAttachment: Size was not provided, actual size was ${paddedData.byteLength}` + ); + } + + const data = window.Signal.Crypto.getFirstBytes(paddedData, size); + + return { + ...omit(attachment, 'digest', 'key'), + + contentType: contentType + ? MIME.fromString(contentType) + : MIME.APPLICATION_OCTET_STREAM, + data, + }; +} diff --git a/ts/textsecure/index.ts b/ts/textsecure/index.ts index 09214fbad..08d110304 100644 --- a/ts/textsecure/index.ts +++ b/ts/textsecure/index.ts @@ -7,7 +7,6 @@ import MessageReceiver from './MessageReceiver'; import utils from './Helpers'; import Crypto from './Crypto'; import { ContactBuffer, GroupBuffer } from './ContactsParser'; -import createTaskWithTimeout from './TaskWithTimeout'; import SyncRequest from './SyncRequest'; import MessageSender from './SendMessage'; import StringView from './StringView'; @@ -16,7 +15,6 @@ import * as WebAPI from './WebAPI'; import WebSocketResource from './WebsocketResources'; export const textsecure = { - createTaskWithTimeout, crypto: Crypto, utils, storage: new Storage(), diff --git a/ts/textsecure/storage/User.ts b/ts/textsecure/storage/User.ts index 7258e2252..23f71568e 100644 --- a/ts/textsecure/storage/User.ts +++ b/ts/textsecure/storage/User.ts @@ -81,12 +81,16 @@ export class User extends EventEmitter { ? this.storage.put('device_name', deviceName) : Promise.resolve(), ]); + } - window.log.info('storage.user: credentials changed'); + public emitCredentialsChanged(reason: string): void { + window.log.info(`storage.user: credentials changed, ${reason}`); this.emit('credentialsChange'); } public async removeCredentials(): Promise { + window.log.info('storage.user: removeCredentials'); + await Promise.all([ this.storage.remove('number_id'), this.storage.remove('uuid_id'), diff --git a/ts/util/AbortableProcess.ts b/ts/util/AbortableProcess.ts new file mode 100644 index 000000000..417c6ef22 --- /dev/null +++ b/ts/util/AbortableProcess.ts @@ -0,0 +1,38 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +/* eslint-disable no-restricted-syntax */ + +import { explodePromise } from './explodePromise'; + +export interface IController { + abort(): void; +} + +export class AbortableProcess implements IController { + private abortReject: (error: Error) => void; + + public readonly resultPromise: Promise; + + constructor( + private readonly name: string, + private readonly controller: IController, + resultPromise: Promise + ) { + const { + promise: abortPromise, + reject: abortReject, + } = explodePromise(); + + this.abortReject = abortReject; + this.resultPromise = Promise.race([abortPromise, resultPromise]); + } + + public abort(): void { + this.controller.abort(); + this.abortReject(new Error(`Process "${this.name}" was aborted`)); + } + + public getResult(): Promise { + return this.resultPromise; + } +} diff --git a/ts/util/downloadAttachment.ts b/ts/util/downloadAttachment.ts index e3ef255f8..81c839a73 100644 --- a/ts/util/downloadAttachment.ts +++ b/ts/util/downloadAttachment.ts @@ -2,6 +2,7 @@ // SPDX-License-Identifier: AGPL-3.0-only import { AttachmentType, DownloadedAttachmentType } from '../types/Attachment'; +import { downloadAttachment as doDownloadAttachment } from '../textsecure/downloadAttachment'; export async function downloadAttachment( attachmentData: AttachmentType @@ -20,7 +21,8 @@ export async function downloadAttachment( let downloaded; try { - downloaded = await window.textsecure.messageReceiver.downloadAttachment( + downloaded = await doDownloadAttachment( + window.textsecure.server, migratedAttachment ); } catch (error) { diff --git a/ts/util/explodePromise.ts b/ts/util/explodePromise.ts new file mode 100644 index 000000000..54867519f --- /dev/null +++ b/ts/util/explodePromise.ts @@ -0,0 +1,25 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export function explodePromise(): { + promise: Promise; + resolve: (value: T) => void; + reject: (error: Error) => void; +} { + let resolve: (value: T) => void; + let reject: (error: Error) => void; + + const promise = new Promise((innerResolve, innerReject) => { + resolve = innerResolve; + reject = innerReject; + }); + + return { + promise, + // Typescript thinks that resolve and reject can be undefined here. + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + resolve: resolve!, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + reject: reject!, + }; +} diff --git a/ts/util/waitBatcher.ts b/ts/util/waitBatcher.ts index a95e495fd..365540015 100644 --- a/ts/util/waitBatcher.ts +++ b/ts/util/waitBatcher.ts @@ -19,10 +19,12 @@ declare global { window.waitBatchers = []; window.flushAllWaitBatchers = async () => { + window.log.info('waitBatcher#flushAllWaitBatchers'); await Promise.all(window.waitBatchers.map(item => item.flushAndWait())); }; window.waitForAllWaitBatchers = async () => { + window.log.info('waitBatcher#waitForAllWaitBatchers'); await Promise.all(window.waitBatchers.map(item => item.onIdle())); }; diff --git a/ts/window.d.ts b/ts/window.d.ts index 6a0e0c154..6ad3be0fb 100644 --- a/ts/window.d.ts +++ b/ts/window.d.ts @@ -483,7 +483,7 @@ declare global { getServerTrustRoot: () => WhatIsThis; readyForUpdates: () => void; logAppLoadedEvent: (options: { processedCount?: number }) => void; - logMessageReceiverConnect: () => void; + logAuthenticatedConnect: () => void; // Runtime Flags isShowingModal?: boolean;