lem_server.py
Go to the documentation of this file.
1 #!/usr/bin/env python
2 
3 # Here's some useful documentation on TCP: http://linux.die.net/man/7/tcp
4 # The python sockets howto is good https://docs.python.org/2/howto/sockets.html
5 
6 # We need more open files than usual. Edit /etc/security/limits.conf to add
7 # the lines below. This is a per-login setting; no need to reboot.
8 # lem soft nofile 4096
9 # lem hard nofile 4096
10 
11 # We need bigger tables to keep track of old connections. Absence of this is
12 # what made the network/CPU "oscillate" at high load. Add these commands to
13 # /etc/rc.d/rc.local
14 #
15 # touch /var/lock/subsys/local
16 # sysctl -w net.netfilter.nf_conntrack_max=262144
17 # echo 65536 > /sys/module/nf_conntrack/parameters/hashsize
18 #
19 # Check they took with:
20 #
21 # $ cat /sys/module/nf_conntrack/parameters/hashsize
22 # 65536
23 # $ cat /proc/sys/net/netfilter/nf_conntrack_max
24 # 262144
25 
26 # If we ever want to listen on port 80 it'd be something like:
27 # ls -l /etc/authbind/byport/80
28 # -rwxr--r-- 1 lem lem 0 Nov 18 20:36 /etc/authbind/byport/80
29 # and start with 'authbind --deep ./lem_server.py'
30 
31 import os
32 import select
33 import socket
34 import sys
35 import time
36 import thread
37 import time
38 import traceback
39 
40 from collections import defaultdict, deque
41 
# Verbosity level: 0 is quiet; any value > 0 turns on per-request tracing.
gVerbosity = 0

# Address prefixes of known sites. Currently unused: is_allowed_ip() admits
# every address.
gIPWhitelist = [
    # Fermilab, per https://ipinfo.io/AS3152
    '131.225.',
    '192.190.216.',
    '198.49.208.',
    # Caltech, per https://ipinfo.io/AS31
    # TODO maybe only the HEP cluster machines?
    '131.215.',
    '134.4.',
    '192.12.19.',
    '192.31.43.',
    '192.31.208.',
    '192.43.243.',
    '192.54.249.',
    # Not yet listed:
    # SMU: https://ipinfo.io/AS1832
    # UMN: https://ipinfo.io/AS217
]
62 
def is_allowed_ip(ip, enforce=False, whitelist=None):
    """Return True if a connection from address *ip* should be accepted.

    The server runs offsite, where client addresses are not known in advance,
    so by default every address is accepted.  Passing enforce=True applies the
    prefix whitelist instead (the previous version carried that check as
    unreachable code after an early return).

    Args:
        ip: address string, e.g. '131.225.1.2'.
        enforce: when False (the default) skip the whitelist check entirely.
        whitelist: iterable of address prefixes; defaults to gIPWhitelist.

    Returns:
        bool.
    """
    if not enforce:
        return True

    if whitelist is None:
        whitelist = gIPWhitelist
    # str.startswith accepts a tuple and tests every prefix in one call.
    return ip.startswith(tuple(whitelist))
71 
72 
class ConnInfo:
    """A connection parked until it can be paired with its counterpart.

    Attributes:
        client: the socket to reply on.
        query: payload to forward to the counterpart (the POST body of a
            'pid' request; empty for a 'get_work' request).
        deadline: time.time() value after which the connection is late.
    """

    def __init__(self, client, query, timeout):
        # timeout may be the raw query-string value; int() accepts that.
        now = time.time()
        self.client = client
        self.query = query
        self.deadline = now + int(timeout)

    def late(self):
        """Return True once the deadline has passed.

        Both sides are truncated to whole seconds so that expired connections
        are closed in batches on second boundaries instead of one at a time.
        """
        nowSec = int(time.time())
        return nowSec > int(self.deadline)
83 
84 
def pop_old_conns(q):
    """Remove and return the late entries at the front of deque *q*.

    Scanning stops at the first entry that is not yet late.  The queues are
    kept in arrival order (newest last), not deadline order, so a
    long-timeout entry at the front can shield later, shorter-timeout entries
    until it expires itself.

    Args:
        q: collections.deque of objects exposing late() (ConnInfo).

    Returns:
        list of the removed entries, oldest first.
    """
    expired = []
    while q and q[0].late():
        expired.append(q.popleft())
    return expired
