servo: clightning-sane: implement an autobalance subcommand

This commit is contained in:
2024-01-13 03:04:24 +00:00
parent 6b5cdd7508
commit 103a300e77

View File

@@ -25,6 +25,9 @@ RPC_FILE = "/var/lib/clightning/bitcoin/lightning-rpc"
# set this too low and you might get inadvertent channel closures (?)
CLTV = 18
# for every sequentially failed transaction, delay this much before trying again.
# note that the initial route building process can involve 10-20 "transient" failures, as it discovers dead channels.
TX_FAIL_BACKOFF = 0.8
MAX_SEQUENTIAL_JOB_FAILURES = 200
DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40
@@ -122,19 +125,19 @@ class LocalChannel:
self.peer_ch = rpc.peerchannel(self.scid, self.remote_peer)
def __repr__(self) -> str:
return self.to_str(with_scid=True, with_bal_ratio=True, with_cost=True, with_ppm_theirs=True)
return self.to_str(with_scid=True, with_bal_ratio=True, with_cost=False, with_ppm_theirs=False)
def to_str(self, with_peer_id: bool = False, with_scid: bool=False, with_bal_msat: bool=False, with_bal_ratio: bool=False, with_cost:bool = False, with_ppm_theirs:bool = False, with_ppm_mine:bool = False) -> str:
base_flag = "*" if self.to_me is None or self.to_me["base_fee_millisatoshi"] != 0 else ""
base_flag = "*" if not self.online or self.base_fee_to_me != 0 else ""
alias = f"({self.remote_alias}){base_flag}"
peerid = f" {self.remote_peer}" if with_peer_id else ""
scid = f" scid:{self.scid:>13}" if with_scid else ""
bal = f" S:{int(self.sendable):11}/R:{int(self.receivable):11}" if with_bal_msat else ""
ratio = f" MINE:{(100*self.send_ratio):>8.4f}%" if with_bal_ratio else ""
cost = f" COST:{self.opportunity_cost_lent:>11}" if with_cost else ""
ppm_theirs = self.to_me["fee_per_millionth"] if self.to_me else "N/A"
ppm_theirs = self.ppm_to_me if self.to_me else "N/A"
ppm_theirs = f" PPM_THEIRS:{ppm_theirs:>6}" if with_ppm_theirs else ""
ppm_mine = self.from_me["fee_per_millionth"] if self.from_me else "N/A"
ppm_mine = self.ppm_from_me if self.from_me else "N/A"
ppm_mine = f" PPM_MINE:{ppm_mine:>6}" if with_ppm_mine else ""
return f"channel{alias:30}{peerid}{scid}{bal}{ratio}{cost}{ppm_theirs}{ppm_mine}"
@@ -209,9 +212,18 @@ class LocalChannel:
def delay_me(self) -> str:
return self.from_me["delay"]
@property
def ppm_to_me(self) -> int:
return self.to_me["fee_per_millionth"]
@property
def ppm_from_me(self) -> int:
return self.peer_ch["fee_proportional_millionths"]
return self.from_me["fee_per_millionth"]
# return self.peer_ch["fee_proportional_millionths"]
@property
def base_fee_to_me(self) -> int:
return self.to_me["base_fee_millisatoshi"]
@property
def receivable(self) -> int:
@@ -397,6 +409,9 @@ class LoopJob:
in_: str
amount: int
@dataclass
class LoopJobIdle:
    """Sentinel "job" telling a worker to sleep before polling for work again."""
    # seconds the worker should sleep before asking for the next job
    sec: int = 10
