Public Member Functions | Protected Member Functions | Private Types | Static Private Member Functions | Private Attributes | List of all members
daq2raw::NOvASocketInputDriver Class Reference

#include "/cvmfs/nova-development.opensciencegrid.org/novasoft/releases/N20-11-28/DAQ2RawDigit/NOvASocketInputDriver.h"

Inheritance diagram for daq2raw::NOvASocketInputDriver:
daq2raw::NOvAInputDriverBase

Public Member Functions

 NOvASocketInputDriver (fhicl::ParameterSet const &pset, art::ProductRegistryHelper &help, art::SourceHelper const &pm)
 
void closeCurrentFile ()
 
void readFile (std::string const &name, art::FileBlock *&fb)
 
bool readNext (art::RunPrincipal *const &inR, art::SubRunPrincipal *const &inSR, art::RunPrincipal *&outR, art::SubRunPrincipal *&outSR, art::EventPrincipal *&outE)
 

Protected Member Functions

std::string getDetectorGDML (const int detid) const
 

Private Types

typedef std::vector< uint32_t > eb_t
 Event buffer type. More...
 
typedef eb_t::value_type ebe_t
 Event buffer element type. More...
 
typedef std::vector< uint8_t > cb_t
 Command buffer type. More...
 

Static Private Member Functions

static size_t in_bytes (size_t nElements)
 
static size_t in_elements (size_t nBytes)
 

Private Attributes

art::SourceHelper const & fSourceHelper
 Class to help with {Run, SubRun, Event}Principal construction. More...
 
std::string fCurrentFilename
 Hang on to the current filename. More...
 
art::SubRunID fCurrentSubRunID
 Keep track of the current subRun details. More...
 
boost::asio::io_service fIoService
 Object representing IO services of the OS. More...
 
boost::asio::ip::tcp::socket fSocket
 Object representing the socket from which we read. More...
 
size_t fCurrentEventCapacity
 Current event capacity in buffer elements (current buffer size is 1 more than this);. More...
 
size_t fMaxEventSizeBytes
 Will not try to read an event bigger than this. More...
 
std::vector< uint32_t > fSocketBuffer
 Raw data buffer for socket communication. More...
 
bool fFilterCorruptedNanoslices
 Filter the corrupted nanaslices in the unpacking? More...
 
bool fDoFillFlatDAQData
 Do we need to fill FlatDAQData object? More...
 
int fMaxReadAttempts
 Maximum number of read attempts before we throw. More...
 
int fCorruptEventCount
 Counter for the total number of corrupt events encountered. More...
 
int fConsecutiveCorruptEventCount
 Counter for the number of consecutive corrupt events encountered. More...
 
int fBadReadCount
 Counter for the total number of corrupt events encountered. More...
 
int fConsecutiveBadReadCount
 Counter for the number of consecutive corrupt events encountered. More...
 
uint32_t trigmask_word1
 Lowest word in the trigger mask only the lower 24 bit are used. More...
 
uint32_t trigmask_word2
 Middle word (bits 31-63) in the trigger mask. More...
 
uint32_t trigmask_word3
 High word (bits 64-96) in the trigger mask. More...
 
int fTransmitDelay
 Delay in microseconds to wait between command transmission. More...
 

Detailed Description

Definition at line 49 of file NOvASocketInputDriver.h.

Member Typedef Documentation

typedef std::vector<uint8_t> daq2raw::NOvASocketInputDriver::cb_t
private

Command buffer type.

Definition at line 71 of file NOvASocketInputDriver.h.

typedef std::vector<uint32_t> daq2raw::NOvASocketInputDriver::eb_t
private

Event buffer type.

Definition at line 69 of file NOvASocketInputDriver.h.

typedef eb_t::value_type daq2raw::NOvASocketInputDriver::ebe_t
private

Event buffer element type.

Definition at line 70 of file NOvASocketInputDriver.h.

Constructor & Destructor Documentation

daq2raw::NOvASocketInputDriver::NOvASocketInputDriver ( fhicl::ParameterSet const &  pset,
art::ProductRegistryHelper help,
art::SourceHelper const &  pm 
)

Class to fill the constraints on a template argument to the class, art::Source

Definition at line 68 of file NOvASocketInputDriver.cxx.

References fDoFillFlatDAQData, art::InEvent, art::InRun, and art::ProductRegistryHelper::reconstitutes().

73  : NOvAInputDriverBase(pset.get<std::string>("GDMLFileBase",""))
74  , fSourceHelper(pm)
76  , fCurrentSubRunID(-1, -1) ///< Has to be distinguished from the ones that are acually used
77  , fIoService()
79  , fCurrentEventCapacity(in_elements(65536 * 1024))
80  , fMaxEventSizeBytes(pset.get<size_t>("maxEventSizeK", 65536) * 1024)
81  , fSocketBuffer(fCurrentEventCapacity + 1) // Allow for data size info..
82  , fFilterCorruptedNanoslices(pset.get<bool>("FilterCorruptedNanoslices"))
83  , fDoFillFlatDAQData (pset.get<bool>("DoFillFlatDAQData"))
84  , fMaxReadAttempts (pset.get<int>("MaxReadAttempts",100))
87  , fBadReadCount(0)
89  , trigmask_word1(pset.get<uint32_t>("TriggerMask1",0x00ffffff))
90  , trigmask_word2(pset.get<uint32_t>("TriggerMask2",0xffffffff))
91  , trigmask_word3(pset.get<uint32_t>("TriggerMask3",0xffffffff))
92  , fTransmitDelay(pset.get<int>("TransmitDelay",1000))
93  {
94  // Declare products that *may* be in the event (or run, or subRun)
95  // here. Note the use of "reconstitutes" vs "produces": this is what
96  // allows us to specify module label, process name and (optionally)
97  // instance label for each product.
98  helper.reconstitutes<rawdata::DAQHeader, art::InEvent>("daq");
99  helper.reconstitutes<std::vector<rawdata::RawDigit>, art::InEvent>("daq");
100  helper.reconstitutes<std::vector<rawdata::RawTrigger>, art::InEvent>("daq");
101  helper.reconstitutes<std::vector<rawdata::RawSumDropMB>, art::InEvent>("daq");
102  helper.reconstitutes<std::vector<rawdata::RawSumDCM>, art::InEvent>("daq");
103  if(fDoFillFlatDAQData) helper.reconstitutes<std::vector<rawdata::FlatDAQData>, art::InEvent>("daq");
104  helper.reconstitutes<sumdata::RunData, art::InRun >("daq");
105  }
int fConsecutiveBadReadCount
Counter for the number of consecutive corrupt events encountered.
int fTransmitDelay
Delay in microseconds to wait between command transmission.
static size_t in_elements(size_t nBytes)
int fConsecutiveCorruptEventCount
Counter for the number of consecutive corrupt events encountered.
bool fDoFillFlatDAQData
Do we need to fill FlatDAQData object?
art::SourceHelper const & fSourceHelper
Class to help with {Run, SubRun, Event}Principal construction.
uint32_t trigmask_word2
Middle word (bits 31-63) in the trigger mask.
NOvAInputDriverBase(std::string gdmlBase="")
art::SubRunID fCurrentSubRunID
Keep track of the current subRun details.
size_t fMaxEventSizeBytes
Will not try to read an event bigger than this.
bool fFilterCorruptedNanoslices
Filter the corrupted nanaslices in the unpacking?
std::string fCurrentFilename
Hang on to the current filename.
uint32_t trigmask_word1
Lowest word in the trigger mask only the lower 24 bit are used.
uint32_t trigmask_word3
High word (bits 64-96) in the trigger mask.
boost::asio::io_service fIoService
Object representing IO services of the OS.
int fCorruptEventCount
Counter for the total number of corrupt events encountered.
boost::asio::ip::tcp::socket fSocket
Object representing the socket from which we read.
int fBadReadCount
Counter for the total number of corrupt events encountered.
int fMaxReadAttempts
Maximum number of read attempts before we throw.
size_t fCurrentEventCapacity
Current event capacity in buffer elements (current buffer size is 1 more than this);.
std::vector< uint32_t > fSocketBuffer
Raw data buffer for socket communication.
enum BeamMode string

Member Function Documentation

void daq2raw::NOvASocketInputDriver::closeCurrentFile ( )

Definition at line 109 of file NOvASocketInputDriver.cxx.

References fSocket.

110  {
111  // Our "file" is a socket.
112  fSocket.close();
113  }
boost::asio::ip::tcp::socket fSocket
Object representing the socket from which we read.
std::string daq2raw::NOvAInputDriverBase::getDetectorGDML ( const int  detid) const
protectedinherited

Definition at line 22 of file NOvAInputDriverBase.cxx.

References daq2raw::NOvAInputDriverBase::fGDMLBase, novadaq::cnv::kFARDET, novadaq::cnv::kNDOS, novadaq::cnv::kNEARDET, novadaq::cnv::kTESTBEAM, and string.

Referenced by daq2raw::DAQHit2Raw::beginRun(), daq2raw::NOvARawInputDriver::readNext(), readNext(), and daq2raw::TestBeamRawInputDriver::readNext().

22  {
23 
24  // get the base form of the geometry file for the current detector
25  if(!fGDMLBase.empty()) return fGDMLBase;
26 
27  //\todo at some point we should query a database to get this, especially
28  //\todo when we have instances of only part of a detector working
29  std::string detFileBase("");
30  if (detid == novadaq::cnv::kNDOS)
31  detFileBase = "ndos-2x3-4block_nomc-xtru-vacuum";
32  else if (detid == novadaq::cnv::kNEARDET)
33  detFileBase = "neardet-3x3-8block-xtru-vacuum-stagger";
34  else if(detid == novadaq::cnv::kFARDET)
35  detFileBase = "fardet-12x12-28block-xtru-vacuum-stagger-pivoter";
36  else if(detid == novadaq::cnv::kTESTBEAM)
37  detFileBase = "testbeam-2x2-2block-xtru-vacuum-stagger";
38 
39  return detFileBase;
40  }
Far Detector at Ash River, MN.
Prototype Near Detector on the surface at FNAL.
Near Detector in the NuMI cavern.
enum BeamMode string
static size_t daq2raw::NOvASocketInputDriver::in_bytes ( size_t  nElements)
inlinestaticprivate

Definition at line 73 of file NOvASocketInputDriver.h.

Referenced by readNext().

73  {
74  return nElements * sizeof(ebe_t);
75  }
eb_t::value_type ebe_t
Event buffer element type.
static size_t daq2raw::NOvASocketInputDriver::in_elements ( size_t  nBytes)
inlinestaticprivate

Definition at line 77 of file NOvASocketInputDriver.h.

References fillBadChanDBTables::result.

77  {
78  size_t result = nBytes / sizeof(ebe_t);
79  return (in_bytes(result) == nBytes)?
80  result:
81  result+1;
82  }
eb_t::value_type ebe_t
Event buffer element type.
static size_t in_bytes(size_t nElements)
void daq2raw::NOvASocketInputDriver::readFile ( std::string const &  name,
art::FileBlock *&  fb 
)

Required function: open the file, "name" and construct and return a new FileBlock object. MUST be successful or throw: art::Exception(art::errors::FileOpenError) or art::Exception(art::errors::FileReadError) are good candidates. In this implementation, the role of the 'file' is played by a socket; we interpret the "name" as the address and port number identifying the socket.

Definition at line 123 of file NOvASocketInputDriver.cxx.

References om::cerr, run_hadd::cmd, DispatcherCMD::DSPCMD_SET_EVENTSTREAM_SINGLE, DispatcherCMD::DSPCMD_SET_TRIGGER_MASK_1, DispatcherCMD::DSPCMD_SET_TRIGGER_MASK_2, DispatcherCMD::DSPCMD_SET_TRIGGER_MASK_3, e, febshutoff_auto::end, fCurrentFilename, art::errors::FileReadError, fIoService, fSocket, fTransmitDelay, MECModelEnuComparisons::i, ip, daq2raw::parseFilename(), port, febshutoff_auto::query, string, stan::math::system_error(), trigmask_word1, trigmask_word2, trigmask_word3, DispatcherCMD::write(), and submit_syst::x.

125  {
126  namespace ba = boost::asio;
127  namespace bai = boost::asio::ip;
130  std::string socketAddress;
131  parseFilename(fCurrentFilename, socketAddress, port);
132  try
133  {
134  // Create the file block now, since if we don't throw we're expected
135  // to return one.
136  fb = new art::FileBlock(art::FileFormatVersion(1, "NOvARawInput 2011a"),
138 
139  bai::tcp::resolver resolver(fIoService);
140  bai::tcp::resolver::query query(socketAddress, port);
141  bai::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
142  bai::tcp::resolver::iterator end;
143 
144  boost::system::error_code error = boost::asio::error::host_not_found;
145  while (error && endpoint_iterator != end)
146  {
147  fSocket.close();
148  fSocket.connect(*endpoint_iterator++, error);
149  }
150  if (error) throw boost::system::system_error(error);
151  static size_t const cb_size = 32;
152  // Put a command on the socket ...
153  //uint32_t dummy;
154  cb_t cmd_buffer(cb_size);
155 
156 #define SET_DISPATCHER_TRIGGER_MASKS 1
157 #ifdef SET_DISPATCHER_TRIGGER_MASKS
158  // Internal buffers for trigger mask commands
159  cb_t mask1_buffer(cb_size);
160  cb_t mask2_buffer(cb_size);
161  cb_t mask3_buffer(cb_size);
162 
163  // uint32_t trigmask_word1 = 0xFFFFFFFF;
164  // uint32_t trigmask_word2 = 0xFFFFFFFF;
165  // uint32_t trigmask_word3 = 0xFFFFFFFF;
166 
167  // Construct the commands to set the different trigger masks (1-3)
168  DispatcherCMD mask1_cmd(DispatcherCMD::DSPCMD_SET_TRIGGER_MASK_1, trigmask_word1 & 0x00ffffff, &mask1_buffer[0]);
171 
172  // Serialize the set mask commands to their transmit buffers
173  ssize_t cmd1_size_bytes = mask1_cmd.write();
174  ssize_t cmd2_size_bytes = mask2_cmd.write();
175  ssize_t cmd3_size_bytes = mask3_cmd.write();
176 
177  // Send the mask commands to the dispatcher
178  fSocket.send(boost::asio::buffer(&mask1_buffer[0], cmd1_size_bytes));
179  // ... and receive the acknowledgement.
180  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
181  usleep(fTransmitDelay);
182 
183  fSocket.send(boost::asio::buffer(&mask2_buffer[0], cmd2_size_bytes));
184  // ... and receive the acknowledgement.
185  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
186  usleep(fTransmitDelay);
187 
188 
189  fSocket.send(boost::asio::buffer(&mask3_buffer[0], cmd3_size_bytes));
190  // ... and receive the acknowledgement.
191  // fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
192  usleep(fTransmitDelay);
193 
194  // If we try to send too many messages to the dispatcher too rapidly, we will get stuck in a non-responsive state.
195  // So, after sending the message to the dispatcher to configure the trigger masks, we will pause for one second
196  // before sending the next message.
197  // usleep(fTransmitDelay);
198 
199 #endif
200 
202  &cmd_buffer[0]);
203  ssize_t cmd_size_bytes = single_cmd.write();
204  fSocket.send(boost::asio::buffer(&cmd_buffer[0], cmd_size_bytes));
205  usleep(fTransmitDelay);
206 
207  // ... and receive the acknowledgement.
208  fSocket.receive(boost::asio::buffer(&cmd_buffer[0], cb_size));
209 
210 #ifndef NOVA_DEBUG_STREAM_ACK
211  std::cerr << "Received: "
212  << boost::numeric_cast<unsigned>(cmd_buffer[0])
213  << " bytes.\n";
214  for (cb_t::const_iterator
215  i = cmd_buffer.begin(),
216  e = cmd_buffer.end();
217  i != e;
218  ++i) {
219  std::cerr << boost::numeric_cast<unsigned>(*i) << " ";
220  }
221  std::cerr << "\n";
222  for (size_t i = 0; i + 3 < cb_size; i += 4) {
223  std::cerr
224  << std::hex
225  << std::setfill('0')
226  << "0x"
227  << std::setw(8)
228  << *reinterpret_cast<uint32_t*>(&cmd_buffer[i]) << " ";
229  }
230  std::cerr << "\n";
231 #endif // NOVA_DEBUG_STREAM_ACK
232  if (cmd_buffer[0] > cb_size) {
233  mf::LogWarning("StreamReadFail")
234  << "Unable to read entire command acknowledgement of "
235  << cmd_buffer[0]
236  << " bytes: buffer size is only "
237  << cb_size
238  << " bytes.\n";
239  try {
240  if (fSocket.is_open()) fSocket.close();
241  } catch (...) {
242  // Ignore.
243  }
244  return; // Can't open is not fatal for this input source.
245  }
246  static char const ACK = 6;
247  unsigned char ack = cmd_buffer[1];
248  if (cmd_buffer[1] != ACK) { // No ACK
250  << "Did not receive ACK for SINGLE command: expected 0x"
251  << std::hex
252  << std::setfill('0')
253  << std::setw(2)
254  << static_cast<unsigned>(ACK)
255  << ", got 0x"
256  << std::setfill('0')
257  << std::setw(2)
258  << static_cast<unsigned>(ack)
259  << ".\n";
260  }
261  unsigned char cmd = cmd_buffer[2];
264  << "Received ACK for unexpected command 0x"
265  << std::hex
266  << std::setfill('0')
267  << std::setw(2)
268  << cmd
269  << " (expected 0x"
271  << ").\n";
272  }
273  }
275  {
276  mf::LogWarning("StreamOpenFail")
277  << "Unable to open socket to address "
279  << " for NOvASocketInputDriver, due to system error:\n"
280  << x.what()
281  << "\n";
282  try {
283  if (fSocket.is_open()) fSocket.close();
284  } catch (...) {
285  // Ignore.
286  }
287  }
288 }
const XML_Char * name
Definition: expat.h:151
int fTransmitDelay
Delay in microseconds to wait between command transmission.
void system_error(const char *function, const char *name, const int &y, const char *msg1, const char *msg2)
OStream cerr
Definition: OStream.cxx:7
TString ip
Definition: loadincs.C:5
::xsd::cxx::tree::buffer< char > buffer
Definition: Database.h:179
string cmd
Definition: run_hadd.py:52
void parseFilename(std::string const &filename, std::string &address, std::string &port)
uint32_t trigmask_word2
Middle word (bits 31-63) in the trigger mask.
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
std::string fCurrentFilename
Hang on to the current filename.
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
uint32_t trigmask_word1
Lowest word in the trigger mask only the lower 24 bit are used.
uint32_t trigmask_word3
High word (bits 64-96) in the trigger mask.
std::vector< uint8_t > cb_t
Command buffer type.
boost::asio::io_service fIoService
Object representing IO services of the OS.
Float_t e
Definition: plot.C:35
int port
Definition: client_test.C:9
boost::asio::ip::tcp::socket fSocket
Object representing the socket from which we read.
enum BeamMode string
bool daq2raw::NOvASocketInputDriver::readNext ( art::RunPrincipal *const &  inR,
art::SubRunPrincipal *const &  inSR,
art::RunPrincipal *&  outR,
art::SubRunPrincipal *&  outSR,
art::EventPrincipal *&  outE 
)

Required function: read and fill Run, SubRun and/or Event as might be appropriate.

Define data structures, which will be filled by OnlineUnpack::ProcessRawEvent

Put FlatDAQData object

Definition at line 293 of file NOvASocketInputDriver.cxx.

References om::cerr, submit_syst::const, novadaq::timeutils::convertNovaTimeToUnixTime(), DispatcherCMD::DSPCMD_NEXT_EVENT, allTimeWatchdog::endl, fBadReadCount, fConsecutiveBadReadCount, fConsecutiveCorruptEventCount, fCorruptEventCount, fCurrentEventCapacity, fCurrentFilename, fCurrentSubRunID, fDoFillFlatDAQData, fFilterCorruptedNanoslices, daq2raw::OnlineUnpack::fFilterCorruptedNanoslices, art::errors::FileReadError, fMaxEventSizeBytes, fMaxReadAttempts, fSocket, fSocketBuffer, fSourceHelper, rawdata::RawTrigger::fTriggerTimingMarker_TimeStart, daq2raw::NOvAInputDriverBase::getDetectorGDML(), daq2raw::OnlineUnpack::getDetId(), daq2raw::OnlineUnpack::getEventNumber(), daq2raw::OnlineUnpack::getRunNumber(), daq2raw::OnlineUnpack::getSubrunNumber(), gsABORT_ON, gsMAGIC_WORD, gsUNPACK_EXCEPTION, gsUNPACK_RETURN, in_bytes(), art::SourceHelper::makeEventPrincipal(), art::SourceHelper::makeRunPrincipal(), art::SourceHelper::makeSubRunPrincipal(), printf(), daq2raw::OnlineUnpack::ProcessRawEvent(), art::put_product_in_principal(), art::SubRunID::runID(), string, stan::math::system_error(), msf_helper::timespec, DispatcherCMD::write(), and submit_syst::x.

297  {
298 
299 
300  // Allocate and initialize here to allow for a proper use
301  // of a goto later
302  int EvtStartIDX =0;
303  size_t receivedBytes =0;
304  ssize_t cmd_size_bytes =0;
305  uint32_t eventSize =0;
306  boost::system::error_code error;
307  int readAttempts =0;
308  ssize_t bytesRead =0;
309 
310  receivedBytes=0;
311  cmd_size_bytes=0;
312 
313  retryLOOP:
314 
315  // Check that the socket is open
316  if (!fSocket.is_open()){
317  // If the socket is closed/has closed return back
318  return false;
319  }
320 
321  ////////////////////////////////////
322  // Send the 'NEXT_EVENT' command here, then read the resulting event.
323  try {
324  // Construct the NEXT_EVENT command object
326  &fSocketBuffer[0]);
327 
328  // Serialize the command out to the socketbuffer
329  cmd_size_bytes = next_event_cmd.write();
330 
331  // Now send the NEXT_EVENT command to the dispatcher source
332  fSocket.send(boost::asio::buffer(&fSocketBuffer[0], cmd_size_bytes));
333 
334  // Cast the buffer to an 8bit byte array from a 32bit word array
335  uint8_t*const pSocketBuffer= reinterpret_cast<uint8_t*const>(&fSocketBuffer[0]);
336 
337  // Reset the number of bytes that have been received
338  receivedBytes =0;
339  bytesRead =0;
340 
341  // Read the first 4 words of the event
342  // this allows us to retrieve the total
343  // event size from the response:
344  while ( (receivedBytes < 16) ) {
345  ++readAttempts;
346  // If we reach our maximum number of retrys
347  // then we throw
348  if(readAttempts > fMaxReadAttempts){
349  throw "Maximum number of retrys reached on byte count read";
350  }
351 
352  // Attempt to read the header
353  bytesRead = fSocket.receive(boost::asio::buffer(pSocketBuffer + receivedBytes, 16 - receivedBytes ),
354  0,
355  error);
356 
357  // If we receive zero bytes back from the receive call
358  // this indicates an error. We go to check the error code
359  // in this case and take the appropriate action.
360  if(bytesRead == 0){
361  // Catch errors in the IO system
362  if(error==boost::asio::error::eof){
363  return false;
364  }else if(error){
365  // Some other error occured
366  throw boost::system::system_error(error);
367  } // endif error
368  } // endif bytesRead
369 
370  // Increment the total number of bytes that have been received
371  receivedBytes += bytesRead;
372 
373  // receive is a blocking IO call. There is no reason to sleep.
374  // usleep(1);
375  }
376 
377  //
378  // Debugging - we should always find the magic number 0xe929e929
379  // as the second word in the stream. If we don't there may be
380  // problems. Take a look at the start of the stream.
381  //
382  if (fSocketBuffer[1]!=0xe929e929) {
383  std::cerr << __FILE__ " : " << __LINE__
384  << " Error: Bad magic word at fSocketBuffer[1]. Will try shifting..."
385  << std::endl;
386  if (gsABORT_ON & gsMAGIC_WORD) abort();
387  }
388 
389  //
390  // Seek in the stream until we find the magic 0xe929e929. This
391  // should be the second word so don't look too far.
392  //
393  int kEvtStartIDXMax = 8;
394  for (EvtStartIDX=0; EvtStartIDX<kEvtStartIDXMax; ++EvtStartIDX) {
395  if(0xe929e929 == fSocketBuffer[EvtStartIDX+1]) break;
396  }
397  if (EvtStartIDX>=kEvtStartIDXMax) {
398  throw std::string("Unable to determine byte count on network read");
399  }
400  eventSize = ntohl(fSocketBuffer[EvtStartIDX]);
401 
402  // Don't modify the data stream in place. This can result
403  // in a mixed endian problem and we don't want to deal with it.
404  // fSocketBuffer[0] = ntohl(fSocketBuffer[0]);
405 
406 #ifndef NOVA_DEBUG_STREAM_ACK
407  std::cerr << "Event contains " << eventSize << " bytes.\n";
408 #endif // NOVA_DEBUG_STREAM_ACK
409 
410  if(eventSize == 0){
411  printf("EVT DATA:\n0x%08x 0x%08x 0x%08x 0x%08x\n",
414  }
415 
416  // Trap on the various bad event values
417 
418  // An "event size" of 0xFFFFFFFF means END_OF_DATA, so return false.
419  switch(eventSize){
420  case 0:
421  // This is an empty event. This indicates a problem
422  // with the network stream
423  ++fBadReadCount;
425  eventSize = ntohl(fSocketBuffer[1]);
426  EvtStartIDX = 1;
427  break;
428  case 0xFFFFFFFF:
429  // This is the event corrupt byte count. This indicates a
430  // problem with the dispatchers ability to buffer the event
433  return false;
434  break;
435  default:
436  // This is a normal event, we reset our
437  // consecutive error counters
440  break;
441  }
442 
443  // If we make it to this point then we probably have a good event
444  // header in the buffer. We will use this to get the rest of the event
445  // data off of the network.
446 
447  if( eventSize > (ssize_t)fMaxEventSizeBytes){
449  << "Unable to read entire event "
450  << eventSize
451  << " bytes: buffer size is only "
452  << fMaxEventSizeBytes
453  << " bytes.\n";
454  } else if ( eventSize > fCurrentEventCapacity) {
455  fSocketBuffer.resize( eventSize + 1);
456  fCurrentEventCapacity = eventSize;
459  }
460 
461  // Read the response:
462 
463  while ( receivedBytes < (eventSize+4) )
464  {
465  size_t remainingBytes = eventSize +4 -receivedBytes;
466  remainingBytes = remainingBytes > 1400? 1400:remainingBytes;
467 
468  receivedBytes += fSocket.receive(boost::asio::buffer(pSocketBuffer+receivedBytes, remainingBytes));
469 
470  }
471 
472 
473  } catch (boost::system::system_error& x) {
474  std::cerr << " A socket read error should be treated as END_OF_DATA.\n";
475  return false;
476  }
477 
478 
479  ////////////////////////////////////
480  // Unpack the data.
481 
482  // Need to convert the data to host byte order from network byte order
483  // (event size has already been done).
484  /*
485  size_t count_check = 0;
486  for (eb_t::iterator
487  i = fSocketBuffer.begin() + 1,
488  e = i + in_elements(fSocketBuffer[0]);
489  i != e;
490  ++i, ++count_check) {
491  *i = ntohl(*i);
492  }
493 
494  #ifndef NOVA_DEBUG_STREAM_ACK
495  std::cerr << "Converted " << in_bytes(count_check) << " bytes.\n";
496  #endif // NOVA_DEBUG_STREAM_ACK
497  */
498  // NOTE: As of OnlineUnpack.cxx r1.9, the histogram pointers required
499  // in the constructor are *not* filled. In the event that one *does*
500  // want this kind of information, the data to be histogrammed should
501  // be put into event products (not necessarily written out to file)
502  // and turned into histograms by a downstream analyzer. This will
503  // enable easy management of different histograms for diferent runs,
504  // subruns, etc, etc and trivial control of whether histograns are
505  // actually wanted or not.
506  daq2raw::OnlineUnpack up(nullptr, nullptr, nullptr, &fCurrentFilename);
507  up.fFilterCorruptedNanoslices = fFilterCorruptedNanoslices;
508 
509 
510  /// Define data structures, which will be filled by OnlineUnpack::ProcessRawEvent
511  std::unique_ptr<std::vector<rawdata::RawDigit> > digits(new std::vector<rawdata::RawDigit>);
512  rawdata::RawTrigger raw_trigger;
513  rawdata::RawSumDropMB raw_sumDropMB;
514  rawdata::RawSumDCM raw_sumDCM;
515  rawdata::FlatDAQData flat_daq_data;
516  rawdata::DAQHeader daq_header;
517 
518  // Fill the data structures for this event.
519  // MFP: Who is responsible for deleting the vector 'digits'? At what point
520  // is the responsibility for doing so accepted?
521  // ProcessRawEvent expects the first argument to be the first word of *data* (not the size
522  // of the event data), thus the +1 in addressing the vector.
523  bool unpackok = false;
524  try {
525  unpackok = up.ProcessRawEvent(&fSocketBuffer[EvtStartIDX+1],
526  digits.get(),
527  &raw_trigger,
528  &raw_sumDropMB,
529  &raw_sumDCM,
530  &flat_daq_data, &daq_header);
531  if (!unpackok) {
532  std::cerr << __FILE__ << ":" << __LINE__
533  << " Error: up.ProcessRawEvent(...) unpack failed"
534  << std::endl;
535  if (gsABORT_ON & gsUNPACK_RETURN) abort();
536  }
537  }
538  catch (...) {
539  std::cerr << __FILE__ << ":" << __LINE__
540  << " Error: Caught exception thrown from up.ProcessRawEvent(...)" << std::endl;
541  unpackok = false;
542  if (gsABORT_ON & gsUNPACK_EXCEPTION) abort();
543  }
544 
545  if (!unpackok)
546  {
547  std::cerr << " ProcessRawEvent failed.\n";
548  try{
549  //drain leftovers
550  // fSocket.receive(boost::asio::buffer(&fSocketBuffer[EvtStartIDX], fCurrentEventCapacity ));
551  //
552  // messier@indiana.edu - This "fSocket.receive" never
553  // completes. My tests show that maybe its not needed. I've
554  // commented it out until its better understood.
555  } catch (boost::system::system_error& x) {
556  std::cerr << " A socket read error should be treated as END_OF_DATA.\n";
557  return false;
558  }
559  goto retryLOOP; //FIXME: server code needs to use double buffering
560 
561  return false;
562  }
563 
564  // Get the actual timestamp here - use the current time as the
565  // default, maybe the default should really be a ridiculous value
566  struct timespec ts;
567 
570 
571  // art::Timestamp has the seconds since the beginning of the epoch
572  // in the upper 32 bits and the fractional seconds in the lower 32
573  // bits.
574  art::Timestamp tstamp = (ts.tv_sec << 32) | ts.tv_nsec;
575 
576  ////////////////////////////////////
577  // Assemble new principals as required.
578  art::SubRunID newID(up.getRunNumber(), up.getSubrunNumber());
579  if (inR == nullptr || fCurrentSubRunID.runID() != newID.runID()) { // New Run fragment
580  outR = fSourceHelper.makeRunPrincipal(up.getRunNumber(), tstamp);
581  // get the base form of the geometry file for the current detector
582  //\todo at some point we should query a database to get this, especially
583  //\todo when we have instances of only part of a detector working
584  std::string detFileBase = getDetectorGDML(up.getDetId());
585  std::unique_ptr<sumdata::RunData> rundata(new sumdata::RunData(novadaq::cnv::DetId(up.getDetId()), detFileBase));
586  art::put_product_in_principal(std::move(rundata),
587  *outR,
588  "daq"); // Module label
589  }
590  if (inSR == nullptr || fCurrentSubRunID != newID) { // New SubRun fragment
591  outSR = fSourceHelper.makeSubRunPrincipal(up.getRunNumber(),
592  up.getSubrunNumber(),
593  tstamp);
594  fCurrentSubRunID = newID;
595  }
596  outE = fSourceHelper.makeEventPrincipal(up.getRunNumber(),
597  up.getSubrunNumber(),
598  up.getEventNumber(),
599  tstamp);
600 
601  ////////////////////////////////////
602  // Add the event products to the event:
603 
604  // Digitized data:
605  art::put_product_in_principal(std::move(digits), *outE, "daq");
606  // Trigger data:
607  std::unique_ptr<std::vector<rawdata::RawTrigger> > rtcol(new std::vector<rawdata::RawTrigger>);
608  rtcol->push_back(raw_trigger); // Read one, product is a vector.
609  art::put_product_in_principal(std::move(rtcol),
610  *outE,
611  "daq"); // Module label
612 
613  // RawSumDropMB data:
614  std::unique_ptr<std::vector<rawdata::RawSumDropMB> > rtcol_MB(new std::vector<rawdata::RawSumDropMB>);
615  rtcol_MB->push_back(raw_sumDropMB); // Read one, product is a vector.
616  art::put_product_in_principal(std::move(rtcol_MB),
617  *outE,
618  "daq"); // Module label
619 
620  // RawSumDCM data:
621  std::unique_ptr<std::vector<rawdata::RawSumDCM> > rtcol_DCM(new std::vector<rawdata::RawSumDCM>);
622  rtcol_DCM->push_back(raw_sumDCM); // Read one, product is a vector.
623  art::put_product_in_principal(std::move(rtcol_DCM),
624  *outE,
625  "daq"); // Module label
626 
627  // DAQ header:
628  std::unique_ptr<rawdata::DAQHeader > daqcol(new rawdata::DAQHeader(daq_header));
629  art::put_product_in_principal(std::move(daqcol),
630  *outE,
631  "daq"); // Module label
632 
633  /// Put FlatDAQData object
634  if(fDoFillFlatDAQData) {
635  std::unique_ptr<std::vector<rawdata::FlatDAQData> > flatcol(new std::vector<rawdata::FlatDAQData>);
636  flatcol->push_back(flat_daq_data); // Read one, product is a vector.
637  art::put_product_in_principal(std::move(flatcol),
638  *outE,
639  "daq"); // Module label
640  }
641 
642 
643  return true;
644  }
static const int gsMAGIC_WORD
Source to read raw Events from a socket, per specifications in NOvA-DocDB-3947 and NOvA-DocDB-4390...
std::string getDetectorGDML(const int detid) const
EventPrincipal * makeEventPrincipal(EventAuxiliary const &eventAux, std::shared_ptr< History > &&history) const
static const int gsUNPACK_RETURN
int fConsecutiveBadReadCount
Counter for the number of consecutive corrupt events encountered.
void system_error(const char *function, const char *name, const int &y, const char *msg1, const char *msg2)
std::enable_if_t< P::branch_type==InEvent||P::branch_type==InResults > put_product_in_principal(std::unique_ptr< T > &&product, P &principal, std::string const &module_label, std::string const &instance_name={})
static size_t in_bytes(size_t nElements)
static const int gsABORT_ON
OStream cerr
Definition: OStream.cxx:7
static const int gsUNPACK_EXCEPTION
int fConsecutiveCorruptEventCount
Counter for the number of consecutive corrupt events encountered.
bool convertNovaTimeToUnixTime(uint64_t const &inputNovaTime, struct timespec &outputUnixTime)
bool fDoFillFlatDAQData
Do we need to fill FlatDAQData object?
::xsd::cxx::tree::buffer< char > buffer
Definition: Database.h:179
art::SourceHelper const & fSourceHelper
Class to help with {Run, SubRun, Event}Principal construction.
RunID const & runID() const
Definition: SubRunID.h:77
art::SubRunID fCurrentSubRunID
Keep track of the current subRun details.
RunPrincipal * makeRunPrincipal(RunAuxiliary const &runAux) const
printf("%d Experimental points found\n", nlines)
size_t fMaxEventSizeBytes
Will not try to read an event bigger than this.
bool fFilterCorruptedNanoslices
Filter the corrupted nanaslices in the unpacking?
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
Definition: Exception.h:66
unsigned long long fTriggerTimingMarker_TimeStart
Definition: RawTrigger.h:38
std::string fCurrentFilename
Hang on to the current filename.
SubRunPrincipal * makeSubRunPrincipal(SubRunAuxiliary const &subRunAux) const
int fCorruptEventCount
Counter for the total number of corrupt events encountered.
boost::asio::ip::tcp::socket fSocket
Object representing the socket from which we read.
int fBadReadCount
Counter for the total number of corrupt events encountered.
int fMaxReadAttempts
Maximum number of read attempts before we throw.
size_t fCurrentEventCapacity
Current event capacity in buffer elements (current buffer size is 1 more than this);.
std::vector< uint32_t > fSocketBuffer
Raw data buffer for socket communication.
enum BeamMode string

Member Data Documentation

int daq2raw::NOvASocketInputDriver::fBadReadCount
private

Counter for the total number of corrupt events encountered.

Definition at line 100 of file NOvASocketInputDriver.h.

Referenced by readNext().

int daq2raw::NOvASocketInputDriver::fConsecutiveBadReadCount
private

Counter for the number of consecutive corrupt events encountered.

Definition at line 101 of file NOvASocketInputDriver.h.

Referenced by readNext().

int daq2raw::NOvASocketInputDriver::fConsecutiveCorruptEventCount
private

Counter for the number of consecutive corrupt events encountered.

Definition at line 99 of file NOvASocketInputDriver.h.

Referenced by readNext().

int daq2raw::NOvASocketInputDriver::fCorruptEventCount
private

Counter for the total number of corrupt events encountered.

Definition at line 98 of file NOvASocketInputDriver.h.

Referenced by readNext().

size_t daq2raw::NOvASocketInputDriver::fCurrentEventCapacity
private

Current event capacity in buffer elements (current buffer size is 1 more than this);.

Definition at line 89 of file NOvASocketInputDriver.h.

Referenced by readNext().

std::string daq2raw::NOvASocketInputDriver::fCurrentFilename
private

Hang on to the current filename.

Definition at line 85 of file NOvASocketInputDriver.h.

Referenced by readFile(), and readNext().

art::SubRunID daq2raw::NOvASocketInputDriver::fCurrentSubRunID
private

Keep track of the current subRun details.

Definition at line 86 of file NOvASocketInputDriver.h.

Referenced by readNext().

bool daq2raw::NOvASocketInputDriver::fDoFillFlatDAQData
private

Do we need to fill FlatDAQData object?

Definition at line 94 of file NOvASocketInputDriver.h.

Referenced by NOvASocketInputDriver(), and readNext().

bool daq2raw::NOvASocketInputDriver::fFilterCorruptedNanoslices
private

Filter the corrupted nanaslices in the unpacking?

Definition at line 93 of file NOvASocketInputDriver.h.

Referenced by readNext().

boost::asio::io_service daq2raw::NOvASocketInputDriver::fIoService
private

Object representing IO services of the OS.

Definition at line 87 of file NOvASocketInputDriver.h.

Referenced by readFile().

size_t daq2raw::NOvASocketInputDriver::fMaxEventSizeBytes
private

Will not try to read an event bigger than this.

Definition at line 90 of file NOvASocketInputDriver.h.

Referenced by readNext().

int daq2raw::NOvASocketInputDriver::fMaxReadAttempts
private

Maximum number of read attempts before we throw.

Definition at line 96 of file NOvASocketInputDriver.h.

Referenced by readNext().

boost::asio::ip::tcp::socket daq2raw::NOvASocketInputDriver::fSocket
private

Object representing the socket from which we read.

Definition at line 88 of file NOvASocketInputDriver.h.

Referenced by closeCurrentFile(), readFile(), and readNext().

std::vector<uint32_t> daq2raw::NOvASocketInputDriver::fSocketBuffer
private

Raw data buffer for socket communication.

Definition at line 91 of file NOvASocketInputDriver.h.

Referenced by readNext().

art::SourceHelper const& daq2raw::NOvASocketInputDriver::fSourceHelper
private

Class to help with {Run, SubRun, Event}Principal construction.

Definition at line 84 of file NOvASocketInputDriver.h.

Referenced by readNext().

int daq2raw::NOvASocketInputDriver::fTransmitDelay
private

Delay in microseconds to wait between command transmission.

Definition at line 107 of file NOvASocketInputDriver.h.

Referenced by readFile().

uint32_t daq2raw::NOvASocketInputDriver::trigmask_word1
private

Lowest word in the trigger mask only the lower 24 bit are used.

Definition at line 103 of file NOvASocketInputDriver.h.

Referenced by readFile().

uint32_t daq2raw::NOvASocketInputDriver::trigmask_word2
private

Middle word (bits 31-63) in the trigger mask.

Definition at line 104 of file NOvASocketInputDriver.h.

Referenced by readFile().

uint32_t daq2raw::NOvASocketInputDriver::trigmask_word3
private

High word (bits 64-96) in the trigger mask.

Definition at line 105 of file NOvASocketInputDriver.h.

Referenced by readFile().


The documentation for this class was generated from the following files: