[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r34298 - gnunet/src/sensor
From: gnunet
Subject: [GNUnet-SVN] r34298 - gnunet/src/sensor
Date: Sat, 20 Sep 2014 19:42:17 +0200
Author: otarabai
Date: 2014-09-20 19:42:17 +0200 (Sat, 20 Sep 2014)
New Revision: 34298
Modified:
gnunet/src/sensor/gnunet-service-sensor_reporting.c
Log:
sensor: retry reporting to collection point if fails
Modified: gnunet/src/sensor/gnunet-service-sensor_reporting.c
===================================================================
--- gnunet/src/sensor/gnunet-service-sensor_reporting.c 2014-09-18 17:10:50 UTC (rev 34297)
+++ gnunet/src/sensor/gnunet-service-sensor_reporting.c 2014-09-20 17:42:17 UTC (rev 34298)
@@ -33,7 +33,12 @@
#define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
+/**
+ * Retry time when failing to connect to collection point
+ */
+#define CP_RETRY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 1)
+
/**
* When we are still generating a proof-of-work and we need to send an anomaly
* report, we queue them until the generation is complete
@@ -225,6 +230,16 @@
struct GNUNET_MQ_Handle *mq;
/**
+ * CADET transmit handle
+ */
+ struct GNUNET_CADET_TransmitHandle *th;
+
+ /**
+ * Task used to try reconnection to collection point after failure
+ */
+ GNUNET_SCHEDULER_TaskIdentifier reconnect_task;
+
+ /**
* Are we currently destroying the channel and its context?
*/
int destroying;
@@ -324,6 +339,13 @@
+/**
+ * Try reconnecting to collection point and send last queued message
+ */
+static void
+cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
/******************************************************************************/
/****************************** CLEANUP
******************************/
/******************************************************************************/
@@ -428,6 +450,11 @@
destroy_cadet_peer (struct CadetPeer *cadetp)
{
cadetp->destroying = GNUNET_YES;
+ if (GNUNET_SCHEDULER_NO_TASK != cadetp->reconnect_task)
+ {
+ GNUNET_SCHEDULER_cancel (cadetp->reconnect_task);
+ cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ }
if (NULL != cadetp->mq)
{
GNUNET_MQ_destroy (cadetp->mq);
@@ -536,6 +563,142 @@
/**
+ * Function called to notify a client about the connection
+ * being ready to queue more data. "buf" will be
+ * NULL and "size" zero if the connection was closed for
+ * writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t
+cp_mq_ntr (void *cls, size_t size, void *buf)
+{
+ struct CadetPeer *cadetp = cls;
+ const struct GNUNET_MessageHeader *msg = GNUNET_MQ_impl_current (cadetp->mq);
+ uint16_t msize;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_ntr()\n");
+ cadetp->th = NULL;
+ if (NULL == buf)
+ {
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Sending anomaly report to collection point failed."
+ " Retrying connection in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
+ cadetp->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
+ return 0;
+ }
+ msize = ntohs (msg->size);
+ GNUNET_assert (msize <= size);
+ memcpy (buf, msg, msize);
+ GNUNET_MQ_impl_send_continue (cadetp->mq);
+ return msize;
+}
+
+
+/**
+ * Try reconnecting to collection point and send last queued message
+ */
+static void
+cp_reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct CadetPeer *cadetp = cls;
+ const struct GNUNET_MessageHeader *msg;
+
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Retrying connection to collection point `%s'.\n",
+ GNUNET_i2s (&cadetp->peer_id));
+ cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_assert (NULL == cadetp->channel);
+ cadetp->channel =
+ GNUNET_CADET_channel_create (cadet, cadetp, &cadetp->peer_id,
+ GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
+ GNUNET_CADET_OPTION_RELIABLE);
+ msg = GNUNET_MQ_impl_current (cadetp->mq);
+ cadetp->th =
+ GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ ntohs (msg->size), cp_mq_ntr,
cadetp);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * sending functionality of a message queue.
+ *
+ * @param mq the message queue
+ * @param msg the message to send
+ * @param impl_state state of the implementation
+ */
+static void
+cp_mq_send_impl (struct GNUNET_MQ_Handle *mq,
+ const struct GNUNET_MessageHeader *msg, void *impl_state)
+{
+ struct CadetPeer *cadetp = impl_state;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_send_impl()\n");
+ GNUNET_assert (NULL == cadetp->th);
+ if (NULL == cadetp->channel)
+ {
+ LOG (GNUNET_ERROR_TYPE_INFO,
+ "Sending anomaly report to collection point failed."
+ " Retrying connection in %s.\n",
+ GNUNET_STRINGS_relative_time_to_string (CP_RETRY, GNUNET_NO));
+ cadetp->reconnect_task =
+ GNUNET_SCHEDULER_add_delayed (CP_RETRY, &cp_reconnect, cadetp);
+ return;
+ }
+ cadetp->th =
+ GNUNET_CADET_notify_transmit_ready (cadetp->channel, GNUNET_NO,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ ntohs (msg->size), cp_mq_ntr,
cadetp);
+}
+
+
+/**
+ * Signature of functions implementing the
+ * destruction of a message queue.
+ * Implementations must not free 'mq', but should
+ * take care of 'impl_state'.
+ *
+ * @param mq the message queue to destroy
+ * @param impl_state state of the implementation
+ */
+static void
+cp_mq_destroy_impl (struct GNUNET_MQ_Handle *mq, void *impl_state)
+{
+ struct CadetPeer *cp = impl_state;
+
+ LOG (GNUNET_ERROR_TYPE_DEBUG, "cp_mq_destroy_impl()\n");
+ if (NULL != cp->th)
+ {
+ GNUNET_CADET_notify_transmit_ready_cancel (cp->th);
+ cp->th = NULL;
+ }
+}
+
+
+/**
+ * Create the message queue used to send messages to a collection point.
+ * This will be used to make sure that messages are queued even if the
+ * connection to the collection point cannot be established at the moment.
+ *
+ * @param cp CadetPeer information struct
+ * @return Message queue handle
+ */
+static struct GNUNET_MQ_Handle *
+cp_mq_create (struct CadetPeer *cp)
+{
+ return GNUNET_MQ_queue_for_callbacks (cp_mq_send_impl, cp_mq_destroy_impl,
+ NULL, cp, NULL, NULL, NULL);
+}
+
+
+/**
* Returns context of a connected CADET peer.
* Creates it first if it didn't exist before.
*
@@ -563,7 +726,8 @@
GNUNET_CADET_channel_create (cadet, cadetp, &pid,
GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
GNUNET_CADET_OPTION_RELIABLE);
- cadetp->mq = GNUNET_CADET_mq_create (cadetp->channel);
+ cadetp->mq = cp_mq_create (cadetp);
+ cadetp->reconnect_task = GNUNET_SCHEDULER_NO_TASK;
GNUNET_CONTAINER_DLL_insert (cadetp_head, cadetp_tail, cadetp);
return cadetp;
}
@@ -1009,9 +1173,12 @@
LOG (GNUNET_ERROR_TYPE_DEBUG,
"CADET channel was destroyed by remote peer `%s' or failed to start.\n",
GNUNET_i2s (&cadetp->peer_id));
- GNUNET_CONTAINER_DLL_remove (cadetp_head, cadetp_tail, cadetp);
+ if (NULL != cadetp->th)
+ {
+ GNUNET_CADET_notify_transmit_ready_cancel (cadetp->th);
+ cadetp->th = NULL;
+ }
cadetp->channel = NULL;
- destroy_cadet_peer (cadetp);
}
@@ -1098,6 +1265,12 @@
"Now trying to report last seen value of `%s' to collection point.\n",
sensor->name);
cadetp = get_cadet_peer (*sensor->collection_point);
+ if (NULL == cadetp->channel)
+ {
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Trying to send value to collection point but connection failed,
discarding.\n");
+ return;
+ }
ev = create_value_message (vi);
GNUNET_MQ_send (cadetp->mq, ev);
vi->last_value_reported = GNUNET_YES;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r34298 - gnunet/src/sensor,
gnunet <=