diff --git a/ts/SignalProtocolStore.ts b/ts/SignalProtocolStore.ts index 3fde993d4..62ebdf11e 100644 --- a/ts/SignalProtocolStore.ts +++ b/ts/SignalProtocolStore.ts @@ -241,6 +241,8 @@ export class SignalProtocolStore extends EventEmitter { sessionQueues = new Map(); + sessionQueueJobCounter = 0; + private readonly identityQueues = new Map(); private currentZone?: Zone; @@ -703,13 +705,31 @@ export class SignalProtocolStore extends EventEmitter { async enqueueSessionJob( qualifiedAddress: QualifiedAddress, + name: string, task: () => Promise, zone: Zone = GLOBAL_ZONE ): Promise { + this.sessionQueueJobCounter += 1; + const id = this.sessionQueueJobCounter; + + const waitStart = Date.now(); + return this.withZone(zone, 'enqueueSessionJob', async () => { const queue = this._getSessionQueue(qualifiedAddress); - return queue.add(task); + const waitTime = Date.now() - waitStart; + log.info( + `enqueueSessionJob(${id}): queuing task ${name}, waited ${waitTime}ms` + ); + const queueStart = Date.now(); + + return queue.add(() => { + const queueTime = Date.now() - queueStart; + log.info( + `enqueueSessionJob(${id}): running task ${name}, waited ${queueTime}ms` + ); + return task(); + }); }); } @@ -1322,6 +1342,7 @@ export class SignalProtocolStore extends EventEmitter { await this.enqueueSessionJob( addr, + `_archiveSession(${addr.toString()})`, async () => { const item = entry.hydrated ? entry.item diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 1f0d65414..7c2e8918f 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -3279,7 +3279,7 @@ async function getTapToViewMessagesNeedingErase(): Promise> { return rows.map(row => jsonToObject(row.json)); } -const MAX_UNPROCESSED_ATTEMPTS = 3; +const MAX_UNPROCESSED_ATTEMPTS = 10; function saveUnprocessedSync(data: UnprocessedType): string { const db = getInstance(); diff --git a/ts/test-electron/SignalProtocolStore_test.ts b/ts/test-electron/SignalProtocolStore_test.ts index 7a6455002..5d46ca4bd 100644 --- a/ts/test-electron/SignalProtocolStore_test.ts +++ b/ts/test-electron/SignalProtocolStore_test.ts @@ -1542,7 +1542,7 @@ describe('SignalProtocolStore', () => { id: '1-one', version: 2, - attempts: 3, + attempts: 10, envelope: 'first', receivedAtCounter: 0, timestamp: NOW + 1, diff --git a/ts/test-electron/WebsocketResources_test.ts b/ts/test-electron/WebsocketResources_test.ts index 388f9b3ee..fbc54cdf6 100644 --- a/ts/test-electron/WebsocketResources_test.ts +++ b/ts/test-electron/WebsocketResources_test.ts @@ -21,7 +21,9 @@ import WebSocketResource from '../textsecure/WebsocketResources'; describe('WebSocket-Resource', () => { class FakeSocket extends EventEmitter { public sendBytes(_: Uint8Array) {} - + public socket = { + localPort: 5678, + }; public close() {} } diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index ff5af4a6b..9841ee27f 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -47,7 +47,7 @@ import { normalizeUuid } from '../util/normalizeUuid'; import { parseIntOrThrow } from '../util/parseIntOrThrow'; import { clearTimeoutIfNecessary } from '../util/clearTimeoutIfNecessary'; import { Zone } from '../util/Zone'; -import { DurationInSeconds } from '../util/durations'; +import { DurationInSeconds, SECOND } from '../util/durations'; import { bytesToUuid } from '../Crypto'; import type { DownloadedAttachmentType } from '../types/Attachment'; import { Address } from '../types/Address'; @@ -1752,6 +1752,7 @@ export default class MessageReceiver ); const unsealedPlaintext = await this.storage.protocol.enqueueSessionJob( address, + `sealedSenderDecryptMessage(${address.toString()})`, () => sealedSenderDecryptMessage( Buffer.from(ciphertext), @@ -1839,6 +1840,7 @@ export default class MessageReceiver const plaintext = await this.storage.protocol.enqueueSessionJob( address, + `signalDecrypt(${address.toString()})`, async () => this.unpad( await signalDecrypt( @@ -1870,6 +1872,7 @@ export default class MessageReceiver const plaintext = await this.storage.protocol.enqueueSessionJob( address, + `signalDecryptPreKey(${address.toString()})`, async () => this.unpad( await signalDecryptPreKey( @@ -3397,7 +3400,10 @@ export default class MessageReceiver this.removeFromCache(envelope); - const attachmentPointer = await this.handleAttachment(blob); + const attachmentPointer = await this.handleAttachment(blob, { + disableRetries: true, + timeout: 90 * SECOND, + }); const contactBuffer = new ContactBuffer(attachmentPointer.data); const contactSync = new ContactSyncEvent( @@ -3430,7 +3436,10 @@ export default class MessageReceiver // Note: we do not return here because we don't want to block the next message on // this attachment download and a lot of processing of that attachment. - const attachmentPointer = await this.handleAttachment(blob); + const attachmentPointer = await this.handleAttachment(blob, { + disableRetries: true, + timeout: 90 * SECOND, + }); const groupBuffer = new GroupBuffer(attachmentPointer.data); let groupDetails = groupBuffer.next(); const promises = []; @@ -3550,10 +3559,11 @@ export default class MessageReceiver } private async handleAttachment( - attachment: Proto.IAttachmentPointer + attachment: Proto.IAttachmentPointer, + options?: { timeout?: number; disableRetries?: boolean } ): Promise { const cleaned = processAttachment(attachment); - return downloadAttachment(this.server, cleaned); + return downloadAttachment(this.server, cleaned, options); } private async handleEndSession( diff --git a/ts/textsecure/OutgoingMessage.ts b/ts/textsecure/OutgoingMessage.ts index a84b5dbd0..9a41dae63 100644 --- a/ts/textsecure/OutgoingMessage.ts +++ b/ts/textsecure/OutgoingMessage.ts @@ -440,6 +440,7 @@ export default class OutgoingMessage { return window.textsecure.storage.protocol.enqueueSessionJob( address, + `doSendMessage(${address.toString()})`, async () => { const protocolAddress = ProtocolAddress.new( theirUuid.toString(), diff --git a/ts/textsecure/SocketManager.ts b/ts/textsecure/SocketManager.ts index 61bf1bd4e..92df88ddb 100644 --- a/ts/textsecure/SocketManager.ts +++ b/ts/textsecure/SocketManager.ts @@ -225,7 +225,9 @@ export class SocketManager extends EventListener { return; } - log.info('SocketManager: connected authenticated socket'); + log.info( + `SocketManager: connected authenticated socket (localPort: ${authenticated.localPort})` + ); window.logAuthenticatedConnect?.(); this.backOff.reset(); @@ -503,7 +505,9 @@ export class SocketManager extends EventListener { throw error; } - log.info('SocketManager: connected unauthenticated socket'); + log.info( + `SocketManager: connected unauthenticated socket (localPort: ${unauthenticated.localPort})` + ); unauthenticated.addEventListener('close', ({ code, reason }): void => { if (this.unauthenticated !== process) { diff --git a/ts/textsecure/WebAPI.ts b/ts/textsecure/WebAPI.ts index d29662560..ccbbbda98 100644 --- a/ts/textsecure/WebAPI.ts +++ b/ts/textsecure/WebAPI.ts @@ -130,6 +130,7 @@ type PromiseAjaxOptionsType = { certificateAuthority?: string; contentType?: string; data?: Uint8Array | string; + disableRetries?: boolean; disableSessionResumption?: boolean; headers?: HeaderListType; host?: string; @@ -451,6 +452,10 @@ async function _outerAjax( ): Promise { options.stack = new Error().stack; // just in case, save stack here. + if (options.disableRetries) { + return _promiseAjax(url, options); + } + return _retryAjax(url, options); } @@ -868,7 +873,14 @@ export type WebAPIType = { imageFiles: Array ) => Promise>; getArtAuth: () => Promise; - getAttachment: (cdnKey: string, cdnNumber?: number) => Promise; + getAttachment: ( + cdnKey: string, + cdnNumber?: number, + options?: { + disableRetries?: boolean; + timeout?: number; + } + ) => Promise; getAvatar: (path: string) => Promise; getDevices: () => Promise; getHasSubscription: (subscriberId: Uint8Array) => Promise; @@ -2473,7 +2485,14 @@ export function initialize({ return packId; } - async function getAttachment(cdnKey: string, cdnNumber?: number) { + async function getAttachment( + cdnKey: string, + cdnNumber?: number, + options?: { + disableRetries?: boolean; + timeout?: number; + } + ) { const abortController = new AbortController(); const cdnUrl = isNumber(cdnNumber) @@ -2482,9 +2501,10 @@ export function initialize({ // This is going to the CDN, not the service, so we use _outerAjax const stream = await _outerAjax(`${cdnUrl}/attachments/${cdnKey}`, { certificateAuthority, + disableRetries: options?.disableRetries, proxyUrl, responseType: 'stream', - timeout: 0, + timeout: options?.timeout || 0, type: 'GET', redactUrl: _createRedactor(cdnKey), version, diff --git a/ts/textsecure/WebsocketResources.ts b/ts/textsecure/WebsocketResources.ts index 25b333611..b22046b98 100644 --- a/ts/textsecure/WebsocketResources.ts +++ b/ts/textsecure/WebsocketResources.ts @@ -126,6 +126,8 @@ export default class WebSocketResource extends EventTarget { private readonly logId: string; + public readonly localPort: number | undefined; + // Public for tests public readonly keepalive?: KeepAlive; @@ -136,6 +138,7 @@ export default class WebSocketResource extends EventTarget { super(); this.logId = `WebSocketResource(${options.name})`; + this.localPort = socket.socket.localPort; this.boundOnMessage = this.onMessage.bind(this); diff --git a/ts/textsecure/downloadAttachment.ts b/ts/textsecure/downloadAttachment.ts index 68ce665c0..dcfec3d26 100644 --- a/ts/textsecure/downloadAttachment.ts +++ b/ts/textsecure/downloadAttachment.ts @@ -15,7 +15,11 @@ import type { WebAPIType } from './WebAPI'; export async function downloadAttachment( server: WebAPIType, - attachment: ProcessedAttachment + attachment: ProcessedAttachment, + options?: { + disableRetries?: boolean; + timeout?: number; + } ): Promise { const cdnId = attachment.cdnId || attachment.cdnKey; const { cdnNumber } = attachment; @@ -25,7 +29,11 @@ export async function downloadAttachment( } strictAssert(cdnId, 'attachment without cdnId'); - const encrypted = await server.getAttachment(cdnId, dropNull(cdnNumber)); + const encrypted = await server.getAttachment( + cdnId, + dropNull(cdnNumber), + options + ); const { key, digest, size, contentType } = attachment; if (!digest) { diff --git a/ts/textsecure/getKeysForIdentifier.ts b/ts/textsecure/getKeysForIdentifier.ts index 5a6608f12..0f2c1d757 100644 --- a/ts/textsecure/getKeysForIdentifier.ts +++ b/ts/textsecure/getKeysForIdentifier.ts @@ -154,6 +154,7 @@ async function handleServerKeys( try { await window.textsecure.storage.protocol.enqueueSessionJob( address, + `handleServerKeys(${identifier})`, () => processPreKeyBundle( preKeyBundle,