Component.py
Go to the documentation of this file.
1 import os
2 import subprocess
3 import time
4 import datetime
5 import common_tools as tools
6 import SAMView as sam_view
7 import glob
8 
class Component():
    """A single validation component built from a configuration dictionary.

    A component wraps one script. ``run`` submits it, either through
    ``jobsub_submit`` or directly with ``sh`` for local/SAM components.
    ``get_sam_status`` queries the SAM status through a ``SAMView.SamView``
    object. ``run_sam_merge`` merges the ROOT files found in ``output_dir``
    into a single file.

    NOTE(review): ``output_dir`` is read by several methods but never
    assigned in this class; callers presumably set it before ``run``.
    """

    def __init__(self, config_dict, release="development", local=False, verbose=False, user=False):
        """Build a component from ``config_dict``.

        Args:
            config_dict: mapping that must contain ``name``, ``version``
                (must be convertible with ``int``), ``script`` and
                ``dataset``. Optional keys:
                - ``datasets``, ``local_dataset``, ``local_datasets``,
                  ``sam``, ``directories``: default ``False``.
                - ``label``: defaults to ``dataset``.
                - ``release``: overrides the ``release`` argument.
                - ``n_jobs``, ``files_per_job``: default ``""``, meaning
                  "not set".
            release: release used when ``config_dict`` has no ``release``.
            local: if true, run the script with ``sh`` instead of
                submitting it with ``jobsub_submit``.
            verbose: print commands and subprocess output.
            user: value passed as the script's first argument.

        Raises:
            KeyError: if a compulsory key is missing, or the
                ``NOVAANAVALID_DIR`` environment variable is not set.
            ValueError: if ``version`` cannot be converted to ``int``.
        """
        self.local = local
        self.verbose = verbose
        self.user = user
        # compulsory arguments
        self.name = config_dict["name"]
        self.version = int(config_dict["version"])
        self.script = config_dict["script"]
        self.dataset = config_dict["dataset"]
        # optional, defaulting to False
        for attr in ("datasets", "local_dataset", "local_datasets", "sam", "directories"):
            setattr(self, attr, config_dict.get(attr, False))
        # optional, with non-False defaults
        self.label = config_dict.get("label", self.dataset)
        self.release = config_dict.get("release", release)
        # stored as given; "" means "not set" (see get_expected_sam_output_files)
        for attr in ("n_jobs", "files_per_job"):
            setattr(self, attr, config_dict.get(attr, ""))

        self.status = "initialised"
        self.sam_merge_status = "not run"

        valid_dir = os.environ["NOVAANAVALID_DIR"]
        if self.local or self.sam:
            # local and SAM components run the script directly through sh
            self.jobsub_command = "sh %s" % valid_dir
        else:
            self.jobsub_command = "jobsub_submit -G nova --resource-provides=usage_model=DEDICATED,OPPORTUNISTIC --expected-lifetime=3600s --OS=SL6 file://%s" % valid_dir

        self.get_dataset()

    def run(self):
        """Run/submit the script and record whether submission succeeded.

        The command is ``<jobsub_command>/<script>`` followed by user,
        release, dataset(s), output directory, name, n_jobs and
        files_per_job. Its combined stdout/stderr lines are stored in
        ``self.lines``. The submission counts as good only if some line
        contains ``"JobsubJobId of first job"``.

        Sets ``status`` to ``"run"`` or ``"submission error"``, sets
        ``good_submission``, and sets ``run_time`` to the local time after
        the command finished.
        """
        self.status = "run"

        self.command = "%s/%s %s %s %s %s %s %s %s" % (self.jobsub_command, self.script, self.user, self.release, self.script_dataset, self.output_dir, self.name, self.n_jobs, self.files_per_job)
        if self.verbose:
            print("comp : command: %s" % self.command)

        # Split on single spaces (not str.split()) so that an unset n_jobs /
        # files_per_job ("") still occupies its positional argument slot.
        process = subprocess.Popen(self.command.split(" "), stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT, universal_newlines=True)
        # communicate() drains the pipe and waits for the child to exit
        stdout, _ = process.communicate()
        self.lines = stdout.splitlines(True)  # keep line endings, like readlines()

        self.good_submission = False
        for line in self.lines:
            if self.verbose:
                print(line.strip())
            if "JobsubJobId of first job" in line:
                self.good_submission = True

        if not self.good_submission:
            print("comp : ERROR in submission")
            self.status = "submission error"

        self.run_time = datetime.datetime.now()

    def get_dataset(self):
        """Set ``script_dataset``, the dataset argument passed to the script.

        The first set option wins, in this order:
        ``local_datasets`` (space-joined), ``datasets`` (space-joined),
        ``local_dataset``, and finally ``dataset``.
        """
        if self.local_datasets:
            self.script_dataset = " ".join(self.local_datasets)
        elif self.datasets:
            self.script_dataset = " ".join(self.datasets)
        elif self.local_dataset:
            self.script_dataset = self.local_dataset
        else:
            self.script_dataset = self.dataset

    def setup_sam_view(self):
        """Create ``self.sam_view`` for the SAM URL found in the run output.

        The URL is extracted from ``self.lines`` with ``tools.getSAMURL``,
        so ``run`` must have been called first.
        """
        sam_url = tools.getSAMURL(self.lines)
        if self.verbose:
            print("comp : found sam URL: %s" % sam_url)
        self.sam_view = sam_view.SamView(sam_url, self.output_dir, verbose=self.verbose)

    def get_sam_status(self, sam):
        """Return the status reported by ``self.sam_view.get_status``.

        The SAM view is created on first use. The expected number of output
        files is recomputed from ``sam`` and stored in ``self.n_files`` before
        being passed on.

        Args:
            sam: SAM client object, forwarded to the view and used to count
                files when ``n_jobs`` is unset.
        """
        if not hasattr(self, "sam_view"):
            self.setup_sam_view()
        # n_files was read here but never assigned anywhere
        self.n_files = self.get_expected_sam_output_files(sam)
        return self.sam_view.get_status(sam, n_files=self.n_files)

    def get_expected_sam_output_files(self, sam):
        """Return the number of output files expected from this component.

        Args:
            sam: SAM client; ``sam.countFiles`` is only called when
                ``n_jobs`` is unset.

        Returns:
            ``False`` if ``files_per_job`` is unset (``""``). Otherwise
            ``n_jobs * files_per_job`` as an ``int``. Both values may be
            strings or ints in the configuration.

        Side effect:
            If ``n_jobs`` is unset, it is derived from the snapshot's total
            file count and stored back in ``self.n_jobs``.
        """
        if self.files_per_job == "":
            return False
        # configuration values may be strings: convert before multiplying
        files_per_job = int(self.files_per_job)
        if self.n_jobs == "":
            # n_jobs is decided at run time; per the original note the default
            # is N_TOTAL_FILES / 100 -- assumes the job script uses the same
            # rule, TODO confirm
            total_files = sam.countFiles(dimensions="snapshot_id %s" % self.sam_view.sam_summary["snapshot_id"])
            self.n_jobs = int(total_files) // 100
        return int(self.n_jobs) * files_per_job

    def get_sam_merge_status(self):
        """Return ``sam_merge_status``, or ``"not applicable"`` for non-SAM components."""
        if not self.sam:
            return "not applicable"
        # objects created before this attribute existed may lack it
        if not hasattr(self, "sam_merge_status"):
            self.sam_merge_status = "not run"
        return self.sam_merge_status

    def run_sam_merge(self):
        """Merge the ROOT files in ``output_dir`` with ``core/utils/hadd_caf.py``.

        Does nothing if a merge was already attempted. A warning is printed
        unless that attempt completed. The merged file path is stored in
        ``root_file``.

        ``sam_merge_status`` ends up as one of:
        ``"no files"``, ``"failed: return code = N"``,
        ``"failed: no output file"`` or ``"complete"``.
        """
        if hasattr(self, "sam_merge_status") and self.sam_merge_status != "not run":
            if self.sam_merge_status != "complete":
                print("comp : SAM merge has already run, status: %s" % self.sam_merge_status)
            return

        self.sam_merge_status = "running"
        if not glob.glob("%s/*.root" % self.output_dir):
            self.sam_merge_status = "no files"
            return

        self.root_file = "%s/hadd_%s_%s.root" % (self.output_dir, self.name, self.version)
        # Explicit argv: paths containing spaces stay intact. No shell is
        # involved, so "*" reaches hadd_caf.py literally, as before.
        args = ["python", "%s/core/utils/hadd_caf.py" % os.environ["NOVAANAVALID_DIR"],
                "-i", "%s/" % self.output_dir, "-w", "*", "-o", self.root_file, "-s"]
        cmd = " ".join(args)

        process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                   universal_newlines=True)
        stdout, _ = process.communicate()

        if process.returncode != 0:
            print("comp : --- Exception caught while executing sam merge, return code: %i" % process.returncode)
            print("comp : command run: %s" % cmd)
            for line in stdout.splitlines():
                print(line.strip())
            self.sam_merge_status = "failed: return code = %i" % process.returncode
        elif not os.path.exists(self.root_file):
            self.sam_merge_status = "failed: no output file"
        else:
            self.sam_merge_status = "complete"
150 
def get_dataset(self)
Definition: Component.py:76
def get_sam_status(self, sam)
Definition: Component.py:87
def run_sam_merge(self)
Definition: Component.py:116
def get_sam_merge_status(self)
Definition: Component.py:109
def setup_sam_view(self)
Definition: Component.py:82
def get_expected_sam_output_files(self, sam)
Definition: Component.py:94
def __init__(self, config_dict, release="development", local=False, verbose=False, user=False)
Definition: Component.py:11