diff --git a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane
index baf80b2e..0d74d03e 100755
--- a/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane
+++ b/hosts/by-name/servo/services/cryptocurrencies/clightning-sane/clightning-sane
@@ -25,12 +25,13 @@ RPC_FILE = "/var/lib/clightning/bitcoin/lightning-rpc"
 # set this too low and you might get inadvertent channel closures (?)
 CLTV = 18
 
-MAX_SEQUENTIAL_JOB_FAILURES = 100
+MAX_SEQUENTIAL_JOB_FAILURES = 200
+DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40
 
 class LoopError(Enum):
     """ error when trying to loop sats, or when unable to calculate a route for the loop """
-    FAIL_TEMPORARY = "FAIL_TEMPORARY"  # try again, we'll maybe find a different route
-    FAIL_PERMANENT = "FAIL_PERMANENT"  # not worth trying again, channels are un-loopable
+    TRANSIENT = "TRANSIENT"  # try again, we'll maybe find a different route
+    NO_ROUTE = "NO_ROUTE"
 
 class RouteError(Enum):
     """ error when calculated a route """
@@ -258,6 +259,10 @@ class LoopRouter:
         self.bad_channels = []  # list of directed scid
         self.nonzero_base_channels = []  # list of directed scid
 
+    def drop_caches(self) -> None:
+        logger.info("LoopRouter.drop_caches()")
+        self.bad_channels = []
+
     def _get_directed_scid(self, scid: str, direction: int) -> dict:
         channels = self.rpc.rpc.listchannels(scid)["channels"]
         channels = [c for c in channels if c["direction"] == direction]
@@ -270,22 +275,22 @@ class LoopRouter:
 
         if out_ch.directed_scid_from_me in self.bad_channels or in_ch.directed_scid_to_me in self.bad_channels:
             logger.info(f"loop {out_scid} -> {in_scid} failed in our own channel")
-            return LoopError.FAIL_PERMANENT
+            return LoopError.TRANSIENT
 
         # bounds = bounds.restrict_to_htlc(out_ch)  # htlc bounds seem to be enforced only in the outward direction
         bounds = bounds.restrict_to_htlc(in_ch)
         bounds = bounds.restrict_to_zero_fees(in_ch)
         if not bounds.is_satisfiable():
-            return LoopError.FAIL_PERMANENT  # no valid bounds
+            return LoopError.NO_ROUTE
 
         logger.debug(f"route with bounds {bounds}")
         route = self.route(out_ch, in_ch, bounds)
         logger.debug(f"route: {route}")
         if route == RouteError.NO_ROUTE:
-            return LoopError.FAIL_PERMANENT
+            return LoopError.NO_ROUTE
         elif route == RouteError.HAS_BASE_FEE:
             # try again with a different route
-            return LoopError.FAIL_TEMPORARY
+            return LoopError.TRANSIENT
 
         amount_msat = route[0]["amount_msat"]
         invoice_id = f"loop-{time.time():.6f}".replace(".", "_")
@@ -305,7 +310,7 @@ class LoopRouter:
             err_directed_scid = f"{err_scid}/{err_dir}"
             logger.debug(f"ch failed, adding to excludes: {err_directed_scid}; {e.error}")
             self.bad_channels.append(err_directed_scid)
-            return LoopError.FAIL_TEMPORARY
+            return LoopError.TRANSIENT
         else:
             return int(amount_msat)
 
@@ -413,6 +418,11 @@ class AbstractLoopRunner:
         with ThreadPoolExecutor(max_workers=self.parallelism) as executor:
             _ = list(executor.map(lambda _i: self._try_invoke(self._worker_thread), range(self.parallelism)))
 
+    def drop_caches(self) -> None:
+        logger.info("AbstractLoopRunner.drop_caches()")
+        self.looper.drop_caches()
+        self.bounds_map = {}
+
     def _try_invoke(self, f, *args) -> None:
         """
 
@@ -440,10 +450,10 @@ class AbstractLoopRunner:
         bounds = bounds.intersect(TxBounds(max_msat=job.amount))
         if not bounds.is_satisfiable():
             logger.debug(f"TxBounds for job are unsatisfiable; skipping: {bounds} {job}")
-            return LoopError.FAIL_PERMANENT
+            return LoopError.NO_ROUTE
 
         amt_looped = self.looper.loop_once(job.out, job.in_, bounds)
-        if amt_looped in (0, LoopError.FAIL_PERMANENT, LoopError.FAIL_TEMPORARY):
+        if amt_looped in (0, LoopError.NO_ROUTE, LoopError.TRANSIENT):
             return amt_looped
 
         logger.info(f"looped {amt_looped} from {job.out} -> {job.in_}")
@@ -456,15 +466,16 @@ class LoopBalancer(AbstractLoopRunner):
     def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
         super().__init__(looper, bounds, parallelism)
         self.job = LoopJob(out=out, in_=in_, amount=amount)
-        self.fail_count = 0
         self.out = out
         self.in_ = in_
         self.amount_target = amount
         self.amount_looped = 0
         self.amount_outstanding = 0
+        self.tx_fail_count = 0
+        self.route_fail_count = 0
 
     def pop_job(self) -> LoopJob | None:
-        if self.fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: return None
+        if self.tx_fail_count + self.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: return None
 
         amount_avail = self.amount_target - self.amount_looped - self.amount_outstanding
         if amount_avail < self.bounds.min_msat: return None
@@ -475,10 +486,15 @@ class LoopBalancer(AbstractLoopRunner):
 
     def finished_job(self, job: LoopJob, progress: int) -> None:
         # TODO: drop bad_channels cache and bounds_map cache after so many errors
-        if progress == LoopError.FAIL_PERMANENT: self.fail_count += MAX_SEQUENTIAL_JOB_FAILURES
-        elif progress == LoopError.FAIL_TEMPORARY: self.fail_count += 1
+        if progress == LoopError.NO_ROUTE:
+            self.route_fail_count += 1
+            if self.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0:
+                self.drop_caches()
+        elif progress == LoopError.TRANSIENT:
+            self.tx_fail_count += 1
         else:
-            self.fail_count = 0
+            self.tx_fail_count = 0
+            self.route_fail_count = 0
             self.amount_outstanding -= job.amount
             self.amount_looped += progress
         logger.info(f"loop progressed {progress}: {self.amount_looped} of {self.amount_target}")