port-serial: use GIO Async API like method for command()

This commit is contained in:
Aleksander Morgado
2013-11-18 00:07:04 +01:00
parent 8122153a88
commit e909edcf1f
4 changed files with 329 additions and 397 deletions

View File

@@ -151,19 +151,6 @@ parse_response (MMPortSerial *port, GByteArray *response, GError **error)
return found; return found;
} }
static gsize
handle_response (MMPortSerial *port,
GByteArray *response,
GError *error,
GCallback callback,
gpointer callback_data)
{
MMSerialResponseFn response_callback = (MMSerialResponseFn) callback;
response_callback (port, response, error, callback_data);
return response->len;
}
/*****************************************************************************/ /*****************************************************************************/
typedef struct { typedef struct {
@@ -365,24 +352,31 @@ string_free (GString *str)
static void static void
serial_command_ready (MMPortSerial *port, serial_command_ready (MMPortSerial *port,
GByteArray *response, GAsyncResult *res,
GError *error,
GSimpleAsyncResult *simple) GSimpleAsyncResult *simple)
{ {
if (error) GByteArray *response_buffer;
g_simple_async_result_set_from_error (simple, error); GError *error = NULL;
else if (response) { GString *response;
GString *str;
/* Build a GString just with the response we need, and clear the response_buffer = mm_port_serial_command_finish (port, res, &error);
* processed range from the response buffer */ if (!response_buffer) {
str = g_string_new_len ((const gchar *)response->data, response->len); g_simple_async_result_take_error (simple, error);
g_simple_async_result_set_op_res_gpointer (simple, g_simple_async_result_complete (simple);
str, g_object_unref (simple);
(GDestroyNotify)string_free); return;
} else }
g_assert_not_reached ();
/* Build a GString just with the response we need, and clear the
* processed range from the response buffer */
response = g_string_new_len ((const gchar *)response_buffer->data, response_buffer->len);
if (response_buffer->len > 0)
g_byte_array_remove_range (response_buffer, 0, response_buffer->len);
g_byte_array_unref (response_buffer);
g_simple_async_result_set_op_res_gpointer (simple,
response,
(GDestroyNotify)string_free);
g_simple_async_result_complete (simple); g_simple_async_result_complete (simple);
g_object_unref (simple); g_object_unref (simple);
} }
@@ -413,22 +407,14 @@ mm_port_serial_at_command (MMPortSerialAt *self,
user_data, user_data,
mm_port_serial_at_command); mm_port_serial_at_command);
if (!allow_cached) mm_port_serial_command (MM_PORT_SERIAL (self),
mm_port_serial_queue_command (MM_PORT_SERIAL (self), buf,
buf, timeout_seconds,
TRUE, allow_cached,
timeout_seconds, cancellable,
cancellable, (GAsyncReadyCallback)serial_command_ready,
(MMSerialResponseFn)serial_command_ready, simple);
simple); g_byte_array_unref (buf);
else
mm_port_serial_queue_command_cached (MM_PORT_SERIAL (self),
buf,
TRUE,
timeout_seconds,
cancellable,
(MMSerialResponseFn)serial_command_ready,
simple);
} }
static void static void
@@ -637,7 +623,6 @@ mm_port_serial_at_class_init (MMPortSerialAtClass *klass)
serial_class->parse_unsolicited = parse_unsolicited; serial_class->parse_unsolicited = parse_unsolicited;
serial_class->parse_response = parse_response; serial_class->parse_response = parse_response;
serial_class->handle_response = handle_response;
serial_class->debug_log = debug_log; serial_class->debug_log = debug_log;
serial_class->config = config; serial_class->config = config;

View File

@@ -65,76 +65,6 @@ parse_response (MMPortSerial *port, GByteArray *response, GError **error)
return find_qcdm_start (response, NULL); return find_qcdm_start (response, NULL);
} }
static gsize
handle_response (MMPortSerial *port,
GByteArray *response,
GError *error,
GCallback callback,
gpointer callback_data)
{
MMSerialResponseFn response_callback = (MMSerialResponseFn) callback;
GByteArray *unescaped = NULL;
guint8 *unescaped_buffer;
GError *dm_error = NULL;
gsize used = 0;
gsize start = 0;
gboolean success = FALSE;
qcdmbool more = FALSE;
gsize unescaped_len = 0;
if (error)
goto callback;
/* Get the offset into the buffer of where the QCDM frame starts */
if (!find_qcdm_start (response, &start)) {
g_set_error_literal (&dm_error,
MM_CORE_ERROR, MM_CORE_ERROR_FAILED,
"Failed to parse QCDM packet.");
/* Discard the unparsable data */
used = response->len;
goto callback;
}
unescaped_buffer = g_malloc (1024);
success = dm_decapsulate_buffer ((const char *) (response->data + start),
response->len - start,
(char *) unescaped_buffer,
1024,
&unescaped_len,
&used,
&more);
if (!success) {
g_set_error_literal (&dm_error,
MM_CORE_ERROR, MM_CORE_ERROR_FAILED,
"Failed to unescape QCDM packet.");
g_free (unescaped_buffer);
unescaped_buffer = NULL;
} else if (more) {
/* Need more data; we shouldn't have gotten here since the parse
* function checks for the end-of-frame marker, but whatever.
*/
g_free (unescaped_buffer);
return 0;
} else {
/* Successfully decapsulated the DM command */
g_assert (unescaped_len <= 1024);
unescaped_buffer = g_realloc (unescaped_buffer, unescaped_len);
unescaped = g_byte_array_new_take (unescaped_buffer, unescaped_len);
}
callback:
response_callback (MM_PORT_SERIAL (port),
unescaped,
dm_error ? dm_error : error,
callback_data);
if (unescaped)
g_byte_array_unref (unescaped);
g_clear_error (&dm_error);
return start + used;
}
/*****************************************************************************/ /*****************************************************************************/
GByteArray * GByteArray *
@@ -150,18 +80,77 @@ mm_port_serial_qcdm_command_finish (MMPortSerialQcdm *self,
static void static void
serial_command_ready (MMPortSerial *port, serial_command_ready (MMPortSerial *port,
GByteArray *response, GAsyncResult *res,
GError *error,
GSimpleAsyncResult *simple) GSimpleAsyncResult *simple)
{ {
GByteArray *response_buffer;
GByteArray *response;
GError *error = NULL;
gsize used = 0;
gsize start = 0;
guint8 *unescaped_buffer = NULL;
gboolean success = FALSE;
qcdmbool more = FALSE;
gsize unescaped_len = 0;
response_buffer = mm_port_serial_command_finish (port, res, &error);
if (!response_buffer)
goto out;
/* Get the offset into the buffer of where the QCDM frame starts */
start = 0;
if (!find_qcdm_start (response_buffer, &start)) {
error = g_error_new_literal (MM_CORE_ERROR,
MM_CORE_ERROR_FAILED,
"Failed to parse QCDM packet");
/* Discard the unparsable data */
used = response_buffer->len;
goto out;
}
unescaped_buffer = g_malloc (1024);
success = dm_decapsulate_buffer ((const char *)(response_buffer->data + start),
response_buffer->len - start,
(char *)unescaped_buffer,
1024,
&unescaped_len,
&used,
&more);
if (!success) {
error = g_error_new_literal (MM_CORE_ERROR,
MM_CORE_ERROR_FAILED,
"Failed to unescape QCDM packet");
g_free (unescaped_buffer);
unescaped_buffer = NULL;
goto out;
}
if (more) {
/* Need more data; we shouldn't have gotten here since the parse
* function checks for the end-of-frame marker, but whatever.
*/
error = g_error_new_literal (MM_CORE_ERROR,
MM_CORE_ERROR_FAILED,
"QCDM packet is not complete");
g_free (unescaped_buffer);
unescaped_buffer = NULL;
goto out;
}
/* Successfully decapsulated the DM command */
g_assert (error == NULL);
g_assert (unescaped_len <= 1024);
unescaped_buffer = g_realloc (unescaped_buffer, unescaped_len);
response = g_byte_array_new_take (unescaped_buffer, unescaped_len);
g_simple_async_result_set_op_res_gpointer (simple, response, (GDestroyNotify)g_byte_array_unref);
out:
if (error) if (error)
g_simple_async_result_set_from_error (simple, error); g_simple_async_result_take_error (simple, error);
else if (response) if (start + used)
g_simple_async_result_set_op_res_gpointer (simple, g_byte_array_remove_range (response_buffer, 0, start + used);
g_byte_array_ref (response), if (response_buffer)
(GDestroyNotify)g_byte_array_unref); g_byte_array_unref (response_buffer);
else
g_assert_not_reached ();
g_simple_async_result_complete (simple); g_simple_async_result_complete (simple);
g_object_unref (simple); g_object_unref (simple);
@@ -187,23 +176,13 @@ mm_port_serial_qcdm_command (MMPortSerialQcdm *self,
mm_port_serial_qcdm_command); mm_port_serial_qcdm_command);
/* 'command' is expected to be already CRC-ed and escaped */ /* 'command' is expected to be already CRC-ed and escaped */
mm_port_serial_command (MM_PORT_SERIAL (self),
if (!allow_cached) command,
mm_port_serial_queue_command (MM_PORT_SERIAL (self), timeout_seconds,
g_byte_array_ref (command), allow_cached,
TRUE, cancellable,
timeout_seconds, (GAsyncReadyCallback)serial_command_ready,
cancellable, simple);
(MMSerialResponseFn)serial_command_ready,
simple);
else
mm_port_serial_queue_command_cached (MM_PORT_SERIAL (self),
g_byte_array_ref (command),
TRUE,
timeout_seconds,
cancellable,
(MMSerialResponseFn)serial_command_ready,
simple);
} }
static void static void
@@ -283,7 +262,6 @@ mm_port_serial_qcdm_class_init (MMPortSerialQcdmClass *klass)
/* Virtual methods */ /* Virtual methods */
port_class->parse_response = parse_response; port_class->parse_response = parse_response;
port_class->handle_response = handle_response;
port_class->config_fd = config_fd; port_class->config_fd = config_fd;
port_class->debug_log = debug_log; port_class->debug_log = debug_log;
} }

