#!/usr/bin/env python
################################################################################
# tmix_utils.py: script for manipulating tmix trace files
# 2008-06-23
#
# Copyright (c) 2008, Tom Quetchenbach
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
#     * Redistributions of source code must retain the above copyright notice,
#       this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the California Institute of Technology nor the names
#       of its contributors may be used to endorse or promote products derived
#       from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
################################################################################
#
# this script can be used as a module by importing it or as a command-line
# utility by executing it from the command line.
#
# ASSUMPTIONS AND CAVEATS:
#
# The input and output file formats are as described in [1]. The global-id and
# subset-id fields are currently ignored; this could be a bug as the resampling
# algorithms will result in multiple connections with the same ids. Since I
# don't know how Tmix works I don't know if this is an issue or not.
#
# Throughout the file the trace 'duration' is assumed to be equal to the start
# time of the last connection. As long as the trace starts at time 0 and there
# are numerous short connections, this should not introduce great inaccuracies.
#
# Most of the functions in this code do not take into account the connection
# pause times. This is acceptable for the purpose of computing average
# throughput over the entire trace. The function interval_bytes() uses the pause
# times, but assumes infinite transfer rate. (That is, it assumes that the
# duration of a connection is equal to the maximum of the sum of the 't' times,
# the sum of the 't>' times, and the sum of the 't<' times.)
#
# If you don't want to modify or extend this code or write code using these
# functions, you can probably stop reading now.
#
# For speed and memory efficiency, connection vectors are represented by lists
# rather than objects, and traces are simply lists of connection vectors. (This
# script still uses a lot of memory. Reading a 2254-second (290 MB) trace file
# requires 642 MB of virtual memory. If we had used dictionaries instead of
# lists that would be over 1500 MB! (This number is based on an earlier
# implementation.) Since classes are implemented on top of dictionaries, having
# snazzy 'connection vector objects' would be even more inefficient.) But some
# effort has been made to make trace operations memory-efficient. For this
# reason some functions that produce new traces will modify the original
# connection vectors in ways that render the original trace gibberish.
# What a function does with the input trace is documented in the docstring for
# that function. (Note that the original trace *file* is never modified, just
# the trace in memory, so this is only of interest if you are using these
# functions in your own code.)
#
# if you want to create a copy of the trace but will not modify any of the
# connection vectors in the copy (removing them from the list using 'del' or
# adding new connection vectors to the list is OK), it is safe to perform a
# shallow copy of the trace list like this:
#
#     trace_copy = trace[:]
#
# but if you are going to modify any of the fields of the connection vectors you
# need to copy them as well, either by doing
#
#     import copy
#     trace_copy = copy.deepcopy(trace)
#
# or by performing a shallow copy of the connection vector itself:
#     for cv in trace:
#         new_cv = cv[:]
#         #...do other stuff...
#
# It would be nice to be able to perform multiple operations in one go, without
# having to load the trace from disk, perform the operation, and write the trace
# back to disk each time.
#
################################################################################
# in this script transmit, initiate, and receive are used rather sloppily. What
# is going on is somewhat like this. Suppose we have two sites and a link
# between them:
#
#     site A <-----link-----> site B
#
# some connections are initiated at site A and accepted at site B; these are the
# 'cinit' connections. Others are initiated at B and accepted at A; these are
# the 'crecv' connections. These are provided to this script as two separate
# traces. Each connection has data transmitted and received; for 'cinit'
# transmit means data moves from A to B; for 'crecv' transmit means data moves
# from B to A.
#
# In the 'a-b-t' model described in [1] and [2], the 'transmit' bytes are 'a'
# and the 'receive' bytes are 'b'
#
# this script can combine 'crecv' and 'cinit' into one combined trace
# so that statistics about the total link utilization can be calculated. In this
# case the 'crecv' trace has its 'a' and 'b' swapped for the calculations. This
# is the default behavior if two input files are given on the command line. To
# get meaningful output in this case, you need to specify two output files as
# well; otherwise both output traces will be merged to standard output.
#
# References:
# [1] P. Adurthi, Generating Tmix-based TCP application workloads in ns-2 and
#     GTNetS (M.S. thesis), 2006. [Online]. Available:
#     http://www.cs.odu.edu/~mweigle/papers/adurthi-thesis06.pdf
# [2] M. C. Weigle, P. Adurthi, F. Hernandez-Campos, K. Jeffay, F. D. Smith,
#     Tmix: a tool for generating realistic TCP application workloads in ns-2,
#     ACM SIGCOMM Computer Communication Review, vol. 36, no. 3, pp. 65-76,
#     July 2006.
# [3] F. Hernandez-Campos, Generation and validation of empirically-derived TCP
#     application workloads (doctoral dissertation), 2006. [Online]. Available:
#     http://www.cs.unc.edu/~fhernand/diss-html/
################################################################################

import optparse
import os
import random
import sys

# throughout this code, variables called 'dir' refer to the transfer direction;
# 0 is A -> B; 1 is B -> A
# FIXME: 'dir' is a poor choice because it's a python built-in function, so
# forgetting to define 'dir' gives a TypeError rather than a NameError.
DIR_A = 0
DIR_B = 1
DIR_BOTH = 2


# dummy function to process interrupts from psyco-optimized code
# see comment in read_trace loop
def dummy():
    pass


try:
    # use psyco to go faster
    import psyco
    psyco.cannotcompile(dummy)
    psyco.full()
