clients/tests: expose IP and DHCP configs in test-networkmanager-service.py stub

For adding tests what nmcli shows regarding IP and DHCP configuration,
let the stub service generate config instances.
This commit is contained in:
Thomas Haller
2018-06-06 20:28:17 +02:00
parent dd2da759de
commit f671fa5137
9 changed files with 682 additions and 13 deletions

View File

@@ -22,6 +22,7 @@ import dbus.mainloop.glib
import random
import uuid
import hashlib
import socket
import collections
###############################################################################
@@ -46,6 +47,88 @@ class TestError(AssertionError):
class Util:
PY3 = (sys.version_info[0] == 3)
@staticmethod
def addr_family_check(family, allow_af_unspec = False):
if family == socket.AF_INET:
return
if family == socket.AF_INET6:
return
if allow_af_unspec and family == socket.AF_UNSPEC:
return
raise TestError('invalid address family %s' % (family))
@staticmethod
def ip_addr_pton(addr, family=None):
if addr is None:
return (None, None)
if family is not None and family is not socket.AF_UNSPEC:
Util.addr_family_check(family)
a = socket.inet_pton(family, addr)
else:
a = None
family = None
try:
a = socket.inet_pton(socket.AF_INET, addr)
family = socket.AF_INET
except:
a = socket.inet_pton(socket.AF_INET6, addr)
family = socket.AF_INET6
if Util.PY3:
a = tuple([int(c) for c in a])
else:
a = tuple([ord(c) for c in a])
return (a, family)
@staticmethod
def ip_addr_ntop(addr, family = None):
if Util.PY3:
a = bytes(addr)
else:
a = ''.join([chr(c) for c in addr])
if len(a) == 4:
f = socket.AF_INET
elif len(a) == 16:
f = socket.AF_INET6
else:
raise TestError("Invalid binary IP address '%s'" % (repr(addr)))
if family is not None and f != family:
raise TestError("Unexpected address family. Expected %s but ip address was %s" % (family, repr(addr)))
return socket.inet_ntop(f, a)
@staticmethod
def ip_addr_norm(addr, family = None):
a, family = Util.ip_addr_pton(addr, family)
return (Util.ip_addr_ntop(a, family), family)
@staticmethod
def ip4_addr_ne32(addr):
a, family = Util.ip_addr_pton(addr, socket.AF_INET)
n = 0
for i in range(4):
n = (n << 8) + a[i]
return socket.htonl(n)
@staticmethod
def ip6_addr_ay(addr):
return Util.ip_addr_pton(addr, socket.AF_INET6)[0]
@staticmethod
def ip_net_parse(net, family = None):
parts = net.split('/')
if len(parts) != 2:
raise TestError("Invalid IP network '%s' has not '/' for the prefix length" % (net))
prefix = int(parts[1])
addr, family = Util.ip_addr_norm(parts[0], family)
if family == socket.AF_INET:
if prefix < 0 or prefix > 32:
raise TestError("Invalid prefix length for IPv4 address '%s'" % (net))
else:
if prefix < 0 or prefix > 128:
raise TestError("Invalid prefix length for IPv4 address '%s'" % (net))
return (addr, prefix, family)
class RandomSeed():
def __init__(self, seed):
self.cnt = 0
@@ -152,6 +235,35 @@ class Util:
def random_mac(seed):
return '%02X:%02X:%02X:%02X:%02X:%02X' % tuple(Util.random_stream(seed, 6))
@staticmethod
def random_ip(seed, net = None, family = None):
if net is not None:
mask, prefix, family = Util.ip_net_parse(net, family)
a_mask, unused = Util.ip_addr_pton(mask, family)
else:
prefix = None
Util.addr_family_check(family)
if family == socket.AF_INET:
l = 4
else:
l = 16
a = tuple(Util.random_stream(seed, l))
if prefix is not None:
a2 = []
for i in range(l):
if prefix == 0:
c = a[i]
elif prefix >= 8:
c = a_mask[i]
prefix -= 8
else:
c = 0xFF & (0xFF << (8 - prefix))
c = (a[i] & ~c) | (a_mask[i] & c)
prefix = 0
a2.append(c)
a = tuple(a2)
return (Util.ip_addr_ntop(a, family), family)
@staticmethod
def eprint(*args, **kwargs):
print(*args, file=sys.stderr, **kwargs)
@@ -206,6 +318,7 @@ class Util:
###############################################################################
IFACE_DBUS = 'org.freedesktop.DBus'
IFACE_OBJECT_MANAGER = 'org.freedesktop.DBus.ObjectManager'
IFACE_CONNECTION = 'org.freedesktop.NetworkManager.Settings.Connection'
IFACE_DEVICE = 'org.freedesktop.NetworkManager.Device'
IFACE_WIFI = 'org.freedesktop.NetworkManager.Device.Wireless'
@@ -222,7 +335,10 @@ IFACE_WIMAX_NSP = 'org.freedesktop.NetworkManager.WiMax.Nsp'
IFACE_ACTIVE_CONNECTION = 'org.freedesktop.NetworkManager.Connection.Active'
IFACE_VPN_CONNECTION = 'org.freedesktop.NetworkManager.VPN.Connection'
IFACE_DNS_MANAGER = 'org.freedesktop.NetworkManager.DnsManager'
IFACE_OBJECT_MANAGER = 'org.freedesktop.DBus.ObjectManager'
IFACE_IP4_CONFIG = 'org.freedesktop.NetworkManager.IP4Config'
IFACE_IP6_CONFIG = 'org.freedesktop.NetworkManager.IP6Config'
IFACE_DHCP4_CONFIG = 'org.freedesktop.NetworkManager.DHCP4Config'
IFACE_DHCP6_CONFIG = 'org.freedesktop.NetworkManager.DHCP6Config'
###############################################################################
@@ -577,16 +693,21 @@ class Device(ExportedObj):
ExportedObj.__init__(self, ExportedObj.create_path(Device), ident)
self.ip4_config = None
self.ip6_config = None
self.dhcp4_config = None
self.dhcp6_config = None
props = {
PRP_DEVICE_UDI: "/sys/devices/virtual/%s" % (iface),
PRP_DEVICE_IFACE: iface,
PRP_DEVICE_DRIVER: "virtual",
PRP_DEVICE_STATE: dbus.UInt32(NM.DeviceState.UNAVAILABLE),
PRP_DEVICE_ACTIVE_CONNECTION: ExportedObj.to_path(None),
PRP_DEVICE_IP4_CONFIG: ExportedObj.to_path(None),
PRP_DEVICE_IP6_CONFIG: ExportedObj.to_path(None),
PRP_DEVICE_DHCP4_CONFIG: ExportedObj.to_path(None),
PRP_DEVICE_DHCP6_CONFIG: ExportedObj.to_path(None),
PRP_DEVICE_IP4_CONFIG: ExportedObj.to_path(self.ip4_config),
PRP_DEVICE_IP6_CONFIG: ExportedObj.to_path(self.ip6_config),
PRP_DEVICE_DHCP4_CONFIG: ExportedObj.to_path(self.dhcp4_config),
PRP_DEVICE_DHCP6_CONFIG: ExportedObj.to_path(self.dhcp6_config),
PRP_DEVICE_MANAGED: True,
PRP_DEVICE_AUTOCONNECT: True,
PRP_DEVICE_DEVICE_TYPE: dbus.UInt32(devtype),
@@ -595,6 +716,34 @@ class Device(ExportedObj):
self.dbus_interface_add(IFACE_DEVICE, props, Device.PropertiesChanged)
def start(self):
self.ip4_config = IP4Config()
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(self.ip4_config))
self.ip6_config = IP6Config()
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(self.ip6_config))
self.dhcp4_config = Dhcp4Config()
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_DHCP4_CONFIG, ExportedObj.to_path(self.dhcp4_config))
self.dhcp6_config = Dhcp6Config()
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_DHCP6_CONFIG, ExportedObj.to_path(self.dhcp6_config))
def stop(self):
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_IP4_CONFIG, ExportedObj.to_path(None))
if self.ip4_config is not None:
self.ip4_config.unexport()
self.ip4_config = None
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_IP6_CONFIG, ExportedObj.to_path(None))
if self.ip6_config is not None:
self.ip6_config.unexport()
self.ip6_config = None
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_DHCP4_CONFIG, ExportedObj.to_path(None))
if self.dhcp4_config is not None:
self.dhcp4_config.unexport()
self.dhcp4_config = None
self._dbus_property_set(IFACE_DEVICE, PRP_DEVICE_DHCP6_CONFIG, ExportedObj.to_path(None))
if self.dhcp6_config is not None:
self.dhcp6_config.unexport()
self.dhcp6_config = None
@dbus.service.method(dbus_interface=IFACE_DEVICE, in_signature='', out_signature='')
def Disconnect(self):
pass
@@ -1283,9 +1432,11 @@ class NetworkManager(ExportedObj):
self._dbus_property_set(IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices))
self._dbus_property_set(IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices))
self.DeviceAdded(ExportedObj.to_path(device))
device.start()
return device
def remove_device(self, device):
device.stop()
self.devices.remove(device)
self._dbus_property_set(IFACE_NM, PRP_NM_DEVICES, ExportedObj.to_path_array(self.devices))
self._dbus_property_set(IFACE_NM, PRP_NM_ALL_DEVICES, ExportedObj.to_path_array(self.devices))
@@ -1601,6 +1752,350 @@ class Settings(ExportedObj):
###############################################################################
PRP_IP4_CONFIG_ADDRESSES = 'Addresses'
PRP_IP4_CONFIG_ADDRESSDATA = 'AddressData'
PRP_IP4_CONFIG_GATEWAY = 'Gateway'
PRP_IP4_CONFIG_ROUTES = 'Routes'
PRP_IP4_CONFIG_ROUTEDATA = 'RouteData'
PRP_IP4_CONFIG_NAMESERVERS = 'Nameservers'
PRP_IP4_CONFIG_DOMAINS = 'Domains'
PRP_IP4_CONFIG_SEARCHES = 'Searches'
PRP_IP4_CONFIG_DNSOPTIONS = 'DnsOptions'
PRP_IP4_CONFIG_DNSPRIORITY = 'DnsPriority'
PRP_IP4_CONFIG_WINSSERVERS = 'WinsServers'
class IP4Config(ExportedObj):
path_counter_next = 1
path_prefix = "/org/freedesktop/NetworkManager/IP4Config/"
def __init__(self, generate_seed = _DEFAULT_ARG):
ExportedObj.__init__(self, ExportedObj.create_path(IP4Config))
if generate_seed == _DEFAULT_ARG:
generate_seed = self.path
props = self._props_generate(generate_seed)
self.dbus_interface_add(IFACE_IP4_CONFIG, props, IP4Config.PropertiesChanged)
self.export()
def _props_generate(self, generate_seed):
seed = Util.RandomSeed.wrap(generate_seed)
gateway = None
if seed:
if Util.random_bool(seed):
gateway = Util.random_ip(seed, net = '192.168.0.0/16')[0]
addrs = []
if seed:
for n in range(0, Util.random_int(seed, 4)):
a = {
'addr': Util.random_ip(seed, net = '192.168.0.0/16')[0],
'prefix': Util.random_int(seed, 17, 32),
'gateway': gateway if n == 0 else None,
}
addrs.append(a)
routes = []
if seed:
for n in range(0, Util.random_int(seed, 4)):
a = {
'dest': Util.random_ip(seed, net = '192.168.0.0/16')[0],
'prefix': Util.random_int(seed, 17, 32),
'next-hop': None if (Util.random_int(seed) % 3 == 0) else Util.random_ip(seed, net = '192.168.0.0/16')[0],
'metric': -1 if (Util.random_int(seed) % 3 == 0) else Util.random_int(seed, 0, 0xFFFFFFFF),
}
routes.append(a)
nameservers = []
if seed:
nameservers = list([Util.random_ip(seed, net = '192.168.0.0/16')[0] for x in range(Util.random_int(seed, 4))])
names_selection = ['foo1.bar', 'foo2.bar', 'foo3.bar', 'foo4.bar', 'fo.o.bar', 'fo.x.y'];
domains = []
if seed:
domains = Util.random_subset(seed, ['dom4.' + s for s in names_selection])
searches = []
if seed:
domains = Util.random_subset(seed, ['sear4.' + s for s in names_selection])
dnsoptions = []
if seed:
dnsoptions = Util.random_subset(seed, ['dns4-opt1', 'dns4-opt2', 'dns4-opt3', 'dns4-opt4'])
dnspriority = 0
if seed:
dnspriority = Util.random_int(seed, -10000, 10000)
winsservers = []
if seed:
winsservers = list([Util.random_ip(seed, net = '192.168.0.0/16')[0] for x in range(Util.random_int(seed, 4))])
return {
PRP_IP4_CONFIG_ADDRESSES: dbus.Array([
[ Util.ip4_addr_ne32(a['addr']),
a['prefix'],
Util.ip4_addr_ne32(a['gateway']) if a['gateway'] else 0
] for a in addrs
],
'au'),
PRP_IP4_CONFIG_ADDRESSDATA: dbus.Array([
dbus.Dictionary(collections.OrderedDict( [ ('address', dbus.String(a['addr'])),
('prefix', dbus.UInt32(a['prefix']))] + \
([ ('gateway', dbus.String(a['gateway'])) ] if a['gateway'] else [])),
'sv')
for a in addrs
],
'a{sv}'),
PRP_IP4_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "",
PRP_IP4_CONFIG_ROUTES: dbus.Array([
[ Util.ip4_addr_ne32(a['dest']),
a['prefix'],
Util.ip4_addr_ne32(a['next-hop'] or '0.0.0.0'),
max(a['metric'], 0)
] for a in routes
],
'au'),
PRP_IP4_CONFIG_ROUTEDATA: dbus.Array([
dbus.Dictionary(collections.OrderedDict( [ ('dest', dbus.String(a['dest'])),
('prefix', dbus.UInt32(a['prefix']))] + \
([ ('next-hop', dbus.String(a['next-hop'])) ] if a['next-hop'] else []) + \
([ ('metric', dbus.UInt32(a['metric'])) ] if a['metric'] != -1 else [])),
'sv')
for a in routes
],
'a{sv}'),
PRP_IP4_CONFIG_NAMESERVERS: dbus.Array([dbus.UInt32(Util.ip4_addr_ne32(n)) for n in nameservers], 'u'),
PRP_IP4_CONFIG_DOMAINS: dbus.Array(domains, 's'),
PRP_IP4_CONFIG_SEARCHES: dbus.Array(searches, 's'),
PRP_IP4_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, 's'),
PRP_IP4_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority),
PRP_IP4_CONFIG_WINSSERVERS: dbus.Array([dbus.UInt32(Util.ip4_addr_ne32(n)) for n in winsservers], 'u'),
}
def props_regenerate(self, generate_seed):
props = self.generate_props(generate_seed)
for k,v in props.items():
self._dbus_property_set(IFACE_IP4_CONFIG, k, v)
@dbus.service.signal(IFACE_IP4_CONFIG, signature='a{sv}')
def PropertiesChanged(self, path):
pass
###############################################################################
PRP_IP6_CONFIG_ADDRESSES = "Addresses"
PRP_IP6_CONFIG_ADDRESSDATA = "AddressData"
PRP_IP6_CONFIG_GATEWAY = "Gateway"
PRP_IP6_CONFIG_ROUTES = "Routes"
PRP_IP6_CONFIG_ROUTEDATA = "RouteData"
PRP_IP6_CONFIG_NAMESERVERS = "Nameservers"
PRP_IP6_CONFIG_DOMAINS = "Domains"
PRP_IP6_CONFIG_SEARCHES = "Searches"
PRP_IP6_CONFIG_DNSOPTIONS = "DnsOptions"
PRP_IP6_CONFIG_DNSPRIORITY = "DnsPriority"
class IP6Config(ExportedObj):
path_counter_next = 1
path_prefix = "/org/freedesktop/NetworkManager/IP6Config/"
def __init__(self, generate_seed = _DEFAULT_ARG):
ExportedObj.__init__(self, ExportedObj.create_path(IP6Config))
if generate_seed == _DEFAULT_ARG:
generate_seed = self.path
props = self._props_generate(generate_seed)
self.dbus_interface_add(IFACE_IP6_CONFIG, props, IP6Config.PropertiesChanged)
self.export()
def _props_generate(self, generate_seed):
seed = Util.RandomSeed.wrap(generate_seed)
gateway = None
if seed:
if Util.random_bool(seed):
gateway = Util.random_ip(seed, net = '2001:a::/64')[0]
addrs = []
if seed:
for n in range(0, Util.random_int(seed, 4)):
a = {
'addr': Util.random_ip(seed, net = '2001:a::/64')[0],
'prefix': Util.random_int(seed, 65, 128),
'gateway': gateway if n == 0 else None,
}
addrs.append(a)
routes = []
if seed:
for n in range(0, Util.random_int(seed, 4)):
a = {
'dest': Util.random_ip(seed, net = '2001:a::/64')[0],
'prefix': Util.random_int(seed, 65, 128),
'next-hop': None if (Util.random_int(seed) % 3 == 0) else Util.random_ip(seed, net = '2001:a::/64')[0],
'metric': -1 if (Util.random_int(seed) % 3 == 0) else Util.random_int(seed, 0, 0xFFFFFFFF),
}
routes.append(a)
nameservers = []
if seed:
nameservers = list([Util.random_ip(seed, net = '2001:a::/64')[0] for x in range(Util.random_int(seed, 4))])
names_selection = ['foo1.bar', 'foo2.bar', 'foo3.bar', 'foo4.bar', 'fo.o.bar', 'fo.x.y'];
domains = []
if seed:
domains = Util.random_subset(seed, ['dom6.' + s for s in names_selection])
searches = []
if seed:
domains = Util.random_subset(seed, ['sear6.' + s for s in names_selection])
dnsoptions = []
if seed:
dnsoptions = Util.random_subset(seed, ['dns6-opt1', 'dns6-opt2', 'dns6-opt3', 'dns6-opt4'])
dnspriority = 0
if seed:
dnspriority = Util.random_int(seed, -10000, 10000)
return {
PRP_IP6_CONFIG_ADDRESSES: dbus.Array([
[ Util.ip6_addr_ay(a['addr']),
a['prefix'],
Util.ip6_addr_ay(a['gateway'] or '::')
] for a in addrs
],
'(ayuay)'),
PRP_IP6_CONFIG_ADDRESSDATA: dbus.Array([
dbus.Dictionary(collections.OrderedDict( [ ('address', dbus.String(a['addr'])),
('prefix', dbus.UInt32(a['prefix']))] + \
([ ('gateway', dbus.String(a['gateway'])) ] if a['gateway'] else [])),
'sv')
for a in addrs
],
'a{sv}'),
PRP_IP6_CONFIG_GATEWAY: dbus.String(gateway) if gateway else "",
PRP_IP6_CONFIG_ROUTES: dbus.Array([
[ Util.ip6_addr_ay(a['dest']),
a['prefix'],
Util.ip6_addr_ay(a['next-hop'] or '::'),
max(a['metric'], 0)
] for a in routes
],
'(ayuayu)'),
PRP_IP6_CONFIG_ROUTEDATA: dbus.Array([
dbus.Dictionary(collections.OrderedDict( [ ('dest', dbus.String(a['dest'])),
('prefix', dbus.UInt32(a['prefix']))] + \
([ ('next-hop', dbus.String(a['next-hop'])) ] if a['next-hop'] else []) + \
([ ('metric', dbus.UInt32(a['metric'])) ] if a['metric'] != -1 else [])),
'sv')
for a in routes
],
'a{sv}'),
PRP_IP6_CONFIG_NAMESERVERS: dbus.Array([Util.ip6_addr_ay(n) for n in nameservers], 'ay'),
PRP_IP6_CONFIG_DOMAINS: dbus.Array(domains, 's'),
PRP_IP6_CONFIG_SEARCHES: dbus.Array(searches, 's'),
PRP_IP6_CONFIG_DNSOPTIONS: dbus.Array(dnsoptions, 's'),
PRP_IP6_CONFIG_DNSPRIORITY: dbus.Int32(dnspriority),
}
def props_regenerate(self, generate_seed):
props = self.generate_props(generate_seed)
for k,v in props.items():
self._dbus_property_set(IFACE_IP6_CONFIG, k, v)
@dbus.service.signal(IFACE_IP6_CONFIG, signature='a{sv}')
def PropertiesChanged(self, path):
pass
###############################################################################
PRP_DHCP4_CONFIG_OPTIONS = 'Options'
class Dhcp4Config(ExportedObj):
path_counter_next = 1
path_prefix = "/org/freedesktop/NetworkManager/DHCP4Config/"
def __init__(self, generate_seed = _DEFAULT_ARG):
ExportedObj.__init__(self, ExportedObj.create_path(Dhcp4Config))
if generate_seed == _DEFAULT_ARG:
generate_seed = self.path
props = self._props_generate(generate_seed)
self.dbus_interface_add(IFACE_DHCP4_CONFIG, props, Dhcp4Config.PropertiesChanged)
self.export()
def _props_generate(self, generate_seed):
seed = Util.RandomSeed.wrap(generate_seed)
options = []
if seed:
options = Util.random_subset(seed, [('dhcp-4-opt-' + str(i), 'val-' + str(i)) for i in range(10)])
return {
PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary(collections.OrderedDict(options),
'sv')
}
def props_regenerate(self, generate_seed):
props = self.generate_props(generate_seed)
for k,v in props.items():
self._dbus_property_set(IFACE_DHCP4_CONFIG, k, v)
@dbus.service.signal(IFACE_DHCP4_CONFIG, signature='a{sv}')
def PropertiesChanged(self, path):
pass
###############################################################################
PRP_DHCP6_CONFIG_OPTIONS = 'Options'
class Dhcp6Config(ExportedObj):
path_counter_next = 1
path_prefix = "/org/freedesktop/NetworkManager/DHCP6Config/"
def __init__(self, generate_seed = _DEFAULT_ARG):
ExportedObj.__init__(self, ExportedObj.create_path(Dhcp6Config))
if generate_seed == _DEFAULT_ARG:
generate_seed = self.path
props = self._props_generate(generate_seed)
self.dbus_interface_add(IFACE_DHCP6_CONFIG, props, Dhcp6Config.PropertiesChanged)
self.export()
def _props_generate(self, generate_seed):
seed = Util.RandomSeed.wrap(generate_seed)
options = []
if seed:
options = Util.random_subset(seed, [('dhcp-6-opt-' + str(i), 'val-' + str(i)) for i in range(10)])
return {
PRP_DHCP4_CONFIG_OPTIONS: dbus.Dictionary(collections.OrderedDict(options),
'sv')
}
def props_regenerate(self, generate_seed):
props = self.generate_props(generate_seed)
for k,v in props.items():
self._dbus_property_set(IFACE_DHCP6_CONFIG, k, v)
@dbus.service.signal(IFACE_DHCP6_CONFIG, signature='a{sv}')
def PropertiesChanged(self, path):
pass
###############################################################################
PRP_DNS_MANAGER_MODE = 'Mode'
PRP_DNS_MANAGER_RC_MANAGER = 'RcManager'
PRP_DNS_MANAGER_CONFIGURATION = 'Configuration'