gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3143 - in GNUnet/src: include transports


From: grothoff
Subject: [GNUnet-SVN] r3143 - in GNUnet/src: include transports
Date: Fri, 28 Jul 2006 02:30:06 -0700 (PDT)

Author: grothoff
Date: 2006-07-28 02:30:02 -0700 (Fri, 28 Jul 2006)
New Revision: 3143

Modified:
   GNUnet/src/include/gnunet_transport.h
   GNUnet/src/include/gnunet_util_config.h
   GNUnet/src/transports/nat.c
   GNUnet/src/transports/tcp.c
   GNUnet/src/transports/udp.c
Log:
updated transports

Modified: GNUnet/src/include/gnunet_transport.h
===================================================================
--- GNUnet/src/include/gnunet_transport.h       2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/include/gnunet_transport.h       2006-07-28 09:30:02 UTC (rev 3143)
@@ -263,6 +263,8 @@
    *        or the hello_message from connect)
    * @param msg the message
    * @param size the size of the message, <= mtu
+   * @param important YES if message is important (i.e. grow 
+   *        buffers to queue if needed)
    * @return SYSERR on error, NO on temporary error (retry),
    *         YES/OK on success; after any persistent error,
    *         the caller must call "disconnect" and not continue
@@ -271,27 +273,10 @@
    */
   int (*send)(TSession * tsession,
              const void * msg,
-             const unsigned int size);
+             const unsigned int size,
+             int important);
 
   /**
-   * Send a message to the specified remote node with
-   * increased reliablility (whatever that means is
-   * up to the transport).
-   *
-   * @param tsession an opaque session handle (e.g. a socket
-   *        or the hello_message from connect)
-   * @param msg the message
-   * @param size the size of the message, <= mtu
-   * @return SYSERR on error, OK on success; after any error,
-   *         the caller must call "disconnect" and not continue
-   *         using the session afterwards (useful if the other
-   *         side closed the connection).
-   */
-  int (*sendReliable)(TSession * tsession,
-                     const void * msg,
-                     const unsigned int size);
-
-  /**
    * A (core) Session is to be associated with a transport session. The
    * transport service may want to know in order to call back on the
    * core if the connection is being closed. Associate can also be
@@ -340,12 +325,6 @@
   int (*stopTransportServer)(void);
 
   /**
-   * Reload the configuration. Should never fail (keep old
-   * configuration on error, syslog errors!)
-   */
-  void (*reloadConfiguration)(void);
-
-  /**
    * Convert transport address to human readable string.
    */
   char * (*addressToString)(const P2P_hello_MESSAGE * helo);

