servo: clightning-sane: drop caches after so many failures
This commit is contained in:
parent
585a87130c
commit
2f1e354400
|
@ -25,12 +25,13 @@ RPC_FILE = "/var/lib/clightning/bitcoin/lightning-rpc"
|
|||
# set this too low and you might get inadvertent channel closures (?)
|
||||
CLTV = 18
|
||||
|
||||
MAX_SEQUENTIAL_JOB_FAILURES = 100
|
||||
MAX_SEQUENTIAL_JOB_FAILURES = 200
|
||||
DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40
|
||||
|
||||
class LoopError(Enum):
    """Failure outcomes of a loop attempt.

    Returned in place of a looped amount, either when trying to loop sats
    fails or when no route for the loop could be calculated.
    """

    # NOTE(review): SOURCE lists the FAIL_* names side by side with
    # TRANSIENT / NO_ROUTE; presumably they are older spellings of the same
    # outcomes -- confirm nothing still references them before removing.
    FAIL_TEMPORARY = "FAIL_TEMPORARY"  # try again, we'll maybe find a different route
    FAIL_PERMANENT = "FAIL_PERMANENT"  # not worth trying again, channels are un-loopable
    TRANSIENT = "TRANSIENT"  # try again, we'll maybe find a different route
    NO_ROUTE = "NO_ROUTE"  # bounds were unsatisfiable or no route could be calculated
|
||||
|
||||
class RouteError(Enum):
|
||||
""" error when calculated a route """
|
||||
|
@ -258,6 +259,10 @@ class LoopRouter:
|
|||
self.bad_channels = [] # list of directed scid
|
||||
self.nonzero_base_channels = [] # list of directed scid
|
||||
|
||||
def drop_caches(self) -> None:
    """Forget every channel recorded in ``bad_channels``.

    The list is rebound to a fresh empty list (not cleared in place); no
    other attribute of the router is touched.
    """
    logger.info("LoopRouter.drop_caches()")
    self.bad_channels: list[str] = []  # list of directed scid
|
||||
|
||||
def _get_directed_scid(self, scid: str, direction: int) -> dict:
|
||||
channels = self.rpc.rpc.listchannels(scid)["channels"]
|
||||
channels = [c for c in channels if c["direction"] == direction]
|
||||
|
@ -270,22 +275,22 @@ class LoopRouter:
|
|||
|
||||
if out_ch.directed_scid_from_me in self.bad_channels or in_ch.directed_scid_to_me in self.bad_channels:
|
||||
logger.info(f"loop {out_scid} -> {in_scid} failed in our own channel")
|
||||
return LoopError.FAIL_PERMANENT
|
||||
return LoopError.TRANSIENT
|
||||
|
||||
# bounds = bounds.restrict_to_htlc(out_ch) # htlc bounds seem to be enforced only in the outward direction
|
||||
bounds = bounds.restrict_to_htlc(in_ch)
|
||||
bounds = bounds.restrict_to_zero_fees(in_ch)
|
||||
if not bounds.is_satisfiable():
|
||||
return LoopError.FAIL_PERMANENT # no valid bounds
|
||||
return LoopError.NO_ROUTE
|
||||
|
||||
logger.debug(f"route with bounds {bounds}")
|
||||
route = self.route(out_ch, in_ch, bounds)
|
||||
logger.debug(f"route: {route}")
|
||||
if route == RouteError.NO_ROUTE:
|
||||
return LoopError.FAIL_PERMANENT
|
||||
return LoopError.NO_ROUTE
|
||||
elif route == RouteError.HAS_BASE_FEE:
|
||||
# try again with a different route
|
||||
return LoopError.FAIL_TEMPORARY
|
||||
return LoopError.TRANSIENT
|
||||
|
||||
amount_msat = route[0]["amount_msat"]
|
||||
invoice_id = f"loop-{time.time():.6f}".replace(".", "_")
|
||||
|
@ -305,7 +310,7 @@ class LoopRouter:
|
|||
err_directed_scid = f"{err_scid}/{err_dir}"
|
||||
logger.debug(f"ch failed, adding to excludes: {err_directed_scid}; {e.error}")
|
||||
self.bad_channels.append(err_directed_scid)
|
||||
return LoopError.FAIL_TEMPORARY
|
||||
return LoopError.TRANSIENT
|
||||
else:
|
||||
return int(amount_msat)
|
||||
|
||||
|
@ -413,6 +418,11 @@ class AbstractLoopRunner:
|
|||
with ThreadPoolExecutor(max_workers=self.parallelism) as executor:
|
||||
_ = list(executor.map(lambda _i: self._try_invoke(self._worker_thread), range(self.parallelism)))
|
||||
|
||||
def drop_caches(self) -> None:
    """Discard cached state held by this runner and by its looper.

    Calls ``self.looper.drop_caches()`` and then replaces ``bounds_map``
    with an empty dict.
    """
    logger.info("AbstractLoopRunner.drop_caches()")
    # looper first: if it raises, bounds_map is left untouched
    self.looper.drop_caches()
    self.bounds_map: dict = {}
