serial: don't let EAGAIN block sending commands until completion

If a port returns EAGAIN on write attempts, previously the code would
spin and attempt to resend the failed byte after send_delay
microseconds.  This resulted in up to 3 second hard blocks in
the serial code when sending to ports that don't respond.  While
in this blocking loop no other events or dbus commands could be
processed.

Instead, send each byte and reschedule sending the next byte in
send_delay microseconds, so that we can process other events in between
attempts to write to stupid ports.

This doesn't hugely decrease the amount of time that probing
requires, since we still need to probe all ports of the device
before exporting the modem to D-Bus, but it does let MM find
responsive ports much more quickly, and ensures that MM doesn't
block any D-Bus requests.
This commit is contained in:
Dan Williams
2011-04-06 13:00:55 -05:00
parent e520c2d448
commit 49150ca3a6

View File

@@ -68,7 +68,7 @@ typedef struct {
guint stopbits; guint stopbits;
guint64 send_delay; guint64 send_delay;
guint queue_schedule; guint queue_id;
guint watch_id; guint watch_id;
guint timeout_id; guint timeout_id;
@@ -76,6 +76,18 @@ typedef struct {
guint connected_id; guint connected_id;
} MMSerialPortPrivate; } MMSerialPortPrivate;
typedef struct {
GByteArray *command;
guint32 idx;
guint32 eagain_count;
gboolean started;
gboolean done;
GCallback callback;
gpointer user_data;
guint32 timeout;
gboolean cached;
} MMQueueData;
#if 0 #if 0
static const char * static const char *
baud_to_string (int baud) baud_to_string (int baud)
@@ -355,57 +367,57 @@ serial_debug (MMSerialPort *self, const char *prefix, const char *buf, gsize len
} }
static gboolean static gboolean
mm_serial_port_send_command (MMSerialPort *self, mm_serial_port_process_command (MMSerialPort *self,
GByteArray *command, MMQueueData *info,
GError **error) GError **error)
{ {
MMSerialPortPrivate *priv = MM_SERIAL_PORT_GET_PRIVATE (self); MMSerialPortPrivate *priv = MM_SERIAL_PORT_GET_PRIVATE (self);
int status, i = 0;
int eagain_count = 1000;
const guint8 *p; const guint8 *p;
int status;
if (priv->fd < 0) { if (priv->fd < 0) {
g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED, g_set_error_literal (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED,
"%s", "Sending command failed: device is not enabled"); "Sending command failed: device is not enabled");
return FALSE; return FALSE;
} }
if (mm_port_get_connected (MM_PORT (self))) { if (mm_port_get_connected (MM_PORT (self))) {
g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED, g_set_error_literal (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED,
"%s", "Sending command failed: device is connected"); "Sending command failed: device is connected");
return FALSE; return FALSE;
} }
serial_debug (self, "-->", (const char *) command->data, command->len); /* Only print command the first time */
if (info->started == FALSE) {
info->started = TRUE;
serial_debug (self, "-->", (const char *) info->command->data, info->command->len);
}
/* Only accept about 3 seconds of EAGAIN */ /* Send a single byte of the command */
if (priv->send_delay > 0) p = &info->command->data[info->idx];
eagain_count = 3000000 / priv->send_delay; errno = 0;
while (i < command->len) {
p = &command->data[i];
status = write (priv->fd, p, 1); status = write (priv->fd, p, 1);
if (status < 0) { if (status == 1)
info->idx++;
else if (status < 0) {
if (errno == EAGAIN) { if (errno == EAGAIN) {
eagain_count--; info->eagain_count--;
if (eagain_count <= 0) { if (info->eagain_count <= 0) {
g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED, g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED,
"Sending command failed: '%s'", strerror (errno)); "Sending command failed: '%s'", strerror (errno));
break; return FALSE;
} }
} else { } else {
g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED, g_set_error (error, MM_SERIAL_ERROR, MM_SERIAL_ERROR_SEND_FAILED,
"Sending command failed: '%s'", strerror (errno)); "Sending command failed: '%s'", strerror (errno));
break; return FALSE;
} }
} else
i++;
if (priv->send_delay)
usleep (priv->send_delay);
} }
return i == command->len; if (info->idx >= info->command->len)
info->done = TRUE;
return TRUE;
} }
static void static void
@@ -436,35 +448,25 @@ mm_serial_port_get_cached_reply (MMSerialPort *self, GByteArray *command)
return (const GByteArray *) g_hash_table_lookup (MM_SERIAL_PORT_GET_PRIVATE (self)->reply_cache, command); return (const GByteArray *) g_hash_table_lookup (MM_SERIAL_PORT_GET_PRIVATE (self)->reply_cache, command);
} }
typedef struct {
GByteArray *command;
GCallback callback;
gpointer user_data;
guint32 timeout;
gboolean cached;
} MMQueueData;
static void static void
mm_serial_port_schedule_queue_process (MMSerialPort *self) mm_serial_port_schedule_queue_process (MMSerialPort *self, guint timeout_ms)
{ {
MMSerialPortPrivate *priv = MM_SERIAL_PORT_GET_PRIVATE (self); MMSerialPortPrivate *priv = MM_SERIAL_PORT_GET_PRIVATE (self);
GSource *source;
if (priv->timeout_id) { if (priv->timeout_id) {
/* A command is already in progress */ /* A command is already in progress */
return; return;
} }
if (priv->queue_schedule) { if (priv->queue_id) {
/* Already scheduled */ /* Already scheduled */
return; return;
} }
source = g_idle_source_new (); if (timeout_ms)
g_source_set_closure (source, g_cclosure_new_object (G_CALLBACK (mm_serial_port_queue_process), G_OBJECT (self))); priv->queue_id = g_timeout_add (timeout_ms, mm_serial_port_queue_process, self);
g_source_attach (source, NULL); else
priv->queue_schedule = g_source_get_id (source); priv->queue_id = g_idle_add (mm_serial_port_queue_process, self);
g_source_unref (source);
} }
static gsize static gsize
@@ -516,7 +518,7 @@ mm_serial_port_got_response (MMSerialPort *self, GError *error)
if (consumed) if (consumed)
g_byte_array_remove_range (priv->response, 0, consumed); g_byte_array_remove_range (priv->response, 0, consumed);
if (!g_queue_is_empty (priv->queue)) if (!g_queue_is_empty (priv->queue))
mm_serial_port_schedule_queue_process (self); mm_serial_port_schedule_queue_process (self, 0);
} }
static gboolean static gboolean
@@ -547,7 +549,7 @@ mm_serial_port_queue_process (gpointer data)
MMQueueData *info; MMQueueData *info;
GError *error = NULL; GError *error = NULL;
priv->queue_schedule = 0; priv->queue_id = 0;
info = (MMQueueData *) g_queue_peek_head (priv->queue); info = (MMQueueData *) g_queue_peek_head (priv->queue);
if (!info) if (!info)
@@ -563,17 +565,18 @@ mm_serial_port_queue_process (gpointer data)
} }
} }
if (mm_serial_port_send_command (self, info->command, &error)) { if (mm_serial_port_process_command (self, info, &error)) {
GSource *source; if (info->done) {
/* If the command is finished being sent, schedule the timeout */
source = g_timeout_source_new_seconds (info->timeout); priv->timeout_id = g_timeout_add_seconds (info->timeout,
g_source_set_closure (source, g_cclosure_new_object (G_CALLBACK (mm_serial_port_timed_out), G_OBJECT (self))); mm_serial_port_timed_out,
g_source_attach (source, NULL); self);
priv->timeout_id = g_source_get_id (source);
g_source_unref (source);
} else { } else {
mm_serial_port_got_response (self, error); /* Schedule the next byte of the command to be sent */
mm_serial_port_schedule_queue_process (self, priv->send_delay / 1000);
} }
} else
mm_serial_port_got_response (self, error);
return FALSE; return FALSE;
} }
@@ -600,6 +603,7 @@ data_available (GIOChannel *source,
char buf[SERIAL_BUF_SIZE + 1]; char buf[SERIAL_BUF_SIZE + 1];
gsize bytes_read; gsize bytes_read;
GIOStatus status; GIOStatus status;
MMQueueData *info;
if (condition & G_IO_HUP) { if (condition & G_IO_HUP) {
if (priv->response->len) if (priv->response->len)
@@ -614,6 +618,11 @@ data_available (GIOChannel *source,
return TRUE; return TRUE;
} }
/* Don't read any input if the current command isn't done being sent yet */
info = g_queue_peek_nth (priv->queue, 0);
if (info && (info->started == TRUE) && (info->done == FALSE))
return TRUE;
do { do {
GError *err = NULL; GError *err = NULL;
@@ -909,6 +918,13 @@ internal_queue_command (MMSerialPort *self,
info->command = g_byte_array_sized_new (command->len); info->command = g_byte_array_sized_new (command->len);
g_byte_array_append (info->command, command->data, command->len); g_byte_array_append (info->command, command->data, command->len);
} }
/* Only accept about 3 seconds of EAGAIN for this command */
if (priv->send_delay)
info->eagain_count = 3000000 / priv->send_delay;
else
info->eagain_count = 1000;
info->cached = cached; info->cached = cached;
info->timeout = timeout_seconds; info->timeout = timeout_seconds;
info->callback = (GCallback) callback; info->callback = (GCallback) callback;
@@ -921,7 +937,7 @@ internal_queue_command (MMSerialPort *self,
g_queue_push_tail (priv->queue, info); g_queue_push_tail (priv->queue, info);
if (g_queue_get_length (priv->queue) == 1) if (g_queue_get_length (priv->queue) == 1)
mm_serial_port_schedule_queue_process (self); mm_serial_port_schedule_queue_process (self, 0);
} }
void void
@@ -1264,6 +1280,13 @@ get_property (GObject *object, guint prop_id,
static void static void
dispose (GObject *object) dispose (GObject *object)
{ {
MMSerialPortPrivate *priv = MM_SERIAL_PORT_GET_PRIVATE (object);
if (priv->timeout_id) {
g_source_remove (priv->timeout_id);
priv->timeout_id = 0;
}
if (mm_serial_port_is_open (MM_SERIAL_PORT (object))) if (mm_serial_port_is_open (MM_SERIAL_PORT (object)))
mm_serial_port_close_force (MM_SERIAL_PORT (object)); mm_serial_port_close_force (MM_SERIAL_PORT (object));
@@ -1346,7 +1369,7 @@ mm_serial_port_class_init (MMSerialPortClass *klass)
(object_class, PROP_SEND_DELAY, (object_class, PROP_SEND_DELAY,
g_param_spec_uint64 (MM_SERIAL_PORT_SEND_DELAY, g_param_spec_uint64 (MM_SERIAL_PORT_SEND_DELAY,
"SendDelay", "SendDelay",
"Send delay", "Send delay for each byte in microseconds",
0, G_MAXUINT64, 0, 0, G_MAXUINT64, 0,
G_PARAM_READWRITE)); G_PARAM_READWRITE));