diff --git a/js/read_syncs.js b/js/read_syncs.js index 9b6f05eef..c90063c65 100644 --- a/js/read_syncs.js +++ b/js/read_syncs.js @@ -69,12 +69,26 @@ if (message.isUnread()) { await message.markRead(readAt, { skipSave: true }); - // onReadMessage may result in messages older than this one being - // marked read. We want those messages to have the same expire timer - // start time as this one, so we pass the readAt value through. - const conversation = message.getConversation(); - if (conversation) { - conversation.onReadMessage(message, readAt); + const updateConversation = () => { + // onReadMessage may result in messages older than this one being + // marked read. We want those messages to have the same expire timer + // start time as this one, so we pass the readAt value through. + const conversation = message.getConversation(); + if (conversation) { + conversation.onReadMessage(message, readAt); + } + }; + + if (window.startupProcessingQueue) { + const conversation = message.getConversation(); + if (conversation) { + window.startupProcessingQueue.add( + conversation.get('id'), + updateConversation + ); + } + } else { + updateConversation(); } } else { const now = Date.now(); diff --git a/ts/background.ts b/ts/background.ts index fefd8657b..b827ab48d 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -9,6 +9,7 @@ import { isWindowDragElement } from './util/isWindowDragElement'; import { assert } from './util/assert'; export async function startApp(): Promise { + window.startupProcessingQueue = new window.Signal.Util.StartupQueue(); window.attachmentDownloadQueue = []; try { window.log.info('Initializing SQL in renderer'); @@ -2061,13 +2062,18 @@ export async function startApp(): Promise { clearInterval(interval!); interval = null; view.onEmpty(); + window.logAppLoadedEvent(); - window.log.info( - 'App loaded - messages:', - messageReceiver.getProcessedCount() - ); + if (messageReceiver) { + window.log.info( + 'App loaded - messages:', + messageReceiver.getProcessedCount() + ); + } + window.sqlInitializer.goBackToMainProcess(); window.Signal.Util.setBatchingStrategy(false); + const attachmentDownloadQueue = window.attachmentDownloadQueue || []; const THREE_DAYS_AGO = Date.now() - 3600 * 72 * 1000; const MAX_ATTACHMENT_MSGS_TO_DOWNLOAD = 250; @@ -2081,7 +2087,12 @@ export async function startApp(): Promise { attachmentsToDownload.length, attachmentDownloadQueue.length ); - window.attachmentDownloadQueue = undefined; + + if (window.startupProcessingQueue) { + window.startupProcessingQueue.flush(); + window.startupProcessingQueue = undefined; + } + const messagesWithDownloads = await Promise.all( attachmentsToDownload.map(message => message.queueAttachmentDownloads() diff --git a/ts/models/messages.ts b/ts/models/messages.ts index 6a8eed3a3..471b8626f 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -3532,21 +3532,6 @@ export class MessageModel extends window.Backbone.Model { return; } - if (type === 'outgoing') { - const receipts = window.Whisper.DeliveryReceipts.forMessage( - conversation, - message - ); - receipts.forEach(receipt => - message.set({ - delivered: (message.get('delivered') || 0) + 1, - delivered_to: _.union(message.get('delivered_to') || [], [ - receipt.get('deliveredTo'), - ]), - }) - ); - } - attributes.active_at = now; conversation.set(attributes); @@ -3608,59 +3593,6 @@ export class MessageModel extends window.Backbone.Model { } } - if (type === 'incoming') { - const readSync = window.Whisper.ReadSyncs.forMessage(message); - if (readSync) { - if ( - message.get('expireTimer') && - !message.get('expirationStartTimestamp') - ) { - message.set( - 'expirationStartTimestamp', - Math.min(readSync.get('read_at'), Date.now()) - ); - } - } - if (readSync || message.isExpirationTimerUpdate()) { - message.unset('unread'); - // This is primarily to allow the conversation to mark all older - // messages as read, as is done when we receive a read sync for - // a message we already know about. - const c = message.getConversation(); - if (c) { - c.onReadMessage(message); - } - } else { - conversation.set({ - unreadCount: (conversation.get('unreadCount') || 0) + 1, - isArchived: false, - }); - } - } - - if (type === 'outgoing') { - const reads = window.Whisper.ReadReceipts.forMessage( - conversation, - message - ); - if (reads.length) { - const readBy = reads.map(receipt => receipt.get('reader')); - message.set({ - read_by: _.union(message.get('read_by'), readBy), - }); - } - - // A sync'd message to ourself is automatically considered read/delivered - if (conversation.isMe()) { - message.set({ - read_by: conversation.getRecipients(), - delivered_to: conversation.getRecipients(), - }); - } - - message.set({ recipients: conversation.getRecipients() }); - } - if (dataMessage.profileKey) { const profileKey = dataMessage.profileKey.toString('base64'); if ( @@ -3699,13 +3631,6 @@ export class MessageModel extends window.Backbone.Model { }); await message.eraseContents(); } - // Check for out-of-order view syncs - if (type === 'incoming' && message.isTapToView()) { - const viewSync = window.Whisper.ViewSyncs.forMessage(message); - if (viewSync) { - await message.markViewed({ fromSync: true }); - } - } } const conversationTimestamp = conversation.get('timestamp'); @@ -3750,26 +3675,13 @@ export class MessageModel extends window.Backbone.Model { } } - // Does this message have any pending, previously-received associated reactions? - const reactions = window.Whisper.Reactions.forMessage(message); - await Promise.all( - reactions.map(reaction => message.handleReaction(reaction, false)) - ); - - // Does this message have any pending, previously-received associated - // delete for everyone messages? - const deletes = window.Whisper.Deletes.forMessage(message); - await Promise.all( - deletes.map(del => - window.Signal.Util.deleteForEveryone(message, del, false) - ) - ); + await this.modifyTargetMessage(conversation, isGroupV2); window.log.info( 'handleDataMessage: Batching save for', message.get('sent_at') ); - this.saveAndNotify(conversation, confirm); + this.saveAndNotify(conversation, isGroupV2, confirm); } catch (error) { const errorForLog = error && error.stack ? error.stack : error; window.log.error( @@ -3785,6 +3697,7 @@ export class MessageModel extends window.Backbone.Model { async saveAndNotify( conversation: ConversationModel, + isGroupV2: boolean, confirm: () => void ): Promise { await window.Signal.Util.saveNewMessageBatcher.add(this.attributes); @@ -3793,6 +3706,8 @@ export class MessageModel extends window.Backbone.Model { conversation.trigger('newmessage', this); + await this.modifyTargetMessage(conversation, isGroupV2); + if (this.get('unread')) { await conversation.notify(this); } @@ -3806,6 +3721,108 @@ export class MessageModel extends window.Backbone.Model { confirm(); } + async modifyTargetMessage( + conversation: ConversationModel, + isGroupV2: boolean + ): Promise { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const message = this; + const type = message.get('type'); + + if (type === 'outgoing') { + const receipts = window.Whisper.DeliveryReceipts.forMessage( + conversation, + message + ); + receipts.forEach(receipt => + message.set({ + delivered: (message.get('delivered') || 0) + 1, + delivered_to: _.union(message.get('delivered_to') || [], [ + receipt.get('deliveredTo'), + ]), + }) + ); + } + + if (!isGroupV2) { + if (type === 'incoming') { + const readSync = window.Whisper.ReadSyncs.forMessage(message); + if (readSync) { + if ( + message.get('expireTimer') && + !message.get('expirationStartTimestamp') + ) { + message.set( + 'expirationStartTimestamp', + Math.min(readSync.get('read_at'), Date.now()) + ); + } + } + if (readSync || message.isExpirationTimerUpdate()) { + message.unset('unread'); + // This is primarily to allow the conversation to mark all older + // messages as read, as is done when we receive a read sync for + // a message we already know about. + const c = message.getConversation(); + if (c) { + c.onReadMessage(message); + } + } else { + conversation.set({ + unreadCount: (conversation.get('unreadCount') || 0) + 1, + isArchived: false, + }); + } + } + + if (type === 'outgoing') { + const reads = window.Whisper.ReadReceipts.forMessage( + conversation, + message + ); + if (reads.length) { + const readBy = reads.map(receipt => receipt.get('reader')); + message.set({ + read_by: _.union(message.get('read_by'), readBy), + }); + } + + // A sync'd message to ourself is automatically considered read/delivered + if (conversation.isMe()) { + message.set({ + read_by: conversation.getRecipients(), + delivered_to: conversation.getRecipients(), + }); + } + + message.set({ recipients: conversation.getRecipients() }); + } + + // Check for out-of-order view syncs + if (type === 'incoming' && message.isTapToView()) { + const viewSync = window.Whisper.ViewSyncs.forMessage(message); + if (viewSync) { + await message.markViewed({ fromSync: true }); + } + } + } + + // Does this message have any pending, previously-received associated reactions? + const reactions = window.Whisper.Reactions.forMessage(message); + await Promise.all( + reactions.map(reaction => message.handleReaction(reaction, false)) + ); + + // Does this message have any pending, previously-received associated + // delete for everyone messages? + const deletes = window.Whisper.Deletes.forMessage(message); + await Promise.all( + deletes.map(del => + window.Signal.Util.deleteForEveryone(message, del, false) + ) + ); + } + async handleReaction( reaction: typeof window.WhatIsThis, shouldPersist = true diff --git a/ts/textsecure/MessageReceiver.ts b/ts/textsecure/MessageReceiver.ts index 344ea1dc8..a544696b8 100644 --- a/ts/textsecure/MessageReceiver.ts +++ b/ts/textsecure/MessageReceiver.ts @@ -441,6 +441,7 @@ class MessageReceiverInner extends EventTarget { ); this.cacheAndHandle(envelope, plaintext, request); + this.processedCount += 1; } catch (e) { request.respond(500, 'Bad encrypted websocket message'); window.log.error( @@ -787,7 +788,6 @@ class MessageReceiverInner extends EventTarget { removeFromCache(envelope: EnvelopeClass) { const { id } = envelope; this.cacheRemoveBatcher.add(id); - this.processedCount += 1; } // Same as handleEnvelope, just without the decryption step. Necessary for handling diff --git a/ts/util/StartupQueue.ts b/ts/util/StartupQueue.ts new file mode 100644 index 000000000..3c61ce3c4 --- /dev/null +++ b/ts/util/StartupQueue.ts @@ -0,0 +1,30 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export class StartupQueue { + set: Set; + + items: Array<() => void>; + + constructor() { + this.set = new Set(); + this.items = []; + } + + add(id: string, f: () => void): void { + if (this.set.has(id)) { + return; + } + + this.items.push(f); + this.set.add(id); + } + + flush(): void { + const { items } = this; + window.log.info('StartupQueue: Processing', items.length, 'actions'); + items.forEach(f => f()); + this.items = []; + this.set.clear(); + } +} diff --git a/ts/util/index.ts b/ts/util/index.ts index 23801bd0d..ca775d6c6 100644 --- a/ts/util/index.ts +++ b/ts/util/index.ts @@ -33,10 +33,12 @@ import { sessionStructureToArrayBuffer, } from './sessionTranslation'; import * as zkgroup from './zkgroup'; +import { StartupQueue } from './StartupQueue'; export { GoogleChrome, Registration, + StartupQueue, arrayBufferToObjectURL, combineNames, createBatcher, diff --git a/ts/window.d.ts b/ts/window.d.ts index 8e7704f72..096942f19 100644 --- a/ts/window.d.ts +++ b/ts/window.d.ts @@ -94,6 +94,7 @@ import { StagedLinkPreview } from './components/conversation/StagedLinkPreview'; import { MIMEType } from './types/MIME'; import { ElectronLocaleType } from './util/mapToSupportLocale'; import { SignalProtocolStore } from './LibSignalStore'; +import { StartupQueue } from './util/StartupQueue'; export { Long } from 'long'; @@ -138,6 +139,7 @@ declare global { WhatIsThis: WhatIsThis; attachmentDownloadQueue: Array | undefined; + startupProcessingQueue: StartupQueue | undefined; baseAttachmentsPath: string; baseStickersPath: string; baseTempPath: string;