[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r34054 - in gnunet/src: include sensor sensordashboard
From: gnunet
Subject: [GNUnet-SVN] r34054 - in gnunet/src: include sensor sensordashboard
Date: Fri, 25 Jul 2014 13:42:08 +0200
Author: otarabai
Date: 2014-07-25 13:42:08 +0200 (Fri, 25 Jul 2014)
New Revision: 34054
Modified:
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_sensor_util_lib.h
gnunet/src/sensor/gnunet-service-sensor-update.c
gnunet/src/sensor/gnunet-service-sensor.c
gnunet/src/sensordashboard/gnunet-service-sensordashboard.c
Log:
sensor: towards update functionality
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2014-07-25 10:27:43 UTC (rev
34053)
+++ gnunet/src/include/gnunet_protocols.h 2014-07-25 11:42:08 UTC (rev
34054)
@@ -2434,9 +2434,9 @@
#define GNUNET_MESSAGE_TYPE_SENSOR_LIST_REQ 805
/**
- * Sensor list sent from update point to requesting peer
+ * Message carrying brief sensor information (name, version)
*/
-#define GNUNET_MESSAGE_TYPE_SENSOR_LIST 806
+#define GNUNET_MESSAGE_TYPE_SENSOR_BRIEF 806
/*******************************************************************************
Modified: gnunet/src/include/gnunet_sensor_util_lib.h
===================================================================
--- gnunet/src/include/gnunet_sensor_util_lib.h 2014-07-25 10:27:43 UTC (rev
34053)
+++ gnunet/src/include/gnunet_sensor_util_lib.h 2014-07-25 11:42:08 UTC (rev
34054)
@@ -187,34 +187,6 @@
};
-/**
- * Carries a single reading from a sensor
- */
-struct GNUNET_SENSOR_Reading
-{
-
- /**
- * Sensor this reading is related to
- */
- struct GNUNET_SENSOR_SensorInfo *sensor;
-
- /**
- * Timestamp of taking the reading
- */
- uint64_t timestamp;
-
- /**
- * Reading value
- */
- void *value;
-
- /**
- * Size of @e value
- */
- uint16_t value_size;
-
-};
-
GNUNET_NETWORK_STRUCT_BEGIN
/**
@@ -258,6 +230,34 @@
};
+/**
+ * Used to communicate brief information about a sensor.
+ */
+struct GNUNET_SENSOR_SensorBriefMessage
+{
+
+ /**
+ * GNUNET general message header.
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Size of sensor name string, allocated at position 0 after this struct.
+ */
+ uint16_t name_size;
+
+ /**
+ * First part of sensor version number
+ */
+ uint16_t version_major;
+
+ /**
+ * Second part of sensor version number
+ */
+ uint16_t version_minor;
+
+};
+
GNUNET_NETWORK_STRUCT_END
@@ -273,7 +273,7 @@
/*
* Get path to the directory containing the sensor definition files
*
- * @return sensor files directory
+ * @return sensor files directory string
*/
char *
GNUNET_SENSOR_get_sensor_dir ();
Modified: gnunet/src/sensor/gnunet-service-sensor-update.c
===================================================================
--- gnunet/src/sensor/gnunet-service-sensor-update.c 2014-07-25 10:27:43 UTC
(rev 34053)
+++ gnunet/src/sensor/gnunet-service-sensor-update.c 2014-07-25 11:42:08 UTC
(rev 34054)
@@ -35,12 +35,12 @@
/**
* Interval at which to contact update points for new sensor updates.
*/
-//#define SENSOR_UPDATE_CHECK_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
+#define SENSOR_UPDATE_CHECK_INTERVAL GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_DAYS, 1)
/**
- * When connecting to update points fail, retry after...
+ * Interval at which to retry contacting update point if we were busy.
*/
-//#define SENSOR_UPDATE_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5)
+#define SENSOR_UPDATE_CHECK_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_HOURS, 1)
/**
@@ -79,6 +79,11 @@
*/
int expecting_sensor_list;
+ /**
+ * Did a failure occur while dealing with this update point before?
+ */
+ int failed;
+
};
@@ -107,8 +112,24 @@
*/
static struct GNUNET_CADET_Handle *cadet;
+/**
+ * Are we in the process of checking and updating sensors?
+ */
+static int updating; //TODO: when done, set to #GNUNET_NO and destroy channel
+
/**
+ * Contact update points to check for new updates
+ *
+ * @param cls unused
+ * @param tc GNUnet scheduler task context
+ */
+static void
+check_for_updates (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
* Cleanup update point context. This does not destroy the struct itself.
*
* @param up UpdatePoint struct
@@ -164,8 +185,30 @@
static void
fail ()
{
+ struct UpdatePoint *up;
+
cleanup_updatepoint (up_default);
- //TODO:
+ if (up_default == up_tail)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "All defined update points failed. Will retry again in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (SENSOR_UPDATE_CHECK_INTERVAL,
+ GNUNET_NO));
+ up = up_head;
+ while (NULL != up)
+ {
+ up->failed = GNUNET_NO;
+ up = up->next;
+ }
+ GNUNET_SCHEDULER_add_delayed (SENSOR_UPDATE_CHECK_INTERVAL,
+ &check_for_updates, NULL);
+ return;
+ }
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Update point `%s' failed, trying next one now.\n",
+ GNUNET_i2s (&up_default->peer_id));
+ up_default = up_default->next;
+ GNUNET_SCHEDULER_add_now (&check_for_updates, NULL);
}
@@ -187,6 +230,8 @@
struct GNUNET_MessageHeader *msg;
size_t msg_size;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending sensor list request now.\n");
up_default->sensor_list_req_th = NULL;
if (NULL == buf)
{
@@ -215,6 +260,18 @@
{
if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
return;
+ if (GNUNET_YES == updating)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Update process still running and update interval already exhausted."
+ "Retrying in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (SENSOR_UPDATE_CHECK_RETRY,
+ GNUNET_NO));
+ GNUNET_SCHEDULER_add_delayed (SENSOR_UPDATE_CHECK_RETRY,
+ &check_for_updates, NULL);
+ return;
+ }
+ updating = GNUNET_YES;
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Checking for sensor updates.\n");
GNUNET_assert (NULL != up_default);
@@ -239,6 +296,8 @@
GNUNET_TIME_UNIT_FOREVER_REL,
sizeof (struct GNUNET_MessageHeader),
&do_send_sensor_list_req, NULL);
+ GNUNET_SCHEDULER_add_delayed (SENSOR_UPDATE_CHECK_INTERVAL,
+ &check_for_updates, NULL);
}
@@ -296,6 +355,7 @@
up->ch = NULL;
up->sensor_list_req_th = NULL;
up->expecting_sensor_list = GNUNET_NO;
+ up->failed = GNUNET_NO;
GNUNET_CONTAINER_DLL_insert (up_head, up_tail, up);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Loaded update point `%s'.\n",
@@ -316,20 +376,67 @@
* #GNUNET_SYSERR to close it (signal serious error).
*/
static int
-handle_sensor_list (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
+handle_sensor_brief (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ void **channel_ctx,
+ const struct GNUNET_MessageHeader *message)
{
+ struct GNUNET_SENSOR_SensorBriefMessage *sbm;
+
GNUNET_assert (*channel_ctx == up_default);
GNUNET_assert (GNUNET_YES == up_default->expecting_sensor_list);
- up_default->expecting_sensor_list = GNUNET_NO;
- //TODO
+ if (GNUNET_MESSAGE_TYPE_SENSOR_END == ntohs (message->type))
+ {
+ up_default->expecting_sensor_list = GNUNET_NO;
+ //TODO: cleanup
+ updating = GNUNET_NO; //FIXME: should not be here, only for testing
+ }
+ else
+ {
+ sbm = (struct GNUNET_SENSOR_SensorBriefMessage *)message;
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "Sensor brief: %.*s %d.%d\n",
+ ntohs (sbm->name_size),
+ &sbm[1],
+ ntohs (sbm->version_major),
+ ntohs (sbm->version_minor));
+ }
+ GNUNET_CADET_receive_done (channel);
return GNUNET_OK;
}
/**
+ * Function called whenever a channel is destroyed. Should clean up
+ * any associated state.
+ *
+ * It must NOT call #GNUNET_CADET_channel_destroy on the channel.
+ *
+ * @param cls closure (set from #GNUNET_CADET_connect)
+ * @param channel connection to the other end (henceforth invalid)
+ * @param channel_ctx place where local state associated
+ * with the channel is stored
+ */
+static void
+cadet_channel_destroyed (void *cls,
+ const struct GNUNET_CADET_Channel *channel,
+ void *channel_ctx)
+{
+ struct UpdatePoint *up = channel_ctx;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
+ "CADET Channel destroyed callback triggered.\n");
+ up->ch = NULL;
+ if (GNUNET_YES == updating)
+ {
+ fail ();
+ return;
+ }
+ cleanup_updatepoint (up);
+}
+
+
+/**
* Start the sensor update module
*
* @param c our service configuration
@@ -341,7 +448,8 @@
struct GNUNET_CONTAINER_MultiHashMap *sensors)
{
static struct GNUNET_CADET_MessageHandler cadet_handlers[] = {
- {&handle_sensor_list, GNUNET_MESSAGE_TYPE_SENSOR_LIST, 0},
+ {&handle_sensor_brief, GNUNET_MESSAGE_TYPE_SENSOR_BRIEF, 0},
+ {&handle_sensor_brief, GNUNET_MESSAGE_TYPE_SENSOR_END, 0},
{NULL, 0, 0}
};
@@ -350,7 +458,7 @@
cadet = GNUNET_CADET_connect(cfg,
NULL,
NULL,
- NULL,
+ &cadet_channel_destroyed,
cadet_handlers,
NULL);
if (NULL == cadet)
@@ -367,6 +475,7 @@
return GNUNET_SYSERR;
}
up_default = up_head;
+ updating = GNUNET_NO;
GNUNET_SCHEDULER_add_now (&check_for_updates, NULL);
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sensor update module started.\n");
Modified: gnunet/src/sensor/gnunet-service-sensor.c
===================================================================
--- gnunet/src/sensor/gnunet-service-sensor.c 2014-07-25 10:27:43 UTC (rev
34053)
+++ gnunet/src/sensor/gnunet-service-sensor.c 2014-07-25 11:42:08 UTC (rev
34054)
@@ -92,7 +92,7 @@
shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
- //SENSOR_update_stop ();
+ SENSOR_update_stop ();
SENSOR_reporting_stop ();
SENSOR_analysis_stop ();
GNUNET_SENSOR_destroy_sensors (sensors);
@@ -595,7 +595,7 @@
schedule_all_sensors();
SENSOR_analysis_start(c, sensors);
SENSOR_reporting_start(c, sensors);
- //SENSOR_update_start (c, sensors);
+ SENSOR_update_start (c, sensors);
statistics = GNUNET_STATISTICS_create("sensor", cfg);
GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
peerstore = GNUNET_PEERSTORE_connect(cfg);
Modified: gnunet/src/sensordashboard/gnunet-service-sensordashboard.c
===================================================================
--- gnunet/src/sensordashboard/gnunet-service-sensordashboard.c 2014-07-25
10:27:43 UTC (rev 34053)
+++ gnunet/src/sensordashboard/gnunet-service-sensordashboard.c 2014-07-25
11:42:08 UTC (rev 34054)
@@ -60,24 +60,90 @@
struct GNUNET_CADET_Channel *ch;
/**
+ * CADET transmit handle if we requested a transmission
+ */
+ struct GNUNET_CADET_TransmitHandle *th;
+
+ /**
+ * Head of DLL of pending messages to be sent to client
+ */
+ struct PendingMessage *pm_head;
+
+ /**
+ * Tail of DLL of pending messages to be sent to client
+ */
+ struct PendingMessage *pm_tail;
+
+ /**
* Are we in the process of destroying this context?
*/
int destroying;
};
+/**
+ * Message queued in a DLL, waiting to be sent to a client
+ */
+struct PendingMessage
+{
+ /**
+ * DLL
+ */
+ struct PendingMessage *prev;
+
+ /**
+ * DLL
+ */
+ struct PendingMessage *next;
+
+ /**
+ * Actual queued message
+ */
+ struct GNUNET_MessageHeader *msg;
+
+};
+
/**
- * Handle to CADET service
+ * Carries a single reading from a sensor
*/
-static struct GNUNET_CADET_Handle *cadet;
+struct ClientSensorReading
+{
+ /**
+ * Sensor this reading is related to
+ */
+ struct GNUNET_SENSOR_SensorInfo *sensor;
+
+ /**
+ * Timestamp of taking the reading
+ */
+ uint64_t timestamp;
+
+ /**
+ * Reading value
+ */
+ void *value;
+
+ /**
+ * Size of @e value
+ */
+ uint16_t value_size;
+
+};
+
+
/**
* Global hashmap of defined sensors
*/
static struct GNUNET_CONTAINER_MultiHashMap *sensors;
/**
+ * Handle to CADET service
+ */
+static struct GNUNET_CADET_Handle *cadet;
+
+/**
* Handle to the peerstore service connection
*/
static struct GNUNET_PEERSTORE_Handle *peerstore;
@@ -99,6 +165,15 @@
/**
+ * Trigger sending next pending message to the given client peer if any.
+ *
+ * @param cp client peer context struct
+ */
+static void
+trigger_send_next_msg (struct ClientPeerContext *cp);
+
+
+/**
* Destroy a given client peer context
*
* @param cp client peer context
@@ -106,7 +181,22 @@
static void
destroy_clientpeer (struct ClientPeerContext *cp)
{
+ struct PendingMessage *pm;
+
cp->destroying = GNUNET_YES;
+ if (NULL != cp->th)
+ {
+ GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
+ cp->th = NULL;
+ }
+ pm = cp->pm_head;
+ while (NULL != pm)
+ {
+ GNUNET_CONTAINER_DLL_remove (cp->pm_head, cp->pm_tail, pm);
+ GNUNET_free (pm->msg);
+ GNUNET_free (pm);
+ pm = cp->pm_head;
+ }
if (NULL != cp->ch)
{
GNUNET_CADET_channel_destroy (cp->ch);
@@ -201,6 +291,9 @@
{
struct ClientPeerContext *cp;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received a channel connection from peer `%s'.\n",
+ GNUNET_i2s (initiator));
cp = GNUNET_new (struct ClientPeerContext);
cp->peerid = *initiator;
cp->ch = channel;
@@ -211,13 +304,168 @@
/**
+ * Function called to notify a client about the connection being ready
+ * to queue more data. @a buf will be NULL and @a size zero if the
+ * connection was closed for writing in the meantime.
+ *
+ * Perform the actual sending of the message to client peer.
+ *
+ * @param cls closure, a `struct ClientPeerContext *`
+ * @param size number of bytes available in @a buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to @a buf
+ */
+static size_t
+do_send_msg (void *cls, size_t size, void *buf)
+{
+ struct ClientPeerContext *cp = cls;
+ struct PendingMessage *pm;
+ size_t msg_size;
+
+ cp->th = NULL;
+ pm = cp->pm_head;
+ msg_size = ntohs (pm->msg->size);
+ GNUNET_CONTAINER_DLL_remove (cp->pm_head, cp->pm_tail, pm);
+ if (NULL == buf || size < msg_size)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Error trying to send a message to peer `%s'.\n"),
+ GNUNET_i2s (&cp->peerid));
+ return 0;
+ }
+ memcpy (buf, pm->msg, msg_size);
+ GNUNET_free (pm->msg);
+ GNUNET_free (pm);
+ trigger_send_next_msg (cp);
+ return msg_size;
+}
+
+
+/**
+ * Trigger sending next pending message to the given client peer if any.
+ *
+ * @param cp client peer context struct
+ */
+static void
+trigger_send_next_msg (struct ClientPeerContext *cp)
+{
+ struct PendingMessage *pm;
+
+ if (NULL == cp->pm_head)
+ return;
+ if (NULL != cp->th)
+ return;
+ pm = cp->pm_head;
+ cp->th = GNUNET_CADET_notify_transmit_ready (cp->ch,
+ GNUNET_YES,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ ntohs (pm->msg->size),
+ &do_send_msg,
+ cp);
+}
+
+
+/**
+ * Add a new message to the queue to be sent to the given client peer.
+ *
+ * @param msg Message to be queued
+ * @param cp Client peer context
+ */
+static void
+queue_msg (struct GNUNET_MessageHeader *msg, struct ClientPeerContext *cp)
+{
+ struct PendingMessage *pm;
+
+ pm = GNUNET_new (struct PendingMessage);
+ pm->msg = msg;
+ GNUNET_CONTAINER_DLL_insert_tail (cp->pm_head, cp->pm_tail, pm);
+ trigger_send_next_msg (cp);
+}
+
+
+/**
+ * Iterator over defined sensors: creates and sends brief sensor information to
+ * the given client peer over CADET.
+ *
+ * @param cls closure, the client peer
+ * @param key sensor key
+ * @param value sensor value
+ * @return #GNUNET_YES to continue iteration
+ */
+static int
+send_sensor_brief (void *cls,
+ const struct GNUNET_HashCode *key,
+ void *value)
+{
+ struct ClientPeerContext *cp = cls;
+ struct GNUNET_SENSOR_SensorInfo *sensor = value;
+ struct GNUNET_SENSOR_SensorBriefMessage *msg;
+ uint16_t sensorname_size;
+ uint16_t total_size;
+
+ /* Create message struct */
+ sensorname_size = strlen (sensor->name) + 1;
+ total_size = sizeof (struct GNUNET_SENSOR_SensorBriefMessage) +
+ sensorname_size;
+ msg = GNUNET_malloc (total_size);
+ msg->header.size = htons (total_size);
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_SENSOR_BRIEF);
+ msg->name_size = htons (sensorname_size);
+ msg->version_major = htons (sensor->version_major);
+ msg->version_minor = htons (sensor->version_minor);
+ memcpy (&msg[1], sensor->name, sensorname_size);
+ /* Queue the msg */
+ queue_msg ((struct GNUNET_MessageHeader *)msg, cp);
+ return GNUNET_YES;
+}
+
+
+/**
+ * Called with any sensor list request received.
+ *
+ * Each time the function must call #GNUNET_CADET_receive_done on the channel
+ * in order to receive the next message. This doesn't need to be immediate:
+ * can be delayed if some processing is done on the message.
+ *
+ * @param cls Closure (set from #GNUNET_CADET_connect).
+ * @param channel Connection to the other end.
+ * @param channel_ctx Place to store local state associated with the channel.
+ * @param message The actual message.
+ * @return #GNUNET_OK to keep the channel open,
+ * #GNUNET_SYSERR to close it (signal serious error).
+ */
+static int
+handle_sensor_list_req (void *cls,
+ struct GNUNET_CADET_Channel *channel,
+ void **channel_ctx,
+ const struct GNUNET_MessageHeader *message)
+{
+ struct ClientPeerContext *cp = *channel_ctx;
+ struct GNUNET_MessageHeader *end_msg;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Received a sensor list request from peer `%s'.\n",
+ GNUNET_i2s (&cp->peerid));
+ GNUNET_CONTAINER_multihashmap_iterate (sensors,
+ &send_sensor_brief,
+ cp);
+ end_msg = GNUNET_new (struct GNUNET_MessageHeader);
+ end_msg->size = htons (sizeof (struct GNUNET_MessageHeader));
+ end_msg->type = htons (GNUNET_MESSAGE_TYPE_SENSOR_END);
+ queue_msg (end_msg, cp);
+ GNUNET_CADET_receive_done (channel);
+ return GNUNET_OK;
+}
+
+
+/**
* Parses a sensor reading message struct
*
* @param msg message header received
* @param sensors multihashmap of loaded sensors
* @return sensor reading struct or NULL if error
*/
-static struct GNUNET_SENSOR_Reading *
+static struct ClientSensorReading *
parse_reading_message (const struct GNUNET_MessageHeader *msg,
struct GNUNET_CONTAINER_MultiHashMap *sensors)
{
@@ -229,7 +477,7 @@
char *sensorname;
struct GNUNET_HashCode key;
struct GNUNET_SENSOR_SensorInfo *sensor;
- struct GNUNET_SENSOR_Reading *reading;
+ struct ClientSensorReading *reading;
msg_size = ntohs (msg->size);
if (msg_size < sizeof (struct GNUNET_SENSOR_ReadingMessage))
@@ -272,7 +520,7 @@
"Invalid value size for a numerical sensor.\n");
return NULL;
}
- reading = GNUNET_new (struct GNUNET_SENSOR_Reading);
+ reading = GNUNET_new (struct ClientSensorReading);
reading->sensor = sensor;
reading->timestamp = GNUNET_be64toh (rm->timestamp);
reading->value_size = value_size;
@@ -304,7 +552,7 @@
const struct GNUNET_MessageHeader *message)
{
struct ClientPeerContext *cp = *channel_ctx;
- struct GNUNET_SENSOR_Reading *reading;
+ struct ClientSensorReading *reading;
reading = parse_reading_message (message, sensors);
if (NULL == reading)
@@ -335,32 +583,6 @@
/**
- * Called with any sensor list request received.
- *
- * Each time the function must call #GNUNET_CADET_receive_done on the channel
- * in order to receive the next message. This doesn't need to be immediate:
- * can be delayed if some processing is done on the message.
- *
- * @param cls Closure (set from #GNUNET_CADET_connect).
- * @param channel Connection to the other end.
- * @param channel_ctx Place to store local state associated with the channel.
- * @param message The actual message.
- * @return #GNUNET_OK to keep the channel open,
- * #GNUNET_SYSERR to close it (signal serious error).
- */
-static int
-handle_sensor_list_req (void *cls,
- struct GNUNET_CADET_Channel *channel,
- void **channel_ctx,
- const struct GNUNET_MessageHeader *message)
-{
- //TODO
- GNUNET_CADET_receive_done (channel);
- return GNUNET_OK;
-}
-
-
-/**
* Process sensordashboard requests.
*
* @param cls closure
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r34054 - in gnunet/src: include sensor sensordashboard,
gnunet <=