gzz-commits
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Gzz-commits] storm/org/nongnu/storm/modules/gispmap GispP2PM...


From: Benja Fallenstein
Subject: [Gzz-commits] storm/org/nongnu/storm/modules/gispmap GispP2PM...
Date: Sun, 11 May 2003 13:33:39 -0400

CVSROOT:        /cvsroot/storm
Module name:    storm
Changes by:     Benja Fallenstein <address@hidden>      03/05/11 13:33:39

Modified files:
        org/nongnu/storm/modules/gispmap: GispP2PMap.java 

Log message:
        Republish GispP2PMap contents after 25 minutes

CVSWeb URLs:
http://savannah.gnu.org/cgi-bin/viewcvs/storm/storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java.diff?tr1=1.10&tr2=1.11&r1=text&r2=text

Patches:
Index: storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java
diff -u storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java:1.10 storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java:1.11
--- storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java:1.10 Wed May  7 06:55:10 2003
+++ storm/org/nongnu/storm/modules/gispmap/GispP2PMap.java      Sun May 11 13:33:39 2003
@@ -32,6 +32,7 @@
 import org.nongnu.storm.util.ByteArrayKey;
 
 import java.io.*;
+import java.lang.ref.SoftReference;
 import java.util.*;
 import com.axlight.jnushare.gisp.*;
 
@@ -44,10 +45,11 @@
  *  That's also the main reason this is in a module.
  */
 public class GispP2PMap implements P2PMap {
-    static public boolean dbg = false;
+    static public boolean dbg = true;
     static private void pa(String s) { System.out.println(s); }
 
     protected GISP gisp;
+    protected Set entries = new HashSet();
     protected Map cache = new HashMap();
     
     /** The "STORM" port. */
@@ -59,8 +61,12 @@
        if(seedAddresses != null)
            gisp.addSeedAddresses(seedAddresses);
        gisp.start(new String[] {"strength_min=1"});
+
+       // thread used to republish items into the hashtable
+       // GISP drops items after a 30 minute timeout
+       keepaliveThread.start();
                
-       /** Doesn't work...maybe too quicly requested   
+       /** Doesn't work...maybe requested too quickly
         pa("Number of known peers in the network: " + String.valueOf(this.gisp.getNumOfPeers()));
        */      
     }
@@ -83,8 +89,21 @@
     public void put(String key, String value) throws IOException {
        gisp.insert(key, value);
        cache.remove(key);
+       synchronized(entries) {
+           entries.add(new Entry(key, value));
+       }
+    }
 
-       // XXX keep alive after ttl...
+    /** Publish all entries from the map in the DHT again.
+     *  Used to keep them alive after timeout.
+     */
+    public void republish() {
+       synchronized(entries) {
+           for(Iterator i=entries.iterator(); i.hasNext();) {
+               Entry e = (Entry)i.next();
+               gisp.insert(e.key, e.value);
+           }
+       }
     }
 
     public Collector get(String key) throws IOException {
@@ -121,6 +140,37 @@
        public void queryExpired() {
            finish(true);
        }
+    }
+
+    protected class Entry {
+       protected final String key, value;
+       protected Entry(String k, String v) { key=k; value=v; }
+    }
+
+    protected Thread keepaliveThread = new Thread(new Keepalive(this));
+    protected static class Keepalive implements Runnable {
+       SoftReference ref;
+       protected Keepalive(GispP2PMap map) {
+           this.ref = new SoftReference(map);
+       }
+       public void run() {
+           while(true) {
+               // wait 25 minutes before republishing everything
+               // GISP timeout is 30 minutes
+               try {
+                   Thread.sleep(25 * 60 * 1000);
+               } catch(InterruptedException _) {}
+               GispP2PMap map = (GispP2PMap)ref.get();
+               if(map == null) break;
+               if(dbg) pa("Republish GispP2PMap");
+               map.republish();
+           }
+           if(dbg) pa("GispP2PMap object garbage collected");
+       }
+    }
+
+    protected void finalize() {
+       keepaliveThread.interrupt();
     }
 
     public static void main(String argv[]) throws Exception { 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]