[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r14629 - gnunet/src/fs
From: |
gnunet |
Subject: |
[GNUnet-SVN] r14629 - gnunet/src/fs |
Date: |
Fri, 11 Mar 2011 17:23:52 +0100 |
Author: grothoff
Date: 2011-03-11 17:23:52 +0100 (Fri, 11 Mar 2011)
New Revision: 14629
Modified:
gnunet/src/fs/gnunet-service-fs_new.c
gnunet/src/fs/gnunet-service-fs_pe.c
gnunet/src/fs/gnunet-service-fs_pe.h
gnunet/src/fs/gnunet-service-fs_pr.c
gnunet/src/fs/gnunet-service-fs_pr.h
Log:
stuff
Modified: gnunet/src/fs/gnunet-service-fs_new.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_new.c 2011-03-11 12:57:57 UTC (rev
14628)
+++ gnunet/src/fs/gnunet-service-fs_new.c 2011-03-11 16:23:52 UTC (rev
14629)
@@ -228,9 +228,6 @@
}
-
-
-
/**
* Handle P2P "PUT" message.
*
@@ -261,25 +258,6 @@
/**
- * Decide with what weight we should forward the given
- * request to the given peer.
- *
- * @param cp target peer
- * @param pr request
- */
-static void
-plan (struct GSF_ConnectedPeer *cp,
- struct GSF_PendingRequest *pr)
-{
- GNUNET_CONTAINER_HeapCostType weight;
-
- weight = 0;
- /* FIXME: calculate weight properly... */
- GSF_plan_add_ (cp, pr, weight);
-}
-
-
-/**
* We have a new request, consider forwarding it to the given
* peer.
*
@@ -296,7 +274,7 @@
{
struct GSF_PendingRequest *pr = cls;
- plan (cp, pr);
+ GSF_plan_add_ (cp, pr);
}
@@ -466,7 +444,7 @@
{
struct GSF_ConnectedPeer *cp = cls;
- plan (cp, pr);
+ GSF_plan_add_ (cp, pr);
return GNUNET_YES;
}
Modified: gnunet/src/fs/gnunet-service-fs_pe.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.c 2011-03-11 12:57:57 UTC (rev
14628)
+++ gnunet/src/fs/gnunet-service-fs_pe.c 2011-03-11 16:23:52 UTC (rev
14629)
@@ -26,61 +26,174 @@
#include "platform.h"
#include "gnunet-service-fs_cp.h"
#include "gnunet-service-fs_pe.h"
+#include "gnunet-service-fs_pr.h"
/**
- * Hash map from peer identities to GNUNET_CONTAINER_Heap's with
- * pending requests as entries.
+ * Transmission plan for a peer.
*/
+struct PeerPlan
+{
+ /**
+ * Heap with pending queries, smaller weights mean higher priority.
+ */
+ struct GNUNET_CONTAINER_Heap *heap;
+
+ /**
+ * Current transmission request handle.
+ */
+ struct GSF_PeerTransmitHandle *pth;
+
+ /**
+ * Peer for which this is the plan.
+ */
+ struct GSF_ConnectedPeer *cp;
+
+ /**
+ * Current task for executing the plan.
+ */
+ GNUNET_SCHEDULER_TaskIdentifier task;
+};
+
+
+/**
+ * Hash map from peer identities to PeerPlans.
+ */
static struct GNUNET_CONTAINER_MultiHashMap *plans;
/**
- * Get the size of the request queue for the given peer.
+ * Figure out when and how to transmit to the given peer.
*
- * @param cp connected peer to query
- * @return number of entries in this peer's request queue
+ * @param cls the 'struct GSF_ConnectedPeer' for transmission
+ * @param tc scheduler context
*/
-static struct GNUNET_CONTAINER_Heap *
-get_heap (const struct GSF_ConnectedPeer *cp)
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc);
+
+
+/**
+ * Function called to get a message for transmission.
+ *
+ * @param cls closure
+ * @param buf_size number of bytes available in buf
+ * @param buf where to copy the message, NULL on error (peer disconnect)
+ * @return number of bytes copied to 'buf', can be 0 (without indicating an
error)
+ */
+static size_t
+transmit_message_callback (void *cls,
+ size_t buf_size,
+ void *buf)
{
- struct GNUNET_PeerIdentity id;
+ struct PeerPlan *pp = cls;
+ struct GSF_PendingRequest *pr;
+ size_t msize;
- GSF_connected_peer_get_identity_ (cp, &id);
- return GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
+ if (NULL == buf)
+ {
+ /* failed, try again... */
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+ msize = GSF_pending_request_get_message_ (pr, buf_size, buf);
+ if (msize > buf_size)
+ {
+ /* buffer to small (message changed), try again */
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission, pp);
+ return 0;
+ }
+ /* remove from root, add again elsewhere... */
+ GNUNET_assert (pr == GNUNET_CONTAINER_heap_remove_root (pp->heap));
+ GSF_plan_add_ (pp->cp, pr);
+ return msize;
}
/**
+ * Figure out when and how to transmit to the given peer.
+ *
+ * @param cls the 'struct PeerPlan'
+ * @param tc scheduler context
+ */
+static void
+schedule_peer_transmission (void *cls,
+ const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+ struct PeerPlan *pp = cls;
+ struct GSF_PendingRequest *pr;
+ size_t msize;
+ struct GNUNET_TIME_Relative delay;
+
+ pp->task = GNUNET_SCHEDULER_NO_TASK;
+ if (NULL == pp->heap)
+ return;
+ if (0 == GNUNET_CONTAINER_heap_get_size (pp->heap))
+ return;
+ GNUNET_assert (NULL == pp->pth);
+ pr = GNUNET_CONTAINER_heap_peek (pp->heap);
+ if (0) // FIXME: if (re)transmission should wait, wait...
+ {
+ delay = GNUNET_TIME_UNIT_SECONDS;
+ // FIXME
+ pp->task = GNUNET_SCHEDULER_add_delayed (delay,
+ &schedule_peer_transmission,
+ pp);
+ return;
+ }
+ msize = GSF_pending_request_get_message_ (pr, 0, NULL);
+ pp->pth = GSF_peer_transmit_ (pp->cp,
+ GNUNET_YES,
+ 0 /* FIXME: pr->priority? */,
+ GNUNET_TIME_UNIT_FOREVER_REL,
+ msize,
+ &transmit_message_callback,
+ pp);
+ GNUNET_assert (NULL != pp->pth);
+}
+
+
+/**
* Create a new query plan entry.
*
* @param cp peer with the entry
* @param pr request with the entry
- * @param weight determines position of the entry in the cp queue,
- * lower weights are earlier in the queue
*/
void
GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
- struct GSF_PendingRequest *pr,
- GNUNET_CONTAINER_HeapCostType weight)
+ struct GSF_PendingRequest *pr)
{
struct GNUNET_PeerIdentity id;
- struct GNUNET_CONTAINER_Heap *h;
-
+ struct PeerPlan *pp;
+ GNUNET_CONTAINER_HeapCostType weight;
+
GSF_connected_peer_get_identity_ (cp, &id);
- h = GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
- if (NULL == h)
+ pp = GNUNET_CONTAINER_multihashmap_get (plans,
+ &id.hashPubKey);
+ if (NULL == pp)
{
- h = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN);
+ pp = GNUNET_malloc (sizeof (struct PeerPlan));
+ pp->heap = GNUNET_CONTAINER_heap_create
(GNUNET_CONTAINER_HEAP_ORDER_MIN);
GNUNET_CONTAINER_multihashmap_put (plans,
&id.hashPubKey,
- h,
+ pp,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
}
- GNUNET_CONTAINER_heap_insert (h,
+ weight = 0; // FIXME: calculate real weight!
+ GNUNET_CONTAINER_heap_insert (pp->heap,
pr,
weight);
+ if (pp->pth != NULL)
+ {
+ if (pr != GNUNET_CONTAINER_heap_peek (pp->heap))
+ return;
+ GSF_peer_transmit_cancel_ (pp->pth);
+ pp->pth = NULL;
+ }
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ pp->task = GNUNET_SCHEDULER_add_now (&schedule_peer_transmission,
+ pp);
}
@@ -94,15 +207,20 @@
GSF_plan_notify_peer_disconnect_ (const struct GSF_ConnectedPeer *cp)
{
struct GNUNET_PeerIdentity id;
- struct GNUNET_CONTAINER_Heap *h;
+ struct PeerPlan *pp;
GSF_connected_peer_get_identity_ (cp, &id);
- h = GNUNET_CONTAINER_multihashmap_get (plans,
- &id.hashPubKey);
+ pp = GNUNET_CONTAINER_multihashmap_get (plans,
+ &id.hashPubKey);
GNUNET_CONTAINER_multihashmap_remove (plans,
&id.hashPubKey,
- h);
- GNUNET_CONTAINER_heap_destroy (h);
+ pp);
+ if (NULL != pp->pth)
+ GSF_peer_transmit_cancel_ (pp->pth);
+ if (GNUNET_SCHEDULER_NO_TASK != pp->task)
+ GNUNET_SCHEDULER_cancel (pp->task);
+ GNUNET_CONTAINER_heap_destroy (pp->heap);
+ GNUNET_free (pp);
}
@@ -152,11 +270,11 @@
/**
- * Remove the given request from all heaps. *
+ * Remove the given request from all heaps. * FIXME: O(n) -- inefficient!
*
* @param cls 'struct GSF_PendingRequest' to purge
* @param key identity of the peer we're currently looking at (unused)
- * @param value request heap for the given peer to search for the 'cls'
+ * @param value PeerPlan for the given peer to search for the 'cls'
* @return GNUNET_OK (continue iteration)
*/
static int
@@ -165,7 +283,8 @@
void *value)
{
const struct GSF_PendingRequest *pr = cls;
- struct GNUNET_CONTAINER_Heap *h = value;
+ struct PeerPlan *pp = value;
+ struct GNUNET_CONTAINER_Heap *h = pp->heap;
struct FindRequestClosure frc;
frc.pr = pr;
@@ -199,44 +318,6 @@
/**
- * Get the lowest-weight entry for the respective peer
- * from the plan. Removes the entry from the plan's queue.
- *
- * @param cp connected peer to query for the next request
- * @return NULL if the queue for this peer is empty
- */
-struct GSF_PendingRequest *
-GSF_plan_get_ (const struct GSF_ConnectedPeer *cp)
-{
- struct GNUNET_CONTAINER_Heap *h;
-
- h = get_heap (cp);
- if (NULL == h)
- return NULL;
- return GNUNET_CONTAINER_heap_remove_root (h);
-}
-
-
-/**
- * Get the size of the request queue for the given peer.
- *
- * @param cp connected peer to query
- * @return number of entries in this peer's request queue
- */
-unsigned int
-GSF_plan_size_ (const struct GSF_ConnectedPeer *cp)
-{
- struct GNUNET_CONTAINER_Heap *h;
-
- h = get_heap (cp);
- if (NULL == h)
- return 0;
- return GNUNET_CONTAINER_heap_get_size (h);
-}
-
-
-
-/**
* Initialize plan subsystem.
*/
void
Modified: gnunet/src/fs/gnunet-service-fs_pe.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.h 2011-03-11 12:57:57 UTC (rev
14628)
+++ gnunet/src/fs/gnunet-service-fs_pe.h 2011-03-11 16:23:52 UTC (rev
14629)
@@ -39,8 +39,7 @@
*/
void
GSF_plan_add_ (const struct GSF_ConnectedPeer *cp,
- struct GSF_PendingRequest *pr,
- GNUNET_CONTAINER_HeapCostType weight);
+ struct GSF_PendingRequest *pr);
/**
@@ -64,27 +63,6 @@
/**
- * Get the lowest-weight entry for the respective peer
- * from the plan. Removes the entry from the plan's queue.
- *
- * @param cp connected peer to query for the next request
- * @return NULL if the queue for this peer is empty
- */
-struct GSF_PendingRequest *
-GSF_plan_get_ (const struct GSF_ConnectedPeer *cp);
-
-
-/**
- * Get the size of the request queue for the given peer.
- *
- * @param cp connected peer to query
- * @return number of entries in this peer's request queue
- */
-unsigned int
-GSF_plan_size_ (const struct GSF_ConnectedPeer *cp);
-
-
-/**
* Initialize plan subsystem.
*/
void
Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c 2011-03-11 12:57:57 UTC (rev
14628)
+++ gnunet/src/fs/gnunet-service-fs_pr.c 2011-03-11 16:23:52 UTC (rev
14629)
@@ -423,14 +423,12 @@
* transmission to other peers (or at least determine its size).
*
* @param pr request to generate the message for
- * @param do_route are we routing the reply
* @param buf_size number of bytes available in buf
* @param buf where to copy the message (can be NULL)
* @return number of bytes needed (if > buf_size) or used
*/
size_t
GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
- int do_route,
size_t buf_size,
void *buf)
{
@@ -444,10 +442,13 @@
size_t bf_size;
struct GNUNET_TIME_Absolute now;
int64_t ttl;
+ int do_route;
+
k = 0;
bm = 0;
- if (GNUNET_YES != do_route)
+ do_route = (0 == (pr->public_data.options & GSF_PRO_FORWARD_ONLY));
+ if (! do_route)
{
bm |= GET_MESSAGE_BIT_RETURN_TO;
k++;
@@ -471,7 +472,7 @@
gm->header.type = htons (GNUNET_MESSAGE_TYPE_FS_GET);
gm->header.size = htons (msize);
gm->type = htonl (pr->public_data.type);
- if (GNUNET_YES == do_route)
+ if (do_route)
prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK,
pr->public_data.priority + 1);
else
@@ -486,7 +487,7 @@
gm->query = pr->public_data.query;
ext = (GNUNET_HashCode*) &gm[1];
k = 0;
- if (GNUNET_YES != do_route)
+ if (! do_route)
GNUNET_PEER_resolve (pr->sender_pid,
(struct GNUNET_PeerIdentity*) &ext[k++]);
if (GNUNET_BLOCK_TYPE_FS_SBLOCK == pr->public_data.type)
Modified: gnunet/src/fs/gnunet-service-fs_pr.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.h 2011-03-11 12:57:57 UTC (rev
14628)
+++ gnunet/src/fs/gnunet-service-fs_pr.h 2011-03-11 16:23:52 UTC (rev
14629)
@@ -234,14 +234,12 @@
* transmission to other peers (or at least determine its size).
*
* @param pr request to generate the message for
- * @param do_route are we routing the reply
* @param buf_size number of bytes available in buf
* @param buf where to copy the message (can be NULL)
* @return number of bytes needed (if buf_size too small) or used
*/
size_t
GSF_pending_request_get_message_ (struct GSF_PendingRequest *pr,
- int do_route,
size_t buf_size,
void *buf);
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r14629 - gnunet/src/fs,
gnunet <=