=== modified file 'bin/duplicity'
--- bin/duplicity	2013-01-06 18:12:52 +0000
+++ bin/duplicity	2013-02-05 10:58:23 +0000
@@ -29,6 +29,8 @@
 
 import getpass, gzip, os, sys, time, types
 import traceback, platform, statvfs, resource, re
+import threading
+from datetime import datetime
 
 pwd = os.path.abspath(os.path.dirname(sys.argv[0]))
 if os.path.exists(os.path.join(pwd, "../duplicity")):
@@ -57,11 +59,11 @@
 from duplicity import tempdir
 from duplicity import asyncscheduler
 from duplicity import util
+from duplicity import progress
 
 # If exit_val is not None, exit with given value at end.
 exit_val = None
 
-
 def get_passphrase(n, action, for_signing = False):
     """
     Check to make sure passphrase is indeed needed, then get
@@ -307,6 +309,7 @@
         tdp.delete()
     return putsize
 
+
 def validate_encryption_settings(backup_set, manifest):
     """
     When restarting a backup, we have no way to verify that the current
@@ -368,6 +371,11 @@
     io_scheduler = asyncscheduler.AsyncScheduler(globals.async_concurrency)
     async_waiters = []
 
+    # If the --progress option is given, initiate a background thread that will
+    # periodically report progress to the Log.
+    if globals.progress:
+        progress.progress_thread = progress.LogProgressThread()
+        progress.progress_thread.start()
+
     while not at_end:
         # set up iterator
@@ -420,6 +428,11 @@
     # Upload the collection summary.
     #bytes_written += write_manifest(mf, backup_type, backend)
 
+    # Terminate the background thread now, if any
+    if globals.progress:
+        progress.progress_thread.finished = True
+        progress.progress_thread.join()
+
     return bytes_written
@@ -495,6 +508,17 @@
     @rtype: void
     @return: void
     """
+    if globals.progress:
+        progress.tracker = progress.ProgressTracker()
+        # Fake a backup to compute the total of moving bytes
+        tarblock_iter = diffdir.DirFull(globals.select)
+        dummy_backup(tarblock_iter)
+        # Store the computed stats to compute progress later
+        progress.tracker.set_evidence(diffdir.stats)
+        # Reinitialize the globals.select iterator, so that
+        # the core of duplicity can rescan the paths
+        commandline.set_selection()
+
     if globals.dry_run:
         tarblock_iter = diffdir.DirFull(globals.select)
         bytes_written = dummy_backup(tarblock_iter)
@@ -566,6 +590,19 @@
         time.sleep(2)
         dup_time.setcurtime()
         assert dup_time.curtime != dup_time.prevtime, "time not moving forward at appropriate pace - system clock issues?"
+
+    if globals.progress:
+        progress.tracker = progress.ProgressTracker()
+        # Fake a backup to compute the total of moving bytes
+        tarblock_iter = diffdir.DirDelta(globals.select,
+                                         sig_chain.get_fileobjs())
+        dummy_backup(tarblock_iter)
+        # Store the computed stats to compute progress later
+        progress.tracker.set_evidence(diffdir.stats)
+        # Reinitialize the globals.select iterator, so that
+        # the core of duplicity can rescan the paths
+        commandline.set_selection()
+
     if globals.dry_run:
         tarblock_iter = diffdir.DirDelta(globals.select,
                                          sig_chain.get_fileobjs())

=== modified file 'bin/duplicity.1'
--- bin/duplicity.1	2013-01-18 15:21:08 +0000
+++ bin/duplicity.1	2013-02-05 10:58:23 +0000
@@ -644,6 +644,20 @@
 the new filename format.
 
 .TP
+.B --progress
+When selected, duplicity will output the current upload progress and the
+estimated upload time. To compute the progress, it performs a first dry-run
+before a full or incremental run, and then runs the real operation while
+estimating the real upload progress.
+
+.TP
+.BI "--progress-rate " number
+Sets the update rate at which duplicity will output the upload progress
+messages (requires the
+.B --progress
+option). Default is to print the status every 3 seconds.
+
+.TP
 .BI "--rename " "orig new"
 Treats the path
 .I orig
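For reference, a typical invocation enabling the new options could look like
this (the source path and target URL are hypothetical, in the style of the
EXAMPLES section of the man page):

    $ duplicity --progress --progress-rate 10 /home/me sftp://uid@other.host/some_dir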
=== modified file 'duplicity/backend.py'
--- duplicity/backend.py	2013-01-10 19:04:39 +0000
+++ duplicity/backend.py	2013-02-05 10:58:23 +0000
@@ -39,6 +39,7 @@
 from duplicity import globals
 from duplicity import log
 from duplicity import urlparse_2_5 as urlparser
+from duplicity import progress
 
 from duplicity.util import exception_traceback

=== modified file 'duplicity/backends/_boto_multi.py'
--- duplicity/backends/_boto_multi.py	2012-02-05 17:38:19 +0000
+++ duplicity/backends/_boto_multi.py	2013-02-05 10:58:23 +0000
@@ -23,6 +23,8 @@
 import os
 import sys
 import time
+import threading
+import Queue
 
 import duplicity.backend
 
@@ -32,6 +34,7 @@
 from duplicity.util import exception_traceback
 from duplicity.backend import retry
 from duplicity.filechunkio import FileChunkIO
+from duplicity import progress
 
 BOTO_MIN_VERSION = "1.6a"
 
@@ -43,6 +46,27 @@
     import multiprocessing
 
 
+class ConsumerThread(threading.Thread):
+    """
+    A background thread that collects all written bytes from all
+    the pool workers and reports them to the progress module.
+    Wakes up every second to check for termination.
+    """
+    def __init__(self, queue):
+        super(ConsumerThread, self).__init__()
+        self.daemon = True
+        self.finish = False
+        self.queue = queue
+
+    def run(self):
+        while not self.finish:
+            try:
+                args = self.queue.get(True, 1)
+                progress.report_transfer(args[0], args[1])
+            except Queue.Empty:
+                pass
+
+
 def get_connection(scheme, parsed_url):
     try:
         import boto
@@ -357,14 +381,29 @@
 
         mp = self.bucket.initiate_multipart_upload(key, headers)
 
+        # Initiate a queue to share progress data between the pool
+        # workers and a consumer thread that will collect and report it
+        queue = None
+        if globals.progress:
+            manager = multiprocessing.Manager()
+            queue = manager.Queue()
+            consumer = ConsumerThread(queue)
+            consumer.start()
+
         pool = multiprocessing.Pool(processes=chunks)
         for n in range(chunks):
             params = [self.scheme, self.parsed_url, self.bucket_name,
-                      mp.id, filename, n, chunk_size, globals.num_retries]
+                      mp.id, filename, n, chunk_size, globals.num_retries,
+                      queue]
             pool.apply_async(multipart_upload_worker, params)
 
         pool.close()
         pool.join()
 
+        # Terminate the consumer thread, if any
+        if globals.progress:
+            consumer.finish = True
+            consumer.join()
+
         if len(mp.get_all_parts()) < chunks:
             mp.cancel_upload()
             raise BackendException("Multipart upload failed. Aborted.")
@@ -373,7 +412,7 @@
 
 def multipart_upload_worker(scheme, parsed_url, bucket_name, multipart_id, filename,
-                            offset, bytes, num_retries):
+                            offset, bytes, num_retries, queue):
     """
     Worker method for uploading a file chunk to S3 using multipart upload.
     Note that the file chunk is read into memory, so it's important to keep
@@ -384,6 +423,8 @@
     def _upload_callback(uploaded, total):
         worker_name = multiprocessing.current_process().name
         log.Debug("%s: Uploaded %s/%s bytes" % (worker_name, uploaded, total))
+        if not queue is None:
+            queue.put([uploaded, total])  # Push data to the consumer thread
 
     def _upload(num_retries):
         worker_name = multiprocessing.current_process().name
@@ -395,7 +436,9 @@
             for mp in bucket.get_all_multipart_uploads():
                 if mp.id == multipart_id:
                     with FileChunkIO(filename, 'r', offset=offset * bytes, bytes=bytes) as fd:
-                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback)
+                        mp.upload_part_from_file(fd, offset + 1, cb=_upload_callback,
+                                                 num_cb=max(2, 8 * bytes / (1024 * 1024))
+                                                 )  # Max number of callbacks: 8 per megabyte
                     break
         except Exception, e:
             traceback.print_exc()
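Since the S3 chunks are uploaded from forked pool workers, the per-worker byte
counts must cross process boundaries before the single progress tracker can see
them; a Manager-backed queue drained by a daemon consumer thread, as above, is
the standard pattern for that. A self-contained sketch of the same pattern
(worker payloads and byte counts are illustrative, not duplicity's):

    import multiprocessing
    import threading
    import Queue
    import time

    def worker(q, n):
        # Each pool worker pushes [uploaded, total] pairs as it progresses
        for i in range(1, 5):
            time.sleep(0.1)
            q.put([i * 256, 1024])

    class Consumer(threading.Thread):
        def __init__(self, q):
            super(Consumer, self).__init__()
            self.daemon = True
            self.finish = False
            self.queue = q

        def run(self):
            while not self.finish:
                try:
                    uploaded, total = self.queue.get(True, 1)  # wake up every second
                    print "progress: %d/%d" % (uploaded, total)
                except Queue.Empty:
                    pass

    if __name__ == '__main__':
        manager = multiprocessing.Manager()  # proxy queue shareable across fork
        q = manager.Queue()
        consumer = Consumer(q)
        consumer.start()
        pool = multiprocessing.Pool(processes=2)
        for n in range(2):
            pool.apply_async(worker, [q, n])
        pool.close()
        pool.join()
        consumer.finish = True  # same termination handshake as ConsumerThread
        consumer.join()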
=== modified file 'duplicity/backends/_boto_single.py'
--- duplicity/backends/_boto_single.py	2011-11-25 17:47:57 +0000
+++ duplicity/backends/_boto_single.py	2013-02-05 10:58:23 +0000
@@ -27,6 +27,7 @@
 from duplicity.errors import * #@UnusedWildImport
 from duplicity.util import exception_traceback
 from duplicity.backend import retry
+from duplicity import progress
 
 BOTO_MIN_VERSION = "1.6a"
 
@@ -200,6 +201,7 @@
         remote_filename = source_path.get_filename()
         key = self.key_class(self.bucket)
         key.key = self.key_prefix + remote_filename
+
         for n in range(1, globals.num_retries+1):
             if n > 1:
                 # sleep before retry (new connection to a **hopeful** new host, so no need to wait so long)
@@ -212,7 +214,11 @@
             log.Info("Uploading %s/%s to %s Storage" % (self.straight_url, remote_filename, storage_class))
             try:
                 key.set_contents_from_filename(source_path.name, {'Content-Type': 'application/octet-stream',
-                                                                  'x-amz-storage-class': storage_class})
+                                                                  'x-amz-storage-class': storage_class},
+                                               cb=progress.report_transfer,
+                                               num_cb=(max(2, 8 * globals.volsize / (1024 * 1024)))
+                                               )  # Max number of callbacks: 8 per megabyte
+                key.close()
                 self.resetConnection()
                 return
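For context, boto's set_contents_from_filename invokes the cb callable with two
arguments (bytes transmitted so far, total bytes), and num_cb caps how many
callbacks fire per transfer, which is why progress.report_transfer can be
passed straight through. A minimal sketch of that contract (the bucket name,
key name and configured credentials are assumptions):

    import boto

    def report_transfer(transmitted, total):
        # Same (long, long) signature that duplicity's progress.report_transfer offers
        print "uploaded %d of %d bytes" % (transmitted, total)

    conn = boto.connect_s3()  # assumes credentials in the environment/config
    bucket = conn.get_bucket('my-backup-bucket')  # hypothetical bucket
    key = bucket.new_key('duplicity-full.vol1.difftar.gpg')
    key.set_contents_from_filename('/tmp/volume1.difftar.gpg',
                                   cb=report_transfer, num_cb=10)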
=== modified file 'duplicity/commandline.py'
--- duplicity/commandline.py	2013-01-18 15:17:55 +0000
+++ duplicity/commandline.py	2013-02-05 10:58:23 +0000
@@ -431,6 +431,12 @@
                       callback=lambda o, s, v, p: (setattr(p.values, o.dest, True),
                                                    old_fn_deprecation(s)))
 
+    # Used to display the progress for the full and incremental backup operations
+    parser.add_option("--progress", action="store_true")
+
+    # Used to control the progress update rate in seconds. Default: print the status every 3 seconds
+    parser.add_option("--progress-rate", type="int", metavar=_("number"))
+
     # option to trigger Pydev debugger
     parser.add_option("--pydevd", action="store_true")

=== modified file 'duplicity/diffdir.py'
--- duplicity/diffdir.py	2013-01-06 18:12:52 +0000
+++ duplicity/diffdir.py	2013-02-05 10:58:23 +0000
@@ -27,15 +27,16 @@
 the second, the ROPath iterator is put into tar block form.
 """
 
-import cStringIO, types
+import cStringIO, types, math
 from duplicity import statistics
 from duplicity import util
 from duplicity.path import * #@UnusedWildImport
 from duplicity.lazy import * #@UnusedWildImport
+from duplicity import progress
 
-# A StatsObj will be written to this from DirDelta_WriteSig only.
+# A StatsObj will be written to this from DirDelta and DirDelta_WriteSig.
 stats = None
-
+tracker = None
 
 class DiffDirException(Exception):
     pass
@@ -82,7 +83,7 @@
     else:
         sig_iter = sigtar2path_iter(dirsig_fileobj_list)
     delta_iter = get_delta_iter(path_iter, sig_iter)
-    if globals.dry_run:
+    if globals.dry_run or (globals.progress and not progress.tracker.has_collected_evidence()):
         return DummyBlockIter(delta_iter)
     else:
         return DeltaTarBlockIter(delta_iter)
@@ -363,7 +364,7 @@
     else:
         sig_path_iter = sigtar2path_iter(sig_infp_list)
     delta_iter = get_delta_iter(path_iter, sig_path_iter, newsig_outfp)
-    if globals.dry_run:
+    if globals.dry_run or (globals.progress and not progress.tracker.has_collected_evidence()):
         return DummyBlockIter(delta_iter)
     else:
         return DeltaTarBlockIter(delta_iter)

=== modified file 'duplicity/globals.py'
--- duplicity/globals.py	2013-01-10 19:04:39 +0000
+++ duplicity/globals.py	2013-02-05 10:58:23 +0000
@@ -241,3 +241,11 @@
 
 # Renames (--rename)
 rename = {}
+
+# When selected, triggers a dry-run before a full or incremental to compute
+# changes, then runs the real operation and keeps track of the real progress
+progress = False
+
+# Controls the upload progress messages refresh rate. Default: update every
+# 3 seconds
+progress_rate = 3
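optparse derives the destination attribute from the long flag, so
--progress-rate is stored as options.progress_rate, matching the name defined
in duplicity/globals.py above. A standalone sketch of how the two options parse
(the defaults shown here mirror globals.py and are assumptions, since the patch
leaves them to the globals module):

    from optparse import OptionParser

    parser = OptionParser()
    parser.add_option("--progress", action="store_true", default=False)
    # dest defaults to "progress_rate": dashes become underscores
    parser.add_option("--progress-rate", type="int", metavar="number", default=3)

    options, args = parser.parse_args(["--progress", "--progress-rate", "10"])
    print options.progress       # True
    print options.progress_rate  # 10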
=== modified file 'duplicity/log.py'
--- duplicity/log.py	2012-06-21 19:13:45 +0000
+++ duplicity/log.py	2013-02-05 10:58:23 +0000
@@ -26,6 +26,7 @@
 import os
 import sys
 import logging
+import datetime
 
 MIN = 0
 ERROR = 0
@@ -100,6 +101,7 @@
     synchronous_upload_done = 13
     asynchronous_upload_done = 14
     skipping_socket = 15
+    upload_progress = 16
 
 def Info(s, code=InfoCode.generic, extra=None):
     """Shortcut used for info messages (verbosity 5)."""
@@ -113,6 +115,83 @@
     controlLine = '%d' % current
     Log(s, INFO, InfoCode.progress, controlLine)
 
+def _ElapsedSecs2Str(secs):
+    tdelta = datetime.timedelta(seconds=secs)
+    hours, rem = divmod(tdelta.seconds, 3600)
+    minutes, seconds = divmod(rem, 60)
+    fmt = ""
+    if tdelta.days > 0:
+        fmt = "%dd," % (tdelta.days)
+    fmt = "%s%02d:%02d:%02d" % (fmt, hours, minutes, seconds)
+    return fmt
+
+def _RemainingSecs2Str(secs):
+    tdelta = datetime.timedelta(seconds=secs)
+    hours, rem = divmod(tdelta.seconds, 3600)
+    minutes, seconds = divmod(rem, 60)
+    fmt = ""
+    if tdelta.days > 0:
+        fmt = "%dd" % (tdelta.days)
+        if hours > 0:
+            fmt = "%s %dh" % (fmt, hours)
+        if minutes > 0:
+            fmt = "%s %dmin" % (fmt, minutes)
+    elif hours > 0:
+        fmt = "%dh" % hours
+        if minutes > 0:
+            fmt = "%s %dmin" % (fmt, minutes)
+    elif minutes > 5:
+        fmt = "%dmin" % minutes
+    elif minutes > 0:
+        fmt = "%dmin" % minutes
+        if seconds >= 30:
+            fmt = "%s 30sec" % fmt
+    elif seconds > 45:
+        fmt = "< 1min"
+    elif seconds > 30:
+        fmt = "< 45sec"
+    elif seconds > 15:
+        fmt = "< 30sec"
+    else:
+        fmt = "%dsec" % seconds
+    return fmt
+
+def TransferProgress(progress, eta, changed_bytes, elapsed, speed, stalled):
+    """Shortcut used for upload progress messages (verbosity 5)."""
+    dots = int(0.4 * progress)  # int(40.0 * progress / 100.0) -- for a 40-char bar
+    data_amount = float(changed_bytes) / 1024.0
+    data_scale = "KB"
+    if data_amount > 1000.0:
+        data_amount /= 1024.0
+        data_scale = "MB"
+    if data_amount > 1000.0:
+        data_amount /= 1024.0
+        data_scale = "GB"
+    if stalled:
+        eta_str = "Stalled!"
+        speed_amount = 0
+        speed_scale = "B"
+    else:
+        eta_str = _RemainingSecs2Str(eta)
+        speed_amount = float(speed) / 1024.0
+        speed_scale = "KB"
+        if speed_amount > 1000.0:
+            speed_amount /= 1024.0
+            speed_scale = "MB"
+        if speed_amount > 1000.0:
+            speed_amount /= 1024.0
+            speed_scale = "GB"
+    s = "%.1f%s %s [%.1f%s/s] [%s>%s] %d%% ETA %s" % (data_amount, data_scale,
+                                                      _ElapsedSecs2Str(elapsed),
+                                                      speed_amount, speed_scale,
+                                                      '='*dots, ' '*(40-dots),
+                                                      progress,
+                                                      eta_str
+                                                      )
+
+    controlLine = "%d %d %d %d %d %d" % (changed_bytes, elapsed, progress, eta, speed, stalled)
+    Log(s, NOTICE, InfoCode.upload_progress, controlLine)
+
 def PrintCollectionStatus(col_stats, force_print=False):
     """Prints a collection status to the log"""
     Log(str(col_stats), 8, InfoCode.collection_status,
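To make the format string concrete: with 16.5 MB of changed bytes, 125 seconds
elapsed, a 2.0 MB/s speed, 30% progress and 290 seconds remaining, the emitted
line looks like the output of this small sketch (the numbers are made up):

    progress = 30
    dots = int(0.4 * progress)  # 12 of the 40 bar columns are filled
    bar = "[%s>%s]" % ('=' * dots, ' ' * (40 - dots))
    print "16.5MB 00:02:05 [2.0MB/s] %s %d%% ETA 4min 30sec" % (bar, progress)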
+""" +sys_collections = import_non_local('collections','sys_collections') + + + +tracker = None +progress_thread = None + +class ProgressTracker(): + + def __init__(self): + self.total_stats = None + self.nsteps = 0 + self.start_time = None + self.change_mean_ratio = 0.0 + self.change_r_estimation = 0.0 + self.compress_mean_ratio = 0.0 + self.compress_r_estimation = 0.0 + self.progress_estimation = 0.0 + self.time_estimation = 0 + self.total_bytecount = 0 + self.last_total_bytecount = 0 + self.last_bytecount = 0 + self.stall_last_time = None + self.last_time = None + self.elapsed_sum = timedelta() + self.speed = 0.0 + self.transfers = sys_collections.deque() + + def has_collected_evidence(self): + """ + Returns true if the progress computation is on and duplicity has not + yet started the first dry-run pass to collect some information + """ + return (not self.total_stats is None) + + def log_upload_progress(self): + """ + Aproximative and evolving method of computing the progress of upload + """ + if not globals.progress or not self.has_collected_evidence(): + return + + current_time = datetime.now() + if self.start_time is None: + self.start_time = current_time + if not self.last_time is None: + elapsed = (current_time - self.last_time) + else: + elapsed = timedelta() + self.last_time = current_time + + # Detect (and report) a stallment if no changing data for more than 5 seconds + if self.stall_last_time is None: + self.stall_last_time = current_time + if (current_time - self.stall_last_time).seconds > max(5, 2 * globals.progress_rate): + log.TransferProgress(100.0 * self.progress_estimation, + self.time_estimation, self.total_bytecount, + (current_time - self.start_time).seconds, + self.speed, + True + ) + return + + self.nsteps += 1 + + """ + Compute the ratio of information being written for deltas vs file sizes + Using Knuth algorithm to estimate approximate upper bound in % of completion + The progress is estimated on the current bytes written vs the total bytes to + change as estimated by a first-dry-run. The weight is the ratio of changing + data (Delta) against the total file sizes. 
+    def annotate_written_bytes(self, bytecount):
+        """
+        Annotate the number of bytes that have been added/changed since the
+        last time this function was called.
+        bytecount is the number of bytes written so far within the current
+        volume
+        """
+        changing = max(bytecount - self.last_bytecount, 0)
+        self.total_bytecount += long(changing)  # Annotate only the bytes changed since the last probe
+        self.last_bytecount = bytecount
+        if changing > 0:
+            self.stall_last_time = datetime.now()
+
+    def set_evidence(self, stats):
+        """
+        Stores the statistics collected by a first-pass dry-run, to use this
+        information later so as to estimate progress
+        """
+        self.total_stats = stats
+
+    def total_elapsed_seconds(self):
+        """
+        Elapsed seconds since the first call to the log_upload_progress method
+        """
+        return (datetime.now() - self.start_time).seconds
+
+
+def report_transfer(bytecount, totalbytes):
+    """
+    Calls tracker.annotate_written_bytes from outside the class, offering
+    the "function(long, long)" signature that is handy to pass as a callback
+    """
+    global tracker
+    global progress_thread
+    if not progress_thread is None and not tracker is None:
+        tracker.annotate_written_bytes(bytecount)
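The speed figure computed in log_upload_progress above is an exponential moving
average folded over at most 30 rate probes, each new probe weighted 0.3 against
the running value so the most recent probes dominate. A minimal sketch of that
smoothing with made-up probes (a plain collections.deque suffices here; the
patch only needs the import_non_local trick because duplicity ships a local
collections module):

    from collections import deque

    probes = deque(maxlen=30)  # bytes/sec samples, newest last
    for rate in [900.0, 1100.0, 1000.0, 4000.0]:  # made-up probes
        probes.append(rate)

    speed = 0.0
    for x in probes:
        speed = 0.3 * x + 0.7 * speed  # later probes dominate
    print "smoothed speed: %.1f B/s" % speed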
+
+class LogProgressThread(threading.Thread):
+    """
+    Background thread that reports progress to the log,
+    every --progress-rate seconds
+    """
+    def __init__(self):
+        super(LogProgressThread, self).__init__()
+        self.setDaemon(True)
+        self.finished = False
+
+    def run(self):
+        global tracker
+        if not globals.dry_run and globals.progress and tracker.has_collected_evidence():
+            while not self.finished:
+                tracker.log_upload_progress()
+                time.sleep(globals.progress_rate)
+            log.TransferProgress(100.0, 0, tracker.total_bytecount,
+                                 tracker.total_elapsed_seconds(),
+                                 tracker.speed, False)
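Putting it together, the thread's lifecycle matches the hunks in bin/duplicity
above: start it once the dry-run evidence has been collected, let it report
every --progress-rate seconds, then set finished and join it to flush the final
100% line. A runnable stand-in for just that lifecycle (the Reporter class and
its print statements are illustrative, not duplicity code):

    import threading
    import time

    class Reporter(threading.Thread):
        # Same shape as LogProgressThread: daemon loop, "finished" flag, final line
        def __init__(self, interval):
            super(Reporter, self).__init__()
            self.setDaemon(True)
            self.finished = False
            self.interval = interval

        def run(self):
            while not self.finished:
                print "tick: progress snapshot goes here"
                time.sleep(self.interval)
            print "done: final 100% line"

    reporter = Reporter(3)    # the --progress-rate equivalent
    reporter.start()
    time.sleep(7)             # stand-in for the real backup work
    reporter.finished = True  # same shutdown handshake used in bin/duplicity
    reporter.join()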