gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] [gnunet] branch master updated (91f7da525 -> 2408ee6b2)


From: gnunet
Subject: [GNUnet-SVN] [gnunet] branch master updated (91f7da525 -> 2408ee6b2)
Date: Thu, 11 Oct 2018 14:48:51 +0200

This is an automated email from the git hooks/post-receive script.

julius-buenger pushed a change to branch master
in repository gnunet.

    from 91f7da525 Merge branch 'master' of gnunet.org:gnunet
     new 0cdd1af62 RPS api: Clean code and logging
     new fbbf0db19 RPS API: Add creation, deletion of Subs
     new 261dd70aa RPS Tests: Refactor (run post_test before disconnect)
     new 2408ee6b2 RPS Tests: Test Subs

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/rps/.gitignore           |   9 +-
 src/rps/Makefile.am          |  32 +-
 src/rps/gnunet-service-rps.c | 974 ++++++++++++++++++++++++-------------------
 src/rps/rps.h                |  44 ++
 src/rps/rps_api.c            | 125 ++++--
 src/rps/test_rps.c           |  82 +++-
 src/rps/test_rps.conf        |   2 +-
 7 files changed, 779 insertions(+), 489 deletions(-)

diff --git a/src/rps/.gitignore b/src/rps/.gitignore
index cb14f5b09..9e78e2ca0 100644
--- a/src/rps/.gitignore
+++ b/src/rps/.gitignore
@@ -1,15 +1,16 @@
 gnunet-service-rps
 gnunet-rps
 gnunet-rps-profiler
-test_rps_malicious_1
-test_rps_malicious_2
-test_rps_malicious_3
+test_rps_single_req
 test_rps_req_cancel
+test_rps_sub
 test_rps_seed_big
 test_rps_seed_request
-test_rps_single_req
 test_service_rps_custommap
 test_service_rps_sampler_elem
 test_service_rps_view
 test_rps_churn
 test_service_rps_peers
+test_rps_malicious_1
+test_rps_malicious_2
+test_rps_malicious_3
diff --git a/src/rps/Makefile.am b/src/rps/Makefile.am
index e973bb7ca..8d2ddf7d7 100644
--- a/src/rps/Makefile.am
+++ b/src/rps/Makefile.am
@@ -79,14 +79,15 @@ check_PROGRAMS = \
  test_service_rps_view \
  test_service_rps_custommap \
  test_service_rps_sampler_elem \
- test_rps_malicious_1 \
- test_rps_malicious_2 \
- test_rps_malicious_3 \
- test_rps_seed_request \
  test_rps_single_req \
  test_rps_req_cancel \
+ test_rps_sub \
+ test_rps_seed_request \
  test_rps_seed_big \
- test_rps_churn
+ test_rps_churn \
+ test_rps_malicious_1 \
+ test_rps_malicious_2 \
+ test_rps_malicious_3
 endif
 
 rps_test_src = \
@@ -125,15 +126,6 @@ test_service_rps_sampler_elem_SOURCES = \
   test_service_rps_sampler_elem.c
 test_service_rps_sampler_elem_LDADD = $(top_builddir)/src/util/libgnunetutil.la
 
-test_rps_malicious_1_SOURCES = $(rps_test_src)
-test_rps_malicious_1_LDADD = $(ld_rps_test_lib)
-
-test_rps_malicious_2_SOURCES = $(rps_test_src)
-test_rps_malicious_2_LDADD = $(ld_rps_test_lib)
-
-test_rps_malicious_3_SOURCES = $(rps_test_src)
-test_rps_malicious_3_LDADD = $(ld_rps_test_lib)
-
 test_rps_single_req_SOURCES = $(rps_test_src)
 test_rps_single_req_LDADD = $(ld_rps_test_lib)
 
@@ -143,12 +135,24 @@ test_rps_seed_request_LDADD = $(ld_rps_test_lib)
 test_rps_req_cancel_SOURCES = $(rps_test_src)
 test_rps_req_cancel_LDADD = $(ld_rps_test_lib)
 
+test_rps_sub_SOURCES = $(rps_test_src)
+test_rps_sub_LDADD = $(ld_rps_test_lib)
+
 test_rps_seed_big_SOURCES = $(rps_test_src)
 test_rps_seed_big_LDADD = $(ld_rps_test_lib)
 
 test_rps_churn_SOURCES = $(rps_test_src)
 test_rps_churn_LDADD = $(ld_rps_test_lib)
 
+test_rps_malicious_1_SOURCES = $(rps_test_src)
+test_rps_malicious_1_LDADD = $(ld_rps_test_lib)
+
+test_rps_malicious_2_SOURCES = $(rps_test_src)
+test_rps_malicious_2_LDADD = $(ld_rps_test_lib)
+
+test_rps_malicious_3_SOURCES = $(rps_test_src)
+test_rps_malicious_3_LDADD = $(ld_rps_test_lib)
+
 gnunet_rps_profiler_SOURCES = \
  gnunet-service-rps_sampler_elem.h gnunet-service-rps_sampler_elem.c \
  rps-sampler_common.h rps-sampler_common.c \
diff --git a/src/rps/gnunet-service-rps.c b/src/rps/gnunet-service-rps.c
index d1c169239..b41a77074 100644
--- a/src/rps/gnunet-service-rps.c
+++ b/src/rps/gnunet-service-rps.c
@@ -36,11 +36,10 @@
 
 #include <math.h>
 #include <inttypes.h>
+#include <string.h>
 
 #define LOG(kind, ...) GNUNET_log(kind, __VA_ARGS__)
 
-// TODO modify @brief in every file
-
 // TODO check for overflows
 
 // TODO align message structs
@@ -149,9 +148,9 @@ struct ChannelCtx;
 struct PeerContext
 {
   /**
-   * The SubSampler this context belongs to.
+   * The Sub this context belongs to.
    */
-  struct SubSampler *ss;
+  struct Sub *sub;
 
   /**
    * Message queue open to client
@@ -280,24 +279,17 @@ struct AttackedPeer
 #endif /* ENABLE_MALICIOUS */
 
 /**
- * @brief One SubSampler.
+ * @brief One Sub.
  *
  * Essentially one instance of brahms that only connects to other instances
  * with the same (secret) value.
  */
-struct SubSampler
+struct Sub
 {
   /**
-   * @brief Port used for cadet.
-   *
-   * Don't compute multiple times through making it global
-   */
-  struct GNUNET_HashCode port;
-
-  /**
-   * Handler to CADET.
+   * @brief Hash of the shared value that defines Subs.
    */
-  struct GNUNET_CADET_Handle *cadet_handle;
+  struct GNUNET_HashCode hash;
 
   /**
    * @brief Port to communicate to other peers.
@@ -417,6 +409,11 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
 struct GNUNET_STATISTICS_Handle *stats;
 
 /**
+ * Handler to CADET.
+ */
+struct GNUNET_CADET_Handle *cadet_handle;
+
+/**
  * Our own identity.
  */
 static struct GNUNET_PeerIdentity own_identity;
@@ -516,12 +513,12 @@ static uint32_t push_limit = 10000;
 #endif /* ENABLE_MALICIOUS */
 
 /**
- * @brief Main SubSampler.
+ * @brief Main Sub.
  *
  * This is run in any case by all peers and connects to all peers without
  * specifying a shared value.
  */
-static struct SubSampler *mss;
+static struct Sub *msub;
 
 /**
  * @brief Maximum number of valid peers to keep.
@@ -529,12 +526,18 @@ static struct SubSampler *mss;
  */
 static const uint32_t num_valid_peers_max = UINT32_MAX;
 
-
 /***********************************************************************
  * /Globals
 ***********************************************************************/
 
 
+static void
+do_round (void *cls);
+
+static void
+do_mal_round (void *cls);
+
+
 /**
  * @brief Get the #PeerContext associated with a peer
  *
@@ -586,29 +589,29 @@ check_peer_known (const struct 
GNUNET_CONTAINER_MultiPeerMap *peer_map,
 /**
  * @brief Create a new #PeerContext and insert it into the peer map
  *
- * @param ss The SubSampler this context belongs to.
+ * @param sub The Sub this context belongs to.
  * @param peer the peer to create the #PeerContext for
  *
  * @return the #PeerContext
  */
 static struct PeerContext *
-create_peer_ctx (struct SubSampler *ss,
+create_peer_ctx (struct Sub *sub,
                  const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *ctx;
   int ret;
 
-  GNUNET_assert (GNUNET_NO == check_peer_known (ss->peer_map, peer));
+  GNUNET_assert (GNUNET_NO == check_peer_known (sub->peer_map, peer));
 
   ctx = GNUNET_new (struct PeerContext);
   ctx->peer_id = *peer;
-  ctx->ss = ss;
-  ret = GNUNET_CONTAINER_multipeermap_put (ss->peer_map, peer, ctx,
+  ctx->sub = sub;
+  ret = GNUNET_CONTAINER_multipeermap_put (sub->peer_map, peer, ctx,
       GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
   GNUNET_assert (GNUNET_OK == ret);
   GNUNET_STATISTICS_set (stats,
                         "# known peers",
-                        GNUNET_CONTAINER_multipeermap_size (ss->peer_map),
+                        GNUNET_CONTAINER_multipeermap_size (sub->peer_map),
                         GNUNET_NO);
   return ctx;
 }
@@ -617,20 +620,20 @@ create_peer_ctx (struct SubSampler *ss,
 /**
  * @brief Create or get a #PeerContext
  *
- * @param ss The SubSampler to which the created context belongs to
+ * @param sub The Sub to which the created context belongs to
  * @param peer the peer to get the associated context to
  *
  * @return the context
  */
 static struct PeerContext *
-create_or_get_peer_ctx (struct SubSampler *ss,
+create_or_get_peer_ctx (struct Sub *sub,
                         const struct GNUNET_PeerIdentity *peer)
 {
-  if (GNUNET_NO == check_peer_known (ss->peer_map, peer))
+  if (GNUNET_NO == check_peer_known (sub->peer_map, peer))
   {
-    return create_peer_ctx (ss, peer);
+    return create_peer_ctx (sub, peer);
   }
-  return get_peer_ctx (ss->peer_map, peer);
+  return get_peer_ctx (sub->peer_map, peer);
 }
 
 
@@ -648,13 +651,13 @@ static int
 check_connected (struct PeerContext *peer_ctx)
 {
   /* If we don't know about this peer we don't know whether it's online */
-  if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
+  if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
                                      &peer_ctx->peer_id))
   {
     return GNUNET_NO;
   }
   /* Get the context */
-  peer_ctx = get_peer_ctx (peer_ctx->ss->peer_map, &peer_ctx->peer_id);
+  peer_ctx = get_peer_ctx (peer_ctx->sub->peer_map, &peer_ctx->peer_id);
   /* If we have no channel to this peer we don't know whether it's online */
   if ( (NULL == peer_ctx->send_channel_ctx) &&
        (NULL == peer_ctx->recv_channel_ctx) )
@@ -943,10 +946,10 @@ get_channel (struct PeerContext *peer_ctx)
     *ctx_peer = peer_ctx->peer_id;
     peer_ctx->send_channel_ctx = add_channel_ctx (peer_ctx);
     peer_ctx->send_channel_ctx->channel =
-      GNUNET_CADET_channel_create (peer_ctx->ss->cadet_handle,
+      GNUNET_CADET_channel_create (cadet_handle,
                                    peer_ctx->send_channel_ctx, /* context */
                                    &peer_ctx->peer_id,
-                                   &peer_ctx->ss->port,
+                                   &peer_ctx->sub->hash,
                                    GNUNET_CADET_OPTION_RELIABLE,
                                    NULL, /* WindowSize handler */
                                    &cleanup_destroyed_channel, /* Disconnect 
handler */
@@ -1048,7 +1051,7 @@ mq_online_check_successful (void *cls)
     remove_pending_message (peer_ctx->online_check_pending, GNUNET_YES);
     peer_ctx->online_check_pending = NULL;
     set_peer_online (peer_ctx);
-    (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers);
+    (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
   }
 }
 
@@ -1187,9 +1190,9 @@ static int
 destroy_peer (struct PeerContext *peer_ctx)
 {
   GNUNET_assert (NULL != peer_ctx);
-  GNUNET_assert (NULL != peer_ctx->ss->peer_map);
+  GNUNET_assert (NULL != peer_ctx->sub->peer_map);
   if (GNUNET_NO ==
-      GNUNET_CONTAINER_multipeermap_contains (peer_ctx->ss->peer_map,
+      GNUNET_CONTAINER_multipeermap_contains (peer_ctx->sub->peer_map,
                                               &peer_ctx->peer_id))
   {
     return GNUNET_NO;
@@ -1259,15 +1262,15 @@ destroy_peer (struct PeerContext *peer_ctx)
   }
 
   if (GNUNET_YES !=
-      GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->ss->peer_map,
+      GNUNET_CONTAINER_multipeermap_remove_all (peer_ctx->sub->peer_map,
                                                 &peer_ctx->peer_id))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
-         "removing peer from peer_ctx->ss->peer_map failed\n");
+         "removing peer from peer_ctx->sub->peer_map failed\n");
   }
   GNUNET_STATISTICS_set (stats,
                         "# known peers",
-                        GNUNET_CONTAINER_multipeermap_size 
(peer_ctx->ss->peer_map),
+                        GNUNET_CONTAINER_multipeermap_size 
(peer_ctx->sub->peer_map),
                         GNUNET_NO);
   GNUNET_free (peer_ctx);
   return GNUNET_YES;
@@ -1288,10 +1291,10 @@ peermap_clear_iterator (void *cls,
                         const struct GNUNET_PeerIdentity *key,
                         void *value)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
   (void) value;
 
-  destroy_peer (get_peer_ctx (ss->peer_map, key));
+  destroy_peer (get_peer_ctx (sub->peer_map, key));
   return GNUNET_YES;
 }
 
@@ -1366,36 +1369,36 @@ store_peer_presistently_iterator (void *cls,
 /**
  * @brief Store the peers currently in #valid_peers to disk.
  *
- * @param ss SubSampler for which to store the valid peers
+ * @param sub Sub for which to store the valid peers
  */
 static void
-store_valid_peers (const struct SubSampler *ss)
+store_valid_peers (const struct Sub *sub)
 {
   struct GNUNET_DISK_FileHandle *fh;
   uint32_t number_written_peers;
   int ret;
 
-  if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7))
+  if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
   {
     return;
   }
 
-  ret = GNUNET_DISK_directory_create_for_file (ss->filename_valid_peers);
+  ret = GNUNET_DISK_directory_create_for_file (sub->filename_valid_peers);
   if (GNUNET_SYSERR == ret)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Not able to create directory for file `%s'\n",
-        ss->filename_valid_peers);
+        sub->filename_valid_peers);
     GNUNET_break (0);
   }
   else if (GNUNET_NO == ret)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Directory for file `%s' exists but is not writable for us\n",
-        ss->filename_valid_peers);
+        sub->filename_valid_peers);
     GNUNET_break (0);
   }
-  fh = GNUNET_DISK_file_open (ss->filename_valid_peers,
+  fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
                               GNUNET_DISK_OPEN_WRITE |
                                   GNUNET_DISK_OPEN_CREATE,
                               GNUNET_DISK_PERM_USER_READ |
@@ -1404,19 +1407,19 @@ store_valid_peers (const struct SubSampler *ss)
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Not able to write valid peers to file `%s'\n",
-        ss->filename_valid_peers);
+        sub->filename_valid_peers);
     return;
   }
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Writing %u valid peers to disk\n",
-      GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
+      GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
   number_written_peers =
-    GNUNET_CONTAINER_multipeermap_iterate (ss->valid_peers,
+    GNUNET_CONTAINER_multipeermap_iterate (sub->valid_peers,
                                            store_peer_presistently_iterator,
                                            fh);
   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
   GNUNET_assert (number_written_peers ==
-      GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
+      GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
 }
 
 
@@ -1469,10 +1472,10 @@ s2i_full (const char *string_repr)
 /**
  * @brief Restore the peers on disk to #valid_peers.
  *
- * @param ss SubSampler for which to restore the valid peers
+ * @param sub Sub for which to restore the valid peers
  */
 static void
-restore_valid_peers (const struct SubSampler *ss)
+restore_valid_peers (const struct Sub *sub)
 {
   off_t file_size;
   uint32_t num_peers;
@@ -1483,16 +1486,16 @@ restore_valid_peers (const struct SubSampler *ss)
   char *str_repr;
   const struct GNUNET_PeerIdentity *peer;
 
-  if (0 == strncmp ("DISABLE", ss->filename_valid_peers, 7))
+  if (0 == strncmp ("DISABLE", sub->filename_valid_peers, 7))
   {
     return;
   }
 
-  if (GNUNET_OK != GNUNET_DISK_file_test (ss->filename_valid_peers))
+  if (GNUNET_OK != GNUNET_DISK_file_test (sub->filename_valid_peers))
   {
     return;
   }
-  fh = GNUNET_DISK_file_open (ss->filename_valid_peers,
+  fh = GNUNET_DISK_file_open (sub->filename_valid_peers,
                               GNUNET_DISK_OPEN_READ,
                               GNUNET_DISK_PERM_NONE);
   GNUNET_assert (NULL != fh);
@@ -1504,13 +1507,13 @@ restore_valid_peers (const struct SubSampler *ss)
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Restoring %" PRIu32 " peers from file `%s'\n",
       num_peers,
-      ss->filename_valid_peers);
+      sub->filename_valid_peers);
   for (iter_buf = buf; iter_buf < buf + file_size - 1; iter_buf += 53)
   {
     str_repr = GNUNET_strndup (iter_buf, 53);
     peer = s2i_full (str_repr);
     GNUNET_free (str_repr);
-    add_valid_peer (peer, ss->valid_peers);
+    add_valid_peer (peer, sub->valid_peers);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Restored valid peer %s from disk\n",
         GNUNET_i2s_full (peer));
@@ -1518,10 +1521,10 @@ restore_valid_peers (const struct SubSampler *ss)
   iter_buf = NULL;
   GNUNET_free (buf);
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-      "num_peers: %" PRIu32 ", _size (ss->valid_peers): %u\n",
+      "num_peers: %" PRIu32 ", _size (sub->valid_peers): %u\n",
       num_peers,
-      GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
-  if (num_peers != GNUNET_CONTAINER_multipeermap_size (ss->valid_peers))
+      GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
+  if (num_peers != GNUNET_CONTAINER_multipeermap_size (sub->valid_peers))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Number of restored peers does not match file size. Have probably 
duplicates.\n");
@@ -1529,33 +1532,33 @@ restore_valid_peers (const struct SubSampler *ss)
   GNUNET_assert (GNUNET_OK == GNUNET_DISK_file_close (fh));
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "Restored %u valid peers from disk\n",
-      GNUNET_CONTAINER_multipeermap_size (ss->valid_peers));
+      GNUNET_CONTAINER_multipeermap_size (sub->valid_peers));
 }
 
 
 /**
  * @brief Delete storage of peers that was created with #initialise_peers ()
  *
- * @param ss SubSampler for which the storage is deleted
+ * @param sub Sub for which the storage is deleted
  */
 static void
-peers_terminate (struct SubSampler *ss)
+peers_terminate (struct Sub *sub)
 {
   if (GNUNET_SYSERR ==
-      GNUNET_CONTAINER_multipeermap_iterate (ss->peer_map,
+      GNUNET_CONTAINER_multipeermap_iterate (sub->peer_map,
                                              &peermap_clear_iterator,
-                                             ss))
+                                             sub))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
         "Iteration destroying peers was aborted.\n");
   }
-  GNUNET_CONTAINER_multipeermap_destroy (ss->peer_map);
-  ss->peer_map = NULL;
-  store_valid_peers (ss);
-  GNUNET_free (ss->filename_valid_peers);
-  ss->filename_valid_peers = NULL;
-  GNUNET_CONTAINER_multipeermap_destroy (ss->valid_peers);
-  ss->valid_peers = NULL;
+  GNUNET_CONTAINER_multipeermap_destroy (sub->peer_map);
+  sub->peer_map = NULL;
+  store_valid_peers (sub);
+  GNUNET_free (sub->filename_valid_peers);
+  sub->filename_valid_peers = NULL;
+  GNUNET_CONTAINER_multipeermap_destroy (sub->valid_peers);
+  sub->valid_peers = NULL;
 }
 
 
@@ -1615,21 +1618,21 @@ get_valid_peers (const struct 
GNUNET_CONTAINER_MultiPeerMap *valid_peers,
  * This function is called on new peer_ids from 'external' sources
  * (client seed, cadet get_peers(), ...)
  *
- * @param ss SubSampler with the peer map that the @a peer will be added to
+ * @param sub Sub with the peer map that the @a peer will be added to
  * @param peer the new #GNUNET_PeerIdentity
  *
  * @return #GNUNET_YES if peer was inserted
  *         #GNUNET_NO  otherwise
  */
 static int
-insert_peer (struct SubSampler *ss,
+insert_peer (struct Sub *sub,
              const struct GNUNET_PeerIdentity *peer)
 {
-  if (GNUNET_YES == check_peer_known (ss->peer_map, peer))
+  if (GNUNET_YES == check_peer_known (sub->peer_map, peer))
   {
     return GNUNET_NO; /* We already know this peer - nothing to do */
   }
-  (void) create_peer_ctx (ss, peer);
+  (void) create_peer_ctx (sub, peer);
   return GNUNET_YES;
 }
 
@@ -1665,20 +1668,20 @@ check_peer_flag (const struct 
GNUNET_CONTAINER_MultiPeerMap *peer_map,
  *
  * If not known yet, insert into known peers
  *
- * @param ss SubSampler which would contain the @a peer
+ * @param sub Sub which would contain the @a peer
  * @param peer the peer whose online is to be checked
  * @return #GNUNET_YES if the check was issued
  *         #GNUNET_NO  otherwise
  */
 static int
-issue_peer_online_check (struct SubSampler *ss,
+issue_peer_online_check (struct Sub *sub,
                          const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *peer_ctx;
 
-  (void) insert_peer (ss, peer); // TODO even needed?
-  peer_ctx = get_peer_ctx (ss->peer_map, peer);
-  if ( (GNUNET_NO == check_peer_flag (ss->peer_map, peer, Peers_ONLINE)) &&
+  (void) insert_peer (sub, peer); // TODO even needed?
+  peer_ctx = get_peer_ctx (sub->peer_map, peer);
+  if ( (GNUNET_NO == check_peer_flag (sub->peer_map, peer, Peers_ONLINE)) &&
        (NULL == peer_ctx->online_check_pending) )
   {
     check_peer_online (peer_ctx);
@@ -1704,7 +1707,7 @@ issue_peer_online_check (struct SubSampler *ss,
 static int
 check_removable (const struct PeerContext *peer_ctx)
 {
-  if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains 
(peer_ctx->ss->peer_map,
+  if (GNUNET_NO == GNUNET_CONTAINER_multipeermap_contains 
(peer_ctx->sub->peer_map,
                                                            &peer_ctx->peer_id))
   {
     return GNUNET_SYSERR;
@@ -1749,7 +1752,7 @@ check_peer_valid (const struct 
GNUNET_CONTAINER_MultiPeerMap *valid_peers,
 static void
 indicate_sending_intention (struct PeerContext *peer_ctx)
 {
-  GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map,
+  GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
                                                  &peer_ctx->peer_id));
   (void) get_channel (peer_ctx);
 }
@@ -1778,7 +1781,7 @@ check_peer_send_intention (const struct PeerContext 
*peer_ctx)
 /**
  * Handle the channel a peer opens to us.
  *
- * @param cls The closure - SubSampler
+ * @param cls The closure - Sub
  * @param channel The channel the peer wants to establish
  * @param initiator The peer's peer ID
  *
@@ -1793,22 +1796,22 @@ handle_inbound_channel (void *cls,
   struct PeerContext *peer_ctx;
   struct GNUNET_PeerIdentity *ctx_peer;
   struct ChannelCtx *channel_ctx;
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
       "New channel was established to us (Peer %s).\n",
       GNUNET_i2s (initiator));
   GNUNET_assert (NULL != channel); /* according to cadet API */
   /* Make sure we 'know' about this peer */
-  peer_ctx = create_or_get_peer_ctx (ss, initiator);
+  peer_ctx = create_or_get_peer_ctx (sub, initiator);
   set_peer_online (peer_ctx);
-  (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->ss->valid_peers);
+  (void) add_valid_peer (&peer_ctx->peer_id, peer_ctx->sub->valid_peers);
   ctx_peer = GNUNET_new (struct GNUNET_PeerIdentity);
   *ctx_peer = *initiator;
   channel_ctx = add_channel_ctx (peer_ctx);
   channel_ctx->channel = channel;
   /* We only accept one incoming channel per peer */
-  if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (ss->peer_map,
+  if (GNUNET_YES == check_peer_send_intention (get_peer_ctx (sub->peer_map,
                                                              initiator)))
   {
     LOG (GNUNET_ERROR_TYPE_WARNING,
@@ -1835,7 +1838,7 @@ handle_inbound_channel (void *cls,
 static int
 check_sending_channel_exists (const struct PeerContext *peer_ctx)
 {
-  if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
+  if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
                                      &peer_ctx->peer_id))
   { /* If no such peer exists, there is no channel */
     return GNUNET_NO;
@@ -1859,7 +1862,7 @@ check_sending_channel_exists (const struct PeerContext 
*peer_ctx)
 static int
 destroy_sending_channel (struct PeerContext *peer_ctx)
 {
-  if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
+  if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
                                      &peer_ctx->peer_id))
   {
     return GNUNET_NO;
@@ -1922,7 +1925,7 @@ schedule_operation (struct PeerContext *peer_ctx,
 {
   struct PeerPendingOp pending_op;
 
-  GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->ss->peer_map,
+  GNUNET_assert (GNUNET_YES == check_peer_known (peer_ctx->sub->peer_map,
                                                  &peer_ctx->peer_id));
 
   //TODO if ONLINE execute immediately
@@ -2010,9 +2013,9 @@ struct ClientContext
   struct GNUNET_SERVICE_Client *client;
 
   /**
-   * The #SubSampler this context belongs to
+   * The #Sub this context belongs to
    */
-  struct SubSampler *ss;
+  struct Sub *sub;
 };
 
 /**
@@ -2109,35 +2112,35 @@ insert_in_view_op (void *cls,
  *
  * Called once we know a peer is online.
  *
- * @param ss SubSampler in with the view to insert in
+ * @param sub Sub in with the view to insert in
  * @param peer the peer to insert
  *
  * @return GNUNET_OK if peer was actually inserted
  *         GNUNET_NO if peer was not inserted
  */
 static int
-insert_in_view (struct SubSampler *ss,
+insert_in_view (struct Sub *sub,
                 const struct GNUNET_PeerIdentity *peer)
 {
   struct PeerContext *peer_ctx;
   int online;
   int ret;
 
-  online = check_peer_flag (ss->peer_map, peer, Peers_ONLINE);
-  peer_ctx = get_peer_ctx (ss->peer_map, peer); // TODO indirection needed?
+  online = check_peer_flag (sub->peer_map, peer, Peers_ONLINE);
+  peer_ctx = get_peer_ctx (sub->peer_map, peer); // TODO indirection needed?
   if ( (GNUNET_NO == online) ||
        (GNUNET_SYSERR == online) ) /* peer is not even known */
   {
-    (void) issue_peer_online_check (ss, peer);
+    (void) issue_peer_online_check (sub, peer);
     (void) schedule_operation (peer_ctx, insert_in_view_op, NULL);
     return GNUNET_NO;
   }
   /* Open channel towards peer to keep connection open */
   indicate_sending_intention (peer_ctx);
-  ret = View_put (ss->view, peer);
+  ret = View_put (sub->view, peer);
   GNUNET_STATISTICS_set (stats,
                          "view size",
-                         View_size (peer_ctx->ss->view),
+                         View_size (peer_ctx->sub->view),
                          GNUNET_NO);
   return ret;
 }
@@ -2160,8 +2163,8 @@ send_view (const struct ClientContext *cli_ctx,
 
   if (NULL == view_array)
   {
-    view_size = View_size (cli_ctx->ss->view);
-    view_array = View_get_as_array (cli_ctx->ss->view);
+    view_size = View_size (cli_ctx->sub->view);
+    view_array = View_get_as_array (cli_ctx->sub->view);
   }
 
   ev = GNUNET_MQ_msg_extra (out_msg,
@@ -2210,17 +2213,17 @@ send_stream_peers (const struct ClientContext *cli_ctx,
 /**
  * @brief sends updates to clients that are interested
  *
- * @param ss Subsampler for which to notify clients
+ * @param sub Sub for which to notify clients
  */
 static void
-clients_notify_view_update (const struct SubSampler *ss)
+clients_notify_view_update (const struct Sub *sub)
 {
   struct ClientContext *cli_ctx_iter;
   uint64_t num_peers;
   const struct GNUNET_PeerIdentity *view_array;
 
-  num_peers = View_size (ss->view);
-  view_array = View_get_as_array(ss->view);
+  num_peers = View_size (sub->view);
+  view_array = View_get_as_array(sub->view);
   /* check size of view is small enough */
   if (GNUNET_MAX_MESSAGE_SIZE < num_peers)
   {
@@ -2260,7 +2263,8 @@ clients_notify_view_update (const struct SubSampler *ss)
  * @param peers the array of peers to send
  */
 static void
-clients_notify_stream_peer (uint64_t num_peers,
+clients_notify_stream_peer (const struct Sub *sub,
+                            uint64_t num_peers,
                             const struct GNUNET_PeerIdentity *peers)
                             // TODO enum StreamPeerSource)
 {
@@ -2274,7 +2278,8 @@ clients_notify_stream_peer (uint64_t num_peers,
        NULL != cli_ctx_iter;
        cli_ctx_iter = cli_ctx_iter->next)
   {
-    if (GNUNET_YES == cli_ctx_iter->stream_update)
+    if (GNUNET_YES == cli_ctx_iter->stream_update &&
+        (sub == cli_ctx_iter->sub || sub == msub))
     {
       send_stream_peers (cli_ctx_iter, num_peers, peers);
     }
@@ -2287,7 +2292,7 @@ clients_notify_stream_peer (uint64_t num_peers,
  *
  * @param ids Array of Peers to insert into view
  * @param num_peers Number of peers to insert
- * @param cls Closure - The SubSampler for which this is to be done
+ * @param cls Closure - The Sub for which this is to be done
  */
 static void
 hist_update (const struct GNUNET_PeerIdentity *ids,
@@ -2295,21 +2300,21 @@ hist_update (const struct GNUNET_PeerIdentity *ids,
              void *cls)
 {
   unsigned int i;
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   for (i = 0; i < num_peers; i++)
   {
     int inserted;
-    inserted = insert_in_view (ss, &ids[i]);
+    inserted = insert_in_view (sub, &ids[i]);
     if (GNUNET_OK == inserted)
     {
-      clients_notify_stream_peer (1, &ids[i]);
+      clients_notify_stream_peer (sub, 1, &ids[i]);
     }
-    to_file (ss->file_name_view_log,
+    to_file (sub->file_name_view_log,
              "+%s\t(hist)",
              GNUNET_i2s_full (ids));
   }
-  clients_notify_view_update (ss);
+  clients_notify_view_update (sub);
 }
 
 
@@ -2433,16 +2438,16 @@ send_pull_reply (struct PeerContext *peer_ctx,
  *
  * Called once we know a peer is online.
  *
- * @param cls Closure - SubSampler with the pull map to insert into
+ * @param cls Closure - Sub with the pull map to insert into
  * @param peer Peer to insert
  */
 static void
 insert_in_pull_map (void *cls,
                     const struct GNUNET_PeerIdentity *peer)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
-  CustomPeerMap_put (ss->pull_map, peer);
+  CustomPeerMap_put (sub->pull_map, peer);
 }
 
 
@@ -2452,20 +2457,20 @@ insert_in_pull_map (void *cls,
  * Called once we know a peer is online.
  * Implements #PeerOp
  *
- * @param cls Closure - SubSampler with view to insert peer into
+ * @param cls Closure - Sub with view to insert peer into
  * @param peer the peer to insert
  */
 static void
 insert_in_view_op (void *cls,
                    const struct GNUNET_PeerIdentity *peer)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
   int inserted;
 
-  inserted = insert_in_view (ss, peer);
+  inserted = insert_in_view (sub, peer);
   if (GNUNET_OK == inserted)
   {
-    clients_notify_stream_peer (1, peer);
+    clients_notify_stream_peer (sub, 1, peer);
   }
 }
 
@@ -2474,41 +2479,41 @@ insert_in_view_op (void *cls,
  * Update sampler with given PeerID.
  * Implements #PeerOp
  *
- * @param cls Closure - SubSampler containing the sampler to insert into
+ * @param cls Closure - Sub containing the sampler to insert into
  * @param peer Peer to insert
  */
 static void
 insert_in_sampler (void *cls,
                    const struct GNUNET_PeerIdentity *peer)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Updating samplers with peer %s from insert_in_sampler()\n",
        GNUNET_i2s (peer));
-  RPS_sampler_update (ss->sampler, peer);
-  if (0 < RPS_sampler_count_id (ss->sampler, peer))
+  RPS_sampler_update (sub->sampler, peer);
+  if (0 < RPS_sampler_count_id (sub->sampler, peer))
   {
     /* Make sure we 'know' about this peer */
-    (void) issue_peer_online_check (ss, peer);
+    (void) issue_peer_online_check (sub, peer);
     /* Establish a channel towards that peer to indicate we are going to send
      * messages to it */
     //indicate_sending_intention (peer);
   }
   #ifdef TO_FILE
-  ss->num_observed_peers++;
+  sub->num_observed_peers++;
   GNUNET_CONTAINER_multipeermap_put
-    (ss->observed_unique_peers,
+    (sub->observed_unique_peers,
      peer,
      NULL,
      GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY);
   uint32_t num_observed_unique_peers =
-    GNUNET_CONTAINER_multipeermap_size (ss->observed_unique_peers);
-  to_file (ss->file_name_observed_log,
+    GNUNET_CONTAINER_multipeermap_size (sub->observed_unique_peers);
+  to_file (sub->file_name_observed_log,
           "%" PRIu32 " %" PRIu32 " %f\n",
-          ss->num_observed_peers,
+          sub->num_observed_peers,
           num_observed_unique_peers,
-          1.0*num_observed_unique_peers/ss->num_observed_peers)
+          1.0*num_observed_unique_peers/sub->num_observed_peers)
   #endif /* TO_FILE */
 }
 
@@ -2520,20 +2525,20 @@ insert_in_sampler (void *cls,
  *
  * "External sources" refer to every source except the gossip.
  *
- * @param ss SubSampler for which @a peer was received
+ * @param sub Sub for which @a peer was received
  * @param peer peer to insert/peer received
  */
 static void
-got_peer (struct SubSampler *ss,
+got_peer (struct Sub *sub,
           const struct GNUNET_PeerIdentity *peer)
 {
   /* If we did not know this peer already, insert it into sampler and view */
-  if (GNUNET_YES == issue_peer_online_check (ss, peer))
+  if (GNUNET_YES == issue_peer_online_check (sub, peer))
   {
-    schedule_operation (get_peer_ctx (ss->peer_map, peer),
-                        &insert_in_sampler, ss);
-    schedule_operation (get_peer_ctx (ss->peer_map, peer),
-                        &insert_in_view_op, ss);
+    schedule_operation (get_peer_ctx (sub->peer_map, peer),
+                        &insert_in_sampler, sub);
+    schedule_operation (get_peer_ctx (sub->peer_map, peer),
+                        &insert_in_view_op, sub);
   }
   GNUNET_STATISTICS_update (stats,
                             "# learnd peers",
@@ -2553,22 +2558,22 @@ static int
 check_sending_channel_needed (const struct PeerContext *peer_ctx)
 {
   /* struct GNUNET_CADET_Channel *channel; */
-  if (GNUNET_NO == check_peer_known (peer_ctx->ss->peer_map,
+  if (GNUNET_NO == check_peer_known (peer_ctx->sub->peer_map,
                                      &peer_ctx->peer_id))
   {
     return GNUNET_NO;
   }
   if (GNUNET_YES == check_sending_channel_exists (peer_ctx))
   {
-    if ( (0 < RPS_sampler_count_id (peer_ctx->ss->sampler,
+    if ( (0 < RPS_sampler_count_id (peer_ctx->sub->sampler,
                                     &peer_ctx->peer_id)) ||
-         (GNUNET_YES == View_contains_peer (peer_ctx->ss->view,
+         (GNUNET_YES == View_contains_peer (peer_ctx->sub->view,
                                             &peer_ctx->peer_id)) ||
-         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->push_map,
+         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->push_map,
                                                      &peer_ctx->peer_id)) ||
-         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->ss->pull_map,
+         (GNUNET_YES == CustomPeerMap_contains_peer (peer_ctx->sub->pull_map,
                                                      &peer_ctx->peer_id)) ||
-         (GNUNET_YES == check_peer_flag (peer_ctx->ss->peer_map,
+         (GNUNET_YES == check_peer_flag (peer_ctx->sub->peer_map,
                                          &peer_ctx->peer_id,
                                          Peers_PULL_REPLY_PENDING)))
     { /* If we want to keep the connection to peer open */
@@ -2584,18 +2589,18 @@ check_sending_channel_needed (const struct PeerContext 
*peer_ctx)
  * @brief remove peer from our knowledge, the view, push and pull maps and
  * samplers.
  *
- * @param ss SubSampler with the data structures the peer is to be removed from
+ * @param sub Sub with the data structures the peer is to be removed from
  * @param peer the peer to remove
  */
 static void
-remove_peer (struct SubSampler *ss,
+remove_peer (struct Sub *sub,
              const struct GNUNET_PeerIdentity *peer)
 {
-  (void) View_remove_peer (ss->view, peer);
-  CustomPeerMap_remove_peer (ss->pull_map, peer);
-  CustomPeerMap_remove_peer (ss->push_map, peer);
-  RPS_sampler_reinitialise_by_value (ss->sampler, peer);
-  destroy_peer (get_peer_ctx (ss->peer_map, peer));
+  (void) View_remove_peer (sub->view, peer);
+  CustomPeerMap_remove_peer (sub->pull_map, peer);
+  CustomPeerMap_remove_peer (sub->push_map, peer);
+  RPS_sampler_reinitialise_by_value (sub->sampler, peer);
+  destroy_peer (get_peer_ctx (sub->peer_map, peer));
 }
 
 
@@ -2604,14 +2609,14 @@ remove_peer (struct SubSampler *ss,
  *
  * If the sending channel is no longer needed it is destroyed.
  *
- * @param ss SubSamper in which the current peer is to be cleaned
+ * @param sub Sub in which the current peer is to be cleaned
  * @param peer the peer whose data is about to be cleaned
  */
 static void
-clean_peer (struct SubSampler *ss,
+clean_peer (struct Sub *sub,
             const struct GNUNET_PeerIdentity *peer)
 {
-  if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (ss->peer_map,
+  if (GNUNET_NO == check_sending_channel_needed (get_peer_ctx (sub->peer_map,
                                                                peer)))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -2619,24 +2624,24 @@ clean_peer (struct SubSampler *ss,
         GNUNET_i2s (peer));
     #ifdef ENABLE_MALICIOUS
     if (0 != GNUNET_CRYPTO_cmp_peer_identity (&attacked_peer, peer))
-      (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer));
+      (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
     #else /* ENABLE_MALICIOUS */
-    (void) destroy_sending_channel (get_peer_ctx (ss->peer_map, peer));
+    (void) destroy_sending_channel (get_peer_ctx (sub->peer_map, peer));
     #endif /* ENABLE_MALICIOUS */
   }
 
-  if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (ss->peer_map,
+  if ( (GNUNET_NO == check_peer_send_intention (get_peer_ctx (sub->peer_map,
                                                               peer))) &&
-       (GNUNET_NO == View_contains_peer (ss->view, peer)) &&
-       (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) &&
-       (GNUNET_NO == CustomPeerMap_contains_peer (ss->push_map, peer)) &&
-       (0 == RPS_sampler_count_id (ss->sampler,   peer)) &&
-       (GNUNET_NO != check_removable (get_peer_ctx (ss->peer_map, peer))) )
+       (GNUNET_NO == View_contains_peer (sub->view, peer)) &&
+       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
+       (GNUNET_NO == CustomPeerMap_contains_peer (sub->push_map, peer)) &&
+       (0 == RPS_sampler_count_id (sub->sampler,   peer)) &&
+       (GNUNET_NO != check_removable (get_peer_ctx (sub->peer_map, peer))) )
   { /* We can safely remove this peer */
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Going to remove peer %s\n",
         GNUNET_i2s (peer));
-    remove_peer (ss, peer);
+    remove_peer (sub, peer);
     return;
   }
 }
@@ -2666,7 +2671,7 @@ cleanup_destroyed_channel (void *cls,
   if (NULL != peer_ctx &&
       peer_ctx->send_channel_ctx == channel_ctx)
   {
-    remove_peer (peer_ctx->ss, &peer_ctx->peer_id);
+    remove_peer (peer_ctx->sub, &peer_ctx->peer_id);
   }
 }
 
@@ -2677,35 +2682,30 @@ cleanup_destroyed_channel (void *cls,
 
 
 /***********************************************************************
- * SubSampler
+ * Sub
 ***********************************************************************/
 
 /**
- * @brief Create a new SUbSampler
+ * @brief Create a new Sub
  *
- * @param shared_value Value shared among rps instances on other hosts that
- * defines a subgroup to sample from.
+ * @param hash Hash of value shared among rps instances on other hosts that
+ *        defines a subgroup to sample from.
  * @param sampler_size Size of the sampler
  * @param round_interval Interval (in average) between two rounds
  *
- * @return SubSampler
+ * @return Sub
  */
-struct SubSampler *
-new_subsampler (const char *shared_value,
-                uint32_t sampler_size,
-                struct GNUNET_TIME_Relative round_interval)
+struct Sub *
+new_sub (const struct GNUNET_HashCode *hash,
+         uint32_t sampler_size,
+         struct GNUNET_TIME_Relative round_interval)
 {
-  struct SubSampler *ss;
-  char hash_port_string[512] = GNUNET_APPLICATION_PORT_RPS;
+  struct Sub *sub;
 
-  ss = GNUNET_new (struct SubSampler);
+  sub = GNUNET_new (struct Sub);
 
   /* With the hash generated from the secret value this service only connects
    * to rps instances that share the value */
-  strcat (hash_port_string, shared_value);
-  GNUNET_CRYPTO_hash (hash_port_string,
-                      strlen (hash_port_string),
-                      &ss->port);
   struct GNUNET_MQ_MessageHandler cadet_handlers[] = {
     GNUNET_MQ_hd_fixed_size (peer_check,
                              GNUNET_MESSAGE_TYPE_RPS_PP_CHECK_LIVE,
@@ -2725,17 +2725,16 @@ new_subsampler (const char *shared_value,
                            NULL),
     GNUNET_MQ_handler_end ()
   };
-  ss->cadet_handle = GNUNET_CADET_connect (cfg);
-  GNUNET_assert (NULL != ss->cadet_handle);
-  ss->cadet_port =
-    GNUNET_CADET_open_port (ss->cadet_handle,
-                            &ss->port,
+  sub->hash = *hash;
+  sub->cadet_port =
+    GNUNET_CADET_open_port (cadet_handle,
+                            &sub->hash,
                             &handle_inbound_channel, /* Connect handler */
-                            ss, /* cls */
+                            sub, /* cls */
                             NULL, /* WindowSize handler */
                             &cleanup_destroyed_channel, /* Disconnect handler 
*/
                             cadet_handlers);
-  if (NULL == ss->cadet_port)
+  if (NULL == sub->cadet_port)
   {
     LOG (GNUNET_ERROR_TYPE_ERROR,
         "Cadet port `%s' is already in use.\n",
@@ -2744,53 +2743,98 @@ new_subsampler (const char *shared_value,
   }
 
   /* Set up general data structure to keep track about peers */
-  ss->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
+  sub->valid_peers = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
   if (GNUNET_OK !=
       GNUNET_CONFIGURATION_get_value_filename (cfg,
                                                "rps",
                                                "FILENAME_VALID_PEERS",
-                                               &ss->filename_valid_peers))
+                                               &sub->filename_valid_peers))
   {
     GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
                                "rps",
                                "FILENAME_VALID_PEERS");
   }
-  ss->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
+  sub->peer_map = GNUNET_CONTAINER_multipeermap_create (4, GNUNET_NO);
 
   /* Set up the sampler */
-  ss->sampler_size_est_min = sampler_size;
-  ss->sampler_size_est_need = sampler_size;;
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", ss->sampler_size_est_min);
-  ss->round_interval = round_interval;
-  ss->sampler = RPS_sampler_init (sampler_size,
+  sub->sampler_size_est_min = sampler_size;
+  sub->sampler_size_est_need = sampler_size;;
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "MINSIZE is %u\n", sub->sampler_size_est_min);
+  GNUNET_assert (0 != round_interval.rel_value_us);
+  sub->round_interval = round_interval;
+  sub->sampler = RPS_sampler_init (sampler_size,
                                   round_interval);
 
   /* Logging of internals */
-  ss->file_name_view_log = store_prefix_file_name (&own_identity, "view");
+  sub->file_name_view_log = store_prefix_file_name (&own_identity, "view");
   #ifdef TO_FILE
-  ss->file_name_observed_log = store_prefix_file_name (&own_identity,
+  sub->file_name_observed_log = store_prefix_file_name (&own_identity,
                                                        "observed");
-  ss->num_observed_peers = 0;
-  ss->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
+  sub->num_observed_peers = 0;
+  sub->observed_unique_peers = GNUNET_CONTAINER_multipeermap_create (1,
                                                                     GNUNET_NO);
   #endif /* TO_FILE */
 
   /* Set up data structures for gossip */
-  ss->push_map = CustomPeerMap_create (4);
-  ss->pull_map = CustomPeerMap_create (4);
-  ss->view_size_est_min = sampler_size;;
-  ss->view = View_create (ss->view_size_est_min);
+  sub->push_map = CustomPeerMap_create (4);
+  sub->pull_map = CustomPeerMap_create (4);
+  sub->view_size_est_min = sampler_size;;
+  sub->view = View_create (sub->view_size_est_min);
   GNUNET_STATISTICS_set (stats,
                          "view size aim",
-                         ss->view_size_est_min,
+                         sub->view_size_est_min,
                          GNUNET_NO);
 
+  /* Start executing rounds */
+  sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
 
-  return ss;
+  return sub;
 }
 
+
+/**
+ * @brief Destroy Sub.
+ *
+ * @param sub Sub to destroy
+ */
+static void
+destroy_sub (struct Sub *sub)
+{
+  GNUNET_assert (NULL != sub);
+  GNUNET_assert (NULL != sub->do_round_task);
+  GNUNET_SCHEDULER_cancel (sub->do_round_task);
+  sub->do_round_task = NULL;
+
+  /* Disconnect from cadet */
+  GNUNET_CADET_close_port (sub->cadet_port);
+
+  /* Clean up data structures for peers */
+  RPS_sampler_destroy (sub->sampler);
+  sub->sampler = NULL;
+  View_destroy (sub->view);
+  sub->view = NULL;
+  CustomPeerMap_destroy (sub->push_map);
+  sub->push_map = NULL;
+  CustomPeerMap_destroy (sub->pull_map);
+  sub->pull_map = NULL;
+  peers_terminate (sub);
+
+  /* Free leftover data structures */
+  GNUNET_free (sub->file_name_view_log);
+  sub->file_name_view_log = NULL;
+#ifdef TO_FILE
+  GNUNET_free (sub->file_name_observed_log);
+  sub->file_name_observed_log = NULL;
+  GNUNET_CONTAINER_multipeermap_destroy (sub->observed_unique_peers);
+  sub->observed_unique_peers = NULL;
+#endif /* TO_FILE */
+
+  GNUNET_free (sub);
+}
+
+
 /***********************************************************************
- * /SubSampler
+ * /Sub
 ***********************************************************************/
 
 
@@ -2806,58 +2850,88 @@ destroy_cli_ctx (struct ClientContext *cli_ctx)
   GNUNET_CONTAINER_DLL_remove (cli_ctx_head,
                                cli_ctx_tail,
                                cli_ctx);
+  if (NULL != cli_ctx->sub)
+  {
+    destroy_sub (cli_ctx->sub);
+    cli_ctx->sub = NULL;
+  }
   GNUNET_free (cli_ctx);
 }
 
 
 /**
- * Function called by NSE.
+ * @brief Update sizes in sampler and view on estimate update from nse service
  *
- * Updates sizes of sampler list and view and adapt those lists
- * accordingly.
- *
- * implements #GNUNET_NSE_Callback
- *
- * @param cls Closure - SubSampler
- * @param timestamp time when the estimate was received from the server (or 
created by the server)
+ * @param sub Sub
  * @param logestimate the log(Base 2) value of the current network size 
estimate
  * @param std_dev standard deviation for the estimate
  */
 static void
-nse_callback (void *cls,
-              struct GNUNET_TIME_Absolute timestamp,
-              double logestimate, double std_dev)
+adapt_sizes (struct Sub *sub, double logestimate, double std_dev)
 {
   double estimate;
   //double scale; // TODO this might go gloabal/config
-  struct SubSampler *ss = cls;
-  (void) timestamp;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Received a ns estimate - logest: %f, std_dev: %f (old_size: %u)\n",
-       logestimate, std_dev, RPS_sampler_get_size (ss->sampler));
+       logestimate, std_dev, RPS_sampler_get_size (sub->sampler));
   //scale = .01;
   estimate = GNUNET_NSE_log_estimate_to_n (logestimate);
   // GNUNET_NSE_log_estimate_to_n (logestimate);
   estimate = pow (estimate, 1.0 / 3);
   // TODO add if std_dev is a number
   // estimate += (std_dev * scale);
-  if (ss->view_size_est_min < ceil (estimate))
+  if (sub->view_size_est_min < ceil (estimate))
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Changing estimate to %f\n", estimate);
-    ss->sampler_size_est_need = estimate;
-    ss->view_size_est_need = estimate;
+    sub->sampler_size_est_need = estimate;
+    sub->view_size_est_need = estimate;
   } else
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Not using estimate %f\n", estimate);
-    //ss->sampler_size_est_need = ss->view_size_est_min;
-    ss->view_size_est_need = ss->view_size_est_min;
+    //sub->sampler_size_est_need = sub->view_size_est_min;
+    sub->view_size_est_need = sub->view_size_est_min;
   }
-  GNUNET_STATISTICS_set (stats, "view size aim", ss->view_size_est_need, 
GNUNET_NO);
+  GNUNET_STATISTICS_set (stats, "view size aim", sub->view_size_est_need, 
GNUNET_NO);
 
   /* If the NSE has changed adapt the lists accordingly */
-  resize_wrapper (ss->sampler, ss->sampler_size_est_need);
-  View_change_len (ss->view, ss->view_size_est_need);
+  resize_wrapper (sub->sampler, sub->sampler_size_est_need);
+  View_change_len (sub->view, sub->view_size_est_need);
+}
+
+
+/**
+ * Function called by NSE.
+ *
+ * Updates sizes of sampler list and view and adapt those lists
+ * accordingly.
+ *
+ * implements #GNUNET_NSE_Callback
+ *
+ * @param cls Closure - unused
+ * @param timestamp time when the estimate was received from the server (or 
created by the server)
+ * @param logestimate the log(Base 2) value of the current network size 
estimate
+ * @param std_dev standard deviation for the estimate
+ */
+static void
+nse_callback (void *cls,
+              struct GNUNET_TIME_Absolute timestamp,
+              double logestimate, double std_dev)
+{
+  (void) cls;
+  (void) timestamp;
+  struct ClientContext *cli_ctx_iter;
+
+  adapt_sizes (msub, logestimate, std_dev);
+  for (cli_ctx_iter = cli_ctx_head;
+      NULL != cli_ctx_iter;
+      cli_ctx_iter = cli_ctx_iter->next)
+  {
+    if (NULL != cli_ctx_iter->sub)
+    {
+      adapt_sizes (cli_ctx_iter->sub, logestimate, std_dev);
+    }
+  }
 }
 
 
@@ -2881,6 +2955,10 @@ check_client_seed (void *cls, const struct 
GNUNET_RPS_CS_SeedMessage *msg)
   if ( (msize / sizeof (struct GNUNET_PeerIdentity) != num_peers) ||
        (msize % sizeof (struct GNUNET_PeerIdentity) != 0) )
   {
+    LOG (GNUNET_ERROR_TYPE_ERROR,
+        "message says it sends %" PRIu32 " peers, have space for %lu peers\n",
+        ntohl (msg->num_peers),
+        (msize / sizeof (struct GNUNET_PeerIdentity)));
     GNUNET_break (0);
     GNUNET_SERVICE_client_drop (cli_ctx->client);
     return GNUNET_SYSERR;
@@ -2918,7 +2996,8 @@ handle_client_seed (void *cls,
          i,
          GNUNET_i2s (&peers[i]));
 
-    got_peer (cli_ctx->ss, &peers[i]);
+    if (NULL != msub) got_peer (msub, &peers[i]);
+    if (NULL != cli_ctx->sub) got_peer (cli_ctx->sub, &peers[i]);
   }
   GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
@@ -3013,15 +3092,63 @@ handle_client_stream_cancel (void *cls,
   (void) msg;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Client requested peers from biased stream.\n");
+       "Client canceled receiving peers from biased stream.\n");
   cli_ctx->stream_update = GNUNET_NO;
 
   GNUNET_assert (NULL != cli_ctx);
   GNUNET_SERVICE_client_continue (cli_ctx->client);
-  if (0 == cli_ctx->view_updates_left)
+}
+
+
+/**
+ * @brief Create and start a Sub.
+ *
+ * @param cls Closure - unused
+ * @param msg Message containing the necessary information
+ */
+static void
+handle_client_start_sub (void *cls,
+                         const struct GNUNET_RPS_CS_SubStartMessage *msg)
+{
+  struct ClientContext *cli_ctx = cls;
+
+  LOG (GNUNET_ERROR_TYPE_DEBUG, "Client requested start of a new sub.\n");
+  if (NULL != cli_ctx->sub &&
+      0 != memcmp (&cli_ctx->sub->hash,
+                   &msg->hash,
+                   sizeof (struct GNUNET_HashCode)))
   {
-    destroy_cli_ctx (cli_ctx);
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Already have a Sub with different share 
for this client. Remove old one, add new.\n");
+    destroy_sub (cli_ctx->sub);
+    cli_ctx->sub = NULL;
   }
+  cli_ctx->sub = new_sub (&msg->hash,
+                         msub->sampler_size_est_min, // TODO make api input?
+                         GNUNET_TIME_relative_ntoh (msg->round_interval));
+  GNUNET_SERVICE_client_continue (cli_ctx->client);
+}
+
+
+/**
+ * @brief Destroy the Sub
+ *
+ * @param cls Closure - unused
+ * @param msg Message containing the hash that identifies the Sub
+ */
+static void
+handle_client_stop_sub (void *cls,
+                        const struct GNUNET_RPS_CS_SubStopMessage *msg)
+{
+  struct ClientContext *cli_ctx = cls;
+
+  GNUNET_assert (NULL != cli_ctx->sub);
+  if (0 != memcmp (&cli_ctx->sub->hash, &msg->hash, sizeof (struct 
GNUNET_HashCode)))
+  {
+    LOG (GNUNET_ERROR_TYPE_WARNING, "Share of current sub and request 
differ!\n");
+  }
+  destroy_sub (cli_ctx->sub);
+  cli_ctx->sub = NULL;
+  GNUNET_SERVICE_client_continue (cli_ctx->client);
 }
 
 
@@ -3109,9 +3236,9 @@ handle_peer_push (void *cls,
   #endif /* ENABLE_MALICIOUS */
 
   /* Add the sending peer to the push_map */
-  CustomPeerMap_put (channel_ctx->peer_ctx->ss->push_map, peer);
+  CustomPeerMap_put (channel_ctx->peer_ctx->sub->push_map, peer);
 
-  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
+  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
                                      &channel_ctx->peer_ctx->peer_id));
   GNUNET_CADET_receive_done (channel_ctx->channel);
 }
@@ -3154,13 +3281,13 @@ handle_peer_pull_request (void *cls,
   }
   #endif /* ENABLE_MALICIOUS */
 
-  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
+  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
                                      &channel_ctx->peer_ctx->peer_id));
   GNUNET_CADET_receive_done (channel_ctx->channel);
-  view_array = View_get_as_array (channel_ctx->peer_ctx->ss->view);
+  view_array = View_get_as_array (channel_ctx->peer_ctx->sub->view);
   send_pull_reply (peer_ctx,
                    view_array,
-                   View_size (channel_ctx->peer_ctx->ss->view));
+                   View_size (channel_ctx->peer_ctx->sub->view));
 }
 
 
@@ -3196,7 +3323,7 @@ check_peer_pull_reply (void *cls,
     return GNUNET_SYSERR;
   }
 
-  if (GNUNET_YES != check_peer_flag (sender_ctx->ss->peer_map,
+  if (GNUNET_YES != check_peer_flag (sender_ctx->sub->peer_map,
                                      &sender_ctx->peer_id,
                                      Peers_PULL_REPLY_PENDING))
   {
@@ -3277,27 +3404,27 @@ handle_peer_pull_reply (void *cls,
     }
     #endif /* ENABLE_MALICIOUS */
     /* Make sure we 'know' about this peer */
-    (void) insert_peer (channel_ctx->peer_ctx->ss, &peers[i]);
+    (void) insert_peer (channel_ctx->peer_ctx->sub, &peers[i]);
 
-    if (GNUNET_YES == check_peer_valid (channel_ctx->peer_ctx->ss->valid_peers,
+    if (GNUNET_YES == check_peer_valid 
(channel_ctx->peer_ctx->sub->valid_peers,
                                         &peers[i]))
     {
-      CustomPeerMap_put (channel_ctx->peer_ctx->ss->pull_map, &peers[i]);
+      CustomPeerMap_put (channel_ctx->peer_ctx->sub->pull_map, &peers[i]);
     }
     else
     {
       schedule_operation (channel_ctx->peer_ctx,
                           insert_in_pull_map,
-                          channel_ctx->peer_ctx->ss); /* cls */
-      (void) issue_peer_online_check (channel_ctx->peer_ctx->ss, &peers[i]);
+                          channel_ctx->peer_ctx->sub); /* cls */
+      (void) issue_peer_online_check (channel_ctx->peer_ctx->sub, &peers[i]);
     }
   }
 
-  UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->ss->peer_map, sender),
+  UNSET_PEER_FLAG (get_peer_ctx (channel_ctx->peer_ctx->sub->peer_map, sender),
                    Peers_PULL_REPLY_PENDING);
-  clean_peer (channel_ctx->peer_ctx->ss, sender);
+  clean_peer (channel_ctx->peer_ctx->sub, sender);
 
-  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->ss->peer_map,
+  GNUNET_break_op (check_peer_known (channel_ctx->peer_ctx->sub->peer_map,
                                      sender));
   GNUNET_CADET_receive_done (channel_ctx->channel);
 }
@@ -3362,7 +3489,7 @@ send_pull_request (struct PeerContext *peer_ctx)
 {
   struct GNUNET_MQ_Envelope *ev;
 
-  GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->ss->peer_map,
+  GNUNET_assert (GNUNET_NO == check_peer_flag (peer_ctx->sub->peer_map,
                                                &peer_ctx->peer_id,
                                                Peers_PULL_REPLY_PENDING));
   SET_PEER_FLAG (peer_ctx, Peers_PULL_REPLY_PENDING);
@@ -3397,12 +3524,6 @@ send_push (struct PeerContext *peer_ctx)
 }
 
 
-static void
-do_round (void *cls);
-
-static void
-do_mal_round (void *cls);
-
 #ifdef ENABLE_MALICIOUS
 
 
@@ -3452,7 +3573,7 @@ handle_client_act_malicious (void *cls,
   struct GNUNET_PeerIdentity *peers;
   uint32_t num_mal_peers_sent;
   uint32_t num_mal_peers_old;
-  struct SubSampler *ss = cli_ctx->ss;
+  struct Sub *sub = cli_ctx->sub;
 
   /* Do actual logic */
   peers = (struct GNUNET_PeerIdentity *) &msg[1];
@@ -3484,8 +3605,15 @@ handle_client_act_malicious (void *cls,
                            mal_peer_set);
 
     /* Substitute do_round () with do_mal_round () */
-    GNUNET_SCHEDULER_cancel (ss->do_round_task);
-    ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss);
+    if (NULL != sub)
+    {
+      GNUNET_SCHEDULER_cancel (sub->do_round_task);
+      sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
+    }
+    else
+    {
+      LOG (GNUNET_ERROR_TYPE_WARNING, "do_round_task is NULL, probably in 
shutdown\n");
+    }
   }
 
   else if ( (2 == mal_type) ||
@@ -3517,9 +3645,10 @@ handle_client_act_malicious (void *cls,
             &msg->attacked_peer,
             sizeof (struct GNUNET_PeerIdentity));
     /* Set the flag of the attacked peer to valid to avoid problems */
-    if (GNUNET_NO == check_peer_known (ss->peer_map, &attacked_peer))
+    if (NULL != sub &&
+        GNUNET_NO == check_peer_known (sub->peer_map, &attacked_peer))
     {
-      (void) issue_peer_online_check (ss, &attacked_peer);
+      (void) issue_peer_online_check (sub, &attacked_peer);
     }
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
@@ -3527,16 +3656,20 @@ handle_client_act_malicious (void *cls,
          GNUNET_i2s (&attacked_peer));
 
     /* Substitute do_round () with do_mal_round () */
-    GNUNET_SCHEDULER_cancel (ss->do_round_task);
-    ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, ss);
+    if (NULL != sub && NULL != sub->do_round_task)
+    {
+      /* Probably in shutdown */
+      GNUNET_SCHEDULER_cancel (sub->do_round_task);
+      sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_mal_round, sub);
+    }
   }
   else if (0 == mal_type)
   { /* Stop acting malicious */
     GNUNET_array_grow (mal_peers, num_mal_peers, 0);
 
     /* Substitute do_mal_round () with do_round () */
-    GNUNET_SCHEDULER_cancel (ss->do_round_task);
-    ss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, ss);
+    GNUNET_SCHEDULER_cancel (sub->do_round_task);
+    sub->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, sub);
   }
   else
   {
@@ -3552,7 +3685,7 @@ handle_client_act_malicious (void *cls,
  *
  * This is executed regylary.
  *
- * @param cls Closure - SubSamper
+ * @param cls Closure - Sub
  */
 static void
 do_mal_round (void *cls)
@@ -3561,12 +3694,12 @@ do_mal_round (void *cls)
   uint32_t i;
   struct GNUNET_TIME_Relative time_next_round;
   struct AttackedPeer *tmp_att_peer;
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Going to execute next round maliciously type %" PRIu32 ".\n",
       mal_type);
-  ss->do_round_task = NULL;
+  sub->do_round_task = NULL;
   GNUNET_assert (mal_type <= 3);
   /* Do malicious actions */
   if (1 == mal_type)
@@ -3589,7 +3722,7 @@ do_mal_round (void *cls)
       else
         att_peer_index = att_peer_index->next;
 
-      send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id));
+      send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
     }
 
     /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3601,7 +3734,7 @@ do_mal_round (void *cls)
       else
         att_peer_index = tmp_att_peer->next;
 
-      send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id));
+      send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
     }
   }
 
@@ -3612,11 +3745,11 @@ do_mal_round (void *cls)
      * Send as many pushes to the attacked peer as possible
      * That is one push per round as it will ignore more.
      */
-    (void) issue_peer_online_check (ss, &attacked_peer);
-    if (GNUNET_YES == check_peer_flag (ss->peer_map,
+    (void) issue_peer_online_check (sub, &attacked_peer);
+    if (GNUNET_YES == check_peer_flag (sub->peer_map,
                                        &attacked_peer,
                                        Peers_ONLINE))
-      send_push (get_peer_ctx (ss->peer_map, &attacked_peer));
+      send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
   }
 
 
@@ -3624,20 +3757,20 @@ do_mal_round (void *cls)
   { /* Combined attack */
 
     /* Send PUSH to attacked peers */
-    if (GNUNET_YES == check_peer_known (ss->peer_map, &attacked_peer))
+    if (GNUNET_YES == check_peer_known (sub->peer_map, &attacked_peer))
     {
-      (void) issue_peer_online_check (ss, &attacked_peer);
-      if (GNUNET_YES == check_peer_flag (ss->peer_map,
+      (void) issue_peer_online_check (sub, &attacked_peer);
+      if (GNUNET_YES == check_peer_flag (sub->peer_map,
                                          &attacked_peer,
                                          Peers_ONLINE))
       {
         LOG (GNUNET_ERROR_TYPE_DEBUG,
             "Goding to send push to attacked peer (%s)\n",
             GNUNET_i2s (&attacked_peer));
-        send_push (get_peer_ctx (ss->peer_map, &attacked_peer));
+        send_push (get_peer_ctx (sub->peer_map, &attacked_peer));
       }
     }
-    (void) issue_peer_online_check (ss, &attacked_peer);
+    (void) issue_peer_online_check (sub, &attacked_peer);
 
     /* The maximum of pushes we're going to send this round */
     num_pushes = GNUNET_MIN (GNUNET_MIN (push_limit - 1,
@@ -3655,7 +3788,7 @@ do_mal_round (void *cls)
       else
         att_peer_index = att_peer_index->next;
 
-      send_push (get_peer_ctx (ss->peer_map, &att_peer_index->peer_id));
+      send_push (get_peer_ctx (sub->peer_map, &att_peer_index->peer_id));
     }
 
     /* Send PULLs to some peers to learn about additional peers to attack */
@@ -3667,18 +3800,16 @@ do_mal_round (void *cls)
       else
         att_peer_index = tmp_att_peer->next;
 
-      send_pull_request (get_peer_ctx (ss->peer_map, &tmp_att_peer->peer_id));
+      send_pull_request (get_peer_ctx (sub->peer_map, &tmp_att_peer->peer_id));
     }
   }
 
   /* Schedule next round */
-  time_next_round = compute_rand_delay (ss->round_interval, 2);
+  time_next_round = compute_rand_delay (sub->round_interval, 2);
 
-  //ss->do_round_task = GNUNET_SCHEDULER_add_delayed (ss->round_interval, 
&do_mal_round,
-  //NULL);
-  GNUNET_assert (NULL == ss->do_round_task);
-  ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
-                                                    &do_mal_round, ss);
+  GNUNET_assert (NULL == sub->do_round_task);
+  sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
+                                                    &do_mal_round, sub);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
 }
 #endif /* ENABLE_MALICIOUS */
@@ -3688,7 +3819,7 @@ do_mal_round (void *cls)
  *
  * This is executed regylary.
  *
- * @param cls Closure - SubSampler
+ * @param cls Closure - Sub
  */
 static void
 do_round (void *cls)
@@ -3702,66 +3833,66 @@ do_round (void *cls)
   uint32_t second_border;
   struct GNUNET_PeerIdentity peer;
   struct GNUNET_PeerIdentity *update_peer;
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Going to execute next round.\n");
   GNUNET_STATISTICS_update(stats, "# rounds", 1, GNUNET_NO);
-  ss->do_round_task = NULL;
+  sub->do_round_task = NULL;
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "Printing view:\n");
-  to_file (ss->file_name_view_log,
+  to_file (sub->file_name_view_log,
            "___ new round ___");
-  view_array = View_get_as_array (ss->view);
-  for (i = 0; i < View_size (ss->view); i++)
+  view_array = View_get_as_array (sub->view);
+  for (i = 0; i < View_size (sub->view); i++)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "\t%s\n", GNUNET_i2s (&view_array[i]));
-    to_file (ss->file_name_view_log,
+    to_file (sub->file_name_view_log,
              "=%s\t(do round)",
              GNUNET_i2s_full (&view_array[i]));
   }
 
 
   /* Send pushes and pull requests */
-  if (0 < View_size (ss->view))
+  if (0 < View_size (sub->view))
   {
     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
-                                           View_size (ss->view));
+                                           View_size (sub->view));
 
     /* Send PUSHes */
-    a_peers = ceil (alpha * View_size (ss->view));
+    a_peers = ceil (alpha * View_size (sub->view));
 
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Going to send pushes to %u (ceil (%f * %u)) peers.\n",
-         a_peers, alpha, View_size (ss->view));
+         a_peers, alpha, View_size (sub->view));
     for (i = 0; i < a_peers; i++)
     {
       peer = view_array[permut[i]];
       // FIXME if this fails schedule/loop this for later
-      send_push (get_peer_ctx (ss->peer_map, &peer));
+      send_push (get_peer_ctx (sub->peer_map, &peer));
     }
 
     /* Send PULL requests */
-    b_peers = ceil (beta * View_size (ss->view));
+    b_peers = ceil (beta * View_size (sub->view));
     first_border = a_peers;
     second_border = a_peers + b_peers;
-    if (second_border > View_size (ss->view))
+    if (second_border > View_size (sub->view))
     {
-      first_border = View_size (ss->view) - b_peers;
-      second_border = View_size (ss->view);
+      first_border = View_size (sub->view) - b_peers;
+      second_border = View_size (sub->view);
     }
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "Going to send pulls to %u (ceil (%f * %u)) peers.\n",
-        b_peers, beta, View_size (ss->view));
+        b_peers, beta, View_size (sub->view));
     for (i = first_border; i < second_border; i++)
     {
       peer = view_array[permut[i]];
-      if ( GNUNET_NO == check_peer_flag (ss->peer_map,
+      if ( GNUNET_NO == check_peer_flag (sub->peer_map,
                                          &peer,
                                          Peers_PULL_REPLY_PENDING))
       { // FIXME if this fails schedule/loop this for later
-        send_pull_request (get_peer_ctx (ss->peer_map, &peer));
+        send_pull_request (get_peer_ctx (sub->peer_map, &peer));
       }
     }
 
@@ -3773,10 +3904,9 @@ do_round (void *cls)
   /* Update view */
   /* TODO see how many peers are in push-/pull- list! */
 
-  if ((CustomPeerMap_size (ss->push_map) <= alpha * ss->view_size_est_need) &&
-      (0 < CustomPeerMap_size (ss->push_map)) &&
-      (0 < CustomPeerMap_size (ss->pull_map)))
-  //if (GNUNET_YES) // disable blocking temporarily
+  if ((CustomPeerMap_size (sub->push_map) <= alpha * sub->view_size_est_need) 
&&
+      (0 < CustomPeerMap_size (sub->push_map)) &&
+      (0 < CustomPeerMap_size (sub->pull_map)))
   { /* If conditions for update are fulfilled, update */
     LOG (GNUNET_ERROR_TYPE_DEBUG, "Update of the view.\n");
 
@@ -3788,23 +3918,23 @@ do_round (void *cls)
     peers_to_clean_size = 0;
     GNUNET_array_grow (peers_to_clean,
                        peers_to_clean_size,
-                       View_size (ss->view));
+                       View_size (sub->view));
     GNUNET_memcpy (peers_to_clean,
             view_array,
-            View_size (ss->view) * sizeof (struct GNUNET_PeerIdentity));
+            View_size (sub->view) * sizeof (struct GNUNET_PeerIdentity));
 
     /* Seems like recreating is the easiest way of emptying the peermap */
-    View_clear (ss->view);
-    to_file (ss->file_name_view_log,
+    View_clear (sub->view);
+    to_file (sub->file_name_view_log,
              "--- emptied ---");
 
-    first_border  = GNUNET_MIN (ceil (alpha * ss->view_size_est_need),
-                                CustomPeerMap_size (ss->push_map));
+    first_border  = GNUNET_MIN (ceil (alpha * sub->view_size_est_need),
+                                CustomPeerMap_size (sub->push_map));
     second_border = first_border +
-                    GNUNET_MIN (floor (beta  * ss->view_size_est_need),
-                                CustomPeerMap_size (ss->pull_map));
+                    GNUNET_MIN (floor (beta  * sub->view_size_est_need),
+                                CustomPeerMap_size (sub->pull_map));
     final_size    = second_border +
-      ceil ((1 - (alpha + beta)) * ss->view_size_est_need);
+      ceil ((1 - (alpha + beta)) * sub->view_size_est_need);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
         "first border: %" PRIu32 ", second border: %" PRIu32 ", final size: 
%"PRIu32 "\n",
         first_border,
@@ -3813,19 +3943,20 @@ do_round (void *cls)
 
     /* Update view with peers received through PUSHes */
     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
-                                           CustomPeerMap_size (ss->push_map));
+                                           CustomPeerMap_size (sub->push_map));
     for (i = 0; i < first_border; i++)
     {
       int inserted;
-      inserted = insert_in_view (ss,
-                                 CustomPeerMap_get_peer_by_index (ss->push_map,
+      inserted = insert_in_view (sub,
+                                 CustomPeerMap_get_peer_by_index 
(sub->push_map,
                                                                   permut[i]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (1,
-            CustomPeerMap_get_peer_by_index (ss->push_map, permut[i]));
+        clients_notify_stream_peer (sub,
+            1,
+            CustomPeerMap_get_peer_by_index (sub->push_map, permut[i]));
       }
-      to_file (ss->file_name_view_log,
+      to_file (sub->file_name_view_log,
                "+%s\t(push list)",
                GNUNET_i2s_full (&view_array[i]));
       // TODO change the peer_flags accordingly
@@ -3835,20 +3966,21 @@ do_round (void *cls)
 
     /* Update view with peers received through PULLs */
     permut = GNUNET_CRYPTO_random_permute (GNUNET_CRYPTO_QUALITY_STRONG,
-                                           CustomPeerMap_size (ss->pull_map));
+                                           CustomPeerMap_size (sub->pull_map));
     for (i = first_border; i < second_border; i++)
     {
       int inserted;
-      inserted = insert_in_view (ss,
-          CustomPeerMap_get_peer_by_index (ss->pull_map,
+      inserted = insert_in_view (sub,
+          CustomPeerMap_get_peer_by_index (sub->pull_map,
                                            permut[i - first_border]));
       if (GNUNET_OK == inserted)
       {
-        clients_notify_stream_peer (1,
-            CustomPeerMap_get_peer_by_index (ss->pull_map,
+        clients_notify_stream_peer (sub,
+            1,
+            CustomPeerMap_get_peer_by_index (sub->pull_map,
                                              permut[i - first_border]));
       }
-      to_file (ss->file_name_view_log,
+      to_file (sub->file_name_view_log,
                "+%s\t(pull list)",
                GNUNET_i2s_full (&view_array[i]));
       // TODO change the peer_flags accordingly
@@ -3857,106 +3989,106 @@ do_round (void *cls)
     permut = NULL;
 
     /* Update view with peers from history */
-    RPS_sampler_get_n_rand_peers (ss->sampler,
+    RPS_sampler_get_n_rand_peers (sub->sampler,
                                   final_size - second_border,
                                   hist_update,
-                                  ss);
+                                  sub);
     // TODO change the peer_flags accordingly
 
-    for (i = 0; i < View_size (ss->view); i++)
+    for (i = 0; i < View_size (sub->view); i++)
       rem_from_list (&peers_to_clean, &peers_to_clean_size, &view_array[i]);
 
     /* Clean peers that were removed from the view */
     for (i = 0; i < peers_to_clean_size; i++)
     {
-      to_file (ss->file_name_view_log,
+      to_file (sub->file_name_view_log,
                "-%s",
                GNUNET_i2s_full (&peers_to_clean[i]));
-      clean_peer (ss, &peers_to_clean[i]);
+      clean_peer (sub, &peers_to_clean[i]);
     }
 
     GNUNET_array_grow (peers_to_clean, peers_to_clean_size, 0);
-    clients_notify_view_update (ss);
+    clients_notify_view_update (sub);
   } else {
     LOG (GNUNET_ERROR_TYPE_DEBUG, "No update of the view.\n");
     GNUNET_STATISTICS_update(stats, "# rounds blocked", 1, GNUNET_NO);
-    if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
-        !(0 >= CustomPeerMap_size (ss->pull_map)))
+    if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
+        !(0 >= CustomPeerMap_size (sub->pull_map)))
       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes", 1, 
GNUNET_NO);
-    if (CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
-        (0 >= CustomPeerMap_size (ss->pull_map)))
+    if (CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
+        (0 >= CustomPeerMap_size (sub->pull_map)))
       GNUNET_STATISTICS_update(stats, "# rounds blocked - too many pushes, no 
pull replies", 1, GNUNET_NO);
-    if (0 >= CustomPeerMap_size (ss->push_map) &&
-        !(0 >= CustomPeerMap_size (ss->pull_map)))
+    if (0 >= CustomPeerMap_size (sub->push_map) &&
+        !(0 >= CustomPeerMap_size (sub->pull_map)))
       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes", 1, 
GNUNET_NO);
-    if (0 >= CustomPeerMap_size (ss->push_map) &&
-        (0 >= CustomPeerMap_size (ss->pull_map)))
+    if (0 >= CustomPeerMap_size (sub->push_map) &&
+        (0 >= CustomPeerMap_size (sub->pull_map)))
       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pushes, no pull 
replies", 1, GNUNET_NO);
-    if (0 >= CustomPeerMap_size (ss->pull_map) &&
-        CustomPeerMap_size (ss->push_map) > alpha * View_size (ss->view) &&
-        0 >= CustomPeerMap_size (ss->push_map))
+    if (0 >= CustomPeerMap_size (sub->pull_map) &&
+        CustomPeerMap_size (sub->push_map) > alpha * View_size (sub->view) &&
+        0 >= CustomPeerMap_size (sub->push_map))
       GNUNET_STATISTICS_update(stats, "# rounds blocked - no pull replies", 1, 
GNUNET_NO);
   }
   // TODO independent of that also get some peers from CADET_get_peers()?
   GNUNET_STATISTICS_set (stats,
       "# peers in push map at end of round",
-      CustomPeerMap_size (ss->push_map),
+      CustomPeerMap_size (sub->push_map),
       GNUNET_NO);
   GNUNET_STATISTICS_set (stats,
       "# peers in pull map at end of round",
-      CustomPeerMap_size (ss->pull_map),
+      CustomPeerMap_size (sub->pull_map),
       GNUNET_NO);
   GNUNET_STATISTICS_set (stats,
       "# peers in view at end of round",
-      View_size (ss->view),
+      View_size (sub->view),
       GNUNET_NO);
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
-       "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size 
(ss->view%u) = %.2f)\n",
-       CustomPeerMap_size (ss->push_map),
-       CustomPeerMap_size (ss->pull_map),
+       "Received %u pushes and %u pulls last round (alpha (%.2f) * view_size 
(sub->view%u) = %.2f)\n",
+       CustomPeerMap_size (sub->push_map),
+       CustomPeerMap_size (sub->pull_map),
        alpha,
-       View_size (ss->view),
-       alpha * View_size (ss->view));
+       View_size (sub->view),
+       alpha * View_size (sub->view));
 
   /* Update samplers */
-  for (i = 0; i < CustomPeerMap_size (ss->push_map); i++)
+  for (i = 0; i < CustomPeerMap_size (sub->push_map); i++)
   {
-    update_peer = CustomPeerMap_get_peer_by_index (ss->push_map, i);
+    update_peer = CustomPeerMap_get_peer_by_index (sub->push_map, i);
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Updating with peer %s from push list\n",
          GNUNET_i2s (update_peer));
-    insert_in_sampler (ss, update_peer);
-    clean_peer (ss, update_peer); /* This cleans only if it is not in the view 
*/
+    insert_in_sampler (sub, update_peer);
+    clean_peer (sub, update_peer); /* This cleans only if it is not in the 
view */
   }
 
-  for (i = 0; i < CustomPeerMap_size (ss->pull_map); i++)
+  for (i = 0; i < CustomPeerMap_size (sub->pull_map); i++)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Updating with peer %s from pull list\n",
-         GNUNET_i2s (CustomPeerMap_get_peer_by_index (ss->pull_map, i)));
-    insert_in_sampler (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i));
+         GNUNET_i2s (CustomPeerMap_get_peer_by_index (sub->pull_map, i)));
+    insert_in_sampler (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, 
i));
     /* This cleans only if it is not in the view */
-    clean_peer (ss, CustomPeerMap_get_peer_by_index (ss->pull_map, i));
+    clean_peer (sub, CustomPeerMap_get_peer_by_index (sub->pull_map, i));
   }
 
 
   /* Empty push/pull lists */
-  CustomPeerMap_clear (ss->push_map);
-  CustomPeerMap_clear (ss->pull_map);
+  CustomPeerMap_clear (sub->push_map);
+  CustomPeerMap_clear (sub->pull_map);
 
   GNUNET_STATISTICS_set (stats,
                          "view size",
-                         View_size(ss->view),
+                         View_size(sub->view),
                          GNUNET_NO);
 
   struct GNUNET_TIME_Relative time_next_round;
 
-  time_next_round = compute_rand_delay (ss->round_interval, 2);
+  time_next_round = compute_rand_delay (sub->round_interval, 2);
 
   /* Schedule next round */
-  ss->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
-                                                    &do_round, ss);
+  sub->do_round_task = GNUNET_SCHEDULER_add_delayed (time_next_round,
+                                                     &do_round, sub);
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Finished round\n");
 }
 
@@ -3969,7 +4101,7 @@ do_round (void *cls)
  *
  * implements #GNUNET_CADET_PeersCB
  *
- * @param cls Closure - SubSampler
+ * @param cls Closure - Sub
  * @param peer Peer, or NULL on "EOF".
  * @param tunnel Do we have a tunnel towards this peer?
  * @param n_paths Number of known paths towards this peer.
@@ -3979,12 +4111,12 @@ do_round (void *cls)
 void
 init_peer_cb (void *cls,
               const struct GNUNET_PeerIdentity *peer,
-              int tunnel, // "Do we have a tunnel towards this peer?"
-              unsigned int n_paths, // "Number of known paths towards this 
peer"
-              unsigned int best_path) // "How long is the best path?
-                                      // (0 = unknown, 1 = ourselves, 2 = 
neighbor)"
+              int tunnel, /* "Do we have a tunnel towards this peer?" */
+              unsigned int n_paths, /* "Number of known paths towards this 
peer" */
+              unsigned int best_path) /* "How long is the best path?
+                                       * (0 = unknown, 1 = ourselves, 2 = 
neighbor)" */
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
   (void) tunnel;
   (void) n_paths;
   (void) best_path;
@@ -3994,7 +4126,7 @@ init_peer_cb (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Got peer_id %s from cadet\n",
          GNUNET_i2s (peer));
-    got_peer (ss, peer);
+    got_peer (sub, peer);
   }
 }
 
@@ -4004,7 +4136,7 @@ init_peer_cb (void *cls,
  *
  * We initialise the sampler with those.
  *
- * @param cls Closure - SubSampler
+ * @param cls Closure - Sub
  * @param peer the peer id
  * @return #GNUNET_YES if we should continue to
  *         iterate,
@@ -4014,14 +4146,14 @@ static int
 valid_peers_iterator (void *cls,
                       const struct GNUNET_PeerIdentity *peer)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
 
   if (NULL != peer)
   {
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Got stored, valid peer %s\n",
          GNUNET_i2s (peer));
-    got_peer (ss, peer);
+    got_peer (sub, peer);
   }
   return GNUNET_YES;
 }
@@ -4030,7 +4162,7 @@ valid_peers_iterator (void *cls,
 /**
  * Iterator over peers from peerinfo.
  *
- * @param cls Closure - SubSampler
+ * @param cls Closure - Sub
  * @param peer id of the peer, NULL for last call
  * @param hello hello message for the peer (can be NULL)
  * @param error message
@@ -4041,7 +4173,7 @@ process_peerinfo_peers (void *cls,
                         const struct GNUNET_HELLO_Message *hello,
                         const char *err_msg)
 {
-  struct SubSampler *ss = cls;
+  struct Sub *sub = cls;
   (void) hello;
   (void) err_msg;
 
@@ -4050,7 +4182,7 @@ process_peerinfo_peers (void *cls,
     LOG (GNUNET_ERROR_TYPE_DEBUG,
          "Got peer_id %s from peerinfo\n",
          GNUNET_i2s (peer));
-    got_peer (ss, peer);
+    got_peer (sub, peer);
   }
 }
 
@@ -4058,14 +4190,13 @@ process_peerinfo_peers (void *cls,
 /**
  * Task run during shutdown.
  *
- * @param cls Closure - SubSampler containing all datastructures to clean
+ * @param cls Closure - unused
  */
 static void
 shutdown_task (void *cls)
 {
-  struct ClientContext *client_ctx;
   (void) cls;
-  struct SubSampler *ss = cls;
+  struct ClientContext *client_ctx;
 
   LOG (GNUNET_ERROR_TYPE_DEBUG,
        "RPS service is going down\n");
@@ -4077,39 +4208,28 @@ shutdown_task (void *cls)
   {
     destroy_cli_ctx (client_ctx);
   }
-  GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
-  GNUNET_PEERINFO_disconnect (peerinfo_handle);
-  peerinfo_handle = NULL;
-  if (NULL != ss->do_round_task)
+  if (NULL != msub)
   {
-    GNUNET_SCHEDULER_cancel (ss->do_round_task);
-    ss->do_round_task = NULL;
+    destroy_sub (msub);
+    msub = NULL;
   }
 
-  peers_terminate (ss);
-
+  /* Disconnect from other services */
+  GNUNET_PEERINFO_notify_cancel (peerinfo_notify_handle);
+  GNUNET_PEERINFO_disconnect (peerinfo_handle);
+  peerinfo_handle = NULL;
   GNUNET_NSE_disconnect (nse);
-  RPS_sampler_destroy (ss->sampler);
-  GNUNET_CADET_close_port (ss->cadet_port);
-  GNUNET_CADET_disconnect (ss->cadet_handle);
-  ss->cadet_handle = NULL;
-  View_destroy (ss->view);
-  CustomPeerMap_destroy (ss->push_map);
-  CustomPeerMap_destroy (ss->pull_map);
+
   if (NULL != stats)
   {
     GNUNET_STATISTICS_destroy (stats,
                                GNUNET_NO);
     stats = NULL;
   }
+  GNUNET_CADET_disconnect (cadet_handle);
+  cadet_handle = NULL;
 #ifdef ENABLE_MALICIOUS
   struct AttackedPeer *tmp_att_peer;
-  /* it is ok to free this const during shutdown: */
-  GNUNET_free ((char *) ss->file_name_view_log);
-#ifdef TO_FILE
-  GNUNET_free ((char *) ss->file_name_observed_log);
-  GNUNET_CONTAINER_multipeermap_destroy (ss->observed_unique_peers);
-#endif /* TO_FILE */
   GNUNET_array_grow (mal_peers,
                      num_mal_peers,
                      0);
@@ -4154,7 +4274,6 @@ client_connect_cb (void *cls,
   cli_ctx->view_updates_left = -1;
   cli_ctx->stream_update = GNUNET_NO;
   cli_ctx->client = client;
-  cli_ctx->ss = mss;
   GNUNET_CONTAINER_DLL_insert (cli_ctx_head,
                                cli_ctx_tail,
                                cli_ctx);
@@ -4206,6 +4325,8 @@ run (void *cls,
   char *fn_valid_peers;
   struct GNUNET_TIME_Relative round_interval;
   long long unsigned int sampler_size;
+  char hash_port_string[] = GNUNET_APPLICATION_PORT_RPS;
+  struct GNUNET_HashCode hash;
 
   (void) cls;
   (void) service;
@@ -4262,40 +4383,43 @@ run (void *cls,
                                "FILENAME_VALID_PEERS");
   }
 
+  cadet_handle = GNUNET_CADET_connect (cfg);
+  GNUNET_assert (NULL != cadet_handle);
+
+
   alpha = 0.45;
   beta  = 0.45;
 
 
-  /* Set up main SubSampler */
-  mss = new_subsampler ("", /* this is the main sampler - no shared value */
-                         sampler_size, /* Will be overwritten by config */
-                         round_interval);
+  /* Set up main Sub */
+  GNUNET_CRYPTO_hash (hash_port_string,
+                      strlen (hash_port_string),
+                      &hash);
+  msub = new_sub (&hash,
+                 sampler_size, /* Will be overwritten by config */
+                 round_interval);
 
 
   peerinfo_handle = GNUNET_PEERINFO_connect (cfg);
 
   /* connect to NSE */
-  nse = GNUNET_NSE_connect (cfg, nse_callback, mss);
+  nse = GNUNET_NSE_connect (cfg, nse_callback, NULL);
 
   //LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting peers from CADET\n");
-  //GNUNET_CADET_get_peers (mss.cadet_handle, &init_peer_cb, mss);
+  //GNUNET_CADET_get_peers (cadet_handle, &init_peer_cb, msub);
   // TODO send push/pull to each of those peers?
-  // TODO read stored valid peers from last run
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Requesting stored valid peers\n");
-  restore_valid_peers (mss);
-  get_valid_peers (mss->valid_peers, valid_peers_iterator, mss);
+  restore_valid_peers (msub);
+  get_valid_peers (msub->valid_peers, valid_peers_iterator, msub);
 
   peerinfo_notify_handle = GNUNET_PEERINFO_notify (cfg,
                                                    GNUNET_NO,
                                                    process_peerinfo_peers,
-                                                   mss);
+                                                   msub);
 
   LOG (GNUNET_ERROR_TYPE_INFO, "Ready to receive requests from clients\n");
 
-  mss->do_round_task = GNUNET_SCHEDULER_add_now (&do_round, mss);
-  LOG (GNUNET_ERROR_TYPE_DEBUG, "Scheduled first round\n");
-
-  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, mss);
+  GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL);
   stats = GNUNET_STATISTICS_create ("rps", cfg);
 }
 
@@ -4336,6 +4460,14 @@ GNUNET_SERVICE_MAIN
    GNUNET_MESSAGE_TYPE_RPS_CS_DEBUG_STREAM_CANCEL,
    struct GNUNET_MessageHeader,
    NULL),
+ GNUNET_MQ_hd_fixed_size (client_start_sub,
+   GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START,
+   struct GNUNET_RPS_CS_SubStartMessage,
+   NULL),
+ GNUNET_MQ_hd_fixed_size (client_stop_sub,
+   GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP,
+   struct GNUNET_RPS_CS_SubStopMessage,
+   NULL),
  GNUNET_MQ_handler_end());
 
 /* end of gnunet-service-rps.c */
diff --git a/src/rps/rps.h b/src/rps/rps.h
index 915524f88..616eabdac 100644
--- a/src/rps/rps.h
+++ b/src/rps/rps.h
@@ -114,6 +114,50 @@ struct GNUNET_RPS_CS_ActMaliciousMessage
 #endif /* ENABLE_MALICIOUS */
 
 
+/**
+ * Message from client to service telling it to start a new sub
+ */
+struct GNUNET_RPS_CS_SubStartMessage
+{
+  /**
+   * Header including size and type in NBO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * For alignment.
+   */
+  uint32_t reserved GNUNET_PACKED;
+
+  /**
+   * Mean interval between two rounds
+   */
+  struct GNUNET_TIME_RelativeNBO round_interval;
+
+  /**
+   * Length of the shared value represented as string.
+   */
+  struct GNUNET_HashCode hash GNUNET_PACKED;
+};
+
+
+/**
+ * Message from client to service telling it to stop a new sub
+ */
+struct GNUNET_RPS_CS_SubStopMessage
+{
+  /**
+   * Header including size and type in NBO
+   */
+  struct GNUNET_MessageHeader header;
+
+  /**
+   * Length of the shared value represented as string.
+   */
+  struct GNUNET_HashCode hash GNUNET_PACKED;
+};
+
+
 /* Debug messages */
 
 /**
diff --git a/src/rps/rps_api.c b/src/rps/rps_api.c
index 7d0674aff..34b28cd6a 100644
--- a/src/rps/rps_api.c
+++ b/src/rps/rps_api.c
@@ -267,6 +267,9 @@ collect_peers_cb (void *cls,
 {
   struct GNUNET_RPS_Request_Handle *rh = cls;
 
+  LOG (GNUNET_ERROR_TYPE_DEBUG,
+       "Service sent %" PRIu64 " peers from stream\n",
+       num_peers);
   for (uint64_t i = 0; i < num_peers; i++)
   {
     RPS_sampler_update (rh->sampler, &peers[i]);
@@ -274,44 +277,6 @@ collect_peers_cb (void *cls,
 }
 
 
-/**
- * @brief Create new request handle
- *
- * @param rps_handle Handle to the service
- * @param num_requests Number of requests
- * @param ready_cb Callback
- * @param cls Closure
- *
- * @return The newly created request handle
- */
-static struct GNUNET_RPS_Request_Handle *
-new_request_handle (struct GNUNET_RPS_Handle *rps_handle,
-                    uint64_t num_requests,
-                    GNUNET_RPS_NotifyReadyCB ready_cb,
-                    void *cls)
-{
-  struct GNUNET_RPS_Request_Handle *rh;
-
-  rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
-  rh->rps_handle = rps_handle;
-  rh->num_requests = num_requests;
-  rh->sampler = RPS_sampler_mod_init (num_requests,
-                                      GNUNET_TIME_UNIT_SECONDS); // TODO 
remove this time-stuff
-  rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
-                                                 num_requests,
-                                                 peers_ready_cb,
-                                                 rh);
-  rh->srh = GNUNET_RPS_stream_request (rps_handle,
-                                       0, /* infinite updates */
-                                       collect_peers_cb,
-                                       rh); /* cls */
-  rh->ready_cb = ready_cb;
-  rh->ready_cb_cls = cls;
-
-  return rh;
-}
-
-
 /* Get internals for debugging/profiling purposes */
 
 /**
@@ -623,6 +588,24 @@ mq_error_handler (void *cls,
 
 
 /**
+ * @brief Create the hash value from the share value that defines the sub
+ * (-group)
+ *
+ * @param share_val Share value - strings longer than 508 (512 - 4) will be
+ *        truncated.
+ * @param hash Pointer to the location in which the hash will be stored.
+ */
+static void
+hash_from_share_val (const char *share_val, struct GNUNET_HashCode *hash)
+{
+  char hash_port_string[512] = "rps";
+
+  (void) strncat (hash_port_string, share_val, 508);
+  GNUNET_CRYPTO_hash (hash_port_string, strlen (hash_port_string), hash);
+}
+
+
+/**
  * Reconnect to the service
  */
 static void
@@ -674,6 +657,49 @@ GNUNET_RPS_connect (const struct 
GNUNET_CONFIGURATION_Handle *cfg)
 
 
 /**
+ * @brief Start a sub with the given shared value
+ *
+ * @param h Handle to rps
+ * @param shared_value The shared value that defines the members of the sub 
(-gorup)
+ */
+void
+GNUNET_RPS_sub_start (struct GNUNET_RPS_Handle *h,
+                      const char *shared_value)
+{
+  struct GNUNET_RPS_CS_SubStartMessage *msg;
+  struct GNUNET_MQ_Envelope *ev;
+
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_START);
+  hash_from_share_val (shared_value, &msg->hash);
+  msg->round_interval = GNUNET_TIME_relative_hton (// TODO read from config!
+    GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30));
+  GNUNET_assert (0 != msg->round_interval.rel_value_us__);
+
+  GNUNET_MQ_send (h->mq, ev);
+}
+
+
+/**
+ * @brief Stop a sub with the given shared value
+ *
+ * @param h Handle to rps
+ * @param shared_value The shared value that defines the members of the sub 
(-gorup)
+ */
+void
+GNUNET_RPS_sub_stop (struct GNUNET_RPS_Handle *h,
+                     const char *shared_value)
+{
+  struct GNUNET_RPS_CS_SubStopMessage *msg;
+  struct GNUNET_MQ_Envelope *ev;
+
+  ev = GNUNET_MQ_msg (msg, GNUNET_MESSAGE_TYPE_RPS_CS_SUB_STOP);
+  hash_from_share_val (shared_value, &msg->hash);
+
+  GNUNET_MQ_send (h->mq, ev);
+}
+
+
+/**
  * Request n random peers.
  *
  * @param rps_handle handle to the rps service
@@ -690,11 +716,24 @@ GNUNET_RPS_request_peers (struct GNUNET_RPS_Handle 
*rps_handle,
 {
   struct GNUNET_RPS_Request_Handle *rh;
 
-  rh = new_request_handle (rps_handle,
-                           num_req_peers,
-                           ready_cb,
-                           cls);
-
+  LOG (GNUNET_ERROR_TYPE_INFO,
+       "Client requested %" PRIu32 " peers\n",
+       num_req_peers);
+  rh = GNUNET_new (struct GNUNET_RPS_Request_Handle);
+  rh->rps_handle = rps_handle;
+  rh->num_requests = num_req_peers;
+  rh->sampler = RPS_sampler_mod_init (num_req_peers,
+                                      GNUNET_TIME_UNIT_SECONDS); // TODO 
remove this time-stuff
+  rh->sampler_rh = RPS_sampler_get_n_rand_peers (rh->sampler,
+                                                 num_req_peers,
+                                                 peers_ready_cb,
+                                                 rh);
+  rh->srh = GNUNET_RPS_stream_request (rps_handle,
+                                       0, /* infinite updates */
+                                       collect_peers_cb,
+                                       rh); /* cls */
+  rh->ready_cb = ready_cb;
+  rh->ready_cb_cls = cls;
 
   return rh;
 }
diff --git a/src/rps/test_rps.c b/src/rps/test_rps.c
index 92d8c12ea..cbd3ba845 100644
--- a/src/rps/test_rps.c
+++ b/src/rps/test_rps.c
@@ -673,6 +673,13 @@ ids_to_file (char *file_name,
 } */
 
 /**
+ * Task run on timeout to collect statistics and potentially shut down.
+ */
+static void
+post_test_op (void *cls);
+
+
+/**
  * Test the success of a single test
  */
 static int
@@ -732,6 +739,8 @@ static int check_statistics_collect_completed_single_peer (
   }
   return GNUNET_YES;
 }
+
+
 /**
  * @brief Checks if all peers already received their statistics value from the
  * statistics service.
@@ -758,6 +767,7 @@ static int check_statistics_collect_completed ()
   return GNUNET_YES;
 }
 
+
 /**
  * Task run on timeout to shut everything down.
  */
@@ -765,6 +775,7 @@ static void
 shutdown_op (void *cls)
 {
   unsigned int i;
+  (void) cls;
 
   GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
               "Shutdown task scheduled, going down.\n");
@@ -772,6 +783,7 @@ shutdown_op (void *cls)
   if (NULL != post_test_task)
   {
     GNUNET_SCHEDULER_cancel (post_test_task);
+    post_test_op (NULL);
   }
   if (NULL != churn_task)
   {
@@ -799,6 +811,7 @@ static void
 post_test_op (void *cls)
 {
   unsigned int i;
+  (void) cls;
 
   post_test_task = NULL;
   post_test = GNUNET_YES;
@@ -811,16 +824,16 @@ post_test_op (void *cls)
   }
   for (i = 0; i < num_peers; i++)
   {
-    if (NULL != rps_peers[i].op)
-    {
-      GNUNET_TESTBED_operation_done (rps_peers[i].op);
-      rps_peers[i].op = NULL;
-    }
     if (NULL != cur_test_run.post_test)
     {
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Executing post_test for peer 
%u\n", i);
       cur_test_run.post_test (&rps_peers[i]);
     }
+    if (NULL != rps_peers[i].op)
+    {
+      GNUNET_TESTBED_operation_done (rps_peers[i].op);
+      rps_peers[i].op = NULL;
+    }
   }
   /* If we do not collect statistics, shut down directly */
   if (NO_COLLECT_STATISTICS == cur_test_run.have_collect_statistics ||
@@ -905,6 +918,7 @@ info_cb (void *cb_cls,
          const char *emsg)
 {
   struct OpListEntry *entry = (struct OpListEntry *) cb_cls;
+  (void) op;
 
   if (GNUNET_YES == in_shutdown || GNUNET_YES == post_test)
   {
@@ -1070,6 +1084,9 @@ stat_complete_cb (void *cls, struct 
GNUNET_TESTBED_Operation *op,
 {
   //struct GNUNET_STATISTICS_Handle *sh = ca_result;
   //struct RPSPeer *peer = (struct RPSPeer *) cls;
+  (void) cls;
+  (void) op;
+  (void) ca_result;
 
   if (NULL != emsg)
   {
@@ -1098,6 +1115,7 @@ rps_disconnect_adapter (void *cls,
 {
   struct RPSPeer *peer = cls;
   struct GNUNET_RPS_Handle *h = op_result;
+
   GNUNET_assert (NULL != peer);
   GNUNET_RPS_disconnect (h);
   peer->rps_handle = NULL;
@@ -1441,6 +1459,7 @@ seed_big_cb (struct RPSPeer *rps_peer)
 static void
 single_peer_seed_cb (struct RPSPeer *rps_peer)
 {
+  (void) rps_peer;
   // TODO
 }
 
@@ -1525,6 +1544,31 @@ churn_test_cb (struct RPSPeer *rps_peer)
 }
 
 /***********************************
+ * SUB
+***********************************/
+
+void sub_post (struct RPSPeer *rps_peer)
+{
+  GNUNET_RPS_sub_stop (rps_peer->rps_handle, "test");
+}
+
+static void
+sub_stop_op (void *cls)
+{
+  struct GNUNET_RPS_Handle *h = cls;
+
+  GNUNET_RPS_sub_stop (h, "test");
+}
+
+static void
+sub_pre (struct RPSPeer *rps_peer, struct GNUNET_RPS_Handle *h)
+{
+  (void) rps_peer;
+
+  GNUNET_RPS_sub_start (h, "test");
+}
+
+/***********************************
  * PROFILER
 ***********************************/
 
@@ -1540,6 +1584,7 @@ churn_cb (void *cls,
           struct GNUNET_TESTBED_Operation *op,
           const char *emsg)
 {
+  (void) op;
   // FIXME
   struct OpListEntry *entry = cls;
 
@@ -1670,6 +1715,7 @@ manage_service_wrapper (unsigned int i, unsigned int j,
 static void
 churn (void *cls)
 {
+  (void) cls;
   unsigned int i;
   unsigned int j;
   double portion_online;
@@ -1832,6 +1878,8 @@ profiler_cb (struct RPSPeer *rps_peer)
 int
 file_name_cb (void *cls, const char *filename)
 {
+  (void) cls;
+
   if (NULL != strstr (filename, "sampler_el"))
   {
     struct RPS_SamplerElement *s_elem;
@@ -2501,6 +2549,8 @@ stat_iterator (void *cls,
                uint64_t value,
                int is_persistent)
 {
+  (void) subsystem;
+  (void) is_persistent;
   const struct STATcls *stat_cls = (const struct STATcls *) cls;
   struct RPSPeer *rps_peer = (struct RPSPeer *) stat_cls->rps_peer;
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Got stat value: %s - %" PRIu64 "\n",
@@ -2635,6 +2685,9 @@ run (void *cls,
      unsigned int links_succeeded,
      unsigned int links_failed)
 {
+  (void) cls;
+  (void) h;
+  (void) links_failed;
   unsigned int i;
   struct OpListEntry *entry;
 
@@ -2727,6 +2780,7 @@ int
 main (int argc, char *argv[])
 {
   int ret_value;
+  (void) argc;
 
   /* Defaults for tests */
   num_peers = 5;
@@ -2748,6 +2802,7 @@ main (int argc, char *argv[])
     cur_test_run.pre_test = mal_pre;
     cur_test_run.main_test = mal_cb;
     cur_test_run.init_peer = mal_init_peer;
+    timeout_s = 40;
 
     if (strstr (argv[0], "_1") != NULL)
     {
@@ -2843,7 +2898,22 @@ main (int argc, char *argv[])
     cur_test_run.eval_cb = default_eval_cb;
     cur_test_run.have_churn = HAVE_NO_CHURN;
     cur_test_run.have_quick_quit = HAVE_NO_QUICK_QUIT;
-    timeout_s = 10;
+    timeout_s = 40;
+  }
+
+  else if (strstr (argv[0], "_sub") != NULL)
+  {
+    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Test subs\n");
+    cur_test_run.name = "test-rps-sub";
+    num_peers = 5;
+    //cur_test_run.init_peer = &default_init_peer;
+    cur_test_run.pre_test = &sub_pre;
+    cur_test_run.main_test = &single_req_cb;
+    //cur_test_run.reply_handle = default_reply_handle;
+    cur_test_run.post_test = &sub_post;
+    //cur_test_run.eval_cb = default_eval_cb;
+    cur_test_run.have_churn = HAVE_NO_CHURN;
+    cur_test_run.have_quick_quit = HAVE_QUICK_QUIT;
   }
 
   else if (strstr (argv[0], "profiler") != NULL)
diff --git a/src/rps/test_rps.conf b/src/rps/test_rps.conf
index 84e0e5049..c55930649 100644
--- a/src/rps/test_rps.conf
+++ b/src/rps/test_rps.conf
@@ -12,7 +12,7 @@ NOARMBIND = YES
 #OPTIONS=-l /tmp/rps_profiler_logs/rps-[]-%Y-%m-%d.log
 
 # This is the timeinterval between the rounds
-ROUNDINTERVAL = 2 s
+ROUNDINTERVAL = 1 s
 FILENAME_VALID_PEERS = $GNUNET_DATA_HOME/rps/valid_peers.txt
 
 # This is the 'estimate' in the beginning.

-- 
To stop receiving notification emails like this one, please contact
address@hidden



reply via email to

[Prev in Thread] Current Thread [Next in Thread]