View File

@@ -34,9 +34,14 @@
#include "mm-port-serial.h" #include "mm-port-serial.h"
#include "mm-log.h" #include "mm-log.h"
static gboolean mm_port_serial_queue_process (gpointer data); static gboolean port_serial_queue_process (gpointer data);
static void port_serial_close_force (MMPortSerial *self); static void port_serial_schedule_queue_process (MMPortSerial *self,
static void port_serial_reopen_cancel (MMPortSerial *self); guint timeout_ms);
static void port_serial_close_force (MMPortSerial *self);
static void port_serial_reopen_cancel (MMPortSerial *self);
static void port_serial_set_cached_reply (MMPortSerial *self,
const GByteArray *command,
const GByteArray *response);
G_DEFINE_TYPE (MMPortSerial, mm_port_serial, MM_TYPE_PORT) G_DEFINE_TYPE (MMPortSerial, mm_port_serial, MM_TYPE_PORT)
@@ -104,18 +109,103 @@ typedef struct {
gpointer reopen_ctx; gpointer reopen_ctx;
} MMPortSerialPrivate; } MMPortSerialPrivate;
/*****************************************************************************/
/* Command */
typedef struct { typedef struct {
MMPortSerial *self;
GSimpleAsyncResult *result;
GCancellable *cancellable;
GByteArray *command; GByteArray *command;
guint32 idx; guint32 timeout;
gboolean allow_cached;
guint32 eagain_count; guint32 eagain_count;
guint32 idx;
gboolean started; gboolean started;
gboolean done; gboolean done;
GCallback callback; } CommandContext;
gpointer user_data;
guint32 timeout; static void
gboolean cached; command_context_complete_and_free (CommandContext *ctx, gboolean idle)
GCancellable *cancellable; {
} MMQueueData; if (idle)
g_simple_async_result_complete_in_idle (ctx->result);
else
g_simple_async_result_complete (ctx->result);
g_object_unref (ctx->result);
g_byte_array_unref (ctx->command);
if (ctx->cancellable)
g_object_unref (ctx->cancellable);
g_object_unref (ctx->self);
g_slice_free (CommandContext, ctx);
}
GByteArray *
mm_port_serial_command_finish (MMPortSerial *self,
GAsyncResult *res,
GError **error)
{
if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (res), error))
return NULL;
return g_byte_array_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)));
}
void
mm_port_serial_command (MMPortSerial *self,
GByteArray *command,
guint32 timeout_seconds,
gboolean allow_cached,
GCancellable *cancellable,
GAsyncReadyCallback callback,
gpointer user_data)
{
CommandContext *ctx;
MMPortSerialPrivate *priv;
g_return_if_fail (MM_IS_PORT_SERIAL (self));
g_return_if_fail (command != NULL);
priv = MM_PORT_SERIAL_GET_PRIVATE (self);
/* Setup command context */
ctx = g_slice_new0 (CommandContext);
ctx->self = g_object_ref (self);
ctx->result = g_simple_async_result_new (G_OBJECT (self),
callback,
user_data,
mm_port_serial_command);
ctx->command = g_byte_array_ref (command);
ctx->allow_cached = allow_cached;
ctx->timeout = timeout_seconds;
ctx->cancellable = (cancellable ? g_object_ref (cancellable) : NULL);
/* Only accept about 3 seconds of EAGAIN for this command */
if (priv->send_delay)
ctx->eagain_count = 3000000 / priv->send_delay;
else
ctx->eagain_count = 1000;
if (priv->open_count == 0) {
g_simple_async_result_set_error (ctx->result,
MM_SERIAL_ERROR,
MM_SERIAL_ERROR_SEND_FAILED,
"Sending command failed: device is not open");
command_context_complete_and_free (ctx, TRUE);
return;
}
/* Clear the cached value for this command if not asking for cached value */
if (!allow_cached)
port_serial_set_cached_reply (self, ctx->command, NULL);
g_queue_push_tail (priv->queue, ctx);
if (g_queue_get_length (priv->queue) == 1)
port_serial_schedule_queue_process (self, 0);
}
/*****************************************************************************/
#if 0 #if 0
static const char * static const char *
@@ -431,9 +521,9 @@ serial_debug (MMPortSerial *self, const char *prefix, const char *buf, gsize len
} }
static gboolean static gboolean
mm_port_serial_process_command (MMPortSerial *self, port_serial_process_command (MMPortSerial *self,
MMQueueData *info, CommandContext *ctx,
GError **error) GError **error)
{ {
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
const guint8 *p; const guint8 *p;
@@ -452,31 +542,31 @@ mm_port_serial_process_command (MMPortSerial *self,
} }
/* Only print command the first time */ /* Only print command the first time */
if (info->started == FALSE) { if (ctx->started == FALSE) {
info->started = TRUE; ctx->started = TRUE;
serial_debug (self, "-->", (const char *) info->command->data, info->command->len); serial_debug (self, "-->", (const char *) ctx->command->data, ctx->command->len);
} }
if (priv->send_delay == 0) { if (priv->send_delay == 0) {
/* Send the whole command in one write */ /* Send the whole command in one write */
send_len = expected_status = info->command->len; send_len = expected_status = ctx->command->len;
p = info->command->data; p = ctx->command->data;
} else { } else {
/* Send just one byte of the command */ /* Send just one byte of the command */
send_len = expected_status = 1; send_len = expected_status = 1;
p = &info->command->data[info->idx]; p = &ctx->command->data[ctx->idx];
} }
/* Send a single byte of the command */ /* Send a single byte of the command */
errno = 0; errno = 0;
status = write (priv->fd, p, send_len); status = write (priv->fd, p, send_len);
if (status > 0) if (status > 0)
info->idx += status; ctx->idx += status;
else { else {
/* Error or no bytes written */ /* Error or no bytes written */
if (errno == EAGAIN || status == 0) { if (errno == EAGAIN || status == 0) {
info->eagain_count--; ctx->eagain_count--;
if (info->eagain_count <= 0) { if (ctx->eagain_count <= 0) {
/* If we reach the limit of EAGAIN errors, treat as a timeout error. */ /* If we reach the limit of EAGAIN errors, treat as a timeout error. */
priv->n_consecutive_timeouts++; priv->n_consecutive_timeouts++;
g_signal_emit (self, signals[TIMED_OUT], 0, priv->n_consecutive_timeouts); g_signal_emit (self, signals[TIMED_OUT], 0, priv->n_consecutive_timeouts);
@@ -492,16 +582,16 @@ mm_port_serial_process_command (MMPortSerial *self,
} }
} }
if (info->idx >= info->command->len) if (ctx->idx >= ctx->command->len)
info->done = TRUE; ctx->done = TRUE;
return TRUE; return TRUE;
} }
static void static void
mm_port_serial_set_cached_reply (MMPortSerial *self, port_serial_set_cached_reply (MMPortSerial *self,
const GByteArray *command, const GByteArray *command,
const GByteArray *response) const GByteArray *response)
{ {
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
@@ -521,13 +611,13 @@ mm_port_serial_set_cached_reply (MMPortSerial *self,
} }
static const GByteArray * static const GByteArray *
mm_port_serial_get_cached_reply (MMPortSerial *self, GByteArray *command) port_serial_get_cached_reply (MMPortSerial *self, GByteArray *command)
{ {
return (const GByteArray *) g_hash_table_lookup (MM_PORT_SERIAL_GET_PRIVATE (self)->reply_cache, command); return (const GByteArray *) g_hash_table_lookup (MM_PORT_SERIAL_GET_PRIVATE (self)->reply_cache, command);
} }
static void static void
mm_port_serial_schedule_queue_process (MMPortSerial *self, guint timeout_ms) port_serial_schedule_queue_process (MMPortSerial *self, guint timeout_ms)
{ {
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
@@ -542,30 +632,19 @@ mm_port_serial_schedule_queue_process (MMPortSerial *self, guint timeout_ms)
} }
if (timeout_ms) if (timeout_ms)
priv->queue_id = g_timeout_add (timeout_ms, mm_port_serial_queue_process, self); priv->queue_id = g_timeout_add (timeout_ms, port_serial_queue_process, self);
else else
priv->queue_id = g_idle_add (mm_port_serial_queue_process, self); priv->queue_id = g_idle_add (port_serial_queue_process, self);
}
static gsize
real_handle_response (MMPortSerial *self,
GByteArray *response,
GError *error,
GCallback callback,
gpointer callback_data)
{
MMSerialResponseFn response_callback = (MMSerialResponseFn) callback;
response_callback (self, response, error, callback_data);
return response->len;
} }
static void static void
mm_port_serial_got_response (MMPortSerial *self, GError *error) port_serial_got_response (MMPortSerial *self,
const GError *error)
{ {
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv;
MMQueueData *info; CommandContext *ctx;
gsize consumed = priv->response->len;
priv = MM_PORT_SERIAL_GET_PRIVATE (self);
if (priv->timeout_id) { if (priv->timeout_id) {
g_source_remove (priv->timeout_id); g_source_remove (priv->timeout_id);
@@ -581,36 +660,32 @@ mm_port_serial_got_response (MMPortSerial *self, GError *error)
g_clear_object (&priv->cancellable); g_clear_object (&priv->cancellable);
info = (MMQueueData *) g_queue_pop_head (priv->queue); ctx = (CommandContext *) g_queue_pop_head (priv->queue);
if (info) { if (ctx) {
if (info->cached && !error) if (error)
mm_port_serial_set_cached_reply (self, info->command, priv->response); g_simple_async_result_set_from_error (ctx->result, error);
else {
if (ctx->allow_cached && !error)
port_serial_set_cached_reply (self, ctx->command, priv->response);
if (info->callback) { /* Upon completion, it is a task of the caller to remove from the response
g_warn_if_fail (MM_PORT_SERIAL_GET_CLASS (self)->handle_response != NULL); * buffer the processed data */
consumed = MM_PORT_SERIAL_GET_CLASS (self)->handle_response (self, g_simple_async_result_set_op_res_gpointer (ctx->result,
priv->response, g_byte_array_ref (priv->response),
error, (GDestroyNotify)g_byte_array_unref);
info->callback,
info->user_data);
} }
g_clear_object (&info->cancellable); /* Don't complete in idle. We need the caller remove the response range which
g_byte_array_unref (info->command); * was processed, and that must be done before processing any new queued command */
g_slice_free (MMQueueData, info); command_context_complete_and_free (ctx, FALSE);
} }
if (error)
g_error_free (error);
if (consumed)
g_byte_array_remove_range (priv->response, 0, consumed);
if (!g_queue_is_empty (priv->queue)) if (!g_queue_is_empty (priv->queue))
mm_port_serial_schedule_queue_process (self, 0); port_serial_schedule_queue_process (self, 0);
} }
static gboolean static gboolean
mm_port_serial_timed_out (gpointer data) port_serial_timed_out (gpointer data)
{ {
MMPortSerial *self = MM_PORT_SERIAL (data); MMPortSerial *self = MM_PORT_SERIAL (data);
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
@@ -621,14 +696,14 @@ mm_port_serial_timed_out (gpointer data)
/* Update number of consecutive timeouts found */ /* Update number of consecutive timeouts found */
priv->n_consecutive_timeouts++; priv->n_consecutive_timeouts++;
/* FIXME: This is not completely correct - if the response finally arrives and there's
* some other command waiting for response right now, the other command will
* get the output of the timed out command. Not sure what to do here. */
error = g_error_new_literal (MM_SERIAL_ERROR, error = g_error_new_literal (MM_SERIAL_ERROR,
MM_SERIAL_ERROR_RESPONSE_TIMEOUT, MM_SERIAL_ERROR_RESPONSE_TIMEOUT,
"Serial command timed out"); "Serial command timed out");
port_serial_got_response (self, error);
/* FIXME: This is not completely correct - if the response finally arrives and there's g_error_free (error);
some other command waiting for response right now, the other command will
get the output of the timed out command. Not sure what to do here. */
mm_port_serial_got_response (self, error);
/* Emit a timed out signal, used by upper layers to identify a disconnected /* Emit a timed out signal, used by upper layers to identify a disconnected
* serial port */ * serial port */
@@ -647,33 +722,34 @@ port_serial_response_wait_cancelled (GCancellable *cancellable,
/* We don't want to call disconnect () while in the signal handler */ /* We don't want to call disconnect () while in the signal handler */
priv->cancellable_id = 0; priv->cancellable_id = 0;
/* FIXME: This is not completely correct - if the response finally arrives and there's
* some other command waiting for response right now, the other command will
* get the output of the cancelled command. Not sure what to do here. */
error = g_error_new_literal (MM_CORE_ERROR, error = g_error_new_literal (MM_CORE_ERROR,
MM_CORE_ERROR_CANCELLED, MM_CORE_ERROR_CANCELLED,
"Waiting for the reply cancelled"); "Waiting for the reply cancelled");
port_serial_got_response (self, error);
/* FIXME: This is not completely correct - if the response finally arrives and there's g_error_free (error);
some other command waiting for response right now, the other command will
get the output of the cancelled command. Not sure what to do here. */
mm_port_serial_got_response (self, error);
} }
static gboolean static gboolean
mm_port_serial_queue_process (gpointer data) port_serial_queue_process (gpointer data)
{ {
MMPortSerial *self = MM_PORT_SERIAL (data); MMPortSerial *self = MM_PORT_SERIAL (data);
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self); MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
MMQueueData *info; CommandContext *ctx;
GError *error = NULL; GError *error = NULL;
priv->queue_id = 0; priv->queue_id = 0;
info = (MMQueueData *) g_queue_peek_head (priv->queue); ctx = (CommandContext *) g_queue_peek_head (priv->queue);
if (!info) if (!ctx)
return FALSE; return FALSE;
if (info->cached) { if (ctx->allow_cached) {
const GByteArray *cached = mm_port_serial_get_cached_reply (self, info->command); const GByteArray *cached;
cached = port_serial_get_cached_reply (self, ctx->command);
if (cached) { if (cached) {
/* Ensure the response array is fully empty before setting the /* Ensure the response array is fully empty before setting the
* cached response. */ * cached response. */
@@ -686,41 +762,48 @@ mm_port_serial_queue_process (gpointer data)
} }
g_byte_array_append (priv->response, cached->data, cached->len); g_byte_array_append (priv->response, cached->data, cached->len);
mm_port_serial_got_response (self, NULL); port_serial_got_response (self, NULL);
return FALSE;
}
/* Cached reply wasn't found, keep on */
}
/* If error, report it */
if (!port_serial_process_command (self, ctx, &error)) {
port_serial_got_response (self, error);
g_error_free (error);
return FALSE;
}
/* Schedule the next byte of the command to be sent */
if (!ctx->done) {
port_serial_schedule_queue_process (self, priv->send_delay / 1000);
return FALSE;
}
/* Setup the cancellable so that we can stop waiting for a response */
if (ctx->cancellable) {
priv->cancellable = g_object_ref (ctx->cancellable);
priv->cancellable_id = (g_cancellable_connect (
ctx->cancellable,
(GCallback)port_serial_response_wait_cancelled,
self,
NULL));
if (!priv->cancellable_id) {
error = g_error_new (MM_CORE_ERROR,
MM_CORE_ERROR_CANCELLED,
"Won't wait for the reply");
port_serial_got_response (self, error);
g_error_free (error);
return FALSE; return FALSE;
} }
} }
if (mm_port_serial_process_command (self, info, &error)) { /* If the command is finished being sent, schedule the timeout */
if (info->done) { priv->timeout_id = g_timeout_add_seconds (ctx->timeout,
/* setup the cancellable so that we can stop waiting for a response */ port_serial_timed_out,
if (info->cancellable) { self);
priv->cancellable = g_object_ref (info->cancellable);
priv->cancellable_id = (g_cancellable_connect (
info->cancellable,
(GCallback) port_serial_response_wait_cancelled,
self,
NULL));
if (!priv->cancellable_id) {
error = g_error_new (MM_CORE_ERROR,
MM_CORE_ERROR_CANCELLED,
"Won't wait for the reply");
mm_port_serial_got_response (self, error);
return FALSE;
}
}
/* If the command is finished being sent, schedule the timeout */
priv->timeout_id = g_timeout_add_seconds (info->timeout,
mm_port_serial_timed_out,
self);
} else {
/* Schedule the next byte of the command to be sent */
mm_port_serial_schedule_queue_process (self, priv->send_delay / 1000);
}
} else
mm_port_serial_got_response (self, error);
return FALSE; return FALSE;
} }
@@ -746,8 +829,9 @@ data_available (GIOChannel *source,
char buf[SERIAL_BUF_SIZE + 1]; char buf[SERIAL_BUF_SIZE + 1];
gsize bytes_read; gsize bytes_read;
GIOStatus status; GIOStatus status;
MMQueueData *info; CommandContext *ctx;
const char *device; const char *device;
GError *error = NULL;
if (condition & G_IO_HUP) { if (condition & G_IO_HUP) {
device = mm_port_get_device (MM_PORT (self)); device = mm_port_get_device (MM_PORT (self));
@@ -766,22 +850,20 @@ data_available (GIOChannel *source,
} }
/* Don't read any input if the current command isn't done being sent yet */ /* Don't read any input if the current command isn't done being sent yet */
info = g_queue_peek_nth (priv->queue, 0); ctx = g_queue_peek_nth (priv->queue, 0);
if (info && (info->started == TRUE) && (info->done == FALSE)) if (ctx && (ctx->started == TRUE) && (ctx->done == FALSE))
return TRUE; return TRUE;
do { do {
GError *err = NULL;
bytes_read = 0; bytes_read = 0;
status = g_io_channel_read_chars (source, buf, SERIAL_BUF_SIZE, &bytes_read, &err); status = g_io_channel_read_chars (source, buf, SERIAL_BUF_SIZE, &bytes_read, &error);
if (status == G_IO_STATUS_ERROR) { if (status == G_IO_STATUS_ERROR) {
if (err && err->message) { if (error) {
mm_warn ("(%s): read error: %s", mm_warn ("(%s): read error: %s",
mm_port_get_device (MM_PORT (self)), mm_port_get_device (MM_PORT (self)),
err->message); error->message);
} }
g_clear_error (&err); g_clear_error (&error);
} }
/* If no bytes read, just let g_io_channel wait for more data */ /* If no bytes read, just let g_io_channel wait for more data */
@@ -799,10 +881,14 @@ data_available (GIOChannel *source,
g_byte_array_remove_range (priv->response, 0, (SERIAL_BUF_SIZE / 2)); g_byte_array_remove_range (priv->response, 0, (SERIAL_BUF_SIZE / 2));
} }
if (parse_response (self, priv->response, &err)) { /* Parse response. Returns TRUE either if an error is provided or if
* we really have the response to process. */
if (parse_response (self, priv->response, &error)) {
/* Reset number of consecutive timeouts only here */ /* Reset number of consecutive timeouts only here */
priv->n_consecutive_timeouts = 0; priv->n_consecutive_timeouts = 0;
mm_port_serial_got_response (self, err); /* Process response retrieved */
port_serial_got_response (self, error);
g_clear_error (&error);
} }
} while ( (bytes_read == SERIAL_BUF_SIZE || status == G_IO_STATUS_AGAIN) } while ( (bytes_read == SERIAL_BUF_SIZE || status == G_IO_STATUS_AGAIN)
&& (priv->watch_id > 0)); && (priv->watch_id > 0));
@@ -1085,31 +1171,15 @@ mm_port_serial_close (MMPortSerial *self)
/* Clear the command queue */ /* Clear the command queue */
for (i = 0; i < g_queue_get_length (priv->queue); i++) { for (i = 0; i < g_queue_get_length (priv->queue); i++) {
MMQueueData *item = g_queue_peek_nth (priv->queue, i); CommandContext *ctx;
if (item->callback) { ctx = g_queue_peek_nth (priv->queue, i);
GError *error; g_simple_async_result_set_error (ctx->result,
GByteArray *response; MM_SERIAL_ERROR,
g_warn_if_fail (MM_PORT_SERIAL_GET_CLASS (self)->handle_response != NULL);
error = g_error_new_literal (MM_SERIAL_ERROR,
MM_SERIAL_ERROR_SEND_FAILED, MM_SERIAL_ERROR_SEND_FAILED,
"Serial port is now closed"); "Serial port is now closed");
response = g_byte_array_sized_new (1); g_simple_async_result_complete (ctx->result);
g_byte_array_append (response, (const guint8 *) "\0", 1); command_context_complete_and_free (ctx, FALSE);
MM_PORT_SERIAL_GET_CLASS (self)->handle_response (self,
response,
error,
item->callback,
item->user_data);
g_error_free (error);
g_byte_array_unref (response);
}
g_clear_object (&item->cancellable);
g_byte_array_unref (item->command);
g_slice_free (MMQueueData, item);
} }
g_queue_clear (priv->queue); g_queue_clear (priv->queue);
@@ -1168,86 +1238,6 @@ port_serial_close_force (MMPortSerial *self)
g_signal_emit (self, signals[FORCED_CLOSE], 0); g_signal_emit (self, signals[FORCED_CLOSE], 0);
} }
static void
internal_queue_command (MMPortSerial *self,
GByteArray *command,
gboolean take_command,
gboolean cached,
guint32 timeout_seconds,
GCancellable *cancellable,
MMSerialResponseFn callback,
gpointer user_data)
{
MMPortSerialPrivate *priv = MM_PORT_SERIAL_GET_PRIVATE (self);
MMQueueData *info;
g_return_if_fail (MM_IS_PORT_SERIAL (self));
g_return_if_fail (command != NULL);
if (priv->open_count == 0) {
GError *error = g_error_new_literal (MM_SERIAL_ERROR,
MM_SERIAL_ERROR_SEND_FAILED,
"Sending command failed: device is not enabled");
if (callback)
callback (self, NULL, error, user_data);
g_error_free (error);
return;
}
info = g_slice_new0 (MMQueueData);
if (take_command)
info->command = command;
else {
info->command = g_byte_array_sized_new (command->len);
g_byte_array_append (info->command, command->data, command->len);
}
/* Only accept about 3 seconds of EAGAIN for this command */
if (priv->send_delay)
info->eagain_count = 3000000 / priv->send_delay;
else
info->eagain_count = 1000;
info->cached = cached;
info->timeout = timeout_seconds;
info->cancellable = (cancellable ? g_object_ref (cancellable) : NULL);
info->callback = (GCallback) callback;
info->user_data = user_data;
/* Clear the cached value for this command if not asking for cached value */
if (!cached)
mm_port_serial_set_cached_reply (self, info->command, NULL);
g_queue_push_tail (priv->queue, info);
if (g_queue_get_length (priv->queue) == 1)
mm_port_serial_schedule_queue_process (self, 0);
}
void
mm_port_serial_queue_command (MMPortSerial *self,
GByteArray *command,
gboolean take_command,
guint32 timeout_seconds,
GCancellable *cancellable,
MMSerialResponseFn callback,
gpointer user_data)
{
internal_queue_command (self, command, take_command, FALSE, timeout_seconds, cancellable, callback, user_data);
}
void
mm_port_serial_queue_command_cached (MMPortSerial *self,
GByteArray *command,
gboolean take_command,
guint32 timeout_seconds,
GCancellable *cancellable,
MMSerialResponseFn callback,
gpointer user_data)
{
internal_queue_command (self, command, take_command, TRUE, timeout_seconds, cancellable, callback, user_data);
}
/*****************************************************************************/ /*****************************************************************************/
/* Reopen */ /* Reopen */
@@ -1820,7 +1810,6 @@ mm_port_serial_class_init (MMPortSerialClass *klass)
object_class->finalize = finalize; object_class->finalize = finalize;
klass->config_fd = real_config_fd; klass->config_fd = real_config_fd;
klass->handle_response = real_handle_response;
/* Properties */ /* Properties */
g_object_class_install_property g_object_class_install_property

