diff --git a/ts/background.ts b/ts/background.ts index 4aaf5554a..b4507d404 100644 --- a/ts/background.ts +++ b/ts/background.ts @@ -14,6 +14,8 @@ import { isValidReactionEmoji } from './reactions/isValidReactionEmoji'; import { ConversationModel } from './models/conversations'; import { createBatcher } from './util/batcher'; import { updateConversationsWithUuidLookup } from './updateConversationsWithUuidLookup'; +import { initializeAllJobQueues } from './jobs/initializeAllJobQueues'; +import { removeStorageKeyJobQueue } from './jobs/removeStorageKeyJobQueue'; const MAX_ATTACHMENT_DOWNLOAD_AGE = 3600 * 72 * 1000; @@ -31,6 +33,8 @@ export async function startApp(): Promise { ); } + initializeAllJobQueues(); + let resolveOnAppView: (() => void) | undefined; const onAppView = new Promise(resolve => { resolveOnAppView = resolve; @@ -654,6 +658,13 @@ export async function startApp(): Promise { await window.Signal.Data.clearAllErrorStickerPackAttempts(); } + if (window.isBeforeVersion(lastVersion, 'v5.2.0')) { + const legacySenderCertificateStorageKey = 'senderCertificateWithUuid'; + await removeStorageKeyJobQueue.add({ + key: legacySenderCertificateStorageKey, + }); + } + // This one should always be last - it could restart the app if (window.isBeforeVersion(lastVersion, 'v1.15.0-beta.5')) { await window.Signal.Logs.deleteAll(); diff --git a/ts/jobs/Job.ts b/ts/jobs/Job.ts new file mode 100644 index 000000000..b143e9d85 --- /dev/null +++ b/ts/jobs/Job.ts @@ -0,0 +1,17 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { ParsedJob } from './types'; + +/** + * A single job instance. Shouldn't be instantiated directly, except by `JobQueue`. + */ +export class Job implements ParsedJob { + constructor( + readonly id: string, + readonly timestamp: number, + readonly queueType: string, + readonly data: T, + readonly completion: Promise + ) {} +} diff --git a/ts/jobs/JobError.ts b/ts/jobs/JobError.ts new file mode 100644 index 000000000..948bf2730 --- /dev/null +++ b/ts/jobs/JobError.ts @@ -0,0 +1,22 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { reallyJsonStringify } from '../util/reallyJsonStringify'; + +/** + * An error that wraps job errors. + * + * Should not be instantiated directly, except by `JobQueue`. + */ +export class JobError extends Error { + constructor(public readonly lastErrorThrownByJob: unknown) { + super(`Job failed. Last error: ${formatError(lastErrorThrownByJob)}`); + } +} + +function formatError(err: unknown): string { + if (err instanceof Error) { + return err.message; + } + return reallyJsonStringify(err); +} diff --git a/ts/jobs/JobQueue.ts b/ts/jobs/JobQueue.ts new file mode 100644 index 000000000..b4d6256c9 --- /dev/null +++ b/ts/jobs/JobQueue.ts @@ -0,0 +1,233 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { v4 as uuid } from 'uuid'; +import { noop } from 'lodash'; + +import { Job } from './Job'; +import { JobError } from './JobError'; +import { ParsedJob, StoredJob, JobQueueStore } from './types'; +import { assert } from '../util/assert'; +import * as log from '../logging/log'; + +const noopOnCompleteCallbacks = { + resolve: noop, + reject: noop, +}; + +type JobQueueOptions = { + /** + * The backing store for jobs. Typically a wrapper around the database. + */ + store: JobQueueStore; + + /** + * A unique name for this job queue. For example, might be "attachment downloads" or + * "message send". + */ + queueType: string; + + /** + * The maximum number of attempts for a job in this queue. A value of 1 will not allow + * the job to fail; a value of 2 will allow the job to fail once; etc. + */ + maxAttempts: number; + + /** + * `parseData` will be called with the raw data from `store`. For example, if the job + * takes a single number, `parseData` should throw if `data` is a number and should + * return the number otherwise. + * + * If it throws, the job will be deleted from the store and the job will not be run. + * + * Will only be called once per job, even if `maxAttempts > 1`. + */ + parseData: (data: unknown) => T; + + /** + * Run the job, given data. + * + * If it resolves, the job will be deleted from the store. + * + * If it rejects, the job will be retried up to `maxAttempts - 1` times, after which it + * will be deleted from the store. + */ + run: (job: Readonly>) => Promise; +}; + +export class JobQueue { + private readonly maxAttempts: number; + + private readonly parseData: (data: unknown) => T; + + private readonly queueType: string; + + private readonly run: (job: Readonly>) => Promise; + + private readonly store: JobQueueStore; + + private readonly logPrefix: string; + + private readonly onCompleteCallbacks = new Map< + string, + { + resolve: () => void; + reject: (err: unknown) => void; + } + >(); + + private started = false; + + constructor(options: Readonly>) { + assert( + Number.isInteger(options.maxAttempts) && options.maxAttempts >= 1, + 'maxAttempts should be a positive integer' + ); + assert( + options.maxAttempts <= Number.MAX_SAFE_INTEGER, + 'maxAttempts is too large' + ); + assert( + options.queueType.trim().length, + 'queueType should be a non-blank string' + ); + + this.maxAttempts = options.maxAttempts; + this.parseData = options.parseData; + this.queueType = options.queueType; + this.run = options.run; + this.store = options.store; + + this.logPrefix = `${this.queueType} job queue:`; + } + + /** + * Start streaming jobs from the store. + */ + async streamJobs(): Promise { + if (this.started) { + throw new Error( + `${this.logPrefix} should not start streaming more than once` + ); + } + this.started = true; + + log.info(`${this.logPrefix} starting to stream jobs`); + + const stream = this.store.stream(this.queueType); + // We want to enqueue the jobs in sequence, not in parallel. `for await ... of` is a + // good way to do that. + // eslint-disable-next-line no-restricted-syntax + for await (const storedJob of stream) { + this.enqueueStoredJob(storedJob); + } + } + + /** + * Add a job, which should cause it to be enqueued and run. + * + * If `streamJobs` has not been called yet, it will be called. + */ + async add(data: Readonly): Promise> { + if (!this.started) { + throw new Error( + `${this.logPrefix} has not started streaming. Make sure to call streamJobs().` + ); + } + + const id = uuid(); + const timestamp = Date.now(); + + const completionPromise = new Promise((resolve, reject) => { + this.onCompleteCallbacks.set(id, { resolve, reject }); + }); + const completion = (async () => { + try { + await completionPromise; + } catch (err: unknown) { + throw new JobError(err); + } finally { + this.onCompleteCallbacks.delete(id); + } + })(); + + log.info(`${this.logPrefix} added new job ${id}`); + + const job = new Job(id, timestamp, this.queueType, data, completion); + await this.store.insert(job); + return job; + } + + private async enqueueStoredJob(storedJob: Readonly) { + assert( + storedJob.queueType === this.queueType, + 'Received a mis-matched queue type' + ); + + log.info(`${this.logPrefix} enqueuing job ${storedJob.id}`); + + // It's okay if we don't have a callback; that likely means the job was created before + // the process was started (e.g., from a previous run). + const { resolve, reject } = + this.onCompleteCallbacks.get(storedJob.id) || noopOnCompleteCallbacks; + + let parsedData: T; + try { + parsedData = this.parseData(storedJob.data); + } catch (err) { + log.error( + `${this.logPrefix} failed to parse data for job ${storedJob.id}` + ); + reject( + new Error( + 'Failed to parse job data. Was unexpected data loaded from the database?' + ) + ); + return; + } + + const parsedJob: ParsedJob = { + ...storedJob, + data: parsedData, + }; + + let result: + | undefined + | { success: true } + | { success: false; err: unknown }; + + for (let attempt = 1; attempt <= this.maxAttempts; attempt += 1) { + log.info( + `${this.logPrefix} running job ${storedJob.id}, attempt ${attempt} of ${this.maxAttempts}` + ); + try { + // We want an `await` in the loop, as we don't want a single job running more + // than once at a time. Ideally, the job will succeed on the first attempt. + // eslint-disable-next-line no-await-in-loop + await this.run(parsedJob); + result = { success: true }; + log.info( + `${this.logPrefix} job ${storedJob.id} succeeded on attempt ${attempt}` + ); + break; + } catch (err: unknown) { + result = { success: false, err }; + log.error( + `${this.logPrefix} job ${storedJob.id} failed on attempt ${attempt}` + ); + } + } + + await this.store.delete(storedJob.id); + + assert( + result, + 'The job never ran. This indicates a developer error in the job queue' + ); + if (result.success) { + resolve(); + } else { + reject(result.err); + } + } +} diff --git a/ts/jobs/JobQueueDatabaseStore.ts b/ts/jobs/JobQueueDatabaseStore.ts new file mode 100644 index 000000000..fe66fc161 --- /dev/null +++ b/ts/jobs/JobQueueDatabaseStore.ts @@ -0,0 +1,107 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { pick, noop } from 'lodash'; +import { AsyncQueue } from '../util/AsyncQueue'; +import { concat, wrapPromise } from '../util/asyncIterables'; +import { JobQueueStore, StoredJob } from './types'; +import databaseInterface from '../sql/Client'; +import * as log from '../logging/log'; + +type Database = { + getJobsInQueue(queueType: string): Promise>; + insertJob(job: Readonly): Promise; + deleteJob(id: string): Promise; +}; + +export class JobQueueDatabaseStore implements JobQueueStore { + private activeQueueTypes = new Set(); + + private queues = new Map>(); + + private initialFetchPromises = new Map>(); + + constructor(private readonly db: Database) {} + + async insert(job: Readonly): Promise { + log.info( + `JobQueueDatabaseStore adding job ${job.id} to queue ${JSON.stringify( + job.queueType + )}` + ); + + const initialFetchPromise = this.initialFetchPromises.get(job.queueType); + if (!initialFetchPromise) { + throw new Error( + `JobQueueDatabaseStore tried to add job for queue ${JSON.stringify( + job.queueType + )} but streaming had not yet started` + ); + } + await initialFetchPromise; + + await this.db.insertJob( + pick(job, ['id', 'timestamp', 'queueType', 'data']) + ); + + this.getQueue(job.queueType).add(job); + } + + async delete(id: string): Promise { + await this.db.deleteJob(id); + } + + stream(queueType: string): AsyncIterable { + if (this.activeQueueTypes.has(queueType)) { + throw new Error( + `Cannot stream queue type ${JSON.stringify(queueType)} more than once` + ); + } + this.activeQueueTypes.add(queueType); + + return concat([ + wrapPromise(this.fetchJobsAtStart(queueType)), + this.getQueue(queueType), + ]); + } + + private getQueue(queueType: string): AsyncQueue { + const existingQueue = this.queues.get(queueType); + if (existingQueue) { + return existingQueue; + } + + const result = new AsyncQueue(); + this.queues.set(queueType, result); + return result; + } + + private async fetchJobsAtStart(queueType: string): Promise> { + log.info( + `JobQueueDatabaseStore fetching existing jobs for queue ${JSON.stringify( + queueType + )}` + ); + + // This is initialized to `noop` because TypeScript doesn't know that `Promise` calls + // its callback synchronously, making sure `onFinished` is defined. + let onFinished: () => void = noop; + const initialFetchPromise = new Promise(resolve => { + onFinished = resolve; + }); + this.initialFetchPromises.set(queueType, initialFetchPromise); + + const result = await this.db.getJobsInQueue(queueType); + log.info( + `JobQueueDatabaseStore finished fetching existing ${ + result.length + } jobs for queue ${JSON.stringify(queueType)}` + ); + onFinished(); + return result; + } +} + +export const jobQueueDatabaseStore = new JobQueueDatabaseStore( + databaseInterface +); diff --git a/ts/jobs/initializeAllJobQueues.ts b/ts/jobs/initializeAllJobQueues.ts new file mode 100644 index 000000000..5ab8a0652 --- /dev/null +++ b/ts/jobs/initializeAllJobQueues.ts @@ -0,0 +1,11 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { removeStorageKeyJobQueue } from './removeStorageKeyJobQueue'; + +/** + * Start all of the job queues. Should be called when the database is ready. + */ +export function initializeAllJobQueues(): void { + removeStorageKeyJobQueue.streamJobs(); +} diff --git a/ts/jobs/removeStorageKeyJobQueue.ts b/ts/jobs/removeStorageKeyJobQueue.ts new file mode 100644 index 000000000..50228273f --- /dev/null +++ b/ts/jobs/removeStorageKeyJobQueue.ts @@ -0,0 +1,35 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import * as z from 'zod'; + +import { JobQueue } from './JobQueue'; +import { jobQueueDatabaseStore } from './JobQueueDatabaseStore'; + +const removeStorageKeyJobDataSchema = z.object({ + key: z.string().min(1), +}); + +type RemoveStorageKeyJobData = z.infer; + +export const removeStorageKeyJobQueue = new JobQueue({ + store: jobQueueDatabaseStore, + + queueType: 'remove storage key', + + maxAttempts: 100, + + parseData(data: unknown): RemoveStorageKeyJobData { + return removeStorageKeyJobDataSchema.parse(data); + }, + + async run({ + data, + }: Readonly<{ data: RemoveStorageKeyJobData }>): Promise { + await new Promise(resolve => { + window.storage.onready(resolve); + }); + + await window.storage.remove(data.key); + }, +}); diff --git a/ts/jobs/types.ts b/ts/jobs/types.ts new file mode 100644 index 000000000..2d3dfb47a --- /dev/null +++ b/ts/jobs/types.ts @@ -0,0 +1,37 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +export type JobQueueStore = { + /** + * Add a job to the database. Doing this should enqueue it in the stream. + */ + insert(job: Readonly): Promise; + + /** + * Remove a job. This should be called when a job finishes successfully or + * if a job has totally failed. + * + * It should NOT be called to cancel a job. + */ + delete(id: string): Promise; + + /** + * Stream jobs for a given queue. At startup, this stream may produce a bunch of + * jobs. After that, it should produce one job per `insert`. + */ + stream(queueType: string): AsyncIterable; +}; + +export type ParsedJob = { + readonly id: string; + readonly timestamp: number; + readonly queueType: string; + readonly data: T; +}; + +export type StoredJob = { + readonly id: string; + readonly timestamp: number; + readonly queueType: string; + readonly data?: unknown; +}; diff --git a/ts/services/senderCertificate.ts b/ts/services/senderCertificate.ts index c792fa33d..4958e49c1 100644 --- a/ts/services/senderCertificate.ts +++ b/ts/services/senderCertificate.ts @@ -60,8 +60,6 @@ export class SenderCertificateService { this.navigator = navigator; this.onlineEventTarget = onlineEventTarget; this.storage = storage; - - removeOldKey(storage); } async get( @@ -242,12 +240,4 @@ function isExpirationValid(expiration: unknown): expiration is number { return typeof expiration === 'number' && expiration > Date.now(); } -function removeOldKey(storage: Readonly) { - const oldCertKey = 'senderCertificateWithUuid'; - const oldUuidCert = storage.get(oldCertKey); - if (oldUuidCert) { - storage.remove(oldCertKey); - } -} - export const senderCertificateService = new SenderCertificateService(); diff --git a/ts/sql/Client.ts b/ts/sql/Client.ts index 299584d75..675bf629f 100644 --- a/ts/sql/Client.ts +++ b/ts/sql/Client.ts @@ -33,6 +33,7 @@ import { ConversationModelCollectionType, MessageModelCollectionType, } from '../model-types.d'; +import { StoredJob } from '../jobs/types'; import { AttachmentDownloadJobType, @@ -225,6 +226,10 @@ const dataInterface: ClientInterface = { getMessagesWithVisualMediaAttachments, getMessagesWithFileAttachments, + getJobsInQueue, + insertJob, + deleteJob, + // Test-only _getAllMessages, @@ -1491,3 +1496,15 @@ async function getMessagesWithFileAttachments( limit, }); } + +function getJobsInQueue(queueType: string): Promise> { + return channels.getJobsInQueue(queueType); +} + +function insertJob(job: Readonly): Promise { + return channels.insertJob(job); +} + +function deleteJob(id: string): Promise { + return channels.deleteJob(id); +} diff --git a/ts/sql/Interface.ts b/ts/sql/Interface.ts index c04b4997d..b75235b90 100644 --- a/ts/sql/Interface.ts +++ b/ts/sql/Interface.ts @@ -13,6 +13,7 @@ import { } from '../model-types.d'; import { MessageModel } from '../models/messages'; import { ConversationModel } from '../models/conversations'; +import { StoredJob } from '../jobs/types'; export type AttachmentDownloadJobType = { id: string; @@ -289,6 +290,10 @@ export type DataInterface = { conversationId: string, options: { limit: number } ) => Promise>; + + getJobsInQueue(queueType: string): Promise>; + insertJob(job: Readonly): Promise; + deleteJob(id: string): Promise; }; // The reason for client/server divergence is the need to inject Backbone models and diff --git a/ts/sql/Server.ts b/ts/sql/Server.ts index 7c1f13c4a..56aea81cc 100644 --- a/ts/sql/Server.ts +++ b/ts/sql/Server.ts @@ -31,8 +31,10 @@ import { import { assert } from '../util/assert'; import { isNormalNumber } from '../util/isNormalNumber'; import { combineNames } from '../util/combineNames'; +import { isNotNil } from '../util/isNotNil'; import { GroupV2MemberType } from '../model-types.d'; +import { StoredJob } from '../jobs/types'; import { AttachmentDownloadJobType, @@ -214,6 +216,10 @@ const dataInterface: ServerInterface = { getMessagesWithVisualMediaAttachments, getMessagesWithFileAttachments, + getJobsInQueue, + insertJob, + deleteJob, + // Server-only initialize, @@ -1687,6 +1693,27 @@ async function updateToSchemaVersion27(currentVersion: number, db: Database) { })(); } +function updateToSchemaVersion28(currentVersion: number, db: Database) { + if (currentVersion >= 28) { + return; + } + + db.transaction(() => { + db.exec(` + CREATE TABLE jobs( + id TEXT PRIMARY KEY, + queueType TEXT STRING NOT NULL, + timestamp INTEGER NOT NULL, + data STRING TEXT + ); + + CREATE INDEX jobs_timestamp ON jobs (timestamp); + `); + + db.pragma('user_version = 28'); + })(); +} + const SCHEMA_VERSIONS = [ updateToSchemaVersion1, updateToSchemaVersion2, @@ -1715,6 +1742,7 @@ const SCHEMA_VERSIONS = [ updateToSchemaVersion25, updateToSchemaVersion26, updateToSchemaVersion27, + updateToSchemaVersion28, ]; function updateSchema(db: Database): void { @@ -4241,6 +4269,7 @@ async function removeAll(): Promise { DELETE FROM stickers; DELETE FROM sticker_packs; DELETE FROM sticker_references; + DELETE FROM jobs; `); })(); } @@ -4257,6 +4286,7 @@ async function removeAllConfiguration(): Promise { DELETE FROM sessions; DELETE FROM signedPreKeys; DELETE FROM unprocessed; + DELETE FROM jobs; `); })(); } @@ -4638,3 +4668,48 @@ async function removeKnownDraftAttachments( return Object.keys(lookup); } + +async function getJobsInQueue(queueType: string): Promise> { + const db = getInstance(); + + return db + .prepare( + ` + SELECT id, timestamp, data + FROM jobs + WHERE queueType = $queueType + ORDER BY timestamp; + ` + ) + .all({ queueType }) + .map(row => ({ + id: row.id, + queueType, + timestamp: row.timestamp, + data: isNotNil(row.data) ? JSON.parse(row.data) : undefined, + })); +} + +async function insertJob(job: Readonly): Promise { + const db = getInstance(); + + db.prepare( + ` + INSERT INTO jobs + (id, queueType, timestamp, data) + VALUES + ($id, $queueType, $timestamp, $data); + ` + ).run({ + id: job.id, + queueType: job.queueType, + timestamp: job.timestamp, + data: isNotNil(job.data) ? JSON.stringify(job.data) : null, + }); +} + +async function deleteJob(id: string): Promise { + const db = getInstance(); + + db.prepare('DELETE FROM jobs WHERE id = $id').run({ id }); +} diff --git a/ts/test-both/AsyncQueue_test.ts b/ts/test-both/AsyncQueue_test.ts new file mode 100644 index 000000000..c20a411f4 --- /dev/null +++ b/ts/test-both/AsyncQueue_test.ts @@ -0,0 +1,36 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable no-restricted-syntax */ + +import { assert } from 'chai'; + +import { AsyncQueue } from '../util/AsyncQueue'; + +describe('AsyncQueue', () => { + it('yields values as they are added, even if they were added before consuming', async () => { + const queue = new AsyncQueue(); + + queue.add(1); + queue.add(2); + + const resultPromise = (async () => { + const results = []; + for await (const value of queue) { + results.push(value); + if (value === 4) { + break; + } + } + return results; + })(); + + queue.add(3); + queue.add(4); + + // Ignored, because we should've stopped iterating. + queue.add(5); + + assert.deepEqual(await resultPromise, [1, 2, 3, 4]); + }); +}); diff --git a/ts/test-both/util/asyncIterables_test.ts b/ts/test-both/util/asyncIterables_test.ts new file mode 100644 index 000000000..b52d4917e --- /dev/null +++ b/ts/test-both/util/asyncIterables_test.ts @@ -0,0 +1,89 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable no-await-in-loop */ +/* eslint-disable no-restricted-syntax */ + +import { assert } from 'chai'; + +import { + MaybeAsyncIterable, + concat, + wrapPromise, +} from '../../util/asyncIterables'; + +describe('async iterable utilities', () => { + describe('concat', () => { + it('returns an empty async iterable if called with an empty list', async () => { + const result = concat([]); + + assert.isEmpty(await collect(result)); + }); + + it('concatenates synchronous and asynchronous iterables', async () => { + function* makeSync() { + yield 'sync 1'; + yield 'sync 2'; + } + async function* makeAsync() { + yield 'async 1'; + yield 'async 2'; + } + + const syncIterable: Iterable = makeSync(); + const asyncIterable1: AsyncIterable = makeAsync(); + const asyncIterable2: AsyncIterable = makeAsync(); + + const result = concat([ + syncIterable, + asyncIterable1, + ['array 1', 'array 2'], + asyncIterable2, + ]); + + assert.deepEqual(await collect(result), [ + 'sync 1', + 'sync 2', + 'async 1', + 'async 2', + 'array 1', + 'array 2', + 'async 1', + 'async 2', + ]); + }); + }); + + describe('wrapPromise', () => { + it('resolves to an array when wrapping a synchronous iterable', async () => { + const iterable = new Set([1, 2, 3]); + + const result = wrapPromise(Promise.resolve(iterable)); + assert.sameMembers(await collect(result), [1, 2, 3]); + }); + + it('resolves to an array when wrapping an asynchronous iterable', async () => { + const iterable = (async function* test() { + yield 1; + yield 2; + yield 3; + })(); + + const result = wrapPromise(Promise.resolve(iterable)); + assert.deepEqual(await collect(result), [1, 2, 3]); + }); + }); +}); + +/** + * Turns an iterable into a fully-realized array. + * + * If we want this outside of tests, we could make it into a "real" function. + */ +async function collect(iterable: MaybeAsyncIterable): Promise> { + const result: Array = []; + for await (const value of iterable) { + result.push(value); + } + return result; +} diff --git a/ts/test-electron/services/senderCertificate_test.ts b/ts/test-electron/services/senderCertificate_test.ts index ad6dd22d7..8cdd6498f 100644 --- a/ts/test-electron/services/senderCertificate_test.ts +++ b/ts/test-electron/services/senderCertificate_test.ts @@ -93,24 +93,6 @@ describe('SenderCertificateService', () => { fakeStorage.get.withArgs('password').returns('abc123'); }); - describe('initialize', () => { - it('removes an old storage service key if it was present', () => { - fakeStorage.get - .withArgs('senderCertificateWithUuid') - .returns('some value'); - - initializeTestService(); - - sinon.assert.calledWith(fakeStorage.remove, 'senderCertificateWithUuid'); - }); - - it("doesn't remove anything from storage if it wasn't there", () => { - initializeTestService(); - - sinon.assert.notCalled(fakeStorage.put); - }); - }); - describe('get', () => { it('returns valid yes-E164 certificates from storage if they exist', async () => { const cert = { diff --git a/ts/test-node/jobs/JobError_test.ts b/ts/test-node/jobs/JobError_test.ts new file mode 100644 index 000000000..dcb7c81ea --- /dev/null +++ b/ts/test-node/jobs/JobError_test.ts @@ -0,0 +1,33 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { JobError } from '../../jobs/JobError'; + +describe('JobError', () => { + it('stores the provided argument as a property', () => { + const fakeError = new Error('uh oh'); + const jobError1 = new JobError(fakeError); + assert.strictEqual(jobError1.lastErrorThrownByJob, fakeError); + + const jobError2 = new JobError(123); + assert.strictEqual(jobError2.lastErrorThrownByJob, 123); + }); + + it('if passed an Error, augments its `message`', () => { + const fakeError = new Error('uh oh'); + const jobError = new JobError(fakeError); + + assert.strictEqual(jobError.message, 'Job failed. Last error: uh oh'); + }); + + it('if passed a non-Error, stringifies it', () => { + const jobError = new JobError({ foo: 'bar' }); + + assert.strictEqual( + jobError.message, + 'Job failed. Last error: {"foo":"bar"}' + ); + }); +}); diff --git a/ts/test-node/jobs/JobQueueDatabaseStore_test.ts b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts new file mode 100644 index 000000000..89a2bbffb --- /dev/null +++ b/ts/test-node/jobs/JobQueueDatabaseStore_test.ts @@ -0,0 +1,202 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable no-restricted-syntax */ + +import { assert } from 'chai'; +import * as sinon from 'sinon'; +import { noop } from 'lodash'; +import { StoredJob } from '../../jobs/types'; + +import { JobQueueDatabaseStore } from '../../jobs/JobQueueDatabaseStore'; + +describe('JobQueueDatabaseStore', () => { + let fakeDatabase: { + getJobsInQueue: sinon.SinonStub; + insertJob: sinon.SinonStub; + deleteJob: sinon.SinonStub; + }; + + beforeEach(() => { + fakeDatabase = { + getJobsInQueue: sinon.stub().resolves([]), + insertJob: sinon.stub(), + deleteJob: sinon.stub(), + }; + }); + + describe('insert', () => { + it("fails if streaming hasn't started yet", async () => { + const store = new JobQueueDatabaseStore(fakeDatabase); + + let error: unknown; + try { + await store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + } catch (err: unknown) { + error = err; + } + + assert.instanceOf(error, Error); + }); + + it('adds jobs to the database', async () => { + const store = new JobQueueDatabaseStore(fakeDatabase); + store.stream('test queue'); + + await store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + + sinon.assert.calledOnce(fakeDatabase.insertJob); + sinon.assert.calledWithMatch(fakeDatabase.insertJob, { + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + }); + + it('enqueues jobs after putting them in the database', async () => { + const events: Array = []; + + fakeDatabase.insertJob.callsFake(() => { + events.push('insert'); + }); + + const store = new JobQueueDatabaseStore(fakeDatabase); + + const streamPromise = (async () => { + // We don't actually care about using the variable from the async iterable. + // eslint-disable-next-line @typescript-eslint/no-unused-vars + for await (const _job of store.stream('test queue')) { + events.push('yielded job'); + break; + } + })(); + + await store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + + await streamPromise; + + assert.deepEqual(events, ['insert', 'yielded job']); + }); + + it("doesn't insert jobs until the initial fetch has completed", async () => { + const events: Array = []; + + let resolveGetJobsInQueue = noop; + const getJobsInQueuePromise = new Promise(resolve => { + resolveGetJobsInQueue = resolve; + }); + + fakeDatabase.getJobsInQueue.callsFake(() => { + events.push('loaded jobs'); + return getJobsInQueuePromise; + }); + fakeDatabase.insertJob.callsFake(() => { + events.push('insert'); + }); + + const store = new JobQueueDatabaseStore(fakeDatabase); + store.stream('test queue'); + + const insertPromise = store.insert({ + id: 'abc', + timestamp: 1234, + queueType: 'test queue', + data: { hi: 5 }, + }); + + sinon.assert.notCalled(fakeDatabase.insertJob); + + resolveGetJobsInQueue([]); + await insertPromise; + + sinon.assert.calledOnce(fakeDatabase.insertJob); + assert.deepEqual(events, ['loaded jobs', 'insert']); + }); + }); + + describe('delete', () => { + it('deletes jobs from the database', async () => { + const store = new JobQueueDatabaseStore(fakeDatabase); + + await store.delete('xyz'); + + sinon.assert.calledOnce(fakeDatabase.deleteJob); + sinon.assert.calledWith(fakeDatabase.deleteJob, 'xyz'); + }); + }); + + describe('stream', () => { + it('yields all values in the database, then all values inserted', async () => { + const makeJob = (id: string, queueType: string) => ({ + id, + timestamp: Date.now(), + queueType, + data: { hi: 5 }, + }); + + const ids = async ( + stream: AsyncIterable, + amount: number + ): Promise> => { + const result: Array = []; + for await (const job of stream) { + result.push(job.id); + if (result.length >= amount) { + break; + } + } + return result; + }; + + fakeDatabase.getJobsInQueue + .withArgs('queue A') + .resolves([ + makeJob('A.1', 'queue A'), + makeJob('A.2', 'queue A'), + makeJob('A.3', 'queue A'), + ]); + + fakeDatabase.getJobsInQueue.withArgs('queue B').resolves([]); + + fakeDatabase.getJobsInQueue + .withArgs('queue C') + .resolves([makeJob('C.1', 'queue C'), makeJob('C.2', 'queue C')]); + + const store = new JobQueueDatabaseStore(fakeDatabase); + + const streamA = store.stream('queue A'); + const streamB = store.stream('queue B'); + const streamC = store.stream('queue C'); + + await store.insert(makeJob('A.4', 'queue A')); + await store.insert(makeJob('C.3', 'queue C')); + await store.insert(makeJob('B.1', 'queue B')); + await store.insert(makeJob('A.5', 'queue A')); + + const streamAIds = await ids(streamA, 5); + const streamBIds = await ids(streamB, 1); + const streamCIds = await ids(streamC, 3); + assert.deepEqual(streamAIds, ['A.1', 'A.2', 'A.3', 'A.4', 'A.5']); + assert.deepEqual(streamBIds, ['B.1']); + assert.deepEqual(streamCIds, ['C.1', 'C.2', 'C.3']); + + sinon.assert.calledThrice(fakeDatabase.getJobsInQueue); + }); + }); +}); diff --git a/ts/test-node/jobs/JobQueue_test.ts b/ts/test-node/jobs/JobQueue_test.ts new file mode 100644 index 000000000..5ee0a459f --- /dev/null +++ b/ts/test-node/jobs/JobQueue_test.ts @@ -0,0 +1,533 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; +import * as sinon from 'sinon'; +import EventEmitter, { once } from 'events'; +import * as z from 'zod'; +import { identity, noop, groupBy } from 'lodash'; +import { v4 as uuid } from 'uuid'; +import { JobError } from '../../jobs/JobError'; +import { TestJobQueueStore } from './TestJobQueueStore'; +import { missingCaseError } from '../../util/missingCaseError'; +import { assertRejects } from '../helpers'; + +import { JobQueue } from '../../jobs/JobQueue'; +import { ParsedJob, StoredJob, JobQueueStore } from '../../jobs/types'; + +describe('JobQueue', () => { + describe('end-to-end tests', () => { + it('writes jobs to the database, processes them, and then deletes them', async () => { + const testJobSchema = z.object({ + a: z.number(), + b: z.number(), + }); + + type TestJobData = z.infer; + + const results = new Set(); + const store = new TestJobQueueStore(); + + const addQueue = new JobQueue({ + store, + queueType: 'test add queue', + maxAttempts: 1, + parseData(data: unknown): TestJobData { + return testJobSchema.parse(data); + }, + async run({ data }: ParsedJob): Promise { + results.add(data.a + data.b); + }, + }); + + assert.deepEqual(results, new Set()); + assert.isEmpty(store.storedJobs); + + addQueue.streamJobs(); + + store.pauseStream('test add queue'); + const job1 = await addQueue.add({ a: 1, b: 2 }); + const job2 = await addQueue.add({ a: 3, b: 4 }); + + assert.deepEqual(results, new Set()); + assert.lengthOf(store.storedJobs, 2); + + store.resumeStream('test add queue'); + + await job1.completion; + await job2.completion; + + assert.deepEqual(results, new Set([3, 7])); + assert.isEmpty(store.storedJobs); + }); + + it('writes jobs to the database correctly', async () => { + const store = new TestJobQueueStore(); + + const queue1 = new JobQueue({ + store, + queueType: 'test 1', + maxAttempts: 1, + parseData: (data: unknown): string => { + return z.string().parse(data); + }, + run: sinon.stub().resolves(), + }); + const queue2 = new JobQueue({ + store, + queueType: 'test 2', + maxAttempts: 1, + parseData: (data: unknown): string => { + return z.string().parse(data); + }, + run(): Promise { + return Promise.resolve(); + }, + }); + + store.pauseStream('test 1'); + store.pauseStream('test 2'); + + queue1.streamJobs(); + queue2.streamJobs(); + + await queue1.add('one'); + await queue2.add('A'); + await queue1.add('two'); + await queue2.add('B'); + await queue1.add('three'); + + assert.lengthOf(store.storedJobs, 5); + + const ids = store.storedJobs.map(job => job.id); + assert.lengthOf( + store.storedJobs, + new Set(ids).size, + 'Expected every job to have a unique ID' + ); + + const timestamps = store.storedJobs.map(job => job.timestamp); + timestamps.forEach(timestamp => { + assert.approximately( + timestamp, + Date.now(), + 3000, + 'Expected the timestamp to be ~now' + ); + }); + + const datas = store.storedJobs.map(job => job.data); + assert.sameMembers( + datas, + ['three', 'two', 'one', 'A', 'B'], + "Expected every job's data to be stored" + ); + + const queueTypes = groupBy(store.storedJobs, 'queueType'); + assert.hasAllKeys(queueTypes, ['test 1', 'test 2']); + assert.lengthOf(queueTypes['test 1'], 3); + assert.lengthOf(queueTypes['test 2'], 2); + }); + + it('retries jobs, running them up to maxAttempts times', async () => { + type TestJobData = 'foo' | 'bar'; + + let fooAttempts = 0; + let barAttempts = 0; + let fooSucceeded = false; + + const store = new TestJobQueueStore(); + + const retryQueue = new JobQueue({ + store, + queueType: 'test retry queue', + maxAttempts: 5, + parseData(data: unknown): TestJobData { + if (data !== 'foo' && data !== 'bar') { + throw new Error('Invalid data'); + } + return data; + }, + async run({ data }: ParsedJob): Promise { + switch (data) { + case 'foo': + fooAttempts += 1; + if (fooAttempts < 3) { + throw new Error( + 'foo job should fail the first and second time' + ); + } + fooSucceeded = true; + break; + case 'bar': + barAttempts += 1; + throw new Error('bar job always fails in this test'); + break; + default: + throw missingCaseError(data); + } + }, + }); + + retryQueue.streamJobs(); + + await (await retryQueue.add('foo')).completion; + + let booErr: unknown; + try { + await (await retryQueue.add('bar')).completion; + } catch (err: unknown) { + booErr = err; + } + + assert.strictEqual(fooAttempts, 3); + assert.isTrue(fooSucceeded); + + assert.strictEqual(barAttempts, 5); + + // Chai's `assert.instanceOf` doesn't tell TypeScript anything, so we do it here. + if (!(booErr instanceof JobError)) { + assert.fail('Expected error to be a JobError'); + return; + } + assert.include(booErr.message, 'bar job always fails in this test'); + + assert.isEmpty(store.storedJobs); + }); + + it('makes job.completion reject if parseData throws', async () => { + const queue = new JobQueue({ + store: new TestJobQueueStore(), + queueType: 'test queue', + maxAttempts: 999, + parseData: (data: unknown): string => { + if (data === 'valid') { + return data; + } + throw new Error('uh oh'); + }, + run: sinon.stub().resolves(), + }); + + queue.streamJobs(); + + const job = await queue.add('this will fail to parse'); + + let jobError: unknown; + try { + await job.completion; + } catch (err: unknown) { + jobError = err; + } + + // Chai's `assert.instanceOf` doesn't tell TypeScript anything, so we do it here. + if (!(jobError instanceof JobError)) { + assert.fail('Expected error to be a JobError'); + return; + } + assert.include( + jobError.message, + 'Failed to parse job data. Was unexpected data loaded from the database?' + ); + }); + + it("doesn't run the job if parseData throws", async () => { + const run = sinon.stub().resolves(); + + const queue = new JobQueue({ + store: new TestJobQueueStore(), + queueType: 'test queue', + maxAttempts: 999, + parseData: (data: unknown): string => { + if (data === 'valid') { + return data; + } + throw new Error('invalid data!'); + }, + run, + }); + + queue.streamJobs(); + + (await queue.add('invalid')).completion.catch(noop); + (await queue.add('invalid')).completion.catch(noop); + await queue.add('valid'); + (await queue.add('invalid')).completion.catch(noop); + (await queue.add('invalid')).completion.catch(noop); + + sinon.assert.calledOnce(run); + sinon.assert.calledWithMatch(run, { data: 'valid' }); + }); + + it('keeps jobs in the storage if parseData throws', async () => { + const store = new TestJobQueueStore(); + + const queue = new JobQueue({ + store, + queueType: 'test queue', + maxAttempts: 999, + parseData: (data: unknown): string => { + if (data === 'valid') { + return data; + } + throw new Error('uh oh'); + }, + run: sinon.stub().resolves(), + }); + + queue.streamJobs(); + + await (await queue.add('invalid 1')).completion.catch(noop); + await (await queue.add('invalid 2')).completion.catch(noop); + + const datas = store.storedJobs.map(job => job.data); + assert.sameMembers(datas, ['invalid 1', 'invalid 2']); + }); + + it('adding the job resolves AFTER inserting the job into the database', async () => { + let inserted = false; + + const store = new TestJobQueueStore(); + store.events.on('insert', () => { + inserted = true; + }); + + const queue = new JobQueue({ + store, + queueType: 'test queue', + maxAttempts: 999, + parseData: identity, + run: sinon.stub().resolves(), + }); + + queue.streamJobs(); + + const addPromise = queue.add(undefined); + assert.isFalse(inserted); + + await addPromise; + assert.isTrue(inserted); + }); + + it('starts the job AFTER inserting the job into the database', async () => { + const events: Array = []; + + const store = new TestJobQueueStore(); + store.events.on('insert', () => { + events.push('insert'); + }); + + const queue = new JobQueue({ + store, + queueType: 'test queue', + maxAttempts: 999, + parseData: (data: unknown): unknown => { + events.push('parsing data'); + return data; + }, + async run() { + events.push('running'); + }, + }); + + queue.streamJobs(); + + await (await queue.add(123)).completion; + + assert.deepEqual(events, ['insert', 'parsing data', 'running']); + }); + + it('resolves job.completion AFTER deleting the job from the database', async () => { + const events: Array = []; + + const store = new TestJobQueueStore(); + store.events.on('delete', () => { + events.push('delete'); + }); + + const queue = new JobQueue({ + store, + queueType: 'test queue', + maxAttempts: 999, + parseData: identity, + run: sinon.stub().resolves(), + }); + + queue.streamJobs(); + + store.pauseStream('test queue'); + const job = await queue.add(undefined); + // eslint-disable-next-line more/no-then + const jobCompletionPromise = job.completion.then(() => { + events.push('resolved'); + }); + assert.lengthOf(store.storedJobs, 1); + + store.resumeStream('test queue'); + + await jobCompletionPromise; + + assert.deepEqual(events, ['delete', 'resolved']); + }); + + it('if the job fails after every attempt, rejects job.completion AFTER deleting the job from the database', async () => { + const events: Array = []; + + const store = new TestJobQueueStore(); + store.events.on('delete', () => { + events.push('delete'); + }); + + const queue = new JobQueue({ + store, + queueType: 'test queue', + maxAttempts: 5, + parseData: identity, + async run() { + events.push('running'); + throw new Error('uh oh'); + }, + }); + + queue.streamJobs(); + + store.pauseStream('test queue'); + const job = await queue.add(undefined); + const jobCompletionPromise = job.completion.catch(() => { + events.push('rejected'); + }); + assert.lengthOf(store.storedJobs, 1); + + store.resumeStream('test queue'); + + await jobCompletionPromise; + + assert.deepEqual(events, [ + 'running', + 'running', + 'running', + 'running', + 'running', + 'delete', + 'rejected', + ]); + }); + }); + + describe('streamJobs', () => { + const storedJobSchema = z.object({ + id: z.string(), + timestamp: z.number(), + queueType: z.string(), + data: z.unknown(), + }); + + class FakeStream implements AsyncIterable { + private eventEmitter = new EventEmitter(); + + async *[Symbol.asyncIterator]() { + while (true) { + // eslint-disable-next-line no-await-in-loop + const [job] = await once(this.eventEmitter, 'drip'); + yield storedJobSchema.parse(job); + } + } + + drip(job: Readonly): void { + this.eventEmitter.emit('drip', job); + } + } + + let fakeStream: FakeStream; + let fakeStore: JobQueueStore; + + beforeEach(() => { + fakeStream = new FakeStream(); + fakeStore = { + insert: sinon.stub().resolves(), + delete: sinon.stub().resolves(), + stream: sinon.stub().returns(fakeStream), + }; + }); + + it('starts streaming jobs from the store', async () => { + const eventEmitter = new EventEmitter(); + + const noopQueue = new JobQueue({ + store: fakeStore, + queueType: 'test noop queue', + maxAttempts: 99, + parseData(data: unknown): number { + return z.number().parse(data); + }, + async run({ data }: Readonly<{ data: number }>) { + eventEmitter.emit('run', data); + }, + }); + + sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); + + noopQueue.streamJobs(); + + sinon.assert.calledOnce(fakeStore.stream as sinon.SinonStub); + + fakeStream.drip({ + id: uuid(), + timestamp: Date.now(), + queueType: 'test noop queue', + data: 123, + }); + const [firstRunData] = await once(eventEmitter, 'run'); + + fakeStream.drip({ + id: uuid(), + timestamp: Date.now(), + queueType: 'test noop queue', + data: 456, + }); + const [secondRunData] = await once(eventEmitter, 'run'); + + assert.strictEqual(firstRunData, 123); + assert.strictEqual(secondRunData, 456); + }); + + it('rejects when called more than once', async () => { + const noopQueue = new JobQueue({ + store: fakeStore, + queueType: 'test noop queue', + maxAttempts: 99, + parseData: identity, + run: sinon.stub().resolves(), + }); + + noopQueue.streamJobs(); + + await assertRejects(() => noopQueue.streamJobs()); + await assertRejects(() => noopQueue.streamJobs()); + + sinon.assert.calledOnce(fakeStore.stream as sinon.SinonStub); + }); + }); + + describe('add', () => { + it('rejects if the job queue has not started streaming', async () => { + const fakeStore = { + insert: sinon.stub().resolves(), + delete: sinon.stub().resolves(), + stream: sinon.stub(), + }; + + const noopQueue = new JobQueue({ + store: fakeStore, + queueType: 'test noop queue', + maxAttempts: 99, + parseData: identity, + run: sinon.stub().resolves(), + }); + + await assertRejects(() => noopQueue.add(undefined)); + + sinon.assert.notCalled(fakeStore.stream as sinon.SinonStub); + }); + }); +}); diff --git a/ts/test-node/jobs/Job_test.ts b/ts/test-node/jobs/Job_test.ts new file mode 100644 index 000000000..805eaba75 --- /dev/null +++ b/ts/test-node/jobs/Job_test.ts @@ -0,0 +1,24 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { assert } from 'chai'; + +import { Job } from '../../jobs/Job'; + +describe('Job', () => { + it('stores its arguments', () => { + const id = 'abc123'; + const timestamp = Date.now(); + const queueType = 'test queue'; + const data = { foo: 'bar' }; + const completion = Promise.resolve(); + + const job = new Job(id, timestamp, queueType, data, completion); + + assert.strictEqual(job.id, id); + assert.strictEqual(job.timestamp, timestamp); + assert.strictEqual(job.queueType, queueType); + assert.strictEqual(job.data, data); + assert.strictEqual(job.completion, completion); + }); +}); diff --git a/ts/test-node/jobs/TestJobQueueStore.ts b/ts/test-node/jobs/TestJobQueueStore.ts new file mode 100644 index 000000000..1fc370310 --- /dev/null +++ b/ts/test-node/jobs/TestJobQueueStore.ts @@ -0,0 +1,131 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable max-classes-per-file */ +/* eslint-disable no-restricted-syntax */ +/* eslint-disable no-await-in-loop */ + +import EventEmitter, { once } from 'events'; + +import { JobQueueStore, StoredJob } from '../../jobs/types'; +import { sleep } from '../../util/sleep'; + +export class TestJobQueueStore implements JobQueueStore { + events = new EventEmitter(); + + private openStreams = new Set(); + + private pipes = new Map(); + + storedJobs: Array = []; + + constructor(jobs: ReadonlyArray = []) { + jobs.forEach(job => { + this.insert(job); + }); + } + + async insert(job: Readonly): Promise { + await fakeDelay(); + + this.storedJobs.forEach(storedJob => { + if (job.id === storedJob.id) { + throw new Error('Cannot store two jobs with the same ID'); + } + }); + + this.storedJobs.push(job); + + this.getPipe(job.queueType).add(job); + + this.events.emit('insert'); + } + + async delete(id: string): Promise { + await fakeDelay(); + + this.storedJobs = this.storedJobs.filter(job => job.id !== id); + + this.events.emit('delete'); + } + + stream(queueType: string): Pipe { + if (this.openStreams.has(queueType)) { + throw new Error('Cannot stream the same queueType more than once'); + } + this.openStreams.add(queueType); + + return this.getPipe(queueType); + } + + pauseStream(queueType: string): void { + return this.getPipe(queueType).pause(); + } + + resumeStream(queueType: string): void { + return this.getPipe(queueType).resume(); + } + + private getPipe(queueType: string): Pipe { + const existingPipe = this.pipes.get(queueType); + if (existingPipe) { + return existingPipe; + } + + const result = new Pipe(); + this.pipes.set(queueType, result); + return result; + } +} + +class Pipe implements AsyncIterable { + private queue: Array = []; + + private eventEmitter = new EventEmitter(); + + private isLocked = false; + + private isPaused = false; + + add(value: Readonly) { + this.queue.push(value); + this.eventEmitter.emit('add'); + } + + async *[Symbol.asyncIterator]() { + if (this.isLocked) { + throw new Error('Cannot iterate over a pipe more than once'); + } + this.isLocked = true; + + while (true) { + for (const value of this.queue) { + await this.waitForUnpaused(); + yield value; + } + this.queue = []; + + // We do this because we want to yield values in series. + await once(this.eventEmitter, 'add'); + } + } + + pause(): void { + this.isPaused = true; + } + + resume(): void { + this.isPaused = false; + this.eventEmitter.emit('resume'); + } + + private async waitForUnpaused() { + if (this.isPaused) { + await once(this.eventEmitter, 'resume'); + } + } +} + +function fakeDelay(): Promise { + return sleep(0); +} diff --git a/ts/util/AsyncQueue.ts b/ts/util/AsyncQueue.ts new file mode 100644 index 000000000..5b2f5ef6d --- /dev/null +++ b/ts/util/AsyncQueue.ts @@ -0,0 +1,48 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +import { once, noop } from 'lodash'; + +/** + * You can do two things with an async queue: + * + * 1. Put values in. + * 2. Consume values out in the order they were added. + * + * Values are removed from the queue when they're consumed. + * + * There can only be one consumer, though this could be changed. + * + * See the tests to see how this works. + */ +export class AsyncQueue implements AsyncIterable { + private onAdd: () => void = noop; + + private queue: Array = []; + + private isReading = false; + + add(value: Readonly): void { + this.queue.push(value); + this.onAdd(); + } + + async *[Symbol.asyncIterator](): AsyncIterator { + if (this.isReading) { + throw new Error('Cannot iterate over a queue more than once'); + } + this.isReading = true; + + while (true) { + yield* this.queue; + + this.queue = []; + + // We want to iterate over the queue in series. + // eslint-disable-next-line no-await-in-loop + await new Promise(resolve => { + this.onAdd = once(resolve); + }); + } + } +} diff --git a/ts/util/asyncIterables.ts b/ts/util/asyncIterables.ts new file mode 100644 index 000000000..cd00128ba --- /dev/null +++ b/ts/util/asyncIterables.ts @@ -0,0 +1,42 @@ +// Copyright 2021 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only + +/* eslint-disable max-classes-per-file */ +/* eslint-disable no-await-in-loop */ +/* eslint-disable no-restricted-syntax */ + +export type MaybeAsyncIterable = Iterable | AsyncIterable; + +export function concat( + iterables: Iterable> +): AsyncIterable { + return new ConcatAsyncIterable(iterables); +} + +class ConcatAsyncIterable implements AsyncIterable { + constructor(private readonly iterables: Iterable>) {} + + async *[Symbol.asyncIterator](): AsyncIterator { + for (const iterable of this.iterables) { + for await (const value of iterable) { + yield value; + } + } + } +} + +export function wrapPromise( + promise: Promise> +): AsyncIterable { + return new WrapPromiseAsyncIterable(promise); +} + +class WrapPromiseAsyncIterable implements AsyncIterable { + constructor(private readonly promise: Promise>) {} + + async *[Symbol.asyncIterator](): AsyncIterator { + for await (const value of await this.promise) { + yield value; + } + } +}