#!/usr/bin/python -u # # @file process.py # @purpose Process TCS data # Takes S(E) = dI/dE # @author Sam Moore # @date August 2012 # import sys import os import re # Regular expressions - for removing comments import odict #ordered dictionary import copy import Gnuplot, Gnuplot.funcutils import string import time import math import cmath import random import numpy gnuplot = Gnuplot.Gnuplot() def Reset(): gnuplot = Gnuplot.Gnuplot() def FindDataFiles(directory=".", depth=1, result=None): if result == None: result = [] for f in os.listdir(directory): if os.path.isdir(directory+"/"+str(f)): if depth > 1: result += FindDataFiles(directory+"/"+str(f), depth-1, result) continue s = f.split(".") if (len(s) == 2 and s[1] == "dat"): result.append(directory+"/"+str(f)) return result def BaseName(f): a = f.split("/") return a[len(a)-1] def DirectoryName(f, start=0,back=1): a = f.split("/") return string.join(a[start:(len(a)-back)], "/") def GetData(filename, key=1): if type(filename) != type(""): if type(filename) == type([]): return filename else: return [[0,0,0,0]] if os.path.isdir(filename): if os.path.exists(filename.strip("/")+"/average.dat"): os.remove(filename.strip("/")+"/average.dat") AverageAllData(filename) return GetData(filename.strip("/")+"/average.dat") input_file = open(filename, "r") data = {} for line in input_file: line = re.sub("#.*", "", line).strip("\r\n ") if len(line) == 0: continue line = map(lambda e : float(e), line.split("\t")) if line[key] in data: for i in range(len(line)): data[line[key]][0][i] += line[i] data[line[key]][1] += 1 else: data.update({line[key] : [line, 1]}) d = map(lambda e : map(lambda f : float(f) / float(e[1][1]), e[1][0]), data.items()) d.sort(key = lambda e : e[key]) #for l in d: # print str(l) return d def DoNothing(data): return data def GetDataSets(directory="."): data_sets = [] for f in os.listdir(directory): if os.path.isdir(directory+"/"+str(f)) == False: if (len(f.split(".")) > 1 and f.split(".")[1] == "dat"): d = GetData(directory+"/"+str(f)) if len(d) > 0: data_sets.append(d) return data_sets def Derivative(data, a=1, b=2, sigma=None,step=1): #print "Derivative called" result = [[]] n = 0 dI = [0,0] dE = [0,0] for i in range(0, len(data),step): result[len(result)-1] = [d for d in data[i]] if (i >= step): dE[0] = data[i][a] - data[i-step][a] dI[0] = data[i][b] - data[i-step][b] else: dI[0] = None if (i < len(data)-step): dE[1] = data[i+step][a] - data[i][a] dI[1] = data[i+step][b] - data[i][b] else: dI[1] = None if sigma != None: #print str(data[i]) + " ["+str(sigma)+"] = " + str(data[i][int(abs(sigma))]) if sigma < 0: if dI[0] != None: dI[0] -= 0.5*data[i][int(abs(sigma))] if dI[1] != None: dI[1] -= 0.5*data[i][int(abs(sigma))] else: if dI[0] != None: dI[0] += 0.5*data[i][int(abs(sigma))] if dI[1] != None: dI[1] += 0.5*data[i][int(abs(sigma))] deltaE = 0.0 deltaI = 0.0 count = 0 if dI[0] != None: deltaE += dE[0] deltaI += dI[0] count += 1 if dI[1] != None: deltaE += dE[1] deltaI += dI[1] count += 1 if (count > 0): deltaI /= float(count) deltaE /= float(count) if (deltaE != 0): result[len(result)-1][b] = (deltaI / deltaE) else: result[len(result)-1][b] = 0.0 else: result[len(result)-1][b] = 0.0 result.append([]) return result[0:len(result)-1] def MaxNormalise(data, u=2): result = copy.deepcopy(data) if (len(data) <= 0): return result maxval = max(data, key = lambda e : e[u])[u] if maxval == 0: return result for d in result: d[u] = d[u] / maxval return result def Average(data_sets, u=1): avg = odict.odict() for t in data_sets: for p in t: if p[u] in avg: #print "Already have " + str(p[u]) avg[p[u]][1] += 1 for i in range(0, len(p)): avg[p[u]][0][i] += p[i] else: #print "Create key for " + str(p[u]) avg.update({p[u] : [p, 1]}) for a in avg.keys(): for i in range(0, len(avg[a][0])): avg[a][0][i] /= float(avg[a][1]) return map(lambda e : e[1][0], sorted(avg.items(), key = lambda e : e[0])) def FullWidthAtHalfMax(data, u=1): maxval = max(data, key = lambda e : e[u]) peak = data.index(maxval) maxval = maxval[0] lhs = None rhs = None for i in range(1, len(data)/2): if lhs == None: if (peak-i > 0 and data[peak-i] < 0.50*maxval): lhs = data[peak-i][u] if rhs == None: if (peak+i < len(data) and data[peak+i] < 0.50*maxval): rhs = peak+i if lhs != None and rhs != None: break if rhs == None or lhs == None: return abs(data[len(data)-1][0] - data[0][0]) else: return abs(rhs - lhs) def SaveData(filename, data): out = open(filename, "w", 0) for a in data: for i in range(0, len(a)): out.write(str(a[i])) if (i < len(a) - 1): out.write("\t") out.write("\n") def AverageAllData(directory, save=None, normalise=True): data_sets = [] if save == None: save = directory+"/average.dat" for f in FindDataFiles(directory): d = GetData(f) if normalise: d = MaxNormalise(d) data_sets.append(d) a = Average(data_sets) SaveData(save, a) return a def CalibrateData(original, ammeter_scale=1e-6): data = copy.deepcopy(original) for i in range(0, len(data)): data[i][1] = 16.8 * float(data[i][1]) / 4000.0 data[i][2] = ammeter_scale * 0.170 * float(data[i][2]) / 268.0 data[i][3] = ammeter_scale * 0.170 * float(data[i][3]) / 268.0 return data def ShowTCS(filename, raw=True,calibrate=True, normalise=False, show_error=False, plot=gnuplot.plot,with_="lp", step=1, output=None, title="", master_title="", smooth=0, show_peak=False, inflection=1): if raw == False: calibrate = False normalise = False if type(filename) == type(""): data = GetData(filename) else: data = filename filename = "tcs data" if (title == ""): title = BaseName(filename) if (len(data) <= 0): return data if (smooth > 0): if type(smooth) == type([]): for i in range(smooth[0]): data = Smooth(data, m=smooth[1]) else: data = Smooth(data, m=smooth) if calibrate: data = CalibrateData(data) units = ["Volts", "uA / V"] else: units = ["DAC counts", "ADC counts / DAC counts"] if not normalise: gnuplot("set ylabel \"S(U) ("+str(units[1])+")\"") else: data = MaxNormalise(data) gnuplot("set ylabel \"S(U) (arb. units)\"") if (output != None and type(output) == type("")): gnuplot("set term png size 640,480") gnuplot("set output \""+str(output)+"\"") if master_title == "": master_title = "Total Current Spectrum S(U)" if type(filename) == type("") and plot == gnuplot.plot: if filename != "tcs data": p = ReadParameters(filename) if "Sample" in p: master_title += "\\nSample: "+p["Sample"] gnuplot("set title \""+str(master_title)+"\"") gnuplot("set xlabel \"U ("+str(units[0])+")\"") gnuplot("set xrange [0:16]") gnuplot("set mxtics 10") gnuplot("set mytics 10") if raw: d = Derivative(data, 1, 2, step=step) else: d = data ymax = 0.01 + 1.2 * max(d, key=lambda e : e[2])[2] ymin = -0.01 + 1.2 * min(d, key=lambda e : e[2])[2] gnuplot("set yrange ["+str(ymin)+":"+str(ymax)+"]") plotList = [] plotList.append(Gnuplot.Data(d, using="2:3", with_=with_,title=title)) if (show_error): error1 = Derivative(data, 1, 2, -3,step=step) error2 = Derivative(data, 1, 2, +3,step=step) plotList.append(Gnuplot.Data(error1, using="2:3", with_=w,title="-sigma/2")) plotList.append(Gnuplot.Data(error2, using="2:3", with_=w, title="+sigma/2")) if (show_peak): peak = SmoothPeakFind(d, ap=DoNothing, stop=1, inflection=inflection) plotList += PlotPeaks(peak,with_="l lt -1", plot=None) if (plot != None): plot(*plotList) time.sleep(0.2) if (output != None and type(output) == type("")): gnuplot("set term wxt") if (plot == None): return plotList return data def ShowData(filename,calibrate=True, normalise=False, show_error=False, plot=gnuplot.plot,with_="lp", step=1, output=None, title="", master_title="Sample Current I(E)", smooth=0): if type(filename) == type(""): data = GetData(filename) else: data = filename filename = "raw data" if (title == ""): title = BaseName(filename) if len(data) <= 0: return data if (smooth > 0): if type(data) == type([]): for i in range(0, smooth[0]): data = Smooth(data, m=smooth[1]) else: data = Smooth(data, m = smooth) if calibrate: data = CalibrateData(data) units = ["V", "uA"] else: units = ["DAC counts", "ADC counts"] if not normalise: gnuplot("set ylabel \"I(U) ("+str(units[1])+")\"") else: data = MaxNormalise(data) gnuplot("set ylabel \"I(U) (arb. units)\"") if (output != None and type(output) == type("")): gnuplot("set term png size 640,480") gnuplot("set output \""+str(output)+"\"") gnuplot("set title \""+str(master_title)+"\"") gnuplot("set xlabel \"U ("+str(units[0])+")\"") ymax = 0.005 + 1.2 * max(data, key=lambda e : e[2])[2] ymin = -0.005 + 1.2 * min(data, key=lambda e : e[2])[2] gnuplot("set yrange ["+str(ymin)+":"+str(ymax)+"]") #d = Derivative(data, 1, 2, step=step) plotList = [] plotList.append(Gnuplot.Data(data, using="2:3", with_=with_,title=title)) time.sleep(0.1) if (show_error): error1 = copy.deepcopy(data) error2 = copy.deepcopy(data) for i in range(len(data)): #print str(data[i]) error1[i][2] -= 0.50*float(data[i][3]) error2[i][2] += 0.50*float(data[i][3]) plotList.append(Gnuplot.Data(error1, using="2:3", with_=w,title="Error : Low bound")) plotList.append(Gnuplot.Data(error2, using="2:3", with_=w, title="Error : Upper bound")) if plot != None: plot(*plotList) if (output != None and type(output) == type("")): gnuplot("set term wxt") return data else: return plotList def ReadParameters(filename): parameters = odict.odict() input_file = open(filename, "r") for line in input_file: k = line.split("=") item = None #print str(k) if (len(k) >= 2): item = k[0].strip("# \r\n") value = k[1].strip("# \r\n") if (item in parameters): parameters[item] = value else: parameters.update({str(item) : value}) input_file.close() return parameters def PlotParameters(filename): ReadParameters(filename) def Smooth(data, m, k=2): smooth = copy.deepcopy(data) for i in range(len(smooth)): count = 0 smooth[i][k] = 0.0 for j in range(i-m,i+m): if j >= 0 and j < len(smooth): count += 1 smooth[i][k] += data[j][k] if count > 0: smooth[i][k] = smooth[i][k] / float(count) else: smooth[i][k] = data[i][k] return smooth def PeakFind(data, k=2,threshold=0.00, inflection=0): results = [] for i in range(len(data)): if i == 0 or i == len(data)-1: continue #if abs(data[i][k]) < threshold * abs(max(data, key = lambda e : abs(e[k]))[k]): # continue left = data[i-1][k] - data[i][k] right = data[i+1][k] - data[i][k] if abs(left) < threshold*abs(data[i][k]): continue if abs(right) < threshold*abs(data[i][k]): continue if left*right > 0: results.append(data[i] + [inflection]) if inflection > 0: results += PeakFind(Derivative(data), k=k, threshold=threshold, inflection=inflection-1) return results def SmoothPeakFind(data, a=1, k=2, ap=DoNothing, stop=10,smooth=5, inflection=0): s = data #results = [] peakList = [] m = 0 while m < stop: #results.append([]) peaks = PeakFind(ap(s),k=k, inflection=inflection) #print "m = " +str(m) for p in peaks: add = [m] [add.append(f) for f in p] if m == 0: #print "*New peak at " + str(p) peakList.append([add]) else: score = [] for i in range(len(peakList)): p2 = peakList[i][len(peakList[i])-1] if m - p2[0] > 1: continue score.append([i, abs(p[a] - p2[1+a])]) score.sort(key = lambda e : e[1]) if len(score) == 0 or score[0][1] > 100: #print "New peak at " + str(p) peakList.append([add]) else: #print "Peak exists near " + str(p) + " ("+str(score[0][1])+") " + str(peakList[score[0][0]][len(peakList[score[0][0]])-1]) peakList[score[0][0]].append(add) #results.append([m, []]) #[results[len(results)-1].append(f) for f in p] m += 1 s = Smooth(s, m=smooth,k=k) #results.sort(key = lambda e : e[2]) #peaks = [] return peakList def PlotPeaks(peaks, calibrate=True, with_="lp", plot=gnuplot.replot): plotList = [] for p in peaks: p.append(copy.deepcopy(p[len(p)-1])) p[len(p)-1][0] += 1 #print "Adding " + str(p) + " to list" if len(p) >= 0: l = p[len(p)-1] if l[len(l)-1] < 1: with_ = with_.split(" lt")[0] + " lt 9" plotList.append(Gnuplot.Data(p, using="3:1", with_=with_)) if len(plotList) > 0 and plot != None: plot(*plotList) time.sleep(0.2) #print str(plotList) #for p in peaks: # p = p[0:len(p)-1] return plotList def main(): if (len(sys.argv) < 2): sys.stderr.write(sys.argv[0] + " - Require arguments (filename)\n") return 1 i = 1 plotFunc = ShowTCS normalise = True title = "" master_title = "" smooth=0 with_="lp" while i < len(sys.argv): if sys.argv[i] == "--raw": plotFunc = ShowData elif sys.argv[i] == "--tcs": plotFunc = ShowTCS #lambda e : ShowTCS(e, show_peak=False) elif sys.argv[i] == "--output": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) gnuplot("set term postscript colour") gnuplot("set output \""+sys.argv[i+1]+"\"") i += 1 elif sys.argv[i] == "--wxt": gnuplot("set term wxt") elif sys.argv[i] == "--normalise": normalise = True elif sys.argv[i] == "--unnormalise": normalise = False elif sys.argv[i] == "--title": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) title = sys.argv[i+1] i += 1 elif sys.argv[i] == "--master_title": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) master_title = sys.argv[i+1] i += 1 elif sys.argv[i] == "--smooth": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) smooth = sys.argv[i+1] smooth = map(int, smooth.split("x")) if len(smooth) <= 1: smooth = smooth[0] i += 1 elif sys.argv[i] == "--with": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) with_ = sys.argv[i+1] i += 1 elif sys.argv[i] == "--output": if i+1 >= len(sys.argv): sys.stderr.write("Need argument for "+sys.argv[i]+" switch\n") sys.exit(1) gnuplot("set term postscript colour") gnuplot("set output \""+str(argv[i+1])+"\"") else: plotFunc(sys.argv[i], plot=gnuplot.replot, normalise=normalise, title=title, master_title=master_title, smooth=smooth, with_=with_) i += 1 print "Done. Press enter to exit, or type name of file to save as." out = sys.stdin.readline().strip("\t\r\n #") if out != "": gnuplot("set term postscript colour enhanced \"Arial Bold,18\"") gnuplot("set output \""+out+"\"") gnuplot.replot() def ModelTCS(f, sigma, Emin, Emax, dE): data = [] E = Emin while E < Emax: S = (1 - sigma(0))*f(-E) + FuncIntegrate(lambda e : f(e - E) * FuncDerivative(sigma, E, dE), Emin, Emax, dE) data.append([0.00, E, S,0.00]) E += dE return data def IntegrateTCS(data, imin, imax=0, di=1): i = imin if imax == 0: imax = len(data)-1 total = 0.0 while i < imax: total += data[i][2] * (data[i+1][1] - data[i][1]) i += di return total def FuncIntegrate(f, xmin, xmax, dx): x = xmin total = 0.0 while x <= xmax: total += f(x) * dx x += dx return total def FuncDerivative(f, x, dx): return 0.50*(f(x+dx) - f(x-dx))/dx def FitTCS(data, min_mse=1e-4, max_fail=100, max_adjust=4,divide=10, plot=gnuplot.plot,smooth=0): if type(data) == type(""): d = GetData(data) d = CalibrateData(d) d = MaxNormalise(d) d = Derivative(d) else: d = data if smooth != 0: if type(smooth) == type([]): for _ in range(smooth[0]): d = Smooth(d, m=smooth[1]) else: d = Smooth(d, m=smooth) plotItems = ShowTCS(d, raw=False,smooth=smooth,plot=None) plotItems.append(None) peaks = SmoothPeakFind(d, smooth=5, stop=1, inflection=0) peaks.sort(key = lambda e : e[len(e)-1][1]) fits = [] for i in range(0,len(peaks)): p = peaks[i] l = p[len(p)-1] if l[len(l)-1] == 0: fits.append([l[3], l[2], 1.0]) else: if i-2 >= 0: l = peaks[i-2][len(peaks[i-2])] if l[len(l)-1] == 0: fits.append([l[3], l[2], 1.0]) if i+2 <= len(peaks)-1: l = peaks[i+2][len(peaks[i+2])] if l[len(l)-1] == 0: fits.append([l[3], l[2], 1.0]) for i in range(len(fits)): left = 2.0 right = 2.0 if i > 0: left = fits[i-1][1] - fits[i][1] if i < len(fits)-1: right = fits[i+1][1] - fits[i][1] fits[i][2] = min([abs(0.5*left), abs(0.5*right)]) #print "Fits are " + str(fits) #stdin.readline() def tcs(E): total = 0.0 for f in fits: dt = f[0] * gaussian(E - f[1], f[2]) #print " Increase total by " + str(dt) total += dt #print "tcs returns " + str(total) return total mse = 1 old_mse = 1 cycle = 0 failcount = 0 adjust = 1.0 iterations = 0 while failcount < max_fail and mse > min_mse: i = random.randint(0, len(fits)-1) j = random.randint(0, len(fits[i])-1) #while j == 1: # j = random.randint(0, len(fits[i])-1) #print "Adjust " + str(i) + ","+str(j) + ": Iteration " + str(iterations) + " mse: " + str(mse) old = fits[i][j] old_mse = mse fits[i][j] += adjust * (random.random() - 0.50) if i == 2: while fits[i][j] <= 0.0005: fits[i][j] = adjust * (random.random() - 0.50) model = table(lambda e : [0.00, e, tcs(e), 0.00], 0, 16.8, divide*d[len(d)-1][1]/(len(d))) mse = MeanSquareError(model, d[0::divide]) if mse >= old_mse: fits[i][j] = old failcount += 1 if failcount > max_fail / 2: if adjust > 1.0/(2.0**max_adjust): adjust /= 2 mse = old_mse else: #adjust /= 2.0 failcount = 0 iterations += 1 #model = table(lambda e : [0.00, e, tcs(e), 0.00], 0, 16.8, 16.8/len(d)) plotItems[len(plotItems)-1] = Gnuplot.Data(model, using="2:3", with_="l lt 3", title="model") time.sleep(0.1) fits.sort(key = lambda e : e[0] * gaussian(0, e[2]), reverse=True) if plot != None: gnuplot("set title \"MSE = "+str(mse)+"\\nfailcount = "+str(failcount)+"\\nadjust = "+str(adjust)+"\"") gnuplot.plot(*plotItems) return [fits, model] else: return [fits, model,plotItems] #return model def SaveFit(filename, fit): out = open(filename, "w", 0) out.write("# TCS Fit\n") for f in fit: out.write(str(f[0]) + "\t" + str(f[1]) + "\t" + str(f[2]) + "\n") out.close() def LoadFit(filename): infile = open(filename, "r") if (infile.readline() != "# TCS Fit\n"): sys.stderr.write("Error loading fit from file " + str(filename) + "\n") sys.exit(0) fit = [] while True: f = infile.readline().strip("# \r\n\t").split("\t") if len(f) != 3: break fit.append(map(float, f)) infile.close() fit.sort(key = lambda e : e[0] * gaussian(0, e[2]), reverse=True) #def model(e): # total = 0.0 # for f in fit: # total += f[0] * gaussian(e - f[1], f[2]) # return total #return table(lambda e : [0.0, e, model(e), 0.0], 0.0, 16.8, 16.8/400) return fit def MeanSquareError(model, real, k = 2): mse = 0.0 for i in range(len(real)): mse += (model[i][k] - real[i][k])**2 mse /= len(model) return mse def delta(x): if (x == 0): return 1.0 else: return 0.0 def table(f, xmin, xmax, dx): result = [] x = xmin while (x <= xmax): result.append(f(x)) x += dx return result def gaussian(x, sigma): if (sigma == 0.0): return 0.0 return math.exp(- (x**2.0)/(2.0 * sigma**2.0)) / (sigma * (2.0 * math.pi)**0.50) def step(x, sigma, T): if T == 0: return 1.0 return 1.0 / (math.exp((x - sigma)/T) + 1.0) if __name__ == "__main__": sys.exit(main())