[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r27060 - in gnunet-planetlab/gplmt: contrib gplmt
From: |
gnunet |
Subject: |
[GNUnet-SVN] r27060 - in gnunet-planetlab/gplmt: contrib gplmt |
Date: |
Tue, 7 May 2013 21:01:10 +0200 |
Author: otarabai
Date: 2013-05-07 21:01:10 +0200 (Tue, 07 May 2013)
New Revision: 27060
Added:
gnunet-planetlab/gplmt/contrib/test_node.nodes
Modified:
gnunet-planetlab/gplmt/gplmt/Nodes.py
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
Added the ability to specify user/pass/port in nodes file to override the ones
in the config file
Added: gnunet-planetlab/gplmt/contrib/test_node.nodes
===================================================================
--- gnunet-planetlab/gplmt/contrib/test_node.nodes
(rev 0)
+++ gnunet-planetlab/gplmt/contrib/test_node.nodes 2013-05-07 19:01:10 UTC
(rev 27060)
@@ -0,0 +1,4 @@
+host1.example.com
+host2.example.com:222
+user:address@hidden:222
+
Modified: gnunet-planetlab/gplmt/gplmt/Nodes.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Nodes.py 2013-05-07 17:44:00 UTC (rev
27059)
+++ gnunet-planetlab/gplmt/gplmt/Nodes.py 2013-05-07 19:01:10 UTC (rev
27060)
@@ -32,6 +32,44 @@
print "That's a bug! please check README"
sys.exit (1)
+class Node:
+ def __init__(self, hostname, port = None, username = None, password =
None):
+ self.hostname = hostname
+ self.port = port
+ self.username = username
+ self.password = password
+
+ @staticmethod
+ def parse(line):
+ parts = line.split('@')
+ hostname = None
+ port = None
+ username = None
+ password = None
+
+ if len(parts) == 2: #credentials supplied
+ creds = parts[0].split(':')
+ if len(creds) != 2:
+ raise Exception("Invalid node definition: " + line)
+ username = creds[0]
+ password = creds[1]
+
+ line = parts[1]
+
+ elif len(parts) > 2:
+ raise Exception("Invalid node definition: " + line)
+
+ #parse host/port
+ hostport = line.split(':')
+ hostname = hostport[0]
+ if len(hostport) == 2:
+ port = int(hostport[1])
+ elif len(hostport) > 2:
+ raise Exception("Invalid node definition: " + line)
+
+ return Node(hostname, port, username, password)
+
+
class Nodes:
def __init__(self, filename, logger):
assert (None != logger)
@@ -45,7 +83,7 @@
for line in fobj:
line = line.strip()
self.logger.log ("Found node '" + line + "'")
- self.nodes.append(line)
+ self.nodes.append(Node.parse(line))
fobj.close()
except IOError:
print "File " + self.filename + " not found"
@@ -108,4 +146,4 @@
self.logger.log ("Planetlab API returned: " + node)
self.nodes.append(node)
self.logger.log ("Planetlab API returned " + str(len(self.nodes)) + "
nodes")
- return True
\ No newline at end of file
+ return True
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2013-05-07 17:44:00 UTC (rev
27059)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2013-05-07 19:01:10 UTC (rev
27060)
@@ -100,7 +100,7 @@
if (line[0] == '#'):
continue;
sline = line.split (';',2)
- if (sline[0] == self.node):
+ if (sline[0] == self.node.hostname):
cmd = sline[1].strip()
found = True
if (sline[0] == ''):
@@ -111,15 +111,15 @@
t = task.copy()
if (found == True):
- g_logger.log (self.node + " : Found specific command '"+ cmd + "'")
+ g_logger.log (self.node.hostname + " : Found specific command '"+
cmd + "'")
t.command = cmd
t.arguments = ""
elif ((found == False) and (default != None)):
- g_logger.log (self.node + " : Using default command '"+ default +
"'")
+ g_logger.log (self.node.hostname + " : Using default command '"+
default + "'")
t.command = default
t.arguments = ""
else:
- g_logger.log (self.node + " : Task '"+ task.name + "' failed: no
command to execute")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
failed: no command to execute")
return Tasks.Taskresult.fail
return self.exec_run(t, transport)
@@ -127,10 +127,10 @@
def exec_run (self, task, transport):
global interrupt
if(interrupt):
- g_logger.log (self.node + " : Task '"+ task.name + "' interrupted
by user")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
interrupted by user")
return Tasks.Taskresult.user_interrupt
if ((task.command == None) and (task.arguments == None)):
- g_logger.log (self.node + " : Task '"+ task.name + "' failed: no
command to execute")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
failed: no command to execute")
return Tasks.Taskresult.fail
try:
@@ -141,7 +141,7 @@
channel.exec_command(task.command + " " + task.arguments)
except SSHException as e:
- g_logger.log (self.node + " : Error while trying to connect: " +
str(e))
+ g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
if (task.timeout > 0):
timeout = task.timeout
@@ -160,7 +160,7 @@
if (timeout != -1):
delta = time.time() - start_time
if (delta > timeout):
- g_logger.log (self.node + " : Timeout after " +str(delta)
+" seconds")
+ g_logger.log (self.node.hostname + " : Timeout after "
+str(delta) +" seconds")
result = Tasks.Taskresult.timeout
break
(r, w, e) = select.select([channel], [], [], 1)
@@ -170,13 +170,13 @@
data = r[0].recv(4096)
if data:
got_data = True
- g_logger.log (self.node + " : " + data)
+ g_logger.log (self.node.hostname + " : " + data)
stdout_data += data
if channel.recv_stderr_ready():
data = r[0].recv_stderr(4096)
if data:
got_data = True
- g_logger.log (self.node + " : " + data)
+ g_logger.log (self.node.hostname + " : " + data)
stderr_data += data
if not got_data:
break
@@ -187,12 +187,12 @@
if (result == Tasks.Taskresult.success):
if (task.expected_return_code != -1):
if (exit_status != task.expected_return_code):
- g_logger.log (self.node + " : Task '"+ task.name + "'
completed after "+ str(time.time() - start_time) +" sec, but exit code "
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, but exit code "
+str(exit_status)+ " was not as expected " + str(task.expected_return_code))
g_logger.log (stdout_data)
g_logger.log (stderr_data)
result = Tasks.Taskresult.return_value_did_not_match
else:
- g_logger.log (self.node + " : Task '"+ task.name + "'
completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
if (task.expected_output != None):
output_contained = False
@@ -201,19 +201,19 @@
if (task.expected_output in stderr_data):
output_contained = True
if (output_contained == True):
- g_logger.log (self.node + " : Task '"+ task.name + "'
expected output '"+task.expected_output+"' was found")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was found")
else:
- g_logger.log (self.node + " : Task '"+ task.name + "'
expected output '"+task.expected_output+"' was not found")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' expected output '"+task.expected_output+"' was not found")
result = Tasks.Taskresult.output_did_not_match
if (result == Tasks.Taskresult.success):
- g_logger.log (self.node + " : Task '"+ task.name + "' successful")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
successful")
elif (result == Tasks.Taskresult.timeout):
- g_logger.log (self.node + " : Task '"+ task.name + "' with
timeout")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
with timeout")
elif (result == Tasks.Taskresult.user_interrupt):
- g_logger.log (self.node + " : Task '"+ task.name + "' interrupted
by user")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
interrupted by user")
else:
- g_logger.log (self.node + " : Task '"+ task.name + "' failed")
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "'
failed")
return result
def exec_put (self, task, transport):
if (False == os.path.exists (task.src)):
@@ -226,7 +226,7 @@
scp = SCPClient (transport)
scp.put (task.src, task.dest)
except SCPException as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' :" +
str(e))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :" + str(e))
result = Tasks.Taskresult.fail
pass
if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
@@ -234,7 +234,7 @@
sftp.put(task.src, task.dest)
sftp.close()
except paramiko.SSHException as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
result = Tasks.Taskresult.fail
pass
except (OSError, IOError) as e:
@@ -251,7 +251,7 @@
scp = SCPClient (transport)
scp.get (task.src, task.dest)
except SCPException as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' :" +
str(e))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :" + str(e))
result = Tasks.Taskresult.fail
pass
if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
@@ -259,25 +259,25 @@
sftp.get (task.src, task.dest)
sftp.close()
except paramiko.SSHException as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' :" + str(e))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
result = Tasks.Taskresult.fail
pass
except (OSError, IOError) as e:
- g_logger.log (self.node + " : Task '"+ task.name + "' : " + str(e))
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : "
+ str(e))
result = Tasks.Taskresult.src_file_not_found
pass
return result
def run(self):
global interrupt
- g_logger.log (self.node + " : Starting tasklist " + self.tasks.name)
+ g_logger.log (self.node.hostname + " : Starting tasklist " +
self.tasks.name)
task = self.tasks.get()
if(interrupt):
return Tasks.Taskresult.user_interrupt
try:
ssh = paramiko.SSHClient()
if (g_configuration.ssh_use_known_hosts):
- g_logger.log (self.node + " : Loading known hosts")
+ g_logger.log (self.node.hostname + " : Loading known hosts")
ssh.load_system_host_keys ()
# Automatically add new hostkeys
@@ -287,66 +287,78 @@
keyfile = None
if (g_configuration.ssh_keyfile != None):
if (os.path.exists (g_configuration.ssh_keyfile)):
- g_logger.log (self.node + " : Found " +
g_configuration.ssh_keyfile)
+ g_logger.log (self.node.hostname + " : Found " +
g_configuration.ssh_keyfile)
keyfile = g_configuration.ssh_keyfile
else:
- g_logger.log (self.node + " : Not found " +
g_configuration.ssh_keyfile)
-
- if ("" != g_configuration.ssh_username):
- g_logger.log (self.node + " : Trying to connect to " +
- g_configuration.ssh_username + "@" + self.node +
+ g_logger.log (self.node.hostname + " : Not found " +
g_configuration.ssh_keyfile)
+
+ if self.node.username is not None: #credentials are supplied in
node file
+ g_logger.log (self.node.hostname + " : Trying to connect to "
+
+ self.node.username + "@" + self.node.hostname +
+ " using password '" + self.node.password)
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ username=self.node.username,
+ password=self.node.password,
+ timeout=10)
+
+ elif ("" != g_configuration.ssh_username):
+ g_logger.log (self.node.hostname + " : Trying to connect to "
+
+ g_configuration.ssh_username + "@" +
self.node.hostname +
" using password '" + g_configuration.ssh_password+
"' and private keyfile '" +str(keyfile)+ "'")
- ssh.connect (self.node,
- username=g_configuration.ssh_username,
- password=g_configuration.ssh_password,
- timeout=10,
- key_filename=keyfile)
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ username=g_configuration.ssh_username,
+ password=g_configuration.ssh_password,
+ timeout=10,
+ key_filename=keyfile)
else:
- g_logger.log (self.node + " : Trying to connect to " +
- self.node +
+ g_logger.log (self.node.hostname + " : Trying to connect to "
+
+ self.node.hostname +
" using password '" + g_configuration.ssh_password+
"' and private keyfile '" +str(keyfile)+ "'")
- ssh.connect (self.node,
- password=g_configuration.ssh_password,
- timeout=10,
- key_filename=keyfile)
+ ssh.connect (self.node.hostname,
+ port=self.node.port or 22,
+ password=g_configuration.ssh_password,
+ timeout=10,
+ key_filename=keyfile)
except (IOError,
paramiko.SSHException,
paramiko.BadHostKeyException,
paramiko.AuthenticationException,
socket.error) as e:
- g_logger.log (self.node + " : Error while trying to connect: " +
str(e))
- g_notifications.node_connected (self.node, False)
- g_notifications.tasklist_completed (self.node, self.tasks, False)
+ g_logger.log (self.node.hostname + " : Error while trying to
connect: " + str(e))
+ g_notifications.node_connected (self.node.hostname, False)
+ g_notifications.tasklist_completed (self.node.hostname,
self.tasks, False)
return
- g_notifications.node_connected (self.node, True)
+ g_notifications.node_connected (self.node.hostname, True)
transport = ssh.get_transport()
success = True
result = Tasks.Taskresult.success
while (None != task and not interrupt):
- g_logger.log (self.node + " : Running task id " +str(task.id)+" '"
+ task.name + "'")
- g_notifications.task_started (self.node, task)
+ g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
+ g_notifications.task_started (self.node.hostname, task)
if (task.__class__.__name__ == "Task"):
if (task.type == Tasks.Operation.run):
result = self.exec_run (task, transport)
- g_notifications.task_completed (self.node, task, result)
+ g_notifications.task_completed (self.node.hostname, task,
result)
elif (task.type == Tasks.Operation.put):
result = self.exec_put (task, transport)
- g_notifications.task_completed (self.node, task, result)
+ g_notifications.task_completed (self.node.hostname, task,
result)
elif (task.type == Tasks.Operation.get):
result = self.exec_get (task, transport)
- g_notifications.task_completed (self.node, task, result)
+ g_notifications.task_completed (self.node.hostname, task,
result)
elif (task.type == Tasks.Operation.run_per_host):
result = self.exec_run_per_host (task, transport)
- g_notifications.task_completed (self.node, task, result)
+ g_notifications.task_completed (self.node.hostname, task,
result)
else:
print "UNSUPPORTED OPERATION!"
elif (task.__class__.__name__ == "Taskset"):
- g_logger.log (self.node + " : Running task set")
+ g_logger.log (self.node.hostname + " : Running task set")
if ((task.stop_on_fail == True) and (result !=
Tasks.Taskresult.success)):
- g_logger.log (self.node + " : Task failed and therefore
execution is stopped")
+ g_logger.log (self.node.hostname + " : Task failed and
therefore execution is stopped")
transport.close()
success = False
break
@@ -358,9 +370,9 @@
task = self.tasks.get()
ssh.close()
- g_notifications.node_disconnected (self.node, True)
- g_notifications.tasklist_completed (self.node, self.tasks, success)
- g_logger.log (self.node + " : All tasks done for " + self.node)
+ g_notifications.node_disconnected (self.node.hostname, True)
+ g_notifications.tasklist_completed (self.node.hostname, self.tasks,
success)
+ g_logger.log (self.node.hostname + " : All tasks done for " +
self.node.hostname)
@@ -372,8 +384,8 @@
self.tasks = tasks
self.thread = None
def start (self):
- g_logger.log ("Starting execution for node " + self.node)
- g_notifications.tasklist_started (self.node, self.tasks)
+ g_logger.log ("Starting execution for node " + self.node.hostname)
+ g_notifications.tasklist_started (self.node.hostname, self.tasks)
self.thread = NodeWorkerThread (1, self.node, self.tasks)
self.thread.start()
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r27060 - in gnunet-planetlab/gplmt: contrib gplmt,
gnunet <=