gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3889 - GNUnet/src/server


From: grothoff
Subject: [GNUnet-SVN] r3889 - GNUnet/src/server
Date: Thu, 7 Dec 2006 00:43:48 -0800 (PST)

Author: grothoff
Date: 2006-12-07 00:43:46 -0800 (Thu, 07 Dec 2006)
New Revision: 3889

Modified:
   GNUnet/src/server/connection.c
Log:
bugfixes

Modified: GNUnet/src/server/connection.c
===================================================================
--- GNUnet/src/server/connection.c      2006-12-07 08:43:42 UTC (rev 3888)
+++ GNUnet/src/server/connection.c      2006-12-07 08:43:46 UTC (rev 3889)
@@ -269,20 +269,41 @@
  * callback).
  */
 typedef struct {
-  /** how long is this message part expected to be? */
+
+  /**
+   * how long is this message part expected to be? 
+   */
   unsigned short len;
-  /** flags */
+
+  /**
+   * flags 
+   */
   unsigned short flags;
-  /** how important is this message part? */
+
+  /**
+   * how important is this message part? 
+   */
   unsigned int pri;
-  /** when did we intend to transmit? */
+
+  /**
+   * when do/did we intend to transmit? 
+   */
   cron_t transmissionTime;
-  /** callback to call to create the message part */
+  
+  /**
+   * callback to call to create the message part 
+   */
   BuildMessageCallback callback;
-  /** argument to callback, call FREENONNULL(closure) if we
-      can not transmit this MessagePart. */
-  void *closure;
-  /** YES if selected by knapsack for sending */
+
+  /**
+   * argument to callback, call FREENONNULL(closure) if we
+   * can not transmit this MessagePart. 
+   */
+  void * closure;
+
+  /**
+   * YES if selected by knapsack for sending 
+   */
   int knapsackSolution;
 } SendEntry;
 
@@ -314,40 +335,74 @@
  * Type of the connection table.
  */
 typedef struct BufferEntry_ {
-  /** Session for the connection */
+  /**
+   * Session for the connection 
+   */
   Session session;
-  /** the current session key used for encryption */
+
+  /**
+   * the current session key used for encryption 
+   */
   SESSIONKEY skey_local;
-  /** at which time was the local sessionkey created */
+
+  /**
+   * at which time was the local sessionkey created 
+   */
   TIME_T skey_local_created;
-  /** the current session key used for decryption */
+
+  /**
+   * the current session key used for decryption 
+   */
   SESSIONKEY skey_remote;
-  /** at which time was the remote sessionkey created */
+  
+  /**
+   * at which time was the remote sessionkey created 
+   */
   TIME_T skey_remote_created;
-  /** is this host alive? timestamp of the time of the last-active
-      point (as witnessed by some higher-level application, typically
-      topology+pingpong) */
+
+  /**
+   * is this host alive? timestamp of the time of the last-active
+   * point (as witnessed by some higher-level application, typically
+   * topology+pingpong) 
+   */
   cron_t isAlive;
-  /**  Status of the connection (STAT_XXX) */
+
+  /**
+   * Status of the connection (STAT_XXX) 
+   */
   unsigned int status;
 
-  /** last sequence number received on this connection (highest) */
+  /**
+   * last sequence number received on this connection (highest) 
+   */
   unsigned int lastSequenceNumberReceived;
-  /** bit map indicating which of the 32 sequence numbers before the last
-      were received (good for accepting out-of-order packets and
-      estimating reliability of the connection) */
+
+  /**
+   * bit map indicating which of the 32 sequence numbers before the last
+   * were received (good for accepting out-of-order packets and
+   * estimating reliability of the connection) 
+   */
   unsigned int lastPacketsBitmap;
-  /** last sequence number transmitted */
+
+  /**
+   * last sequence number transmitted 
+   */
   unsigned int lastSequenceNumberSend;
 
-  /** number of entries in the send buffer */
+  /**
+   * number of entries in the send buffer 
+   */
   unsigned int sendBufferSize;
 
-  /** buffer of entries waiting to be transmitted */
+  /**
+   * buffer of entries waiting to be transmitted 
+   */
   SendEntry **sendBuffer;
 
-  /** time of the last send-attempt (to avoid
-      solving knapsack's too often) */
+  /**
+   * time of the last send-attempt (to avoid
+   * solving knapsack's too often) 
+   */
   cron_t lastSendAttempt;
 
   /**
@@ -360,44 +415,64 @@
    */
   cron_t MAX_SEND_FREQUENCY;
 
-  /** a hash collision overflow chain */
+  /**
+   * a hash collision overflow chain 
+   */
   struct BufferEntry_ *overflowChain;
 
 
   /* *********** outbound bandwidth limits ********** */
 
-  /** byte-per-minute limit for this connection */
+  /**
+   * byte-per-minute limit for this connection 
+   */
   unsigned int max_bpm;
 
-  /** current bps (actually bytes per minute) for this connection
-      (incremented every minute by max_bpm,
-       bounded by max_bpm * secondsInactive/2;
-       may get negative if we have VERY high priority
-       content) */
+  /**
+   * current bps (actually bytes per minute) for this connection
+   * (incremented every minute by max_bpm, bounded by max_bpm *
+   * secondsInactive/2; may get negative if we have VERY high priority
+   * content) */
   long long available_send_window;
 
-  /** time of the last increment of available_send_window */
+  /**
+   * time of the last increment of available_send_window 
+   */
   cron_t last_bps_update;
 
   /* *********** inbound bandwidth accounting ******** */
 
-  /** how much traffic (bytes) did we receive on this connection since
-     the last update-round? */
+  /**
+   * how much traffic (bytes) did we receive on this connection since
+   * the last update-round? 
+   */
   long long recently_received;
 
-  /** How valueable were the messages of this peer recently? */
+  /**
+   * How valueable were the messages of this peer recently? 
+   */
   double current_connection_value;
 
-  /** the highest bandwidth limit that a well-behaved peer
-      must have received by now */
+  /**
+   * the highest bandwidth limit that a well-behaved peer
+   * must have received by now 
+   */
   unsigned int max_transmitted_limit;
 
-  /** what is the limit that we are currently shooting for? (byte per minute) */
+  /**
+   * what is the limit that we are currently shooting for? (bytes per minute) 
+   */
   unsigned int idealized_limit;
 
+  /**
+   * How often has the other peer violated the traffic bounds
+   * recently?
+   */
   unsigned int violations;
 
-  /** are we currently in "sendBuffer" for this entry? */
+  /**
+   * are we currently in "sendBuffer" for this entry? 
+   */
   int inSendBuffer;
 
 } BufferEntry;
@@ -481,6 +556,12 @@
 static unsigned long long max_bpm;
 
 /**
+ * What is the available upstream bandwidth (in bytes
+ * per minute)?
+ */
+static unsigned long long max_bpm_up;
+
+/**
  * Registered Send-Notify handlers.
  */
 static MessagePartHandler *rsns;
@@ -522,18 +603,23 @@
 /* ******************** CODE ********************* */
 
 #if DEBUG_CONNECTION
-static void printMsg(const char *prefix, PeerIdentity * sender,
-                     SESSIONKEY * key, const INITVECTOR * iv, int crc) {
+static void printMsg(const char *prefix, 
+                    const PeerIdentity * sender,
+                     const SESSIONKEY * key, 
+                    const INITVECTOR * iv, 
+                    int crc) {
   char skey[65];
   char *dst;
   int idx;
   EncName enc;
 
-  hash2enc(&sender->hashPubKey, &enc);
-
+  hash2enc(&sender->hashPubKey,
+          &enc);
   dst = skey;
   for(idx = 0; idx < SESSIONKEY_LEN; idx++) {
-    sprintf(dst, "%02x", key->key[idx]);
+    sprintf(dst, 
+           "%02x",
+           key->key[idx]);
     dst += 2;
   }
   *dst = 0;
@@ -544,7 +630,7 @@
         prefix,
         &enc,
         skey,
-        *((int *) iv),
+        *((const int *) iv),
         crc);
 }
 #endif
@@ -1767,10 +1853,9 @@
   int count = 0;
   BufferEntry *be;
 
-  ENTRY();
-  for(i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
+  for(i=0;i<CONNECTION_MAX_HOSTS_;i++) {
     be = CONNECTION_buffer_[i];
-    while(be != NULL) {
+    while (be != NULL) {
       if(be->status == STAT_UP) {
         if(method != NULL)
           method(be, arg);
@@ -1952,8 +2037,6 @@
   int load;
   int * perm;
   EncName enc;
-  unsigned long long total_allowed_sent;
-  unsigned long long total_allowed_recv;
 
   MUTEX_LOCK(lock);
   now = get_time();
@@ -2031,7 +2114,10 @@
                                     Download);
   if (load > 100) /* take counter measure */
     schedulableBandwidth = schedulableBandwidth * 100 / load;  
-
+#if 0
+  printf("Scheduling %llu bytes per minute\n",
+        schedulableBandwidth);
+#endif
   /* compute recent activity profile of the peer */
   adjustedRR = MALLOC(sizeof(long long) * activePeerCount);
   GE_ASSERT(ectx, 
@@ -2207,7 +2293,7 @@
       share =
        entries[v]->idealized_limit +
        (unsigned int) (schedulableBandwidth / activePeerCount);
-      if (share > entries[v]->idealized_limit) { /* no int-overflow? */
+      if (share >= entries[v]->idealized_limit) { /* no int-overflow? */
        entries[v]->idealized_limit = share;
       } else {
        entries[v]->idealized_limit = 0xFFFF0000;       
@@ -2258,8 +2344,6 @@
   FREE(adjustedRR);
   FREE(shares);
 
-  total_allowed_sent = 0;
-  total_allowed_recv = 0;
   for (u=0;u<activePeerCount;u++) {
     BufferEntry * be = entries[u];
 
@@ -2281,16 +2365,13 @@
       be->idealized_limit = MIN_BPM_PER_PEER;
       shutdownConnection(be);
     } else {
-      total_allowed_sent += be->max_bpm;
-      total_allowed_recv += be->idealized_limit;
+#if 0
+      printf("Assigned %u bytes to peer %u\n",
+            be->idealized_limit,
+            u);
+#endif
     }
   }
-  if (stats != NULL) {
-    stats->set(stat_total_allowed_sent,
-              total_allowed_sent);
-    stats->set(stat_total_allowed_recv,
-              total_allowed_recv);
-  }
 
   FREE(entries);
   MUTEX_UNLOCK(lock);
@@ -2310,9 +2391,13 @@
   BufferEntry * tmp;
   cron_t now;
   int i;
+  unsigned long long total_allowed_sent;
+  unsigned long long total_allowed_recv;
 
   scheduleInboundTraffic();
   now = get_time();
+  total_allowed_sent = 0;
+  total_allowed_recv = 0;
   MUTEX_LOCK(lock);
   for (i = 0; i < CONNECTION_MAX_HOSTS_; i++) {
     root = CONNECTION_buffer_[i];
@@ -2330,6 +2415,8 @@
         FREE(tmp);
         continue;               /* no need to call 'send buffer' */
       case STAT_UP:
+       total_allowed_sent += root->max_bpm;
+       total_allowed_recv += root->idealized_limit;
         if ( (now > root->isAlive) && /* concurrency might make this false... */
             (now - root->isAlive > SECONDS_INACTIVE_DROP * cronSECONDS)) {
           EncName enc;
@@ -2399,6 +2486,14 @@
     }                           /* end of while */
   }                             /* for all buckets */
   MUTEX_UNLOCK(lock);
+  if (stats != NULL) {
+    if (total_allowed_sent > max_bpm_up)
+      total_allowed_sent = max_bpm_up;
+    stats->set(stat_total_allowed_sent,
+              total_allowed_sent / 60); /* bpm to bps */
+    stats->set(stat_total_allowed_recv,
+              total_allowed_recv / 60); /* bpm to bps */
+  }
 }
 
 /**
@@ -2591,7 +2686,9 @@
  *                   YES if it is the key for sending
  */
 void assignSessionKey(const SESSIONKEY * key,
-                      const PeerIdentity * peer, TIME_T age, int forSending) {
+                      const PeerIdentity * peer, 
+                     TIME_T age,
+                     int forSending) {
   BufferEntry *be;
 
   MUTEX_LOCK(lock);
@@ -2820,6 +2917,14 @@
                                              50000, /* default: 50 kbps */
                                              &new_max_bpm))
     return SYSERR;
+  GC_get_configuration_value_number(cfg,
+                                   "LOAD",
+                                   "MAXNETUPBPSTOTAL",
+                                   0,
+                                   ((unsigned long long)-1)/60,
+                                   50000, /* default: 50 kbps */
+                                   &max_bpm_up);
+  max_bpm_up *= 60; /* bps -> bpm */
   MUTEX_LOCK(lock);
   new_max_bpm = 60 * new_max_bpm;
   if(max_bpm != new_max_bpm) {
@@ -2972,9 +3077,9 @@
     stat_noise_sent 
       = stats->create(gettext_noop("# bytes noise sent"));
     stat_total_allowed_sent 
-      = stats->create(gettext_noop("# total advertised bytes per minute received limit"));
+      = stats->create(gettext_noop("# total advertised bytes per second received limit"));
     stat_total_allowed_recv
-      = stats->create(gettext_noop("# total allowed bytes per minute transmission limit"));
+      = stats->create(gettext_noop("# total allowed bytes per second transmission limit"));
   }
   transport->start(&core_receive);
 }
@@ -3250,14 +3355,16 @@
                      BuildMessageCallback callback,
                      void *closure,
                      unsigned short len,
-                     unsigned int importance, unsigned int maxdelay) {
+                     unsigned int importance,
+                    unsigned int maxdelay) {
   BufferEntry *be;
 #if DEBUG_CONNECTION
   EncName enc;
 
   IF_GELOG(ectx,
           GE_DEBUG | GE_REQUEST | GE_USER,
-          hash2enc(&hostId->hashPubKey, &enc));
+          hash2enc(&hostId->hashPubKey,
+                   &enc));
   GE_LOG(ectx,
         GE_DEBUG | GE_REQUEST | GE_USER,
         "%s: sending message to host %s message of size %d\n",
@@ -3298,7 +3405,8 @@
  */
 void unicast(const PeerIdentity * receiver,
              const MESSAGE_HEADER * msg,
-             unsigned int importance, unsigned int maxdelay) {
+             unsigned int importance, 
+            unsigned int maxdelay) {
   char *closure;
   unsigned short len;
 
@@ -3320,7 +3428,11 @@
   closure = MALLOC(len);
   memcpy(closure, msg, len);
   unicastCallback(receiver,
-                  &copyCallback, closure, len, importance, maxdelay);
+                  &copyCallback, 
+                 closure, 
+                 len, 
+                 importance,
+                 maxdelay);
 }
 
 /**





reply via email to

[Prev in Thread] Current Thread [Next in Thread]