[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28168 - msh/src
From: gnunet
Subject: [GNUnet-SVN] r28168 - msh/src
Date: Thu, 18 Jul 2013 15:49:33 +0200
Author: harsha
Date: 2013-07-18 15:49:32 +0200 (Thu, 18 Jul 2013)
New Revision: 28168
Added:
msh/src/mshd.h
msh/src/reduce.c
msh/src/reduce.h
Modified:
msh/src/Makefile.am
msh/src/addressmap.c
msh/src/addressmap.h
msh/src/mshd.c
msh/src/mtypes.h
msh/src/test_addressmap.c
Log:
- removed MSG_MSG_AddressMap
- move reduce code to separate module
Modified: msh/src/Makefile.am
===================================================================
--- msh/src/Makefile.am 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/Makefile.am 2013-07-18 13:49:32 UTC (rev 28168)
@@ -2,8 +2,8 @@
mping_SOURCES = mping.c
-mshd_SOURCES = mshd.c util.c util.h scheduler.c scheduler.h \
- common.h bitmap.c bitmap.h addressmap.c addressmap.h
+mshd_SOURCES = mshd.c mshd.h util.c util.h scheduler.c scheduler.h \
+ common.h bitmap.c bitmap.h addressmap.c addressmap.h reduce.h reduce.c
mshd_LDADD = -levent -lm
mshd_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
mshd_LDFLAGS = $(LIBEVENT_LDFLAGS)
Modified: msh/src/addressmap.c
===================================================================
--- msh/src/addressmap.c 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/addressmap.c 2013-07-18 13:49:32 UTC (rev 28168)
@@ -423,21 +423,15 @@
* instance for merging into its address map
*
* @param m the address map
- * @param map_msg the message describing the contents of the address map. This
- * message is to be sent before sending the instance addresses in the
- * address map. The memory allocated for the message is to be freed by
- * the caller after sending the message
* @param iaddr_msgs instance addresses. The memory allocated for the instance
* address messages is to be freed by the caller after sending the
* messages
- * @return MSH_OK upon successful completion; msh_syserr upon failure (the
- * contents of map_msg and iaddr_msgs are left unchanged)
+ * @return the number of message in iaddr_msgs; MSH_SYSERR upon failure (the
+ * contents of iaddr_msgs are left unchanged)
*/
int
-addressmap_pack (AddressMap *m, struct MSH_MSG_AddressMap **map_msg,
- struct MSH_MSG_InstanceAdresses ***iaddr_msgs)
+addressmap_pack (AddressMap *m, struct MSH_MSG_InstanceAdresses ***iaddr_msgs)
{
- struct MSH_MSG_AddressMap *_map_msg;
struct MSH_MSG_InstanceAdresses **_iaddr_msgs;
struct InstanceAddrInfo *iainfo;
struct InstanceAddr *iaddr;
@@ -450,20 +444,17 @@
for (cnt = 0; cnt < m->size; cnt++)
{
if (NULL == m->map[cnt])
- continue;
+ {
+ LOG_ERROR ("Addresses for an instance is missing\n");
+ return MSH_SYSERR;
+ }
n++;
}
cnt = n;
- msize = sizeof (struct MSH_MSG_AddressMap);
- _map_msg = MSH_malloc (msize);
- _map_msg->header.size = htons (msize);
- _map_msg->num = htons (n);
-
_iaddr_msgs = MSH_malloc (sizeof (struct MSH_MSG_InstanceAdresses *) * n);
for (cnt = 0; cnt < m->size; cnt++)
{
- if (NULL == (iainfo = m->map[cnt]))
- continue;
+ MSH_assert (NULL != (iainfo = m->map[cnt]));
msize = sizeof (struct MSH_MSG_InstanceAdresses)
+ (sizeof (uint32_t) * iainfo->naddrs);
_iaddr_msgs[cnt] = MSH_malloc (msize);
@@ -476,10 +467,9 @@
MSH_assert (NULL != iaddr);
_iaddr_msgs[cnt]->ipaddrs[nip] = instance_address_ip (iaddr);
}
- }
- *map_msg = _map_msg;
+ }
*iaddr_msgs = _iaddr_msgs;
- return MSH_OK;
+ return m->size;
}
Modified: msh/src/addressmap.h
===================================================================
--- msh/src/addressmap.h 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/addressmap.h 2013-07-18 13:49:32 UTC (rev 28168)
@@ -219,19 +219,14 @@
* instance for merging into its address map
*
* @param m the address map
- * @param map_msg the message describing the contents of the address map. This
- * message is to be sent before sending the instance addresses in the
- * address map. The memory allocated for the message is to be freed by
- * the caller after sending the message
* @param iaddr_msgs instance addresses. The memory allocated for the instance
* address messages is to be freed by the caller after sending the
* messages
- * @return MSH_OK upon successful completion; msh_syserr upon failure (the
- * contents of map_msg and iaddr_msgs are left unchanged)
+ * @return the number of message in iaddr_msgs; MSH_SYSERR upon failure (the
+ * contents of iaddr_msgs are left unchanged)
*/
int
-addressmap_pack (AddressMap *m, struct MSH_MSG_AddressMap **map_msg,
- struct MSH_MSG_InstanceAdresses ***iaddr_msgs);
+addressmap_pack (AddressMap *m, struct MSH_MSG_InstanceAdresses ***iaddr_msgs);
/**
Modified: msh/src/mshd.c
===================================================================
--- msh/src/mshd.c 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/mshd.c 2013-07-18 13:49:32 UTC (rev 28168)
@@ -1,6 +1,5 @@
#include "common.h"
#include <mpi.h>
-#include <math.h>
#include "util.h"
#include "scheduler.h"
#include "mtypes.h"
@@ -68,8 +67,33 @@
struct Task *task;
};
+/**
+ * Mapping for instance addresses
+ */
+AddressMap *addrmap;
/**
+ * Rank of this process
+ */
+int rank;
+
+/**
+ * width of the round -- how many other mshd instances verify our IP addresses
+ * in a round
+ */
+unsigned int rwidth;
+
+/**
+ * The number of total mshd processes
+ */
+int nproc;
+
+
+/****************************/
+/* static variables */
+/****************************/
+
+/**
* DLL head for address verification contexts
*/
static struct VerifyAddressesCtx *vactx_head;
@@ -126,21 +150,6 @@
static struct ReadContext *rtail;
/**
- * Mapping for instance addresses
- */
-static struct AddressMap *addrmap;
-
-/**
- * The number of total mshd processes
- */
-static int nproc;
-
-/**
- * Rank of this process
- */
-static int rank;
-
-/**
* The listen socket for the current round
*/
static int listen_sock;
@@ -156,12 +165,6 @@
static unsigned int current_round;
/**
- * width of the round -- how many other mshd instances verify our IP addresses
- * in a round
- */
-static unsigned int rwidth;
-
-/**
* The port number of our local socket
*/
uint16_t lport;
@@ -197,131 +200,6 @@
/**
- * Send our addressmap to the instance with the given rank
- *
- * @param rank the rank of the instance to which our addressmap has to be sent
- * @return MSH_OK upon success; MSH_SYSERR upon failure
- */
-static int
-send_addressmap (int rank)
-{
- struct MSH_MSG_AddressMap *map_msg;
- struct MSH_MSG_InstanceAdresses **iaddr_msgs;
- struct MSH_MessageHeader *msg;
- //MPI_Request **send_reqs;
- MPI_Request *send_reqs;
- MPI_Status *stats;
- unsigned int nmsg;
- int cnt;
- int ret;
- int type;
-
- ret = MSH_SYSERR;
- send_reqs = NULL;
- if (MSH_OK != addressmap_pack (addrmap, &map_msg, &iaddr_msgs))
- {
- MSH_break (0);
- return ret;
- }
- nmsg = ntohs (map_msg->num) + 1;
- //send_reqs = MSH_malloc (sizeof (MPI_Request *) * nmsg);
- send_reqs = MSH_malloc (sizeof (MPI_Request) * nmsg);
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- if (0 == cnt)
- {
- msg = (struct MSH_MessageHeader *) map_msg;
- type = MSH_MTYPE_ADDRESS_MAP;
- }
- else
- {
- msg = (struct MSH_MessageHeader *) iaddr_msgs[cnt - 1];
- type = MSH_MTYPE_INSTANCE_ADDRESS;
- }
- //send_reqs[cnt] = MSH_malloc (sizeof (MPI_Request));
- if (MPI_SUCCESS !=
- MPI_Isend (msg, ntohs (msg->size), MPI_BYTE, rank, type,
- MPI_COMM_WORLD, &send_reqs[cnt]))
- {
- MSH_break (0);
- cnt--;
- goto cleanup;
- }
- }
- stats = MSH_malloc (sizeof (MPI_Status) * nmsg);
- if (MPI_SUCCESS != MPI_Waitall (nmsg, send_reqs, stats))
- {
- MSH_break (0);
- free (send_reqs);
- send_reqs = NULL;
- goto cleanup;
- }
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- free (send_reqs[cnt]);
- send_reqs[cnt] = NULL;
- }
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
- {
- MSH_break (0);
- goto cleanup;
- }
- }
- ret = MSH_OK;
-
- cleanup:
- for (;(NULL != send_reqs) && (cnt >= 0); cnt--)
- {
- MSH_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt]));
- MSH_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt], MPI_STATUS_IGNORE));
- }
- for (cnt = 0; cnt < nmsg; cnt++)
- {
- if (0 == cnt)
- msg = (struct MSH_MessageHeader *) map_msg;
- else
- msg = (struct MSH_MessageHeader *) iaddr_msgs[cnt - 1];
- free (msg);
- }
- MSH_free_non_null (send_reqs);
- MSH_free_non_null (stats);
-
- return ret;
-}
-
-
-/**
- * Perform an ntree reduction on address maps
- *
- * @return MSH_OK upon success; MSH_SYSERR upon failure
- */
-static int
-ntree_reduction ()
-{
- unsigned int cnt;
- unsigned int max_steps;
- unsigned int aggregator;
- unsigned int step_width;
-
- max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double) rwidth) );
- for (cnt = 0; cnt < max_steps; cnt++)
- {
- if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
- return MSH_SYSERR;
- step_width = (unsigned int) pow (rwidth, cnt);
- if (0 != (aggregator = (rank % step_width)))
- {
- aggregator = rank - aggregator;
- return send_addressmap (aggregator);
- }
- /* receive address maps */
- }
-}
-
-
-/**
* Callback function invoked for each interface found.
*
* @param cls closure
@@ -467,7 +345,7 @@
return;
}
LOG_DEBUG ("Verification phase complete; commencing reduction phase\n");
- //ntree_reduction ();
+ reduce_ntree ();
}
@@ -715,8 +593,9 @@
int source;
int ret;
- MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES, MPI_COMM_WORLD,
- &status);
+ MSH_break (MPI_SUCCESS ==
+ MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_VERIFY_ADDRESSES,
+ MPI_COMM_WORLD, &status));
MPI_Get_elements (&status, MPI_BYTE, &rsize);
/* We expect a message from peers with id p in the range:
(rank - current_round * rwidth - rwidth) <= p <= (rank - (current_round * rwidth) -1) */
@@ -1063,7 +942,6 @@
MSH_break (0);
goto fail;
}
- //barray_destroy ();
ret = 0;
fail:
@@ -1079,7 +957,6 @@
}
MSH_break (MPI_SUCCESS == MPI_Finalize());
MSH_free_non_null (s_addrs);
- //libevent_global_shutdown ();
LOG_ERROR ("Returning\n");
return ret;
}
Added: msh/src/mshd.h
===================================================================
--- msh/src/mshd.h (rev 0)
+++ msh/src/mshd.h 2013-07-18 13:49:32 UTC (rev 28168)
@@ -0,0 +1,34 @@
+/**
+ * @file mshd.h
+ * @brief lists all external variables in the mshd binary
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#ifndef MSHD_H_
+#define MSHD_H_
+
+#include "common.h"
+#include "addressmap.h"
+
+/**
+ * Our addressmap; created in the main(). Do not destroy elsewhere.
+ */
+extern AddressMap *addrmap;
+
+/**
+ * Our instance rank
+ */
+extern int rank;
+
+/**
+ * round width
+ */
+extern unsigned int rwidth;
+
+/**
+ * Number of instances of mshd
+ */
+extern unsigned int nproc;
+
+#endif /* MSHD_H_ */
+/* End of mshd.h */
Modified: msh/src/mtypes.h
===================================================================
--- msh/src/mtypes.h 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/mtypes.h 2013-07-18 13:49:32 UTC (rev 28168)
@@ -77,26 +77,7 @@
/**
- * Message for signifying transmission of an address map. Use MPI tag
- * MSH_MTYPE_ADDRESS_MAP
- */
-struct MSH_MSG_AddressMap
-{
- /**
- * Header for this message
- */
- struct MSH_MessageHeader header;
-
- /**
- * Number of instances in this address map
- */
- uint16_t num MSH_PACKED;
-};
-
-
-/**
- * Structure for representing verified addresses of an instance. This does not
- * denote a message but is used in @see MSH_MSG_AddressMap. The type for these
+ * Structure for representing verified addresses of an instance. The type for these
* messages should be MSH_MTYPE_INSTANCE_ADDRESS
*/
struct MSH_MSG_InstanceAdresses
@@ -124,17 +105,13 @@
};
-
-
/*********************************************************************
* MPI tag numbers for each message type
*********************************************************************/
#define MSH_MTYPE_VERIFY_ADDRESSES 100
-#define MSH_MTYPE_ADDRESS_MAP 101
+#define MSH_MTYPE_INSTANCE_ADDRESS 101
-#define MSH_MTYPE_INSTANCE_ADDRESS 102
-
#endif /* MTYPES_H_ */
Added: msh/src/reduce.c
===================================================================
--- msh/src/reduce.c (rev 0)
+++ msh/src/reduce.c 2013-07-18 13:49:32 UTC (rev 28168)
@@ -0,0 +1,197 @@
+/**
+ * @file reduce.c
+ * @brief functionality for the reduction operations
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#include "common.h"
+#include <math.h>
+#include <mpi.h>
+#include "addressmap.h"
+#include "mshd.h"
+#include "mtypes.h"
+
+/**
+ * Send our addressmap to the instance with the given rank
+ *
+ * @param instance the rank of the instance to which our addressmap has to be sent
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+send_addressmap (int instance)
+{
+ struct MSH_MSG_InstanceAdresses **iaddr_msgs;
+ MPI_Request *send_reqs;
+ MPI_Status *stats;
+ int nmsg;
+ int cnt;
+ int ret;
+
+ ret = MSH_SYSERR;
+ send_reqs = NULL;
+ if (MSH_SYSERR == (nmsg = addressmap_pack (addrmap, &iaddr_msgs)))
+ {
+ MSH_break (0);
+ return ret;
+ }
+ send_reqs = MSH_malloc (sizeof (MPI_Request) * nmsg);
+ for (cnt = 0; cnt < nmsg; cnt++)
+ {
+ if (MPI_SUCCESS !=
+ MPI_Isend (iaddr_msgs[cnt], ntohs (iaddr_msgs[cnt]->header.size), MPI_BYTE,
+ instance, MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD,
+ &send_reqs[cnt]))
+ {
+ MSH_break (0);
+ cnt--;
+ goto cleanup;
+ }
+ }
+ stats = MSH_malloc (sizeof (MPI_Status) * nmsg);
+ if (MPI_SUCCESS != MPI_Waitall (nmsg, send_reqs, stats))
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ for (cnt = 0; cnt < nmsg; cnt++)
+ {
+ if (MPI_SUCCESS != stats[cnt].MPI_ERROR)
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ }
+ ret = MSH_OK;
+
+ cleanup:
+ for (;(NULL != send_reqs) && (cnt >= 0); cnt--)
+ {
+ MSH_break (MPI_SUCCESS == MPI_Cancel (&send_reqs[cnt]));
+ MSH_break (MPI_SUCCESS == MPI_Wait (&send_reqs[cnt], MPI_STATUS_IGNORE));
+ }
+ for (cnt = 0; cnt < nmsg; cnt++)
+ free (iaddr_msgs[cnt]);
+ MSH_free_non_null (iaddr_msgs);
+ MSH_free_non_null (send_reqs);
+ MSH_free_non_null (stats);
+ return ret;
+}
+
+
+/**
+ * Receive address maps during a given reduction step
+ *
+ * @param step the current reduction step
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+static int
+receive_addressmap (unsigned int step)
+{
+
+ struct MSH_MSG_InstanceAdresses *msg;
+ MPI_Status stat;
+ unsigned int step_width;
+ unsigned int lb;
+ unsigned int ub;
+ unsigned int cnt;
+ int nrecv;
+ int ret;
+ int msize;
+
+ step_width = (unsigned int) pow (rwidth, step);
+ lb = 0;
+ ub = 0;
+ ret = MSH_SYSERR;
+ if (rank + step_width <= nproc)
+ {
+ nrecv = rwidth;
+ lb = rank + 1;
+ ub = rank + step_width - 1;
+ }
+ else
+ {
+ nrecv = ((nproc - rank) - 1) / step_width;
+ if (0 != nrecv)
+ {
+ lb = rank + 1;
+ ub = nproc - 1;
+ }
+ }
+ MSH_assert (nrecv >= 0);
+ nrecv *= nproc;
+ for (cnt = 0; cnt < nrecv; cnt++)
+ {
+ if (MPI_SUCCESS != MPI_Probe (MPI_ANY_SOURCE, MSH_MTYPE_INSTANCE_ADDRESS,
+ MPI_COMM_WORLD, &stat))
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ if (!((lb <= stat.MPI_SOURCE ) && (stat.MPI_SOURCE <= ub)))
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ msize = 0;
+ if ((MPI_SUCCESS != MPI_Get_elements (&stat, MPI_BYTE, &msize))
+ || (msize <= 0))
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ msg = MSH_malloc (msize);
+ if (MPI_SUCCESS != MPI_Recv (msg, msize, MPI_BYTE, stat.MPI_SOURCE,
+ MSH_MTYPE_INSTANCE_ADDRESS, MPI_COMM_WORLD,
+ MPI_STATUS_IGNORE))
+ {
+ MSH_break (0);
+ goto cleanup;
+ }
+ LOG_DEBUG ("%d: [%u/%d] Received an instance address from %d\n", rank,
+ cnt + 1, nrecv, stat.MPI_SOURCE);
+ if (0 >= addressmap_intersect_msg (addrmap, msg))
+ {
+ LOG_ERROR ("%d: No common addresses found for instance %d\n", rank,
+ ntohs (msg->rank));
+ goto cleanup;
+ }
+ free (msg);
+ msg = NULL;
+ }
+
+ ret = MSH_OK;
+
+ cleanup:
+ MSH_free_non_null (msg);
+ return ret;
+}
+
+
+/**
+ * Perform an ntree reduction on address maps
+ *
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+int
+reduce_ntree ()
+{
+ unsigned int step;
+ unsigned int max_steps;
+ unsigned int aggregator;
+ unsigned int step_width;
+
+ max_steps = (unsigned int) ceil (log ((double) nproc) / log ((double) rwidth) );
+ for (step = 1; step < max_steps; step++)
+ {
+ if (MPI_SUCCESS != MPI_Barrier (MPI_COMM_WORLD))
+ return MSH_SYSERR;
+ step_width = (unsigned int) pow (rwidth, step);
+ if (0 != (aggregator = (rank % step_width)))
+ {
+ aggregator = rank - aggregator;
+ return send_addressmap (aggregator);
+ }
+ /* receive address maps */
+ receive_addressmap (step);
+ }
+}
Added: msh/src/reduce.h
===================================================================
--- msh/src/reduce.h (rev 0)
+++ msh/src/reduce.h 2013-07-18 13:49:32 UTC (rev 28168)
@@ -0,0 +1,21 @@
+/**
+ * @file reduce.h
+ * @brief interface for the reduction operations
+ * @author Sree Harsha Totakura <address@hidden>
+ */
+
+#ifndef REDUCE_H_
+#define REDUCE_H_
+
+#include "common.h"
+
+/**
+ * Perform an ntree reduction on address maps
+ *
+ * @return MSH_OK upon success; MSH_SYSERR upon failure
+ */
+int
+reduce_ntree ();
+
+#endif /* REDUCE_H_ */
+/* End of reduce.h */
Modified: msh/src/test_addressmap.c
===================================================================
--- msh/src/test_addressmap.c 2013-07-18 13:47:39 UTC (rev 28167)
+++ msh/src/test_addressmap.c 2013-07-18 13:49:32 UTC (rev 28168)
@@ -33,19 +33,16 @@
static void
test_serialization (AddressMap *m)
{
- struct MSH_MSG_AddressMap *map_msg;
struct MSH_MSG_InstanceAdresses **iaddr_msgs;
- unsigned int n;
+ int n;
unsigned int cnt;
- MSH_assert (MSH_OK == addressmap_pack (m, &map_msg, &iaddr_msgs));
- n = ntohs (map_msg->num);
+ MSH_assert (0 < (n = addressmap_pack (m, &iaddr_msgs)));
for (cnt = 0; cnt < n; cnt++)
{
free (iaddr_msgs[cnt]);
}
free (iaddr_msgs);
- free (map_msg);
}
@@ -91,9 +88,8 @@
MSH_assert (MSH_OK ==
addressmap_iterate_instance_addresses (m2, 0, &iterator_cb,
NULL));
MSH_assert (1 == cnt);
- MSH_assert (MSH_OK == addressmap_pack (m2, &map_msg, &iaddr_msgs));
+ MSH_assert (0 < addressmap_pack (m2, &iaddr_msgs));
addressmap_destroy (m2);
- free (map_msg);
}
/* check intersection of the message with the first addressmap */
MSH_assert (3 == addressmap_intersect_msg (m, iaddr_msgs[1]));
@@ -147,6 +143,7 @@
instance_address_info_destroy (iainfo);
/* test message serialization of the addressmap */
+ MSH_assert (MSH_OK == addressmap_add (m, 1, 0, 99));
test_serialization (m);
addressmap_destroy (m);
[Prev in Thread] | Current Thread | [Next in Thread]
- [GNUnet-SVN] r28168 - msh/src, gnunet <=