except Exception:
    # psyco is an optional accelerator; any failure to load it is harmless.
    pass


def _log(message):
    """Write one line of diagnostic output to standard error."""
    sys.stderr.write('%s\n' % (message,))


def _out(message):
    """Write one line of regular output to standard output."""
    sys.stdout.write('%s\n' % (message,))


def _bit_rate(nbytes, duration_us):
    """Return the integer bit rate for nbytes sent over duration_us
    microseconds, or 0 when the duration is not positive."""
    if duration_us > 0:
        return nbytes * 8000000 // duration_us
    return 0


def _cv_bytes(cv, dir):
    """Return the byte count of connection vector cv in direction dir
    (DIR_A, DIR_B, or DIR_BOTH for the sum of both directions)."""
    if dir == DIR_BOTH:
        return cv[CV_A] + cv[CV_B]
    return cv[CV_A + dir]


def _info_size(info, dir):
    """Return the byte total for direction dir from a trace_info tuple."""
    if dir == DIR_BOTH:
        return info[TRACE_INFO_SIZE_A] + info[TRACE_INFO_SIZE_B]
    return info[TRACE_INFO_SIZE_A + dir]


def _info_rate(info, dir):
    """Return the bit rate for direction dir from a trace_info tuple."""
    if dir == DIR_BOTH:
        return info[TRACE_INFO_RATE_A] + info[TRACE_INFO_RATE_B]
    return info[TRACE_INFO_RATE_A + dir]


def _start_time(cv):
    """Sort key: the start time of a connection vector."""
    return cv[CV_START]


def trace_info(trace):
    """
    returns a tuple with some information about the trace:

        (duration, a_size, b_size, a_rate, b_rate, min_rtt, max_rtt)

    duration is the start time (microseconds) of the last connection; sizes
    are in bytes and rates in bits per second. Use the TRACE_INFO_* constants
    to index the result. For an empty trace every field is 0, and the rates
    are 0 whenever the duration is 0.

    The trace is not modified.
    """
    a_size = 0
    b_size = 0
    duration = 0
    min_rtt = None
    max_rtt = 0
    for cv in trace:
        duration = cv[CV_START]
        a_size += cv[CV_A]
        b_size += cv[CV_B]
        rtt = cv[CV_RTT]
        if min_rtt is None or rtt < min_rtt:
            min_rtt = rtt
        if rtt > max_rtt:
            max_rtt = rtt
    if min_rtt is None:
        min_rtt = 0
    a_rate = _bit_rate(a_size, duration)
    b_rate = _bit_rate(b_size, duration)
    return duration, a_size, b_size, a_rate, b_rate, min_rtt, max_rtt


# constants for indexing the trace_info tuple
TRACE_INFO_DURATION = 0
TRACE_INFO_SIZE_A = 1
TRACE_INFO_SIZE_B = 2
TRACE_INFO_RATE_A = 3
TRACE_INFO_RATE_B = 4
TRACE_INFO_RTT_MIN = 5
TRACE_INFO_RTT_MAX = 6


def parse_cvec(raw, d):
    """
    extracts basic information about the connection vector and returns a
    list (index it with the CV_* constants) with the following members:

        start: start time of the connection, in microseconds
        a    : number of bytes transferred in 'A-B' direction
        b    : number of bytes transferred in 'B-A' direction
        dir  : direction (init/recv) of this trace (the 'd' argument unchanged)
        text : text of the connection vector with start time replaced with %u
        rtt  : RTT for the connection
        dur  : duration, the largest of the summed 't', 't>' and 't<' times

    raw is either the text of the vector or a list of its lines. the 'd'
    argument is whether or not to swap the transmit/receive directions; see
    the source for the definition of 'A-B' and 'B-A'.

    Raises ValueError if the vector has no SEQ/CONC header line.
    """
    if isinstance(raw, str):
        lines = raw.split('\n')
    else:
        lines = raw

    start = None
    a = b = 0
    t = ta = tb = 0
    rtt = 0
    text_lines = []
    for l in lines:
        parts = l.split()
        if not parts:
            continue
        key = parts[0]
        if key in ('<', 'c<'):
            if not d:
                b += int(parts[1])
            else:
                a += int(parts[1])
        elif key in ('>', 'c>'):
            if not d:
                a += int(parts[1])
            else:
                b += int(parts[1])
        elif key == 't':
            t += int(parts[1])
        elif key == 't>':
            ta += int(parts[1])
        elif key == 't<':
            tb += int(parts[1])
        elif key == 'r':
            rtt = int(parts[1])

        if key in ('CONC', 'SEQ'):
            # keep the header as a template so that the start time can be
            # rewritten cheaply when the trace is printed
            start = int(parts[1])
            text_lines.append(key + ' %u ' + ' '.join(parts[2:]))
        else:
            text_lines.append(l)

    if start is None:
        raise ValueError('connection vector has no SEQ/CONC header line')

    text = ''.join([line + '\n' for line in text_lines])
    dur = max(t, ta, tb)
    return [start, a, b, d, text, rtt, dur]


# constants for indexing the connection vector list
CV_START = 0
CV_A = 1
CV_B = 2
CV_DIR = 3
CV_TEXT = 4
CV_RTT = 5
CV_DURATION = 6


def truncate_trace(trace, end=0, tail=0):
    """
    truncate the trace to duration microseconds. All connections which start
    after end microseconds are removed; in addition, all connections which
    finish more than tail microseconds after end are removed.

    if end is negative, it is relative to the starting time of the last
    connection in the trace. If tail is negative, it is taken to be infinite.

    If end is zero or longer than the duration of the entire trace, it is set
    to the duration of the trace (The duration of the trace is the length of
    the interval from the start of the first connection to the start of the
    last.) This definition has the side effect of possibly removing the last
    connection or two from the trace, if tail is zero.

    Infinite transfer rate is assumed. An empty trace yields an empty result.

    The original trace is not modified.
    """
    if not trace:
        return []

    last_start = trace[-1][CV_START]
    if not end:
        end = last_start
    elif end < 0:
        end += last_start

    rm_cv = 0
    rm_bytes_a = 0
    rm_bytes_b = 0
    bytes_a = 0
    bytes_b = 0
    new_trace = []
    for cv in trace:
        starts_late = cv[CV_START] > end
        ends_late = (tail >= 0
                     and cv[CV_DURATION] > end + tail - cv[CV_START])
        if starts_late or ends_late:
            rm_bytes_a += cv[CV_A]
            rm_bytes_b += cv[CV_B]
            rm_cv += 1
        else:
            bytes_a += cv[CV_A]
            bytes_b += cv[CV_B]
            new_trace.append(cv)

    _log('Removed %d connections, (%d, %d) bytes.' % (
        rm_cv, rm_bytes_a, rm_bytes_b))
    new_duration = 0
    if new_trace:
        new_duration = new_trace[-1][CV_START]
    _log('New trace is %g seconds, (%d, %d) bits/sec' % (
        new_duration / 1e6,
        _bit_rate(bytes_a, new_duration),
        _bit_rate(bytes_b, new_duration)))
    return new_trace


def _log_bin(index, members, nbytes=None):
    """Log the RTT range and size of one RTT quantile (tolerates empty bins)."""
    if not members:
        _log('Bin %d: empty' % index)
    elif nbytes is None:
        _log('Bin %d: (%d, %d), %d connections' % (
            index, members[0][CV_RTT], members[-1][CV_RTT], len(members)))
    else:
        _log('Bin %d: (%d, %d), %d connections, %d bytes' % (
            index, members[0][CV_RTT], members[-1][CV_RTT], len(members),
            nbytes))


def rtt_quantiles(trace, nbins, same_rate, dir=0):
    """
    splits trace into nbins quantiles by RTT and returns a list of them

    if same_rate, each subtrace will have approximately the same rate (in
    direction dir), otherwise each subtrace will have the same number of
    connections. (The last [largest-RTT] subtrace may have a few extra
    connections if len(trace) is not divisible by nbins.)

    This function does not modify the original trace.
    """
    qsize = len(trace) // nbins
    trace_copy = trace[:]
    _log('Sorting trace by RTT')
    trace_copy.sort(key=lambda cv: cv[CV_RTT])
    _log('done.')

    if not same_rate:
        # each bin has same number of connections; the bins are plain slices
        # of the RTT-sorted list
        bins = []
        for i in range(nbins):
            if i == nbins - 1:
                end = len(trace_copy)
            else:
                end = (i + 1) * qsize
            bins.append(trace_copy[i * qsize:end])
            _log_bin(i, bins[i])
    else:
        # each bin has same number of bytes.
        total_bytes = _info_size(trace_info(trace), dir)
        target = total_bytes // nbins
        bins = [[] for _ in range(nbins)]
        bin_bytes = 0
        i = 0
        for cv in trace_copy:
            next_bytes = bin_bytes + _cv_bytes(cv, dir)
            # close the current bin when adding this vector would move it
            # further from the target; never close an empty bin, otherwise a
            # single huge connection would cascade through every bin
            if (i < nbins - 1 and bins[i]
                    and abs(target - bin_bytes) < abs(target - next_bytes)):
                _log_bin(i, bins[i], bin_bytes)
                bin_bytes = 0
                i += 1
            bins[i].append(cv)
            bin_bytes += _cv_bytes(cv, dir)
        _log_bin(i, bins[i], bin_bytes)

    for i, members in enumerate(bins):
        _log('Sorting bin %d by time' % i)
        members.sort(key=_start_time)
        _log('done')

    return bins


