[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28336 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28336 - msh/src |
Date: |
Mon, 29 Jul 2013 16:21:24 +0200 |
Author: harsha
Date: 2013-07-29 16:21:24 +0200 (Mon, 29 Jul 2013)
New Revision: 28336
Modified:
msh/src/mshd.c
msh/src/reduce.c
Log:
- fixed reduce algorithm
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-29 07:10:22 UTC (rev 28335)
+++ msh/src/mshd.c 2013-07-29 14:21:24 UTC (rev 28336)
@@ -892,6 +892,11 @@
LOG_ERROR ("Round width cannot be 0. Exiting\n");
return;
}
+ if (nproc <= rwidth)
+ {
+ LOG_ERROR ("Round width should be less than the number of processes\n");
+ return;
+ }
bitmap = bitmap_create (rwidth);
addrmap = addressmap_create (nproc);
fh = GNUNET_DISK_pipe_handle (sigpipe, GNUNET_DISK_PIPE_END_READ);
@@ -1001,7 +1006,7 @@
}
GNUNET_break (MPI_SUCCESS == MPI_Finalize());
GNUNET_free_non_null (s_addrs);
- LOG_ERROR ("Returning\n");
+ LOG_DEBUG ("Returning\n");
uninstall_sighandlers:
if (NULL != listen_socket)
Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c 2013-07-29 07:10:22 UTC (rev 28335)
+++ msh/src/reduce.c 2013-07-29 14:21:24 UTC (rev 28336)
@@ -18,6 +18,9 @@
#define LOG_ERROR(...) LOG(GNUNET_ERROR_TYPE_ERROR, __VA_ARGS__)
+
+
+
/**
* Send our addressmap to the instance with the given rank
*
@@ -100,7 +103,7 @@
struct MSH_MSG_InstanceAdresses *msg;
MPI_Status stat;
- unsigned int step_width;
+ unsigned int width;
unsigned int lb;
unsigned int ub;
unsigned int cnt;
@@ -108,26 +111,28 @@
int ret;
int msize;
- step_width = (unsigned int) pow (rwidth, step);
- lb = 0;
- ub = 0;
- ret = GNUNET_SYSERR;
- if (rank + step_width <= nproc)
+ width = (rwidth <= 1 ? 2 : rwidth);
+ width--; /* we don't receive from us */
+ lb = rank + 1;
+ if (lb == nproc)
+ lb = rank;
+ ub = lb;
+ for (cnt = 0; cnt < width; cnt++)
{
- nrecv = rwidth;
- lb = rank + 1;
- ub = rank + step_width - 1;
+ if (ub + pow (width, step) >= nproc)
+ break;
+ ub = ub + pow (width, step);
}
- else
+ nrecv = cnt;
+ if (0 == nrecv)
{
- nrecv = ((nproc - rank) - 1) / step_width;
- if (0 == nrecv)
- return GNUNET_OK;
- lb = rank + 1;
- ub = nproc - 1;
+ LOG_DEBUG ("Not waiting to receive address map\n");
+ return GNUNET_OK;
}
GNUNET_assert (nrecv >= 0);
nrecv *= nproc; /* we get a message for each instance from each instance */
+ LOG_DEBUG ("%d: Waiting to receive %d instance address messages\n", rank,
nrecv);
+ ret = GNUNET_SYSERR;
for (cnt = 0; cnt < nrecv; cnt++)
{
msg = NULL;
@@ -140,6 +145,7 @@
if (!((lb <= stat.MPI_SOURCE ) && (stat.MPI_SOURCE <= ub)))
{
GNUNET_break (0);
+ LOG_ERROR ("Received a message from unexpected source %d\n",
stat.MPI_SOURCE);
goto cleanup;
}
LOG_DEBUG ("%d: Receiving %d (nd/th) addressmap message from instance
%d\n",
@@ -191,19 +197,23 @@
unsigned int max_steps;
unsigned int aggregator;
unsigned int step_width;
-
- max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double)
rwidth) );
- for (step = 1; step < max_steps; step++)
+ unsigned int width;
+
+ width = (rwidth <= 1 ? 2 : rwidth);
+ max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double)
width));
+ LOG_DEBUG ("Reduction with max steps: %u\n", max_steps);
+ if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+ return GNUNET_SYSERR;
+ for (step = 0; step < max_steps; step++)
{
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- return GNUNET_SYSERR;
LOG_DEBUG ("Reduction step %u\n", step);
- step_width = (unsigned int) pow (rwidth, step);
+ step_width = (unsigned int) pow (width, step + 1);
if (0 != (aggregator = (rank % step_width)))
{
aggregator = rank - aggregator;
if (GNUNET_SYSERR == send_addressmap (aggregator))
return GNUNET_SYSERR;
+ return GNUNET_OK;
}
/* receive address maps */
if (GNUNET_SYSERR == receive_addressmap (step))
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28336 - msh/src,
gnunet <=