[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r3885 - GNUnet/src/server
From: |
grothoff |
Subject: |
[GNUnet-SVN] r3885 - GNUnet/src/server |
Date: |
Wed, 6 Dec 2006 23:03:56 -0800 (PST) |
Author: grothoff
Date: 2006-12-06 23:03:53 -0800 (Wed, 06 Dec 2006)
New Revision: 3885
Modified:
GNUnet/src/server/connection.c
Log:
improvements and bugfixes in bandwidth scheduling code
Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c 2006-12-05 06:00:28 UTC (rev 3884)
+++ GNUnet/src/server/connection.c 2006-12-07 07:03:53 UTC (rev 3885)
@@ -107,18 +107,12 @@
/**
* How many ping/pong messages do we want to transmit
- * per SECONDS_INACTIVE_DROP interval? (must be >=4 to
+ * per SECONDS_INACTIVE_DROP interval? (must be >= 4 to
* keep connection alive with reasonable probability).
*/
#define TARGET_MSG_SID 8
/**
- * Minimum number of sample messages (per peer) before we recompute
- * traffic assignments?
- */
-#define MINIMUM_SAMPLE_COUNT 8
-
-/**
* What is the minimum number of bytes per minute that
* we allocate PER peer? (5 minutes inactivity timeout,
* 32768 MTU, 8 MSGs => 8 * 32768 / 5 = ~50000 bpm [ ~800 bps])
@@ -126,9 +120,15 @@
#define MIN_BPM_PER_PEER (TARGET_MSG_SID * EXPECTED_MTU * 60 / SECONDS_INACTIVE_DROP)
/**
+ * Minimum number of sample messages (per peer) before we recompute
+ * traffic assignments?
+ */
+#define MINIMUM_SAMPLE_COUNT 2
+
+/**
* How often do we expect to re-run the traffic allocation
* code? (depends on MINIMUM_SAMPLE_COUNT and MIN_BPM_PER_PEER
- * and MTU size). [2 * 32 M / 50 = 5M ]
+ * and MTU size). [2 * 32 M / 50 = 78s ]
*/
#define MIN_SAMPLE_TIME (MINIMUM_SAMPLE_COUNT * cronMINUTES * EXPECTED_MTU / MIN_BPM_PER_PEER)
@@ -412,7 +412,8 @@
* @param be the buffer entry
* @param data context for callee
*/
-typedef void (*BufferEntryCallback) (BufferEntry * be, void *data);
+typedef void (*BufferEntryCallback) (BufferEntry * be,
+ void *data);
/* ***************** globals ********************** */
@@ -629,7 +630,8 @@
* @return the overall priority that was achieved
*/
static unsigned int
-approximateKnapsack(BufferEntry * be, unsigned int available) {
+approximateKnapsack(BufferEntry * be,
+ unsigned int available) {
unsigned int i;
unsigned int count;
SendEntry **entries;
@@ -676,7 +678,7 @@
int *efflen;
cron_t startTime;
cron_t endTime;
- SendEntry **entries;
+ SendEntry ** entries;
unsigned int count;
#define VARR(i,j) v[(i)+(j)*(count+1)]
@@ -1879,12 +1881,16 @@
FREENONNULL(be->sendBuffer[i]->closure);
FREE(be->sendBuffer[i]);
}
- GROW(be->sendBuffer, be->sendBufferSize, 0);
+ GROW(be->sendBuffer,
+ be->sendBufferSize,
+ 0);
}
/* ******** inbound bandwidth scheduling ************* */
-static void gatherEntries(BufferEntry * be, UTL_Closure * utl) {
+static void gatherEntries(BufferEntry * be,
+ void * cls) {
+ UTL_Closure * utl = cls;
utl->e[utl->pos++] = be;
}
@@ -1941,6 +1947,7 @@
int earlyRun;
int load;
int * perm;
+ EncName enc;
MUTEX_LOCK(lock);
now = get_time();
@@ -1953,14 +1960,12 @@
MUTEX_UNLOCK(lock);
return;
}
-
activePeerCount = forAllConnectedHosts(NULL, NULL);
if (activePeerCount == 0) {
MUTEX_UNLOCK(lock);
return; /* nothing to be done here. */
}
-
/* if time difference is too small, we don't have enough
sample data and should NOT update the limits;
however, if we have FAR too few peers, reschedule
@@ -1980,19 +1985,18 @@
timeDifference = 1;
/* build an array containing all BEs */
-
entries = MALLOC(sizeof(BufferEntry *) * activePeerCount);
utl.pos = 0;
utl.e = entries;
- forAllConnectedHosts((BufferEntryCallback) & gatherEntries, &utl);
+ forAllConnectedHosts(&gatherEntries,
+ &utl);
-
- /* compute shares */
+ /* compute latest shares based on traffic preferences */
shares = MALLOC(sizeof(double) * activePeerCount);
shareSum = 0.0;
- for(u = 0; u < activePeerCount; u++) {
+ for (u = 0; u < activePeerCount; u++) {
shares[u] = SHARE_DISTRIBUTION_FUNCTION(entries[u]);
- if(shares[u] < 0.0)
+ if (shares[u] < 0.0)
shares[u] = 0.0;
shareSum += shares[u];
}
@@ -2011,32 +2015,27 @@
minCon = minConnect();
if (minCon > activePeerCount)
minCon = activePeerCount;
- schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
+ if (max_bpm > minCon * MIN_BPM_PER_PEER) {
+ schedulableBandwidth = max_bpm - minCon * MIN_BPM_PER_PEER;
+ } else {
+ schedulableBandwidth = 0;
+ minCon = max_bpm / MIN_BPM_PER_PEER;
+ }
load = os_network_monitor_get_load(load_monitor,
Download);
- if (load > 100) {
- /* take counter measures! */
- schedulableBandwidth = schedulableBandwidth * 100 / load;
- /* make sure we do not take it down too far */
- if ( (schedulableBandwidth < minCon * MIN_BPM_PER_PEER / 2) &&
- (max_bpm > minCon * MIN_BPM_PER_PEER * 2) )
- schedulableBandwidth = minCon * MIN_BPM_PER_PEER / 2;
- }
+ if (load > 100) /* take counter measure */
+ schedulableBandwidth = schedulableBandwidth * 100 / load;
+ /* compute recent activity profile of the peer */
adjustedRR = MALLOC(sizeof(long long) * activePeerCount);
-
- /* reset idealized limits; if we want a smoothed-limits
- algorithm, we'd need to compute the new limits separately
- and then merge the values; but for now, let's just go
- hardcore and adjust all values rapidly */
- GE_ASSERT(ectx, timeDifference != 0);
- for(u = 0; u < activePeerCount; u++) {
- adjustedRR[u] =
- entries[u]->recently_received * cronMINUTES / timeDifference / 2;
-
+ GE_ASSERT(ectx,
+ timeDifference != 0);
+ for (u=0;u<activePeerCount;u++) {
+ adjustedRR[u]
+ = entries[u]->recently_received * cronMINUTES / timeDifference / 2;
+
#if DEBUG_CONNECTION
- if(adjustedRR[u] > entries[u]->idealized_limit) {
- EncName enc;
+ if (adjustedRR[u] > entries[u]->idealized_limit) {
IF_GELOG(ectx,
GE_INFO | GE_BULK | GE_USER,
hash2enc(&entries[u]->session.sender.hashPubKey,
@@ -2057,8 +2056,6 @@
(adjustedRR[u] > 2 * MAX_BUF_FACT *
entries[u]->max_transmitted_limit) &&
(adjustedRR[u] > 2 * MAX_BUF_FACT * entries[u]->idealized_limit)) {
- EncName enc;
-
entries[u]->violations++;
entries[u]->recently_received = 0; /* "clear" slate */
if (entries[u]->violations > 10) {
@@ -2074,7 +2071,8 @@
adjustedRR[u],
entries[u]->max_transmitted_limit, entries[u]->idealized_limit);
identity->blacklistHost(&entries[u]->session.sender,
- 1 / topology->getSaturation(), YES);
+ 1 / topology->getSaturation(),
+ YES);
shutdownConnection(entries[u]);
activePeerCount--;
entries[u] = entries[activePeerCount];
@@ -2093,11 +2091,13 @@
entries[u]->violations--;
}
}
-
- if (adjustedRR[u] < MIN_BPM_PER_PEER / 2)
- adjustedRR[u] = MIN_BPM_PER_PEER / 2;
/* even if we received NO traffic, allow
at least MIN_BPM_PER_PEER */
+ if (adjustedRR[u] < MIN_BPM_PER_PEER)
+ adjustedRR[u] = MIN_BPM_PER_PEER;
+ /* initial adjustedRR's should reflect aged value
+ from previous idealized_limit / iteration */
+ adjustedRR[u] = (entries[u]->idealized_limit * 3 + adjustedRR[u]) / 4;
}
/* now distribute the schedulableBandwidth according
@@ -2111,10 +2111,11 @@
potentially under-allocated. Since there's always some
(unencrypted) traffic that we're not quite accounting for anyway,
that's probably not so bad. */
+
didAssign = YES;
/* in the first round we cap by 2* previous utilization */
firstRound = YES;
- for (u = 0; u < activePeerCount; u++)
+ for (u = 0; u < activePeerCount; u++)
entries[u]->idealized_limit = 0;
while ( (schedulableBandwidth > CONNECTION_MAX_HOSTS_ * 100) &&
(activePeerCount > 0) &&
@@ -2122,7 +2123,6 @@
didAssign = NO;
decrementSB = 0;
for (u = 0; u < activePeerCount; u++) {
- /* always allow allocating MIN_BPM_PER_PEER */
if ( (firstRound == NO) ||
(entries[u]->idealized_limit < adjustedRR[u] * 2) ) {
unsigned int share;
@@ -2132,8 +2132,9 @@
(unsigned int) (shares[u] * schedulableBandwidth);
if (share < entries[u]->idealized_limit)
share = 0xFFFFFFFF; /* int overflow */
- if ( (share > adjustedRR[u] * 2) && (firstRound == YES))
+ if ( (share > adjustedRR[u] * 2) && (firstRound == YES) )
share = adjustedRR[u] * 2;
+ /* always allow allocating MIN_BPM_PER_PEER */
if ( (share < MIN_BPM_PER_PEER) &&
(minCon > 0) ) {
/* use one of the minCon's to keep the connection! */
@@ -2143,11 +2144,12 @@
}
if (share > entries[u]->idealized_limit) {
decrementSB += share - entries[u]->idealized_limit;
- didAssign = YES;
- }
- entries[u]->idealized_limit = share;
+ didAssign = YES;
+ entries[u]->idealized_limit = share;
+ }
}
} /* end for all peers */
+
if (decrementSB < schedulableBandwidth) {
schedulableBandwidth -= decrementSB;
} else {
@@ -2161,29 +2163,29 @@
for (u = 0; u < activePeerCount; u++) {
unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
if ( (firstRound == NO) ||
- (entries[v]->idealized_limit < adjustedRR[u] * 2)) {
+ (entries[v]->idealized_limit < adjustedRR[v] * 2)) {
unsigned int share;
share =
entries[v]->idealized_limit +
(unsigned int) (schedulableBandwidth);
- if (share < entries[u]->idealized_limit)
+ if (share < entries[v]->idealized_limit)
share = 0xFFFFFFFF; /* int overflow */
- if ((firstRound == YES) && (share > adjustedRR[u] * 2))
- share = adjustedRR[u] * 2;
-
- schedulableBandwidth -= share - entries[v]->idealized_limit;
- entries[v]->idealized_limit = share;
+ if ( (firstRound == YES) && (share > adjustedRR[v] * 2) )
+ share = adjustedRR[v] * 2;
+ if (share > entries[v]->idealized_limit) {
+ schedulableBandwidth -= share - entries[v]->idealized_limit;
+ entries[v]->idealized_limit = share;
+ }
}
}
FREE(perm);
perm = NULL;
-
} /* didAssign == NO? */
if (firstRound == YES) {
/* keep some bandwidth off the market
for new connections */
- schedulableBandwidth /= 2;
+ schedulableBandwidth = (schedulableBandwidth * 7) / 8;
}
firstRound = NO;
} /* while bandwidth to distribute */
@@ -2194,14 +2196,15 @@
perm = permute(WEAK, activePeerCount);
for(u = 0; u < activePeerCount; u++) {
unsigned int share;
+ unsigned int v = perm[u]; /* use perm to avoid preference to low-numbered slots */
share =
- entries[perm[u]]->idealized_limit +
+ entries[v]->idealized_limit +
(unsigned int) (schedulableBandwidth / activePeerCount);
- if (share > entries[perm[u]]->idealized_limit) { /* no int-overflow? */
- entries[perm[u]]->idealized_limit = share;
+ if (share > entries[v]->idealized_limit) { /* no int-overflow? */
+ entries[v]->idealized_limit = share;
} else {
- entries[perm[u]]->idealized_limit = 0xFFFF0000;
+ entries[v]->idealized_limit = 0xFFFF0000;
}
}
schedulableBandwidth = 0;
@@ -2213,16 +2216,14 @@
yield some fluctuation, but some amount of fluctuation should be
good since it creates opportunities. */
if (activePeerCount > 0)
- for(u = 0; u < minCon; u++)
+ for (u=0;u<minCon;u++)
entries[weak_randomi(activePeerCount)]->idealized_limit
+= MIN_BPM_PER_PEER;
/* prepare for next round */
lastRoundStart = now;
- for(u = 0; u < activePeerCount; u++) {
+ for (u=0;u<activePeerCount;u++) {
#if DEBUG_CONNECTION
- EncName enc;
-
IF_GELOG(ectx,
GE_DEBUG | GE_BULK | GE_USER,
hash2enc(&entries[u]->session.sender.hashPubKey,
@@ -2236,17 +2237,15 @@
#endif
if ( (timeDifference > 50) &&
(weak_randomi(timeDifference + 1) > 50) )
- entries[u]->current_connection_value /= 2.0;
+ entries[u]->current_connection_value *= 0.9; /* age */
decrementSB = entries[u]->idealized_limit * timeDifference / cronMINUTES / 2;
if ( (decrementSB == 0) &&
(weak_randomi(timeDifference + 1) != 0) )
decrementSB = 1;
- if (entries[u]->recently_received >= decrementSB) {
- entries[u]->recently_received
- -= decrementSB;
- } else {
- entries[u]->recently_received = 0;
- }
+ if (entries[u]->recently_received >= decrementSB)
+ entries[u]->recently_received -= decrementSB;
+ else
+ entries[u]->recently_received = 0;
}
/* free memory */
@@ -2259,8 +2258,6 @@
continue;
if (be->idealized_limit < MIN_BPM_PER_PEER) {
#if DEBUG_CONNECTION
- EncName enc;
-
IF_GELOG(ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
hash2enc(&be->session.sender.hashPubKey,
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r3885 - GNUnet/src/server,
grothoff <=