From 99858c1384d85303b8f56bf436cf2a7a28fe8208 Mon Sep 17 00:00:00 2001 From: Colin Date: Sat, 13 Jan 2024 04:47:20 +0000 Subject: [PATCH] servo: clightning-sane: centralize metric reporting, fix so we blacklist our own channels less frequently --- .../clightning-sane/clightning-sane | 148 +++++++++++++----- 1 file changed, 111 insertions(+), 37 deletions(-) diff --git a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane index b9d2b2bf..0d7db826 100755 --- a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane +++ b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane @@ -29,7 +29,6 @@ CLTV = 18 # note that the initial route building process can involve 10-20 "transient" failures, as it discovers dead channels. TX_FAIL_BACKOFF = 0.8 MAX_SEQUENTIAL_JOB_FAILURES = 200 -DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40 class LoopError(Enum): """ error when trying to loop sats, or when unable to calculate a route for the loop """ @@ -41,6 +40,17 @@ class RouteError(Enum): HAS_BASE_FEE = "HAS_BASE_FEE" NO_ROUTE = "NO_ROUTE" +class Metrics: + looped_msat: int = 0 + sendpay_fail: int = 0 + sendpay_succeed: int = 0 + own_bad_channel: int = 0 + no_route: int = 0 + in_ch_unsatisfiable: int = 0 + + def __repr__(self) -> str: + return f"looped:{self.looped_msat}, tx:{self.sendpay_succeed}, tx_fail:{self.sendpay_fail}, own_bad_ch:{self.own_bad_channel}, no_route:{self.no_route}, in_ch_restricted:{self.in_ch_unsatisfiable}" + @dataclass class TxBounds: max_msat: int @@ -52,6 +62,12 @@ class TxBounds: def is_satisfiable(self) -> bool: return self.min_msat <= self.max_msat + def raise_max_to_be_satisfiable(self) -> "Self": + if self.max_msat < self.min_msat: + logger.debug(f"raising max_msat to be consistent: {self.max_msat} -> {self.min_msat}") + return TxBounds(self.min_msat, self.min_msat) + return TxBounds(min_msat=self.min_msat, 
max_msat=self.max_msat) + def intersect(self, other: "TxBounds") -> "Self": return TxBounds( min_msat=max(self.min_msat, other.min_msat), @@ -265,9 +281,22 @@ class RpcHelper: assert len(channels) == 1, f"expected exactly 1 channel, got: {channels}" return channels[0] + def try_getroute(self, *args, **kwargs) -> dict | None: + """ wrapper for getroute which returns None instead of error if no route exists """ + try: + route = self.rpc.getroute(*args, **kwargs) + except RpcError as e: + logger.debug(f"rpc failed: {e}") + return None + else: + route = route["route"] + if route == []: return None + return route + class LoopRouter: - def __init__(self, rpc: RpcHelper): + def __init__(self, rpc: RpcHelper, metrics: Metrics = None): self.rpc = rpc + self.metrics = metrics or Metrics() self.bad_channels = [] # list of directed scid self.nonzero_base_channels = [] # list of directed scid @@ -287,18 +316,21 @@ class LoopRouter: if out_ch.directed_scid_from_me in self.bad_channels or in_ch.directed_scid_to_me in self.bad_channels: logger.info(f"loop {out_scid} -> {in_scid} failed in our own channel") + self.metrics.own_bad_channel += 1 return LoopError.TRANSIENT # bounds = bounds.restrict_to_htlc(out_ch) # htlc bounds seem to be enforced only in the outward direction bounds = bounds.restrict_to_htlc(in_ch) bounds = bounds.restrict_to_zero_fees(in_ch) if not bounds.is_satisfiable(): + self.metrics.in_ch_unsatisfiable += 1 return LoopError.NO_ROUTE logger.debug(f"route with bounds {bounds}") route = self.route(out_ch, in_ch, bounds) logger.debug(f"route: {route}") if route == RouteError.NO_ROUTE: + self.metrics.no_route += 1 return LoopError.NO_ROUTE elif route == RouteError.HAS_BASE_FEE: # try again with a different route @@ -317,6 +349,7 @@ class LoopRouter: wait = self.rpc.rpc.waitsendpay(invoice["payment_hash"]) logger.debug(f"result: {wait}") except RpcError as e: + self.metrics.sendpay_fail += 1 err_data = e.error["data"] err_scid, err_dir = err_data["erring_channel"], 
err_data["erring_direction"] err_directed_scid = f"{err_scid}/{err_dir}" @@ -324,6 +357,8 @@ class LoopRouter: self.bad_channels.append(err_directed_scid) return LoopError.TRANSIENT else: + self.metrics.sendpay_succeed += 1 + self.metrics.looped_msat += int(amount_msat) return int(amount_msat) def route(self, out_ch: LocalChannel, in_ch: LocalChannel, bounds: TxBounds) -> list[dict] | RouteError: @@ -354,9 +389,8 @@ class LoopRouter: return route def _find_partial_route(self, out_peer: str, in_peer: str, bounds: TxBounds, exclude: list[str]=[]) -> list[dict] | RouteError | TxBounds: - route = self.rpc.rpc.getroute(in_peer, amount_msat=bounds.max_msat, riskfactor=0, fromid=out_peer, exclude=exclude, cltv=CLTV) - route = route["route"] - if route == []: + route = self.rpc.try_getroute(in_peer, amount_msat=bounds.max_msat, riskfactor=0, fromid=out_peer, exclude=exclude, cltv=CLTV) + if route is None: logger.debug(f"no route for {bounds.max_msat}msat {out_peer} -> {in_peer}") return RouteError.NO_ROUTE @@ -368,13 +402,14 @@ class LoopRouter: for hop in route: hop_scid = hop["channel"] hop_dir = hop["direction"] + directed_scid = f"{hop_scid}/{hop_dir}" ch = self._get_directed_scid(hop_scid, hop_dir) if ch["base_fee_millisatoshi"] != 0: - self.nonzero_base_channels.append(f"{hop_scid}/{hop_dir}") + self.nonzero_base_channels.append(directed_scid) error = RouteError.HAS_BASE_FEE - bounds = bounds.restrict_to_zero_fees(ppm=ch["fee_per_millionth"]) + bounds = bounds.restrict_to_zero_fees(ppm=ch["fee_per_millionth"], why=directed_scid) - return bounds if error is None else error + return bounds.raise_max_to_be_satisfiable() if error is None else error return route @@ -405,14 +440,18 @@ class LoopRouter: @dataclass class LoopJob: - out: str - in_: str + out: str # scid + in_: str # scid amount: int @dataclass class LoopJobIdle: sec: int = 10 +class LoopJobDone(Enum): + COMPLETED = "COMPLETED" + ABORTED = "ABORTED" + class AbstractLoopRunner: def __init__(self, looper: 
LoopRouter, bounds: TxBounds, parallelism: int): self.looper = looper @@ -420,14 +459,17 @@ class AbstractLoopRunner: self.parallelism = parallelism self.bounds_map = {} # map (out:str, in_:str) -> TxBounds. it's a cache so we don't have to try 10 routes every time. - def pop_job(self) -> LoopJob | LoopJobIdle | None: + def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone: raise NotImplemented # abstract method def finished_job(self, job: LoopJob, progress: int|LoopError) -> None: raise NotImplemented # abstract method - def run_to_completion(self) -> None: + def run_to_completion(self, exit_on_any_completed:bool = False) -> None: + self.exiting = False + self.exit_on_any_completed = exit_on_any_completed if self.parallelism == 1: + # run inline to aid debugging self._worker_thread() else: with ThreadPoolExecutor(max_workers=self.parallelism) as executor: @@ -451,10 +493,11 @@ class AbstractLoopRunner: def _worker_thread(self) -> None: - while True: + while not self.exiting: job = self.pop_job() logger.debug(f"popped job: {job}") - if job is None: return + if isinstance(job, LoopJobDone): + return self._worker_finished(job) if isinstance(job, LoopJobIdle): logger.debug(f"idling for {job.sec}") @@ -482,6 +525,11 @@ class AbstractLoopRunner: self.bounds_map[(job.out, job.in_)] = bounds return amt_looped + def _worker_finished(self, job: LoopJobDone) -> None: + if job == LoopJobDone.COMPLETED and self.exit_on_any_completed: + logger.debug(f"worker completed -> exiting pool") + self.exiting = True + class LoopPairState: # TODO: use this in MultiLoopBalancer, or stop shoving state in here and put it on LoopBalancer instead. 
def __init__(self, out: str, in_: str, amount: int): @@ -500,46 +548,46 @@ class LoopBalancer(AbstractLoopRunner): super().__init__(looper, bounds, parallelism) self.state = LoopPairState(out, in_, amount) - def pop_job(self) -> LoopJob | LoopJobIdle | None: - if self.state.tx_fail_count + self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: - logger.info("too many sequential failures: giving up") - return None + def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone: + if self.state.tx_fail_count + 10*self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: + logger.info(f"giving up ({self.state.out} -> {self.state.in_}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures") + return LoopJobDone.ABORTED if self.state.tx_fail_count + self.state.route_fail_count > 0: # N.B.: last_job_start_time is guaranteed to have been set by now - idle_until = self.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler + idle_until = self.state.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler idle_for = idle_until - time.time() if self.state.amount_outstanding != 0 or idle_for > 0: # when we hit transient failures, restrict to just one job in flight at a time. # this is aimed for the initial route building, where multiple jobs in flight is just useless, # but it's not a bad idea for network blips, etc, either. - logger.info(f"throttling ({idle_for}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures") + logger.info(f"throttling ({self.state.out} -> {self.state.in_}) for {idle_for:.0f}: {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures") return LoopJobIdle(idle_for) if idle_for > 0 else LoopJobIdle() amount_avail = self.state.amount_target - self.state.amount_looped - self.state.amount_outstanding if amount_avail < self.bounds.min_msat: - if self.state.amount_outstanding == 0: return None # complete! 
+ if self.state.amount_outstanding == 0: return LoopJobDone.COMPLETED return LoopJobIdle() # sending out another job would risk over-transferring amount_this_job = min(amount_avail, self.bounds.max_msat) self.state.amount_outstanding += amount_this_job - self.last_job_start_time = time.time() + self.state.last_job_start_time = time.time() return LoopJob(out=self.state.out, in_=self.state.in_, amount=amount_this_job) def finished_job(self, job: LoopJob, progress: int) -> None: self.state.amount_outstanding -= job.amount if progress == LoopError.NO_ROUTE: self.state.route_fail_count += 1 - if self.state.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0: - self.drop_caches() + self.state.failed_tx_throttler += 10 elif progress == LoopError.TRANSIENT: self.state.tx_fail_count += 1 + self.state.failed_tx_throttler += 1 else: self.state.amount_looped += progress self.state.tx_fail_count = 0 self.state.route_fail_count = 0 self.state.failed_tx_throttler = max(0, self.state.failed_tx_throttler - 0.2) - logger.info(f"loop progressed {progress}: {self.state.amount_looped} of {self.state.amount_target}") + logger.info(f"loop progressed ({job.out} -> {job.in_}) {progress}: {self.state.amount_looped} of {self.state.amount_target}") class MultiLoopBalancer(AbstractLoopRunner): """ @@ -549,7 +597,10 @@ class MultiLoopBalancer(AbstractLoopRunner): def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int=1): super().__init__(looper, bounds, parallelism) self.loops = [] - self.job_count = 0 #< increments on every job so we can grab jobs evenly from each LoopBalancer + # job_index: increments on every job so we can grab jobs evenly from each LoopBalancer. 
+ # in the event that producers are idling, it can actually increment more than once, + # so don't take this too literally + self.job_index = 0 def add_loop(self, out: LocalChannel, in_: LocalChannel, amount: int) -> None: """ @@ -559,18 +610,27 @@ class MultiLoopBalancer(AbstractLoopRunner): logger.info(f"looping from ({out}) to ({in_})") self.loops.append(LoopBalancer(out.scid, in_.scid, amount, self.looper, self.bounds, self.parallelism)) - def pop_job(self) -> LoopJob | LoopJobIdle | None: + def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone: # N.B.: this can be called in parallel, so try to be consistent enough to not crash - self.job_count += 1 idle_job = None + abort_job = None for i, _ in enumerate(self.loops): - loop = self.loops[(self.job_count + i) % len(self.loops)] + loop = self.loops[(self.job_index + i) % len(self.loops)] + self.job_index += 1 job = loop.pop_job() - if isinstance(job, LoopJob): return job + if isinstance(job, LoopJob): + return job if isinstance(job, LoopJobIdle): idle_job = LoopJobIdle(min(job.sec, idle_job.sec)) if idle_job is not None else job - return idle_job # could be None, if no jobs can make progress => terminate + if job == LoopJobDone.ABORTED: + abort_job = job + + # either there's a task to idle, or we have to terminate. 
+ # if terminating, terminate ABORTED if any job aborted, else COMPLETED + if idle_job is not None: return idle_job + if abort_job is not None: return abort_job + return LoopJobDone.COMPLETED def finished_job(self, job: LoopJob, progress: int) -> None: # this assumes (enforced externally) that we have only one loop for a given out/in_ pair @@ -578,6 +638,8 @@ class MultiLoopBalancer(AbstractLoopRunner): if l.state.out == job.out and l.state.in_ == job.in_: l.finished_job(job, progress) + logger.info(f"total: {self.looper.metrics}") + def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: int, max_msat: int, parallelism: int): looper = LoopRouter(rpc) @@ -586,9 +648,12 @@ def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: balancer.run_to_completion() -def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int): - looper = LoopRouter(rpc) - bounds = TxBounds(min_msat=min_msat, max_msat=max_msat) +def autobalance_once(rpc: RpcHelper, metrics: Metrics, bounds: TxBounds, parallelism: int) -> bool: + """ + autobalances all channels. + returns True if channels are balanced (or as balanced as can be); False if in need of further balancing + """ + looper = LoopRouter(rpc, metrics) balancer = MultiLoopBalancer(looper, bounds, parallelism) channels = [] @@ -602,13 +667,22 @@ def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int): give_to = [ ch for ch in channels if ch.send_ratio > 0.95 ] take_from = [ ch for ch in channels if ch.send_ratio < 0.20 ] + if give_to == [] and take_from == []: + return True + for to in give_to: for from_ in take_from: balancer.add_loop(to, from_, 10000000) - # TODO: run until all jobs have errored (completion) or ONE job has completed, - # then repeat this whole thing; channel balances could have changed. 
- balancer.run_to_completion() + balancer.run_to_completion(exit_on_any_completed=True) + return False + + +def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int): + bounds = TxBounds(min_msat=min_msat, max_msat=max_msat) + metrics = Metrics() + while not autobalance_once(rpc, metrics, bounds, parallelism): + pass def show_status(rpc: RpcHelper, full: bool=False): """