Modified: GNUnet/src/include/gnunet_util_config.h
===================================================================
--- GNUnet/src/include/gnunet_util_config.h     2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/include/gnunet_util_config.h     2006-07-28 09:30:02 UTC (rev 3143)
@@ -182,7 +182,11 @@
 
 /**
  * Attach a callback that is notified whenever a 
- * configuration option changes.
+ * configuration option changes.<p>
+ *
+ * TODO: also call callback on existing configuration and confirm
+ * existing configuration is OK!  If not, return error!
+ * 
  * @return 0 on success, -1 on error
  */
 int GC_attach_change_listener(struct GC_Configuration * cfg,

Modified: GNUnet/src/transports/nat.c
===================================================================
--- GNUnet/src/transports/nat.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/nat.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -148,7 +148,8 @@
  */
 static int natSend(TSession * tsession,
                   const void * message,
-                  const unsigned int size) {
+                  const unsigned int size,
+                  int important) {
   return SYSERR;
 }
 
@@ -180,12 +181,6 @@
 }
 
 /**
- * Reload the configuration. Should never fail.
- */
-static void reloadConfiguration(void) {
-}
-
-/**
  * Convert NAT address to a string.
  */
 static char * addressToString(const P2P_hello_MESSAGE * helo) {
@@ -206,12 +201,10 @@
   natAPI.createhello           = &createhello;
   natAPI.connect              = &natConnect;
   natAPI.send                 = &natSend;
-  natAPI.sendReliable         = &natSend; /* can't increase reliability */
   natAPI.associate            = &natAssociate;
   natAPI.disconnect           = &natDisconnect;
   natAPI.startTransportServer = &startTransportServer;
   natAPI.stopTransportServer  = &stopTransportServer;
-  natAPI.reloadConfiguration  = &reloadConfiguration;
   natAPI.addressToString      = &addressToString;
 
   return &natAPI;

Modified: GNUnet/src/transports/tcp.c
===================================================================
--- GNUnet/src/transports/tcp.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/tcp.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2002, 2003, 2004, 2005 Christian Grothoff (and other contributing authors)
+     (C) 2002, 2003, 2004, 2005, 2006 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -63,35 +63,11 @@
 } HostAddress;
 
 /**
- * TCP Message-Packet header.
- */
-typedef struct {
-  /**
-   * size of the message, in bytes, excluding this header;
-   * max 65535; we do NOT want to make this field an int
-   * because then a malicious peer could cause us to allocate
-   * lots of memory -- this bounds it by 64k/peer.
-   * Field is in network byte order.
-   */
-  unsigned short size;
-
-  /**
-   * For alignment, always 0.
-   */
-  unsigned short reserved;
-
-  /**
-   * This struct is followed by MESSAGE_PARTs - until size is reached
-   * There is no "end of message".
-   */
-} TCPP2P_PACKET;
-
-/**
  * Initial handshake message. Note that the beginning
  * must match the CS_MESSAGE_HEADER since we are using tcpio.
  */
 typedef struct {
-  TCPP2P_PACKET header;
+  MESSAGE_HEADER header;
 
   /**
    * Identity of the node connecting (TCP client)
@@ -104,21 +80,16 @@
  */
 typedef struct {
   /**
-   * the tcp socket
+   * the tcp socket (used to identify this connection with selector)
    */
   struct SocketHandle * sock;
 
   /**
-   * number of users of this session
+   * number of users of this session (reference count)
    */
   int users;
 
   /**
-   * Last time this connection was used
-   */
-  cron_t lastUse;
-
-  /**
    * mutex for synchronized access to 'users'
    */
   struct MUTEX * lock;
@@ -162,6 +133,8 @@
 
 static struct GC_Configuration * cfg;
 
+static struct MUTEX * tcplock;
+
 /* ******************** helper functions *********************** */
 
 /**
@@ -178,62 +151,6 @@
 }
 
 /**
- * Disconnect from a remote node. May only be called
- * on sessions that were aquired by the caller first.
- * For the core, aquiration means to call associate or
- * connect. The number of disconnects must match the
- * number of calls to connect+associate.
- *
- * @param tsession the session that is closed
- * @return OK on success, SYSERR if the operation failed
- */
-static int tcpDisconnect(TSession * tsession) {
-  if (tsession->internal != NULL) {
-    TCPSession * tcpsession = tsession->internal;
-
-    MUTEX_LOCK(tcpsession->lock);
-    tcpsession->users--;
-    if (tcpsession->users > 0) {
-      MUTEX_UNLOCK(tcpsession->lock);
-      return OK;
-    }
-    MUTEX_UNLOCK(tcpsession->lock);
-    MUTEX_DESTROY(tcpsession->lock);
-    FREE(tcpsession->rbuff);
-    FREENONNULL(tcpsession->wbuff);
-    tcpsession->wbuff = NULL;
-    FREE(tcpsession);
-  }
-  FREE(tsession);
-  return OK;
-}
-
-/**
- * Remove a session, either the other side closed the connection
- * or we have otherwise reason to believe that it should better
- * be killed. Destroy session closes the session as far as the
- * TCP layer is concerned, but since the core may still have
- * references to it, tcpDisconnect may not instantly free all
- * the associated resources. <p>
- *
- * destroySession may only be called if the tcplock is already
- * held.
- *
- * @param i index to the session handle
- */
-static void destroySession(int i) {
-  TCPSession * tcpSession;
-
-  tcpSession = tsessions[i]->internal;
-  if (tcpSession->sock != NULL)
-    socket_destroy(tcpSession->sock);
-  tcpSession->sock = NULL;
-  tcpDisconnect(tsessions[i]);
-  tsessions[i] = tsessions[--tsessionCount];
-  tsessions[tsessionCount] = NULL;
-}
-
-/**
  * Get the GNUnet UDP port from the configuration,
  * or from /etc/services if it is not specified in
  * the config file.
@@ -258,6 +175,35 @@
 }
 
 /**
+ * Disconnect from a remote node. May only be called
+ * on sessions that were aquired by the caller first.
+ * For the core, aquiration means to call associate or
+ * connect. The number of disconnects must match the
+ * number of calls to connect+associate.
+ *
+ * @param tsession the session that is closed
+ * @return OK on success, SYSERR if the operation failed
+ */
+static int tcpDisconnect(TSession * tsession) {
+  TCPSession * tcpsession = tsession->internal;
+
+  GE_ASSERT(ectx, tcpsession != NULL);
+  MUTEX_LOCK(tcpsession->lock);
+  tcpsession->users--;
+  if (tcpsession->users > 0) {
+    MUTEX_UNLOCK(tcpsession->lock);
+    return OK;
+  }  
+  select_disconnect(selector,
+                   tcpsession->sock);
+  MUTEX_UNLOCK(tcpsession->lock);
+  MUTEX_DESTROY(tcpsession->lock);
+  FREE(tcpsession);  
+  FREE(tsession);
+  return OK;
+}
+
+/**
  * A (core) Session is to be associated with a transport session. The
  * transport service may want to know in order to call back on the
  * core if the connection is being closed. Associate can also be
@@ -279,11 +225,8 @@
 static int tcpAssociate(TSession * tsession) {
   TCPSession * tcpSession;
 
-  if (tsession == NULL) {
-    GE_BREAK(ectx, 0);
-    return SYSERR;
-  }
-  tcpSession = (TCPSession*) tsession->internal;
+  GE_ASSERT(ectx, tsession != NULL);
+  tcpSession = tsession->internal;
   MUTEX_LOCK(tcpSession->lock);
   tcpSession->users++;
   MUTEX_UNLOCK(tcpSession->lock);
@@ -296,438 +239,146 @@
  * This function may only be called if the tcplock is
  * already held by the caller.
  */
-static int readAndProcess(int i) {
-  TSession * tsession;
+static int select_message_handler(void * mh_cls,
+                                 struct SelectHandle * sh,
+                                 struct SocketHandle * sock,
+                                 void * sock_ctx,
+                                 const MESSAGE_HEADER * msg) {
+  TSession * tsession = sock_ctx;
   TCPSession * tcpSession;
   unsigned int len;
-  int ret;
-  TCPP2P_PACKET * pack;
   P2P_PACKET * mp;
-  size_t recvd;
+  const TCPWelcome * welcome;
 
-  tsession = tsessions[i];
   if (SYSERR == tcpAssociate(tsession))
     return SYSERR;
+  len = ntohs(msg->size);
   tcpSession = tsession->internal;
-  if (tcpSession->rsize == tcpSession->pos) {
-    /* read buffer too small, grow */
-    GROW(tcpSession->rbuff,
-        tcpSession->rsize,
-        tcpSession->rsize * 2);
-  }
-  ret = socket_recv(tcpSession->sock,
-                   NC_Blocking | NC_IgnoreInt, 
-                   &tcpSession->rbuff[tcpSession->pos],
-                   tcpSession->rsize - tcpSession->pos,
-                   &recvd);
-  tcpSession->lastUse = get_time();
-  if (ret != OK) {
-    tcpDisconnect(tsession);
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,
-          "READ on socket %d returned 0 bytes, closing connection\n",
-          tcpSession->sock);
-#endif
-    return SYSERR; /* other side closed connection */
-  }
-  tcpSession->pos += recvd;
-
-  while (tcpSession->pos > 2) {
-    len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
-      + sizeof(TCPP2P_PACKET);
-    if (len > tcpSession->rsize) /* if message larger than read buffer, grow! */
-      GROW(tcpSession->rbuff,
-          tcpSession->rsize,
-          len);
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,
-          "Read %d bytes on socket %d, expecting %d for full message\n",
-          tcpSession->pos,
-          tcpSession->sock,
-          len);
-#endif
-    if (tcpSession->pos < len) {
+  if (YES == tcpSession->expectingWelcome) {    
+    welcome = (const TCPWelcome*) msg;
+    if ( (ntohs(welcome->header.type) != 0) ||
+        (len != sizeof(TCPWelcome)) ) {
       tcpDisconnect(tsession);
-      return OK;
+      return SYSERR;    
     }
-
-    /* complete message received, let's check what it is */
-    if (YES == tcpSession->expectingWelcome) {
-      TCPWelcome * welcome;
-#if DEBUG_TCP
-      EncName enc;
-#endif
-
-      welcome = (TCPWelcome*) &tcpSession->rbuff[0];
-      if ( (ntohs(welcome->header.reserved) != 0) ||
-          (ntohs(welcome->header.size)
-           != sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET)) ) {
-       GE_LOG(ectx,
-              GE_WARNING | GE_USER | GE_BULK,
-              _("Expected welcome message on tcp connection, "
-                "got garbage (%u, %u). Closing.\n"),
-              ntohs(welcome->header.reserved),
-              ntohs(welcome->header.size));
-       tcpDisconnect(tsession);
-       return SYSERR;
-      }
-      tcpSession->expectingWelcome = NO;
-      tcpSession->sender = welcome->clientIdentity;
-#if DEBUG_TCP
-      IF_GELOG(ectx,
-              GE_DEBUG | GE_USER | GE_BULK,
-              hash2enc(&tcpSession->sender.hashPubKey,
-                       &enc));
-      GE_LOG(etcx,
-            GE_DEBUG | GE_USER | GE_BULK,
-            "tcp welcome message from `%s' received\n",
-            &enc);
-#endif
-      memmove(&tcpSession->rbuff[0],
-             &tcpSession->rbuff[sizeof(TCPWelcome)],
-             tcpSession->pos - sizeof(TCPWelcome));
-      tcpSession->pos -= sizeof(TCPWelcome);
-      len = ntohs(((TCPP2P_PACKET*)&tcpSession->rbuff[0])->size)
-       + sizeof(TCPP2P_PACKET);
-    }
-    if ( (tcpSession->pos < 2) ||
-        (tcpSession->pos < len) ) {
-      tcpDisconnect(tsession);
-      return OK;
-    }
-
-    pack = (TCPP2P_PACKET*)&tcpSession->rbuff[0];
+    tcpSession->expectingWelcome = NO;
+    tcpSession->sender = welcome->clientIdentity;
+  } else {
     /* send msg to core! */
-    if (len <= sizeof(TCPP2P_PACKET)) {
+    if (len <= sizeof(MESSAGE_HEADER)) {
       GE_LOG(ectx,
             GE_WARNING | GE_USER | GE_BULK,
-            _("Received malformed message (size %u)"
-              " from tcp-peer connection. Closing.\n"),
-            len);
+            _("Received malformed message from tcp-peer connection. Closing.\n"));
       tcpDisconnect(tsession);
       return SYSERR;
     }
     mp      = MALLOC(sizeof(P2P_PACKET));
-    mp->msg = MALLOC(len - sizeof(TCPP2P_PACKET));
+    mp->msg = MALLOC(len - sizeof(MESSAGE_HEADER));
     memcpy(mp->msg,
-          &pack[1],
-          len - sizeof(TCPP2P_PACKET));
+          &msg[1],
+          len - sizeof(MESSAGE_HEADER));
     mp->sender   = tcpSession->sender;
-    mp->size     = len - sizeof(TCPP2P_PACKET);
+    mp->size     = len - sizeof(MESSAGE_HEADER);
     mp->tsession = tsession;
-#if DEBUG_TCP
-    {
-      EncName enc;
-      
-      IF_GELOG(ectx,
-              GE_DEBUG | GE_USER | GE_BULK,
-              hash2enc(&mp->sender.hashPubKey, 
-              &enc);      
-      GE_LOG(ectx,
-            GE_DEBUG | GE_USER | GE_BULK,
-            "tcp transport received %u bytes from %s (CRC %u), forwarding to core\n",
-            mp->size, 
-            &enc, 
-            crc32N(tcpSession->rbuff, 
-                   tcpSession->pos));
-    }
-#endif
     coreAPI->receive(mp);
-    /* finally, shrink buffer adequately */
-    memmove(&tcpSession->rbuff[0],
-           &tcpSession->rbuff[len],
-           tcpSession->pos - len);
-    tcpSession->pos -= len;
-    if ( (tcpSession->pos + 1024 < tcpSession->rsize) &&
-        (tcpSession->rsize > 4 * 1024) ) {
-      /* read buffer far too large, shrink! */
-      GROW(tcpSession->rbuff,
-          tcpSession->rsize,
-          tcpSession->pos + 1024);
-    }
   }
   tcpDisconnect(tsession);
   return OK;
 }
 
-/**
- * Add a new session to the array watched by the select thread.  Grows
- * the array if needed.  If the caller wants to do anything useful
- * with the return value, it must have the lock on tcplock before
- * calling.  It is ok to call this function without holding tcplock if
- * the return value is ignored.
- */
-static unsigned int addTSession(TSession * tsession) {
-  unsigned int i;
 
-  MUTEX_LOCK(tcplock);
-  if (tsessionCount == tsessionArrayLength)
-    GROW(tsessions,
-        tsessionArrayLength,
-        tsessionArrayLength * 2);
-  i = tsessionCount;
-  tsessions[tsessionCount++] = tsession;
-  MUTEX_UNLOCK(tcplock);
-  return i;
-}
-
 /**
  * Create a new session for an inbound connection on the given
  * socket. Adds the session to the array of sessions watched
  * by the select thread.
  */
-static void createNewSession(int sock) {
+static void * select_accept_handler(void * ah_cls,
+                                   struct SelectHandle * sh,
+                                   struct SocketHandle * sock,
+                                   const void * addr,
+                                   unsigned int addr_len) {
   TSession * tsession;
   TCPSession * tcpSession;
+  IPaddr ip;
 
+  if (addr_len != sizeof(IPaddr))
+    return NULL;
+  memcpy(&ip,
+        addr,
+        addr_len);
+  if (isBlacklisted(ip))
+    return NULL;
   tcpSession = MALLOC(sizeof(TCPSession));
-  tcpSession->pos = 0;
-  tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
-  tcpSession->rbuff = MALLOC(tcpSession->rsize);
-  tcpSession->wpos = 0;
-  tcpSession->wbuff = NULL;
-  tcpSession->wsize = 0;
-  tcpSession->sock = socket_create(ectx,
-                                  load_monitor,
-                                  sock);
+  tcpSession->sock = sock;
   /* fill in placeholder identity to mark that we
      are waiting for the welcome message */
   tcpSession->sender = *(coreAPI->myIdentity);
   tcpSession->expectingWelcome = YES;
   tcpSession->lock = MUTEX_CREATE(YES);
   tcpSession->users = 1; /* us only, core has not seen this tsession! */
-  tcpSession->lastUse = get_time();
   tsession = MALLOC(sizeof(TSession));
   tsession->ttype = TCP_PROTOCOL_NUMBER;
   tsession->internal = tcpSession;
-  addTSession(tsession);
+
+  return tsession;
 }                                      
 
-/**
- * Send a message (already encapsulated if needed) via the
- * tcp socket (or enqueue if sending now would block).
- *
- * @param tcpSession the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcpDirectSend(TCPSession * tcpSession,
-                        void * mp,
-                        unsigned int ssize) {
-  size_t ret;
-  int success;
-
-#if DEBUG_TCP
-  {
-    EncName enc;
-    
-    IF_GELOG(ectx,
-            GE_DEBUG | GE_USER | GE_BULK,
-            hash2enc(&tcpSession->sender.hashPubKey,
-                     &enc));    
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,
-          "tcpDirectSend called to transmit %u bytes to %s (CRC %u).\n",
-          ssize,
-          &enc,
-          crc32N(mp, ssize));
-  }
-#endif 
-  if (tcp_shutdown == YES) {
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,
-          "tcpDirectSend called while TCP transport is shutdown.\n");          
-#endif 
-    return SYSERR;
-  }
-  if (tcpSession->sock == NULL) {
-#if DEBUG_TCP
-    LOG(LOG_INFO,
-       "tcpDirectSend called, but socket is closed\n");
-#endif
-    return SYSERR;
-  }
-  if (ssize == 0) {
-    GE_BREAK(ectx, 0); /* size 0 not allowed */
-    return SYSERR;
-  }
-  MUTEX_LOCK(tcplock);
-  if (tcpSession->wpos > 0) {
-    /* select already pending... */
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK,           
-          "write already pending, will not take additional message.\n");
-#endif
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   ssize);
-    MUTEX_UNLOCK(tcplock);
-    return NO;
-  }
-#if DEBUG_TCP
-  GE_LOG(ectx,
-        GE_DEBUG | GE_USER | GE_BULK, 
-        "TCP: trying to send %u bytes\n",
-        ssize);
-#endif
-  success = socket_send(tcpSession->sock,
-                       NC_Nonblocking,
-                       mp,
-                       ssize,
-                       &ret);
-  if (success == SYSERR) {
-#if DEBUG_TCP
-    LOG_STRERROR(LOG_INFO, "send");
-#endif
-    MUTEX_UNLOCK(tcplock);
-    return SYSERR;
-  }
-  if (success == NO)
-    ret = 0;
-  if (stats != NULL)
-    stats->change(stat_bytesSent,
-                 ret);
-
-#if DEBUG_TCP
-  GE_LOG(ectx,
-        GE_DEBUG | GE_USER | GE_BULK, 
-        "TCP: transmitted %u bytes\n",
-        ret);
-#endif
-
-  if (ret < ssize) {/* partial send */
-    if (tcpSession->wsize < ssize - ret) {
-      GROW(tcpSession->wbuff,
-          tcpSession->wsize,
-          ssize - ret);
-    }
-    memcpy(tcpSession->wbuff,
-          mp + ret,
-          ssize - ret);
-    tcpSession->wpos = ssize - ret;
-    signalSelect(); /* select set changed! */
-  }
-  tcpSession->lastUse = get_time();
-  MUTEX_UNLOCK(tcplock);
-  return OK;
+static void select_close_handler(void * ch_cls,
+                                struct SelectHandle * sh,
+                                struct SocketHandle * sock,
+                                void * sock_ctx) {
+  TSession * tsession = sock_ctx;
+  tcpDisconnect(tsession);
 }
 
 /**
- * Send a message (already encapsulated if needed) via the
- * tcp socket.  Block if required.
+ * Send a message to the specified remote node.
  *
- * @param tcpSession the session to use for sending
- * @param mp the message to send
- * @param ssize the size of the message
- * @return OK if message send or queued, NO if queue is full and
- * message was dropped, SYSERR on error
- */
-static int tcpDirectSendReliable(TCPSession * tcpSession,
-                                void * mp,
-                                unsigned int ssize) {
-  int ok;
-
-#if DEBUG_TCP
-  {
-    EncName enc;
-    
-    IF_GELOC(ectx,
-            GE_DEBUG | GE_USER | GE_BULK, 
-            hash2enc(&tcpSession->sender.hashPubKey, 
-                     &enc));    
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK, 
-          "tcpDirectSendReliable called to transmit %u bytes to %s (CRC %u).\n",
-          ssize, 
-          &enc,
-          crc32N(mp, ssize));
-  }
-#endif 
-  if (tcp_shutdown == YES) {
-#if DEBUG_TCP
-    LOG(LOG_INFO,
-       "tcpDirectSendReliable called, but TCP service is shutdown\n");
-#endif
-    return SYSERR;
-  }
-  if (tcpSession->sock == NULL) {
-#if DEBUG_TCP
-    LOG(LOG_INFO,
-       "tcpDirectSendReliable called, but socket is closed\n");
-#endif
-    return SYSERR;
-  }
-  if (ssize == 0) {
-    GE_BREAK(ectx, 0);
-    return SYSERR;
-  }
-  MUTEX_LOCK(tcplock);
-  if (tcpSession->wpos > 0) {
-    unsigned int old = tcpSession->wpos;
-    GROW(tcpSession->wbuff,
-        tcpSession->wsize,
-        tcpSession->wpos + ssize);
-    tcpSession->wpos += ssize;
-    memcpy(&tcpSession->wbuff[old],
-          mp,
-          ssize);
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK, 
-          "tcpDirectSendReliable appended message to send buffer.\n");
-#endif 
-
-    ok = OK;
-  } else {
-    ok = tcpDirectSend(tcpSession,
-                      mp,
-                      ssize);
-  }
-  MUTEX_UNLOCK(tcplock);
-  return ok;
-}
-
-/**
- * Send a message to the specified remote node with
- * increased reliability (i.e. grow TCP send buffer
- * above one frame if needed).
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
+ * @param tsession the handle identifying the remote node
  * @param msg the message
  * @param size the size of the message
- * @return SYSERR on error, OK on success, NO on temporary error
+ * @return SYSERR on error, OK on success
  */
-static int tcpSendReliable(TSession * tsession,
-                          const void * msg,
-                          const unsigned int size) {
-  TCPP2P_PACKET * mp;
+static int tcpSend(TSession * tsession,
+                  const void * msg,
+                  const unsigned int size,
+                  int important) {
+  TCPSession * tcpSession;
+  MESSAGE_HEADER * mp;
   int ok;
 
-  if (size >= MAX_BUFFER_SIZE)
+  tcpSession = tsession->internal;
+  if (size >= MAX_BUFFER_SIZE - sizeof(MESSAGE_HEADER)) {
+    GE_BREAK(ectx, 0);
+    return SYSERR; /* too big */
+  }
+  if (selector == NULL) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
     return SYSERR;
-  if (tcp_shutdown == YES)
-    return SYSERR;
+  }
   if (size == 0) {
     GE_BREAK(ectx, 0);
     return SYSERR;
   }
-  if (((TCPSession*)tsession->internal)->sock == NULL)
+  if (tcpSession->sock == NULL) {
+    if (stats != NULL)
+      stats->change(stat_bytesDropped,
+                   size);
     return SYSERR; /* other side closed connection */
-  mp = MALLOC(sizeof(TCPP2P_PACKET) + size);
+  }
+  mp = MALLOC(sizeof(MESSAGE_HEADER) + size);
+  mp->size = htons(size + sizeof(MESSAGE_HEADER));
+  mp->type = 0;
   memcpy(&mp[1],
         msg,
         size);
-  mp->size = htons(size);
-  mp->reserved = 0;
-  ok = tcpDirectSendReliable(tsession->internal,
-                            mp,
-                            size + sizeof(TCPP2P_PACKET));
+  ok = select_write(selector,
+                   tcpSession->sock,
+                   mp,
+                   NO,
+                   important);
   FREE(mp);
   return ok;
 }
