myserver-commit
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[myserver-commit] [2984] `ConnectionsScheduler' uses `SocketPair'.


From: Giuseppe Scrivano
Subject: [myserver-commit] [2984] `ConnectionsScheduler' uses `SocketPair'.
Date: Tue, 20 Jan 2009 21:13:53 +0000

Revision: 2984
          http://svn.sv.gnu.org/viewvc/?view=rev&root=myserver&revision=2984
Author:   gscrivano
Date:     2009-01-20 21:13:52 +0000 (Tue, 20 Jan 2009)

Log Message:
-----------
`ConnectionsScheduler' uses `SocketPair'.

Modified Paths:
--------------
    trunk/myserver/include/base/socket_pair/socket_pair.h
    trunk/myserver/include/connections_scheduler/connections_scheduler.h
    trunk/myserver/src/base/socket_pair/socket_pair.cpp
    trunk/myserver/src/connections_scheduler/connections_scheduler.cpp

Modified: trunk/myserver/include/base/socket_pair/socket_pair.h
===================================================================
--- trunk/myserver/include/base/socket_pair/socket_pair.h       2009-01-19 
21:50:45 UTC (rev 2983)
+++ trunk/myserver/include/base/socket_pair/socket_pair.h       2009-01-20 
21:13:52 UTC (rev 2984)
@@ -16,8 +16,8 @@
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
-#ifndef PIPE_H
-#define PIPE_H
+#ifndef SOCKET_PAIR_H
+#define SOCKET_PAIR_H
 
 #include "stdafx.h"
 #include <include/filter/stream.h>
@@ -39,6 +39,8 @@
        virtual int close ();
        void closeFirstHandle ();
        void closeSecondHandle ();
+  void setNonBlocking (bool blocking);
+       virtual u_long bytesToRead();
 private:
        FileHandle handles[2];
 };

Modified: trunk/myserver/include/connections_scheduler/connections_scheduler.h
===================================================================
--- trunk/myserver/include/connections_scheduler/connections_scheduler.h        
2009-01-19 21:50:45 UTC (rev 2983)
+++ trunk/myserver/include/connections_scheduler/connections_scheduler.h        
2009-01-20 21:13:52 UTC (rev 2984)
@@ -1,7 +1,7 @@
 /* -*- mode: c++ -*- */
 /*
 MyServer
-Copyright (C) 2007, 2008 Free Software Foundation, Inc.
+Copyright (C) 2007, 2008, 2009 Free Software Foundation, Inc.
 This program is free software; you can redistribute it and/or modify
 it under the terms of the GNU General Public License as published by
 the Free Software Foundation; either version 3 of the License, or
@@ -27,6 +27,7 @@
 #include <include/base/sync/semaphore.h>
 #include <include/base/hash_map/hash_map.h>
 #include <include/base/thread/thread.h>
+#include <include/base/socket_pair/socket_pair.h>
 
 #include <list>
 #include <queue>
@@ -69,10 +70,10 @@
     bool terminated; 
     bool terminate;
     Mutex* mutex;
-    FileHandle fd[2];
     event loopEvent;
     Server* server;
     ConnectionsScheduler* scheduler;
+    SocketPair socketPair;
   };
 
   ConnectionsScheduler(Server* server = NULL);

Modified: trunk/myserver/src/base/socket_pair/socket_pair.cpp
===================================================================
--- trunk/myserver/src/base/socket_pair/socket_pair.cpp 2009-01-19 21:50:45 UTC 
(rev 2983)
+++ trunk/myserver/src/base/socket_pair/socket_pair.cpp 2009-01-20 21:13:52 UTC 
(rev 2984)
@@ -206,3 +206,25 @@
   Socket sock (handles[1]);
   sock.close ();
 }
+
+/*!
+ *Configure the server socket blocking or not blocking.
+ *\param blocking The new blocking status.
+ */
+void SocketPair::setNonBlocking (bool notBlocking)
+{
+   Socket sock0 (handles[0]);
+   sock0.setNonBlocking (notBlocking);
+
+   Socket sock1 (handles[1]);
+   sock1.setNonBlocking (notBlocking);
+}
+
+/*!
+ *Return how many bytes can be read on the first socket.
+ */
+u_long SocketPair::bytesToRead()
+{
+  Socket sock (handles[0]);
+  return sock.bytesToRead ();
+}

Modified: trunk/myserver/src/connections_scheduler/connections_scheduler.cpp
===================================================================
--- trunk/myserver/src/connections_scheduler/connections_scheduler.cpp  
2009-01-19 21:50:45 UTC (rev 2983)
+++ trunk/myserver/src/connections_scheduler/connections_scheduler.cpp  
2009-01-20 21:13:52 UTC (rev 2984)
@@ -19,92 +19,7 @@
 #include <include/server/server.h>
 
 