91 
def nConns(d):
    """Return the total number of entries across all the containers in *d*.

    Args:
        d: dict whose values are sized containers (lists/deques).
    """
    total = 0
    for bucket in d.values():
        total += len(bucket)
    return total
95 
def pop_fronts(d):
    """Pop the leftmost item from every non-empty deque in dict *d*.

    Empty deques are skipped and left in place.

    Returns:
        list of the popped items, in d.values() iteration order.
    """
    fronts = []
    for dq in d.values():
        if len(dq) > 0:
            fronts.append(dq.popleft())
    return fronts
100 
101 
# Client 'pid' requests that could not be paired with a waiting worker right
# away.  Keyed by release; each value is a deque of ConnInfo objects in
# arrival order (newest at the right-hand end).
gPIDQueue = defaultdict(deque)

# Worker 'get_work' requests that found no client work waiting.  Same layout
# as gPIDQueue: release -> deque of ConnInfo, newest last.
gGetWorkQueue = defaultdict(deque)

# Matched requests waiting for the worker's 'return_results' call.  Keyed by
# the unique tag handed to the worker (see get_tag()); the value is the
# client's ConnInfo.  The worker's own connection is closed once it has been
# sent the work, and the release is not stored because the match has already
# been successfully made.
gWorkInProgress = {}

# File numbers to sockets, for every open client connection.
gClientMap = {}

# Counter behind get_tag(); status_text() reports it as the query count.
gSeqNo = 0

# Connection-level errors seen by the main loop.
gErrCount = 0

# Seconds spent working vs. blocked in epoll since the last status line.
gBusyTime = 0
gIdleTime = 0

# Request count per short host id (see handle_query); unseen ids read as 0.
gIdList = defaultdict(int)

# Id -> time.time() of the last 'pid' POST (clients) / 'get_work' GET
# (workers) received from it.
gClientList = {}
gWorkerList = {}
137 
138 # -----------------------------------------------------------------------------
139 
# Poor man's UUID
def get_tag():
    """Return a unique tag '<unix time>.<pid>.<sequence number>'.

    Within one process uniqueness comes from gSeqNo, which is bumped on every
    call; the time and pid prefix separate tags from different server runs.
    """
    global gSeqNo

    seq = gSeqNo
    gSeqNo = seq + 1
    return '%d.%d.%d' % (int(time.time()), os.getpid(), seq)
148 
149 # -----------------------------------------------------------------------------
def is_complete_query(msg):
    """Return True once *msg* holds a complete HTTP request.

    A request is complete when the blank line ending the headers has arrived
    and, if a Content-Length header is present, at least that many bytes of
    body follow it.

    Args:
        msg: everything received on the connection so far.

    Returns:
        bool.  An unparseable Content-Length counts as complete, so the
        request is handed on to handle_query instead of being waited on
        forever or raising ValueError into the main loop.
    """
    # Headers are always terminated by \r\n\r\n
    kTerminator = '\r\n\r\n'

    headerEnd = msg.find(kTerminator)
    if headerEnd < 0:
        return False
    headerSize = headerEnd + len(kTerminator)

    # Only look inside the header block (the body may itself contain the text
    # "Content-Length:"), and compare header names case-insensitively as HTTP
    # requires.  The first line is the request line, not a header.
    bodyLength = 0
    for line in msg[:headerEnd].split('\r\n')[1:]:
        name, sep, value = line.partition(':')
        if sep and name.strip().lower() == 'content-length':
            try:
                bodyLength = int(value.strip())
            except ValueError:
                return True
            break

    # >= rather than ==: surplus bytes must not leave the request stuck.
    return len(msg) >= headerSize + bodyLength
169 
170 # -----------------------------------------------------------------------------
171 
def ok200():
    """Return the header block of a successful plain-text HTTP/1.0 reply."""
    lines = ['HTTP/1.0 200 OK',
             'Server: LEMServer/0.1.0',
             'Content-Type: text/plain',
             '',
             '']
    return '\r\n'.join(lines)
