gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21380 - in gnunet-java/src/org/gnunet: dht statistics util


From: gnunet
Subject: [GNUnet-SVN] r21380 - in gnunet-java/src/org/gnunet: dht statistics util
Date: Wed, 9 May 2012 15:02:28 +0200

Author: dold
Date: 2012-05-09 15:02:28 +0200 (Wed, 09 May 2012)
New Revision: 21380

Added:
   gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java
   gnunet-java/src/org/gnunet/dht/ResultCallback.java
   gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
Modified:
   gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
   gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
   gnunet-java/src/org/gnunet/statistics/Statistics.java
   gnunet-java/src/org/gnunet/util/HashCode.java
   gnunet-java/src/org/gnunet/util/RequestQueue.java
Log:
rewritten dht put for the new protocol, using the MessageQueue

Copied: gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java (from 
rev 21245, gnunet-java/src/org/gnunet/dht/ClientPutMessage.java)
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java            
                (rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java    
2012-05-09 13:02:28 UTC (rev 21380)
@@ -0,0 +1,18 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+
+
+@UnionCase(155)
+public class ClientPutConfirmationMessage implements GnunetMessage.Body {
+    @UInt32
+    public int reserved;
+    /**
+     * UID used to identify request with the response
+     */
+    @UInt64
+    public long uid;
+}

Modified: gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutMessage.java        2012-05-09 
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/dht/ClientPutMessage.java        2012-05-09 
13:02:28 UTC (rev 21380)
@@ -1,20 +1,11 @@
 package org.gnunet.dht;
 
-import org.gnunet.construct.ByteFill;
-import org.gnunet.construct.NestedMessage;
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.*;
 import org.gnunet.util.AbsoluteTimeMessage;
 import org.gnunet.util.GnunetMessage;
 import org.gnunet.util.HashCode;
 
-/**
-* Created with IntelliJ IDEA.
-* User: dold
-* Date: 5/2/12
-* Time: 7:05 PM
-* To change this template use File | Settings | File Templates.
-*/
+
 @UnionCase(142)
 public class ClientPutMessage implements GnunetMessage.Body {
     /**
@@ -29,6 +20,11 @@
     public int options;
     @UInt32
     public int desiredReplicationLevel;
+    /**
+     * UID used to identify request with the response
+     */
+    @UInt64
+    public long uid;
     @NestedMessage
     public AbsoluteTimeMessage expiration;
     @NestedMessage

Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-09 
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-09 
13:02:28 UTC (rev 21380)
@@ -6,98 +6,85 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
 
 /**
  * API for the gnunet dht service.
  * <p/>
  * Stores data under a key, distributed across the network.
  * <p/>
- * TODO: implement monitoring
  */
 public class DistributedHashTable {
     private static final Logger logger = LoggerFactory
             .getLogger(DistributedHashTable.class);
 
-    private Configuration cfg;
 
+    private Client client;
+
     private RequestQueue requestQueue;
 
     /**
-     * Callback object for requests to the dht
+     * next UID used on get/monitor requests, incremented after each use.
      */
-    public interface ResultCallback {
-        /**
-         * Called when the dht returns a result
-         *
-         * @param expiration expiration of the returned entry
-         * @param key        key of the returned entry
-         * @param getPath    put path of the returned entry
-         * @param putPath    put path of the returned entry
-         * @param type       type of data in the entry
-         * @param data       data of the returned entry
-         */
-        public void handleResult(AbsoluteTime expiration, HashCode key,
-                                 List<PeerIdentity> getPath, List<PeerIdentity> putPath,
-                                 BlockType type, byte[] data);
-    }
+    private long nextUid = 1;
 
-    private Client client;
-    private long uid = 1;
+    private List<GetRequest> activeGetRequests = new ArrayList<GetRequest>(5);
 
+
     /**
+     * put request awaiting response. may be null if no put request is active.
+     */
+    private PutRequest currentPutRequest;
+
+
+    /**
      * Create a connection with the DHT service.
      *
      * @param cfg the configuration to use
      */
     public DistributedHashTable(Configuration cfg) {
-        this.cfg = cfg;
         client = new Client("dht", cfg);
+        requestQueue = new RequestQueue(client, new DHTMessageReceiver());
     }
 
+    private class PutRequest extends RequestQueue.Request {
+        public AbsoluteTime deadline;
+        public byte[] data;
+        public HashCode key;
+        public int replicationLevel;
+        public AbsoluteTime expiration;
+        public int type;
+        public Continuation cont;
+        public long uid;
 
-    /**
-     * Put data into the dht. Put request cannot be queued.
-     *
-     * @param key              key key to store the data under
-     * @param data             data data to store
-     * @param replicationLevel how many peers should store this value
-     * @param routeOptions     additional options
-     * @param type             type of the data to store
-     * @param expiration       how long should the value be stored? TODO: what is the maximum?
-     * @param timeout          how long after we give up on storing the value?
-     * @param cont             called after the put operation failed or succeeded
-     */
-    public void put(HashCode key, byte[] data, int replicationLevel, Set<RouteOption> routeOptions,
-                    int type, AbsoluteTime expiration,
-                    RelativeTime timeout, final Continuation cont) {
-        final ClientPutMessage cpm = new ClientPutMessage();
-        cpm.data = data;
-        cpm.hash = key;
-        cpm.desiredReplicationLevel = replicationLevel;
-        cpm.expiration = expiration.asMessage();
-        cpm.type = type;
+        public PutRequest() {
+            this.uid = nextUid++;
+        }
 
-        client.notifyTransmitReady(timeout, true,
-                0, new MessageTransmitter() {
-            @Override
-            public void transmit(Connection.MessageSink sink) {
-                sink.send(cpm);
-                // is there no way to get an ack from the dht service?
-                cont.cont(true);
-            }
+        @Override
+        public AbsoluteTime getDeadline() {
+            return deadline;
+        }
 
-            @Override
-            public void handleError() {
-                cont.cont(false);
-            }
-        });
+        @Override
+        public void transmit(Connection.MessageSink sink) {
+            final ClientPutMessage cpm = new ClientPutMessage();
+            cpm.data = data;
+            cpm.hash = key;
+            cpm.desiredReplicationLevel = replicationLevel;
+            cpm.expiration = expiration.asMessage();
+            cpm.type = type;
+            cpm.uid = uid;
+            sink.send(cpm);
+        }
+
+        public boolean onDestroy() {
+            return true; // keep!
+        }
     }
 
-    private class GetRequest implements Cancelable {
+
+    private class GetRequest extends RequestQueue.Request {
         public long uid;
         public HashCode key;
         public ResultCallback cb;
@@ -105,52 +92,33 @@
         public boolean canceled = false;
 
         public GetRequest() {
-            uid = DistributedHashTable.this.uid++;
+            uid = DistributedHashTable.this.nextUid++;
         }
 
-        public void cancel() {
-            if (!activeRequests.contains(this)) {
-                throw new AssertionError("cancel called on invalid request");
-            }
+        @Override
+        public AbsoluteTime getDeadline() {
+            return deadline;
+        }
 
-            canceled = true;
+        @Override
+        public void transmit(Connection.MessageSink sink) {
+        }
+    }
 
-            activeRequests.remove(this);
 
-            if (activeRequests.isEmpty()) {
-                receiveHandle.cancel();
+    public class DHTMessageReceiver extends RunaboutMessageReceiver {
+        public void visit(ClientPutConfirmationMessage pcm) {
+            if (currentPutRequest == null || pcm.uid != currentPutRequest.uid) {
+                logger.warn("DHT service got confused with UIDs");
+                return;
             }
-
-            client.notifyTransmitReady(RelativeTime.FOREVER, false, 0, new MessageTransmitter() {
-                @Override
-                public void transmit(Connection.MessageSink sink) {
-                    final ClientGetStopMessage sm = new ClientGetStopMessage();
-                    sm.key = key;
-                    sm.unique_id = uid;
-                    sink.send(sm);
-                }
-
-                @Override
-                public void handleError() {
-                }
-            });
+            currentPutRequest.cont.cont(true);
         }
-    }
 
-    private List<GetRequest> activeRequests = new ArrayList<GetRequest>(5);
-
-    private Cancelable receiveHandle = null;
-
-
-    private class ResponseDispatcher implements MessageReceiver {
-        @Override
-        public void process(GnunetMessage.Body msg) {
-            receiveHandle = null;
-
-            ClientResultMessage rm = (ClientResultMessage) msg;
+        public void visit(ClientResultMessage rm) {
             GetRequest request = null;
 
-            for (GetRequest r : activeRequests) {
+            for (GetRequest r : activeGetRequests) {
                 if (r.uid == rm.uid) {
                     request = r;
                     break;
@@ -162,19 +130,49 @@
                 
request.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration), rm.key, null, 
null, BlockType.TEST,
                         rm.data);
             }
-
-            if (!activeRequests.isEmpty()) {
-                receiveHandle = client.receive(RelativeTime.FOREVER, rh);
-            }
         }
 
         @Override
         public void handleError() {
+            throw new AssertionError("not handled");
         }
     }
 
-    private final ResponseDispatcher rh = new ResponseDispatcher();
+    /**
+     * Put data into the dht.
+     *
+     * @param key              key to store the data under
+     * @param data             data to store
+     * @param replicationLevel how many peers should store this value
+     * @param routeOptions     additional options
+     * @param type             type of the data to store
+     * @param expiration       how long should the value be stored? TODO: what is the maximum?
+     * @param timeout          how long after we give up on storing the value?
+     * @param cont             called after the put operation failed or succeeded
+     */
+    public void put(HashCode key, byte[] data, int replicationLevel, Set<RouteOption> routeOptions,
+                    int type, AbsoluteTime expiration,
+                    RelativeTime timeout, final Continuation cont) {
 
+        if (currentPutRequest != null) {
+            throw new AssertionError("only one put may be active at a time");
+        }
+
+        PutRequest pr = new PutRequest();
+        pr.key = key;
+        pr.data = data;
+        pr.replicationLevel = replicationLevel;
+        pr.deadline = timeout.toAbsolute();
+        pr.expiration = expiration;
+        pr.type = type;
+        pr.cont = cont;
+
+        currentPutRequest = pr;
+
+        requestQueue.add(pr);
+    }
+
+
     /**
      * Request results from the DHT.
      *
@@ -191,55 +189,12 @@
                                int replication, EnumSet<RouteOption> routeOptions,
                                byte[] xquery, ResultCallback cb) {
 
-        final GetRequest request = new GetRequest();
-        request.key = key;
-        request.cb = cb;
-        request.deadline = timeout.toAbsolute();
+        final GetRequest getRequest = new GetRequest();
+        getRequest.key = key;
+        getRequest.cb = cb;
+        getRequest.deadline = timeout.toAbsolute();
 
-        final ClientGetMessage getMessage = new ClientGetMessage();
-        getMessage.desiredReplicationLevel = replication;
-        getMessage.key = key;
-        getMessage.options = 0;
-        getMessage.xquery = xquery;
-        getMessage.type = type.val;
-        getMessage.uniqueId = request.uid;
-
-
-        activeRequests.add(request);
-
-        if (receiveHandle == null) {
-            receiveHandle = client.receive(RelativeTime.FOREVER, rh);
-        }
-
-        client.notifyTransmitReady(timeout, false, 0, new MessageTransmitter() {
-            @Override
-            public void transmit(Connection.MessageSink sink) {
-                sink.send(getMessage);
-            }
-
-            @Override
-            public void handleError() {
-                if (!request.canceled) {
-                    request.cancel();
-                }
-                if (activeRequests.isEmpty()) {
-                    client.disconnect();
-                }
-            }
-        });
-
-        Scheduler.addDelayed(timeout, new Scheduler.Task() {
-            @Override
-            public void run(Scheduler.RunContext ctx) {
-                logger.debug("timeout task has been run");
-                // maybe already canceled by notifyTransmitReady timeout
-                if (!request.canceled) {
-                    request.cancel();
-                }
-            }
-        });
-
-        return request;
+        return requestQueue.add(getRequest);
     }
 
     public Cancelable monitorStart(int blockType, HashCode key, MonitorGetHandler getHandler,
@@ -259,20 +214,13 @@
 
 
     /**
-     * not yet implemented
-     *
-     * @return a handle to cancel the monitoring
-     */
-    public Cancelable startMonitor() {
-        return null;
-    }
-
-
-    /**
      * Destroy the connection to the service.
      */
     public void destroy() {
+        // there's nothing to sync, just destroy!
+        requestQueue.destroy();
         client.disconnect();
+        System.out.println("the end!!!");
     }
 
     public static void main(String[] args) {
@@ -339,6 +287,7 @@
                             } else {
                                 System.out.println("error");
                             }
+                            dht.destroy();
                         }
                     });
                 } else {
@@ -370,4 +319,6 @@
     private class MonitorGetResponseHandler {}
 
     private class MonitorPutHandler {}
+
+
 }

Added: gnunet-java/src/org/gnunet/dht/ResultCallback.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ResultCallback.java                          
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ResultCallback.java  2012-05-09 13:02:28 UTC 
(rev 21380)
@@ -0,0 +1,26 @@
+package org.gnunet.dht;
+
+import org.gnunet.util.AbsoluteTime;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+import java.util.List;
+
+/**
+ * Callback object for requests to the dht
+ */
+public interface ResultCallback {
+    /**
+     * Called when the dht returns a result
+     *
+     * @param expiration expiration of the returned entry
+     * @param key        key of the returned entry
+     * @param getPath    put path of the returned entry
+     * @param putPath    put path of the returned entry
+     * @param type       type of data in the entry
+     * @param data       data of the returned entry
+     */
+    public void handleResult(AbsoluteTime expiration, HashCode key,
+                             List<PeerIdentity> getPath, List<PeerIdentity> putPath,
+                             BlockType type, byte[] data);
+}

Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-05-09 
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java       2012-05-09 
13:02:28 UTC (rev 21380)
@@ -46,12 +46,11 @@
      */
     private Continuation currentGetContinuation;
 
+    /**
+     * List of all watch requests, canceled watch requests are null (but stay due to protocol limitations)
+     */
     ArrayList<StatisticsWatchRequest> watchRequests = new ArrayList<StatisticsWatchRequest>();
 
-    public interface StatisticsReceiver {
-        public void onReceive(String subsystem, String name, long value);
-    }
-
     /**
      * A request to the statistics service.
      */
@@ -277,8 +276,8 @@
      */
     public void destroy() {
         // the request queue handles the destruction, maybe we still have important messages pending etc.
-        requestQueue.add(new TESTRequest());
-        requestQueue.destroy();
+        requestQueue.sendTEST(SET_TIMEOUT.toAbsolute());
+        requestQueue.shutdown();
     }
 
 
@@ -369,20 +368,4 @@
         }.start();
     }
 
-
-    private class TESTRequest extends RequestQueue.Request {
-        public boolean onDestroy() {
-            // keep on destroy
-            return true;
-        }
-
-        public AbsoluteTime getDeadline() {
-            return SET_TIMEOUT.toAbsolute();
-        }
-
-        public void transmit(Connection.MessageSink sink) {
-            sink.send(new TESTMessage());
-            // todo: disconnect when not receiving the TEST message back after timeout
-        }
-    }
 }

Added: gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java               
                (rev 0)
+++ gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java       
2012-05-09 13:02:28 UTC (rev 21380)
@@ -0,0 +1,7 @@
+package org.gnunet.statistics;
+
+
+
+public interface StatisticsReceiver {
+    public void onReceive(String subsystem, String name, long value);
+}

Modified: gnunet-java/src/org/gnunet/util/HashCode.java
===================================================================
--- gnunet-java/src/org/gnunet/util/HashCode.java       2012-05-09 12:56:50 UTC 
(rev 21379)
+++ gnunet-java/src/org/gnunet/util/HashCode.java       2012-05-09 13:02:28 UTC 
(rev 21380)
@@ -14,7 +14,7 @@
 public class HashCode implements Message {
 
     @FixedSizeByteArray(length = 64)
-    private final byte[] data;
+    public final byte[] data;
 
     public HashCode(byte[] hash) {
         if (hash.length != 64) {

Modified: gnunet-java/src/org/gnunet/util/RequestQueue.java
===================================================================
--- gnunet-java/src/org/gnunet/util/RequestQueue.java   2012-05-09 12:56:50 UTC 
(rev 21379)
+++ gnunet-java/src/org/gnunet/util/RequestQueue.java   2012-05-09 13:02:28 UTC 
(rev 21380)
@@ -40,7 +40,7 @@
      */
     private Cancelable currentReceive;
 
-    private boolean destroyRequested = false;
+    private boolean destroyed = false;
     private final Client client;
 
 
@@ -122,7 +122,7 @@
     }
 
     private void handleReceive() {
-        if (currentReceive != null || destroyRequested) {
+        if (currentReceive != null || destroyed) {
             return;
         }
         currentReceive = client.receive(RelativeTime.FOREVER, new MessageReceiver() {
@@ -192,9 +192,35 @@
         handleNextTransmit();
     }
 
-    public void destroy() {
-        destroyRequested = true;
 
+    private static class TESTRequest extends RequestQueue.Request {
+        private AbsoluteTime deadline;
+
+        public TESTRequest(AbsoluteTime deadline) {
+            this.deadline = deadline;
+        }
+
+        public boolean onDestroy() {
+            // keep on destroy
+            return true;
+        }
+
+        public AbsoluteTime getDeadline() {
+            return deadline;
+        }
+
+        public void transmit(Connection.MessageSink sink) {
+            sink.send(new TESTMessage());
+            // todo: disconnect when not receiving the TEST message back after timeout
+        }
+    }
+
+    public void sendTEST(AbsoluteTime deadline) {
+        add(new TESTRequest(deadline));
+    }
+
+    public void shutdown() {
+
         final LinkedList<Request> remove = new LinkedList<Request>();
 
         for (Request r : allRequests) {
@@ -215,4 +241,17 @@
         handleNextTransmit();
         handleReceive();
     }
+
+    public void destroy() {
+        destroyed = true;
+        allRequests.clear();
+        persistentRequests.clear();
+        requestsAwaitingTransmit.clear();
+        if (currentTransmit != null) {
+            currentTransmit.cancel();
+        }
+        if (currentReceive != null) {
+            currentReceive.cancel();
+        }
+    }
 }




reply via email to

[Prev in Thread] Current Thread [Next in Thread]