ShmRdWr.h
Go to the documentation of this file.
1 // This file (ShmRdWr.h) was created by Ron Rechenmacher <ron@fnal.gov> on
2 // May 20, 2011. "TERMS AND CONDITIONS" governing this file are in the README
3 // or COPYING file. If you do not have such a file, one can be obtained by
4 // contacting Ron or Fermi Lab in Batavia IL, 60510, phone: 630-840-3000.
5 // $RCSfile: ShmRdWr.h,v $
6 // rev="$Revision: 1.19.12.1 $$Date: 2019/09/27 00:07:37 $";
7 #ifndef SHMRDWR_H
8 #define SHMRDWR_H
9 
10 #include <stdio.h> /* printf */
11 #include <stdint.h> // uint32_t
12 #include <stddef.h> // offsetof
13 #include <netinet/in.h> // IPPROTO_IP, sockaddr_in, htons(), htonl()
14 #include <unistd.h> // SEEK_SET
15 #include <vector>
16 #include <string>
17 
18 #pragma GCC diagnostic ignored "-Wpedantic"
19 
20 // COMPATIBILITY
21 #define ShmWr ShmRdWr
22 #define ShmRd ShmRdWr
23 #define start_write() write( 0,0,true,sizeof(ShmRdWr_BufMetadata) )
24 #define add_data( src, sz ) write( src, sz, true )
25 #define shm_rdwr_buffer_header ShmRdWr_BufMetadata
26 #define shm_rdwr_header ShmRdWr_Header
27 #define get_ptr getShmRdWr
28 #define bytes_total data_bytes
29 typedef uint32_t ShmRdWrCnt;
30 #define check_overwrite( missed,cush ) read_grp_check( missed )
31 #define end_read( missed,cush ) read_grp_close( missed )
32 #define read_wait read
33 
34 
35 #ifndef SEMMSL
36 # define SEMMSL 250 // from /usr/include/linux/sem.h
37 #endif
38 
39 /* The format is 0xYPSSMMBB, where (all values are hex)
40  Y an arbitrary, user specified value from 0 to F
41  P size in "pages-1" of the semaphore user write areas (0-F is 1-0x10 pages)
42  SS number of semaphore areas (or "reader groups") (from 0 to FF) actually 0 to 250
43  MM main write buffers buffer size in "megabytes-1" (0-FF is 1 to 0x100 MB)
44  BB number of main buffers-1 (0-FF is 1 to 0x100 buffers)
45  Override this default via environment variable SHMRW_KEY (the name returned by ShmRdWr::envvar())
46  */
47 #ifndef SHMRW_DEFAULT_KEY
48 #define SHMRW_DEFAULT_KEY 0x00017003 // 113 MB buffers, 4 of them
49 #endif
50 
51 // Options
52 #define SHMRW_O_WRITER 1
53 
54 #define SHMRW_EOVRWT 4 // arbitrary value for overwrite error
55 
56 #if 1
57 typedef unsigned short shmrw_gid_t;
58 #else
59 //struct shmrw_gid_t { unsigned short gid; shmrw_gid_t(unsigned short gi):gid(gi){} inline operator unsigned short(){return gid;}};
60 struct shmrw_gid_t { unsigned short gid; inline operator unsigned short(){return gid;}};
61 #endif
62 #define SEMDFLT (shmrw_gid_t){0}
63 
65 { volatile uint32_t idx_rd_start;
66  volatile uint32_t idxcnt_rd_complete;
67  uint32_t total_missed; //aka total overwrite, ovrwt_tot
68  uint32_t reserved; // reserved here for natural alignment of off_t
70  uint32_t reserved2[2];
71 };
73 { size_t data_bytes;
74  timeval wr_complete_tod;
75  uint32_t reserved[2];
76 };
78 { uint32_t ovrwt; // ovr since last read
79  uint32_t ovrwt_tot;
80  int cushion; // current/instantaneous cushion
81  timeval last_write_tod;
84  uint8_t *data_ptr;
86 };
87 //struct ShmRdWr_Header; // forward declaration - defined below. Need to define here for backward COMPATIBILITY
89 { uint32_t version;
90  uint16_t bufs_num; // num_bufs=2**bufs_2n; i.e 0=1, 1=2, 2=4,...
91  uint16_t buf_sz_MB;
92  uint16_t grpsNum;
93  uint16_t grpsSiz_pages;
94  size_t buf_sz_bytes;
97  volatile uint32_t idx_wr_start;
98  volatile uint32_t idxcnt_wr_complete; // 1st idx complete will be 1
99  off_t offset; // write offset corresponding to idx_wr_start
101  uint32_t poll_period_us;
102  int semSetId;
104  pid_t writer_pid;
106 };
107 
108 
109 
110 class ShmRdWr // Shared-memory reader/writer interface: declares write(), independent read(), and semaphore-group read_grp*() entry points
111 {
112 public:
113 
114  ShmRdWr(int options=0, shmrw_gid_t default_gIdx = SEMDFLT);
115 
116  static bool const semKeepAfterDataAvail/*=true*/;
117 
118  int attach();
119  inline int attach( key_t k, uint32_t buf_siz, int32_t bufs_num2n ) { return attach(); } // COMPATIBILITY: k, buf_siz and bufs_num2n are ignored; simply forwards to attach()
120 
121  // NOTE: A "-1" (negative 1) val for off_t means "let the code manage it"
122  // A "-1" (negative 1) val for time_t means "infinite"
123 
124  ssize_t write( void*src, size_t, bool write_partial, off_t=-1 );
125  ssize_t write( void*src, size_t, off_t, bool write_partial=false );
126  ssize_t write( void*src, size_t );
127 
128  inline int end_write() { unsigned bidx=bufcnt2idx(shm_hdr_ptr_->idx_wr_start); // buffer index derived from the header's idx_wr_start
129  return write(&shm_databuf_metadata_ptr_[bidx],sizeof(ShmRdWr_BufMetadata),(off_t)0); } // for backward COMPATIBILITY: writes that buffer's metadata entry at offset 0
130  inline void seek(off_t off) { off+=sizeof(ShmRdWr_BufMetadata);write(0,0,true,off); } // for backward COMPATIBILITY: off is advanced by sizeof(ShmRdWr_BufMetadata), then passed to a zero-length partial write
131 
132  // independent/no grp/semaphore
133  // bool is "allowPartial"; if true, info must be checked to see if read complete (off==bytesInBuf)
134  ssize_t read( void*dest, size_t, ShmRdWr_Info&, time_t=-1, bool=false ); // [tmo] NOTE: time_t here is ms
135  ssize_t read( void*dest, size_t, uint32_t&ovrwt, time_t=-1 ); // [tmo,] NOTE: time_t here is ms
136  ssize_t read( void*dest, size_t, time_t=-1, uint&ovrwt=*(uint*)0 ); // [tmo,] NOTE: time_t here is ms -- NOTE(review): the default for ovrwt binds a reference to a dereferenced null pointer (undefined behavior); pass a real variable
137 
138  // grp/semaphore controlled - takes sem if need be; release if no data
139  // bool is "semKeepAfterDataAvail"
140  // return bytes read
141  ssize_t read_grp( void*dest, size_t, ShmRdWr_Info&, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL ); // [tmo,grp] NOTE: time_t here is ms
142  ssize_t read_grp( void*dest, size_t, uint32_t&ovrwt, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL ); // [tmo,grp] NOTE: time_t here is ms
143  ssize_t read_grp( void*dest, size_t, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL ); // [tmo, grp] NOTE: time_t here is ms
144 
145  // "start" will semKeepAfterDataAvail (watch out for 0 byte data)
146  ssize_t read_grp_start( ShmRdWr_Info&, time_t tmo=-1, shmrw_gid_t gIdx=SEMMSL ); // return bytes available
147  ssize_t read_grp_start( uint32_t&ovrwt, time_t tmo=-1, shmrw_gid_t gIdx=SEMMSL ); // return bytes available
148 
149  // for a read in-progress, (usually using ptr) _just_"check" if ovrwt has
150  // occurred OR finish read, doing one last check and optionally keep semaphore
151  int read_grp_check( uint32_t&ovrwt, shmrw_gid_t gIdx=SEMMSL ); // for occasional checks of overwrite
152  int read_grp_close( uint32_t&ovrwt, bool=false, shmrw_gid_t gIdx=SEMMSL ); // end the read -- one last check for ovrwt, optional keepSem
153 
154 
155  int start_read( ShmRdWr_BufMetadata**hdr, ShmRdWrCnt& missed, uint32_t & cushion, int tmo_us ); // for backward COMPATIBILITY
156  int read( void* dstbuf, size_t dstbuf_sz_bytes
157  , timeval& this_or_last, ShmRdWrCnt& missed
158  , uint32_t& cushion, uint32_t& more_avail, int tmo_us=-1 ); // COMPATIBILITY
159 
160 
161  // takes sem if need be, leaves based on arg -- no ovrwt,etc
162  int read_grp_prv( void*dest, size_t, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL ); // [grpIdx, tmo,release] NOTE: time_t here is ms
163  int write_grp_prv( void*src, size_t, time_t=-1, bool=false, shmrw_gid_t gIdx=SEMMSL ); // [tmo,release] NOTE: time_t here is ms
164 
165  int get_grp_missed( uint32_t &missed, uint32_t &new_missed, shmrw_gid_t gIdx=SEMMSL );
166 
167  key_t getShmKey();
168 
169  inline void setpoll_ms_( uint32_t poll_ms ) { poll_ms_ = poll_ms; }
170  inline const char* envvar() { return ("SHMRW_KEY"); } // environment variable name; presumably the one consulted for the shm key -- confirm in the implementation
172  { if (shm_hdr_ptr_!=NULL) return shm_hdr_ptr_; // hand back the attached header pointer when it is set
173  else return (ShmRdWr_Header*)-1; // otherwise return the sentinel (ShmRdWr_Header*)-1, NOT NULL
174  }
175 
176  int info( size_t & buf_payload, size_t & buf_sz_bytes
177  , uint32_t & poll_period_us, uint32_t & version
178  , pid_t & writer_pid );
179  uint64_t get_us_timeofday();
180  int print_header( int fd );
181  friend std::ostream& operator <<( std::ostream&, ShmRdWr& );
182 
183 
184 # ifndef TEST // everything below is private unless TEST is defined
185 private:
186 # endif
187  // common to reader/writer
189 
190  int options_;
193 
194  // read (independent) -------------------------------------------
196 
197  // read (group) -------------------------------------------------
199 
201  std::vector<bool> have_sem_;
203 
204  uint32_t poll_ms_;
205 
206  int read_next_( ShmRdWr_Info&, timeval&, ShmRdWr_Rdr& ); // returns bufIdx (>=0) OR error/tmo (<0)
207  ssize_t read_( void*,size_t,ShmRdWr_Info&, timeval&, bool, ShmRdWr_Rdr& );
208  int getShmKeyParams(key_t&, uint16_t&,uint16_t&, uint8_t&,uint8_t&);
209  size_t paramsToSize( uint16_t,uint16_t, uint8_t,uint8_t );
210  uint32_t bufcnt_add( uint32_t, int );
211  int bufcnt2idx( uint32_t );
212  uint32_t bufcnt_delta( uint32_t, uint32_t );
213  timeval& set_expire_time( timeval&, time_t );
214  bool is_tmo( timeval& );
215  void expire_tv2ts( const timeval&, timespec& );
216  bool semtake( shmrw_gid_t, timespec& );
217  void semgive( shmrw_gid_t );
218  void tprint( const char *fmt, ... )__attribute__((format(printf,2,3)));//attribute--recall arg shift b/c of implicit "this" arg.
219 };
220 
221 #endif // SHMRDWR_H
uint8_t * data_ptr
Definition: ShmRdWr.h:84
const XML_Char XML_Encoding * info
Definition: expat.h:530
int cushion
Definition: ShmRdWr.h:80
ShmRdWr_BufMetadata * shm_databuf_metadata_ptr_
Definition: ShmRdWr.h:191
int buffersAvail
Definition: ShmRdWr.h:83
ShmRdWr_Header * getShmRdWr()
Definition: ShmRdWr.h:171
off_t offset
Definition: ShmRdWr.h:99
uint32_t total_missed
Definition: ShmRdWr.h:67
#define SEMMSL
Definition: ShmRdWr.h:36
volatile uint32_t idx_rd_start
Definition: ShmRdWr.h:65
uint32_t ShmRdWrCnt
Definition: ShmRdWr.h:29
unsigned short shmrw_gid_t
Definition: ShmRdWr.h:57
uint32_t total_bytes_written
Definition: ShmRdWr.h:100
int gIdx
Definition: show_event.C:12
uint32_t ovrwt
Definition: ShmRdWr.h:78
const char * envvar()
Definition: ShmRdWr.h:170
timeval last_write_tod
Definition: ShmRdWr.h:81
void seek(off_t off)
Definition: ShmRdWr.h:130
uint32_t poll_ms_
Definition: ShmRdWr.h:204
size_t data_bytes
Definition: ShmRdWr.h:73
write
Run ND cosmics.
size_t buf_sz_bytes
Definition: ShmRdWr.h:94
uint32_t largest_zero_offset
Definition: ShmRdWr.h:96
pid_t writer_pid
Definition: ShmRdWr.h:104
int options_
Definition: ShmRdWr.h:190
time_t penultimate_ms
Definition: ShmRdWr.h:103
uint32_t ovrwt_tot
Definition: ShmRdWr.h:79
uint32_t version
Definition: ShmRdWr.h:89
uint16_t grpsSiz_pages
Definition: ShmRdWr.h:93
__attribute__((unused)) static std
timeval wr_complete_tod
Definition: ShmRdWr.h:74
off_t offset
Definition: ShmRdWr.h:69
ShmRdWr_Header * shm_hdr_ptr_
Definition: ShmRdWr.h:188
ShmRdWr_BufMetadata bufInfo
Definition: ShmRdWr.h:85
printf("%d Experimental points found\n", nlines)
shmrw_gid_t gIdx_dflt_
Definition: ShmRdWr.h:202
uint16_t buf_sz_MB
Definition: ShmRdWr.h:91
std::string format(const int32_t &value, const int &ndigits=8)
Definition: HexUtils.cpp:14
time_t penultimate_ms
Definition: ShmRdWr.h:82
volatile uint32_t idx_wr_start
Definition: ShmRdWr.h:97
#define off_t
Definition: macconfig.h:47
volatile uint32_t idxcnt_rd_complete
Definition: ShmRdWr.h:66
const XML_Char * version
Definition: expat.h:187
std::ostream & operator<<(std::ostream &o, skim::ParametersNue const &p)
uint8_t * shm_prv_data_ptr_
Definition: ShmRdWr.h:200
volatile uint32_t idxcnt_wr_complete
Definition: ShmRdWr.h:98
void setpoll_ms_(uint32_t poll_ms)
Definition: ShmRdWr.h:169
uint16_t grpsNum
Definition: ShmRdWr.h:92
std::vector< bool > have_sem_
Definition: ShmRdWr.h:201
uint32_t poll_period_us
Definition: ShmRdWr.h:101
sockaddr_in addr_notify_wr_complete
Definition: ShmRdWr.h:105
ShmRdWr_Rdr rdr_info_
Definition: ShmRdWr.h:195
int attach(key_t k, uint32_t buf_siz, int32_t bufs_num2n)
Definition: ShmRdWr.h:119
#define SEMDFLT
Definition: ShmRdWr.h:62
uint8_t * shm_databuf_data_ptr_
Definition: ShmRdWr.h:192
uint32_t bufs_largest_multiple
Definition: ShmRdWr.h:95
ShmRdWr_Rdr * shm_grprdr_info__ptr_
Definition: ShmRdWr.h:198
uint32_t reserved
Definition: ShmRdWr.h:68
uint16_t bufs_num
Definition: ShmRdWr.h:90
int end_write()
Definition: ShmRdWr.h:128
std::string key_t
Definition: KeyAssembler.h:71
unsigned int uint