@@ -811,7 +462,6 @@
  */
 static int tcpConnect(const P2P_hello_MESSAGE * helo,
                      TSession ** tsessionPtr) {
-  int i;
   HostAddress * haddr;
   TCPWelcome welcome;
   int sock;
@@ -819,8 +469,9 @@
   TCPSession * tcpSession;
   struct sockaddr_in soaddr;
   struct SocketHandle * s;
+  int i;
 
-  if (tcp_shutdown == YES)
+  if (selector == NULL)
     return SYSERR;
   haddr = (HostAddress*) &helo[1];
 #if DEBUG_TCP
@@ -840,7 +491,7 @@
     return SYSERR;
   }
   s = socket_create(ectx,
-                   load_monitor,
+                   coreAPI->load_monitor,
                    sock);
   if (-1 == socket_set_blocking(s, NO)) {
     socket_destroy(s);
@@ -872,118 +523,40 @@
   }
   tcpSession = MALLOC(sizeof(TCPSession));
   tcpSession->sock = s;
-  tcpSession->wpos = 0;
-  tcpSession->wbuff = NULL;
-  tcpSession->wsize = 0;
-  tcpSession->rsize = 2 * 1024 + sizeof(TCPP2P_PACKET);
-  tcpSession->rbuff = MALLOC(tcpSession->rsize);
   tsession = MALLOC(sizeof(TSession));
   tsession->internal = tcpSession;
   tsession->ttype = tcpAPI.protocolNumber;
   tcpSession->lock = MUTEX_CREATE(YES);
   tcpSession->users = 2; /* caller + us */
