Files
wireplumber/lib/wp/event-dispatcher.c
Ashok Sidipotu 3ff9f240c8 event-dispatcher: after-events hooks to get the trigger event info
after-events hooks will get the original event triggering it, instead of the
rescan event.

after-events hook can register with any event, but it is called with rescan event
info. This is so because, after-events hook run after all the on-events hooks
are done with and as a part of the rescan event. so it is triggered with rescan
event data, which doesnt carry much info, instead of rescan event, it makes more
sense to call the after-events hook with the original event which triggered it.
2023-04-17 07:47:09 -04:00

579 lines
17 KiB
C

/* WirePlumber
*
* Copyright © 2022 Collabora Ltd.
* @author George Kiagiadakis <george.kiagiadakis@collabora.com>
*
* SPDX-License-Identifier: MIT
*/
#define G_LOG_DOMAIN "wp-event-dispatcher"
#include "event-dispatcher.h"
#include "event-hook.h"
#include "private/registry.h"
#include "log.h"
#include <spa/support/plugin.h>
#include <spa/support/system.h>
struct _WpEvent
{
grefcount ref;
/* immutable fields */
gint priority;
WpProperties *properties;
GObject *source;
GObject *subject;
GCancellable *cancellable;
/* managed by the dispatcher */
GList *hooks;
gchar *hooks_chain;
gchar *name;
WpEventHook *current_hook_in_async;
};
G_DEFINE_BOXED_TYPE (WpEvent, wp_event, wp_event_ref, wp_event_unref)
static gchar *
form_event_name (WpEvent *e)
{
WpProperties *props = e->properties;
const gchar *type = wp_properties_get (props, "event.type");
const gchar *subject_type = wp_properties_get (props, "event.subject.type");
const gchar *metadata_name = wp_properties_get (props, "metadata.name");
const gchar *param = wp_properties_get (props, "event.subject.param-id");
return g_strdup_printf ("<%p>%s%s%s%s%s%s%s", e, (type ? type : ""),
((type && subject_type) ? "@" : ""),
(subject_type ? subject_type : ""),
((subject_type && metadata_name) ? "@" : ""),
(metadata_name ? metadata_name : ""),
((param && subject_type) ? "@" : ""),
(param ? param : "")
);
}
static void
on_proxy_destroyed (GObject* self, WpEvent* e)
{
if (e->subject == self)
{
const gchar* type = wp_properties_get (e->properties, "event.type");
/* object removal needs to be processed by hooks */
if (g_str_equal (type, "object-removed"))
wp_properties_set (e->properties, "pw-proxy-destroyed", "true");
else
g_cancellable_cancel (e->cancellable);
}
}
/*!
* \brief Creates a new event
* \ingroup wpevent
* \param type the type of the event
* \param priority the priority of the event
* \param properties (transfer full)(nullable): properties of the event
* \param source (transfer none): the source of the event
* \param subject (transfer none)(nullable): the object that the event is about
* \return (transfer full): the newly constructed event
*/
WpEvent *
wp_event_new (const gchar * type, gint priority, WpProperties * properties,
GObject * source, GObject * subject)
{
WpEvent * self = g_slice_new0 (WpEvent);
g_ref_count_init (&self->ref);
self->priority = priority;
self->properties = properties ?
wp_properties_ensure_unique_owner (properties) :
wp_properties_new_empty ();
self->source = source ? g_object_ref (source) : NULL;
self->subject = subject ? g_object_ref (subject) : NULL;
self->cancellable = g_cancellable_new ();
if (self->subject) {
/* merge properties from subject */
/* PW properties */
GParamSpec *pspec = g_object_class_find_property (
G_OBJECT_GET_CLASS (self->subject), "properties");
if (pspec && G_PARAM_SPEC_VALUE_TYPE (pspec) == WP_TYPE_PROPERTIES) {
g_autoptr (WpProperties) subj_props = NULL;
g_object_get (self->subject, "properties", &subj_props, NULL);
if (subj_props) {
wp_properties_update (self->properties, subj_props);
}
}
/* global properties */
pspec = g_object_class_find_property ( G_OBJECT_GET_CLASS (self->subject),
"global-properties");
if (pspec && G_PARAM_SPEC_VALUE_TYPE (pspec) == WP_TYPE_PROPERTIES) {
g_autoptr (WpProperties) subj_props = NULL;
g_object_get (self->subject, "global-properties", &subj_props, NULL);
if (subj_props) {
wp_properties_update (self->properties, subj_props);
}
}
/* watch for subject pw-proxy-destroyed and cancel event */
if (g_type_is_a (G_OBJECT_TYPE (self->subject), WP_TYPE_PROXY)) {
g_signal_connect (self->subject, "pw-proxy-destroyed",
(GCallback) on_proxy_destroyed, self);
}
}
wp_properties_set (self->properties, "event.type", type);
self->name = form_event_name (self);
return self;
}
static void
wp_event_free (WpEvent * self)
{
g_clear_pointer (&self->properties, wp_properties_unref);
g_clear_object (&self->source);
g_clear_object (&self->subject);
g_clear_object (&self->cancellable);
g_free (self->hooks_chain);
g_free (self->name);
}
WpEvent *
wp_event_ref (WpEvent * self)
{
g_ref_count_inc (&self->ref);
return self;
}
void
wp_event_unref (WpEvent * self)
{
if (g_ref_count_dec (&self->ref))
wp_event_free (self);
}
/*!
* \brief Gets the properties of the Event
* \ingroup wpevent
* \param self the handle
* \return (transfer full): the properties of the event
*/
WpProperties *
wp_event_get_properties (WpEvent * self)
{
g_return_val_if_fail(self != NULL, NULL);
return wp_properties_ref (self->properties);
}
/*!
* \brief Gets the Source Object of the Event
* \ingroup wpevent
* \param self the handle
* \return (transfer full): the source of the event
*/
GObject *
wp_event_get_source (WpEvent * self)
{
g_return_val_if_fail(self != NULL, NULL);
return self->source ? g_object_ref (self->source) : NULL;
}
/*!
* \brief Gets the Subject Object of the Event
* \ingroup wpevent
* \param self the handle
* \return (transfer full): the subject of the event
*/
GObject *
wp_event_get_subject (WpEvent * self)
{
g_return_val_if_fail(self != NULL, NULL);
return self->subject ? g_object_ref (self->subject) : NULL;
}
void
wp_event_stop_processing (WpEvent * self)
{
g_return_if_fail (self != NULL);
wp_debug ("stopping event(%s)", self->name);
g_cancellable_cancel (self->cancellable);
}
struct _WpEventDispatcher
{
GObject parent;
GWeakRef core;
GPtrArray *hooks; /* registered hooks */
GSource *source; /* the event loop source */
GList *events; /* the events stack */
gchar *events_chain; /* chain of events for an event run */
WpEvent *rescan_event;
struct spa_system *system;
int eventfd;
};
G_DEFINE_TYPE (WpEventDispatcher, wp_event_dispatcher, G_TYPE_OBJECT)
#define WP_EVENT_SOURCE_DISPATCHER(x) \
WP_EVENT_DISPATCHER (((WpEventSource *) x)->dispatcher)
typedef struct _WpEventSource WpEventSource;
struct _WpEventSource
{
GSource parent;
WpEventDispatcher *dispatcher;
};
static gboolean
wp_event_source_check (GSource * s)
{
WpEventDispatcher *d = WP_EVENT_SOURCE_DISPATCHER (s);
return d && d->events &&
!((WpEvent *) g_list_first (d->events)->data)->current_hook_in_async;
}
static void
on_event_hook_done (WpEventHook * hook, GAsyncResult * res, WpEvent * event)
{
g_autoptr (GError) error = NULL;
g_autoptr (WpEventDispatcher) dispatcher =
wp_event_hook_get_dispatcher (hook);
g_assert (event->current_hook_in_async == hook);
if (!wp_event_hook_finish (hook, res, &error) && error &&
error->domain != G_IO_ERROR && error->code != G_IO_ERROR_CANCELLED)
wp_message_object (hook, "failed: %s", error->message);
if (dispatcher->rescan_event == event)
wp_event_hook_set_event (event->current_hook_in_async, NULL);
g_clear_object (&event->current_hook_in_async);
spa_system_eventfd_write (dispatcher->system, dispatcher->eventfd, 1);
}
static gchar *
build_chain (gchar *link, gint link_priority, gchar *chain)
{
gchar *temp = g_strdup_printf ("%s%s%s(%d)", (chain ? chain : ""),
(chain ? " -> " : ""), link, link_priority);
g_free (chain);
chain = temp;
return chain;
}
static gboolean
wp_event_source_dispatch (GSource * s, GSourceFunc callback, gpointer user_data)
{
WpEventDispatcher *d = WP_EVENT_SOURCE_DISPATCHER (s);
uint64_t count;
/* clear the eventfd */
spa_system_eventfd_read (d->system, d->eventfd, &count);
/* get the highest priority event */
GList *levent = g_list_first (d->events);
while (levent) {
WpEvent *event = (WpEvent *) (levent->data);
/* event hook is still in progress, we will continue later */
if (event->current_hook_in_async)
return G_SOURCE_CONTINUE;
/* remove the remaining hooks if the event was cancelled */
if (g_cancellable_is_cancelled (event->cancellable) && event->hooks) {
wp_debug_object (d, "event(%s) cancelled remove it", event->name);
g_list_free_full (g_steal_pointer (&event->hooks), g_object_unref);
}
/* avoid duplicate entries in chain */
if (!d->events_chain || !strstr (d->events_chain, event->name)) {
d->events_chain = build_chain (event->name, event->priority,
d->events_chain);
wp_debug_object (d, "dispatching event (%s)" WP_OBJECT_FORMAT
" priority(%d)", event->name, WP_OBJECT_ARGS (event->subject),
event->priority);
}
/* get the highest priority hook */
GList *lhook = g_list_first (event->hooks);
if (lhook) {
gchar *name = NULL;
gint priority;
event->current_hook_in_async = WP_EVENT_HOOK (lhook->data);
event->hooks = g_list_delete_link (event->hooks, g_steal_pointer (&lhook));
name = wp_event_hook_get_name (event->current_hook_in_async);
priority = wp_event_hook_get_priority (event->current_hook_in_async);
event->hooks_chain = build_chain (name, priority, event->hooks_chain);
wp_debug_object (d, "running hook <%p>(%s) and hooks chain (%s)",
event->current_hook_in_async, name, event->hooks_chain);
if (event == d->rescan_event) {
WpEvent *hook_event =
wp_event_hook_get_event (event->current_hook_in_async);
/* for after-events, check if the corresponding event is cancelled */
if (g_cancellable_is_cancelled (hook_event->cancellable))
wp_debug_object (d, "remove hook(%s) as its trigger event(%s)"
" is cancelled", name, hook_event->name);
else
wp_event_hook_run (event->current_hook_in_async, hook_event,
event->cancellable, (GAsyncReadyCallback) on_event_hook_done,
event);
}
else
/* execute the hook, possibly async */
wp_event_hook_run (event->current_hook_in_async, event,
event->cancellable, (GAsyncReadyCallback) on_event_hook_done, event);
}
/* clear the event after all hooks are done */
if (!event->hooks && !event->current_hook_in_async) {
d->events = g_list_delete_link (d->events, g_steal_pointer (&levent));
if (event == d->rescan_event)
d->rescan_event = NULL;
g_clear_pointer (&event, wp_event_unref);
}
/* get the next event */
levent = g_list_first (d->events);
}
/* an event run completed reset the events_chain */
g_free (d->events_chain);
d->events_chain = NULL;
return G_SOURCE_CONTINUE;
}
static GSourceFuncs source_funcs = {
NULL,
wp_event_source_check,
wp_event_source_dispatch,
NULL
};
static void
wp_event_dispatcher_init (WpEventDispatcher * self)
{
g_weak_ref_init (&self->core, NULL);
self->hooks = g_ptr_array_new_with_free_func (g_object_unref);
self->source = g_source_new (&source_funcs, sizeof (WpEventSource));
((WpEventSource *) self->source)->dispatcher = self;
/* this is higher than normal "idle" operations but lower than the default
priority, which is used for events from the PipeWire socket and timers */
g_source_set_priority (self->source, G_PRIORITY_HIGH_IDLE);
}
static void
clear_event (WpEvent * event)
{
g_cancellable_cancel (event->cancellable);
g_list_free_full (g_steal_pointer (&event->hooks), g_object_unref);
wp_event_unref (event);
}
static void
wp_event_dispatcher_finalize (GObject * object)
{
WpEventDispatcher *self = WP_EVENT_DISPATCHER (object);
g_list_free_full (g_steal_pointer (&self->events),
(GDestroyNotify) clear_event);
((WpEventSource *) self->source)->dispatcher = NULL;
g_source_destroy (self->source);
g_clear_pointer (&self->source, g_source_unref);
close (self->eventfd);
g_clear_pointer (&self->hooks, g_ptr_array_unref);
g_weak_ref_clear (&self->core);
g_free (self->events_chain);
G_OBJECT_CLASS (wp_event_dispatcher_parent_class)->finalize (object);
}
static void
wp_event_dispatcher_class_init (WpEventDispatcherClass * klass)
{
GObjectClass *object_class = (GObjectClass *) klass;
object_class->finalize = wp_event_dispatcher_finalize;
}
/*!
* \brief Returns the event dispatcher instance that is associated with the
* given core.
*
* This method will also create the instance and register it with the core,
* if it had not been created before.
*
* \ingroup wpeventdispatcher
* \param core the core
* \return (transfer full): the event dispatcher instance
*/
WpEventDispatcher *
wp_event_dispatcher_get_instance (WpCore * core)
{
WpRegistry *registry = wp_core_get_registry (core);
WpEventDispatcher *dispatcher = wp_registry_find_object (registry,
(GEqualFunc) WP_IS_EVENT_DISPATCHER, NULL);
if (G_UNLIKELY (!dispatcher)) {
dispatcher = g_object_new (WP_TYPE_EVENT_DISPATCHER, NULL);
g_weak_ref_set (&dispatcher->core, core);
struct pw_context *context = wp_core_get_pw_context (core);
uint32_t n_support;
const struct spa_support *support =
pw_context_get_support (context, &n_support);
dispatcher->system =
spa_support_find (support, n_support, SPA_TYPE_INTERFACE_System);
dispatcher->eventfd = spa_system_eventfd_create (dispatcher->system, 0);
g_source_add_unix_fd (dispatcher->source, dispatcher->eventfd, G_IO_IN);
g_source_attach (dispatcher->source, wp_core_get_g_main_context (core));
wp_registry_register_object (registry, g_object_ref (dispatcher));
}
return dispatcher;
}
static gint
event_cmp_func (const WpEvent *a, const WpEvent *b)
{
return b->priority - a->priority;
}
static gint
hook_cmp_func (const WpEventHook *a, const WpEventHook *b)
{
return wp_event_hook_get_priority ((WpEventHook *)b) -
wp_event_hook_get_priority ((WpEventHook *)a);
}
/*!
* \brief Pushes a new event onto the event stack for dispatching only if there
* are any hooks are available for it.
* \ingroup wpeventdispatcher
*
* \param self the dispatcher
* \param event (transfer full): the new event
*/
void
wp_event_dispatcher_push_event (WpEventDispatcher * self, WpEvent * event)
{
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (event != NULL);
gboolean hooks_added = FALSE;
gboolean rescan_hooks_added = FALSE;
if (!self->rescan_event)
self->rescan_event = wp_event_new ("rescan", G_MININT16, NULL, NULL, NULL);
/* attach hooks that run for this event */
for (guint i = 0; i < self->hooks->len; i++) {
WpEventHook *hook = g_ptr_array_index (self->hooks, i);
if (wp_event_hook_runs_for_event (hook, event)) {
/* ON_EVENT hooks run at the dispatching of the event */
if (wp_event_hook_get_exec_type (hook) == WP_EVENT_HOOK_EXEC_TYPE_ON_EVENT) {
event->hooks = g_list_insert_sorted (event->hooks, g_object_ref (hook),
(GCompareFunc) hook_cmp_func);
wp_debug_object (self, "added hook <%p>(%s(%d))", hook,
wp_event_hook_get_name (hook), wp_event_hook_get_priority (hook));
hooks_added = true;
}
/* AFTER_EVENTS hooks run after all other events have been dispatched */
else if (!g_list_find (self->rescan_event->hooks, hook)) {
self->rescan_event->hooks = g_list_insert_sorted (
self->rescan_event->hooks, g_object_ref (hook),
(GCompareFunc) hook_cmp_func);
wp_debug_object (self, "added rescan hook <%p>(%s(%d))", hook,
wp_event_hook_get_name (hook), wp_event_hook_get_priority (hook));
wp_event_hook_set_event (hook, event);
rescan_hooks_added = true;
}
}
}
if (rescan_hooks_added)
{
wp_debug_object (self, "pushed rescan event %p", self->rescan_event);
if (!g_list_find (self->events, self->rescan_event))
self->events = g_list_insert_sorted (self->events, self->rescan_event,
(GCompareFunc) event_cmp_func);
}
if (hooks_added)
{
wp_debug_object (self, "pushed event (%s)" WP_OBJECT_FORMAT
" priority(%d)", event->name, WP_OBJECT_ARGS (event->subject),
event->priority);
self->events = g_list_insert_sorted (self->events, event,
(GCompareFunc) event_cmp_func);
}
else
g_clear_pointer (&event, wp_event_unref);
if (hooks_added || rescan_hooks_added)
/* wakeup the GSource */
spa_system_eventfd_write (self->system, self->eventfd, 1);
}
/*!
* \brief Registers an event hook
* \ingroup wpeventdispatcher
*
* \param self the event dispatcher
* \param hook (transfer none): the hook to register
*/
void
wp_event_dispatcher_register_hook (WpEventDispatcher * self,
WpEventHook * hook)
{
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (WP_IS_EVENT_HOOK (hook));
g_autoptr (WpEventDispatcher) already_registered_dispatcher =
wp_event_hook_get_dispatcher (hook);
g_return_if_fail (already_registered_dispatcher == NULL);
wp_event_hook_set_dispatcher (hook, self);
g_ptr_array_add (self->hooks, g_object_ref (hook));
}
/*!
* \brief Unregisters an event hook
* \ingroup wpeventdispacher
*
* \param self the event dispatcher
* \param hook (transfer none): the hook to unregister
*/
void
wp_event_dispatcher_unregister_hook (WpEventDispatcher * self,
WpEventHook * hook)
{
g_return_if_fail (WP_IS_EVENT_DISPATCHER (self));
g_return_if_fail (WP_IS_EVENT_HOOK (hook));
g_autoptr (WpEventDispatcher) already_registered_dispatcher =
wp_event_hook_get_dispatcher (hook);
g_return_if_fail (already_registered_dispatcher == self);
wp_event_hook_set_dispatcher (hook, NULL);
g_ptr_array_remove_fast (self->hooks, hook);
}