ProjMan.py
Go to the documentation of this file.
1 import os
2 import socket
3 import samweb_client
4 import time
5 samweb = samweb_client.SAMWebClient(experiment="nova")
6 
7 import ifdh as ifdh_mod
8 ifdh = ifdh_mod.ifdh()
9 
10 
11 
12 
13 class Consumer:
14  def __init__(self, project, fileLimit=10, appName="PySAMFetcher", schema=""):
15  self.project = project
16  self.appName = appName
17  if "GRID_USER" in os.environ:
18  self.user = os.environ["GRID_USER"]
19  elif "USER" in os.environ:
20  self.user = os.environ["USER"]
21  else:
22  self.user = "unknown_user"
23 
24  self.appFamily = appName
25  self.appVersion = "v0_1"
26  self.location = socket.gethostname()
27 
28  if "CLUSTER" in os.environ and "PROCESS" in os.environ:
29  self.description = ".".join([os.environ["CLUSTER"], os.environ["PROCESS"]])
30  else:
31  self.description = "off_grid"
32 
33  self.fileLimit = fileLimit
34  self.schema = schema
35 
36  self.projecturi = None
37  self.procId = None
38  self.currentFile = None
39  self.consumingFile = False
40 
41  if not "_CONDOR_SCRATCH_DIR" in os.environ:
42  print "Didn't find _CONDOR_SCRATCH_DIR, setting it to /tmp/"
43  os.environ["_CONDOR_SCRATCH_DIR"] = "/tmp"
44  self.establish()
45 
46  @staticmethod
47  def getURI(project):
48  if project.startswith("http"):
49  projecturi = project
50  else:
51  if not "SAM_STATION" in os.environ:
52  raise Exception("Can't find $SAM_STATION, attempt to identify project URI failed.")
53  projecturi = ifdh.findProject(project, os.environ["SAM_STATION"])
54  return projecturi
55 
56  def establish(self):
57  if self.procId:
58  print "Process already established with process id: ", self.procId
59  return
60  self.projecturi = Consumer.getURI(self.project)
61  self.procId = ifdh.establishProcess(self.projecturi,
62  self.appName, self.appVersion,
63  self.location, self.user,
64  self.appFamily, self.description,
65  self.fileLimit, self.schema)
66 
67 
68  def nextFile(self):
69  anotherFile = ifdh.getNextFile(self.projecturi, self.procId)
70  # Returns an empty string if we're all done.
71  if not anotherFile:
72  return False
73  self.currentFile = anotherFile
74  return True
75 
76  def getFile(self):
77  if self.consumingFile:
78  raise Exception("Must call consumedFile() prior to getting another file.")
79 
80  if not self.currentFile:
81  self.nextFile()
82 
83  self.consumingFile = True
84  fileLocation = ifdh.fetchInput(self.currentFile)
85  ifdh.updateFileStatus(self.projecturi, self.procId, self.currentFile, "transferred")
86  return fileLocation
87 
88 
89  def consumedFile(self):
90  ifdh.updateFileStatus(self.projecturi, self.procId, self.currentFile, "consumed")
91  self.consumingFile = False
92  self.currentFile = None
93 
94  def setStatus(self, status):
95  ifdh.setStatus(self.projecturi, self.procId, status)
96 
97 
98  def __iter__(self):
99  while self.nextFile():
100  myFile = self.getFile()
101  yield myFile
102  self.consumedFile()
103 
104 
105 
106 
107 if __name__ == "__main__":
108 
109  p = Consumer("rocco_caf_fetch_test_project_02_17_18-56", fileLimit = 2)
110 
111  for fPath in p:
112  print "MY FILE IS:", fPath
113  print "IS IT A FILE?", os.path.isfile(fPath)
114 
def __iter__(self)
Definition: ProjMan.py:98
def getFile(self)
Definition: ProjMan.py:76
def setStatus(self, status)
Definition: ProjMan.py:94
def __init__(self, project, fileLimit=10, appName="PySAMFetcher", schema="")
Definition: ProjMan.py:14
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
def nextFile(self)
Definition: ProjMan.py:68
def getURI(project)
Definition: ProjMan.py:47
def establish(self)
Definition: ProjMan.py:56
def consumedFile(self)
Definition: ProjMan.py:89