projects.py
Go to the documentation of this file.
1 
2 import time, os, re
3 from samweb_client import json, convert_from_unicode
4 from samweb_client.client import samweb_method, get_version
5 from samweb_client.http_client import escape_url_path
6 from exceptions import *
7 
8 from itertools import ifilter
9 
@samweb_method
def listProjects(samweb, stream=False, **queryCriteria):
    """ Query the server's /projects resource and return the non-empty result lines

    arguments:
        stream: if true return a lazy iterator over the lines, otherwise
            return them as a list (default False)
        keyword arguments: passed as parameters to server
    """
    # 'format' is always forced to plain, overriding any caller-supplied value
    query = dict(queryCriteria, format='plain')
    response = samweb.getURL('/projects', query, stream=True)
    stripped = (line.strip() for line in response.iter_lines())
    # drop lines that are empty once stripped
    nonblank = (line for line in stripped if line)
    if not stream:
        return list(nonblank)
    return nonblank
21 
@samweb_method
def makeProjectName(samweb, description):
    """ Make a suitable project name from the provided string

    Spaces in the description are replaced by underscores and a
    YYYYmmddHHMMSS timestamp is appended. If samweb.user is set and does not
    already appear in the name as a component delimited by '_' or '-' (or by
    the start/end of the name), it is prepended.

    arguments:
        description: text to base the name on
    returns the generated project name
    """
    description = description.replace(' ', '_')
    now = time.strftime("%Y%m%d%H%M%S")
    name = "%s_%s" % (description, now)
    # check for the username, offset by _ or -
    # if it's not there prepend it
    user = samweb.user
    # The username is escaped so that characters such as '.' or '+' are
    # matched literally instead of being interpreted as regex syntax.
    if user and not re.search(r'(\A|[_-])%s(\Z|[_-])' % re.escape(user), name):
        name = '%s_%s' % (user, name)
    return name
34 
@samweb_method
def startProject(samweb, project, defname=None, station=None, group=None, user=None, snapshot_id=None):
    """ Start a project on a station. Exactly one of defname or snapshot_id must be given

    arguments:
        project: project name
        defname: definition name (default None)
        station: station name (default is the value of samweb.get_station())
        group: group name (default is samweb.group)
        user: user name; only sent to the server when given (default None)
        snapshot_id: snapshot id (default None)
    returns a dictionary with 'project' and 'projectURL' keys, plus either
        'definition_name' or 'snapshot_id'
    raises ArgumentError unless exactly one of defname and snapshot_id is set
    """
    # both set or both unset is an error
    if bool(defname) == bool(snapshot_id):
        raise ArgumentError("Exactly one of definition name or snapshot id must be provided")

    station = station or samweb.get_station()
    group = group or samweb.group

    request = {'name': project, 'station': station, "group": group}
    if defname:
        request["defname"] = defname
    else:
        request["snapshot_id"] = snapshot_id
    if user:
        request["username"] = user

    response = samweb.postURL('/startProject', request, secure=True)
    url = response.text.strip()
    if url.startswith('https'):
        # prefer to use unencrypted project urls
        url = samweb.findProject(project, station)

    # could look up definition name/snapshot id instead
    started = {'project': project, 'projectURL': url}
    if defname:
        started["definition_name"] = defname
    else:
        started["snapshot_id"] = snapshot_id
    return started
67 
@samweb_method
def findProject(samweb, project, station=None):
    """ Query the server's /findProject resource for a project

    arguments:
        project: project name
        station: station name; only sent when given (default None)
    returns the response text with surrounding whitespace removed
    """
    query = dict(name=project)
    if station:
        query.update(station=station)
    response = samweb.getURL('/findProject', query)
    return response.text.strip()
74 
@samweb_method
def startProcess(samweb, projecturl, appfamily, appname, appversion, deliveryLocation=None, node=None,
        user=None, maxFiles=None, description=None, schemas=None):
    """ Post to the project's /establishProcess resource

    arguments:
        projecturl: the project url
        appfamily, appname, appversion: application identification
        deliveryLocation: sent as 'deliverylocation' when set (default None)
        node: node name (default is the local fully qualified hostname)
        user: user name (default is samweb.user unless projecturl is an https: url)
        maxFiles: sent as 'filelimit' when set (default None)
        description: process description (default None)
        schemas: sent as 'schemas' when set (default None)
    returns the response text with surrounding whitespace removed
        (runProject uses this value as the process id)
    """
    if not node:
        # default for the node is the local hostname
        import socket
        node = socket.getfqdn()

    # if the user isn't given and we aren't using client certs, set it to the default
    if not (user or projecturl.startswith('https:')):
        user = samweb.user

    request = {"appname": appname, "appversion": appversion, "node": node}
    optional = (
        ("username", user),
        ("appfamily", appfamily),
        ("filelimit", maxFiles),
        ("deliverylocation", deliveryLocation),
        ("description", description),
        ("schemas", schemas),
    )
    # only parameters with a true value are sent
    request.update((key, value) for key, value in optional if value)

    response = samweb.postURL(projecturl + '/establishProcess', request)
    return response.text.strip()
