shared: add nm_utils_g_main_context_create_integrate_source() for integrating a GMainContext in another

We will rework NMClient entirely. Then, the synchronous initialization will also use
the asynchronous code paths. The difference will be that with synchronous initialization,
all D-Bus interaction will be done with an internal GMainContext as current thread default,
and that internal context will run until initialization completes.

Note that even after initialization completes, it cannot be swapped back to the user's
(outer) GMainContext. That is because contexts are essentially the queue for our
D-Bus events, and we cannot swap from one queue to the other in a race
free manner (or a full resync). In other words, the two contexts are not in sync,
so after using the internal context NMClient needs to stick to that (at least, until
the name owner gets lost, which gives an opportunity to resync and switch back to the
user's main context).

We thus need to hook the internal (inner) GMainContext with the user's (outer) context,
so when the user iterates the outer context, events on the inner context get dispatched.

Add nm_utils_g_main_context_create_integrate_source() to create such a GSource for
integrating two contexts.

Note that the use-case here is limited: the integrated, inner main context must
not be explicitly iterated except from being dispatched by the integrating
source. Otherwise, you'd get recursive runs, possible deadlocks and general
ugliness. NMClient must show restrain how to use the inner context while it is
integrated.
This commit is contained in:
Thomas Haller
2019-11-02 16:55:43 +01:00
parent 45c9f3c39b
commit 4fad8c7c64
3 changed files with 562 additions and 0 deletions

View File

@@ -7,6 +7,11 @@
#include "nm-default.h" #include "nm-default.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <glib-unix.h>
#include "nm-std-aux/c-list-util.h" #include "nm-std-aux/c-list-util.h"
#include "nm-glib-aux/nm-enum-utils.h" #include "nm-glib-aux/nm-enum-utils.h"
@@ -8103,6 +8108,191 @@ test_ethtool_offload (void)
/*****************************************************************************/ /*****************************************************************************/
typedef struct {
GMainLoop *loop1;
GMainContext *c2;
GSource *extra_sources[2];
bool got_signal[5];
int fd_2;
} IntegData;
static gboolean
_test_integrate_cb_handle (IntegData *d, int signal)
{
int i;
g_assert (d);
g_assert (signal >= 0);
g_assert (signal < G_N_ELEMENTS (d->got_signal));
g_assert (!d->got_signal[signal]);
d->got_signal[signal] = TRUE;
for (i = 0; i < G_N_ELEMENTS (d->got_signal); i++) {
if (!d->got_signal[i])
break;
}
if (i == G_N_ELEMENTS (d->got_signal))
g_main_loop_quit (d->loop1);
return G_SOURCE_REMOVE;
}
static gboolean
_test_integrate_cb_timeout_1 (gpointer user_data)
{
return _test_integrate_cb_handle (user_data, 0);
}
static gboolean
_test_integrate_cb_fd_2 (int fd,
GIOCondition condition,
gpointer user_data)
{
IntegData *d = user_data;
g_assert (d->got_signal[1]);
g_assert (d->got_signal[2]);
g_assert (d->got_signal[3]);
g_assert (d->extra_sources[0]);
g_assert (d->extra_sources[1]);
return _test_integrate_cb_handle (d, 4);
}
static gboolean
_test_integrate_cb_idle_2 (gpointer user_data)
{
IntegData *d = user_data;
GSource *extra_source;
g_assert (d->got_signal[1]);
g_assert (d->got_signal[2]);
g_assert (d->extra_sources[0]);
g_assert (!d->extra_sources[1]);
extra_source = g_unix_fd_source_new (d->fd_2, G_IO_IN);
g_source_set_callback (extra_source, G_SOURCE_FUNC (_test_integrate_cb_fd_2), d, NULL);
g_source_attach (extra_source, d->c2);
d->extra_sources[1] = extra_source;
return _test_integrate_cb_handle (d, 3);
}
static gboolean
_test_integrate_cb_idle_1 (gpointer user_data)
{
IntegData *d = user_data;
GSource *extra_source;
g_assert (d->got_signal[2]);
g_assert (!d->extra_sources[0]);
extra_source = g_idle_source_new ();
g_source_set_callback (extra_source, _test_integrate_cb_idle_2, d, NULL);
g_source_attach (extra_source, d->c2);
d->extra_sources[0] = extra_source;
return _test_integrate_cb_handle (d, 1);
}
static gboolean
_test_integrate_cb_fd_1 (int fd,
GIOCondition condition,
gpointer user_data)
{
IntegData *d = user_data;
g_assert (!d->got_signal[1]);
return _test_integrate_cb_handle (d, 2);
}
static gboolean
_test_integrate_maincontext_cb_idle1 (gpointer user_data)
{
guint32 *p_count = user_data;
g_assert (*p_count < 5);
(*p_count)++;
return G_SOURCE_CONTINUE;
}
static void
test_integrate_maincontext (gconstpointer test_data)
{
const guint TEST_IDX = GPOINTER_TO_UINT (test_data);
GMainContext *c1 = g_main_context_default ();
nm_auto_unref_gmaincontext GMainContext *c2 = g_main_context_new ();
nm_auto_destroy_and_unref_gsource GSource *integ_source = NULL;
integ_source = nm_utils_g_main_context_create_integrate_source (c2);
g_source_attach (integ_source, c1);
if (TEST_IDX == 1) {
nm_auto_destroy_and_unref_gsource GSource *idle_source_1 = NULL;
guint32 count = 0;
idle_source_1 = g_idle_source_new ();
g_source_set_callback (idle_source_1, _test_integrate_maincontext_cb_idle1, &count, NULL);
g_source_attach (idle_source_1, c2);
nmtst_main_context_iterate_until (c1, 2000, count == 5);
}
if (TEST_IDX == 2) {
nm_auto_destroy_and_unref_gsource GSource *main_timeout_source = NULL;
nm_auto_destroy_and_unref_gsource GSource *timeout_source_1 = NULL;
nm_auto_destroy_and_unref_gsource GSource *idle_source_1 = NULL;
nm_auto_destroy_and_unref_gsource GSource *fd_source_1 = NULL;
nm_auto_unref_gmainloop GMainLoop *loop1 = NULL;
nm_auto_close int fd_1 = -1;
nm_auto_close int fd_2 = -1;
IntegData d;
int i;
main_timeout_source = g_timeout_source_new (3000);
g_source_set_callback (main_timeout_source, nmtst_g_source_assert_not_called, NULL, NULL);
g_source_attach (main_timeout_source, c1);
loop1 = g_main_loop_new (c1, FALSE);
d = (IntegData) {
.loop1 = loop1,
.c2 = c2,
};
fd_1 = open ("/dev/null", O_RDONLY | O_CLOEXEC);
g_assert (fd_1 >= 0);
fd_source_1 = g_unix_fd_source_new (fd_1, G_IO_IN);
g_source_set_callback (fd_source_1, G_SOURCE_FUNC (_test_integrate_cb_fd_1), &d, NULL);
g_source_attach (fd_source_1, c2);
fd_2 = open ("/dev/null", O_RDONLY | O_CLOEXEC);
g_assert (fd_2 >= 0);
d.fd_2 = fd_2;
idle_source_1 = g_idle_source_new ();
g_source_set_callback (idle_source_1, _test_integrate_cb_idle_1, &d, NULL);
g_source_attach (idle_source_1, c2);
timeout_source_1 = g_timeout_source_new (5);
g_source_set_callback (timeout_source_1, _test_integrate_cb_timeout_1, &d, NULL);
g_source_attach (timeout_source_1, c2);
g_main_loop_run (loop1);
for (i = 0; i < G_N_ELEMENTS (d.extra_sources); i++) {
g_assert (d.extra_sources[i]);
nm_clear_pointer (&d.extra_sources[i], nm_g_source_destroy_and_unref);
}
}
}
/*****************************************************************************/
NMTST_DEFINE (); NMTST_DEFINE ();
int main (int argc, char **argv) int main (int argc, char **argv)
@@ -8263,5 +8453,8 @@ int main (int argc, char **argv)
g_test_add_func ("/core/general/test_nm_va_args_macros", test_nm_va_args_macros); g_test_add_func ("/core/general/test_nm_va_args_macros", test_nm_va_args_macros);
g_test_add_func ("/core/general/test_ethtool_offload", test_ethtool_offload); g_test_add_func ("/core/general/test_ethtool_offload", test_ethtool_offload);
g_test_add_data_func ("/core/general/test_integrate_maincontext/1", GUINT_TO_POINTER (1), test_integrate_maincontext);
g_test_add_data_func ("/core/general/test_integrate_maincontext/2", GUINT_TO_POINTER (2), test_integrate_maincontext);
return g_test_run (); return g_test_run ();
} }

