tests: Add test suite
IIO devices aren't supported by umockdev, so make do with supporting hwmon devices, as well as our 2 fake light and compass drivers.
This commit is contained in:
@@ -61,3 +61,5 @@ if get_option('gtk_doc')
|
||||
)
|
||||
subdir('docs')
|
||||
endif
|
||||
|
||||
subdir('tests')
|
||||
|
337
tests/integration-test
Executable file
337
tests/integration-test
Executable file
@@ -0,0 +1,337 @@
|
||||
#!/usr/bin/python3
|
||||
|
||||
# iio-sensor-proxy integration test suite
|
||||
#
|
||||
# Run in built tree to test local built binaries, or from anywhere else to test
|
||||
# system installed binaries.
|
||||
#
|
||||
# Copyright: (C) 2011 Martin Pitt <martin.pitt@ubuntu.com>
|
||||
# (C) 2021 Bastien Nocera <hadess@hadess.net>
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import dbus
|
||||
import tempfile
|
||||
import subprocess
|
||||
import unittest
|
||||
import time
|
||||
|
||||
try:
|
||||
import gi
|
||||
from gi.repository import GLib
|
||||
from gi.repository import Gio
|
||||
except ImportError as e:
|
||||
sys.stderr.write('Skipping tests, PyGobject not available for Python 3, or missing GI typelibs: %s\n' % str(e))
|
||||
sys.exit(0)
|
||||
|
||||
try:
|
||||
gi.require_version('UMockdev', '1.0')
|
||||
from gi.repository import UMockdev
|
||||
except ImportError:
|
||||
sys.stderr.write('Skipping tests, umockdev not available (https://github.com/martinpitt/umockdev)\n')
|
||||
sys.exit(0)
|
||||
|
||||
try:
|
||||
import dbusmock
|
||||
except ImportError:
|
||||
sys.stderr.write('Skipping tests, python-dbusmock not available (http://pypi.python.org/pypi/python-dbusmock).\n')
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
SP = 'net.hadess.SensorProxy'
|
||||
SP_PATH = '/net/hadess/SensorProxy'
|
||||
SP_COMPASS = 'net.hadess.SensorProxy.Compass'
|
||||
SP_COMPASS_PATH = '/net/hadess/SensorProxy/Compass'
|
||||
|
||||
class Tests(dbusmock.DBusTestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# run from local build tree if we are in one, otherwise use system instance
|
||||
builddir = os.getenv('top_builddir', '.')
|
||||
if os.access(os.path.join(builddir, 'src', 'iio-sensor-proxy'), os.X_OK):
|
||||
cls.daemon_path = os.path.join(builddir, 'src', 'iio-sensor-proxy')
|
||||
print('Testing binaries from local build tree (%s)' % cls.daemon_path)
|
||||
elif os.environ.get('UNDER_JHBUILD', False):
|
||||
jhbuild_prefix = os.environ['JHBUILD_PREFIX']
|
||||
cls.daemon_path = os.path.join(jhbuild_prefix, 'libexec', 'iio-sensor-proxy')
|
||||
print('Testing binaries from JHBuild (%s)' % cls.daemon_path)
|
||||
else:
|
||||
cls.daemon_path = None
|
||||
with open('/usr/lib/systemd/system/iio-sensor-proxy.service') as f:
|
||||
for line in f:
|
||||
if line.startswith('ExecStart='):
|
||||
cls.daemon_path = line.split('=', 1)[1].strip()
|
||||
break
|
||||
assert cls.daemon_path, 'could not determine daemon path from systemd .service file'
|
||||
print('Testing installed system binary (%s)' % cls.daemon_path)
|
||||
|
||||
# fail on CRITICALs on client and server side
|
||||
GLib.log_set_always_fatal(GLib.LogLevelFlags.LEVEL_WARNING |
|
||||
GLib.LogLevelFlags.LEVEL_ERROR |
|
||||
GLib.LogLevelFlags.LEVEL_CRITICAL)
|
||||
os.environ['G_DEBUG'] = 'fatal_warnings'
|
||||
|
||||
# set up a fake system D-BUS
|
||||
cls.test_bus = Gio.TestDBus.new(Gio.TestDBusFlags.NONE)
|
||||
cls.test_bus.up()
|
||||
try:
|
||||
del os.environ['DBUS_SESSION_BUS_ADDRESS']
|
||||
except KeyError:
|
||||
pass
|
||||
os.environ['DBUS_SYSTEM_BUS_ADDRESS'] = cls.test_bus.get_bus_address()
|
||||
|
||||
cls.dbus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
|
||||
cls.dbus_con = cls.get_dbus(True)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
cls.test_bus.down()
|
||||
dbusmock.DBusTestCase.tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
'''Set up a local umockdev testbed.
|
||||
|
||||
The testbed is initially empty.
|
||||
'''
|
||||
self.testbed = UMockdev.Testbed.new()
|
||||
|
||||
self.proxy = None
|
||||
self.log = None
|
||||
self.daemon = None
|
||||
|
||||
def tearDown(self):
|
||||
del self.testbed
|
||||
self.stop_daemon()
|
||||
|
||||
# on failures, print daemon log
|
||||
errors = [x[1] for x in self._outcome.errors if x[1]]
|
||||
if errors and self.log:
|
||||
with open(self.log.name) as f:
|
||||
sys.stderr.write('\n-------------- daemon log: ----------------\n')
|
||||
sys.stderr.write(f.read())
|
||||
sys.stderr.write('------------------------------\n')
|
||||
|
||||
#
|
||||
# Daemon control and D-BUS I/O
|
||||
#
|
||||
|
||||
def start_daemon(self, env = None):
|
||||
'''Start daemon and create DBus proxy.
|
||||
|
||||
When done, this sets self.proxy as the Gio.DBusProxy for power-profiles-daemon.
|
||||
'''
|
||||
if not env:
|
||||
env = os.environ.copy()
|
||||
env['G_DEBUG'] = 'fatal-criticals'
|
||||
env['G_MESSAGES_DEBUG'] = 'all'
|
||||
env['UMOCKDEV_DEBUG'] = 'all'
|
||||
# note: Python doesn't propagate the setenv from Testbed.new(), so we
|
||||
# have to do that ourselves
|
||||
env['UMOCKDEV_DIR'] = self.testbed.get_root_dir()
|
||||
self.log = tempfile.NamedTemporaryFile()
|
||||
if os.getenv('VALGRIND') != None:
|
||||
daemon_path = ['valgrind', self.daemon_path, '-v']
|
||||
else:
|
||||
daemon_path = [self.daemon_path, '-v']
|
||||
|
||||
self.daemon = subprocess.Popen(daemon_path,
|
||||
env=env, stdout=self.log,
|
||||
stderr=subprocess.STDOUT)
|
||||
|
||||
# wait until the daemon gets online
|
||||
timeout = 100
|
||||
while timeout > 0:
|
||||
time.sleep(0.1)
|
||||
timeout -= 1
|
||||
try:
|
||||
self.get_dbus_property('HasAccelerometer')
|
||||
break
|
||||
except GLib.GError:
|
||||
pass
|
||||
else:
|
||||
self.fail('daemon did not start in 10 seconds')
|
||||
|
||||
self.proxy = Gio.DBusProxy.new_sync(
|
||||
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, SP,
|
||||
SP_PATH, SP, None)
|
||||
|
||||
self.assertEqual(self.daemon.poll(), None, 'daemon crashed')
|
||||
|
||||
def stop_daemon(self):
|
||||
'''Stop the daemon if it is running.'''
|
||||
|
||||
if self.daemon:
|
||||
try:
|
||||
self.daemon.kill()
|
||||
except OSError:
|
||||
pass
|
||||
self.daemon.wait()
|
||||
self.daemon = None
|
||||
self.proxy = None
|
||||
|
||||
def get_dbus_property(self, name):
|
||||
'''Get property value from daemon D-Bus interface.'''
|
||||
|
||||
proxy = Gio.DBusProxy.new_sync(
|
||||
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, SP,
|
||||
SP_PATH, 'org.freedesktop.DBus.Properties', None)
|
||||
return proxy.Get('(ss)', SP, name)
|
||||
|
||||
def get_compass_dbus_property(self, name):
|
||||
'''Get property value from daemon compass D-Bus interface.'''
|
||||
|
||||
proxy = Gio.DBusProxy.new_sync(
|
||||
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, SP,
|
||||
SP_COMPASS_PATH, 'org.freedesktop.DBus.Properties', None)
|
||||
return proxy.Get('(ss)', SP_COMPASS, name)
|
||||
|
||||
def have_text_in_log(self, text):
|
||||
return self.count_text_in_log(text) > 0
|
||||
|
||||
def count_text_in_log(self, text):
|
||||
with open(self.log.name) as f:
|
||||
return f.read().count(text)
|
||||
|
||||
def read_sysfs_attr(self, device, attribute):
|
||||
with open(os.path.join(self.testbed.get_root_dir() + device, attribute), 'rb') as f:
|
||||
return f.read()
|
||||
return None
|
||||
|
||||
def read_file(self, path):
|
||||
with open(path, 'rb') as f:
|
||||
return f.read()
|
||||
return None
|
||||
|
||||
def assertEventually(self, condition, message=None, timeout=50):
|
||||
'''Assert that condition function eventually returns True.
|
||||
|
||||
Timeout is in deciseconds, defaulting to 50 (5 seconds). message is
|
||||
printed on failure.
|
||||
'''
|
||||
while timeout >= 0:
|
||||
context = GLib.MainContext.default()
|
||||
while context.iteration(False):
|
||||
pass
|
||||
if condition():
|
||||
break
|
||||
timeout -= 1
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
self.fail(message or 'timed out waiting for ' + str(condition))
|
||||
|
||||
#
|
||||
# Actual test cases
|
||||
#
|
||||
|
||||
def test_hwmon_light(self):
|
||||
'''hwmon light'''
|
||||
hwmon = self.testbed.add_device('platform', 'hwmon-als', None,
|
||||
['light', '(128,128)'],
|
||||
['IIO_SENSOR_PROXY_TYPE', 'hwmon-als']
|
||||
)
|
||||
self.start_daemon()
|
||||
self.assertEqual(self.get_dbus_property('HasAmbientLight'), True)
|
||||
self.assertEqual(self.get_dbus_property('HasAccelerometer'), False)
|
||||
|
||||
# Default values
|
||||
self.assertEqual(self.get_dbus_property('LightLevelUnit'), 'lux')
|
||||
self.assertEqual(self.get_dbus_property('LightLevel'), 0)
|
||||
|
||||
self.proxy.ClaimLight()
|
||||
self.assertEventually(lambda: int(self.get_dbus_property('LightLevel')) == 50)
|
||||
self.assertEqual(self.get_dbus_property('LightLevelUnit'), 'vendor')
|
||||
|
||||
self.testbed.set_attribute(hwmon, 'light', '(255,255)')
|
||||
# DEFAULT_POLL_TIME is 8000
|
||||
time.sleep(8)
|
||||
self.assertEventually(lambda: self.get_dbus_property('LightLevel') == 100)
|
||||
|
||||
# process = subprocess.Popen(['gdbus', 'introspect', '--system', '--dest', 'net.hadess.SensorProxy', '--object-path', '/net/hadess/SensorProxy'])
|
||||
# print (self.get_dbus_property('Foo'))
|
||||
|
||||
self.stop_daemon()
|
||||
|
||||
def test_fake_light(self):
|
||||
'''fake light'''
|
||||
hwmon = self.testbed.add_device('input', 'fake-light', None,
|
||||
[],
|
||||
['NAME', '"Power Button"']
|
||||
)
|
||||
env = os.environ.copy()
|
||||
env['FAKE_LIGHT_SENSOR'] = '1'
|
||||
self.start_daemon(env)
|
||||
self.assertEqual(self.get_dbus_property('HasAmbientLight'), True)
|
||||
self.assertEqual(self.get_dbus_property('HasAccelerometer'), False)
|
||||
|
||||
# Default values
|
||||
self.assertEqual(self.get_dbus_property('LightLevelUnit'), 'lux')
|
||||
self.assertEqual(self.get_dbus_property('LightLevel'), 0)
|
||||
|
||||
self.proxy.ClaimLight()
|
||||
self.assertEventually(lambda: int(self.get_dbus_property('LightLevel')) == 1.0)
|
||||
self.assertEqual(self.get_dbus_property('LightLevelUnit'), 'lux')
|
||||
|
||||
self.assertEventually(lambda: int(self.get_dbus_property('LightLevel')) == 2.0)
|
||||
self.assertEqual(self.get_dbus_property('LightLevelUnit'), 'lux')
|
||||
|
||||
# process = subprocess.Popen(['gdbus', 'introspect', '--system', '--dest', 'net.hadess.SensorProxy', '--object-path', '/net/hadess/SensorProxy'])
|
||||
# print (self.get_dbus_property('Foo'))
|
||||
|
||||
self.stop_daemon()
|
||||
|
||||
def test_fake_compass(self):
|
||||
'''fake compass'''
|
||||
hwmon = self.testbed.add_device('input', 'fake-compass', None,
|
||||
[],
|
||||
['NAME', '"Power Button"']
|
||||
)
|
||||
env = os.environ.copy()
|
||||
env['FAKE_COMPASS'] = '1'
|
||||
self.start_daemon(env)
|
||||
self.assertEqual(self.get_dbus_property('HasAmbientLight'), False)
|
||||
self.assertEqual(self.get_compass_dbus_property('HasCompass'), True)
|
||||
|
||||
# Default values
|
||||
compass_proxy = Gio.DBusProxy.new_sync(
|
||||
self.dbus, Gio.DBusProxyFlags.DO_NOT_AUTO_START, None, SP,
|
||||
SP_COMPASS_PATH, SP_COMPASS, None)
|
||||
compass_proxy.ClaimCompass()
|
||||
process = subprocess.Popen(['gdbus', 'introspect', '--system', '--dest', 'net.hadess.SensorProxy', '--object-path', '/net/hadess/SensorProxy/Compass'])
|
||||
|
||||
self.assertEventually(lambda: int(self.get_compass_dbus_property('CompassHeading')) == 10)
|
||||
self.assertEventually(lambda: int(self.get_compass_dbus_property('CompassHeading')) == 20)
|
||||
|
||||
# print (self.get_dbus_property('Foo'))
|
||||
|
||||
self.stop_daemon()
|
||||
|
||||
#
|
||||
# Helper methods
|
||||
#
|
||||
|
||||
@classmethod
|
||||
def _props_to_str(cls, properties):
|
||||
'''Convert a properties dictionary to uevent text representation.'''
|
||||
|
||||
prop_str = ''
|
||||
if properties:
|
||||
for k, v in properties.items():
|
||||
prop_str += '%s=%s\n' % (k, v)
|
||||
return prop_str
|
||||
|
||||
if __name__ == '__main__':
|
||||
# run ourselves under umockdev
|
||||
if 'umockdev' not in os.environ.get('LD_PRELOAD', ''):
|
||||
os.execvp('umockdev-wrapper', ['umockdev-wrapper'] + sys.argv)
|
||||
|
||||
unittest.main()
|
10
tests/meson.build
Normal file
10
tests/meson.build
Normal file
@@ -0,0 +1,10 @@
|
||||
integration_test = find_program('integration-test')
|
||||
|
||||
envs = environment()
|
||||
envs.set ('top_builddir', meson.build_root())
|
||||
envs.set ('top_srcdir', meson.source_root())
|
||||
|
||||
test('iio-sensor-proxy-integration-test',
|
||||
integration_test,
|
||||
env: envs
|
||||
)
|
Reference in New Issue
Block a user