gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r22959 - Extractor/src/main


From: gnunet
Subject: [GNUnet-SVN] r22959 - Extractor/src/main
Date: Sun, 29 Jul 2012 23:49:48 +0200

Author: grothoff
Date: 2012-07-29 23:49:48 +0200 (Sun, 29 Jul 2012)
New Revision: 22959

Modified:
   Extractor/src/main/extractor.c
Log:
-towards a theoretically working implementation...

Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c      2012-07-29 21:32:40 UTC (rev 22958)
+++ Extractor/src/main/extractor.c      2012-07-29 21:49:48 UTC (rev 22959)
@@ -102,10 +102,59 @@
    */
   void *proc_cls;
 
+  /**
+   * Are we done with processing this file? 0 to continue, 1 to terminate.
+   */
+  int file_finished;
+
 };
 
 
+
 /**
+ * Send a 'discard state' message to the plugin; if the send fails, destroy
+ * the plugin's channel and mark the plugin as finished for this round.
+ *
+ * @param plugin plugin to notify
+ */
+static void
+send_discard_message (struct EXTRACTOR_PluginList *plugin)
+{
+  static unsigned char disc_msg = MESSAGE_DISCARD_STATE; /* one-byte message body */
+
+  if (sizeof (disc_msg) != /* anything but a full-size result counts as failure */
+      EXTRACTOR_IPC_channel_send_ (plugin->channel,
+                                  &disc_msg,
+                                  sizeof (disc_msg)) )
+    {
+      EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+      plugin->channel = NULL; /* clear the pointer to the destroyed channel */
+      plugin->round_finished = 1;
+    }
+}
+
+
+/**
+ * We had some serious trouble.  Destroy every non-NULL channel and reset it to NULL.
+ *
+ * @param plugins list of plugins with channels to abort
+ */
+static void
+abort_all_channels (struct EXTRACTOR_PluginList *plugins)
+{
+  struct EXTRACTOR_PluginList *pos;
+
+  for (pos = plugins; NULL != pos; pos = pos->next)
+    {
+      if (NULL == pos->channel)
+       continue; /* this entry has no channel to destroy */
+      EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+      pos->channel = NULL; /* clear the pointer to the destroyed channel */
+    }
+}
+
+
+/**
  * Handler for a message from one of the plugins.
  *
  * @param cls closure with our 'struct PluginReplyProcessor'
@@ -125,9 +174,35 @@
                      const void *value,
                      const char *mime)
 {
-  // struct PluginReplyProcessor *prp = cls;
+  static unsigned char cont_msg = MESSAGE_CONTINUE_EXTRACTING;
+  struct PluginReplyProcessor *prp = cls;
 
-  // FIXME...
+  if (0 != prp->file_finished)
+    {
+      /* client already aborted, ignore message; FIXME: plugin is not yet told about the abort here */
+      return; 
+    }
+  if (0 != prp->proc (prp->proc_cls,
+                     plugin->short_libname,
+                     meta_type,
+                     meta_format,
+                     mime,
+                     value,
+                     value_len))
+    {
+      prp->file_finished = 1;
+      send_discard_message (plugin);
+      return;
+    }
+  if (sizeof (cont_msg) !=
+      EXTRACTOR_IPC_channel_send_ (plugin->channel,
+                                  &cont_msg,
+                                  sizeof (cont_msg)) )
+    {
+      EXTRACTOR_IPC_channel_destroy_ (plugin->channel);
+      plugin->channel = NULL;
+      plugin->round_finished = 1;
+    }
 }
 
 
@@ -152,6 +227,8 @@
   struct StartMessage start;
   struct EXTRACTOR_Channel *channel;
   struct PluginReplyProcessor prp;
+  int64_t min_seek;
+  ssize_t data_available;
   uint32_t ready;
   int done;
 
@@ -163,6 +240,7 @@
   else
     ready = 0;
 
+  prp.file_finished = 0;
   prp.proc = proc;
   prp.proc_cls = proc_cls;
 
@@ -171,51 +249,101 @@
   start.reserved = 0;
   start.reserved2 = 0;
   start.shm_ready_bytes = ready;