102 
@samweb_method
def makeProcessUrl(samweb, projecturl, processid):
    """ Build the process url from a project url and a process id

    If projecturl does not contain '://' it is first resolved with
    samweb.findProject.
    """
    if '://' not in projecturl:
        projecturl = samweb.findProject(projecturl)
    return '/'.join((projecturl, 'process', str(processid)))
109 
@samweb_method
def getNextFile(samweb, processurl, timeout=None):
    """ get the next file from the project

    arguments:
        processurl: the process url
        timeout: timeout after not obtaining a file in this many seconds. -1 to disable; 0 to return immediately; default is None (disabled)
    returns:
        the decoded JSON body when the response Content-Type contains
        application/json; otherwise a dictionary with "url" (first line of the
        response) and "filename" (second line, or the basename of the url if
        there is no second line).
        None if timeout is 0 and the server answers 202.
    raises:
        Timeout: timeout is positive and more than that many seconds have
            passed since the call started while the server still answers 202
        NoMoreFiles: the server answers 204
    """
    url = processurl + '/getNextFile'
    starttime = time.time()
    while True:
        result = samweb.postURL(url, data={})
        code = result.status_code
        if code == 202:
            # Nothing available yet. Wait before asking again, using the
            # Retry-After header when it is an integer number of seconds.
            retry_interval = 10
            retry_after = result.headers.get('Retry-After')
            if retry_after:
                try:
                    retry_interval = int(retry_after)
                except ValueError:
                    # not an integer: keep the default interval
                    pass
            if timeout is not None:
                if timeout == 0:
                    return None
                elapsed = time.time() - starttime
                if timeout > 0 and elapsed > timeout:
                    raise Timeout('Timed out after %d seconds' % elapsed)
            time.sleep(retry_interval)
        elif code == 204:
            raise NoMoreFiles()
        else:
            # A response without a Content-Type header is handled as plain
            # text rather than failing with a KeyError.
            content_type = result.headers.get('Content-Type') or ''
            if 'application/json' in content_type:
                return result.json()
            lines = result.text.strip().split('\n')
            output = {"url": lines[0]}
            if len(lines) > 1:
                output["filename"] = lines[1]
            else:
                output["filename"] = os.path.basename(output["url"])
            return output
147 
148 # old method
@samweb_method
def releaseFile(samweb, processurl, filename, status="ok"):
    """ Report the outcome for a file using the older ok / not-ok convention

    A status of "ok" is forwarded as "consumed", anything else as "skipped".
    Returns the result of samweb.setProcessFileStatus.
    """
    new_status = "consumed" if status == "ok" else "skipped"
    return samweb.setProcessFileStatus(processurl, filename, new_status)
154 
155 # new method
@samweb_method
def setProcessFileStatus(samweb, processurl, filename, status="consumed"):
    """ Post a file status to the process's /updateFileStatus resource

    arguments:
        processurl: the process url
        filename: sent as the 'filename' parameter
        status: sent as the 'status' parameter (default "consumed")
    returns the response text with trailing whitespace removed
    """
    endpoint = processurl + '/updateFileStatus'
    response = samweb.postURL(endpoint, {'filename': filename, 'status': status})
    return response.text.rstrip()
160 
@samweb_method
def stopProcess(samweb, processurl):
    """ End an existing process by posting to its /endProcess resource

    The server response is not returned.
    """
    endpoint = processurl + '/endProcess'
    samweb.postURL(endpoint)
165 
@samweb_method
def stopProject(samweb, projecturl):
    """ Post to the project's /endProject resource with force=1

    If projecturl does not contain '://' it is first resolved with
    samweb.findProject.
    Returns the response text with trailing whitespace removed.
    """
    if '://' not in projecturl:
        projecturl = samweb.findProject(projecturl)
    response = samweb.postURL(projecturl + "/endProject", {"force": 1})
    return response.text.rstrip()
172 
@samweb_method
def projectSummary(samweb, projecturl):
    """ Fetch the project's /summary resource as decoded JSON

    If projecturl does not contain '://' it is used as a project name under
    /projects/name/. The decoded JSON is passed through convert_from_unicode
    before being returned.
    """
    if '://' not in projecturl:
        projecturl = '/projects/name/%s' % escape_url_path(projecturl)
    summary = samweb.getURL(projecturl + "/summary").json()
    return convert_from_unicode(summary)