-  tcpSession->pos = 0;
-  tcpSession->lastUse = get_time();
   tcpSession->sender = helo->senderIdentity;
   tcpSession->expectingWelcome = NO;
   MUTEX_LOCK(tcplock);
-  i = addTSession(tsession);
+  select_connect(selector,
+                tcpSession->sock,
+                tsession);
 
   /* send our node identity to the other side to fully establish the
      connection! */
   welcome.header.size
-    = htons(sizeof(TCPWelcome) - sizeof(TCPP2P_PACKET));
-  welcome.header.reserved
+    = htons(sizeof(TCPWelcome));
+  welcome.header.type
     = htons(0);
   welcome.clientIdentity
     = *(coreAPI->myIdentity);
-  if (SYSERR == tcpDirectSend(tcpSession,
-                             &welcome,
-                             sizeof(TCPWelcome))) {
-    destroySession(i);
+  if (SYSERR == tcpSend(tsession,
+                       &welcome.header,
+                       sizeof(TCPWelcome),
+                       YES)) {
     tcpDisconnect(tsession);
     MUTEX_UNLOCK(tcplock);
     return SYSERR;
   }
   MUTEX_UNLOCK(tcplock);
-  signalSelect();
-
   *tsessionPtr = tsession;
   return OK;
 }
 
 /**
- * Send a message to the specified remote node.
- *
- * @param tsession the P2P_hello_MESSAGE identifying the remote node
- * @param msg the message
- * @param size the size of the message
- * @return SYSERR on error, OK on success
- */
-static int tcpSend(TSession * tsession,
-                  const void * msg,
-                  const unsigned int size) {
-  TCPP2P_PACKET * mp;
-  int ok;
-
-#if DEBUG_TCP
-  GE_LOG(ectx,
-        GE_DEBUG | GE_USER | GE_BULK, 
-        "tcpSend called to transmit %u bytes.\n",
-        size);
-#endif 
-  if (size >= MAX_BUFFER_SIZE) {
-    GE_BREAK(ectx, 0);
-    return SYSERR;
-  }
-
-  if (tcp_shutdown == YES) {
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK, 
-          "tcpSend called while TCP is shutdown.\n");
-#endif 
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   size);
-    return SYSERR;
-  }
-  if (size == 0) {
-    GE_BREAK(ectx, 0);
-    return SYSERR;
-  }
-  if (((TCPSession*)tsession->internal)->sock == NULL) {
-#if DEBUG_TCP
-    GE_LOG(ectx,
-          GE_DEBUG | GE_USER | GE_BULK, 
-          "tcpSend called after other side closed connection.\n");
-#endif
-    if (stats != NULL)
-      stats->change(stat_bytesDropped,
-                   size);
-    return SYSERR; /* other side closed connection */
-  }
-  mp = MALLOC(sizeof(TCPP2P_PACKET) + size);
-  memcpy(&mp[1],
-        msg,
-        size);
-  mp->size = htons(size);
-  mp->reserved = 0;
-  /* if we would have less than TARGET_BUFFER_SIZE in buffers,
-     do reliable send */
-  if (((TCPSession*)tsession->internal)->wpos + size < TARGET_BUFFER_SIZE)
-    ok = tcpDirectSendReliable(tsession->internal,
-                              mp,
-                              size + sizeof(TCPP2P_PACKET));
-  else
-    ok = tcpDirectSend(tsession->internal,
-                      mp,
-                      size + sizeof(TCPP2P_PACKET));
-  FREE(mp);
-  return ok;
-}
-
-/**
  * Start the server process to receive inbound traffic.
  * @return OK on success, SYSERR if the operation failed
  */
