diff --git a/ts/background.ts b/ts/background.ts index 231b11a94..9a829f5c7 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -850,7 +850,8 @@ export async function startApp(): Promise { `Starting background data migration. Target version: ${Message.CURRENT_SCHEMA_VERSION}` ); idleDetector.on('idle', async () => { - const NUM_MESSAGES_PER_BATCH = 1; + const NUM_MESSAGES_PER_BATCH = 100; + const BATCH_DELAY = 5 * durations.SECOND; if (!isMigrationWithIndexComplete) { const batchWithIndex = await migrateMessageData({ @@ -858,15 +859,22 @@ export async function startApp(): Promise { upgradeMessageSchema, getMessagesNeedingUpgrade: window.Signal.Data.getMessagesNeedingUpgrade, - saveMessage: window.Signal.Data.saveMessage, + saveMessages: window.Signal.Data.saveMessages, }); log.info('Upgrade message schema (with index):', batchWithIndex); isMigrationWithIndexComplete = batchWithIndex.done; } + idleDetector.stop(); + if (isMigrationWithIndexComplete) { log.info('Background migration complete. Stopping idle detector.'); - idleDetector.stop(); + } else { + log.info('Background migration not complete. Pausing idle detector.'); + + setTimeout(() => { + idleDetector.start(); + }, BATCH_DELAY); } }); diff --git a/ts/messages/migrateMessageData.ts b/ts/messages/migrateMessageData.ts index d20470252..3d1ba4fe8 100644 --- a/ts/messages/migrateMessageData.ts +++ b/ts/messages/migrateMessageData.ts @@ -2,9 +2,15 @@ // SPDX-License-Identifier: AGPL-3.0-only import { isFunction, isNumber } from 'lodash'; +import pMap from 'p-map'; + import { CURRENT_SCHEMA_VERSION } from '../types/Message2'; +import { isNotNil } from '../util/isNotNil'; import type { MessageAttributesType } from '../model-types.d'; import type { UUIDStringType } from '../types/UUID'; +import * as Errors from '../types/errors'; + +const MAX_CONCURRENCY = 5; /** * Ensures that messages in database are at the right schema. @@ -13,7 +19,7 @@ export async function migrateMessageData({ numMessagesPerBatch, upgradeMessageSchema, getMessagesNeedingUpgrade, - saveMessage, + saveMessages, maxVersion = CURRENT_SCHEMA_VERSION, }: Readonly<{ numMessagesPerBatch: number; @@ -25,10 +31,10 @@ export async function migrateMessageData({ limit: number, options: { maxVersion: number } ) => Promise>; - saveMessage: ( - data: MessageAttributesType, + saveMessages: ( + data: ReadonlyArray, options: { ourUuid: UUIDStringType } - ) => Promise; + ) => Promise; maxVersion?: number; }>): Promise< | { @@ -63,8 +69,8 @@ export async function migrateMessageData({ ); } catch (error) { window.SignalContext.log.error( - 'processNext error:', - error && error.stack ? error.stack : error + 'migrateMessageData.getMessagesNeedingUpgrade error:', + Errors.toLogFormat(error) ); return { done: true, @@ -74,20 +80,41 @@ export async function migrateMessageData({ const fetchDuration = Date.now() - fetchStartTime; const upgradeStartTime = Date.now(); - const upgradedMessages = await Promise.all( - messagesRequiringSchemaUpgrade.map(message => - upgradeMessageSchema(message, { maxVersion }) + const failedMessages = new Array(); + const upgradedMessages = ( + await pMap( + messagesRequiringSchemaUpgrade, + async message => { + try { + return await upgradeMessageSchema(message, { maxVersion }); + } catch (error) { + window.SignalContext.log.error( + 'migrateMessageData.upgradeMessageSchema error:', + Errors.toLogFormat(error) + ); + failedMessages.push(message); + return undefined; + } + }, + { concurrency: MAX_CONCURRENCY } ) - ); + ).filter(isNotNil); const upgradeDuration = Date.now() - upgradeStartTime; const saveStartTime = Date.now(); - await Promise.all( - upgradedMessages.map(message => - saveMessage(message, { - ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), - }) - ) + + const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString(); + await saveMessages( + [ + ...upgradedMessages, + + // Increment migration attempts + ...failedMessages.map(message => ({ + ...message, + schemaMigrationAttempts: (message.schemaMigrationAttempts ?? 0) + 1, + })), + ], + { ourUuid } ); const saveDuration = Date.now() - saveStartTime; diff --git a/ts/model-types.d.ts b/ts/model-types.d.ts index 24b099191..1a366477e 100644 --- a/ts/model-types.d.ts +++ b/ts/model-types.d.ts @@ -205,6 +205,9 @@ export type MessageAttributesType = { // background, when we were still in IndexedDB, before attachments had gone to disk // We set this so that the idle message upgrade process doesn't pick this message up schemaVersion?: number; + // migrateMessageData will increment this field on every failure and give up + // when the value is too high. + schemaMigrationAttempts?: number; // This should always be set for new messages, but older messages may not have them. We // may not have these for outbound messages, either, as we have not needed them. serverGuid?: string; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index cd9ec1bc2..4699837bf 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -4387,22 +4387,31 @@ async function removeAllConfiguration( })(); } +const MAX_MESSAGE_MIGRATION_ATTEMPTS = 5; + async function getMessagesNeedingUpgrade( limit: number, { maxVersion }: { maxVersion: number } ): Promise> { const db = getInstance(); + const rows: JSONRows = db .prepare( ` SELECT json FROM messages - WHERE schemaVersion IS NULL OR schemaVersion < $maxVersion + WHERE + (schemaVersion IS NULL OR schemaVersion < $maxVersion) AND + IFNULL( + json_extract(json, '$.schemaMigrationAttempts'), + 0 + ) < $maxAttempts LIMIT $limit; ` ) .all({ maxVersion, + maxAttempts: MAX_MESSAGE_MIGRATION_ATTEMPTS, limit, });