Tier.Tier Class Reference

Public Member Functions

def __init__ (self, **kwargs)
 
def __repr__ (self)
 
def setChainName (self, chain_name)
 
def runTier (self, dry_run, n_events, verbose, log_file)
 
def prepareAndExecuteProcess (self, dry_run, n_events, verbose, log_file)
 
def executeProcess (self, cmd_list, stdout_redirect)
 

Public Attributes

 short_name
 
 long_name
 
 inspect_only
 
 parent_in_sam
 
 input_name
 
 fcl
 
 output_name
 
 precommand
 
 multiply_events
 
 minimum_events
 
 multiple_outputs
 
 universal_fcl_mod
 
 parse_timing_db
 
 metric
 
 log
 
 full_fcl
 
 fcl_path
 
 chain_name
 

Detailed Description

Tiers are used so that multiple tiers can be executed inside a single
loop. Tier-to-tier differences are small and are configured through a
small number of options.
The options defined are:
    inspect_only  : Only look at the input file; do not try to run an
                    art process. Typically this is used to study an
                    input file, such as one coming off the detector.
                    It can also be used to start the production chain
                    at a higher tier, provided that an appropriate
                    input file is supplied.
    parent_in_sam : If the input file is in SAM, its metadata can be
                    retrieved from there.
    input_name    : Input file name.
    fcl           : FHiCL to run.
    output_name   : Output file name. Note that this only works if
                    there is a single, non-TFileService output.
                    Exclusive with multiple_outputs.
    multiple_outputs : An array of output names for the case where a
                       tier produces multiple output files. Exclusive
                       with output_name.
    precommand    : Use subprocess to execute this shell command before
                    running the tier. This is used, for example, to
                    generate simulation generation FHiCLs.
    multiply_events : Multiply the number of events requested for this
                      stage by this factor. This is used, for example,
                      for ND rock overlay events, where a large number
                      of these need to be overlaid onto a single beam
                      event.
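
As a rough illustration of how these options fit together, the sketch below mimics the keyword-argument pattern with a small stand-in class. TierOptions, its exclusivity check, and the file names are hypothetical and are not part of Tier.py; only the option names and their defaults are taken from the constructor listing on this page.

```python
# Hypothetical stand-in for illustration only; the real class is Tier.Tier.
class TierOptions(object):
    def __init__(self, **kwargs):
        # Each option falls back to a default when it is not supplied.
        self.short_name = kwargs.get("short_name", "blank_tier")
        self.inspect_only = kwargs.get("inspect_only", False)
        self.input_name = kwargs.get("input_name", False)
        self.fcl = kwargs.get("fcl", False)
        self.output_name = kwargs.get("output_name", False)
        self.multiple_outputs = kwargs.get("multiple_outputs", False)
        self.multiply_events = kwargs.get("multiply_events", 1)
        # Assumption: the description calls these two options exclusive,
        # so this sketch rejects configurations that set both.
        if self.output_name and self.multiple_outputs:
            raise ValueError("output_name and multiple_outputs are exclusive")


# Hypothetical tier configuration; the file names are placeholders.
reco = TierOptions(short_name="reco", fcl="reco_job.fcl",
                   input_name="input.root", output_name="reco.root")
```

Whether the real class enforces the exclusivity is not shown in the listings here, so the check above should be read as one possible guard rather than documented behaviour.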

Definition at line 1 of file Tier.py.

Constructor & Destructor Documentation

def Tier.Tier.__init__ (   self,
  **kwargs 
)
Constructor

Definition at line 36 of file Tier.py.

36      def __init__(self, **kwargs):
37          """
38          Constructor
39          """
40          self.short_name = kwargs.get("short_name", "blank_tier")
41          self.long_name = kwargs.get("long_name", "A blank tier")
42          self.inspect_only = kwargs.get("inspect_only", False)
43          self.parent_in_sam = kwargs.get("parent_in_sam", False)
44          self.input_name = kwargs.get("input_name", False)
45          self.fcl = kwargs.get("fcl", False)
46          self.output_name = kwargs.get("output_name", False)
47          self.precommand = kwargs.get("precommand", False)
48          self.multiply_events = kwargs.get("multiply_events", 1)
49          self.minimum_events = kwargs.get("minimum_events", 1)
50          self.multiple_outputs = kwargs.get("multiple_outputs", False)
51          self.universal_fcl_mod = kwargs.get("universal_fcl_mod",True)
52          self.parse_timing_db = kwargs.get("parse_timing_db", True)
53          #self.verbose = kwargs.get("verbose", False)
54          self.metric = self.Metric()
55          self.log = ""
56          self.full_fcl = ""
57          self.fcl_path = ""
58 

Member Function Documentation

def Tier.Tier.__repr__ (   self)

Definition at line 59 of file Tier.py.

References Tier.Tier.fcl, Tier.Tier.input_name, Tier.Tier.inspect_only, BatchLog.BatchLog.log, Tier.Tier.log, Tier.Tier.long_name, Tier.Tier.minimum_events, Tier.Tier.multiple_outputs, Tier.Tier.multiply_events, Metric.Metric.output_name, Tier.Tier.output_name, Tier.Tier.parent_in_sam, Tier.Tier.parse_timing_db, Tier.Tier.precommand, Tier.Tier.short_name, and Tier.Tier.universal_fcl_mod.

59      def __repr__(self):
60          _repr = "tier : Tier class\n"
61          _repr += "tier : short_name %s\n"%(repr(self.short_name))
62          _repr += "tier : long_name %s\n"%(repr(self.long_name))
63          _repr += "tier : inspect_only %s\n"%(repr(self.inspect_only))
64          _repr += "tier : parent_in_sam %s\n"%(repr(self.parent_in_sam))
65          _repr += "tier : input_name %s\n"%(repr(self.input_name))
66          _repr += "tier : fcl %s\n"%(repr(self.fcl))
67          _repr += "tier : output_name %s\n"%(repr(self.output_name))
68          _repr += "tier : precommand %s\n"%(repr(self.precommand))
69          _repr += "tier : multiply_events %s\n"%(repr(self.multiply_events))
70          _repr += "tier : minimum_events %s\n"%(repr(self.minimum_events))
71          _repr += "tier : multiple_outputs %s\n"%(repr(self.multiple_outputs))
72          _repr += "tier : universal fcl mod: %s\n"%(repr(self.universal_fcl_mod))
73          _repr += "tier : parse timing db: %s\n"%(repr(self.parse_timing_db))
74          _repr += "tier : log lines %s" %(len(self.log))
75          return _repr
76 
def Tier.Tier.executeProcess (   self,
  cmd_list,
  stdout_redirect 
)

Definition at line 191 of file Tier.py.

References Tier.Tier.chain_name, dir, Tier.Tier.input_name, makeTrainCVSamples.int, BatchLog.BatchLog.log, Tier.Tier.log, cet::sqlite.max(), Tier.Tier.multiple_outputs, Tier.Tier.parse_timing_db, MemoryMonitor.resident(), and Tier.Tier.short_name.

Referenced by Tier.Tier.prepareAndExecuteProcess().

191      def executeProcess(self, cmd_list, stdout_redirect):
192          if "MemoryMonitor" not in dir(): import MemoryMonitor
193          if "subprocess" not in dir(): import subprocess
194          if "resource" not in dir(): import resource
195          if "time" not in dir(): import time
196          if "sys" not in dir(): import sys
197          if "os" not in dir(): import os
198          # Try and import psutil and enable advanced monitoring
199          advanced = False
200          if "psutil" not in dir():
201              try:
202                  import psutil
203                  advanced = True
204              except ImportError:
205                  print "tier : psutils not found, advanced features disabled"
206          self.metric.run = True
207          if self.input_name and self.input_name[-4:]=="root":
208              self.metric.input_events = self.tools.countEventsInRootFile(self.input_name)
209          usage_start = resource.getrusage(resource.RUSAGE_CHILDREN)
210          # set memory limits for the process
211          MEMORY_LIMIT = 2.9*1024*1024*1024
212
213          if stdout_redirect:
214              stdout = subprocess.PIPE
215              stderr = subprocess.STDOUT
216          else:
217              stdout = None
218              stderr = None
219          process = subprocess.Popen(cmd_list, stdout=stdout, stderr=stderr)
220          SLICE_IN_SECONDS = 1.
221          memory_usage = []
222          if advanced:
223              p = psutil.Process(process.pid)
224              self.metric.enable_advanced_metrics()
225
226          start_time = time.time()
227          previous_sample_time = 0.
228          lines = []
229          sample_times = []
230          while True:
231              if stdout_redirect: line = process.stdout.readline()
232              else: line = ""
233              elapsed_time = time.time()-start_time
234              if (line != "") or ((elapsed_time - previous_sample_time) > SLICE_IN_SECONDS):
235                  memory_usage.append(MemoryMonitor.resident(process.pid))
236                  sample_times.append(elapsed_time)
237                  if advanced: self.metric.fill_advanced_metrics(p)
238                  if line != "":
239                      lines.append("TimeMarker: %f\n"%(elapsed_time))
240                      lines.append(line)
241                  previous_sample_time = elapsed_time
242                  if elapsed_time > 60: SLICE_IN_SECONDS = 10.
243                  if elapsed_time > 60*60: SLICE_IN_SECONDS = 60.
244
245                  # limit memory usage
246                  if memory_usage[-1] > MEMORY_LIMIT:
247                      line = "Tier : Subprocess exceeded memory limit of %i (current usage: %i), so was killed\n"%\
248                             (MEMORY_LIMIT,memory_usage[-1])
249                      print line
250                      sys.stderr.write(line)
251                      lines.append(line)
252                      process.kill()
253              if line == '' and process.poll() != None: break
254              # if process.poll() != None: break
255
256          if stdout_redirect:
257              for line in lines:
258                  stdout_redirect.write(line)
259                  self.log += line
260              stdout_redirect.close()
261
262          return_code = process.returncode
263          if return_code != 0:
264              print "tier : Exception caught while executing nova executable, return code: %i"%return_code
265
266          self.metric.return_code = return_code
267          self.metric.sample_times = sample_times
268          usage_end = resource.getrusage(resource.RUSAGE_CHILDREN)
269
270          u_cpu_time = usage_end.ru_utime - usage_start.ru_utime
271          s_cpu_time = usage_end.ru_stime - usage_start.ru_stime
272          max_memory = max(memory_usage)
273
274          #memory_usage = convertSize(usage_end.ru_maxrss * resource.getpagesize())
275          # By manual inspection of the memory usage in top I believe ru_maxrss is returning a size in kb
276          #memory_usage = convertSize(usage_end.ru_maxrss * 1024 )
277          #print usage_end.ru_maxrss, resource.getpagesize(), usage_end.ru_maxrss * resource.getpagesize(), memory_usage
278
279          self.metric.user_cpu = u_cpu_time
280          self.metric.system_cpu = s_cpu_time
281          self.metric.peak_memory = max_memory
282          self.metric.memory = memory_usage
283
284          if "-n" in cmd_list:
285              self.metric.used_events = int(cmd_list[cmd_list.index("-n")+1])
286
287          if "-o" in cmd_list:
288              if return_code == 0:
289                  self.metric.output_name = [cmd_list[cmd_list.index("-o")+1]]
290                  output_size, size_float = self.tools.inspectFile(self.metric.output_name[0])
291                  self.metric.output_size = [size_float]
292                  self.metric.output_events = [self.tools.countEventsInRootFile(self.metric.output_name[0])]
293                  self.metric.file_size_ana = [self.tools.fileSizeAna(self.metric.output_name[0])]
294          # Parse DB usage from the output file
295          # note this only works if we have pushed the STDOUT to a files
296          # if we haven't then we fill a blank metric
297          if stdout_redirect:
298              self.metric.db_usage = self.db_parser.parseSTDOUT(stdout_redirect.name)
299          else:
300              self.metric.db_usage = self.db_parser.DBUsage()
301
302          if self.multiple_outputs and (return_code == 0):
303              self.metric.output_name = []
304              self.metric.output_events = []
305              self.metric.output_size = []
306              self.metric.file_size_ana = []
307
308              for output in self.multiple_outputs:
309                  print "tier : Checking output: %s"%output
310                  if "*" in output:
311                      file_name = self.tools.findWildcardFile(output)
312                      if file_name: output = file_name
313                  try:
314                      output_size, size_float = self.tools.inspectFile(output)
315                  except NameError:
316                      print "tier : Output file: %s has not been created"%output
317                      return return_code
318                  self.metric.output_name.append(output)
319                  self.metric.output_size.append(size_float)
320                  self.metric.output_events.append(self.tools.countEventsInRootFile(output))
321                  self.metric.file_size_ana.append(self.tools.fileSizeAna(output))
322
323          # Parse DB files
324          if self.parse_timing_db and os.path.exists("timing.db"):
325              print "tier : Parse timing DB"
326              import ViewTimingDB as view
327              json_name = "timing_db_%s_%s.json"%(self.chain_name,self.short_name)
328              cmd = "plot_timetracker -d timing.db -b -q -J %s"%json_name
329              print "tier : cmd: %s"%cmd
330              cmd_list = cmd.split()
331              try:
332                  db_return_code = subprocess.check_call(cmd_list)
333              except subprocess.CalledProcessError:
334                  print "tier : Error, parse timing DB failed"
335              print "tier : parse timing DB completed with return code: %i"%db_return_code
336              if db_return_code == 0:
337                  print "tier : - converting to CSV"
338                  csv_name = "timing_db_%s_%s.csv" %(self.chain_name,self.short_name)
339                  timing_db = view.ViewTimingDB(json_name)
340                  timing_db.write_csv(csv_name)
341
342          return return_code
343          # return 0
344 
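
The monitoring loop above can be hard to follow in one piece, so here is a much-reduced sketch of two of its ideas: the sampling period that lengthens as the job runs, and the capture of child output with a TimeMarker line before each output line. The names sampling_interval and run_and_capture are hypothetical, memory sampling through MemoryMonitor and psutil is left out, and the child output is read by iterating over the pipe rather than by the readline polling used in Tier.py.

```python
import subprocess
import sys
import time


def sampling_interval(elapsed_time):
    """Sampling period in seconds: 1 s at first, 10 s after a minute, 60 s after an hour."""
    if elapsed_time > 60 * 60:
        return 60.0
    if elapsed_time > 60:
        return 10.0
    return 1.0


def run_and_capture(cmd_list):
    """Run cmd_list and return (return_code, lines).

    stderr is merged into stdout, and a 'TimeMarker' line holding the
    elapsed time is stored before every line of child output.
    """
    process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT,
                               universal_newlines=True)
    start_time = time.time()
    lines = []
    for line in process.stdout:
        lines.append("TimeMarker: %f\n" % (time.time() - start_time))
        lines.append(line)
    process.stdout.close()
    return_code = process.wait()
    return return_code, lines


# Usage: run a trivial child process with the current interpreter.
return_code, lines = run_and_capture([sys.executable, "-c", "print('hello')"])
```

Reading from the pipe in a for loop blocks until the child closes its output, which keeps the sketch short but means it cannot sample memory between lines the way the listing does.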
def Tier.Tier.prepareAndExecuteProcess (   self,
  dry_run,
  n_events,
  verbose,
  log_file 
)

Definition at line 160 of file Tier.py.

References Tier.Tier.executeProcess(), Tier.Tier.fcl, Tier.Tier.fcl_path, Tier.Tier.full_fcl, Tier.Tier.input_name, Tier.Tier.metric, open(), Metric.Metric.output_name, Tier.Tier.output_name, and submit_syst.str.

Referenced by Tier.Tier.runTier().

160      def prepareAndExecuteProcess(self, dry_run, n_events, verbose, log_file):
161          cmd_list = ["nova"]
162          cmd_list.append("-c")
163          cmd_list.append(self.fcl)
164          self.fcl_path = self.tools.findFHiCL(self.fcl)
165          if self.fcl_path != "":
166              self.full_fcl = self.tools.fileToString(self.fcl_path)
167          if self.output_name:
168              cmd_list.append("-o")
169              cmd_list.append(self.output_name)
170          if n_events:
171              cmd_list.append("-n")
172              cmd_list.append(str(n_events))
173          if self.input_name: cmd_list.append(self.input_name)
174          cmd = " ".join(cmd_list)
175          print "tier : cmd: %s"%cmd
176
177          if not dry_run:
178              print "tier : running..."
179              stdout_redirect = False
180              stderr_redirect = False
181              if log_file:
182                  stdout_redirect = open("%s.out"%log_file, "w")
183                  print "tier : writing stdout/err to: %s"%stdout_redirect.name
184
185              return_code = self.executeProcess(cmd_list, stdout_redirect)
186
187              print "tier : done, got return code: %i"%(return_code)
188              return self.metric
189          return False
190 
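
The command-line assembly in this method can be isolated as a small pure function, which makes the effect of each option easier to see. The sketch below is written under that assumption; build_nova_command is a hypothetical name and the file names in the usage line are placeholders, while the flags -c, -o and -n and their ordering follow the listing above.

```python
def build_nova_command(fcl, output_name=False, n_events=0, input_name=False):
    """Return the argument list for a nova job, mirroring the listing's ordering."""
    cmd_list = ["nova", "-c", fcl]
    if output_name:
        # Single output file, passed with -o.
        cmd_list += ["-o", output_name]
    if n_events:
        # Event count must be a string in an argument list.
        cmd_list += ["-n", str(n_events)]
    if input_name:
        # The input file is a trailing positional argument.
        cmd_list.append(input_name)
    return cmd_list


# Usage with placeholder names.
cmd_list = build_nova_command("reco.fcl", "out.root", 10, "in.root")
```

Keeping the command as a list, as the original does, avoids shell quoting issues when it is handed to subprocess.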
def Tier.Tier.runTier (   self,
  dry_run,
  n_events,
  verbose,
  log_file 
)

Definition at line 80 of file Tier.py.

References dir, Tier.Tier.fcl, Tier.Tier.input_name, Tier.Tier.inspect_only, Tier.Tier.metric, Tier.Tier.minimum_events, Tier.Tier.multiply_events, Tier.Tier.parent_in_sam, Tier.Tier.parse_timing_db, Tier.Tier.precommand, Tier.Tier.prepareAndExecuteProcess(), and Tier.Tier.universal_fcl_mod.

 80      def runTier(self, dry_run, n_events, verbose, log_file):
 81          if "os" not in dir(): import os
 82
 83          # Run pre command if requested
 84          if self.precommand:
 85              print "tier : running precommand: %s"%self.precommand
 86              if "subprocess" not in dir(): import subprocess
 87              cmd_list = self.precommand.split()
 88              try:
 89                  return_code = subprocess.check_call(cmd_list)
 90              except subprocess.CalledProcessError:
 91                  print "tier : Error, precommand: %s failed"%self.precommand
 92                  self.metric.return_code = 1
 93                  return self.metric
 94              if verbose: print "tier : precommand completed with return code: %i"%return_code
 95
 96          # Run universal modify fcl if required
 97          if self.universal_fcl_mod:
 98              print "tier : running universal modify fcl"
 99              if "subprocess" not in dir(): import subprocess
100              mod_fcl_name = "%s_mod.fcl"%self.fcl.split("/")[-1][:-4]
101              cmd = "python modifyFHiCL.py -b %s -f %s -a -v"%(self.fcl, mod_fcl_name)
102              print "tier : cmd: %s"%cmd
103              self.fcl = mod_fcl_name
104              cmd_list = cmd.split()
105              try:
106                  return_code = subprocess.check_call(cmd_list)
107              except subprocess.CalledProcessError:
108                  print "tier : Error, universal modify fcl"
109                  self.metric.return_code = 1
110                  return self.metric
111              if verbose: print "tier : universal modify fcl completed with return code: %i"%return_code
112
113          # Parse DB files setup
114          if self.parse_timing_db:
115              if os.path.exists("timing.db"):
116                  os.system("rm timing.db")
117
118          # Inspect only is used to study files coming off of the detector
119          if self.inspect_only:
120              self.metric.output_name = "n/a"
121              self.metric.run = True
122              try:
123                  size_string, size_float = self.tools.inspectFile(self.input_name)
124                  self.metric.output_size = size_float
125              except NameError:
126                  pass
127
128              if self.parent_in_sam:
129                  input_metadata = self.tools.getSAMMetaData(self.input_name.split("/")[-1])
130                  n_events = input_metadata["Online.TotalEvents"]
131                  self.metric.input_events = n_events
132
133              return self.metric
134
135          # Multiple number of events if requested
136          if self.multiply_events != 1:
137              if verbose: print "tier : Multiplying the number of events requested %i by %i to %i"%(n_events, self.multiply_events, n_events*self.multiply_events)
138              n_events = self.multiply_events*n_events
139
140          # Minimum number of events
141          if self.minimum_events != 1:
142              if n_events < self.minimum_events:
143                  if verbose: print "tier : Minimum number of events %i is greater than requested number of events %i, so using minimum"%(self.minimum_events, n_events)
144                  n_events = self.minimum_events
145
146          # For all other tiers we run an art job
147          # Check the input file exists if we need one
148          if self.input_name:
149              if "*" in self.input_name:
150                  file_name = self.tools.findWildcardFile(self.input_name)
151                  if file_name: self.input_name = file_name
152              if not (os.path.exists(self.input_name)):
153                  print "tier : Input file %s not found, exiting"%self.input_name
154                  return self.metric
155          assert ( self.fcl )
156
157          self.prepareAndExecuteProcess(dry_run, n_events, verbose, log_file)
158          return self.metric
159 
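
Two small calculations in runTier are easy to misread in the flattened listing: the event-count adjustment (multiply first, then apply the minimum) and the name of the modified FHiCL file. The sketch below restates both as standalone functions; adjust_event_count and modified_fcl_name are hypothetical names introduced here for illustration, and the path in the usage line is a placeholder.

```python
def adjust_event_count(n_events, multiply_events=1, minimum_events=1):
    """Scale the requested event count, then enforce the minimum.

    As in the listing, the minimum is only applied when it differs from 1.
    """
    if multiply_events != 1:
        n_events = multiply_events * n_events
    if minimum_events != 1 and n_events < minimum_events:
        n_events = minimum_events
    return n_events


def modified_fcl_name(fcl):
    """Drop the directory and the 4-character '.fcl' suffix, then append '_mod.fcl'."""
    return "%s_mod.fcl" % fcl.split("/")[-1][:-4]


# Usage with placeholder values.
n_overlay = adjust_event_count(10, multiply_events=5)
mod_name = modified_fcl_name("job/prod_reco.fcl")
```

Note that the suffix is removed by slicing off four characters, so a name that does not end in ".fcl" would be truncated incorrectly; that is a property of the original expression, reproduced here unchanged.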
def Tier.Tier.setChainName (   self,
  chain_name 
)

Definition at line 77 of file Tier.py.

77      def setChainName(self, chain_name):
78          self.chain_name = chain_name
79 

Member Data Documentation

Tier.Tier.chain_name

Definition at line 78 of file Tier.py.

Referenced by Tier.Tier.executeProcess().

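Tier.Tier.fcl

Definition at line 45 of file Tier.py.

Referenced by Tier.Tier.__repr__(), Tier.Tier.prepareAndExecuteProcess(), and Tier.Tier.runTier().
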
Tier.Tier.fcl_path

Definition at line 57 of file Tier.py.

Referenced by Tier.Tier.prepareAndExecuteProcess().

Tier.Tier.full_fcl

Definition at line 56 of file Tier.py.

Referenced by Tier.Tier.prepareAndExecuteProcess().

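Tier.Tier.input_name

Definition at line 44 of file Tier.py.

Referenced by Tier.Tier.__repr__(), Tier.Tier.executeProcess(), Tier.Tier.prepareAndExecuteProcess(), and Tier.Tier.runTier().
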
Tier.Tier.inspect_only

Definition at line 42 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().

Tier.Tier.log

Definition at line 55 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.executeProcess().

Tier.Tier.long_name

Definition at line 41 of file Tier.py.

Referenced by Tier.Tier.__repr__().

Tier.Tier.metric

Definition at line 54 of file Tier.py.

Referenced by Tier.Tier.prepareAndExecuteProcess(), and Tier.Tier.runTier().

Tier.Tier.minimum_events

Definition at line 49 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().

Tier.Tier.multiple_outputs

Definition at line 50 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.executeProcess().

Tier.Tier.multiply_events

Definition at line 48 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().

Tier.Tier.output_name

Definition at line 46 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.prepareAndExecuteProcess().

Tier.Tier.parent_in_sam

Definition at line 43 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().

Tier.Tier.parse_timing_db

Definition at line 52 of file Tier.py.

Referenced by Tier.Tier.__repr__(), Tier.Tier.executeProcess(), and Tier.Tier.runTier().

Tier.Tier.precommand

Definition at line 47 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().

Tier.Tier.short_name

Definition at line 40 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.executeProcess().

Tier.Tier.universal_fcl_mod

Definition at line 51 of file Tier.py.

Referenced by Tier.Tier.__repr__(), and Tier.Tier.runTier().


The documentation for this class was generated from the following file: