servo: clightning: initialize a script for rebalancing with peers

This commit is contained in:
2024-01-11 23:11:33 +00:00
parent e2a43ddfa0
commit 432a66bf5f

View File

@@ -0,0 +1,316 @@
#!/usr/bin/env nix-shell
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ps.pyln-client ])"
# pyln-client docs: <https://github.com/ElementsProject/lightning/tree/master/contrib/pyln-client>
# terminology:
# - "scid": "Short Channel ID", e.g. 123456x7890x0
# from this id, we can locate the actual channel, its peers, and its parameters
import argparse
import logging
import math
import sys
import time
from enum import Enum
from pyln.client import LightningRpc, Millisatoshi, RpcError
logger = logging.getLogger(__name__)
RPC_FILE = "/var/lib/clightning/bitcoin/lightning-rpc"
# CLTV (HLTC delta) of the final hop
CLTV = 9
class RebalanceResult(Enum):
SUCCESS = "SUCCESS"
FAIL_TEMPORARY = "FAIL_TEMPORARY"
FAIL_PERMANENT = "FAIL_PERMANENT"
class RouteError(Enum):
HAS_BASE_FEE = "HAS_BASE_FEE"
NO_ROUTE = "NO_ROUTE"
class LocalChannel:
def __init__(self, channels: list, self_id: str):
assert len(channels) == 2, f"unexpected: more than 2 channels: {channels}"
out = None
in_ = None
for c in channels:
if c["source"] == self_id:
assert out is None, f"unexpected: multiple channels from self: {channels}"
out = c
if c["destination"] == self_id:
assert in_ is None, f"unexpected: multiple channels to self: {channels}"
in_ = c
assert out is not None, f"no channel from self: {channels}"
assert in_ is not None, f"no channel to self: {channels}"
assert out["destination"] == in_["source"], f"channel peers are asymmetric?! {channels}"
assert out["short_channel_id"] == in_["short_channel_id"], f"channel ids differ?! {channels}"
self.from_me = out
self.to_me = in_
@property
def remote_peer(self) -> str:
return self.from_me["destination"]
@property
def scid(self) -> str:
return self.from_me["short_channel_id"]
@property
def htlc_minimum_msat(self) -> Millisatoshi:
return max(self.from_me["htlc_minimum_msat"], self.to_me["htlc_minimum_msat"])
@property
def htlc_maximum_msat(self) -> Millisatoshi:
return min(self.from_me["htlc_maximum_msat"], self.to_me["htlc_maximum_msat"])
@property
def direction_to_me(self) -> int:
return self.to_me["direction"]
@property
def direction_from_me(self) -> int:
return self.from_me["direction"]
@property
def directed_scid_to_me(self) -> str:
scid, dir = self.to_me["short_channel_id"], self.direction_to_me
return f"{scid}/{dir}"
@property
def directed_scid_from_me(self) -> str:
scid, dir = self.from_me["short_channel_id"], self.direction_from_me
return f"{scid}/{dir}"
@property
def delay_them(self) -> str:
return self.to_me["delay"]
@property
def delay_me(self) -> str:
return self.from_me["delay"]
class Balancer:
def __init__(self, rpc: LightningRpc):
self.rpc = rpc
self.self_id = rpc.getinfo()["id"]
self.bad_channels = [] # list of directed scid
self.nonzero_base_channels = [] # list of directed scid
def _localchannel(self, scid: str) -> LocalChannel:
return LocalChannel(self.rpc.listchannels(scid)["channels"], self.self_id)
def _get_directed_scid(self, scid: str, direction: int) -> dict:
channels = self.rpc.listchannels(scid)["channels"]
channels = [c for c in channels if c["direction"] == direction]
assert len(channels) == 1, f"expected exactly 1 channel: {channels}"
return channels[0]
def balance_once_with_retries(self, out_scid: str, in_scid: str, min_tx_msat: int, max_tx_msat: int, retries: int = 20) -> None:
for i in range(retries):
if i != 0:
logger.info(f"retrying rebalance: {i} of {retries}\n")
res = self.balance_once(out_scid, in_scid, min_tx_msat, max_tx_msat)
if res == RebalanceResult.SUCCESS:
logger.info(f"rebalanced once with success {out_scid} -> {in_scid}")
break
if res == RebalanceResult.FAIL_PERMANENT:
logger.info(f"rebalance {out_scid} -> {in_scid} is impossible (likely no route)")
break
else:
logger.info(f"failed to rebalance {out_scid} -> {in_scid} within {retries} attempts")
def balance_once(self, out_scid: str, in_scid: str, min_tx_msat: int, max_tx_msat: int) -> None:
out_ch = self._localchannel(out_scid)
in_ch = self._localchannel(in_scid)
if out_ch.directed_scid_from_me in self.bad_channels or in_ch.directed_scid_to_me in self.bad_channels:
logger.info(f"rebalance {out_scid} -> {in_scid} failed in our own channel")
return RebalanceResult.FAIL_PERMANENT
tx_bounds = self._bound_tx_size(out_ch, in_ch, min_tx_msat, max_tx_msat)
if tx_bounds is None:
return RebalanceResult.FAIL_PERMANENT # no valid bounds
min_tx_msat, max_tx_msat = tx_bounds
route = self.route(out_ch, in_ch, min_tx_msat, max_tx_msat)
logger.debug(f"route: {route}")
if route == RouteError.NO_ROUTE:
return RebalanceResult.FAIL_PERMANENT
elif route == RouteError.HAS_BASE_FEE:
# try again with a different route
return RebalanceResult.FAIL_TEMPORARY
amount_msat = route[0]["amount_msat"]
invoice_id = f"rebalance-{time.time():.6f}".replace(".", "_")
invoice_desc = f"bal {out_scid}:{in_scid}"
invoice = self.rpc.invoice("any", invoice_id, invoice_desc)
logger.debug(f"invoice: {invoice}")
payment = self.rpc.sendpay(route, invoice["payment_hash"], invoice_id, amount_msat, invoice["bolt11"], invoice["payment_secret"])
logger.debug(f"sent: {payment}")
try:
wait = self.rpc.waitsendpay(invoice["payment_hash"])
logger.debug(f"result: {wait}")
except RpcError as e:
err_data = e.error["data"]
err_scid, err_dir = err_data["erring_channel"], err_data["erring_direction"]
err_directed_scid = f"{err_scid}/{err_dir}"
logger.debug(f"ch failed, adding to excludes: {err_directed_scid}; {e.error}")
self.bad_channels.append(err_directed_scid)
return RebalanceResult.FAIL_TEMPORARY
else:
return RebalanceResult.SUCCESS
def _bound_tx_size(self, out_ch: LocalChannel, in_ch: LocalChannel, min_tx_msat: int, max_tx_msat: int) -> tuple[int, int] | None:
# don't even try to route if the channels advertise to not support our request
min_, max_ = min_tx_msat, max_tx_msat
min_tx_msat = max(min_tx_msat, out_ch.htlc_minimum_msat)
min_tx_msat = max(min_tx_msat, in_ch.htlc_minimum_msat)
max_tx_msat = min(max_tx_msat, out_ch.htlc_maximum_msat)
max_tx_msat = min(max_tx_msat, in_ch.htlc_maximum_msat)
if min_ != min_tx_msat:
logger.debug(f"increased min_tx_msat due to route requirements: {min_} -> {min_tx_msat}")
if max_ != max_tx_msat:
logger.debug(f"decreased max_tx_msat due to route requirements: {max_} -> {max_tx_msat}")
if in_ch.to_me["base_fee_millisatoshi"] != 0:
logger.info(f"aborting route because inbound requires base fees")
return None
per_mili = in_ch.to_me["fee_per_millionth"]
if per_mili != 0:
new_max = math.ceil(1000000 / per_mili) - 1
if new_max < max_tx_msat:
logger.debug(f"decreased max_tx_msat due to inbound fee ppm: {max_tx_msat} -> {new_max}")
max_tx_msat = new_max
if min_tx_msat > max_tx_msat:
logger.info(f"aborting route because of conflicting HTLC min/max requirements ({min_tx_msat} > {max_tx_msat})")
return None
return min_tx_msat, max_tx_msat
def route(self, out_ch: LocalChannel, in_ch: LocalChannel, min_tx_msat: int, max_tx_msat: int) -> list[dict] | RouteError:
exclude = [
# ensure the payment doesn't cross either channel in reverse.
# note that this doesn't preclude it from taking additional trips through self, with other peers.
# out_ch.directed_scid_to_me,
# in_ch.directed_scid_from_me,
# alternatively, never route through self. this avoids a class of logic error, like what to do with fees i charge "myself".
self.self_id
] + self.bad_channels + self.nonzero_base_channels
out_peer = out_ch.remote_peer
in_peer = in_ch.remote_peer
route_or_max_tx = self._find_partial_route(out_peer, in_peer, max_tx_msat, exclude=exclude)
while isinstance(route_or_max_tx, int):
logger.debug(f"max feeless tx: {route_or_max_tx}")
try_again_msat = max(min_tx_msat, route_or_max_tx)
if try_again_msat == route_or_max_tx:
return RouteError.NO_ROUTE
# due to per-channel HTLC size requirements, we have to try again and we'll maybe get a different route
route_or_max_tx = self._find_partial_route(out_peer, in_peer, try_again_msat, exclude=exclude)
route = route_or_max_tx
if isinstance(route, RouteError):
return route
route = self._add_route_endpoints(route, out_ch, in_ch)
return route
def _find_partial_route(self, out_peer: str, in_peer: str, tx_msat: int, exclude: list[str]=[]) -> list[dict] | RouteError | int:
route = self.rpc.getroute(in_peer, amount_msat=tx_msat, riskfactor=0, fromid=out_peer, exclude=exclude, cltv=CLTV)
route = route["route"]
if route == []:
logger.debug(f"no route for {tx_msat}msat {out_peer} -> {in_peer}")
return RouteError.NO_ROUTE
send_msat = route[0]["amount_msat"]
if send_msat != Millisatoshi(tx_msat):
logger.debug(f"found route with non-zero fee: {send_msat} -> {tx_msat}. {route}")
return self._max_feeless_tx_for_route(route)
return route
def _max_feeless_tx_for_route(self, route: list[dict]) -> int|None:
has_base_fee = False
max_fee_per_mili = 0
for hop in route:
hop_scid = hop["channel"]
hop_dir = hop["direction"]
ch = self._get_directed_scid(hop_scid, hop_dir)
feebase = ch["base_fee_millisatoshi"]
if feebase:
has_base_fee = True
self.nonzero_base_channels.append(f"{hop_scid}/{hop_dir}")
per_mili = ch["fee_per_millionth"]
max_fee_per_mili = max(max_fee_per_mili, per_mili)
if has_base_fee:
return RouteError.HAS_BASE_FEE
if max_fee_per_mili == 0:
return int(route[0]["amount_msat"]) # no practical limit
return math.ceil(1000000 / max_fee_per_mili) - 1
def _add_route_endpoints(self, route, out_ch: LocalChannel, in_ch: LocalChannel):
inbound_hop = dict(
id=self.self_id,
channel=in_ch.scid,
direction=in_ch.direction_to_me,
amount_msat=route[-1]["amount_msat"],
delay=route[-1]["delay"],
style="tlv",
)
route = self._add_route_delay(route, in_ch.delay_them) + [ inbound_hop ]
outbound_hop = dict(
id=out_ch.remote_peer,
channel=out_ch.scid,
direction=out_ch.direction_from_me,
amount_msat=route[0]["amount_msat"],
delay=route[0]["delay"] + out_ch.delay_them,
style="tlv",
)
route = [ outbound_hop ] + route
return route
def _add_route_delay(self, route: list[dict], delay: int) -> list[dict]:
return [ dict(hop, delay=hop["delay"] + delay) for hop in route ]
def main():
logging.basicConfig()
logger.setLevel(logging.INFO)
parser = argparse.ArgumentParser(description="rebalance lightning channel balances")
parser.add_argument("out", help="peer id to send tx through")
parser.add_argument("in_", help="peer id to receive tx through")
parser.add_argument("--verbose", action="store_true", help="more logging")
parser.add_argument("--min-msat", default="999", help="min to rebalance")
parser.add_argument("--max-msat", default="1000000", help="max to rebalance")
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
rpc = LightningRpc(RPC_FILE)
balancer = Balancer(rpc)
balancer.balance_once_with_retries(args.out, args.in_, int(args.min_msat), int(args.max_msat))
if __name__ == '__main__':
main()