158 lines
4.1 KiB
Python
Executable File
158 lines
4.1 KiB
Python
Executable File
#!/usr/bin/env nix-shell
|
|
#!nix-shell -i python3 -p "python3.withPackages (ps: [ ])"
|
|
# vim: set filetype=python :
|
|
#
|
|
# available environment variables:
|
|
# - SFTPGO_AUTHD_USERNAME
|
|
# - SFTPGO_AUTHD_USER
|
|
# - SFTPGO_AUTHD_IP
|
|
# - SFTPGO_AUTHD_PROTOCOL = { "DAV", "FTP", "HTTP", "SSH" }
|
|
# - SFTPGO_AUTHD_PASSWORD
|
|
# - SFTPGO_AUTHD_PUBLIC_KEY
|
|
# - SFTPGO_AUTHD_KEYBOARD_INTERACTIVE
|
|
# - SFTPGO_AUTHD_TLS_CERT
|
|
#
|
|
# user permissions:
|
|
# - see <repo:drakkan/sftpgo:internal/dataprovider/user.go>
|
|
# - "*" = grant all permissions
|
|
# - read-only perms:
|
|
# - "list" = list files and directories
|
|
# - "download"
|
|
# - rw perms:
|
|
# - "upload"
|
|
# - "overwrite" = allow uploads to replace existing files
|
|
# - "delete" = delete files and directories
|
|
# - "delete_files"
|
|
# - "delete_dirs"
|
|
# - "rename" = rename files and directories
|
|
# - "rename_files"
|
|
# - "rename_dirs"
|
|
# - "create_dirs"
|
|
# - "create_symlinks"
|
|
# - "chmod"
|
|
# - "chown"
|
|
# - "chtimes" = change atime/mtime (access and modification times)
|
|
#
|
|
# home_dir:
|
|
# - it seems (empirically) that a user can't cd above their home directory.
|
|
# though i don't have a reference for that in the docs.
|
|
|
|
import crypt
|
|
import json
|
|
import os
|
|
|
|
from hmac import compare_digest
|
|
|
|
authFail = dict(username="")
|
|
|
|
PERM_RO = [ "list", "download" ]
|
|
PERM_RW = [
|
|
# read-only:
|
|
"list",
|
|
"download",
|
|
# write:
|
|
"upload",
|
|
"overwrite",
|
|
"delete",
|
|
"rename",
|
|
"create_dirs",
|
|
"create_symlinks",
|
|
# intentionally omitted:
|
|
# "chmod",
|
|
# "chown",
|
|
# "chtimes",
|
|
]
|
|
|
|
TRUSTED_CREDS = [
|
|
# /etc/shadow style creds.
|
|
# mkpasswd -m sha-512
|
|
# $<method>$<salt>$<hash>
|
|
"$6$Zq3c2u4ghUH4S6EP$pOuRt13sEKfX31OqPbbd1LuhS21C9MICMc94iRdTAgdAcJ9h95gQH/6Jf6Ie4Obb0oxQtojRJ1Pd/9QHOlFMW." #< m. rocket boy
|
|
]
|
|
|
|
def mkAuthOk(username: str, permissions: dict[str, list[str]]) -> dict:
|
|
return dict(
|
|
status = 1,
|
|
username = username,
|
|
expiration_date = 0,
|
|
home_dir = "/var/export",
|
|
# uid/gid 0 means to inherit sftpgo uid.
|
|
# - i.e. users can't read files which Linux user `sftpgo` can't read
|
|
# - uploaded files belong to Linux user `sftpgo`
|
|
# other uid/gid values aren't possible for localfs backend, unless i let sftpgo use `sudo`.
|
|
uid = 0,
|
|
gid = 0,
|
|
# uid = 65534,
|
|
# gid = 65534,
|
|
max_sessions = 0,
|
|
# quota_*: 0 means to not use SFTP's quota system
|
|
quota_size = 0,
|
|
quota_files = 0,
|
|
permissions = permissions,
|
|
upload_bandwidth = 0,
|
|
download_bandwidth = 0,
|
|
filters = dict(
|
|
allowed_ip = [],
|
|
denied_ip = [],
|
|
),
|
|
public_keys = [],
|
|
# other fields:
|
|
# ? groups
|
|
# ? virtual_folders
|
|
)
|
|
|
|
def isLan(ip: str) -> bool:
|
|
return ip.startswith("10.78.76.") \
|
|
or ip.startswith("10.78.77.") \
|
|
or ip.startswith("10.78.78.") \
|
|
or ip.startswith("10.78.79.")
|
|
|
|
def isWireguard(ip: str) -> bool:
|
|
return ip.startswith("10.0.10.")
|
|
|
|
def isTrustedCred(password: str) -> bool:
|
|
for cred in TRUSTED_CREDS:
|
|
_, method, salt, hash_ = cred.split("$")
|
|
# assert method == "6", f"unrecognized crypt entry: {cred}"
|
|
if crypt.crypt(password, f"${method}${salt}") == cred:
|
|
return True
|
|
|
|
return False
|
|
|
|
def getAuthResponse(ip: str, username: str, password: str) -> dict:
|
|
"""
|
|
return a sftpgo auth response either denying the user or approving them
|
|
with a set of permissions.
|
|
"""
|
|
if isTrustedCred(password) and username != "colin":
|
|
# allow r/w access from those with a special token
|
|
return mkAuthOk(username, permissions = {
|
|
"/": PERM_RW,
|
|
"/playground": PERM_RW,
|
|
})
|
|
if isWireguard(ip):
|
|
# allow any user from wireguard
|
|
return mkAuthOk(username, permissions = {
|
|
"/": PERM_RW,
|
|
"/playground": PERM_RW,
|
|
})
|
|
if isLan(ip):
|
|
if username == "anonymous":
|
|
# allow anonymous users on the LAN
|
|
return mkAuthOk("anonymous", permissions = {
|
|
"/": PERM_RO,
|
|
"/playground": PERM_RW,
|
|
})
|
|
|
|
return authFail
|
|
|
|
def main():
|
|
ip = os.environ.get("SFTPGO_AUTHD_IP", "")
|
|
username = os.environ.get("SFTPGO_AUTHD_USERNAME", "")
|
|
password = os.environ.get("SFTPGO_AUTHD_PASSWORD", "")
|
|
resp = getAuthResponse(ip, username, password)
|
|
print(json.dumps(resp))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|