177 
178 # -----------------------------------------------------------------------------
def ok200_html():
    """Return the header block of a successful HTML HTTP/1.0 reply."""
    return ('HTTP/1.0 200 OK\r\n'
            'Server: LEMServer/0.1.0\r\n'
            'Content-Type: text/html\r\n'
            '\r\n')
185 
186 # -----------------------------------------------------------------------------
187 
def unimp501():
    """Return a complete HTTP/1.0 501 reply, used for every rejected request."""
    header = ('HTTP/1.0 501 Not Implemented\r\n'
              'Server: LEMServer/0.1.0\r\n'
              'Content-Type: text/plain\r\n'
              '\r\n')
    body = "I don't know how to do that\r\n"
    return header + body
194 
195 # -----------------------------------------------------------------------------
196 
# Baseline for the query rate reported by status_text().
gLastSeqNo = 0
gLastStatus = time.time()


def status_text():
    """Return a one-line human-readable summary of the server's state.

    Side effects:
      * resets the rate baseline (gLastSeqNo, gLastStatus), so the reported
        rate covers the interval since the previous call;
      * forgets clients not heard from in 30 minutes and workers not heard
        from in 10 minutes.
    """
    global gLastSeqNo, gLastStatus

    now = time.time()
    elapsed = now - gLastStatus
    # Two calls within one clock tick would otherwise divide by zero.
    rate = int((gSeqNo - gLastSeqNo) / elapsed) if elapsed > 0 else 0
    gLastSeqNo = gSeqNo
    gLastStatus = now

    # The main loop resets both counters to 0, so guard an empty interval.
    totalTime = gBusyTime + gIdleTime
    busyPct = 0.1 * int(1000 * gBusyTime / totalTime + .5) if totalTime > 0 else 0.0

    msg = time.strftime('%Y-%m-%d %H:%M:%S %Z: ')
    msg += 'Handled ' + str(gSeqNo) + ' queries (' + str(rate) + ' Hz). '
    msg += str(len(gWorkInProgress)) + ' workers working, '
    msg += str(nConns(gGetWorkQueue)) + ' waiting, '
    msg += str(nConns(gPIDQueue)) + ' backlog. '
    msg += str(gErrCount) + ' errors. '
    msg += 'Server ' + ('%.1f' % busyPct) + '% busy. '

    # Count clients active in each window and forget clients silent for 30
    # minutes.  Iterate over snapshots: entries are deleted inside the loops.
    t0 = time.time()
    n10s = n60s = n5m = n30m = 0
    for k, t in list(gClientList.items()):
        if t0 < t + 10:
            n10s += 1
        if t0 < t + 60:
            n60s += 1
        if t0 < t + 300:
            n5m += 1
        if t0 < t + 1800:
            n30m += 1
        if t0 > t + 1800:
            del gClientList[k]

    # Forget workers silent for 10 minutes (prune gWorkerList itself).
    for k, t in list(gWorkerList.items()):
        if t0 > t + 600:
            del gWorkerList[k]

    msg += '%d/%d/%d/%d clients active in last 10s/60s/5m/30m. ' % (n10s, n60s, n5m, n30m)
    msg += str(len(gWorkerList)) + ' workers in last 10m.'

    return msg
235 
236 # -----------------------------------------------------------------------------
237 
def handle_status(client, args):
    """Answer GET /status with the plain-text status line.

    *args* is ignored.  Returns {client: response}.
    """
    body = status_text() + '\n'
    return {client: ok200() + body}
240 
241 # -----------------------------------------------------------------------------
242 
def handle_releases(client, args):
    """Answer GET /releases with one line per release that has queued work.

    Only releases with a non-empty gPIDQueue entry are listed.
    TODO ideally we would also list releases with only in-progress PIDs,
    but gWorkInProgress does not record the release.
    """
    active = set(rel for rel, queue in gPIDQueue.items() if len(queue) > 0)
    return {client: ok200() + ''.join(rel + '\n' for rel in active)}
