Tier.py
Go to the documentation of this file.
class Tier():
    """
    Tiers are used so that the execution of multiple tiers can be
    processed inside a single loop. Tier-to-tier differences are small
    and can be configured based on a small number of options.

    The options defined are:

    inspect_only     : This means we only look at the input file and
                       do not try and run an art process. Typically this
                       is used to study an input file, such as that coming
                       off of the detector. It can also be used for
                       starting the production chain off at a higher tier,
                       providing an appropriate input file is provided.
    parent_in_sam    : If the input file is in SAM we can grab its meta-
                       data from there.
    input_name       : Input file name.
    fcl              : FHiCL to run.
    output_name      : Output file name. Note this only works if there is
                       only a single, not TFileService output. Exclusive
                       with multiple_outputs.
    multiple_outputs : An array of output names for the case where a
                       tier produces multiple output files. Exclusive
                       with output_name.
    precommand       : Use subprocess to execute this shell command before
                       running the tier. This is used for example to
                       generate simulation generation FHiCLs.
    multiply_events  : Multiply the number of events requested for this
                       stage by this factor. This is used for example for
                       ND rock overlay events where a large number of
                       these need to be overlaid onto a single beam
                       event.
    """
    # NOTE: these imports are deliberately placed in the class body, so the
    # modules become class attributes and are accessed throughout this class
    # as self.tools, self.Metric and self.db_parser.
    import ProductionTestTools as tools
    from Metric import Metric
    import ParserDBUsage as db_parser

36  def __init__(self, **kwargs):
37  """
38  Constructor
39  """
40  self.short_name = kwargs.get("short_name", "blank_tier")
41  self.long_name = kwargs.get("long_name", "A blank tier")
42  self.inspect_only = kwargs.get("inspect_only", False)
43  self.parent_in_sam = kwargs.get("parent_in_sam", False)
44  self.input_name = kwargs.get("input_name", False)
45  self.fcl = kwargs.get("fcl", False)
46  self.output_name = kwargs.get("output_name", False)
47  self.precommand = kwargs.get("precommand", False)
48  self.multiply_events = kwargs.get("multiply_events", 1)
49  self.minimum_events = kwargs.get("minimum_events", 1)
50  self.multiple_outputs = kwargs.get("multiple_outputs", False)
51  self.universal_fcl_mod = kwargs.get("universal_fcl_mod",True)
52  self.parse_timing_db = kwargs.get("parse_timing_db", True)
53  #self.verbose = kwargs.get("verbose", False)
54  self.metric = self.Metric()
55  self.log = ""
56  self.full_fcl = ""
57  self.fcl_path = ""
58 
59  def __repr__(self):
60  _repr = "tier : Tier class\n"
61  _repr += "tier : short_name %s\n"%(repr(self.short_name))
62  _repr += "tier : long_name %s\n"%(repr(self.long_name))
63  _repr += "tier : inspect_only %s\n"%(repr(self.inspect_only))
64  _repr += "tier : parent_in_sam %s\n"%(repr(self.parent_in_sam))
65  _repr += "tier : input_name %s\n"%(repr(self.input_name))
66  _repr += "tier : fcl %s\n"%(repr(self.fcl))
67  _repr += "tier : output_name %s\n"%(repr(self.output_name))
68  _repr += "tier : precommand %s\n"%(repr(self.precommand))
69  _repr += "tier : multiply_events %s\n"%(repr(self.multiply_events))
70  _repr += "tier : minimum_events %s\n"%(repr(self.minimum_events))
71  _repr += "tier : multiple_outputs %s\n"%(repr(self.multiple_outputs))
72  _repr += "tier : universal fcl mod: %s\n"%(repr(self.universal_fcl_mod))
73  _repr += "tier : parse timing db: %s\n"%(repr(self.parse_timing_db))
74  _repr += "tier : log lines %s" %(len(self.log))
75  return _repr
76 
    def setChainName(self, chain_name):
        # Record the name of the chain this tier belongs to; used later to
        # build the timing-DB JSON/CSV file names in executeProcess.
        self.chain_name = chain_name
