log_trawl.py
#! /usr/bin/env python

import sys, os, stat
import argparse
from collections import OrderedDict
import code
import glob

# Permission mask: user r,w; group r
PERMISSION_RW_R = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP
# Permission mask: user r,w; group r; others r
PERMISSION_RW_R_R = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH

def checkIsFile(path):
  if os.path.isfile(path):
    return 1
  print "Warning, not a file: ", path
  return 0

def setPermission(path, permission=PERMISSION_RW_R_R):
  if checkIsFile(path):
    os.chmod(path, permission)

def tail(path, window=20):
  f = open(path, 'r')
  BUFSIZ = 1024
  f.seek(0, 2)
  bytes = f.tell()
  size = window
  block = -1
  data = []
  while size > 0 and bytes > 0:
    if (bytes - BUFSIZ > 0):
      # Seek back one whole BUFSIZ
      f.seek(block*BUFSIZ, 2)
      # read BUFSIZ bytes
      data.append(f.read(BUFSIZ))
    else:
      # file too small, start from beginning
      f.seek(0, 0)
      # only read what was not read
      data.append(f.read(bytes))
    linesFound = data[-1].count('\n')
    size -= linesFound
    bytes -= BUFSIZ
    block -= 1
  f.close()
  # Blocks were collected back-to-front, so reverse them before joining
  return '\n'.join(''.join(reversed(data)).splitlines()[-window:])


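# Usage sketch (illustrative, not part of the original script; the path below
# is hypothetical):
#
#   print tail("/path/to/myjob.log", window=5)   # last 5 lines as one string
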
class Process:
  # Constructor
  def __init__(self, num, proj):
    self.num = num
    self.msgs = []
    self.proj = proj
    self.err = FileViewer(self.errPath())
    self.out = FileViewer(self.outPath())

  # Implementation of str()
  def __str__(self):
    return "Process " + str(self.num) + ", last message: \n" + self.lastMsg()

  # How the thing represents itself, notably for tostring
  def __repr__(self):
    return str(self)

  # Implementation of the "in" operator: tests against the last message
  def __contains__(self, test):
    return test in self.lastMsg()

  # Implementation of ==
  def __eq__(self, other):
    return self.num == other.num and self.proj.logPath == other.proj.logPath

  # Get the number of messages in the .log file
  def nMsgs(self):
    return len(self.msgs)

  # Get the last message, usually the most interesting one
  def lastMsg(self):
    return self.msgs[len(self.msgs) - 1]

  # Add a message to the process, intended for use in the Project class
  # when parsing .log files
  def addMsg(self, msg):
    self.msgs.append(msg)

  # Print all messages for this process
  def printMsgs(self):
    for msg in self.msgs:
      print msg

  ## Get the path to the std err file
  def errPath(self):
    wildcard = self.proj.logPath.replace(".log", "*.%s.err" % (str(self.num)))
    files = glob.glob(wildcard)
    if len(files) != 1:
      raise Exception("Expected exactly one file, found %s matching path %s"
                      % (len(files), wildcard))
    path = files[0]
    checkIsFile(path)
    return path

  ## Get the path to the std out file
  def outPath(self):
    wildcard = self.proj.logPath.replace(".log", "*.%s.out" % (str(self.num)))
    files = glob.glob(wildcard)
    if len(files) != 1:
      raise Exception("Expected exactly one file, found %s matching path %s"
                      % (len(files), wildcard))
    path = files[0]
    checkIsFile(path)
    return path

  ## Tail the std err file
  def errTail(self, window=20):
    path = self.errPath()
    if not checkIsFile(path):
      return
    return tail(path, window)

  ## Tail the std out file
  def outTail(self, window=20):
    path = self.outPath()
    if not checkIsFile(path):
      return
    return tail(path, window)

  ## Open the std err file with less
  def errLess(self):
    path = self.errPath()
    if not checkIsFile(path):
      return
    print os.system("less " + path)
    return

  ## Open the std out file with less
  def outLess(self):
    path = self.outPath()
    if not checkIsFile(path):
      return
    print os.system("less " + path)
    return

  ## Find the return code from the last message, -1 if the last message is not
  ## a normal termination
  def retCode(self):
    if not "return value" in self.lastMsg():
      return -1
    start = self.lastMsg().find("return value")
    end = self.lastMsg().find(")", start)
    valStr = self.lastMsg()[start:end]
    values = valStr.split()
    try:
      return int(values[2])
    except:
      raise Exception("Can't find return code, improperly formatted message: %s"
                      % (self.lastMsg()))


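# Worked example (illustrative, not from the original file): a normal
# termination message ends with text like "(return value 0)".  retCode()
# finds the substring "return value", slices up to the closing parenthesis,
# splits that into ["return", "value", "0"], and returns int("0") == 0.
# Any other kind of last message (e.g. a job abort) yields -1.
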
states = ["normal", "abnormal", "incomplete"]

class Project:
  def __init__(self, logPath):
    # Path to log file
    self.logPath = logPath
    # List of all messages from .log file, not sorted by process
    self.msgs = []
    # Main dictionary of processes, indexed by condor job number.
    self.procs = OrderedDict()
    # Dictionary of processes grouped by the mode of their last message
    self.decomp = dict()
    # List of process objects that ended normally (return value == 0)
    self.normal = []
    # List of process objects that ended with a non-zero return value
    self.abnormal = []
    # List of process objects that are incomplete
    # (no return value, still in progress or aborted)
    self.incomplete = []

    self.stateMap = OrderedDict()
    global states

    for state in states:
      self.stateMap[state] = []

    if self.logPath.endswith(".cmd"):
      print "Condor .cmd file supplied, swapping extension for .log"
      self.logPath = logPath.replace(".cmd", ".log")
    self.separateMessages()
    self.processRecon()
    self.inspectProcs()
    self.decomposeModes()

  ## separateMessages() turns all of the separate messages in the file into
  ## entries in a list.
  ### Need to break the log into chunks. Many messages from many jobs.
  ### Example:
  ###   ...
  ###   009 (17365926.880.000) 08/06 21:16:09 Job was aborted by the user.
  ###       The system macro SYSTEM_PERIODIC_REMOVE expression ...
  ###       atus) > 86400*3)))' evaluated to TRUE
  ###   ...
  ### Seems we can rely on those dots, here we go
  def separateMessages(self):
    log = open(self.logPath, 'r')
    msg = ""
    for line in log:
      if line.strip() == "...":
        self.msgs.append(msg)  # append the current message to the list
        msg = ""  # Reset the current message
      else:
        msg += line
    log.close()


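  # Illustrative note (hypothetical content, not from the original source):
  # after separateMessages() runs, each entry of self.msgs is one full condor
  # event, e.g. a string starting "005 (17365926.880.000) 08/06 21:20:01 Job
  # terminated." followed by its detail lines; the "..." delimiter lines
  # themselves are not stored.
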
  ### processRecon() loops over all the messages to make Process objects and
  ### fills a dict.
  ### Relies on separateMessages() having been called first, which happens
  ### properly in the constructor.
  def processRecon(self):
    for msg in self.msgs:
      smsg = msg.split()

      if len(smsg) > 2:
        if "028" in smsg[0]:
          continue
        if smsg[1][0] == "(":
          numStr = smsg[1]
          num = int(numStr.split(".")[1])
          if num in self.procs.keys():
            self.procs[num].addMsg(msg)
          else:
            self.procs[num] = Process(num, self)
            self.procs[num].addMsg(msg)

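  # Worked example (illustrative, not from the original file): for a header
  # like "009 (17365926.880.000) 08/06 21:16:09 ...", smsg[1] is
  # "(17365926.880.000)", numStr.split(".")[1] is "880", and the message is
  # filed under self.procs[880].  Messages whose event code contains "028"
  # are skipped.
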
  ### Inspect the processes, make sure the numbers jibe, and decompose them
  ### into "normal", "abnormal" and "incomplete"
  def inspectProcs(self):
    print "Found", self.nProcs(), "processes."
    lastKey = None
    sequential = True
    firstKey = None
    for key in sorted(self.procs.keys()):
      if lastKey == None:
        lastKey = key
        firstKey = key
        continue
      diff = key - lastKey
      if not diff == 1:
        sequential = False
        break
      lastKey = key

    if sequential and firstKey == 0:
      print "Process numbers are sequential starting with %s, things seem ok."\
        % (str(firstKey))
    elif sequential:
      print "Process numbers are sequential, but do not start with 0. " + \
        "Be careful with your indexing."
      print "First process number is %s." % (str(firstKey))
    else:
      print "Process numbers are not sequential, be careful with your indexing."

    for proc in self.procs.values():
      if proc.retCode() == 0:
        self.stateMap["normal"].append(proc)
      elif proc.retCode() > 0:
        self.stateMap["abnormal"].append(proc)
      else:
        self.stateMap["incomplete"].append(proc)

    self.normal = self.stateMap["normal"]
    self.abnormal = self.stateMap["abnormal"]
    self.incomplete = self.stateMap["incomplete"]
    print len(self.stateMap["normal"]), \
      "of those processes ended normally with status 0"
    print len(self.stateMap["abnormal"]), \
      "of those processes ended normally but with non-zero status"
    print len(self.stateMap["incomplete"]), \
      "of those processes are either still in progress or ended abnormally"

  ## Group the processes by the condor event code (mode) of their last message
  def decomposeModes(self):
    for proc in self.procs.values():
      splitMsg = proc.lastMsg().split()
      try:
        mode = int(splitMsg[0])
      except:
        raise Exception("Could not find mode (three digits) at the beginning " +\
                        "of this message: " + str(proc))

      if not mode in self.decomp.keys():
        self.decomp[mode] = []

      self.decomp[mode].append(proc)
    sum = 0
    for mode in self.decomp.values():
      sum += len(mode)
    if not sum == self.nProcs():
      raise Exception("Sum of decomposed processes not equal to total number " +\
                      "of processes.")

  def printModes(self):
    print "Jobs have", len(self.decomp), "different modes. " +\
      "Here is an example of each."
    for mode in self.decomp.keys():
      print "* Mode", mode, "-- there are", len(self.decomp[mode]), \
        "of these, one example is:"
      for line in str(self.decomp[mode][0]).splitlines():
        print "\t", line
      print ""

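  # Worked example (illustrative, not from the original file): if a process's
  # last message starts with "005" (job terminated), int("005") == 5 and the
  # process is appended to self.decomp[5]; an aborted job whose last message
  # starts with "009" lands in self.decomp[9].
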
  ## Number of process objects in the dict
  def nProcs(self):
    return len(self.procs)

  ## Implementation of the [] operator for indexing
  def __getitem__(self, n):
    return self.procs[n]

  # Implementation of len(), returns number of items in the dictionary
  def __len__(self):
    return len(self.procs)

  # Implementation of iter, use as "for thing in proj"
  def __iter__(self):
    for key in self.procs.keys():
      yield self.procs[key]

  ## Make the .log and all .err and .out files group readable and user read/writable
  def makeReadable(self, permission=PERMISSION_RW_R_R):
    setPermission(self.logPath, permission)
    for proc in self.procs.values():
      setPermission(proc.outPath(), permission)
      setPermission(proc.errPath(), permission)


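# Interactive usage sketch (hypothetical path, not from the original source):
#
#   proj = Project("/path/to/myjob.log")  # parse the log and print a summary
#   len(proj)                             # number of processes found
#   proj[0].retCode()                     # return code of process 0
#   print proj[0].errTail(window=30)      # last 30 lines of its .err file
#   proj.printModes()                     # one example message per mode
#   proj.makeReadable()                   # chmod the .log/.err/.out files
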
def findLogInDirectory(logDir):
  logDirContents = os.listdir(logDir)
  for fileName in logDirContents:
    if fileName.endswith(".log"):
      return os.path.join(logDir, fileName)
  return None


def retreiveLogFromJobSubClient(jobId):
  print "JobSub Client Job ID supplied, finding logs."
  if "CONDOR_TMP" in os.environ and os.path.isdir(os.environ["CONDOR_TMP"]):
    print "Found $CONDOR_TMP, logs will be stored there."
    fetchLocation = os.environ["CONDOR_TMP"]
  else:
    print "Failed to find $CONDOR_TMP, fetching logs to current directory."
    fetchLocation = "./"

  logDir = os.path.join(fetchLocation, jobId)

  if not os.path.isdir(logDir):
    os.mkdir(logDir)
  print "Directory:", logDir
  extantLogFile = findLogInDirectory(logDir)
  if extantLogFile:
    print "Log file previously fetched, location:", extantLogFile
    return extantLogFile

  else:
    print "Fetching log files..."
    os.system("jobsub_fetchlog --jobid " + jobId + " --unzipdir " + logDir)

    fetchedLogFile = findLogInDirectory(logDir)
    if fetchedLogFile:
      print "Log file location:", fetchedLogFile
      return fetchedLogFile
    else:
      raise Exception("Failed to find log file after fetching, " + \
                      "something went wrong.")

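# Usage sketch (hypothetical values, not from the original source): with
# $CONDOR_TMP set to /scratch/condor_tmp and a job id of
# "12345678.0@jobsubN.fnal.gov", the logs are unzipped into
# /scratch/condor_tmp/12345678.0@jobsubN.fnal.gov/ and the path of the .log
# file found there is returned.
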
## Wrapper that pages a file with less when echoed in the interactive session
class FileViewer:
  def __init__(self, path):
    self.path = path
  def __repr__(self):
    print self.path
    if not checkIsFile(self.path):
      return ""
    os.system("less " + self.path)
    return ""


def main(argv):
  parser = argparse.ArgumentParser(description="""
    Loop over the ".log" file and find the final state of each
    process, then find the .err and .out""")
  parser.add_argument('logFile', help="""
    The input file to run over, or a jobsub_client jobid.
    In the jobid case, the files are fetched to CONDOR_TMP if it is defined,
    or to the current working directory if it is not. If the log has already
    been fetched, it is simply found there.""", type=str)

  args = parser.parse_args()
  logPath = args.logFile

  if not os.path.isfile(logPath) and "@" in logPath and ".fnal.gov" in logPath:
    logPath = retreiveLogFromJobSubClient(logPath)

  proj = Project(logPath)
  return proj


if __name__ == "__main__":
  proj = main(sys.argv)
  code.interact(local=locals(), banner="")

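# How to run (illustrative; the path and job id are hypothetical):
#
#   python log_trawl.py /path/to/myjob.log
#   python log_trawl.py 12345678.0@jobsubN.fnal.gov
#
# After parsing, code.interact() drops into an interactive session with the
# Project available as "proj"; echoing proj[0].err or proj[0].out opens the
# corresponding file in less via the FileViewer wrapper.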