gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14629 - gnunet/src/fs


From: gnunet
Subject: [GNUnet-SVN] r14629 - gnunet/src/fs
Date: Fri, 11 Mar 2011 17:23:52 +0100

Author: grothoff
Date: 2011-03-11 17:23:52 +0100 (Fri, 11 Mar 2011)
New Revision: 14629

Modified:
   gnunet/src/fs/gnunet-service-fs_new.c
   gnunet/src/fs/gnunet-service-fs_pe.c
   gnunet/src/fs/gnunet-service-fs_pe.h
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff

Modified: gnunet/src/fs/gnunet-service-fs_new.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_new.c       2011-03-11 12:57:57 UTC (rev 14628)
+++ gnunet/src/fs/gnunet-service-fs_new.c       2011-03-11 16:23:52 UTC (rev 14629)
@@ -228,9 +228,6 @@
 }
 
 
-
-
-
 /**
  * Handle P2P "PUT" message.
  *
@@ -261,25 +258,6 @@
 
 
 /**
- * Decide with what weight we should forward the given
- * request to the given peer.
- *
- * @param cp target peer
- * @param pr request
- */
-static void
-plan (struct GSF_ConnectedPeer *cp,
-      struct GSF_PendingRequest *pr)
-{
-  GNUNET_CONTAINER_HeapCostType weight;
-
-  weight = 0;
-  /* FIXME: calculate weight properly... */
-  GSF_plan_add_ (cp, pr, weight);
-}
-
-
-/**
  * We have a new request, consider forwarding it to the given
  * peer.
  *
@@ -296,7 +274,7 @@
 {
   struct GSF_PendingRequest *pr = cls;
 
-  plan (cp, pr);
+  GSF_plan_add_ (cp, pr);
 }
 
 
@@ -466,7 +444,7 @@
 {
   struct GSF_ConnectedPeer *cp = cls;
   
-  plan (cp, pr);
+  GSF_plan_add_ (cp, pr);
   return GNUNET_YES;
 }
 

Modified: gnunet/src/fs/gnunet-service-fs_pe.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.c        2011-03-11 12:57:57 UTC (rev 14628)
+++ gnunet/src/fs/gnunet-service-fs_pe.c        2011-03-11 16:23:52 UTC (rev 14629)
@@ -26,61 +26,174 @@
 #include "platform.h"
 #include "gnunet-service-fs_cp.h"
 #include "gnunet-service-fs_pe.h"
+#include "gnunet-service-fs_pr.h"
 
 /**
- * Hash map from peer identities to GNUNET_CONTAINER_Heap's with
- * pending requests as entries.
+ * Transmission plan for a peer.
  */
+struct PeerPlan
+{
+  /**
+   * Heap with pending queries, smaller weights mean higher priority.
+   */
+  struct GNUNET_CONTAINER_Heap *heap;
+
+  /**
+   * Current transmission request handle.
+   */
+  struct GSF_PeerTransmitHandle *pth;
+
+  /**
+   * Peer for which this is the plan.
+   */
+  struct GSF_ConnectedPeer *cp;
+
+  /**
+   * Current task for executing the plan.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier task;
+};
+
+
+/**
+ * Hash map from peer identities to PeerPlans.
+ */
 static struct GNUNET_CONTAINER_MultiHashMap *plans;
 
 
 /**
- * Get the size of the request queue for the given peer.
+ * Figure out when and how to transmit to the given peer.
  *
- * @param cp connected peer to query 
- * @return number of entries in this peer's request queue
+ * @param cls the 'struct PeerPlan' for transmission
+ * @param tc scheduler context
  */
-static struct GNUNET_CONTAINER_Heap *
-get_heap (const struct GSF_ConnectedPeer *cp)
+static void
+schedule_peer_transmission (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function called to get a message for transmission.
+ *
+ * @param cls closure
+ * @param buf_size number of bytes available in buf
+ * @param buf where to copy the message, NULL on error (peer disconnect)
+ * @return number of bytes copied to 'buf', can be 0 (without indicating an error)
+ */
+static size_t 
+transmit_message_callback (void *cls,
+                          size_t buf_size,
+                          void *buf)
 {
-  struct GNUNET_PeerIdentity id;
+  struct PeerPlan *pp = cls;
+  struct GSF_PendingRequest *pr;
+  size_t msize;
 
-  GSF_connected_peer_get_identity_ (cp, &id);
-  return GNUNET_CONTAINER_multihashmap_get (plans,
-                                           &id.hashPubKey);
+  if (NULL == buf)
+    {
+      /* failed, try again... */
+      pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+      return 0;
+    }
+  pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+  msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
+  if (msize > buf_size)
+    {
+      /* buffer too small (message changed), try again */
+      pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+      return 0;
+    }
+  /* remove from root, add again elsewhere... */
+  GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+  GSF_plan_add_ (pp->cp, pr);
+  return msize;
 }
 
 
 /**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct PeerPlan'
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+                           const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct PeerPlan *pp = cls;
+  struct GSF_PendingRequest *pr;
+  size_t msize;
+  struct GNUNET_TIME_Relative delay;
+
+  pp->task = GNUNET_SCHEDULER_NO_TASK;
+  if (NULL == pp->heap)
+    return;
+  if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
+    return;
+  GNUNET_assert (NULL == pp->pth);
+  pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+  if (0) // FIXME: if (re)transmission should wait, wait...
+    {
+      delay = GNUNET_TIME_UNIT_SECONDS;
+      // FIXME
+      pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &schedule_peer_transmission,
+                                              pp);
+      return;
+    }
+  msize = GSF_pending_request_get_message_ (pr, 0, NULL);
+  pp->pth = GSF_peer_transmit_ (pp->cp,
+                               GNUNET_YES,
+                               0 /* FIXME: pr->priority? */,
+                               GNUNET_TIME_UNIT_FOREVER_REL,
+                               msize,
+                               &transmit_message_callback,
+                               pp);
+  GNUNET_assert (NULL != pp->pth);
+}
+
+
+/**
  * Create a new query plan entry.
  *
  * @param cp peer with the entry
  * @param pr request with the entry
- * @param weight determines position of the entry in the cp queue,
- *        lower weights are earlier in the queue
  */
 void
 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