|
||||
|
||||
|
||||
def _try_invoke(self, f, *args) -> None:
|
||||
"""
|
||||
|
@ -440,10 +450,10 @@ class AbstractLoopRunner:
|
|||
bounds = bounds.intersect(TxBounds(max_msat=job.amount))
|
||||
if not bounds.is_satisfiable():
|
||||
logger.debug(f"TxBounds for job are unsatisfiable; skipping: {bounds} {job}")
|
||||
return LoopError.FAIL_PERMANENT
|
||||
return LoopError.NO_ROUTE
|
||||
|
||||
amt_looped = self.looper.loop_once(job.out, job.in_, bounds)
|
||||
if amt_looped in (0, LoopError.FAIL_PERMANENT, LoopError.FAIL_TEMPORARY):
|
||||
if amt_looped in (0, LoopError.NO_ROUTE, LoopError.TRANSIENT):
|
||||
return amt_looped
|
||||
|
||||
logger.info(f"looped {amt_looped} from {job.out} -> {job.in_}")
|
||||
|
@ -456,15 +466,16 @@ class LoopBalancer(AbstractLoopRunner):
|
|||
def __init__(self, out: str, in_: str, amount: int, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
    """Set up a balancer that loops up to ``amount`` from ``out`` to ``in_``.

    ``looper``, ``bounds`` and ``parallelism`` are forwarded unchanged to
    the base-class constructor.  ``out`` / ``in_`` are presumably channel
    ids (scid) and ``amount`` presumably msat -- TODO confirm against
    callers.
    """
    super().__init__(looper, bounds, parallelism)
    self.job = LoopJob(out=out, in_=in_, amount=amount)

    # what was asked for
    self.out = out
    self.in_ = in_
    self.amount_target = amount

    # progress bookkeeping; all start at zero
    self.amount_looped = 0
    self.amount_outstanding = 0

    # failure counters
    # NOTE(review): SOURCE shows fail_count next to the tx/route counters;
    # kept so that any code still reading it keeps working.
    self.fail_count = 0
    self.tx_fail_count = 0
    self.route_fail_count = 0
|
||||
|
||||
def pop_job(self) -> LoopJob | None:
|
||||
if self.fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: return None
|
||||
if self.tx_fail_count + self.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES: return None
|
||||
|
||||
amount_avail = self.amount_target - self.amount_looped - self.amount_outstanding
|
||||
if amount_avail < self.bounds.min_msat: return None
|
||||
|
@ -475,10 +486,15 @@ class LoopBalancer(AbstractLoopRunner):
|
|||
|
||||
def finished_job(self, job: LoopJob, progress: int) -> None:
|
||||
# TODO: drop bad_channels cache and bounds_map cache after so many errors
|
||||
if progress == LoopError.FAIL_PERMANENT: self.fail_count += MAX_SEQUENTIAL_JOB_FAILURES
|
||||
elif progress == LoopError.FAIL_TEMPORARY: self.fail_count += 1
|
||||
if progress == LoopError.NO_ROUTE:
|
||||
self.route_fail_count += 1
|
||||
if self.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0:
|
||||
self.drop_caches()
|
||||
elif progress == LoopError.TRANSIENT:
|
||||
self.tx_fail_count += 1
|
||||
else:
|
||||
self.fail_count = 0
|
||||
self.tx_fail_count = 0
|
||||
self.route_fail_count = 0
|
||||
self.amount_outstanding -= job.amount
|
||||
self.amount_looped += progress
|
||||
logger.info(f"loop progressed {progress}: {self.amount_looped} of {self.amount_target}")
|
||||
|
|
Loading…
Reference in New Issue
Block a user