From a2c7ac0df98c0f84de9379ae62235e67d4425163 Mon Sep 17 00:00:00 2001 From: lilia Date: Mon, 19 Oct 2015 13:52:44 -0700 Subject: [PATCH] Serialize message sending per-recipient Add a pendingMessages object to MessageSender. This object holds one promise per recipient number. We init this promise with Promise.resolve(), and chain on promises for message sending, replacing the previous promise with the newly chained promise each time. If the current promise resolves and finds that it is still the last promise in the chain, it removes itself. --- js/libtextsecure.js | 31 +++++++++++++++++++++++-------- libtextsecure/sendmessage.js | 31 +++++++++++++++++++++++-------- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/js/libtextsecure.js b/js/libtextsecure.js index 749d2e803..c13c4b5ed 100644 --- a/js/libtextsecure.js +++ b/js/libtextsecure.js @@ -39634,6 +39634,7 @@ textsecure.MessageReceiver.prototype = { */ function MessageSender(url, username, password) { this.server = new TextSecureServer(url, username, password); + this.pendingMessages = {}; } MessageSender.prototype = { @@ -39736,7 +39737,7 @@ MessageSender.prototype = { return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { if (devicesForNumber.length == 0) return registerError(number, "Got empty device list when loading device keys", null); - doSendMessage(number, devicesForNumber, recurse); + return doSendMessage(number, devicesForNumber, recurse); }); } }; @@ -39795,9 +39796,9 @@ MessageSender.prototype = { })); } - p.then(function() { + return p.then(function() { var resetDevices = ((error.code == 410) ? error.response.staleDevices : error.response.missingDevices); - getKeysForNumber(number, resetDevices) + return getKeysForNumber(number, resetDevices) .then(reloadDevicesAndSend(number, (error.code == 409))) .catch(function(error) { registerError(number, "Failed to reload device keys", error); @@ -39809,8 +39810,8 @@ MessageSender.prototype = { }); }.bind(this); - numbers.forEach(function(number) { - textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { + function sendToNumber(number) { + return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { return Promise.all(devicesForNumber.map(function(device) { return textsecure.protocol_wrapper.hasOpenSession(device.encodedNumber).then(function(result) { if (!result) @@ -39819,17 +39820,31 @@ MessageSender.prototype = { })).then(function() { return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { if (devicesForNumber.length == 0) { - getKeysForNumber(number, [1]) + return getKeysForNumber(number, [1]) .then(reloadDevicesAndSend(number, true)) .catch(function(error) { registerError(number, "Failed to retreive new device keys for number " + number, error); }); } else - doSendMessage(number, devicesForNumber, true); + return doSendMessage(number, devicesForNumber, true); }); }); }); - }); + } + + numbers.forEach(function(number) { + var sendPrevious = this.pendingMessages[number] || Promise.resolve(); + var sendCurrent = this.pendingMessages[number] = sendPrevious.then(function() { + return sendToNumber(number); + }).catch(function() { + return sendToNumber(number); + }); + sendCurrent.then(function() { + if (this.pendingMessages[number] === sendCurrent) { + delete this.pendingMessages[number]; + } + }.bind(this)); + }.bind(this)); }, sendIndividualProto: function(number, proto, timestamp) { diff --git a/libtextsecure/sendmessage.js b/libtextsecure/sendmessage.js index 514a2c12f..9ef7eda2f 100644 --- a/libtextsecure/sendmessage.js +++ b/libtextsecure/sendmessage.js @@ -3,6 +3,7 @@ */ function MessageSender(url, username, password) { this.server = new TextSecureServer(url, username, password); + this.pendingMessages = {}; } MessageSender.prototype = { @@ -105,7 +106,7 @@ MessageSender.prototype = { return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { if (devicesForNumber.length == 0) return registerError(number, "Got empty device list when loading device keys", null); - doSendMessage(number, devicesForNumber, recurse); + return doSendMessage(number, devicesForNumber, recurse); }); } }; @@ -164,9 +165,9 @@ MessageSender.prototype = { })); } - p.then(function() { + return p.then(function() { var resetDevices = ((error.code == 410) ? error.response.staleDevices : error.response.missingDevices); - getKeysForNumber(number, resetDevices) + return getKeysForNumber(number, resetDevices) .then(reloadDevicesAndSend(number, (error.code == 409))) .catch(function(error) { registerError(number, "Failed to reload device keys", error); @@ -178,8 +179,8 @@ MessageSender.prototype = { }); }.bind(this); - numbers.forEach(function(number) { - textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { + function sendToNumber(number) { + return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { return Promise.all(devicesForNumber.map(function(device) { return textsecure.protocol_wrapper.hasOpenSession(device.encodedNumber).then(function(result) { if (!result) @@ -188,17 +189,31 @@ MessageSender.prototype = { })).then(function() { return textsecure.storage.devices.getDeviceObjectsForNumber(number).then(function(devicesForNumber) { if (devicesForNumber.length == 0) { - getKeysForNumber(number, [1]) + return getKeysForNumber(number, [1]) .then(reloadDevicesAndSend(number, true)) .catch(function(error) { registerError(number, "Failed to retreive new device keys for number " + number, error); }); } else - doSendMessage(number, devicesForNumber, true); + return doSendMessage(number, devicesForNumber, true); }); }); }); - }); + } + + numbers.forEach(function(number) { + var sendPrevious = this.pendingMessages[number] || Promise.resolve(); + var sendCurrent = this.pendingMessages[number] = sendPrevious.then(function() { + return sendToNumber(number); + }).catch(function() { + return sendToNumber(number); + }); + sendCurrent.then(function() { + if (this.pendingMessages[number] === sendCurrent) { + delete this.pendingMessages[number]; + } + }.bind(this)); + }.bind(this)); }, sendIndividualProto: function(number, proto, timestamp) {