-//////FIXME: Use the SocketPair class//////////////////////////
 #ifdef WIN32
-#define socket_t intptr_t
-#include <windows.h>
-#else
-#define socket_t int
-#endif
-
-static int
-make_socket_nonblocking(FileHandle fd)
-{
-#ifdef WIN32
-  unsigned long nonblocking = 1;
-  ioctlsocket(fd, FIONBIO, (unsigned long*) &nonblocking);
-#else
-  return fcntl(fd, F_SETFL, O_NONBLOCK);
-#endif
-  return 0;
-}
-
-static int
-create_socketpair(int af, int type, int protocol, FileHandle socks[2])
-{
-#ifndef WIN32
-  return socketpair(af, type, protocol, (int*)socks);
-#else
-  struct sockaddr_in addr;
-  SOCKET listener;
-  int e;
-  int addrlen = sizeof(addr);
-  DWORD flags = 0;
-  
-  if (socks == 0)
-    return -1;
-
-  socks[0] = socks[1] = INVALID_SOCKET;
-  listener = socket(AF_INET, type, 0);
-
-  if (listener == INVALID_SOCKET)
-    return -1;
-
-  memset(&addr, 0, sizeof(addr));
-  addr.sin_family = AF_INET;
-  addr.sin_addr.s_addr = htonl(0x7f000001);
-  addr.sin_port = 0;
-
-  e = bind(listener, (const struct sockaddr*) &addr, sizeof(addr));
-  if (e == SOCKET_ERROR)
-  {
-    close(listener);
-    return -1;
-  }
-  
-  e = getsockname(listener, (struct sockaddr*) &addr, &addrlen);
-  if (e == SOCKET_ERROR)
-  {
-    close(listener);
-    return -1;
-  }
-  
-  do
-  {
-    if (listen(listener, 1) == SOCKET_ERROR)
-      break;
-    if ((socks[0] = socket(AF_INET, type, 0)) == INVALID_SOCKET)
-      break;
-    if (connect(socks[0], (const struct sockaddr*) &addr, sizeof(addr)) == 
SOCKET_ERROR)
-      break;
-    if ((socks[1] = accept(listener, NULL, NULL)) == INVALID_SOCKET)
-      break;
-    
-    close(listener);
-    return 0;
-  } while (0);
-  
-  close(listener);
-  close(socks[0]);
-  close(socks[1]);
-  
-  return -1;
-  #endif
-}
-
-///////////////////////////////////////////////////////////////////////////
-
-#ifdef WIN32
 static unsigned int __stdcall dispatcher(void* p)
 #else
 static void* dispatcher(void* p)