254 
255 # -----------------------------------------------------------------------------
256 
def handle_backlog(client, args):
    """Answer GET /backlog.

    Without a 'rel' argument: a human-readable list, sorted by release, of
    every release with queued client requests ("No backlog" when none).
    With 'rel': just the number of queued requests for that release, for
    scripts to parse.

    Returns:
        {client: response}.
    """
    if 'rel' not in args:
        counts = dict((rel, len(q)) for rel, q in gPIDQueue.items() if len(q) > 0)

        msg = ok200()
        if not counts:
            msg += 'No backlog\n'
        else:
            msg += 'Backlogs:\n'
            for rel in sorted(counts):
                msg += rel + ': ' + str(counts[rel]) + '\n'
        return {client: msg}

    # A release was specified: machine-readable count.  Use get() so that
    # asking about an unknown release does not insert an empty deque into the
    # defaultdict.
    count = len(gPIDQueue.get(args['rel'], ()))
    return {client: ok200() + str(count) + '\n'}
279 
280 # -----------------------------------------------------------------------------
281 
def handle_ids(client, args):
    """Answer GET /ids with one 'host : count' line per id, sorted by id."""
    lines = [key + ' : ' + str(gIdList[key]) + '\n' for key in sorted(gIdList)]
    return {client: ok200() + ''.join(lines)}
289 
290 # -----------------------------------------------------------------------------
291 
def handle_status_html(client, args):
    """Answer GET /status.html: the status line in a minimal HTML page that
    asks the browser to refresh every 60 seconds."""
    page = ('<html><head><meta charset="UTF-8">'
            '<meta http-equiv="refresh" content="60">'
            '<title>LEM server status</title></head><body>\n'
            + status_text() +
            '</body></html>\n')
    return {client: ok200_html() + page}
297 
298 # -----------------------------------------------------------------------------
def failsafe_arg(cmd, args, name):
    """Return args[name], or log an error and return None when it is absent.

    Args:
        cmd: command name, used only in the error message.
        args: dict of query-string arguments.
        name: the argument to fetch.
    """
    if name not in args:
        print('Error: %s without %s %s' % (cmd, name, args))
        return None
    return args[name]
303 
304 # -----------------------------------------------------------------------------
305 
def handle_get_work(client, args):
    """Handle a worker's GET /get_work?id=...&rel=...&timeout=... request.

    If client work for release *rel* is queued and the newest queued request
    is not yet late, the newest one is handed to this worker (LIFO) and the
    client is remembered in gWorkInProgress under a fresh tag.  Otherwise the
    worker is parked in gGetWorkQueue for at most 10 seconds.

    Returns:
        {socket: response} to send; {} when the worker has been parked.
    """
    workerID = failsafe_arg('get_work', args, 'id')
    rel = failsafe_arg('get_work', args, 'rel')
    timeout = failsafe_arg('get_work', args, 'timeout')

    if not workerID or not rel or not timeout:
        return {client: unimp501()}

    # Convert before comparing: min(10, '30') compares int with str, which
    # always yields 10 in Python 2 and raises TypeError in Python 3.
    try:
        timeout = int(timeout)
    except ValueError:
        print('Error: get_work with non-integer timeout %s' % timeout)
        return {client: unimp501()}

    queue = gPIDQueue[rel]
    # No work to give out right now (or only stale work)
    if not queue or queue[-1].late():
        # Don't let get_work calls build up: wait at most 10 seconds.
        gGetWorkQueue[rel].append(ConnInfo(client, '', min(10, timeout)))
        return {}

    # There is work, and it's recent enough
    info = queue.pop()
    tag = get_tag()
    # Remember the client so we know who the reply goes to
    gWorkInProgress[tag] = info

    if gVerbosity > 0:
        print('Handing out work under tag %s' % tag)

    return {client: ok200() + 'tag=' + tag + '\n' + info.query}
333 
334 # -----------------------------------------------------------------------------
335 
def handle_return_results(client, args, postdata):
    """Handle a worker's POST /return_results?tag=... carrying results.

    The POST body is forwarded to the client stored under *tag* in
    gWorkInProgress, and the worker receives a bare 200.  An unknown tag is
    logged, acknowledged with a 200, and otherwise ignored.

    Returns:
        {socket: response} for every socket that should be written.
    """
    tag = failsafe_arg('return_results', args, 'tag')
    if not tag:
        return {client: unimp501()}

    if tag not in gWorkInProgress:
        print('Warning: Reply for unknown tag %s' % tag)
        return {client: ok200()}

    # The connection that originally submitted the work.
    requester = gWorkInProgress.pop(tag).client

    if gVerbosity > 0:
        print('Sending response %s' % postdata)

    # The requester's deadline is not checked; delivery is attempted anyway.
    return {client: ok200(),
            requester: ok200() + postdata}