def interval_bytes(trace, start_idx, interval, force=False):
    """
    compute throughput over a sub-interval of the trace of length interval
    microseconds starting at vector start_idx. Unlike trace_info, this
    function takes into account the pause times in the connection vectors, so
    it can give a more accurate estimate of the average rate over a subset of
    a trace.

    Returns (bytes_a, bytes_b), or None when the trace ends before the
    interval does. If force is set, this function always returns an answer,
    even if the trace is shorter than the interval.

    Only connections that start during the interval are taken into account;
    connections that are already established are left out. Thus, the result
    of this function should be interpreted as the bytes transmitted for the
    first interval seconds if the connections prior to start_index were
    deleted. This is the correct behavior for match_mean, which calls this
    function.

    The trace is not modified.
    """
    start_time = trace[start_idx][CV_START]
    end_time = start_time + interval
    bytes_a = 0
    bytes_b = 0

    if not force and trace[-1][CV_START] < end_time:
        # too close to the end of the trace; fail (could wrap, I suppose, but
        # it's not really necessary to try that hard)
        return None

    i = start_idx
    while i < len(trace) and trace[i][CV_START] < end_time:
        cv = trace[i]
        cv_start = cv[CV_START]
        if cv_start + cv[CV_DURATION] <= end_time:
            # entire connection fits in averaging interval
            bytes_a += cv[CV_A]
            bytes_b += cv[CV_B]
        else:
            # compute bytes transferred in averaging interval by parsing the
            # raw connection vector, assuming infinite transfer rate
            offs_fwd = 0    # elapsed time governing '>' data units
            offs_rev = 0    # elapsed time governing '<' data units
            sent_fwd = 0
            sent_rev = 0
            for line in cv[CV_TEXT].split('\n'):
                words = line.split()
                if not words:
                    continue
                key = words[0]
                if key == 't':
                    offs_fwd += int(words[1])
                    offs_rev += int(words[1])
                elif key == 't>':
                    offs_fwd += int(words[1])
                elif key == 't<':
                    offs_rev += int(words[1])
                elif key in ('>', 'c>'):
                    if cv_start + offs_fwd <= end_time:
                        sent_fwd += int(words[1])
                elif key in ('<', 'c<'):
                    if cv_start + offs_rev <= end_time:
                        sent_rev += int(words[1])
                else:
                    continue
                if cv_start + min(offs_fwd, offs_rev) > end_time:
                    break
            # the stored text is never swapped, so honour the direction flag
            # exactly as parse_cvec does when crediting the bytes
            if not cv[CV_DIR]:
                bytes_a += sent_fwd
                bytes_b += sent_rev
            else:
                bytes_a += sent_rev
                bytes_b += sent_fwd
        i += 1

    return bytes_a, bytes_b


def match_mean(trace, dir, period, try_step=5):
    """
    returns a cyclic permutation of the trace such that the mean rate for the
    first period microseconds is close to the overall mean rate.

    First, the average rate over an interval of length period is computed.
    Then, the starting point is advanced try_step seconds, and the computation
    is repeated. If the new average is closer to the target, the new starting
    point is used as the "split point." The smaller the value of try_step, the
    more accurate a match will be possible, but the longer it will take. If
    try_step is zero, we advance by one connection vector every time.

    The calculation is complicated and time-consuming because to get an
    accurate estimate of the average rate over a sub-interval of the original
    trace, the pause times in the middle of connections must be taken into
    account.

    If the trace is too short for even one interval of length period, a
    shallow copy of the trace is returned unchanged.

    This function will modify the connection vectors in the original trace, so
    pass it a deep copy (use copy.deepcopy()) if you want to save the
    original.
    """
    verbose = sys.stderr.isatty()
    report_step = 100

    closest = None
    closest_start = 0
    closest_rate = None
    info = trace_info(trace)
    target_a = info[TRACE_INFO_RATE_A]
    target_b = info[TRACE_INFO_RATE_B]
    _log('trying to match mean %s' % ((target_a, target_b),))

    last_try = 0
    for i, cv in enumerate(trace):
        if verbose and i % report_step == 0:
            sys.stderr.write("Computing %.2f%%\r" % (i * 100.0 / len(trace)))
        if cv[CV_START] < last_try + try_step * 1000000:
            continue
        last_try = cv[CV_START]
        counted = interval_bytes(trace, i, period)
        if counted is None:
            # too close to end of trace; we're done.
            break

        rate_a = counted[DIR_A] * 8 * 1000000 // period
        rate_b = counted[DIR_B] * 8 * 1000000 // period
        if dir == DIR_BOTH:
            dev = abs(target_a - rate_a) + abs(target_b - rate_b)
        elif dir == DIR_A:
            dev = abs(target_a - rate_a)
        else:
            dev = abs(target_b - rate_b)
        if closest is None or dev < closest:
            closest = dev
            closest_start = i
            closest_rate = rate_a, rate_b
            _log('%s %s' % (dev, closest_rate))

    if closest_rate is None:
        _log('Trace is too short to match the mean over %g seconds; '
             'leaving it unchanged' % (period / 1e6))
        return trace[:]

    _log('First %g seconds average will be (%d, %d)' % (
        (period / 1e6,) + closest_rate))

    # Now perform a cyclic permutation of the trace...
    # we have no choice but to be arbitrary about the delay between the
    # connection that used to be last and the one that used to be first.
    new_zero = trace[closest_start][CV_START]
    new_trace_head = trace[closest_start:]
    new_trace_tail = trace[:closest_start]
    _log('%d us is new zero' % new_zero)
    assert len(new_trace_head) + len(new_trace_tail) == len(trace)

    prev = 0
    for cv in new_trace_head:
        cv[CV_START] -= new_zero
        assert cv[CV_START] >= prev
        prev = cv[CV_START]

    # the tail is empty when the best split point is the very first vector
    if new_trace_tail:
        offset = prev - new_trace_tail[0][CV_START]
        for cv in new_trace_tail:
            cv[CV_START] += offset
            assert cv[CV_START] >= prev
            prev = cv[CV_START]

    return new_trace_head + new_trace_tail


