[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3398 - in GNUnet: . src/applications/fs/ecrs src/applicati
From: grothoff
Subject: [GNUnet-SVN] r3398 - in GNUnet: . src/applications/fs/ecrs src/applications/tbench src/server src/transports
Date: Wed, 13 Sep 2006 22:09:49 -0700 (PDT)
Author: grothoff
Date: 2006-09-13 22:09:41 -0700 (Wed, 13 Sep 2006)
New Revision: 3398
Modified:
GNUnet/src/applications/fs/ecrs/ecrstest.c
GNUnet/src/applications/tbench/Makefile.am
GNUnet/src/applications/tbench/gnunet-tbench.c
GNUnet/src/applications/tbench/tbench.c
GNUnet/src/applications/tbench/tbenchtest.c
GNUnet/src/applications/tbench/tbenchtest_udp.c
GNUnet/src/server/connection.c
GNUnet/src/server/gnunetd.c
GNUnet/src/transports/tcp.c
GNUnet/src/transports/tcp_helper.c
GNUnet/todo
Log:
fixing bugs in tbench, transports, core bandwidth allocation and core message
buffering
Modified: GNUnet/src/applications/fs/ecrs/ecrstest.c
===================================================================
--- GNUnet/src/applications/fs/ecrs/ecrstest.c 2006-09-12 21:15:40 UTC (rev
3397)
+++ GNUnet/src/applications/fs/ecrs/ecrstest.c 2006-09-14 05:09:41 UTC (rev
3398)
@@ -302,6 +302,7 @@
if (sock != NULL)
connection_destroy(sock);
GE_ASSERT(NULL, OK == os_daemon_stop(NULL, daemon));
+ GC_free(cfg);
return (ok == YES) ? 0 : 1;
}
Modified: GNUnet/src/applications/tbench/Makefile.am
===================================================================
--- GNUnet/src/applications/tbench/Makefile.am 2006-09-12 21:15:40 UTC (rev
3397)
+++ GNUnet/src/applications/tbench/Makefile.am 2006-09-14 05:09:41 UTC (rev
3398)
@@ -44,6 +44,7 @@
$(top_builddir)/src/applications/stats/libgnunetstats_api.la \
$(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
$(top_builddir)/src/util/loggers/libgnunetutil_logging.la \
+ $(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
$(top_builddir)/src/util/config_impl/libgnunetutil_config.la \
$(top_builddir)/src/util/libgnunetutil.la
@@ -53,6 +54,7 @@
$(top_builddir)/src/applications/stats/libgnunetstats_api.la \
$(top_builddir)/src/util/network_client/libgnunetutil_network_client.la \
$(top_builddir)/src/util/loggers/libgnunetutil_logging.la \
+ $(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
$(top_builddir)/src/util/config_impl/libgnunetutil_config.la \
$(top_builddir)/src/util/libgnunetutil.la
Modified: GNUnet/src/applications/tbench/gnunet-tbench.c
===================================================================
--- GNUnet/src/applications/tbench/gnunet-tbench.c 2006-09-12 21:15:40 UTC
(rev 3397)
+++ GNUnet/src/applications/tbench/gnunet-tbench.c 2006-09-14 05:09:41 UTC
(rev 3398)
@@ -82,14 +82,14 @@
&gnunet_getopt_configure_set_ulong, &messageSize },
{ 'S', "space", "SPACE",
gettext_noop("sleep for SPACE ms after each a message block"), 1,
- &gnunet_getopt_configure_set_ulong, &messageTrainSize },
+ &gnunet_getopt_configure_set_ulong, &messageSpacing },
{ 't', "timeout", "TIMEOUT",
gettext_noop("time to wait for the completion of an iteration (in ms)"), 1,
&gnunet_getopt_configure_set_ulong, &messageTimeOut },
COMMAND_LINE_OPTION_VERSION(PACKAGE_VERSION), /* -v */
{ 'X', "xspace", "COUNT",
gettext_noop("number of messages in a message block"), 1,
- &gnunet_getopt_configure_set_ulong, &messageSpacing },
+ &gnunet_getopt_configure_set_ulong, &messageTrainSize },
COMMAND_LINE_OPTION_END,
};
@@ -185,7 +185,7 @@
GE_ASSERT(ectx,
ntohs(buffer->header.size) ==
sizeof(CS_tbench_reply_MESSAGE));
- if ((float)buffer->mean_loss <= 0){
+ if ((float)buffer->mean_loss < 0){
GE_BREAK(ectx, 0);
messagesPercentLoss = 0.0;
} else {
Modified: GNUnet/src/applications/tbench/tbench.c
===================================================================
--- GNUnet/src/applications/tbench/tbench.c 2006-09-12 21:15:40 UTC (rev
3397)
+++ GNUnet/src/applications/tbench/tbench.c 2006-09-14 05:09:41 UTC (rev
3398)
@@ -56,16 +56,9 @@
*/
static struct MUTEX * lock;
-static struct SEMAPHORE * presem;
-
static struct SEMAPHORE * postsem;
/**
- * What was the packet number we received?
- */
-static unsigned int lastPacketNumber;
-
-/**
* What is the current iteration counter? (Used to verify
* that replies match the current request series).
*/
@@ -82,49 +75,15 @@
static CoreAPIForApplication * coreAPI;
+static IterationData * results;
+
/**
- * Check if we have received a p2p reply,
- * update result counters accordingly.
- *
- * @return 0 if we have received all results, >0 otherwise
+ * Did we receive the last response for the current iteration
+ * before the timeout? If so, when?
*/
-static int pollResults(IterationData * results,
- int blocking) {
- if (results->lossCount == 0)
- return 0;
- if (blocking == YES) {
- if (timeoutOccured == YES)
- return results->lossCount;
- SEMAPHORE_DOWN(postsem, YES);
- } else {
- if (OK != SEMAPHORE_DOWN(postsem, NO))
- return results->lossCount;
- }
- do {
- if (timeoutOccured == YES) {
- SEMAPHORE_UP(presem);
- return results->lossCount;
- }
- if (lastPacketNumber > results->maxPacketNumber) {
- SEMAPHORE_UP(presem);
- return results->lossCount;
- }
- if (0 == results->packetsReceived[lastPacketNumber]++) {
- results->lossCount--;
- } else {
- results->duplicateCount++;
-#if DEBUG_TBENCH
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Received duplicate message %u from iteration %u\n",
- lastPacketNumber,
- currIteration);
-#endif
- }
- SEMAPHORE_UP(presem);
- } while (OK == SEMAPHORE_DOWN(postsem, NO));
- return results->lossCount;
-}
+static cron_t earlyEnd;
+
/**
* Another peer send us a tbench request. Just turn
* around and send it back.
@@ -134,6 +93,11 @@
MESSAGE_HEADER * reply;
const P2P_tbench_MESSAGE * msg;
+#if DEBUG_TBENCH
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Received tbench request\n");
+#endif
if ( ntohs(message->size) < sizeof(P2P_tbench_MESSAGE)) {
GE_BREAK(ectx, 0);
return SYSERR;
@@ -147,11 +111,12 @@
}
#if DEBUG_TBENCH
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Received request %u from iteration %u/%u\n",
- htonl(msg->packetNum),
- htonl(msg->iterationNum),
- htonl(msg->nounce));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Received request %u from iteration %u/%u\n",
+ htonl(msg->packetNum),
+ htonl(msg->iterationNum),
+ htonl(msg->nounce));
#endif
reply = MALLOC(ntohs(message->size));
memcpy(reply,
@@ -160,7 +125,7 @@
reply->type = htons(P2P_PROTO_tbench_REPLY);
coreAPI->unicast(sender,
reply,
- ntohl(msg->priority), /* medium importance */
+ ntohl(msg->priority),
0); /* no delay */
FREE(reply);
return OK;
@@ -172,6 +137,8 @@
static int handleTBenchReply(const PeerIdentity * sender,
const MESSAGE_HEADER * message) {
const P2P_tbench_MESSAGE * pmsg;
+ unsigned int lastPacketNumber;
+ IterationData * res;
if (ntohs(message->size) < sizeof(P2P_tbench_MESSAGE)) {
GE_BREAK(ectx, 0);
@@ -184,29 +151,38 @@
GE_BREAK(ectx, 0);
return SYSERR;
}
-#if DEBUG_TBENCH
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Received response %u from iteration %u/%u\n",
- htonl(pmsg->packetNum),
- htonl(pmsg->iterationNum),
- htonl(pmsg->nounce));
-#endif
- MUTEX_LOCK(lock);
+ MUTEX_LOCK(lock);
if ( (timeoutOccured == NO) &&
- (presem != NULL) &&
(postsem != NULL) &&
(htonl(pmsg->iterationNum) == currIteration) &&
(htonl(pmsg->nounce) == currNounce) ) {
- SEMAPHORE_DOWN(presem, YES);
+ res = &results[currIteration];
lastPacketNumber = ntohl(pmsg->packetNum);
- SEMAPHORE_UP(postsem);
+ if (lastPacketNumber <= res->maxPacketNumber) {
+ if (0 == res->packetsReceived[lastPacketNumber]++) {
+ res->lossCount--;
+ if (res->lossCount == 0)
+ earlyEnd = get_time();
+ } else {
+ res->duplicateCount++;
+ }
+ }
+#if DEBUG_TBENCH
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Received response %u from iteration %u/%u on time!\n",
+ htonl(pmsg->packetNum),
+ htonl(pmsg->iterationNum),
+ htonl(pmsg->nounce));
+#endif
} else {
#if DEBUG_TBENCH
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Received message %u from iteration %u too late (now at iteration
%u)\n",
- ntohl(pmsg->packetNum),
- ntohl(pmsg->iterationNum),
- currIteration);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Received message %u from iteration %u too late (now at iteration
%u)\n",
+ ntohl(pmsg->packetNum),
+ ntohl(pmsg->iterationNum),
+ currIteration);
#endif
}
MUTEX_UNLOCK(lock);
@@ -237,20 +213,25 @@
cron_t endTime;
cron_t now;
cron_t delay;
- cron_t delayStart;
- IterationData * results;
unsigned long long sum_loss;
unsigned int max_loss;
unsigned int min_loss;
cron_t sum_time;
cron_t min_time;
cron_t max_time;
- cron_t earlyEnd;
double sum_variance_time;
double sum_variance_loss;
unsigned int msgCnt;
unsigned int iterations;
+#if DEBUG_TBENCH
+ GE_LOG(ectx,
+ GE_DEBUG | GE_USER | GE_BULK,
+ "Tbench received request from client.\n",
+ msgCnt,
+ size,
+ iterations);
+#endif
if ( ntohs(message->size) != sizeof(CS_tbench_request_MESSAGE) )
return SYSERR;
@@ -262,12 +243,21 @@
iterations = ntohl(msg->iterations);
msgCnt = ntohl(msg->msgCnt);
#if DEBUG_TBENCH
- LOG(LOG_MESSAGE,
- "Tbench runs %u test messages of size %u in %u iterations.\n",
- msgCnt,
- size,
- iterations);
+ GE_LOG(ectx,
+ GE_INFO | GE_USER | GE_BULK,
+ "Tbench runs %u test messages of size %u in %u iterations.\n",
+ msgCnt,
+ size,
+ iterations);
#endif
+ MUTEX_LOCK(lock);
+ if (results != NULL) {
+ GE_LOG(ectx,
+ GE_WARNING | GE_USER | GE_IMMEDIATE,
+ "Cannot run multiple tbench sessions at the same time!\n");
+ MUTEX_UNLOCK(lock);
+ return SYSERR;
+ }
results = MALLOC(sizeof(IterationData) * iterations);
p2p = MALLOC(size);
@@ -278,7 +268,6 @@
p2p->header.type = htons(P2P_PROTO_tbench_REQUEST);
p2p->priority = msg->priority;
- MUTEX_LOCK(lock);
for (iteration=0;iteration<iterations;iteration++) {
results[iteration].maxPacketNumber = msgCnt;
results[iteration].packetsReceived = MALLOC(msgCnt);
@@ -289,7 +278,6 @@
results[iteration].duplicateCount = 0;
earlyEnd = 0;
- presem = SEMAPHORE_CREATE(1);
postsem = SEMAPHORE_CREATE(0);
currNounce = weak_randomi(0xFFFFFF);
p2p->nounce
@@ -316,72 +304,43 @@
postsem);
for (packetNum=0;packetNum<msgCnt;packetNum++){
now = get_time();
- if (now > endTime)
- break; /* timeout */
-
p2p->packetNum = htonl(packetNum);
#if DEBUG_TBENCH
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Sending message %u of size %u in iteration %u\n",
- packetNum,
- size,
- iteration);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Sending message %u of size %u in iteration %u\n",
+ packetNum,
+ size,
+ iteration);
#endif
coreAPI->unicast(&msg->receiverId,
&p2p->header,
ntohl(msg->priority),
0); /* no delay */
- pollResults(&results[iteration], NO);
if ( (delay != 0) &&
(htonl(msg->trainSize) != 0) &&
- (packetNum % htonl(msg->trainSize)) == 0) {
- delayStart = now;
- while ( (get_time() < (delayStart+delay)) &&
- (timeoutOccured == NO) ) {
- now = get_time();
- if (delayStart + delay - now > 5 * cronMILLIS) {
- pollResults(&results[iteration], NO);
- PTHREAD_SLEEP(5 * cronMILLIS);
- } else
- PTHREAD_SLEEP(delayStart + delay - now);
- }
- }
- if ( (0 == pollResults(&results[iteration], NO)) &&
- (earlyEnd == 0) )
- earlyEnd = get_time();
+ (packetNum % htonl(msg->trainSize)) == 0)
+ PTHREAD_SLEEP(delay);
}
- while ( (timeoutOccured == NO) &&
- (get_time() < endTime) ) {
- if ( (0 == pollResults(&results[iteration], YES) ) &&
- (earlyEnd == 0) )
- earlyEnd = get_time();
- PTHREAD_SLEEP(5 * cronMILLIS);
- }
-
- /* make sure to unblock waiting jobs */
- timeoutOccured = YES;
- SEMAPHORE_UP(presem);
-
+ SEMAPHORE_DOWN(postsem, YES);
MUTEX_LOCK(lock);
if (earlyEnd == 0)
- earlyEnd = now;
+ earlyEnd = get_time();
results[iteration].totalTime
= earlyEnd - startTime;
FREE(results[iteration].packetsReceived);
- cron_suspend(coreAPI->cron,
- NO);
- cron_del_job(coreAPI->cron,
- &semaUp,
- 0,
- postsem);
- cron_resume_jobs(coreAPI->cron,
- NO);
- SEMAPHORE_DESTROY(presem);
SEMAPHORE_DESTROY(postsem);
- presem = NULL;
postsem = NULL;
}
MUTEX_UNLOCK(lock);
+#if DEBUG_TBENCH
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "Done waiting for response.\n",
+ packetNum,
+ size,
+ iteration);
+#endif
sum_loss = 0;
sum_time = 0;
@@ -427,6 +386,7 @@
reply.variance_time = sum_variance_time/(iterations-1);
reply.variance_loss = sum_variance_loss/(iterations-1);
FREE(results);
+ results = NULL;
return coreAPI->sendToClient(client,
&reply.header);
}
@@ -455,7 +415,7 @@
GE_ASSERT(capi->ectx,
0 == GC_set_configuration_value_string(capi->cfg,
capi->ectx,
- "ABOUT",
+ "ABOUT",
"tbench",
gettext_noop("allows
profiling of direct "
"peer-to-peer
connections")));
Modified: GNUnet/src/applications/tbench/tbenchtest.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest.c 2006-09-12 21:15:40 UTC (rev
3397)
+++ GNUnet/src/applications/tbench/tbenchtest.c 2006-09-14 05:09:41 UTC (rev
3398)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing
authors)
+ (C) 2001, 2002, 2004, 2005, 2006 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -27,15 +27,11 @@
#include "platform.h"
#include "gnunet_protocols.h"
#include "gnunet_stats_lib.h"
+#include "gnunet_util_crypto.h"
#include "gnunet_util_config_impl.h"
#include "gnunet_util_network_client.h"
#include "tbench.h"
-/**
- * Identity of peer 2.
- */
-static PeerIdentity peer2;
-
static int test(struct ClientServerConnection * sock,
unsigned int messageSize,
unsigned int messageCnt,
@@ -43,11 +39,15 @@
cron_t messageSpacing,
unsigned int messageTrainSize,
cron_t messageTimeOut /* in milli-seconds */) {
+ PeerIdentity peer2;
int ret;
CS_tbench_request_MESSAGE msg;
CS_tbench_reply_MESSAGE * buffer;
float messagesPercentLoss;
+
enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77FOPDA8F1VIKESLSNBO",
+ &peer2.hashPubKey);
+
printf(_("Using %u messages of size %u for %u times.\n"),
messageCnt,
messageSize,
@@ -64,7 +64,7 @@
msg.receiverId = peer2;
if (SYSERR == connection_write(sock,
- &msg.header))
+ &msg.header))
return -1;
ret = 0;
@@ -211,7 +211,7 @@
printf(_("Running benchmark...\n"));
/* 'slow' pass: wait for bandwidth negotiation! */
if (ret == 0)
- ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+ ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 5 * cronSECONDS);
checkConnected(sock);
/* 'blast' pass: hit bandwidth limits! */
for (i=8;i<60000;i*=2) {
Modified: GNUnet/src/applications/tbench/tbenchtest_udp.c
===================================================================
--- GNUnet/src/applications/tbench/tbenchtest_udp.c 2006-09-12 21:15:40 UTC
(rev 3397)
+++ GNUnet/src/applications/tbench/tbenchtest_udp.c 2006-09-14 05:09:41 UTC
(rev 3398)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet.
- (C) 2001, 2002, 2004, 2005 Christian Grothoff (and other contributing
authors)
+ (C) 2001, 2002, 2004, 2005, 2006 Christian Grothoff (and other
contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -27,15 +27,11 @@
#include "platform.h"
#include "gnunet_protocols.h"
#include "gnunet_stats_lib.h"
+#include "gnunet_util_crypto.h"
#include "gnunet_util_config_impl.h"
#include "gnunet_util_network_client.h"
#include "tbench.h"
-/**
- * Identity of peer 2.
- */
-static PeerIdentity peer2;
-
static int test(struct ClientServerConnection * sock,
unsigned int messageSize,
unsigned int messageCnt,
@@ -43,11 +39,14 @@
cron_t messageSpacing,
unsigned int messageTrainSize,
cron_t messageTimeOut /* in milli-seconds */) {
+ PeerIdentity peer2;
int ret;
CS_tbench_request_MESSAGE msg;
CS_tbench_reply_MESSAGE * buffer;
float messagesPercentLoss;
+
enc2hash("BV3AS3KMIIBVIFCGEG907N6NTDTH26B7T6FODUSLSGK5B2Q58IEU1VF5FTR838449CSHVBOAHLDVQAOA33O77FOPDA8F1VIKESLSNBO",
+ &peer2.hashPubKey);
printf(_("Using %u messages of size %u for %u times.\n"),
messageCnt,
messageSize,
@@ -165,8 +164,8 @@
#endif
/* in case existing hellos have expired */
PTHREAD_SLEEP(30 * cronSECONDS);
- system("cp peer1/data/hosts/* peer2/data/hosts/");
- system("cp peer2/data/hosts/* peer1/data/hosts/");
+ system("cp peer1-udp/data/hosts/* peer2-udp/data/hosts/");
+ system("cp peer2-udp/data/hosts/* peer1-udp/data/hosts/");
ret = 0;
#if START_PEERS
if (daemon1 != -1) {
@@ -211,7 +210,7 @@
printf(_("Running benchmark...\n"));
/* 'slow' pass: wait for bandwidth negotiation! */
if (ret == 0)
- ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 30 * cronSECONDS);
+ ret = test(sock, 64, 100, 4, 50 * cronMILLIS, 1, 5 * cronSECONDS);
checkConnected(sock);
/* 'blast' pass: hit bandwidth limits! */
for (i=8;i<60000;i*=2) {
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/server/connection.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -128,7 +128,7 @@
/**
* How often do we expect to re-run the traffic allocation
* code? (depends on MINIMUM_SAMPLE_COUNT and MIN_BPM_PER_PEER
- * and MTU size).
+ * and MTU size). [2 * 32 M / 50 = 5M ]
*/
#define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU /
MIN_BPM_PER_PEER)
@@ -532,9 +532,14 @@
}
*dst = 0;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "%s: Sender `%s', key `%s', IV %u msg CRC %u\n",
- prefix, &enc, skey, *((int *) iv), crc);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "%s: Sender `%s', key `%s', IV %u msg CRC %u\n",
+ prefix,
+ &enc,
+ skey,
+ *((int *) iv),
+ crc);
}
#endif
@@ -854,14 +859,16 @@
/ (be->max_bpm * cronMINUTES / cronMILLIS) /* bytes per ms */
/2; /* some head-room */
}
- /* Also: allow at least MINIMUM_SAMPLE_COUNT knapsack
+ /* Also: allow at least 2 * MINIMUM_SAMPLE_COUNT knapsack
solutions for any MIN_SAMPLE_TIME! */
- if(be->MAX_SEND_FREQUENCY > MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
- be->MAX_SEND_FREQUENCY = MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
+ if (be->MAX_SEND_FREQUENCY > 2 * MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT)
+ be->MAX_SEND_FREQUENCY = 2 * MIN_SAMPLE_TIME / MINIMUM_SAMPLE_COUNT;
- if(be->lastSendAttempt + be->MAX_SEND_FREQUENCY > get_time()) {
-#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Send frequency too high
(CPU load), send deferred.\n");
+ if (be->lastSendAttempt + be->MAX_SEND_FREQUENCY > get_time()) {
+#if DEBUG_CONNECTION || 1
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Send frequency too high (CPU load), send deferred.\n");
#endif
return NO; /* frequency too high, wait */
}
@@ -882,6 +889,7 @@
int i;
int j;
int approxProb;
+ cron_t deadline;
totalMessageSize = 0;
(*priority) = 0;
@@ -891,45 +899,56 @@
if (be->session.mtu == 0) {
totalMessageSize = sizeof(P2P_PACKET_HEADER);
+ deadline = (cron_t) -1L; /* infinity */
+
i = 0;
/* assumes entries are sorted by priority! */
- while(i < be->sendBufferSize) {
+ while (i < be->sendBufferSize) {
entry = be->sendBuffer[i];
- if((totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
- (entry->pri >= EXTREME_PRIORITY)) {
+ if ( (totalMessageSize + entry->len < MAX_BUFFER_SIZE) &&
+ (entry->pri >= EXTREME_PRIORITY)) {
entry->knapsackSolution = YES;
+ if (entry->transmissionTime < deadline)
+ deadline = entry->transmissionTime;
(*priority) += entry->pri;
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Selecting msg %u with
length %u\n", i, entry->len);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Selecting msg %u with length %u\n",
+ i,
+ entry->len);
#endif
totalMessageSize += entry->len;
- }
- else {
+ } else {
entry->knapsackSolution = NO;
break;
}
i++;
}
- if((i == 0) && (be->sendBuffer[i]->len > be->available_send_window))
+ if ( (i == 0) &&
+ (be->sendBuffer[i]->len > be->available_send_window)) {
return 0; /* always wait for the highest-priority
message (otherwise large messages may
starve! */
- while((i < be->sendBufferSize) &&
- (be->available_send_window > totalMessageSize)) {
+ }
+ while ( (i < be->sendBufferSize) &&
+ (be->available_send_window > totalMessageSize)) {
entry = be->sendBuffer[i];
- if((entry->len + totalMessageSize <=
- be->available_send_window) &&
- (totalMessageSize + entry->len < MAX_BUFFER_SIZE)) {
+ if ( (entry->len + totalMessageSize <= be->available_send_window) &&
+ (totalMessageSize + entry->len < MAX_BUFFER_SIZE)) {
entry->knapsackSolution = YES;
+ if (entry->transmissionTime < deadline)
+ deadline = entry->transmissionTime;
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Selecting msg %u with
length %u\n", i, entry->len);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Selecting msg %u with length %u\n", i, entry->len);
#endif
totalMessageSize += entry->len;
(*priority) += entry->pri;
- }
- else {
+ } else {
entry->knapsackSolution = NO;
- if(totalMessageSize == sizeof(P2P_PACKET_HEADER)) {
+ if (totalMessageSize == sizeof(P2P_PACKET_HEADER)) {
/* if the highest-priority message does not yet
fit, wait for send window to grow so that
we can get it out (otherwise we would starve
@@ -939,10 +958,11 @@
}
i++;
}
- if((totalMessageSize == sizeof(P2P_PACKET_HEADER)) ||
- (((*priority) < EXTREME_PRIORITY) &&
- ((totalMessageSize / sizeof(P2P_PACKET_HEADER)) < 4) &&
- (weak_randomi(16) != 0))) {
+ if ( (totalMessageSize == sizeof(P2P_PACKET_HEADER)) ||
+ ( ((*priority) < EXTREME_PRIORITY) &&
+ ((totalMessageSize / sizeof(P2P_PACKET_HEADER)) < 4) &&
+ (deadline > get_time() + 500 * cronMILLIS) &&
+ (weak_randomi(16) != 0) ) ) {
/* randomization necessary to ensure we eventually send
a small message if there is nothing else to do! */
return 0;
@@ -963,7 +983,10 @@
be->session.mtu -
sizeof(P2P_PACKET_HEADER));
#if DEBUG_COLLECT_PRIO == YES
- FPRINTF(prioFile, "%llu 0 %d\n", get_time(), priority);
+ FPRINTF(prioFile,
+ "%llu 0 %d\n",
+ get_time(),
+ priority);
#endif
}
else {
@@ -971,7 +994,10 @@
be->session.mtu -
sizeof(P2P_PACKET_HEADER));
#if DEBUG_COLLECT_PRIO == YES
- FPRINTF(prioFile, "%llu 1 %d\n", get_time(), priority);
+ FPRINTF(prioFile,
+ "%llu 1 %d\n",
+ get_time(),
+ priority);
#endif
}
}
@@ -980,26 +1006,32 @@
be->session.mtu -
sizeof(P2P_PACKET_HEADER));
#if DEBUG_COLLECT_PRIO == YES
- FPRINTF(prioFile, "%llu 2 %d\n", get_time(), priority);
+ FPRINTF(prioFile,
+ "%llu 2 %d\n",
+ get_time(),
+ priority);
#endif
}
j = 0;
for(i = 0; i < be->sendBufferSize; i++)
if(be->sendBuffer[i]->knapsackSolution == YES)
j++;
- if(j == 0) {
- GE_LOG(ectx, GE_ERROR | GE_BULK | GE_DEVELOPER,
- _("`%s' selected %d out of %d messages (MTU: %d).\n"),
- __FUNCTION__,
- j, be->sendBufferSize,
- be->session.mtu - sizeof(P2P_PACKET_HEADER));
-
+ if (j == 0) {
+ GE_LOG(ectx,
+ GE_ERROR | GE_BULK | GE_DEVELOPER,
+ _("`%s' selected %d out of %d messages (MTU: %d).\n"),
+ __FUNCTION__,
+ j,
+ be->sendBufferSize,
+ be->session.mtu - sizeof(P2P_PACKET_HEADER));
+
for(j = 0; j < be->sendBufferSize; j++)
- GE_LOG(ectx, GE_ERROR | GE_BULK | GE_DEVELOPER,
- _("Message details: %u: length %d, priority: %d\n"),
- j,
- be->sendBuffer[j]->len,
- be->sendBuffer[j]->pri);
+ GE_LOG(ectx,
+ GE_ERROR | GE_BULK | GE_DEVELOPER,
+ _("Message details: %u: length %d, priority: %d\n"),
+ j,
+ be->sendBuffer[j]->len,
+ be->sendBuffer[j]->pri);
return 0;
}
@@ -1007,11 +1039,12 @@
/* if we have a very high priority, we may
want to ignore bandwidth availability (e.g. for HANGUP,
which has EXTREME_PRIORITY) */
- if((*priority) < EXTREME_PRIORITY) {
+ if ((*priority) < EXTREME_PRIORITY) {
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "bandwidth limits prevent sending (send window %u too small).\n",
- be->available_send_window);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "bandwidth limits prevent sending (send window %u too small).\n",
+ be->available_send_window);
#endif
return 0; /* can not send, BPS available is too small */
}
@@ -1039,7 +1072,9 @@
be->lastSendAttempt = get_time();
expired = be->lastSendAttempt - SECONDS_PINGATTEMPT * cronSECONDS;
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "policy prevents sending
message\n");
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "policy prevents sending message\n");
#endif
load = os_cpu_get_load(ectx, cfg);
@@ -1067,10 +1102,11 @@
continue;
if(entry->transmissionTime <= expired) {
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "expiring message, expired %ds ago, queue size is %llu (bandwidth
stressed)\n",
- (int) ((get_time() - entry->transmissionTime) / cronSECONDS),
- usedBytes);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "expiring message, expired %ds ago, queue size is %llu (bandwidth
stressed)\n",
+ (int) ((get_time() - entry->transmissionTime) / cronSECONDS),
+ usedBytes);
#endif
if (stats != NULL) {
stats->change(stat_messagesDropped, 1);
@@ -1308,30 +1344,48 @@
GE_BREAK(ectx, 0);
return;
}
- if((be->status != STAT_UP) ||
- (be->sendBufferSize == 0) || (be->inSendBuffer == YES)) {
+ if ( (be->status != STAT_UP) ||
+ (be->sendBufferSize == 0) ||
+ (be->inSendBuffer == YES) ) {
return; /* must not run */
}
be->inSendBuffer = YES;
- if((OK != ensureTransportConnected(be)) ||
- (be->sendBufferSize == 0) || (OK != checkSendFrequency(be))) {
+ if ( (OK != ensureTransportConnected(be)) ||
+ (be->sendBufferSize == 0) ||
+ (OK != checkSendFrequency(be)) ){
be->inSendBuffer = NO;
+#if 0
+ GE_LOG(ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Will not try to send: %d %d %d\n",
+ (OK != ensureTransportConnected(be)),
+ (be->sendBufferSize == 0),
+ (OK != checkSendFrequency(be)));
+#endif
return;
}
/* test if receiver has enough bandwidth available! */
updateCurBPS(be);
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "receiver window available: %lld bytes (MTU: %u)\n",
- be->available_send_window, be->session.mtu);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "receiver window available: %lld bytes (MTU: %u)\n",
+ be->available_send_window,
+ be->session.mtu);
#endif
totalMessageSize = selectMessagesToSend(be, &priority);
if (totalMessageSize == 0) {
expireSendBufferEntries(be);
be->inSendBuffer = NO;
+#if DEBUG_CONNECTION
+ GE_LOG(ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "No messages selected for sending (%d)\n",
+ be->available_send_window);
+#endif
return; /* deferr further */
}
GE_ASSERT(ectx, totalMessageSize > sizeof(P2P_PACKET_HEADER));
@@ -1339,8 +1393,11 @@
/* check if we (sender) have enough bandwidth available
if so, trigger callbacks on selected entries; if either
fails, return (but clean up garbage) */
- if((SYSERR == outgoingCheck(priority)) ||
- (0 == prepareSelectedMessages(be))) {
+ if ((SYSERR == outgoingCheck(priority)) ||
+ (0 == prepareSelectedMessages(be))) {
+ GE_LOG(ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Insufficient bandwidth or priority to send message\n");
expireSendBufferEntries(be);
be->inSendBuffer = NO;
return; /* deferr further */
@@ -1365,7 +1422,11 @@
continue;
if(entry->knapsackSolution == YES) {
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Queuing msg %u with
length %u\n", perm[i], entry->len);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Queuing msg %u with length %u\n",
+ perm[i],
+ entry->len);
#endif
GE_ASSERT(ectx, entry->callback == NULL);
GE_ASSERT(ectx, p + entry->len <= totalMessageSize);
@@ -1414,14 +1475,22 @@
(const INITVECTOR *) encryptedMsg, /* IV */
&((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber);
#if DEBUG_CONNECTION
- printMsg("Encrypting P2P data", &be->session.sender,
- &be->skey_local, (const INITVECTOR *) encryptedMsg,
+ printMsg("Encrypting P2P data",
+ &be->session.sender,
+ &be->skey_local,
+ (const INITVECTOR *) encryptedMsg,
crc32N(&((P2P_PACKET_HEADER *) encryptedMsg)->sequenceNumber,
ret));
#endif
if(stats != NULL)
stats->change(stat_encrypted, p - sizeof(HashCode512));
GE_ASSERT(ectx, be->session.tsession != NULL);
+#if DEBUG_CONNECTION
+ GE_LOG(ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Asking transport to send message with priority %u\n",
+ priority);
+#endif
ret = transport->send(be->session.tsession,
encryptedMsg,
p,
@@ -1512,19 +1581,24 @@
}
#if DEBUG_CONNECTION
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&be->session.sender.hashPubKey, &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "adding message of size %d to buffer of host `%s'\n",
- se->len,
- &enc);
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ hash2enc(&be->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "adding message of size %d to buffer of host `%s'\n",
+ se->len,
+ &enc);
#endif
if((be->sendBufferSize > 0) && (be->status != STAT_UP)) {
/* as long as we do not have a confirmed
connection, do NOT queue messages! */
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "not connected to `%s', message dropped\n",
- &enc);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "not connected to `%s', message dropped\n",
+ &enc);
#endif
FREE(se->closure);
FREE(se);
@@ -1534,23 +1608,24 @@
for(i = 0; i < be->sendBufferSize; i++)
queueSize += be->sendBuffer[i]->len;
- if(queueSize >= MAX_SEND_BUFFER_SIZE) {
+ if (queueSize >= MAX_SEND_BUFFER_SIZE) {
/* first, try to remedy! */
sendBuffer(be);
/* did it work? */
queueSize = 0;
- for(i = 0; i < be->sendBufferSize; i++)
+ for (i = 0; i < be->sendBufferSize; i++)
queueSize += be->sendBuffer[i]->len;
- if(queueSize >= MAX_SEND_BUFFER_SIZE) {
+ if (queueSize >= MAX_SEND_BUFFER_SIZE) {
/* we need to enforce some hard limit here, otherwise we may take
FAR too much memory (200 MB easily) */
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "queueSize (%llu) >= %d, refusing to queue message.\n",
- queueSize,
- MAX_SEND_BUFFER_SIZE);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "queueSize (%llu) >= %d, refusing to queue message.\n",
+ queueSize,
+ MAX_SEND_BUFFER_SIZE);
#endif
FREE(se->closure);
FREE(se);
@@ -1847,12 +1922,13 @@
int firstRound;
int earlyRun;
int load;
+ int * perm;
MUTEX_LOCK(lock);
now = get_time();
/* if this is the first round, don't bother... */
- if(lastRoundStart == 0) {
+ if (lastRoundStart == 0) {
/* no allocation the first time this function is called! */
lastRoundStart = now;
forAllConnectedHosts(&resetRecentlyReceived, NULL);
@@ -1861,7 +1937,7 @@
}
activePeerCount = forAllConnectedHosts(NULL, NULL);
- if(activePeerCount == 0) {
+ if (activePeerCount == 0) {
MUTEX_UNLOCK(lock);
return; /* nothing to be done here. */
}
@@ -1874,15 +1950,15 @@
to the limits anyway) */
timeDifference = now - lastRoundStart;
earlyRun = 0;
- if(timeDifference < MIN_SAMPLE_TIME) {
+ if (timeDifference < MIN_SAMPLE_TIME) {
earlyRun = 1;
- if(activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
+ if (activePeerCount > CONNECTION_MAX_HOSTS_ / 16) {
MUTEX_UNLOCK(lock);
return; /* don't update too frequently, we need at
least some
semi-representative sampling! */
}
}
- if(timeDifference == 0)
+ if (timeDifference == 0)
timeDifference = 1;
/* build an array containing all BEs */
@@ -1904,28 +1980,28 @@
}
/* normalize distribution */
- if(shareSum >= 0.00001) { /* avoid numeric glitches... */
+ if (shareSum >= 0.00001) { /* avoid numeric glitches... */
for(u = 0; u < activePeerCount; u++)
shares[u] = shares[u] / shareSum;
- }
- else {
} else {
+ /* proportional sharing */
for(u = 0; u < activePeerCount; u++)
shares[u] = 1 / activePeerCount;
}
/* compute how much bandwidth we can bargain with */
minCon = minConnect();
- if(minCon > activePeerCount)
+ if (minCon > activePeerCount)
minCon = activePeerCount;
schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
load = os_network_monitor_get_load(load_monitor,
Download);
- if(load > 100) {
+ if (load > 100) {
/* take counter measures! */
schedulableBandwidth = schedulableBandwidth * 100 / load;
/* make sure we do not take it down too far */
- if((schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
- (max_bpm > minCon * MIN_BPM_PER_PEER * 2))
+ if ( (schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
+ (max_bpm > minCon * MIN_BPM_PER_PEER * 2) )
schedulableBandwidth = minCon * MIN_BPM_PER_PEER / 2;
}
@@ -1943,34 +2019,42 @@
#if DEBUG_CONNECTION
if(adjustedRR[u] > entries[u]->idealized_limit) {
EncName enc;
- IF_GELOG(ectx, GE_INFO | GE_BULK | GE_USER,
hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
- GE_LOG(ectx, GE_INFO | GE_BULK | GE_USER,
- "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
- &enc, adjustedRR[u], entries[u]->idealized_limit);
+ IF_GELOG(ectx,
+ GE_INFO | GE_BULK | GE_USER,
+ hash2enc(&entries[u]->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_INFO | GE_BULK | GE_USER,
+ "peer `%s' transmitted above limit: %llu bpm > %u bpm\n",
+ &enc,
+ adjustedRR[u],
+ entries[u]->idealized_limit);
}
#endif
/* Check for peers grossly exceeding send limits. Be a bit
* reasonable and make the check against the max value we have
* sent to this peer (assume announcements may have got lost).
*/
- if((earlyRun == 0) &&
- (adjustedRR[u] > 2 * MAX_BUF_FACT *
- entries[u]->max_transmitted_limit) &&
- (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
+ if ( (earlyRun == 0) &&
+ (adjustedRR[u] > 2 * MAX_BUF_FACT *
+ entries[u]->max_transmitted_limit) &&
+ (adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
EncName enc;
entries[u]->violations++;
entries[u]->recently_received = 0; /* "clear" slate */
if (entries[u]->violations > 10) {
- IF_GELOG(ectx, GE_INFO | GE_BULK | GE_USER,
- hash2enc(&entries[u]->session.sender.hashPubKey,
- &enc));
- GE_LOG(ectx, GE_INFO | GE_BULK | GE_USER,
- "blacklisting `%s': sent repeatedly %llu bpm "
- "(limit %u bpm, target %u bpm)\n",
- &enc,
- adjustedRR[u],
- entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
+ IF_GELOG(ectx,
+ GE_INFO | GE_BULK | GE_USER,
+ hash2enc(&entries[u]->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_INFO | GE_BULK | GE_USER,
+ "blacklisting `%s': sent repeatedly %llu bpm "
+ "(limit %u bpm, target %u bpm)\n",
+ &enc,
+ adjustedRR[u],
+ entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
identity->blacklistHost(&entries[u]->session.sender,
1 / topology->getSaturation(), YES);
shutdownConnection(entries[u]);
@@ -1981,11 +2065,10 @@
u--;
continue;
}
- }
- else {
- if((earlyRun == 0) &&
- (adjustedRR[u] < entries[u]->max_transmitted_limit / 2) &&
- (entries[u]->violations > 0)) {
+ } else {
+ if ( (earlyRun == 0) &&
+ (adjustedRR[u] < entries[u]->max_transmitted_limit / 2) &&
+ (entries[u]->violations > 0)) {
/* allow very low traffic volume to
balance out (rare) times of high
volume */
@@ -1993,7 +2076,7 @@
}
}
- if(adjustedRR[u] < MIN_BPM_PER_PEER / 2)
+ if (adjustedRR[u] < MIN_BPM_PER_PEER / 2)
adjustedRR[u] = MIN_BPM_PER_PEER / 2;
/* even if we received NO traffic, allow
at least MIN_BPM_PER_PEER */
@@ -2015,84 +2098,73 @@
firstRound = YES;
for (u = 0; u < activePeerCount; u++)
entries[u]->idealized_limit = 0;
- while((schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
- (activePeerCount > 0) && (didAssign == YES)) {
+ while ( (schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
+ (activePeerCount > 0) &&
+ (didAssign == YES) ) {
didAssign = NO;
decrementSB = 0;
- for(u = 0; u < activePeerCount; u++) {
+ for (u = 0; u < activePeerCount; u++) {
/* always allow allocating MIN_BPM_PER_PEER */
- if((firstRound == NO) ||
- (entries[u]->idealized_limit < adjustedRR[u] * 2)) {
+ if ( (firstRound == NO) ||
+ (entries[u]->idealized_limit < adjustedRR[u] * 2) ) {
unsigned int share;
- share =
+ share =
entries[u]->idealized_limit +
(unsigned int) (shares[u] * schedulableBandwidth);
- if(share < entries[u]->idealized_limit)
+ if (share < entries[u]->idealized_limit)
share = 0xFFFFFFFF; /* int overflow */
- if((share > adjustedRR[u] * 2) && (firstRound == YES))
+ if ( (share > adjustedRR[u] * 2) && (firstRound == YES))
share = adjustedRR[u] * 2;
- if(share > entries[u]->idealized_limit) {
+ if ( (share < MIN_BPM_PER_PEER) &&
+ (minCon > 0) ) {
+ /* use one of the minCon's to keep the connection! */
+ share += MIN_BPM_PER_PEER;
+ decrementSB -= MIN_BPM_PER_PEER; /* do not count */
+ minCon--;
+ }
+ if (share > entries[u]->idealized_limit) {
decrementSB += share - entries[u]->idealized_limit;
didAssign = YES;
}
- if((share < MIN_BPM_PER_PEER) && (minCon > 0)) {
- /* use one of the minCon's to keep the connection! */
- decrementSB -= share;
- share = MIN_BPM_PER_PEER;
- minCon--;
- }
+ if (share > 0)
+ didAssign = YES;
entries[u]->idealized_limit = share;
}
- }
- if(decrementSB > schedulableBandwidth) {
+ } /* end for all peers */
+ if (decrementSB < schedulableBandwidth) {
schedulableBandwidth -= decrementSB;
- }
- else {
+ } else {
schedulableBandwidth = 0;
break;
}
- if((activePeerCount > 0) && (didAssign == NO)) {
- int *perm = permute(WEAK, activePeerCount);
+ if ( (activePeerCount > 0) &&
+ (didAssign == NO) ) {
+ perm = permute(WEAK, activePeerCount);
/* assign also to random "worthless" (zero-share) peers */
- for(u = 0; u < activePeerCount; u++) {
+ for (u = 0; u < activePeerCount; u++) {
unsigned int v = perm[u]; /* use perm to avoid preference to
low-numbered slots */
- if((firstRound == NO) ||
- (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
+ if ( (firstRound == NO) ||
+ (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
unsigned int share;
share =
entries[v]->idealized_limit +
(unsigned int) (schedulableBandwidth);
- if(share < entries[u]->idealized_limit)
+ if (share < entries[u]->idealized_limit)
share = 0xFFFFFFFF; /* int overflow */
- if((firstRound == YES) && (share > adjustedRR[u] * 2))
+ if ((firstRound == YES) && (share > adjustedRR[u] * 2))
share = adjustedRR[u] * 2;
- schedulableBandwidth -= share - entries[v]->idealized_limit;
- entries[v]->idealized_limit = share;
+
+ schedulableBandwidth -= share - entries[v]->idealized_limit;
+ entries[v]->idealized_limit = share;
}
}
FREE(perm);
perm = NULL;
- if((schedulableBandwidth > 0) && (activePeerCount > 0)) {
- /* assign rest disregarding traffic limits */
- perm = permute(WEAK, activePeerCount);
- for(u = 0; u < activePeerCount; u++) {
- unsigned int share;
-
- share =
- entries[perm[u]]->idealized_limit +
- (unsigned int) (schedulableBandwidth / activePeerCount);
- if(share > entries[perm[u]]->idealized_limit) /* no int-overflow? */
- entries[perm[u]]->idealized_limit = share;
- }
- schedulableBandwidth = 0;
- FREE(perm);
- perm = NULL;
- }
- } /* didAssign == NO? */
- if(firstRound == YES) {
+ } /* didAssign == NO? */
+ if (firstRound == YES) {
/* keep some bandwidth off the market
for new connections */
schedulableBandwidth /= 2;
@@ -2100,11 +2172,31 @@
firstRound = NO;
} /* while bandwidth to distribute */
+ if ( (schedulableBandwidth > 0) &&
+ (activePeerCount > 0) ) {
+ /* assign rest disregarding traffic limits */
+ perm = permute(WEAK, activePeerCount);
+ for(u = 0; u < activePeerCount; u++) {
+ unsigned int share;
+
+ share =
+ entries[perm[u]]->idealized_limit +
+ (unsigned int) (schedulableBandwidth / activePeerCount);
+ if (share > entries[perm[u]]->idealized_limit) { /* no int-overflow? */
+ entries[perm[u]]->idealized_limit = share;
+ } else {
+ entries[perm[u]]->idealized_limit = 0xFFFF0000;
+ }
+ }
+ schedulableBandwidth = 0;
+ FREE(perm);
+ perm = NULL;
+ }
/* randomly add the remaining MIN_BPM_PER_PEER to minCon peers; yes, this
will
yield some fluctuation, but some amount of fluctuation should be
good since it creates opportunities. */
- if(activePeerCount > 0)
+ if (activePeerCount > 0)
for(u = 0; u < minCon; u++)
entries[weak_randomi(activePeerCount)]->idealized_limit
+= MIN_BPM_PER_PEER;
@@ -2115,13 +2207,30 @@
#if DEBUG_CONNECTION
EncName enc;
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&entries[u]->session.sender.hashPubKey, &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "inbound limit for peer %u: %s set to %u bpm\n",
- u, &enc, entries[u]->idealized_limit);
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ hash2enc(&entries[u]->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_USER,
+ "inbound limit for peer %u: %s set to %u bpm\n",
+ u,
+ &enc,
+ entries[u]->idealized_limit);
#endif
- entries[u]->current_connection_value /= 2.0;
- entries[u]->recently_received /= 2;
+ if ( (timeDifference > 50) &&
+ (weak_randomi(timeDifference + 1) > 50) )
+ entries[u]->current_connection_value /= 2.0;
+ decrementSB = entries[u]->idealized_limit * timeDifference / cronMINUTES /
2;
+ if ( (decrementSB == 0) &&
+ (weak_randomi(timeDifference + 1) != 0) )
+ decrementSB = 1;
+ if (entries[u]->recently_received >= decrementSB) {
+ entries[u]->recently_received
+ -= decrementSB;
+ } else {
+ entries[u]->recently_received = 0;
+ }
}
/* free memory */
@@ -2129,19 +2238,22 @@
FREE(shares);
FREE(entries);
for (u = 0; u < CONNECTION_MAX_HOSTS_; u++) {
- BufferEntry *be = CONNECTION_buffer_[u];
- if(be == NULL)
+ BufferEntry * be = CONNECTION_buffer_[u];
+ if (be == NULL)
continue;
if (be->idealized_limit < MIN_BPM_PER_PEER) {
-#if DEBUG_CONNECTION || 1
+#if DEBUG_CONNECTION
EncName enc;
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- hash2enc(&be->session.sender.hashPubKey,
- &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Number of connections too high, shutting down low-traffic
connection to `%s' (had only %u bpm)\n",
- &enc, be->idealized_limit);
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ hash2enc(&be->session.sender.hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Number of connections too high, shutting down low-traffic
connection to `%s' (had only %u bpm)\n",
+ &enc,
+ be->idealized_limit);
#endif
/* We need to avoid giving a too low limit (especially 0, which
would indicate a plaintext msg). So we set the limit to the
@@ -2374,7 +2486,10 @@
be->max_bpm = ntohl(msg->bandwidth);
#if DEBUG_CONNECTION
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER, "Received bandwidth cap of %u
bpm\n", be->max_bpm);
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Received bandwidth cap of %u bpm\n",
+ be->max_bpm);
#endif
if(be->available_send_window >= be->max_bpm) {
be->available_send_window = be->max_bpm;
@@ -2472,9 +2587,10 @@
if(be != NULL) {
be->isAlive = get_time();
identity->whitelistHost(peer);
- if(((be->status & STAT_SETKEY_SENT) > 0) &&
- ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
- (OK == ensureTransportConnected(be)) && (be->status != STAT_UP)) {
+ if( ((be->status & STAT_SETKEY_SENT) > 0) &&
+ ((be->status & STAT_SETKEY_RECEIVED) > 0) &&
+ (OK == ensureTransportConnected(be)) &&
+ (be->status != STAT_UP) ) {
be->status = STAT_UP;
be->lastSequenceNumberReceived = 0;
be->lastSequenceNumberSend = 1;
@@ -3082,7 +3198,8 @@
ENTRY();
MUTEX_LOCK(lock);
be = addHost(hostId, YES);
- if((be != NULL) && (be->status != STAT_DOWN)) {
+ if ((be != NULL) &&
+ (be->status != STAT_DOWN)) {
SendEntry *entry;
entry = MALLOC(sizeof(SendEntry));
@@ -3094,8 +3211,7 @@
entry->closure = closure;
entry->knapsackSolution = NO;
appendToBuffer(be, entry);
- }
- else {
+ } else {
FREENONNULL(closure);
}
MUTEX_UNLOCK(lock);
@@ -3116,17 +3232,21 @@
char *closure;
unsigned short len;
- if(msg == NULL) {
+ if (msg == NULL) {
/* little hack for topology,
which cannot do this directly
due to cyclic dependencies! */
- if(getBandwidthAssignedTo(receiver) == 0)
+ if (getBandwidthAssignedTo(receiver) == 0)
session->tryConnect(receiver);
return;
}
len = ntohs(msg->size);
- if(len == 0)
+ if (len == 0) {
+ GE_LOG(ectx,
+ GE_DEBUG | GE_BULK | GE_DEVELOPER,
+ "Empty message send (hopefully used to initiate connection
attempt)\n");
return;
+ }
closure = MALLOC(len);
memcpy(closure, msg, len);
unicastCallback(receiver,
@@ -3199,13 +3319,14 @@
* @param node the identity of the other peer
* @param preference how much should the traffic preference be increased?
*/
-void updateTrafficPreference(const PeerIdentity * node, double preference) {
+void updateTrafficPreference(const PeerIdentity * node,
+ double preference) {
BufferEntry *be;
ENTRY();
MUTEX_LOCK(lock);
be = lookForHost(node);
- if(be != NULL)
+ if (be != NULL)
be->current_connection_value += preference;
MUTEX_UNLOCK(lock);
}
@@ -3224,9 +3345,14 @@
MUTEX_LOCK(lock);
be = lookForHost(node);
if(be != NULL) {
- IF_GELOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&node->hashPubKey, &enc));
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "Closing connection to `%s' as requested by application.\n", &enc);
+ IF_GELOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ hash2enc(&node->hashPubKey,
+ &enc));
+ GE_LOG(ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "Closing connection to `%s' as requested by application.\n",
+ &enc);
shutdownConnection(be);
}
MUTEX_UNLOCK(lock);
Modified: GNUnet/src/server/gnunetd.c
===================================================================
--- GNUnet/src/server/gnunetd.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/server/gnunetd.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -199,7 +199,7 @@
"Sorry, your C compiler did not properly align the C structs.
Aborting.\n");
return -1;
}
- ectx = GE_create_context_stderr(NO,
+ ectx = GE_create_context_stderr(YES,
GE_DEBUG |
GE_WARNING | GE_ERROR | GE_FATAL |
GE_USER | GE_ADMIN | GE_DEVELOPER |
Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/transports/tcp.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -31,7 +31,7 @@
#include "platform.h"
#include "ip.h"
-#define DEBUG_TCP YES
+#define DEBUG_TCP NO
/**
* after how much time of the core not being associated with a tcp
Modified: GNUnet/src/transports/tcp_helper.c
===================================================================
--- GNUnet/src/transports/tcp_helper.c 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/src/transports/tcp_helper.c 2006-09-14 05:09:41 UTC (rev 3398)
@@ -310,6 +310,12 @@
memcpy(&mp[1],
msg,
size);
+#if DEBUG_TCP
+ GE_LOG(ectx,
+ GE_DEBUG | GE_DEVELOPER | GE_BULK,
+ "Transport asks select to queue message of size %u\n",
+ size);
+#endif
ok = select_write(selector,
tcpSession->sock,
mp,
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-09-12 21:15:40 UTC (rev 3397)
+++ GNUnet/todo 2006-09-14 05:09:41 UTC (rev 3398)
@@ -18,10 +18,9 @@
+ loggers: SMTP logger
+ use new loggers in for CS error reporting
* make testcases compile & pass again:
- + tbench -- compiles
- + gap
- + fs/namespace
- + fs/fsui
+ + gap -- does not yet compile
+ + fs/namespace -- does not yet compile
+ + fs/fsui -- downloadtest does not yet compile
+ dht/tools, dht/module
* transports:
+ SMTP/HTTP: do not yet compile (commented out from build)
@@ -34,7 +33,6 @@
- state
- advertising,
- ecrs_core
- - tbench
- tracekit
- gap
- rpc
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3398 - in GNUnet: . src/applications/fs/ecrs src/applications/tbench src/server src/transports,
grothoff <=