@@ -182,16 +97,16 @@
 static void eventLoopHandler(int fd, short event, void *arg)
 {
   ConnectionsScheduler::DispatcherArg *da = 
(ConnectionsScheduler::DispatcherArg*)arg;
- 
-  if(event == EV_READ || event == EV_TIMEOUT)
+  u_long nbr;
+  if (event == EV_READ || event == EV_TIMEOUT)
   {
-    Socket sock((FileHandle) da->fd[0]);
+    Socket sock ((FileHandle) da->socketPair.getFirstHandle ());
 
-    while(sock.bytesToRead())
+    while (da->socketPair.bytesToRead())
     {
       char cmd;
-      sock.recv(&cmd, 1, 0);
-      if(cmd == 'c')
+      da->socketPair.read (&cmd, 1, &nbr);
+      if (cmd == 'c')
       {
         /*
          *Schedule a new connection.
@@ -204,43 +119,43 @@
         ConnectionPtr c;
         timeval tv = {10, 0};
 
-        sock.recv((char*)&handle, sizeof(FileHandle), 0);
-        sock.recv((char*)&c, sizeof(ConnectionPtr), 0);
-        sock.recv((char*)&tv, sizeof(timeval), 0);
+        da->socketPair.read ((char*)&handle, sizeof(FileHandle), &nbr);
+        da->socketPair.read ((char*)&c, sizeof(ConnectionPtr), &nbr);
+        da->socketPair.read ((char*)&tv, sizeof(timeval), &nbr);
 
-        event_once((int) handle, EV_READ | EV_TIMEOUT, newDataHandler, da, 
&tv);
+        event_once ((int) handle, EV_READ | EV_TIMEOUT, newDataHandler, da, 
&tv);
       }
-      if(cmd == 'r')
+      if (cmd == 'r')
         return;
       /* Handle other cmd without do anything else.  */
     }
 
-    event_add(&(da->loopEvent), NULL);
+    event_add (&(da->loopEvent), NULL);
   }
 }
 
-static void listenerHandler(int fd, short event, void *arg)
+static void listenerHandler (int fd, short event, void *arg)
 {
   static timeval tv = {5, 0};
   ConnectionsScheduler::ListenerArg* s = 
(ConnectionsScheduler::ListenerArg*)arg;
 
-  if(event == EV_TIMEOUT)
+  if (event == EV_TIMEOUT)
   {
     event_add (&(s->ev), &tv);
   }
-  else if(event == EV_READ)
+  else if (event == EV_READ)
   {
     MYSERVER_SOCKADDRIN asockIn;
     int asockInLen = 0;
     Socket asock;
 
-    asockInLen = sizeof(sockaddr_in);
-    asock = s->serverSocket->accept(&asockIn, &asockInLen);
+    asockInLen = sizeof (sockaddr_in);
+    asock = s->serverSocket->accept (&asockIn, &asockInLen);
 
-    if(s->server &&
+    if (s->server &&
        asock.getHandle() != (FileHandle)INVALID_SOCKET)
     {
-      s->server->addConnection(asock, &asockIn);
+      s->server->addConnection (asock, &asockIn);
     }
 
     event_add (&(s->ev), &tv);
@@ -273,7 +188,7 @@
   event_add(&(arg->ev), &tv);
 
   u_long nbw;
-  Socket sock ((FileHandle)dispatcherArg.fd[1]);
+  Socket sock ((FileHandle)dispatcherArg.socketPair.getSecondHandle ());
 
   eventsSocketMutex.lock();
   sock.write("l", 1, &nbw);  
@@ -372,9 +287,9 @@
 #else
 #define LOCAL_SOCKETPAIR_AF AF_UNIX
 #endif
-  int err = create_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0,
-                              dispatcherArg.fd);
 
+  int err = dispatcherArg.socketPair.create ();
+
   if (err == -1)
   {
     if(server)
@@ -384,19 +299,18 @@
     return;
   }
 
-  make_socket_nonblocking(dispatcherArg.fd[0]);
-  make_socket_nonblocking(dispatcherArg.fd[1]);
+  dispatcherArg.socketPair.setNonBlocking (true);
 
-  event_set(&(dispatcherArg.loopEvent), dispatcherArg.fd[0], EV_READ | 
EV_TIMEOUT,
+  event_set (&(dispatcherArg.loopEvent), 
dispatcherArg.socketPair.getFirstHandle (), EV_READ | EV_TIMEOUT,
             eventLoopHandler, &dispatcherArg);
 
-  event_add(&(dispatcherArg.loopEvent), NULL);
+  event_add (&(dispatcherArg.loopEvent), NULL);
 
-  if(Thread::create(&dispatchedThreadId, dispatcher, &dispatcherArg))
+  if (Thread::create (&dispatchedThreadId, dispatcher, &dispatcherArg))
   {
-    if(server)
+    if (server)
     {
-      server->logWriteln("Error initializing dispatcher thread.", 
MYSERVER_LOG_MSG_ERROR);
+      server->logWriteln ("Error initializing dispatcher thread.", 
MYSERVER_LOG_MSG_ERROR);
     }
     dispatchedThreadId = 0;
   }
@@ -443,7 +357,7 @@
   int priority = c->getPriority();
 
   if(priority == -1 && c->host)
-      priority = c->host->getDefaultPriority();
+    priority = c->host->getDefaultPriority();
     
   priority = std::max(0, priority);
   priority = std::min(PRIORITY_CLASSES - 1, priority);
@@ -504,7 +418,7 @@
   if(lock)
   {
     u_long nbw;
-    Socket sock (dispatcherArg.fd[1]);
+    Socket sock (dispatcherArg.socketPair.getSecondHandle ());
 
     eventsSocketMutex.lock();
     sock.write("c", 1, &nbw);
@@ -578,8 +492,8 @@
     readySemaphore->unlock();
   }
 
-  Socket sockR (dispatcherArg.fd[0]);
-  Socket sockW (dispatcherArg.fd[1]);
+  Socket sockR (dispatcherArg.socketPair.getFirstHandle ());
+  Socket sockW (dispatcherArg.socketPair.getSecondHandle ());
 
   eventsSocketMutex.lock();
   sockW.write("r", 1, &nbw);
@@ -607,11 +521,7 @@
   
   eventsMutex.unlock();
 
-  sockR.shutdown(SD_BOTH);
-  sockW.shutdown(SD_BOTH);
-
-  sockR.close();
-  sockW.close();
+  dispatcherArg.socketPair.close ();
 }
 
 /*!






reply via email to

[Prev in Thread] Current Thread [Next in Thread]