[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r25845 - in gnunet-java/src/org/gnunet: mesh requests util
From: |
gnunet |
Subject: |
[GNUnet-SVN] r25845 - in gnunet-java/src/org/gnunet: mesh requests util |
Date: |
Mon, 21 Jan 2013 16:43:14 +0100 |
Author: dold
Date: 2013-01-21 16:43:13 +0100 (Mon, 21 Jan 2013)
New Revision: 25845
Added:
gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
Modified:
gnunet-java/src/org/gnunet/mesh/Mesh.java
gnunet-java/src/org/gnunet/requests/Request.java
gnunet-java/src/org/gnunet/util/Scheduler.java
Log:
fix
Modified: gnunet-java/src/org/gnunet/mesh/Mesh.java
===================================================================
--- gnunet-java/src/org/gnunet/mesh/Mesh.java 2013-01-21 15:36:00 UTC (rev
25844)
+++ gnunet-java/src/org/gnunet/mesh/Mesh.java 2013-01-21 15:43:13 UTC (rev
25845)
@@ -22,6 +22,7 @@
import com.google.common.collect.Maps;
import org.gnunet.construct.Construct;
+import org.gnunet.requests.FixedMessageRequest;
import org.gnunet.requests.Request;
import org.gnunet.requests.RequestQueue;
import org.gnunet.util.*;
@@ -39,9 +40,17 @@
private static final Logger logger = LoggerFactory
.getLogger(Mesh.class);
+ /**
+ * How many messages can we send to the service until we have to wait for
an ACK from it?
+ */
private static final int INITIAL_WINDOW_SIZE = 8;
-
+ /**
+ * Requests queued to be sent to the mesh service.
+ */
private RequestQueue requestQueue;
+ /**
+ * Called whenever a tunnel was destroyed.
+ */
private TunnelEndHandler tunnelEndHandler;
private MeshRunabout messageReceiver;
private int[] applications;
@@ -62,52 +71,45 @@
public DisconnectHandler disconnectHandler;
public ConnectHandler connectHandler;
- class ConnectByTypeRequest extends Request {
- int appType;
- @Override
- public void transmit(Connection.MessageSink sink) {
- ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage();
- m.applicationType = appType;
- m.tunnelId = tunnelId;
- sink.send(m);
- }
- }
-
public void addPeer(PeerIdentity peerIdentity) {
throw new UnsupportedOperationException("not implemented");
}
+
/**
* Request that the given peer isn't added to this tunnel in
* connect_by_* calls (due to misbehaviour, bad performance, ...).
*
* @param peerIdentity peer identity of the peer which should be
blacklisted
- * for the tunnel.
+ * for the tunnel.
*/
public void blacklist(PeerIdentity peerIdentity) {
throw new UnsupportedOperationException("not implemented");
}
+
/**
* Request that the given peer isn't blacklisted anymore from this
tunnel,
* and therefore can be added in future calls to connect*.
* The peer must have been previously blacklisted for this tunnel.
*
* @param peerIdentity peer identity of the peer which shouldn't be
blacklisted
- * for the tunnel anymore.
+ * for the tunnel anymore.
*/
public void unblacklist(PeerIdentity peerIdentity) {
throw new UnsupportedOperationException("not implemented");
}
+
/**
* Request that the mesh should try to connect to a peer supporting
the given
* message type.
*
* @param appType application type that must be supported by the peer
- * (MESH should discover peer in proximity handling
this type)
+ * (MESH should discover peer in proximity handling
this type)
*/
public void requestConnectByType(int appType) {
- ConnectByTypeRequest r = new ConnectByTypeRequest();
- r.appType = appType;
- requestQueue.add(r);
+ ConnectPeerByTypeMessage m = new ConnectPeerByTypeMessage();
+ m.applicationType = appType;
+ m.tunnelId = tunnelId;
+ requestQueue.add(new FixedMessageRequest(m));
}
/**
@@ -136,7 +138,7 @@
*
* @param peer peer to remove
*/
- public void requestConnectDel (PeerIdentity peer) {
+ public void requestConnectDel(PeerIdentity peer) {
throw new UnsupportedOperationException("not implemented");
}
@@ -151,6 +153,7 @@
static class Sink implements Connection.MessageSink {
byte[] payload;
+
@Override
public void send(GnunetMessage.Body m) {
if (payload != null) {
@@ -217,14 +220,14 @@
* Only one call can be active at any time, to issue another request,
* wait for the callback or cancel the current request.
*
- * @param maxdelay how long can the message wait?
- * @param target destination for the message
- * NULL for multicast to all tunnel targets
+ * @param maxdelay how long can the message wait?
+ * @param target destination for the message
+ * NULL for multicast to all tunnel targets
* @param notify_size how many bytes of buffer space does notify want?
* @param transmitter handler to call when buffer space is available;
- * will be called with NULL on timeout or if the overall queue
- * for this peer is larger than queue_size and this is currently
- * the message with the lowest priority
+ * will be called with NULL on timeout or if the
overall queue
+ * for this peer is larger than queue_size and this
is currently
+ * the message with the lowest priority
* @return non-NULL if the notify callback was queued,
* NULL if we can not even queue the request (insufficient
* memory); if NULL is returned, "notify" will NOT be called.
@@ -395,11 +398,11 @@
/**
* Connect to the mesh service.
*
- * @param cfg configuration to use
+ * @param cfg configuration to use
* @param inboundTunnelHandler function called when an *inbound* tunnel is
created
- * @param tunnelEndHandler function called when an *inbound* tunnel is
destroyed by the
- * remote peer, it is *not* called if Tunnel.destroy
- * is called on the tunnel
+ * @param tunnelEndHandler function called when an *inbound* tunnel is
destroyed by the
+ * remote peer, it is *not* called if
Tunnel.destroy
+ * is called on the tunnel
*/
public Mesh(Configuration cfg, InboundTunnelHandler inboundTunnelHandler,
TunnelEndHandler tunnelEndHandler, MeshRunabout
messageReceiver, int... applications) {
@@ -418,8 +421,8 @@
* Create a new tunnel (we're initiator and will be allowed to add/remove
peers
* and to broadcast).
*
- * @param connectHandler callback for when a new peer connects to the
tunnel, either because the origin added him,
- * or the client joined the tunnel
+ * @param connectHandler callback for when a new peer connects to the
tunnel, either because the origin added him,
+ * or the client joined the tunnel
* @param disconnectHandler callback for when a peer is disconnected
*/
public OriginTunnel createTunnel(ConnectHandler connectHandler,
DisconnectHandler disconnectHandler) {
@@ -436,7 +439,7 @@
/**
* Announce to other peers the availability of services described by the
regex,
* in order to be reachable to other peers via connect_by_string.
- *
+ * <p/>
* Note that the first 8 characters are considered to be part of a prefix,
* (for instance 'gnunet://'). If you put a variable part in there (*, +.
()),
* all matching strings will be stored in the DHT.
Added: gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
===================================================================
--- gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
(rev 0)
+++ gnunet-java/src/org/gnunet/requests/FixedMessageRequest.java
2013-01-21 15:43:13 UTC (rev 25845)
@@ -0,0 +1,23 @@
+package org.gnunet.requests;
+
+import org.gnunet.util.Connection;
+import org.gnunet.util.GnunetMessage;
+
+/**
+ * A request that sends a message, pre-determined at construction of the
FixedMessageRequest.
+ *
+ * @author Florian Dold
+ */
+public class FixedMessageRequest extends Request {
+ private final GnunetMessage.Body msg;
+
+ public FixedMessageRequest(GnunetMessage.Body msg) {
+ this.msg = msg;
+ }
+
+ final
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ sink.send(msg);
+ }
+}
Modified: gnunet-java/src/org/gnunet/requests/Request.java
===================================================================
--- gnunet-java/src/org/gnunet/requests/Request.java 2013-01-21 15:36:00 UTC
(rev 25844)
+++ gnunet-java/src/org/gnunet/requests/Request.java 2013-01-21 15:43:13 UTC
(rev 25845)
@@ -5,9 +5,26 @@
import org.gnunet.util.RelativeTime;
/**
-* ...
-*
-* @author Florian Dold
+ * Abstract base class for a Request.
+ *
+ * Every request defines what happens when one of the following happens:
+ * <ul>
+ * <li>
+ * The request is canceled. There may be some cleanup necessary,
depending on whether the request has already been
+ * sent to the service or not.
+ * </li>
+ * <li>
+ * On timeout.
+ * </li>
+ * <li>
+ * On reconnect. In particular, every Request has to decide whether it
will be kept after a reconnect to the service.
+ * </li>
+ * <li>
+ * On destruction of the request queue. Some requests may be important
enough to delay the destruction until they have been sent.
+ * </li>
+ * </ul>
+ *
+ * @author Florian Dold
*/
public abstract class Request {
protected AbsoluteTime deadline = AbsoluteTime.FOREVER;
Modified: gnunet-java/src/org/gnunet/util/Scheduler.java
===================================================================
--- gnunet-java/src/org/gnunet/util/Scheduler.java 2013-01-21 15:36:00 UTC
(rev 25844)
+++ gnunet-java/src/org/gnunet/util/Scheduler.java 2013-01-21 15:43:13 UTC
(rev 25845)
@@ -45,12 +45,12 @@
private static int readyCount = 0;
// for every priority, there is a list of tasks that is definitely ready
to run
- final private static ArrayList<LinkedList<TaskConfiguration>> ready = new
ArrayList<LinkedList<TaskConfiguration>>
- (Priority.size);
+ @SuppressWarnings("unchecked")
+ final private static LinkedList<TaskConfiguration>[] readyLists = new
LinkedList[Priority.numberOfPriorities];
static {
- for (int i = 0; i < Priority.size; ++i) {
- ready.add(new LinkedList<TaskConfiguration>());
+ for (int i = 0; i < Priority.numberOfPriorities; ++i) {
+ readyLists[i] = new LinkedList<TaskConfiguration>();
}
}
@@ -61,11 +61,11 @@
Reason.ACCEPT_READY, Reason.CONNECT_READY};
+ /**
+ * Selector, used to check file descriptors for readiness.
+ */
private static Selector selector = null;
- private static boolean scheduler_running = false;
-
-
static {
try {
selector = SelectorProvider.provider().openSelector();
@@ -76,13 +76,36 @@
}
}
+ /**
+ * true iff the scheduler is currently running.
+ */
+ private static boolean scheduler_running = false;
+
+
+ // tasks that are waiting for an event, which are executed anyway after
the deadline has occurred
+ final private static Queue<TaskConfiguration> pending = new
PriorityQueue<TaskConfiguration>(5, new Comparator
+ <TaskConfiguration>() {
+ @Override
+ public int compare(TaskConfiguration a, TaskConfiguration b) {
+ return a.deadline.compareTo(b.deadline);
+ }
+ });
+
+
+ /**
+ * Priority for Tasks.
+ */
public enum Priority {
IDLE, BACKGROUND, DEFAULT, HIGH, UI, URGENT, SHUTDOWN;
- private static final int size = Priority.values().length;
- public static final Priority KEEP = null;
+
+ // how many different priorities do we have?
+ private static final int numberOfPriorities = Priority.values().length;
}
+ /**
+ * Reasons for executing a task.
+ */
public enum Reason {
STARTUP, SHUTDOWN, TIMEOUT, READ_READY, WRITE_READY, ACCEPT_READY,
CONNECT_READY
}
@@ -98,10 +121,6 @@
public RunContext() {
}
-
- public RunContext(Set<Reason> reasons) {
- this.reasons = reasons;
- }
}
/**
@@ -130,7 +149,7 @@
/**
* Create a TaskIdentifier.
*
- * @param task
+ * @param task task to run with this TaskIdentifier
*/
TaskConfiguration(RelativeTime delay, Task task) {
this.task = task;
@@ -257,20 +276,6 @@
}
}
- // tasks that are waiting for an event, which are executed anyway after
the deadline has occurred
- final private static Queue<TaskConfiguration> pending = new
PriorityQueue<TaskConfiguration>(5, new Comparator
- <TaskConfiguration>() {
- @Override
- public int compare(TaskConfiguration a, TaskConfiguration b) {
- return a.deadline.compareTo(b.deadline);
- }
- });
-
-
- public static boolean getCurrentLifeness() {
- return (activeTask == null) || activeTask.lifeness;
- }
-
/**
* Run the task regardless of any prerequisites, before any other task of
* the same priority.
@@ -357,7 +362,7 @@
*/
private static void queueReady(TaskConfiguration tid) {
int idx = tid.priority.ordinal();
- ready.get(idx).add(tid);
+ readyLists[idx].add(tid);
readyCount++;
pending.remove(tid);
@@ -458,9 +463,8 @@
*/
public static void run(Task initialTask) {
if (scheduler_running) {
- throw new AssertionError("can't call run recursively");
+ throw new AssertionError("Scheduler already running");
}
-
scheduler_running = true;
if (initialTask != null) {
@@ -488,6 +492,27 @@
runReady();
}
+
+ if (readyCount != 0) {
+ throw new AssertionError("tasks ready after scheduler ran
(count)");
+ }
+
+ for (List readyList : Scheduler.readyLists) {
+ if (!readyList.isEmpty()) {
+ throw new AssertionError("tasks ready after scheduler ran
(list)");
+ }
+ }
+
+ if (pending.size() != 0) {
+ throw new AssertionError("pending tasks after scheduler ran");
+ }
+
+ if (activeTask != null) {
+ throw new AssertionError("active task after scheduler ran");
+ }
+
+
+ scheduler_running = false;
}
@@ -504,9 +529,9 @@
return;
}
// start executing from the highest priority down to 0
- for (int p = Priority.size - 1; p >= 0; p--) {
+ for (int p = Priority.numberOfPriorities - 1; p >= 0; p--) {
// execute all tasks with priority p
- LinkedList<TaskConfiguration> queue = ready.get(p);
+ LinkedList<TaskConfiguration> queue = readyLists[p];
while (!queue.isEmpty()) {
TaskConfiguration tid = queue.removeFirst();
readyCount--;
@@ -518,7 +543,7 @@
}
/**
- * Request the disconnect of a scheduler. Marks all currently pending
tasks as
+ * Request the shutdown of a scheduler. Marks all currently pending tasks
as
* ready because of shutdown. This will cause all tasks to run (as soon
as
* possible, respecting priorities and prerequisite tasks). Note that tasks
* scheduled AFTER this call may still be delayed arbitrarily.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r25845 - in gnunet-java/src/org/gnunet: mesh requests util,
gnunet <=