ShmRdWr.cpp
Go to the documentation of this file.
1 // This file (ShmRdWr.cpp) was created by Ron Rechenmacher <ron@fnal.gov> on
2 // May 23, 2011. "TERMS AND CONDITIONS" governing this file are in the README
3 // or COPYING file. If you do not have such a file, one can be obtained by
4 // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
5 // $RCSfile: ShmRdWr.cpp,v $
6 // rev="$Revision: 1.30.12.1 $$Date: 2019/09/27 00:07:37 $";
7 
8 /* The writer will do (in the following order):
9  1) inc idx_wr_start
10  1a) determine buffer
11  2) write time in buffer
12  3) write byte count and data in buffer
13  4) inc total_bytes_written
14  5) inc idxcnt_wr_complete
15 
16 Readers init their own static read_idx
17  1) each reader checks its read_idx against idxcnt_wr_complete to determine
18  how many buffers are available and whether any have been missed (overwritten
19  since the last read).
20  2) if buffers are available, read a buffer
21  3) after a buffer has been read, check the cnt associated with that buffer
22  against idx_wr_start to see if the buffer just read has been written to
23  (at least potentially)
24 */
25 
26 #pragma GCC diagnostic ignored "-Wmissing-field-initializers"
27 #pragma GCC diagnostic ignored "-Wpedantic"
28 #pragma GCC diagnostic ignored "-Wnonnull-compare"
29 
30 #include <stdio.h> // printf
31 #include <stdlib.h> // getenv
32 #include <sys/shm.h> // shmget
33 #include <sys/sem.h> // semget
34 #include <errno.h> // errno, EACCES, EINVAL
35 #include <strings.h> // bzero
36 #include <string.h> // memcpy
37 #include <sys/time.h> // gettimeofday
38 #include <stdarg.h> // va_start,...
39 
40 #include "ShmRdWr/ShmRdWr.h"
41 
42 // make CXXFLAGS=-DDEBUG
43 #ifdef DEBUG
44 # define DPRINT(x) tprint x
45 #else
46 # define DPRINT(x)
47 #endif
48 #ifdef TEST
49 # define PAUSE( msg ) { char xx[99];printf(msg);printf("press enter/return to continue...");fflush(stdout);fgets(xx,sizeof(xx),stdin); }
50 #else
51 # define PAUSE( msg )
52 #endif
53 
54 
55 
57  : shm_hdr_ptr_(NULL)
58  , options_(options)
59  , rdr_info_((ShmRdWr_Rdr){0,0})
60  , shm_grprdr_info__ptr_(NULL)
61  , have_sem_(250,false)
62  , gIdx_dflt_(default_gIdx)
63  , poll_ms_(5)
64 {
65 }
66 
// Out-of-class definition of the class-wide constant; its value is true.
// NOTE(review): read_grp() declares a parameter with this same name, which
// hides this member inside that function.
67 bool const ShmRdWr::semKeepAfterDataAvail=true;
68 
69 // returns: 0=success 1=error
71 {
72  key_t shmkey;
73  uint16_t bufsNum;
74  uint16_t bufsSiz_MB;
75  uint8_t grpsNum;
76  uint8_t grpsSiz_pages;
77 
78  if (getShmKeyParams(shmkey,bufsNum,bufsSiz_MB,grpsNum,grpsSiz_pages))
79  { DPRINT(("ERROR - problems with getShmKeyParams\n"));
80  return (1);
81  }
82 
83  // Try to attach, creating if it doesn't exist.
84  size_t shmsize = paramsToSize( bufsNum,bufsSiz_MB,grpsNum,grpsSiz_pages );
85  DPRINT(("shmsize=%lld\n",(long long)shmsize));
86  int shmflgs = IPC_CREAT | IPC_EXCL | 0666; // 1st try to create and error if one exists
87  int shmid = shmget( shmkey, shmsize, shmflgs );
88  if (shmid != -1)
89  { // NEWLY CREATED - get ptr, add semaphores
90  shm_hdr_ptr_ = (ShmRdWr_Header*)shmat( shmid, NULL, 0 );
91  if (shm_hdr_ptr_ == (void *)-1) return (1);
92  bzero( shm_hdr_ptr_, sizeof(*shm_hdr_ptr_) );
93  shm_hdr_ptr_->bufs_num = bufsNum;
94  shm_hdr_ptr_->buf_sz_MB = bufsSiz_MB;
95  shm_hdr_ptr_->buf_sz_bytes = bufsSiz_MB*0x100000;
96  shm_hdr_ptr_->grpsNum = grpsNum;
97  shm_hdr_ptr_->grpsSiz_pages = grpsSiz_pages;
98  shm_hdr_ptr_->bufs_largest_multiple = 0xffffffff - (0xffffffff%bufsNum);
99  shm_hdr_ptr_->largest_zero_offset = (0xffffffff%bufsNum) + 1;
100  shm_hdr_ptr_->semSetId = -1;
101 
102  if (grpsNum)
103  { int semid = semget( shmkey, grpsNum, IPC_CREAT | 0666 ); // no IPC_EXCL - may be left over
104  if (semid == -1) { perror( "semget" ); return (1); }
105  // initialize the sem set (all) to 1 without the SEM_UNDO flag
106  sembuf _sembuf[SEMMSL];
107  for (int ii=0; ii<grpsNum; ++ii)
108  { _sembuf[ii].sem_num = ii;
109  _sembuf[ii].sem_op = 1;
110  _sembuf[ii].sem_flg = 0;
111  }
112  if (semop(semid,_sembuf,grpsNum) == -1)
113  { perror("semop");
114  return (1);
115  }
116  shm_hdr_ptr_->semSetId = semid;
117  }
118  }
119  else if (errno == EEXIST)
120  { // There is an exist segment -- get/attach to it
121  shmflgs = 0;
122  shmid = shmget( shmkey, shmsize, shmflgs );
123  if (shmid != -1)
124  { // have id of existing - attach and check for (application) consistency
125  shm_hdr_ptr_ = (ShmRdWr_Header*)shmat( shmid, NULL, 0);
126  if (shm_hdr_ptr_ == (void *)-1)
127  { perror("shmat");
128  return (1);
129  }
130  if (grpsNum && (shm_hdr_ptr_->semSetId == -1))
131  sleep(1); // wait for semaphores ???
132  if (grpsNum && (shm_hdr_ptr_->semSetId == -1))
133  { DPRINT(("ERROR - grpsNum && semSetId=-1\n"));
134  return (1);
135  }
136  }
137  else
138  { perror( "shmget" );
139  }
140  }
141  else
142  { // a real error
143  perror( "shmget" );
144  return (1);
145  }
148  shm_prv_data_ptr_ = (uint8_t*)(((uint8_t*)shm_grprdr_info__ptr_) + 2*4096);
149  shm_databuf_data_ptr_ = shm_prv_data_ptr_ + grpsNum*4096;
150  if (options_ & SHMRW_O_WRITER)
151  { // set writer pid
153  {
154  }
155  shm_hdr_ptr_->writer_pid = getpid();
156  // cancel any write in-progress (for now, ASSUME previous writer died)
158  shm_hdr_ptr_->offset = 0;
159  }
160  return (0);
161 } // attach
162 
163 
164 // error returns: -28 ENOSPC
// Copy byte_cnt bytes from src_ptr into the data buffer selected by
// idx_wr_start, at the header's running offset (or at `off` when off != -1),
// then advance that offset by byte_cnt.
//   src_ptr  - source of the bytes to copy
//   byte_cnt - number of bytes to copy
//   more     - when false, the buffer's wr_complete_tod is stamped
//   off      - explicit offset into the buffer; -1 keeps the running offset
// Returns byte_cnt on success, -1 when not attached and attach() fails,
// -2 when a new write is larger than one buffer, -ENOSPC when
// offset+byte_cnt would run past the end of the buffer.
// NOTE(review): several lines of this function are absent from this listing
// (the condition guarding the "starting new write" branch, the idx_wr_start
// adjustment, and the completion bookkeeping) - consult the real source.
165 ssize_t ShmRdWr::write( void* src_ptr, size_t byte_cnt, bool more, off_t off )
166 {
 // lazily attach on first use
167  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
168  DPRINT(("write: off=%ld\n", off ));
169 
170  int bufidx;
 // NOTE(review): the `if (...)` that opens the following branch is not visible here
172  { // starting new write
173  if (byte_cnt > (size_t)shm_hdr_ptr_->buf_sz_MB*0x100000) return (-2);
174  // FIRST adjust idx_wr_start
176  shm_hdr_ptr_->offset = 0;
177  bufidx = bufcnt2idx( shm_hdr_ptr_->idx_wr_start );
 // clear the metadata of the buffer about to be written
178  bzero( &shm_databuf_metadata_ptr_[bufidx], sizeof(ShmRdWr_BufMetadata) );
179  }
180  else // continuing write
181  { bufidx=bufcnt2idx( shm_hdr_ptr_->idx_wr_start );
182  }
183 
 // an explicit offset overrides the running offset
184  if (off != -1) shm_hdr_ptr_->offset = off;
185  if (byte_cnt+shm_hdr_ptr_->offset > (size_t)shm_hdr_ptr_->buf_sz_MB*0x100000)
186  return (-ENOSPC);
187 
 // NOTE(review): unlike the size checks above, this product has no size_t cast;
 // bufidx is int and buf_sz_MB appears to be uint16_t, so it looks like int
 // arithmetic that would overflow once bufidx*buf_sz_MB reaches 2048 - confirm.
188  uint8_t* datptr=shm_databuf_data_ptr_ + bufidx*shm_hdr_ptr_->buf_sz_MB*0x100000;
189 
190  DPRINT(("write: shm_hdr_ptr_->offset=%ld\n", shm_hdr_ptr_->offset ));
191  memcpy( datptr+shm_hdr_ptr_->offset, src_ptr, byte_cnt );
192 
193  shm_hdr_ptr_->offset+=byte_cnt;
194 
197 
198  if (more == false)
199  { gettimeofday( &shm_databuf_metadata_ptr_[bufidx].wr_complete_tod, NULL );
201  // LAST - adjust idxcnt_wr_complete
202  }
203  return (byte_cnt);
204 }
205 ssize_t ShmRdWr::write( void* src_ptr, size_t byte_cnt, off_t off, bool more )
206 { return write( src_ptr, byte_cnt, more, off );
207 }
208 ssize_t ShmRdWr::write( void* src_ptr, size_t byte_cnt )
209 { return write( src_ptr, byte_cnt, false, (off_t)-1 );
210 } // write
211 
212 
213 // Get the next buffer to be read and its Info, but do not "start" the read (in case the calling method has (allowPartial=false && obufSz<bytesInBuf))
214 // returns bufIdx (>=0) OR error/tmo (<0)
// Locate the buffer the given reader should read next and describe it in info.
//   info   - zeroed on entry, then filled with bufInfo/data_ptr and the
//            overwrite counters
//   expire - absolute expiry time; a tv_sec of 0 disables the timeout check,
//            so the loop keeps polling every poll_ms_ milliseconds
//   rdr    - the reader's bookkeeping (start/complete counters, offset,
//            total_missed)
// Returns the buffer index (>=0), -1 when not attached and attach() fails,
// -SHMRW_EOVRWT when a read that was already started has been overtaken by
// the writer, or -EAGAIN on timeout.
// NOTE(review): some statements are absent from this listing (after
// "++rdr.total_missed", after "reset idxcnt_rd_complete to recover", the tail
// of the info.cushion expression, and after "read next buffer").
215 int ShmRdWr::read_next_( ShmRdWr_Info& info, timeval& expire, ShmRdWr_Rdr& rdr )
216 {
217  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
218  int bufidx=-1;
219 
220  bzero( &info, sizeof(info) ); // init info
221 
222  uint32_t buffs_num=shm_hdr_ptr_->bufs_num;
223 
224 
225  // check if already started
226  if (rdr.idx_rd_start != rdr.idxcnt_rd_complete)
227  { bufidx = bufcnt2idx( rdr.idx_rd_start );
228  info.bufInfo = shm_databuf_metadata_ptr_[bufidx];
 // NOTE(review): bufidx*buf_sz_MB*0x100000 looks like int arithmetic - confirm it cannot overflow
229  info.data_ptr = shm_databuf_data_ptr_+ bufidx*shm_hdr_ptr_->buf_sz_MB*0x100000;
230  // check if bufInfo is OK
231  if (bufcnt_delta(shm_hdr_ptr_->idx_wr_start,rdr.idx_rd_start) < buffs_num)
232  { // yes, it is OK (at this point)
233  return (bufcnt2idx( rdr.idx_rd_start ));
234  }
235  // else not OK -- end read, return error
236  ++rdr.total_missed;
238  rdr.offset = 0;
239  ++info.ovrwt;
240  info.ovrwt_tot = rdr.total_missed;
241  return (-SHMRW_EOVRWT);
242  }
243 
 // no read in progress: wait for (or find) a completed buffer
244  while (1)
245  {
 // number of completed writes this reader has not yet consumed
246  uint32_t buffs_delta = bufcnt_delta( shm_hdr_ptr_->idxcnt_wr_complete
247  ,rdr.idxcnt_rd_complete );
248  if (buffs_delta > buffs_num)
249  { // missed some
250  uint32_t missed = buffs_delta - buffs_num;
251  rdr.total_missed += missed;
252  // reset idxcnt_rd_complete to recover
254  if (buffs_num > 1)
255  { // sacrifice a buffer if we can to help assure recovery
256  ++rdr.idxcnt_rd_complete;
257  ++missed;
258  ++rdr.total_missed;
259  }
260  info.ovrwt += missed;
261  // now go on to actually read a buffer (below)...
262  }
263  else if (buffs_delta == 0)
264  { // no buffers available
265  if ((expire.tv_sec!=0) && is_tmo(expire))
266  { info.ovrwt_tot = rdr.total_missed;
267  info.cushion = buffs_num - (( shm_hdr_ptr_->idx_wr_start
269  DPRINT(("read_next_: timeout\n"));
270  return (-EAGAIN);
271  }
272  DPRINT(("read_next_: poll sleep %ums\n",poll_ms_));
273  usleep( poll_ms_*1000 );
274  continue;
275  }
276  // else 0 < buffs_delta <= buffs_num
277 
278  // read next buffer
280  rdr.offset = 0;
281  bufidx = bufcnt2idx( rdr.idx_rd_start );
282  info.bufInfo = shm_databuf_metadata_ptr_[bufidx];
283  info.data_ptr = shm_databuf_data_ptr_+ bufidx*shm_hdr_ptr_->buf_sz_MB*0x100000;
284  info.ovrwt_tot = rdr.total_missed;
285  // check if bufInfo is OK
286  if (bufcnt_delta(shm_hdr_ptr_->idx_wr_start,rdr.idx_rd_start) < buffs_num)
287  { // yes, it is OK (at this point)
288  break;
289  }
 // otherwise the writer has already wrapped onto this buffer: loop and re-evaluate
290  }
291  return (bufidx);
292 } // read_next_
293 
294 // returns: >=0 data bytes read
295 // -4 in prog ovrwt SHMRW_EOVRWT
296 // -28 obuf too small ENOSPC (only when allowPartial=false)
297 // -11 timeout EAGAIN
// Copy data from the next available buffer (as chosen by read_next_) into
// dst_ptr, retrying from the top when the writer appears to have overtaken
// the buffer during the copy.
//   dst_ptr / dst_max_bytes - destination and its capacity
//   info         - written unconditionally (zeroed below), so callers must
//                  pass a real object
//   expire_time  - passed through to read_next_
//   allowPartial - when false, a buffer with more unread bytes than
//                  dst_max_bytes is not copied and -ENOSPC is returned;
//                  when true, at most dst_max_bytes are copied
//   rdr          - the reader's bookkeeping; rdr.offset advances by the
//                  number of bytes copied
// Returns the number of bytes copied, -1 when not attached and attach()
// fails, -ENOSPC as above, or the negative status returned by read_next_.
// NOTE(review): two lines are absent from this listing (the statement
// controlled by the "if (!allowPartial || ...)" below, and the tail of the
// info.cushion expression).
298 ssize_t ShmRdWr::read_( void* dst_ptr, size_t dst_max_bytes, ShmRdWr_Info& info
299  , timeval& expire_time, bool allowPartial, ShmRdWr_Rdr& rdr )
300 {
301  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
 // NOTE(review): retsts is int although the function returns ssize_t
302  int retsts=0;
303 # if 0
304  if (&info != NULL) printf("&info=%p\n", &info );
305  else printf("&info=NULL\n" );
306 # endif
307 
308  bzero( &info, sizeof(info) ); // init info
309 
310  uint32_t buffs_num=shm_hdr_ptr_->bufs_num;
311 
312  while (1)
313  { int bufidx;
 // negative values from read_next_ are returned to the caller as-is
314  if ((bufidx=read_next_(info,expire_time,rdr)) < 0)
315  { retsts=bufidx;
316  break;
317  }
318 
319  size_t datbytes = shm_databuf_metadata_ptr_[bufidx].data_bytes;
 // NOTE(review): bufidx*buf_sz_MB*0x100000 looks like int arithmetic - confirm it cannot overflow
320  uint8_t* datptr = shm_databuf_data_ptr_ + bufidx*shm_hdr_ptr_->buf_sz_MB*0x100000;
321 
322  if (!allowPartial && ((datbytes-rdr.offset)>dst_max_bytes))
323  { retsts = -ENOSPC; // didn't read b/c obuf too small
324  break;
325  }
326  else
 // copy the smaller of "bytes still unread" and "room in the destination"
327  { size_t memcpy_bytes=((datbytes-rdr.offset)>dst_max_bytes)
328  ? dst_max_bytes: (datbytes-rdr.offset);
329  DPRINT(("read_: memcpy(dest,datptr+%ld,%ld)\n",rdr.offset,memcpy_bytes));
330  memcpy( dst_ptr, datptr+rdr.offset, memcpy_bytes );
331  rdr.offset += memcpy_bytes;
332  // check for possible/probable overwrite
333  if (bufcnt_delta(shm_hdr_ptr_->idx_wr_start,rdr.idx_rd_start) >= buffs_num)
334  { // A PROBABLE OVRWT - start OVER
335  continue;
336  }
337  else
338  { // ALL IS WELL
339  // inc idxcnt, fill in info and return
340  if (!allowPartial || (rdr.offset==(unsigned)datbytes))
342  retsts = memcpy_bytes;
343  info.ovrwt_tot = rdr.total_missed;
344  info.cushion = buffs_num - (( shm_hdr_ptr_->idx_wr_start
346  info.bufInfo = shm_databuf_metadata_ptr_[bufidx];
347  break;
348  }
349  }
350  } // while (1)
351 
352  return (retsts);
353 } // read_
354 
355 
356 ssize_t ShmRdWr::read( void* dst_ptr, size_t dst_max_bytes
357  , ShmRdWr_Info& info, time_t tmo_ms, bool allowPartial )
358 { timeval expire;
359  return read_( dst_ptr, dst_max_bytes, info, set_expire_time(expire,tmo_ms), allowPartial, rdr_info_ );
360 }
// Overload that reports only the overwrite count: performs the read through
// the ShmRdWr_Info overload and copies info.ovrwt into ovrwt.
// Returns the status of that read.
// NOTE(review): the line that opens the body and declares `info` is absent
// from this listing.
361 ssize_t ShmRdWr::read( void* dst_ptr, size_t max_bytes, uint32_t& ovrwt, time_t tmo_ms )
 // NOTE(review): sts is int although the function returns ssize_t
363  int sts=read( dst_ptr, max_bytes, info, tmo_ms );
 // NOTE(review): the address of a reference is never null in a well-formed
 // program, so this test is expected to always be true
364  if (&ovrwt!= NULL)
365  { ovrwt = info.ovrwt;
366  }
367  return (sts);
368 }
369 ssize_t ShmRdWr::read( void* dst_ptr, size_t max_bytes, time_t tmo_ms, uint32_t& ovrwt )
370 { return read( dst_ptr, max_bytes, ovrwt, tmo_ms );
371 } // read
372 
373 
374 
// Group read: take the group's semaphore, then read through the shared
// per-group reader state shm_grprdr_info__ptr_[gIdx].
//   dptr / dbytes - destination buffer and its capacity
//   info          - filled in by read_
//   tmo           - timeout in ms as understood by set_expire_time (-1 is
//                   its special value); forced to 0 when this object already
//                   holds the group's semaphore
//   semKeepAfterDataAvail - when true, the semaphore stays held after a read
//                   that succeeded (sts>=0) with non-zero
//                   info.bufInfo.data_bytes.  NOTE(review): this flag is also
//                   what is passed to read_ in its allowPartial position -
//                   confirm that coupling is intended.
//   gIdx          - group index; values >= SEMMSL select gIdx_dflt_
// Returns the read_ status, -1 when attach fails or gIdx is not below
// grpsNum, and 0 when the semaphore could not be taken.
375 ssize_t ShmRdWr::read_grp( void* dptr, size_t dbytes, ShmRdWr_Info& info
376  , time_t tmo, bool semKeepAfterDataAvail, shmrw_gid_t gIdx )
377 {
378  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
379  DPRINT(("read_grp(...dbytes=%ld,info,gIdx=%u,tmo=%ld)\n",dbytes,(unsigned)gIdx,tmo));
380  if (gIdx >= SEMMSL) gIdx = gIdx_dflt_;
381  if (gIdx >= shm_hdr_ptr_->grpsNum) return (-1);
382 
 // already holding the semaphore (kept from an earlier call): do not wait for data
383  if (have_sem_[gIdx] && (tmo!=0))
384  { DPRINT(("read_grp: setting tmo=0\n"));
385  tmo=0;
386  }
387  timeval expire_time;
388  set_expire_time( expire_time, tmo );
389 
390  int sts=0;
391  while (1)
392  {
393  if (have_sem_[gIdx] == false)
394  { // get sem, remember to do release
395  timespec tmo_ts;
 // NOTE(review): for tmo==-1 set_expire_time zeroes expire_time, and
 // expire_tv2ts then clamps the (negative) difference to 0/0, so semtake
 // is called with a zero timeout rather than waiting indefinitely - confirm
396  expire_tv2ts( expire_time, tmo_ts );
397  if (!semtake(gIdx,tmo_ts)) return (0);
398  have_sem_[gIdx] = true;
399  }
400 
401  // all read_ with tmo=0 as we are handling any tmo!=0 at this level
402  timeval tmo0((timeval){1,0}); // a long time ago
403  sts = read_( dptr, dbytes, info, tmo0, semKeepAfterDataAvail, shm_grprdr_info__ptr_[gIdx] );
404  DPRINT(("read_grp: read_=%d\n",sts));
 // anything other than "no data yet" ends the loop
405  if (sts != -EAGAIN)
406  break;
407  if ((tmo!=-1) && is_tmo(expire_time))
408  { DPRINT(("read_grp: timed out\n"));
409  break;
410  }
 // no data yet: release the semaphore while sleeping, then try again
411  semgive( gIdx );
412  have_sem_[gIdx] = false;
413  DPRINT(("read_grp: poll sleep %ums exp=*.%06ld\n",poll_ms_,expire_time.tv_usec));
414  usleep( poll_ms_*1000 );
415  }
416  // Any error should cause semgive
417  if ( (have_sem_[gIdx]==true)
418  &&(!(semKeepAfterDataAvail&&((sts>=0)&&info.bufInfo.data_bytes))) )
419  { semgive( gIdx );
420  have_sem_[gIdx] = false;
421  }
422  return (sts);
423 }
// Group-read overload that reports only the overwrite count: calls the
// ShmRdWr_Info overload with the same arguments and copies info.ovrwt into
// ovrwt.  Returns the status of that call.
// NOTE(review): the line that opens the body and declares `info` is absent
// from this listing.
424 ssize_t ShmRdWr::read_grp( void*dest, size_t dbytes, uint32_t& ovrwt
425  , time_t tmo, bool semKeepAfterDataAvail, shmrw_gid_t gIdx )
 // NOTE(review): sts is int although the function returns ssize_t
427  int sts=read_grp( dest, dbytes, info, tmo, semKeepAfterDataAvail, gIdx );
 // NOTE(review): the address of a reference is never null in a well-formed
 // program, so this test is expected to always be true
428  if (&ovrwt!= NULL)
429  { ovrwt = info.ovrwt;
430  }
431  return (sts);
432 }
433 ssize_t ShmRdWr::read_grp( void*dest, size_t max_bytes
434  , time_t tmo, bool semKeepAfterDataAvail, shmrw_gid_t gIdx )
435 { //ShmRdWr_Info info;
436  return read_grp( dest, max_bytes, *(ShmRdWr_Info*)0, tmo, false, gIdx );
437 } // read_grp
438 
440 { ssize_t sts=read_grp(0,0,info,tmo,true,gIdx);
441  if (sts<0) return (sts);
442  else
443  { // return bytes avail instead of bytes read (which would be 0,as specified)
444  return(info.bufInfo.data_bytes);
445  }
446 }
447 ssize_t ShmRdWr::read_grp_start( uint32_t&ovrwt, time_t tmo,shmrw_gid_t gIdx )
448 { ShmRdWr_Info info;ssize_t sts=read_grp_start(info,tmo,gIdx);
449  ovrwt = info.ovrwt;
450  return (sts);
451 }
452 
454 {
455  int retsts=0;
456  if (gIdx >= SEMMSL) gIdx = gIdx_dflt_;
457  if (gIdx >= shm_hdr_ptr_->grpsNum) return (-1);
459  DPRINT(("read_grp_close: gIdx=%u start=%u complete=%u\n",gIdx,rdr.idx_rd_start, rdr.idxcnt_rd_complete));
460  if (rdr.idx_rd_start != rdr.idxcnt_rd_complete)
461  { // check for possible ovrwt
463  { // ovrwt
464  ovrwt=1;
465  ++rdr.total_missed;
466  // close and return indication
468  rdr.offset = 0;
469  retsts = -SHMRW_EOVRWT;
470  }
471  else ovrwt=0;
472  }
473  else // already closed -- no additional ovrwt
474  ovrwt=0;
475  return (retsts);
476 } // read_grp_check
477 
479 {
480  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
481  if (gIdx >= SEMMSL) gIdx = gIdx_dflt_;
482  if (gIdx >= shm_hdr_ptr_->grpsNum) return (-1);
484  DPRINT(("read_grp_close: gIdx=%u start=%u complete=%u\n",gIdx,rdr.idx_rd_start, rdr.idxcnt_rd_complete));
485  if (rdr.idx_rd_start != rdr.idxcnt_rd_complete)
486  { // need to close; check for possible ovrwt
488  { // ovrwt
489  ovrwt=1;
490  ++rdr.total_missed;
491  }
492  else ovrwt=0;
494  rdr.offset = 0;
495  }
496  else // already closed -- no additional ovrwt
497  ovrwt=0;
498  if ( (have_sem_[gIdx]==true) && !(semKeepAfterDataAvail) )
499  { semgive( gIdx );
500  have_sem_[gIdx] = false;
501  }
502  return (0);
503 } // read_grp_close
504 
505 
506 // compatibility method
// Compatibility wrapper around read_grp_start/read_grp_close.
//   hdr     - on the non-zero path, set to info.data_ptr cast to
//             ShmRdWr_BufMetadata* (also when the start returned an error)
//   missed  - receives info.ovrwt, plus the count reported by read_grp_close
//             on the "no data" path
//   cushion - not written by any line visible here
//   tmo_us  - timeout in microseconds; divided by 1000 before being passed on
// Returns -1 when read_grp_start reported 0 ("no data"), otherwise the value
// read_grp_start returned (narrowed to int).
// NOTE(review): the line declaring `info` is absent from this listing.
507 int ShmRdWr::start_read( ShmRdWr_BufMetadata**hdr, ShmRdWrCnt& missed, uint32_t & cushion, int tmo_us )
508 { int retsts=0;
 // NOTE(review): tmo_us == -1 becomes 0 after the integer division, not -1 - confirm intended
510  ssize_t ss=read_grp_start( info, tmo_us/1000 );
511  missed = info.ovrwt;
512  if (ss == 0) // 0 data is "no data"
513  { uint32_t ovrwt;
514  read_grp_close( ovrwt );
515  missed += ovrwt;
516  retsts = -1;
517  }
518  else
519  { *hdr = (ShmRdWr_BufMetadata*)info.data_ptr; // regardless of whether ss<0
520  retsts = ss;
521  }
522  return (retsts);
523 }
524 
// Compatibility read: performs a read_grp and unpacks the resulting info
// into individual output arguments.
//   this_or_last - receives info.bufInfo.wr_complete_tod
//   missed       - receives info.ovrwt
//   cushion      - receives info.cushion
//   more_avail   - not written by any line visible here
//   tmo_us       - timeout in microseconds; -1 is passed on as -1, any other
//                  value is divided by 1000 to obtain milliseconds
// Returns the read_grp status (narrowed from ssize_t to int).
// NOTE(review): the line that opens the body and declares `info` is absent
// from this listing.
525 int ShmRdWr::read( void* dstbuf, size_t dstbuf_sz_bytes
526  , timeval& this_or_last, ShmRdWrCnt& missed
527  , uint32_t& cushion, uint32_t& more_avail, int tmo_us )
529  time_t tmo_ms;
530  if (tmo_us == -1) tmo_ms = -1;
531  else tmo_ms = tmo_us/1000;
532  ssize_t ss=read_grp( dstbuf, dstbuf_sz_bytes, info, tmo_ms );
533  this_or_last = info.bufInfo.wr_complete_tod;
534  missed = info.ovrwt;
535  cushion = info.cushion;
536  return (ss);
537 }
538 
539 
540 int ShmRdWr::read_grp_prv( void* dst_ptr, size_t max_bytes
541  , time_t tmo, bool semKeep, shmrw_gid_t grpIdx )
542 {
543 
544  return (0);
545 } // read_grp_prv
546 
547 int ShmRdWr::write_grp_prv( void* src_ptr, size_t bytes_to_write
548  , time_t tmo, bool semKeep, shmrw_gid_t grpIdx )
549 {
550 
551  return (0);
552 } // write_grp_prv
553 
// Report the missed-buffer counters for a reader group.
//   missed     - receives rdr.total_missed
//   new_missed - derived from rdr.total_missed; the rest of the expression is
//                not visible in this listing
//   gIdx       - group index; values >= SEMMSL select gIdx_dflt_
// Returns 0 on success, -1 when attach fails or gIdx is not below grpsNum.
// NOTE(review): the declaration of `rdr` and the second line of the
// new_missed statement are absent from this listing.
554 int ShmRdWr::get_grp_missed( uint32_t &missed, uint32_t &new_missed, shmrw_gid_t gIdx )
555 {
556  if ((shm_hdr_ptr_==NULL) && (attach()!=0)) return (-1);
557  if (gIdx >= SEMMSL) gIdx = gIdx_dflt_;
558  if (gIdx >= shm_hdr_ptr_->grpsNum) return (-1);
560 
561  missed = rdr.total_missed;
562  new_missed = rdr.total_missed
564 
565  return (0);
566 } // get_grp_missed
567 
568 
569 int ShmRdWr::info( size_t & buf_payload, size_t & buf_sz_bytes
570  , uint32_t & poll_period_us, uint32_t & version
571  , pid_t & writer_pid )
572 {
573  return (0);
574 }
575 
577 { timeval tv;
578  gettimeofday( &tv, NULL );
579  return (uint64_t)tv.tv_sec*1000000+tv.tv_usec;
580 } // get_us_timeofday
581 
583 {
584  return (0);
585 }
586 
// Stream a human-readable dump of the shared-memory state: header fields,
// then one entry per data buffer showing its first 12 32-bit words, then one
// entry per reader group including the current semaphore value.
// The text is assembled in a std::string via snprintf into a 4096-byte
// scratch buffer and written to `out` in a single insertion; `out` is returned.
// NOTE(review): shm_hdr_ptr_ is dereferenced without the attach() check the
// member functions perform - confirm callers attach first.
// NOTE(review): several argument lines of the snprintf calls are absent from
// this listing, so the visible argument lists do not match the format strings.
587 std::ostream& operator <<( std::ostream& out, ShmRdWr& shmrw )
588 {
589  char bigbuf[4096];
590  snprintf( bigbuf, sizeof(bigbuf), "\
591  bufs_num=%u\n\
592  buf_sz_MB=%u\n\
593  grpsNum=%u\n\
594  grpsSiz_pages=%u\n\
595  bufs_largest_multiple=0x%08x\n\
596  largest_zero_offset=0x%08x\n\
597  idx_wr_start=%u\n\
598  idxcnt_wr_complete=%u\n\
599  offset=%ld\n\
600  total_bytes_written=%u\n\
601  semSetId=%d\n\
602  writer_pid=%u\n\
603 "
604  , shmrw.shm_hdr_ptr_->bufs_num
605  , shmrw.shm_hdr_ptr_->buf_sz_MB
606  , shmrw.shm_hdr_ptr_->grpsNum
607  , shmrw.shm_hdr_ptr_->grpsSiz_pages
610  , shmrw.shm_hdr_ptr_->idx_wr_start
612  , shmrw.shm_hdr_ptr_->offset
614  , shmrw.shm_hdr_ptr_->semSetId
615  , shmrw.shm_hdr_ptr_->writer_pid
616  );
617  std::string ostr(bigbuf);
 // column header for the per-buffer table
618  snprintf( bigbuf, sizeof(bigbuf), "idx _bytes_ _1st_lword\n" );
619  ostr += bigbuf;
620  for (int ii=0; ii<shmrw.shm_hdr_ptr_->bufs_num; ++ii)
 // dptr: start of data buffer ii, viewed as 32-bit words
621  { uint32_t *dptr=(uint32_t*)( shmrw.shm_databuf_data_ptr_
622  + ii*shmrw.shm_hdr_ptr_->buf_sz_MB*0x100000);
623  snprintf( bigbuf, sizeof(bigbuf), "%3d %7ld ",ii
625  ostr += bigbuf;
626  unsigned jj;
 // words 0..5 on the first output line
627  for (jj=0; jj<6; ++jj)
628  { snprintf( bigbuf, sizeof(bigbuf), " 0x%08x", dptr[jj] );
629  ostr += bigbuf;
630  }
631  ostr+="\n ";
 // words 6..11 on the second output line
632  for (; jj<12; ++jj)
633  { snprintf( bigbuf, sizeof(bigbuf), " 0x%08x", dptr[jj] );
634  ostr += bigbuf;
635  }
636  ostr+='\n';
637  }
638  for (int ii=0; ii<shmrw.shm_hdr_ptr_->grpsNum; ++ii)
 // semctl GETVAL: current value of this group's semaphore
639  { int semval = semctl( shmrw.shm_hdr_ptr_->semSetId, ii, GETVAL );
640  uint32_t missed, new_missed;
641  shmrw.get_grp_missed( missed, new_missed, ii );
642  snprintf( bigbuf, sizeof(bigbuf), "\
643 grpIdx=%3d idxcnt_rd_complete=%u\n\
644  idx_rd_start=%u\n\
645  total_missed=%u %u\n\
646  offset=%ld\n\
647  semval=%d\n\
648 "
649  , ii
652  , missed, new_missed
653  , shmrw.shm_grprdr_info__ptr_[ii].offset
654  , semval
655  );
656  ostr += bigbuf;
657  }
658  out << ostr;
659  return (out);
660 } // operator <<
661 
662 
664 { key_t retkey;
665  const char *key=envvar();
666  const char *envval=getenv(key);
667  if (envval) retkey=strtoul(envval,NULL,0);
668  else retkey=SHMRW_DEFAULT_KEY;
669  return (retkey);
670 } // getShmKey
671 
672 
673 // returns: 0=success 1=error
675  , uint16_t& bufsNum, uint16_t& bufsSiz_MB
676  , uint8_t& grpsNum, uint8_t& grpsSiz_pages )
677 {
678  shmkey = getShmKey();
679  bufsNum = (shmkey & 0xff) + 1;
680  bufsSiz_MB = ((shmkey>>8) & 0xff) + 1;
681  grpsNum = ((shmkey>>16) & 0xff); // 0 is valid
682  grpsSiz_pages = ((shmkey>>24) & 0xf) + 1;
683  return (0);
684 } // getShmKeyParams
685 
686 size_t ShmRdWr::paramsToSize( uint16_t bufsNum, uint16_t bufsSiz_MB
687  , uint8_t grpsNum, uint8_t grpsSiz_pages )
688 {
689  size_t sz;
690  sz = 4096*5 + (size_t)bufsNum*(bufsSiz_MB*0x100000) + grpsNum*(grpsSiz_pages*4096);
691  return (sz);
692 }
693 
694 // The "bufs_largest_multiple" method (using 0xffffffff) allows "easy" "add 1"
695 // I must do the substract (ie. add negative) by hand.
// Add a signed amount to a buffer counter and reduce the result modulo the
// header's bufs_largest_multiple.
//   bufcnt - current counter value
//   add    - amount to add; negative values subtract
// Returns the adjusted counter.
// NOTE(review): the statement that assigns tt when |add| exceeds bufcnt (the
// wrap-below-zero case) is absent from this listing.
696 uint32_t ShmRdWr::bufcnt_add( uint32_t bufcnt, int add )
697 { uint32_t tt;
698  if (add < 0)
 // work with the magnitude of the subtraction
699  { add = -add;
700  if ((uint32_t)add > bufcnt)
701  { add -= bufcnt;
703  }
704  else
705  tt=bufcnt-add;
706  }
707  else
708  tt=bufcnt+add;
709  return (tt%shm_hdr_ptr_->bufs_largest_multiple);
710 }
711 
712 int ShmRdWr::bufcnt2idx( uint32_t bufcnt )
713 { return (bufcnt % shm_hdr_ptr_->bufs_num);
714 }
715 
// Distance from the read counter `rd` to the write counter `wr`.
// When wr >= rd this is simply wr-rd.
// NOTE(review): the alternative of the conditional expression (used when
// wr < rd, i.e. the counter has wrapped) is absent from this listing.
716 uint32_t ShmRdWr::bufcnt_delta( uint32_t wr, uint32_t rd )
717 { return ((wr>=rd)
718  ? wr-rd
720 }
721 
722 timeval& ShmRdWr::set_expire_time( timeval& expire_time, time_t tmo_ms )
723 { if (tmo_ms == -1)
724  { expire_time.tv_sec = expire_time.tv_usec = 0;
725  }
726  else
727  { gettimeofday( &expire_time, NULL );
728  time_t seconds = tmo_ms/1000;
729  expire_time.tv_sec += seconds;
730  time_t ms = tmo_ms - seconds*1000;
731  expire_time.tv_usec += ms*1000;
732  if (expire_time.tv_usec >= 1000000)
733  { expire_time.tv_usec -= 1000000;
734  ++expire_time.tv_sec;
735  }
736  }
737  DPRINT(("set_expire_time: expire_time.tv_usec=%ld\n", expire_time.tv_usec));
738  return (expire_time);
739 } // set_expire_time
740 
741 bool ShmRdWr::is_tmo( timeval& expire_time )
742 {
743  timeval time_now;
744  gettimeofday( &time_now, NULL );
745  if ( ( (expire_time.tv_sec==time_now.tv_sec)
746  &&(expire_time.tv_usec<=time_now.tv_usec))
747  ||(expire_time.tv_sec < time_now.tv_sec))
748  { DPRINT(("now:%ld.%06ld exp:%ld.%06ld\n"
749  ,time_now.tv_sec,time_now.tv_usec,expire_time.tv_sec,expire_time.tv_usec));
750  return (true);
751  }
752  else
753  return (false);
754 } // is_tmo
755 
756 void ShmRdWr::expire_tv2ts( const timeval& texp, timespec& tdiff )
757 { timeval tnow;
758  gettimeofday( &tnow, NULL );
759  // get diff texp-tnow
760  tdiff.tv_nsec = (texp.tv_usec + 1000000 - tnow.tv_usec)*1000;
761  tdiff.tv_sec = texp.tv_sec - tnow.tv_sec;
762  if (tdiff.tv_nsec >= 1000000000)
763  tdiff.tv_nsec -= 1000000000;
764  else tdiff.tv_sec -= 1;
765  if (tdiff.tv_sec < 0)
766  tdiff.tv_sec = tdiff.tv_nsec = 0;
767  DPRINT(("expire_tv2ts: tdiff.tv_sec=%ld tdiff.tv_nsec=%ld\n"
768  ,tdiff.tv_sec, tdiff.tv_nsec));
769  return;
770 }
771 
772 // return true if got sem, false otherwise
774 {
775  sembuf sops;
776  sops.sem_num = gIdx;
777  sops.sem_op = -1;
778  sops.sem_flg = SEM_UNDO;
779  int semsts=0;
780 #ifdef DARWINBUILD
781  semsts=semop( shm_hdr_ptr_->semSetId, &sops, 1 );
782 #else
783  semsts=semtimedop( shm_hdr_ptr_->semSetId, &sops, 1, &tmo_ts );
784 #endif
785  if ((semsts==-1) && (errno==EAGAIN))
786  { DPRINT(("SEMOP TIMEOUT\n"));
787  return (false);
788  }
789  else if (semsts==-1)
790  { perror("semtimedop");
791  printf("SEMOP OTHER ERROR=%d semSetId=%d sem_num=%u tmo.sec=%ld tmp.nsec=%ld\n"
792  ,errno,shm_hdr_ptr_->semSetId,(unsigned short)gIdx,tmo_ts.tv_sec
793  ,tmo_ts.tv_nsec);
794  return (false);
795  }
796  return true;
797 } // semtake
798 
800 { sembuf sops;
801  sops.sem_num = gIdx;
802  sops.sem_op = 1;
803  sops.sem_flg = SEM_UNDO;
804  semop( shm_hdr_ptr_->semSetId, &sops, 1 );
805 } // semgive
806 
807 void ShmRdWr::tprint( const char* fmt, ... )
808 { timeval tv;
809  va_list ap;
810  va_start( ap, fmt);
811  gettimeofday( &tv, NULL );
812  printf( "%ld.%06ld ", tv.tv_sec, tv.tv_usec);
813  vprintf( fmt, ap );
814  va_end( ap );
815 }
void tprint(const char *fmt,...) __attribute__((format(printf
Definition: ShmRdWr.cpp:807
static bool const semKeepAfterDataAvail
Definition: ShmRdWr.h:116
friend std::ostream & operator<<(std::ostream &, ShmRdWr &)
Definition: ShmRdWr.cpp:587
uint8_t * data_ptr
Definition: ShmRdWr.h:84
const XML_Char XML_Encoding * info
Definition: expat.h:530
int cushion
Definition: ShmRdWr.h:80
ShmRdWr_BufMetadata * shm_databuf_metadata_ptr_
Definition: ShmRdWr.h:191
static constexpr Double_t ms
Definition: Munits.h:192
#define DPRINT(x)
Definition: ShmRdWr.cpp:46
ssize_t read_grp(void *dest, size_t, ShmRdWr_Info &, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:375
int print_header(int fd)
Definition: ShmRdWr.cpp:582
bool is_tmo(timeval &)
Definition: ShmRdWr.cpp:741
off_t offset
Definition: ShmRdWr.h:99
uint32_t total_missed
Definition: ShmRdWr.h:67
#define SEMMSL
Definition: ShmRdWr.h:36
timeval & set_expire_time(timeval &, time_t)
Definition: ShmRdWr.cpp:722
volatile uint32_t idx_rd_start
Definition: ShmRdWr.h:65
ssize_t write(void *src, size_t, bool write_partial, off_t=-1)
Definition: ShmRdWr.cpp:165
int read_grp_close(uint32_t &ovrwt, bool=false, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:478
int read_grp_prv(void *dest, size_t, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:540
int write_grp_prv(void *src, size_t, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:547
Float_t ss
Definition: plot.C:24
bool semtake(shmrw_gid_t, timespec &)
Definition: ShmRdWr.cpp:773
uint32_t ShmRdWrCnt
Definition: ShmRdWr.h:29
size_t paramsToSize(uint16_t, uint16_t, uint8_t, uint8_t)
Definition: ShmRdWr.cpp:686
unsigned short shmrw_gid_t
Definition: ShmRdWr.h:57
ShmRdWr(int options=0, shmrw_gid_t default_gIdx=SEMDFLT)
Definition: ShmRdWr.cpp:56
uint32_t total_bytes_written
Definition: ShmRdWr.h:100
int gIdx
Definition: show_event.C:12
uint32_t ovrwt
Definition: ShmRdWr.h:78
int bufcnt2idx(uint32_t)
Definition: ShmRdWr.cpp:712
key_t getShmKey()
Definition: ShmRdWr.cpp:663
const char * envvar()
Definition: ShmRdWr.h:170
uint32_t poll_ms_
Definition: ShmRdWr.h:204
size_t data_bytes
Definition: ShmRdWr.h:73
size_t buf_sz_bytes
Definition: ShmRdWr.h:94
uint32_t largest_zero_offset
Definition: ShmRdWr.h:96
Definition: type_traits.h:56
pid_t writer_pid
Definition: ShmRdWr.h:104
int options_
Definition: ShmRdWr.h:190
uint32_t ovrwt_tot
Definition: ShmRdWr.h:79
uint16_t grpsSiz_pages
Definition: ShmRdWr.h:93
uint32_t bufcnt_add(uint32_t, int)
Definition: ShmRdWr.cpp:696
uint64_t get_us_timeofday()
Definition: ShmRdWr.cpp:576
ssize_t read(void *dest, size_t, ShmRdWr_Info &, time_t=-1, bool=false)
Definition: ShmRdWr.cpp:356
timeval wr_complete_tod
Definition: ShmRdWr.h:74
off_t offset
Definition: ShmRdWr.h:69
std::string getenv(std::string const &name)
ShmRdWr_Header * shm_hdr_ptr_
Definition: ShmRdWr.h:188
ShmRdWr_BufMetadata bufInfo
Definition: ShmRdWr.h:85
printf("%d Experimental points found\n", nlines)
shmrw_gid_t gIdx_dflt_
Definition: ShmRdWr.h:202
uint16_t buf_sz_MB
Definition: ShmRdWr.h:91
int read_next_(ShmRdWr_Info &, timeval &, ShmRdWr_Rdr &)
Definition: ShmRdWr.cpp:215
ssize_t read_grp_start(ShmRdWr_Info &, time_t tmo=-1, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:439
volatile uint32_t idx_wr_start
Definition: ShmRdWr.h:97
uint32_t bufcnt_delta(uint32_t, uint32_t)
Definition: ShmRdWr.cpp:716
#define off_t
Definition: macconfig.h:47
int attach()
Definition: ShmRdWr.cpp:70
volatile uint32_t idxcnt_rd_complete
Definition: ShmRdWr.h:66
void expire_tv2ts(const timeval &, timespec &)
Definition: ShmRdWr.cpp:756
const XML_Char * version
Definition: expat.h:187
uint8_t * shm_prv_data_ptr_
Definition: ShmRdWr.h:200
::xsd::cxx::tree::string< char, simple_type > string
Definition: Database.h:154
volatile uint32_t idxcnt_wr_complete
Definition: ShmRdWr.h:98
int getShmKeyParams(key_t &, uint16_t &, uint16_t &, uint8_t &, uint8_t &)
Definition: ShmRdWr.cpp:674
void semgive(shmrw_gid_t)
Definition: ShmRdWr.cpp:799
uint16_t grpsNum
Definition: ShmRdWr.h:92
#define SHMRW_EOVRWT
Definition: ShmRdWr.h:54
#define SHMRW_O_WRITER
Definition: ShmRdWr.h:52
std::vector< bool > have_sem_
Definition: ShmRdWr.h:201
ShmRdWr_Rdr rdr_info_
Definition: ShmRdWr.h:195
ssize_t read_(void *, size_t, ShmRdWr_Info &, timeval &, bool, ShmRdWr_Rdr &)
Definition: ShmRdWr.cpp:298
#define SHMRW_DEFAULT_KEY
Definition: ShmRdWr.h:48
int get_grp_missed(uint32_t &missed, uint32_t &new_missed, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:554
add("abs", expr_type(int_type()), expr_type(int_type()))
uint8_t * shm_databuf_data_ptr_
Definition: ShmRdWr.h:192
uint32_t bufs_largest_multiple
Definition: ShmRdWr.h:95
int start_read(ShmRdWr_BufMetadata **hdr, ShmRdWrCnt &missed, uint32_t &cushion, int tmo_us)
Definition: ShmRdWr.cpp:507
int errno
Definition: errno.cpp:12
int info(size_t &buf_payload, size_t &buf_sz_bytes, uint32_t &poll_period_us, uint32_t &version, pid_t &writer_pid)
Definition: ShmRdWr.cpp:569
ShmRdWr_Rdr * shm_grprdr_info__ptr_
Definition: ShmRdWr.h:198
int read_grp_check(uint32_t &ovrwt, shmrw_gid_t gIdx=SEMMSL)
Definition: ShmRdWr.cpp:453
uint16_t bufs_num
Definition: ShmRdWr.h:90
std::string key_t
Definition: KeyAssembler.h:71