servo: clightning-sane: centralize metric reporting, fix so we blacklist our own channels less frequently

This commit is contained in:
Colin 2024-01-13 04:47:20 +00:00
parent 103a300e77
commit 99858c1384

View File

@ -29,7 +29,6 @@ CLTV = 18
# note that the initial route building process can involve 10-20 "transient" failures, as it discovers dead channels.
TX_FAIL_BACKOFF = 0.8
MAX_SEQUENTIAL_JOB_FAILURES = 200
DROP_CACHES_AFTER_N_ROUTE_FAILURES = 40
class LoopError(Enum):
""" error when trying to loop sats, or when unable to calculate a route for the loop """
@ -41,6 +40,17 @@ class RouteError(Enum):
HAS_BASE_FEE = "HAS_BASE_FEE"
NO_ROUTE = "NO_ROUTE"
class Metrics:
looped_msat: int = 0
sendpay_fail: int = 0
sendpay_succeed: int = 0
own_bad_channel: int = 0
no_route: int = 0
in_ch_unsatisfiable: int = 0
def __repr__(self) -> str:
return f"looped:{self.looped_msat}, tx:{self.sendpay_succeed}, tx_fail:{self.sendpay_fail}, own_bad_ch:{self.own_bad_channel}, no_route:{self.no_route}, in_ch_restricted:{self.in_ch_unsatisfiable}"
@dataclass
class TxBounds:
max_msat: int
@ -52,6 +62,12 @@ class TxBounds:
def is_satisfiable(self) -> bool:
return self.min_msat <= self.max_msat
def raise_max_to_be_satisfiable(self) -> "Self":
if self.max_msat < self.min_msat:
logger.debug(f"raising max_msat to be consistent: {self.max_msat} -> {self.min_msat}")
return TxBounds(self.min_msat, self.min_msat)
return TxBounds(min_msat=self.min_msat, max_msat=self.max_msat)
def intersect(self, other: "TxBounds") -> "Self":
return TxBounds(
min_msat=max(self.min_msat, other.min_msat),
@ -265,9 +281,22 @@ class RpcHelper:
assert len(channels) == 1, f"expected exactly 1 channel, got: {channels}"
return channels[0]
def try_getroute(self, *args, **kwargs) -> dict | None:
""" wrapper for getroute which returns None instead of error if no route exists """
try:
route = self.rpc.getroute(*args, **kwargs)
except RpcError as e:
logger.debug(f"rpc failed: {e}")
return None
else:
route = route["route"]
if route == []: return None
return route
class LoopRouter:
def __init__(self, rpc: RpcHelper):
def __init__(self, rpc: RpcHelper, metrics: Metrics = None):
self.rpc = rpc
self.metrics = metrics or Metrics()
self.bad_channels = [] # list of directed scid
self.nonzero_base_channels = [] # list of directed scid
@ -287,18 +316,21 @@ class LoopRouter:
if out_ch.directed_scid_from_me in self.bad_channels or in_ch.directed_scid_to_me in self.bad_channels:
logger.info(f"loop {out_scid} -> {in_scid} failed in our own channel")
self.metrics.own_bad_channel += 1
return LoopError.TRANSIENT
# bounds = bounds.restrict_to_htlc(out_ch) # htlc bounds seem to be enforced only in the outward direction
bounds = bounds.restrict_to_htlc(in_ch)
bounds = bounds.restrict_to_zero_fees(in_ch)
if not bounds.is_satisfiable():
self.metrics.in_ch_unsatisfiable += 1
return LoopError.NO_ROUTE
logger.debug(f"route with bounds {bounds}")
route = self.route(out_ch, in_ch, bounds)
logger.debug(f"route: {route}")
if route == RouteError.NO_ROUTE:
self.metrics.no_route += 1
return LoopError.NO_ROUTE
elif route == RouteError.HAS_BASE_FEE:
# try again with a different route
@ -317,6 +349,7 @@ class LoopRouter:
wait = self.rpc.rpc.waitsendpay(invoice["payment_hash"])
logger.debug(f"result: {wait}")
except RpcError as e:
self.metrics.sendpay_fail += 1
err_data = e.error["data"]
err_scid, err_dir = err_data["erring_channel"], err_data["erring_direction"]
err_directed_scid = f"{err_scid}/{err_dir}"
@ -324,6 +357,8 @@ class LoopRouter:
self.bad_channels.append(err_directed_scid)
return LoopError.TRANSIENT
else:
self.metrics.sendpay_succeed += 1
self.metrics.looped_msat += int(amount_msat)
return int(amount_msat)
def route(self, out_ch: LocalChannel, in_ch: LocalChannel, bounds: TxBounds) -> list[dict] | RouteError:
@ -354,9 +389,8 @@ class LoopRouter:
return route
def _find_partial_route(self, out_peer: str, in_peer: str, bounds: TxBounds, exclude: list[str]=[]) -> list[dict] | RouteError | TxBounds:
route = self.rpc.rpc.getroute(in_peer, amount_msat=bounds.max_msat, riskfactor=0, fromid=out_peer, exclude=exclude, cltv=CLTV)
route = route["route"]
if route == []:
route = self.rpc.try_getroute(in_peer, amount_msat=bounds.max_msat, riskfactor=0, fromid=out_peer, exclude=exclude, cltv=CLTV)
if route is None:
logger.debug(f"no route for {bounds.max_msat}msat {out_peer} -> {in_peer}")
return RouteError.NO_ROUTE
@ -368,13 +402,14 @@ class LoopRouter:
for hop in route:
hop_scid = hop["channel"]
hop_dir = hop["direction"]
directed_scid = f"{hop_scid}/{hop_dir}"
ch = self._get_directed_scid(hop_scid, hop_dir)
if ch["base_fee_millisatoshi"] != 0:
self.nonzero_base_channels.append(f"{hop_scid}/{hop_dir}")
self.nonzero_base_channels.append(directed_scid)
error = RouteError.HAS_BASE_FEE
bounds = bounds.restrict_to_zero_fees(ppm=ch["fee_per_millionth"])
bounds = bounds.restrict_to_zero_fees(ppm=ch["fee_per_millionth"], why=directed_scid)
return bounds if error is None else error
return bounds.raise_max_to_be_satisfiable() if error is None else error
return route
@ -405,14 +440,18 @@ class LoopRouter:
@dataclass
class LoopJob:
out: str
in_: str
out: str # scid
in_: str # scid
amount: int
@dataclass
class LoopJobIdle:
sec: int = 10
class LoopJobDone(Enum):
COMPLETED = "COMPLETED"
ABORTED = "ABORTED"
class AbstractLoopRunner:
def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int):
self.looper = looper
@ -420,14 +459,17 @@ class AbstractLoopRunner:
self.parallelism = parallelism
self.bounds_map = {} # map (out:str, in_:str) -> TxBounds. it's a cache so we don't have to try 10 routes every time.
def pop_job(self) -> LoopJob | LoopJobIdle | None:
def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone:
raise NotImplemented # abstract method
def finished_job(self, job: LoopJob, progress: int|LoopError) -> None:
raise NotImplemented # abstract method
def run_to_completion(self) -> None:
def run_to_completion(self, exit_on_any_completed:bool = False) -> None:
self.exiting = False
self.exit_on_any_completed = exit_on_any_completed
if self.parallelism == 1:
# run inline to aid debugging
self._worker_thread()
else:
with ThreadPoolExecutor(max_workers=self.parallelism) as executor:
@ -451,10 +493,11 @@ class AbstractLoopRunner:
def _worker_thread(self) -> None:
while True:
while not self.exiting:
job = self.pop_job()
logger.debug(f"popped job: {job}")
if job is None: return
if isinstance(job, LoopJobDone):
return self._worker_finished(job)
if isinstance(job, LoopJobIdle):
logger.debug(f"idling for {job.sec}")
@ -482,6 +525,11 @@ class AbstractLoopRunner:
self.bounds_map[(job.out, job.in_)] = bounds
return amt_looped
def _worker_finished(self, job: LoopJobDone) -> None:
if job == LoopJobDone.COMPLETED and self.exit_on_any_completed:
logger.debug(f"worker completed -> exiting pool")
self.exiting = True
class LoopPairState:
# TODO: use this in MultiLoopBalancer, or stop shoving state in here and put it on LoopBalancer instead.
def __init__(self, out: str, in_: str, amount: int):
@ -500,46 +548,46 @@ class LoopBalancer(AbstractLoopRunner):
super().__init__(looper, bounds, parallelism)
self.state = LoopPairState(out, in_, amount)
def pop_job(self) -> LoopJob | LoopJobIdle | None:
if self.state.tx_fail_count + self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES:
logger.info("too many sequential failures: giving up")
return None
def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone:
if self.state.tx_fail_count + 10*self.state.route_fail_count >= MAX_SEQUENTIAL_JOB_FAILURES:
logger.info(f"giving up ({self.state.out} -> {self.state.in_}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures")
return LoopJobDone.ABORTED
if self.state.tx_fail_count + self.state.route_fail_count > 0:
# N.B.: last_job_start_time is guaranteed to have been set by now
idle_until = self.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler
idle_until = self.state.last_job_start_time + TX_FAIL_BACKOFF*self.state.failed_tx_throttler
idle_for = idle_until - time.time()
if self.state.amount_outstanding != 0 or idle_for > 0:
# when we hit transient failures, restrict to just one job in flight at a time.
# this is aimed for the initial route building, where multiple jobs in flight is just useless,
# but it's not a bad idea for network blips, etc, either.
logger.info(f"throttling ({idle_for}): {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures")
logger.info(f"throttling ({self.state.out} -> {self.state.in_}) for {idle_for:.0f}: {self.state.tx_fail_count} tx failures, {self.state.route_fail_count} route failures")
return LoopJobIdle(idle_for) if idle_for > 0 else LoopJobIdle()
amount_avail = self.state.amount_target - self.state.amount_looped - self.state.amount_outstanding
if amount_avail < self.bounds.min_msat:
if self.state.amount_outstanding == 0: return None # complete!
if self.state.amount_outstanding == 0: return LoopJobDone.COMPLETED
return LoopJobIdle() # sending out another job would risk over-transferring
amount_this_job = min(amount_avail, self.bounds.max_msat)
self.state.amount_outstanding += amount_this_job
self.last_job_start_time = time.time()
self.state.last_job_start_time = time.time()
return LoopJob(out=self.state.out, in_=self.state.in_, amount=amount_this_job)
def finished_job(self, job: LoopJob, progress: int) -> None:
self.state.amount_outstanding -= job.amount
if progress == LoopError.NO_ROUTE:
self.state.route_fail_count += 1
if self.state.route_fail_count % DROP_CACHES_AFTER_N_ROUTE_FAILURES == 0:
self.drop_caches()
self.state.failed_tx_throttler += 10
elif progress == LoopError.TRANSIENT:
self.state.tx_fail_count += 1
self.state.failed_tx_throttler += 1
else:
self.state.amount_looped += progress
self.state.tx_fail_count = 0
self.state.route_fail_count = 0
self.state.failed_tx_throttler = max(0, self.state.failed_tx_throttler - 0.2)
logger.info(f"loop progressed {progress}: {self.state.amount_looped} of {self.state.amount_target}")
logger.info(f"loop progressed ({job.out} -> {job.in_}) {progress}: {self.state.amount_looped} of {self.state.amount_target}")
class MultiLoopBalancer(AbstractLoopRunner):
"""
@ -549,7 +597,10 @@ class MultiLoopBalancer(AbstractLoopRunner):
def __init__(self, looper: LoopRouter, bounds: TxBounds, parallelism: int=1):
super().__init__(looper, bounds, parallelism)
self.loops = []
self.job_count = 0 #< increments on every job so we can grab jobs evenly from each LoopBalancer
# job_index: increments on every job so we can grab jobs evenly from each LoopBalancer.
# in that event that producers are idling, it can actually increment more than once,
# so don't take this too literally
self.job_index = 0
def add_loop(self, out: LocalChannel, in_: LocalChannel, amount: int) -> None:
"""
@ -559,18 +610,27 @@ class MultiLoopBalancer(AbstractLoopRunner):
logger.info(f"looping from ({out}) to ({in_})")
self.loops.append(LoopBalancer(out.scid, in_.scid, amount, self.looper, self.bounds, self.parallelism))
def pop_job(self) -> LoopJob | LoopJobIdle | None:
def pop_job(self) -> LoopJob | LoopJobIdle | LoopJobDone:
# N.B.: this can be called in parallel, so try to be consistent enough to not crash
self.job_count += 1
idle_job = None
abort_job = None
for i, _ in enumerate(self.loops):
loop = self.loops[(self.job_count + i) % len(self.loops)]
loop = self.loops[(self.job_index + i) % len(self.loops)]
self.job_index += 1
job = loop.pop_job()
if isinstance(job, LoopJob): return job
if isinstance(job, LoopJob):
return job
if isinstance(job, LoopJobIdle):
idle_job = LoopJobIdle(min(job.sec, idle_job.sec)) if idle_job is not None else job
return idle_job # could be None, if no jobs can make progress => terminate
if job == LoopJobDone.ABORTED:
abort_job = job
# either there's a task to idle, or we have to terminate.
# if terminating, terminate ABORTED if any job aborted, else COMPLETED
if idle_job is not None: return idle_job
if abort_job is not None: return abort_job
return LoopJobDone.COMPLETED
def finished_job(self, job: LoopJob, progress: int) -> None:
# this assumes (enforced externally) that we have only one loop for a given out/in_ pair
@ -578,6 +638,8 @@ class MultiLoopBalancer(AbstractLoopRunner):
if l.state.out == job.out and l.state.in_ == job.in_:
l.finished_job(job, progress)
logger.info(f"total: {self.looper.metrics}")
def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat: int, max_msat: int, parallelism: int):
looper = LoopRouter(rpc)
@ -586,9 +648,12 @@ def balance_loop(rpc: RpcHelper, out: str, in_: str, amount_msat: int, min_msat:
balancer.run_to_completion()
def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int):
looper = LoopRouter(rpc)
bounds = TxBounds(min_msat=min_msat, max_msat=max_msat)
def autobalance_once(rpc: RpcHelper, metrics: Metrics, bounds: TxBounds, parallelism: int) -> bool:
"""
autobalances all channels.
returns True if channels are balanced (or as balanced as can be); False if in need of further balancing
"""
looper = LoopRouter(rpc, metrics)
balancer = MultiLoopBalancer(looper, bounds, parallelism)
channels = []
@ -602,13 +667,22 @@ def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int):
give_to = [ ch for ch in channels if ch.send_ratio > 0.95 ]
take_from = [ ch for ch in channels if ch.send_ratio < 0.20 ]
if give_to == [] and take_from == []:
return True
for to in give_to:
for from_ in take_from:
balancer.add_loop(to, from_, 10000000)
# TODO: run until all jobs have errored (completion) or ONE job has completed,
# then repeat this whole thing; channel balances could have changed.
balancer.run_to_completion()
balancer.run_to_completion(exit_on_any_completed=True)
return False
def autobalance(rpc: RpcHelper, min_msat: int, max_msat: int, parallelism: int):
bounds = TxBounds(min_msat=min_msat, max_msat=max_msat)
metrics = Metrics()
while not autobalance_once(rpc, metrics, bounds, parallelism):
pass
def show_status(rpc: RpcHelper, full: bool=False):
"""