libcaf  0.15.5
caf::stream_manager Class Reference (abstract)

Manages a single stream with any number of down- and upstream actors.

#include <stream_manager.hpp>

Inheritance diagram for caf::stream_manager:
caf::stream_manager inherits caf::ref_counted, which inherits caf::memory_managed.

Public Member Functions

virtual error open (const stream_id &sid, strong_actor_ptr hdl, strong_actor_ptr original_stage, stream_priority priority, bool redeployable, response_promise result_cb)
 Handles stream_msg::open messages.
 
virtual error ack_open (const stream_id &sid, const actor_addr &rebind_from, strong_actor_ptr rebind_to, long initial_demand, bool redeployable)
 Handles stream_msg::ack_open messages, i.e., finalizes the stream handshake.
 
virtual error batch (const stream_id &sid, const actor_addr &hdl, long xs_size, message &xs, int64_t xs_id)
 Handles stream_msg::batch messages.
 
virtual error ack_batch (const stream_id &sid, const actor_addr &hdl, long new_demand, int64_t cumulative_batch_id)
 Handles stream_msg::ack_batch messages.
 
virtual error close (const stream_id &sid, const actor_addr &hdl)
 Handles stream_msg::close messages.
 
virtual error drop (const stream_id &sid, const actor_addr &hdl)
 Handles stream_msg::drop messages.
 
virtual error forced_close (const stream_id &sid, const actor_addr &hdl, error reason)
 Handles stream_msg::forced_close messages.
 
virtual error forced_drop (const stream_id &sid, const actor_addr &hdl, error reason)
 Handles stream_msg::forced_drop messages.
 
virtual bool add_sink (const stream_id &sid, strong_actor_ptr origin, strong_actor_ptr sink_ptr, mailbox_element::forwarding_stack stages, message_id handshake_mid, message handshake_data, stream_priority prio, bool redeployable)
 Adds a new sink to the stream.
 
virtual bool add_source (const stream_id &sid, strong_actor_ptr source_ptr, strong_actor_ptr original_stage, stream_priority prio, bool redeployable, response_promise result_cb)
 Adds the source hdl to a stream.
 
virtual void push ()
 Pushes new data to downstream actors by sending batches.
 
virtual void abort (error reason)
 Aborts a stream after any stream message handler returned a non-default constructed error reason or the parent actor terminates with a non-default error.
 
virtual void close ()
 Closes the stream when the parent terminates with default exit reason or the stream reached its end.
 
virtual stream_gatherer & in ()=0
 Returns the stream edge for incoming data.
 
virtual stream_scatterer & out ()=0
 Returns the stream edge for outgoing data.
 
virtual bool done () const =0
 Returns whether the stream has reached the end and can be discarded safely.
 
virtual bool generate_messages ()
 Tries to generate new messages for the stream.
 
Public Member Functions inherited from caf::ref_counted
 ref_counted (const ref_counted &)
 
ref_counted & operator= (const ref_counted &)
 
void ref () noexcept
 Increases reference count by one.
 
void deref () noexcept
 Decreases reference count by one and calls request_deletion when it drops to zero.
 
bool unique () const noexcept
 Queries whether there is exactly one reference.
 
size_t get_reference_count () const noexcept
 
Public Member Functions inherited from caf::memory_managed
virtual void request_deletion (bool decremented_rc) noexcept
 The default implementation calls `delete this`, but can be overridden in case deletion depends on some condition or the class doesn't use default new/delete.
 

Protected Member Functions

virtual message make_final_result ()
 Called when the gatherer closes to produce the final stream result for all listeners.
 
virtual error process_batch (message &msg)
 Called to handle incoming data.
 
virtual void input_closed (error reason)
 Called when in().closed() changes to true.
 
virtual message make_output_token (const stream_id &) const
 Returns a type-erased stream<T> as handshake token for downstream actors.
 
virtual void downstream_demand (outbound_path *ptr, long demand)
 Called whenever new credit becomes available.
 
virtual void output_closed (error reason)
 Called when out().closed() changes to true.
 

Protected Attributes

local_actor * self_
 Pointer to the parent actor.
 
Protected Attributes inherited from caf::ref_counted
std::atomic< size_t > rc_
 

Related Functions

(Note that these are not member functions.)

using stream_manager_ptr = intrusive_ptr< stream_manager >
 A reference counting pointer to a stream_manager.
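
The interplay between ref(), deref(), request_deletion() and an intrusive pointer can be illustrated with a minimal, self-contained sketch. None of the toy_ types below are CAF classes; they are hypothetical stand-ins. The sketch assumes that a new object starts with a reference count of one and omits the decremented_rc parameter of request_deletion.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical stand-in for caf::memory_managed (not the CAF class).
class toy_memory_managed {
public:
  virtual ~toy_memory_managed() = default;
  // Default: plain delete. Subclasses could override this to recycle objects.
  virtual void request_deletion() noexcept { delete this; }
};

// Hypothetical stand-in for caf::ref_counted.
class toy_ref_counted : public toy_memory_managed {
public:
  void ref() noexcept { rc_.fetch_add(1, std::memory_order_relaxed); }
  void deref() noexcept {
    // fetch_sub returns the previous value; 1 means we held the last reference.
    if (rc_.fetch_sub(1, std::memory_order_acq_rel) == 1)
      request_deletion();
  }
  bool unique() const noexcept { return rc_.load() == 1; }
  std::size_t get_reference_count() const noexcept { return rc_.load(); }

protected:
  std::atomic<std::size_t> rc_{1}; // assumption: objects start with one reference
};

// Minimal intrusive pointer: the count lives in the object, not in the pointer.
template <class T>
class toy_intrusive_ptr {
public:
  // Adopts the initial reference of a freshly created object.
  explicit toy_intrusive_ptr(T* ptr = nullptr) noexcept : ptr_(ptr) {}
  toy_intrusive_ptr(const toy_intrusive_ptr& other) noexcept : ptr_(other.ptr_) {
    if (ptr_ != nullptr)
      ptr_->ref();
  }
  toy_intrusive_ptr& operator=(toy_intrusive_ptr other) noexcept {
    std::swap(ptr_, other.ptr_); // copy-and-swap; `other` releases the old object
    return *this;
  }
  ~toy_intrusive_ptr() {
    if (ptr_ != nullptr)
      ptr_->deref();
  }
  T* operator->() const noexcept { return ptr_; }
  T* get() const noexcept { return ptr_; }

private:
  T* ptr_;
};

// Example payload that reports its destruction through a flag.
struct toy_manager : toy_ref_counted {
  explicit toy_manager(bool* destroyed) : destroyed_(destroyed) {}
  ~toy_manager() override { *destroyed_ = true; }
  bool* destroyed_;
};
```

Copying a toy_intrusive_ptr increments the count stored in the object itself, and the object is deleted once the last pointer goes out of scope.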
 

Detailed Description

Manages a single stream with any number of down- and upstream actors.

Member Function Documentation

virtual void caf::stream_manager::abort ( error  reason)
virtual

Aborts a stream after any stream message handler returned a non-default constructed error reason or the parent actor terminates with a non-default error.

Parameters
reason: Previous error or non-default exit reason of the parent.
virtual error caf::stream_manager::ack_batch ( const stream_id &  sid,
const actor_addr &  hdl,
long  new_demand,
int64_t  cumulative_batch_id 
)
virtual

Handles stream_msg::ack_batch messages.

Parameters
hdl: Handle to the sender.
new_demand: New credit for sending data.
cumulative_batch_id: ID of the last handled batch.
Precondition
hdl != nullptr
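
As a rough illustration of how a sender might react to such an acknowledgement, the following sketch applies new_demand as credit and treats cumulative_batch_id as confirming every pending batch up to and including that ID. The cumulative interpretation is an assumption based on the parameter name, and toy_path and toy_ack_batch are hypothetical names, not part of CAF.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>

// Hypothetical sender-side bookkeeping for a single downstream path.
struct toy_path {
  long credit = 0;                  // number of items we may still send
  std::deque<std::int64_t> unacked; // IDs of batches awaiting an ACK, ascending
};

// Applies an ack_batch-like message: grants new credit and drops every pending
// batch whose ID is <= cumulative_batch_id (assumed cumulative semantics).
// Returns the number of batches confirmed by this ACK.
inline std::size_t toy_ack_batch(toy_path& path, long new_demand,
                                 std::int64_t cumulative_batch_id) {
  assert(new_demand >= 0);
  path.credit += new_demand;
  std::size_t confirmed = 0;
  while (!path.unacked.empty() && path.unacked.front() <= cumulative_batch_id) {
    path.unacked.pop_front();
    ++confirmed;
  }
  return confirmed;
}
```

With pending batches 1 to 4, an acknowledgement carrying ID 3 and a demand of 10 confirms three batches, leaves batch 4 pending, and raises the credit by 10.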
virtual error caf::stream_manager::ack_open ( const stream_id &  sid,
const actor_addr &  rebind_from,
strong_actor_ptr  rebind_to,
long  initial_demand,
bool  redeployable 
)
virtual

Handles stream_msg::ack_open messages, i.e., finalizes the stream handshake.

Parameters
sid: ID of the outgoing stream.
rebind_from: Receiver of the original open message.
rebind_to: Sender of this confirmation.
initial_demand: Credit received with this ack_open.
redeployable: Denotes whether the runtime can redeploy rebind_to on failure.
Precondition
hdl != nullptr
virtual bool caf::stream_manager::add_source ( const stream_id &  sid,
strong_actor_ptr  source_ptr,
strong_actor_ptr  original_stage,
stream_priority  prio,
bool  redeployable,
response_promise  result_cb 
)
virtual

Adds the source hdl to a stream.

Convenience function for calling in().add_path(sid, hdl).second.

virtual error caf::stream_manager::batch ( const stream_id &  sid,
const actor_addr &  hdl,
long  xs_size,
message &  xs,
int64_t  xs_id 
)
virtual

Handles stream_msg::batch messages.

Parameters
hdl: Handle to the sender.
xs_size: Size of the vector stored in xs.
xs: A type-erased vector of size xs_size.
xs_id: ID of this batch (must be ACKed).
Precondition
hdl != nullptr
xs_size > 0
virtual error caf::stream_manager::close ( const stream_id &  sid,
const actor_addr &  hdl 
)
virtual

Handles stream_msg::close messages.

Parameters
hdl: Handle to the sender.
Precondition
hdl != nullptr
virtual void caf::stream_manager::close ( )
virtual

Closes the stream when the parent terminates with default exit reason or the stream reached its end.

virtual bool caf::stream_manager::done ( ) const
pure virtual

Returns whether the stream has reached the end and can be discarded safely.

virtual void caf::stream_manager::downstream_demand ( outbound_path *  ptr,
long  demand 
)
protected virtual

Called whenever new credit becomes available.

The default implementation logs an error (sources are expected to override this hook).

virtual error caf::stream_manager::drop ( const stream_id &  sid,
const actor_addr &  hdl 
)
virtual

Handles stream_msg::drop messages.

Parameters
hdl: Handle to the sender.
Precondition
hdl != nullptr
virtual error caf::stream_manager::forced_close ( const stream_id &  sid,
const actor_addr &  hdl,
error  reason 
)
virtual

Handles stream_msg::forced_close messages.

The default implementation calls abort(reason) and returns sec::unhandled_stream_error.

Parameters
hdl: Handle to the sender.
reason: Reported error from the source.
Precondition
hdl != nullptr
reason != none
virtual error caf::stream_manager::forced_drop ( const stream_id &  sid,
const actor_addr &  hdl,
error  reason 
)
virtual

Handles stream_msg::forced_drop messages.

The default implementation calls abort(reason) and returns sec::unhandled_stream_error.

Parameters
hdl: Handle to the sender.
reason: Reported error from the sink.
Precondition
hdl != nullptr
reason != none
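
The documented default for forced_close and forced_drop (abort the stream, then report the error as unhandled) can be mirrored in a small self-contained sketch. The toy_error enum and both classes are hypothetical stand-ins rather than CAF types, and the overriding manager only illustrates one possible policy for tolerating a lost sink.

```cpp
#include <cassert>

// Hypothetical error codes; `none` plays the role of a default-constructed error.
enum class toy_error { none, unhandled_stream_error, sink_failed };

class toy_manager_base {
public:
  virtual ~toy_manager_base() = default;

  // Records the reason; a real manager would also tear down its paths.
  virtual void abort(toy_error reason) { aborted_with_ = reason; }

  // Mirrors the documented default: abort, then report the error as unhandled.
  virtual toy_error forced_drop(toy_error reason) {
    assert(reason != toy_error::none); // documented precondition
    abort(reason);
    return toy_error::unhandled_stream_error;
  }

  toy_error aborted_with() const { return aborted_with_; }

private:
  toy_error aborted_with_ = toy_error::none;
};

// Illustrative override: tolerates a failing sink and keeps the stream alive.
class toy_tolerant_manager : public toy_manager_base {
public:
  toy_error forced_drop(toy_error reason) override {
    assert(reason != toy_error::none);
    ++dropped_sinks_;
    return toy_error::none; // handled, so no abort is triggered
  }

  int dropped_sinks() const { return dropped_sinks_; }

private:
  int dropped_sinks_ = 0;
};
```

Under these assumptions, the base class ends up aborted with the reported reason, while the tolerant subclass only counts the lost sink.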
virtual bool caf::stream_manager::generate_messages ( )
virtual

Tries to generate new messages for the stream.

This member function does nothing on stages and sinks, but can trigger a source to produce more messages.

virtual void caf::stream_manager::input_closed ( error  reason)
protected virtual

Called when in().closed() changes to true.

The default implementation does nothing.

virtual message caf::stream_manager::make_final_result ( )
protected virtual

Called when the gatherer closes to produce the final stream result for all listeners.

The default implementation returns an empty message.

virtual message caf::stream_manager::make_output_token ( const stream_id &  ) const
protected virtual

Returns a type-erased stream<T> as handshake token for downstream actors.

Returns an empty message for sinks.

virtual error caf::stream_manager::open ( const stream_id &  sid,
strong_actor_ptr  hdl,
strong_actor_ptr  original_stage,
stream_priority  priority,
bool  redeployable,
response_promise  result_cb 
)
virtual

Handles stream_msg::open messages.

Returns
Initial credit to the source.
Parameters
hdl: Handle to the sender.
original_stage: Handle to the initial receiver of the handshake.
priority: Affects credit assignment and maximum bandwidth.
redeployable: Configures whether hdl could get redeployed, i.e., can resume after an abort.
result_cb: Callback for the listener of the final stream result. Ignored when returning nullptr, because the previous stage is responsible for it until this manager acknowledges the handshake.
Precondition
hdl != nullptr
virtual void caf::stream_manager::output_closed ( error  reason)
protected virtual

Called when out().closed() changes to true.

The default implementation does nothing.

virtual error caf::stream_manager::process_batch ( message &  msg)
protected virtual

Called to handle incoming data.

The default implementation logs an error (sinks are expected to override this member function).
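
The hook pattern described here, a default that reports an error and a sink that overrides it, might look roughly like the sketch below. It uses std::any (C++17) as a stand-in for a type-erased message; toy_status, toy_stream_base and toy_sum_sink are hypothetical names and do not reflect the CAF implementation.

```cpp
#include <any>
#include <cassert>
#include <numeric>
#include <vector>

// Hypothetical status codes standing in for caf::error values.
enum class toy_status { ok, unhandled_batch, type_mismatch };

class toy_stream_base {
public:
  virtual ~toy_stream_base() = default;

  // Entry point: forwards the type-erased payload to the protected hook.
  toy_status batch(std::any& xs) { return process_batch(xs); }

protected:
  // Default hook: the documented default logs an error; this sketch only
  // returns an error code instead.
  virtual toy_status process_batch(std::any&) {
    return toy_status::unhandled_batch;
  }
};

// A sink that expects batches of int and accumulates their sum.
class toy_sum_sink : public toy_stream_base {
public:
  long total() const { return total_; }

protected:
  toy_status process_batch(std::any& xs) override {
    // Recover the concrete vector type; nullptr signals a type mismatch.
    auto* vec = std::any_cast<std::vector<int>>(&xs);
    if (vec == nullptr)
      return toy_status::type_mismatch;
    total_ = std::accumulate(vec->begin(), vec->end(), total_);
    return toy_status::ok;
  }

private:
  long total_ = 0;
};
```

A manager that does not override the hook rejects every batch, whereas the sink consumes batches of the expected element type and rejects all others without changing its state.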

virtual void caf::stream_manager::push ( )
virtual

Pushes new data to downstream actors by sending batches.

The amount of pushed data is limited by the available credit.
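
To make the credit limit concrete, here is a minimal sketch of credit-bounded batching for a single downstream path. The toy_outbound struct, the toy_push function and the max_batch_size parameter are assumptions made for illustration; the actual batching policy of CAF may differ.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical state of one downstream path.
struct toy_outbound {
  long credit = 0;                    // items the receiver allows us to send
  std::deque<int> buffer;             // items waiting to be sent
  std::vector<std::vector<int>> sent; // batches "sent" so far (for inspection)
};

// Sends buffered items in batches of at most max_batch_size items and stops
// as soon as either the credit or the buffer is exhausted.
// Returns the total number of items pushed.
inline long toy_push(toy_outbound& out, long max_batch_size) {
  assert(max_batch_size > 0);
  long pushed = 0;
  while (out.credit > 0 && !out.buffer.empty()) {
    long n = std::min({out.credit, max_batch_size,
                       static_cast<long>(out.buffer.size())});
    std::vector<int> batch(out.buffer.begin(), out.buffer.begin() + n);
    out.buffer.erase(out.buffer.begin(), out.buffer.begin() + n);
    out.sent.push_back(std::move(batch));
    out.credit -= n; // every sent item consumes one unit of credit
    pushed += n;
  }
  return pushed;
}
```

With ten buffered items, a credit of seven and a batch size of three, this sketch emits batches of three, three and one items and leaves three items buffered until new credit arrives.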


The documentation for this class was generated from the following file: