gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r27060 - in gnunet-planetlab/gplmt: contrib gplmt


From: gnunet
Subject: [GNUnet-SVN] r27060 - in gnunet-planetlab/gplmt: contrib gplmt
Date: Tue, 7 May 2013 21:01:10 +0200

Author: otarabai
Date: 2013-05-07 21:01:10 +0200 (Tue, 07 May 2013)
New Revision: 27060

Added:
   gnunet-planetlab/gplmt/contrib/test_node.nodes
Modified:
   gnunet-planetlab/gplmt/gplmt/Nodes.py
   gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
Added the ability to specify user/pass/port in nodes file to override the ones 
in the config file


Added: gnunet-planetlab/gplmt/contrib/test_node.nodes
===================================================================
--- gnunet-planetlab/gplmt/contrib/test_node.nodes                              
(rev 0)
+++ gnunet-planetlab/gplmt/contrib/test_node.nodes      2013-05-07 19:01:10 UTC 
(rev 27060)
@@ -0,0 +1,4 @@
+host1.example.com
+host2.example.com:222
+user:address@hidden:222
+

Modified: gnunet-planetlab/gplmt/gplmt/Nodes.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-05-07 17:44:00 UTC (rev 
27059)
+++ gnunet-planetlab/gplmt/gplmt/Nodes.py       2013-05-07 19:01:10 UTC (rev 
27060)
@@ -32,6 +32,44 @@
     print "That's a bug! please check README"
     sys.exit (1) 
 
+class Node:
+    def __init__(self, hostname, port = None, username = None, password = 
None):
+        self.hostname = hostname
+        self.port = port
+        self.username = username
+        self.password = password
+
+    @staticmethod
+    def parse(line):
+        parts = line.split('@')
+        hostname = None
+        port = None
+        username = None
+        password = None
+
+        if len(parts) == 2: #credentials supplied
+            creds = parts[0].split(':')
+            if len(creds) != 2:
+                raise Exception("Invalid node definition: " + line)
+            username = creds[0]
+            password = creds[1]
+
+            line = parts[1]
+
+        elif len(parts) > 2:
+            raise Exception("Invalid node definition: " + line)
+
+        #parse host/port
+        hostport = line.split(':')
+        hostname = hostport[0]
+        if len(hostport) == 2:
+            port = int(hostport[1])
+        elif len(hostport) > 2:
+            raise Exception("Invalid node definition: " + line)
+
+        return Node(hostname, port, username, password)
+
+
 class Nodes:
     def __init__(self, filename, logger):
         assert (None != logger)
@@ -45,7 +83,7 @@
             for line in fobj: 
                 line = line.strip() 
                 self.logger.log ("Found node '" + line + "'")
-                self.nodes.append(line)
+                self.nodes.append(Node.parse(line))
             fobj.close()
         except IOError:
             print "File " + self.filename + " not found"