def read_trace(filename, blocksize, swap, time=0, verbose=None):
    """
    read trace from file filename. Returns (trace, blocks), where blocks is
    the list of block start indices for blocks of size blocksize (seconds),
    for use with block_resample. (Note that if input trace is not sorted this
    does not work.)

    If swap is 1, swap 'a' and 'b' directions (see the comment at the start of
    the source for more detail).

    if time is nonzero, return after reading time microseconds of the trace.

    If verbose is None, progress is reported only when stderr is a terminal.

    If interrupted by KeyboardInterrupt, return what was read so far.
    """
    if verbose is None:
        verbose = sys.stderr.isatty()
    fsize = os.path.getsize(filename)
    cur = []
    trace = []
    linenum = 0
    consumed = 0
    need_sort = False
    prev_start = -1e10
    blocks = [0]
    blocksize *= 1000000
    next_block = blocksize
    reached_limit = False

    f = open(filename, 'r')
    try:
        try:
            for raw_line in f:
                consumed += len(raw_line)
                line = raw_line.strip()
                if (not line) or line[0] == '#':
                    continue
                if line[0] == 'S' or line[0] == 'C':
                    # start of cvec: flush the one collected so far
                    if cur:
                        info = parse_cvec(cur, swap)
                        t = info[CV_START]
                        if t < prev_start:
                            need_sort = True
                        if time and t > time:
                            reached_limit = True
                            break
                        if blocksize and t >= next_block:
                            blocks.append(len(trace))
                            next_block += blocksize
                        trace.append(info)
                        prev_start = t
                    cur = [line]
                else:
                    cur.append(line)
                linenum += 1
                if verbose and linenum % 100000 == 0:
                    # python function call to process interrupts with psyco
                    # see http://psyco.sourceforge.net/psycoguide/bugs.html,
                    # bullet point 3
                    dummy()
                    # progress is estimated from the characters consumed;
                    # f.tell() is unreliable while iterating over the file
                    sys.stderr.write('Reading file: %.2f%%\r' % (
                        consumed * 100.0 / fsize))
                    sys.stderr.flush()

            if cur and not reached_limit:
                # flush the final connection vector of the file
                info = parse_cvec(cur, swap)
                t = info[CV_START]
                if not (time and t > time):
                    if t < prev_start:
                        need_sort = True
                    trace.append(info)
        except KeyboardInterrupt:
            _log('Warning: interrupted')
    finally:
        f.close()

    if need_sort:
        if verbose:
            _log('Sorting trace')
        _log('FIXME: block resampling will be wrong')
        trace.sort(key=_start_time)
    elif verbose:
        _log('Congratulations; trace is already sorted!')
    if verbose:
        _log('')
    return trace, blocks


def merge_traces(t1, t2, blocksize, include_tail=False):
    """
    merge two traces (each sorted by start time) into one. The traces are
    assumed to start at the same time. The output is truncated to the duration
    of the shorter trace, unless include_tail is set. blocksize is accepted
    for interface compatibility and is not used.

    The original traces are not modified.
    """
    new_trace = []
    pos_1 = 0
    pos_2 = 0
    while pos_1 < len(t1) and pos_2 < len(t2):
        if t1[pos_1][CV_START] < t2[pos_2][CV_START]:
            # trace 1 is strictly earlier; ties go to trace 2
            new_trace.append(t1[pos_1])
            pos_1 += 1
        else:
            new_trace.append(t2[pos_2])
            pos_2 += 1
        if len(new_trace) > 1:
            assert new_trace[-1][CV_START] >= new_trace[-2][CV_START]

    if include_tail:
        if pos_1 < len(t1):
            assert new_trace[-1][CV_START] <= t1[pos_1][CV_START]
            new_trace += t1[pos_1:]
        elif pos_2 < len(t2):
            assert new_trace[-1][CV_START] <= t2[pos_2][CV_START]
            new_trace += t2[pos_2:]

    return new_trace


def poisson_resample(trace, duration, rate, dir=0, verbose=None):
    """Use 'byte-oriented poisson resampling' algorithm from [2] to produce a
    new trace.

    duration is in seconds; rate in bits per second; dir selects the
    direction whose byte counts are matched (DIR_BOTH is not supported and
    terminates the program).

    Every sampled connection vector is copied before it is given a new start
    time, so a vector that is drawn more than once yields independent entries
    and the original trace is not modified.
    """
    if dir == DIR_BOTH:
        _log('Bidirectional mode is not supported for poisson resampling')
        sys.exit(1)
    if verbose is None:
        verbose = sys.stderr.isatty()

    target_size = int(rate * duration / 8)
    if verbose:
        _log('Making new trace of %d bytes' % target_size)

    size = 0
    new_trace = []
    idx = CV_A + dir
    while size < target_size:
        chosen = random.choice(trace)
        chosen_size = chosen[idx]
        if abs(chosen_size + size - target_size) > abs(size - target_size):
            break
        size += chosen_size
        new_trace.append(chosen[:])
    n = len(new_trace)

    new_s = 0.0
    if n:
        # lambda = 1 / mean
        arrival_lambda = float(n) / (duration * 1000000)
        for cv in new_trace:
            new_s += random.expovariate(arrival_lambda)
            cv[CV_START] = int(new_s)

    if verbose:
        new_rate = 0
        if new_s > 0:
            new_rate = size * 8 / (new_s / 1e6)
        _log('New trace has %d cvecs, size %d, duration %g, rate %d' % (
            n, size, new_s / 1e6, new_rate))
    return new_trace