79 
    def runTier(self, dry_run, n_events, verbose, log_file):
        """
        Run this tier and return its Metric object.

        The sequence is: optional shell precommand, optional universal
        FHiCL modification, timing-DB cleanup, then either a file
        inspection (inspect_only) or a full art job via
        prepareAndExecuteProcess.

        dry_run  : if true, only print the command, do not execute it
        n_events : number of events requested (may be scaled below)
        verbose  : print extra progress information
        log_file : base name for the stdout/stderr capture file, or falsy
        """
        if "os" not in dir(): import os

        # Run pre command if requested
        if self.precommand:
            print "tier : running precommand: %s"%self.precommand
            if "subprocess" not in dir(): import subprocess
            cmd_list = self.precommand.split()
            try:
                return_code = subprocess.check_call(cmd_list)
            except subprocess.CalledProcessError:
                # A failing precommand aborts the tier with return code 1
                print "tier : Error, precommand: %s failed"%self.precommand
                self.metric.return_code = 1
                return self.metric
            if verbose: print "tier : precommand completed with return code: %i"%return_code

        # Run universal modify fcl if required
        if self.universal_fcl_mod:
            print "tier : running universal modify fcl"
            if "subprocess" not in dir(): import subprocess
            # Strip the ".fcl" extension and any directories to build the
            # modified FHiCL name, e.g. path/foo.fcl -> foo_mod.fcl
            mod_fcl_name = "%s_mod.fcl"%self.fcl.split("/")[-1][:-4]
            cmd = "python modifyFHiCL.py -b %s -f %s -a -v"%(self.fcl, mod_fcl_name)
            print "tier : cmd: %s"%cmd
            # From here on the tier runs the modified FHiCL
            # (cmd above was already built from the original name)
            self.fcl = mod_fcl_name
            cmd_list = cmd.split()
            try:
                return_code = subprocess.check_call(cmd_list)
            except subprocess.CalledProcessError:
                print "tier : Error, universal modify fcl"
                self.metric.return_code = 1
                return self.metric
            if verbose: print "tier : universal modify fcl completed with return code: %i"%return_code

        # Parse DB files setup: remove any stale timing.db so the new job
        # starts from a clean slate
        if self.parse_timing_db:
            if os.path.exists("timing.db"):
                os.system("rm timing.db")

        # Inspect only is used to study files coming off of the detector;
        # no art process is run in this branch
        if self.inspect_only:
            self.metric.output_name = "n/a"
            self.metric.run = True
            try:
                # NOTE(review): inspectFile appears to signal a missing file
                # via NameError, which is deliberately ignored here — confirm
                size_string, size_float = self.tools.inspectFile(self.input_name)
                self.metric.output_size = size_float
            except NameError:
                pass

            if self.parent_in_sam:
                # Pull the event count from the file's SAM metadata
                input_metadata = self.tools.getSAMMetaData(self.input_name.split("/")[-1])
                n_events = input_metadata["Online.TotalEvents"]
                self.metric.input_events = n_events

            return self.metric

        # Multiple number of events if requested (e.g. ND rock overlays)
        if self.multiply_events != 1:
            if verbose: print "tier : Multiplying the number of events requested %i by %i to %i"%(n_events, self.multiply_events, n_events*self.multiply_events)
            n_events = self.multiply_events*n_events

        # Minimum number of events: never run with fewer than this
        if self.minimum_events != 1:
            if n_events < self.minimum_events:
                if verbose: print "tier : Minimum number of events %i is greater than requested number of events %i, so using minimum"%(self.minimum_events, n_events)
                n_events = self.minimum_events

        # For all other tiers we run an art job
        # Check the input file exists if we need one
        if self.input_name:
            if "*" in self.input_name:
                # Resolve a wildcard pattern to a concrete file if possible
                file_name = self.tools.findWildcardFile(self.input_name)
                if file_name: self.input_name = file_name
            if not (os.path.exists(self.input_name)):
                print "tier : Input file %s not found, exiting"%self.input_name
                return self.metric
        # A FHiCL file is mandatory for an art job
        assert ( self.fcl )

        self.prepareAndExecuteProcess(dry_run, n_events, verbose, log_file)
        return self.metric
