[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r32466 - msh/src
From: gnunet
Subject: [GNUnet-SVN] r32466 - msh/src
Date: Mon, 24 Feb 2014 18:32:45 +0100
Author: harsha
Date: 2014-02-24 18:32:45 +0100 (Mon, 24 Feb 2014)
New Revision: 32466
Modified:
msh/src/mshd.c
msh/src/reduce.c
Log:
Shorten the round width to the remaining peers in the last round.
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2014-02-24 16:25:43 UTC (rev 32465)
+++ msh/src/mshd.c 2014-02-24 17:32:45 UTC (rev 32466)
@@ -189,12 +189,18 @@
int rank;
/**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
+ * Given width of the round -- how many other mshd instances verify our IP
+ * addresses in a round
*/
-unsigned int rwidth;
+unsigned int round_width;
/**
+ * The actual width of the current round. This should be less than or equal to
+ * round_width
+ */
+unsigned int current_round_width;
+
+/**
* The number of total mshd processes
*/
int nproc;
@@ -316,6 +322,11 @@
static unsigned int current_round;
/**
+ * Total number of rounds required
+ */
+static unsigned int total_rounds;
+
+/**
* The port number of our local socket
*/
uint16_t listen_port;
@@ -847,12 +858,8 @@
struct GNUNET_DISK_FileHandle *write_end;
sigset_t sigset;
intmax_t pid;
- int total_rounds;
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK == rtask);
- /* Number of rounds required to contact all processes except ourselves (rwidth
- in parallel in each round) */
- total_rounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
if (current_round < total_rounds)
{
rtask = GNUNET_SCHEDULER_add_now (&run_round, NULL);
@@ -871,7 +878,11 @@
return;
}
LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
- GNUNET_break (GNUNET_OK == reduce_ntree ());
+ if (GNUNET_SYSERR == reduce_ntree ())
+ {
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
pid = (intmax_t) getpid ();
GNUNET_assert (0 < asprintf (&unixpath, "/%ju.sock", pid));
hostsfile = GNUNET_DISK_mktemp ("MSHD_HOSTS");
@@ -991,7 +1002,7 @@
{
cleanup_verifyaddressctx (ctx);
}
- for (cnt = 0; cnt < rwidth; cnt++)
+ for (cnt = 0; cnt < current_round_width; cnt++)
instance_address_info_destroy (riainfos[cnt]);
if (1 != bitmap_allset (bitmap))
{
@@ -1019,7 +1030,7 @@
int off;
ctx->close_task = GNUNET_SCHEDULER_NO_TASK;
- lb = rank - (current_round * rwidth) - rwidth + nproc;
+ lb = rank - (current_round * round_width) - current_round_width + nproc;
GNUNET_assert (0 <= lb);
lb %= nproc;
source = instance_address_info_get_rank (ctx->iainfo);
@@ -1198,8 +1209,8 @@
MPI_Status status;
int cnt;
- iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * rwidth);
- for (cnt=0; cnt < rwidth; cnt++)
+ iainfos = GNUNET_malloc (sizeof (struct InstanceAddrInfo *) * current_round_width);
+ for (cnt=0; cnt < current_round_width; cnt++)
{
struct MSH_MSG_VerifyAddress *msg;
int rsize;
@@ -1212,34 +1223,26 @@
MPI_COMM_WORLD, &status));
MPI_Get_elements (&status, MPI_BYTE, &rsize);
/* We expect a message from peers with id p in the range:
- (rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round * rwidth) -1) */
- lb = rank - current_round * rwidth - rwidth + nproc;
- up = rank - (current_round * rwidth) - 1 + nproc;
+ (rank - current_round * round_width - current_round_width) < p <= (rank - (current_round * round_width) -1) */
+ up = rank - (current_round * round_width) - 1 + nproc;
+ lb = (up - current_round_width) + nproc;
GNUNET_assert (lb >= 0);
GNUNET_assert (up >= 0);
lb %= nproc;
up %= nproc;
source = status.MPI_SOURCE;
- if (lb == up)
+ GNUNET_assert (lb != up);
+ if(lb < up)
{
- if (source != lb)
+ if ((source <= lb) || (source > up))
{
GNUNET_break (0);
- LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb, up);
goto err_ret;
}
}
- else if(lb < up)
- {
- if ((source < lb) || (source > up))
- {
- GNUNET_break (0);
- goto err_ret;
- }
- }
else if (up < lb)
{
- if ((source > up) && (source < lb))
+ if ((source > up) && (source <= lb))
{
GNUNET_break (0);
goto err_ret;
@@ -1264,7 +1267,7 @@
return iainfos;
err_ret:
- for (cnt=0; cnt < rwidth; cnt++)
+ for (cnt=0; cnt < current_round_width; cnt++)
{
if (NULL != iainfos[cnt])
instance_address_info_destroy (iainfos[cnt]);
@@ -1290,7 +1293,6 @@
int cnt;
int ret;
int target;
- unsigned int width;
msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
msg = GNUNET_malloc (msize);
@@ -1301,16 +1303,13 @@
{
msg->ipaddrs[cnt] = htonl ((uint32_t) s_addrs[cnt]);
}
- width = rwidth;
- if ( (0 != ( (nproc - 1) % rwidth)) && (current_round == ( (nproc - 1) / rwidth)) )
- width = (nproc - 1) % rwidth;
cpys = NULL;
- cpys = GNUNET_malloc (msize * width);
- sreqs = GNUNET_malloc (width * sizeof (MPI_Request));
- for (cnt=0; cnt < width; cnt++)
+ cpys = GNUNET_malloc (msize * current_round_width);
+ sreqs = GNUNET_malloc (current_round_width * sizeof (MPI_Request));
+ for (cnt=0; cnt < current_round_width; cnt++)
{
(void) memcpy (&cpys[cnt], msg, msize);
- target = (current_round * rwidth) + cnt + 1;
+ target = (current_round * round_width) + cnt + 1;
GNUNET_assert (target < nproc);
target = (rank + target) % nproc;
LOG_DEBUG ("%d: Sending message to %d\n", rank, target);
@@ -1321,7 +1320,7 @@
}
free (msg);
msg = NULL;
- if (cnt != width)
+ if (cnt != current_round_width)
{
for (cnt--; cnt >= 0; cnt--)
{
@@ -1330,7 +1329,7 @@
}
goto err_ret;
}
- for (cnt=0; cnt < width; cnt++)
+ for (cnt=0; cnt < current_round_width; cnt++)
{
GNUNET_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
}
@@ -1362,6 +1361,9 @@
{
unsigned int cnt;
+ current_round_width = round_width;
+ if ( (0 != ( (nproc - 1) % round_width)) && (current_round == ( (nproc - 1) / round_width)) )
+ current_round_width = (nproc - 1) % round_width;
if (GNUNET_SYSERR == send_addresses ())
return GNUNET_SYSERR;
if (NULL == (riainfos = receive_addresses ()))
@@ -1374,7 +1376,7 @@
GNUNET_break (0);
return GNUNET_SYSERR;
}
- for (cnt = 0; cnt < rwidth; cnt++)
+ for (cnt = 0; cnt < current_round_width; cnt++)
verify_addresses (riainfos[cnt]);
finalise_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&finalise_round, NULL);
@@ -1414,12 +1416,12 @@
unsigned int cnt;
LOG_DEBUG ("Running main task\n");
- if (0 == rwidth)
+ if (0 == round_width)
{
LOG_ERROR ("Round width cannot be 0. Exiting\n");
return;
}
- if (nproc <= rwidth)
+ if (nproc <= round_width)
{
LOG_ERROR ("Round width should be less than the number of processes\n");
return;
@@ -1431,12 +1433,12 @@
return;
}
run_args = copy_argv (args);
- bitmap = bitmap_create (rwidth);
+ bitmap = bitmap_create (round_width);
addrmap = addressmap_create (nproc);
addrlen = sizeof (struct sockaddr_in);
(void) memset (&addr, 0, addrlen);
addr.sin_addr.s_addr = INADDR_ANY; /* bind to all available addresses */
- listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen, rwidth);
+ listen_socket = open_listen_socket ((struct sockaddr *) &addr, addrlen, round_width);
listen_port = ntohs (addr.sin_port);
if (NULL == listen_socket)
return;
@@ -1452,6 +1454,9 @@
LOG_ERROR ("No IP addresses found\n");
return;
}
+ /* Number of rounds required to contact all processes except ourselves (round_width
+ in parallel in each round) */
+ total_rounds = ((nproc - 1) + (round_width - 1)) / round_width;
schedule_next_round ();
shutdown_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&do_shutdown, NULL);
@@ -1476,13 +1481,13 @@
static const struct GNUNET_GETOPT_CommandLineOption options[] = {
{'w', "round-width", "COUNT",
"set the size of each round to COUNT",
- GNUNET_YES, &GNUNET_GETOPT_set_uint, &rwidth},
+ GNUNET_YES, &GNUNET_GETOPT_set_uint, &round_width},
GNUNET_GETOPT_OPTION_END
};
int ret;
ret = 1;
- rwidth = 1;
+ round_width = 1;
GNUNET_log_setup ("mshd", NULL, NULL);
if (MPI_SUCCESS != MPI_Init(&argc, &argv))
{
@@ -1494,7 +1499,7 @@
LOG_ERROR ("Cannot determine the number of mshd processes\n");
goto fail;
}
- if (nproc <= rwidth)
+ if (nproc <= round_width)
{
LOG_ERROR ("Given round width is greater than or equal to number of mshd processes\n");
goto fail;
Modified: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c 2014-02-24 16:25:43 UTC (rev 32465)
+++ msh/src/reduce.c 2014-02-24 17:32:45 UTC (rev 32466)
@@ -96,16 +96,19 @@
{
iaddr_msgs[nr] = (struct MSH_MSG_InstanceAdresses *) (buf + grow);
grow += ntohs (iaddr_msgs[nr]->header.size);
- if (0 > addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
+ if (0 >= addressmap_intersect_msg (addrmap, iaddr_msgs[nr]))
{
LOG_ERROR ("No common address found for instance %u\n",
ntohs (iaddr_msgs[nr]->rank));
break;
}
}
- LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
GNUNET_free (buf);
GNUNET_free (iaddr_msgs);
+ if (nr == nmsg)
+ LOG_DEBUG ("Intersected received addressmap from instance %u\n", cnt);
+ else
+ return GNUNET_SYSERR;
}
GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_NONCE, &shash);
if (MPI_SUCCESS != MPI_Bcast (&shash, sizeof (struct GNUNET_HashCode),
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r32466 - msh/src,
gnunet <=