[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28184 - gnunet-planetlab/gplmt/gplmt
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28184 - gnunet-planetlab/gplmt/gplmt |
Date: |
Fri, 19 Jul 2013 10:39:49 +0200 |
Author: wachs
Date: 2013-07-19 10:39:48 +0200 (Fri, 19 Jul 2013)
New Revision: 28184
Modified:
gnunet-planetlab/gplmt/gplmt/Notifications.py
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
get and put operations for SSH worker
Modified: gnunet-planetlab/gplmt/gplmt/Notifications.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Notifications.py 2013-07-19 08:05:35 UTC
(rev 28183)
+++ gnunet-planetlab/gplmt/gplmt/Notifications.py 2013-07-19 08:39:48 UTC
(rev 28184)
@@ -262,6 +262,6 @@
if (result == Tasklist.Taskresult.success):
print node.hostname + " : Task '" + task.name + "' completed
successfully"
elif (result == Tasklist.Taskresult.src_file_not_found):
- print node.hostname + " : Task '" + task.name + "' failed :
source file not found"
+ print node.hostname + " : Task '" + task.name + "' failed :
source file not found: " + message
else:
- print node.hostname + " : Task '" + task.name + "' completed with
failure"
+ print node.hostname + " : Task '" + task.name + "' completed with
failure" + message
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-19 08:05:35 UTC (rev
28183)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-19 08:39:48 UTC (rev
28184)
@@ -102,9 +102,9 @@
raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_put (self):
+ def exec_put (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_get (self):
+ def exec_get (self, task):
raise NotImplementedError (inspect.stack()[0][3])
def run(self):
global interrupt
@@ -133,6 +133,7 @@
while (None != task and not interrupt):
g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
g_notifications.task_started (self.node, task, "")
+ task_result = None
try:
if (task.type == Tasks.Operation.run):
task_result = self.exec_run (task)
@@ -155,14 +156,17 @@
except NotImplementedError as e:
print "Not implemented" + str (e)
pass
- except Exception as e:
- print "Exception in Worker:" + str (e)
+ except Exception as e2:
+ print "Exception in Worker: " + str (e2)
pass
if (interrupt):
break
+ if (None == task_result):
+ g_logger.log (self.node.hostname + " : Task '"+ task.name +"'
failed to execute")
+ task_result = TaskExecutionResult(Tasks.Taskresult.fail,
"failed to execute task: " + task.name, "")
+ pass
if ((task_result.result != Tasks.Taskresult.success) and
(task.stop_on_fail == True)):
- g_logger.log (self.node.hostname + " : Task failed and
therefore execution is stopped")
- g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
+ g_logger.log (self.node.hostname + " : Task failed and
therefore execution is stopped")
self.disconnect()
tasklist_success = False
break
@@ -205,10 +209,10 @@
def exec_run (self, task):
print "TestWorker executes '" + task.name + "'"
return TaskExecutionResult(Tasks.Taskresult.success, "exec_run
successful", "")
- def exec_put (self):
+ def exec_put (self, task):
print "TestWorker puts " + task.name + "'"
return TaskExecutionResult(Tasks.Taskresult.success, "exec_put
successful", "")
- def exec_get (self):
+ def exec_get (self, task):
print "TestWorker puts '" + task.name + "'"
return TaskExecutionResult(Tasks.Taskresult.success, "exec_get
successful", "")
@@ -221,9 +225,9 @@
raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_put (self):
+ def exec_put (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_get (self):
+ def exec_get (self, task):
raise NotImplementedError (inspect.stack()[0][3])
@@ -252,20 +256,20 @@
g_logger.log (self.node.hostname + " : Trying to connect to '"
+Util.print_ssh_connection (self.node) + "'")
if self.node.username is not None: #credentials are supplied in
node file
if (self.node.password is not None):
- print "Using node information " + self.node.username + " "
+self.node.password
+ g_logger.log ( "Using node information " +
self.node.username + " " +self.node.password)
self.ssh.connect (self.node.hostname,
port=self.node.port or 22,
username=self.node.username,
password=self.node.password,
timeout=10)
else:
- print "Using node information " + self.node.username
+ g_logger.log ( "Using node information " +
self.node.username)
self.ssh.connect (self.node.hostname,
port=self.node.port or 22,
username=self.node.username,
timeout=10)
elif ("" != g_configuration.ssh_username):
- print "Using node information " + g_configuration.ssh_username
+ " " + g_configuration.ssh_password
+ g_logger.log ( "Using node information " +
g_configuration.ssh_username + " " + g_configuration.ssh_password)
self.ssh.connect (self.node.hostname,
port=self.node.port or 22,
username=g_configuration.ssh_username,
@@ -273,14 +277,14 @@
timeout=10,
key_filename=keyfile)
elif ("" != g_configuration.ssh_password):
- print "Using node information " + g_configuration.ssh_password
+ g_logger.log ( "Using node information " +
g_configuration.ssh_password)
self.ssh.connect (self.node.hostname,
port=self.node.port or 22,
password=g_configuration.self.ssh_password,
timeout=10,
key_filename=keyfile)
else:
- print "Using no information"
+ g_logger.log ("Using no information")
self.ssh.connect (self.node.hostname,
port=self.node.port or 22,
timeout=10,
@@ -302,7 +306,7 @@
return TaskExecutionResult (Tasks.Taskresult.success, "", "")
def exec_run_per_host (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_run (self, task):
+ def exec_run (self, task):
global interrupt
message = "undefined"
output = ""
@@ -361,11 +365,9 @@
output += data
stderr_data += data
if not got_data:
- break
-
+ break
if (result == Tasks.Taskresult.success):
- exit_status = channel.recv_exit_status ()
-
+ exit_status = channel.recv_exit_status ()
if (result == Tasks.Taskresult.success):
if (task.expected_return_code != -1):
if (exit_status != task.expected_return_code):
@@ -374,8 +376,7 @@
g_logger.log (stderr_data)
result = Tasks.Taskresult.return_value_did_not_match
else:
- g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
-
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' completed after "+ str(time.time() - start_time) +" sec, exit code "
+str(exit_status)+ " was as expected " + str(task.expected_return_code))
if (task.expected_output != None):
output_contained = False
if (task.expected_output in stdout_data):
@@ -401,10 +402,64 @@
message = "'"+ task.name + "' failed"
g_logger.log (self.node.hostname + " : Task "+ message)
return TaskExecutionResult(result, message, output)
- def exec_put (self):
- raise NotImplementedError (inspect.stack()[0][3])
- def exec_get (self):
- raise NotImplementedError (inspect.stack()[0][3])
+ def exec_put (self, task):
+ if (False == os.path.exists (task.src)):
+ return TaskExecutionResult(Tasks.Taskresult.src_file_not_found,
task.src, "")
+ result = None
+ try:
+ if (g_configuration.ssh_transfer ==
Configuration.TransferMode.scp):
+ try:
+ scp = SCPClient (self.transport)
+ scp.put (task.src, task.dest)
+ except SCPException as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :" + str(e))
+ result = TaskExecutionResult(Tasks.Taskresult.fail,
str(e), "")
+ pass
+ if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
+ sftp = paramiko.SFTPClient.from_transport (self.transport)
+ sftp.put(task.src, task.dest)
+ sftp.close()
+ except paramiko.SSHException as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
+ result = TaskExecutionResult(Tasks.Taskresult.fail, str(e), "")
+ pass
+ except (OSError, IOError) as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : "
+ str(e))
+ result = TaskExecutionResult(Tasks.Taskresult.src_file_not_found,
str(e), "")
+ pass
+ if (None == result):
+ result = TaskExecutionResult(Tasks.Taskresult.success, "", "")
+ return result
+ def exec_get (self, task):
+ result = None
+ if(interrupt):
+ message = "'"+ task.name + "' interrupted by user"
+ g_logger.log (self.node.hostname + " : Task '"+ message)
+ return TaskExecutionResult(Tasks.Taskresult.user_interrupt,
"interrupted by user", "")
+ try:
+ if (g_configuration.ssh_transfer ==
Configuration.TransferMode.scp):
+ try:
+ scp = SCPClient (self.transport)
+ scp.get (task.src, task.dest)
+ except SCPException as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name
+ "' :")
+ result = TaskExecutionResult(Tasks.Taskresult.fail,
str(e), "")
+ pass
+ if (g_configuration.ssh_transfer ==
Configuration.TransferMode.sftp):
+ sftp = paramiko.SFTPClient.from_transport (self.transport)
+ sftp.get (task.src, task.dest)
+ sftp.close()
+ except paramiko.SSHException as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' :"
+ str(e))
+ result = TaskExecutionResult(Tasks.Taskresult.fail, str(e), "")
+ pass
+ except (OSError, IOError) as e:
+ g_logger.log (self.node.hostname + " : Task '"+ task.name + "' : "
+ str(e))
+ result = TaskExecutionResult(Tasks.Taskresult.src_file_not_found,
str(e), "")
+ pass
+ if (None == result):
+ result = TaskExecutionResult(Tasks.Taskresult.success, "Store
source '"+task.src+"' in '" +task.dest+"'", "")
+ return result
class PlanetLabWorker (RemoteSSHWorker):
def connect (self):
@@ -415,9 +470,9 @@
raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self,task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_put (self):
+ def exec_put (self, task):
raise NotImplementedError (inspect.stack()[0][3])
- def exec_get (self):
+ def exec_get (self, task):
raise NotImplementedError (inspect.stack()[0][3])
class NodeWorker ():
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28184 - gnunet-planetlab/gplmt/gplmt,
gnunet <=