21 #include "EventDispatcher_CommandSet/DispatcherCmd.h" 22 #include "NovaDAQConventions/DAQConventions.h" 23 #include "NovaTimingUtilities/TimingUtilities.h" 54 cet::split(filename,
':', std::back_inserter(components));
55 if (components.size() != 2)
58 <<
"Bad socket address in NOvASocketDriver: " 62 address = components[0];
76 , fCurrentSubRunID(-1, -1)
79 , fCurrentEventCapacity(in_elements(65536 * 1024))
80 , fMaxEventSizeBytes(pset.
get<size_t>(
"maxEventSizeK", 65536) * 1024)
81 , fSocketBuffer(fCurrentEventCapacity + 1)
82 , fFilterCorruptedNanoslices(pset.
get<bool>(
"FilterCorruptedNanoslices"))
83 , fDoFillFlatDAQData (pset.
get<bool>(
"DoFillFlatDAQData"))
84 , fMaxReadAttempts (pset.
get<
int>(
"MaxReadAttempts",100))
85 , fCorruptEventCount(0)
86 , fConsecutiveCorruptEventCount(0)
88 , fConsecutiveBadReadCount(0)
89 , trigmask_word1(pset.
get<uint32_t>(
"TriggerMask1",0x00ffffff))
90 , trigmask_word2(pset.
get<uint32_t>(
"TriggerMask2",0xffffffff))
91 , trigmask_word3(pset.
get<uint32_t>(
"TriggerMask3",0xffffffff))
92 , fTransmitDelay(pset.
get<
int>(
"TransmitDelay",1000))
126 namespace ba = boost::asio;
141 bai::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
142 bai::tcp::resolver::iterator
end;
144 boost::system::error_code
error = boost::asio::error::host_not_found;
145 while (error && endpoint_iterator != end)
148 fSocket.connect(*endpoint_iterator++, error);
151 static size_t const cb_size = 32;
154 cb_t cmd_buffer(cb_size);
156 #define SET_DISPATCHER_TRIGGER_MASKS 1 157 #ifdef SET_DISPATCHER_TRIGGER_MASKS 159 cb_t mask1_buffer(cb_size);
160 cb_t mask2_buffer(cb_size);
161 cb_t mask3_buffer(cb_size);
173 ssize_t cmd1_size_bytes = mask1_cmd.
write();
174 ssize_t cmd2_size_bytes = mask2_cmd.
write();
175 ssize_t cmd3_size_bytes = mask3_cmd.
write();
203 ssize_t cmd_size_bytes = single_cmd.
write();
210 #ifndef NOVA_DEBUG_STREAM_ACK 212 << boost::numeric_cast<
unsigned>(cmd_buffer[0])
214 for (cb_t::const_iterator
215 i = cmd_buffer.begin(),
216 e = cmd_buffer.end();
219 std::cerr << boost::numeric_cast<unsigned>(*i) <<
" ";
222 for (
size_t i = 0;
i + 3 < cb_size;
i += 4) {
228 << *
reinterpret_cast<uint32_t*
>(&cmd_buffer[
i]) <<
" ";
231 #endif // NOVA_DEBUG_STREAM_ACK 232 if (cmd_buffer[0] > cb_size) {
234 <<
"Unable to read entire command acknowledgement of " 236 <<
" bytes: buffer size is only " 246 static char const ACK = 6;
247 unsigned char ack = cmd_buffer[1];
248 if (cmd_buffer[1] != ACK) {
250 <<
"Did not receive ACK for SINGLE command: expected 0x" 254 <<
static_cast<unsigned>(ACK)
258 <<
static_cast<unsigned>(ack)
261 unsigned char cmd = cmd_buffer[2];
264 <<
"Received ACK for unexpected command 0x" 277 <<
"Unable to open socket to address " 279 <<
" for NOvASocketInputDriver, due to system error:\n" 303 size_t receivedBytes =0;
304 ssize_t cmd_size_bytes =0;
305 uint32_t eventSize =0;
306 boost::system::error_code
error;
308 ssize_t bytesRead =0;
329 cmd_size_bytes = next_event_cmd.
write();
344 while ( (receivedBytes < 16) ) {
349 throw "Maximum number of retrys reached on byte count read";
362 if(error==boost::asio::error::eof){
371 receivedBytes += bytesRead;
384 <<
" Error: Bad magic word at fSocketBuffer[1]. Will try shifting..." 393 int kEvtStartIDXMax = 8;
394 for (EvtStartIDX=0; EvtStartIDX<kEvtStartIDXMax; ++EvtStartIDX) {
397 if (EvtStartIDX>=kEvtStartIDXMax) {
398 throw std::string(
"Unable to determine byte count on network read");
406 #ifndef NOVA_DEBUG_STREAM_ACK 407 std::cerr <<
"Event contains " << eventSize <<
" bytes.\n";
408 #endif // NOVA_DEBUG_STREAM_ACK 411 printf(
"EVT DATA:\n0x%08x 0x%08x 0x%08x 0x%08x\n",
449 <<
"Unable to read entire event " 451 <<
" bytes: buffer size is only " 452 << fMaxEventSizeBytes
463 while ( receivedBytes < (eventSize+4) )
465 size_t remainingBytes = eventSize +4 -receivedBytes;
466 remainingBytes = remainingBytes > 1400? 1400:remainingBytes;
474 std::cerr <<
" A socket read error should be treated as END_OF_DATA.\n";
511 std::unique_ptr<std::vector<rawdata::RawDigit> > digits(
new std::vector<rawdata::RawDigit>);
523 bool unpackok =
false;
530 &flat_daq_data, &daq_header);
533 <<
" Error: up.ProcessRawEvent(...) unpack failed" 540 <<
" Error: Caught exception thrown from up.ProcessRawEvent(...)" <<
std::endl;
547 std::cerr <<
" ProcessRawEvent failed.\n";
556 std::cerr <<
" A socket read error should be treated as END_OF_DATA.\n";
607 std::unique_ptr<std::vector<rawdata::RawTrigger> > rtcol(
new std::vector<rawdata::RawTrigger>);
608 rtcol->push_back(raw_trigger);
614 std::unique_ptr<std::vector<rawdata::RawSumDropMB> > rtcol_MB(
new std::vector<rawdata::RawSumDropMB>);
615 rtcol_MB->push_back(raw_sumDropMB);
621 std::unique_ptr<std::vector<rawdata::RawSumDCM> > rtcol_DCM(
new std::vector<rawdata::RawSumDCM>);
622 rtcol_DCM->push_back(raw_sumDCM);
635 std::unique_ptr<std::vector<rawdata::FlatDAQData> > flatcol(
new std::vector<rawdata::FlatDAQData>);
636 flatcol->push_back(flat_daq_data);
EventPrincipal * makeEventPrincipal(EventAuxiliary const &eventAux, std::shared_ptr< History > &&history) const
void system_error(const char *function, const char *name, const int &y, const char *msg1, const char *msg2)
std::enable_if_t< P::branch_type==InEvent||P::branch_type==InResults > put_product_in_principal(std::unique_ptr< T > &&product, P &principal, std::string const &module_label, std::string const &instance_name={})
uint32_t getSubrunNumber() const
bool fFilterCorruptedNanoslices
Do we need to filter out Corrupted NanoSlices?
bool convertNovaTimeToUnixTime(uint64_t const &inputNovaTime, struct timespec &outputUnixTime)
::xsd::cxx::tree::buffer< char > buffer
TypeLabel const & reconstitutes(std::string const &modLabel, std::string const &instanceName={})
RunID const & runID() const
void parseFilename(std::string const &filename, std::string &address, std::string &port)
RunPrincipal * makeRunPrincipal(RunAuxiliary const &runAux) const
printf("%d Experimental points found\n", nlines)
A module to produce rawdata::RawDigits from NOvADDT DAQHits.
cet::coded_exception< errors::ErrorCodes, ExceptionDetail::translate > Exception
unsigned long long fTriggerTimingMarker_TimeStart
MaybeLogger_< ELseverityLevel::ELsev_warning, false > LogWarning
SubRunPrincipal * makeSubRunPrincipal(SubRunAuxiliary const &subRunAux) const
void split(std::string const &s, char c, OutIter dest)
uint32_t getRunNumber() const
bool ProcessRawEvent(void *rawevent_buffer, std::vector< rawdata::RawDigit > *, rawdata::RawTrigger *, rawdata::RawSumDropMB *, rawdata::RawSumDCM *, rawdata::FlatDAQData *, rawdata::DAQHeader *)
Unpack the RawEvent.
unsigned long long getEventNumber() const