From 103a300e77fa961c2436a0bddc483cf53f184763 Mon Sep 17 00:00:00 2001 From: Colin Date: Sat, 13 Jan 2024 03:04:24 +0000 Subject: [PATCH] servo: clightning-sane: implement an autobalance subcommand --- .../clightning-sane/clightning-sane | 163 +++++++++++++++--- 1 file changed, 135 insertions(+), 28 deletions(-) diff --git a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane index 2ba81b938..b9d2b2bfd 100755 --- a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane +++ b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane @@ -25,6 +25,9 @@ RPC_FILE = "/var/lib/clightning/bitcoin/lightning-rpc" # set this too low and you might get inadvertent channel closures (?) CLTV = 18 +# for every sequentially failed transaction, delay this much before trying again. +# note that the initial route building process can involve 10-20 "transient" failures, as it discovers dead channels. 
+TX_FAIL_BACKOFF = 0.8 MAX_SEQUENTIAL_JOB_FAILURES = 200 DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40 @@ -122,19 +125,19 @@ class LocalChannel: self.peer_ch = rpc.peerchannel(self.scid, self.remote_peer) def __repr__(self) -> str: - return self.to_str(with_scid=True, with_bal_ratio=True, with_cost=True, with_ppm_theirs=True) + return self.to_str(with_scid=True, with_bal_ratio=True, with_cost=False, with_ppm_theirs=False) def to_str(self, with_peer_id: bool = False, with_scid: bool=False, with_bal_msat: bool=False, with_bal_ratio: bool=False, with_cost:bool = False, with_ppm_theirs:bool = False, with_ppm_mine:bool = False) -> str: - base_flag = "*" if self.to_me is None or self.to_me["base_fee_millisatoshi"] != 0 else "" + base_flag = "*" if not self.online or self.base_fee_to_me != 0 else "" alias = f"({self.remote_alias}){base_flag}" peerid = f" {self.remote_peer}" if with_peer_id else "" scid = f" scid:{self.scid:>13}" if with_scid else "" bal = f" S:{int(self.sendable):11}/R:{int(self.receivable):11}" if with_bal_msat else "" ratio = f" MINE:{(100*self.send_ratio):>8.4f}%" if with_bal_ratio else "" cost = f" COST:{self.opportunity_cost_lent:>11}" if with_cost else "" - ppm_theirs = self.to_me["fee_per_millionth"] if self.to_me else "N/A" + ppm_theirs = self.ppm_to_me if self.to_me else "N/A" ppm_theirs = f" PPM_THEIRS:{ppm_theirs:>6}" if with_ppm_theirs else "" - ppm_mine = self.from_me["fee_per_millionth"] if self.from_me else "N/A" + ppm_mine = self.ppm_from_me if self.from_me else "N/A" ppm_mine = f" PPM_MINE:{ppm_mine:>6}" if with_ppm_mine else "" return f"channel{alias:30}{peerid}{scid}{bal}{ratio}{cost}{ppm_theirs}{ppm_mine}" @@ -209,9 +212,18 @@ class LocalChannel: def delay_me(self) -> str: return self.from_me["delay"] + @property + def ppm_to_me(self) -> int: + return self.to_me["fee_per_millionth"] + @property def ppm_from_me(self) -> int: - return self.peer_ch["fee_proportional_millionths"] + return self.from_me["fee_per_millionth"] + # return 
self.peer_ch["fee_proportional_millionths"] + + @property + def base_fee_to_me(self) -> int: + return self.to_me["base_fee_millisatoshi"] @property def receivable(self) -> int: @@ -397,6 +409,9 @@ class LoopJob: in_: str amount: int +@dataclass +class LoopJobIdle: + sec: int = 10 class AbstractLoopRunner: def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int): @@ -405,7 +420,7 @@ class AbstractLoopRunner: self.parallelism = parallelism self.bounds_map = {} # map (out:str, in_:str) -> TxBounds. it's a cache so we don't have to try 10 routes every time. - def pop_job(self) -> LoopJob | None: + def pop_job(self) -> LoopJob | LoopJobIdle | None: raise NotImplemented # abstract method def finished_job(self, job: LoopJob, progress: int|LoopError) -> None: @@ -441,6 +456,11 @@ class AbstractLoopRunner: logger.debug(f"popped job: {job}") if job is None: return + if isinstance(job, LoopJobIdle): + logger.debug(f"idling for {job.sec}") + time.sleep(job.sec) + continue + result = self._execute_job(job) logger.debug(f"finishing job {job} with {result}") self.finished_job(job, result) @@ -462,10 +482,9 @@ class AbstractLoopRunner: self.bounds_map[(job.out, job.in_)] = bounds return amt_looped -class LoopBalancer(AbstractLoopRunner): - def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1): - super().__init__(looper, bounds, parallelism) - self.job = LoopJob(out=out, in_=in_, amount=amount) +class LoopPairState: + # TODO: use this in MultiLoopBalancer, or stop shoving state in here and put it on LoopBalancer instead. 
+ def __init__(self, out: str, in_: str, amount: int): self.out = out self.in_ = in_ self.amount_target = amount @@ -473,33 +492,91 @@ class LoopBalancer(AbstractLoopRunner): self.amount_outstanding = 0 self.tx_fail_count = 0 self.route_fail_count = 0 + self.last_job_start_time = None + self.failed_tx_throttler = 0 # increase by one every time we fail, decreases more gradually, when we succeed - def pop_job(self) -> LoopJob | None: - if self.tx_fail_count + self.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: +class LoopBalancer(AbstractLoopRunner): + def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1): + super().__init__(looper, bounds, parallelism) + self.state = LoopPairState(out, in_, amount) + + def pop_job(self) -> LoopJob | LoopJobIdle | None: + if self.state.tx_fail_count + self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: logger.info("too many sequential failures: giving up") return None - amount_avail = self.amount_target - self.amount_looped - self.amount_outstanding - if amount_avail < self.bounds.min_msat: return None + if self.state.tx_fail_count + self.state.route_fail_count > 0: + # N.B.: last_job_start_time is guaranteed to have been set by now + idle_until = self.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler + idle_for = idle_until - time.time() + if self.state.amount_outstanding != 0 or idle_for > 0: + # when we hit transient failures, restrict to just one job in flight at a time. + # this is aimed for the initial route building, where multiple jobs in flight is just useless, + # but it's not a bad idea for network blips, etc, either. 
+ logger.info(f"throttling ({idle_for}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures") + return LoopJobIdle(idle_for) if idle_for > 0 else LoopJobIdle() + + amount_avail = self.state.amount_target - self.state.amount_looped - self.state.amount_outstanding + if amount_avail < self.bounds.min_msat: + if self.state.amount_outstanding == 0: return None # complete! + return LoopJobIdle() # sending out another job would risk over-transferring amount_this_job = min(amount_avail, self.bounds.max_msat) - self.amount_outstanding += amount_this_job - return LoopJob(out=self.out, in_=self.in_, amount=amount_this_job) + self.state.amount_outstanding += amount_this_job + self.last_job_start_time = time.time() + return LoopJob(out=self.state.out, in_=self.state.in_, amount=amount_this_job) def finished_job(self, job: LoopJob, progress: int) -> None: - # TODO: drop bad_channels cache and bounds_map cache after so many errors + self.state.amount_outstanding -= job.amount if progress == LoopError.NO_ROUTE: - self.route_fail_count += 1 - if self.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0: + self.state.route_fail_count += 1 + if self.state.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0: self.drop_caches() elif progress == LoopError.TRANSIENT: - self.tx_fail_count += 1 + self.state.tx_fail_count += 1 else: - self.tx_fail_count = 0 - self.route_fail_count = 0 - self.amount_outstanding -= job.amount - self.amount_looped += progress - logger.info(f"loop progressed {progress}: {self.amount_looped} of {self.amount_target}") + self.state.amount_looped += progress + self.state.tx_fail_count = 0 + self.state.route_fail_count = 0 + self.state.failed_tx_throttler = max(0, self.state.failed_tx_throttler - 0.2) + logger.info(f"loop progressed {progress}: {self.state.amount_looped} of {self.state.amount_target}") + +class MultiLoopBalancer(AbstractLoopRunner): + """ + multiplexes jobs between multiple LoopBalancers. 
+ note that the child LoopBalancers don't actually execute the jobs -- just produce them. + """ + def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int=1): + super().__init__(looper, bounds, parallelism) + self.loops = [] + self.job_count = 0 #< increments on every job so we can grab jobs evenly from each LoopBalancer + + def add_loop(self, out: LocalChannel, in_: LocalChannel, amount: int) -> None: + """ + start looping sats from out -> in_ + """ + assert not any(l.state.out == out.scid and l.state.in_ == in_.scid for l in self.loops), f"tried to add duplicate loops from {out} -> {in_}" + logger.info(f"looping from ({out}) to ({in_})") + self.loops.append(LoopBalancer(out.scid, in_.scid, amount, self.looper, self.bounds, self.parallelism)) + + def pop_job(self) -> LoopJob | LoopJobIdle | None: + # N.B.: this can be called in parallel, so try to be consistent enough to not crash + self.job_count += 1 + + idle_job = None + for i, _ in enumerate(self.loops): + loop = self.loops[(self.job_count + i) % len(self.loops)] + job = loop.pop_job() + if isinstance(job, LoopJob): return job + if isinstance(job, LoopJobIdle): + idle_job = LoopJobIdle(min(job.sec, idle_job.sec)) if idle_job is not None else job + return idle_job # could be None, if no jobs can make progress => terminate + + def finished_job(self, job: LoopJob, progress: int) -> None: + # this assumes (enforced externally) that we have only one loop for a given out/in_ pair + for l in self.loops: + if l.state.out == job.out and l.state.in_ == job.in_: + l.finished_job(job, progress) def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: int, max_msat: int, parallelism: int): @@ -509,6 +586,30 @@ def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: balancer.run_to_completion() +def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int): + looper = LoopRouter(rpc) + bounds = TxBounds(min_msat=min_msat, max_msat=max_msat) 
+ balancer = MultiLoopBalancer(looper, bounds, parallelism) + + channels = [] + for peerch in rpc.rpc.listpeerchannels()["channels"]: + try: + channels.append(rpc.localchannel(peerch["short_channel_id"])) + except: + logger.info(f"NO CHANNELS for {peerch['peer_id']}") + + channels = [ch for ch in channels if ch.online and ch.base_fee_to_me == 0] + give_to = [ ch for ch in channels if ch.send_ratio > 0.95 ] + take_from = [ ch for ch in channels if ch.send_ratio < 0.20 ] + + for to in give_to: + for from_ in take_from: + balancer.add_loop(to, from_, 10000000) + + # TODO: run until all jobs have errored (completion) or ONE job has completed, + # then repeat this whole thing; channel balances could have changed. + balancer.run_to_completion() + def show_status(rpc: RpcHelper, full: bool=False): """ show a table of channel balances between peers. @@ -527,6 +628,9 @@ def main(): parser = argparse.ArgumentParser(description="rebalance lightning channel balances") parser.add_argument("--verbose", action="store_true", help="more logging") + parser.add_argument("--min-msat", default="999", help="min transaction size") + parser.add_argument("--max-msat", default="1000000", help="max transaction size") + parser.add_argument("--jobs", default="1", help="how many HTLCs to keep in-flight at once") subparsers = parser.add_subparsers(help="action") status_parser = subparsers.add_parser("status") @@ -538,9 +642,9 @@ def main(): loop_parser.add_argument("out", help="peer id to send tx through") loop_parser.add_argument("in_", help="peer id to receive tx through") loop_parser.add_argument("amount", help="total amount of msat to loop") - loop_parser.add_argument("--min-msat", default="999", help="min transaction size") - loop_parser.add_argument("--max-msat", default="1000000", help="max transaction size") - loop_parser.add_argument("--jobs", default="1", help="how many HTLCs to keep in-flight at once") + + autobal_parser = subparsers.add_parser("autobalance") + 
autobal_parser.set_defaults(action="autobalance") args = parser.parse_args() @@ -555,5 +659,8 @@ def main(): if args.action == "loop": balance_loop(rpc, out=args.out, in_=args.in_, amount_msat=int(args.amount), min_msat=int(args.min_msat), max_msat=int(args.max_msat), parallelism=int(args.jobs)) + if args.action == "autobalance": + autobalance(rpc, min_msat=int(args.min_msat), max_msat=int(args.max_msat), parallelism=int(args.jobs)) + if __name__ == '__main__': main()