gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r28338 - msh/src


From: gnunet
Subject: [GNUnet-SVN] r28338 - msh/src
Date: Tue, 30 Jul 2013 15:45:46 +0200

Author: harsha
Date: 2013-07-30 15:45:46 +0200 (Tue, 30 Jul 2013)
New Revision: 28338

Modified:
   msh/src/reduce.c
Log:
-addressmap consensus by broadcasting


Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c    2013-07-30 12:22:57 UTC (rev 28337)
+++ msh/src/reduce.c    2013-07-30 13:45:46 UTC (rev 28338)
@@ -19,205 +19,93 @@
 #define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
 
 
-
-
 /**
- * Send our addressmap to the instance with the given rank
+ * Perform an ntree reduction on address maps
  *
- * @param instance the rank of the instance to which our addressmap has to be sent
  * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
  */
-static int
-send_addressmap (int instance)
-{ 
+int
+reduce_ntree ()
+{
   struct MSH_MSG_InstanceAdresses **iaddr_msgs;
-  MPI_Request *send_reqs;
-  MPI_Status *stats;
+  char *buf;
   int nmsg;
-  int cnt;
-  int ret;
-
-  ret = GNUNET_SYSERR;
-  send_reqs = NULL;
-  if (GNUNET_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
-  {
-    GNUNET_break (0);
-    return ret;
-  }
-  send_reqs = GNUNET_malloc (sizeof (MPI_Request) * nmsg);
-  LOG_DEBUG ("%d: Sending addressmap to instance %d\n", rank, instance);
-  for (cnt = 0; cnt < nmsg; cnt++)
-  {
-    if (MPI_SUCCESS != 
-        MPI_Isend (iaddr_msgs[cnt], ntohs (iaddr_msgs[cnt]->header.size), MPI_BYTE,
-                   instance, MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD, 
-                   &send_reqs[cnt]))
-    {
-      GNUNET_break (0);
-      cnt--;
-      goto cleanup;
-    }
-  }
-  stats = GNUNET_malloc (sizeof (MPI_Status) * nmsg);
-  if (MPI_SUCCESS != MPI_Waitall (nmsg, send_reqs, stats))
-  {
-    GNUNET_break (0);
-    goto cleanup;
-  }
-  GNUNET_free (send_reqs);
-  send_reqs = NULL;
-  for (cnt = 0; cnt < nmsg; cnt++)
-  {
-    if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
-    {
-      GNUNET_break (0);
-      goto cleanup;
-    }
-  }
-  ret = GNUNET_OK;
+  unsigned int cnt;
+  unsigned int grow;
+  unsigned int size;
+  unsigned int preamble[2];         /* 0: total size; 1: n messages */
+  unsigned int nr;
   
- cleanup:
-  for (;(cnt > 0) && (NULL != send_reqs); cnt--)
+  for (cnt = 0; cnt < nproc; cnt++)
   {
-    GNUNET_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt - 1]));
-    GNUNET_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt - 1], MPI_STATUS_IGNORE));
-  }
-  for (cnt = 0; cnt < nmsg; cnt++)
-    free (iaddr_msgs[cnt]);
-  GNUNET_free_non_null (iaddr_msgs);
-  GNUNET_free_non_null (send_reqs);
-  GNUNET_free_non_null (stats);
-  return ret;
-}
-
-
-/**
- * Receive address maps during a given reduction step
- *
- * @param step the current reduction step
- * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
- */
-static int
-receive_addressmap (unsigned int step)
-{
-  
-  struct MSH_MSG_InstanceAdresses *msg;
-  MPI_Status stat;
-  unsigned int width;
-  unsigned int lb;
-  unsigned int ub;
-  unsigned int cnt;
-  int nrecv;
-  int ret;
-  int msize;
-
-  width = (rwidth <= 1 ? 2 : rwidth);
-  width--;                      /* we don't receive from us */
-  lb = rank + 1;
-  if (lb == nproc)
-    lb = rank;
-  ub = lb;
-  for (cnt = 0; cnt < width; cnt++)
-  {
-    if (ub + pow (width, step) >= nproc)
-      break;
-    ub = ub + pow (width, step);    
-  }
-  nrecv = cnt;
-  if (0 == nrecv)
-  {
-    LOG_DEBUG ("Not waiting to receive address map\n");
-    return GNUNET_OK;
-  }
-  GNUNET_assert (nrecv >= 0);
-  nrecv *= nproc; /* we get a message for each instance from each instance */
-  LOG_DEBUG ("%d: Waiting to receive %d instance address messages\n", rank, nrecv);
-  ret = GNUNET_SYSERR;
-  for (cnt = 0; cnt < nrecv; cnt++)
-  {
-    msg = NULL;
-    if (MPI_SUCCESS != MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_INSTANCE_ADDRESS,
-                                  MPI_COMM_WORLD, &stat))
+    buf = NULL;
+    size = 0;
+    grow = 0;
+    preamble[0] = 0;
+    preamble[1] = 0;
+    iaddr_msgs = NULL;
+    if (rank == cnt)
     {
-      GNUNET_break (0);
-      goto cleanup;
+        if (GNUNET_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
+        {
+          GNUNET_break (0);
+          return GNUNET_SYSERR;
+        }
+        for (nr = 0; nr < nmsg; nr++)
+        {
+          grow = ntohs (iaddr_msgs[nr]->header.size);
+          buf = GNUNET_realloc (buf, size + grow);
+          (void) memcpy (buf + size, iaddr_msgs[nr], grow);
+          size += grow;
+        }
+        preamble[0] = size;
+        preamble[1] = nmsg;
+        LOG_DEBUG ("Broadcasting address map from instance %u\n", cnt);
     }
-    if (!((lb <= stat.MPI_SOURCE ) && (stat.MPI_SOURCE <= ub)))
+    if (MPI_SUCCESS != MPI_Bcast (preamble, 2, MPI_UNSIGNED, cnt, MPI_COMM_WORLD))
     {
       GNUNET_break (0);
-      LOG_ERROR ("Received a message from unexpected source %d\n", stat.MPI_SOURCE);
-      goto cleanup;
+      return GNUNET_SYSERR;
     }
-    LOG_DEBUG ("%d: Receiving %d (nd/th) addressmap message from instance %d\n",
-               rank, cnt, stat.MPI_SOURCE);
-    msize = 0;
-    if ((MPI_SUCCESS != MPI_Get_elements (&stat, MPI_BYTE, &msize))
-        || (msize <= 0))
+    if (rank != cnt)
     {
-      GNUNET_break (0);
-      goto cleanup;
+      size = preamble[0];
+      nmsg = preamble[1];
+      buf = GNUNET_malloc (size);
     }
-    msg = GNUNET_malloc (msize);
-    if (MPI_SUCCESS != MPI_Recv (msg, msize, MPI_BYTE, stat.MPI_SOURCE,
-                                 MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD, 
-                                 MPI_STATUS_IGNORE))
+    GNUNET_assert (NULL != buf);
+    GNUNET_assert (0 < size);
+    if (MPI_SUCCESS != MPI_Bcast (buf, size, MPI_BYTE, cnt, MPI_COMM_WORLD))
     {
       GNUNET_break (0);
-      goto cleanup;
+      return GNUNET_SYSERR;
     }
-    LOG_DEBUG ("%d: [%u/%d] Received an instance address from %d\n", rank,
-               cnt + 1, nrecv, stat.MPI_SOURCE);
-    if (0 >= addressmap_intersect_msg (addrmap, msg))
+    if (rank == cnt)
     {
-      LOG_ERROR ("%d: No common addresses found for instance %d\n", rank, 
-                 ntohs (msg->rank));
-      goto cleanup;
+      GNUNET_free (buf);
+      for (nr = 0; nr < nmsg; nr++)
+        GNUNET_free (iaddr_msgs[nr]);
+      GNUNET_free (iaddr_msgs);
+      continue;
     }
-    free (msg);
-    msg = NULL;
-  }
-  
-  ret = GNUNET_OK;
 
- cleanup:
-  GNUNET_free_non_null (msg);
-  return ret;
-}
-
-
-/**
- * Perform an ntree reduction on address maps
- *
- * @return GNUNET_OK upon success; GNUNET_SYSERR upon failure
- */
-int
-reduce_ntree ()
-{
-  unsigned int step;
-  unsigned int max_steps;
-  unsigned int aggregator;
-  unsigned int step_width;
-  unsigned int width;
-
-  width = (rwidth <= 1 ? 2 : rwidth);
-  max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double) width));
-  LOG_DEBUG ("Reduction with max steps: %u\n", max_steps);
-  if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
-    return GNUNET_SYSERR;
-  for (step = 0; step < max_steps; step++)
-  {
-    LOG_DEBUG ("Reduction step %u\n", step);
-    step_width = (unsigned int) pow (width, step + 1);
-    if (0 != (aggregator = (rank % step_width)))
+    iaddr_msgs = GNUNET_malloc (sizeof (struct MSH_MSG_InstanceAdresses *) *
+                                nmsg);
+    grow = 0;    
+    for (nr = 0; nr < nmsg; nr++)
     {
-      aggregator = rank - aggregator;
-      if (GNUNET_SYSERR == send_addressmap (aggregator))
-        return GNUNET_SYSERR;
-      return GNUNET_OK;
+      iaddr_msgs[nr] = (struct MSH_MSG_InstanceAdresses *) (buf + grow);
+      grow += ntohs (iaddr_msgs[nr]->header.size);
+      if (0 > addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
+      {
+        LOG_ERROR ("No common address found for instance %u\n", 
+                   ntohs (iaddr_msgs[nr]->rank));
+        break;
+      }
     }
-    /* receive address maps */
-    if (GNUNET_SYSERR  == receive_addressmap (step))
-      return GNUNET_SYSERR;
+    LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
+    GNUNET_free (buf);
+    GNUNET_free (iaddr_msgs);
   }
   return GNUNET_OK;
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]