gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r32466 - msh/src


From: gnunet
Subject: [GNUnet-SVN] r32466 - msh/src
Date: Mon, 24 Feb 2014 18:32:45 +0100

Author: harsha
Date: 2014-02-24 18:32:45 +0100 (Mon, 24 Feb 2014)
New Revision: 32466

Modified:
   msh/src/mshd.c
   msh/src/reduce.c
Log:
Shorten the round width to the remaining peers in the last round.


Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c      2014-02-24 16:25:43 UTC (rev 32465)
+++ msh/src/mshd.c      2014-02-24 17:32:45 UTC (rev 32466)
@@ -189,12 +189,18 @@
 int rank;
 
 /**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
+ * Given width of the round -- how many other mshd instances verify our IP
+ * addresses in a round
  */
-unsigned int rwidth;
+unsigned int round_width;
 
 /**
+ * The actual width of the current round.  This should be less than or equal to
+ * round_width
+ */
+unsigned int current_round_width;
+
+/**
  * The number of total mshd processes 
  */
 int nproc;
@@ -316,6 +322,11 @@
 static unsigned int current_round;
 
 /**
+ * Total number of rounds required
+ */
+static unsigned int total_rounds;
+
+/**
  * The port number of our local socket
  */
 uint16_t listen_port;
@@ -847,12 +858,8 @@
   struct GNUNET_DISK_FileHandle *write_end;
   sigset_t sigset;
   intmax_t pid;
-  int total_rounds;
 
   GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
-  /* Number of rounds required to contact all processes except ourselves (rwidth
-     in parallel in each round) */
-  total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
   if (current_round < total_rounds)
   {
     rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
@@ -871,7 +878,11 @@
     return;
   }
   LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
-  GNUNET_break (GNUNET_OK == reduce_ntree ());
+  if (GNUNET_SYSERR == reduce_ntree ())
+  {
+    GNUNET_SCHEDULER_shutdown ();
+    return;
+  }
   pid = (intmax_t) getpid ();
   GNUNET_assert (0 < asprintf (&unixpath, "/%ju.sock", pid));
   hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
@@ -991,7 +1002,7 @@
   {
     cleanup_verifyaddressctx (ctx);
   }
-  for (cnt = 0; cnt < rwidth; cnt++)
+  for (cnt = 0; cnt < current_round_width; cnt++)
     instance_address_info_destroy (riainfos[cnt]);
   if (1 != bitmap_allset (bitmap))
   {
@@ -1019,7 +1030,7 @@
   int off;
 
   ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
-  lb = rank - (current_round * rwidth) - rwidth + nproc;
+  lb = rank - (current_round * round_width) - current_round_width + nproc;
   GNUNET_assert (0 <= lb);
   lb %= nproc;
   source = instance_address_info_get_rank (ctx->iainfo);
@@ -1198,8 +1209,8 @@
   MPI_Status status;
   int cnt;
 
-  iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
-  for (cnt=0; cnt < rwidth; cnt++)
+  iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * current_round_width);
+  for (cnt=0; cnt < current_round_width; cnt++)
   {
     struct MSH_MSG_VerifyAddress *msg;
     int rsize;
@@ -1212,34 +1223,26 @@
                           MPI_COMM_WORLD, &status));
     MPI_Get_elements (&status, MPI_BYTE, &rsize);
     /* We expect a message from peers with id p in the range:       
-       (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round * rwidth) -1) */
-    lb = rank - current_round * rwidth - rwidth + nproc;
-    up = rank - (current_round * rwidth) - 1 + nproc;
+       (rank - current_round * round_width - current_round_width) < p <= (rank - (current_round * round_width) -1) */
+    up = rank - (current_round * round_width) - 1 + nproc;
+    lb = (up - current_round_width) + nproc;
     GNUNET_assert (lb >= 0);
     GNUNET_assert (up >= 0);
     lb %= nproc;
     up %= nproc;
     source = status.MPI_SOURCE;
-    if (lb == up) 
+    GNUNET_assert (lb != up);
+    if(lb < up)
     {
-      if (source != lb)
+      if ((source <= lb) || (source > up))
       {
         GNUNET_break (0);
-        LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb, up);
         goto err_ret;
       }
     }
-    else if(lb < up)
-    {
-      if ((source < lb) || (source > up))
-      {
-        GNUNET_break (0);
-        goto err_ret;
-      }
-    }
     else if (up < lb)
     {
-      if ((source > up) && (source < lb))
+      if ((source > up) && (source <= lb))
       {
         GNUNET_break (0);
         goto err_ret;
@@ -1264,7 +1267,7 @@
   return iainfos;
   
  err_ret:
-  for (cnt=0; cnt < rwidth; cnt++)
+  for (cnt=0; cnt < current_round_width; cnt++)
   {
     if (NULL != iainfos[cnt])
       instance_address_info_destroy (iainfos[cnt]);
@@ -1290,7 +1293,6 @@
   int cnt;
   int ret;
   int target;
-  unsigned int width;
 
   msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
   msg = GNUNET_malloc (msize);
@@ -1301,16 +1303,13 @@
   {    
     msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
   }
-  width = rwidth;  
-  if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) / rwidth)) )
-    width = (nproc - 1) % rwidth;
   cpys = NULL;
