libnm: add test-secret-agent
Implement some basic secret agent functionality in test-networkmanager-service.py, and add test-secret-agent to test that NMSecretAgent works as expected.
This commit is contained in:
@@ -517,6 +517,9 @@ class PermissionDeniedException(dbus.DBusException):
|
||||
class UnknownDeviceException(dbus.DBusException):
|
||||
_dbus_error_name = IFACE_NM + '.UnknownDevice'
|
||||
|
||||
class UnknownConnectionException(dbus.DBusException):
|
||||
_dbus_error_name = IFACE_NM + '.UnknownConnection'
|
||||
|
||||
PM_DEVICES = 'Devices'
|
||||
PM_NETWORKING_ENABLED = 'NetworkingEnabled'
|
||||
PM_WWAN_ENABLED = 'WwanEnabled'
|
||||
@@ -575,6 +578,26 @@ class NetworkManager(ExportedObj):
|
||||
break
|
||||
if not device:
|
||||
raise UnknownDeviceException("No device found for the requested iface.")
|
||||
|
||||
try:
|
||||
connection = settings.get_connection(conpath)
|
||||
except Exception as e:
|
||||
raise UnknownConnectionException("Connection not found")
|
||||
|
||||
# See if we need secrets. For the moment, we only support WPA
|
||||
hash = connection.GetSettings()
|
||||
if hash.has_key('802-11-wireless-security'):
|
||||
s_wsec = hash['802-11-wireless-security']
|
||||
if (s_wsec['key-mgmt'] == 'wpa-psk' and not s_wsec.has_key('psk')):
|
||||
secrets = agent_manager.get_secrets(hash, conpath, '802-11-wireless-security')
|
||||
if secrets is None:
|
||||
raise NoSecretsException("No secret agent available")
|
||||
if not secrets.has_key('802-11-wireless-security'):
|
||||
raise NoSecretsException("No secrets provided")
|
||||
s_wsec = secrets['802-11-wireless-security']
|
||||
if not s_wsec.has_key('psk'):
|
||||
raise NoSecretsException("No secrets provided")
|
||||
|
||||
raise PermissionDeniedException("Not yet implemented")
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_NM, in_signature='a{sa{sv}}oo', out_signature='oo')
|
||||
@@ -586,7 +609,9 @@ class NetworkManager(ExportedObj):
|
||||
break
|
||||
if not device:
|
||||
raise UnknownDeviceException("No device found for the requested iface.")
|
||||
raise PermissionDeniedException("Not yet implemented")
|
||||
|
||||
conpath = manager.AddConnection(connection)
|
||||
return self.ActivateConnection(conpath, devpath, specific_object)
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_NM, in_signature='o', out_signature='')
|
||||
def DeactivateConnection(self, active_connection):
|
||||
@@ -822,6 +847,9 @@ class Settings(dbus.service.Object):
|
||||
def auto_remove_next_connection(self):
|
||||
self.remove_next_connection = True;
|
||||
|
||||
def get_connection(self, path):
|
||||
return self.connections[path]
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_SETTINGS, in_signature='', out_signature='ao')
|
||||
def ListConnections(self):
|
||||
return self.connections.keys()
|
||||
@@ -870,6 +898,65 @@ class Settings(dbus.service.Object):
|
||||
def Quit(self):
|
||||
mainloop.quit()
|
||||
|
||||
###################################################################
|
||||
IFACE_AGENT_MANAGER = 'org.freedesktop.NetworkManager.AgentManager'
|
||||
IFACE_AGENT = 'org.freedesktop.NetworkManager.SecretAgent'
|
||||
|
||||
PATH_SECRET_AGENT = '/org/freedesktop/NetworkManager/SecretAgent'
|
||||
|
||||
FLAG_ALLOW_INTERACTION = 0x1
|
||||
FLAG_REQUEST_NEW = 0x2
|
||||
FLAG_USER_REQUESTED = 0x4
|
||||
|
||||
class NoSecretsException(dbus.DBusException):
|
||||
_dbus_error_name = IFACE_AGENT_MANAGER + '.NoSecrets'
|
||||
|
||||
class UserCanceledException(dbus.DBusException):
|
||||
_dbus_error_name = IFACE_AGENT_MANAGER + '.UserCanceled'
|
||||
|
||||
class AgentManager(dbus.service.Object):
|
||||
def __init__(self, bus, object_path):
|
||||
dbus.service.Object.__init__(self, bus, object_path)
|
||||
self.agents = {}
|
||||
self.bus = bus
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER,
|
||||
in_signature='s', out_signature='',
|
||||
sender_keyword='sender')
|
||||
def Register(self, name, sender=None):
|
||||
self.RegisterWithCapabilities(name, 0, sender)
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER,
|
||||
in_signature='su', out_signature='',
|
||||
sender_keyword='sender')
|
||||
def RegisterWithCapabilities(self, name, caps, sender=None):
|
||||
self.agents[sender] = self.bus.get_object(sender, PATH_SECRET_AGENT)
|
||||
|
||||
@dbus.service.method(dbus_interface=IFACE_AGENT_MANAGER,
|
||||
in_signature='', out_signature='',
|
||||
sender_keyword='sender')
|
||||
def Unregister(self, sender=None):
|
||||
del self.agents[sender]
|
||||
|
||||
def get_secrets(self, connection, path, setting_name):
|
||||
if len(self.agents) == 0:
|
||||
return None
|
||||
|
||||
secrets = {}
|
||||
for sender in self.agents:
|
||||
agent = self.agents[sender]
|
||||
try:
|
||||
secrets = agent.GetSecrets(connection, path, setting_name,
|
||||
dbus.Array([], 's'),
|
||||
FLAG_ALLOW_INTERACTION | FLAG_USER_REQUESTED,
|
||||
dbus_interface=IFACE_AGENT)
|
||||
break
|
||||
except dbus.DBusException as e:
|
||||
if e.get_dbus_name() == IFACE_AGENT + '.UserCanceled':
|
||||
raise UserCanceledException('User canceled')
|
||||
continue
|
||||
return secrets
|
||||
|
||||
###################################################################
|
||||
|
||||
def stdin_cb(io, condition):
|
||||
@@ -885,9 +972,10 @@ def main():
|
||||
|
||||
bus = dbus.SessionBus()
|
||||
|
||||
global manager, settings
|
||||
global manager, settings, agent_manager
|
||||
manager = NetworkManager(bus, "/org/freedesktop/NetworkManager")
|
||||
settings = Settings(bus, "/org/freedesktop/NetworkManager/Settings")
|
||||
agent_manager = AgentManager(bus, "/org/freedesktop/NetworkManager/AgentManager")
|
||||
|
||||
if not bus.request_name("org.freedesktop.NetworkManager"):
|
||||
sys.exit(1)
|
||||
|
Reference in New Issue
Block a user