-              struct GSF_PendingRequest *pr,
-              GNUNET_CONTAINER_HeapCostType weight)
+              struct GSF_PendingRequest *pr)
 {
   struct GNUNET_PeerIdentity id;
-  struct GNUNET_CONTAINER_Heap *h;
-
+  struct PeerPlan *pp;
+  GNUNET_CONTAINER_HeapCostType weight;
+  
   GSF_connected_peer_get_identity_ (cp, &id);
-  h = GNUNET_CONTAINER_multihashmap_get (plans,
-                                        &id.hashPubKey);
-  if (NULL == h)
+  pp = GNUNET_CONTAINER_multihashmap_get (plans,
+                                         &id.hashPubKey);
+  if (NULL == pp)
     {
-      h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+      pp = GNUNET_malloc (sizeof (struct PeerPlan));
+      pp->heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
       GNUNET_CONTAINER_multihashmap_put (plans,
                                         &id.hashPubKey,
-                                        h,
+                                        pp,
                                         GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
     }
-  GNUNET_CONTAINER_heap_insert (h,
+  weight = 0; // FIXME: calculate real weight!
+  GNUNET_CONTAINER_heap_insert (pp->heap,
                                pr,
                                weight);
+  if (pp->pth != NULL)
+    {
+      if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
+       return;
+      GSF_peer_transmit_cancel_ (pp->pth);
+      pp->pth = NULL;
+    }
+  if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+    GNUNET_SCHEDULER_cancel (pp->task);
+  pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+                                      pp);
 }
 
 
@@ -94,15 +207,20 @@
 GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
 {
   struct GNUNET_PeerIdentity id;
-  struct GNUNET_CONTAINER_Heap *h;
+  struct PeerPlan *pp;
 
   GSF_connected_peer_get_identity_ (cp, &id);
-  h = GNUNET_CONTAINER_multihashmap_get (plans,
-                                        &id.hashPubKey);
+  pp = GNUNET_CONTAINER_multihashmap_get (plans,
+                                         &id.hashPubKey);
   GNUNET_CONTAINER_multihashmap_remove (plans,
                                        &id.hashPubKey,
-                                       h);
-  GNUNET_CONTAINER_heap_destroy (h);
+                                       pp);
+  if (NULL != pp->pth)
+    GSF_peer_transmit_cancel_ (pp->pth);
+  if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+    GNUNET_SCHEDULER_cancel (pp->task);
+  GNUNET_CONTAINER_heap_destroy (pp->heap);
+  GNUNET_free (pp);
 }
 
 
@@ -152,11 +270,11 @@
 
 
 /**
- * Remove the given request from all heaps. * 
+ * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
  *
  * @param cls 'struct GSF_PendingRequest' to purge
  * @param key identity of the peer we're currently looking at (unused)
- * @param value request heap for the given peer to search for the 'cls'
+ * @param value PeerPlan for the given peer to search for the 'cls'
  * @return GNUNET_OK (continue iteration)
  */
 static int