159 
160  def prepareAndExecuteProcess(self, dry_run, n_events, verbose, log_file):
161  cmd_list = ["nova"]
162  cmd_list.append("-c")
163  cmd_list.append(self.fcl)
164  self.fcl_path = self.tools.findFHiCL(self.fcl)
165  if self.fcl_path != "":
166  self.full_fcl = self.tools.fileToString(self.fcl_path)
167  if self.output_name:
168  cmd_list.append("-o")
169  cmd_list.append(self.output_name)
170  if n_events:
171  cmd_list.append("-n")
172  cmd_list.append(str(n_events))
173  if self.input_name: cmd_list.append(self.input_name)
174  cmd = " ".join(cmd_list)
175  print "tier : cmd: %s"%cmd
176 
177  if not dry_run:
178  print "tier : running..."
179  stdout_redirect = False
180  stderr_redirect = False
181  if log_file:
182  stdout_redirect = open("%s.out"%log_file, "w")
183  print "tier : writing stdout/err to: %s"%stdout_redirect.name
184 
185  return_code = self.executeProcess(cmd_list, stdout_redirect)
186 
187  print "tier : done, got return code: %i"%(return_code)
188  return self.metric
189  return False
190 
191  def executeProcess(self, cmd_list, stdout_redirect):
192  if "MemoryMonitor" not in dir(): import MemoryMonitor
193  if "subprocess" not in dir(): import subprocess
194  if "resource" not in dir(): import resource
195  if "time" not in dir(): import time
196  if "sys" not in dir(): import sys
197  if "os" not in dir(): import os
198  # Try and import psutil and enable advanced monitoring
199  advanced = False
200  if "psutil" not in dir():
201  try:
202  import psutil
203  advanced = True
204  except ImportError:
205  print "tier : psutils not found, advanced features disabled"
206  self.metric.run = True
207  if self.input_name and self.input_name[-4:]=="root":
208  self.metric.input_events = self.tools.countEventsInRootFile(self.input_name)
209  usage_start = resource.getrusage(resource.RUSAGE_CHILDREN)
210  # set memory limits for the process
211  MEMORY_LIMIT = 2.9*1024*1024*1024
212 
213  if stdout_redirect:
214  stdout = subprocess.PIPE
215  stderr = subprocess.STDOUT
216  else:
217  stdout = None
218  stderr = None
219  process = subprocess.Popen(cmd_list, stdout=stdout, stderr=stderr)
220  SLICE_IN_SECONDS = 1.
221  memory_usage = []
222  if advanced:
223  p = psutil.Process(process.pid)
224  self.metric.enable_advanced_metrics()
225 
226  start_time = time.time()
227  previous_sample_time = 0.
228  lines = []
229  sample_times = []
230  while True:
231  if stdout_redirect: line = process.stdout.readline()
232  else: line = ""
233  elapsed_time = time.time()-start_time
234  if (line != "") or ((elapsed_time - previous_sample_time) > SLICE_IN_SECONDS):
235  memory_usage.append(MemoryMonitor.resident(process.pid))
236  sample_times.append(elapsed_time)
237  if advanced: self.metric.fill_advanced_metrics(p)
238  if line != "":
239  lines.append("TimeMarker: %f\n"%(elapsed_time))
240  lines.append(line)
241  previous_sample_time = elapsed_time
242  if elapsed_time > 60: SLICE_IN_SECONDS = 10.
243  if elapsed_time > 60*60: SLICE_IN_SECONDS = 60.
244 
245  # limit memory usage
246  if memory_usage[-1] > MEMORY_LIMIT:
247  line = "Tier : Subprocess exceeded memory limit of %i (current usage: %i), so was killed\n"%\
248  (MEMORY_LIMIT,memory_usage[-1])
249  print line
250  sys.stderr.write(line)
251  lines.append(line)
252  process.kill()
253  if line == '' and process.poll() != None: break
254  # if process.poll() != None: break
255 
256  if stdout_redirect:
257  for line in lines:
258  stdout_redirect.write(line)
259  self.log += line
260  stdout_redirect.close()
261 
262  return_code = process.returncode
263  if return_code != 0:
264  print "tier : Exception caught while executing nova executable, return code: %i"%return_code
265 
266  self.metric.return_code = return_code
267  self.metric.sample_times = sample_times
268  usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)
269 
270  u_cpu_time = usage_end.ru_utime - usage_start.ru_utime
271  s_cpu_time = usage_end.ru_stime - usage_start.ru_stime
272  max_memory = max(memory_usage)
273 
274  #memory_usage = convertSize(usage_end.ru_maxrss * resource.getpagesize())
275  # By manual inspection of the memory usage in top I believe ru_maxrss is returning a size in kb
276  #memory_usage = convertSize(usage_end.ru_maxrss * 1024 )
277  #print usage_end.ru_maxrss, resource.getpagesize(), usage_end.ru_maxrss * resource.getpagesize(), memory_usage
278 
279  self.metric.user_cpu = u_cpu_time
280  self.metric.system_cpu = s_cpu_time
281  self.metric.peak_memory = max_memory
282  self.metric.memory = memory_usage
283 
284  if "-n" in cmd_list:
285  self.metric.used_events = int(cmd_list[cmd_list.index("-n")+1])
286 
287  if "-o" in cmd_list:
288  if return_code == 0:
289  self.metric.output_name = [cmd_list[cmd_list.index("-o")+1]]
290  output_size, size_float = self.tools.inspectFile(self.metric.output_name[0])
291  self.metric.output_size = [size_float]
292  self.metric.output_events = [self.tools.countEventsInRootFile(self.metric.output_name[0])]
293  self.metric.file_size_ana = [self.tools.fileSizeAna(self.metric.output_name[0])]
294  # Parse DB usage from the output file
295  # note this only works if we have pushed the STDOUT to a files
296  # if we haven't then we fill a blank metric
297  if stdout_redirect:
298  self.metric.db_usage = self.db_parser.parseSTDOUT(stdout_redirect.name)
299  else:
300  self.metric.db_usage = self.db_parser.DBUsage()
301 
302  if self.multiple_outputs and (return_code == 0):
303  self.metric.output_name = []
304  self.metric.output_events = []
305  self.metric.output_size = []
306  self.metric.file_size_ana = []
307 
308  for output in self.multiple_outputs:
309  print "tier : Checking output: %s"%output
310  if "*" in output:
311  file_name = self.tools.findWildcardFile(output)
312  if file_name: output = file_name
313  try:
314  output_size, size_float = self.tools.inspectFile(output)
315  except NameError:
316  print "tier : Output file: %s has not been created"%output
317  return return_code
318  self.metric.output_name.append(output)
319  self.metric.output_size.append(size_float)
320  self.metric.output_events.append(self.tools.countEventsInRootFile(output))
321  self.metric.file_size_ana.append(self.tools.fileSizeAna(output))
322 
323  # Parse DB files
324  if self.parse_timing_db and os.path.exists("timing.db"):
325  print "tier : Parse timing DB"
326  import ViewTimingDB as view
327  json_name = "timing_db_%s_%s.json"%(self.chain_name,self.short_name)
328  cmd = "plot_timetracker -d timing.db -b -q -J %s"%json_name
329  print "tier : cmd: %s"%cmd
330  cmd_list = cmd.split()
331  try:
332  db_return_code = subprocess.check_call(cmd_list)
333  except subprocess.CalledProcessError:
334  print "tier : Error, parse timing DB failed"
335  print "tier : parse timing DB completed with return code: %i"%db_return_code
336  if db_return_code == 0:
337  print "tier : - converting to CSV"
338  csv_name = "timing_db_%s_%s.csv" %(self.chain_name,self.short_name)
339  timing_db = view.ViewTimingDB(json_name)
340  timing_db.write_csv(csv_name)
341 
342  return return_code
343  # return 0
def __repr__(self)
Definition: Tier.py:59
universal_fcl_mod
Definition: Tier.py:51
parse_timing_db
Definition: Tier.py:52
fcl_path
Definition: Tier.py:57
def setChainName(self, chain_name)
Definition: Tier.py:77
short_name
Definition: Tier.py:40
parent_in_sam
Definition: Tier.py:43
full_fcl
Definition: Tier.py:56
def prepareAndExecuteProcess(self, dry_run, n_events, verbose, log_file)
Definition: Tier.py:160
metric
Definition: Tier.py:54
output_name
Definition: Tier.py:46
precommand
Definition: Tier.py:47
inspect_only
Definition: Tier.py:42
multiple_outputs
Definition: Tier.py:50
Definition: Tier.py:1
procfile open("FD_BRL_v0.txt")
minimum_events
Definition: Tier.py:49
def __init__(self, kwargs)
Definition: Tier.py:36
TDirectory * dir
Definition: macro.C:5
input_name
Definition: Tier.py:44
def executeProcess(self, cmd_list, stdout_redirect)
Definition: Tier.py:191
def resident(pid, since=0.0)
multiply_events
Definition: Tier.py:48
T max(sqlite3 *const db, std::string const &table_name, std::string const &column_name)
Definition: statistics.h:66
def runTier(self, dry_run, n_events, verbose, log_file)
Definition: Tier.py:80
long_name
Definition: Tier.py:41
chain_name
Definition: Tier.py:78