class AbstractLoopRunner:
def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int):
@@ -405,7 +420,7 @@ class AbstractLoopRunner:
self.parallelism = parallelism
self.bounds_map = {} # map (out:str, in_:str) -> TxBounds. it's a cache so we don't have to try 10 routes every time.
def pop_job(self) -> LoopJob | None:
def pop_job(self) -> LoopJob | LoopJobIdle | None:
raise NotImplemented # abstract method
def finished_job(self, job: LoopJob, progress: int|LoopError) -> None:
@@ -441,6 +456,11 @@ class AbstractLoopRunner:
logger.debug(f"popped job: {job}")
if job is None: return
if isinstance(job, LoopJobIdle):
logger.debug(f"idling for {job.sec}")
time.sleep(job.sec)
continue
result = self._execute_job(job)
logger.debug(f"finishing job {job} with {result}")
self.finished_job(job, result)
@@ -462,10 +482,9 @@ class AbstractLoopRunner:
self.bounds_map[(job.out, job.in_)] = bounds
return amt_looped
class LoopBalancer(AbstractLoopRunner):
def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
super().__init__(looper, bounds, parallelism)
self.job = LoopJob(out=out, in_=in_, amount=amount)
class LoopPairState:
# TODO: use this in MultiLoopBalancer, or stop shoving state in here and put it on LoopBalancer instead.
def __init__(self, out: str, in_: str, amount: int):
self.out = out
self.in_ = in_
self.amount_target = amount
@@ -473,33 +492,91 @@ class LoopBalancer(AbstractLoopRunner):
self.amount_outstanding = 0
self.tx_fail_count = 0
self.route_fail_count = 0
self.last_job_start_time = None
self.failed_tx_throttler = 0 # increase by one every time we fail, decreases more gradually, when we succeed
def pop_job(self) -> LoopJob | None:
if self.tx_fail_count + self.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES:
class LoopBalancer(AbstractLoopRunner):
def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
super().__init__(looper, bounds, parallelism)
self.state = LoopPairState(out, in_, amount)
def pop_job(self) -> LoopJob | LoopJobIdle | None:
if self.state.tx_fail_count + self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES:
logger.info("too many sequential failures: giving up")
return None
amount_avail = self.amount_target - self.amount_looped - self.amount_outstanding
if amount_avail < self.bounds.min_msat: return None
if self.state.tx_fail_count + self.state.route_fail_count > 0:
# N.B.: last_job_start_time is guaranteed to have been set by now
idle_until = self.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler
idle_for = idle_until - time.time()
if self.state.amount_outstanding != 0 or idle_for > 0:
# when we hit transient failures, restrict to just one job in flight at a time.
# this is aimed for the initial route building, where multiple jobs in flight is just useless,
# but it's not a bad idea for network blips, etc, either.
logger.info(f"throttling ({idle_for}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures")
return LoopJobIdle(idle_for) if idle_for > 0 else LoopJobIdle()
amount_avail = self.state.amount_target - self.state.amount_looped - self.state.amount_outstanding
if amount_avail < self.bounds.min_msat:
if self.state.amount_outstanding == 0: return None # complete!
return LoopJobIdle() # sending out another job would risk over-transferring
amount_this_job = min(amount_avail, self.bounds.max_msat)
self.amount_outstanding += amount_this_job
return LoopJob(out=self.out, in_=self.in_, amount=amount_this_job)
self.state.amount_outstanding += amount_this_job
self.last_job_start_time = time.time()
return LoopJob(out=self.state.out, in_=self.state.in_, amount=amount_this_job)
def finished_job(self, job: LoopJob, progress: int) -> None:
# TODO: drop bad_channels cache and bounds_map cache after so many errors
self.state.amount_outstanding -= job.amount
if progress == LoopError.NO_ROUTE:
self.route_fail_count += 1
if self.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0:
self.state.route_fail_count += 1
if self.state.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0:
self.drop_caches()
elif progress == LoopError.TRANSIENT:
self.tx_fail_count += 1
self.state.tx_fail_count += 1
else:
self.tx_fail_count = 0
self.route_fail_count = 0
self.amount_outstanding -= job.amount
self.amount_looped += progress
logger.info(f"loop progressed {progress}: {self.amount_looped} of {self.amount_target}")
self.state.amount_looped += progress
self.state.tx_fail_count = 0
self.state.route_fail_count = 0
self.state.failed_tx_throttler = max(0, self.state.failed_tx_throttler - 0.2)
logger.info(f"loop progressed {progress}: {self.state.amount_looped} of {self.state.amount_target}")
class MultiLoopBalancer(AbstractLoopRunner):
    """
    multiplexes jobs between multiple LoopBalancers.
    note that the child LoopBalancers don't actually execute the jobs -- just produce them.
    """
    def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
        super().__init__(looper, bounds, parallelism)
        self.loops = []
        self.job_count = 0 #< increments on every job so we can grab jobs evenly from each LoopBalancer

    def add_loop(self, out: LocalChannel, in_: LocalChannel, amount: int) -> None:
        """
        start looping sats from out -> in_
        """
        assert not any(l.state.out == out.scid and l.state.in_ == in_.scid for l in self.loops), f"tried to add duplicate loops from {out} -> {in_}"
        logger.info(f"looping from ({out}) to ({in_})")
        self.loops.append(LoopBalancer(out.scid, in_.scid, amount, self.looper, self.bounds, self.parallelism))

    def pop_job(self) -> LoopJob | LoopJobIdle | None:
        """
        round-robin over child balancers, returning the first real job found.
        returns a LoopJobIdle (shortest requested idle) when every child is
        throttled, or None when no child can make progress => terminate.
        """
        # N.B.: this can be called in parallel, so try to be consistent enough to not crash
        if not self.loops:
            # guard: `% len(self.loops)` below would raise ZeroDivisionError
            # if pop_job is called before any loop was added.
            return None
        self.job_count += 1
        idle_job = None
        for i, _ in enumerate(self.loops):
            loop = self.loops[(self.job_count + i) % len(self.loops)]
            job = loop.pop_job()
            if isinstance(job, LoopJob): return job
            if isinstance(job, LoopJobIdle):
                # keep only the shortest idle so a soon-ready loop isn't over-delayed
                idle_job = LoopJobIdle(min(job.sec, idle_job.sec)) if idle_job is not None else job
        return idle_job # could be None, if no jobs can make progress => terminate

    def finished_job(self, job: LoopJob, progress: int) -> None:
        # this assumes (enforced externally) that we have only one loop for a given out/in_ pair
        for l in self.loops:
            if l.state.out == job.out and l.state.in_ == job.in_:
                l.finished_job(job, progress)
                break # at most one loop matches (uniqueness asserted in add_loop)
def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: int, max_msat: int, parallelism: int):
@@ -509,6 +586,30 @@ def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat:
balancer.run_to_completion()
def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int):
    """
    automatically rebalance: loop sats out of channels that are nearly all ours
    (send_ratio > 0.95) into channels that are nearly all remote (send_ratio < 0.20),
    pairing every such "full" channel with every "empty" one in a MultiLoopBalancer.
    only online channels whose inbound base fee is zero participate.
    """
    looper = LoopRouter(rpc)
    bounds = TxBounds(min_msat=min_msat, max_msat=max_msat)
    balancer = MultiLoopBalancer(looper, bounds, parallelism)

    channels = []
    for peerch in rpc.rpc.listpeerchannels()["channels"]:
        try:
            channels.append(rpc.localchannel(peerch["short_channel_id"]))
        except Exception:
            # best-effort: skip peers whose channel can't be loaded.
            # narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still propagate.
            logger.info(f"NO CHANNELS for {peerch['peer_id']}")

    # restrict to channels we can actually route through cheaply
    channels = [ch for ch in channels if ch.online and ch.base_fee_to_me == 0]
    give_to = [ ch for ch in channels if ch.send_ratio > 0.95 ]
    take_from = [ ch for ch in channels if ch.send_ratio < 0.20 ]
    for to in give_to:
        for from_ in take_from:
            balancer.add_loop(to, from_, 10000000)

    # TODO: run until all jobs have errored (completion) or ONE job has completed,
    # then repeat this whole thing; channel balances could have changed.
    balancer.run_to_completion()
def show_status(rpc: RpcHelper, full: bool=False):
"""
show a table of channel balances between peers.
@@ -527,6 +628,9 @@ def main():
parser = argparse.ArgumentParser(description="rebalance lightning channel balances")
parser.add_argument("--verbose", action="store_true", help="more logging")
parser.add_argument("--min-msat", default="999", help="min transaction size")
parser.add_argument("--max-msat", default="1000000", help="max transaction size")
parser.add_argument("--jobs", default="1", help="how many HTLCs to keep in-flight at once")
subparsers = parser.add_subparsers(help="action")
status_parser = subparsers.add_parser("status")
@@ -538,9 +642,9 @@ def main():
loop_parser.add_argument("out", help="peer id to send tx through")
loop_parser.add_argument("in_", help="peer id to receive tx through")
loop_parser.add_argument("amount", help="total amount of msat to loop")
loop_parser.add_argument("--min-msat", default="999", help="min transaction size")
loop_parser.add_argument("--max-msat", default="1000000", help="max transaction size")
loop_parser.add_argument("--jobs", default="1", help="how many HTLCs to keep in-flight at once")
autobal_parser = subparsers.add_parser("autobalance")
autobal_parser.set_defaults(action="autobalance")
args = parser.parse_args()
@@ -555,5 +659,8 @@ def main():
if args.action == "loop":
balance_loop(rpc, out=args.out, in_=args.in_, amount_msat=int(args.amount), min_msat=int(args.min_msat), max_msat=int(args.max_msat), parallelism=int(args.jobs))
if args.action == "autobalance":
autobalance(rpc, min_msat=int(args.min_msat), max_msat=int(args.max_msat), parallelism=int(args.jobs))
# script entry point: only run when executed directly, not when imported
if __name__ == '__main__':
    main()