[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r28172 - gnunet-planetlab/gplmt/gplmt
From: |
gnunet |
Subject: |
[GNUnet-SVN] r28172 - gnunet-planetlab/gplmt/gplmt |
Date: |
Thu, 18 Jul 2013 16:48:06 +0200 |
Author: wachs
Date: 2013-07-18 16:48:06 +0200 (Thu, 18 Jul 2013)
New Revision: 28172
Modified:
gnunet-planetlab/gplmt/gplmt/Nodes.py
gnunet-planetlab/gplmt/gplmt/Notifications.py
gnunet-planetlab/gplmt/gplmt/Worker.py
Log:
abstract worker loop, test worker loop and notifications are working
Modified: gnunet-planetlab/gplmt/gplmt/Nodes.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Nodes.py 2013-07-18 14:37:43 UTC (rev
28171)
+++ gnunet-planetlab/gplmt/gplmt/Nodes.py 2013-07-18 14:48:06 UTC (rev
28172)
@@ -34,7 +34,7 @@
print "That's a bug! please check README: " + str(e)
sys.exit (1)
-class Node:
+class NodeResult:
def __init__(self, hostname, port = None, username = None, password =
None):
self.hostname = hostname
self.port = port
@@ -76,7 +76,7 @@
elif len(hostport) > 2:
raise Exception("Invalid node definition: " + line)
return None
- return Node(hostname, port, username, password)
+ return NodeResult(hostname, port, username, password)
class Nodes:
@@ -84,22 +84,22 @@
assert (None != logger)
self.logger = logger
self.filename = filename
- self.nodes = list ()
+ self.node_results = list ()
def load (self):
- self.logger.log ("Loading nodes file '" + self.filename + "'")
+ self.logger.log ("Loading node_results file '" + self.filename + "'")
try:
fobj = open (self.filename, "r")
for line in fobj:
line = line.strip()
- node = Node.parse(line)
+ node = NodeResult.parse(line)
if (None != node):
- self.nodes.append(node)
+ self.node_results.append(node)
self.logger.log ("Found node '" +
Util.print_ssh_connection (node) + "'")
fobj.close()
except IOError:
print "File " + self.filename + " not found"
return False
- self.logger.log ("Loaded " + str(len(self.nodes)) + " nodes")
+ self.logger.log ("Loaded " + str(len(self.node_results)) + "
node_results")
return True
class StringNodes:
@@ -107,13 +107,13 @@
assert (None != logger)
self.str = str
self.logger = logger
- self.nodes = list ()
+ self.node_results = list ()
def load (self):
- self.logger.log ("Loading nodes '" + self.str + "'")
- node = Node.parse(self.str)
+ self.logger.log ("Loading node_results '" + self.str + "'")
+ node = NodeResult.parse(self.str)
if (None == node):
return False
- self.nodes.append(node)
+ self.node_results.append(node)
self.logger.log ("Loaded node '" +Util.print_ssh_connection (node)+
"'")
return True
@@ -122,7 +122,7 @@
assert (None != logger)
self.logger = logger
self.configuration = configuration
- self.nodes = list ()
+ self.node_results = list ()
def load (self):
if (self.configuration.pl_password == ""):
@@ -134,7 +134,7 @@
if (self.configuration.pl_api_url == ""):
print "No PlanetLab API url given in configuration, fail!"
return False
- self.logger.log ("Retrieving nodes assigned to slice '" +
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
+ self.logger.log ("Retrieving node_results assigned to slice '" +
self.configuration.pl_slicename + "' for user " +self.configuration.pl_username)
try:
server = xmlrpclib.ServerProxy(self.configuration.pl_api_url)
except:
@@ -157,8 +157,8 @@
return False
for node in node_hostnames:
- n = Node(node, 22, self.configuration.pl_slicename, None)
+ n = NodeResult(node, 22, self.configuration.pl_slicename, None)
self.logger.log ("Planetlab API returned: " + n.hostname)
- self.nodes.append(n)
- self.logger.log ("Planetlab API returned " + str(len(self.nodes)) + "
nodes")
+ self.node_results.append(n)
+ self.logger.log ("Planetlab API returned " +
str(len(self.node_results)) + " node_results")
return True
Modified: gnunet-planetlab/gplmt/gplmt/Notifications.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Notifications.py 2013-07-18 14:37:43 UTC
(rev 28171)
+++ gnunet-planetlab/gplmt/gplmt/Notifications.py 2013-07-18 14:48:06 UTC
(rev 28172)
@@ -47,14 +47,14 @@
def task_completed (self, node, tasks, success, message):
assert (0)
-class NodeCollection:
+class NodeResultCollection:
def __init__(self):
- self.nodes = list ()
- def add (self, node):
- self.nodes.append (node)
- def get (self, name):
- for n in self.nodes:
- if (n.name == name):
+ self.node_results = list ()
+ def add (self, node_res):
+ self.node_results.append (node_res)
+ def get (self, node):
+ for n in self.node_results:
+ if (n.node == node):
return n
return None
@@ -64,7 +64,6 @@
self.msg = ""
self.output = ""
def finished (self, result, fail, msg, output):
- print self.task.name + " done with " + str (fail)
self.result = result
self.fail = fail
self.msg = msg
@@ -88,9 +87,9 @@
return tl
return None
-class Node:
- def __init__(self, name):
- self.name = name
+class NodeResult:
+ def __init__(self, node):
+ self.node = node
self.start = 0
self.end = 0
self.tasks = list ()
@@ -101,7 +100,7 @@
def __init__(self, logger):
assert (None != logger)
self.logger = logger
- self.nodes = NodeCollection ()
+ self.node_results = NodeResultCollection ()
def node_connected (self, node, success, message):
return
def node_disconnected (self, node, success, message):
@@ -109,7 +108,7 @@
def tasklist_started (self, node, tasks, message):
return
def tasklist_completed (self, node, tasks, success, message):
- self.nodes.add (Node(node))
+ self.node_results.add (NodeResult(node))
if (success == True):
print node + " : Tasklist '" + tasks.name + "' completed
successfully"
else:
@@ -124,72 +123,72 @@
def __init__(self, logger):
assert (None != logger)
self.logger = logger
- self.nodes = NodeCollection ()
+ self.node_results = NodeResultCollection ()
def summarize (self):
maxNodeLen = 0
maxTasklistLen = 0
# Calculate max length of node names and tasklist names
- for n in self.nodes.nodes:
- nodeLen = len(n.name)
+ for nres in self.node_results.node_results:
+ nodeLen = len(nres.node.hostname)
if (nodeLen > maxNodeLen):
maxNodeLen = nodeLen
- for tl in n.tasklists.tasklists:
+ for tl in nres.tasklists.tasklists:
tlLen = len(tl.name)
if(tlLen > maxTasklistLen):
maxTasklistLen = tlLen
# Sort output (success then fail)
- self.nodes.nodes.sort(key=lambda x: not
x.tasklists.tasklists[0].success)
+ self.node_results.node_results.sort(key=lambda x: not
x.tasklists.tasklists[0].success)
# Print organized output
- for n in self.nodes.nodes:
- sys.stdout.write(n.name)
- diff = maxNodeLen - len(n.name)
+ for nres in self.node_results.node_results:
+ sys.stdout.write(nres.node.hostname)
+ diff = maxNodeLen - len(nres.node.hostname)
sys.stdout.write(' ' * diff + ' | ')
- for tl in n.tasklists.tasklists:
+ for tl in nres.tasklists.tasklists:
sys.stdout.write(tl.name)
diff = maxTasklistLen - len(tl.name)
sys.stdout.write(' ' * diff + ' | ')
print 'success' if tl.success else 'failed in: '
- for t in n.tasks:
- if (t.result != Tasklist.Taskresult.success):
+ for t in nres.tasks:
+ if (t.fail == True):
print "\tFAIL: " + t.task.name + " with '" +t.msg+ "' and
'" +t.output+ "'"
else:
print "\tSUC " + t.task.name + " with '" +t.msg+ "' and '"
+t.output+ "'"
# tsk_str = ""
- # for t in n.tasks:
+ # for t in nres.tasks:
# tsk_f = "[e]"
# if (t.fail == True):
# tsk_f = "[f]"
# else:
# tsk_f = "[s]"
# tsk_str += t.task.name + " " + tsk_f + " ->"
- # print n.name
+ # print nres.name
def node_connected (self, node, success, message):
# Get node object
- nodeObj = self.nodes.get(node)
+ node_resObj = self.node_results.get(node)
# Create it if it doesn't exist
- if(None == self.nodes.get(node)):
- nodeObj = Node(node)
- self.nodes.add(nodeObj)
+ if(None == self.node_results.get(node)):
+ node_resObj = NodeResult(node)
+ self.node_results.add(node_resObj)
# Set node start time as of now
- nodeObj.start = time.time()
- nodeObj.connectSuccess = success
+ node_resObj.start = time.time()
+ node_resObj.connectSuccess = success
if (False == success):
- nodeObj.error_msg = message
+ node_resObj.error_msg = message
return
def node_disconnected (self, node, success, message):
# Mainly need to set node end connection time
- nodeObj = self.nodes.get(node)
+ nodeObj = self.node_results.get(node)
nodeObj.end = time.time()
return
def tasklist_started (self, node, tasks, message):
# Get node object
- nodeObj = self.nodes.get(node)
+ nodeObj = self.node_results.get(node)
# Create it if it doesn't exist (shouldn't node_connected be called
before this?)
- if(None == self.nodes.get(node)):
- nodeObj = Node(node)
- self.nodes.add(nodeObj)
+ if(None == self.node_results.get(node)):
+ nodeObj = NodeResult(node)
+ self.node_results.add(nodeObj)
# Create tasklist object
tasklist = TaskList(tasks.name)
# Add it to the collection of node tasklists
@@ -197,7 +196,7 @@
return
def tasklist_completed (self, node, tasks, success, message):
# Mainly want to set tasklist end time and success status
- nodeObj = self.nodes.get(node)
+ nodeObj = self.node_results.get(node)
tasklist = nodeObj.tasklists.get(tasks.name)
tasklist.end = time.time()
tasklist.success = success
@@ -206,29 +205,29 @@
#else:
# print node + " : Tasklist '" + tasks.name + "' completed with
failure"
def task_started (self, node, task, message):
- nodeObj = self.nodes.get(node)
+ nodeObj = self.node_results.get(node)
if (None == nodeObj):
- print "Node not found!"
+ print "NodeResult not found!"
return
nodeObj.tasks.append (Task (task))
return
def task_completed (self, node, task, result, message, output):
- nodeObj = self.nodes.get(node)
+ nodeObj = self.node_results.get(node)
for t in nodeObj.tasks:
if t.task is task:
if (result != Tasklist.Taskresult.success):
t.finished (result, True, message, output)
- #else:
- # t.finished (result, False, message, output)
+ else:
+ t.finished (result, False, message, output)
return
class SimpleNotification (Notification):
def __init__(self, logger):
assert (None != logger)
self.logger = logger
- #self.nodes = NodeCollection ()
+ #self.node_results = NodeResultCollection ()
def summarize (self):
- #for n in self.nodes:
+ #for n in self.node_results:
# tsk_str = ""
# for t in n.tasks:
# tsk_f = "[e]"
@@ -251,9 +250,9 @@
print node.hostname + " : disconnected with failure"
def tasklist_started (self, node, tasks, message):
print node.hostname + " : Tasklist '" + tasks.name + "' started"
- #self.nodes.add (Node(node))
+ #self.node_results.add (NodeResult(node))
def tasklist_completed (self, node, tasks, success, message):
- if (success == True):
+ if (success == Tasklist.Taskresult.success):
print node.hostname + " : Tasklist '" + tasks.name + "' completed
successfully"
else:
print node.hostname + " : Tasklist '" + tasks.name + "' completed
with failure"
Modified: gnunet-planetlab/gplmt/gplmt/Worker.py
===================================================================
--- gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 14:37:43 UTC (rev
28171)
+++ gnunet-planetlab/gplmt/gplmt/Worker.py 2013-07-18 14:48:06 UTC (rev
28172)
@@ -29,6 +29,7 @@
import sys
import select
import signal
+import inspect
try:
import gplmt.Configuration as Configuration
@@ -93,22 +94,30 @@
self.node = node
self.tasks = tasks
def connect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run_per_host (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_put (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def run(self):
global interrupt
tasklist_success = True
# Connecting
- res = self.connect()
+ res = False
+ try:
+ res = self.connect()
+ except NotImplementedError as e:
+ print "Not implemented: " + str(self.__class__) + " function: " +
str(e)
+ pass
+ except Exception as e:
+ print "Exception in Worker: " + str (e)
+ pass
if (False == res):
g_notifications.node_connected (self.node, False, "Failed to
connect")
return
@@ -120,46 +129,71 @@
g_notifications.tasklist_started (self.node, self.tasks, "")
task = self.tasks.get()
- #if (interrupt):
- # return Tasklist.Taskresult.user_interrupt
+ if (interrupt):
+ g_notifications.tasklist_completed (self.node, self.tasks,
Tasks.Taskresult.user_interrupt, "")
# Executing Tasks
while (None != task and not interrupt):
g_logger.log (self.node.hostname + " : Running task id "
+str(task.id)+" '" + task.name + "'")
g_notifications.task_started (self.node, task, "")
- if (task.type == Tasks.Operation.run):
- task_result = self.exec_run (task)
- assert (None != task_result)
- g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
- elif (task.type == Tasks.Operation.put):
- task_result = self.exec_put (task)
- assert (None != task_result)
- g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
- elif (task.type == Tasks.Operation.get):
- task_result = self.exec_get (task)
- assert (None != task_result)
- g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
- elif (task.type == Tasks.Operation.run_per_host):
- task_result = self.exec_run_per_host (task)
- assert (None != task_result)
- g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
- else:
- print "UNSUPPORTED OPERATION!"
+ try:
+ if (task.type == Tasks.Operation.run):
+ task_result = self.exec_run (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
+ elif (task.type == Tasks.Operation.put):
+ task_result = self.exec_put (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
+ elif (task.type == Tasks.Operation.get):
+ task_result = self.exec_get (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
+ elif (task.type == Tasks.Operation.run_per_host):
+ task_result = self.exec_run_per_host (task)
+ assert (None != task_result)
+ g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
+ else:
+ print "UNSUPPORTED OPERATION!"
+ except NotImplementedError as e:
+ print "Not implemented" + str (e)
+ pass
+ except Exception as e:
+ print "Exception in Worker:" + str (e)
+ pass
+
if ((task_result.result != Tasks.Taskresult.success) and
(task.stop_on_fail == True)):
g_logger.log (self.node.hostname + " : Task failed and
therefore execution is stopped")
- g_notifications.task_completed (self.node.hostname, task,
task_result.result, task_result.message, task_result.output)
+ g_notifications.task_completed (self.node, task,
task_result.result, task_result.message, task_result.output)
self.disconnect()
tasklist_success = False
break
task = self.tasks.get()
+
+ if (interrupt):
+ g_notifications.tasklist_completed (self.node, self.tasks,
Tasks.Taskresult.user_interrupt, "")
+ if (None != task):
+ g_notifications.task_completed (self.node, task,
task_result.user_interrupt, "task was interrupted", "")
+
+ if (False == tasklist_success):
+ g_notifications.tasklist_completed (self.node, self.tasks,
Tasks.Taskresult.fail, "")
+ else:
+ g_notifications.tasklist_completed (self.node, self.tasks,
Tasks.Taskresult.success, "")
+ #disconnect
+ try:
+ res = self.disconnect()
+ except NotImplementedError as e:
+ print "Not implemented: " + str(self.__class__) + " function: " +
str(e)
+ pass
+ except Exception as e:
+ print "Exception in Worker:" + str (e)
+ pass
- #disconnect
- res = self.disconnect()
if (False == res):
g_notifications.node_disconnected (self.node, False, "Failed to
disconnect")
else:
g_notifications.node_disconnected (self.node, True, "Disconnected
successfully")
- g_notifications.tasklist_completed (self.node, self.tasks,
tasklist_success, "")
+ g_logger.log (self.node.hostname + " : All tasks done for " +
self.node.hostname)
@@ -183,45 +217,45 @@
class LocalWorker (AbstractWorker):
def connect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
raise NotImplementedError
def exec_run_per_host (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_put (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
class RemoteSSHWorker (AbstractWorker):
def connect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run_per_host (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_put (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
class PlanetLabWorker (AbstractWorker):
def connect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def disconnect (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run_per_host (self, task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_run (self,task):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_put (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
def exec_get (self):
- raise NotImplementedError
+ raise NotImplementedError (inspect.stack()[0][3])
class NodeWorker ():
@@ -267,7 +301,7 @@
assert (hasattr(notifications, 'task_started'))
assert (hasattr(notifications, 'task_completed'))
self.target = target
- self.nodes = nodes
+ self.node_results = nodes
self.tasks = tasks
g_configuration = configuration
g_notifications = notifications
@@ -275,12 +309,11 @@
def start (self):
g_logger.log ("Starting execution on target '" + str (self.target) +
"'")
- for node in self.nodes.nodes:
+ for node in self.node_results.node_results:
nw = NodeWorker (self.target, node, self.tasks.copy())
workersList.append(nw)
nw.start()
-
- return
+
# block main thread until all worker threads are finished to print
summary
threads_done = False
while(not threads_done):
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r28172 - gnunet-planetlab/gplmt/gplmt,
gnunet <=