178 
@samweb_method
def projectSummaryText(samweb, projecturl):
    """ Fetch the project's /summary resource in plain format

    If projecturl does not contain '://' it is used as a project name under
    /projects/name/. Returns the response text with trailing whitespace
    removed.
    """
    if '://' not in projecturl:
        projecturl = '/projects/name/%s' % escape_url_path(projecturl)
    response = samweb.getURL(projecturl + "/summary", params={'format': 'plain'})
    return response.text.rstrip()
184 
@samweb_method
def projectRecoveryDimension(samweb, projectnameorurl, useFileStatus=None, useProcessStatus=None):
    """Get the dimensions to create a recovery dataset

    arguments:
        projectnameorurl : name or url of the project
        useFileStatus : use the status of the last file seen by a process (default unset)
        useProcessStatus : use the status of the process (default unset)
    returns the response text, right-stripped and passed through convert_from_unicode
    """
    base = projectnameorurl
    if '://' not in base:
        base = "/projects/name/%s" % escape_url_path(base)

    params = {"format": "plain"}
    # the optional flags are only sent when explicitly given (None means unset)
    for key, value in (('useFiles', useFileStatus), ('useProcess', useProcessStatus)):
        if value is not None:
            params[key] = value

    text = samweb.getURL(base + "/recovery_dimensions", params=params).text
    return convert_from_unicode(text.rstrip())
199 
@samweb_method
def setProcessStatus(samweb, status, projectnameorurl, processid=None, process_desc=None):
    """ Mark the final status of a process

    The process is selected by processid or by process description; a
    description must be unique within the project. If neither is given,
    projectnameorurl is taken to be a direct process url.

    arguments:
        status: completed or bad
        projectnameorurl: project name or url
        processid: process identifier
        process_desc: process description
    returns the response text with trailing whitespace removed
    """
    if '://' in projectnameorurl:
        url = projectnameorurl
    else:
        url = '/projects/name/%s' % escape_url_path(projectnameorurl)

    if processid is not None:
        url += '/processes/%s' % processid
    elif process_desc is not None:
        url += '/process_description/%s' % escape_url_path(process_desc)
    # otherwise assume direct process url

    response = samweb.putURL(url + "/status", {"status": status}, secure=True)
    return response.text.rstrip()
226 
@samweb_method
def runProject(samweb, projectname=None, defname=None, snapshot_id=None, callback=None,
        deliveryLocation=None, node=None, station=None, maxFiles=0, schemas=None,
        application=('runproject','runproject',get_version()), nparallel=1, quiet=False ):
    """ Run a project

    Starts the project, starts nparallel consumer processes (each in its own
    thread when nparallel > 1), hands every delivered file url to callback,
    releases the file, and finally stops the project.

    arguments (use keyword arguments):
        projectname: the name for the project (default is a name generated
            from defname or snapshot_id)
        defname: the defname to use
        snapshot_id: snapshot_id to use
        callback: a single argument function invoked on each file url. A true
            return value releases the file as 'ok'; a false value or a raised
            exception releases it as 'bad'. (default prints the url)
        deliveryLocation
        node
        station
        maxFiles: file limit (default 0); when running in parallel it is
            divided between the processes, rounding up
        schemas
        application: a three element sequence of (family, name, version)
        nparallel: number of processes to run in parallel (default 1)
        quiet: If true, suppress normal output
    returns the project name
    """

    if callback is None:
        def _print(fileurl):
            print(fileurl)
            return True
        callback = _print
    if not projectname:
        if defname:
            projectname = samweb.makeProjectName(defname)
        elif snapshot_id:
            # %s rather than %d so that an id supplied as a string also works
            projectname = samweb.makeProjectName('snapshot_id_%s' % snapshot_id)
    if quiet:
        def write(s): pass
    else:
        import sys
        write = sys.stdout.write

    project = samweb.startProject(projectname, defname=defname, snapshot_id=snapshot_id, station=station)
    write("Started project %s\n" % projectname)

    projecturl = project['projectURL']
    process_description = ""
    appFamily, appName, appVersion = application

    if nparallel is None or nparallel < 2:
        nparallel = 1
    if nparallel > 1:
        import threading
        # Share the file limit between the processes, rounding up. An unset
        # limit (0 or None) is left alone.
        if maxFiles:
            maxFiles = (maxFiles + nparallel - 1) // nparallel

    def runProcess():
        cpid = samweb.startProcess(projecturl, appFamily, appName, appVersion, deliveryLocation, node=node, description=process_description, maxFiles=maxFiles, schemas=schemas)
        write("Started consumer processs ID %s\n" % (cpid,))
        if nparallel > 1:
            # name the thread after the process so log lines can be told apart
            threading.current_thread().name = 'CPID-%s' % cpid
            log_prefix = '%s: ' % threading.current_thread().name
        else:
            log_prefix = ''

        processurl = samweb.makeProcessUrl(projecturl, cpid)

        while True:
            try:
                newfile = samweb.getNextFile(processurl)['url']
            except NoMoreFiles:
                break
            try:
                succeeded = callback(newfile)
            except Exception as ex:
                write('%s%s\n' % (log_prefix, ex))
                # a callback that raised did not process the file successfully
                succeeded = False
            status = 'ok' if succeeded else 'bad'
            samweb.releaseFile(processurl, newfile, status)

        samweb.setProcessStatus('completed', processurl)

    if nparallel < 2:
        runProcess()
    else:
        threads = []
        for i in range(nparallel):
            t = threading.Thread(target=runProcess, name='Thread-%02d' % (i+1,))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

    samweb.stopProject(projecturl)
    write("Stopped project %s\n" % projectname)
    return projectname
