From 716f852970cd276ba424fe1f161bc9f94e58e165 Mon Sep 17 00:00:00 2001 From: Scott Nonnenberg Date: Fri, 21 Jul 2023 15:10:32 -0700 Subject: [PATCH] New getRecentStoryReplies function to clean up replies in multiple convos --- ts/jobs/helpers/sendReaction.ts | 14 +-- ts/models/conversations.ts | 8 +- ts/models/messages.ts | 49 +++++--- ts/sql/Client.ts | 11 ++ ts/sql/Interface.ts | 17 +++ ts/sql/Server.ts | 49 ++++++++ ts/sql/migrations/86-story-replies-index.ts | 32 +++++ ts/sql/migrations/index.ts | 2 + .../sql/getRecentStoryReplies_test.ts | 118 ++++++++++++++++++ ts/test-node/sql_migrations_test.ts | 46 +++++++ ts/util/cleanup.ts | 73 ++++++----- 11 files changed, 356 insertions(+), 63 deletions(-) create mode 100644 ts/sql/migrations/86-story-replies-index.ts create mode 100644 ts/test-electron/sql/getRecentStoryReplies_test.ts diff --git a/ts/jobs/helpers/sendReaction.ts b/ts/jobs/helpers/sendReaction.ts index 5cd7019ea..dad222306 100644 --- a/ts/jobs/helpers/sendReaction.ts +++ b/ts/jobs/helpers/sendReaction.ts @@ -312,13 +312,13 @@ export async function sendReaction( if (!ephemeralMessageForReactionSend.doNotSave) { const reactionMessage = ephemeralMessageForReactionSend; - await Promise.all([ - await window.Signal.Data.saveMessage(reactionMessage.attributes, { - ourUuid, - forceSave: true, - }), - reactionMessage.hydrateStoryContext(message.attributes), - ]); + await reactionMessage.hydrateStoryContext(message.attributes, { + shouldSave: false, + }); + await window.Signal.Data.saveMessage(reactionMessage.attributes, { + ourUuid, + forceSave: true, + }); void conversation.addSingleMessage( window.MessageController.register(reactionMessage.id, reactionMessage) diff --git a/ts/models/conversations.ts b/ts/models/conversations.ts index c2b5b5c58..ed084288e 100644 --- a/ts/models/conversations.ts +++ b/ts/models/conversations.ts @@ -1395,7 +1395,7 @@ export class ConversationModel extends window.Backbone } private async beforeAddSingleMessage(message: MessageModel): Promise { - await message.hydrateStoryContext(); + await message.hydrateStoryContext(undefined, { shouldSave: true }); if (!this.newMessageQueue) { this.newMessageQueue = new PQueue({ @@ -1778,7 +1778,11 @@ export class ConversationModel extends window.Backbone log.warn(`cleanModels: Upgraded schema of ${upgraded} messages`); } - await Promise.all(result.map(model => model.hydrateStoryContext())); + await Promise.all( + result.map(model => + model.hydrateStoryContext(undefined, { shouldSave: true }) + ) + ); return result; } diff --git a/ts/models/messages.ts b/ts/models/messages.ts index f5f98c1ce..47eaaa6dd 100644 --- a/ts/models/messages.ts +++ b/ts/models/messages.ts @@ -332,8 +332,14 @@ export class MessageModel extends window.Backbone.Model { } async hydrateStoryContext( - inMemoryMessage?: MessageAttributesType + inMemoryMessage?: MessageAttributesType, + { + shouldSave, + }: { + shouldSave?: boolean; + } = {} ): Promise { + const ourUuid = window.textsecure.storage.user.getCheckedUuid().toString(); const storyId = this.get('storyId'); if (!storyId) { return; @@ -366,6 +372,9 @@ export class MessageModel extends window.Backbone.Model { messageId: '', }, }); + if (shouldSave) { + await window.Signal.Data.saveMessage(this.attributes, { ourUuid }); + } return; } @@ -382,6 +391,9 @@ export class MessageModel extends window.Backbone.Model { messageId: message.id, }, }); + if (shouldSave) { + await window.Signal.Data.saveMessage(this.attributes, { ourUuid }); + } } // Dependencies of prop-generation functions @@ -1028,7 +1040,7 @@ export class MessageModel extends window.Backbone.Model { if (this.get('storyReplyContext')) { this.unset('storyReplyContext'); } - await this.hydrateStoryContext(message.attributes); + await this.hydrateStoryContext(message.attributes, { shouldSave: true }); return; } @@ -2610,7 +2622,9 @@ export class MessageModel extends window.Backbone.Model { }); if (storyQuote) { - await this.hydrateStoryContext(storyQuote.attributes); + await this.hydrateStoryContext(storyQuote.attributes, { + shouldSave: true, + }); } const isSupported = !isUnsupportedMessage(message.attributes); @@ -3003,14 +3017,14 @@ export class MessageModel extends window.Backbone.Model { }, }); + await generatedMessage.hydrateStoryContext(storyMessage, { + shouldSave: false, + }); // Note: generatedMessage comes with an id, so we have to force this save - await Promise.all([ - window.Signal.Data.saveMessage(generatedMessage.attributes, { - ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), - forceSave: true, - }), - generatedMessage.hydrateStoryContext(storyMessage), - ]); + await window.Signal.Data.saveMessage(generatedMessage.attributes, { + ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), + forceSave: true, + }); log.info('Reactions.onReaction adding reaction to story', { reactionMessageId: getMessageIdForLogging( @@ -3159,13 +3173,14 @@ export class MessageModel extends window.Backbone.Model { generatedMessage, 'Story reactions must provide storyReactionmessage' ); - await Promise.all([ - await window.Signal.Data.saveMessage(generatedMessage.attributes, { - ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), - forceSave: true, - }), - generatedMessage.hydrateStoryContext(this.attributes), - ]); + + await generatedMessage.hydrateStoryContext(this.attributes, { + shouldSave: false, + }); + await window.Signal.Data.saveMessage(generatedMessage.attributes, { + ourUuid: window.textsecure.storage.user.getCheckedUuid().toString(), + forceSave: true, + }); void conversation.addSingleMessage( window.MessageController.register( diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index f1f8988c0..6246fb67e 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -36,6 +36,7 @@ import type { ClientSearchResultMessageType, ConversationType, GetConversationRangeCenteredOnMessageResultType, + GetRecentStoryRepliesOptionsType, IdentityKeyIdType, IdentityKeyType, StoredIdentityKeyType, @@ -99,6 +100,7 @@ const exclusiveInterface: ClientExclusiveInterface = { searchMessages, + getRecentStoryReplies, getOlderMessagesByConversation, getConversationRangeCenteredOnMessage, getNewerMessagesByConversation, @@ -613,6 +615,15 @@ async function getNewerMessagesByConversation( return handleMessageJSON(messages); } +async function getRecentStoryReplies( + storyId: string, + options?: GetRecentStoryRepliesOptionsType +): Promise> { + const messages = await channels.getRecentStoryReplies(storyId, options); + + return handleMessageJSON(messages); +} + async function getOlderMessagesByConversation( options: AdjacentMessagesByConversationOptionsType ): Promise> { diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index 43af041ba..a69147e93 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -825,6 +825,11 @@ export type ServerInterface = DataInterface & { options?: { limit?: number }; contactUuidsMatchingQuery?: Array; }) => Promise>; + + getRecentStoryReplies( + storyId: string, + options?: GetRecentStoryRepliesOptionsType + ): Promise>; getOlderMessagesByConversation: ( options: AdjacentMessagesByConversationOptionsType ) => Promise>; @@ -895,6 +900,13 @@ export type ServerInterface = DataInterface & { getAllBadgeImageFileLocalPaths: () => Promise>; }; +export type GetRecentStoryRepliesOptionsType = { + limit?: number; + messageId?: string; + receivedAt?: number; + sentAt?: number; +}; + // Differing signature on client/server export type ClientExclusiveInterface = { // Differing signature on client/server @@ -913,6 +925,11 @@ export type ClientExclusiveInterface = { options?: { limit?: number }; contactUuidsMatchingQuery?: Array; }) => Promise>; + + getRecentStoryReplies( + storyId: string, + options?: GetRecentStoryRepliesOptionsType + ): Promise>; getOlderMessagesByConversation: ( options: AdjacentMessagesByConversationOptionsType ) => Promise>; diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 8e60b877d..ce0fcc317 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -93,6 +93,7 @@ import type { GetAllStoriesResultType, GetConversationRangeCenteredOnMessageResultType, GetKnownMessageAttachmentsResultType, + GetRecentStoryRepliesOptionsType, GetUnreadByConversationAndMarkReadResultType, IdentityKeyIdType, StoredIdentityKeyType, @@ -251,6 +252,7 @@ const dataInterface: ServerInterface = { getMessageCount, getStoryCount, + getRecentStoryReplies, saveMessage, saveMessages, removeMessage, @@ -2530,6 +2532,53 @@ enum AdjacentDirection { Newer = 'Newer', } +async function getRecentStoryReplies( + storyId: string, + options?: GetRecentStoryRepliesOptionsType +): Promise> { + return getRecentStoryRepliesSync(storyId, options); +} + +// This function needs to pull story replies from all conversations, because when we send +// a story to one or more distribution lists, each reply to it will be in the sender's +// 1:1 conversation with us. +function getRecentStoryRepliesSync( + storyId: string, + { + limit = 100, + messageId, + receivedAt = Number.MAX_VALUE, + sentAt = Number.MAX_VALUE, + }: GetRecentStoryRepliesOptionsType = {} +): Array { + const db = getInstance(); + const timeFilters = { + first: sqlFragment`received_at = ${receivedAt} AND sent_at < ${sentAt}`, + second: sqlFragment`received_at < ${receivedAt}`, + }; + + const createQuery = (timeFilter: QueryFragment): QueryFragment => sqlFragment` + SELECT json FROM messages WHERE + (${messageId} IS NULL OR id IS NOT ${messageId}) AND + isStory IS 0 AND + storyId IS ${storyId} AND + ( + ${timeFilter} + ) + ORDER BY received_at DESC, sent_at DESC + `; + + const template = sqlFragment` + SELECT first.json FROM (${createQuery(timeFilters.first)}) as first + UNION ALL + SELECT second.json FROM (${createQuery(timeFilters.second)}) as second + `; + + const [query, params] = sql`${template} LIMIT ${limit}`; + + return db.prepare(query).all(params); +} + function getAdjacentMessagesByConversationSync( direction: AdjacentDirection, { diff --git a/ts/sql/migrations/86-story-replies-index.ts b/ts/sql/migrations/86-story-replies-index.ts new file mode 100644 index 000000000..0fda5328f --- /dev/null +++ b/ts/sql/migrations/86-story-replies-index.ts @@ -0,0 +1,32 @@ +// Copyright 2023 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import type { Database } from '@signalapp/better-sqlite3'; + +import type { LoggerType } from '../../types/Logging'; + +export default function updateToSchemaVersion86( + currentVersion: number, + db: Database, + logger: LoggerType +): void { + if (currentVersion >= 86) { + return; + } + + db.transaction(() => { + // The key reason for this new schema is that all of our previous schemas start with + // conversationId. This query is meant to find all replies to a given story, no + // matter the conversation. + db.exec( + `CREATE INDEX messages_story_replies + ON messages (storyId, received_at, sent_at) + WHERE isStory IS 0; + ` + ); + + db.pragma('user_version = 86'); + })(); + + logger.info('updateToSchemaVersion86: success!'); +} diff --git a/ts/sql/migrations/index.ts b/ts/sql/migrations/index.ts index 35907077d..86ce1e322 100644 --- a/ts/sql/migrations/index.ts +++ b/ts/sql/migrations/index.ts @@ -61,6 +61,7 @@ import updateToSchemaVersion82 from './82-edited-messages-read-index'; import updateToSchemaVersion83 from './83-mentions'; import updateToSchemaVersion84 from './84-all-mentions'; import updateToSchemaVersion85 from './85-add-kyber-keys'; +import updateToSchemaVersion86 from './86-story-replies-index'; function updateToSchemaVersion1( currentVersion: number, @@ -1992,6 +1993,7 @@ export const SCHEMA_VERSIONS = [ updateToSchemaVersion83, updateToSchemaVersion84, updateToSchemaVersion85, + updateToSchemaVersion86, ]; export function updateSchema(db: Database, logger: LoggerType): void { diff --git a/ts/test-electron/sql/getRecentStoryReplies_test.ts b/ts/test-electron/sql/getRecentStoryReplies_test.ts new file mode 100644 index 000000000..49e251a55 --- /dev/null +++ b/ts/test-electron/sql/getRecentStoryReplies_test.ts @@ -0,0 +1,118 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import dataInterface from '../../sql/Client'; +import { UUID } from '../../types/UUID'; +import type { UUIDStringType } from '../../types/UUID'; + +import type { MessageAttributesType } from '../../model-types.d'; + +const { _getAllMessages, getRecentStoryReplies, removeAll, saveMessages } = + dataInterface; + +function getUuid(): UUIDStringType { + return UUID.generate().toString(); +} + +describe('sql/getRecentStoryReplies', () => { + beforeEach(async () => { + await removeAll(); + }); + + it('returns message matching storyId in all converssations ', async () => { + assert.lengthOf(await _getAllMessages(), 0); + + const now = Date.now(); + const conversationId1 = getUuid(); + const conversationId2 = getUuid(); + const conversationId3 = getUuid(); + const ourUuid = getUuid(); + const storyId = getUuid(); + const message1: MessageAttributesType = { + id: getUuid(), + body: 'message 1 - reply #1', + type: 'incoming', + conversationId: conversationId1, + sent_at: now - 20, + received_at: now - 20, + timestamp: now - 20, + storyId, + }; + const message2: MessageAttributesType = { + id: getUuid(), + body: 'message 2 - reply #2', + type: 'incoming', + conversationId: conversationId2, + sent_at: now - 10, + received_at: now - 10, + timestamp: now - 10, + storyId, + }; + const message3: MessageAttributesType = { + id: getUuid(), + body: 'message 3 - reply #3', + type: 'incoming', + conversationId: conversationId3, + sent_at: now, + received_at: now, + timestamp: now, + storyId, + }; + const message4: MessageAttributesType = { + id: getUuid(), + body: 'message 4 - the story itself', + type: 'story', + conversationId: conversationId3, + sent_at: now, + received_at: now, + timestamp: now, + storyId, + }; + const message5: MessageAttributesType = { + id: getUuid(), + body: 'message 5 - different story reply', + type: 'incoming', + conversationId: conversationId1, + sent_at: now, + received_at: now, + timestamp: now, + storyId: getUuid(), + }; + const message6: MessageAttributesType = { + id: getUuid(), + body: 'message 6 - no story fields', + type: 'incoming', + conversationId: conversationId1, + sent_at: now, + received_at: now, + timestamp: now, + }; + + await saveMessages( + [message1, message2, message3, message4, message5, message6], + { + forceSave: true, + ourUuid, + } + ); + + assert.lengthOf(await _getAllMessages(), 6); + + const searchResultsPage1 = await getRecentStoryReplies(storyId, { + limit: 2, + }); + assert.lengthOf(searchResultsPage1, 2, 'page 1'); + assert.strictEqual(searchResultsPage1[0].body, message3.body); + assert.strictEqual(searchResultsPage1[1].body, message2.body); + + const searchResultsPage2 = await getRecentStoryReplies(storyId, { + messageId: message2.id, + receivedAt: message2.received_at, + limit: 2, + }); + assert.lengthOf(searchResultsPage2, 1, 'page 2'); + assert.strictEqual(searchResultsPage2[0].body, message1.body); + }); +}); diff --git a/ts/test-node/sql_migrations_test.ts b/ts/test-node/sql_migrations_test.ts index a1a09f892..a65ff7b57 100644 --- a/ts/test-node/sql_migrations_test.ts +++ b/ts/test-node/sql_migrations_test.ts @@ -3528,4 +3528,50 @@ describe('SQL migrations test', () => { assert.isAtLeast(object.createdAt, startingTime); }); }); + + describe('updateToSchemaVersion86', () => { + it('supports the right index for first query used in getRecentStoryRepliesSync', () => { + updateToVersion(86); + const [query, params] = sql` + EXPLAIN QUERY PLAN + SELECT json FROM messages WHERE + ('messageId' IS NULL OR id IS NOT 'messageId') AND + isStory IS 0 AND + storyId IS 'storyId' AND + received_at = 100000 AND sent_at < 100000 + ORDER BY received_at DESC, sent_at DESC + LIMIT 100 + `; + const { detail } = db.prepare(query).get(params); + + assert.notInclude(detail, 'B-TREE'); + assert.notInclude(detail, 'SCAN'); + assert.include( + detail, + 'SEARCH messages USING INDEX messages_story_replies (storyId=? AND received_at=? AND sent_at { + updateToVersion(86); + const [query, params] = sql` + EXPLAIN QUERY PLAN + SELECT json FROM messages WHERE + ('messageId' IS NULL OR id IS NOT 'messageId') AND + isStory IS 0 AND + storyId IS 'storyId' AND + received_at < 100000 + ORDER BY received_at DESC, sent_at DESC + LIMIT 100 + `; + const { detail } = db.prepare(query).get(params); + + assert.notInclude(detail, 'B-TREE'); + assert.notInclude(detail, 'SCAN'); + assert.include( + detail, + 'SEARCH messages USING INDEX messages_story_replies (storyId=? AND received_at { - const { messageId, receivedAt } = pagination || {}; + const storyId = story.id; + const parentConversation = window.ConversationController.get( + story.conversationId + ); + const isGroupConversation = Boolean( + parentConversation && !isDirectConversation(parentConversation.attributes) + ); - const replies = await window.Signal.Data.getOlderMessagesByConversation({ - conversationId, - includeStoryReplies: false, - messageId, - receivedAt, + const replies = await window.Signal.Data.getRecentStoryReplies( storyId, - }); + pagination + ); + + const logId = `cleanupStoryReplies(${storyId}/isGroup=${isGroupConversation})`; + const lastMessage = replies[replies.length - 1]; + const lastMessageId = lastMessage?.id; + const lastReceivedAt = lastMessage?.received_at; + + log.info( + `${logId}: Cleaning ${replies.length} replies, ending with message ${lastMessageId}` + ); if (!replies.length) { return; } - const lastMessage = replies[replies.length - 1]; - const lastMessageId = lastMessage.id; - const lastReceivedAt = lastMessage.received_at; - - if (messageId === lastMessageId) { + if (pagination?.messageId === lastMessageId) { + log.info( + `${logId}: Returning early; last message id is pagination starting id` + ); return; } @@ -74,14 +75,16 @@ async function cleanupStoryReplies( ); } else { // Refresh the storyReplyContext data for 1:1 conversations - replies.forEach(reply => { - const model = window.MessageController.register(reply.id, reply); - model.unset('storyReplyContext'); - drop(model.hydrateStoryContext()); - }); + await Promise.all( + replies.map(async reply => { + const model = window.MessageController.register(reply.id, reply); + model.unset('storyReplyContext'); + await model.hydrateStoryContext(story, { shouldSave: true }); + }) + ); } - return cleanupStoryReplies(conversationId, storyId, isGroupConversation, { + return cleanupStoryReplies(story, { messageId: lastMessageId, receivedAt: lastReceivedAt, }); @@ -93,13 +96,9 @@ export async function deleteMessageData( await window.Signal.Migrations.deleteExternalMessageFiles(message); if (isStory(message)) { - const { id, conversationId } = message; - const parentConversation = - window.ConversationController.get(conversationId); - const isGroupConversation = Boolean( - parentConversation && !isDirectConversation(parentConversation.attributes) - ); - await cleanupStoryReplies(conversationId, id, isGroupConversation); + // Attachments have been deleted from disk; remove from memory before replies update + const storyWithoutAttachments = { ...message, attachments: undefined }; + await cleanupStoryReplies(storyWithoutAttachments); } const { sticker } = message;