@@ -993,94 +566,66 @@
   unsigned short port;
   int s;
 
-  if (serverSignal != NULL) {
+  if (selector != NULL) {
     GE_BREAK(ectx, 0);
     return SYSERR;
   }
-  serverSignal = SEMAPHORE_CREATE(0);
-  tcp_shutdown = NO;
-
-  if (0 != PIPE(tcp_pipe)) {
+  port = getGNUnetTCPPort();
+  if (port == 0) { 
+    /* read-only TCP */
+    return OK;
+  }
+  s = SOCKET(PF_INET,
+            SOCK_STREAM,
+            0);
+  if (s < 0) {
     GE_LOG_STRERROR(ectx,
                    GE_ERROR | GE_ADMIN | GE_BULK,
-                   "pipe");
+                   "socket");
     return SYSERR;
   }
-  setBlocking(tcp_pipe[1], NO);
-
-  port = getGNUnetTCPPort();
-  if (port != 0) { /* if port == 0, this is a read-only
-                     business! */
-    s = SOCKET(PF_INET,
-              SOCK_STREAM,
-              0);
-    if (s < 0) {
-      GE_LOG_STRERROR(ectx,
-                     GE_ERROR | GE_ADMIN | GE_BULK,
-                     "socket");
-      if (0 != CLOSE(tcp_pipe[0]))
-       GE_LOG_STRERROR(ectx,
-                       GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                       "close");
-      if (0 != CLOSE(tcp_pipe[1]))
-       GE_LOG_STRERROR(ectx,
-                       GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                       "close");
-      SEMAPHORE_DESTROY(serverSignal);
-      serverSignal = NULL;
-      tcp_shutdown = YES;
-      return SYSERR;
-    }
-    if (SETSOCKOPT(s,
-                  SOL_SOCKET,
-                  SO_REUSEADDR,
-                  &on,
-                  sizeof(on)) < 0 )
-      GE_DIE_STRERROR(ectx, 
-                     GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
-                     "setsockopt");
-    memset((char *) &serverAddr,
-          0,
-          sizeof(serverAddr));
-    serverAddr.sin_family      = AF_INET;
-    serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
-    serverAddr.sin_port        = htons(getGNUnetTCPPort());
-#if DEBUG_TCP
+  if (SETSOCKOPT(s,
+                SOL_SOCKET,
+                SO_REUSEADDR,
+                &on,
+                sizeof(on)) < 0 )
+    GE_DIE_STRERROR(ectx, 
+                   GE_FATAL | GE_ADMIN | GE_IMMEDIATE,
+                   "setsockopt");
+  memset((char *) &serverAddr,
+        0,
+        sizeof(serverAddr));
+  serverAddr.sin_family      = AF_INET;
+  serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
+  serverAddr.sin_port        = htons(getGNUnetTCPPort());
+  if (BIND(s,
+          (struct sockaddr *) &serverAddr,
+          sizeof(serverAddr)) < 0) {
+    GE_LOG_STRERROR(ectx,
+                   GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+                   "bind");
     GE_LOG(ectx,
-          GE_INFO | GE_USER | GE_BULK,
-          "starting %s peer server on port %d\n",
-          "tcp",
-          ntohs(serverAddr.sin_port));
-#endif
-    if (BIND(s,
-            (struct sockaddr *) &serverAddr,
-            sizeof(serverAddr)) < 0) {
+          GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
+          _("Failed to start transport service on port %d.\n"),
+          getGNUnetTCPPort());
+    if (0 != CLOSE(s))
       GE_LOG_STRERROR(ectx,
-                     GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
-                     "bind");
-      GE_LOG(ectx,
-            GE_ERROR | GE_ADMIN | GE_IMMEDIATE,
-            _("Failed to start transport service on port %d.\n"),
-            getGNUnetTCPPort());
-      if (0 != CLOSE(s))
-       GE_LOG_STRERROR(ectx,
-                       GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                       "close");
-      SEMAPHORE_DESTROY(serverSignal);
-      serverSignal = NULL;
-      return SYSERR;
-    }
-    if (0 != LISTEN(s, 5))
-      GE_LOG_STRERROR(ectx,
-                     GE_ERROR | GE_USER | GE_ADMIN | GE_IMMEDIATE, 
-                     "listen");
-    tcp_sock = socket_create(ectx,
-                            load_monitor,
-                            s);
-  } else
-    tcp_sock = NULL;
-
-  /* FIXME: call network/select code! */
+                     GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
+                     "close");
+    return SYSERR;
+  }
+  selector = select_create(ectx,
+                          coreAPI->load_monitor,
+                          s,
+                          sizeof(IPaddr),
+                          0, /* timeout */
+                          &select_message_handler,
+                          NULL,
+                          &select_accept_handler,
+                          NULL,
+                          &select_close_handler,
+                          NULL,
+                          0 /* memory quota */ );
   return OK;
 }
 
