[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r27799 - msh/src
From: |
gnunet |
Subject: |
[GNUnet-SVN] r27799 - msh/src |
Date: |
Mon, 8 Jul 2013 18:02:51 +0200 |
Author: harsha
Date: 2013-07-08 18:02:51 +0200 (Mon, 08 Jul 2013)
New Revision: 27799
Modified:
msh/src/mshd.c
msh/src/mtypes.h
msh/src/test_scheduler_socket.c
msh/src/util.c
Log:
- round interactions
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/mshd.c 2013-07-08 16:02:51 UTC (rev 27799)
@@ -2,6 +2,7 @@
#include <mpi.h>
#include "util.h"
#include "scheduler.h"
+#include "mtypes.h"
/**
* The port number of our local socket
@@ -19,9 +20,9 @@
static int rank;
/**
- * Array of the string representation of our IP addresses
+ * Array of our IP addresses in network-byte format
*/
-static char **ip_addr_str;
+static in_addr_t *s_addrs;
/**
* Number of IP addresses
@@ -132,19 +133,88 @@
socklen_t addrlen)
{
char hostip[NI_MAXHOST];
+ const struct sockaddr_in *inaddr;
if (sizeof (struct sockaddr_in) != addrlen)
return MSH_OK; /* Only consider IPv4 for now */
if (0 !=
getnameinfo (addr, addrlen, hostip, NI_MAXHOST, NULL, 0, NI_NUMERICHOST))
+ {
LOG_STRERROR ("getnameinfo");
- MSH_array_append (ip_addr_str, nips, strdup (hostip));
+ return MSH_OK;
+ }
+ inaddr = (const struct sockaddr_in *) addr;
+ MSH_array_append (s_addrs, nips, inaddr->sin_addr.s_addr);
LOG_DEBUG ("%d: Found IP: %s\n", rank, hostip);
return MSH_OK;
}
+/**
+ * Context of a pending read on an accepted connection; kept in a DLL
+ */
+struct ReadContext
+{
+  struct ReadContext *next;     /* DLL link used by DLL_insert_tail/DLL_remove */
+  struct ReadContext *prev;     /* DLL link used by DLL_insert_tail/DLL_remove */
+  /* struct sockaddr_in addr; */
+  /* socklen_t addrlen; */
+  struct Task *task;            /* handle returned by scheduler_add_socket */
+};
+
+/* Head of the DLL of read contexts */
+static struct ReadContext *rhead;
+/* Tail of the DLL of read contexts */
+static struct ReadContext *rtail;
+
+
/**
+ * Task to read a peer id from an accepted connection and set it in the barray
+ *
+ * @param sock the socket
+ * @param flags EV_* flags
+ * @param cls the struct ReadContext of this connection; it is freed here
+ */
+static void
+read_socket (evutil_socket_t sock, short flags, void *cls)
+{
+  struct ReadContext *ctx = cls;
+  ssize_t rsize;
+  uint32_t cid;
+
+  scheduler_remove (ctx->task);
+  DLL_remove (rhead, rtail, ctx);
+  free (ctx);
+  if (IS_SHUTDOWN_EVENT (flags))
+  {
+    MSH_close (sock);
+    return;
+  }
+  rsize = read (sock, &cid, sizeof (cid));
+  if (rsize < 0)
+  {
+    LOG_STRERROR ("read");
+    goto err_ret;
+  }
+  /* EOF or a short read would leave (part of) cid uninitialised */
+  if (sizeof (cid) != (size_t) rsize)
+  {
+    MSH_break (0);
+    goto err_ret;
+  }
+  cid = ntohl (cid);
+  if (!barray_isset (cid))
+    barray_set (cid);
+  MSH_close (sock);
+  return;
+
+ err_ret:
+  MSH_close (sock);
+  scheduler_shutdown ();
+}
+
+
+/**
* Task to call accept and close on a listening socket
*
* @param sock the socket
@@ -154,6 +224,9 @@
static void
accept_task (evutil_socket_t sock, short flags, void *cls)
{
+ struct ReadContext *rctx;
+ int csock;
+
scheduler_remove (atask);
atask = NULL;
if (IS_SHUTDOWN_EVENT (flags))
@@ -162,7 +235,82 @@
return;
}
LOG_DEBUG ("Got a connect\n");
+ if (0 > (csock = accept4 (sock, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC)))
+ {
+ LOG_STRERROR ("accept4");
+ MSH_close (sock);
+ scheduler_shutdown ();
+ return;
+ }
+ rctx = MSH_malloc (sizeof (struct ReadContext));
+ DLL_insert_tail (rhead, rtail, rctx);
+ rctx->task = scheduler_add_socket (csock, EV_READ, &read_socket, rctx, NULL);
+}
+
+
+/**
+ * Receive the address verification messages of the current round, one from
+ * each of the rwidth processes whose send window covers our rank.
+ *
+ * @return MSH_OK on success; MSH_SYSERR upon error
+ */
+static int
+receive_addresses ()
+{
+  struct MSH_MSG_VerifyAddress **rmsgs;
+  MPI_Status status;
+  int rsize;
+  int lb;
+  int up;
+  int source;
+  int ret;
+  int cnt;
+
+  ret = MSH_SYSERR;
+  rmsgs = MSH_malloc (sizeof (struct MSH_MSG_VerifyAddress *) * rwidth);
+  /* the cleanup below walks every slot, so none may be left uninitialised */
+  for (cnt = 0; cnt < rwidth; cnt++)
+    rmsgs[cnt] = NULL;
+  for (cnt = 0; cnt < rwidth; cnt++)
+  {
+    MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+               &status);
+    MPI_Get_elements (&status, MPI_BYTE, &rsize);
+    /* We expect a message from peers with id p in the range:
+       (rank - round * rwidth - rwidth) <= p <= (rank - (round * rwidth) -1) */
+    lb = rank - round * rwidth - rwidth + nproc;
+    up = rank - (round * rwidth) - 1 + nproc;
+    MSH_assert (lb >= 0);
+    MSH_assert (up >= 0);
+    lb %= nproc;
+    up %= nproc;
+    source = status.MPI_SOURCE;
+    /* after the modulo the window may wrap around rank 0, i.e. lb > up */
+    if ((lb <= up) ? ((source < lb) || (source > up))
+                   : ((source < lb) && (source > up)))
+    {
+      MSH_break (0);
+      LOG_ERROR ("%d: Error: source %d; lb: %d; up: %d\n", rank, source, lb, up);
+      goto err_ret;
+    }
+    rmsgs[cnt] = MSH_malloc (rsize);
+    if (MPI_SUCCESS != MPI_Recv (rmsgs[cnt], rsize, MPI_BYTE, source,
+                                 MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
+                                 MPI_STATUS_IGNORE))
+    {
+      MSH_break (0);
+      goto err_ret;
+    }
+    LOG_DEBUG ("Received message of size %d from %d\n", rsize, source);
+  }
+  /* TODO: do something useful with the messages; for now they are only freed */
+  ret = MSH_OK;
+ err_ret:
+  for (cnt = 0; cnt < rwidth; cnt++)
+    MSH_free_non_null (rmsgs[cnt]);
+  free (rmsgs);
+  return ret;
}
@@ -173,10 +321,65 @@
* @return MSH_OK on success; MSH_SYSERR upon error
*/
static int
-send_addresses (int rank)
+send_receive_addresses ()
{
-  MSH_break (0);
-  return MSH_OK;
+  struct MSH_MSG_VerifyAddress *msg;
+  char *cpys;
+  MPI_Request *sreqs;
+  size_t msize;
+  int cnt;
+  int ret;
+  int target;
+
+  msize = sizeof (struct MSH_MSG_VerifyAddress) + (nips * sizeof (uint32_t));
+  msg = MSH_malloc (msize);
+  msg->header.size = htons (msize);
+  msg->port = htons (lport);
+  msg->nips = htons (nips);
+  for (cnt = 0; cnt < nips; cnt++)
+    msg->ipaddrs[cnt] = (uint32_t) s_addrs[cnt]; /* IPs already in NB */
+  /* Every in-flight send needs its own buffer; the copies lie msize bytes
+     apart (indexing a struct pointer would only step over the fixed header) */
+  cpys = MSH_malloc (msize * rwidth);
+  sreqs = MSH_malloc (sizeof (MPI_Request) * rwidth);
+  ret = MPI_SUCCESS;
+  for (cnt=0; cnt < rwidth; cnt++)
+  {
+    (void) memcpy (cpys + (cnt * msize), msg, msize);
+    target = (round * rwidth) + cnt + 1;
+    MSH_assert (target < nproc);
+    target = (rank + target) % nproc;
+    ret = MPI_Isend (cpys + (cnt * msize), msize, MPI_BYTE, target,
+                     MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD, &sreqs[cnt]);
+    if (MPI_SUCCESS != ret)
+      break;
+  }
+  free (msg);
+  msg = NULL;
+  if (cnt != rwidth)
+  {
+    for (cnt--; cnt >= 0; cnt--)
+    {
+      MSH_break (MPI_SUCCESS == MPI_Cancel (&sreqs[cnt]));
+      MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+    }
+    goto end;
+  }
+  for (cnt=0; cnt < rwidth; cnt++)
+  {
+    MSH_break (MPI_SUCCESS == MPI_Wait (&sreqs[cnt], MPI_STATUS_IGNORE));
+  }
+  LOG_DEBUG ("%d: Round: %d -- All messages sent successfully\n", rank, round);
+  /* all sends have completed; MPI no longer references the copies */
+  free (cpys);
+  cpys = NULL;
+  if (MSH_SYSERR == receive_addresses ())
+    ret = MPI_ERR_OTHER;        /* make the failure visible in the result */
+
+ end:
+  MSH_free_non_null (cpys);
+  MSH_free_non_null (sreqs);
+  return (MPI_SUCCESS == ret) ? MSH_OK : MSH_SYSERR;
}
@@ -192,7 +395,6 @@
struct sockaddr_in addr;
socklen_t addrlen;
int sock;
- unsigned int cnt;
addrlen = sizeof (struct sockaddr_in);
(void) memset (&addr, 0, addrlen);
@@ -211,15 +413,14 @@
MSH_break (0);
goto clo_ret;
}
- for (cnt = 0; cnt < rwidth; cnt++)
- if (MSH_SYSERR == send_addresses ((round * rwidth) + cnt))
- goto clo_ret;
+ if (MSH_SYSERR == send_receive_addresses ())
+ goto clo_ret;
+ atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
{
MSH_break (0);
goto clo_ret;
}
- atask = scheduler_add_socket (sock, EV_READ, &accept_task, &atask, NULL);
return MSH_OK;
clo_ret:
@@ -267,12 +468,16 @@
static void
schedule_next_round ()
{
+  int trounds;
+
MSH_assert (NULL == rtask);
-  if (round < ( (nproc + (rwidth - 1)) / rwidth) )
-  {
-    round++;
+  /* Number of rounds required to contact all processes except ourselves
+     (rwidth in parallel in each round) */
+  trounds = ((nproc - 1) + (rwidth - 1)) / rwidth;
+  if (round < trounds)
rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
-  }
+  else
+    LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
}
@@ -291,7 +496,10 @@
if (IS_SHUTDOWN_EVENT (flags))
return;
if (MSH_OK == verify_addresses ())
+ {
+ round++;
schedule_next_round ();
+ }
}
@@ -310,16 +518,58 @@
&sigshut_tasks[0], NULL);
sigshut_tasks[1] = scheduler_add_signal (SIGTERM, &sig_shutdown,
&sigshut_tasks[1], NULL);
- rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+ //rtask = scheduler_add (&run_round, NULL, TV_IMMEDIATE);
+ schedule_next_round ();
}
+/* Print the usage summary of mshd on stderr */
+static void
+print_help ()
+{
+  const char *msg =
+"mshd: MSH daemon.\n"
+"This binary is a part of "PACKAGE_NAME "-" PACKAGE_VERSION " available from " PACKAGE_URL ".\n"
+"This program takes the following options:\n"
+" -w num\t: \t The number of processes which verify at each round.\n"
+" -h \t: \t Print this help\n"
+"Report bugs to " PACKAGE_BUGREPORT "\n";
+
+  fprintf (stderr, "%s", msg);
+}
+
int
main (int argc, char **argv)
{
int ret;
+ int c;
ret = 1;
+ rwidth = 1;
+
+  while (-1 != (c = getopt (argc, argv, "hw:")))
+  {
+    switch (c)
+    {
+    case 'w':                   /* rwidth is later used as a divisor: refuse 0 */
+      if ((1 != sscanf (optarg, "%u", &rwidth)) || (0 == rwidth))
+      {
+        LOG_ERROR ("-w option requires a positive number argument.\n");
+        print_help ();
+        return 1;
+      }
+      break;
+    case 'h':
+      print_help ();
+      return 0;
+    case '?':
+      print_help();
+      return 1;
+    default:
+      printf ("Unknown option: %c\n", c);
+      MSH_assert (0);
+    }
+  }
if (MPI_SUCCESS != MPI_Init(&argc, &argv))
{
LOG_ERROR ("Failed to initialise MPI\n");
@@ -330,6 +580,11 @@
LOG_ERROR ("Cannot determine the number of mshd processes\n");
goto fail;
}
+ if (nproc <= rwidth)
+ {
+ LOG_ERROR ("Given round width is greater than or equal to number of mshd
processes\n");
+ goto fail;
+ }
if (MPI_SUCCESS != MPI_Comm_rank (MPI_COMM_WORLD, &rank))
{
LOG_ERROR ("Cannot determine our MPI rank\n");
@@ -350,10 +605,11 @@
}
barray_destroy ();
ret = 0;
-
+
fail:
MSH_break (MPI_SUCCESS == MPI_Finalize());
- MSH_free_non_null (ip_addr_str);
- //libevent_global_shutdown ();
+ MSH_free_non_null (s_addrs);
+ //libevent_global_shutdown ();
+ LOG_ERROR ("Returning\n");
return ret;
}
Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h 2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/mtypes.h 2013-07-08 16:02:51 UTC (rev 27799)
@@ -38,6 +38,8 @@
/**
* IPv4 addresses to follow as 32 bit unsigned integeters
*/
+ uint32_t ipaddrs[0];
+
#if 0
/* Internet address. */
typedef uint32_t in_addr_t;
@@ -59,7 +61,7 @@
sizeof (struct in_addr)];
};
#endif
-}
+};
/*********************************************************************
Modified: msh/src/test_scheduler_socket.c
===================================================================
--- msh/src/test_scheduler_socket.c 2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/test_scheduler_socket.c 2013-07-08 16:02:51 UTC (rev 27799)
@@ -64,6 +64,7 @@
LOG_STRERROR ("accept4");
MSH_close (lsock);
scheduler_shutdown ();
+ return;
}
if (2 == ++result)
scheduler_shutdown ();
Modified: msh/src/util.c
===================================================================
--- msh/src/util.c 2013-07-08 14:26:50 UTC (rev 27798)
+++ msh/src/util.c 2013-07-08 16:02:51 UTC (rev 27799)
@@ -71,7 +71,7 @@
}
else
{
- MSH_malloc (size);
+ tmp = MSH_malloc (size);
if (*oldCount > newCount)
*oldCount = newCount; /* shrink is also allowed! */
memcpy (tmp, *old, elementSize * (*oldCount));
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r27799 - msh/src,
gnunet <=