40 from collections
import defaultdict, deque
68 for entry
in gIPWhitelist:
69 if ip.startswith(entry):
return True 88 while len(q) > 0
and q[0].late():
89 ret.append(q.popleft())
94 return sum([len(x)
for x
in d.values()])
99 return [x.popleft()
for x
in d.values()
if len(x) > 0]
106 gPIDQueue = defaultdict(deque)
112 gGetWorkQueue = defaultdict(deque)
132 gIdList = defaultdict(
lambda: 0)
147 return str(
int(time.time())) +
'.' +
str(os.getpid()) +
'.' +
str(n)
153 kTerminator =
'\r\n\r\n';
155 if kTerminator
not in msg:
return False 157 headerSize = msg.find(kTerminator)+4
159 cl =
'Content-Length: ' 161 if cl
not in msg:
return True 165 newline = msg.find(
'\r', lpos+len(cl))
166 bodyLength =
int(msg[lpos+len(cl):newline])
168 return len(msg) == headerSize+bodyLength
173 return 'HTTP/1.0 200 OK\r\n'\
174 'Server: LEMServer/0.1.0\r\n'\
175 'Content-Type: text/plain\r\n'\
181 return 'HTTP/1.0 200 OK\r\n'\
182 'Server: LEMServer/0.1.0\r\n'\
183 'Content-Type: text/html\r\n'\
189 return 'HTTP/1.0 501 Not Implemented\r\n'\
190 'Server: LEMServer/0.1.0\r\n'\
191 'Content-Type: text/plain\r\n'\
193 "I don't know how to do that\r\n" 198 gLastStatus = time.time()
200 global gLastSeqNo, gLastStatus
202 rate =
int((gSeqNo-gLastSeqNo)/(time.time()-gLastStatus))
204 gLastStatus = time.time()
206 msg = time.strftime(
'%Y-%m-%d %H:%M:%S %Z: ')
207 msg +=
'Handled ' +
str(gSeqNo) +
' queries ('+
str(rate)+
' Hz). ' 208 msg +=
str(len(gWorkInProgress)) +
' workers working, ' 209 msg +=
str(
nConns(gGetWorkQueue)) +
' waiting, ' 210 msg +=
str(
nConns(gPIDQueue)) +
' backlog. ' 211 msg +=
str(gErrCount) +
' errors. ' 212 msg +=
'Server '+
str(.1*
int(1000*gBusyTime/(gBusyTime+gIdleTime)+.5))+
'% busy. ' 220 for k
in gClientList.keys():
222 if t0 < t+10: n10s += 1
223 if t0 < t+60: n60s += 1
224 if t0 < t+300: n5m += 1
225 if t0 < t+1800: n30m += 1
226 if t0 > t+1800: gClientList.pop(k)
227 for k
in gWorkerList:
228 if t0 > gWorkerList[k] + 600: gClientList.pop(k)
230 msg +=
str(n10s)+
'/'+
str(n60s)+
'/'+
str(n5m)+
'/'+
str(n30m)+
' clients active in last 10s/60s/5m/30m. ' 232 msg +=
str(len(gWorkerList)) +
' workers in last 10m.' 247 for key
in gPIDQueue:
248 if len(gPIDQueue[key]) > 0:
252 for rel
in rels: msg += rel+
'\n' 258 if not 'rel' in args:
260 for rel
in gPIDQueue.keys():
261 N = len(gPIDQueue[rel])
262 if N > 0: count[rel] = N
266 msg +=
'No backlog\n' 269 for rel
in count.keys():
270 msg += rel+
': '+
str(count[rel])+
'\n' 276 count = len(gPIDQueue[rel])
278 return {client:
ok200()+
str(count)+
'\n'}
285 for key
in sorted(gIdList.keys()):
286 msg += key +
' : ' +
str(gIdList[key])+
'\n' 293 htmlHead =
'<html><head><meta charset="UTF-8"><meta http-equiv="refresh" content="60"><title>LEM server status</title></head><body>\n' 294 htmlFoot =
'</body></html>\n' 300 if name
in args:
return args[name]
301 print 'Error:', cmd,
'without', name, args
307 global gQueuedPIDRequest, gGetWorkQueue, gWorkInProgress, gVerbosity
313 if not workerID
or not rel
or not timeout:
317 if len(gPIDQueue[rel]) == 0
or gPIDQueue[rel][-1].late():
323 info = gPIDQueue[rel].pop()
326 gWorkInProgress[tag] = info
330 print 'Handing out work under tag', tag
332 return {client:
ok200() +
'tag='+tag+
'\n' + info.query}
337 global gQueuedPIDRequest, gGetWorkQueue, gWorkInProgress, gVerbosity
344 if tag
not in gWorkInProgress:
345 print 'Warning: Reply for unknown tag', tag
346 return {client:
ok200()}
348 workerClient = gWorkInProgress.pop(tag).client
350 if gVerbosity > 0:
print 'Sending response', postdata
353 return {client:
ok200(),
354 workerClient:
ok200()+postdata}
359 global gQueuedPIDRequest, gGetWorkQueue, gWorkInProgress, gVerbosity
364 if not rel
or not timeout:
367 info =
ConnInfo(client, postdata, timeout)
370 if len(gGetWorkQueue[rel]) == 0
or gGetWorkQueue[rel][-1].late():
371 gPIDQueue[rel].
append(info)
375 workerClient = gGetWorkQueue[rel].pop().client
379 gWorkInProgress[tag] = info
382 if gVerbosity > 0:
print 'Handing out work under tag', tag
383 return {workerClient:
ok200() +
'tag='+tag+
'\n' + info.query}
389 if gVerbosity > 0:
print sockStr
391 query, junk, postdata = sockStr.partition(
'\r\n\r\n')
393 isGet = query.startswith(
'GET')
394 isPost = query.startswith(
'POST')
397 params = query.split(
' ')[1][1:]
398 cmd, junk, argStr = params.partition(
'?')
401 for arg
in argStr.split(
'&'):
402 key, junk, val = arg.partition(
'=')
405 if gVerbosity > 0:
print 'Parameters are:', params
406 if gVerbosity > 0:
print 'Cmd:', cmd,
'args:', args
413 if isPost
and cmd ==
'pid':
414 gClientList[shortId] = time.time()
415 if isGet
and cmd ==
'get_work':
416 gWorkerList[shortId] = time.time()
419 if shortId.rfind(
'.') > 0: shortId = shortId[:shortId.rfind(
'.')]
420 if shortId.rfind(
'.') > 0: shortId = shortId[:shortId.rfind(
'.')]
422 gIdList[shortId] += 1
426 'get_work' : handle_get_work,
427 'status' : handle_status,
428 'status.html' : handle_status_html,
429 'releases' : handle_releases,
430 'backlog' : handle_backlog,
436 'return_results' : handle_return_results
439 if isGet
and cmd
in get_handlers:
440 return get_handlers[cmd](client, args)
442 if isPost
and cmd
in post_handlers:
443 return post_handlers[cmd](client, args, postdata)
446 print 'Rejecting malformed query:' 454 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
456 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
469 for rel
in gPIDQueue.keys():
471 nClient = len(toClose)
472 for rel
in gGetWorkQueue.keys():
474 nWorker = len(toClose) - nClient
477 print 'Closing', nClient,
'client connections that reached their timeout' 479 if nWorker > 0
and gVerbosity > 0:
480 print 'Closing', nWorker,
'worker connections that reached their timeout' 483 gClientMap.pop(info.client.fileno())
494 while nConns(gPIDQueue) +
nConns(gGetWorkQueue) + len(gWorkInProgress) > kConnLimit
and nConns(gPIDQueue) > 0:
498 while nConns(gGetWorkQueue) + len(gWorkInProgress) > kConnLimit
and nConns(gGetWorkQueue) > 0:
502 keys = gWorkInProgress.keys()
504 if len(gWorkInProgress) <= kConnLimit:
break 505 toClose.append(gWorkInProgress.pop(key))
509 print 'Overloaded: closing', len(toClose),
'connections' 514 gClientMap.pop(info.client.fileno())
520 if __name__ ==
'__main__':
523 kNumPorts = kMaxPort-kMinPort+1
527 print 'Listening on ports', kMinPort,
'-', kMaxPort
531 for ss
in serverSock:
532 ep.register(ss, select.POLLIN)
533 socketMap[ss.fileno()] = ss
535 inbox = defaultdict(
lambda:
'')
536 outbox = defaultdict(
lambda:
'')
538 statusTime = time.time()
545 if time.time() - statusTime > 3:
546 statusTime = time.time()
566 if fileno
not in socketMap
and fileno
not in gClientMap:
567 print 'Unknown fileno!', fileno
569 if fileno
in socketMap:
570 assert mask == select.POLLIN
572 ss = socketMap[fileno]
573 clientSock, addr = ss.accept()
575 clientSock.setblocking(0)
578 ep.register(clientSock, select.POLLIN)
579 gClientMap[clientSock.fileno()] = clientSock
581 if fileno
in gClientMap:
582 cs = gClientMap[fileno]
584 if mask == select.POLLIN:
587 print 'Read nothing. Client hung up on us?' 589 gClientMap.pop(fileno)
599 outbox[sock] = msgs[sock]
600 ep.register(sock, select.POLLOUT)
602 elif mask == select.POLLOUT:
606 print 'Wrote nothing. Client hung up on us?' 608 gClientMap.pop(fileno)
615 gClientMap.pop(fileno)
619 print 'Unknown mask', mask
620 gClientMap.pop(fileno)
def failsafe_arg(cmd, args, name)
def handle_return_results(client, args, postdata)
def handle_pid(client, args, postdata)
def handle_query(sockStr, client)
def handle_status_html(client, args)
def handle_get_work(client, args)
def close_old_connections()
def __init__(self, client, query, timeout)
def is_complete_query(msg)
static float min(const float a, const float b, const float c)
def handle_status(client, args)
def handle_ids(client, args)
def handle_backlog(client, args)
def handle_releases(client, args)