filesource.py
Go to the documentation of this file.
1 import os
2 import subprocess
3 import sys
4 from datetime import datetime
5 from itertools import count
6 import time
7 import re
8 import hashlib
9 
10 # Contains classes that return a generator over a file source
11 
12 # simple wrapper around list of files provided by the user
# simple wrapper around list of files provided by the user
class listsource():
    """File source over an explicit, user-provided list of paths."""

    def __init__(self, filelist):
        self.files = filelist
        self.gen = self.getnextfile()

    def getnextfile(self):
        """Generator over the file list; refuses to yield a missing path."""
        for path in self.files:
            assert os.path.isfile(path), "File {} doesn't exist!".format(path)
            yield path

    def __call__(self):
        """Return the next file path."""
        return next(self.gen)

    def nFiles(self):
        """Total number of files in the source."""
        return len(self.files)
28 
29 # pass a glob instead
# pass a glob instead
class globsource():
    """File source built from a glob pattern, sliced by stride/offset/limit."""

    def __init__(self, globstr, stride=1, offset=0, limit=None):
        import glob
        matched = glob.glob(globstr)
        self.files = matched[offset::stride]
        if limit:
            self.files = self.files[:limit]
        self.gen = self.getnextfile()

    def getnextfile(self):
        """Generator over the matched files; refuses to yield a missing path."""
        for path in self.files:
            assert os.path.isfile(path), "File {} doesn't exist!".format(path)
            yield path

    def __call__(self):
        """Return the next file path."""
        return next(self.gen)

    def nFiles(self):
        """Total number of files in the source after slicing."""
        return len(self.files)
47 
48 # sam project source
    def __init__(self, projname, limit = -1):
        """Attach to an existing SAM project and prepare to iterate its files.

        projname -- name of the running SAM project to consume from
        limit    -- maximum number of files to deliver; -1 means all files
                    in the project snapshot (resolved in establishProcess)
        """
        from ifdh import ifdh
        import samweb_client
        self.SAM = samweb_client.SAMWebClient(experiment='nova')
        self.ifdh = ifdh()

        self.limit = limit
        # file count; replaced by the snapshot size when limit < 0
        self.nfiles = limit
        self.projname = projname
        self.hasTicket = False
        self.projurl = ''
        self.processID = 0
        # local path of the currently fetched file; 0 until the first fetch
        self.currfile = 0

        # _CONDOR_SCRATCH_DIR is only set inside condor grid jobs
        self.isgrid = os.getenv('_CONDOR_SCRATCH_DIR') is not None

        self.setup()
        self.gen = self.getnextfile()
68 
    def setup(self):
        """One-time setup before iterating files: register with the project."""
        # self.checkproxy() # turning this off temporarily before I figure out what's going wrong with kx509 on the grid, seemingly only for pandana
        self.establishProcess()
72 
73  def checkproxy(self):
74  try:
75  # subprocess.check_output(["klist", "-l"])
76  check = subprocess.check_call(["klist", "-s"]) or subprocess.check_call(["klist", "-5", "-s"])
77  if not check:
78  self.hasTicket = True
79  except subprocess.CalledProcessError:
80  try:
81  subprocess.check_call(["setup_fnal_security", "-b"])
82  self.hasTicket = True
83  except subprocess.CalledProcessError:
84  self.hasTicket = False
85  print (sys.exit(2), "Authentication failed. Please run setup_fnal_security -k")
86  if not os.getenv("X509_USER_PROXY"):
87  uid = subprocess.check_output(["id", "-u"]).strip('\n')
88  os.environ["X509_USER_PROXY"] = "/tmp/x509up_u"+uid
89 
    def establishProcess(self):
        """Register this consumer with the SAM project via IFDH.

        Sets self.projurl and self.processID, and resolves the total file
        count from the project snapshot when no explicit limit was given.
        """
        self.ifdh.set_debug('0')

        # find user-provided project and establish an ifdh process over it
        self.projurl = self.ifdh.findProject(self.projname, "nova")
        # GRID_USER is set in grid jobs where USER may be absent
        userstr = os.getenv('USER') or os.getenv('GRID_USER')
        self.processID = self.ifdh.establishProcess(self.projurl, "PandAna", "0.1",
                                                    os.getenv('HOSTNAME'),
                                                    userstr, "nova", "", self.limit)
        print("Connecting to project %s with process %s" % (self.projurl, self.processID))
        # limit < 0 means "all files": ask SAM how many are in the snapshot
        if self.nfiles < 0:
            self.nfiles = self.SAM.projectSummary(self.projname)['files_in_snapshot']