318 
@samweb_method
def prestageDataset(samweb, projectname=None, defname=None, snapshot_id=None, maxFiles=0, station=None, deliveryLocation=None, node=None, nparallel=1):
    """ Prestage the given dataset. This is really the same as run-project with names set appropriately

    Calls samweb.runProject with the application set to
    ('prestage', 'prestage', get_version()) and a callback that prints a
    "File ... is staged" line for each delivered file. When no projectname is
    given, one is generated from defname or snapshot_id with 'prestage'
    appended. The remaining arguments are passed on to runProject unchanged.
    """

    if nparallel is None or nparallel < 2:
        nparallel = 1

    def prestage(fileurl):
        if nparallel > 1:
            # with several processes, prefix each line with the thread name
            import threading
            prefix = '%s: ' % threading.current_thread().name
        else:
            prefix = ''
        print("%sFile %s is staged" % (prefix, os.path.basename(fileurl)))
        return True

    if not projectname:
        projectname = 'prestage'
        if defname:
            projectname = samweb.makeProjectName('%s_%s' % (defname, projectname))
        elif snapshot_id:
            # %s rather than %d so that an id supplied as a string also works
            projectname = samweb.makeProjectName('snapshot_id_%s_%s' % (snapshot_id, projectname))

    samweb.runProject(projectname=projectname, defname=defname, snapshot_id=snapshot_id,
            application=('prestage','prestage',get_version()), callback=prestage, maxFiles=maxFiles,
            station=station, deliveryLocation=deliveryLocation, node=node, nparallel=nparallel)
344 
def projectRecoveryDimension(samweb, projectnameorurl, useFileStatus=None, useProcessStatus=None)
Definition: projects.py:186
def stopProcess(samweb, processurl)
Definition: projects.py:162
def findProject(samweb, project, station=None)
Definition: projects.py:69
def setProcessFileStatus(samweb, processurl, filename, status="consumed")
Definition: projects.py:157
write
Run ND cosmics.
def projectSummaryText(samweb, projecturl)
Definition: projects.py:180
def setProcessStatus(samweb, status, projectnameorurl, processid=None, process_desc=None)
Definition: projects.py:201
def stopProject(samweb, projecturl)
Definition: projects.py:167
def makeProcessUrl(samweb, projecturl, processid)
Definition: projects.py:104
def projectSummary(samweb, projecturl)
Definition: projects.py:174
def listProjects(samweb, stream=False, queryCriteria)
Definition: projects.py:11
def startProcess(samweb, projecturl, appfamily, appname, appversion, deliveryLocation=None, node=None, user=None, maxFiles=None, description=None, schemas=None)
Definition: projects.py:77
def convert_from_unicode(input)
Definition: __init__.py:14
def getNextFile(samweb, processurl, timeout=None)
Definition: projects.py:111
def prestageDataset(samweb, projectname=None, defname=None, snapshot_id=None, maxFiles=0, station=None, deliveryLocation=None, node=None, nparallel=1)
Definition: projects.py:320
def releaseFile(samweb, processurl, filename, status="ok")
Definition: projects.py:150
def makeProjectName(samweb, description)
Definition: projects.py:23
def startProject(samweb, project, defname=None, station=None, group=None, user=None, snapshot_id=None)
Definition: projects.py:36
def runProject(samweb, projectname=None, defname=None, snapshot_id=None, callback=None, deliveryLocation=None, node=None, station=None, maxFiles=0, schemas=None, application=('runproject','runproject', get_version()), nparallel=1, quiet=False)
Definition: projects.py:230