View File

@@ -43,11 +43,6 @@
typedef struct _MMPortSerial MMPortSerial; typedef struct _MMPortSerial MMPortSerial;
typedef struct _MMPortSerialClass MMPortSerialClass; typedef struct _MMPortSerialClass MMPortSerialClass;
typedef void (*MMSerialResponseFn) (MMPortSerial *port,
GByteArray *response,
GError *error,
gpointer user_data);
struct _MMPortSerial { struct _MMPortSerial {
MMPort parent; MMPort parent;
@@ -73,16 +68,6 @@ struct _MMPortSerialClass {
GByteArray *response, GByteArray *response,
GError **error); GError **error);
/* Called after parsing to allow the command response to be delivered to
* it's callback to be handled. Returns the # of bytes of the response
* consumed.
*/
gsize (*handle_response) (MMPortSerial *self,
GByteArray *response,
GError *error,
GCallback callback,
gpointer callback_data);
/* Called to configure the serial port fd after it's opened. On error, should /* Called to configure the serial port fd after it's opened. On error, should
* return FALSE and set 'error' as appropriate. * return FALSE and set 'error' as appropriate.
*/ */
@@ -139,20 +124,15 @@ void mm_port_serial_flash_cancel (MMPortSerial *self);
gboolean mm_port_serial_get_flash_ok (MMPortSerial *self); gboolean mm_port_serial_get_flash_ok (MMPortSerial *self);
void mm_port_serial_queue_command (MMPortSerial *self, void mm_port_serial_command (MMPortSerial *self,
GByteArray *command, GByteArray *command,
gboolean take_command,
guint32 timeout_seconds, guint32 timeout_seconds,
gboolean allow_cached,
GCancellable *cancellable, GCancellable *cancellable,
MMSerialResponseFn callback, GAsyncReadyCallback callback,
gpointer user_data); gpointer user_data);
GByteArray *mm_port_serial_command_finish (MMPortSerial *self,
void mm_port_serial_queue_command_cached (MMPortSerial *self, GAsyncResult *res,
GByteArray *command, GError **error);
gboolean take_command,
guint32 timeout_seconds,
GCancellable *cancellable,
MMSerialResponseFn callback,
gpointer user_data);
#endif /* MM_PORT_SERIAL_H */ #endif /* MM_PORT_SERIAL_H */