sftpgo_external_auth_hook: refactor

This commit is contained in:
2024-03-14 12:05:57 +00:00
parent 4c1a7fc910
commit c7c2785ad8

View File

@@ -42,7 +42,25 @@ import os
authFail = dict(username="") authFail = dict(username="")
def mkAuthOk(username: str) -> dict: PERM_RO = [ "list", "download" ]
PERM_RW = [
# read-only:
"list",
"download",
# write:
"upload",
"overwrite",
"delete",
"rename",
"create_dirs",
"create_symlinks",
# intentionally omitted:
# "chmod",
# "chown",
# "chtimes",
]
def mkAuthOk(username: str, permissions: dict[str, list[str]]) -> dict:
return dict( return dict(
status = 1, status = 1,
username = username, username = username,
@@ -60,25 +78,7 @@ def mkAuthOk(username: str) -> dict:
# quota_*: 0 means to not use SFTP's quota system # quota_*: 0 means to not use SFTP's quota system
quota_size = 0, quota_size = 0,
quota_files = 0, quota_files = 0,
permissions = { permissions = permissions,
"/": [ "list", "download" ],
"/playground": [
# read-only:
"list",
"download",
# write:
"upload",
"overwrite",
"delete",
"rename",
"create_dirs",
"create_symlinks",
# intentionally omitted:
# "chmod",
# "chown",
# "chtimes",
],
},
upload_bandwidth = 0, upload_bandwidth = 0,
download_bandwidth = 0, download_bandwidth = 0,
filters = dict( filters = dict(
@@ -100,7 +100,7 @@ def isLan(ip: str) -> bool:
def isWireguard(ip: str) -> bool: def isWireguard(ip: str) -> bool:
return ip.startswith("10.0.10.") return ip.startswith("10.0.10.")
def getAuthResponse(username: str, ip: str) -> dict: def getAuthResponse(ip: str, username: str) -> dict:
""" """
return a sftpgo auth response either denying the user or approving them return a sftpgo auth response either denying the user or approving them
with a set of permissions. with a set of permissions.
@@ -108,17 +108,23 @@ def getAuthResponse(username: str, ip: str) -> dict:
if isLan(ip): if isLan(ip):
if username == "anonymous": if username == "anonymous":
# allow anonymous users on the LAN # allow anonymous users on the LAN
return mkAuthOk("anonymous") return mkAuthOk("anonymous", permissions = {
"/": PERM_RO,
"/playground": PERM_RW,
})
if isWireguard(ip): if isWireguard(ip):
# allow any user from wireguard # allow any user from wireguard
return mkAuthOk(username) return mkAuthOk(username, permissions = {
"/": PERM_RW,
"/playground": PERM_RW,
})
return authFail return authFail
def main(): def main():
username = os.environ.get("SFTPGO_AUTHD_USERNAME") ip = os.environ.get("SFTPGO_AUTHD_IP", "")
ip = os.environ.get("SFTPGO_AUTHD_IP") username = os.environ.get("SFTPGO_AUTHD_USERNAME", "")
resp = getAuthResponse(username, ip) resp = getAuthResponse(ip, username)
print(json.dumps(resp)) print(json.dumps(resp))
if __name__ == "__main__": if __name__ == "__main__":