View File

@@ -3535,3 +3535,368 @@ nm_g_unix_signal_source_new (int signum,
g_source_set_callback (source, handler, user_data, notify); g_source_set_callback (source, handler, user_data, notify);
return source; return source;
} }
/*****************************************************************************/
#define _CTX_LOG(fmt, ...) \
G_STMT_START { \
if (FALSE) { \
gint64 _ts = g_get_monotonic_time () / 100; \
\
g_printerr (">>>> [%"G_GINT64_FORMAT".%05"G_GINT64_FORMAT"] [src:%p]: " fmt "\n", \
_ts / 10000, \
_ts % 10000, \
(ctx_src), \
##__VA_ARGS__); \
} \
} G_STMT_END
typedef struct {
int fd;
guint events;
guint registered_events;
union {
int one;
int *many;
} idx;
gpointer tag;
bool stale:1;
bool has_many_idx:1;
} PollData;
typedef struct {
GSource source;
GMainContext *context;
GHashTable *fds;
GPollFD *fds_arr;
int fds_len;
int max_priority;
bool acquired:1;
} CtxIntegSource;
static void
_poll_data_free (gpointer user_data)
{
PollData *poll_data = user_data;
if (poll_data->has_many_idx)
g_free (poll_data->idx.many);
nm_g_slice_free (poll_data);
}
static void
_ctx_integ_source_reacquire (CtxIntegSource *ctx_src)
{
if (G_LIKELY ( ctx_src->acquired
&& g_main_context_is_owner (ctx_src->context)))
return;
/* the parent context now iterates on a different thread.
* We need to release and reacquire the inner context. */
if (ctx_src->acquired)
g_main_context_release (ctx_src->context);
if (G_UNLIKELY (!g_main_context_acquire (ctx_src->context))) {
/* Nobody is supposed to reacquire the context while we use it. This is a bug
* of the user. */
ctx_src->acquired = FALSE;
g_return_if_reached ();
}
ctx_src->acquired = TRUE;
}
static gboolean
_ctx_integ_source_prepare (GSource *source,
int *out_timeout)
{
CtxIntegSource *ctx_src = ((CtxIntegSource *) source);
int max_priority;
int timeout = -1;
gboolean any_ready;
int fds_allocated;
int fds_len_old;
gs_free GPollFD *fds_arr_old = NULL;
GHashTableIter h_iter;
PollData *poll_data;
gboolean fds_changed;
int i;
_CTX_LOG ("prepare...");
_ctx_integ_source_reacquire (ctx_src);
any_ready = g_main_context_prepare (ctx_src->context, &max_priority);
fds_arr_old = g_steal_pointer (&ctx_src->fds_arr);
fds_len_old = ctx_src->fds_len;
fds_allocated = NM_MAX (1, fds_len_old); /* there is at least the wakeup's FD */
ctx_src->fds_arr = g_new (GPollFD, fds_allocated);
while ((ctx_src->fds_len = g_main_context_query (ctx_src->context,
max_priority,
&timeout,
ctx_src->fds_arr,
fds_allocated)) > fds_allocated) {
fds_allocated = ctx_src->fds_len;
g_free (ctx_src->fds_arr);
ctx_src->fds_arr = g_new (GPollFD, fds_allocated);
}
fds_changed = FALSE;
if (fds_len_old != ctx_src->fds_len)
fds_changed = TRUE;
else {
for (i = 0; i < ctx_src->fds_len; i++) {
if ( fds_arr_old[i].fd != ctx_src->fds_arr[i].fd
|| fds_arr_old[i].events != ctx_src->fds_arr[i].events) {
fds_changed = TRUE;
break;
}
}
}
if (G_UNLIKELY (fds_changed)) {
g_hash_table_iter_init (&h_iter, ctx_src->fds);
while (g_hash_table_iter_next (&h_iter, (gpointer *) &poll_data, NULL))
poll_data->stale = TRUE;
for (i = 0; i < ctx_src->fds_len; i++) {
const GPollFD *fd = &ctx_src->fds_arr[i];
poll_data = g_hash_table_lookup (ctx_src->fds, &fd->fd);
if (G_UNLIKELY (!poll_data)) {
poll_data = g_slice_new (PollData);
*poll_data = (PollData) {
.fd = fd->fd,
.idx.one = i,
.has_many_idx = FALSE,
.events = fd->events,
.registered_events = 0,
.tag = NULL,
.stale = FALSE,
};
g_hash_table_add (ctx_src->fds, poll_data);
nm_assert (poll_data == g_hash_table_lookup (ctx_src->fds, &fd->fd));
continue;
}
if (G_LIKELY (poll_data->stale)) {
if (poll_data->has_many_idx) {
g_free (poll_data->idx.many);
poll_data->has_many_idx = FALSE;
}
poll_data->events = fd->events;
poll_data->idx.one = i;
poll_data->stale = FALSE;
continue;
}
/* How odd. We have duplicate FDs. In fact, currently g_main_context_query() always
* coalesces the FDs and this cannot happen. However, that is not documented behavior,
* so we should not rely on that. So we need to keep a list of indexes... */
poll_data->events |= fd->events;
if (!poll_data->has_many_idx) {
int idx0;
idx0 = poll_data->idx.one;
poll_data->has_many_idx = TRUE;
poll_data->idx.many = g_new (int, 4);
poll_data->idx.many[0] = 2; /* number allocated */
poll_data->idx.many[1] = 2; /* number used */
poll_data->idx.many[2] = idx0;
poll_data->idx.many[3] = i;
} else {
if (poll_data->idx.many[0] == poll_data->idx.many[1]) {
poll_data->idx.many[0] *= 2;
poll_data->idx.many = g_realloc (poll_data->idx.many, sizeof (int) * (2 + poll_data->idx.many[0]));
}
poll_data->idx.many[2 + poll_data->idx.many[1]] = i;
poll_data->idx.many[1]++;
}
}
g_hash_table_iter_init (&h_iter, ctx_src->fds);
while (g_hash_table_iter_next (&h_iter, (gpointer *) &poll_data, NULL)) {
if (poll_data->stale) {
nm_assert (poll_data->tag);
nm_assert (poll_data->events == poll_data->registered_events);
_CTX_LOG ("prepare: remove poll fd=%d, events=0x%x", poll_data->fd, poll_data->events);
g_source_remove_unix_fd (&ctx_src->source, poll_data->tag);
g_hash_table_iter_remove (&h_iter);
continue;
}
if (!poll_data->tag) {
_CTX_LOG ("prepare: add poll fd=%d, events=0x%x", poll_data->fd, poll_data->events);
poll_data->registered_events = poll_data->events;
poll_data->tag = g_source_add_unix_fd (&ctx_src->source, poll_data->fd, poll_data->registered_events);
continue;
}
if (poll_data->registered_events != poll_data->events) {
_CTX_LOG ("prepare: update poll fd=%d, events=0x%x", poll_data->fd, poll_data->events);
poll_data->registered_events = poll_data->events;
g_source_modify_unix_fd (&ctx_src->source, poll_data->tag, poll_data->registered_events);
}
}
}
NM_SET_OUT (out_timeout, timeout);
ctx_src->max_priority = max_priority;
_CTX_LOG ("prepare: done, any-ready=%d, timeout=%d, max-priority=%d", any_ready, timeout, max_priority);
/* we always need to poll, because we have some file descriptors. */
return FALSE;
}
static gboolean
_ctx_integ_source_check (GSource *source)
{
CtxIntegSource *ctx_src = ((CtxIntegSource *) source);
GHashTableIter h_iter;
gboolean some_ready;
PollData *poll_data;
nm_assert (ctx_src->context);
_CTX_LOG ("check");
_ctx_integ_source_reacquire (ctx_src);
g_hash_table_iter_init (&h_iter, ctx_src->fds);
while (g_hash_table_iter_next (&h_iter, (gpointer *) &poll_data, NULL)) {
guint revents;
revents = g_source_query_unix_fd (&ctx_src->source, poll_data->tag);
if (G_UNLIKELY (poll_data->has_many_idx)) {
int num = poll_data->idx.many[1];
int *p_idx = &poll_data->idx.many[2];
for (; num > 0; num--, p_idx++)
ctx_src->fds_arr[*p_idx].revents = revents;
} else
ctx_src->fds_arr[poll_data->idx.one].revents = revents;
}
some_ready = g_main_context_check (ctx_src->context,
ctx_src->max_priority,
ctx_src->fds_arr,
ctx_src->fds_len);
_CTX_LOG ("check (some-ready=%d)...", some_ready);
return some_ready;
}
static gboolean
_ctx_integ_source_dispatch (GSource *source,
GSourceFunc callback,
gpointer user_data)
{
CtxIntegSource *ctx_src = ((CtxIntegSource *) source);
nm_assert (ctx_src->context);
_ctx_integ_source_reacquire (ctx_src);
_CTX_LOG ("dispatch");
g_main_context_dispatch (ctx_src->context);
return G_SOURCE_CONTINUE;
}
static void
_ctx_integ_source_finalize (GSource *source)
{
CtxIntegSource *ctx_src = ((CtxIntegSource *) source);
GHashTableIter h_iter;
PollData *poll_data;
g_return_if_fail (ctx_src->context);
_CTX_LOG ("finalize...");
g_hash_table_iter_init (&h_iter, ctx_src->fds);
while (g_hash_table_iter_next (&h_iter, (gpointer *) &poll_data, NULL)) {
nm_assert (poll_data->tag);
_CTX_LOG ("prepare: remove poll fd=%d, events=0x%x", poll_data->fd, poll_data->events);
g_source_remove_unix_fd (&ctx_src->source, poll_data->tag);
g_hash_table_iter_remove (&h_iter);
}
nm_clear_pointer (&ctx_src->fds, g_hash_table_unref);
nm_clear_g_free (&ctx_src->fds_arr);
ctx_src->fds_len = 0;
if (ctx_src->acquired) {
ctx_src->acquired = FALSE;
g_main_context_release (ctx_src->context);
}
nm_clear_pointer (&ctx_src->context, g_main_context_unref);
}
static GSourceFuncs ctx_integ_source_funcs = {
.prepare = _ctx_integ_source_prepare,
.check = _ctx_integ_source_check,
.dispatch = _ctx_integ_source_dispatch,
.finalize = _ctx_integ_source_finalize,
};
/**
* nm_utils_g_main_context_create_integrate_source:
* @inner_context: the inner context that will be integrated to an
* outer #GMainContext.
*
* By integrating the inner context with an outer context, when iterating the outer
* context sources on the inner context will be dispatched. Note that while the
* created source exists, the @inner_context will be acquired. The user gets restricted
* what to do with the inner context. In particular while the inner context is integrated,
* the user should not acquire the inner context again or explicitly iterate it. What
* the user of course still can (and wants to) do is attaching new sources to the inner
* context.
*
* Note that GSource has a priority. While each context dispatches events based on
* their source's priorities, the outer context dispatches to the inner context
* only with one priority (the priority of the created source). That is, the sources
* from the two contexts are kept separate and are not sorted by their priorities.
*
* Returns: a newly created GSource that should be attached to the
* outer context.
*/
GSource *
nm_utils_g_main_context_create_integrate_source (GMainContext *inner_context)
{
CtxIntegSource *ctx_src;
g_return_val_if_fail (inner_context, NULL);
if (!g_main_context_acquire (inner_context)) {
/* We require to acquire the context while it's integrated. We need to keep it acquired
* for the entire duration.
*
* This is also necessary because g_source_attach() only wakes up the context, if
* the context is currently acquired. */
g_return_val_if_reached (NULL);
}
ctx_src = (CtxIntegSource *) g_source_new (&ctx_integ_source_funcs, sizeof (CtxIntegSource));
g_source_set_name (&ctx_src->source, "ContextIntegrateSource");
ctx_src->context = g_main_context_ref (inner_context);
ctx_src->fds = g_hash_table_new_full (nm_pint_hash, nm_pint_equals, _poll_data_free, NULL);
ctx_src->fds_len = 0;
ctx_src->fds_arr = NULL;
ctx_src->acquired = TRUE;
ctx_src->max_priority = G_MAXINT;
_CTX_LOG ("create new integ-source for %p", inner_context);
return &ctx_src->source;
}

View File

@@ -1294,6 +1294,10 @@ void nm_utils_invoke_on_idle (NMUtilsInvokeOnIdleCallback callback,
/*****************************************************************************/ /*****************************************************************************/
GSource *nm_utils_g_main_context_create_integrate_source (GMainContext *internal);
/*****************************************************************************/
static inline void static inline void
nm_strv_ptrarray_add_string_take (GPtrArray *cmd, nm_strv_ptrarray_add_string_take (GPtrArray *cmd,
char *str) char *str)