Retry SQL query on successful FTS5 recovery

This commit is contained in:
Fedor Indutny
2025-02-20 20:05:15 -07:00
committed by GitHub
parent 4ad361f92b
commit cfe5a51a1f
3 changed files with 71 additions and 43 deletions

View File

@@ -1141,7 +1141,7 @@ export type ServerWritableDirectInterface = WritableInterface & {
allStickers: ReadonlyArray<string> allStickers: ReadonlyArray<string>
) => Array<string>; ) => Array<string>;
runCorruptionChecks: () => void; runCorruptionChecks: () => boolean;
}; };
export type ServerWritableInterface = export type ServerWritableInterface =

View File

@@ -6142,41 +6142,54 @@ function getAllBadgeImageFileLocalPaths(db: ReadableDB): Set<string> {
return new Set(localPaths); return new Set(localPaths);
} }
function runCorruptionChecks(db: ReadableDB): void { function runCorruptionChecks(db: WritableDB, isRetrying = false): boolean {
let writable: WritableDB; let ok = true;
try { try {
writable = toUnsafeWritableDB(db, 'integrity check'); const result = db.pragma('integrity_check');
} catch (error) {
logger.error(
'runCorruptionChecks: not running the check, no writable instance',
Errors.toLogFormat(error)
);
return;
}
try {
const result = writable.pragma('integrity_check');
if (result.length === 1 && result.at(0)?.integrity_check === 'ok') { if (result.length === 1 && result.at(0)?.integrity_check === 'ok') {
logger.info('runCorruptionChecks: general integrity is ok'); logger.info('runCorruptionChecks: general integrity is ok');
} else { } else {
logger.error('runCorruptionChecks: general integrity is not ok', result); logger.error('runCorruptionChecks: general integrity is not ok', result);
ok = false;
} }
} catch (error) { } catch (error) {
logger.error( logger.error(
'runCorruptionChecks: general integrity check error', 'runCorruptionChecks: general integrity check error',
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
ok = false;
} }
try { try {
writable.exec( db.exec("INSERT INTO messages_fts(messages_fts) VALUES('integrity-check')");
"INSERT INTO messages_fts(messages_fts) VALUES('integrity-check')"
);
logger.info('runCorruptionChecks: FTS5 integrity ok'); logger.info('runCorruptionChecks: FTS5 integrity ok');
} catch (error) { } catch (error) {
logger.error( logger.error(
'runCorruptionChecks: FTS5 integrity check error.', 'runCorruptionChecks: FTS5 integrity check error.',
Errors.toLogFormat(error) Errors.toLogFormat(error)
); );
ok = false;
if (!isRetrying) {
try {
db.exec("INSERT INTO messages_fts(messages_fts) VALUES('rebuild');");
logger.info('runCorruptionChecks: FTS5 index rebuilt');
} catch (rebuildError) {
logger.error(
'runCorruptionChecks: FTS5 recovery failed',
Errors.toLogFormat(rebuildError)
);
return false;
} }
// Successfully recovered, try again.
logger.info('runCorruptionChecks: retrying');
return runCorruptionChecks(db, true);
}
}
return ok;
} }
type StoryDistributionForDatabase = Readonly< type StoryDistributionForDatabase = Readonly<

View File

@@ -20,28 +20,12 @@ if (!parentPort) {
const port = parentPort; const port = parentPort;
// eslint-disable-next-line @typescript-eslint/no-explicit-any // eslint-disable-next-line @typescript-eslint/no-explicit-any
function respond(seq: number, error: Error | undefined, response?: any) { function respond(seq: number, response?: any) {
let errorKind: SqliteErrorKind | undefined;
if (error !== undefined) {
errorKind = parseSqliteError(error);
if (errorKind === SqliteErrorKind.Corrupted && db != null) {
DataWriter.runCorruptionChecks(db);
}
}
const wrappedResponse: WrappedWorkerResponse = { const wrappedResponse: WrappedWorkerResponse = {
type: 'response', type: 'response',
seq, seq,
error: error: undefined,
error == null errorKind: undefined,
? undefined
: {
name: error.name,
message: error.message,
stack: error.stack,
},
errorKind,
response, response,
}; };
port.postMessage(wrappedResponse); port.postMessage(wrappedResponse);
@@ -84,7 +68,10 @@ let db: WritableDB | undefined;
let isPrimary = false; let isPrimary = false;
let isRemoved = false; let isRemoved = false;
port.on('message', ({ seq, request }: WrappedWorkerRequest) => { const onMessage = (
{ seq, request }: WrappedWorkerRequest,
isRetrying = false
): void => {
try { try {
if (request.type === 'init') { if (request.type === 'init') {
isPrimary = request.isPrimary; isPrimary = request.isPrimary;
@@ -95,13 +82,13 @@ port.on('message', ({ seq, request }: WrappedWorkerRequest) => {
logger, logger,
}); });
respond(seq, undefined, undefined); respond(seq, undefined);
return; return;
} }
// 'close' is sent on shutdown, but we already removed the database. // 'close' is sent on shutdown, but we already removed the database.
if (isRemoved && request.type === 'close') { if (isRemoved && request.type === 'close') {
respond(seq, undefined, undefined); respond(seq, undefined);
process.exit(0); process.exit(0);
return; return;
} }
@@ -127,7 +114,7 @@ port.on('message', ({ seq, request }: WrappedWorkerRequest) => {
isRemoved = true; isRemoved = true;
respond(seq, undefined, undefined); respond(seq, undefined);
return; return;
} }
@@ -143,7 +130,7 @@ port.on('message', ({ seq, request }: WrappedWorkerRequest) => {
} }
db = undefined; db = undefined;
respond(seq, undefined, undefined); respond(seq, undefined);
process.exit(0); process.exit(0);
return; return;
} }
@@ -161,11 +148,39 @@ port.on('message', ({ seq, request }: WrappedWorkerRequest) => {
const result = method(db, ...request.args); const result = method(db, ...request.args);
const end = performance.now(); const end = performance.now();
respond(seq, undefined, { result, duration: end - start }); respond(seq, { result, duration: end - start });
} else { } else {
throw new Error('Unexpected request type'); throw new Error('Unexpected request type');
} }
} catch (error) { } catch (error) {
respond(seq, error, undefined); const errorKind = parseSqliteError(error);
if (errorKind === SqliteErrorKind.Corrupted && db != null) {
const wasRecovered = DataWriter.runCorruptionChecks(db);
if (
wasRecovered &&
!isRetrying &&
// Don't retry 'init'/'close'/'removeDB' automatically and notify user
// about the database error (even on successful recovery).
(request.type === 'sqlCall:read' || request.type === 'sqlCall:write')
) {
logger.error(`Retrying request: ${request.type}`);
return onMessage({ seq, request }, true);
} }
}); }
const wrappedResponse: WrappedWorkerResponse = {
type: 'response',
seq,
error: {
name: error.name,
message: error.message,
stack: error.stack,
},
errorKind,
response: undefined,
};
port.postMessage(wrappedResponse);
}
};
port.on('message', (message: WrappedWorkerRequest) => onMessage(message));