MessageBuffer.h
Go to the documentation of this file.
1 #ifndef MESSAGEBUFFER__H
2 #define MESSAGEBUFFER__H
3 
4 #include "DAQMessages/ccpp_NSNMessages.h"
5 #include <boost/interprocess/containers/map.hpp>
6 #include <boost/interprocess/containers/vector.hpp>
7 #include <boost/interprocess/allocators/allocator.hpp>
8 #include <boost/interprocess/managed_shared_memory.hpp>
9 #include <boost/interprocess/shared_memory_object.hpp>
10 #include <boost/interprocess/sync/named_mutex.hpp>
11 
12 namespace novaddt{
13  class MessageBuffer;
14  class sm_cleaner;
15 }
16 
18  // A small helper class that attempts to clear the named shared memory
19  // on construction and destruction.
20  public:
21  sm_cleaner(const char* Name):name(Name){
22  //clear();
23  }
25  void clear(){
26  if(boost::interprocess::shared_memory_object::remove(name))
27  std::cout<<"Shared memory \""<<name<<"\" cleared"<<std::endl;
28  }
29  const char* name;
30 };
31 
32 
34  /// A class that manages a buffer of points in shared memory.
35  /// Once 10 points have accumulated, an NSNPackedMessage can be formed from them.
36  public:
37  using TimeType = uint64_t;
38  using DataType = uint64_t;
39  using Point = std::pair<TimeType, DataType>; /// a point is pair (timestamp,nclusters)
40  using PointAllocator = boost::interprocess::allocator
41  <Point,boost::interprocess::managed_shared_memory::segment_manager>;
42  using PointVector = boost::interprocess::vector<Point, PointAllocator>;
43  using interprocess_mutex = boost::interprocess::interprocess_mutex;
44  using scoped_lock = boost::interprocess::scoped_lock<interprocess_mutex>;
45 
46  MessageBuffer(const char* shmemName,const char* bufName, size_t Capacity=10):
47  _sm_cleaner(shmemName),
48  _segment(boost::interprocess::open_or_create, shmemName, 65536),
49  _allocator(_segment.get_segment_manager()),
50  _buffer(_segment.find_or_construct<PointVector>(bufName)(_allocator)),
51  _mutex(_segment.find_or_construct<interprocess_mutex>("mtx")()),
52  _capacity(Capacity)
53  {
54  scoped_lock lock(*_mutex);
55  //set buffer length
56  _buffer->reserve(_capacity);
57  boost::interprocess::managed_shared_memory::shrink_to_fit(shmemName);
58  std::cout<<"Open messageBuffer \""<<bufName<<"\" of size= "<<_segment.get_size()<<std::endl;
59  }
60 
62  }
63 
64  void addPoint(Point p){
65  if(isFull())
66  throw std::logic_error("Trying to fill full buffer");
67  _buffer->push_back(p);
68  }
69  bool isFull(){return (_buffer->size()>=_capacity);}
70  size_t size(){return _buffer->size();}
71  size_t capacity(){return _capacity;}
72  void clear(){_buffer->clear();}
73 
74  nsnmessages::NSNPackedMessage makeMessage(){
75  //prepare a message to be sent
76  nsnmessages::NSNPackedMessage m;
77  for(size_t i=0;i<_capacity;++i){
78  Point p=_buffer->at(i);
79  m.time[i]=p.first;
80  m.hits[i]=p.second;
81  }
82  return m;
83  }
84  scoped_lock ScopedLock(){return scoped_lock(*_mutex);}
85  private:
87  boost::interprocess::managed_shared_memory _segment;
91  unsigned* _counter;
92  size_t _capacity;
93 };
94 
95 
96 #endif
const XML_Char * name
Definition: expat.h:151
boost::interprocess::allocator< Point, boost::interprocess::managed_shared_memory::segment_manager > PointAllocator
a point is pair (timestamp,nclusters)
Definition: MessageBuffer.h:41
const char * p
Definition: xmltok.h:285
sm_cleaner(const char *Name)
Definition: MessageBuffer.h:21
novaddt::sm_cleaner _sm_cleaner
Definition: MessageBuffer.h:86
boost::interprocess::interprocess_mutex interprocess_mutex
Definition: MessageBuffer.h:43
std::pair< TimeType, DataType > Point
Definition: MessageBuffer.h:39
MessageBuffer(const char *shmemName, const char *bufName, size_t Capacity=10)
Definition: MessageBuffer.h:46
interprocess_mutex * _mutex
Definition: MessageBuffer.h:90
boost::interprocess::vector< Point, PointAllocator > PointVector
Definition: MessageBuffer.h:42
boost::interprocess::managed_shared_memory _segment
Definition: MessageBuffer.h:87
nsnmessages::NSNPackedMessage makeMessage()
Definition: MessageBuffer.h:74
PointVector * _buffer
Definition: MessageBuffer.h:89
scoped_lock ScopedLock()
Definition: MessageBuffer.h:84
OStream cout
Definition: OStream.cxx:6
void addPoint(Point p)
Definition: MessageBuffer.h:64
boost::interprocess::scoped_lock< interprocess_mutex > scoped_lock
Definition: MessageBuffer.h:44
const PointAllocator _allocator
Definition: MessageBuffer.h:88
const char * name
Definition: MessageBuffer.h:29