libcaf  0.15.5
Public Types | Public Member Functions | Static Public Member Functions | Protected Attributes | List of all members
caf::stream_scatterer_impl Class Reference

Type-erased policy for dispatching data to sinks. More...

#include <stream_scatterer_impl.hpp>

Inheritance diagram for caf::stream_scatterer_impl:
caf::stream_edge_impl< stream_scatterer > caf::stream_scatterer caf::buffered_scatterer< T > caf::topic_scatterer< T, Filter, Select > caf::broadcast_topic_scatterer< T, Filter, Select > caf::random_topic_scatterer< T, Filter, Select >

Public Types

using super = stream_edge_impl< stream_scatterer >
 
- Public Types inherited from caf::stream_edge_impl< stream_scatterer >
using super = stream_scatterer
 
using path_type = typename super::path_type
 Either inbound_path or outbound_path.
 
using path_ptr = path_type *
 A raw pointer to a path.
 
using path_ptr_vec = std::vector< path_ptr >
 Vector of raw pointers (views) of paths.
 
using path_ptr_iter = typename path_ptr_vec::iterator
 Iterator to a vector of raw pointers.
 
using path_uptr = std::unique_ptr< path_type >
 A unique pointer to a path.
 
using path_uptr_vec = std::vector< path_uptr >
 Vector of owning pointers of paths.
 
using path_uptr_iter = typename path_uptr_vec::iterator
 Iterator to a vector of owning pointers.
 
using regular_shutdown = typename path_type::regular_shutdown
 Message type for sending graceful shutdowns along the path (either stream_msg::drop or stream_msg::close). More...
 
using irregular_shutdown = typename path_type::irregular_shutdown
 Message type for sending errors along the path (either stream_msg::forced_drop or stream_msg::forced_close). More...
 
- Public Types inherited from caf::stream_scatterer
using path_type = outbound_path
 
using path_ptr = path_type *
 

Public Member Functions

 stream_scatterer_impl (local_actor *selfptr)
 
void abort (error reason) override
 Removes all paths with an error message.
 
long total_credit () const
 Returns the total number (sum) of all credit in paths().
 
long min_credit () const
 Returns the minimum number of credit in paths().
 
long max_credit () const
 Returns the maximum number of credit in paths().
 
void close () override
 Removes all paths gracefully.
 
path_ptr add_path (const stream_id &sid, strong_actor_ptr origin, strong_actor_ptr sink_ptr, mailbox_element::forwarding_stack stages, message_id handshake_mid, message handshake_data, stream_priority prio, bool redeployable) override
 Adds a path to the edge. More...
 
path_ptr confirm_path (const stream_id &sid, const actor_addr &from, strong_actor_ptr to, long initial_demand, bool redeployable) override
 Adds a path to a sink and initiates the handshake.
 
bool paths_clean () const override
 Returns true if there is no data pending and no unacknowledged batch on any path. More...
 
long min_batch_size () const override
 Minimum amount of messages required to emit a batch. More...
 
long max_batch_size () const override
 Maximum amount of messages to put into a single batch. More...
 
long min_buffer_size () const override
 Minimum amount of messages we wish to store at the actor in order to emit new batches immediately when receiving new downstream demand. More...
 
duration max_batch_delay () const override
 Forces the actor to emit a batch even if the minimum batch size was not reached. More...
 
void min_batch_size (long x) override
 Minimum amount of messages required to emit a batch. More...
 
void max_batch_size (long x) override
 Maximum amount of messages to put into a single batch. More...
 
void min_buffer_size (long x) override
 Minimum amount of messages we wish to store at the actor in order to emit new batches immediately when receiving new downstream demand. More...
 
void max_batch_delay (duration x) override
 Forces the actor to emit a batch even if the minimum batch size was not reached. More...
 
- Public Member Functions inherited from caf::stream_edge_impl< stream_scatterer >
 stream_edge_impl (local_actor *selfptr)
 
path_ptr find (const stream_id &sid, const actor_addr &x) override
 
const path_uptr_vec & paths () const
 Returns all available paths.
 
local_actor * self () const
 Returns a pointer to the parent actor.
 
bool remove_path (path_uptr_iter i, error reason, bool silent)
 
bool remove_path (const stream_id &sid, const actor_addr &x, error reason, bool silent) override
 
void abort (error reason) override
 
long num_paths () const override
 
bool closed () const override
 
bool continuous () const override
 
void continuous (bool value) override
 
path_ptr path_at (size_t index) override
 
- Public Member Functions inherited from caf::stream_scatterer
virtual void emit_batches ()=0
 Sends batches to sinks.
 