@@ -165,7 +283,8 @@
                void *value)
 {
   const struct GSF_PendingRequest *pr = cls;
-  struct GNUNET_CONTAINER_Heap *h = value;
+  struct PeerPlan *pp = value;
+  struct GNUNET_CONTAINER_Heap *h = pp->heap;
   struct FindRequestClosure frc;
 
   frc.pr = pr;
@@ -199,44 +318,6 @@
 
 
 /**
- * Get the lowest-weight entry for the respective peer
- * from the plan.  Removes the entry from the plan's queue.
- *
- * @param cp connected peer to query for the next request
- * @return NULL if the queue for this peer is empty
- */
-struct GSF_PendingRequest *
-GSF_plan_get_ (const struct GSF_ConnectedPeer *cp)
-{
-  struct GNUNET_CONTAINER_Heap *h;
-
-  h = get_heap (cp);
-  if (NULL == h)
-    return NULL;
-  return GNUNET_CONTAINER_heap_remove_root (h);
-}
-
-
-/**
- * Get the size of the request queue for the given peer.
- *
- * @param cp connected peer to query 
- * @return number of entries in this peer's request queue
- */
-unsigned int
-GSF_plan_size_ (const struct GSF_ConnectedPeer *cp)
-{
-  struct GNUNET_CONTAINER_Heap *h;
-
-  h = get_heap (cp);
-  if (NULL == h)
-    return 0;
-  return GNUNET_CONTAINER_heap_get_size (h);
-}
-
-
-
-/**
  * Initialize plan subsystem.
  */
 void

Modified: gnunet/src/fs/gnunet-service-fs_pe.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.h        2011-03-11 12:57:57 UTC (rev 14628)
+++ gnunet/src/fs/gnunet-service-fs_pe.h        2011-03-11 16:23:52 UTC (rev 14629)
@@ -39,8 +39,7 @@
  */
 void
 GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
-              struct GSF_PendingRequest *pr,
-              GNUNET_CONTAINER_HeapCostType weight);
+              struct GSF_PendingRequest *pr);
 
 
 /**
@@ -64,27 +63,6 @@
 
 
 /**
- * Get the lowest-weight entry for the respective peer
- * from the plan.  Removes the entry from the plan's queue.
- *
- * @param cp connected peer to query for the next request
- * @return NULL if the queue for this peer is empty
- */
-struct GSF_PendingRequest *
-GSF_plan_get_ (const struct GSF_ConnectedPeer *cp);
-
-
-/**
- * Get the size of the request queue for the given peer.
- *
- * @param cp connected peer to query 
- * @return number of entries in this peer's request queue
- */
-unsigned int
-GSF_plan_size_ (const struct GSF_ConnectedPeer *cp);
-
-
-/**
  * Initialize plan subsystem.
  */
 void

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-11 12:57:57 UTC (rev 14628)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-03-11 16:23:52 UTC (rev 14629)
@@ -423,14 +423,12 @@
  * transmission to other peers (or at least determine its size).
  *
  * @param pr request to generate the message for
- * @param do_route are we routing the reply
  * @param buf_size number of bytes available in buf
  * @param buf where to copy the message (can be NULL)
  * @return number of bytes needed (if > buf_size) or used
  */
 size_t
 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
-                                 int do_route,
                                  size_t buf_size,
                                  void *buf)
 {
@@ -444,10 +442,13 @@
   size_t bf_size;
   struct GNUNET_TIME_Absolute now;
   int64_t ttl;
+  int do_route;
 
+
   k = 0;
   bm = 0;
-  if (GNUNET_YES != do_route)
+  do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
+  if (! do_route)
     {
       bm |= GET_MESSAGE_BIT_RETURN_TO;
       k++;      
@@ -471,7 +472,7 @@
   gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
   gm->header.size = htons (msize);
   gm->type = htonl (pr->public_data.type);
-  if (GNUNET_YES == do_route)
+  if (do_route)
     prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
                                     pr->public_data.priority + 1);
   else
@@ -486,7 +487,7 @@
   gm->query = pr->public_data.query;
   ext = (GNUNET_HashCode*) &gm[1];
   k = 0;  
-  if (GNUNET_YES != do_route)
+  if (! do_route)
     GNUNET_PEER_resolve (pr->sender_pid, 
                         (struct GNUNET_PeerIdentity*) &ext[k++]);
   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)

Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-11 12:57:57 UTC (rev 14628)
+++ gnunet/src/fs/gnunet-service-fs_pr.h        2011-03-11 16:23:52 UTC (rev 14629)
@@ -234,14 +234,12 @@
  * transmission to other peers (or at least determine its size).
  *
  * @param pr request to generate the message for
- * @param do_route are we routing the reply
  * @param buf_size number of bytes available in buf
  * @param buf where to copy the message (can be NULL)
  * @return number of bytes needed (if buf_size too small) or used
  */
 size_t
 GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
-                                 int do_route,
                                  size_t buf_size,
                                  void *buf);
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]