Remove libsignal shadowing modes
This commit is contained in:
@@ -13,7 +13,7 @@ import type { connection as WebSocket } from 'websocket';
|
||||
import qs from 'querystring';
|
||||
import EventListener from 'events';
|
||||
|
||||
import { AbortableProcess } from '../util/AbortableProcess';
|
||||
import type { AbortableProcess } from '../util/AbortableProcess';
|
||||
import { strictAssert } from '../util/assert';
|
||||
import { explodePromise } from '../util/explodePromise';
|
||||
import {
|
||||
@@ -42,7 +42,6 @@ import WebSocketResource, {
|
||||
LibsignalWebSocketResource,
|
||||
ServerRequestType,
|
||||
TransportOption,
|
||||
WebSocketResourceWithShadowing,
|
||||
} from './WebsocketResources';
|
||||
import { ConnectTimeoutError, HTTPError } from './Errors';
|
||||
import type { IRequestHandler, WebAPICredentials } from './Types.d';
|
||||
@@ -50,6 +49,7 @@ import { connect as connectWebSocket } from './WebSocket';
|
||||
import { isNightly, isBeta, isStaging } from '../util/version';
|
||||
import { getBasicAuth } from '../util/getBasicAuth';
|
||||
import { isTestOrMockEnvironment } from '../environment';
|
||||
import type { ConfigKeyType } from '../RemoteConfig';
|
||||
|
||||
const FIVE_MINUTES = 5 * durations.MINUTE;
|
||||
|
||||
@@ -645,34 +645,19 @@ export class SocketManager extends EventListener {
|
||||
return TransportOption.Libsignal;
|
||||
}
|
||||
|
||||
// in alpha, switch to using libsignal transport, unless user opts out,
|
||||
// in which case switching to shadowing
|
||||
let libsignalRemoteConfigFlag: ConfigKeyType;
|
||||
if (isNightly(this.options.version)) {
|
||||
const configValue = window.Signal.RemoteConfig.isEnabled(
|
||||
'desktop.experimentalTransportEnabled.alpha'
|
||||
);
|
||||
return configValue
|
||||
? TransportOption.Libsignal
|
||||
: TransportOption.ShadowingHigh;
|
||||
}
|
||||
|
||||
// in beta, switch to using 'ShadowingHigh' mode, unless user opts out,
|
||||
// in which case switching to `ShadowingLow`
|
||||
if (isBeta(this.options.version)) {
|
||||
const configValue = window.Signal.RemoteConfig.isEnabled(
|
||||
'desktop.experimentalTransportEnabled.beta'
|
||||
);
|
||||
return configValue
|
||||
? TransportOption.ShadowingHigh
|
||||
: TransportOption.ShadowingLow;
|
||||
libsignalRemoteConfigFlag = 'desktop.experimentalTransportEnabled.alpha';
|
||||
} else if (isBeta(this.options.version)) {
|
||||
libsignalRemoteConfigFlag = 'desktop.experimentalTransportEnabled.beta';
|
||||
} else {
|
||||
libsignalRemoteConfigFlag = 'desktop.experimentalTransportEnabled.prod';
|
||||
}
|
||||
|
||||
const configValue = window.Signal.RemoteConfig.isEnabled(
|
||||
'desktop.experimentalTransportEnabled.prod'
|
||||
libsignalRemoteConfigFlag
|
||||
);
|
||||
return configValue
|
||||
? TransportOption.ShadowingLow
|
||||
: TransportOption.Original;
|
||||
return configValue ? TransportOption.Libsignal : TransportOption.Original;
|
||||
}
|
||||
|
||||
async #getUnauthenticatedResource(): Promise<IWebSocketResource> {
|
||||
@@ -812,64 +797,7 @@ export class SocketManager extends EventListener {
|
||||
},
|
||||
});
|
||||
|
||||
const shadowingModeEnabled =
|
||||
!resourceOptions.transportOption ||
|
||||
resourceOptions.transportOption === TransportOption.Original;
|
||||
return shadowingModeEnabled
|
||||
? webSocketResourceConnection
|
||||
: this.#connectWithShadowing(
|
||||
webSocketResourceConnection,
|
||||
resourceOptions
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* A method that takes in an `AbortableProcess<>` that establishes
|
||||
* a `WebSocketResource` connection and wraps it in a process
|
||||
* that also tries to establish a `LibsignalWebSocketResource` connection.
|
||||
*
|
||||
* The shadowing connection will not block the main one (e.g. if it takes
|
||||
* longer to connect) and an error in the shadowing connection will not
|
||||
* affect the overall behavior.
|
||||
*
|
||||
* @param mainConnection an `AbortableProcess<WebSocketResource>` responsible
|
||||
* for establishing a Desktop system WebSocket connection.
|
||||
* @param options `WebSocketResourceOptions` options
|
||||
* @private
|
||||
*/
|
||||
#connectWithShadowing(
|
||||
mainConnection: AbortableProcess<WebSocketResource>,
|
||||
options: WebSocketResourceOptions
|
||||
): AbortableProcess<IWebSocketResource> {
|
||||
// creating an `AbortableProcess` of libsignal websocket connection
|
||||
const shadowingConnection = connectUnauthenticatedLibsignal({
|
||||
libsignalNet: this.libsignalNet,
|
||||
name: options.name,
|
||||
keepalive: options.keepalive ?? {},
|
||||
});
|
||||
const shadowWrapper = async () => {
|
||||
// if main connection results in an error,
|
||||
// it's propagated as the error of the resulting process
|
||||
const mainSocket = await mainConnection.resultPromise;
|
||||
// here, we're not awaiting on `shadowingConnection.resultPromise`
|
||||
// and just letting `WebSocketResourceWithShadowing`
|
||||
// initiate and handle the result of the shadowing connection attempt
|
||||
return new WebSocketResourceWithShadowing(
|
||||
mainSocket,
|
||||
shadowingConnection,
|
||||
options
|
||||
);
|
||||
};
|
||||
return new AbortableProcess<IWebSocketResource>(
|
||||
`WebSocketResourceWithShadowing.connect(${options.name})`,
|
||||
{
|
||||
abort() {
|
||||
mainConnection.abort();
|
||||
shadowingConnection.abort();
|
||||
},
|
||||
},
|
||||
shadowWrapper()
|
||||
);
|
||||
return webSocketResourceConnection;
|
||||
}
|
||||
|
||||
async #checkResource(
|
||||
|
@@ -32,8 +32,6 @@ import pTimeout from 'p-timeout';
|
||||
import { Response } from 'node-fetch';
|
||||
import net from 'net';
|
||||
import { z } from 'zod';
|
||||
import { clearInterval } from 'timers';
|
||||
import { random } from 'lodash';
|
||||
|
||||
import type { LibSignalError, Net } from '@signalapp/libsignal-client';
|
||||
import { Buffer } from 'node:buffer';
|
||||
@@ -55,9 +53,7 @@ import { SignalService as Proto } from '../protobuf';
|
||||
import * as log from '../logging/log';
|
||||
import * as Timers from '../Timers';
|
||||
import type { IResource } from './WebSocket';
|
||||
import { isProduction } from '../util/version';
|
||||
|
||||
import { ToastType } from '../types/Toast';
|
||||
import { AbortableProcess } from '../util/AbortableProcess';
|
||||
import type { WebAPICredentials } from './Types';
|
||||
import { NORMAL_DISCONNECT_CODE } from './SocketManager';
|
||||
@@ -65,8 +61,6 @@ import { parseUnknown } from '../util/schemas';
|
||||
|
||||
const THIRTY_SECONDS = 30 * durations.SECOND;
|
||||
|
||||
const STATS_UPDATE_INTERVAL = durations.MINUTE;
|
||||
|
||||
const MAX_MESSAGE_SIZE = 512 * 1024;
|
||||
|
||||
const AGGREGATED_STATS_KEY = 'websocketStats';
|
||||
@@ -270,17 +264,6 @@ export type SendRequestResult = Readonly<{
|
||||
export enum TransportOption {
|
||||
// Only original transport is used
|
||||
Original = 'original',
|
||||
// All requests are going through the original transport,
|
||||
// but for every request that completes sucessfully we're initiating
|
||||
// a healthcheck request via libsignal transport,
|
||||
// collecting comparison statistics, and if we see many inconsistencies,
|
||||
// we're showing a toast asking user to submit a debug log
|
||||
ShadowingHigh = 'shadowingHigh',
|
||||
// Similar to `shadowingHigh`, however, only 10% of requests
|
||||
// will trigger a healthcheck, and toast is never shown.
|
||||
// Statistics data is still added to the debug logs,
|
||||
// so it will be available to us with all the debug log uploads.
|
||||
ShadowingLow = 'shadowingLow',
|
||||
// Only libsignal transport is used
|
||||
Libsignal = 'libsignal',
|
||||
}
|
||||
@@ -580,169 +563,6 @@ export class LibsignalWebSocketResource
|
||||
}
|
||||
}
|
||||
|
||||
export class WebSocketResourceWithShadowing implements IWebSocketResource {
|
||||
#shadowing: LibsignalWebSocketResource | undefined;
|
||||
#stats: AggregatedStats;
|
||||
#statsTimer: NodeJS.Timeout;
|
||||
#shadowingWithReporting: boolean;
|
||||
#logId: string;
|
||||
|
||||
constructor(
|
||||
private readonly main: WebSocketResource,
|
||||
private readonly shadowingConnection: AbortableProcess<LibsignalWebSocketResource>,
|
||||
options: WebSocketResourceOptions
|
||||
) {
|
||||
this.#stats = AggregatedStats.createEmpty();
|
||||
this.#logId = `WebSocketResourceWithShadowing(${options.name})`;
|
||||
this.#statsTimer = setInterval(
|
||||
() => this.#updateStats(options.name),
|
||||
STATS_UPDATE_INTERVAL
|
||||
);
|
||||
this.#shadowingWithReporting =
|
||||
options.transportOption === TransportOption.ShadowingHigh;
|
||||
|
||||
// the idea is that we want to keep the shadowing connection process
|
||||
// "in the background", so that the main connection wouldn't need to wait on it.
|
||||
// then when we're connected, `this.shadowing` socket resource is initialized
|
||||
// or an error reported in case of connection failure
|
||||
const initializeAfterConnected = async () => {
|
||||
try {
|
||||
this.#shadowing = await shadowingConnection.resultPromise;
|
||||
// checking IP one time per connection
|
||||
if (this.main.ipVersion() !== this.#shadowing.ipVersion()) {
|
||||
this.#stats.ipVersionMismatches += 1;
|
||||
const mainIpType = this.main.ipVersion();
|
||||
const shadowIpType = this.#shadowing.ipVersion();
|
||||
log.warn(
|
||||
`${this.#logId}: libsignal websocket IP [${shadowIpType}], Desktop websocket IP [${mainIpType}]`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.#stats.connectionFailures += 1;
|
||||
}
|
||||
};
|
||||
drop(initializeAfterConnected());
|
||||
|
||||
this.addEventListener('close', (_ev): void => {
|
||||
clearInterval(this.#statsTimer);
|
||||
this.#updateStats(options.name);
|
||||
});
|
||||
}
|
||||
|
||||
#updateStats(name: string) {
|
||||
const storedStats = AggregatedStats.loadOrCreateEmpty(name);
|
||||
let updatedStats = AggregatedStats.add(storedStats, this.#stats);
|
||||
if (
|
||||
this.#shadowingWithReporting &&
|
||||
AggregatedStats.shouldReportError(updatedStats) &&
|
||||
!isProduction(window.getVersion())
|
||||
) {
|
||||
window.reduxActions.toast.showToast({
|
||||
toastType: ToastType.TransportError,
|
||||
});
|
||||
log.warn(
|
||||
`${this.#logId}: experimental transport toast displayed, flushing transport statistics before resetting`,
|
||||
updatedStats
|
||||
);
|
||||
updatedStats = AggregatedStats.createEmpty();
|
||||
updatedStats.lastToastTimestamp = Date.now();
|
||||
}
|
||||
AggregatedStats.store(updatedStats, name);
|
||||
this.#stats = AggregatedStats.createEmpty();
|
||||
}
|
||||
|
||||
public localPort(): number | undefined {
|
||||
return this.main.localPort();
|
||||
}
|
||||
|
||||
public addEventListener(
|
||||
name: 'close',
|
||||
handler: (ev: CloseEvent) => void
|
||||
): void {
|
||||
this.main.addEventListener(name, handler);
|
||||
}
|
||||
|
||||
public close(code = NORMAL_DISCONNECT_CODE, reason?: string): void {
|
||||
this.main.close(code, reason);
|
||||
if (this.#shadowing) {
|
||||
this.#shadowing.close(code, reason);
|
||||
this.#shadowing = undefined;
|
||||
} else {
|
||||
this.shadowingConnection.abort();
|
||||
}
|
||||
}
|
||||
|
||||
public shutdown(): void {
|
||||
this.main.shutdown();
|
||||
if (this.#shadowing) {
|
||||
this.#shadowing.shutdown();
|
||||
this.#shadowing = undefined;
|
||||
} else {
|
||||
this.shadowingConnection.abort();
|
||||
}
|
||||
}
|
||||
|
||||
public forceKeepAlive(timeout?: number): void {
|
||||
this.main.forceKeepAlive(timeout);
|
||||
}
|
||||
|
||||
public async sendRequest(options: SendRequestOptions): Promise<Response> {
|
||||
const responsePromise = this.main.sendRequest(options);
|
||||
const response = await responsePromise;
|
||||
|
||||
// if we're received a response from the main channel and the status was successful,
|
||||
// attempting to run a healthcheck on a libsignal transport.
|
||||
if (
|
||||
isSuccessfulStatusCode(response.status) &&
|
||||
this.#shouldSendShadowRequest()
|
||||
) {
|
||||
drop(this.#sendShadowRequest());
|
||||
}
|
||||
|
||||
return response;
|
||||
}
|
||||
|
||||
async #sendShadowRequest(): Promise<void> {
|
||||
// In the shadowing mode, it could be that we're either
|
||||
// still connecting libsignal websocket or have already closed it.
|
||||
// In those cases we're not running shadowing check.
|
||||
if (!this.#shadowing) {
|
||||
log.info(
|
||||
`${this.#logId}: skipping healthcheck - websocket not connected or already closed`
|
||||
);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
const healthCheckResult = await this.#shadowing.sendRequest({
|
||||
verb: 'GET',
|
||||
path: '/v1/keepalive',
|
||||
timeout: KEEPALIVE_TIMEOUT_MS,
|
||||
});
|
||||
this.#stats.requestsCompared += 1;
|
||||
if (!isSuccessfulStatusCode(healthCheckResult.status)) {
|
||||
this.#stats.healthcheckBadStatus += 1;
|
||||
log.warn(
|
||||
`${this.#logId}: keepalive via libsignal responded with status [${healthCheckResult.status}]`
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
this.#stats.healthcheckFailures += 1;
|
||||
log.warn(
|
||||
`${this.#logId}: failed to send keepalive via libsignal`,
|
||||
Errors.toLogFormat(error)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#shouldSendShadowRequest(): boolean {
|
||||
return this.#shadowingWithReporting || random(0, 100) < 10;
|
||||
}
|
||||
}
|
||||
|
||||
function isSuccessfulStatusCode(status: number): boolean {
|
||||
return status >= 200 && status < 300;
|
||||
}
|
||||
|
||||
export default class WebSocketResource
|
||||
extends EventTarget
|
||||
implements IWebSocketResource
|
||||
|
Reference in New Issue
Block a user