@@ -1089,34 +634,10 @@
  * traffic). Maybe restarted later!
  */
 static int stopTransportServer() {
-  void * unused;
-  int haveThread;
-
-  if (tcp_shutdown == YES)
-    return OK;
-  tcp_shutdown = YES;
-  signalSelect();
-  if (serverSignal != NULL) {
-    haveThread = YES;
-    SEMAPHORE_DOWN(serverSignal, YES);
-    SEMAPHORE_DESTROY(serverSignal);
-  } else
-    haveThread = NO;
-  serverSignal = NULL;
-  if (0 != CLOSE(tcp_pipe[1]))
-    GE_LOG_STRERROR(ectx,
-                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                   "close");
-  if (0 != CLOSE(tcp_pipe[0])) 
-    GE_LOG_STRERROR(ectx,
-                   GE_ERROR | GE_USER | GE_ADMIN | GE_BULK,
-                   "close");
-  if (tcp_sock != NULL) {
-    socket_destroy(tcp_sock);
-    tcp_sock = NULL;
+  if (selector != NULL) {
+    select_destroy(selector);
+    selector = NULL;
   }
-  if (haveThread == YES)
-    PTHREAD_JOIN(listenThread, &unused);
   return OK;
 }
 
@@ -1124,9 +645,16 @@
  * Reload the configuration. Should never fail (keep old
  * configuration on error, syslog errors!)
  */
-static void reloadConfiguration() {
+static int reloadConfiguration(void * ctx,
+                              struct GC_Configuration * cfg, 
+                              struct GE_Context * ectx,
+                              const char * section,
+                              const char * option) {
   char * ch;
 
+  if (0 != strcmp(section, "TCP"))
+    return OK; /* fast path */
+       
   MUTEX_LOCK(tcplock);
   FREENONNULL(filteredNetworks_);
   if (0 != GC_get_configuration_value_string(cfg,
@@ -1142,6 +670,8 @@
     FREE(ch);
   }
   MUTEX_UNLOCK(tcplock);
+  /* TODO: error handling! */
+  return OK;
 }
 
 /**
@@ -1173,17 +703,17 @@
 TransportAPI * inittransport_tcp(CoreAPIForTransport * core) {
   ectx = core->ectx;
   cfg = core->cfg;
-  load_monitor = core->load_monitor;
   GE_ASSERT(ectx, sizeof(HostAddress) == 8);
-  GE_ASSERT(ectx, sizeof(TCPP2P_PACKET) == 4);
+  GE_ASSERT(ectx, sizeof(MESSAGE_HEADER) == 4);
   GE_ASSERT(ectx, sizeof(TCPWelcome) == 68);
   tcplock = MUTEX_CREATE(YES);
-  reloadConfiguration();
-  tsessionCount = 0;
-  tsessionArrayLength = 0;
-  GROW(tsessions,
-       tsessionArrayLength,
-       32);
+  if (0 != GC_attach_change_listener(cfg,
+                                    &reloadConfiguration,
+                                    NULL)) {
+    MUTEX_DESTROY(tcplock);
+    tcplock = NULL;
+    return NULL;
+  }
   coreAPI = core;
   stats = coreAPI->requestService("stats");
   if (stats != NULL) {
@@ -1198,30 +728,24 @@
   tcpAPI.mtu                  = 0;
   tcpAPI.cost                 = 20000; /* about equal to udp */
   tcpAPI.verifyHelo           = &verifyHelo;
-  tcpAPI.createhello           = &createhello;
+  tcpAPI.createhello          = &createhello;
   tcpAPI.connect              = &tcpConnect;
   tcpAPI.associate            = &tcpAssociate;
   tcpAPI.send                 = &tcpSend;
-  tcpAPI.sendReliable         = &tcpSendReliable;
   tcpAPI.disconnect           = &tcpDisconnect;
   tcpAPI.startTransportServer = &startTransportServer;
   tcpAPI.stopTransportServer  = &stopTransportServer;
-  tcpAPI.reloadConfiguration  = &reloadConfiguration;
   tcpAPI.addressToString      = &addressToString;
 
   return &tcpAPI;
 }
 
 void donetransport_tcp() {
-  int i;
-
+  GC_detach_change_listener(cfg,
+                           &reloadConfiguration,
+                           NULL);
   coreAPI->releaseService(stats);
   stats = NULL;
-  for (i=tsessionCount-1;i>=0;i--)
-    destroySession(i);
-  GROW(tsessions,
-       tsessionArrayLength,
-       0);
   FREENONNULL(filteredNetworks_);
   MUTEX_DESTROY(tcplock);
 }

Modified: GNUnet/src/transports/udp.c
===================================================================
--- GNUnet/src/transports/udp.c 2006-07-28 08:29:51 UTC (rev 3142)
+++ GNUnet/src/transports/udp.c 2006-07-28 09:30:02 UTC (rev 3143)
@@ -530,7 +530,8 @@
  */
 static int udpSend(TSession * tsession,
                   const void * message,
-                  const unsigned int size) {
+                  const unsigned int size,
+                  int important) {
   char * msg;
   UDPMessage mp;
   P2P_hello_MESSAGE * helo;
@@ -785,12 +786,10 @@
   udpAPI.createhello          = &createhello;
   udpAPI.connect              = &udpConnect;
   udpAPI.send                 = &udpSend;
-  udpAPI.sendReliable         = &udpSend; /* can't increase reliability */
   udpAPI.associate            = &udpAssociate;
   udpAPI.disconnect           = &udpDisconnect;
   udpAPI.startTransportServer = &startTransportServer;
   udpAPI.stopTransportServer  = &stopTransportServer;
-  udpAPI.reloadConfiguration  = &reloadConfiguration;
   udpAPI.addressToString      = &addressToString;
 
   return &udpAPI;





reply via email to

[Prev in Thread] Current Thread [Next in Thread]