LEMWorker_module.cc
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////
2 // \file LEMWorker_module.cc
3 // \brief This runs on a high-mem node and reconstitutes LEMSummary events
4 // \author Christopher Backhouse - bckhouse@caltech.edu
5 ////////////////////////////////////////////////////////////////////////
6 
7 // NOvASoft includes
8 #include "RecoBase/Cluster.h"
9 #include "Utilities/AssociationUtil.h"
10 
11 // Framework includes
16 #include "fhiclcpp/ParameterSet.h"
17 
18 #include "LEM/LEMInput.h"
19 #include "LEM/LEMWebSettings.h"
20 #include "LEM/Util.h"
21 #include "LEM/WebUtils.h"
22 
23 #include <iostream>
24 
25 #include <curl/curl.h>
26 
27 namespace
28 {
29  struct LEMWorkerParams
30  {
31  template<class T> using Atom = fhicl::Atom<T>;
32  template<class T> using Table = fhicl::Table<T>;
33  using Comment = fhicl::Comment;
34  using Name = fhicl::Name;
35 
36  Atom<std::string> releaseOverride{
37  Name("ReleaseOverride"),
38  Comment("Specify this release to the server. Empty string for current release.")
39  };
40 
41  Table<lem::LEMWebSettings> web{Name("WebSettings")};
42  };
43 }
44 
45 namespace lem
46 {
47  /// This runs on a high-mem node and reconstitutes LEMSummary events
48  class LEMWorker: public art::EDProducer
49  {
50  public:
51  // Allows 'nova --print-description' to work
53 
54  explicit LEMWorker(const Parameters& params);
55  ~LEMWorker();
56 
57  virtual void produce(art::Event& evt);
58 
59  protected:
60  std::string GetWork() const;
61 
62  LEMWorkerParams fParams;
63 
65  };
66 
67  //......................................................................
68  LEMWorker::LEMWorker(const Parameters& params)
69  : fParams(params())
70  {
71  produces<std::vector<LEMInput>>();
72 
73  // Randomize ports and so on properly
74  srand(getpid());
75 
76  // Generate a unique ID so that the server can identify us, eg to detect
77  // duplicate requests.
78  char host[1024];
79  gethostname(host, 1023);
80  host[1023] = 0;
81  const char* user = getenv("USER");
82  if(!user) user = "(unknown)";
83  fWorkerID = TString::Format("%s@%s.%d.%ld",
84  user, host, getpid(), time(0)).Data();
85 
86  curl_global_init(CURL_GLOBAL_ALL);
87  }
88 
89  //......................................................................
91  {
92  curl_global_cleanup();
93  }
94 
95  struct GetWorkArgs
96  {
99  };
100 
101  void* GetWorkThreadFunc(void* x)
102  {
103  const GetWorkArgs* args = (GetWorkArgs*)x;
104 
105  bool ok;
106  const std::string work = get_query(args->web.host(),
107  args->params,
108  args->web.minPort(),
109  args->web.maxPort(),
110  args->web.queryTimeout(),
111  args->web.retryTimeout(),
112  ok);
113  if(!ok){
114  std::cerr << "No response from server after many retries. "
115  << "Maybe there's no work for us. Aborting"
116  << std::endl;
117  abort();
118  }
119 
120  delete args;
121  return new std::string(work);
122  }
123 
124  //......................................................................
126  {
127  // Instead of hanging around wasting valuable computation time waiting for
128  // the server to give us the next event, fetch the next one in parallel
129  // with ID'ing the previous event and hopefully have it ready when
130  // produce() wants it.
131 
132  static bool once = true;
133  static pthread_t thread;
134 
135  const LEMWebSettings& web = fParams.web();
136 
137  const std::string rel = fParams.releaseOverride().empty() ? get_release() : fParams.releaseOverride();
138 
139  const std::string query = TString::Format("get_work?rel=%s&id=%s&timeout=%d",
140  rel.c_str(),
141  fWorkerID.c_str(),
142  web.queryTimeout()).Data();
143 
145 
146  args->params = query;
147  args->web = web;
148 
149  void* ret;
150  if(once){
151  once = false;
152  // First time out just make a synchronous call to get the ball rolling.
153  // Clone the args since the thread function deletes them
154  ret = GetWorkThreadFunc(new GetWorkArgs(*args));
155  }
156  else{
157  // Otherwise, fetch the result from the thread, wait for it if necessary.
158  pthread_join(thread, &ret);
159  }
160 
161  // Get ahold of the result more safely.
162  const std::string work = *((std::string*)ret);
163  delete (std::string*)ret;
164 
165  // And fire up a new request, ready for next time.
166  pthread_create(&thread, 0, GetWorkThreadFunc, args);
167 
168  return work;
169  }
170 
171  //......................................................................
173  {
174  const std::string work = GetWork();
175 
176  char workstr[work.size()+1];
177  strcpy(workstr, work.c_str());
178 
179  LEMInput input;
180 
181  input.tag = &strtok(workstr, "\n")[4]; // skip over "tag="
182 
183  char* estr = strtok(0, "\n");
184  assert(estr);
185  int n = sscanf(estr, "E=%lf", &input.totalGeV);
186  assert(n == 1);
187 
188  while(char* chanqstr = strtok(0, "\n")){
189  LiteHit hit;
190  n = sscanf(chanqstr, "%hu %f", &hit.cellIdx, &hit.pecorr);
191  assert(n == 2);
192  input.hits.push_back(hit);
193  }
194 
195  std::unique_ptr<std::vector<LEMInput>> sumcol(new std::vector<LEMInput>(1, input));
196  evt.put(std::move(sumcol));
197  }
198 
200 
201 } //namespace lem
202 ////////////////////////////////////////////////////////////////////////
Atom< std::string > host
LEMWebSettings web
LEMWorkerParams fParams
Atom< int > queryTimeout
OStream cerr
Definition: OStream.cxx:7
float pecorr
Definition: LiteHit.h:24
std::string GetWork() const
DEFINE_ART_MODULE(TestTMapFile)
::xsd::cxx::tree::time< char, simple_type > time
Definition: Database.h:194
std::string fWorkerID
PID
Definition: FillPIDs.h:14
ProductID put(std::unique_ptr< PROD > &&product)
Definition: Event.h:102
unsigned short cellIdx
Definition: LiteHit.h:25
std::string getenv(std::string const &name)
int evt
void * GetWorkThreadFunc(void *x)
std::string get_release()
Figure out the current release. TODO: does this belong somewhere else?
Definition: WebUtils.cxx:178
string rel
Definition: shutoffs.py:11
Definition: structs.h:12
assert(nhit_max >=nhit_nbins)
std::string tag
Definition: LEMInput.h:16
void Format(TGraph *gr, int lcol, int lsty, int lwid, int mcol, int msty, double msiz)
Definition: Style.cxx:154
std::string get_query(const std::string &host, const std::string &query, int minPort, int maxPort, int queryTimeout, int retryTimeout, bool &ok)
Make an HTTP GET query.
Definition: WebUtils.cxx:147
Compressed hit info, basic component of LEM events.
Definition: LiteHit.h:18
virtual void produce(art::Event &evt)
This runs on a high-mem node and reconstitutes LEMSummary events.
Atom< int > retryTimeout
enum BeamMode string