From 057762806eb3ca4b8534b2cced61b73fde92f34e Mon Sep 17 00:00:00 2001 From: Daniel Gasienica Date: Mon, 2 Apr 2018 18:59:24 -0400 Subject: [PATCH] Add support for attachment background migration without index --- js/background.js | 39 +++-- js/modules/messages_data_migrator.js | 209 +++++++++++++++++---------- 2 files changed, 153 insertions(+), 95 deletions(-) diff --git a/js/background.js b/js/background.js index 8e0c0c51b..4b659b4d7 100644 --- a/js/background.js +++ b/js/background.js @@ -19,7 +19,7 @@ const { upgradeMessageSchema } = window.Signal.Migrations; const { Migrations0DatabaseWithAttachmentData, - // Migrations1DatabaseWithoutAttachmentData, + Migrations1DatabaseWithoutAttachmentData, } = window.Signal.Migrations; const { Views } = window.Signal; @@ -86,11 +86,21 @@ console.log('Run migrations on database with attachment data'); await Migrations0DatabaseWithAttachmentData.run({ Backbone }); - const database = Whisper.Database; - const status = await Migrations1DatabaseWithoutAttachmentData.getStatus({ database }); + await MessageDataMigrator.dangerouslyProcessAllWithoutIndex({ + databaseName: Migrations0DatabaseWithAttachmentData.getDatabase().name, + minDatabaseVersion: Migrations0DatabaseWithAttachmentData.getDatabase().version, + upgradeMessageSchema, + }); + + const status = await Migrations1DatabaseWithoutAttachmentData.getStatus({ + database: Whisper.Database, + }); console.log('Run migrations on database without attachment data:', status); if (status.canRun) { - await Migrations1DatabaseWithoutAttachmentData.run({ Backbone, database }); + await Migrations1DatabaseWithoutAttachmentData.run({ + Backbone, + database: Whisper.Database, + }); } console.log('Storage fetch'); @@ -98,19 +108,18 @@ const idleDetector = new IdleDetector(); - const NUM_MESSAGE_UPGRADES_PER_IDLE = 2; idleDetector.on('idle', async () => { - const results = await MessageDataMigrator.processNext({ - BackboneMessage: Whisper.Message, - BackboneMessageCollection: Whisper.MessageCollection, - count: NUM_MESSAGE_UPGRADES_PER_IDLE, - upgradeMessageSchema, - }); - console.log('Upgrade message schema:', results); + // const database = Migrations0DatabaseWithAttachmentData.getDatabase(); + // const batch = await MessageDataMigrator.processNextBatchWithoutIndex({ + // databaseName: database.name, + // minDatabaseVersion: database.version, + // upgradeMessageSchema, + // }); + // console.log('Upgrade message schema:', batch); - if (!results.done) { - idleDetector.stop(); - } + // if (batch.done) { + // idleDetector.stop(); + // } }); /* eslint-disable */ diff --git a/js/modules/messages_data_migrator.js b/js/modules/messages_data_migrator.js index 5545853d8..7a210d74c 100644 --- a/js/modules/messages_data_migrator.js +++ b/js/modules/messages_data_migrator.js @@ -1,8 +1,8 @@ // Module to upgrade the schema of messages, e.g. migrate attachments to disk. -// `processAll` purposely doesn’t rely on our Backbone IndexedDB adapter to -// prevent automatic migrations. Rather, it uses direct IndexedDB access. -// This includes avoiding usage of `storage` module which uses Backbone under -// the hood. +// `dangerouslyProcessAllWithoutIndex` purposely doesn’t rely on our Backbone +// IndexedDB adapter to prevent automatic migrations. Rather, it uses direct +// IndexedDB access. This includes avoiding usage of `storage` module which uses +// Backbone under the hood. /* global IDBKeyRange */ @@ -81,7 +81,7 @@ exports.processNext = async ({ }; }; -exports.processAll = async ({ +exports.dangerouslyProcessAllWithoutIndex = async ({ databaseName, minDatabaseVersion, upgradeMessageSchema, @@ -111,84 +111,26 @@ exports.processAll = async ({ ` to be at least ${minDatabaseVersion}`); } - const isComplete = await settings.isAttachmentMigrationComplete(connection); - console.log('Attachment migration status:', isComplete ? 'complete' : 'incomplete'); - if (isComplete) { - return; - } - - let numTotalMessages = null; - // eslint-disable-next-line more/no-then - getNumMessages({ connection }).then((numMessages) => { - numTotalMessages = numMessages; - }); + // NOTE: Even if we make this async using `then`, requesting `count` on an + // IndexedDB store blocks all subsequent transactions, so we might as well + // explicitly wait for it here: + const numTotalMessages = await _getNumMessages({ connection }); const migrationStartTime = Date.now(); - let unprocessedMessages = []; - let totalMessagesProcessed = 0; - do { - const lastProcessedIndex = - // eslint-disable-next-line no-await-in-loop - await settings.getAttachmentMigrationLastProcessedIndex(connection); - - const fetchUnprocessedMessagesStartTime = Date.now(); - unprocessedMessages = - // eslint-disable-next-line no-await-in-loop - await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({ - connection, - count: NUM_MESSAGES_PER_BATCH, - lastIndex: lastProcessedIndex, - }); - const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime; - const numUnprocessedMessages = unprocessedMessages.length; - - if (numUnprocessedMessages === 0) { + let numCumulativeMessagesProcessed = 0; + // eslint-disable-next-line no-constant-condition + while (true) { + // eslint-disable-next-line no-await-in-loop + const status = await _processBatch({ connection, upgradeMessageSchema }); + if (status.done) { break; } - - const upgradeStartTime = Date.now(); - const upgradedMessages = - // eslint-disable-next-line no-await-in-loop - await Promise.all(unprocessedMessages.map(upgradeMessageSchema)); - const upgradeDuration = Date.now() - upgradeStartTime; - - const saveMessagesStartTime = Date.now(); - const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); - const transactionCompletion = database.completeTransaction(transaction); - // eslint-disable-next-line no-await-in-loop - await Promise.all(upgradedMessages.map(_saveMessage({ transaction }))); - // eslint-disable-next-line no-await-in-loop - await transactionCompletion; - const saveDuration = Date.now() - saveMessagesStartTime; - - // TODO: Confirm transaction is complete - - const lastMessage = last(upgradedMessages); - const newLastProcessedIndex = lastMessage ? lastMessage.id : null; - if (newLastProcessedIndex) { - // eslint-disable-next-line no-await-in-loop - await settings.setAttachmentMigrationLastProcessedIndex( - connection, - newLastProcessedIndex - ); - } - - totalMessagesProcessed += numUnprocessedMessages; - console.log('Upgrade message schema:', { - lastProcessedIndex, - numUnprocessedMessages, - numCumulativeMessagesProcessed: totalMessagesProcessed, + numCumulativeMessagesProcessed += status.numMessagesProcessed; + console.log('Upgrade message schema:', Object.assign({}, status, { numTotalMessages, - fetchDuration, - saveDuration, - upgradeDuration, - newLastProcessedIndex, - targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION, - }); - } while (unprocessedMessages.length > 0); - - await settings.markAttachmentMigrationComplete(connection); - await settings.deleteAttachmentMigrationLastProcessedIndex(connection); + numCumulativeMessagesProcessed, + })); + } console.log('Close database connection'); connection.close(); @@ -196,10 +138,117 @@ exports.processAll = async ({ const totalDuration = Date.now() - migrationStartTime; console.log('Attachment migration complete:', { totalDuration, - totalMessagesProcessed, + totalMessagesProcessed: numCumulativeMessagesProcessed, }); }; +exports.processNextBatchWithoutIndex = async ({ + databaseName, + minDatabaseVersion, + upgradeMessageSchema, +} = {}) => { + if (!isFunction(upgradeMessageSchema)) { + throw new TypeError('"upgradeMessageSchema" is required'); + } + + const connection = await _getConnection({ databaseName, minDatabaseVersion }); + const batch = await _processBatch({ connection, upgradeMessageSchema }); + return batch; +}; + +// Private API +const _getConnection = async ({ databaseName, minDatabaseVersion }) => { + if (!isString(databaseName)) { + throw new TypeError('"databaseName" must be a string'); + } + + if (!isNumber(minDatabaseVersion)) { + throw new TypeError('"minDatabaseVersion" must be a number'); + } + + const connection = await database.open(databaseName); + const databaseVersion = connection.version; + const isValidDatabaseVersion = databaseVersion >= minDatabaseVersion; + console.log('Database status', { + databaseVersion, + isValidDatabaseVersion, + minDatabaseVersion, + }); + if (!isValidDatabaseVersion) { + throw new Error(`Expected database version (${databaseVersion})` + + ` to be at least ${minDatabaseVersion}`); + } + + return connection; +}; + +const _processBatch = async ({ connection, upgradeMessageSchema } = {}) => { + if (!isObject(connection)) { + throw new TypeError('"connection" must be a string'); + } + + if (!isFunction(upgradeMessageSchema)) { + throw new TypeError('"upgradeMessageSchema" is required'); + } + + const isAttachmentMigrationComplete = + await settings.isAttachmentMigrationComplete(connection); + if (isAttachmentMigrationComplete) { + return { + done: true, + }; + } + + const lastProcessedIndex = + await settings.getAttachmentMigrationLastProcessedIndex(connection); + + const fetchUnprocessedMessagesStartTime = Date.now(); + const unprocessedMessages = + await _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex({ + connection, + count: NUM_MESSAGES_PER_BATCH, + lastIndex: lastProcessedIndex, + }); + const fetchDuration = Date.now() - fetchUnprocessedMessagesStartTime; + + const upgradeStartTime = Date.now(); + const upgradedMessages = + await Promise.all(unprocessedMessages.map(upgradeMessageSchema)); + const upgradeDuration = Date.now() - upgradeStartTime; + + const saveMessagesStartTime = Date.now(); + const transaction = connection.transaction(MESSAGES_STORE_NAME, 'readwrite'); + const transactionCompletion = database.completeTransaction(transaction); + await Promise.all(upgradedMessages.map(_saveMessage({ transaction }))); + await transactionCompletion; + const saveDuration = Date.now() - saveMessagesStartTime; + + const numMessagesProcessed = upgradedMessages.length; + const done = numMessagesProcessed === 0; + const lastMessage = last(upgradedMessages); + const newLastProcessedIndex = lastMessage ? lastMessage.id : null; + if (!done) { + await settings.setAttachmentMigrationLastProcessedIndex( + connection, + newLastProcessedIndex + ); + } else { + await settings.markAttachmentMigrationComplete(connection); + await settings.deleteAttachmentMigrationLastProcessedIndex(connection); + } + + return { + done, + fetchDuration, + lastProcessedIndex, + newLastProcessedIndex, + numMessagesProcessed, + saveDuration, + targetSchemaVersion: Message.CURRENT_SCHEMA_VERSION, + upgradeDuration, + }; +}; + const _saveMessageBackbone = ({ BackboneMessage } = {}) => (message) => { const backboneMessage = new BackboneMessage(message); return deferredToPromise(backboneMessage.save()); @@ -281,7 +330,7 @@ const _dangerouslyFetchMessagesRequiringSchemaUpgradeWithoutIndex = }); }; -const getNumMessages = async ({ connection } = {}) => { +const _getNumMessages = async ({ connection } = {}) => { if (!isObject(connection)) { throw new TypeError('"connection" is required'); }