#!/usr/bin/env python from CMGTools.TTHAnalysis.treeReAnalyzer import * from glob import glob import os.path, re, types, itertools from time import sleep MODULES = [] from CMGTools.TTHAnalysis.tools.edgeFriends import edgeFriends, _susyEdgeTight MODULES.append( ('edgeFriends', edgeFriends("Edge", lambda lep : _susyEdgeTight(lep), cleanJet = lambda lep,jet,dr : (jet.pt < 35 and dr < 0.4)) ) ) class VariableProducer(Module): def __init__(self,name,booker,modules): Module.__init__(self,name,booker) self._modules = [ (n,m() if type(m) == types.FunctionType else m) for (n,m) in modules ] def beginJob(self): self.t = PyTree(self.book("TTree","t","t")) self.branches = {} for name,mod in self._modules: print name print mod.listBranches() for B in mod.listBranches(): # don't add the same branch twice if B in self.branches: print "Will not add branch %s twice" % (B,) continue self.branches[B] = True if type(B) == tuple: if len(B) == 2: self.t.branch(B[0],B[1]) elif len(B) == 4: self.t.branch(B[0],B[1],n=B[2],lenVar=B[3]) elif len(B) == 3: self.t.branch(B[0],B[1],n=B[2],lenVar=None) else: self.t.branch(B ,"F") def analyze(self,event): for name,mod in self._modules: keyvals = mod(event) for B,V in keyvals.iteritems(): setattr(self.t, B, V) setattr(event, B, V) self.t.fill() import os, itertools from optparse import OptionParser parser = OptionParser(usage="%prog [options] ") parser.add_option("-m", "--modules", dest="modules", type="string", default=[], action="append", help="Run these modules"); parser.add_option("-d", "--dataset", dest="datasets", type="string", default=[], action="append", help="Process only this dataset (or dataset if specified multiple times)"); parser.add_option("-D", "--dm", "--dataset-match", dest="datasetMatches", type="string", default=[], action="append", help="Process only this dataset (or dataset if specified multiple times): REGEXP"); parser.add_option("-c", "--chunk", dest="chunks", type="int", default=[], action="append", help="Process only these chunks (works only if a single dataset is selected with -d)"); parser.add_option("-N", "--events", dest="chunkSize", type="int", default=500000, help="Default chunk size when splitting trees"); parser.add_option("-j", "--jobs", dest="jobs", type="int", default=1, help="Use N threads"); parser.add_option("-p", "--pretend", dest="pretend", action="store_true", default=False, help="Don't run anything"); parser.add_option("-T", "--tree-dir", dest="treeDir", type="string", default="sf", help="Directory of the friend tree in the file (default: 'sf')"); parser.add_option("-q", "--queue", dest="queue", type="string", default=None, help="Run jobs on lxbatch instead of locally"); parser.add_option("-t", "--tree", dest="tree", default='ttHLepTreeProducerTTH', help="Pattern for tree name"); parser.add_option("-V", "--vector", dest="vectorTree", action="store_true", default=True, help="Input tree is a vector"); parser.add_option("-F", "--add-friend", dest="friendTrees", action="append", default=[], nargs=2, help="Add a friend tree (treename, filename). Can use {name}, {cname} patterns in the treename") parser.add_option("--FMC", "--add-friend-mc", dest="friendTreesMC", action="append", default=[], nargs=2, help="Add a friend tree (treename, filename) to MC only. Can use {name}, {cname} patterns in the treename") parser.add_option("--FD", "--add-friend-data", dest="friendTreesData", action="append", default=[], nargs=2, help="Add a friend tree (treename, filename) to data trees only. Can use {name}, {cname} patterns in the treename") parser.add_option("-L", "--list-modules", dest="listModules", action="store_true", default=False, help="just list the configured modules"); parser.add_option("-n", "--new", dest="newOnly", action="store_true", default=False, help="Make only missing trees"); parser.add_option("-I", "--import", dest="imports", type="string", default=[], action="append", help="Modules to import"); (options, args) = parser.parse_args() if options.imports: MODULES = [] from importlib import import_module for mod in options.imports: import_module(mod) obj = sys.modules[mod] for (name,x) in obj.MODULES: print "Loaded %s from %s " % (name, mod) MODULES.append((name,x)) if options.listModules: print "List of modules" for (n,x) in MODULES: if type(x) == types.FunctionType: x = x() print " '%s': %s" % (n,x) exit() if "{P}" in args[1]: args[1] = args[1].replace("{P}",args[0]) if len(args) != 2 or not os.path.isdir(args[0]): print "Usage: program " exit() if not os.path.isdir(args[1]): os.system("mkdir -p "+args[1]) if not os.path.isdir(args[1]): print "Could not create output directory" exit() if len(options.chunks) != 0 and len(options.datasets) != 1: print "must specify a single dataset with -d if using -c to select chunks" exit() jobs = [] for D in glob(args[0]+"/*"): treename = options.tree fname = "%s/%s/%s_tree.root" % (D,options.tree,options.tree) os.system('ls -l %s/%s/'%(D,options.tree)) print fname if (not os.path.exists(fname)) and (os.path.exists("%s/%s/tree.root" % (D,options.tree)) ): treename = "tree" fname = "%s/%s/tree.root" % (D,options.tree) else: print 'path does not exist', fname if (not os.path.exists(fname)) and (os.path.exists("%s/%s/tree.root.url" % (D,options.tree)) ): treename = "tree" fname = "%s/%s/tree.root" % (D,options.tree) fname = open(fname+".url","r").readline().strip() if os.path.exists(fname) or (os.path.exists("%s/%s/tree.root.url" % (D,options.tree))): short = os.path.basename(D) if options.datasets != []: if short not in options.datasets: continue if options.datasetMatches != []: found = False for dm in options.datasetMatches: if re.match(dm,short): found = True if not found: continue data = ("DoubleMu" in short or "MuonEG" in short or "DoubleEG" in short or 'DoubleEl' in short or "JetHT" in short or "HTMHT" in short) f = ROOT.TFile.Open(fname) t = f.Get(treename) if not t: print "Corrupted ",fname continue entries = t.GetEntries() f.Close() if options.newOnly: fout = "%s/evVarFriend_%s.root" % (args[1],short) if os.path.exists(fout): f = ROOT.TFile.Open(fname); t = f.Get(treename) if t.GetEntries() != entries: print "Component %s has to be remade, mismatching number of entries (%d vs %d)" % (short, entries, t.GetEntries()) f.Close() else: print "Component %s exists already and has matching number of entries (%d)" % (short, entries) continue chunk = options.chunkSize nchunks = int(entries/chunk)+(1 if entries%chunk else 0) if entries < chunk: print " ",os.path.basename(D),(" DATA" if data else " MC")," single chunk" jobs.append((short,fname,"%s/evVarFriend_%s.root" % (args[1],short),data,xrange(entries),-1)) else: nchunk = int(ceil(entries/float(chunk))) print " ",os.path.basename(D),(" DATA" if data else " MC")," %d chunks" % nchunk for i in xrange(nchunk): if options.chunks != []: if i not in options.chunks: continue r = xrange(int(i*chunk),min(int((i+1)*chunk),entries)) jobs.append((short,fname,"%s/evVarFriend_%s.chunk%d.root" % (args[1],short,i),data,r,i)) print "\n" print "I have %d task(s) to process" % len(jobs) if options.queue: import os, sys # basecmd = "bsub -q {queue} {dir}/lxbatch_runner.sh {dir} {cmssw} python {self} -N {chunkSize} -T '{tdir}' -t {tree} {data} {output}".format( # queue = options.queue, dir = os.getcwd(), cmssw = os.environ['CMSSW_BASE'], # self=sys.argv[0], chunkSize=options.chunkSize, tdir=options.treeDir, tree=options.tree, data=args[0], output=args[1] # ) # basecmd = "qsub -q batch {dir}/oviedobatch_runner.sh {dir} {cmssw} python {self} -N {chunkSize} -T '{tdir}' -t {tree} {data} {output}".format( runner = "oviedobatch_runner.sh" basecmd = "echo \" bash {dir}/{runner} {dir} {cmssw} python {self} -N {chunkSize} -T {tdir} -t {tree} {data} {output} ".format( dir = os.getcwd(), cmssw = os.environ['CMSSW_BASE'], runner=runner, self=sys.argv[0], chunkSize=options.chunkSize, tdir=options.treeDir, tree=options.tree, data=args[0], output=args[1] ) if options.vectorTree: basecmd += " --vector " friendPost = "".join([" -F %s %s " % (fn,ft) for fn,ft in options.friendTrees]) friendPost += "".join([" --FM %s %s " % (fn,ft) for fn,ft in options.friendTreesMC]) friendPost += "".join([" --FD %s %s " % (fn,ft) for fn,ft in options.friendTreesData]) friendPost += "".join([" -m '%s' " % m for m in options.modules]) for (name,fin,fout,data,range,chunk) in jobs: if chunk != -1: print "{base} -d {data} -c {chunk} {post} \" | qsub -q batch -N HappyTreeFriends".format(base=basecmd, data=name, chunk=chunk, post=friendPost) else: print "{base} -d {data} {post} \" | qsub -q batch -N HappyTreeFriends".format(base=basecmd, data=name, chunk=chunk, post=friendPost) exit() maintimer = ROOT.TStopwatch() def _runIt(myargs): (name,fin,fout,data,range,chunk) = myargs timer = ROOT.TStopwatch() print 'i am running of filename', fin fb = ROOT.TFile(fin) print 'number of jobs', len(jobs) fetchedfile = None if 'LSB_JOBID' in os.environ or 'LSF_JOBID' in os.environ: if fin.startswith("root://"): try: tmpdir = os.environ['TMPDIR'] if 'TMPDIR' in os.environ else "/tmp" tmpfile = "%s/%s" % (tmpdir, os.path.basename(fin)) print "xrdcp %s %s" % (fin, tmpfile) os.system("xrdcp %s %s" % (fin, tmpfile)) if os.path.exists(tmpfile): fin = tmpfile fetchedfile = fin print "success :-)" except: pass fb = ROOT.TFile.Open(fin) elif "root://" in fin: ## from here what is in marco's code ROOT.gEnv.SetValue("TFile.AsyncReading", 1); ROOT.gEnv.SetValue("XNet.Debug=0", ROOT.kEnvAll); # suppress output about opening connections ROOT.gEnv.SetValue("XrdClientDebug.kUSERDEBUG", 0); # suppress output about opening connections fb = ROOT.TXNetFile(fin+"?readaheadsz=65535&DebugLevel=0") os.environ["XRD_DEBUGLEVEL"]="0" os.environ["XRD_DebugLevel"]="0" os.environ["DEBUGLEVEL"]="0" os.environ["DebugLevel"]="0" else: fb = ROOT.TFile.Open(fin) print fb cn = fb.Get('Count') cnlhe = fb.Get('CountLHE') cnsms = fb.Get('CountSMS') sumgen= fb.Get('SumGenWeights') print 'this is the status of sumgen', sumgen print "getting tree.." tb = fb.Get(options.tree) if not tb: tb = fb.Get("tree") # new trees if options.vectorTree: tb.vectorTree = True else: tb.vectorTree = False friends = options.friendTrees[:] friends += (options.friendTreesData if data else options.friendTreesMC) friends_ = [] # to make sure pyroot does not delete them for tf_tree,tf_file in friends: tf = tb.AddFriend(tf_tree, tf_file.format(name=name, cname=name)), friends_.append(tf) # to make sure pyroot does not delete them nev = tb.GetEntries() if options.pretend: print "==== pretending to run %s (%d entries, %s) ====" % (name, nev, fout) return (name,(nev,0)) print "==== %s starting (%d entries) ====" % (name, nev) booker = Booker(fout) #scaling = len(jobs) #int(ceil(entries/float(chunk))) print 'total number of chunks', nchunks if not cn == None: cn .Scale(1./nchunks); booker.book('TH1D', cn ) if not cnlhe == None: cnlhe .Scale(1./nchunks); booker.book('TH1D', cnlhe ) if not sumgen == None: sumgen.Scale(1./nchunks); booker.book('TH1D', sumgen) if not cnsms == None: cnsms .Scale(1./nchunks); booker.book('TH3D', cnsms ) modulesToRun = MODULES if options.modules != []: toRun = {} for m,v in MODULES: for pat in options.modules: if re.match(pat,m): toRun[m] = True modulesToRun = [ (m,v) for (m,v) in MODULES if m in toRun ] el = EventLoop([ VariableProducer(options.treeDir,booker,modulesToRun), ]) el.loop([tb], eventRange=range) booker.done() fb.Close() time = timer.RealTime() print "=== %s done (%d entries, %.0f s, %.0f e/s) ====" % ( name, nev, time,(nev/time) ) if fetchedfile and os.path.exists(fetchedfile): print 'Cleaning up: removing %s'%fetchedfile os.system("rm %s"%fetchedfile) return (name,(nev,time)) if options.jobs > 0: from multiprocessing import Pool pool = Pool(options.jobs) ret = dict(pool.map(_runIt, jobs)) if options.jobs > 0 else dict([_runIt(j) for j in jobs]) else: ret = dict(map(_runIt, jobs)) fulltime = maintimer.RealTime() totev = sum([ev for (ev,time) in ret.itervalues()]) tottime = sum([time for (ev,time) in ret.itervalues()]) print "Done %d tasks in %.1f min (%d entries, %.1f min)" % (len(jobs),fulltime/60.,totev,tottime/60.)