Package ivs :: Package timeseries :: Module decorators
[hide private]
[frames] | no frames]

Source Code for Module ivs.timeseries.decorators

  1  # -*- coding: utf-8 -*- 
  2  """ 
  3  Various decorator functions for time series analysis 
  4      - Parallel periodogram 
  5      - Autocompletion of default arguments 
  6  """ 
  7  import functools 
  8  import logging 
  9  from multiprocessing import Manager,Process,cpu_count 
 10  import numpy as np 
 11  from ivs.aux import loggers 
 12  #from ivs.timeseries import windowfunctions 
 13   
 14  logger = logging.getLogger("TS.DEC") 
 15  logger.addHandler(loggers.NullHandler) 
16 17 -def parallel_pergram(fctn):
18 """ 19 Run periodogram calculations in parallel. 20 21 This splits up the frequency range between f0 and fn in 'threads' parts. 22 23 This must decorate a 'make_parallel' decorator. 24 """ 25 @functools.wraps(fctn) 26 def globpar(*args,**kwargs): 27 #-- construct a manager to collect all calculations 28 manager = Manager() 29 arr = manager.list([]) 30 all_processes = [] 31 #-- get information on frequency range 32 f0 = kwargs['f0'] 33 fn = kwargs['fn'] 34 threads = kwargs.pop('threads',1) 35 if threads=='max': 36 threads = cpu_count() 37 elif threads=='safe': 38 threads = cpu_count()-1 39 else: 40 threads = float(threads) 41 42 #-- extend the arguments to include the parallel array 43 myargs = tuple(list(args) + [arr] ) 44 #-- however, some functions cannot be parallelized 45 if fctn.__name__ in ['fasper']: 46 threads = 1 47 48 #-- distribute the periodogram calcs over different threads, and wait 49 for i in range(int(threads)): 50 #-- define new start and end frequencies 51 kwargs['f0'] = f0 + i*(fn-f0) / threads 52 kwargs['fn'] = f0 +(i+1)*(fn-f0) / threads 53 logger.debug("parallel: starting process %s: f=%.4f-%.4f"%(i,kwargs['f0'],kwargs['fn'])) 54 p = Process(target=fctn, args=myargs, kwargs=kwargs) 55 p.start() 56 all_processes.append(p) 57 58 for p in all_processes: p.join() 59 60 logger.debug("parallel: all processes ended") 61 62 #-- join all periodogram pieces 63 freq = np.hstack([output[0] for output in arr]) 64 ampl = np.hstack([output[1] for output in arr]) 65 sort_arr = np.argsort(freq) 66 ampl = ampl[sort_arr] 67 freq = freq[sort_arr] 68 ampl[np.isnan(ampl)] = 0. 69 70 if len(arr[0])>2: 71 rest = [] 72 for i in range(2,len(arr[0])): 73 rest.append(np.hstack([output[i] for output in arr])) 74 rest = np.array(rest).T 75 rest = rest[sort_arr].T 76 return tuple([freq,ampl]+list(rest)) 77 else: 78 return freq,ampl
79 80 return globpar 81
82 83 84 -def defaults_pergram(fctn):
85 """ 86 Set default parameters common to all periodograms. 87 """ 88 @functools.wraps(fctn) 89 def globpar(*args,**kwargs): 90 #-- this is the information we need the compute everything 91 times = args[0] 92 signal = args[1] 93 T = times.ptp() 94 95 #-- get information on frequency range. If it is not given, compute the 96 # start (0.1/T) and stop (Nyquist) frequency. 97 # Also compute the frequency step as 0.1/T 98 nyq_stat = kwargs.pop('nyq_stat',np.min) 99 nyquist = getNyquist(times,nyq_stat=nyq_stat) 100 f0 = kwargs.get('f0',0.01/T) 101 fn = kwargs.get('fn',nyquist) 102 df = kwargs.get('df',0.1/T) 103 if df==0: df = 0.1/T 104 if f0==0: f0 = 0.01/T 105 if fn>nyquist: 106 fn = nyquist 107 kwargs['f0'] = f0 108 kwargs['df'] = df 109 kwargs['fn'] = fn 110 111 #-- maybe the data needs to be windowed 112 window = kwargs.pop('window',None) 113 if window is not None: 114 signal = signal*windowfunctions.getWindowFunction(window)(times) 115 signal -= signal.mean() 116 logger.debug('Signal is windowed with %s'%(window)) 117 118 #-- normalise weights if they are given 119 weights = kwargs.get('weights',None) 120 if weights is not None: 121 if weights.sum() != len(weights): 122 weights = weights / float(weights.sum()) * len(weights) 123 logger.debug("Weights were initially not normalized: normalization performed.") 124 kwargs['weights'] = weights 125 return fctn(times,signal,*args[2:],**kwargs)
126 127 return globpar 128
129 130 -def defaults_filtering(fctn):
131 """ 132 Set default parameters common to all filtering functions. 133 """ 134 @functools.wraps(fctn) 135 def globpar(*args,**kwargs): 136 #-- this is the information we need the compute everything 137 # it is possible a template x-axis has been given. If so, the parallelization 138 # depends on the length of the template, not of the original thing. 139 fn = ('x_template' in kwargs) and len(kwargs['x_template']) or len(args[0]) 140 f0 = kwargs.get('f0',0) 141 fn = kwargs.get('fn',fn) 142 kwargs['f0'] = f0 143 kwargs['fn'] = fn 144 return fctn(*args,**kwargs)
145 return globpar 146
147 148 149 150 151 -def getNyquist(times,nyq_stat=np.inf):
152 """ 153 Calculate Nyquist frequency. 154 155 Typical use is minimum or median of time points differences. 156 157 If C{nyq_stat} is not callable, it is assumed to be a number and that number 158 will just be returned: this you can do to search for frequencies above the 159 nyquist frequency 160 161 @param times: sorted array containing time points 162 @type times: numpy array 163 @param nyq_stat: statistic to use or absolute value of the Nyquist frequency 164 @type nyq_stat: callable or float 165 @return: Nyquist frequency 166 @rtype: float 167 """ 168 if not hasattr(nyq_stat,'__call__'): 169 nyquist = nyq_stat 170 else: 171 nyquist = 1/(2.*nyq_stat(np.diff(times))) 172 return nyquist
173