102 
103 
    def getnextfile(self):
        """Generator over the project's files.

        Fetches each file to local scratch via ifdh and yields the local
        path.  On the next iteration the previous local copy is deleted and
        its SAM status set to "consumed" before requesting another file.
        When the project runs out of files the IFDH process is ended and,
        when running interactively, the project is stopped.
        """
        while True:
            if(self.currfile):
                # set status to consumed
                os.unlink(self.currfile)
                self.ifdh.updateFileStatus(self.projurl, self.processID, self.currfile, "consumed")
            uri = self.ifdh.getNextFile(self.projurl, self.processID)
            # end of project
            if not uri:
                self.ifdh.endProcess(self.projurl, self.processID)
                self.ifdh.cleanup()

                # If running interactively, just stop the project.
                # On the grid, there may be other processes running over this project.
                # Which is why we're only stopping it once all the files in the project are used up
                if not self.isgrid:
                    self.SAM.stopProject(self.projurl)
                # else:
                #     summary = self.SAM.projectSummary(self.projname)
                #     nused = summary['file_counts']['consumed'] + summary['file_counts']['failed']
                #     ntot = summary['files_in_snapshot']
                #     if nused == ntot:
                #         self.SAM.stopProject(self.projurl)
                # Project stats needs some delay to show the correct status once it ends
                time.sleep(5)
                self.processStats(self.processID)
                break

            # fetch the file
            self.currfile = self.ifdh.fetchInput(uri)
            assert self.currfile

            self.ifdh.updateFileStatus(self.projurl, self.processID, self.currfile, "transferred")
            yield self.currfile
138 
139  def __call__(self):
140  return next(self.gen)
141 
142  def nFiles(self):
143  return self.nfiles
144 
145  def processStats(self, processID):
146  stats = self.SAM.projectSummary(self.projname)
147  process_stats = stats.pop('processes')
148  print process_stats
149  print processID
150  process = [k for k in process_stats if k['process_id'] == int(processID)]
151  if not process:
152  print("Warning!! Something went wrong. unable to get process information.")
153  print("\nProject summary :")
154  print("===================")
155  print(self.SAM.projectSummaryText(self.projname))
156  return
157  stats.update({'process':process[0]})
158  import yaml
159  print("\nProcess summary :")
160  print("===================")
161  print(yaml.dump(stats))
162  return
163 
164 # wrap sam queries around a project.
165 # The philosophy here is to start up a project for each SAM query and fetch each file to a temporary scratch location.
166 # This is better than allowing multiple users to open the same files directly from their location.
167 # CAFAna, in contrast, doesn't have a problem doing the above because the data is streamed via xrootd anyway, thus preventing concurrency issues.
168 # The pros of using projects are that we don't have to worry about taking snapshots ourselves and a project can be shared across all grid jobs for example
169 # The cons are that if we aren't careful the user can end up with many stale running projects and there's an experiment-wide cap
    # running counter handing out a unique id per loader instance in a macro
    _instanceid = count(0)
    # number of running projects allowed per user
    _MAX_INT_PROJECTS = 10 #interactive
    _MAX_GRID_PROJECTS = 100 #grid
175 
176  def __init__(self, query):
177  import samweb_client
178  self.SAMclient = samweb_client # need the whole module for exceptions
179  self.SAM = samweb_client.SAMWebClient(experiment='nova')
180 
181  self.query = query
182  # don't create separate projects if the query differs only by limit
183  if 'with limit' in self.query:
184  try:
185  self.limit = int(re.sub(r'^.* with limit ([0-9]*)$', '\\1', self.query))
186  except ValueError:
187  print ("Invalid limit number in SAM query!")
188  sys.exit(2)
189  else:
190  self.limit = -1
191 
192  # keep track of class instances in a given user macro
193  self.instanceid = next(samquerysource._instanceid)
194  self.user = os.getenv('USER') or os.getenv('GRID_USER')
195  self.checkDefinition = False
196  self.checkSnapshot = False
197  self.definition = None
198  self.snapshot_id = None
199 
200  self.isgrid = os.getenv('_CONDOR_SCRATCH_DIR') is not None
201 
202  self.setupProject()
203 
204  def isPlainDefinition(self):
205  # check if query doesn't have extraneous metadata fields
206  if not self.checkDefinition:
207  # strip defname/def_snapshot/dataset_def_name_newest_snapshot off of query
208  definition = re.sub(r'(dataset_def_name_newest_snapshot |defname: |def_snapshot | with limit [0-9]*$)', '', self.query)
209  self.isdefinition = len(self.SAM.listDefinitions(defname=definition)) > 0
210  self.checkDefinition = True
211  if self.isdefinition:
212  self.definition = definition
213  return self.isdefinition
214 
215  def isPlainSnapshot(self):
216  if not self.checkSnapshot:
217  # check if query asks for snapshot and get the snapshot id if so
218  self.issnapshot = bool(re.search(r'(dataset_def_name_newest_snapshot |def_snapshot |snapshot_id)', self.query))
219  if self.issnapshot:
220  if self.isPlainDefinition():
221  self.snapshot_id = self.SAM.snapshotInfo(defname=self.definition)['snapshot_id']
222  else:
223  try:
224  self.snapshot_id = int(re.sub('snapshot_id ([0-9]*) with limit [0-9]*$', '\\1', self.query))
225  except ValueError:
226  self.snapshot_id = None
227  self.issnapshot = False
228  pass
229  self.checkSnapshot = True
230 
231  return self.issnapshot
232 
233  def nProjWithPrefix(self, projname):
234  projlist = self.SAM.listProjects(defname=self.definition, snapshot_id=self.snapshot_id, state='running')
235  return [projname in l for l in projlist].count(True)
236 
237  def setupProject(self):
238  if not(self.isPlainDefinition() or self.isPlainSnapshot()):
239  # Query is something else. Let's create our own definition. The hashing strips out bad characters
240  batchname = re.sub(r' with limit [0-9]*$', '', self.query)
241  defname = "%s_pandana_defn_%s" % (self.user, hashlib.md5(batchname).hexdigest())
242 
243  if not self.SAM.listDefinitions(defname=defname):
244  self.SAM.createDefinition(defname=defname, dims=batchname)
245  self.definition = defname
246 
247  assert self.definition or self.snapshot_id
248  # use snapshot unless its not there
249  if self.snapshot_id:
250  self.definition = None
251  if self.definition:
252  self.snapshot_id = None
253 
254  # need a unique project name but it needs to be shared across each grid job.
255  # Therefore, create one per query per loader instance
256  batchname = re.sub(r' with limit [0-9]*$', '', self.query)
257  projname = "%s_pandana_proj%d_%s" % (self.user, self.instanceid, hashlib.md5(batchname).hexdigest())
258  uniqueid = "time"+datetime.now().strftime('%Y%m%d_%H%M%S')
259  checkprojid = "_time"
260  MAX_PROJECTS = samquerysource._MAX_INT_PROJECTS
261 
262  # CLUSTER is unique for every job. Therefore, create a new project for every new submission.
263  if self.isgrid:
264  projname += "_cluster%s" % os.getenv('CLUSTER')
265  checkprojid = ""
266  MAX_PROJECTS = samquerysource._MAX_GRID_PROJECTS
267 
268  # don't allow too many stale projects
269  if self.nProjWithPrefix(projname+checkprojid) > MAX_PROJECTS:
270  print (
271  """
272  More than %d projects are running for current query already.
273  Most likely, this is because of leftover projects from faulty/interrupted runs.
274  Use samweb list-projects --defname=%s --snapshot_id=%d --state=running | grep %s
275  and stop each one of them with samweb stop-project first and then re-run.
276  You can also call PandAna.utils.misc.StopAllUserProjects() in your script to
277  stop all your running interactive and grid projects
278  """ % (samquersource._MAX_PROJECTS, self.definition, self.snapshot_id, projname+checkprojid)
279  )
280  sys.exit(2)
281 
282  projlock = projname+"_lock"
283  # allow only one per cluster for a given loader instance
284  if not self.isgrid:
285  self.SAM.startProject(projname+"_"+uniqueid, defname=self.definition, snapshot_id=self.snapshot_id)
286  projname = projname+"_"+uniqueid
287  else:
288  try:
289  # lock creation of projects for other processes
290  self.SAM.createDefinition(projlock, "nova.special %s" % os.getenv('PROCESS'))
291  # No one's created our project. We can go for it
292  self.SAM.startProject(projname+"_"+uniqueid, defname=self.definition, snapshot_id=self.snapshot_id)
293  projname += "_"+uniqueid
294 
295  except self.SAMclient.exceptions.SAMWebConflict:
296  # waiting for another process to create it
297  period = 1
298  while not self.nProjWithPrefix(projname):
299  time.sleep(int(period))
300  period *= 1.5
301  if period > 60*30:
302  print ("""
303  We've been waiting a long time for the project to be created.
304  We don't think its happening.
305  """)
306  sys.exit(2)
307  assert self.nProjWithPrefix(projname) == 1, \
308  """
309  Multiple projects of the same loader instance running on the same cluster.
310  This shouldn't be possible
311  """
312  projlist = self.SAM.listProjects(defname=self.definition, snapshot_id=self.snapshot_id, state='running')
313  # Some other grid job has already created our project. Connect to it
314  projname = projlist[[projname in l for l in projlist].index(True)]
315 
316  # Establish an IFDH process over it
317  samprojectsource.__init__(self, projname, self.limit)
318  return
319 
321  def __init__(self, query, stride=1, offset = 0, limit = None):
322  self.query = query
323  self.stride = stride
324  self.offset = offset
325  self.limit = limit
326 
327  def sourceArgsID(self):
328  args_id = "stride%d_offset%d" % (self.stride, self.offset)
329  if self.limit:
330  args_id = "stride%d_offset%d_limit%d" % (self.stride, self.offset, self.limit)
331  return args_id
332 
333  def getQuery(self):
334  return self.query
335 
336  # define an id based on query and stride arguments to check if multiple loaders use the same source
337  def __eq__(self, other_source):
338  return (self.getQuery() == other_source.getQuery() and \
339  self.sourceArgsID() == other_source.sourceArgsID())
340 
341  def islist(self):
342  return type(self.query) is list
343 
344  def isglob(self):
345  return type(self.query) is str and ' ' not in self.query
346 
    def isproj(self):
        """Check whether the query names a currently-running SAM project.

        Resolves the project URL via ifdh and shells out to
        'ifdh projectStatus'; a zero exit code means the project responds.
        """
        from ifdh import ifdh
        i = ifdh()
        url = i.findProject(self.query, "nova")
        exitcode = 1
        with open(os.devnull, 'wb') as null:
            try:
                # for some reason I'm not able to use ifdh projectStatus directly with subprocess.
                exitcode = subprocess.check_call(['bash', '-c', 'ifdh projectStatus {}'.format(url)],
                                                 stdout=null, stderr=null)
            except subprocess.CalledProcessError:
                exitcode = 1
        return not exitcode
360 
361  def issamquery(self):
362  return ' ' in self.query
363  # import samweb_client
364  # SAM = samweb_client.SAMWebClient(experiment='nova')
365  #
366  # try:
367  # SAM.listFilesSummary(dimensions=self.query)
368  # return True
369  # except samweb_client.exceptions.DimensionError:
370  # return False
371  # return False
372 
373  def __call__(self):
374  if self.islist():
375  if os.getenv('PANDANA_STRIDE'):
376  self.stride = int(os.getenv('PANDANA_STRIDE'))
377  if os.getenv('PANDANA_LIMIT'):
378  self.limit = int(os.getenv('PANDANA_LIMIT'))
379  if os.getenv('PANDANA_OFFSET'):
380  self.offset = int(os.getenv('PANDANA_OFFSET'))
381 
382  filelist = self.query[self.offset::self.stride]
383  if self.limit: filelist = filelist[:self.limit]
384  print ("Running over list of files")
385  return listsource(filelist)
386 
387  elif self.isglob():
388  if os.getenv('PANDANA_STRIDE'):
389  self.stride = int(os.getenv('PANDANA_STRIDE'))
390  if os.getenv('PANDANA_LIMIT'):
391  self.limit = int(os.getenv('PANDANA_LIMIT'))
392  if os.getenv('PANDANA_OFFSET'):
393  self.offset = int(os.getenv('PANDANA_OFFSET'))
394 
395  print ("Running over list of files matching glob")
396  return globsource(self.query, self.stride, self.offset, self.limit)
397 
398  elif self.isproj():
399  if self.stride > 1 or self.offset > 0:
400  print ("""
401  Can't use stride and offset for SAM projects.
402  Use a query instead to start an internal project
403  """)
404  sys.exit(2)
405  if os.getenv('PANDANA_LIMIT'):
406  self.limit = int(os.getenv('PANDANA_LIMIT'))
407 
408  if not self.limit: self.limit = -1
409 
410  print ("Running over SAM project with name %s" % self.query)
411  return samprojectsource(self.query, self.limit)
412 
413  elif self.issamquery():
414  if os.getenv('PANDANA_LIMIT'):
415  self.limit = int(os.getenv('PANDANA_LIMIT'))
416  if self.stride > 1:
417  self.query += ' with stride %d' % self.stride
418  if self.offset > 0:
419  self.query += ' with offset %d' % self.offset
420  if self.limit:
421  self.query += ' with limit %d' % self.limit
422 
423  print ("Running over list of files matching SAM query '%s'" % self.query)
424  return samquerysource(self.query)
425 
426  else:
427  print ("Invalid loader query!")
428  sys.exit(2)
if(dump)
bool print
def __init__(self, query, stride=1, offset=0, limit=None)
Definition: filesource.py:321
ifdh
Definition: ProjMan.py:8
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
procfile open("FD_BRL_v0.txt")
def __eq__(self, other_source)
Definition: filesource.py:337
def __init__(self, filelist)
Definition: filesource.py:14
def __init__(self, projname, limit=-1)
Definition: filesource.py:50
void next()
Definition: show_event.C:84
def __init__(self, globstr, stride=1, offset=0, limit=None)
Definition: filesource.py:31