[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r21380 - in gnunet-java/src/org/gnunet: dht statistics util
From: |
gnunet |
Subject: |
[GNUnet-SVN] r21380 - in gnunet-java/src/org/gnunet: dht statistics util |
Date: |
Wed, 9 May 2012 15:02:28 +0200 |
Author: dold
Date: 2012-05-09 15:02:28 +0200 (Wed, 09 May 2012)
New Revision: 21380
Added:
gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java
gnunet-java/src/org/gnunet/dht/ResultCallback.java
gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
Modified:
gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
gnunet-java/src/org/gnunet/statistics/Statistics.java
gnunet-java/src/org/gnunet/util/HashCode.java
gnunet-java/src/org/gnunet/util/RequestQueue.java
Log:
rewritten dht put for the new protocol, using the MessageQueue
Copied: gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java (from
rev 21245, gnunet-java/src/org/gnunet/dht/ClientPutMessage.java)
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ClientPutConfirmationMessage.java
2012-05-09 13:02:28 UTC (rev 21380)
@@ -0,0 +1,18 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UInt64;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+
+
address@hidden(155)
+public class ClientPutConfirmationMessage implements GnunetMessage.Body {
+ @UInt32
+ public int reserved;
+ /**
+ * UID used to identify request with the response
+ */
+ @UInt64
+ public long uid;
+}
Modified: gnunet-java/src/org/gnunet/dht/ClientPutMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ClientPutMessage.java 2012-05-09
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/dht/ClientPutMessage.java 2012-05-09
13:02:28 UTC (rev 21380)
@@ -1,20 +1,11 @@
package org.gnunet.dht;
-import org.gnunet.construct.ByteFill;
-import org.gnunet.construct.NestedMessage;
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UnionCase;
+import org.gnunet.construct.*;
import org.gnunet.util.AbsoluteTimeMessage;
import org.gnunet.util.GnunetMessage;
import org.gnunet.util.HashCode;
-/**
-* Created with IntelliJ IDEA.
-* User: dold
-* Date: 5/2/12
-* Time: 7:05 PM
-* To change this template use File | Settings | File Templates.
-*/
+
@UnionCase(142)
public class ClientPutMessage implements GnunetMessage.Body {
/**
@@ -29,6 +20,11 @@
public int options;
@UInt32
public int desiredReplicationLevel;
+ /**
+ * UID used to identify request with the response
+ */
+ @UInt64
+ public long uid;
@NestedMessage
public AbsoluteTimeMessage expiration;
@NestedMessage
Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-09
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-09
13:02:28 UTC (rev 21380)
@@ -6,98 +6,85 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
/**
* API for the gnunet dht service.
* <p/>
* Stores data under a key, distributed across the network.
* <p/>
- * TODO: implement monitoring
*/
public class DistributedHashTable {
private static final Logger logger = LoggerFactory
.getLogger(DistributedHashTable.class);
- private Configuration cfg;
+ private Client client;
+
private RequestQueue requestQueue;
/**
- * Callback object for requests to the dht
+ * next UID used on get/monitor requests, incremented after each use.
*/
- public interface ResultCallback {
- /**
- * Called when the dht returns a result
- *
- * @param expiration expiration of the returned entry
- * @param key key of the returned entry
- * @param getPath put path of the returned entry
- * @param putPath put path of the returned entry
- * @param type type of data in the entry
- * @param data data of the returned entry
- */
- public void handleResult(AbsoluteTime expiration, HashCode key,
- List<PeerIdentity> getPath,
List<PeerIdentity> putPath,
- BlockType type, byte[] data);
- }
+ private long nextUid = 1;
- private Client client;
- private long uid = 1;
+ private List<GetRequest> activeGetRequests = new ArrayList<GetRequest>(5);
+
/**
+ * put request awaiting response. may be null if no put request is active.
+ */
+ private PutRequest currentPutRequest;
+
+
+ /**
* Create a connection with the DHT service.
*
* @param cfg the configuration to use
*/
public DistributedHashTable(Configuration cfg) {
- this.cfg = cfg;
client = new Client("dht", cfg);
+ requestQueue = new RequestQueue(client, new DHTMessageReceiver());
}
+ private class PutRequest extends RequestQueue.Request {
+ public AbsoluteTime deadline;
+ public byte[] data;
+ public HashCode key;
+ public int replicationLevel;
+ public AbsoluteTime expiration;
+ public int type;
+ public Continuation cont;
+ public long uid;
- /**
- * Put data into the dht. Put request cannot be queued.
- *
- * @param key key key to store the data under
- * @param data data data to store
- * @param replicationLevel how many peers should store this value
- * @param routeOptions additional options
- * @param type type of the data to store
- * @param expiration how long should the value be stored? TODO: what
is the maximum?
- * @param timeout how long after we give up on storing the value?
- * @param cont called after the put operation failed or
succeeded
- */
- public void put(HashCode key, byte[] data, int replicationLevel,
Set<RouteOption> routeOptions,
- int type, AbsoluteTime expiration,
- RelativeTime timeout, final Continuation cont) {
- final ClientPutMessage cpm = new ClientPutMessage();
- cpm.data = data;
- cpm.hash = key;
- cpm.desiredReplicationLevel = replicationLevel;
- cpm.expiration = expiration.asMessage();
- cpm.type = type;
+ public PutRequest() {
+ this.uid = nextUid++;
+ }
- client.notifyTransmitReady(timeout, true,
- 0, new MessageTransmitter() {
- @Override
- public void transmit(Connection.MessageSink sink) {
- sink.send(cpm);
- // is there no way to get an ack from the dht service?
- cont.cont(true);
- }
+ @Override
+ public AbsoluteTime getDeadline() {
+ return deadline;
+ }
- @Override
- public void handleError() {
- cont.cont(false);
- }
- });
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ final ClientPutMessage cpm = new ClientPutMessage();
+ cpm.data = data;
+ cpm.hash = key;
+ cpm.desiredReplicationLevel = replicationLevel;
+ cpm.expiration = expiration.asMessage();
+ cpm.type = type;
+ cpm.uid = uid;
+ sink.send(cpm);
+ }
+
+ public boolean onDestroy() {
+ return true; // keep!
+ }
}
- private class GetRequest implements Cancelable {
+
+ private class GetRequest extends RequestQueue.Request {
public long uid;
public HashCode key;
public ResultCallback cb;
@@ -105,52 +92,33 @@
public boolean canceled = false;
public GetRequest() {
- uid = DistributedHashTable.this.uid++;
+ uid = DistributedHashTable.this.nextUid++;
}
- public void cancel() {
- if (!activeRequests.contains(this)) {
- throw new AssertionError("cancel called on invalid request");
- }
+ @Override
+ public AbsoluteTime getDeadline() {
+ return deadline;
+ }
- canceled = true;
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ }
+ }
- activeRequests.remove(this);
- if (activeRequests.isEmpty()) {
- receiveHandle.cancel();
+ public class DHTMessageReceiver extends RunaboutMessageReceiver {
+ public void visit(ClientPutConfirmationMessage pcm) {
+ if (currentPutRequest == null || pcm.uid != currentPutRequest.uid)
{
+ logger.warn("DHT service got confused with UIDs");
+ return;
}
-
- client.notifyTransmitReady(RelativeTime.FOREVER, false, 0, new
MessageTransmitter() {
- @Override
- public void transmit(Connection.MessageSink sink) {
- final ClientGetStopMessage sm = new ClientGetStopMessage();
- sm.key = key;
- sm.unique_id = uid;
- sink.send(sm);
- }
-
- @Override
- public void handleError() {
- }
- });
+ currentPutRequest.cont.cont(true);
}
- }
- private List<GetRequest> activeRequests = new ArrayList<GetRequest>(5);
-
- private Cancelable receiveHandle = null;
-
-
- private class ResponseDispatcher implements MessageReceiver {
- @Override
- public void process(GnunetMessage.Body msg) {
- receiveHandle = null;
-
- ClientResultMessage rm = (ClientResultMessage) msg;
+ public void visit(ClientResultMessage rm) {
GetRequest request = null;
- for (GetRequest r : activeRequests) {
+ for (GetRequest r : activeGetRequests) {
if (r.uid == rm.uid) {
request = r;
break;
@@ -162,19 +130,49 @@
request.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration), rm.key, null,
null, BlockType.TEST,
rm.data);
}
-
- if (!activeRequests.isEmpty()) {
- receiveHandle = client.receive(RelativeTime.FOREVER, rh);
- }
}
@Override
public void handleError() {
+ throw new AssertionError("not handled");
}
}
- private final ResponseDispatcher rh = new ResponseDispatcher();
+ /**
+ * Put data into the dht.
+ *
+ * @param key key key to store the data under
+ * @param data data data to store
+ * @param replicationLevel how many peers should store this value
+ * @param routeOptions additional options
+ * @param type type of the data to store
+ * @param expiration how long should the value be stored? TODO: what
is the maximum?
+ * @param timeout how long after we give up on storing the value?
+ * @param cont called after the put operation failed or
succeeded
+ */
+ public void put(HashCode key, byte[] data, int replicationLevel,
Set<RouteOption> routeOptions,
+ int type, AbsoluteTime expiration,
+ RelativeTime timeout, final Continuation cont) {
+ if (currentPutRequest != null) {
+ throw new AssertionError("only one put may be active at a time");
+ }
+
+ PutRequest pr = new PutRequest();
+ pr.key = key;
+ pr.data = data;
+ pr.replicationLevel = replicationLevel;
+ pr.deadline = timeout.toAbsolute();
+ pr.expiration = expiration;
+ pr.type = type;
+ pr.cont = cont;
+
+ currentPutRequest = pr;
+
+ requestQueue.add(pr);
+ }
+
+
/**
* Request results from the DHT.
*
@@ -191,55 +189,12 @@
int replication, EnumSet<RouteOption>
routeOptions,
byte[] xquery, ResultCallback cb) {
- final GetRequest request = new GetRequest();
- request.key = key;
- request.cb = cb;
- request.deadline = timeout.toAbsolute();
+ final GetRequest getRequest = new GetRequest();
+ getRequest.key = key;
+ getRequest.cb = cb;
+ getRequest.deadline = timeout.toAbsolute();
- final ClientGetMessage getMessage = new ClientGetMessage();
- getMessage.desiredReplicationLevel = replication;
- getMessage.key = key;
- getMessage.options = 0;
- getMessage.xquery = xquery;
- getMessage.type = type.val;
- getMessage.uniqueId = request.uid;
-
-
- activeRequests.add(request);
-
- if (receiveHandle == null) {
- receiveHandle = client.receive(RelativeTime.FOREVER, rh);
- }
-
- client.notifyTransmitReady(timeout, false, 0, new MessageTransmitter()
{
- @Override
- public void transmit(Connection.MessageSink sink) {
- sink.send(getMessage);
- }
-
- @Override
- public void handleError() {
- if (!request.canceled) {
- request.cancel();
- }
- if (activeRequests.isEmpty()) {
- client.disconnect();
- }
- }
- });
-
- Scheduler.addDelayed(timeout, new Scheduler.Task() {
- @Override
- public void run(Scheduler.RunContext ctx) {
- logger.debug("timeout task has been run");
- // maybe already canceled by notifyTransmitReady timeout
- if (!request.canceled) {
- request.cancel();
- }
- }
- });
-
- return request;
+ return requestQueue.add(getRequest);
}
public Cancelable monitorStart(int blockType, HashCode key,
MonitorGetHandler getHandler,
@@ -259,20 +214,13 @@
/**
- * not yet implemented
- *
- * @return a handle to cancel the monitoring
- */
- public Cancelable startMonitor() {
- return null;
- }
-
-
- /**
* Destroy the connection to the service.
*/
public void destroy() {
+ // there's nothing to sync, just destroy!
+ requestQueue.destroy();
client.disconnect();
+ System.out.println("the end!!!");
}
public static void main(String[] args) {
@@ -339,6 +287,7 @@
} else {
System.out.println("error");
}
+ dht.destroy();
}
});
} else {
@@ -370,4 +319,6 @@
private class MonitorGetResponseHandler {}
private class MonitorPutHandler {}
+
+
}
Added: gnunet-java/src/org/gnunet/dht/ResultCallback.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/ResultCallback.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/ResultCallback.java 2012-05-09 13:02:28 UTC
(rev 21380)
@@ -0,0 +1,26 @@
+package org.gnunet.dht;
+
+import org.gnunet.util.AbsoluteTime;
+import org.gnunet.util.HashCode;
+import org.gnunet.util.PeerIdentity;
+
+import java.util.List;
+
+/**
+ * Callback object for requests to the dht
+ */
+public interface ResultCallback {
+ /**
+ * Called when the dht returns a result
+ *
+ * @param expiration expiration of the returned entry
+ * @param key key of the returned entry
+ * @param getPath put path of the returned entry
+ * @param putPath put path of the returned entry
+ * @param type type of data in the entry
+ * @param data data of the returned entry
+ */
+ public void handleResult(AbsoluteTime expiration, HashCode key,
+ List<PeerIdentity> getPath, List<PeerIdentity>
putPath,
+ BlockType type, byte[] data);
+}
Modified: gnunet-java/src/org/gnunet/statistics/Statistics.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-05-09
12:56:50 UTC (rev 21379)
+++ gnunet-java/src/org/gnunet/statistics/Statistics.java 2012-05-09
13:02:28 UTC (rev 21380)
@@ -46,12 +46,11 @@
*/
private Continuation currentGetContinuation;
+ /**
+ * List of all watch requests, canceled watch requests are null (but stay
due to protocol limitations)
+ */
ArrayList<StatisticsWatchRequest> watchRequests = new
ArrayList<StatisticsWatchRequest>();
- public interface StatisticsReceiver {
- public void onReceive(String subsystem, String name, long value);
- }
-
/**
* A request to the statistics service.
*/
@@ -277,8 +276,8 @@
*/
public void destroy() {
// the request queue handles the destruction, maybe we still have
important messages pending etc.
- requestQueue.add(new TESTRequest());
- requestQueue.destroy();
+ requestQueue.sendTEST(SET_TIMEOUT.toAbsolute());
+ requestQueue.shutdown();
}
@@ -369,20 +368,4 @@
}.start();
}
-
- private class TESTRequest extends RequestQueue.Request {
- public boolean onDestroy() {
- // keep on destroy
- return true;
- }
-
- public AbsoluteTime getDeadline() {
- return SET_TIMEOUT.toAbsolute();
- }
-
- public void transmit(Connection.MessageSink sink) {
- sink.send(new TESTMessage());
- // todo: disconnect when not receiving the TEST message back after
timeout
- }
- }
}
Added: gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
===================================================================
--- gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
(rev 0)
+++ gnunet-java/src/org/gnunet/statistics/StatisticsReceiver.java
2012-05-09 13:02:28 UTC (rev 21380)
@@ -0,0 +1,7 @@
+package org.gnunet.statistics;
+
+
+
+public interface StatisticsReceiver {
+ public void onReceive(String subsystem, String name, long value);
+}
Modified: gnunet-java/src/org/gnunet/util/HashCode.java
===================================================================
--- gnunet-java/src/org/gnunet/util/HashCode.java 2012-05-09 12:56:50 UTC
(rev 21379)
+++ gnunet-java/src/org/gnunet/util/HashCode.java 2012-05-09 13:02:28 UTC
(rev 21380)
@@ -14,7 +14,7 @@
public class HashCode implements Message {
@FixedSizeByteArray(length = 64)
- private final byte[] data;
+ public final byte[] data;
public HashCode(byte[] hash) {
if (hash.length != 64) {
Modified: gnunet-java/src/org/gnunet/util/RequestQueue.java
===================================================================
--- gnunet-java/src/org/gnunet/util/RequestQueue.java 2012-05-09 12:56:50 UTC
(rev 21379)
+++ gnunet-java/src/org/gnunet/util/RequestQueue.java 2012-05-09 13:02:28 UTC
(rev 21380)
@@ -40,7 +40,7 @@
*/
private Cancelable currentReceive;
- private boolean destroyRequested = false;
+ private boolean destroyed = false;
private final Client client;
@@ -122,7 +122,7 @@
}
private void handleReceive() {
- if (currentReceive != null || destroyRequested) {
+ if (currentReceive != null || destroyed) {
return;
}
currentReceive = client.receive(RelativeTime.FOREVER, new
MessageReceiver() {
@@ -192,9 +192,35 @@
handleNextTransmit();
}
- public void destroy() {
- destroyRequested = true;
+ private static class TESTRequest extends RequestQueue.Request {
+ private AbsoluteTime deadline;
+
+ public TESTRequest(AbsoluteTime deadline) {
+ this.deadline = deadline;
+ }
+
+ public boolean onDestroy() {
+ // keep on destroy
+ return true;
+ }
+
+ public AbsoluteTime getDeadline() {
+ return deadline;
+ }
+
+ public void transmit(Connection.MessageSink sink) {
+ sink.send(new TESTMessage());
+ // todo: disconnect when not receiving the TEST message back after
timeout
+ }
+ }
+
+ public void sendTEST(AbsoluteTime deadline) {
+ add(new TESTRequest(deadline));
+ }
+
+ public void shutdown() {
+
final LinkedList<Request> remove = new LinkedList<Request>();
for (Request r : allRequests) {
@@ -215,4 +241,17 @@
handleNextTransmit();
handleReceive();
}
+
+ public void destroy() {
+ destroyed = true;
+ allRequests.clear();
+ persistentRequests.clear();
+ requestsAwaitingTransmit.clear();
+ if (currentTransmit != null) {
+ currentTransmit.cancel();
+ }
+ if (currentReceive != null) {
+ currentReceive.cancel();
+ }
+ }
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r21380 - in gnunet-java/src/org/gnunet: dht statistics util,
gnunet <=