virtual long credit () const =0
 Returns the currently available credit, depending on the policy in use. More...
 
virtual long buffered () const =0
 Returns the size of the output buffer.
 
bool remove_path (const stream_id &sid, const strong_actor_ptr &x, error reason, bool silent)
 Removes a path from the scatterer.
 
path_ptr find (const stream_id &sid, const strong_actor_ptr &x)
 Convenience function for calling find(sid, actor_cast<actor_addr>(x)).
 

Static Public Member Functions

template<class PathContainer , class F >
static long fold_credit (const PathContainer &xs, long x0, F f)
 Folds xs by extracting the open_credit from each path.
 
template<class PathContainer >
static long total_credit (const PathContainer &xs)
 Returns the total number (sum) of all credit in xs.
 
template<class PathContainer >
static long min_credit (const PathContainer &xs)
 Returns the minimum number of credit in xs.
 
template<class PathContainer >
static long max_credit (const PathContainer &xs)
 
- Static Public Member Functions inherited from caf::stream_edge_impl< stream_scatterer >
static void sort_by_credit (PathContainer &xs)
 Sorts xs in descending order by available credit.
 
static T fold (PathContainer &xs, T init, F f)
 
static path_ptr find (PathContainer &xs, const stream_id &sid, const Handle &x)
 Finds the path for sid and x and returns a pointer to it.
 
static PathContainer::iterator iter_find (PathContainer &xs, const stream_id &sid, const Handle &x)
 Finds the path for sid and x and returns an iterator to it.
 

Protected Attributes

long min_batch_size_
 
long max_batch_size_
 
long min_buffer_size_
 
duration max_batch_delay_
 
- Protected Attributes inherited from caf::stream_edge_impl< stream_scatterer >
local_actor * self_
 
path_uptr_vec paths_
 
bool continuous_
 

Additional Inherited Members

- Static Public Attributes inherited from caf::stream_edge_impl< stream_scatterer >
static constexpr const auto aborter_type
 Stream aborter flag to monitor paths.
 
- Protected Member Functions inherited from caf::stream_edge_impl< stream_scatterer >
path_ptr add_path_impl (const stream_id &sid, strong_actor_ptr x)
 Adds a path to the edge without emitting messages.
 
void close_impl (F f)
 

Detailed Description

Type-erased policy for dispatching data to sinks.

Member Function Documentation

path_ptr caf::stream_scatterer_impl::add_path ( const stream_id &  sid,
strong_actor_ptr  origin,
strong_actor_ptr  sink_ptr,
mailbox_element::forwarding_stack  stages,
message_id  handshake_mid,
message  handshake_data,
stream_priority  prio,
bool  redeployable 
)
override virtual

Adds a path to the edge.

Returns
The added path on success, nullptr otherwise.

Implements caf::stream_scatterer.

duration caf::stream_scatterer_impl::max_batch_delay ( ) const
override virtual

Forces the actor to emit a batch even if the minimum batch size was not reached.

Implements caf::stream_scatterer.

void caf::stream_scatterer_impl::max_batch_delay ( duration  x)
override virtual

Forces the actor to emit a batch even if the minimum batch size was not reached.

Implements caf::stream_scatterer.

long caf::stream_scatterer_impl::max_batch_size ( ) const
override virtual

Maximum amount of messages to put into a single batch.

Causes the actor to split a buffer into more batches if necessary.

Implements caf::stream_scatterer.

void caf::stream_scatterer_impl::max_batch_size ( long  x)
override virtual

Maximum amount of messages to put into a single batch.

Causes the actor to split a buffer into more batches if necessary.

Implements caf::stream_scatterer.

long caf::stream_scatterer_impl::min_batch_size ( ) const
override virtual

Minimum amount of messages required to emit a batch.

A value of 0 disables batch delays.

Implements caf::stream_scatterer.

void caf::stream_scatterer_impl::min_batch_size ( long  x)
override virtual

Minimum amount of messages required to emit a batch.

A value of 0 disables batch delays.

Implements caf::stream_scatterer.

long caf::stream_scatterer_impl::min_buffer_size ( ) const
override virtual

Minimum amount of messages we wish to store at the actor in order to emit new batches immediately when receiving new downstream demand.

Implements caf::stream_scatterer.

void caf::stream_scatterer_impl::min_buffer_size ( long  x)
override virtual

Minimum amount of messages we wish to store at the actor in order to emit new batches immediately when receiving new downstream demand.

Implements caf::stream_scatterer.

bool caf::stream_scatterer_impl::paths_clean ( ) const
override virtual

Returns true if there is no data pending and no unacknowledged batch on any path.

Implements caf::stream_scatterer.


The documentation for this class was generated from the following file: