NOvASocketInputDriver.cxx
Go to the documentation of this file.
1 ////////////////////////////////////////////////////////////////////////
2 /// \brief Source to read raw Events from a socket, per specifications
3 /// in NOvA-DocDB-3947 and NOvA-DocDB-4390.
4 ///
5 /// \author paterno@fnal.gov
6 ////////////////////////////////////////////////////////////////////////
8 
16 #include "cetlib/split.h"
17 #include "fhiclcpp/ParameterSet.h"
19 
21 #include "EventDispatcher_CommandSet/DispatcherCmd.h"
22 #include "NovaDAQConventions/DAQConventions.h"
23 #include "NovaTimingUtilities/TimingUtilities.h"
24 #include "RawData/DAQHeader.h"
25 #include "RawData/RawDigit.h"
26 #include "RawData/RawTrigger.h"
27 #include "RawData/FlatDAQData.h"
28 #include "RawData/RawSumDropMB.h"
29 #include "RawData/RawSumDCM.h"
30 #include "SummaryData/RunData.h"
31 
32 //
33 // A bit mask to tell us which errors to abort on. Use "0" if you want
34 // to ignore errors and try to keep running. When debugging, you can
35 // select which errors you want to focus on.
36 //
37 static const int gsMAGIC_WORD = 1<<0;
38 static const int gsUNPACK_RETURN = 1<<1;
39 static const int gsUNPACK_EXCEPTION = 1<<2;
40 static const int gsABORT_ON = 0;
41 
42 namespace daq2raw {
43  // Utility functions for this file, which do not need to be class
44  // members.
45 
46  /// Parse a string (the "filename") to yield an address and a port. The
47  /// "filename" is expected to have the form <address>:<port>, where
48  /// neither *address* nor *port* contain a colon.
52  {
53  std::vector<std::string> components;
54  cet::split(filename, ':', std::back_inserter(components));
55  if (components.size() != 2)
56  {
58  << "Bad socket address in NOvASocketDriver: "
59  << filename
60  << "\n";
61  }
62  address = components[0];
63  port = components[1];
64  }
65 
66  //---------------------------------------------------------------------
67  // Required constructor.
70  art::SourceHelper const& pm)
71  // look to see if the user provided a base file name for the gdml file, if not
72  // pass along an emptry string to default to the behavior previously in NOvAInputDriverBase
73  : NOvAInputDriverBase(pset.get<std::string>("GDMLFileBase",""))
74  , fSourceHelper(pm)
75  , fCurrentFilename()
76  , fCurrentSubRunID(-1, -1) ///< Has to be distinguished from the ones that are acually used
77  , fIoService()
78  , fSocket(fIoService)
79  , fCurrentEventCapacity(in_elements(65536 * 1024))
80  , fMaxEventSizeBytes(pset.get<size_t>("maxEventSizeK", 65536) * 1024)
81  , fSocketBuffer(fCurrentEventCapacity + 1) // Allow for data size info..
82  , fFilterCorruptedNanoslices(pset.get<bool>("FilterCorruptedNanoslices"))
83  , fDoFillFlatDAQData (pset.get<bool>("DoFillFlatDAQData"))
84  , fMaxReadAttempts (pset.get<int>("MaxReadAttempts",100))
85  , fCorruptEventCount(0)
86  , fConsecutiveCorruptEventCount(0)
87  , fBadReadCount(0)
88  , fConsecutiveBadReadCount(0)
89  , trigmask_word1(pset.get<uint32_t>("TriggerMask1",0x00ffffff))
90  , trigmask_word2(pset.get<uint32_t>("TriggerMask2",0xffffffff))
91  , trigmask_word3(pset.get<uint32_t>("TriggerMask3",0xffffffff))
92  , fTransmitDelay(pset.get<int>("TransmitDelay",1000))
93  {
94  // Declare products that *may* be in the event (or run, or subRun)
95  // here. Note the use of "reconstitutes" vs "produces": this is what
96  // allows us to specify module label, process name and (optionally)
97  // instance label for each product.
99  helper.reconstitutes<std::vector<rawdata::RawDigit>, art::InEvent>("daq");
100  helper.reconstitutes<std::vector<rawdata::RawTrigger>, art::InEvent>("daq");
101  helper.reconstitutes<std::vector<rawdata::RawSumDropMB>, art::InEvent>("daq");
102  helper.reconstitutes<std::vector<rawdata::RawSumDCM>, art::InEvent>("daq");
103  if(fDoFillFlatDAQData) helper.reconstitutes<std::vector<rawdata::FlatDAQData>, art::InEvent>("daq");
104  helper.reconstitutes<sumdata::RunData, art::InRun >("daq");
105  }
106 
107  //---------------------------------------------------------------------
108  // Required function: close currently-open file.
110  {
111  // Our "file" is a socket.
112  fSocket.close();
113  }
114 
115  //---------------------------------------------------------------------
116  /// Required function: open the file, "name" and construct and return a
117  /// new FileBlock object. MUST be successful or throw:
118  /// art::Exception(art::errors::FileOpenError) or
119  /// art::Exception(art::errors::FileReadError) are good candidates.
120  /// In this implementation, the role of the 'file' is played by a socket;
121  /// we interpret the "name" as the address and port number identifying
122  /// the socket.
124  art::FileBlock* &fb)
125  {
126  namespace ba = boost::asio;
127  namespace bai = boost::asio::ip;
130  std::string socketAddress;
131  parseFilename(fCurrentFilename, socketAddress, port);
132  try
133  {
134  // Create the file block now, since if we don't throw we're expected
135  // to return one.
136  fb = new art::FileBlock(art::FileFormatVersion(1, "NOvARawInput 2011a"),
138 
139  bai::tcp::resolver resolver(fIoService);
140  bai::tcp::resolver::query query(socketAddress, port);
141  bai::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
142  bai::tcp::resolver::iterator end;
143 
144  boost::system::error_code error = boost::asio::error::host_not_found;
145  while (error && endpoint_iterator != end)
146  {
147  fSocket.close();
148  fSocket.connect(*endpoint_iterator++, error);
149  }
150  if (error) throw boost::system::system_error(error);
151  static size_t const cb_size = 32;
152  // Put a command on the socket ...
153  //uint32_t dummy;
154  cb_t cmd_buffer(cb_size);
155 
156 #define SET_DISPATCHER_TRIGGER_MASKS 1
157 #ifdef SET_DISPATCHER_TRIGGER_MASKS
158  // Internal buffers for trigger mask commands
159  cb_t mask1_buffer(cb_size);
160  cb_t mask2_buffer(cb_size);
161  cb_t mask3_buffer(cb_size);
162 
163  // uint32_t trigmask_word1 = 0xFFFFFFFF;
164  // uint32_t trigmask_word2 = 0xFFFFFFFF;
165  // uint32_t trigmask_word3 = 0xFFFFFFFF;
166 
167  // Construct the commands to set the different trigger masks (1-3)
168  DispatcherCMD mask1_cmd(DispatcherCMD::DSPCMD_SET_TRIGGER_MASK_1, trigmask_word1 & 0x00ffffff, &mask1_buffer[0]);
171 
172  // Serialize the set mask commands to their transmit buffers
173  ssize_t cmd1_size_bytes = mask1_cmd.write();
174  ssize_t cmd2_size_bytes = mask2_cmd.write();
175  ssize_t cmd3_size_bytes = mask3_cmd.write();
176 
177  // Send the mask commands to the dispatcher
178  fSocket.send(boost::asio::buffer(&mask1_buffer[0], cmd1_size_bytes));
179  // ... and receive the acknowledgement.
180  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
181  usleep(fTransmitDelay);
182 
183  fSocket.send(boost::asio::buffer(&mask2_buffer[0], cmd2_size_bytes));
184  // ... and receive the acknowledgement.
185  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
186  usleep(fTransmitDelay);
187 
188 
189  fSocket.send(boost::asio::buffer(&mask3_buffer[0], cmd3_size_bytes));
190  // ... and receive the acknowledgement.
191  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
192  usleep(fTransmitDelay);
193 
194  // If we try to send too many messages to the dispatcher too rapidly, we will get stuck in a non-responsive state.
195  // So, after sending the message to the dispatcher to configure the trigger masks, we will pause for one second
196  // before sending the next message.
197  // usleep(fTransmitDelay);
198 
199 #endif
200 
202  &cmd_buffer[0]);
203  ssize_t cmd_size_bytes = single_cmd.write();
204  fSocket.send(boost::asio::buffer(&cmd_buffer[0], cmd_size_bytes));
205  usleep(fTransmitDelay);
206 
207  // ... and receive the acknowledgement.
208  fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
209 
210 #ifndef NOVA_DEBUG_STREAM_ACK
211  std::cerr << "Received: "
212  << boost::numeric_cast<unsigned>(cmd_buffer[0])
213  << " bytes.\n";
214  for (cb_t::const_iterator
215  i = cmd_buffer.begin(),
216  e = cmd_buffer.end();
217  i != e;
218  ++i) {
219  std::cerr << boost::numeric_cast<unsigned>(*i) << " ";
220  }
221  std::cerr << "\n";
222  for (size_t i = 0; i + 3 < cb_size; i += 4) {
223  std::cerr
224  << std::hex
225  << std::setfill('0')
226  << "0x"
227  << std::setw(8)
228  << *reinterpret_cast<uint32_t*>(&cmd_buffer[i]) << " ";
229  }
230  std::cerr << "\n";
231 #endif // NOVA_DEBUG_STREAM_ACK
232  if (cmd_buffer[0] > cb_size) {
233  mf::LogWarning("StreamReadFail")
234  << "Unable to read entire command acknowledgement of "
235  << cmd_buffer[0]
236  << " bytes: buffer size is only "
237  << cb_size
238  << " bytes.\n";
239  try {
240  if (fSocket.is_open()) fSocket.close();
241  } catch (...) {
242  // Ignore.
243  }
244  return; // Can't open is not fatal for this input source.
245  }
246  static char const ACK = 6;
247  unsigned char ack = cmd_buffer[1];
248  if (cmd_buffer[1] != ACK) { // No ACK
250  << "Did not receive ACK for SINGLE command: expected 0x"
251  << std::hex
252  << std::setfill('0')
253  << std::setw(2)
254  << static_cast<unsigned>(ACK)
255  << ", got 0x"
256  << std::setfill('0')
257  << std::setw(2)
258  << static_cast<unsigned>(ack)
259  << ".\n";
260  }
261  unsigned char cmd = cmd_buffer[2];
264  << "Received ACK for unexpected command 0x"
265  << std::hex
266  << std::setfill('0')
267  << std::setw(2)
268  << cmd
269  << " (expected 0x"
271  << ").\n";
272  }
273  }
275  {
276  mf::LogWarning("StreamOpenFail")
277  << "Unable to open socket to address "
279  << " for NOvASocketInputDriver, due to system error:\n"
280  << x.what()
281  << "\n";
282  try {
283  if (fSocket.is_open()) fSocket.close();
284  } catch (...) {
285  // Ignore.
286  }
287  }
288 }
289 
290  //---------------------------------------------------------------------
291  /// Required function: read and fill Run, SubRun and/or Event as might
292  /// be appropriate.
294  art::SubRunPrincipal* const &inSR,
295  art::RunPrincipal* &outR,
296  art::SubRunPrincipal* &outSR,
297  art::EventPrincipal* &outE) {
298 
299 
300  // Allocate and initialize here to allow for a proper use
301  // of a goto later
302  int EvtStartIDX =0;
303  size_t receivedBytes =0;
304  ssize_t cmd_size_bytes =0;
305  uint32_t eventSize =0;
306  boost::system::error_code error;
307  int readAttempts =0;
308  ssize_t bytesRead =0;
309 
310  receivedBytes=0;
311  cmd_size_bytes=0;
312 
313  retryLOOP:
314 
315  // Check that the socket is open
316  if (!fSocket.is_open()){
317  // If the socket is closed/has closed return back
318  return false;
319  }
320 
321  ////////////////////////////////////
322  // Send the 'NEXT_EVENT' command here, then read the resulting event.
323  try {
324  // Construct the NEXT_EVENT command object
326  &fSocketBuffer[0]);
327 
328  // Serialize the command out to the socketbuffer
329  cmd_size_bytes = next_event_cmd.write();
330 
331  // Now send the NEXT_EVENT command to the dispatcher source
332  fSocket.send(boost::asio::buffer(&fSocketBuffer[0], cmd_size_bytes));
333 
334  // Cast the buffer to an 8bit byte array from a 32bit word array
335  uint8_t*const pSocketBuffer= reinterpret_cast<uint8_t*const>(&fSocketBuffer[0]);
336 
337  // Reset the number of bytes that have been received
338  receivedBytes =0;
339  bytesRead =0;
340 
341  // Read the first 4 words of the event
342  // this allows us to retrieve the total
343  // event size from the response:
344  while ( (receivedBytes < 16) ) {
345  ++readAttempts;
346  // If we reach our maximum number of retrys
347  // then we throw
348  if(readAttempts > fMaxReadAttempts){
349  throw "Maximum number of retrys reached on byte count read";
350  }
351 
352  // Attempt to read the header
353  bytesRead = fSocket.receive(boost::asio::buffer(pSocketBuffer + receivedBytes, 16 - receivedBytes ),
354  0,
355  error);
356 
357  // If we receive zero bytes back from the receive call
358  // this indicates an error. We go to check the error code
359  // in this case and take the appropriate action.
360  if(bytesRead == 0){
361  // Catch errors in the IO system
362  if(error==boost::asio::error::eof){
363  return false;
364  }else if(error){
365  // Some other error occured
366  throw boost::system::system_error(error);
367  } // endif error
368  } // endif bytesRead
369 
370  // Increment the total number of bytes that have been received
371  receivedBytes += bytesRead;
372 
373  // receive is a blocking IO call. There is no reason to sleep.
374  // usleep(1);
375  }
376 
377  //
378  // Debugging - we should always find the magic number 0xe929e929
379  // as the second word in the stream. If we don't there may be
380  // problems. Take a look at the start of the stream.
381  //
382  if (fSocketBuffer[1]!=0xe929e929) {
383  std::cerr << __FILE__ " : " << __LINE__
384  << " Error: Bad magic word at fSocketBuffer[1]. Will try shifting..."
385  << std::endl;
386  if (gsABORT_ON & gsMAGIC_WORD) abort();
387  }
388 
389  //
390  // Seek in the stream until we find the magic 0xe929e929. This
391  // should be the second word so don't look too far.
392  //
393  int kEvtStartIDXMax = 8;
394  for (EvtStartIDX=0; EvtStartIDX<kEvtStartIDXMax; ++EvtStartIDX) {
395  if(0xe929e929 == fSocketBuffer[EvtStartIDX+1]) break;
396  }
397  if (EvtStartIDX>=kEvtStartIDXMax) {
398  throw std::string("Unable to determine byte count on network read");
399  }
400  eventSize = ntohl(fSocketBuffer[EvtStartIDX]);
401 
402  // Don't modify the data stream in place. This can result
403  // in a mixed endian problem and we don't want to deal with it.
404  // fSocketBuffer[0] = ntohl(fSocketBuffer[0]);
405 
406 #ifndef NOVA_DEBUG_STREAM_ACK
407  std::cerr << "Event contains " << eventSize << " bytes.\n";
408 #endif // NOVA_DEBUG_STREAM_ACK
409 
410  if(eventSize == 0){
411  printf("EVT DATA:\n0x%08x 0x%08x 0x%08x 0x%08x\n",
414  }
415 
416  // Trap on the various bad event values
417 
418  // An "event size" of 0xFFFFFFFF means END_OF_DATA, so return false.
419  switch(eventSize){
420  case 0:
421  // This is an empty event. This indicates a problem
422  // with the network stream
423  ++fBadReadCount;
425  eventSize = ntohl(fSocketBuffer[1]);
426  EvtStartIDX = 1;
427  break;
428  case 0xFFFFFFFF:
429  // This is the event corrupt byte count. This indicates a
430  // problem with the dispatchers ability to buffer the event
433  return false;
434  break;
435  default:
436  // This is a normal event, we reset our
437  // consecutive error counters
440  break;
441  }
442 
443  // If we make it to this point then we probably have a good event
444  // header in the buffer. We will use this to get the rest of the event
445  // data off of the network.
446 
447  if( eventSize > (ssize_t)fMaxEventSizeBytes){
449  << "Unable to read entire event "
450  << eventSize
451  << " bytes: buffer size is only "
452  << fMaxEventSizeBytes
453  << " bytes.\n";
454  } else if ( eventSize > fCurrentEventCapacity) {
455  fSocketBuffer.resize( eventSize + 1);
456  fCurrentEventCapacity = eventSize;
459  }
460 
461  // Read the response:
462 
463  while ( receivedBytes < (eventSize+4) )
464  {
465  size_t remainingBytes = eventSize +4 -receivedBytes;
466  remainingBytes = remainingBytes > 1400? 1400:remainingBytes;
467 
468  receivedBytes += fSocket.receive(boost::asio::buffer(pSocketBuffer+receivedBytes, remainingBytes));
469 
470  }
471 
472 
473  } catch (boost::system::system_error& x) {
474  std::cerr << " A socket read error should be treated as END_OF_DATA.\n";
475  return false;
476  }
477 
478 
479  ////////////////////////////////////
480  // Unpack the data.
481 
482  // Need to convert the data to host byte order from network byte order
483  // (event size has already been done).
484  /*
485  size_t count_check = 0;
486  for (eb_t::iterator
487  i = fSocketBuffer.begin() + 1,
488  e = i + in_elements(fSocketBuffer[0]);
489  i != e;
490  ++i, ++count_check) {
491  *i = ntohl(*i);
492  }
493 
494  #ifndef NOVA_DEBUG_STREAM_ACK
495  std::cerr << "Converted " << in_bytes(count_check) << " bytes.\n";
496  #endif // NOVA_DEBUG_STREAM_ACK
497  */
498  // NOTE: As of OnlineUnpack.cxx r1.9, the histogram pointers required
499  // in the constructor are *not* filled. In the event that one *does*
500  // want this kind of information, the data to be histogrammed should
501  // be put into event products (not necessarily written out to file)
502  // and turned into histograms by a downstream analyzer. This will
503  // enable easy management of different histograms for diferent runs,
504  // subruns, etc, etc and trivial control of whether histograns are
505  // actually wanted or not.
506  daq2raw::OnlineUnpack up(nullptr, nullptr, nullptr, &fCurrentFilename);
508 
509 
510  /// Define data structures, which will be filled by OnlineUnpack::ProcessRawEvent
511  std::unique_ptr<std::vector<rawdata::RawDigit> > digits(new std::vector<rawdata::RawDigit>);
512  rawdata::RawTrigger raw_trigger;
513  rawdata::RawSumDropMB raw_sumDropMB;
514  rawdata::RawSumDCM raw_sumDCM;
515  rawdata::FlatDAQData flat_daq_data;
516  rawdata::DAQHeader daq_header;
517 
518  // Fill the data structures for this event.
519  // MFP: Who is responsible for deleting the vector 'digits'? At what point
520  // is the responsibility for doing so accepted?
521  // ProcessRawEvent expects the first argument to be the first word of *data* (not the size
522  // of the event data), thus the +1 in addressing the vector.
523  bool unpackok = false;
524  try {
525  unpackok = up.ProcessRawEvent(&fSocketBuffer[EvtStartIDX+1],
526  digits.get(),
527  &raw_trigger,
528  &raw_sumDropMB,
529  &raw_sumDCM,
530  &flat_daq_data, &daq_header);
531  if (!unpackok) {
532  std::cerr << __FILE__ << ":" << __LINE__
533  << " Error: up.ProcessRawEvent(...) unpack failed"
534  << std::endl;
535  if (gsABORT_ON & gsUNPACK_RETURN) abort();
536  }
537  }
538  catch (...) {
539  std::cerr << __FILE__ << ":" << __LINE__
540  << " Error: Caught exception thrown from up.ProcessRawEvent(...)" << std::endl;
541  unpackok = false;
542  if (gsABORT_ON & gsUNPACK_EXCEPTION) abort();
543  }
544 
545  if (!unpackok)
546  {
547  std::cerr << " ProcessRawEvent failed.\n";
548  try{
549  //drain leftovers
550  // fSocket.receive(boost::asio::buffer(&fSocketBuffer[EvtStartIDX], fCurrentEventCapacity ));
551  //
552  // messier@indiana.edu - This "fSocket.receive" never
553  // completes. My tests show that maybe its not needed. I've
554  // commented it out until its better understood.
555  } catch (boost::system::system_error& x) {
556  std::cerr << " A socket read error should be treated as END_OF_DATA.\n";
557  return false;
558  }
559  goto retryLOOP; //FIXME: server code needs to use double buffering
560 
561  return false;
562  }
563 
564  // Get the actual timestamp here - use the current time as the
565  // default, maybe the default should really be a ridiculous value
566  struct timespec ts;
567 
570 
571  // art::Timestamp has the seconds since the beginning of the epoch
572  // in the upper 32 bits and the fractional seconds in the lower 32
573  // bits.
574  art::Timestamp tstamp = (ts.tv_sec << 32) | ts.tv_nsec;
575 
576  ////////////////////////////////////
577  // Assemble new principals as required.
578  art::SubRunID newID(up.getRunNumber(), up.getSubrunNumber());
579  if (inR == nullptr || fCurrentSubRunID.runID() != newID.runID()) { // New Run fragment
580  outR = fSourceHelper.makeRunPrincipal(up.getRunNumber(), tstamp);
581  // get the base form of the geometry file for the current detector
582  //\todo at some point we should query a database to get this, especially
583  //\todo when we have instances of only part of a detector working
584  std::string detFileBase = getDetectorGDML(up.getDetId());
585  std::unique_ptr<sumdata::RunData> rundata(new sumdata::RunData(novadaq::cnv::DetId(up.getDetId()), detFileBase));
586  art::put_product_in_principal(std::move(rundata),
587  *outR,
588  "daq"); // Module label
589  }
590  if (inSR == nullptr || fCurrentSubRunID != newID) { // New SubRun fragment
592  up.getSubrunNumber(),
593  tstamp);
594  fCurrentSubRunID = newID;
595  }
597  up.getSubrunNumber(),
598  up.getEventNumber(),
599  tstamp);
600 
601  ////////////////////////////////////
602  // Add the event products to the event:
603 
604  // Digitized data:
605  art::put_product_in_principal(std::move(digits), *outE, "daq");
606  // Trigger data:
607  std::unique_ptr<std::vector<rawdata::RawTrigger> > rtcol(new std::vector<rawdata::RawTrigger>);
608  rtcol->push_back(raw_trigger); // Read one, product is a vector.
609  art::put_product_in_principal(std::move(rtcol),
610  *outE,
611  "daq"); // Module label
612 
613  // RawSumDropMB data:
614  std::unique_ptr<std::vector<rawdata::RawSumDropMB> > rtcol_MB(new std::vector<rawdata::RawSumDropMB>);
615  rtcol_MB->push_back(raw_sumDropMB); // Read one, product is a vector.
616  art::put_product_in_principal(std::move(rtcol_MB),
617  *outE,
618  "daq"); // Module label
619 
620  // RawSumDCM data:
621  std::unique_ptr<std::vector<rawdata::RawSumDCM> > rtcol_DCM(new std::vector<rawdata::RawSumDCM>);
622  rtcol_DCM->push_back(raw_sumDCM); // Read one, product is a vector.
623  art::put_product_in_principal(std::move(rtcol_DCM),
624  *outE,
625  "daq"); // Module label
626 
627  // DAQ header:
628  std::unique_ptr<rawdata::DAQHeader > daqcol(new rawdata::DAQHeader(daq_header));
629  art::put_product_in_principal(std::move(daqcol),
630  *outE,
631  "daq"); // Module label
632 
633  /// Put FlatDAQData object
634  if(fDoFillFlatDAQData) {
635  std::unique_ptr<std::vector<rawdata::FlatDAQData> > flatcol(new std::vector<rawdata::FlatDAQData>);
636  flatcol->push_back(flat_daq_data); // Read one, product is a vector.
637  art::put_product_in_principal(std::move(flatcol),
638  *outE,
639  "daq"); // Module label
640  }
641 
642 
643  return true;
644  }
645 
646 }// end of namespace
static const int gsMAGIC_WORD
Source to read raw Events from a socket, per specifications in NOvA-DocDB-3947 and NOvA-DocDB-4390...
std::string getDetectorGDML(const int detid) const
const XML_Char * name
Definition: expat.h:151
EventPrincipal * makeEventPrincipal(EventAuxiliary const &eventAux, std::shared_ptr< History > &&history) const
static const int gsUNPACK_RETURN
int fConsecutiveBadReadCount
Counter for the number of consecutive corrupt events encountered.
dictionary components
int fTransmitDelay
Delay in microseconds to wait between command transmission.
void system_error(const char *function, const char *name, const int &y, const char *msg1, const char *msg2)
std::enable_if_t< P::branch_type==InEvent||P::branch_type==InResults > put_product_in_principal(std::unique_ptr< T > &&product, P &principal, std::string const &module_label, std::string const &instance_name={})
uint32_t getSubrunNumber() const
Definition: OnlineUnpack.h:80
static size_t in_bytes(size_t nElements)
static const int gsABORT_ON
bool fFilterCorruptedNanoslices
Do we need to filter out Corrupted NanoSlices?
Definition: OnlineUnpack.h:96
OStream cerr
Definition: OStream.cxx:7
TString ip
Definition: loadincs.C:5
string filename
Definition: shutoffs.py:106
static const int gsUNPACK_EXCEPTION
int fConsecutiveCorruptEventCount
Counter for the number of consecutive corrupt events encountered.
bool convertNovaTimeToUnixTime(uint64_t const &inputNovaTime, struct timespec &outputUnixTime)
bool fDoFillFlatDAQData
Do we need to fill FlatDAQData object?
::xsd::cxx::tree::buffer< char > buffer
Definition: Database.h:179
art::SourceHelper const & fSourceHelper
Class to help with {Run, SubRun, Event}Principal construction.
TypeLabel const & reconstitutes(std::string const &modLabel, std::string const &instanceName={})
RunID const & runID() const
Definition: SubRunID.h:77
string cmd
Definition: run_hadd.py:52
void parseFilename(std::string const &filename, std::string &address, std::string &port)
uint32_t trigmask_word2
Middle word (bits 31-63) in the trigger mask.
art::SubRunID fCurrentSubRunID
Keep track of the current subRun details.
RunPrincipal * makeRunPrincipal(RunAuxiliary const &runAux) const
printf("%d Experimental points found\n", nlines)
size_t fMaxEventSizeBytes
Will not try to read an event bigger than this.
NOvASocketInputDriver(fhicl::ParameterSet const &pset, art::ProductRegistryHelper &help, art::SourceHelper const &pm)
A module to produce rawdata::RawDigits from NOvADDT DAQHits.
bool fFilterCorruptedNanoslices
Filter the corrupted nanaslices in the unpacking?
Source to read raw Events from a socket, per specifications in NOvA-DocDB-3947 and NOvA-DocDB-4390...
void readFile(std::string const &name, art::FileBlock *&fb)
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
unsigned long long fTriggerTimingMarker_TimeStart
Definition: RawTrigger.h:38
std::string fCurrentFilename
Hang on to the current filename.
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
uint32_t trigmask_word1
Lowest word in the trigger mask only the lower 24 bit are used.
uint32_t trigmask_word3
High word (bits 64-96) in the trigger mask.
SubRunPrincipal * makeSubRunPrincipal(SubRunAuxiliary const &subRunAux) const
int getDetId() const
Definition: OnlineUnpack.h:82
void split(std::string const &s, char c, OutIter dest)
Definition: split.h:35
std::vector< uint8_t > cb_t
Command buffer type.
boost::asio::io_service fIoService
Object representing IO services of the OS.
uint32_t getRunNumber() const
Definition: OnlineUnpack.h:79
Float_t e
Definition: plot.C:35
int fCorruptEventCount
Counter for the total number of corrupt events encountered.
bool readNext(art::RunPrincipal *const &inR, art::SubRunPrincipal *const &inSR, art::RunPrincipal *&outR, art::SubRunPrincipal *&outSR, art::EventPrincipal *&outE)
int port
Definition: client_test.C:9
boost::asio::ip::tcp::socket fSocket
Object representing the socket from which we read.
bool ProcessRawEvent(void *rawevent_buffer, std::vector< rawdata::RawDigit > *, rawdata::RawTrigger *, rawdata::RawSumDropMB *, rawdata::RawSumDCM *, rawdata::FlatDAQData *, rawdata::DAQHeader *)
Unpack the RawEvent.
unsigned long long getEventNumber() const
Definition: OnlineUnpack.h:81
int fBadReadCount
Counter for the total number of corrupt events encountered.
int fMaxReadAttempts
Maximum number of read attempts before we throw.
size_t fCurrentEventCapacity
Current event capacity in buffer elements (current buffer size is 1 more than this);.
std::vector< uint32_t > fSocketBuffer
Raw data buffer for socket communication.