-  cpys = GNUNET_malloc (msize * width);
-  sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
-  for (cnt=0; cnt < width; cnt++)
+  cpys = GNUNET_malloc (msize * current_round_width);
+  sreqs = GNUNET_malloc (current_round_width * sizeof (MPI_Request));
+  for (cnt=0; cnt < current_round_width; cnt++)
   {    
     (void) memcpy (&cpys[cnt], msg, msize);
-    target = (current_round * rwidth) + cnt + 1;
+    target = (current_round * round_width) + cnt + 1;
     GNUNET_assert (target < nproc);
     target = (rank + target) % nproc;
     LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
@@ -1321,7 +1320,7 @@
   }
   free (msg);
   msg = NULL;
-  if (cnt != width)
+  if (cnt != current_round_width)
   {
     for (cnt--; cnt >= 0; cnt--)
     {
@@ -1330,7 +1329,7 @@
     }
     goto err_ret;
   }
-  for (cnt=0; cnt < width; cnt++)
+  for (cnt=0; cnt < current_round_width; cnt++)
   {
     GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));   
 
   }
@@ -1362,6 +1361,9 @@
 {
   unsigned int cnt;
 
+  current_round_width = round_width;
+  if ( (0 != ( (nproc - 1) % round_width)) && (current_round == ( (nproc - 1) / round_width)) )
+    current_round_width = (nproc - 1) % round_width;
   if (GNUNET_SYSERR == send_addresses ())
     return GNUNET_SYSERR;
   if (NULL == (riainfos = receive_addresses ()))
@@ -1374,7 +1376,7 @@
     GNUNET_break (0);
     return GNUNET_SYSERR;
   }
-  for (cnt = 0; cnt < rwidth; cnt++)
+  for (cnt = 0; cnt < current_round_width; cnt++)
     verify_addresses (riainfos[cnt]);
   finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, 
                                                 &finalise_round, NULL);
@@ -1414,12 +1416,12 @@
   unsigned int cnt;
 
   LOG_DEBUG ("Running main task\n");
-  if (0 == rwidth)
+  if (0 == round_width)
   {
     LOG_ERROR ("Round width cannot be 0.  Exiting\n");
     return;
   }
-  if (nproc <= rwidth)
+  if (nproc <= round_width)
   {
     LOG_ERROR ("Round width should be less than the number of processes\n");
     return;
@@ -1431,12 +1433,12 @@
     return;
   }
   run_args = copy_argv (args);
-  bitmap = bitmap_create (rwidth);
+  bitmap = bitmap_create (round_width);
   addrmap = addressmap_create (nproc);
   addrlen = sizeof (struct sockaddr_in);
   (void) memset (&addr, 0, addrlen);
   addr.sin_addr.s_addr = INADDR_ANY;   /* bind to all available addresses */
-  listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen, rwidth);
+  listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen, round_width);
   listen_port = ntohs (addr.sin_port);
   if (NULL == listen_socket)
     return;
@@ -1452,6 +1454,9 @@
     LOG_ERROR ("No IP addresses found\n");
     return;
   }
+  /* Number of rounds required to contact all processes except ourselves (round_width
+     in parallel in each round) */
+  total_rounds = ((nproc - 1) + (round_width - 1)) / round_width;
   schedule_next_round ();
   shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL, 
                                                 &do_shutdown, NULL);
@@ -1476,13 +1481,13 @@
   static const struct GNUNET_GETOPT_CommandLineOption options[] = {
     {'w', "round-width", "COUNT",
      "set the size of each round to COUNT",
-     GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+     GNUNET_YES, &GNUNET_GETOPT_set_uint, &round_width},
     GNUNET_GETOPT_OPTION_END
   };
   int ret;
 
   ret = 1;
-  rwidth = 1;
+  round_width = 1;
   GNUNET_log_setup ("mshd", NULL, NULL);
   if (MPI_SUCCESS != MPI_Init(&argc, &argv))
   {
@@ -1494,7 +1499,7 @@
     LOG_ERROR ("Cannot determine the number of mshd processes\n");
     goto fail;
   }
-  if (nproc <= rwidth)
+  if (nproc <= round_width)
   {
     LOG_ERROR ("Given round width is greater than or equal to number of mshd processes\n");
     goto fail;

Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c    2014-02-24 16:25:43 UTC (rev 32465)
+++ msh/src/reduce.c    2014-02-24 17:32:45 UTC (rev 32466)
@@ -96,16 +96,19 @@
     {
       iaddr_msgs[nr] = (struct MSH_MSG_InstanceAdresses *) (buf + grow);
       grow += ntohs (iaddr_msgs[nr]->header.size);
-      if (0 > addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
+      if (0 >= addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
       {
         LOG_ERROR ("No common address found for instance %u\n", 
                    ntohs (iaddr_msgs[nr]->rank));
         break;
       }
     }
-    LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
     GNUNET_free (buf);
     GNUNET_free (iaddr_msgs);
+    if (nr == nmsg)
+      LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
+    else
+      return GNUNET_SYSERR;
   }
   GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_NONCE, &shash);
   if (MPI_SUCCESS != MPI_Bcast (&shash, sizeof (struct GNUNET_HashCode),




reply via email to

[Prev in Thread] Current Thread [Next in Thread]