FrameProcessor API Reference#

Controllers#

FrameProcessorController#

class FrameProcessorController : public FrameProcessor::IFrameCallback, public boost::enable_shared_from_this<FrameProcessorController>#

The FrameProcessorController class has overall responsibility for management of the core classes and plugins present within the frame processor application. This class maintains the SharedMemoryController and SharedMemoryParser classes. The class also manages the control IpcChannel, and accepts configuration IpcMessages. The class provides an interface for loading plugins, connecting the plugins together into chains and for configuring the plugins (from the control channel).

The class uses an IpcReactor to manage connections and status updates.

Public Functions

FrameProcessorController(unsigned int num_io_threads = OdinData::Defaults::default_io_threads)#

Construct a new FrameProcessorController class.

The constructor sets up logging used within the class, and starts the IpcReactor thread.

virtual ~FrameProcessorController()#

Destructor.

void handleCtrlChannel()#

Handle an incoming configuration message.

This method is called by the IpcReactor when a configuration IpcMessage has been received. The raw message is read and parsed into an IpcMessage for further processing. The configure method is called, and once configuration has completed a response IpcMessage is sent back on the control channel.

void handleMetaRxChannel()#
void provideStatus(OdinData::IpcMessage &reply)#

Provide status information to requesting clients.

This is called in response to a status request from a connected client. The reply to the request is populated with status information from the shared memory controller and all the plugins currently loaded, and with any error messages currently stored.

Parameters:

reply[inout] - response IPC message to be populated with status parameters

void provideVersion(OdinData::IpcMessage &reply)#

Provide version information to requesting clients.

This is called in response to a version request from a connected client. The reply to the request is populated with version information from application and all the plugins currently loaded.

Parameters:

reply[inout] - response IPC message to be populated with version information

void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for the FrameProcessorController.

Sets up the overall frameProcessor application according to the configuration IpcMessage objects that are received. The objects are searched for:
  • CONFIG_SHUTDOWN - Shuts down the application.

  • CONFIG_STATUS - Retrieves status for all plugins and replies.

  • CONFIG_CTRL_ENDPOINT - Calls the method setupControlInterface.

  • CONFIG_PLUGIN - Calls the method configurePlugin.

  • CONFIG_FR_SETUP - Calls the method setupFrameReceiverInterface.

The method also searches for configuration objects that have the same index as loaded plugins. If any of these are found then they are passed down to the plugin for execution.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.
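The key-based dispatch described for configure can be pictured with a small stand-alone sketch. It is not the odin-data implementation: `ConfigParams` is a hypothetical stand-in for the parameter block of an IpcMessage, `actions_for` is a made-up helper, the action labels are illustrative, and only key strings whose values are documented under Private Static Attributes are used.

```cpp
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for the parameter block of a configuration
// IpcMessage: top-level key -> raw (unparsed) value.
using ConfigParams = std::map<std::string, std::string>;

// Return the actions a controller-like dispatcher would take for a message.
// The ordering here is illustrative and not taken from the real class.
std::vector<std::string> actions_for(const ConfigParams& config,
                                     const std::set<std::string>& plugin_indices)
{
    std::vector<std::string> actions;
    if (config.count("ctrl_endpoint")) actions.push_back("setupControlInterface");
    if (config.count("plugin"))        actions.push_back("configurePlugin");
    if (config.count("fr_setup"))      actions.push_back("setupFrameReceiverInterface");

    // Any key that matches the index of a loaded plugin is forwarded to it.
    for (const auto& entry : config) {
        if (plugin_indices.count(entry.first)) {
            actions.push_back("forward to plugin '" + entry.first + "'");
        }
    }
    return actions;
}
```

Keys that match neither a controller constant nor a loaded plugin index are simply ignored in this sketch.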

void requestConfiguration(OdinData::IpcMessage &reply)#

Request the current configuration of the FrameProcessorController.

The method also searches through all loaded plugins. Each plugin is also sent a request for its configuration.

Parameters:

reply[out] - Response IpcMessage with the current configuration.

void execute(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Submit commands to the FrameProcessor plugins.

Submits command(s) to execute on individual plugins if they support commands. The IpcMessage should contain the command name and a structure of any parameters required by the command.

This method searches for command objects that have the same index as loaded plugins. If any of these are found then the commands are passed down to the plugin for execution.

Parameters:
  • config[in] - IpcMessage containing command and any parameter data.

  • reply[out] - Response IpcMessage.

void requestCommands(OdinData::IpcMessage &reply)#

Request the command set supported by this FrameProcessorController and its loaded plugins.

The method searches through all loaded plugins. Each plugin is also sent a request for its supported commands.

Parameters:

reply[out] - Response IpcMessage with the current supported command set.

void resetStatistics(OdinData::IpcMessage &reply)#

Reset statistics on all of the loaded plugins.

The method calls reset statistics on all of the loaded plugins.

Parameters:

reply[out] - Response IpcMessage with the current status.

void configurePlugin(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for the plugins.

Sets up the plugins loaded into the controller according to the configuration IpcMessage objects that are received. The objects are searched for:
  • CONFIG_PLUGIN_LIST - Replies with a list of loaded plugins.

  • CONFIG_PLUGIN_LOAD - Uses NAME, INDEX and LIBRARY to load a plugin into the controller.

  • CONFIG_PLUGIN_CONNECT - Uses CONNECTION and INDEX to connect one plugin input to another plugin output.

  • CONFIG_PLUGIN_DISCONNECT - Uses CONNECTION and INDEX to disconnect one plugin from another.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.
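As a rough illustration of what a plugin load request might carry, the sketch below builds a JSON parameter block from the documented key strings ("plugin", "load", "index", "name", "library"). The nesting of those keys is an assumption, `make_load_request` is a hypothetical helper, and the library path in the usage note is a placeholder.

```cpp
#include <string>

// Build the parameter block of a plugin-load request as a JSON string.
// Assumptions: the keys nest as plugin -> load -> {index, name, library},
// and the values contain no characters that need JSON escaping.
std::string make_load_request(const std::string& index,
                              const std::string& name,
                              const std::string& library)
{
    return "{\"plugin\":{\"load\":{"
           "\"index\":\"" + index + "\","
           "\"name\":\"" + name + "\","
           "\"library\":\"" + library + "\"}}}";
}
```

For example, `make_load_request("hdf", "FileWriterPlugin", "/tmp/libExample.so")` yields a single object whose "load" entry names the index, class and library to use.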

void loadPlugin(const std::string &index, const std::string &name, const std::string &library)#

Load a new plugin.

Attempts to load the specified library dynamically using the classloader. If the index specified is already in use then an error is thrown. Once the plugin has been loaded its processing thread is started. The same plugin type can be loaded multiple times as long as each index is unique.

Parameters:
  • index[in] - Unique index required for the plugin.

  • name[in] - Name of the plugin class.

  • library[in] - Full path of shared library file for the plugin.
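The unique-index rule can be sketched with a plain map. `PluginStub` and `PluginRegistry` are hypothetical names, and no shared library is actually opened here; the sketch only shows the bookkeeping around the index.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a loaded plugin instance.
struct PluginStub {
    std::string name;
    std::string library;
};

// Minimal registry enforcing the documented rule: the same plugin type may
// be loaded several times, but every index must be unique.
class PluginRegistry {
public:
    void load(const std::string& index, const std::string& name,
              const std::string& library)
    {
        if (plugins_.count(index)) {
            throw std::runtime_error("plugin index already in use: " + index);
        }
        // A real loader would resolve `name` from the shared library here.
        plugins_[index] = std::make_shared<PluginStub>(PluginStub{name, library});
    }

    std::size_t size() const { return plugins_.size(); }

private:
    std::map<std::string, std::shared_ptr<PluginStub>> plugins_;
};
```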

void connectPlugin(const std::string &index, const std::string &connectTo)#

Connects two plugins together.

When plugins have been connected they can pass frame objects between them.

Parameters:
  • index[in] - Index of the plugin wanting to connect.

  • connectTo[in] - Index of the plugin to connect to.

void disconnectPlugin(const std::string &index, const std::string &disconnectFrom)#

Disconnect one plugin from another plugin.

Parameters:
  • index[in] - Index of the plugin wanting to disconnect.

  • disconnectFrom[in] - Index of the plugin to disconnect from.

void disconnectAllPlugins()#

Disconnect all plugins from each other.

void run()#
void waitForShutdown()#

Wait for the exit condition before returning.

void shutdown()#

Private Functions

void setupFrameReceiverInterface(const std::string &frPublisherString, const std::string &frSubscriberString)#

Set up the frame receiver interface.

This method creates new SharedMemoryController and SharedMemoryParser objects, which manage the receipt of frame ready notifications and construction of Frame objects from shared memory. Pointers to the two objects are kept by this class.

Parameters:
  • sharedMemName[in] - Name of the shared memory block opened by the frame receiver.

  • frPublisherString[in] - Endpoint for sending frame release notifications.

  • frSubscriberString[in] - Endpoint for receiving frame ready notifications.

void closeFrameReceiverInterface()#

Close the frame receiver interface.

void setupControlInterface(const std::string &ctrlEndpointString)#

Set up the control interface.

This method binds the control IpcChannel to the provided endpoint, creating a socket for controlling applications to connect to. This socket is used for sending configuration IpcMessages.

Parameters:

ctrlEndpointString[in] - Name of the control endpoint.

void closeControlInterface()#

Close the control interface.

void setupMetaRxInterface()#
void closeMetaRxInterface()#
void setupMetaTxInterface(const std::string &metaEndpointString)#
void closeMetaTxInterface()#
void runIpcService(void)#

Start the Ipc service running.

Sets up a tick timer and runs the Ipc reactor. Currently the tick timer does not perform any processing.

void tickTimer(void)#

Tick timer task called by IpcReactor.

This currently performs no processing.

virtual void callback(boost::shared_ptr<Frame> frame)#

Count frames passed through plugin chain and trigger shutdown when expected datasets received.

This is registered as a blocking callback from the end of the plugin chain. It will check for shutdown conditions and then return once all plugins have been notified to shutdown by the main thread.

Parameters:

frame - Pointer to the frame.
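The counting behaviour hinted at by the shutdownFrameCount, totalFrames and masterFrame members might look roughly like the sketch below. `ShutdownCounter` is a hypothetical class, and the treatment of an empty master frame name is an assumption rather than documented behaviour.

```cpp
#include <string>

// Sketch of frame counting with an optional shutdown threshold.
class ShutdownCounter {
public:
    ShutdownCounter(int shutdown_frame_count, const std::string& master_frame)
        : shutdown_frame_count_(shutdown_frame_count),
          master_frame_(master_frame),
          total_frames_(0) {}

    // Record one frame; return true once the shutdown condition is met.
    // Assumption: an empty master frame name means every frame is counted.
    bool on_frame(const std::string& dataset_name)
    {
        if (master_frame_.empty() || dataset_name == master_frame_) {
            ++total_frames_;
        }
        // A count of 0 disables frame-count-based shutdown.
        return shutdown_frame_count_ > 0 && total_frames_ >= shutdown_frame_count_;
    }

    int total_frames() const { return total_frames_; }

private:
    int shutdown_frame_count_;
    std::string master_frame_;
    int total_frames_;
};
```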

Private Members

log4cxx::LoggerPtr logger_#

Pointer to the logging facility

boost::shared_ptr<SharedMemoryController> sharedMemController_#

Pointer to the shared memory controller instance for this process

std::map<std::string, boost::shared_ptr<FrameProcessorPlugin>> plugins_#

Map of plugins loaded, indexed by plugin index

std::map<std::string, std::string> stored_configs_#

Map of stored configuration objects

boost::condition_variable exitCondition_#

Condition for exiting this file writing process

int shutdownFrameCount#

Frames to write before shutting down - 0 to disable shutdown

int totalFrames#

Total frames processed

std::string masterFrame#

Master frame specifier - Frame to include in count of total frames processed

boost::mutex exitMutex_#

Mutex used for locking the exitCondition

bool runThread_#

Used to check for Ipc tick timer termination

bool threadRunning_#

Is the main thread running

bool threadInitError_#

Did an error occur during the thread initialisation

bool pluginShutdownSent_#

Have we sent the shutdown command to the plugins

bool shutdown_#

Have we successfully shutdown

boost::thread ctrlThread_#

Main thread used for control message handling

std::string threadInitMsg_#

Store for any messages occurring during thread initialisation

boost::shared_ptr<OdinData::IpcReactor> reactor_#

Pointer to the IpcReactor for incoming frame handling

std::string ctrlChannelEndpoint_#

End point for control messages

OdinData::IpcContext &ipc_context_#

ZMQ context for IPC channels

OdinData::IpcChannel ctrlChannel_#

IpcChannel for control messages

OdinData::IpcChannel metaRxChannel_#

IpcChannel for meta-data messages

std::string metaTxChannelEndpoint_#

End point for publishing meta-data messages

OdinData::IpcChannel metaTxChannel_#

IpcChannel for publishing meta-data messages

std::string frReadyEndpoint_#

End point for frameReceiver ready channel

std::string frReleaseEndpoint_#

End point for frameReceiver release channel

Private Static Attributes

static const std::string META_RX_INTERFACE = "inproc://meta_rx"#

Configuration constant for the meta-data Rx interface

static const std::string CONFIG_SHUTDOWN#

Configuration constant to shutdown the frame processor

static const std::string CONFIG_EOA = "inject_eoa"#

Configuration constant to inject an End Of Acquisition frame into the plugin chain

static const std::string CONFIG_DEBUG = "debug_level"#

Configuration constant to set the debug level of the frame processor

static const std::string CONFIG_FR_SHARED_MEMORY#

Configuration constant for name of shared memory storage

static const std::string CONFIG_FR_RELEASE = "fr_release_cnxn"#

Configuration constant for connection string for frame release

static const std::string CONFIG_FR_READY = "fr_ready_cnxn"#

Configuration constant for connection string for frame ready

static const std::string CONFIG_FR_SETUP = "fr_setup"#

Configuration constant for executing setup of shared memory interface

static const std::string CONFIG_CTRL_ENDPOINT = "ctrl_endpoint"#

Configuration constant for control socket endpoint

static const std::string CONFIG_META_ENDPOINT = "meta_endpoint"#

Configuration constant for meta data endpoint

static const std::string CONFIG_PLUGIN = "plugin"#

Configuration constant for plugin related items

static const std::string CONFIG_PLUGIN_LOAD = "load"#

Configuration constant for loading a plugin

static const std::string CONFIG_PLUGIN_CONNECT = "connect"#

Configuration constant for connecting plugins

static const std::string CONFIG_PLUGIN_DISCONNECT = "disconnect"#

Configuration constant for disconnecting plugins

static const std::string CONFIG_PLUGIN_DISCONNECT_ALL = "all"#

Configuration keyword for disconnecting all plugins

static const std::string CONFIG_PLUGIN_NAME = "name"#

Configuration constant for a plugin name

static const std::string CONFIG_PLUGIN_INDEX = "index"#

Configuration constant for a plugin index

static const std::string CONFIG_PLUGIN_LIBRARY = "library"#

Configuration constant for a plugin external library

static const std::string CONFIG_PLUGIN_CONNECTION = "connection"#

Configuration constant for setting up a plugin connection

static const std::string CONFIG_STORE = "store"#

Configuration constant for storing a named configuration object

static const std::string CONFIG_EXECUTE = "execute"#

Configuration constant for executing a named configuration object

static const std::string CONFIG_INDEX = "index"#

Configuration constant for the name of a stored configuration object

static const std::string CONFIG_VALUE = "value"#

Configuration constant for the value of a stored configuration object

static const std::string COMMAND_KEY = "command"#

Configuration constant for a command to execute

static const int META_TX_HWM = 10000#

Configuration constant for the meta TX channel high water mark

SharedMemoryController#

class SharedMemoryController#

The SharedMemoryController class uses an IpcReactor object which is used to notify this class when new data is available from the frame receiver service. This class also owns an instance of the SharedMemoryParser class, which extracts the data from the shared memory location specified by the incoming IpcMessage objects, constructs a Frame to contain the data and meta data, and then notifies any listening plugins. This class also notifies the frame receiver service once the shared memory location is available for re-use.

Public Functions

SharedMemoryController(boost::shared_ptr<OdinData::IpcReactor> reactor, const std::string &rxEndPoint, const std::string &txEndPoint)#

Constructor.

The constructor sets up logging used within the class. It also creates the rxChannel_ subscribing IpcChannel and registers it with the IpcReactor reactor_. Finally, the txChannel_ publishing IpcChannel is created. rxChannel_ is a ZeroMQ subscriber (listening for frame ready notifications from the frame receiver) and txChannel_ is a ZeroMQ publisher that sends notifications of released frames to the frame receiver.

Parameters:
  • reactor[in] - pointer to the IpcReactor object.

  • rxEndPoint[in] - string name of the subscribing endpoint for frame ready notifications.

  • txEndPoint[in] - string name of the publishing endpoint for frame release notifications.

virtual ~SharedMemoryController()#

Destructor.

void setSharedBufferManager(const std::string &shared_buffer_name)#

Takes the name of a shared buffer manager and initialises a SharedBufferManager object.

Parameters:

shared_buffer_name[in] - name of the shared buffer manager

void requestSharedBufferConfig(const bool deferred = false)#

Request the shared buffer configuration information from the upstream frame receiver process

Requests the name of the shared buffer manager from the upstream frame receiver process by sending an IpcMessage over the frame notification channel. This can be deferred to allow time for the notification channels to be connected, using a single-shot timer registered on the reactor.

Parameters:

deferred[in] - true if the request should be deferred

void registerCallback(const std::string &name, boost::shared_ptr<IFrameCallback> cb)#

Register a callback for Frame updates with this class.

The callback (IFrameCallback subclass) is added to the map of callbacks, indexed by name. Whenever a new Frame object is received from the frame receiver then these callbacks will be called and passed the Frame pointer.

Parameters:
  • name[in] - string index of the callback.

  • cb[in] - IFrameCallback to register for updates.

void removeCallback(const std::string &name)#

Remove a callback from the callback map.

The callback is removed from the map of callbacks.

Parameters:

name[in] - string index of the callback.

void handleRxChannel()#

Called whenever a new IpcMessage is received to notify that a frame is ready.

Reads the raw message bytes from the rxChannel_ and constructs an IpcMessage object from the bytes. Verifies the IpcMessage type and value, and then uses the frame number and buffer ID information to tell the SharedMemoryParser which frame is ready for extraction from shared memory. Loops over registered callbacks and passes the frame to the relevant WorkQueue objects, before sending notification that the frame has been released for re-use.
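The notify-then-release sequence can be sketched without any IPC machinery. Everything here is a hypothetical stand-in: `FrameStub` replaces Frame, `FrameDispatcher` replaces the controller, callbacks are invoked directly rather than through WorkQueue objects, and the release notification is modelled as an entry in a vector.

```cpp
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a Frame built from a shared memory buffer.
struct FrameStub {
    int frame_number;
    int buffer_id;
};

// Every registered callback sees the frame, then the buffer is reported as
// free for re-use.
class FrameDispatcher {
public:
    using Callback = std::function<void(const FrameStub&)>;

    void register_callback(const std::string& name, Callback cb) { callbacks_[name] = cb; }
    void remove_callback(const std::string& name) { callbacks_.erase(name); }

    void on_frame_ready(int frame_number, int buffer_id)
    {
        FrameStub frame{frame_number, buffer_id};
        for (auto& entry : callbacks_) {
            entry.second(frame);
        }
        released_buffers_.push_back(buffer_id);  // stands in for the release message
    }

    const std::vector<int>& released_buffers() const { return released_buffers_; }

private:
    std::map<std::string, Callback> callbacks_;
    std::vector<int> released_buffers_;
};
```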

void status(OdinData::IpcMessage &status)#

Collate status information for the plugin. The status is added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the status.

void injectEOA()#

Create an EndOfAcquisitionFrame object and inject it into the plugin chain

Private Members

LoggerPtr logger_#

Pointer to logger

boost::shared_ptr<OdinData::SharedBufferManager> sbm_#

Pointer to SharedBufferManager object

std::map<std::string, boost::shared_ptr<IFrameCallback>> callbacks_#

Map of IFrameCallback pointers, indexed by name

boost::shared_ptr<OdinData::IpcReactor> reactor_#

IpcReactor pointer, for managing IpcMessage objects

OdinData::IpcChannel rxChannel_#

IpcChannel for receiving notifications of new frames

OdinData::IpcChannel txChannel_#

IpcChannel for sending notifications of frame release

bool sharedBufferConfigured_#

Shared buffer configured status flag

bool sharedBufferConfigRequestDeferred_#

Shared buffer config request deferred flag

Private Static Attributes

static const std::string SHARED_MEMORY_CONTROLLER_NAME = "shared_memory"#

Name of class used in status messages

Plugins#

FrameProcessorPlugin#

class FrameProcessorPlugin : public FrameProcessor::IFrameCallback, public OdinData::IVersionedObject, public FrameProcessor::MetaMessagePublisher#

Abstract plugin class, providing the IFrameCallback interface.

All frame processor plugins must subclass this class. It provides the IFrameCallback interface and associated WorkQueue for transferring Frame objects between plugins. It also provides methods for configuring plugins and for retrieving status from plugins.

Subclassed by FrameProcessor::BloscPlugin, FrameProcessor::DummyUDPProcessPlugin, FrameProcessor::FileWriterPlugin, FrameProcessor::GapFillPlugin, FrameProcessor::KafkaProducerPlugin, FrameProcessor::LiveViewPlugin, FrameProcessor::OffsetAdjustmentPlugin, FrameProcessor::ParameterAdjustmentPlugin, FrameProcessor::SumPlugin

Public Functions

FrameProcessorPlugin()#

Constructor, initialises name_ and meta data channel.

virtual ~FrameProcessorPlugin()#

Destructor

void set_name(const std::string &name)#

Set the name of this plugin

Parameters:

name[in] - The name.

std::string get_name()#

Get the name of this plugin

Returns:

The name.

void set_error(const std::string &msg)#

Set the error state.

Sets an error for this plugin

Parameters:

msg[in] - std::string error message.

void set_warning(const std::string &msg)#

Set the warning state.

Sets a warning for this plugin

Parameters:

msg[in] - std::string warning message.

void clear_errors()#

Clear error and warning messages.

virtual bool reset_statistics()#

Reset any statistics.

Any counters in the plugin should be reset by this method

std::vector<std::string> get_errors()#

Return the current error messages.

std::vector<std::string> get_warnings()#

Return the current warning messages.
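A mutex-guarded message store along the lines of these accessors could be sketched as follows. `MessageStore` is a hypothetical class; whether the real plugin appends every message or filters duplicates is not stated in this reference, so plain appending is an assumption.

```cpp
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for the error and warning bookkeeping of a plugin.
class MessageStore {
public:
    void set_error(const std::string& msg)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        errors_.push_back(msg);  // assumption: messages are simply appended
    }

    void set_warning(const std::string& msg)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        warnings_.push_back(msg);
    }

    void clear_errors()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        errors_.clear();
        warnings_.clear();
    }

    // Copies are returned so callers never hold references into guarded state.
    std::vector<std::string> get_errors()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return errors_;
    }

    std::vector<std::string> get_warnings()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return warnings_;
    }

private:
    std::mutex mutex_;
    std::vector<std::string> errors_;
    std::vector<std::string> warnings_;
};
```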

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Configure the plugin.

In this abstract class the configure method does not perform any actions; it should be overridden by subclasses.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Request the plugin’s current configuration.

In this abstract class the request method does not perform any actions; it should be overridden by subclasses.

Parameters:

reply[out] - Response IpcMessage with current configuration.

virtual void execute(const std::string &command, OdinData::IpcMessage &reply)#

Execute a command within the plugin.

In this abstract class the command method does not perform any actions; it should be overridden by subclasses.

Parameters:
  • command[in] - String containing the command to execute.

  • reply[out] - Response IpcMessage.

virtual std::vector<std::string> requestCommands()#

Request the plugin’s supported commands.

In this abstract class the request method does not perform any actions; it should be overridden by subclasses.

Returns:

- Vector containing supported command strings.

virtual void status(OdinData::IpcMessage &status)#

Collate status information for the plugin.

The status is added to the status IpcMessage object. In this abstract class the status method does not perform any actions; it should be overridden by subclasses.

Parameters:

status[out] - Reference to an IpcMessage value to store the status.

void add_performance_stats(OdinData::IpcMessage &status)#

Collate performance statistics for the plugin.

The performance metrics are added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the performance stats.

void reset_performance_stats()#

Reset performance statistics for the plugin.

The performance metrics are reset to zero.

void version(OdinData::IpcMessage &status)#

Collate version information for the plugin.

The version information is added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the version.

void register_callback(const std::string &name, boost::shared_ptr<IFrameCallback> cb, bool blocking = false)#

Registers another plugin for frame callbacks from this plugin.

The callback interface (which will be another plugin) is stored in our map, indexed by name. If the callback already exists within our map then this is a no-op.

Parameters:
  • name[in] - Index of the callback (plugin index).

  • cb[in] - Pointer to an IFrameCallback interface (plugin).

  • blocking[in] - Whether call should block.
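The "no-op if already registered" rule can be sketched with two maps, mirroring the callbacks_ and blocking_callbacks_ members. `CallbackStub` and `CallbackMaps` are hypothetical; the boolean return value exists only to make the behaviour observable, and checking the name against both maps is an assumption.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for an IFrameCallback implementation.
struct CallbackStub {
    int id;
};

class CallbackMaps {
public:
    // Returns true if the callback was added, false if the name was already
    // registered (in which case nothing changes).
    bool register_callback(const std::string& name,
                           std::shared_ptr<CallbackStub> cb,
                           bool blocking = false)
    {
        if (callbacks_.count(name) || blocking_callbacks_.count(name)) {
            return false;
        }
        (blocking ? blocking_callbacks_ : callbacks_)[name] = cb;
        return true;
    }

    void remove_callback(const std::string& name)
    {
        callbacks_.erase(name);
        blocking_callbacks_.erase(name);
    }

    std::size_t count() const { return callbacks_.size(); }
    std::size_t blocking_count() const { return blocking_callbacks_.size(); }

private:
    std::map<std::string, std::shared_ptr<CallbackStub>> callbacks_;
    std::map<std::string, std::shared_ptr<CallbackStub>> blocking_callbacks_;
};
```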

void remove_callback(const std::string &name)#

Remove a plugin from our callback map.

Parameters:

name[in] - Index of the callback (plugin index) to remove.

void remove_all_callbacks()#

Remove all registered callbacks for this plugin.

void notify_end_of_acquisition()#

Protected Functions

void push(boost::shared_ptr<Frame> frame)#

Push the supplied frame to any registered callbacks.

This method calls any blocking callbacks directly and then loops over the map of registered callbacks and places the frame pointer on their worker queue (see IFrameCallback).

Parameters:

frame[in] - Pointer to the frame.

void push(const std::string &plugin_name, boost::shared_ptr<Frame> frame)#

Push the supplied frame to a specifically named registered callback.

This method calls the named blocking callback directly or places the frame pointer on the named worker queue (see IFrameCallback).

Parameters:
  • plugin_name[in] - Name of the plugin to send the frame to.

  • frame[in] - Pointer to the frame.
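The difference between the two push overloads can be sketched as below. All names are hypothetical stand-ins: blocking callbacks are modelled as functions called in the caller's thread, and each worker queue as a `std::deque` that the receiving plugin would drain on its own thread.

```cpp
#include <deque>
#include <functional>
#include <map>
#include <string>

// Hypothetical stand-in for a Frame.
struct FrameStub {
    int number;
};

class PushSketch {
public:
    using Direct = std::function<void(const FrameStub&)>;

    std::map<std::string, Direct> blocking_callbacks;            // called directly
    std::map<std::string, std::deque<FrameStub>> worker_queues;  // drained elsewhere

    // Broadcast: blocking callbacks first, then every worker queue.
    void push(const FrameStub& frame)
    {
        for (auto& cb : blocking_callbacks) cb.second(frame);
        for (auto& q : worker_queues) q.second.push_back(frame);
    }

    // Targeted: deliver to one named callback only, if it is registered.
    void push(const std::string& plugin_name, const FrameStub& frame)
    {
        auto b = blocking_callbacks.find(plugin_name);
        if (b != blocking_callbacks.end()) {
            b->second(frame);
            return;
        }
        auto q = worker_queues.find(plugin_name);
        if (q != worker_queues.end()) q->second.push_back(frame);
    }
};
```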

Private Functions

virtual void callback(boost::shared_ptr<Frame> frame)#

We have been called back with a frame from a plugin that we registered with. This method calls the process_frame pure virtual method that must be overridden by any children of this abstract class.

Parameters:

frame[in] - Pointer to the frame.

virtual void process_frame(boost::shared_ptr<Frame> frame) = 0#

This is called by the callback method when any new frames have arrived and must be overridden by child classes.

Parameters:

frame[in] - Pointer to the frame.
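The relationship between callback and process_frame can be illustrated with a reduced stand-in hierarchy. `PluginBase`, `CountingPlugin` and `FrameStub` are hypothetical and omit work queues, configuration and status; `std::shared_ptr` is used in place of `boost::shared_ptr` so that the sketch has no external dependencies.

```cpp
#include <memory>

// Hypothetical stand-in for a Frame.
struct FrameStub {
    int number;
};

// Heavily reduced stand-in for the abstract plugin base class.
class PluginBase {
public:
    virtual ~PluginBase() {}

    // Entry point used by the upstream plugin; forwards to process_frame.
    void callback(std::shared_ptr<FrameStub> frame) { process_frame(frame); }

private:
    // Must be provided by every concrete plugin.
    virtual void process_frame(std::shared_ptr<FrameStub> frame) = 0;
};

// Example subclass that only records what it has seen.
class CountingPlugin : public PluginBase {
public:
    CountingPlugin() : frames_seen_(0), last_number_(-1) {}

    int frames_seen() const { return frames_seen_; }
    int last_number() const { return last_number_; }

private:
    void process_frame(std::shared_ptr<FrameStub> frame) override
    {
        ++frames_seen_;
        last_number_ = frame->number;
    }

    int frames_seen_;
    int last_number_;
};
```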

virtual void process_end_of_acquisition()#

Perform any end of acquisition cleanup.

This default implementation does nothing. Any plugins that want to perform cleanup actions when an end of acquisition notification takes place can override this method.

Private Members

LoggerPtr logger_#

Pointer to logger

std::string name_#

Name of this plugin

std::map<std::string, boost::shared_ptr<IFrameCallback>> callbacks_#

Map of registered plugins for callbacks, indexed by name

std::map<std::string, boost::shared_ptr<IFrameCallback>> blocking_callbacks_#

Map of registered plugins for blocking callbacks, indexed by name

std::vector<std::string> error_messages_#

Error message array

std::vector<std::string> warning_messages_#

Warning message array

boost::mutex mutex_#

Mutex to make accessing error_messages_ threadsafe

CallDuration process_duration_#

process_frame performance stats

BloscPlugin#

class BloscPlugin : public FrameProcessor::FrameProcessorPlugin#

This is a compression plugin using the Blosc library

When this plugin receives a frame, process_frame is called and the class uses the Blosc compression methods to compress the data and output a new, compressed Frame.

Public Functions

BloscPlugin()#

The constructor sets up logging used within the class.

virtual ~BloscPlugin()#

Destructor.

boost::shared_ptr<Frame> compress_frame(boost::shared_ptr<Frame> frame)#

Compress one frame, return compressed frame.

Parameters:

src_frame - Source frame to compress.

Returns:

compressed frame

Private Functions

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Perform compression on the frame and output a new, compressed Frame.

Parameters:

frame[in] - Pointer to a Frame object.

virtual void status(OdinData::IpcMessage &status)#

Collate status information for the plugin

Parameters:

status - Reference to an IpcMessage value to store the status.

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Configure

Parameters:
  • config

  • reply

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get configuration settings for the BloscPlugin

Parameters:

reply - Response IpcMessage.

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#
void update_compression_settings()#

Update the compression settings

void *get_buffer(size_t nbytes)#

Return data buffer

Parameters:

nbytes

Returns:

Private Members

LoggerPtr logger_#

Pointer to logger

boost::recursive_mutex mutex_#

Mutex used to make this class thread safe

std::string current_acquisition_#

Current acquisition ID

BloscCompressionSettings compression_settings_#

Compression settings

BloscCompressionSettings commanded_compression_settings_#

Compression settings for the next acquisition

void *data_buffer_ptr_#

Temporary buffer for compressed data

size_t data_buffer_size_#

Private Static Attributes

static const std::string CONFIG_BLOSC_COMPRESSOR = "compressor"#

Configuration constants

static const std::string CONFIG_BLOSC_THREADS = "threads"#
static const std::string CONFIG_BLOSC_LEVEL = "level"#
static const std::string CONFIG_BLOSC_SHUFFLE = "shuffle"#

GapFillPlugin#

class GapFillPlugin : public FrameProcessor::FrameProcessorPlugin#

Public Functions

GapFillPlugin()#

Constructor for this class.

virtual ~GapFillPlugin()#
virtual void process_frame(boost::shared_ptr<Frame> frame)#

Process received frame.

Parameters:

frame[in] - pointer to a frame object.

bool configuration_valid(boost::shared_ptr<Frame> frame)#

Check the validity of the configuration against the incoming frame.

Checks that the frame size is equal to the grid size multiplied by the chip size in both directions. Checks that the number of gaps corresponds to the specified grid size.

Parameters:

frame[in] - pointer to a frame object.

Returns:

verified - true if the configuration is valid, false otherwise.
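The two checks can be written out as a free function. This is a sketch under stated assumptions: `gap_config_valid` is a hypothetical helper, all vectors are ordered [y, x] as in the configuration constants documented for this class, and the gap vectors must hold grid + 1 entries in their respective directions.

```cpp
#include <cstddef>
#include <vector>

// grid, chip and frame_dims are ordered [y, x].
bool gap_config_valid(const std::vector<int>& grid,
                      const std::vector<int>& chip,
                      const std::vector<int>& gaps_x,
                      const std::vector<int>& gaps_y,
                      const std::vector<int>& frame_dims)
{
    if (grid.size() != 2 || chip.size() != 2 || frame_dims.size() != 2) return false;

    // Frame size must equal grid size times chip size in both directions.
    if (frame_dims[0] != grid[0] * chip[0]) return false;
    if (frame_dims[1] != grid[1] * chip[1]) return false;

    // One gap before each chip plus one trailing gap, per direction.
    if (gaps_x.size() != static_cast<std::size_t>(grid[1]) + 1) return false;
    if (gaps_y.size() != static_cast<std::size_t>(grid[0]) + 1) return false;
    return true;
}
```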

boost::shared_ptr<Frame> insert_gaps(boost::shared_ptr<Frame> frame)#

Insert gaps into the frame according to the grid, gap_x and gap_y values.

A series of copies is made copying each row of each chip into the appropriate destination offset.

Parameters:

frame[in] - pointer to a frame object.

Returns:

gap_frame - pointer to a frame that has gaps inserted.
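The row-by-row copy can be sketched on a plain pixel buffer. The assumptions are mine rather than the plugin's: pixels are 16-bit and row-major, gaps are measured in pixels and zero-filled, the first and last gap entries are the leading and trailing borders, and the inputs have already been validated. `Image` and this free-standing `insert_gaps` are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

struct Image {
    int height;
    int width;
    std::vector<std::uint16_t> pixels;  // row-major
};

Image insert_gaps(const Image& src, int grid_y, int grid_x, int chip_y, int chip_x,
                  const std::vector<int>& gaps_x, const std::vector<int>& gaps_y)
{
    Image out;
    out.width  = src.width  + std::accumulate(gaps_x.begin(), gaps_x.end(), 0);
    out.height = src.height + std::accumulate(gaps_y.begin(), gaps_y.end(), 0);
    out.pixels.assign(static_cast<std::size_t>(out.width) * out.height, 0);

    int dest_row = 0;
    for (int gy = 0; gy < grid_y; ++gy) {
        dest_row += gaps_y[gy];  // gap above this row of chips
        for (int row = 0; row < chip_y; ++row, ++dest_row) {
            const int src_row = gy * chip_y + row;
            int dest_col = 0;
            for (int gx = 0; gx < grid_x; ++gx) {
                dest_col += gaps_x[gx];  // gap to the left of this chip
                const std::uint16_t* from =
                    &src.pixels[static_cast<std::size_t>(src_row) * src.width + gx * chip_x];
                std::uint16_t* to =
                    &out.pixels[static_cast<std::size_t>(dest_row) * out.width + dest_col];
                std::copy(from, from + chip_x, to);  // one chip row per copy
                dest_col += chip_x;
            }
        }
    }
    return out;
}
```

Copying one chip row at a time keeps every copy contiguous in both source and destination, which is why the gaps can be inserted without touching individual pixels.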

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for this Plugin.

This sets up the Gap Fill Plugin according to the configuration IpcMessage objects that are received.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Public Static Attributes

static const std::string CONFIG_GRID_SIZE = "grid_size"#

The required grid for the image [y, x]

static const std::string CONFIG_CHIP_SIZE = "chip_size"#

The chip size of the image in pixels [y, x]

static const std::string CONFIG_GRID_X_GAPS = "x_gaps"#

The gaps to insert in the x grid direction (must be grid[x] + 1 in dimension)

static const std::string CONFIG_GRID_Y_GAPS = "y_gaps"#

The gaps to insert in the y grid direction (must be grid[y] + 1 in dimension)

Private Functions

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get the configuration values for this Plugin.

Parameters:

reply[out] - Response IpcMessage.

Private Members

LoggerPtr logger_#

Pointer to logger

std::vector<int> grid_#
std::vector<int> chip_#
std::vector<int> gaps_x_#
std::vector<int> gaps_y_#

KafkaProducerPlugin#

class KafkaProducerPlugin : public FrameProcessor::FrameProcessorPlugin#

KafkaProducerPlugin integrates Odin with Kafka.

It creates and sends messages that contain frame data and metadata to one or more Kafka servers.

Plugin parameters:
  • servers: Kafka broker list using format IP:PORT[,IP2:PORT2,…]. Once this is set, the plugin starts delivering to the specified server/s.

  • dataset: Dataset name of frame that will be delivered. Defaults to “data”.

  • topic: Topic name of the queue to send the message to. Defaults to “data”.

  • partition: Partition number. Defaults to RD_KAFKA_PARTITION_UA (automatic partitioning).

  • include_parameters: Boolean indicating if frame parameters should be included in the message. Defaults to true.

Status variables:
  • sent: Number of sent frames.

  • lost: Number of lost frames.

  • ack: Number of acknowledged frames.

Public Functions

KafkaProducerPlugin()#

The constructor sets up logging used within the class.

~KafkaProducerPlugin()#

The destructor cleans up Kafka handlers.

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for this Plugin.

This sets up the Kafka Producer Plugin according to the configuration IpcMessage objects that are received.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get the configuration values for this Plugin.

Parameters:

reply[out] - Response IpcMessage.

virtual void status(OdinData::IpcMessage &status)#

Collate status information for the plugin. The status is added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the status.

virtual void process_frame(boost::shared_ptr<Frame> frame)#

If the frame's dataset matches the configured dataset, the frame is sent to the Kafka server/s.

Parameters:

frame[in] - Pointer to a Frame object.

virtual bool reset_statistics()#

Clear frame statistics

void destroy_kafka()#

Destroy Kafka handlers for the connection and the topic

void poll_delivery_message_report_queue()#

Poll the delivery message report queue, calling the callback function if appropriate.

void on_message_ack()#

It updates stats when a message is acknowledged

void on_message_error(const char *error)#

It logs an error when a message delivery has failed

void configure_kafka_servers(std::string servers)#

Configure Kafka connection handler for the server/s specified.

Parameters:

servers[in] - string representing Kafka brokers using format: IP:PORT[,IP2:PORT2,…]

void configure_kafka_topic(std::string topic_name)#

Configure Kafka topic handler for the topic specified

Parameters:

topic_name[in] - string representing the topic name.

void configure_partition(int32_t partition)#

Set Kafka partition to send messages to

If it is not configured, it defaults to automatic partitioning (using the topic’s partitioner function)

Parameters:

partition[in] - partition number.

void configure_dataset(std::string dataset)#

Configure the dataset that will be published

void *create_message(boost::shared_ptr<Frame> frame, size_t &nbytes)#

Create a message with the following structure:

[ json header length (2 bytes) ] + [ json header ] + [ frame data ]

Parameters:
  • frame[in] - Pointer to a Frame object.

  • nbytes[out] - Reference to the message size in bytes.
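The layout above can be illustrated with a minimal sketch. The helper names pack_message and unpack_header are hypothetical and are not part of the plugin, and the byte order of the 2-byte length prefix is an assumption (host byte order is used here); the reference text does not state it.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper (not part of the plugin): packs a message as
// [2-byte header length][JSON header][frame data].
// Assumption: the length prefix is written in host byte order.
std::vector<unsigned char> pack_message(const std::string& json_header,
                                        const void* frame_data,
                                        std::size_t frame_size)
{
  if (json_header.size() > UINT16_MAX) {
    throw std::length_error("JSON header does not fit in a 2-byte length field");
  }
  const std::uint16_t header_len = static_cast<std::uint16_t>(json_header.size());

  std::vector<unsigned char> message(sizeof(header_len) + json_header.size() + frame_size);
  unsigned char* cursor = message.data();

  // 1. Length prefix
  std::memcpy(cursor, &header_len, sizeof(header_len));
  cursor += sizeof(header_len);
  // 2. JSON header bytes (no terminating NUL)
  std::memcpy(cursor, json_header.data(), json_header.size());
  cursor += json_header.size();
  // 3. Raw frame data
  if (frame_size > 0) {
    std::memcpy(cursor, frame_data, frame_size);
  }
  return message;
}

// Hypothetical consumer-side helper: reads the JSON header back out.
std::string unpack_header(const std::vector<unsigned char>& message)
{
  std::uint16_t header_len = 0;
  if (message.size() < sizeof(header_len)) {
    throw std::length_error("Message is shorter than the length prefix");
  }
  std::memcpy(&header_len, message.data(), sizeof(header_len));
  if (message.size() < sizeof(header_len) + header_len) {
    throw std::length_error("Message is shorter than its declared header");
  }
  return std::string(reinterpret_cast<const char*>(message.data()) + sizeof(header_len),
                     header_len);
}
```

A consumer would read the first two bytes, parse that many bytes as JSON, and treat the remainder of the message as frame data.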

void enqueue_frame(boost::shared_ptr<Frame> frame)#

Create and enqueue a frame message to kafka server/s

Parameters:

frame[in] - Pointer to a Frame object.

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Private Members

LoggerPtr logger_#

Pointer to logger

std::string dataset_name_#

Name of the dataset that will be delivered

std::string topic_name_#

Topic name identifying the destination queue

std::string servers_#

Kafka brokers to connect to

int32_t partition_#

Partition number

rd_kafka_t *kafka_producer_#

Pointer to a Kafka producer handler

rd_kafka_topic_t *kafka_topic_#

Pointer to a Kafka topic handler

uint32_t frames_sent_#

Number of sent frames

uint32_t frames_lost_#

Number of lost frames

uint32_t frames_ack_#

Number of acknowledged frames

int polling_timer_id_#
boost::recursive_mutex mutex_#
bool include_parameters_#

True if frame parameters need to be included in the message header

Private Static Attributes

static const std::string CONFIG_SERVERS = "servers"#

Configuration constant for servers parameter

static const std::string CONFIG_TOPIC = "topic"#

Configuration constant for topic parameter

static const std::string CONFIG_PARTITION = "partition"#

Configuration constant for partition parameter

static const std::string CONFIG_DATASET = "dataset"#

Configuration constant for dataset parameter

static const std::string CONFIG_INCLUDE_PARAMETERS = "include_parameters"#

Configuration constant for include_parameters

FileWriterPlugin#

class FileWriterPlugin : public FrameProcessor::FrameProcessorPlugin#

Plugin that writes Frame objects to HDF5 files.

This plugin processes Frame objects and can write them to HDF5 files. The plugin can be configured through the Ipc control interface defined in the FileWriterPluginController class. Currently only the raw data is written into datasets. Multiple datasets can be created and the raw data is stored according to the Frame index (or name).

Public Functions

explicit FileWriterPlugin()#

Create a FileWriterPlugin with default values. File path is set to default of current directory, and the filename is set to a default.

The writer plugin is also configured to be a single process writer (no other expected writers).

virtual ~FileWriterPlugin()#

Destructor.

void start_writing()#

Start writing frames to file.

This method checks that the writer is not already writing. Then it creates the datasets required (from their definitions) and creates the HDF5 file ready to write frames. The framesWritten counter is reset to 0.

void stop_writing()#

Stop writing frames to file.

This method checks that the writer is currently writing. Then it closes the file and stops writing frames.

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for the file writer.

This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The following options are searched for:
  • CONFIG_PROCESS - Calls the method configure_process

  • CONFIG_FILE - Calls the method configure_file

  • CONFIG_DATASET - Calls the method configure_dataset

Checks to see if the number of frames to write has been set. Checks to see if the writer should start or stop writing frames.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Request the plugin’s current configuration.

In this abstract class the request method does not perform any actions; this should be overridden by subclasses.

Parameters:

reply[out] - Response IpcMessage with current configuration.

virtual void execute(const std::string &command, OdinData::IpcMessage &reply)#

Execute a command within the plugin.

In this abstract class the command method does not perform any actions; this should be overridden by subclasses.

Parameters:
  • command[in] - String containing the command to execute.

  • reply[out] - Response IpcMessage.

virtual std::vector<std::string> requestCommands()#

Request the plugin’s supported commands.

In this abstract class the request method does not perform any actions; this should be overridden by subclasses.

Returns:

- Vector containing supported command strings.

void configure_process(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for the file writer process count.

This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The following options are searched for:
  • CONFIG_PROCESS_NUMBER - Sets the number of writer processes executing

  • CONFIG_PROCESS_RANK - Sets the rank of this process

The configuration is not applied if the writer is currently writing.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

void configure_file(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set file configuration options for the file writer.

This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The following options are searched for:
  • CONFIG_FILE_PATH - Sets the path of the file to write to

  • CONFIG_FILE_PREFIX - Sets the filename of the file to write to

The configuration is not applied if the writer is currently writing.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

void configure_dataset(const std::string &dataset_name, OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set dataset configuration options for the file writer.

This sets up the file writer plugin according to the configuration IpcMessage objects that are received. The following options are searched for:
  • CONFIG_DATASET_CMD - Should we create/delete a dataset definition

  • CONFIG_DATASET_NAME - Name of the dataset

  • CONFIG_DATASET_TYPE - Datatype of the dataset

  • CONFIG_DATASET_DIMS - Dimensions of the dataset

  • CONFIG_DATASET_CHUNKS - Chunking parameters of the dataset

  • CONFIG_DATASET_COMPRESSION - Compression of raw data

The configuration is not applied if the writer is currently writing.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

void create_new_dataset(const std::string &dset_name)#

Checks to see if a dataset with the supplied name already exists. If it doesn’t then the dataset definition is created and then added to the store.

Parameters:

dset_name[in] - Name of the dataset to create.

void delete_datasets()#

Deletes all dataset definitions from the plugin.

virtual void status(OdinData::IpcMessage &status)#

Collate status information for the plugin. The status is added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the status.

void add_file_writing_stats(OdinData::IpcMessage &status)#

Collate file writing statistics for the plugin.

The metrics are added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the file writing stats.

virtual bool reset_statistics()#

Reset file writing statistics

void stop_acquisition()#

Stops the current acquisition and starts the next if it is configured

void start_close_file_timeout()#

Starts the close file timeout

void run_close_file_timeout()#

Function that is run by the close file timeout thread

This waits until notified to start, and then runs a timer. If the timer times out, then the current acquisition is stopped. If the timer is notified before timing out (by a frame being written) then no action is taken, as it will either start the timer again or go back to the wait for start state, depending on the value of timeoutActive.
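The timed wait at the heart of this loop can be sketched as follows. This is a simplified illustration only: it uses the standard library's std::condition_variable rather than the boost types listed in the private members, the CloseFileTimer type and its member names are hypothetical, and the outer "wait for start" state is omitted.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for the close file timeout state.
struct CloseFileTimer {
  std::mutex mutex;
  std::condition_variable frame_written;
  bool notified = false;

  // Called by the writer whenever a frame has been written.
  void notify_frame_written()
  {
    {
      std::lock_guard<std::mutex> lock(mutex);
      notified = true;
    }
    frame_written.notify_one();
  }

  // Waits for up to `period`. Returns true if the period elapsed without a
  // frame being written (the caller would then stop the acquisition), or
  // false if a frame notification arrived first (no action is taken).
  bool timed_out(std::chrono::milliseconds period)
  {
    std::unique_lock<std::mutex> lock(mutex);
    const bool woken = frame_written.wait_for(lock, period, [this] { return notified; });
    notified = false;  // re-arm for the next wait
    return !woken;
  }
};
```

Using the predicate overload of wait_for guards against spurious wake-ups, and a notification that arrives just before the wait begins is not lost because the flag is checked under the mutex.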

size_t calc_num_frames(size_t total_frames)#

Calculates the number of frames that this FileWriter can expect to write based on the total number of frames

Parameters:

total_frames[in] - The total number of frames in the acquisition

Returns:

- The number of frames that this FileWriter is expected to write
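As a rough illustration of this kind of calculation, the sketch below assumes that blocks of frames_per_block frames are dealt out to the writers in rank order, round after round. That distribution scheme and the function name frames_for_rank are assumptions made for the example; they are not taken from the plugin source.

```cpp
#include <cstddef>

// Sketch: number of frames one writer receives when blocks of
// frames_per_block frames are dealt out round-robin by rank.
// Assumption: this distribution scheme; not taken from the plugin source.
std::size_t frames_for_rank(std::size_t total_frames,
                            std::size_t frames_per_block,
                            std::size_t processes,
                            std::size_t rank)
{
  if (total_frames == 0 || frames_per_block == 0 || processes == 0) {
    return 0;
  }
  // One "round" hands exactly one block to every writer.
  const std::size_t frames_per_round = frames_per_block * processes;
  const std::size_t full_rounds = total_frames / frames_per_round;
  std::size_t count = full_rounds * frames_per_block;

  // Frames left over after the last complete round.
  const std::size_t remainder = total_frames % frames_per_round;
  // Position within a round at which this rank's block begins.
  const std::size_t my_start = rank * frames_per_block;
  if (remainder > my_start) {
    const std::size_t left = remainder - my_start;
    count += (left < frames_per_block) ? left : frames_per_block;
  }
  return count;
}
```

For example, with 10 frames, one frame per block and four writers, ranks 0 and 1 would each expect 3 frames and ranks 2 and 3 would each expect 2.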

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Private Functions

FileWriterPlugin(const FileWriterPlugin &src)#

Prevent a copy of the FileWriterPlugin plugin.

Parameters:

src[in]

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Process an incoming frame.

Checks we have been asked to write frames. If we are in writing mode then the frame is checked for subframes. If subframes are found then writeSubFrames is called. If no subframes are found then writeFrame is called. Finally counters are updated and if the number of required frames has been reached then the stopWriting method is called.

Parameters:

frame[in] - Pointer to the Frame object.

virtual void process_end_of_acquisition()#

Process an EndOfAcquisitionFrame.

Checks we are writing. If we are in writing mode then the acquisition is stopped and the timeout_active_ flag is set to false.

bool frame_in_acquisition(boost::shared_ptr<Frame> frame)#

This function checks that the acquisition ID of the frame matches that of the current acquisition, subject to two caveats:
  • If the acquisition ID of the frame is “”, it matches anything.

  • This function has the side effect of moving from the current acquisition to the next acquisition if that allows the frame to be matched.

The function will set an error if the frame does not match a writing acquisition.

Parameters:

frame[in] - Pointer to the Frame object.
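The matching rule, including its side effect, can be sketched roughly as below. AcquisitionQueue and frame_matches are hypothetical names used only for illustration; the real function works on Frame and Acquisition objects, and switching acquisitions there involves more than swapping an identifier.

```cpp
#include <string>

// Hypothetical stand-in for the plugin's current/next acquisition pair.
struct AcquisitionQueue {
  std::string current_id;
  std::string next_id;
  bool has_next;
};

// Returns true if the frame belongs to the acquisition being written.
// Side effect: promotes the next acquisition to current when the frame
// matches the next acquisition rather than the current one.
bool frame_matches(AcquisitionQueue& queue, const std::string& frame_acquisition_id)
{
  // An empty acquisition ID on the frame matches anything.
  if (frame_acquisition_id.empty()) {
    return true;
  }
  if (frame_acquisition_id == queue.current_id) {
    return true;
  }
  if (queue.has_next && frame_acquisition_id == queue.next_id) {
    queue.current_id = queue.next_id;
    queue.next_id.clear();
    queue.has_next = false;
    return true;
  }
  // No match: the documented function also records an error at this point.
  return false;
}
```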

Private Members

LoggerPtr logger_#

Pointer to logger

boost::recursive_mutex mutex_#

Mutex used to make this class thread safe

bool writing_#

Is this plugin writing frames to file?

size_t concurrent_processes_#

Number of concurrent file writers executing

size_t concurrent_rank_#

Rank of this file writer

boost::shared_ptr<Acquisition> current_acquisition_#

Details of the acquisition currently being written

boost::shared_ptr<Acquisition> next_acquisition_#

Details of the next acquisition to be written

std::map<std::string, DatasetDefinition> dataset_defs_#

Map of dataset definitions

size_t frames_per_block_#

Number of frames to write consecutively in a file

size_t blocks_per_file_#

Number of blocks to write in a file

bool use_earliest_hdf5_#

Use the earliest version of hdf5

size_t alignment_threshold_#

HDF5 file chunk alignment threshold

size_t alignment_value_#

HDF5 file chunk alignment value

size_t timeout_period_#

Timeout for closing the file after receiving no data

boost::mutex start_timeout_mutex_#

Mutex used to make starting the close file timeout thread safe

boost::mutex close_file_mutex_#

Mutex used to make running the close file timeout thread safe

boost::condition_variable start_condition_#

Condition variable used to start the close file timeout

boost::condition_variable timeout_condition_#

Condition variable used to run the close file timeout

bool timeout_active_#

Close file timeout active switch

bool timeout_thread_running_#

Close file timeout thread running

boost::thread timeout_thread_#

The close file timeout thread

uint32_t first_file_index_#

Starting file index (default to 0 index based numbering)

bool use_file_numbering_#

Do we use file numbers in the file name construction. Defaults to true

std::string file_postfix_#

The optional file postfix to add

std::string file_extension_#

The file extension to use

std::string master_frame_#

Name of master frame. When a master frame is received frame numbers increment

HDF5ErrorDefinition_t hdf5_error_definition_#

HDF5 call warning and error durations

HDF5CallDurations_t hdf5_call_durations_#

HDF5 File IO performance stats

Private Static Attributes

static const std::string CONFIG_PROCESS = "process"#

Configuration constant for process related items

static const std::string CONFIG_PROCESS_NUMBER = "number"#

Configuration constant for number of processes

static const std::string CONFIG_PROCESS_RANK = "rank"#

Configuration constant for this process rank

static const std::string CONFIG_PROCESS_BLOCKSIZE = "frames_per_block"#

Configuration constant for the number of frames per block

static const std::string CONFIG_PROCESS_BLOCKS_PER_FILE = "blocks_per_file"#

Configuration constant for the number of blocks per file

static const std::string CONFIG_PROCESS_EARLIEST_VERSION = "earliest_version"#

Configuration constant for using earliest file version

static const std::string CONFIG_PROCESS_ALIGNMENT_THRESHOLD = "alignment_threshold"#

Configuration constant for chunk alignment threshold

static const std::string CONFIG_PROCESS_ALIGNMENT_VALUE = "alignment_value"#

Configuration constant for chunk alignment value

static const std::string CONFIG_FILE = "file"#

Configuration constant for file related items

static const std::string CONFIG_FILE_PREFIX = "prefix"#

Configuration constant for file name

static const std::string CONFIG_FILE_USE_NUMBERS = "use_numbers"#

Configuration constant for using automatic file name numbering

static const std::string CONFIG_FILE_NUMBER_START = "first_number"#

Configuration constant for starting file number if using numbering

static const std::string CONFIG_FILE_POSTFIX = "postfix"#

Configuration constant for file name postfix (optional)

static const std::string CONFIG_FILE_PATH = "path"#

Configuration constant for file path

static const std::string CONFIG_FILE_EXTENSION = "extension"#

Configuration constant for file extension

static const std::string CONFIG_DATASET = "dataset"#

Configuration constant for dataset related items

static const std::string CONFIG_DATASET_TYPE = "datatype"#

Configuration constant for dataset datatype

static const std::string CONFIG_DATASET_DIMS = "dims"#

Configuration constant for dataset dimensions

static const std::string CONFIG_DATASET_CHUNKS = "chunks"#

Configuration constant for chunking dimensions

static const std::string CONFIG_DATASET_COMPRESSION = "compression"#

Configuration constant for data compression

static const std::string CONFIG_DATASET_INDEXES = "indexes"#

Configuration constant for data high/low indexes

static const std::string CONFIG_DATASET_BLOSC_COMPRESSOR = "blosc_compressor"#

Configurations for Blosc compression

static const std::string CONFIG_DATASET_BLOSC_LEVEL = "blosc_level"#
static const std::string CONFIG_DATASET_BLOSC_SHUFFLE = "blosc_shuffle"#
static const std::string CONFIG_DELETE_DATASETS = "delete_datasets"#

Configuration constant for deleting all datasets

static const std::string CONFIG_FRAMES = "frames"#

Configuration constant for number of frames to write

static const std::string CONFIG_MASTER_DATASET = "master"#

Configuration constant for master dataset name

static const std::string CONFIG_WRITE = "write"#

Configuration constant for starting and stopping writing of frames

static const std::string ACQUISITION_ID = "acquisition_id"#

Configuration constant for the acquisition id

static const std::string CLOSE_TIMEOUT_PERIOD = "timeout_timer_period"#

Configuration constant for the close file timeout

static const std::string START_CLOSE_TIMEOUT = "start_timeout_timer"#

Configuration constant for starting the close file timeout

static const std::string CREATE_ERROR_DURATION = "create_error_duration"#

Configuration constant for HDF5 call timeout durations before logging an error

static const std::string WRITE_ERROR_DURATION = "write_error_duration"#
static const std::string FLUSH_ERROR_DURATION = "flush_error_duration"#
static const std::string CLOSE_ERROR_DURATION = "close_error_duration"#
static const std::string START_WRITING = "start_writing"#
static const std::string STOP_WRITING = "stop_writing"#

Acquisition#

class Acquisition : public FrameProcessor::MetaMessagePublisher#

Public Functions

Acquisition(const HDF5ErrorDefinition_t &hdf5_error_definition)#
~Acquisition()#
std::string get_last_error()#

Returns the last error message that was generated

Returns:

- The most recently generated error message.

ProcessFrameStatus process_frame(boost::shared_ptr<Frame> frame, HDF5CallDurations_t &call_durations)#

Processes a frame

This method checks that the frame is valid before using an HDF5File to write the frame to file

Parameters:

frame[in] - The frame to process

Returns:

- The Status of the processing.

void create_file(size_t file_number, HDF5CallDurations_t &call_durations)#

Creates a file

This method creates a new HDF5File object with the given file_number. The file will be created, the datasets populated within the file, and a meta message sent

Parameters:

file_number[in] - The file_number to create a file for

void close_file(boost::shared_ptr<HDF5File> file, HDF5CallDurations_t &call_durations)#

Closes a file

This method closes the file that is currently being written to by the specified HDF5File object and sends off meta data for this event

Parameters:

file[in] - The HDF5File to call to close its file

void validate_dataset_definition(DatasetDefinition definition)#

Validate dataset definition

Perform checks on the given dataset definition to ensure it is valid before creation

Parameters:

definition[in] - The DatasetDefinition to validate

bool start_acquisition(size_t concurrent_rank, size_t concurrent_processes, size_t frames_per_block, size_t blocks_per_file, uint32_t starting_file_index, bool use_file_numbers, std::string file_postfix, std::string file_extension, bool use_earliest_hdf5, size_t alignment_threshold, size_t alignment_value, std::string master_frame, HDF5CallDurations_t &call_durations)#

Starts this acquisition, creating the acquisition file, or first file in a series, and publishes meta

Parameters:
  • concurrent_rank[in] - The rank of the processor

  • concurrent_processes[in] - The number of processors

  • frames_per_block[in] - The number of frames per block

  • blocks_per_file[in] - The number of blocks per file

  • starting_file_index[in] - The first number in the indexing of filenames

  • use_file_numbers[in] - Use file numbers in the file name (or not)

  • file_postfix[in] - An additional string placed before any numbering

  • file_extension[in] - The file extension to use

  • use_earliest_hdf5[in] - Whether to use an early version of hdf5 library

  • alignment_threshold[in] - Alignment threshold for hdf5 chunking

  • alignment_value[in] - Alignment value for hdf5 chunking

  • master_frame[in] - The master frame dataset name

Returns:

- true if the acquisition was started successfully

void stop_acquisition(HDF5CallDurations_t &call_durations)#

Stops this acquisition, closing off any open files

bool check_frame_valid(boost::shared_ptr<Frame> frame)#

Check incoming frame is valid for its target dataset.

Check the dimensions, data type and compression of the frame data.

Parameters:

frame[in] - Pointer to the Frame object.

Returns:

- true if the frame was valid

size_t get_frame_offset_in_file(size_t frame_offset) const#

Return the dataset offset for the supplied global offset

This method calculates the dataset offset for this frame, within the final file that it will be written to

Parameters:

frame_offset[in] - Frame number of the frame.

Returns:

- the dataset offset for the frame number.

size_t get_file_index(size_t frame_offset) const#

Return the file index for the supplied global offset

This method calculates the file index that this frame should be written to

Parameters:

frame_offset[in] - Frame number of the frame.

Returns:

- the file index.
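One way the two calculations above (get_frame_offset_in_file and get_file_index) could fit together is sketched below. It assumes blocks are dealt out to writers round-robin and that each writer numbers its files as file_row * concurrent_processes + concurrent_rank; both are assumptions for illustration, as is the behaviour when blocks_per_file is 0. The WriterLayout struct is hypothetical, with fields named after the members documented in this class. frames_per_block and concurrent_processes are assumed to be non-zero, and the sketch does not check that the frame actually belongs to this rank.

```cpp
#include <cstddef>

// Hypothetical layout parameters, named after the documented members.
struct WriterLayout {
  std::size_t frames_per_block;
  std::size_t blocks_per_file;       // 0 = put all blocks in the same file
  std::size_t concurrent_processes;
  std::size_t concurrent_rank;
};

// Offset of a frame inside the file that this writer places it in.
std::size_t frame_offset_in_file(const WriterLayout& l, std::size_t frame_offset)
{
  // Rounds completed before this frame; each round gives every writer one block.
  const std::size_t round = frame_offset / (l.frames_per_block * l.concurrent_processes);
  // Offset counted over only the frames this writer handles.
  std::size_t local = round * l.frames_per_block + frame_offset % l.frames_per_block;
  if (l.blocks_per_file != 0) {
    // Wrap around at each file boundary.
    local %= l.blocks_per_file * l.frames_per_block;
  }
  return local;
}

// Index of the file that a frame lands in, counted across all writers.
std::size_t file_index(const WriterLayout& l, std::size_t frame_offset)
{
  if (l.blocks_per_file == 0) {
    return l.concurrent_rank;  // assumption: a single file per writer
  }
  const std::size_t round = frame_offset / (l.frames_per_block * l.concurrent_processes);
  const std::size_t file_row = round / l.blocks_per_file;
  return file_row * l.concurrent_processes + l.concurrent_rank;
}
```

With 2 frames per block, 2 blocks per file and 2 writers, rank 1 would handle global frames 2, 3, 6 and 7 as offsets 0 to 3 of file 1, and global frame 10 as offset 0 of file 3.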

size_t adjust_frame_offset(boost::shared_ptr<Frame> frame) const#

Returns the adjusted offset (index in file) for the Frame

Combines the frame number with the frame offset stored on the frame object to calculate the adjusted frame offset in the file for this frame

Throws a std::range_error if the applied offset would cause the frame offset to be calculated as a negative number

Returns the dataset offset for frame number (frame_no)
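The range check described above can be sketched as follows. The function name apply_frame_offset and the use of a signed 64-bit adjustment are assumptions made for the example; the reference text only states that a std::range_error is thrown when the result would be negative.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>

// Hypothetical helper: applies a signed adjustment to an unsigned frame number,
// refusing to produce a negative offset.
std::size_t apply_frame_offset(std::size_t frame_number, std::int64_t offset_adjustment)
{
  if (offset_adjustment < 0) {
    // Magnitude computed without overflowing for the most negative value.
    const std::uint64_t magnitude =
        static_cast<std::uint64_t>(-(offset_adjustment + 1)) + 1;
    if (magnitude > frame_number) {
      throw std::range_error("Frame offset would be negative");
    }
    return static_cast<std::size_t>(frame_number - magnitude);
  }
  return frame_number + static_cast<std::size_t>(offset_adjustment);
}
```

Checking before subtracting matters because the frame number is unsigned: subtracting first would silently wrap around to a very large offset instead of failing.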

boost::shared_ptr<HDF5File> get_file(size_t frame_offset, HDF5CallDurations_t &call_durations)#

Gets the HDF5File object for the given frame

The file returned depends on variables such as the number of frames per block and blocks per file. If the required HDF5File doesn’t currently exist, one will be created. If it’s detected that frames have been missed which would have required a file before this one, those files are created and written with blank data

Parameters:

frame_offset[in] - The frame offset to get the file for

Returns:

- The file that should be used to write this frame

std::string get_create_meta_header()#

Create the header for the create_file meta message

This includes total frames in order to configure the meta writer

Returns:

- a string containing the json meta data header

std::string get_meta_header()#

Create the standard header for a meta message

Returns:

- a string containing the json meta data header

std::string generate_filename(size_t file_number = 0)#

Generates the filename for the given file number

Appends a postfix string, followed by a 6 digit file number, to the configured file name. The postfix string defaults to an empty string if it was not set. File names are 0 indexed by default, but the starting index can be configured. File numbers are not used if the feature is turned off (use_file_numbers_).

If no file name is configured, it uses the acquisition ID and if this is not configured, then the generated filename returned is empty.

Parameters:

file_number[in] - The file number to generate the filename for

Returns:

- The name of the file including extension
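A rough sketch of these naming rules is shown below. The FilenameConfig struct and make_filename function are hypothetical. Two details are assumptions rather than documented behaviour: an underscore is placed before the file number, and the extension string is taken to include its leading dot.

```cpp
#include <cstddef>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical bundle of the settings that influence the file name.
struct FilenameConfig {
  std::string configured_filename;  // preferred base name
  std::string acquisition_id;       // fallback base name
  std::string file_postfix;         // optional, may be empty
  std::string file_extension;       // assumption: includes the dot, e.g. ".h5"
  std::size_t starting_file_index;
  bool use_file_numbers;
};

std::string make_filename(const FilenameConfig& cfg, std::size_t file_number)
{
  // Use the configured name, falling back to the acquisition ID.
  const std::string& base =
      !cfg.configured_filename.empty() ? cfg.configured_filename : cfg.acquisition_id;
  if (base.empty()) {
    return std::string();  // nothing configured: empty file name
  }

  std::ostringstream name;
  name << base << cfg.file_postfix;
  if (cfg.use_file_numbers) {
    // Assumption: '_' separator; the number is zero padded to 6 digits.
    name << '_' << std::setw(6) << std::setfill('0')
         << (file_number + cfg.starting_file_index);
  }
  name << cfg.file_extension;
  return name.str();
}
```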

Public Members

LoggerPtr logger_#
std::string master_frame_#

Name of master frame. When a master frame is received frame numbers increment

size_t frames_to_write_#

Number of frames to write to file

size_t total_frames_#

Total number of frames in acquisition

uint32_t starting_file_index_#

Starting file index (default to 0 index based numbering)

bool use_file_numbers_#

Do we use file numbers in the file name construction. Defaults to true

std::string file_path_#

Path of the file to write to

std::string filename_#

Name of the file to write to

std::string configured_filename_#

Configured value to be used as the prefix to generate the filename.

std::string file_postfix_#

Configured value to be used as extra filename control if required.

std::string file_extension_#

File extension to use

bool use_earliest_hdf5_#

Use the earliest version of hdf5

size_t alignment_threshold_#

HDF5 file chunk alignment threshold

size_t alignment_value_#

HDF5 file chunk alignment value

std::string acquisition_id_#

Identifier for the acquisition - value sent from a detector/control to be used to identify frames, config or anything else to this acquisition. Used to name the file

std::map<std::string, DatasetDefinition> dataset_defs_#

Map of dataset definitions for this acquisition

size_t frames_written_#

Number of frames that have been written to file

size_t frames_processed_#

Number of frames that have been processed

size_t concurrent_processes_#

Number of concurrent file writers executing

size_t concurrent_rank_#

Rank of this file writer

size_t frames_per_block_#

Number of frames to write consecutively in a file

size_t blocks_per_file_#

Number of blocks to write in a file; 0=put all blocks in same file

const HDF5ErrorDefinition_t &hdf5_error_definition_#

HDF5 call error definitions

Private Functions

void add_uint64_to_document(const std::string &key, size_t value, rapidjson::Document *document) const#
void add_string_to_document(const std::string &key, const std::string &value, rapidjson::Document *document) const#
std::string document_to_string(rapidjson::Document &document) const#

Private Members

boost::shared_ptr<HDF5File> current_file_#

The current file that frames are being written to

boost::shared_ptr<HDF5File> previous_file_#

The previous file that frames were written to, held in case of late frames

std::string last_error_#

Most recently generated error message

HDF5File#

class HDF5File#

Public Functions

HDF5File(const HDF5ErrorDefinition_t &hdf5_error_definition)#
~HDF5File()#
void hdf_error_handler(unsigned n, const H5E_error2_t *err_desc)#
void clear_hdf_errors()#
void handle_h5_error(const std::string &message, const std::string &function, const std::string &filename, int line)#

Handles an HDF5 error. Logs the error and throws a runtime exception

Parameters:
  • message[in] - The error message to log

  • function[in] - The function that had the error

  • filename[in] - The filename

  • line[in] - The line number of the call

size_t create_file(std::string file_name, size_t file_index, bool use_earliest_version, size_t alignment_threshold, size_t alignment_value)#

Create the HDF5 file ready for writing datasets.

Parameters:
  • file_name[in] - Full file name of the file to create.

  • file_index[in] - File index of the file

  • use_earliest_version[in] - Whether to use the earliest version of HDF5 library

  • alignment_threshold[in] - Chunk threshold

  • alignment_value[in] - Chunk alignment value

Returns:

- The duration of the H5Fcreate call

size_t close_file()#

Close the currently open HDF5 file.

Returns:

- The hdf5 write metric with the durations of the write and flush calls

void create_dataset(const DatasetDefinition &definition, int low_index, int high_index)#

Create a HDF5 dataset from the DatasetDefinition.

Parameters:
  • definition[in] - Reference to the DatasetDefinition.

  • low_index[in] - Value of the lowest frame index in the file if in block mode.

  • high_index[in] - Value of the highest frame index in the file if in block mode.

void write_frame(const Frame &frame, hsize_t frame_offset, uint64_t outer_chunk_dimension, HDF5CallDurations_t &call_durations)#

Write a frame to the file.

Parameters:
  • frame[in] - Reference to the frame

  • frame_offset[in] - The offset in the file to write the frame into

  • outer_chunk_dimension[in] - The size of the outermost dimension of a chunk

  • call_durations[in] - Struct containing hdf5 call durations - write and flush will be updated with the durations of the H5DOwrite_chunk and H5Dflush calls

void write_parameter(const Frame &frame, DatasetDefinition dataset_definition, hsize_t frame_offset)#

Write a parameter to the file.

Parameters:
  • frame[in] - Reference to the frame.

  • dataset_definition[in] - The dataset definition for this parameter.

  • frame_offset[in] - The offset to write the value to

size_t get_dataset_frames(const std::string &dset_name)#

Read the current number of frames in an HDF5 dataset (including gaps, up to the highest written offset)

Parameters:

dset_name[in] - Name of the HDF5 dataset

size_t get_dataset_max_size(const std::string &dset_name)#

Get the maximum size of the given dataset

Parameters:

dset_name[in] - Name of the HDF5 dataset

Returns:

- 0 if unlimited_, else the extent of the outermost dimension of the dataset

void start_swmr()#

Start SWMR writing

size_t get_file_index()#

Get the file index of this file

Returns:

- the file index (0 indexed)

std::string get_filename()#

Get the file name of this file

Returns:

- the name of the file

void set_unlimited()#

Configure datasets to allow extension during write to an unlimited extent

Private Functions

HDF5Dataset_t &get_hdf5_dataset(const std::string &dset_name)#

Get a HDF5Dataset_t definition by its name.

The private map of HDF5 dataset definitions is searched and if found the HDF5Dataset_t definition is returned. Throws a runtime error if the dataset cannot be found.

Parameters:

dset_name[in] - name of the dataset to search for.

Returns:

- the dataset definition if found.

void extend_dataset(HDF5File::HDF5Dataset_t &dset, size_t frame_no)#

Extend the HDF5 dataset ready for new data

Checks the frame_no is larger than the current dataset dimensions and then sets the extent of the dataset to this new value.

This is used in the case that the final size of the dataset is unknown initially and set to H5S_UNLIMITED.

Parameters:
  • dset[in] - Handle to the HDF5 dataset.

  • frame_no[in] - Number of the incoming frame to extend to.

hid_t datatype_to_hdf_type(DataType data_type) const#

Convert from a DataType type to the corresponding HDF5 type.

Parameters:

data_type[in] - The DataType type to convert.

Returns:

- the equivalent HDF5 type.

Private Members

LoggerPtr logger_#
hid_t hdf5_file_id_#

Internal ID of the file being written to

bool hdf5_error_flag_#

Internal HDF5 error flag

std::vector<H5E_error2_t> hdf5_errors_#

Internal HDF5 error recording

std::map<std::string, HDF5Dataset_t> hdf5_datasets_#

Map of datasets that are being written to

size_t file_index_#

The index of this file across all processors in the acquisition, 0 indexed

std::string filename_#

Full name and path of the file to write to

bool use_earliest_version_#

Whether to use the earliest version of the hdf5 library

bool unlimited_#

Whether datasets use H5S_UNLIMITED as the outermost dimension extent

boost::recursive_mutex mutex_#

Mutex used to make this class thread safe

hid_t param_memspace_#
std::map<std::string, boost::posix_time::ptime> last_flushed#
WatchdogTimer watchdog_timer_#
const HDF5ErrorDefinition_t &hdf5_error_definition_#

HDF5 call error definitions

Private Static Attributes

static const H5Z_filter_t LZ4_FILTER = (H5Z_filter_t)32004#

Filter definition to write datasets with LZ4 compressed data

static const H5Z_filter_t BSLZ4_FILTER = (H5Z_filter_t)32008#

Filter definition to write datasets with bitshuffle processed data

static const H5Z_filter_t BLOSC_FILTER = (H5Z_filter_t)32001#

Filter definition to write datasets with Blosc processed data

static const int PARAM_FLUSH_RATE = 1000#

Flush rate for parameter datasets in milliseconds

struct HDF5Dataset_t#

Struct to keep track of an HDF5 dataset handle and dimensions.

Public Members

hid_t dataset_id#

Handle of the dataset

std::vector<hsize_t> dataset_dimensions#

Array of dimensions of the dataset

std::vector<hsize_t> dataset_offsets#

Array of offsets of the dataset

size_t actual_dataset_size_#

Extent of the (outermost dimension of the) dataset that has had frames written to, including any gaps i.e. the highest offset that has been written to + 1

CallDuration#

class CallDuration#

A simple store for call duration metrics.

Durations in microseconds.

Public Functions

void update(unsigned int duration)#

Replace last, replace max if higher and recalculate mean

Parameters:

duration[in] - Duration to update with
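The update rule can be sketched as below. CallDurationSketch is a hypothetical stand-in, and the smoothing factor of one half used for the exponential average is an assumption; the reference text does not state the weighting.

```cpp
// Hypothetical stand-in for the call duration store (durations in microseconds).
struct CallDurationSketch {
  unsigned int last = 0;
  unsigned int max = 0;
  unsigned int mean = 0;

  void update(unsigned int duration)
  {
    last = duration;  // replace last
    if (duration > max) {
      max = duration;  // replace max only if higher
    }
    // Exponential average with smoothing factor 1/2 (an assumption).
    // The sum is formed in a wider type so it cannot overflow.
    mean = static_cast<unsigned int>(
        (static_cast<unsigned long long>(mean) + duration) / 2);
  }

  void reset()
  {
    last = 0;
    max = 0;
    mean = 0;
  }
};
```

An exponential average needs no history buffer, which suits a metric that is updated on every HDF5 call; the trade-off is that older samples never fully drop out, they are only halved in weight on each update.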

void reset()#

Reset all values to 0

Public Members

unsigned int last_#

Last call duration

unsigned int max_#

Maximum call duration

unsigned int mean_#

Mean call duration (exponential average)
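The update rule described for CallDuration can be sketched as follows. This is a minimal stand-in, not the library's implementation: the name `CallDurationSketch` is hypothetical, and the weighting of the exponential average (one half here) is an assumption, since this reference does not state it.

```cpp
#include <cassert>

// Hypothetical stand-in for CallDuration; durations are in microseconds.
struct CallDurationSketch {
  unsigned int last_;
  unsigned int max_;
  unsigned int mean_;

  CallDurationSketch() : last_(0), max_(0), mean_(0) {}

  // Replace last, raise max if exceeded, and fold the sample into the mean.
  // ASSUMPTION: exponential average with weight 1/2; the real weighting is
  // not stated in this reference.
  void update(unsigned int duration) {
    last_ = duration;
    if (duration > max_) {
      max_ = duration;
    }
    // Widen before adding so the sum cannot overflow.
    mean_ = static_cast<unsigned int>(
        (static_cast<unsigned long long>(mean_) + duration) / 2);
  }

  // Reset all values to 0.
  void reset() { last_ = max_ = mean_ = 0; }
};

// Usage: three calls taking 100, 300 and 200 microseconds.
inline void call_duration_example() {
  CallDurationSketch d;
  d.update(100);  // mean: (0 + 100) / 2 = 50
  d.update(300);  // mean: (50 + 300) / 2 = 175
  d.update(200);  // mean: (175 + 200) / 2 = 187 (integer division)
  assert(d.last_ == 200 && d.max_ == 300 && d.mean_ == 187);
}
```

An exponential average needs no sample history, which suits a metric updated on every call.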

WatchdogTimer#

class WatchdogTimer#

Public Functions

WatchdogTimer(const boost::function<void(const std::string&)> &timeout_callback)#
~WatchdogTimer()#
void start_timer(const std::string &function_name, unsigned int watchdog_timeout_ms)#

Store the start time and schedule a deadline_timer to print an error

To be called before a function call

Parameters:
  • function_name[in] - Function name for log message

  • watchdog_timeout_ms[in] - Timeout for watchdog to log error message

unsigned int finish_timer()#

Disable the watchdog, calculate the duration and then log and return

To be called after a function returns

Returns:

- Duration in microseconds
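The start/finish pairing around a timed call might look like the sketch below. It is a deliberately simplified, single-threaded stand-in: `WatchdogSketch` is a hypothetical name, `std::function` and `std::chrono` replace the Boost and IpcReactor machinery, and the timeout is only checked when `finish_timer()` is called, whereas the documented class schedules a timer that can fire while the call is still running.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <string>

// Hypothetical, single-threaded stand-in for WatchdogTimer.
class WatchdogSketch {
public:
  explicit WatchdogSketch(std::function<void(const std::string&)> timeout_callback)
      : timeout_callback_(timeout_callback), timeout_ms_(0) {
    assert(static_cast<bool>(timeout_callback_));  // a callable is required
  }

  // Call immediately before the function being timed.
  void start_timer(const std::string& function_name, unsigned int watchdog_timeout_ms) {
    function_name_ = function_name;
    timeout_ms_ = watchdog_timeout_ms;
    start_time_ = std::chrono::steady_clock::now();
  }

  // Call immediately after the function returns; gives the duration in microseconds.
  // SIMPLIFICATION: the timeout is evaluated here rather than asynchronously.
  unsigned int finish_timer() {
    const std::chrono::steady_clock::duration elapsed =
        std::chrono::steady_clock::now() - start_time_;
    const long long us =
        std::chrono::duration_cast<std::chrono::microseconds>(elapsed).count();
    if (us > static_cast<long long>(timeout_ms_) * 1000) {
      timeout_callback_(function_name_ + " exceeded watchdog timeout");
    }
    return static_cast<unsigned int>(us);
  }

private:
  std::function<void(const std::string&)> timeout_callback_;  // stored by value
  std::string function_name_;
  unsigned int timeout_ms_;
  std::chrono::steady_clock::time_point start_time_;
};
```

A caller would wrap each potentially slow call as `start_timer("name", timeout_ms); call(); duration = finish_timer();`.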

Private Functions

void run()#

Function run by the worker_thread_

This will register a heartbeat timer and then run the IpcReactor

void call_timeout_callback(const std::string &function_name) const#

Call the timeout_callback_ with an error message

void heartbeat()#

Report the reactor is still alive and stop when flag set

Private Members

LoggerPtr logger_#

Logger for logging

struct timespec start_time_#
boost::thread worker_thread_#
bool worker_thread_running_#

Flag to control start up timings of main thread and worker thread

OdinData::IpcReactor reactor_#

IpcReactor to use as a simple timer controller

unsigned int timeout_#

Timeout of current timer in milliseconds

std::string function_name_#

Name of function currently being timed

int timer_id_#

ID of current timer to cancel callback

int ticks_#

Counter to monitor number of ticks that have passed

const boost::function<void(const std::string&)> &timeout_callback_#

Callback function to call when the timer expires

LiveViewPlugin#

class LiveViewPlugin : public FrameProcessor::FrameProcessorPlugin#

Public Functions

LiveViewPlugin()#

Constructor for this class. Sets up ZMQ pub socket and other default values for the config

virtual ~LiveViewPlugin()#

Class Destructor. Closes the Publish socket

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Process a received frame. For the live view plugin, this means checking whether the configured conditions are met (time elapsed, frame number, dataset name) and, if they are, creating a JSON header and copying the data to send to the publisher socket.

Parameters:

frame[in] - pointer to a frame object.
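A rough sketch of the send decision follows. The struct and function names are hypothetical, and the way the checks combine (dataset filter first, then frame frequency OR elapsed time) is an assumption based on the member descriptions in this class, not a copy of the plugin's logic.

```cpp
#include <cassert>
#include <set>
#include <string>

// Hypothetical configuration mirroring frame_freq_, per_second_ and datasets_.
struct LiveViewConfigSketch {
  int frame_freq;                  // send every Nth frame; 0 disables the check
  int per_second;                  // target frames per second; 0 disables the check
  std::set<std::string> datasets;  // allowed dataset names; empty means no filter
};

// Decide whether a frame should be published.
// ASSUMPTION: the frequency and elapsed-time checks are combined with OR,
// and the dataset filter is applied before either of them.
inline bool should_send(const LiveViewConfigSketch& cfg, long long frame_number,
                        const std::string& dataset_name, long long ms_since_last_frame) {
  assert(cfg.frame_freq >= 0 && cfg.per_second >= 0);
  if (!cfg.datasets.empty() && cfg.datasets.count(dataset_name) == 0) {
    return false;  // dataset is not on the list
  }
  const bool nth_frame = cfg.frame_freq != 0 && frame_number % cfg.frame_freq == 0;
  // Minimum time between frames in milliseconds, derived from per_second.
  const bool time_elapsed =
      cfg.per_second != 0 && ms_since_last_frame >= 1000 / cfg.per_second;
  return nth_frame || time_elapsed;
}
```

With `frame_freq = 2` and `per_second = 0`, only even-numbered frames pass; setting `per_second = 10` additionally lets any frame through once 100 ms have elapsed.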

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for this Plugin.

This sets up the Live View Plugin according to the configuration IpcMessage objects that are received.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

void pass_live_frame(boost::shared_ptr<Frame> frame)#

Constructs a header with information about the data frame, then sends that header and the data to the ZMQ socket interface to be consumed by an external live viewer. The Header contains the following:

  • int32_t Frame number

  • string Acquisition ID

  • string Data Type

  • size_t Data Size

  • string compression type

  • size_t[] dimensions

Parameters:

frame[in] - pointer to the data frame

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Public Static Attributes

static const int32_t DEFAULT_FRAME_FREQ = 1#

The default value for the Frame Frequency configuration

static const int32_t DEFAULT_PER_SECOND = 0#

The default value for the frames per second configuration

static const std::string DEFAULT_IMAGE_VIEW_SOCKET_ADDR = "tcp://127.0.0.1:5020"#

The default value for the ZMQ socket address

static const std::string DEFAULT_DATASET_NAME = ""#

The default value for the dataset name filter

static const std::string DEFAULT_TAGGED_FILTER = ""#

The default value for the Tagged Filter

static const std::string CONFIG_FRAME_FREQ = "frame_frequency"#

The name of the Frame Frequency config in the json file

static const std::string CONFIG_PER_SECOND = "per_second"#

The name of the Per Second config in the json file

static const std::string CONFIG_SOCKET_ADDR = "live_view_socket_addr"#

The name of the Socket Address config in the json file

static const std::string CONFIG_DATASET_NAME = "dataset_name"#

The name of the Dataset Name config in the json file

static const std::string CONFIG_TAGGED_FILTER_NAME = "filter_tagged"#

The name of the Tagged Filter config in the json file

Private Functions

void add_json_member(rapidjson::Document *document, std::string key, std::string value)#
void add_json_member(rapidjson::Document *document, std::string key, uint32_t value)#
virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get the configuration values for this Plugin.

Parameters:

reply[out] - Response IpcMessage.

void set_per_second_config(int32_t value)#

Sets the config value “per_second”, which tells the plugin the number of frames to show each second. Setting this value to 0 disables the “frames per second” checking.

Parameters:

value[in] - the value to assign to per_second

void set_frame_freq_config(int32_t value)#

Sets the Frame Frequency config value. This value tells the plugin to show every Nth frame. Setting this value to 0 disables this check

Parameters:

value[in] - the value to assign to frame_freq

void set_socket_addr_config(std::string value)#

Sets the address of the publisher socket that live view data is sent to. When setting this address, it also binds the socket to the address, unbinding it from any previous address given

Parameters:

value[in] - the address to bind the socket to.

void set_dataset_name_config(std::string value)#

Sets the dataset filter configuration. The live view output can be filtered by the dataset variable on the frame based off a list of desired datasets.

Parameters:

value[in] - A comma-delimited list of dataset names, or an empty string to disable this filtering.
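As an illustration of the list format, the sketch below splits such a string into a lookup map of the same shape as `datasets_`. The function name `parse_dataset_list` is hypothetical, and the handling of whitespace and empty entries is an assumption rather than documented behaviour.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Split a comma-separated list of dataset names into a lookup map.
// ASSUMPTION: names are used verbatim (no whitespace trimming) and empty
// entries are skipped; an empty input yields an empty map, i.e. no filtering.
inline std::map<std::string, int> parse_dataset_list(const std::string& value) {
  std::map<std::string, int> datasets;
  std::istringstream stream(value);
  std::string name;
  while (std::getline(stream, name, ',')) {
    if (!name.empty()) {
      datasets[name] = 1;
    }
  }
  return datasets;
}

// Usage: "data,raw" selects two datasets; "" disables the filter.
inline void parse_dataset_list_example() {
  assert(parse_dataset_list("data,raw").size() == 2);
  assert(parse_dataset_list("").empty());
}
```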

void set_tagged_filter_config(std::string value)#

Private Members

boost::posix_time::time_duration time_between_frames_#

time between frames in milliseconds, calculated from the per_second config

boost::posix_time::ptime time_last_frame_#

time the last frame was shown

LoggerPtr logger_#

Pointer to logger

int32_t frame_freq_#

Every Nth Frame to display

std::string image_view_socket_addr_#

address for ZMQ socket for transmitting images

int32_t per_second_#

Frames to show per second. Will override the frame_freq if elapsed time between frames gets too big

OdinData::IpcChannel publish_socket_#

The socket the live view image frames will be sent to

std::map<std::string, int> datasets_#

List of the dataset names to publish. If a frame comes in with a dataset name not on the list it will be ignored

std::vector<std::string> tags_#

List of Parameter names to look for. If a frame comes in without one of these tags it will be ignored

bool is_bound_#

Boolean that shows if the plugin has a successfully bound ZMQ endpoint

Private Static Attributes

static const std::string DATA_TYPES[]#

List of possible dtype strings

static const std::string COMPRESS_TYPES[]#

List of possible compression type strings

OffsetAdjustmentPlugin#

class OffsetAdjustmentPlugin : public FrameProcessor::FrameProcessorPlugin#

This plugin class alters the frame offset by a configured amount

Public Functions

OffsetAdjustmentPlugin()#

The constructor sets up logging used within the class.

virtual ~OffsetAdjustmentPlugin()#

Destructor.

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Perform processing on the frame. For the OffsetAdjustmentPlugin class we adjust the frame offset by the configured amount

Parameters:

frame[in] - Pointer to a Frame object.
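In outline, the adjustment amounts to a single call on the frame's metadata. The sketch below uses a hypothetical `MetaSketch` struct in place of FrameMetaData and is only meant to show the arithmetic.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the frame offset held in FrameMetaData.
struct MetaSketch {
  std::int64_t frame_offset;
  void adjust_frame_offset(std::int64_t increment) { frame_offset += increment; }
};

// Shift the frame offset by the configured amount, which may be negative.
inline void apply_offset_adjustment(MetaSketch& meta, std::int64_t offset_adjustment) {
  meta.adjust_frame_offset(offset_adjustment);
}

// Usage: a frame at offset 10 with an adjustment of -4 ends up at offset 6.
inline void offset_adjustment_example() {
  MetaSketch meta = {10};
  apply_offset_adjustment(meta, -4);
  assert(meta.frame_offset == 6);
}
```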

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for this Plugin.

This sets up the Offset Adjustment Plugin according to the configuration IpcMessage objects that are received.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Private Functions

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get the configuration values for this Plugin.

Parameters:

reply[out] - Response IpcMessage.

Private Members

LoggerPtr logger_#

Pointer to logger

int64_t offset_adjustment_#

Offset adjustment to use

ParameterAdjustmentPlugin#

class ParameterAdjustmentPlugin : public FrameProcessor::FrameProcessorPlugin#

This plugin class alters parameters named in a list by a configured amount added on to the frame number. The parameter will be added to the frame if it doesn’t already exist

Public Functions

ParameterAdjustmentPlugin()#

The constructor sets up logging used within the class.

virtual ~ParameterAdjustmentPlugin()#

Destructor.

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Perform processing on the frame. For the ParameterAdjustmentPlugin class we adjust the configured parameters by the configured amount, adding the parameter if needed

Parameters:

frame[in] - Pointer to a Frame object.
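The behaviour described above can be pictured with the following sketch, in which each configured parameter is set to the frame number plus its adjustment. `FrameSketch` and `adjust_parameters` are hypothetical names, parameters are restricted to 64-bit integers for simplicity, and the `parameter_inputs_` map is not modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Hypothetical stand-in for a frame: a frame number plus named integer parameters.
struct FrameSketch {
  long long frame_number;
  std::map<std::string, std::int64_t> parameters;
};

// For each configured parameter, store frame_number + adjustment on the frame.
// operator[] creates the parameter when it does not exist yet.
inline void adjust_parameters(FrameSketch& frame,
                              const std::map<std::string, std::int64_t>& adjustments) {
  std::map<std::string, std::int64_t>::const_iterator it;
  for (it = adjustments.begin(); it != adjustments.end(); ++it) {
    frame.parameters[it->first] = frame.frame_number + it->second;
  }
}

// Usage: frame 10 with adjustments {UID: +1, scan: -3}.
inline void adjust_parameters_example() {
  FrameSketch frame;
  frame.frame_number = 10;
  std::map<std::string, std::int64_t> adjustments;
  adjustments["UID"] = 1;
  adjustments["scan"] = -3;
  adjust_parameters(frame, adjustments);
  assert(frame.parameters["UID"] == 11 && frame.parameters["scan"] == 7);
}
```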

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Set configuration options for this Plugin.

This sets up the Parameter Adjustment Plugin according to the configuration IpcMessage objects that are received.

Parameters:
  • config[in] - IpcMessage containing configuration data.

  • reply[out] - Response IpcMessage.

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Private Functions

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Get the configuration values for this Plugin.

Parameters:

reply[out] - Response IpcMessage.

Private Members

LoggerPtr logger_#

Pointer to logger

std::map<std::string, int64_t> parameter_adjustments_#

Map of parameter adjustments to use for each parameter

std::map<std::string, std::string> parameter_inputs_#

Map of input parameters to use for each parameter

SumPlugin#

class SumPlugin : public FrameProcessor::FrameProcessorPlugin#

This plugin class calculates the sum of each pixel and adds it as a parameter

Public Functions

SumPlugin()#

The constructor sets up logging used within the class.

~SumPlugin()#

Destructor.

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Calculate the sum of each pixel based on the data type

Parameters:

frame[in] - Pointer to a Frame object.
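Because the raw data is only available as a void pointer, the sum has to be computed per data type. The sketch below shows one way this dispatch could be written; the enum `DataTypeSketch` and both function names are hypothetical, and only three unsigned integer types are covered.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical subset of data types; the real DataType enum is not shown here.
enum DataTypeSketch { kUInt8, kUInt16, kUInt32 };

// Sum `count` pixels of type T starting at `data`.
template <typename T>
std::uint64_t sum_pixels(const void* data, std::size_t count) {
  const T* pixels = static_cast<const T*>(data);
  std::uint64_t total = 0;
  for (std::size_t i = 0; i < count; ++i) {
    total += pixels[i];
  }
  return total;
}

// Dispatch on the data type to interpret the buffer with the right pixel width.
inline std::uint64_t sum_frame(const void* data, std::size_t size_bytes, DataTypeSketch type) {
  assert(data != nullptr || size_bytes == 0);
  switch (type) {
    case kUInt8:
      return sum_pixels<std::uint8_t>(data, size_bytes);
    case kUInt16:
      return sum_pixels<std::uint16_t>(data, size_bytes / sizeof(std::uint16_t));
    case kUInt32:
      return sum_pixels<std::uint32_t>(data, size_bytes / sizeof(std::uint32_t));
  }
  return 0;
}
```

Accumulating into a 64-bit total avoids overflow for realistic image sizes even when every pixel is at its maximum value.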

virtual int get_version_major()#
virtual int get_version_minor()#
virtual int get_version_patch()#
virtual std::string get_version_short()#
virtual std::string get_version_long()#

Private Members

LoggerPtr logger_#

Pointer to logger

DummyUDPProcessPlugin#

class DummyUDPProcessPlugin : public FrameProcessor::FrameProcessorPlugin#

FrameProcessor plugin for dummy UDP data streams used in integration testing. This plugin processes incoming frames into output 2D images, handling lost packets appropriately. The plugin can be configured to either copy the incoming frame into a new output frame, or set metadata etc on the incoming frame and push that out.

Public Functions

DummyUDPProcessPlugin()#

The constructor sets up default configuration parameters and logging used within the class.

virtual ~DummyUDPProcessPlugin()#

Destructor.

virtual int get_version_major()#

Get the plugin major version number.

Returns:

major version number as an integer

virtual int get_version_minor()#

Get the plugin minor version number.

Returns:

minor version number as an integer

virtual int get_version_patch()#

Get the plugin patch version number.

Returns:

patch version number as an integer

virtual std::string get_version_short()#

Get the plugin short version (e.g. x.y.z) string.

Returns:

short version as a string

virtual std::string get_version_long()#

Get the plugin long version (e.g. x.y.z-qualifier) string.

Returns:

long version as a string

virtual void configure(OdinData::IpcMessage &config, OdinData::IpcMessage &reply)#

Configure the plugin. This receives an IpcMessage which should be processed to configure the plugin, and any response can be added to the reply IpcMessage.

Parameters:
  • config[in] - Reference to the configuration IpcMessage object.

  • reply[out] - Reference to the reply IpcMessage object.

virtual void requestConfiguration(OdinData::IpcMessage &reply)#

Respond to configuration requests from clients.

This method responds to configuration requests from clients, populating the supplied IpcMessage reply with configured parameters.

Parameters:

reply[inout] - IpcMessage response to client, to be populated with plugin parameters

virtual void execute(const std::string &command, OdinData::IpcMessage &reply)#

Execute a command on the plugin. This receives a command string which should be processed to execute a command within the plugin, and any response can be added to the reply IpcMessage. The dummy plugin implements a single command “print” that prints the value of the parameter named.

Parameters:
  • command[in] - String containing the command to execute.

  • reply[out] - Reference to the reply IpcMessage object.

virtual std::vector<std::string> requestCommands()#

Respond to command execution requests from clients.

This method responds to command execution requests from clients, returning the commands supported by this plugin.

Returns:

- Vector containing supported command strings.
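The relationship between requestCommands and execute can be sketched as below. `CommandPluginSketch` is a hypothetical stand-in: the reply is modelled as a string map rather than an IpcMessage, and the way an unsupported command is reported is an assumption.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a plugin with one command, "print".
class CommandPluginSketch {
public:
  // List the commands this plugin supports.
  std::vector<std::string> requestCommands() const {
    return std::vector<std::string>(1, "print");
  }

  // Run a command and record the outcome in the reply.
  // ASSUMPTION: unknown commands are reported through an "error" entry.
  void execute(const std::string& command, std::map<std::string, std::string>& reply) {
    if (command == "print") {
      reply["status"] = "ok";
    } else {
      reply["error"] = "Unsupported command: " + command;
    }
  }
};

// Usage: every advertised command should be accepted by execute().
inline void command_example() {
  CommandPluginSketch plugin;
  const std::vector<std::string> commands = plugin.requestCommands();
  for (std::size_t i = 0; i < commands.size(); ++i) {
    std::map<std::string, std::string> reply;
    plugin.execute(commands[i], reply);
    assert(reply.count("error") == 0);
  }
}
```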

virtual void status(OdinData::IpcMessage &status)#

Collate status information for the plugin. The status is added to the status IpcMessage object.

Parameters:

status[out] - Reference to an IpcMessage value to store the status.

virtual bool reset_statistics(void)#

Reset process plugin statistics, i.e. counter of packets lost

Private Functions

virtual void process_frame(boost::shared_ptr<Frame> frame)#

Perform processing on the frame. For the DummyUDPProcessPlugin class this involves dealing with any lost packets and then simply copying the data buffer into an appropriately dimensioned output frame.

Parameters:

frame[in] - Pointer to a Frame object.

void process_lost_packets(boost::shared_ptr<Frame> &frame)#

Process and report lost UDP packets for the frame

Parameters:

frame[in] - Pointer to a Frame object.

Private Members

LoggerPtr logger_#

Pointer to logger

int image_width_#

Image width

int image_height_#

Image height

int packets_lost_#

Packet loss counter

bool copy_frame_#

Copy frame mode flag

Private Static Attributes

static const std::string CONFIG_IMAGE_WIDTH = "width"#

Configuration constant for image width

static const std::string CONFIG_IMAGE_HEIGHT = "height"#

Configuration constant for image height

static const std::string CONFIG_COPY_FRAME = "copy_frame"#

Configuration constant for copy frame mode

static const std::string EXECUTE_PRINT = "print"#

Command execution constant for print command

Frames#

class Frame#

Interface class for a Frame; all Frames must sub-class this.

Subclassed by FrameProcessor::DataBlockFrame, FrameProcessor::EndOfAcquisitionFrame, FrameProcessor::SharedBufferFrame

Public Functions

Frame(const FrameMetaData &meta_data, const size_t &data_size, const int &image_offset = 0)#

Base constructor

Base Frame constructor

Parameters:
  • meta_data – frame FrameMetaData

  • data_size – size of the frame data in bytes

  • image_offset – offset between start of data memory and image

Frame(const Frame &frame)#

Shallow-copy copy

Copy constructor; implement as shallow copy

Parameters:

frame – source frame

Frame &operator=(const Frame &frame)#

Deep-copy assignment

Assignment operator; implement as deep copy

Parameters:

frame – source frame

Returns:

Frame

bool is_valid() const#

Return if frame is valid

Returns:

bool - frame is valid

virtual void *get_data_ptr() const = 0#

Return a void pointer to the raw data

virtual void *get_image_ptr() const#

Return a void pointer to the image data

Returns:

void pointer to image data

virtual bool get_end_of_acquisition() const#

Return if this frame object is an “end of acquisition” frame

Return false to signify that this frame is not an “end of acquisition” frame. Should be overridden by a child class that supports EOA.

Returns:

end of acquisition

size_t get_data_size() const#

Return the data size

Returns:

data size

void set_data_size(size_t size)#

Update the data size

Parameters:

size – new data size

long long get_frame_number() const#

Return the frame number

Returns:

frame number

void set_frame_number(long long frame_number)#

Set the frame number

Parameters:

frame_number – new frame number

FrameMetaData &meta_data()#

Return a reference to the MetaData

Returns:

reference to meta data

const FrameMetaData &get_meta_data() const#

Return the MetaData

Returns:

frame meta data

FrameMetaData get_meta_data_copy() const#

Return deep copy of the MetaData

Returns:

FrameMetaData - copy (deep) of the meta data

void set_meta_data(const FrameMetaData &meta_data)#

Set MetaData

Parameters:

meta_data – new meta data

int get_image_size() const#

Return the image size

Returns:

size of data minus offset (header size)

void set_image_size(const int &size)#

Set the image size

Set the image size. If not called, the image size defaults to data_size minus image_offset.

Parameters:

size – the image size

void set_image_offset(const int &offset)#

Set the image offset

Parameters:

offset – new offset
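The relationship between the raw data pointer, the image offset and the image size can be sketched as follows. `FrameLayoutSketch` is a hypothetical, non-virtual stand-in for Frame that wraps a caller-owned buffer; it only illustrates the pointer arithmetic and the default image size described above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in showing how the image pointer and size relate to the raw data.
class FrameLayoutSketch {
public:
  FrameLayoutSketch(void* data, std::size_t data_size, int image_offset = 0)
      : data_(data), data_size_(data_size), image_offset_(image_offset),
        image_size_(static_cast<int>(data_size) - image_offset) {
    assert(image_offset >= 0 && static_cast<std::size_t>(image_offset) <= data_size);
  }

  void* get_data_ptr() const { return data_; }
  std::size_t get_data_size() const { return data_size_; }

  // Image data starts image_offset bytes into the raw data.
  void* get_image_ptr() const { return static_cast<char*>(data_) + image_offset_; }

  // Defaults to data_size minus image_offset unless set_image_size() is called.
  int get_image_size() const { return image_size_; }
  void set_image_size(int size) { image_size_ = size; }

private:
  void* data_;
  std::size_t data_size_;
  int image_offset_;
  int image_size_;
};
```

For a 16-byte buffer with a 4-byte header, `get_image_ptr()` points 4 bytes past `get_data_ptr()` and `get_image_size()` reports 12 until it is overridden.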

void set_outer_chunk_size(const int &size)#

Set the outer chunk size

Set the outer chunk size of the frame (number of trigger acqs in chunk)

Parameters:

size – outer_chunk_size

int get_outer_chunk_size() const#

Get the outer chunk size

Get the outer chunk size of the frame (number of trigger acqs in chunk)

Returns:

outer_chunk_size

Protected Attributes

log4cxx::LoggerPtr logger_#

Pointer to logger

FrameMetaData meta_data_#

Frame MetaData

size_t data_size_#

Shared memory size

int image_size_#

Size of the image

int image_offset_#

Offset from frame memory start to image data

int outer_chunk_size_#

Outer chunk size of this frame (number of images in this chunk)

FrameMetaData#

class FrameMetaData#

Public Functions

FrameMetaData(const long long &frame_number, const std::string &dataset_name, const DataType &data_type, const std::string &acquisition_ID, const std::vector<unsigned long long> &dimensions, const CompressionType &compression_type = no_compression)#
FrameMetaData()#
FrameMetaData(const FrameMetaData &frame)#
const std::map<std::string, boost::any> &get_parameters() const#

Return frame parameters

Get frame parameters

Returns:

std::map <std::string, boost::any> map

template<class T>
inline T get_parameter(const std::string &parameter_name) const#

Get frame parameter

Template Parameters:

T

Parameters:

parameter_name

Returns:

template<class T>
inline void set_parameter(const std::string &parameter_name, T value)#

Set frame parameter

Template Parameters:

T

Parameters:
  • parameter_name

  • value
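A typed parameter store of this kind, together with has_parameter, can be approximated without Boost as sketched below. `ParameterStoreSketch` is hypothetical: it replaces `boost::any` with a `std::shared_ptr<void>` plus a recorded `std::type_info`, and the choice of exceptions for a missing parameter or a type mismatch is an assumption.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <typeinfo>

// Hypothetical stand-in for the parameter map held by FrameMetaData.
class ParameterStoreSketch {
public:
  template <class T>
  void set_parameter(const std::string& parameter_name, T value) {
    Entry entry;
    entry.value = std::make_shared<T>(value);  // type-erased copy of the value
    entry.type = &typeid(T);                   // remembered for checked retrieval
    parameters_[parameter_name] = entry;
  }

  // ASSUMPTION: a missing parameter throws std::runtime_error and a type
  // mismatch throws std::bad_cast.
  template <class T>
  T get_parameter(const std::string& parameter_name) const {
    std::map<std::string, Entry>::const_iterator it = parameters_.find(parameter_name);
    if (it == parameters_.end()) {
      throw std::runtime_error("Unable to find parameter: " + parameter_name);
    }
    if (*it->second.type != typeid(T)) {
      throw std::bad_cast();
    }
    return *static_cast<const T*>(it->second.value.get());
  }

  bool has_parameter(const std::string& index) const {
    return parameters_.count(index) == 1;
  }

private:
  struct Entry {
    std::shared_ptr<void> value;
    const std::type_info* type;
  };
  std::map<std::string, Entry> parameters_;
};

// Usage: store an int and read it back with the same type.
inline void parameter_store_example() {
  ParameterStoreSketch params;
  params.set_parameter<int>("sum", 42);
  assert(params.has_parameter("sum"));
  assert(params.get_parameter<int>("sum") == 42);
}
```

The caller must request a parameter with exactly the type it was stored as, so checking `has_parameter` first only guards against a missing name, not a wrong type.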

inline bool has_parameter(const std::string &index) const#

Check if frame has parameter

Parameters:

index

Returns:

long long get_frame_number() const#

Return frame number

Returns:

long long - frame number

void set_frame_number(const long long &frame_number)#

Set frame number

Parameters:

frame_number – frame number

const std::string &get_dataset_name() const#

Return dataset_name

Returns:

std::string - dataset name

void set_dataset_name(const std::string &dataset_name)#

Set dataset name

Parameters:

dataset_name – name of dataset

DataType get_data_type() const#

Return data type

Returns:

DataType - data type

void set_data_type(DataType data_type)#

Set data type

Parameters:

data_type – data type

const std::string &get_acquisition_ID() const#

Return acquisition ID

Returns:

std::string - acquisition ID

void set_acquisition_ID(const std::string &acquisition_ID)#

Set acquisition ID

Parameters:

acquisition_ID – acquisition ID

const dimensions_t &get_dimensions() const#

Return dimensions

Returns:

dimensions

void set_dimensions(const dimensions_t &dimensions)#

Set dimensions

Parameters:

dimensions – dimensions

CompressionType get_compression_type() const#

Return compression type

Returns:

compression type

void set_compression_type(CompressionType compression_type)#

Set compression type

Parameters:

compression_type – compression type

int64_t get_frame_offset() const#

Return frame offset

Returns:

int64_t frame offset

void set_frame_offset(const int64_t &offset)#

Set frame offset

Parameters:

offset – new offset

void adjust_frame_offset(const int64_t &increment)#

Adjust frame offset by increment

Parameters:

increment – to adjust offset by

Private Members

long long frame_number_#

Frame number

log4cxx::LoggerPtr logger#

Pointer to logger

std::string dataset_name_#

Name of this dataset

DataType data_type_#

Data type of raw data

std::string acquisition_ID_#

Acquisition ID of the acquisition of this frame

dimensions_t dimensions_#

Vector of dimensions

CompressionType compression_type_#

Compression type of raw data

std::map<std::string, boost::any> parameters_#

Map of parameters

int64_t frame_offset_#

Frame offset

DataBlockFrame#

class DataBlockFrame : public FrameProcessor::Frame#

Public Functions

DataBlockFrame(const FrameMetaData &meta_data, const void *data_src, size_t block_size, const int &image_offset = 0)#

Construct a DataBlockFrame

DataBlockFrame constructor

Parameters:
  • meta_data – frame FrameMetaData

  • data_src – data to copy into block memory

  • block_size – size of data in bytes

  • image_offset – offset between start of data memory and image

DataBlockFrame(const FrameMetaData &meta_data, size_t block_size, const int &image_offset = 0)#

Construct a DataBlockFrame

DataBlockFrame constructor

Parameters:
  • meta_data – frame FrameMetaData

  • block_size – size of data in bytes

  • image_offset – offset between start of data memory and image

DataBlockFrame(const DataBlockFrame &frame)#

Shallow-copy copy

Copy constructor; implement as shallow copy

Parameters:

frame

DataBlockFrame &operator=(DataBlockFrame &frame)#

Deep-copy assignment

Assignment operator; implement as deep copy

Parameters:

frame

Returns:

Frame

~DataBlockFrame()#

Destructor

Destroy frame

virtual void *get_data_ptr() const#

Return a void pointer to the raw data

Returns:

pointer to the raw data.

Private Members

boost::shared_ptr<DataBlock> raw_data_block_ptr_#

Pointer to raw data block

DataBlock#

class DataBlock#

The DataBlock and DataBlockPool classes provide memory management for data within Frames. Memory is allocated by a data block on construction, and then the data block can be re-used without continually freeing and re-allocating the memory. If a data block is resized then the memory is re-allocated, so data blocks work most efficiently when using the same sized data multiple times. Data can be copied into the allocated block, and a pointer to the raw block is available. Data block memory should NOT be freed outside of the block; when a data block is destroyed it frees its own memory.

Public Functions

DataBlock(size_t block_size)#

Construct a data block

Construct a data block, allocating the required memory.

Parameters:

block_size[in] - number of bytes to allocate.

virtual ~DataBlock()#

Destroy a data block

Destroy a data block, freeing resources.

int get_index()#

Return the unique index

Return the unique index of this data block.

Returns:

- unique index of this data block.

size_t get_size()#

Return the size in bytes

Return the size in bytes of this data block.

Returns:

- size in bytes of this data block.

void copy_data(const void *data_src, size_t block_size)#

Copy data from source into block allocated memory

Copy from data source to the allocated memory within this data block. If more bytes are requested to be copied than are available in this block then the copy is truncated to the size of this block.

Parameters:
  • data_src[in] - void pointer to the data source.

  • block_size[in] - size of data in bytes to copy.
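The truncating copy described above can be sketched as follows. `DataBlockSketch` is a hypothetical stand-in that owns its memory through a `std::vector` rather than a raw pointer; it is meant only to show the size clamp, not the real allocation strategy.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <vector>

// Hypothetical stand-in for DataBlock that owns its memory via std::vector.
class DataBlockSketch {
public:
  explicit DataBlockSketch(std::size_t block_size) : bytes_(block_size) {}

  std::size_t get_size() const { return bytes_.size(); }
  const void* get_data() const { return bytes_.data(); }

  // Copy at most get_size() bytes; larger requests are truncated.
  void copy_data(const void* data_src, std::size_t block_size) {
    assert(data_src != nullptr || block_size == 0);
    const std::size_t n = block_size < bytes_.size() ? block_size : bytes_.size();
    if (n > 0) {
      std::memcpy(bytes_.data(), data_src, n);
    }
  }

private:
  std::vector<unsigned char> bytes_;
};
```

Copying 8 bytes into a 4-byte block therefore stores only the first 4 bytes and leaves the block size unchanged.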

const void *get_data()#

Return a const void pointer to the memory this block owns

Returns a void pointer to the memory that this data block owns.

Returns:

- void pointer to memory owned by this data block.

void *get_writeable_data()#

Return a non-const pointer to the memory this block owns

Returns a non-const void pointer to the memory that this data block owns.

Returns:

- non-const void pointer to memory owned by this data block

Public Static Functions

static int get_current_index_count()#

Return the current unique index counter

Returns the current index counter value

Returns:

- int current index count

Private Functions

void resize(size_t block_size)#

Resize the data block

Resize this data block. The current memory allocation will be freed. Then a new memory block will be allocated to the desired size. If resize is called but the same size is requested, then no actual free or reallocation takes place.

Parameters:

block_size[in] - new size of this data block.

Private Members

log4cxx::LoggerPtr logger_#

Pointer to logger

size_t allocated_bytes_#

Number of bytes allocated for this DataBlock

int index_#

Unique index of this DataBlock

void *block_ptr_#

Void pointer to the allocated memory

Private Static Attributes

static int index_counter_ = 0#

Static counter for the unique index

Friends

friend class DataBlockPool

DataBlockPool#

class DataBlockPool#

The DataBlock and DataBlockPool classes provide memory management for data within Frames. Memory is allocated by a data block on construction, and then the data block can be re-used without continually freeing and re-allocating the memory. The DataBlockPool provides a singleton class that can be used to access data blocks through shared pointers and manages the data blocks to avoid continuous allocating and freeing of memory. The DataBlockPool also contains details of how many blocks are available, in use and the total memory used.

Public Functions

virtual ~DataBlockPool()#

Public Static Functions

static void allocate(size_t block_count, size_t block_size)#

Static method to force allocation of new DataBlocks which are added to the pool specified by the block_size parameter.

Parameters:
  • block_count[in] - Number of DataBlocks to allocate.

  • block_size[in] - Number of bytes to allocate to each block.

static boost::shared_ptr<DataBlock> take(size_t block_size)#

Static method to take a DataBlock from the DataBlockPool specified by the block_size parameter. New DataBlocks will be allocated if necessary.

Parameters:

block_size[in] - Size of the DataBlock required in bytes.

Returns:

- DataBlock from the available pool.

static void release(boost::shared_ptr<DataBlock> block)#

Static method to release a DataBlock back into the DataBlockPool specified by the block size. Once a DataBlock has been released it will become available for re-use.

Parameters:

block[in] - DataBlock to release.
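The take and release cycle, along with the free, used and total counts, can be sketched for a single block size as below. `PoolSketch` and `BlockSketch` are hypothetical names, `std::shared_ptr` stands in for `boost::shared_ptr`, and the locking provided by the documented mutex is omitted for brevity.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <map>
#include <memory>
#include <vector>

// Hypothetical block type: a unique index plus owned memory.
struct BlockSketch {
  BlockSketch(int index, std::size_t size) : index_(index), bytes_(size) {}
  int index_;
  std::vector<unsigned char> bytes_;
};

// Hypothetical pool for one block size; not thread safe as written.
class PoolSketch {
public:
  explicit PoolSketch(std::size_t block_size) : block_size_(block_size), next_index_(0) {}

  // Hand out a free block, allocating a new one when the free list is empty.
  std::shared_ptr<BlockSketch> take() {
    if (free_list_.empty()) {
      free_list_.push_back(std::make_shared<BlockSketch>(next_index_++, block_size_));
    }
    std::shared_ptr<BlockSketch> block = free_list_.front();
    free_list_.pop_front();
    used_map_[block->index_] = block;
    return block;
  }

  // Return a block to the free list so that a later take() can reuse it.
  void release(const std::shared_ptr<BlockSketch>& block) {
    assert(block && used_map_.count(block->index_) == 1);
    used_map_.erase(block->index_);
    free_list_.push_front(block);
  }

  std::size_t get_free_blocks() const { return free_list_.size(); }
  std::size_t get_used_blocks() const { return used_map_.size(); }
  std::size_t get_total_blocks() const { return free_list_.size() + used_map_.size(); }
  std::size_t get_memory_allocated() const { return get_total_blocks() * block_size_; }

private:
  std::size_t block_size_;
  int next_index_;
  std::list<std::shared_ptr<BlockSketch> > free_list_;
  std::map<int, std::shared_ptr<BlockSketch> > used_map_;
};
```

Taking two blocks, releasing one and taking again reuses the released block, so the total stays at two and no further allocation occurs.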

static size_t get_free_blocks(size_t block_size)#

Static method that returns the number of free DataBlocks present in the DataBlockPool specified by the block_size parameter.

Parameters:

block_size[in] - Block size of the DataBlockPool to get the free count from.

Returns:

- Number of free DataBlocks.

static size_t get_used_blocks(size_t block_size)#

Static method that returns the number of in-use DataBlocks present in the DataBlockPool specified by the block_size parameter.

Parameters:

block_size[in] - Block size of the DataBlockPool to get the in-use count from.

Returns:

- Number of in-use DataBlocks.

static size_t get_total_blocks(size_t block_size)#

Static method that returns the total number of DataBlocks present in the DataBlockPool specified by the block_size parameter.

Parameters:

block_size[in] - Block size of the DataBlockPool to get the total count from.

Returns:

- Total number of DataBlocks.

static size_t get_memory_allocated(size_t block_size)#

Static method that returns the total number of bytes that have been allocated by the DataBlockPool specified by the block_size parameter.

Parameters:

block_size[in] - Block size of the DataBlockPool to get the total bytes allocated from.

Returns:

- Total number of allocated bytes.

static void tearDownClass()#

Delete DataBlockPool instances stored in static class attribute instance_map_

Private Functions

DataBlockPool()#

Construct a DataBlockPool object. The constructor is private; these pool objects can only be constructed from the static methods, so that only one pool is created for each block size.

void internal_allocate(size_t block_count, size_t block_size)#

Allocate new DataBlocks to this DataBlockPool. This results in additional memory allocation.

Parameters:
  • block_count[in] - Number of DataBlocks to allocate.

  • block_size[in] - Number of bytes to allocate to each block.

boost::shared_ptr<DataBlock> internal_take(size_t block_size)#

Take a DataBlock from the DataBlockPool. New DataBlocks will be allocated if necessary.

Parameters:

block_size[in] - Size of the DataBlock required in bytes.

Returns:

- DataBlock from the available pool.

void internal_release(boost::shared_ptr<DataBlock> block)#

Release a DataBlock back into the DataBlockPool. Once a DataBlock has been released it will become available for re-use.

Parameters:

block[in] - DataBlock to release.

size_t internal_get_free_blocks()#

Returns the number of free DataBlocks present in the DataBlockPool.

Returns:

- Number of free DataBlocks.

size_t internal_get_used_blocks()#

Returns the number of in-use DataBlocks present in the DataBlockPool.

Returns:

- Number of in-use DataBlocks.

size_t internal_get_total_blocks()#

Returns the total number of DataBlocks present in the DataBlockPool.

Returns:

- Total number of DataBlocks.

size_t internal_get_memory_allocated()#

Returns the number of bytes allocated by the DataBlockPool.

Returns:

- Number of allocated bytes.

Private Members

log4cxx::LoggerPtr logger_#

Pointer to logger

boost::recursive_mutex mutex_#

Mutex used to make this class thread safe

std::list<boost::shared_ptr<DataBlock>> free_list_#

List of currently available DataBlock objects

std::map<int, boost::shared_ptr<DataBlock>> used_map_#

Map of currently used DataBlock objects, indexed by their unique IDs

size_t free_blocks_#

Number of currently available DataBlock objects

size_t used_blocks_#

Number of currently used DataBlock objects

size_t total_blocks_#

Total number of DataBlock objects, used + free

size_t memory_allocated_#

Total number of bytes allocated (sum of all DataBlocks)

Private Static Functions

static DataBlockPool *instance(size_t block_size)#

Static private method that returns a pointer to the DataBlockPool specified by the block_size parameter. This is private and is used by all of the static access methods. If no DataBlockPool exists for the block size provided then a new DataBlockPool is created.

Parameters:

block_size[in] - Block size of DataBlockPool to retrieve.

Returns:

- Pointer to a DataBlockPool instance.
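
The lookup-or-create behaviour described for this method can be sketched as follows. `SizedPool` is a hypothetical class used only for illustration. Unlike the documented method, `instance` is public here so that it can be called directly, and the sketch is neither thread safe nor does it ever delete the pools it creates.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>

// Hypothetical class illustrating one pool instance per block size.
class SizedPool {
public:
  // Return the pool for block_size, creating it on first request.
  static SizedPool* instance(std::size_t block_size) {
    std::map<std::size_t, SizedPool*>& pools = instance_map();
    std::map<std::size_t, SizedPool*>::iterator it = pools.find(block_size);
    if (it == pools.end()) {
      // No pool for this size yet: create one and remember it.
      it = pools.insert(
          std::make_pair(block_size, new SizedPool(block_size))).first;
    }
    assert(it->second != nullptr);
    return it->second;
  }

  std::size_t block_size() const { return block_size_; }

private:
  explicit SizedPool(std::size_t block_size) : block_size_(block_size) {}

  // Function-local static avoids static initialisation order problems.
  static std::map<std::size_t, SizedPool*>& instance_map() {
    static std::map<std::size_t, SizedPool*> pools;
    return pools;
  }

  std::size_t block_size_;
};
```

Two requests for the same block size return the same pointer, while a different block size yields a separate pool.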

Private Static Attributes

static std::map<size_t, DataBlockPool*> instance_map_#

Static map of all DataBlockPool instances, indexed by block size

SharedBufferFrame#

class SharedBufferFrame : public FrameProcessor::Frame#

Public Functions

SharedBufferFrame(const FrameMetaData &meta_data, void *data_src, size_t nbytes, uint64_t bufferID, OdinData::IpcChannel *relCh, const int &image_offset = 0)#

Construct a SharedBufferFrame

SharedBufferFrame(const SharedBufferFrame &frame)#

Copy constructor; implemented as a shallow copy.

Parameters:

frame[in] - Frame to copy.

~SharedBufferFrame()#

Destructor; destroys the frame.

virtual void *get_data_ptr() const#

Return a void pointer to the raw data.

Returns:

- Pointer to the raw data.

Private Members

void *data_ptr_#

Pointer to shared memory raw block

uint64_t shared_id_#

Shared memory buffer ID

OdinData::IpcChannel *shared_channel_#

ZMQ release channel for the shared buffer
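
What a shallow copy means for a frame that wraps a shared buffer can be shown with a small sketch. `SketchSharedFrame` is a hypothetical class, not the SharedBufferFrame implementation. It omits the meta data, the release channel and any buffer release logic, and keeps only the pointer and the buffer ID.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical frame that refers to externally owned memory.
class SketchSharedFrame {
public:
  SketchSharedFrame(void* data_src, std::size_t nbytes, std::uint64_t buffer_id)
      : data_ptr_(data_src), nbytes_(nbytes), shared_id_(buffer_id) {
    assert(data_src != nullptr);  // the sketch requires a valid buffer
  }

  // Shallow copy: the pointer is copied, the bytes it points at are not.
  SketchSharedFrame(const SketchSharedFrame& frame) = default;

  void* get_data_ptr() const { return data_ptr_; }
  std::size_t get_data_size() const { return nbytes_; }
  std::uint64_t get_buffer_id() const { return shared_id_; }

private:
  void* data_ptr_;           // pointer to the shared memory block
  std::size_t nbytes_;       // size of the block in bytes
  std::uint64_t shared_id_;  // shared memory buffer ID
};
```

After a shallow copy both frames refer to the same memory, so a write through one frame is visible through the other, and neither copy owns the bytes.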

EndOfAcquisitionFrame#

class EndOfAcquisitionFrame : public FrameProcessor::Frame#

Public Functions

EndOfAcquisitionFrame()#

Construct an EndOfAcquisitionFrame.

EndOfAcquisitionFrame(const EndOfAcquisitionFrame &frame)#

Copy constructor; implemented as a shallow copy.

Parameters:

frame[in] - Frame to copy.

EndOfAcquisitionFrame &operator=(EndOfAcquisitionFrame &frame)#

Assignment operator; implemented as a deep copy.

Parameters:

frame[in] - Frame to assign from.

Returns:

- Reference to the assigned frame.

~EndOfAcquisitionFrame()#

Destructor; destroys the frame.

virtual void *get_data_ptr() const#

Return a null pointer, because this frame type carries no raw data.

virtual bool get_end_of_acquisition() const#

Return true to signify that this frame is an “end of acquisition” frame.

Returns:

- True, marking the end of acquisition.
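
How a consumer might use these two overrides can be sketched with a reduced frame hierarchy. All names below (`SketchFrame`, `SketchDataFrame`, `SketchEndFrame`, `count_data_frames`) are hypothetical and exist only for this illustration. The sketch assumes that the base class reports false for the end-of-acquisition query by default, which the reference does not state.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical reduced base class with the two virtual queries.
class SketchFrame {
public:
  virtual ~SketchFrame() {}
  virtual void* get_data_ptr() const = 0;
  virtual bool get_end_of_acquisition() const { return false; }
};

// Hypothetical frame that owns some raw bytes.
class SketchDataFrame : public SketchFrame {
public:
  explicit SketchDataFrame(std::size_t nbytes) : data_(nbytes) {}
  void* get_data_ptr() const override { return data_.data(); }

private:
  mutable std::vector<unsigned char> data_;
};

// Hypothetical marker frame: no data, only the end-of-acquisition flag.
class SketchEndFrame : public SketchFrame {
public:
  void* get_data_ptr() const override { return nullptr; }
  bool get_end_of_acquisition() const override { return true; }
};

// Count the data frames seen before the end-of-acquisition marker.
std::size_t count_data_frames(
    const std::vector<std::shared_ptr<SketchFrame>>& frames) {
  std::size_t count = 0;
  for (const std::shared_ptr<SketchFrame>& frame : frames) {
    assert(frame);  // the sketch requires non-null entries
    if (frame->get_end_of_acquisition()) {
      break;  // stop without touching the null data pointer
    }
    if (frame->get_data_ptr() != nullptr) {
      ++count;
    }
  }
  return count;
}
```

Checking the end-of-acquisition flag before reading the data pointer means the consumer never dereferences the null pointer returned by the marker frame.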