LEMWorkerOutput_module.cc
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////
2 // \file LEMWorkerOutput_module.cc
3 // \brief Runs after LEM on the high-mem node, returns results to the server
4 // \author Christopher Backhouse - bckhouse@caltech.edu
5 ////////////////////////////////////////////////////////////////////////
6 
7 // Framework includes
13 #include "fhiclcpp/ParameterSet.h"
14 
15 #include "LEM/LEMInput.h"
16 #include "LEM/LEMWebSettings.h"
17 #include "LEM/PIDDetails.h"
18 #include "LEM/WebUtils.h"
19 
20 #include <iostream>
21 
22 #include <curl/curl.h>
23 
24 namespace
25 {
26  struct LEMWorkerOutputParams
27  {
28  template<class T> using Atom = fhicl::Atom<T>;
29  template<class T> using Table = fhicl::Table<T>;
30  using Comment = fhicl::Comment;
31  using Name = fhicl::Name;
32 
33  Atom<std::string> inputLabel{
34  Name("InputLabel"),
35  Comment("Where to find LEMInput objects")
36  };
37 
38  Atom<std::string> pidLabel{
39  Name("PIDLabel"),
40  Comment("Where to find PIDDetails objects")
41  };
42 
43  Table<lem::LEMWebSettings> web{Name("WebSettings")};
44  };
45 }
46 
47 namespace lem
48 {
49  /// Runs after LEM on the high-mem node, returns results to the server
51  {
52  public:
53  // Allows 'nova --print-description' to work
55 
56  explicit LEMWorkerOutput(const Parameters& params);
57  ~LEMWorkerOutput();
58 
59  virtual void produce(art::Event& evt);
60 
61  protected:
62  void ReturnResults(const std::string& tag,
63  const PIDDetails& pid) const;
64 
65  LEMWorkerOutputParams fParams;
66 
68  };
69 
70  //......................................................................
71  LEMWorkerOutput::LEMWorkerOutput(const Parameters& params)
72  : fParams(params())
73  {
74  // Generate a unique ID so that the server can identify us.
75  char host[1024];
76  gethostname(host, 1023);
77  host[1023] = 0;
78  const char* user = getenv("USER");
79  if(!user) user = "(unknown)";
80  fWorkerID = TString::Format("%s@%s.%d.%ld",
81  user, host, getpid(), time(0)).Data();
82 
83  curl_global_init(CURL_GLOBAL_ALL);
84  }
85 
86  //......................................................................
88  {
89  curl_global_cleanup();
90  }
91 
93  {
97  };
98 
99  //......................................................................
101  {
103 
104  bool ok;
105  post_query(args->web.host(),
106  args->params, args->postdata,
107  args->web.minPort(), args->web.maxPort(),
108  args->web.queryTimeout(), args->web.retryTimeout(),
109  ok);
110 
111  if(!ok){
112  std::cerr << "No response from server after many retries. Aborting"
113  << std::endl;
114  abort();
115  }
116 
117  delete args;
118 
119  return 0;
120  }
121 
122  //......................................................................
124  const PIDDetails& pid) const
125  {
126  // Don't hang around waiting for the server to receive our results when we
127  // could be getting started on the next event to ID. Implement a "fire and
128  // forget" scheme.
129 
130  static bool once = true;
131  static pthread_t thread;
132  if(once){
133  once = false;
134  }
135  else{
136  // Always make sure the previous result was received so that we don't try
137  // to have multiple in flight. In case eg there really is a problem with
138  // our connectivity.
139  pthread_join(thread, 0);
140  }
141 
142  const LEMWebSettings& web = fParams.web();
143 
144  const std::string params = TString::Format("return_results?tag=%s&id=%s&timeout=%d",
145  tag.c_str(),
146  fWorkerID.c_str(),
147  web.queryTimeout()).Data();
148 
150  args->params = params;
151  args->postdata = pid.ToString();
152  args->web = web;
153 
154  pthread_create(&thread, 0, ReturnResultsThreadFunc, args);
155  }
156 
157  //......................................................................
159  {
161  evt.getByLabel(fParams.pidLabel(), pidcol);
162  assert(!pidcol.failedToGet());
163  assert(!pidcol->empty());
164 
166  evt.getByLabel(fParams.inputLabel(), inputcol);
167  assert(inputcol->size() == 1);
168  const std::string tag = (*inputcol)[0].tag;
169  assert(pidcol->size() == 1);
170 
171  ReturnResults(tag, (*pidcol)[0]);
172  }
173 
175 
176 } //namespace lem
177 ////////////////////////////////////////////////////////////////////////
void * ReturnResultsThreadFunc(void *x)
Runs after LEM on the high-mem node, returns results to the server.
Atom< std::string > host
void ReturnResults(const std::string &tag, const PIDDetails &pid) const
Atom< int > queryTimeout
OStream cerr
Definition: OStream.cxx:7
DEFINE_ART_MODULE(TestTMapFile)
::xsd::cxx::tree::time< char, simple_type > time
Definition: Database.h:194
PID
Definition: FillPIDs.h:14
std::string getenv(std::string const &name)
LEMWorkerOutputParams fParams
int evt
std::string post_query(const std::string &host, const std::string &query, const std::string &postdata, int minPort, int maxPort, int queryTimeout, int retryTimeout, bool &ok)
Make an HTTP POST query.
Definition: WebUtils.cxx:162
bool getByLabel(std::string const &label, std::string const &productInstanceName, Handle< PROD > &result) const
Definition: DataViewImpl.h:344
virtual void produce(art::Event &evt)
assert(nhit_max >=nhit_nbins)
std::string ToString() const
Definition: PIDDetails.cxx:9
Attach LEM-specific info to the base PID object.
Definition: PIDDetails.h:20
void Format(TGraph *gr, int lcol, int lsty, int lwid, int mcol, int msty, double msiz)
Definition: Style.cxx:154
Atom< int > retryTimeout
bool failedToGet() const
Definition: Handle.h:196
enum BeamMode string