-  start.file_size = EXTRACTOR_datasource_get_size_ (ds);
-  {
-    struct EXTRACTOR_Channel *channels[plugin_count];
+  start.file_size = EXTRACTOR_datasource_get_size_ (ds);  
+  for (pos = plugins; NULL != pos; pos = pos->next)
+    {
+      if ( (NULL != pos->channel) &&
+          (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
+                                              &start,
+                                              sizeof (start)) ) )
+       {
+         EXTRACTOR_IPC_channel_destroy_ (pos->channel);
+         pos->channel = NULL;
+       }
+    }
+  done = 0;
+  while (! done)
+    {
+      struct EXTRACTOR_Channel *channels[plugin_count];
+      
+      /* calculate current 'channels' array */
+      plugin_count = 0;
+      for (pos = plugins; NULL != pos; pos = pos->next)
+       {
+         channels[plugin_count] = pos->channel;
+         plugin_count++;
+       }
+      
+      /* give plugins chance to send us meta data, seek or finished messages */
+      if (-1 == 
+         EXTRACTOR_IPC_channel_recv_ (channels,
+                                      plugin_count,
+                                      &process_plugin_reply,
+                                      &prp))
+       {
+         /* serious problem in IPC; reset *all* channels */
+         abort_all_channels (plugins);
+         break;
+       }
+      /* calculate minimum seek request (or set done=0 to continue here) */
+      done = 1;
+      min_seek = -1;
+      for (pos = plugins; NULL != pos; pos = pos->next)
+       {
+         if ( (1 == pos->round_finished) ||
+              (NULL == pos->channel) )
+           continue; /* inactive plugin */
+         if (-1 == pos->seek_request)
+           {
+             done = 0; /* possibly more meta data at current position! */
+             break;
+           }
+         if ( (-1 == min_seek) ||
+              (min_seek > pos->seek_request) )
+           {
+             min_seek = pos->seek_request;
+           }
+       }
+      if ( (1 == done) &&
+          (-1 != min_seek) )
+       {
+         /* current position done, but seek requested */
+         done = 0;
+         if (-1 ==
+             (data_available = EXTRACTOR_IPC_shared_memory_set_ (shm,
+                                                                 ds,
+                                                                 min_seek,
+                                                                 DEFAULT_SHM_SIZE)))
+           {
+             abort_all_channels (plugins);
+             break;
+           }
+       }
+      /* if 'prp.file_finished', send 'abort' to plugins;
+        if not, send 'seek' notification to plugins in range */
+      for (pos = plugins; NULL != pos; pos = pos->next)
+       {
+         if (NULL == (channel = channels[plugin_count]))
+           continue;
+         if ( (-1 != pos->seek_request) &&
+              (min_seek <= pos->seek_request) &&
+              (min_seek + data_available > pos->seek_request) )
+           {
 
-    plugin_count = 0;
-    for (pos = plugins; NULL != pos; pos = pos->next)
-      {
-       channels[plugin_count] = pos->channel;
-       if ( (NULL != pos->channel) &&
-            (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
-                                                &start,
-                                                sizeof (start)) ) )
-         {
-           channels[plugin_count] = NULL;
-           EXTRACTOR_IPC_channel_destroy_ (pos->channel);
-           pos->channel = NULL;
-         }
-       plugin_count++;
-      }
-    done = 0;
-    while (! done)
-      {
-       done = 1;
+             /* FIXME: notify plugin about seek! */
+             pos->seek_request = -1;
+           }
+         if ( (-1 != pos->seek_request) && 
+              (1 == prp.file_finished) )
+           {
+             send_discard_message (pos);
+             pos->round_finished = 1;
+           }         
+         if (0 == pos->round_finished)
+           done = 0; /* can't be done, plugin still active */  
+       }
+    }
 
-       // FIXME: need to handle 'seek' messages from plugins somewhere
-       if (-1 == 
-           EXTRACTOR_IPC_channel_recv_ (channels,
-                                        plugin_count,
-                                        &process_plugin_reply,
-                                        &prp))
-         break;
-       plugin_count = 0;
-       for (pos = plugins; NULL != pos; pos = pos->next)
-         {
-           if (NULL != (channel = channels[plugin_count]))
-             {
-               // ... FIXME ...
-             }
-           plugin_count++;
-         }
-       // FIXME: need to terminate once all plugins are done...
-       done = 0;
-      }
-  }
-
   /* run in-process plugins */
   for (pos = plugins; NULL != pos; pos = pos->next)
     {




reply via email to

[Prev in Thread] Current Thread [Next in Thread]