3 Tiers are used so that the execution of multiple tiers can be 4 processed inside a single loop. Tier-to-tier differences are small 5 and can be configured based on a small number of options. 6 The options defined are: 7 inspect_only : This means we only look at the input file and 8 do not try to run an art process. Typically this 9 is used to study an input file, such as that coming 10 off of the detector. It can also be used for 11 starting the production chain off at a higher tier, 12 provided an appropriate input file is supplied. 13 parent_in_sam : If the input file is in SAM we can grab its meta- 15 input_name : Input file name. 17 output_name : Output file name. Note this only works if there is 18 only a single, non-TFileService output. Exclusive 19 with multiple_outputs. 20 multiple_outputs : An array of output names for the case where a 21 tier produces multiple output files. Exclusive 23 precommand : Use subprocess to execute this shell command before 24 running the tier. This is used for example to 25 generate simulation generation FHiCLs. 26 multiply_events : Multiply the number of events requested for this 27 stage by this factor. This is used for example for 28 ND rock overlay events where a large number of 29 these need to be overlaid onto a single beam 32 import ProductionTestTools
as tools
33 from Metric
import Metric
34 import ParserDBUsage
as db_parser
41 self.
long_name = kwargs.get(
"long_name",
"A blank tier")
45 self.
fcl = kwargs.get(
"fcl",
False)
60 _repr =
"tier : Tier class\n" 61 _repr +=
"tier : short_name %s\n"%(repr(self.
short_name))
62 _repr +=
"tier : long_name %s\n"%(repr(self.
long_name))
63 _repr +=
"tier : inspect_only %s\n"%(repr(self.
inspect_only))
64 _repr +=
"tier : parent_in_sam %s\n"%(repr(self.
parent_in_sam))
65 _repr +=
"tier : input_name %s\n"%(repr(self.
input_name))
66 _repr +=
"tier : fcl %s\n"%(repr(self.
fcl))
67 _repr +=
"tier : output_name %s\n"%(repr(self.
output_name))
68 _repr +=
"tier : precommand %s\n"%(repr(self.
precommand))
74 _repr +=
"tier : log lines %s" %(len(self.
log))
80 def runTier(self, dry_run, n_events, verbose, log_file):
81 if "os" not in dir():
import os
85 print "tier : running precommand: %s"%self.
precommand 86 if "subprocess" not in dir():
import subprocess
87 cmd_list = self.precommand.split()
89 return_code = subprocess.check_call(cmd_list)
90 except subprocess.CalledProcessError:
91 print "tier : Error, precommand: %s failed"%self.
precommand 92 self.metric.return_code = 1
94 if verbose:
print "tier : precommand completed with return code: %i"%return_code
98 print "tier : running universal modify fcl" 99 if "subprocess" not in dir():
import subprocess
100 mod_fcl_name =
"%s_mod.fcl"%self.fcl.split(
"/")[-1][:-4]
101 cmd =
"python modifyFHiCL.py -b %s -f %s -a -v"%(self.
fcl, mod_fcl_name)
102 print "tier : cmd: %s"%cmd
103 self.
fcl = mod_fcl_name
104 cmd_list = cmd.split()
106 return_code = subprocess.check_call(cmd_list)
107 except subprocess.CalledProcessError:
108 print "tier : Error, universal modify fcl" 109 self.metric.return_code = 1
111 if verbose:
print "tier : universal modify fcl completed with return code: %i"%return_code
115 if os.path.exists(
"timing.db"):
116 os.system(
"rm timing.db")
120 self.metric.output_name =
"n/a" 121 self.metric.run =
True 123 size_string, size_float = self.tools.inspectFile(self.
input_name)
124 self.metric.output_size = size_float
129 input_metadata = self.tools.getSAMMetaData(self.input_name.split(
"/")[-1])
130 n_events = input_metadata[
"Online.TotalEvents"]
131 self.metric.input_events = n_events
137 if verbose:
print "tier : Multiplying the number of events requested %i by %i to %i"%(n_events, self.
multiply_events, n_events*self.
multiply_events)
143 if verbose:
print "tier : Minimum number of events %i is greater than requested number of events %i, so using minimum"%(self.
minimum_events, n_events)
150 file_name = self.tools.findWildcardFile(self.
input_name)
153 print "tier : Input file %s not found, exiting"%self.
input_name 162 cmd_list.append(
"-c")
163 cmd_list.append(self.
fcl)
168 cmd_list.append(
"-o")
171 cmd_list.append(
"-n")
172 cmd_list.append(
str(n_events))
174 cmd =
" ".join(cmd_list)
175 print "tier : cmd: %s"%cmd
178 print "tier : running..." 179 stdout_redirect =
False 180 stderr_redirect =
False 182 stdout_redirect =
open(
"%s.out"%log_file,
"w")
183 print "tier : writing stdout/err to: %s"%stdout_redirect.name
187 print "tier : done, got return code: %i"%(return_code)
192 if "MemoryMonitor" not in dir():
import MemoryMonitor
193 if "subprocess" not in dir():
import subprocess
194 if "resource" not in dir():
import resource
195 if "time" not in dir():
import time
196 if "sys" not in dir():
import sys
197 if "os" not in dir():
import os
200 if "psutil" not in dir():
205 print "tier : psutils not found, advanced features disabled" 206 self.metric.run =
True 208 self.metric.input_events = self.tools.countEventsInRootFile(self.
input_name)
209 usage_start = resource.getrusage(resource.RUSAGE_CHILDREN)
211 MEMORY_LIMIT = 2.9*1024*1024*1024
214 stdout = subprocess.PIPE
215 stderr = subprocess.STDOUT
219 process = subprocess.Popen(cmd_list, stdout=stdout, stderr=stderr)
220 SLICE_IN_SECONDS = 1.
223 p = psutil.Process(process.pid)
224 self.metric.enable_advanced_metrics()
226 start_time = time.time()
227 previous_sample_time = 0.
231 if stdout_redirect: line = process.stdout.readline()
233 elapsed_time = time.time()-start_time
234 if (line !=
"")
or ((elapsed_time - previous_sample_time) > SLICE_IN_SECONDS):
236 sample_times.append(elapsed_time)
237 if advanced: self.metric.fill_advanced_metrics(p)
239 lines.append(
"TimeMarker: %f\n"%(elapsed_time))
241 previous_sample_time = elapsed_time
242 if elapsed_time > 60: SLICE_IN_SECONDS = 10.
243 if elapsed_time > 60*60: SLICE_IN_SECONDS = 60.
246 if memory_usage[-1] > MEMORY_LIMIT:
247 line =
"Tier : Subprocess exceeded memory limit of %i (current usage: %i), so was killed\n"%\
248 (MEMORY_LIMIT,memory_usage[-1])
250 sys.stderr.write(line)
253 if line ==
'' and process.poll() !=
None:
break 258 stdout_redirect.write(line)
260 stdout_redirect.close()
262 return_code = process.returncode
264 print "tier : Exception caught while executing nova executable, return code: %i"%return_code
266 self.metric.return_code = return_code
267 self.metric.sample_times = sample_times
268 usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)
270 u_cpu_time = usage_end.ru_utime - usage_start.ru_utime
271 s_cpu_time = usage_end.ru_stime - usage_start.ru_stime
272 max_memory =
max(memory_usage)
279 self.metric.user_cpu = u_cpu_time
280 self.metric.system_cpu = s_cpu_time
281 self.metric.peak_memory = max_memory
282 self.metric.memory = memory_usage
285 self.metric.used_events =
int(cmd_list[cmd_list.index(
"-n")+1])
289 self.metric.output_name = [cmd_list[cmd_list.index(
"-o")+1]]
290 output_size, size_float = self.tools.inspectFile(self.metric.output_name[0])
291 self.metric.output_size = [size_float]
292 self.metric.output_events = [self.tools.countEventsInRootFile(self.metric.output_name[0])]
293 self.metric.file_size_ana = [self.tools.fileSizeAna(self.metric.output_name[0])]
298 self.metric.db_usage = self.db_parser.parseSTDOUT(stdout_redirect.name)
300 self.metric.db_usage = self.db_parser.DBUsage()
303 self.metric.output_name = []
304 self.metric.output_events = []
305 self.metric.output_size = []
306 self.metric.file_size_ana = []
309 print "tier : Checking output: %s"%output
311 file_name = self.tools.findWildcardFile(output)
312 if file_name: output = file_name
314 output_size, size_float = self.tools.inspectFile(output)
316 print "tier : Output file: %s has not been created"%output
318 self.metric.output_name.append(output)
319 self.metric.output_size.append(size_float)
320 self.metric.output_events.append(self.tools.countEventsInRootFile(output))
321 self.metric.file_size_ana.append(self.tools.fileSizeAna(output))
325 print "tier : Parse timing DB" 326 import ViewTimingDB
as view
328 cmd =
"plot_timetracker -d timing.db -b -q -J %s"%json_name
329 print "tier : cmd: %s"%cmd
330 cmd_list = cmd.split()
332 db_return_code = subprocess.check_call(cmd_list)
333 except subprocess.CalledProcessError:
334 print "tier : Error, parse timing DB failed" 335 print "tier : parse timing DB completed with return code: %i"%db_return_code
336 if db_return_code == 0:
337 print "tier : - converting to CSV" 339 timing_db = view.ViewTimingDB(json_name)
340 timing_db.write_csv(csv_name)
def setChainName(self, chain_name)
def prepareAndExecuteProcess(self, dry_run, n_events, verbose, log_file)
procfile open("FD_BRL_v0.txt")
def __init__(self, kwargs)
def executeProcess(self, cmd_list, stdout_redirect)
def resident(pid, since=0.0)
T max(sqlite3 *const db, std::string const &table_name, std::string const &column_name)
def runTier(self, dry_run, n_events, verbose, log_file)