355 
356 # -----------------------------------------------------------------------------
357 
def handle_pid(client, args, postdata):
    """Handle a client's POST /pid?rel=...&timeout=... carrying work.

    If a worker for release *rel* is parked and the newest parked worker is
    not yet late, that worker is sent the POST body right away under a fresh
    tag and the client is remembered in gWorkInProgress.  Otherwise the
    request is queued in gPIDQueue with a deadline of *timeout* seconds.

    Returns:
        {socket: response} to send; {} when the request has been queued.
    """
    rel = failsafe_arg('handle_pid', args, 'rel')
    timeout = failsafe_arg('handle_pid', args, 'timeout')

    if not rel or not timeout:
        return {client: unimp501()}

    # ConnInfo calls int(timeout); reject junk here instead of letting the
    # ValueError escape into the main loop and take the server down.
    try:
        int(timeout)
    except ValueError:
        print('Error: handle_pid with non-integer timeout %s' % timeout)
        return {client: unimp501()}

    info = ConnInfo(client, postdata, timeout)

    workers = gGetWorkQueue[rel]
    # No recent matching worker, stick it in the queue
    if not workers or workers[-1].late():
        gPIDQueue[rel].append(info)
        return {}

    # All OK, hook things up with the most recently parked worker
    workerClient = workers.pop().client
    tag = get_tag()
    # Remember who the worker's eventual results go to
    gWorkInProgress[tag] = info

    if gVerbosity > 0:
        print('Handing out work under tag %s' % tag)
    return {workerClient: ok200() + 'tag=' + tag + '\n' + info.query}
384 
385 # -----------------------------------------------------------------------------
386 
# Returns a map from sockets to the strings to send to them
def handle_query(sockStr, client):
    """Parse one complete HTTP request and dispatch it to its handler.

    Also records when each id was last seen (gClientList / gWorkerList) and
    bumps the per-host request count in gIdList.

    Args:
        sockStr: the raw request text (headers plus optional body).
        client: the socket the request arrived on.

    Returns:
        dict mapping sockets to the strings to send them; {} when the
        request has been parked to wait for a counterpart.
    """
    if gVerbosity > 0:
        print(sockStr)

    query, _, postdata = sockStr.partition('\r\n\r\n')

    # Request line is "<METHOD> /<cmd>?<args> <VERSION>".  A line without a
    # second token used to raise IndexError and kill the whole server.
    requestLine = query.split('\r\n', 1)[0]
    parts = requestLine.split(' ')
    if len(parts) < 2:
        print('Rejecting malformed query:')
        print(sockStr)
        return {client: unimp501()}

    isGet = query.startswith('GET')
    isPost = query.startswith('POST')

    # Second thing after GET/POST. Drop the leading slash
    params = parts[1][1:]
    cmd, _, argStr = params.partition('?')

    args = {}
    for arg in argStr.split('&'):
        key, _, val = arg.partition('=')
        args[key] = val

    if gVerbosity > 0:
        print('Parameters are: %s' % params)
        print('Cmd: %s args: %s' % (cmd, args))

    shortId = args.get('id', 'unknown')

    # Update last contacted
    if isPost and cmd == 'pid':
        gClientList[shortId] = time.time()
    if isGet and cmd == 'get_work':
        gWorkerList[shortId] = time.time()

    # Clumsily truncate the last two dot-separated fields to leave the host
    for _ in range(2):
        cut = shortId.rfind('.')
        if cut > 0:
            shortId = shortId[:cut]

    gIdList[shortId] += 1

    get_handlers = {
        'get_work': handle_get_work,
        'status': handle_status,
        'status.html': handle_status_html,
        'releases': handle_releases,
        'backlog': handle_backlog,
        'ids': handle_ids,
    }

    post_handlers = {
        'pid': handle_pid,
        'return_results': handle_return_results,
    }

    if isGet and cmd in get_handlers:
        return get_handlers[cmd](client, args)

    if isPost and cmd in post_handlers:
        return post_handlers[cmd](client, args, postdata)

    # Didn't hit any of the cases above
    print('Rejecting malformed query:')
    print(sockStr)

    return {client: unimp501()}