def block_resample(trace, blocks, blocksize, duration, rate, dir=0,
                   verbose=None):
    """
    Use block resampling algorithm from [2] to produce a new trace.

    duration is in seconds; rate in bits per second. blocks is the list of
    block start indices returned by read_trace; blocksize is in seconds.
    DIR_BOTH is not supported and terminates the program.

    This is a sloppy and mostly untested implementation that probably has many
    bugs; use at your own risk.

    Connection vectors are copied before being shifted in time, so the
    original trace is not modified. The result has already been thinned to
    the requested rate and cleaned; None is returned if thinning fails.

    Raises ValueError if the trace is empty or carries no load in direction
    dir.
    """
    if dir == DIR_BOTH:
        _log('Bidirectional mode is not supported for block resampling')
        sys.exit(1)
    if not trace:
        raise ValueError('cannot resample an empty trace')

    idx = CV_A + dir
    base_size = sum([cv[idx] for cv in trace])
    base_rate = _bit_rate(base_size, trace[-1][CV_START])
    if base_rate <= 0:
        raise ValueError('trace carries no load in the chosen direction')
    _log('Original rate is %d bits/s' % base_rate)

    copies = int(rate / base_rate + 1)
    if blocksize == 0:
        num_blocks = 1
    else:
        num_blocks = int(duration / blocksize + 1)

    new_trace = []
    for dst_block in range(num_blocks):
        new_block = []
        for _ in range(copies):
            # never use last block, because might be incomplete...
            if len(blocks) > 1:
                src_block = random.randint(0, len(blocks) - 2)
            else:
                # ...unless there's only one block!
                src_block = 0

            offset = (dst_block - src_block) * blocksize * 1000000
            src_start = blocks[src_block]
            if src_block + 1 < len(blocks):
                src_end = blocks[src_block + 1]
            else:
                src_end = len(trace)

            if trace[src_start][CV_START] + offset < 0:
                _log('Negative time. Source block %d, dst block %d' % (
                    src_block, dst_block))
                _log('Offset is %d.' % offset)
                _log('Source block index is %d.' % src_start)
                _log('Source block time is %d.' % trace[src_start][CV_START])
                _log('All block indices: %s' % (blocks,))
                _log('All block times: %s' % (
                    [(i, trace[b][CV_START]) for i, b in enumerate(blocks)],))
                raise AssertionError

            shifted = []
            for cv in trace[src_start:src_end]:
                moved = cv[:]
                moved[CV_START] += offset
                shifted.append(moved)
            new_block = shifted + new_block

        # one stable sort per destination block merges all the copies
        new_block.sort(key=_start_time)
        new_trace += new_block

    new_size = 0
    start = 0
    for i, cv in enumerate(new_trace):
        if start > cv[CV_START]:
            _log('***out of order at %d with %d***' % (i, cv[CV_START]))
        if cv[CV_START] < 0:
            _log('negative')
        start = cv[CV_START]
        new_size += cv[idx]
    _log('Intermediate rate is %d bits/s' % _bit_rate(new_size, start))

    _log('Now thinning to correct rate')
    thinned = thin_trace(new_trace, rate, dir, verbose)
    if thinned is None:
        return None
    return clean_trace(thinned)


