[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28341 - gnunet/src/mesh
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28341 - gnunet/src/mesh |
Date: |
Wed, 31 Jul 2013 10:07:58 +0200 |
Author: bartpolot
Date: 2013-07-31 10:07:58 +0200 (Wed, 31 Jul 2013)
New Revision: 28341
Modified:
gnunet/src/mesh/Makefile.am
gnunet/src/mesh/gnunet-service-mesh-enc.c
gnunet/src/mesh/mesh_protocol_enc.h
Log:
- wip
Modified: gnunet/src/mesh/Makefile.am
===================================================================
--- gnunet/src/mesh/Makefile.am 2013-07-30 18:42:38 UTC (rev 28340)
+++ gnunet/src/mesh/Makefile.am 2013-07-31 08:07:58 UTC (rev 28341)
@@ -20,6 +20,9 @@
AM_CLFAGS = -g
+EXP_LIBEXEC = \
+ gnunet-service-mesh-enc
+
libexec_PROGRAMS = \
gnunet-service-mesh $(EXP_LIBEXEC)
Modified: gnunet/src/mesh/gnunet-service-mesh-enc.c
===================================================================
--- gnunet/src/mesh/gnunet-service-mesh-enc.c 2013-07-30 18:42:38 UTC (rev
28340)
+++ gnunet/src/mesh/gnunet-service-mesh-enc.c 2013-07-31 08:07:58 UTC (rev
28341)
@@ -161,6 +161,7 @@
struct MeshClient;
struct MeshPeer;
struct MeshTunnel2;
+struct MeshConnection;
struct MeshChannel;
struct MeshChannelReliability;
@@ -186,11 +187,16 @@
struct MeshPeer *peer;
/**
- * Tunnel this message belongs to.
+ * Connection this message belongs to.
*/
- struct MeshTunnel2 *tunnel;
+ struct MeshConnection *c;
/**
+ * Channel this message belongs to, if known.
+ */
+ struct MeshChannel *ch;
+
+ /**
* Pointer to info stucture used as cls.
*/
void *cls;
@@ -214,6 +220,11 @@
struct MeshFlowControl
{
/**
+ * Peer
+ */
+ struct MeshPeer *peer;
+
+ /**
* Transmission queue to core DLL head
*/
struct MeshPeerQueue *queue_head;
@@ -983,26 +994,42 @@
connection_bck_keepalive (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc);
+
/**
+ * Change the tunnel state.
+ *
+ * @param c Connection whose state to change.
+ * @param state New state.
+ */
+static void
+connection_change_state (struct MeshConnection* c,
+ enum MeshConnectionState state);
+
+
+
+/**
* @brief Queue and pass message to core when possible.
*
- * If type is payload (UNICAST, TO_ORIGIN) checks for queue status
- * and accounts for it. In case the queue is full, the message is dropped and
+ * If type is payload (UNICAST, TO_ORIGIN) checks for queue status and
+ * accounts for it. In case the queue is full, the message is dropped and
* a break issued.
+ *
+ * Otherwise, message is treated as internal and allowed to go regardless of
+ * queue status.
*
- * Otherwise, the message is treated as internal and allowed to go,
- * regardless of queue status.
- *
* @param cls Closure (@c type dependant). It will be used by queue_send to
* build the message to be sent if not already prebuilt.
* @param type Type of the message, 0 for a raw message.
* @param size Size of the message.
* @param dst Neighbor to send message to.
- * @param t Tunnel this message belongs to.
+ * @param c Connection this message belongs to.
+ * @param ch Channel this message belongs to, if applicable (otherwise NULL).
*/
static void
queue_add (void *cls, uint16_t type, size_t size,
- struct MeshPeer *dst, struct MeshTunnel2 *t);
+ struct MeshPeer *dst,
+ struct MeshConnection *c,
+ struct MeshChannel *ch);
/**
@@ -1157,6 +1184,38 @@
/**
+ * Get the previous hop in a connection
+ *
+ * @param c Connection.
+ *
+ * @return Short ID of the previous peer.
+ */
+GNUNET_PEER_Id
+connection_get_prev_hop (struct MeshConnection *c)
+{
+ if (0 == c->own_pos || c->path->length < 2)
+ return c->path->peers[0];
+ return c->path->peers[c->own_pos - 1];
+}
+
+
+/**
+ * Get the next hop in a connection
+ *
+ * @param c Connection.
+ *
+ * @return Short ID of the next peer.
+ */
+GNUNET_PEER_Id
+connection_get_next_hop (struct MeshConnection *c)
+{
+ if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
+ return c->path->peers[c->path->length - 1];
+ return c->path->peers[c->own_pos + 1];
+}
+
+
+/**
* Check if client has registered with the service and has not disconnected
*
* @param client the client to check
@@ -1279,99 +1338,175 @@
}
+/**
+ * Pick a connection on which send the next data message.
+ *
+ * @param t Tunnel on which to send the message.
+ * @param fwd Is this a fwd message?
+ *
+ * @return The connection on which to send the next message.
+ */
+static struct MeshConnection *
+tunnel_get_connection (struct MeshTunnel2 *t, int fwd)
+{
+ struct MeshConnection *c;
+ struct MeshConnection *best;
+ struct MeshPeer *neighbor;
+ GNUNET_PEER_Id id;
+ unsigned int lowest_q;
+
+ neighbor = NULL;
+ best = NULL;
+ lowest_q = UINT_MAX;
+ for (c = t->connection_head; NULL != c; c = c->next)
+ {
+ if (MESH_CONNECTION_READY == c->state)
+ {
+ id = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
+ neighbor = peer_get_short (id);
+ if (NULL == neighbor->fc)
+ {
+ GNUNET_break (0);
+ continue;
+ }
+ if (neighbor->fc->queue_n < lowest_q)
+ {
+ best = c;
+ lowest_q = neighbor->fc->queue_n;
+ }
+ }
+ }
+ return best;
+}
+
+
/**
- * Sends an already built message to a peer, properly registrating
+ * Sends an already built message on a tunnel, properly registering
* all used resources.
*
* @param message Message to send. Function makes a copy of it.
- * @param peer Short ID of the neighbor whom to send the message.
- * @param t Tunnel on which this message is transmitted.
+ * @param c Connection on which this message is transmitted.
+ * @param ch Channel on which this message is transmitted.
+ * @param fwd Is this a fwd message?
*/
static void
-send_prebuilt_message (const struct GNUNET_MessageHeader *message,
- GNUNET_PEER_Id peer,
- struct MeshTunnel2 *t)
+send_prebuilt_message_connection (const struct GNUNET_MessageHeader *message,
+ struct MeshConnection *c,
+ struct MeshChannel *ch,
+ int fwd)
{
struct MeshPeer *neighbor;
- struct MeshPeerPath *p;
+ GNUNET_PEER_Id id;
void *data;
size_t size;
uint16_t type;
- if (0 == peer)
+ id = fwd ? connection_get_next_hop (c) : connection_get_prev_hop (c);
+ neighbor = peer_get_short (id);
+ if (NULL == neighbor)
+ {
+ GNUNET_break (0);
return;
+ }
size = ntohs (message->size);
data = GNUNET_malloc (size);
memcpy (data, message, size);
type = ntohs(message->type);
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
- GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
+ GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
{
struct GNUNET_MESH_Data *u;
u = (struct GNUNET_MESH_Data *) data;
u->ttl = htonl (ntohl (u->ttl) - 1);
}
- neighbor = peer_get_short (peer);
- for (p = neighbor->path_head; NULL != p; p = p->next)
- {
- if (2 >= p->length)
- {
- break;
- }
- }
- if (NULL == p)
- {
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- " no direct connection to %s\n",
- GNUNET_i2s (GNUNET_PEER_resolve2 (peer)));
- GNUNET_free (data);
- return;
- }
- if (GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK == type) // FIXME
- type = 0;
+
queue_add (data,
type,
size,
neighbor,
- t);
+ c,
+ ch);
}
-GNUNET_PEER_Id
-connection_get_prev_hop (struct MeshConnection *c)
+/**
+ * Sends an already built message on a tunnel, properly registering
+ * all used resources.
+ *
+ * @param message Message to send. Function makes a copy of it.
+ * @param t Tunnel on which this message is transmitted.
+ * @param ch Channel on which this message is transmitted.
+ * @param fwd Is this a fwd message?
+ */
+static void
+send_prebuilt_message_tunnel (const struct GNUNET_MessageHeader *message,
+ struct MeshTunnel2 *t,
+ struct MeshChannel *ch,
+ int fwd)
{
- if (0 == c->own_pos || c->path->length < 2)
- return c->path->peers[0];
- return c->path->peers[c->own_pos - 1];
+ struct MeshConnection *c;
+
+ c = tunnel_get_connection (t, fwd);
+ if (NULL == c)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ send_prebuilt_message_connection (message, c, ch, fwd);
}
-GNUNET_PEER_Id
-connection_get_next_hop (struct MeshConnection *c)
+/**
+ * Sends an already built message directly to a peer.
+ *
+ * @param message Message to send. Function makes a copy of it.
+ * @param peer Tunnel on which this message is transmitted.
+ */
+static void
+send_prebuilt_message_peer (const struct GNUNET_MessageHeader *message,
+ struct MeshPeer *peer)
{
- if ((c->path->length - 1) == c->own_pos || c->path->length < 2)
- return c->path->peers[c->path->length - 1];
- return c->path->peers[c->own_pos + 1];
+ void *data;
+ size_t size;
+ uint16_t type;
+
+ if (NULL == peer)
+ {
+ GNUNET_break (0);
+ return;
+ }
+
+ size = ntohs (message->size);
+ data = GNUNET_malloc (size);
+ memcpy (data, message, size);
+ type = ntohs(message->type);
+
+ queue_add (data,
+ type,
+ size,
+ peer,
+ NULL,
+ NULL);
}
/**
- * Sends a CREATE CONNECTION message for a path to a peer, properly
registrating
- * all used resources.
+ * Sends a CREATE CONNECTION message for a path to a peer.
+ * Changes the connection and tunnel states if necessary.
*
- * @param t Tunnel for which the connection is created.
* @param connection Connection to create.
*/
static void
-send_connection_create (struct MeshTunnel2 *t,
- struct MeshConnection *connection)
+send_connection_create (struct MeshConnection *connection)
{
struct MeshPeer *neighbor;
+ struct MeshTunnel2 *t;
+ t = connection->t;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection create\n");
neighbor = peer_get_short (connection_get_next_hop (connection));
queue_add (connection,
@@ -1380,9 +1515,12 @@
(connection->path->length *
sizeof (struct GNUNET_PeerIdentity)),
neighbor,
- t);
+ connection,
+ NULL);
if (MESH_TUNNEL_SEARCHING == t->state)
tunnel_change_state (t, MESH_TUNNEL_WAITING);
+ if (MESH_CONNECTION_NEW == connection->state)
+ connection_change_state (connection, MESH_CONNECTION_SENT);
}
@@ -1390,21 +1528,23 @@
* Sends a CONNECTION ACK message in reponse to a received CONNECTION_CREATE
* directed to us.
*
- * @param t Tunnel which to confirm.
* @param connection Connection to confirm.
*/
static void
-send_connection_ack (struct MeshTunnel2 *t, struct MeshConnection *connection)
+send_connection_ack (struct MeshConnection *connection)
{
struct MeshPeer *neighbor;
+ struct MeshTunnel2 *t;
+ t = connection->t;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Send connection ack\n");
neighbor = peer_get_short (connection_get_prev_hop (connection));
queue_add (connection,
GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK,
sizeof (struct GNUNET_MESH_ConnectionACK),
neighbor,
- t);
+ connection,
+ NULL);
if (MESH_TUNNEL_NEW == t->state)
tunnel_change_state (t, MESH_TUNNEL_WAITING);
}
@@ -1425,7 +1565,7 @@
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_ACK);
msg.pid = htonl (ack);
- send_prebuilt_message (&msg.header, peer, NULL);
+ send_prebuilt_message_peer (&msg.header, peer_get_short (peer));
}
@@ -1771,27 +1911,27 @@
if (NULL != p)
{
c = tunnel_use_path (t, p);
- send_connection_create (t, c);
- connection_change_state (c, MESH_CONNECTION_SENT);
+ send_connection_create (c);
}
}
else if (NULL == peer->dhtget)
{
- struct GNUNET_PeerIdentity id;
+ const struct GNUNET_PeerIdentity *id;
- GNUNET_PEER_resolve (peer->id, &id);
+ id = GNUNET_PEER_resolve2 (peer->id);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- " Starting DHT GET for peer %s\n", GNUNET_i2s (&id));
+ " Starting DHT GET for peer %s\n", GNUNET_i2s (id));
peer->dhtget = GNUNET_DHT_get_start (dht_handle, /* handle */
GNUNET_BLOCK_TYPE_MESH_PEER, /* type
*/
- &id.hashPubKey, /* key to search
*/
+ &id->hashPubKey, /* key to search
*/
dht_replication_level, /* replication
level */
GNUNET_DHT_RO_RECORD_ROUTE |
GNUNET_DHT_RO_DEMULTIPLEX_EVERYWHERE,
NULL, /* xquery */
0, /* xquery bits */
&dht_get_id_handler, peer);
- tunnel_change_state (t, MESH_TUNNEL_SEARCHING);
+ if (MESH_TUNNEL_NEW == t->state)
+ tunnel_change_state (t, MESH_TUNNEL_SEARCHING);
}
else
{
@@ -1864,7 +2004,7 @@
for (q = fc->queue_head; NULL != q; q = next)
{
next = q->next;
- if (q->tunnel == t)
+ if (q->peer->tunnel == t)
{
if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == q->type ||
GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == q->type)
@@ -2099,57 +2239,8 @@
}
-/**
- * Add a tunnel to the list of tunnels a peer participates in.
- * Update the tunnel's destination.
- *
- * @param p Peer to add to.
- * @param t Tunnel to add.
- */
-static void
-peer_add_tunnel (struct MeshPeer *p, struct MeshTunnel *t)
-{
- if (0 != t->dest)
- {
- GNUNET_break (t->dest == p->id);
- return;
- }
- t->dest = p->id;
- GNUNET_PEER_change_rc (t->dest, 1);
- GNUNET_array_append (p->tunnels, p->ntunnels, t);
-}
-
/**
- * Remove a tunnel from the list of tunnels a peer participates in.
- * Free the tunnel's destination.
- *
- * @param p Peer to clean.
- * @param t Tunnel to remove.
- */
-static void
-peer_remove_tunnel (struct MeshPeer *p, struct MeshTunnel *t)
-{
- unsigned int i;
-
- if (t->dest == p->id)
- {
- GNUNET_PEER_change_rc (t->dest, -1);
- t->dest = 0;
- }
- for (i = 0; i < p->ntunnels; i++)
- {
- if (p->tunnels[i] == t)
- {
- p->tunnels[i] = p->tunnels[p->ntunnels - 1];
- GNUNET_array_grow (p->tunnels, p->ntunnels, p->ntunnels - 1);
- return;
- }
- }
-}
-
-
-/**
* Function called if the connection to the peer has been stalled for a while,
* possibly due to a missed ACK. Poll the peer about its ACK status.
*
@@ -2157,11 +2248,10 @@
* @param tc TaskContext.
*/
static void
-tunnel_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+peer_poll (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct MeshFlowControl *fc = cls;
struct GNUNET_MESH_Poll msg;
- struct MeshTunnel *t = fc->t;
GNUNET_PEER_Id peer;
fc->poll_task = GNUNET_SCHEDULER_NO_TASK;
@@ -2171,44 +2261,19 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** Polling!\n");
+ peer = fc->peer->id;
- GNUNET_PEER_resolve (t->id.oid, &msg.oid);
-
- if (fc == &t->prev_fc)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** prev peer!\n");
- peer = t->prev_hop;
- }
- else if (fc == &t->next_fc)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** next peer!\n");
- peer = t->next_hop;
- }
- else
- {
- GNUNET_break (0);
- return;
- }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** peer: %s!\n",
- GNUNET_i2s(GNUNET_PEER_resolve2 (peer)));
- if (0 == peer)
- {
- if (GNUNET_YES == t->destroy)
- tunnel_destroy (t);
- else
- GNUNET_break (0);
+ GNUNET_i2s (GNUNET_PEER_resolve2 (peer)));
- return;
- }
msg.header.type = htons (GNUNET_MESSAGE_TYPE_MESH_POLL);
msg.header.size = htons (sizeof (msg));
- msg.tid = htonl (t->id.tid);
- msg.pid = htonl (peer_get_first_payload_pid (peer_get_short (peer), t));
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", ntohl (msg.pid));
- send_prebuilt_message (&msg.header, peer, t);
+ msg.pid = htonl (fc->last_pid_sent);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " *** pid (%u)!\n", fc->last_pid_sent);
+ send_prebuilt_message_peer (&msg.header, peer_get_short (peer));
fc->poll_time = GNUNET_TIME_STD_BACKOFF (fc->poll_time);
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
- &tunnel_poll, fc);
+ &peer_poll, fc);
}
@@ -2361,14 +2426,10 @@
static struct MeshTunnel *
channel_get_by_pi (GNUNET_PEER_Id pi, MESH_ChannelNumber tid)
{
- struct MESH_TunnelID id;
- struct GNUNET_HashCode hash;
+// struct GNUNET_HashCode hash;
- id.oid = pi;
- id.tid = tid;
-
- GNUNET_CRYPTO_hash (&id, sizeof (struct MESH_TunnelID), &hash);
- return GNUNET_CONTAINER_multihashmap_get (tunnels, &hash);
+// return GNUNET_CONTAINER_multihashmap_get (tunnels, &hash); FIXME
+ return NULL;
}
@@ -2408,14 +2469,9 @@
}
-/**
- * Change the tunnel state.
- *
- * @param c Connection whose state to change.
- * @param state New state.
- */
static void
-connection_change_state (MeshConnection* c, MeshConnectionState state)
+connection_change_state (struct MeshConnection* c,
+ enum MeshConnectionState state)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Connection %s[%X] state was %s\n",
@@ -2542,26 +2598,24 @@
/**
* Send an end-to-end FWD ACK message for the most recent in-sequence payload.
*
- * @param t Tunnel this is about.
+ * @param ch Channel this is about.
* @param fwd Is for FWD traffic? (ACK dest->owner)
*/
static void
-tunnel_send_data_ack (struct MeshTunnel *t, int fwd)
+channel_send_data_ack (struct MeshChannel *ch, int fwd)
{
struct GNUNET_MESH_DataACK msg;
struct MeshChannelReliability *rel;
struct MeshReliableMessage *copy;
- GNUNET_PEER_Id hop;
uint64_t mask;
unsigned int delta;
- rel = fwd ? t->bck_rel : t->fwd_rel;
- hop = fwd ? t->prev_hop : t->next_hop;
+ rel = fwd ? ch->bck_rel : ch->fwd_rel;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"send_data_ack for %u\n",
rel->mid_recv - 1);
- if (GNUNET_NO == t->reliable)
+ if (GNUNET_NO == ch->reliable)
{
GNUNET_break_op (0);
return;
@@ -2586,7 +2640,7 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " final futures %llX\n", msg.futures);
- send_prebuilt_message (&msg.header, hop, t);
+ send_prebuilt_message_tunnel (&msg.header, t, ch, fwd);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "send_data_ack END\n");
}
@@ -2594,17 +2648,18 @@
/**
* Send an ACK informing the predecessor about the available buffer space.
* In case there is no predecessor, inform the owning client.
- * If buffering is off, send only on behalf of children or self if endpoint.
- * If buffering is on, send when sent to children and buffer space is free.
+ *
* Note that although the name is fwd_ack, the FWD mean forward *traffic*,
* the ACK itself goes "back" (towards root).
*
- * @param t Tunnel on which to send the ACK.
+ * @param ch Channel on which to send the ACK.
+ * @param c Connection on which to send the ACK.
* @param type Type of message that triggered the ACK transmission.
* @param fwd Is this FWD ACK? (Going dest->owner)
*/
static void
-tunnel_send_ack (struct MeshTunnel *t, uint16_t type, int fwd)
+channel_send_ack (struct MeshChannel *ch, struct MeshConnection *c,
+ uint16_t type, int fwd)
{
struct MeshChannelReliability *rel;
struct MeshFlowControl *next_fc;
@@ -2616,12 +2671,12 @@
uint32_t ack;
int delta;
- rel = fwd ? t->fwd_rel : t->bck_rel;
- c = fwd ? t->client : t->owner;
- o = fwd ? t->owner : t->client;
+ rel = fwd ? ch->fwd_rel : ch->bck_rel;
+ c = fwd ? ch->client : ch->owner;
+ o = fwd ? ch->owner : ch->client;
+ hop = fwd ? connection_get_prev_hop (c) : connection_get_next_hop (c);
next_fc = fwd ? &t->next_fc : &t->prev_fc;
prev_fc = fwd ? &t->prev_fc : &t->next_fc;
- hop = fwd ? t->prev_hop : t->next_hop;
switch (type)
{
@@ -3079,28 +3134,26 @@
msg = (struct GNUNET_MESH_TunnelKeepAlive *) cbuf;
msg->header.size = htons (size);
msg->header.type = htons (type);
- msg->oid = *(GNUNET_PEER_resolve2 (t->id.oid));
- msg->tid = htonl (t->id.tid);
- send_prebuilt_message (&msg->header, hop, t);
+ msg->oid = *(GNUNET_PEER_resolve2 (c->t->id.oid));
+ msg->tid = htonl (c->t->id.tid);
+ send_prebuilt_message (&msg->header, hop, c->t);
}
/**
- * Send create (PATH_CREATE/PATH_ACK) packets for a tunnel.
+ * Send CONNECTION_{CREATE/ACK} packets for a connection.
*
- * @param t Tunnel for which to send the message.
+ * @param c Connection for which to send the message.
* @param fwd If GNUNET_YES, send CREATE, otherwise send ACK.
*/
static void
-connection_recreate (struct MeshTunnel *t, int fwd)
+connection_recreate (struct MeshConnection *c, int fwd)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "sending path recreate for tunnel %s[%X]\n",
- GNUNET_i2s (GNUNET_PEER_resolve2 (t->id.oid)), t->id.tid);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "sending connection recreate\n");
if (fwd)
- send_path_create (t);
+ send_connection_create (c);
else
- send_path_ack (t);
+ send_connection_ack (c);
}
@@ -3125,10 +3178,10 @@
case MESH_CONNECTION_NEW:
GNUNET_break (0);
case MESH_CONNECTION_SENT:
- connection_recreate (t, fwd);
+ connection_recreate (c, fwd);
break;
case MESH_CONNECTION_READY:
- connection_keepalive (t, fwd);
+ connection_keepalive (c, fwd);
break;
default:
break;
@@ -3695,12 +3748,12 @@
struct MeshPeerQueue *q;
struct GNUNET_MESH_Data *dmsg;
- struct MeshTunnel* t;
+ struct MeshTunnel2 *t;
uint32_t pid;
uint32_t ack;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* selecting message\n");
- for (q = peer->queue_head; NULL != q; q = q->next)
+ for (q = peer->fc->queue_head; NULL != q; q = q->next)
{
t = q->tunnel;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -3748,14 +3801,20 @@
struct MeshPeer *peer = cls;
struct GNUNET_MessageHeader *msg;
struct MeshPeerQueue *queue;
- struct MeshTunnel *t;
- struct GNUNET_PeerIdentity dst_id;
+ struct MeshTunnel2 *t;
+ struct GNUNET_PeerIdentity *dst_id;
struct MeshFlowControl *fc;
size_t data_size;
uint32_t pid;
uint16_t type;
- peer->core_transmit = NULL;
+ fc = peer->fc;
+ if (NULL == fc)
+ {
+ GNUNET_break (0);
+ return 0;
+ }
+ fc->core_transmit = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* Queue send\n");
@@ -3770,28 +3829,28 @@
if (NULL == queue)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not ready, return\n");
- if (NULL == peer->queue_head)
+ if (NULL == fc->queue_head)
GNUNET_break (0); /* Core tmt_rdy should've been canceled */
return 0;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* not empty\n");
- GNUNET_PEER_resolve (peer->id, &dst_id);
+ dst_id = GNUNET_PEER_resolve (peer->id);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"* towards %s\n",
- GNUNET_i2s (&dst_id));
+ GNUNET_i2s (dst_id));
/* Check if buffer size is enough for the message */
if (queue->size > size)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"* not enough room, reissue\n");
- peer->core_transmit =
+ fc->core_transmit =
GNUNET_CORE_notify_transmit_ready (core_handle,
GNUNET_NO,
0,
GNUNET_TIME_UNIT_FOREVER_REL,
- &dst_id,
+ dst_id,
queue->size,
&queue_send,
peer);
@@ -3799,9 +3858,7 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* size ok\n");
- t = queue->tunnel;
- GNUNET_assert (0 < t->pending_messages);
- t->pending_messages--;
+ t = queue->peer->tunnel;
type = 0;
/* Fill buf */
@@ -3829,11 +3886,14 @@
break;
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_CREATE:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path create\n");
- data_size = send_core_path_create (queue->cls, size, buf);
+ data_size = send_core_connection_create (queue->cls, size, buf);
break;
case GNUNET_MESSAGE_TYPE_MESH_CONNECTION_ACK:
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "* path ack\n");
- data_size = send_core_path_ack (queue->cls, size, buf);
+ if (NULL != t->client)
+ data_size = send_core_connection_ack (queue->cls, size, buf);
+ else
+ data_size = send_core_data_raw (queue->cls, size, buf);
break;
default:
GNUNET_break (0);
@@ -3939,56 +3999,40 @@
}
-/**
- * @brief Queue and pass message to core when possible.
- *
- * If type is payload (UNICAST, TO_ORIGIN) checks for queue status and
- * accounts for it. In case the queue is full, the message is dropped and
- * a break issued.
- *
- * Otherwise, message is treated as internal and allowed to go regardless of
- * queue status.
- *
- * @param cls Closure (@c type dependant). It will be used by queue_send to
- * build the message to be sent if not already prebuilt.
- * @param type Type of the message, 0 for a raw message.
- * @param size Size of the message.
- * @param dst Neighbor to send message to.
- * @param t Tunnel this message belongs to.
- */
static void
queue_add (void *cls, uint16_t type, size_t size,
- struct MeshPeer *dst, struct MeshTunnel *t)
+ struct MeshPeer *dst,
+ struct MeshConnection *c,
+ struct MeshChannel *ch)
{
struct MeshPeerQueue *queue;
struct MeshFlowControl *fc;
+ struct MeshTunnel2 *t;
int priority;
fc = NULL;
priority = GNUNET_NO;
- if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type)
+ if (GNUNET_MESSAGE_TYPE_MESH_UNICAST == type ||
+ GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
{
- fc = &t->next_fc;
+ fc = dst->fc;
}
- else if (GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)
- {
- fc = &t->prev_fc;
- }
if (NULL != fc)
{
- if (fc->queue_n >= t->queue_max)
+ if (fc->queue_n >= fc->queue_max)
{
/* If this isn't a retransmission, drop the message */
- if (GNUNET_NO == t->reliable ||
- (NULL == t->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) ||
- (NULL == t->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type))
+ if (NULL != ch &&
+ (GNUNET_NO == ch->reliable ||
+ (NULL == ch->owner && GNUNET_MESSAGE_TYPE_MESH_UNICAST == type) ||
+ (NULL == ch->client && GNUNET_MESSAGE_TYPE_MESH_TO_ORIGIN == type)))
{
GNUNET_STATISTICS_update (stats, "# messages dropped (buffer full)",
1, GNUNET_NO);
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"queue full: %u/%u\n",
- fc->queue_n, t->queue_max);
+ fc->queue_n, fc->queue_max);
return; /* Drop this message */
}
priority = GNUNET_YES;
@@ -3997,25 +4041,28 @@
if (GMC_is_pid_bigger(fc->last_pid_sent + 1, fc->last_ack_recv) &&
GNUNET_SCHEDULER_NO_TASK == fc->poll_task)
fc->poll_task = GNUNET_SCHEDULER_add_delayed (fc->poll_time,
- &tunnel_poll,
- fc);
+ &peer_poll,
+ dst);
}
queue = GNUNET_malloc (sizeof (struct MeshPeerQueue));
queue->cls = cls;
queue->type = type;
queue->size = size;
queue->peer = dst;
- queue->tunnel = t;
+ queue->c = c;
+ queue->ch = ch;
if (GNUNET_YES == priority)
{
struct GNUNET_MESH_Data *d;
uint32_t prev;
uint32_t next;
- GNUNET_CONTAINER_DLL_insert (dst->queue_head, dst->queue_tail, queue);
+ GNUNET_CONTAINER_DLL_insert (dst->fc->queue_head,
+ dst->fc->queue_tail,
+ queue);
d = (struct GNUNET_MESH_Data *) queue->cls;
prev = d->pid;
- for (queue = dst->queue_tail; NULL != queue; queue = queue->prev)
+ for (queue = dst->fc->queue_tail; NULL != queue; queue = queue->prev)
{
if (queue->type != type)
continue;
@@ -4026,11 +4073,13 @@
}
}
else
- GNUNET_CONTAINER_DLL_insert_tail (dst->queue_head, dst->queue_tail, queue);
+ GNUNET_CONTAINER_DLL_insert_tail (dst->fc->queue_head,
+ dst->fc->queue_tail,
+ queue);
- if (NULL == dst->core_transmit)
+ if (NULL == dst->fc->core_transmit)
{
- dst->core_transmit =
+ dst->fc->core_transmit =
GNUNET_CORE_notify_transmit_ready (core_handle,
0,
0,
@@ -4040,7 +4089,6 @@
&queue_send,
dst);
}
- t->pending_messages++;
}
@@ -4995,6 +5043,7 @@
/* Count connections */
for (c = peer->tunnel->connection_head, i = 0; NULL != c; c = c->next, i++);
+ /* If we already have 3 (or more (?!)) connections, it's enough */
if (3 <= i)
return;
@@ -5699,6 +5748,12 @@
path->peers[0] = myid;
GNUNET_PEER_change_rc (myid, 1);
peer_add_path (peer_info, path, GNUNET_YES);
+ if (NULL == peer_info->fc)
+ {
+ peer_info->fc = GNUNET_new (struct MeshFlowControl);
+ fc_init (peer_info->fc);
+ peer_info->fc->peer = peer_info;
+ }
return;
}
@@ -5715,6 +5770,7 @@
struct MeshPeer *pi;
struct MeshPeerQueue *q;
struct MeshPeerQueue *n;
+ struct MeshFlowControl *fc;
DEBUG_CONN ("Peer disconnected\n");
pi = GNUNET_CONTAINER_multihashmap_get (peers, &peer->hashPubKey);
@@ -5723,25 +5779,34 @@
GNUNET_break (0);
return;
}
- q = pi->queue_head;
+ fc = pi->fc;
+ if (NULL != fc)
+ {
+ GNUNET_break (0);
+ return;
+ }
+ pi->fc = NULL;
+
+ q = fc->queue_head;
while (NULL != q)
{
n = q->next;
- /* TODO try to reroute this traffic instead */
- queue_destroy(q, GNUNET_YES);
+ queue_destroy (q, GNUNET_YES);
q = n;
}
- if (NULL != pi->core_transmit)
- {
- GNUNET_CORE_notify_transmit_ready_cancel(pi->core_transmit);
- pi->core_transmit = NULL;
- }
+ if (NULL != fc->core_transmit)
+ GNUNET_CORE_notify_transmit_ready_cancel (fc->core_transmit);
+ if (GNUNET_SCHEDULER_NO_TASK != fc->poll_task)
+ GNUNET_SCHEDULER_cancel (fc->poll_task);
+
peer_remove_path (pi, pi->id, myid);
if (myid == pi->id)
{
DEBUG_CONN (" (self)\n");
}
GNUNET_STATISTICS_update (stats, "# peers", -1, GNUNET_NO);
+ GNUNET_free (fc);
+
return;
}
Modified: gnunet/src/mesh/mesh_protocol_enc.h
===================================================================
--- gnunet/src/mesh/mesh_protocol_enc.h 2013-07-30 18:42:38 UTC (rev 28340)
+++ gnunet/src/mesh/mesh_protocol_enc.h 2013-07-31 08:07:58 UTC (rev 28341)
@@ -201,16 +201,6 @@
struct GNUNET_MessageHeader header;
/**
- * TID of the tunnel
- */
- uint32_t tid GNUNET_PACKED;
-
- /**
- * OID of the tunnel
- */
- struct GNUNET_PeerIdentity oid;
-
- /**
* Last packet sent.
*/
uint32_t pid GNUNET_PACKED;
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28341 - gnunet/src/mesh,
gnunet <=