FrameProcessor API Reference#
Controllers#
FrameProcessorController#
-
class FrameProcessorController : public FrameProcessor::IFrameCallback, public boost::enable_shared_from_this<FrameProcessorController>#
The FrameProcessorController class has overall responsibility for management of the core classes and plugins present within the frame processor application. This class maintains the SharedMemoryController and SharedMemoryParser classes. The class also manages the control IpcChannel, and accepts configuration IpcMessages. The class provides an interface for loading plugins, connecting the plugins together into chains and for configuring the plugins (from the control channel).
The class uses an IpcReactor to manage connections and status updates.
Public Functions
-
FrameProcessorController(unsigned int num_io_threads = OdinData::Defaults::default_io_threads)#
Construct a new FrameProcessorController class.
The constructor sets up logging used within the class, and starts the IpcReactor thread.
-
virtual ~FrameProcessorController()#
Destructor.
-
void handleCtrlChannel()#
Handle an incoming configuration message.
This method is called by the IpcReactor when a configuration IpcMessage has been received. The raw message is read and parsed into an IpcMessage for further processing. The configure method is called, and once configuration has completed a response IpcMessage is sent back on the control channel.
-
void handleMetaRxChannel()#
-
void provideStatus(OdinData::IpcMessage &reply)#
Provide status information to requesting clients.
This is called in response to a status request from a connected client. The reply to the request is populated with status information from the shared memory controller and all the plugins currently loaded, and with any error messages currently stored.
- Parameters:
reply – [inout] - response IPC message to be populated with status parameters
-
void provideVersion(OdinData::IpcMessage &reply)#
Provide version information to requesting clients.
This is called in response to a version request from a connected client. The reply to the request is populated with version information from application and all the plugins currently loaded.
- Parameters:
reply – [inout] - response IPC message to be populated with version information
-
void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for the FrameProcessorController.
Sets up the overall frameProcessor application according to the configuration IpcMessage objects that are received. The objects are searched for: CONFIG_SHUTDOWN - Shuts down the application CONFIG_STATUS - Retrieves status for all plugins and replies CONFIG_CTRL_ENDPOINT - Calls the method setupControlInterface CONFIG_PLUGIN - Calls the method configurePlugin CONFIG_FR_SETUP - Calls the method setupFrameReceiverInterface
The method also searches for configuration objects that have the same index as loaded plugins. If any of these are found the they are passed down to the plugin for execution.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
void requestConfiguration(OdinData::IpcMessage &reply)#
Request the current configuration of the FrameProcessorController.
The method also searches through all loaded plugins. Each plugin is also sent a request for its configuration.
- Parameters:
reply – [out] - Response IpcMessage with the current configuration.
-
void execute(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Submit commands to the FrameProcessor plugins.
Submits command(s) to execute on individual plugins if they support commands. The IpcMessage should contain the command name and a structure of any parameters required by the command.
This method searches for command objects that have the same index as loaded plugins. If any of these are found then the commands are passed down to the plugin for execution.
- Parameters:
config – [in] - IpcMessage containing command and any parameter data.
reply – [out] - Response IpcMessage.
-
void requestCommands(OdinData::IpcMessage &reply)#
Request the command set supported by this FrameProcessorController and its loaded plugins.
The method searches through all loaded plugins. Each plugin is also sent a request for its supported commands.
- Parameters:
reply – [out] - Response IpcMessage with the current supported command set.
-
void resetStatistics(OdinData::IpcMessage &reply)#
Reset statistics on all of the loaded plugins.
The method calls reset statistics on all of the loaded plugins.
- Parameters:
reply – [out] - Response IpcMessage with the current status.
-
void configurePlugin(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for the plugins.
Sets up the plugins loaded into the controller according to the configuration IpcMessage objects that are received. The objects are searched for: CONFIG_PLUGIN_LIST - Replies with a list of loaded plugins CONFIG_PLUGIN_LOAD - Uses NAME, INDEX and LIBRARY to load a plugin into the controller. CONFIG_PLUGIN_CONNECT - Uses CONNECTION and INDEX to connect one plugin input to another plugin output. CONFIG_PLUGIN_DISCONNECT - Uses CONNECTION and INDEX to disconnect one plugin from another.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
void loadPlugin(const std::string &index, const std::string &name, const std::string &library)#
Load a new plugin.
Attempts to load the specified library dynamically using the classloader. If the index specified is already used then throws an error. Once the plugin has been loaded it’s processing thread is started. The same plugin type can be loaded multiple times as long as each index is unique.
- Parameters:
index – [in] - Unique index required for the plugin.
name – [in] - Name of the plugin class.
library – [in] - Full path of shared library file for the plugin.
-
void connectPlugin(const std::string &index, const std::string &connectTo)#
Connects two plugins together.
When plugins have been connected they can pass frame objects between them.
- Parameters:
index – [in] - Index of the plugin wanting to connect.
connectTo – [in] - Index of the plugin to connect to.
-
void disconnectPlugin(const std::string &index, const std::string &disconnectFrom)#
Disconnect one plugin from another plugin.
- Parameters:
index – [in] - Index of the plugin wanting to disconnect.
disconnectFrom – [in] - Index of the plugin to disconnect from.
-
void disconnectAllPlugins()#
Disconnect all plugins from each other.
-
void run()#
-
void waitForShutdown()#
Wait for the exit condition before returning.
-
void shutdown()#
Private Functions
-
void setupFrameReceiverInterface(const std::string &frPublisherString, const std::string &frSubscriberString)#
Set up the frame receiver interface.
This method creates new SharedMemoryController and SharedMemoryParser objects, which manage the receipt of frame ready notifications and construction of Frame objects from shared memory. Pointers to the two objects are kept by this class.
- Parameters:
sharedMemName – [in] - Name of the shared memory block opened by the frame receiver.
frPublisherString – [in] - Endpoint for sending frame release notifications.
frSubscriberString – [in] - Endpoint for receiving frame ready notifications.
-
void closeFrameReceiverInterface()#
Close the frame receiver interface.
-
void setupControlInterface(const std::string &ctrlEndpointString)#
Set up the control interface.
This method binds the control IpcChannel to the provided endpoint, creating a socket for controlling applications to connect to. This socket is used for sending configuration IpcMessages.
- Parameters:
ctrlEndpointString – [in] - Name of the control endpoint.
-
void closeControlInterface()#
Close the control interface.
-
void setupMetaRxInterface()#
-
void closeMetaRxInterface()#
-
void setupMetaTxInterface(const std::string &metaEndpointString)#
-
void closeMetaTxInterface()#
-
void runIpcService(void)#
Start the Ipc service running.
Sets up a tick timer and runs the Ipc reactor. Currently the tick timer does not perform any processing.
-
void tickTimer(void)#
Tick timer task called by IpcReactor.
This currently performs no processing.
Count frames passed through plugin chain and trigger shutdown when expected datasets received.
This is registered as a blocking callback from the end of the plugin chain. It will check for shutdown conditions and then return once all plugins have been notified to shutdown by the main thread.
- Parameters:
frame – - Pointer to the frame
Private Members
-
log4cxx::LoggerPtr logger_#
Pointer to the logging facility
Pointer to the shared memory controller instance for this process
-
std::map<std::string, boost::shared_ptr<FrameProcessorPlugin>> plugins_#
Map of plugins loaded, indexed by plugin index
-
std::map<std::string, std::string> stored_configs_#
Map of stored configuration objects
-
boost::condition_variable exitCondition_#
Condition for exiting this file writing process
-
int shutdownFrameCount#
Frames to write before shutting down - 0 to disable shutdown
-
int totalFrames#
Total frames processed
-
std::string masterFrame#
Master frame specifier - Frame to include in count of total frames processed
-
boost::mutex exitMutex_#
Mutex used for locking the exitCondition
-
bool runThread_#
Used to check for Ipc tick timer termination
-
bool threadRunning_#
Is the main thread running
-
bool threadInitError_#
Did an error occur during the thread initialisation
-
bool pluginShutdownSent_#
Have we sent sent the shutdown command to the plugins
-
bool shutdown_#
Have we successfully shutdown
-
boost::thread ctrlThread_#
Main thread used for control message handling
-
std::string threadInitMsg_#
Store for any messages occurring during thread initialisation
-
boost::shared_ptr<OdinData::IpcReactor> reactor_#
Pointer to the IpcReactor for incoming frame handling
-
std::string ctrlChannelEndpoint_#
End point for control messages
-
OdinData::IpcContext &ipc_context_#
ZMQ context for IPC channels
-
OdinData::IpcChannel ctrlChannel_#
IpcChannel for control messages
-
OdinData::IpcChannel metaRxChannel_#
IpcChannel for meta-data messages
-
std::string metaTxChannelEndpoint_#
End point for publishing meta-data messages
-
OdinData::IpcChannel metaTxChannel_#
IpcChannel for publishing meta-data messages
-
std::string frReadyEndpoint_#
End point for frameReceiver ready channel
-
std::string frReleaseEndpoint_#
End point for frameReceiver release channel
Private Static Attributes
-
static const std::string META_RX_INTERFACE = "inproc://meta_rx"#
Configuration constant for the meta-data Rx interface
-
static const std::string CONFIG_SHUTDOWN#
Configuration constant to shutdown the frame processor
-
static const std::string CONFIG_EOA = "inject_eoa"#
Configuration constant to inject an End Of Acquisition frame into the plugin chain
-
static const std::string CONFIG_DEBUG = "debug_level"#
Configuration constant to set the debug level of the frame processor
-
static const std::string CONFIG_FR_SHARED_MEMORY#
Configuration constant for name of shared memory storage
-
static const std::string CONFIG_FR_RELEASE = "fr_release_cnxn"#
Configuration constant for connection string for frame release
-
static const std::string CONFIG_FR_READY = "fr_ready_cnxn"#
Configuration constant for connection string for frame ready
-
static const std::string CONFIG_FR_SETUP = "fr_setup"#
Configuration constant for executing setup of shared memory interface
-
static const std::string CONFIG_CTRL_ENDPOINT = "ctrl_endpoint"#
Configuration constant for control socket endpoint
-
static const std::string CONFIG_META_ENDPOINT = "meta_endpoint"#
Configuration constant for meta data endpoint
-
static const std::string CONFIG_PLUGIN = "plugin"#
Configuration constant for plugin related items
-
static const std::string CONFIG_PLUGIN_LOAD = "load"#
Configuration constant for listing loaded plugins
-
static const std::string CONFIG_PLUGIN_CONNECT = "connect"#
Configuration constant for connecting plugins
-
static const std::string CONFIG_PLUGIN_DISCONNECT = "disconnect"#
Configuration constant for disconnecting plugins
-
static const std::string CONFIG_PLUGIN_DISCONNECT_ALL = "all"#
Configuration keyword for disconnecting all plugins
-
static const std::string CONFIG_PLUGIN_NAME = "name"#
Configuration constant for a plugin name
-
static const std::string CONFIG_PLUGIN_INDEX = "index"#
Configuration constant for a plugin index
-
static const std::string CONFIG_PLUGIN_LIBRARY = "library"#
Configuration constant for a plugin external library
-
static const std::string CONFIG_PLUGIN_CONNECTION = "connection"#
Configuration constant for setting up a plugin connection
-
static const std::string CONFIG_STORE = "store"#
Configuration constant for storing a named configuration object
-
static const std::string CONFIG_EXECUTE = "execute"#
Configuration constant for executing a named configuration object
-
static const std::string CONFIG_INDEX = "index"#
Configuration constant for the name of a stored configuration object
-
static const std::string CONFIG_VALUE = "value"#
Configuration constant for the value of a stored configuration object
-
static const std::string COMMAND_KEY = "command"#
Configuration constant for the a command to execute
-
static const int META_TX_HWM = 10000#
Configuration constant for the meta TX channel high water mark
-
FrameProcessorController(unsigned int num_io_threads = OdinData::Defaults::default_io_threads)#
Plugins#
FrameProcessorPlugin#
-
class FrameProcessorPlugin : public FrameProcessor::IFrameCallback, public OdinData::IVersionedObject, public FrameProcessor::MetaMessagePublisher#
Abstract plugin class, providing the IFrameCallback interface.
All frame processor plugins must subclass this class. It provides the IFrameCallback interface and associated WorkQueue for transferring Frame objects between plugins. It also provides methods for configuring plugins and for retrieving status from plugins.
Subclassed by FrameProcessor::BloscPlugin, FrameProcessor::DummyUDPProcessPlugin, FrameProcessor::FileWriterPlugin, FrameProcessor::GapFillPlugin, FrameProcessor::KafkaProducerPlugin, FrameProcessor::LiveViewPlugin, FrameProcessor::OffsetAdjustmentPlugin, FrameProcessor::ParameterAdjustmentPlugin, FrameProcessor::SumPlugin
Public Functions
-
FrameProcessorPlugin()#
Constructor, initialises name_ and meta data channel.
-
virtual ~FrameProcessorPlugin()#
Destructor
-
void set_name(const std::string &name)#
Set the name of this plugin
- Parameters:
name – [in] - The name.
-
std::string get_name()#
Get the name of this plugin
- Returns:
The name.
-
void set_error(const std::string &msg)#
Set the error state.
Sets an error for this plugin
- Parameters:
msg – [in] - std::string error message.
-
void set_warning(const std::string &msg)#
Set the warning state.
Sets an warning for this plugin
- Parameters:
msg – [in] - std::string warning message.
-
void clear_errors()#
Clear error and warning messages.
-
virtual bool reset_statistics()#
Reset any statistics.
Any counters in the plugin should be reset by this method
-
std::vector<std::string> get_errors()#
Return the current error message.
-
std::vector<std::string> get_warnings()#
Return the current warning message.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Configure the plugin.
In this abstract class the configure method does perform any actions, this should be overridden by subclasses.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Request the plugin’s current configuration.
In this abstract class the request method does perform any actions, this should be overridden by subclasses.
- Parameters:
reply – [out] - Response IpcMessage with current configuration.
-
virtual void execute(const std::string &command, OdinData::IpcMessage &reply)#
Execute a command within the plugin.
In this abstract class the command method does perform any actions, this should be overridden by subclasses.
- Parameters:
command – [in] - String containing the command to execute.
reply – [out] - Response IpcMessage.
-
virtual std::vector<std::string> requestCommands()#
Request the plugin’s supported commands.
In this abstract class the request method does perform any actions, this should be overridden by subclasses.
- Returns:
- Vector containing supported command strings.
-
virtual void status(OdinData::IpcMessage &status)#
Collate status information for the plugin.
The status is added to the status IpcMessage object. In this abstract class the status method does perform any actions, this should be overridden by subclasses.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the status.
-
void add_performance_stats(OdinData::IpcMessage &status)#
Collate performance statistics for the plugin.
The performance metrics are added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the performance stats.
-
void reset_performance_stats()#
Reset performance statistics for the plugin.
The performance metrics are reset to zero.
-
void version(OdinData::IpcMessage &status)#
Collate version information for the plugin.
The version information is added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the version.
Registers another plugin for frame callbacks from this plugin.
The callback interface (which will be another plugin) is stored in our map, indexed by name. If the callback already exists within our map then this is a no-op.
- Parameters:
name – [in] - Index of the callback (plugin index).
cb – [in] - Pointer to an IFrameCallback interface (plugin).
blocking – [in] - Whether call should block.
-
void remove_callback(const std::string &name)#
Remove a plugin from our callback map.
- Parameters:
name – [in] - Index of the callback (plugin index) to remove.
-
void remove_all_callbacks()#
Remove all registered callbacks for this plugin.
-
void notify_end_of_acquisition()#
Protected Functions
Push the supplied frame to any registered callbacks.
This method calls any blocking callbacks directly and then loops over the map of registered callbacks and places the frame pointer on their worker queue (see IFrameCallback).
- Parameters:
frame – [in] - Pointer to the frame.
Push the supplied frame to a specifically named registered callback.
This method calls the named blocking callback directly or places the frame pointer on the named worker queue (see IFrameCallback).
- Parameters:
plugin_name – [in] - Name of the plugin to send the frame to.
frame – [in] - Pointer to the frame.
Private Functions
We have been called back with a frame from a plugin that we registered with. This method calls the processFrame pure virtual method that must be overridden by any children of this abstract class.
- Parameters:
frame – [in] - Pointer to the frame.
This is called by the callback method when any new frames have arrived and must be overridden by child classes.
- Parameters:
frame – [in] - Pointer to the frame.
-
virtual void process_end_of_acquisition()#
Perform any end of acquisition cleanup.
This default implementation does nothing. Any plugins that want to perform cleanup actions when an end of acquisition notification takes place can override this method.
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
std::string name_#
Name of this plugin
-
std::map<std::string, boost::shared_ptr<IFrameCallback>> callbacks_#
Map of registered plugins for callbacks, indexed by name
-
std::map<std::string, boost::shared_ptr<IFrameCallback>> blocking_callbacks_#
Map of registered plugins for blocking callbacks, indexed by name
-
std::vector<std::string> error_messages_#
Error message array
-
std::vector<std::string> warning_messages_#
Warning message array
-
boost::mutex mutex_#
Mutex to make accessing error_messages_ threadsafe
-
CallDuration process_duration_#
process_frame performance stats
-
FrameProcessorPlugin()#
BloscPlugin#
-
class BloscPlugin : public FrameProcessor::FrameProcessorPlugin#
This is a compression plugin using the Blosc library
When this plugin receives a frame, processFrame is called and the class uses the blosc compression methods to compress the data and output a new, compressed Frame.
Public Functions
-
BloscPlugin()#
The constructor sets up logging used within the class.
-
virtual ~BloscPlugin()#
Destructor.
Compress one frame, return compressed frame.
- Parameters:
src_frame – - source frame to compress
- Returns:
compressed frame
Private Functions
Perform compression on the frame and output a new, compressed Frame.
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual void status(OdinData::IpcMessage &status)#
Collate status information for the plugin
- Parameters:
status – - Reference to an IpcMessage value to store the status
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Configure
- Parameters:
config –
reply –
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get configuration settings for the BloscPlugin
- Parameters:
reply – - Response IpcMessage.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
-
void update_compression_settings()#
Update the compression settings
-
void *get_buffer(size_t nbytes)#
Return data buffer
- Parameters:
nbytes –
- Returns:
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
boost::recursive_mutex mutex_#
Mutex used to make this class thread safe
-
std::string current_acquisition_#
Current acquisition ID
-
BloscCompressionSettings compression_settings_#
Compression settings
-
BloscCompressionSettings commanded_compression_settings_#
Compression settings for the next acquisition
-
void *data_buffer_ptr_#
Temporary buffer for compressed data
-
size_t data_buffer_size_#
-
BloscPlugin()#
GapFillPlugin#
-
class GapFillPlugin : public FrameProcessor::FrameProcessorPlugin#
Public Functions
-
GapFillPlugin()#
Constructor for this class.
-
virtual ~GapFillPlugin()#
Process received frame.
- Parameters:
frame – [in] - pointer to a frame object.
Check the validity of the configuration against the incoming frame.
Checks the frame size is equal to the grid size X chip size in both directions. Checks the number of gaps correspond to the specified grid size.
- Parameters:
frame – [in] - pointer to a frame object.
- Returns:
verified - true if the configuration is valid, false otherwise.
Insert gaps into the frame according to the grid, gap_x and gap_y values.
A series of copies is made copying each row of each chip into the appropriate destination offset.
- Parameters:
frame – [in] - pointer to a frame object.
- Returns:
gap_frame - pointer to a frame that has gaps inserted.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for this Plugin.
This sets up the Live View Plugin according to the configuration IpcMessage objects that are received.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Public Static Attributes
-
static const std::string CONFIG_GRID_SIZE = "grid_size"#
The required grid for the image [y, x]
-
static const std::string CONFIG_CHIP_SIZE = "chip_size"#
The chip size of the image in pixels [y, x]
-
static const std::string CONFIG_GRID_X_GAPS = "x_gaps"#
The gaps to insert in the x grid direction (must be grid[x] + 1 in dimension
-
static const std::string CONFIG_GRID_Y_GAPS = "y_gaps"#
The gaps to insert in the y grid direction (must be grid[y] + 1 in dimension
Private Functions
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get the configuration values for this Plugin.
- Parameters:
reply – [out] - Response IpcMessage.
-
GapFillPlugin()#
KafkaProducerPlugin#
-
class KafkaProducerPlugin : public FrameProcessor::FrameProcessorPlugin#
KafkaProducerPlugin integrates Odin with Kafka.
It creates and send messages that contains frame data and metadata to one or more Kafka servers.
Plugin parameters: servers: Kafka broker list using format IP:PORT[,IP2:PORT2,…] Once this is set, the plugin starts delivering to the specified server/s. dataset: Dataset name of frame that will be delivered. Defaults to “data”. topic: Topic name of the queue to send the message to. Defaults to “data”. partition: Partition number. Defaults to RD_KAFKA_PARTITION_UA (automatic partitioning) include_parameters: Boolean indicating if frame parameters should be included in the message. Defaults to true.
Status variables: sent: Number of sent frames. lost: Number of lost frames. ack: Number of acknowledged frames.
Public Functions
-
KafkaProducerPlugin()#
The constructor sets up logging used within the class.
-
~KafkaProducerPlugin()#
The destructor cleans up Kafka handlers.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for this Plugin.
This sets up the Kafka Producer Plugin according to the configuration IpcMessage objects that are received.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get the configuration values for this Plugin.
- Parameters:
reply – [out] - Response IpcMessage.
-
virtual void status(OdinData::IpcMessage &status)#
Collate status information for the plugin. The status is added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the status.
If dataset configured matches, it sends the frame to kafka server/s
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual bool reset_statistics()#
Clear frame statistics
-
void destroy_kafka()#
Destroy Kafka handlers for the connection and the topic
-
void poll_delivery_message_report_queue()#
Poll the delivery message report queue, calling the callback function if appropriate.
-
void on_message_ack()#
It updates stats when a message is acknowledged
-
void on_message_error(const char *error)#
It logs an error when a message delivery has failed
-
void configure_kafka_servers(std::string servers)#
Configure Kafka connection handler for the server/s specified.
- Parameters:
servers – [in] - string representing Kafka brokers using format: IP:PORT[,IP2:PORT2,…]
-
void configure_kafka_topic(std::string topic_name)#
Configure Kafka topic handler for the topic specified
- Parameters:
topic_name – [in] - string representing the topic name.
-
void configure_partition(int32_t partition)#
Set Kafka partition to send messages to
If it is not configured, it defaults to automatic partitioning (using the topic’s partitioner function)
- Parameters:
partition – [in] - partition number.
-
void configure_dataset(std::string dataset)#
Configure the dataset that will be published
Create a message with the following structure:
[ json header length (2 bytes) ] + [ json header ] + [ frame data ]
- Parameters:
frame – [in] - Pointer to a Frame object.
nbytes – [out] - Reference to the message size in bytes.
Create and enqueue a frame message to kafka server/s
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
std::string dataset_name_#
Name of the dataset that will be delivered
-
std::string topic_name_#
Topic name identifying the destination queue
-
std::string servers_#
Kafka brokers to connect to
-
int32_t partition_#
Partition number
-
rd_kafka_t *kafka_producer_#
Pointer to a Kafka producer handler
-
rd_kafka_topic_t *kafka_topic_#
Pointer to a Kafka topic handler
-
uint32_t frames_sent_#
Number of sent frames
-
uint32_t frames_lost_#
Number of lost frames
-
uint32_t frames_ack_#
Number of acknowledged frames
-
int polling_timer_id_#
-
boost::recursive_mutex mutex_#
-
bool include_parameters_#
True if frame parameters need to be included in the message header
Private Static Attributes
-
static const std::string CONFIG_SERVERS = "servers"#
Configuration constant for servers parameter
-
static const std::string CONFIG_TOPIC = "topic"#
Configuration constant for topic parameter
-
static const std::string CONFIG_PARTITION = "partition"#
Configuration constant for partition parameter
-
static const std::string CONFIG_DATASET = "dataset"#
Configuration constant for dataset parameter
-
static const std::string CONFIG_INCLUDE_PARAMETERS = "include_parameters"#
Configuration constant for include_parameters
-
KafkaProducerPlugin()#
FileWriterPlugin#
-
class FileWriterPlugin : public FrameProcessor::FrameProcessorPlugin#
Plugin that writes Frame objects to HDF5 files.
This plugin processes Frame objects and can write them to HDF5 files. The plugin can be configured through the Ipc control interface defined in the FileWriterPluginController class. Currently only the raw data is written into datasets. Multiple datasets can be created and the raw data is stored according to the Frame index (or name).
Public Functions
-
explicit FileWriterPlugin()#
Create a FileWriterPlugin with default values. File path is set to default of current directory, and the filename is set to a default.
The writer plugin is also configured to be a single process writer (no other expected writers).
-
virtual ~FileWriterPlugin()#
Destructor.
-
void start_writing()#
Start writing frames to file.
This method checks that the writer is not already writing. Then it creates the datasets required (from their definitions) and creates the HDF5 file ready to write frames. The framesWritten counter is reset to 0.
-
void stop_writing()#
Stop writing frames to file.
This method checks that the writer is currently writing. Then it closes the file and stops writing frames.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for the file writer.
This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The options are searched for: CONFIG_PROCESS - Calls the method processConfig CONFIG_FILE - Calls the method fileConfig CONFIG_DATASET - Calls the method dsetConfig
Checks to see if the number of frames to write has been set. Checks to see if the writer should start or stop writing frames.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Request the plugin’s current configuration.
In this abstract class the request method does perform any actions, this should be overridden by subclasses.
- Parameters:
reply – [out] - Response IpcMessage with current configuration.
-
void configure_process(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for the file writer process count.
This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The options are searched for: CONFIG_PROCESS_NUMBER - Sets the number of writer processes executing CONFIG_PROCESS_RANK - Sets the rank of this process
The configuration is not applied if the writer is currently writing.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
void configure_file(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set file configuration options for the file writer.
This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The options are searched for: CONFIG_FILE_PATH - Sets the path of the file to write to CONFIG_FILE_NAME - Sets the filename of the file to write to
The configuration is not applied if the writer is currently writing.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
void configure_dataset(const std::string &dataset_name, OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set dataset configuration options for the file writer.
This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The options are searched for: CONFIG_DATASET_CMD - Should we create/delete a dataset definition CONFIG_DATASET_NAME - Name of the dataset CONFIG_DATASET_TYPE - Datatype of the dataset CONFIG_DATASET_DIMS - Dimensions of the dataset CONFIG_DATASET_CHUNKS - Chunking parameters of the dataset CONFIG_DATASET_COMPRESSION - Compression of raw data
The configuration is not applied if the writer is currently writing.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
void create_new_dataset(const std::string &dset_name)#
Checks to see if a dataset with the supplied name already exists. If it doesn’t then the dataset definition is created and then added to the store.
- Parameters:
dset_name – [in] - Name of the dataset to create.
-
void delete_datasets()#
Deletes all dataset definitions from the plugin.
-
virtual void status(OdinData::IpcMessage &status)#
Collate status information for the plugin. The status is added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the status.
-
void add_file_writing_stats(OdinData::IpcMessage &status)#
Collate file writing statistics for the plugin.
The metrics are added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the file writing stats.
-
virtual bool reset_statistics()#
Reset file writing statistics
-
void stop_acquisition()#
Stops the current acquisition and starts the next if it is configured
-
void start_close_file_timeout()#
Starts the close file timeout
-
void run_close_file_timeout()#
Function that is run by the close file timeout thread
This waits until notified to start, and then runs a timer. If the timer times out, then the current acquisition is stopped. If the timer is notified before timing out (by a frame being written) then no action is taken, as it will either start the timer again or go back to the wait for start state, depending on the value of timeoutActive.
-
size_t calc_num_frames(size_t total_frames)#
Calculates the number of frames that this FileWriter can expect to write based on the total number of frames
- Parameters:
totalFrames – [in] - The total number of frames in the acquisition
- Returns:
- The number of frames that this FileWriter is expected to write
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Private Functions
-
FileWriterPlugin(const FileWriterPlugin &src)#
Prevent a copy of the FileWriterPlugin plugin.
- Parameters:
src – [in]
Process an incoming frame.
Checks we have been asked to write frames. If we are in writing mode then the frame is checked for subframes. If subframes are found then writeSubFrames is called. If no subframes are found then writeFrame is called. Finally counters are updated and if the number of required frames has been reached then the stopWriting method is called.
- Parameters:
frame – [in] - Pointer to the Frame object.
-
virtual void process_end_of_acquisition()#
Process an EndOfAcquisitionFrame.
Checks we are writing. If we are in writing mode then the acquisition is stopped and the timeout_active_ flag is set to false.
This function checks the acquisition id of the frame matches that of the current acquisition, subject to two caveats: i) if the frame-aid is “”, it matches anything ii) this function has the side-effect of moving from the current-acq to the next-acq if that helps us match the frame.
The function will set an error if the frame does not match a writing acquisition.
- Parameters:
frame – [in] - Pointer to the Frame object.
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
boost::recursive_mutex mutex_#
Mutex used to make this class thread safe
-
bool writing_#
Is this plugin writing frames to file?
-
size_t concurrent_processes_#
Number of concurrent file writers executing
-
size_t concurrent_rank_#
Rank of this file writer
-
boost::shared_ptr<Acquisition> current_acquisition_#
Details of the acquisition currently being written
-
boost::shared_ptr<Acquisition> next_acquisition_#
Details of the next acquisition to be written
-
std::map<std::string, DatasetDefinition> dataset_defs_#
Map of dataset definitions
-
size_t frames_per_block_#
Number of frames to write consecutively in a file
-
size_t blocks_per_file_#
Number of blocks to write in a file
-
bool use_earliest_hdf5_#
Use the earliest version of hdf5
-
size_t alignment_threshold_#
HDF5 file chunk alignment threshold
-
size_t alignment_value_#
HDF5 file chunk alignment value
-
size_t timeout_period_#
Timeout for closing the file after receiving no data
-
boost::mutex start_timeout_mutex_#
Mutex used to make starting the close file timeout thread safe
-
boost::mutex close_file_mutex_#
Mutex used to make running the close file timeout thread safe
-
boost::condition_variable start_condition_#
Condition variable used to start the close file timeout
-
boost::condition_variable timeout_condition_#
Condition variable used to run the close file timeout
-
bool timeout_active_#
Close file timeout active switch
-
bool timeout_thread_running_#
Close file timeout thread running
-
boost::thread timeout_thread_#
The close file timeout thread
-
uint32_t first_file_index_#
Starting file index (default to 0 index based numbering)
-
bool use_file_numbering_#
Do we use file numbers in the file name construction. Defaults to true
-
std::string file_postfix_#
The optional file postfix to add
-
std::string file_extension_#
The file extension to use
-
std::string master_frame_#
Name of master frame. When a master frame is received frame numbers increment
-
HDF5ErrorDefinition_t hdf5_error_definition_#
HDF5 call warning and error durations
-
HDF5CallDurations_t hdf5_call_durations_#
HDF5 File IO performance stats
Private Static Attributes
-
static const std::string CONFIG_PROCESS = "process"#
Configuration constant for process related items
-
static const std::string CONFIG_PROCESS_NUMBER = "number"#
Configuration constant for number of processes
-
static const std::string CONFIG_PROCESS_RANK = "rank"#
Configuration constant for this process rank
-
static const std::string CONFIG_PROCESS_BLOCKSIZE = "frames_per_block"#
Configuration constant for the number of frames per block
-
static const std::string CONFIG_PROCESS_BLOCKS_PER_FILE = "blocks_per_file"#
Configuration constant for the number of blocks per file
-
static const std::string CONFIG_PROCESS_EARLIEST_VERSION = "earliest_version"#
Configuration constant for using earliest file version
-
static const std::string CONFIG_PROCESS_ALIGNMENT_THRESHOLD = "alignment_threshold"#
Configuration constant for chunk alignment threshold
-
static const std::string CONFIG_PROCESS_ALIGNMENT_VALUE = "alignment_value"#
Configuration constant for chunk alignment value
-
static const std::string CONFIG_FILE = "file"#
Configuration constant for file related items
-
static const std::string CONFIG_FILE_NAME = "name"#
Configuration constant for file name
-
static const std::string CONFIG_FILE_USE_NUMBERS = "use_numbers"#
Configuration constant for using automatic file name numbering
-
static const std::string CONFIG_FILE_NUMBER_START = "first_number"#
Configuration constant for starting file number if using numbering
-
static const std::string CONFIG_FILE_POSTFIX = "postfix"#
Configuration constant for file name postfix (optional)
-
static const std::string CONFIG_FILE_PATH = "path"#
Configuration constant for file path
-
static const std::string CONFIG_FILE_EXTENSION = "extension"#
Configuration constant for file extension
-
static const std::string CONFIG_DATASET = "dataset"#
Configuration constant for dataset related items
-
static const std::string CONFIG_DATASET_TYPE = "datatype"#
Configuration constant for dataset datatype
-
static const std::string CONFIG_DATASET_DIMS = "dims"#
Configuration constant for dataset dimensions
-
static const std::string CONFIG_DATASET_CHUNKS = "chunks"#
Configuration constant for chunking dimensions
-
static const std::string CONFIG_DATASET_COMPRESSION = "compression"#
Configuration constant for data compression
-
static const std::string CONFIG_DATASET_INDEXES = "indexes"#
Configuration constant for data high/low indexes
-
static const std::string CONFIG_DATASET_BLOSC_COMPRESSOR = "blosc_compressor"#
Configurations for Blosc compression
-
static const std::string CONFIG_DATASET_BLOSC_LEVEL = "blosc_level"#
-
static const std::string CONFIG_DATASET_BLOSC_SHUFFLE = "blosc_shuffle"#
-
static const std::string CONFIG_DELETE_DATASETS = "delete_datasets"#
Configuration constant for deleting all datasets
-
static const std::string CONFIG_FRAMES = "frames"#
Configuration constant for number of frames to write
-
static const std::string CONFIG_MASTER_DATASET = "master"#
Configuration constant for master dataset name
-
static const std::string CONFIG_WRITE = "write"#
Configuration constant for starting and stopping writing of frames
-
static const std::string ACQUISITION_ID = "acquisition_id"#
Configuration constant for the acquisition id
-
static const std::string CLOSE_TIMEOUT_PERIOD = "timeout_timer_period"#
Configuration constant for the close file timeout
-
static const std::string START_CLOSE_TIMEOUT = "start_timeout_timer"#
Configuration constant for starting the close file timeout
-
static const std::string CREATE_ERROR_DURATION = "create_error_duration"#
Configuration constant for HDF5 call timeout durations before loggin an error
-
static const std::string WRITE_ERROR_DURATION = "write_error_duration"#
-
static const std::string FLUSH_ERROR_DURATION = "flush_error_duration"#
-
static const std::string CLOSE_ERROR_DURATION = "close_error_duration"#
-
explicit FileWriterPlugin()#
Acquisition#
-
class Acquisition : public FrameProcessor::MetaMessagePublisher#
Public Functions
-
Acquisition(const HDF5ErrorDefinition_t &hdf5_error_definition)#
-
~Acquisition()#
-
std::string get_last_error()#
Returns the last error message that was generated
- Returns:
- The most recently generated error message.
Processes a frame
This method checks that the frame is valid before using an HDF5File to write the frame to file
- Parameters:
frame – [in] - The frame to process
- Returns:
- The Status of the processing.
-
void create_file(size_t file_number, HDF5CallDurations_t &call_durations)#
Creates a file
This method creates a new HDF5File object with the given file_number. The file will be created, the datasets populated within the file, and a meta message sent
- Parameters:
file_number – [in] - The file_number to create a file for
Closes a file
This method closes the file that is currently being written to by the specified HDF5File object and sends off meta data for this event
- Parameters:
file – [in] - The HDF5File to call to close its file
-
void validate_dataset_definition(DatasetDefinition definition)#
Validate dataset definition
Perform checks on the given dataset definition to ensure it is valid before creation
- Parameters:
definition – [in] - The DatasetDefinition to validate
-
bool start_acquisition(size_t concurrent_rank, size_t concurrent_processes, size_t frames_per_block, size_t blocks_per_file, uint32_t starting_file_index, bool use_file_numbers, std::string file_postfix, std::string file_extension, bool use_earliest_hdf5, size_t alignment_threshold, size_t alignment_value, std::string master_frame, HDF5CallDurations_t &call_durations)#
Starts this acquisition, creating the acquisition file, or first file in a series, and publishes meta
- Parameters:
concurrent_rank – [in] - The rank of the processor
concurrent_processes – [in] - The number of processors
frames_per_block – [in] - The number of frames per block
blocks_per_file – [in] - The number of blocks per file
starting_file_index – [in] - The first number in the indexing of filenames
use_file_numbers – [in] - Use file numbers in the file name (or not)
file_postfix – [in] - An additional string placed before any numbering
file_extension – [in] - The file extension to use
use_earliest_hdf5 – [in] - Whether to use an early version of hdf5 library
alignment_threshold – [in] - Alignment threshold for hdf5 chunking
alignment_value – [in] - Alignment value for hdf5 chunking
master_frame – [in] - The master frame dataset name
- Returns:
- true if the acquisition was started successfully
-
void stop_acquisition(HDF5CallDurations_t &call_durations)#
Stops this acquisition, closing off any open files
Check incoming frame is valid for its target dataset.
Check the dimensions, data type and compression of the frame data.
- Parameters:
frame – [in] - Pointer to the Frame object.
- Returns:
- true if the frame was valid
-
size_t get_frame_offset_in_file(size_t frame_offset) const#
Return the dataset offset for the supplied global offset
This method calculates the dataset offset for this frame, within the final file that it will be written to
- Parameters:
frame_offset – [in] - Frame number of the frame.
- Returns:
- the dataset offset for the frame number.
-
size_t get_file_index(size_t frame_offset) const#
Return the file index for the supplied global offset
This method calculates the file index that this frame should be written to
- Parameters:
frame_offset – [in] - Frame number of the frame.
- Returns:
- the file index.
Returns the adjusted offset (index in file) for the Frame
Combines the frame number with the frame offset stored on the frame object to calculate an the adjusted frame offset in the file for this frame
Throws a std::range_error if the applied offset would cause the frame offset to be calculated as a negative number
Returns the dataset offset for frame number (frame_no)
-
boost::shared_ptr<HDF5File> get_file(size_t frame_offset, HDF5CallDurations_t &call_durations)#
Gets the HDF5File object for the given frame
This will depending on variables like the number of frames per block and blocks per file. If the required HDF5File doesn’t currently exist, one will be created. If it’s detected that there are frames which have been missed which would have required a file before this one, those files are created and their files written with blank data
- Parameters:
frame_offset – [in] - The frame offset to get the file for
- Returns:
- The file that should be used to write this frame
-
std::string get_create_meta_header()#
Create the header for the create_file meta message
This includes total frames in order to configure the meta writer
- Returns:
- a string containing the json meta data header
-
std::string get_meta_header()#
Create the standard header for a meta message
- Returns:
- a string containing the json meta data header
-
std::string generate_filename(size_t file_number = 0)#
Generates the filename for the given file number
Appends a postfix string to the filename followed by a 6 digit file number to the configured file name. The string postfix defaults to an empty string if it was not set. File names are 0 indexed by default, but the starting index can be configured. File numbers are not used if the feature is turned off (use_file_numbers_).
If no file name is configured, it uses the acquisition ID and if this is not configured, then the generated filename returned is empty.
- Parameters:
file_number – [in] - The file number to generate the filename for
- Returns:
- The name of the file including extension
Public Members
-
LoggerPtr logger_#
-
std::string master_frame_#
Name of master frame. When a master frame is received frame numbers increment
-
size_t frames_to_write_#
Number of frames to write to file
-
size_t total_frames_#
Total number of frames in acquisition
-
uint32_t starting_file_index_#
Starting file index (default to 0 index based numbering)
-
bool use_file_numbers_#
Do we use file numbers in the file name construction. Defaults to true
-
std::string file_path_#
Path of the file to write to
-
std::string filename_#
Name of the file to write to
-
std::string configured_filename_#
Configured value to be used as the prefix to generate the filename.
-
std::string file_postfix_#
Configured value to be used as extra filename control if required.
-
std::string file_extension_#
File extension to use
-
bool use_earliest_hdf5_#
Use the earliest version of hdf5
-
size_t alignment_threshold_#
HDF5 file chunk alignment threshold
-
size_t alignment_value_#
HDF5 file chunk alignment value
-
std::string acquisition_id_#
Identifier for the acquisition - value sent from a detector/control to be used to identify frames, config or anything else to this acquisition. Used to name the file
-
std::map<std::string, DatasetDefinition> dataset_defs_#
Map of dataset definitions for this acquisition
-
size_t frames_written_#
Number of frames that have been written to file
-
size_t frames_processed_#
Number of frames that have been processed
-
size_t concurrent_processes_#
Number of concurrent file writers executing
-
size_t concurrent_rank_#
Rank of this file writer
-
size_t frames_per_block_#
Number of frames to write consecutively in a file
-
size_t blocks_per_file_#
Number of blocks to write in a file; 0=put all blocks in same file
-
const HDF5ErrorDefinition_t &hdf5_error_definition_#
HDF5 call error definitions
Private Functions
-
void add_uint64_to_document(const std::string &key, size_t value, rapidjson::Document *document) const#
-
void add_string_to_document(const std::string &key, const std::string &value, rapidjson::Document *document) const#
-
std::string document_to_string(rapidjson::Document &document) const#
-
Acquisition(const HDF5ErrorDefinition_t &hdf5_error_definition)#
HDF5File#
-
class HDF5File#
Public Functions
-
HDF5File(const HDF5ErrorDefinition_t &hdf5_error_definition)#
-
~HDF5File()#
-
void hdf_error_handler(unsigned n, const H5E_error2_t *err_desc)#
-
void clear_hdf_errors()#
-
void handle_h5_error(const std::string &message, const std::string &function, const std::string &filename, int line)#
Handles an HDF5 error. Logs the error and throws a runtime exception
- Parameters:
message – [in] - The error message to log
function – [in] - The function that had the error
filename – [in] - The filename
line – [in] - The line number of the call
-
size_t create_file(std::string file_name, size_t file_index, bool use_earliest_version, size_t alignment_threshold, size_t alignment_value)#
Create the HDF5 ready for writing datasets.
- Parameters:
filename – [in] - Full file name of the file to create.
file_index – [in] - File index of the file
use_earliest_version – [in] - Whether to use the earliest version of HDF5 library
alignment_threshold – [in] - Chunk threshold
alignment_value – [in] - Chunk alignment value
- Returns:
- The duration of the H5Fcreate call
-
size_t close_file()#
Close the currently open HDF5 file.
- Returns:
- The hdf5 write metric with the durations of the write and flush calls
-
void create_dataset(const DatasetDefinition &definition, int low_index, int high_index)#
Create a HDF5 dataset from the DatasetDefinition.
- Parameters:
definition – [in] - Reference to the DatasetDefinition.
low_index – [in] - Value of the lowest frame index in the file if in block mode.
high_index – [in] - Value of the highest frame index in the file if in block mode.
-
void write_frame(const Frame &frame, hsize_t frame_offset, uint64_t outer_chunk_dimension, HDF5CallDurations_t &call_durations)#
Write a frame to the file.
- Parameters:
frame – [in] - Reference to the frame
frame_offset – [in] - The offset in the file to write the frame into
outer_chunk_dimension – [in] - The size of the outermost dimension of a chunk
call_durations – [in] - Struct containing hdf5 call durations - write and flush will be updated with the durations of the H5DOwrite_chunk and H5Dflush calls
-
void write_parameter(const Frame &frame, DatasetDefinition dataset_definition, hsize_t frame_offset)#
Write a parameter to the file.
- Parameters:
frame – [in] - Reference to the frame.
dataset_definition – [in] - The dataset definition for this parameter.
frame_offset – [in] - The offset to write the value to
-
size_t get_dataset_frames(const std::string &dset_name)#
Read the current number of frames in an HDF5 dataset (including gaps, up to the highest written offset)
- Parameters:
dataset – [in] - HDF5 dataset
-
size_t get_dataset_max_size(const std::string &dset_name)#
Get the maximum size of the given dataset
- Parameters:
dataset – [in] - HDF5 dataset
- Returns:
- 0 if unlimited_, else the extent of the outermost dimension of the dataset
-
void start_swmr()#
Start SWMR writing
-
size_t get_file_index()#
Get the file index of this file
- Returns:
- the file index (0 indexed)
-
std::string get_filename()#
Get the file name of this file
- Returns:
- the name of the file
-
void set_unlimited()#
Configure datasets to allow extension during write to an unlimited extent
Private Functions
-
HDF5Dataset_t &get_hdf5_dataset(const std::string &dset_name)#
Get a HDF5Dataset_t definition by its name.
The private map of HDF5 dataset definitions is searched and if found the HDF5Dataset_t definition is returned. Throws a runtime error if the dataset cannot be found.
- Parameters:
dset_name – [in] - name of the dataset to search for.
- Returns:
- the dataset definition if found.
-
void extend_dataset(HDF5File::HDF5Dataset_t &dset, size_t frame_no)#
Extend the HDF5 dataset ready for new data
Checks the frame_no is larger than the current dataset dimensions and then sets the extent of the dataset to this new value.
This is used in the case that the final size of the dataset is unknown initially and set to H5S_UNLIMITED.
- Parameters:
dset – [in] - Handle to the HDF5 dataset.
frame_no – [in] - Number of the incoming frame to extend to.
-
hid_t datatype_to_hdf_type(DataType data_type) const#
Convert from a DataType type to the corresponding HDF5 type.
- Parameters:
data_type – [in] - The DataType type to convert.
- Returns:
- the equivalent HDF5 type.
Private Members
-
LoggerPtr logger_#
-
hid_t hdf5_file_id_#
Internal ID of the file being written to
-
bool hdf5_error_flag_#
Internal HDF5 error flag
-
std::vector<H5E_error2_t> hdf5_errors_#
Internal HDF5 error recording
-
std::map<std::string, HDF5Dataset_t> hdf5_datasets_#
Map of datasets that are being written to
-
size_t file_index_#
The index of this file across all processors in the acquisition, 0 indexed
-
std::string filename_#
Full name and path of the file to write to
-
bool use_earliest_version_#
Whether to use the earliest version of the hdf5 library
-
bool unlimited_#
Whether datasets use H5S_UNLIMITED as the outermost dimension extent
-
boost::recursive_mutex mutex_#
Mutex used to make this class thread safe
-
hid_t param_memspace_#
-
std::map<std::string, boost::posix_time::ptime> last_flushed#
-
WatchdogTimer watchdog_timer_#
-
const HDF5ErrorDefinition_t &hdf5_error_definition_#
HDF5 call error definitions
Private Static Attributes
-
static const H5Z_filter_t LZ4_FILTER = (H5Z_filter_t)32004#
Filter definition to write datasets with LZ4 compressed data
-
static const H5Z_filter_t BSLZ4_FILTER = (H5Z_filter_t)32008#
Filter definition to write datasets with bitshuffle processed data
-
static const H5Z_filter_t BLOSC_FILTER = (H5Z_filter_t)32001#
Filter definition to write datasets with Blosc processed data
-
static const int PARAM_FLUSH_RATE = 1000#
Flush rate for parameter datasets in miliseconds
-
struct HDF5Dataset_t#
Struct to keep track of an HDF5 dataset handle and dimensions.
Public Members
-
hid_t dataset_id#
Handle of the dataset
-
std::vector<hsize_t> dataset_dimensions#
Array of dimensions of the dataset
-
std::vector<hsize_t> dataset_offsets#
Array of offsets of the dataset
-
size_t actual_dataset_size_#
Extent of the (outermost dimension of the) dataset that has had frames written to, including any gaps i.e. the highest offset that has been written to + 1
-
hid_t dataset_id#
-
HDF5File(const HDF5ErrorDefinition_t &hdf5_error_definition)#
CallDuration#
WatchdogTimer#
-
class WatchdogTimer#
Public Functions
-
WatchdogTimer(const boost::function<void(const std::string&)> &timeout_callback)#
-
~WatchdogTimer()#
-
void start_timer(const std::string &function_name, unsigned int watchdog_timeout_ms)#
Store the start time and schedule a deadline_timer to print an error
To be called before a function call
- Parameters:
function_name – [in] - Function name for log message
watchdog_timeout_ms – [in] - Timeout for watchdog to log error message
-
unsigned int finish_timer()#
Disable the watchdog, calculate the duration and then log and return
To be called after a function returns
- Returns:
- Duration in microseconds
Private Functions
-
void run()#
Function run by the worker_thread_
This will register a heartbeat timer and then run the IpcReactor
-
void call_timeout_callback(const std::string &function_name) const#
Call the timeout_callback_ with an error message
-
void heartbeat()#
Report the reactor is still alive and stop when flag set
Private Members
-
LoggerPtr logger_#
Logger for logging
-
struct timespec start_time_#
-
boost::thread worker_thread_#
-
bool worker_thread_running_#
Flag to control start up timings of main thread and worker thread
-
OdinData::IpcReactor reactor_#
IpcReactor to use as a simple timer controller
-
unsigned int timeout_#
Timeout of current timer in milliseconds
-
std::string function_name_#
Name of function currently being timed
-
int timer_id_#
ID of current timer to cancel callback
-
int ticks_#
Counter to monitor number of ticks that have passed
-
const boost::function<void(const std::string&)> &timeout_callback_#
Callback function to call when the timer expires
-
WatchdogTimer(const boost::function<void(const std::string&)> &timeout_callback)#
LiveViewPlugin#
-
class LiveViewPlugin : public FrameProcessor::FrameProcessorPlugin#
Public Functions
-
LiveViewPlugin()#
Constructor for this class. Sets up ZMQ pub socket and other default values for the config
-
virtual ~LiveViewPlugin()#
Class Destructor. Closes the Publish socket
Process recieved frame. For the live view plugin, this means checking if certain conditions are true (time elapsed, frame number, dataset name) and then, if the conditions mean sending the frame, creating a json header and copying the data to send to the publisher socket.
- Parameters:
frame – [in] - pointer to a frame object.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for this Plugin.
This sets up the Live View Plugin according to the configuration IpcMessage objects that are received.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
Constructs a header with information about the data frame, then sends that header and the data to the ZMQ socket interface to be consumed by an external live viewer. The Header contains the following:
int32_t Frame number
string Acquisition ID
string Data Type
size_t Data Size
string compression type
size_t[] dimensions
- Parameters:
frame – [in] - pointer to the data frame
frame_num – [in] - the number of the frame
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Public Static Attributes
-
static const int32_t DEFAULT_FRAME_FREQ = 1#
The default value for the Frame Frequency configuration
-
static const int32_t DEFAULT_PER_SECOND = 0#
The default value for the frames per second configuration
-
static const std::string DEFAULT_IMAGE_VIEW_SOCKET_ADDR = "tcp://127.0.0.1:5020"#
The default value for the ZMQ socket address
-
static const std::string DEFAULT_DATASET_NAME = ""#
The default value for the dataset name filter
-
static const std::string DEFAULT_TAGGED_FILTER = ""#
The default value for the Tagged Filter
-
static const std::string CONFIG_FRAME_FREQ = "frame_frequency"#
The name of the Frame Frequency config in the json file
-
static const std::string CONFIG_PER_SECOND = "per_second"#
The name of the Per Second config in the json file
-
static const std::string CONFIG_SOCKET_ADDR = "live_view_socket_addr"#
The name of the Socket Address config in the json file
-
static const std::string CONFIG_DATASET_NAME = "dataset_name"#
The name of the Dataset Name config in the json file
-
static const std::string CONFIG_TAGGED_FILTER_NAME = "filter_tagged"#
The name of the Tagged Filter config in the json file
Private Functions
-
void add_json_member(rapidjson::Document *document, std::string key, std::string value)#
-
void add_json_member(rapidjson::Document *document, std::string key, uint32_t value)#
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get the configuration values for this Plugin.
- Parameters:
reply – [out] - Response IpcMessage.
-
void set_per_second_config(int32_t value)#
Sets the config value “per_second” which tells the plugin the number of frames to show each second Setting this value to 0 disables the “frames per second” checking
- Parameters:
value – [in] - the value to assign to per_second
-
void set_frame_freq_config(int32_t value)#
Sets the Frame Frequency config value. This value tells the plugin to show every Nth frame. Setting this value to 0 disables this check
- Parameters:
value – [in] - the value to assign to frame_freq
-
void set_socket_addr_config(std::string value)#
Sets the address of the publisher socket that live view data is sent to. When setting this address, it also binds the socket to the address, unbinding it from any previous address given
- Parameters:
value – [in] - the address to bind the socket to.
-
void set_dataset_name_config(std::string value)#
Sets the dataset filter configuration. The live view output can be filtered by the dataset variable on the frame based off a list of desired datasets.
- Parameters:
value – [in] - A comma deliminated list of dataset names, or an empty string to ignore this filtering.
-
void set_tagged_filter_config(std::string value)#
Private Members
-
boost::posix_time::time_duration time_between_frames_#
time between frames in milliseconds, calculated from the per_second config
-
boost::posix_time::ptime time_last_frame_#
time the last frame was shown
-
LoggerPtr logger_#
Pointer to logger
-
std::string image_view_socket_addr_#
address for ZMQ socket for transmitting images
-
int32_t per_second_#
Frames to show per second. Will override the frame_freq if elapsed time between frames gets too big
-
OdinData::IpcChannel publish_socket_#
The socket the live view image frames will be sent to
-
std::map<std::string, int> datasets_#
List of the dataset names to publish. If a frame comes in with a dataset name not on the list it will be ignored
-
std::vector<std::string> tags_#
List of Parameter names to look for. If a frame comes in without one of these tags it will be ignored
-
bool is_bound_#
Boolean that shows if the plugin has a successfully bound ZMQ endpoint
-
LiveViewPlugin()#
OffsetAdjustmentPlugin#
-
class OffsetAdjustmentPlugin : public FrameProcessor::FrameProcessorPlugin#
This plugin class alters the frame offset by a configured amount
Public Functions
-
OffsetAdjustmentPlugin()#
The constructor sets up logging used within the class.
-
virtual ~OffsetAdjustmentPlugin()#
Destructor.
Perform processing on the frame. For the OffsetAdjustmentPlugin class we adjust the frame offset by the configured amount
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for this Plugin.
This sets up the Offset Adjustment Plugin according to the configuration IpcMessage objects that are received.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Private Functions
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get the configuration values for this Plugin.
- Parameters:
reply – [out] - Response IpcMessage.
-
OffsetAdjustmentPlugin()#
ParameterAdjustmentPlugin#
-
class ParameterAdjustmentPlugin : public FrameProcessor::FrameProcessorPlugin#
This plugin class alters parameters named in a list by a configured amount added on to the frame number. The parameter will be added to the frame if it doesn’t already exist
Public Functions
-
ParameterAdjustmentPlugin()#
The constructor sets up logging used within the class.
-
virtual ~ParameterAdjustmentPlugin()#
Destructor.
Perform processing on the frame. For the ParameterAdjustmentPlugin class we adjust confiugred parameters by the configured amount, adding the parameter if needed
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Set configuration options for this Plugin.
This sets up the Parameter Adjustment Plugin according to the configuration IpcMessage objects that are received.
- Parameters:
config – [in] - IpcMessage containing configuration data.
reply – [out] - Response IpcMessage.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Private Functions
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Get the configuration values for this Plugin.
- Parameters:
reply – [out] - Response IpcMessage.
-
ParameterAdjustmentPlugin()#
SumPlugin#
-
class SumPlugin : public FrameProcessor::FrameProcessorPlugin#
This plugin class calculates the sum of each pixel and adds it as a parameter
Public Functions
-
SumPlugin()#
The constructor sets up logging used within the class.
-
~SumPlugin()#
Destructor.
Calculate the sum of each pixel based on the data type
- Parameters:
frame – [in] - Pointer to a Frame object.
-
virtual int get_version_major()#
-
virtual int get_version_minor()#
-
virtual int get_version_patch()#
-
virtual std::string get_version_short()#
-
virtual std::string get_version_long()#
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
SumPlugin()#
DummyUDPProcessPlugin#
-
class DummyUDPProcessPlugin : public FrameProcessor::FrameProcessorPlugin#
FrameProcessor plugin for dummy UDP data streams used in integration testing. This plugin processes incoming frames into output 2D images, handling lost packets appropriately. The plugin can be configured to either copy the incoming frame into a new output frame, or set metadata etc on the incoming frame and push that out.
Public Functions
-
DummyUDPProcessPlugin()#
The constructor sets up default configuration parameters and logging used within the class.
-
virtual ~DummyUDPProcessPlugin()#
Destructor.
-
virtual int get_version_major()#
Get the plugin major version number.
- Returns:
major version number as an integer
-
virtual int get_version_minor()#
Get the plugin minor version number.
- Returns:
minor version number as an integer
-
virtual int get_version_patch()#
Get the plugin patch version number.
- Returns:
patch version number as an integer
-
virtual std::string get_version_short()#
Get the plugin short version (e.g. x.y.z) string.
- Returns:
short version as a string
-
virtual std::string get_version_long()#
Get the plugin long version (e.g. x.y.z-qualifier) string.
- Returns:
long version as a string
-
virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#
Configure the plugin. This receives an IpcMessage which should be processed to configure the plugin, and any response can be added to the reply IpcMessage.
- Parameters:
config – [in] - Reference to the configuration IpcMessage object.
reply – [out] - Reference to the reply IpcMessage object.
-
virtual void requestConfiguration(OdinData::IpcMessage &reply)#
Respond to configuration requests from clients.
This method responds to configuration requests from client, populating the supplied IpcMessage reply with configured parameters.
- Parameters:
reply – [inout] - IpcMessage response to client, to be populated with plugin parameters
-
virtual void execute(const std::string &command, OdinData::IpcMessage &reply)#
Execute a command on the plugin. This receives an IpcMessage which should be processed to execute a command within the plugin, and any response can be added to the reply IpcMessage. The dummy plugin implements a single command “print” that prints the value of the parameter named.
- Parameters:
config – [in] - String containing the command to execute.
reply – [out] - Reference to the reply IpcMessage object.
-
virtual std::vector<std::string> requestCommands()#
Respond to command execution requests from clients.
This method responds to command executions requests from client, populating the supplied IpcMessage reply with the commands and command parameters supported by this plugin.
- Returns:
- Vector containing supported command strings.
-
virtual void status(OdinData::IpcMessage &status)#
Collate status information for the plugin. The status is added to the status IpcMessage object.
- Parameters:
status – [out] - Reference to an IpcMessage value to store the status.
-
virtual bool reset_statistics(void)#
Reset process plugin statistics, i.e. counter of packets lost
Private Functions
Perform processing on the frame. For the DummyUDPProcessPlugin class this involves dealing with any lost packets and then simply copying the data buffer into an appropriately dimensioned output frame.
- Parameters:
frame – [in] - Pointer to a Frame object.
Process and report lost UDP packets for the frame
- Parameters:
frame – [in] - Pointer to a Frame object.
Private Members
-
LoggerPtr logger_#
Pointer to logger
-
int image_width_#
Image width
-
int image_height_#
Image height
-
int packets_lost_#
Packet loss counter
-
bool copy_frame_#
Copy frame mode flag
Private Static Attributes
-
static const std::string CONFIG_IMAGE_WIDTH = "width"#
Configuration constant for image width
-
static const std::string CONFIG_IMAGE_HEIGHT = "height"#
Configuration constant for image height
-
static const std::string CONFIG_COPY_FRAME = "copy_frame"#
Configuraiton constant for copy frame mode
-
static const std::string EXECUTE_PRINT = "print"#
Command execution constant for print command
-
DummyUDPProcessPlugin()#
Frames#
-
class Frame#
Interface class for a Frame; all Frames must sub-class this.
Subclassed by FrameProcessor::DataBlockFrame, FrameProcessor::EndOfAcquisitionFrame, FrameProcessor::SharedBufferFrame
Public Functions
-
Frame(const FrameMetaData &meta_data, const size_t &data_size, const int &image_offset = 0)#
Base constructor
Base Frame constructor
- Parameters:
meta-data – - frame FrameMetaData
image_offset – - between start of data memory and image
-
Frame(const Frame &frame)#
Shallow-copy copy
Copy constructor; implement as shallow copy
- Parameters:
frame – - source frame
-
Frame &operator=(const Frame &frame)#
Deep-copy assignment
Assignment operator; implement as deep copy
- Parameters:
frame – - source frame
- Returns:
-
bool is_valid() const#
Return if frame is valid
Return if frame is valid
- Returns:
bool - frame is valid
-
virtual void *get_data_ptr() const = 0#
Return a void pointer to the raw data
-
virtual void *get_image_ptr() const#
Return a void pointer to the image data
Return a void pointer to the image data @ return void pointer to image data
-
virtual bool get_end_of_acquisition() const#
Return if this frame object is an “end of acquisition” frame
Return false to signify that this frame is not an “end of acquisition” frame. Should be overridden by a child class that suppports EOA. @ return end of acquisition
-
size_t get_data_size() const#
Return the data size
Return the data size @ return data size
-
void set_data_size(size_t size)#
Update the data size
Update the data size
- Parameters:
size – new data size
-
long long get_frame_number() const#
Return the frame number
Return the frame number /** Return the frame number
- Returns:
frame frame number
-
void set_frame_number(long long frame_number)#
Set the frame number
Set the frame number
- Parameters:
frame_number – - new frame number
-
FrameMetaData &meta_data()#
Return a reference to the MetaData
Return a reference to the MetaData
- Returns:
reference to meta data
-
const FrameMetaData &get_meta_data() const#
Return the MetaData
Return the MetaData
- Returns:
frame meta data
-
FrameMetaData get_meta_data_copy() const#
Return deep copy of the MetaData
Return deep copy of the MetaData
- Returns:
FrameMetaDatacopy (deep) of the meta data
-
void set_meta_data(const FrameMetaData &meta_data)#
Set MetaData
Set MetaData
- Parameters:
FrameMetaData – meta_data - new meta data
-
int get_image_size() const#
Return the image size
Return the image size
- Returns:
size of data minus offset (header size)
-
void set_image_size(const int &size)#
Set the image size
Set the image size If not called defaults to data_size minus image_offset
- Parameters:
size – the image size
-
void set_image_offset(const int &offset)#
Set the image offset
Set the image offset
- Parameters:
offset – - new offset
-
void set_outer_chunk_size(const int &size)#
Set the outer chunk size
Set the outer chunk size of the frame (number of trigger acqs in chunk)
- Parameters:
size – - outer_chunk_size
-
int get_outer_chunk_size() const#
Set the outer chunk size
Get the outer chunk size of the frame (number of trigger acqs in chunk)
- Returns:
outer_chunk_size
Protected Attributes
-
log4cxx::LoggerPtr logger_#
Pointer to logger
-
FrameMetaData meta_data_#
Frame MetaData
-
size_t data_size_#
Shared memory size
-
int image_size_#
Size of the image
-
int image_offset_#
Offset from frame memory start to image data
-
int outer_chunk_size_#
Outer chunk size of this frame (number of images in this chunk)
-
Frame(const FrameMetaData &meta_data, const size_t &data_size, const int &image_offset = 0)#
FrameMetaData#
-
class FrameMetaData#
Public Functions
-
FrameMetaData(const long long &frame_number, const std::string &dataset_name, const DataType &data_type, const std::string &acquisition_ID, const std::vector<unsigned long long> &dimensions, const CompressionType &compression_type = no_compression)#
-
FrameMetaData()#
-
FrameMetaData(const FrameMetaData &frame)#
-
const std::map<std::string, boost::any> &get_parameters() const#
Return frame parameters
Get frame parameters
- Returns:
std::map <std::string, boost::any> map
-
template<class T>
inline T get_parameter(const std::string ¶meter_name) const# Get frame parameter
- Template Parameters:
T –
- Parameters:
parameter_name –
- Returns:
-
template<class T>
inline void set_parameter(const std::string ¶meter_name, T value)# Set frame parameter
- Template Parameters:
T –
- Parameters:
parameter_name –
value –
-
inline bool has_parameter(const std::string &index) const#
Check if frame has parameter
- Parameters:
index –
- Returns:
-
long long get_frame_number() const#
Return frame number
Return frame number
- Returns:
long long - frame number
-
void set_frame_number(const long long &frame_number)#
Set frame number
Set frame number
- Parameters:
long – long - frame number
-
const std::string &get_dataset_name() const#
Return dataset_name
Return dataset name
- Returns:
std::string - dataset name
-
void set_dataset_name(const std::string &dataset_name)#
Set dataset name
Set dataset name
- Parameters:
std::string – dataset_name - name of dataset
-
DataType get_data_type() const#
Return data type
Return data type
- Returns:
DataType - data type
-
void set_data_type(DataType data_type)#
Set data type
Set data type
- Parameters:
DataType – - data type
-
const std::string &get_acquisition_ID() const#
Return acquisition ID
Return acquistion ID
- Returns:
std::string acquisition ID
-
void set_acquisition_ID(const std::string &acquisition_ID)#
Set acquisition ID
Set acquisition ID
- Parameters:
std::string – acquisition_ID
-
const dimensions_t &get_dimensions() const#
Return dimensions
Return dimensions
- Returns:
dimensions
-
void set_dimensions(const dimensions_t &dimensions)#
Set dimensions
Set dimensions
- Parameters:
dimensions_t – dimensions
-
CompressionType get_compression_type() const#
Return compression type
Return compression type
- Returns:
compression type
-
void set_compression_type(CompressionType compression_type)#
Set compression type
Set compression type @ param compression_type compression type
-
int64_t get_frame_offset() const#
Return frame offset
Return frame offset
- Returns:
int64_t frame offset
-
void set_frame_offset(const int64_t &offset)#
Set frame offset
Set frame offset
- Parameters:
int64_t – new offset
-
void adjust_frame_offset(const int64_t &increment)#
Adjust frame offset by increment
Adjust frame offset by increment
- Parameters:
increment – to adjust offset by
Private Members
-
log4cxx::LoggerPtr logger#
Pointer to logger
-
std::string dataset_name_#
Name of this dataset
-
DataType data_type_#
Data type of raw data
-
std::string acquisition_ID_#
Acquisition ID of the acquisition of this frame
-
dimensions_t dimensions_#
Vector of dimensions
-
CompressionType compression_type_#
Compression type of raw data
-
std::map<std::string, boost::any> parameters_#
Map of parameters
-
FrameMetaData(const long long &frame_number, const std::string &dataset_name, const DataType &data_type, const std::string &acquisition_ID, const std::vector<unsigned long long> &dimensions, const CompressionType &compression_type = no_compression)#
DataBlockFrame#
-
class DataBlockFrame : public FrameProcessor::Frame#
Public Functions
-
DataBlockFrame(const FrameMetaData &meta_data, const void *data_src, size_t block_size, const int &image_offset = 0)#
Construct a DataBlockFrame
DataBlockFrame constructor
- Parameters:
meta_data – - frame FrameMetaData
data_src – - data to copy into block memory
block_size – - size of data in bytes
image_offset – - between start of data memory and image
-
DataBlockFrame(const FrameMetaData &meta_data, size_t block_size, const int &image_offset = 0)#
Construct a DataBlockFrame
DataBlockFrame constructor
- Parameters:
meta_data – - frame FrameMetaData
block_size – - size of data in bytes
image_offset – - between start of data memory and image
-
DataBlockFrame(const DataBlockFrame &frame)#
Shallow-copy copy
Copy constructor; implement as shallow copy
- Parameters:
frame –
-
DataBlockFrame &operator=(DataBlockFrame &frame)#
Deep-copy assignment
Assignment operator; implement as deep copy
- Parameters:
frame –
- Returns:
-
~DataBlockFrame()#
Destructor
Destroy frame
-
virtual void *get_data_ptr() const#
Return a void pointer to the raw data
Return a void pointer to the raw data.
- Returns:
pointer to the raw data.
-
DataBlockFrame(const FrameMetaData &meta_data, const void *data_src, size_t block_size, const int &image_offset = 0)#
DataBlock#
-
class DataBlock#
The DataBlock and DataBlockPool classes provide memory management for data within Frames. Memory is allocated by a data block on construction, and then the data block can be re-used without continually freeing and re- allocating the memory. If a data block is resized then the memory is re-allocated, so data blocks work most efficiently when using the same sized data multiple times. Data can be copied into the allocated block, and a pointer to the raw block is available. Data block memory should NOT be freed outside of the block, when a data block is destroyed it frees its own memory.
Public Functions
-
DataBlock(size_t block_size)#
Construct a data block
Construct a data block, allocating the required memory.
- Parameters:
block_size – [in] - number of bytes to allocate.
-
virtual ~DataBlock()#
Destroy a data block
Destroy a data block, freeing resources.
-
int get_index()#
Return the unique index
Return the unique index of this data block.
- Returns:
- unique index of this data block.
-
size_t get_size()#
Return the size in bytes
Return the size in bytes of this data block.
- Returns:
- size in bytes of this data block.
-
void copy_data(const void *data_src, size_t block_size)#
Copy data from source into block allocated memory
Copy from data source to the allocated memory within this data block. If more bytes are requested to be copied than are available in this block then the copy is truncated to the size of this block.
- Parameters:
data_src – [in] - void pointer to the data source.
block_size – [in] - size of data in bytes to copy.
-
const void *get_data()#
Return a const void pointer to memory block owns
Returns a void pointer to the memory that this data block owns.
- Returns:
- void pointer to memory owned by this data block.
-
void *get_writeable_data()#
Return a non-const pointer to memory block owns
Returns a non-const void pointer to the memory that this data block owns.
- Returns:
- non-const void pointer to memory owned by this data block
Public Static Functions
-
static int get_current_index_count()#
Return the current unique index counter
Returns the current index counter value
- Returns:
- int current index count
Private Functions
-
void resize(size_t block_size)#
Resize the data block
Resize this data block. The current memory allocation will be freed. Then a new memory block will be allocated to the desired size. If resize is called but the same size is requested, then no actual free or reallocation takes place.
- Parameters:
block_size – [in] - new size of this data block.
Private Members
-
log4cxx::LoggerPtr logger_#
Pointer to logger
-
void *block_ptr_#
Void pointer to the allocated memory
Private Static Attributes
-
static int index_counter_ = 0#
Static counter for the unique index
Friends
- friend class DataBlockPool
-
DataBlock(size_t block_size)#
DataBlockPool#
-
class DataBlockPool#
The DataBlock and DataBlockPool classes provide memory management for data within Frames. Memory is allocated by a data block on construction, and then the data block can be re-used without continually freeing and re- allocating the memory. The DataBlockPool provides a singleton class that can be used to access data blocks through shared memory pointers and manages the data blocks to avoid continuous allocating and freeing of memory. The DataBlockPool also contains details of how many blocks are available, in use and the total memory used.
Public Functions
-
virtual ~DataBlockPool()#
Public Static Functions
-
static void allocate(size_t block_count, size_t block_size)#
Static method to force allocation of new DataBlocks which are added to the pool specified by the index parameter.
- Parameters:
block_count – [in] - Number of DataBlocks to allocate.
block_size – [in] - Number of bytes to allocate to each block.
-
static boost::shared_ptr<DataBlock> take(size_t block_size)#
Static method to take a DataBlock from the DataBlockPool specified by the block_size parameter. New DataBlocks will be allocated if necessary.
Static method to release a DataBlock back into the DataBlockPool specified by the block size. Once a DataBlock has been released it will become available for re-use.
- Parameters:
block – [in] - DataBlock to release.
-
static size_t get_free_blocks(size_t block_size)#
Static method that returns the number of free DataBlocks present in the DataBlockPool specified by the block_size parameter.
- Parameters:
block_size – [in] - Index of DataBlockPool to get the free count from.
- Returns:
- Number of free DataBlocks.
-
static size_t get_used_blocks(size_t block_size)#
Static method that returns the number of in-use DataBlocks present in the DataBlockPool specified by the block_size parameter.
- Parameters:
block_size – [in] - Index of DataBlockPool to get the in-use count from.
- Returns:
- Number of in-use DataBlocks.
-
static size_t get_total_blocks(size_t block_size)#
Static method that returns the total number of DataBlocks present in the DataBlockPool specified by the block_size parameter.
- Parameters:
block_size – [in] - Index of DataBlockPool to get the total count from.
- Returns:
- Total number of DataBlocks.
-
static size_t get_memory_allocated(size_t block_size)#
Static method that returns the total number of bytes that have been allocated by the DataBlockPool specified by the index parameter.
- Parameters:
index – [in] - Index of DataBlockPool to get the total bytes allocated from.
- Returns:
- Total number of allocated bytes.
-
static void tearDownClass()#
Delete DataBlockPool instances stored in static class attribute instanceMap_
Private Functions
-
DataBlockPool()#
Construct a DataBlockPool object. The constructor is private, these pool objects can only be constructed from the static methods to enforce only one pool for each index is created.
-
void internal_allocate(size_t block_count, size_t block_size)#
Allocate new DataBlocks to this DataBlockPool. This results in additional memory allocation.
- Parameters:
block_count – [in] - Number of DataBlocks to allocate.
block_size – [in] - Number of bytes to allocate to each block.
-
boost::shared_ptr<DataBlock> internal_take(size_t block_size)#
Take a DataBlock from the DataBlockPool. New DataBlocks will be allocated if necessary.
Release a DataBlock back into the DataBlockPool. Once a DataBlock has been released it will become available for re-use.
- Parameters:
block – [in] - DataBlock to release.
-
size_t internal_get_free_blocks()#
Returns the number of free DataBlocks present in the DataBlockPool.
- Returns:
- Number of free DataBlocks.
-
size_t internal_get_used_blocks()#
Returns the number of in-use DataBlocks present in the DataBlockPool.
- Returns:
- Number of in-use DataBlocks.
-
size_t internal_get_total_blocks()#
Returns the total number of DataBlocks present in the DataBlockPool.
- Returns:
- Total number of DataBlocks.
-
size_t internal_get_memory_allocated()#
Returns the number of bytes allocated by the DataBlockPool.
- Returns:
- Number of allocated bytes.
Private Members
-
log4cxx::LoggerPtr logger_#
Pointer to logger
-
boost::recursive_mutex mutex_#
Mutex used to make this class thread safe
-
std::map<int, boost::shared_ptr<DataBlock>> used_map_#
Map of currently used DataBlock objects, indexed by their unique IDs
-
size_t memory_allocated_#
Total number of bytes allocated (sum of all DataBlocks)
Private Static Functions
-
static DataBlockPool *instance(size_t block_size)#
Static private method that returns a pointer to the DataBlockPool specified by the index parameter. This is private and is used by all of the static access methods. If no DataBlockPool exists for the index provided then a new DataBlockPool is created.
- Parameters:
block_size – [in] - Block size of DataBlockPool to retrieve.
- Returns:
- Pointer to a DataBlockPool instance.
Private Static Attributes
-
static std::map<size_t, DataBlockPool*> instance_map_#
Static map of all DataBlockPool objects, indexed by their names
Container of DataBlockPool instances which can be indexed by name
-
virtual ~DataBlockPool()#
EndOfAcquisitionFrame#
-
class EndOfAcquisitionFrame : public FrameProcessor::Frame#
Public Functions
-
EndOfAcquisitionFrame()#
Construct an End Of AcquisitionFrame
EndOfAcquisitionFrame constructor
-
EndOfAcquisitionFrame(const EndOfAcquisitionFrame &frame)#
Shallow-copy copy
Copy constructor; implement as shallow copy
- Parameters:
frame –
-
EndOfAcquisitionFrame &operator=(EndOfAcquisitionFrame &frame)#
Deep-copy assignment
Assignment operator; implement as deep copy
- Parameters:
frame –
- Returns:
-
~EndOfAcquisitionFrame()#
Destructor
Destroy frame
-
virtual void *get_data_ptr() const#
Return a null pointer (no raw data)
No data ptr so return null
-
virtual bool get_end_of_acquisition() const#
Return confirmation that this is an “end of acquisition” frame
Return true to signify that this frame is an “end of acquisition” frame. @ return end of acquisition
-
EndOfAcquisitionFrame()#