LEMWorkerOutput_module.cc
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////
2 // \file LEMWorkerOutput_module.cc
3 // \brief Runs after LEM on the high-mem node, returns results to the server
4 // \author Christopher Backhouse - bckhouse@caltech.edu
5 ////////////////////////////////////////////////////////////////////////
6 
7 // Framework includes
13 #include "fhiclcpp/ParameterSet.h"
14 
15 #include "LEM/LEMInput.h"
16 #include "LEM/LEMWebSettings.h"
17 #include "LEM/PIDDetails.h"
18 #include "LEM/WebUtils.h"
19 
20 #include <iostream>
21 
22 #include <curl/curl.h>
23 
24 namespace
25 {
26  struct LEMWorkerOutputParams
27  {
28  template<class T> using Atom = fhicl::Atom<T>;
29  template<class T> using Table = fhicl::Table<T>;
30  using Comment = fhicl::Comment;
31  using Name = fhicl::Name;
32 
33  Atom<std::string> inputLabel{
34  Name("InputLabel"),
35  Comment("Where to find LEMInput objects")
36  };
37 
38  Atom<std::string> pidLabel{
39  Name("PIDLabel"),
40  Comment("Where to find PIDDetails objects")
41  };
42 
43  Table<lem::LEMWebSettings> web{Name("WebSettings")};
44  };
45 }
46 
47 namespace lem
48 {
49  /// Runs after LEM on the high-mem node, returns results to the server
51  {
52  public:
53  // Allows 'nova --print-description' to work
55 
56  explicit LEMWorkerOutput(const Parameters& params);
57  ~LEMWorkerOutput();
58 
59  virtual void produce(art::Event& evt);
60 
61  protected:
62  void ReturnResults(const std::string& tag,
63  const PIDDetails& pid) const;
64 
65  LEMWorkerOutputParams fParams;
66 
68  };
69 
70  //......................................................................
71  LEMWorkerOutput::LEMWorkerOutput(const Parameters& params)
72  : EDProducer(params)
73  , fParams(params())
74  {
75  // Generate a unique ID so that the server can identify us.
76  char host[1024];
77  gethostname(host, 1023);
78  host[1023] = 0;
79  const char* user = getenv("USER");
80  if(!user) user = "(unknown)";
81  fWorkerID = TString::Format("%s@%s.%d.%ld",
82  user, host, getpid(), time(0)).Data();
83 
84  curl_global_init(CURL_GLOBAL_ALL);
85  }
86 
87  //......................................................................
89  {
90  curl_global_cleanup();
91  }
92 
94  {
98  };
99 
100  //......................................................................
102  {
104 
105  bool ok;
106  post_query(args->web.host(),
107  args->params, args->postdata,
108  args->web.minPort(), args->web.maxPort(),
109  args->web.queryTimeout(), args->web.retryTimeout(),
110  ok);
111 
112  if(!ok){
113  std::cerr << "No response from server after many retries. Aborting"
114  << std::endl;
115  abort();
116  }
117 
118  delete args;
119 
120  return 0;
121  }
122 
123  //......................................................................
125  const PIDDetails& pid) const
126  {
127  // Don't hang around waiting for the server to receive our results when we
128  // could be getting started on the next event to ID. Implement a "fire and
129  // forget" scheme.
130 
131  static bool once = true;
132  static pthread_t thread;
133  if(once){
134  once = false;
135  }
136  else{
137  // Always make sure the previous result was received so that we don't try
138  // to have multiple in flight. In case eg there really is a problem with
139  // our connectivity.
140  pthread_join(thread, 0);
141  }
142 
143  const LEMWebSettings& web = fParams.web();
144 
145  const std::string params = TString::Format("return_results?tag=%s&id=%s&timeout=%d",
146  tag.c_str(),
147  fWorkerID.c_str(),
148  web.queryTimeout()).Data();
149 
151  args->params = params;
152  args->postdata = pid.ToString();
153  args->web = web;
154 
155  pthread_create(&thread, 0, ReturnResultsThreadFunc, args);
156  }
157 
158  //......................................................................
160  {
162  evt.getByLabel(fParams.pidLabel(), pidcol);
163  assert(!pidcol.failedToGet());
164  assert(!pidcol->empty());
165 
167  evt.getByLabel(fParams.inputLabel(), inputcol);
168  assert(inputcol->size() == 1);
169  const std::string tag = (*inputcol)[0].tag;
170  assert(pidcol->size() == 1);
171 
172  ReturnResults(tag, (*pidcol)[0]);
173  }
174 
176 
177 } //namespace lem
178 ////////////////////////////////////////////////////////////////////////
void * ReturnResultsThreadFunc(void *x)
Runs after LEM on the high-mem node, returns results to the server.
Atom< std::string > host
void ReturnResults(const std::string &tag, const PIDDetails &pid) const
Atom< int > queryTimeout
OStream cerr
Definition: OStream.cxx:7
DEFINE_ART_MODULE(TestTMapFile)
::xsd::cxx::tree::time< char, simple_type > time
Definition: Database.h:194
PID
Definition: FillPIDs.h:14
bool getByLabel(std::string const &label, std::string const &instance, Handle< PROD > &result) const
Definition: DataViewImpl.h:446
std::string getenv(std::string const &name)
LEMWorkerOutputParams fParams
int evt
std::string post_query(const std::string &host, const std::string &query, const std::string &postdata, int minPort, int maxPort, int queryTimeout, int retryTimeout, bool &ok)
Make an HTTP POST query.
Definition: WebUtils.cxx:162
virtual void produce(art::Event &evt)
assert(nhit_max >=nhit_nbins)
std::string ToString() const
Definition: PIDDetails.cxx:9
Attach LEM-specific info to the base PID object.
Definition: PIDDetails.h:20
void Format(TGraph *gr, int lcol, int lsty, int lwid, int mcol, int msty, double msiz)
Definition: Style.cxx:154
Atom< int > retryTimeout
bool failedToGet() const
Definition: Handle.h:190
enum BeamMode string