450 
451 # -----------------------------------------------------------------------------
452 
def startup(port):
    """Return a TCP socket listening on *port* on all interfaces.

    SO_REUSEADDR lets the port be reused immediately even if a previous
    instance just aborted; up to 128 pending connections may queue.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('', port))
    listener.listen(128)
    return listener
461 
462 # -----------------------------------------------------------------------------
def close_old_connections():
    """Close every queued request that has reached its timeout.

    Scans gPIDQueue (client 'pid' requests) and gGetWorkQueue (parked
    'get_work' calls); expired sockets are removed from gClientMap and closed
    without a reply.  Client timeouts are always logged, worker timeouts only
    when gVerbosity > 0.
    """
    clientConns = []
    for rel in gPIDQueue:
        clientConns += pop_old_conns(gPIDQueue[rel])

    workerConns = []
    for rel in gGetWorkQueue:
        workerConns += pop_old_conns(gGetWorkQueue[rel])

    if clientConns:
        print('Closing %d client connections that reached their timeout' % len(clientConns))

    if workerConns and gVerbosity > 0:
        print('Closing %d worker connections that reached their timeout' % len(workerConns))

    for info in clientConns + workerConns:
        # Tolerate a missing entry: a KeyError here would abort the loop and
        # leave the remaining expired sockets open.
        gClientMap.pop(info.client.fileno(), None)
        info.client.close()
485 
486 # -----------------------------------------------------------------------------
487 
def shed_load(connLimit=3300):
    """Exit the process if it is holding more than *connLimit* connections.

    Queued client requests, parked workers and work in progress all count.
    When over the limit, works out which connections would have to go (older
    client requests first, then parked workers, then work in progress),
    reports how many, and exits with status 1 so the external restarter can
    start a fresh server.

    Args:
        connLimit: maximum number of connections to hold (default 3300).
    """
    toClose = []

    # First drop older PID requests (one per release per pass)
    while (nConns(gPIDQueue) + nConns(gGetWorkQueue) + len(gWorkInProgress) > connLimit
           and nConns(gPIDQueue) > 0):
        toClose += pop_fronts(gPIDQueue)

    # If things get desperate, drop get_work requests
    while (nConns(gGetWorkQueue) + len(gWorkInProgress) > connLimit
           and nConns(gGetWorkQueue) > 0):
        toClose += pop_fronts(gGetWorkQueue)

    # And finally, throw away actual work in progress (arbitrary order).
    # Iterate over a snapshot because entries are popped inside the loop.
    for key in list(gWorkInProgress):
        if len(gWorkInProgress) <= connLimit:
            break
        toClose.append(gWorkInProgress.pop(key))

    if toClose:
        print('Overloaded: closing %d connections' % len(toClose))
        # Closing them individually doesn't seem to recover the situation.
        # Exit and let the restarter do its thing; process exit closes every
        # socket anyway.
        sys.exit(1)
516 
517 
518 # -----------------------------------------------------------------------------
519 
if __name__ == '__main__':
    kMinPort = 1024
    kMaxPort = 1031

    # One listening socket per port in [kMinPort, kMaxPort]
    serverSock = [startup(port) for port in range(kMinPort, kMaxPort + 1)]

    print('Listening on ports %s - %s' % (kMinPort, kMaxPort))

    ep = select.epoll()
    # fileno -> listening socket
    socketMap = {}
    for ss in serverSock:
        ep.register(ss, select.POLLIN)
        socketMap[ss.fileno()] = ss

    # Partially received requests and not-yet-sent replies, keyed by socket
    inbox = defaultdict(lambda: '')
    outbox = defaultdict(lambda: '')

    statusTime = time.time()

    t1 = time.time()
    t2 = time.time()

    while True:
        # No more than once every three seconds
        if time.time() - statusTime > 3:
            statusTime = time.time()
            print(status_text())
            # Busy fraction averages between these updates
            gBusyTime = 0
            gIdleTime = 0

        # Do some housekeeping
        close_old_connections()
        shed_load()

        # Update status every minute even if nothing's happening
        t1 = time.time()
        gBusyTime += t1 - t2
        events = ep.poll(60)
        t2 = time.time()
        gIdleTime += t2 - t1

        for fileno, mask in events:
            if fileno not in socketMap and fileno not in gClientMap:
                print('Unknown fileno! %s' % fileno)
                continue

            if fileno in socketMap:
                # New connection on one of the listening ports
                assert mask == select.POLLIN

                clientSock, addr = socketMap[fileno].accept()
                clientSock.setblocking(0)

                # We'll always start by wanting to read this socket
                ep.register(clientSock, select.POLLIN)
                gClientMap[clientSock.fileno()] = clientSock

            if fileno in gClientMap:
                cs = gClientMap[fileno]

                if mask == select.POLLIN:
                    try:
                        x = cs.recv(4096)
                    except socket.error:
                        # e.g. connection reset by peer: treat as a hang-up
                        x = ''
                    if not x:
                        print('Read nothing. Client hung up on us?')
                        gErrCount += 1
                        inbox.pop(cs, None)
                        gClientMap.pop(fileno)
                        cs.close()
                    else:
                        inbox[cs] += x
                        if is_complete_query(inbox[cs]):
                            # Will never need to read again
                            ep.unregister(cs)
                            request = inbox.pop(cs)
                            try:
                                msgs = handle_query(request, cs)
                            except Exception:
                                # One bad request must not take down the server
                                # and every connection queued in it.
                                traceback.print_exc()
                                gErrCount += 1
                                msgs = {cs: unimp501()}
                            for sock, reply in msgs.items():
                                outbox[sock] = reply
                                ep.register(sock, select.POLLOUT)

                elif mask == select.POLLOUT:
                    msg = outbox[cs]
                    try:
                        n = cs.send(msg)
                    except socket.error:
                        # e.g. broken pipe: treat as a hang-up
                        n = 0
                    if n == 0:
                        print('Wrote nothing. Client hung up on us?')
                        gErrCount += 1
                        outbox.pop(cs, None)
                        gClientMap.pop(fileno)
                        cs.close()
                    elif n < len(msg):
                        # Partial write: keep the remainder queued; epoll will
                        # report the socket writable again.
                        outbox[cs] = msg[n:]
                    else:
                        # Sent everything
                        outbox.pop(cs)
                        gClientMap.pop(fileno)
                        cs.close()

                else:
                    print('Unknown mask %s' % mask)
                    inbox.pop(cs, None)
                    outbox.pop(cs, None)
                    gClientMap.pop(fileno)
                    cs.close()
                    gErrCount += 1
def failsafe_arg(cmd, args, name)
Definition: lem_server.py:299
def is_allowed_ip(ip)
Definition: lem_server.py:63
def nConns(d)
Definition: lem_server.py:93
def handle_return_results(client, args, postdata)
Definition: lem_server.py:336
def pop_fronts(d)
Definition: lem_server.py:98
def shed_load()
Definition: lem_server.py:488
def handle_pid(client, args, postdata)
Definition: lem_server.py:358
def handle_query(sockStr, client)
Definition: lem_server.py:388
def pop_old_conns(q)
Definition: lem_server.py:86
def handle_status_html(client, args)
Definition: lem_server.py:292
def startup(port)
Definition: lem_server.py:453
def unimp501()
Definition: lem_server.py:188
def handle_get_work(client, args)
Definition: lem_server.py:306
def close_old_connections()
Definition: lem_server.py:464
def get_tag()
Definition: lem_server.py:141
def __init__(self, client, query, timeout)
Definition: lem_server.py:74
def is_complete_query(msg)
Definition: lem_server.py:151
static float min(const float a, const float b, const float c)
Definition: absgeo.cxx:45
def ok200()
Definition: lem_server.py:172
def handle_status(client, args)
Definition: lem_server.py:238
def handle_ids(client, args)
Definition: lem_server.py:282
Double_t sum
Definition: plot.C:31
def status_text()
Definition: lem_server.py:199
def handle_backlog(client, args)
Definition: lem_server.py:257
def handle_releases(client, args)
Definition: lem_server.py:243
def ok200_html()
Definition: lem_server.py:180