@@ -108,4 +146,4 @@
             self.logger.log ("Planetlab API returned: " + node)            
             self.nodes.append(node)
         self.logger.log ("Planetlab API returned " + str(len(self.nodes)) + " 
nodes")     
-        return True
\ No newline at end of file
+        return True

Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py      2013-05-07 17:44:00 UTC (rev 
27059)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py      2013-05-07 19:01:10 UTC (rev 
27060)
@@ -100,7 +100,7 @@
                 if (line[0] == '#'):
                     continue;
                 sline = line.split (';',2)
-                if (sline[0] == self.node):
+                if (sline[0] == self.node.hostname):
                     cmd = sline[1].strip()
                     found = True
                 if (sline[0] == ''):
@@ -111,15 +111,15 @@
         
         t = task.copy()
         if (found == True):
-            g_logger.log (self.node + " : Found specific command '"+ cmd + "'")
+            g_logger.log (self.node.hostname + " : Found specific command '"+ 
cmd + "'")
             t.command = cmd
             t.arguments = ""
         elif ((found == False) and (default != None)):
-            g_logger.log (self.node + " : Using default command '"+ default + 
"'")
+            g_logger.log (self.node.hostname + " : Using default command '"+ 
default + "'")
             t.command = default
             t.arguments = ""
         else:
-            g_logger.log (self.node + " : Task '"+ task.name + "' failed: no 
command to execute")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
failed: no command to execute")
             return Tasks.Taskresult.fail
         
         return self.exec_run(t, transport)
@@ -127,10 +127,10 @@
     def exec_run (self, task, transport):
         global interrupt
         if(interrupt):
-            g_logger.log (self.node + " : Task '"+ task.name + "' interrupted 
by user")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
interrupted by user")
             return Tasks.Taskresult.user_interrupt
         if ((task.command == None) and (task.arguments == None)):
-            g_logger.log (self.node + " : Task '"+ task.name + "' failed: no 
command to execute")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
failed: no command to execute")
             return Tasks.Taskresult.fail
     
         try:
@@ -141,7 +141,7 @@
             channel.exec_command(task.command + " " + task.arguments)
             
         except SSHException as e:
-            g_logger.log (self.node + " : Error while trying to connect: " + 
str(e))
+            g_logger.log (self.node.hostname + " : Error while trying to 
connect: " + str(e))
         
         if (task.timeout > 0):
             timeout = task.timeout
@@ -160,7 +160,7 @@
             if (timeout != -1):
                 delta = time.time() - start_time
                 if (delta > timeout):
-                    g_logger.log (self.node + " : Timeout after " +str(delta) 
+" seconds")                    
+                    g_logger.log (self.node.hostname + " : Timeout after " 
+str(delta) +" seconds")                    
                     result = Tasks.Taskresult.timeout
                     break
             (r, w, e) = select.select([channel], [], [], 1)
@@ -170,13 +170,13 @@
                     data = r[0].recv(4096)
                     if data:
                         got_data = True
-                        g_logger.log (self.node + " : " + data)
+                        g_logger.log (self.node.hostname + " : " + data)
                         stdout_data += data
                 if channel.recv_stderr_ready():
                     data = r[0].recv_stderr(4096)
                     if data:
                         got_data = True
-                        g_logger.log (self.node + " : " + data)
+                        g_logger.log (self.node.hostname + " : " + data)
                         stderr_data += data
                 if not got_data:
                     break
@@ -187,12 +187,12 @@
         if (result == Tasks.Taskresult.success):
             if (task.expected_return_code != -1):                    
                 if (exit_status != task.expected_return_code):
-                    g_logger.log (self.node + " : Task '"+ task.name + "' 
completed after "+ str(time.time() - start_time) +" sec, but exit code " 
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' completed after "+ str(time.time() - start_time) +" sec, but exit code " 
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
                     g_logger.log (stdout_data)
                     g_logger.log (stderr_data)
                     result = Tasks.Taskresult.return_value_did_not_match
                 else:
-                    g_logger.log (self.node + " : Task '"+ task.name + "' 
completed after "+ str(time.time() - start_time) +" sec, exit code " 
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code " 
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
             
             if (task.expected_output != None):
                 output_contained = False
@@ -201,19 +201,19 @@
                 if (task.expected_output in stderr_data):
                         output_contained = True                
                 if (output_contained == True):
-                    g_logger.log (self.node + " : Task '"+ task.name + "' 
expected output '"+task.expected_output+"' was found")
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' expected output '"+task.expected_output+"' was found")
                 else:
-                    g_logger.log (self.node + " : Task '"+ task.name + "' 
expected output '"+task.expected_output+"' was not found")
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' expected output '"+task.expected_output+"' was not found")
                     result = Tasks.Taskresult.output_did_not_match
                     
         if (result == Tasks.Taskresult.success):
-            g_logger.log (self.node + " : Task '"+ task.name + "' successful")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
successful")
         elif (result == Tasks.Taskresult.timeout):
-            g_logger.log (self.node + " : Task '"+ task.name + "' with 
timeout")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
with timeout")
         elif (result == Tasks.Taskresult.user_interrupt):
-            g_logger.log (self.node + " : Task '"+ task.name + "' interrupted 
by user")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
interrupted by user")
         else: 
-            g_logger.log (self.node + " : Task '"+ task.name + "' failed")
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' 
failed")
         return result
     def exec_put (self, task, transport):
         if (False == os.path.exists (task.src)):
@@ -226,7 +226,7 @@
                     scp = SCPClient (transport)
                     scp.put (task.src, task.dest)
                 except SCPException as e:
-                    g_logger.log (self.node + " : Task '"+ task.name + "' :" + 
str(e))
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' :" + str(e))
                     result = Tasks.Taskresult.fail
                     pass
             if (g_configuration.ssh_transfer == 
Configuration.TransferMode.sftp):                
@@ -234,7 +234,7 @@
                 sftp.put(task.src, task.dest)
                 sftp.close()
         except paramiko.SSHException as e:
-            g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" 
+ str(e))
             result = Tasks.Taskresult.fail
             pass
         except (OSError, IOError) as e:
@@ -251,7 +251,7 @@
                     scp = SCPClient (transport)
                     scp.get (task.src, task.dest)
                 except SCPException as e:
-                    g_logger.log (self.node + " : Task '"+ task.name + "' :" + 
str(e))
+                    g_logger.log (self.node.hostname + " : Task '"+ task.name 
+ "' :" + str(e))
                     result = Tasks.Taskresult.fail
                     pass                
             if (g_configuration.ssh_transfer == 
Configuration.TransferMode.sftp):
@@ -259,25 +259,25 @@
                 sftp.get (task.src, task.dest)
                 sftp.close()
         except paramiko.SSHException as e:
-            g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :" 
+ str(e))
             result = Tasks.Taskresult.fail
             pass
         except (OSError, IOError) as e:
-            g_logger.log (self.node + " : Task '"+ task.name + "' : " + str(e))
+            g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : " 
+ str(e))
             result = Tasks.Taskresult.src_file_not_found 
             pass           
         return result    
           
     def run(self):
         global interrupt
-        g_logger.log (self.node + " : Starting tasklist " + self.tasks.name)
+        g_logger.log (self.node.hostname + " : Starting tasklist " + 
self.tasks.name)
         task = self.tasks.get()
         if(interrupt):
             return Tasks.Taskresult.user_interrupt
         try: 
             ssh = paramiko.SSHClient()
             if (g_configuration.ssh_use_known_hosts):
-                g_logger.log (self.node + " : Loading known hosts")
+                g_logger.log (self.node.hostname + " : Loading known hosts")
                 ssh.load_system_host_keys ()
 
             # Automatically add new hostkeys
@@ -287,66 +287,78 @@
             keyfile = None
             if (g_configuration.ssh_keyfile != None): 
                 if (os.path.exists (g_configuration.ssh_keyfile)):
-                    g_logger.log (self.node + " : Found " + 
g_configuration.ssh_keyfile)
+                    g_logger.log (self.node.hostname + " : Found " + 
g_configuration.ssh_keyfile)
                     keyfile = g_configuration.ssh_keyfile
                 else:
-                    g_logger.log (self.node + " : Not found " + 
g_configuration.ssh_keyfile)
-                
-            if ("" != g_configuration.ssh_username):
-                g_logger.log (self.node + " : Trying to connect to " + 
-                          g_configuration.ssh_username + "@" + self.node + 
+                    g_logger.log (self.node.hostname + " : Not found " + 
g_configuration.ssh_keyfile)
+            
+            if self.node.username is not None: #credentials are supplied in 
node file
+                g_logger.log (self.node.hostname + " : Trying to connect to " 
+ 
+                    self.node.username + "@" + self.node.hostname + 
+                    " using password '" + self.node.password)
+                ssh.connect (self.node.hostname,
+                        port=self.node.port or 22,
+                        username=self.node.username,
+                        password=self.node.password,
+                        timeout=10)
+
+            elif ("" != g_configuration.ssh_username):
+                g_logger.log (self.node.hostname + " : Trying to connect to " 
+ 
+                          g_configuration.ssh_username + "@" + 
self.node.hostname + 
                           " using password '" + g_configuration.ssh_password+ 
                           "' and private keyfile '" +str(keyfile)+ "'")
-                ssh.connect (self.node, 
-                         username=g_configuration.ssh_username, 
-                         password=g_configuration.ssh_password,
-                         timeout=10,
-                         key_filename=keyfile) 
+                ssh.connect (self.node.hostname,
+                       port=self.node.port or 22,
+                                       username=g_configuration.ssh_username, 
+                                       password=g_configuration.ssh_password,
+                                       timeout=10,
+                                       key_filename=keyfile)
             else:
-                g_logger.log (self.node + " : Trying to connect to " + 
-                          self.node + 
+                g_logger.log (self.node.hostname + " : Trying to connect to " 
+ 
+                          self.node.hostname + 
                           " using password '" + g_configuration.ssh_password+ 
                           "' and private keyfile '" +str(keyfile)+ "'")
-                ssh.connect (self.node, 
-                         password=g_configuration.ssh_password,
-                         timeout=10,
-                         key_filename=keyfile)                 
+                ssh.connect (self.node.hostname,
+                       port=self.node.port or 22,
+                                       password=g_configuration.ssh_password,
+                                       timeout=10,
+                                       key_filename=keyfile)                 
         except (IOError,
                 paramiko.SSHException,
                 paramiko.BadHostKeyException, 
                 paramiko.AuthenticationException,
                 socket.error) as e:  
-            g_logger.log (self.node + " : Error while trying to connect: " + 
str(e))
-            g_notifications.node_connected (self.node, False)
-            g_notifications.tasklist_completed (self.node, self.tasks, False)
+            g_logger.log (self.node.hostname + " : Error while trying to 
connect: " + str(e))
+            g_notifications.node_connected (self.node.hostname, False)
+            g_notifications.tasklist_completed (self.node.hostname, 
self.tasks, False)
             return
         
-        g_notifications.node_connected (self.node, True)
+        g_notifications.node_connected (self.node.hostname, True)
         transport = ssh.get_transport()        
         success = True
         result = Tasks.Taskresult.success
         while (None != task and not interrupt):
-            g_logger.log (self.node + " : Running task id " +str(task.id)+" '" 
+ task.name + "'")
-            g_notifications.task_started (self.node, task)
+            g_logger.log (self.node.hostname + " : Running task id " 
+str(task.id)+" '" + task.name + "'")
+            g_notifications.task_started (self.node.hostname, task)
             if (task.__class__.__name__ == "Task"):
                 if (task.type == Tasks.Operation.run):
                     result = self.exec_run (task, transport)
-                    g_notifications.task_completed (self.node, task, result)
+                    g_notifications.task_completed (self.node.hostname, task, 
result)
                 elif (task.type == Tasks.Operation.put):
                     result = self.exec_put (task, transport)
-                    g_notifications.task_completed (self.node, task, result)
+                    g_notifications.task_completed (self.node.hostname, task, 
result)
                 elif (task.type == Tasks.Operation.get):
                     result = self.exec_get (task, transport)
-                    g_notifications.task_completed (self.node, task, result)
+                    g_notifications.task_completed (self.node.hostname, task, 
result)
                 elif (task.type == Tasks.Operation.run_per_host):
                     result = self.exec_run_per_host (task, transport)
-                    g_notifications.task_completed (self.node, task, result)   
                 
+                    g_notifications.task_completed (self.node.hostname, task, 
result)                    
                 else:
                     print "UNSUPPORTED OPERATION!"                    
             elif (task.__class__.__name__ == "Taskset"):
-                g_logger.log (self.node + " : Running task set")
+                g_logger.log (self.node.hostname + " : Running task set")
             if ((task.stop_on_fail == True) and (result != 
Tasks.Taskresult.success)):
-                g_logger.log (self.node + " : Task failed and therefore 
execution is stopped")
+                g_logger.log (self.node.hostname + " : Task failed and 
therefore execution is stopped")
                 transport.close()
                 success = False
                 break
@@ -358,9 +370,9 @@
             task = self.tasks.get()
         
         ssh.close()
-        g_notifications.node_disconnected (self.node, True)
-        g_notifications.tasklist_completed (self.node, self.tasks, success)
-        g_logger.log (self.node + " : All tasks done for " + self.node)
+        g_notifications.node_disconnected (self.node.hostname, True)
+        g_notifications.tasklist_completed (self.node.hostname, self.tasks, 
success)
+        g_logger.log (self.node.hostname + " : All tasks done for " + 
self.node.hostname)
         
 
 
@@ -372,8 +384,8 @@
         self.tasks = tasks
         self.thread = None
     def start (self):
-        g_logger.log ("Starting execution for node " + self.node)
-        g_notifications.tasklist_started (self.node, self.tasks)
+        g_logger.log ("Starting execution for node " + self.node.hostname)
+        g_notifications.tasklist_started (self.node.hostname, self.tasks)
         self.thread = NodeWorkerThread (1, self.node, self.tasks)
         self.thread.start()
     




reply via email to

[Prev in Thread] Current Thread [Next in Thread]