def scale_trace(trace, rate, dir=0, verbose=None):
    """
    adjust the trace load to rate (in direction dir: 0=a, 1=b, 2=both) by
    scaling the interconnection times

    This function modifies the original trace in place, so pass it a deep
    copy (use copy.deepcopy()) if you want to save the original. The returned
    trace is simply a pointer to the original trace.
    """
    if verbose is None:
        verbose = sys.stderr.isatty()
    info = trace_info(trace)
    duration = info[TRACE_INFO_DURATION]
    base_rate = _info_rate(info, dir)
    if verbose:
        _log("Old duration %g; rate %d" % (duration / 1e6, base_rate))

    scale_factor = float(base_rate) / rate
    # at least 1, so that short traces do not cause a modulo by zero
    report_step = max(1, len(trace) // 100)
    for i, cv in enumerate(trace):
        cv[CV_START] = int(cv[CV_START] * scale_factor)
        # it may be, for all I know, that tmix uses a 32-bit unsigned int for
        # connection start times, in which case the result would have to be
        # truncated at 2^32-1 us here.
        if verbose and i % report_step == 0:
            sys.stderr.write("Scaling %.2f%%\r" % (i * 100.0 / len(trace)))

    if verbose:
        info = trace_info(trace)
        _log("\nNew duration %g; rate %d" % (
            info[TRACE_INFO_DURATION] / 1e6, _info_rate(info, dir)))
    return trace


def thin_trace(trace, rate, dir=0, verbose=None):
    """
    Thins the trace by removing connections at random until the rate offered
    by the trace in direction dir is approximately equal to rate. (As
    described in [2].)

    This function leaves 'empty' entries (start time -1) in the trace;
    print_trace ignores these, but the other functions do not. You MUST pass
    the result of this function to clean_trace before passing it to any
    function other than print_trace! (This is necessary because deleting from
    a list is O(N).)

    Returns None if the requested rate is higher than the rate of the trace.

    The original trace is not modified.
    """
    if verbose is None:
        verbose = sys.stderr.isatty()
    info = trace_info(trace)
    duration = info[TRACE_INFO_DURATION]
    base_size = _info_size(info, dir)
    new_trace = trace[:]
    if verbose:
        _log("Old rate %d" % _info_rate(info, dir))

    target_size = rate * duration // 8000000
    if target_size > base_size:
        _log("Error: cannot increase rate by thinning")
        return None
    del_target = base_size - target_size
    report_step = del_target // 100
    last_report = 0
    del_size = 0
    while del_size < del_target:
        to_delete = random.randint(0, len(new_trace) - 1)
        if new_trace[to_delete][CV_START] == -1:
            continue
        size = _cv_bytes(new_trace[to_delete], dir)
        if abs(del_size + size - del_target) > abs(del_size - del_target):
            break
        del_size += size
        # mark as deleted instead of deleting: 'del' on a list is O(N)
        new_trace[to_delete] = [-1, 0, 0]
        if verbose and del_size - last_report > report_step:
            last_report = del_size
            sys.stderr.write("Thinning %.2f%%\r" % (
                del_size * 100.0 / del_target))
            sys.stderr.flush()

    if verbose:
        new_size = 0
        for cv in new_trace:
            if cv[CV_START] == -1:
                continue
            duration = cv[CV_START]
            new_size += _cv_bytes(cv, dir)
        _log("\nNew rate %d" % _bit_rate(new_size, duration))
    return new_trace


def clean_trace(trace):
    """
    removes deleted entries (start time -1) left in by thin_trace.

    returns the cleaned trace; the original trace is unmodified.
    """
    return [cv for cv in trace if cv[CV_START] != -1]


def print_header(trace, file, dir):
    """
    write the tcvec file header to file, counting only the live (not
    thinned-out) connection vectors whose direction flag equals dir. Nothing
    is written when there are no such vectors.
    """
    n = 0
    for cv in trace:
        # placeholders left by thin_trace have no direction field at all
        if cv[CV_START] != -1 and cv[CV_DIR] == dir:
            n += 1
    if n == 0:
        return
    file.write("# tcvec format version 2\n")
    file.write("# Number of Cvecs: %d\n" % n)


def print_trace(trace, file_recv, file_send):
    """
    prints the trace in the original format: vectors with direction flag 0
    go to file_recv, those with flag 1 to file_send. Entries deleted by
    thin_trace are skipped. The trace is not modified.
    """
    print_header(trace, file_recv, 0)
    print_header(trace, file_send, 1)
    outputs = (file_recv, file_send)
    for cv in trace:
        if cv[CV_START] == -1:
            continue
        try:
            target = outputs[cv[CV_DIR]]
            record = (cv[CV_TEXT] % cv[CV_START]).strip()
        except Exception:
            # show the offending vector (text first, then the other fields)
            # before propagating the error
            _log(cv[CV_TEXT])
            _log(cv[:CV_TEXT] + cv[CV_TEXT + 1:])
            raise
        target.write(record + '\n')


def parse_args():
    """
    parse command-line arguments; returns the (options, args) pair from
    optparse. Note that args[0] is the program name.
    """
    global option_parser
    parser = option_parser = optparse.OptionParser()
    parser.set_usage('%prog [options] [-e file [file ...] | file [file2 '
        '[outfile1 outfile2]]]\n'
        'Without -e, file and file2 are traces in opposite directions, and two '
        'output\ntraces are produced. With -e, all files are merged into one '
        'trace in the same\ndirection. ')
    parser.add_option('-b', '--block', dest='block', action='store_true',
        help='Perform "block resampling"')
    parser.add_option('-B', '--blocksize', dest='blocksize', action='store',
        metavar='SECS',
        help='For block resampling use this block size; zero '
        'sets the block size to the trace duration.',
        type='int', default=30)
    parser.add_option('-d', '--duration', dest='duration', action='store',
        metavar='SECS',
        help='For resampling algorithms, produce new trace of '
        'duration SECS seconds', type='int')
    parser.add_option('-e', '--merge', dest='merge', action='store_true',
        help='Merge all of the traces on the command line into one.')
    parser.add_option('-i', '--info', dest='info', action='store_true',
        help='Just print some information about the trace')
    parser.add_option('-l', '--leave', dest='truncate_tail', action='store',
        help='With --truncate, do not remove connections which finish within '
        'TAIL seconds of the starting time of the last connection in the '
        'truncated trace. Default 0; any negative value means infinity.',
        metavar='TAIL', type='float', default=0)
    parser.add_option('-m', '--match-mean', dest='match_mean', metavar='SECS',
        action='store',
        help="Emit a cyclic permutation of the original "
        "trace in which the mean rate of the first SECS seconds is as "
        "close as possible to the mean of the entire trace.",
        default=0, type='int')
    parser.add_option('-p', '--poisson', dest='poisson', action='store_true',
        help='Perform "poisson resampling"')
    parser.add_option('-q', '--quantiles', dest='quan', action='store',
        metavar='N', help='Split trace into N RTT quantiles',
        default=0, type='int')
    parser.add_option('-r', '--rate', dest='rate', action='store',
        metavar='RATE', help='Produce new trace with load RATE bits/second',
        type='int', default=1000000)
    parser.add_option('-R', '--same-rate', dest='same_rate',
        action='store_true',
        help="With --quantiles, if this option is specified, each "
        "subtrace will have (approximately) the same rate instead of the "
        "same number of connections.")
    parser.add_option('-s', '--scale', dest='scale', action='store_true',
        help='Scale inter-connection times to set rate')
    parser.add_option('-S', '--seed', dest='seed', action='store',
        type='int', metavar='SEED', help='Seed PRNG with SEED', default=0)
    parser.add_option('-t', '--thin', dest='thin', action='store_true',
        help='Remove connection at random to reduce rate.')
    parser.add_option('-T', '--time', dest='time', action='store',
        help='Only use the first SECS seconds of this trace',
        metavar='SECS', type='float', default=0)
    parser.add_option('-u', '--truncate', dest='truncate', action='store',
        help='Truncate trace to SECS seconds, or, if SECS is zero, truncate '
        'to the start time of the last connection (useful if file was '
        'truncated', metavar='SECS', type='float', default=None)
    parser.add_option('-x', '--transmit-rate', dest='transmit_rate',
        action='store_true',
        help='Use transmit rate instead of receive rate for calculations.')
    parser.add_option('-X', '--bidir', dest='bidir', action='store_true',
        help='Use both transmit and receive rate for '
        'calculations. Not implemented for all functions. Overrides --transmit-rate.')
    return parser.parse_args(sys.argv)


def print_usage():
    """print the command-line help to stderr and exit with status 1."""
    global option_parser
    try:
        option_parser.print_help(sys.stderr)
    except NameError:
        # the parser has not been built yet
        parse_args()
        option_parser.print_help(sys.stderr)
    sys.exit(1)


def _print_info(trace, info, seconds):
    """Print the --info summary of trace to standard output. If seconds is
    nonzero, the average rates are computed over that many leading seconds
    using interval_bytes rather than over the whole trace."""
    if not seconds:
        rate_a = info[TRACE_INFO_RATE_A]
        rate_b = info[TRACE_INFO_RATE_B]
    else:
        size_a, size_b = interval_bytes(trace, 0, seconds * 1000000, True)
        rate_a = size_a * 8 / seconds
        rate_b = size_b * 8 / seconds
    duration = info[TRACE_INFO_DURATION]
    arrival_rate = 0.0
    if duration:
        arrival_rate = len(trace) * 1e6 / duration

    _out('Number of cvecs : %d' % len(trace))
    _out('Trace duration : %g seconds' % (duration / 1e6))
    _out('Bytes A -> B : %d bytes' % (info[TRACE_INFO_SIZE_A]))
    _out('Bytes B -> A : %d bytes' % (info[TRACE_INFO_SIZE_B]))
    _out('Average size A -> B : %d bytes' % (
        info[TRACE_INFO_SIZE_A] // len(trace)))
    _out('Average size B -> A : %d bytes' % (
        info[TRACE_INFO_SIZE_B] // len(trace)))
    _out('Average rate A -> B : %d bits/second' % rate_a)
    _out('Average rate B -> A : %d bits/second' % rate_b)
    _out('Arrival rate : %g cvecs/second' % arrival_rate)
    _out('Minimum RTT : %d us' % info[TRACE_INFO_RTT_MIN])
    _out('Maximum RTT : %d us' % info[TRACE_INFO_RTT_MAX])


def main():
    """command-line entry point: read the trace(s), apply the single
    requested operation and write the result."""
    options, args = parse_args()
    if options.seed:
        random.seed(options.seed)

    if (len(args) > 2 or options.merge) and options.block:
        _log("Block resampling is not yet implemented for this case.")
        sys.exit(1)

    if len(args) < 2:
        print_usage()

    time_limit = options.time * 1000000
    trace, blocks = read_trace(args[1], options.blocksize, 0, time_limit)
    if options.merge:
        for name in args[2:]:
            trace_next = read_trace(name, 0, 0, time_limit)[0]
            trace = merge_traces(trace, trace_next, options.blocksize,
                                 include_tail=True)
    elif len(args) > 2:
        trace_init = read_trace(args[2], 0, 1, time_limit)[0]
        trace = merge_traces(trace, trace_init, options.blocksize)

    if not trace:
        _log('No connection vectors were read; nothing to do.')
        sys.exit(1)

    if options.bidir:
        direction = DIR_BOTH
    elif options.transmit_rate:
        direction = DIR_A
    else:
        direction = DIR_B

    if options.quan:
        bins = rtt_quantiles(trace, options.quan, options.same_rate, direction)
        for i, b in enumerate(bins):
            _log('Writing bin %d to file' % i)
            outfile = open(os.path.basename(args[1]) + '.bin' + str(i), 'w')
            if len(args) > 2 and not options.merge:
                outfile_2 = open(
                    os.path.basename(args[2]) + '.bin' + str(i), 'w')
            else:
                outfile_2 = outfile
            try:
                print_trace(b, outfile, outfile_2)
            finally:
                outfile.close()
                outfile_2.close()
        sys.exit(0)

    info = trace_info(trace)
    if options.info:
        _print_info(trace, info, options.time)
        sys.exit(0)

    if options.duration is None:
        options.duration = info[TRACE_INFO_DURATION] / 1e6

    _log('Read %d connection vectors' % len(trace))

    if options.match_mean:
        new_trace = match_mean(trace, direction, options.match_mean * 1000000)
    elif options.truncate is not None:
        new_trace = truncate_trace(trace, options.truncate * 1000000,
                                   options.truncate_tail * 1000000)
    elif options.poisson:
        new_trace = poisson_resample(trace, options.duration, options.rate,
                                     direction)
    elif options.block:
        new_trace = block_resample(trace, blocks, options.blocksize,
                                   options.duration, options.rate, direction)
    elif options.scale:
        new_trace = scale_trace(trace, options.rate, direction)
    elif options.thin:
        new_trace = thin_trace(trace, options.rate, direction)
    elif options.merge:
        # merge was specified without any other operation
        new_trace = trace
    else:
        print_usage()

    if not new_trace:
        _log('An error occurred; see above for clues.')
        sys.exit(1)

    # open output files only once there is something to write
    if len(args) == 5 and not options.merge:
        out_recv = open(args[3], 'w')
        out_send = open(args[4], 'w')
    else:
        out_recv = out_send = sys.stdout

    try:
        print_trace(new_trace, out_recv, out_send)
    finally:
        if out_recv is not sys.stdout:
            out_recv.close()
        if out_send is not sys.stdout:
            out_send.close()


if __name__ == '__main__':
    main()