MessageReceipts: Removed dropped receipts from cache and database
This commit is contained in:
@@ -35,7 +35,8 @@ import { drop } from '../util/drop';
|
|||||||
import { getMessageById } from '../messages/getMessageById';
|
import { getMessageById } from '../messages/getMessageById';
|
||||||
import { MessageModel } from '../models/messages';
|
import { MessageModel } from '../models/messages';
|
||||||
|
|
||||||
const { deleteSentProtoRecipient, removeSyncTaskById } = DataWriter;
|
const { deleteSentProtoRecipient, removeSyncTasks, removeSyncTaskById } =
|
||||||
|
DataWriter;
|
||||||
|
|
||||||
export const messageReceiptTypeSchema = z.enum(['Delivery', 'Read', 'View']);
|
export const messageReceiptTypeSchema = z.enum(['Delivery', 'Read', 'View']);
|
||||||
|
|
||||||
@@ -197,9 +198,21 @@ async function processReceiptsForMessage(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const { validReceipts } = await updateMessageWithReceipts(message, receipts);
|
const { droppedReceipts, validReceipts } = await updateMessageWithReceipts(
|
||||||
|
message,
|
||||||
|
receipts
|
||||||
|
);
|
||||||
|
|
||||||
await window.MessageCache.saveMessage(message.attributes);
|
await Promise.all([
|
||||||
|
window.MessageCache.saveMessage(message.attributes),
|
||||||
|
removeSyncTasks(
|
||||||
|
droppedReceipts.map(item => {
|
||||||
|
const { syncTaskId } = item;
|
||||||
|
cachedReceipts.delete(syncTaskId);
|
||||||
|
return syncTaskId;
|
||||||
|
})
|
||||||
|
),
|
||||||
|
]);
|
||||||
|
|
||||||
// Confirm/remove receipts, and delete sent protos
|
// Confirm/remove receipts, and delete sent protos
|
||||||
for (const receipt of validReceipts) {
|
for (const receipt of validReceipts) {
|
||||||
@@ -219,6 +232,7 @@ async function updateMessageWithReceipts(
|
|||||||
message: MessageModel,
|
message: MessageModel,
|
||||||
receipts: Array<MessageReceiptAttributesType>
|
receipts: Array<MessageReceiptAttributesType>
|
||||||
): Promise<{
|
): Promise<{
|
||||||
|
droppedReceipts: Array<MessageReceiptAttributesType>;
|
||||||
validReceipts: Array<MessageReceiptAttributesType>;
|
validReceipts: Array<MessageReceiptAttributesType>;
|
||||||
}> {
|
}> {
|
||||||
const logId = `updateMessageWithReceipts(timestamp=${message.get('timestamp')})`;
|
const logId = `updateMessageWithReceipts(timestamp=${message.get('timestamp')})`;
|
||||||
@@ -244,7 +258,8 @@ async function updateMessageWithReceipts(
|
|||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
`${logId}: batch processing ${receipts.length}` +
|
`${logId}: batch processing ${receipts.length}` +
|
||||||
` receipt${receipts.length === 1 ? '' : 's'}`
|
` receipt${receipts.length === 1 ? '' : 's'}` +
|
||||||
|
`, dropped count: ${droppedReceipts.length}`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Generate the updated message synchronously
|
// Generate the updated message synchronously
|
||||||
@@ -257,7 +272,7 @@ async function updateMessageWithReceipts(
|
|||||||
}
|
}
|
||||||
message.set(attributes);
|
message.set(attributes);
|
||||||
|
|
||||||
return { validReceipts: receiptsToProcess };
|
return { droppedReceipts, validReceipts: receiptsToProcess };
|
||||||
}
|
}
|
||||||
|
|
||||||
const deleteSentProtoBatcher = createWaitBatcher({
|
const deleteSentProtoBatcher = createWaitBatcher({
|
||||||
|
@@ -902,6 +902,7 @@ type WritableInterface = {
|
|||||||
) => void;
|
) => void;
|
||||||
|
|
||||||
removeSyncTaskById: (id: string) => void;
|
removeSyncTaskById: (id: string) => void;
|
||||||
|
removeSyncTasks: (ids: ReadonlyArray<string>) => void;
|
||||||
saveSyncTasks: (tasks: Array<SyncTaskType>) => void;
|
saveSyncTasks: (tasks: Array<SyncTaskType>) => void;
|
||||||
|
|
||||||
incrementAllSyncTaskAttempts: () => void;
|
incrementAllSyncTaskAttempts: () => void;
|
||||||
|
@@ -520,6 +520,7 @@ export const DataWriter: ServerWritableInterface = {
|
|||||||
incrementMessagesMigrationAttempts,
|
incrementMessagesMigrationAttempts,
|
||||||
|
|
||||||
removeSyncTaskById,
|
removeSyncTaskById,
|
||||||
|
removeSyncTasks,
|
||||||
saveSyncTasks,
|
saveSyncTasks,
|
||||||
incrementAllSyncTaskAttempts,
|
incrementAllSyncTaskAttempts,
|
||||||
dequeueOldestSyncTasks,
|
dequeueOldestSyncTasks,
|
||||||
@@ -2147,6 +2148,19 @@ export function removeSyncTaskById(db: WritableDB, id: string): void {
|
|||||||
|
|
||||||
db.prepare(query).run(parameters);
|
db.prepare(query).run(parameters);
|
||||||
}
|
}
|
||||||
|
function removeSyncTaskBatch(db: WritableDB, ids: ReadonlyArray<string>): void {
|
||||||
|
db.prepare(
|
||||||
|
`
|
||||||
|
DELETE FROM syncTasks
|
||||||
|
WHERE id IN ( ${ids.map(() => '?').join(', ')} );
|
||||||
|
`
|
||||||
|
).run(ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
function removeSyncTasks(db: WritableDB, ids: ReadonlyArray<string>): void {
|
||||||
|
batchMultiVarQuery(db, ids, batch => removeSyncTaskBatch(db, batch));
|
||||||
|
}
|
||||||
|
|
||||||
export function saveSyncTasks(
|
export function saveSyncTasks(
|
||||||
db: WritableDB,
|
db: WritableDB,
|
||||||
tasks: Array<SyncTaskType>
|
tasks: Array<SyncTaskType>
|
||||||
|
Reference in New Issue
Block a user