libcaf  0.16.0
caf::stream_manager Class Reference (abstract)

Manages a single stream with any number of in- and outbound paths.

#include <stream_manager.hpp>

Inheritance diagram for caf::stream_manager:
caf::stream_manager inherits caf::ref_counted, which inherits caf::memory_managed.

Public Types

using inbound_paths_list = std::vector< inbound_path * >
 

Public Member Functions

 stream_manager (scheduled_actor *selfptr, stream_priority prio=stream_priority::normal)
 
virtual void handle (inbound_path *from, downstream_msg::batch &x)
 
virtual void handle (inbound_path *from, downstream_msg::close &x)
 
virtual void handle (inbound_path *from, downstream_msg::forced_close &x)
 
virtual bool handle (stream_slots, upstream_msg::ack_open &x)
 
virtual void handle (stream_slots slots, upstream_msg::ack_batch &x)
 
virtual void handle (stream_slots slots, upstream_msg::drop &x)
 
virtual void handle (stream_slots slots, upstream_msg::forced_drop &x)
 
virtual void stop ()
 Closes all output and input paths and sends the final result to the client.
 
void advance ()
 Tries to advance the stream by generating more credit or by sending batches.
 
virtual void abort (error reason)
 Aborts a stream after a stream message handler returns a non-default-constructed error or the parent actor terminates with a non-default error.
 
virtual void push ()
 Pushes new data to downstream actors by sending batches.
 
virtual bool congested () const noexcept
 Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.
 
virtual void deliver_handshake (response_promise &rp, stream_slot slot, message handshake)
 Sends a handshake to dest.
 
virtual bool generate_messages ()
 Tries to generate new messages for the stream.
 
virtual downstream_manager & out ()=0
 Returns the manager for downstream communication.
 
virtual bool done () const =0
 Returns whether the manager has reached the end and can be discarded safely.
 
virtual bool idle () const noexcept=0
 Returns whether the manager cannot make any progress on its own at the moment.
 
virtual void cycle_timeout (size_t cycle_nr)
 Advances time.
 
virtual void register_input_path (inbound_path *x)
 Informs the manager that a new input path opens.
 
virtual void deregister_input_path (inbound_path *x) noexcept
 Informs the manager that an input path closes.
 
virtual void remove_input_path (stream_slot slot, error reason, bool silent)
 Removes an input path.
 
bool continuous () const noexcept
 Returns whether this stream remains open even if no in- or outbound paths exist.
 
void continuous (bool x) noexcept
 Sets whether this stream remains open even if no in- or outbound paths exist.
 
const inbound_paths_list & inbound_paths () const noexcept
 Returns the list of inbound paths.
 
inbound_path * get_inbound_path (stream_slot x) const noexcept
 Returns the inbound path at slot x.
 
bool inbound_paths_idle () const noexcept
 Queries whether all inbound paths are up-to-date and have non-zero credit.
 
scheduled_actor * self ()
 Returns the parent actor.
 
virtual int32_t acquire_credit (inbound_path *path, int32_t desired)
 Acquires credit on an inbound path.
 
stream_slot add_unchecked_inbound_path_impl ()
 Adds the current sender as an inbound path.
 
- Public Member Functions inherited from caf::ref_counted
 ref_counted (const ref_counted &)
 
ref_counted & operator= (const ref_counted &)
 
void ref () const noexcept
 Increases reference count by one.
 
void deref () const noexcept
 Decreases reference count by one and calls request_deletion when it drops to zero.
 
bool unique () const noexcept
 Queries whether there is exactly one reference.
 
size_t get_reference_count () const noexcept
 
- Public Member Functions inherited from caf::memory_managed
virtual void request_deletion (bool decremented_rc) const noexcept
 The default implementation calls `delete this`, but can be overridden in case deletion depends on some condition or the class doesn't use default new/delete.
 

Protected Member Functions

stream_slot assign_next_slot ()
 
stream_slot assign_next_pending_slot ()
 
virtual void finalize (const error &reason)
 
virtual void input_closed (error reason)
 Called when in().closed() changes to true.
 
virtual void downstream_demand (outbound_path *ptr, long demand)
 Called whenever new credit becomes available.
 
virtual void output_closed (error reason)
 Called when out().closed() changes to true.
 

Protected Attributes

scheduled_actor * self_
 Points to the parent actor.
 
inbound_paths_list inbound_paths_
 Stores non-owning pointers to all input paths.
 
long pending_handshakes_
 Keeps track of pending handshakes.
 
stream_priority priority_
 Configures the importance of outgoing traffic.
 
bool continuous_
 Stores whether this stream shall remain open even if no in- or outbound paths exist.
 
- Protected Attributes inherited from caf::ref_counted
std::atomic< size_t > rc_
 

Related Functions

(Note that these are not member functions.)

using stream_manager_ptr = intrusive_ptr< stream_manager >
 A reference counting pointer to a stream_manager.
 

Detailed Description

Manages a single stream with any number of in- and outbound paths.

Member Function Documentation

◆ abort()

virtual void caf::stream_manager::abort ( error  reason)
virtual

Aborts a stream after a stream message handler returns a non-default-constructed error or the parent actor terminates with a non-default error.

Parameters
reason  Previous error or non-default exit reason of the parent.

◆ acquire_credit()

virtual int32_t caf::stream_manager::acquire_credit ( inbound_path * path,
int32_t  desired 
)
virtual

Acquires credit on an inbound path.

The parameter desired is the credit calculated to fill our queue for two cycles, but the manager is allowed to return any non-negative value.
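
As a rough illustration of this contract, the following sketch models a credit policy in isolation. It does not use any CAF types: `toy_credit_path`, `max_credit`, and `acquire_credit_sketch` are hypothetical names made up for this example, and capping the grant by a per-path budget is only one possible policy, not necessarily what the library does.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical stand-in for an inbound path; not the CAF inbound_path type.
struct toy_credit_path {
  int32_t assigned_credit = 0; // credit already granted to the upstream
  int32_t max_credit = 0;      // assumed per-path budget for this sketch
};

// Grants at most `desired` credit, never more than the remaining budget,
// and never a negative amount.
int32_t acquire_credit_sketch(toy_credit_path& path, int32_t desired) {
  int32_t remaining
    = std::max<int32_t>(path.max_credit - path.assigned_credit, 0);
  int32_t granted = std::max<int32_t>(0, std::min(desired, remaining));
  path.assigned_credit += granted;
  return granted;
}
```

With a budget of 10, two consecutive requests for 6 would be granted 6 and then 4 under this policy; a negative request yields 0.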

◆ add_unchecked_inbound_path_impl()

stream_slot caf::stream_manager::add_unchecked_inbound_path_impl ( )

Adds the current sender as an inbound path.

Precondition
Current message is an open_stream_msg.

◆ advance()

void caf::stream_manager::advance ( )

Tries to advance the stream by generating more credit or by sending batches.

◆ congested()

virtual bool caf::stream_manager::congested ( ) const
virtual noexcept

Returns true if the handler is not able to process any further batches since it is unable to make progress sending on its own.

◆ continuous() [1/2]

bool caf::stream_manager::continuous ( ) const
noexcept

Returns whether this stream remains open even if no in- or outbound paths exist.

The default is false. Setting this flag does not keep a source alive past the point where its driver returns done() == true.
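
The getter and setter share one name and differ only in their parameter list. The sketch below mirrors that overload pair on a toy class to show how the flag might interact with the lifetime of a stream. `toy_stream`, `set_path_count`, `mark_driver_done`, and `stays_open` are hypothetical, and the rule inside `stays_open` is an assumption derived from the description, not the library's actual logic.

```cpp
#include <cstddef>

// Hypothetical model of a stream; not a CAF class.
class toy_stream {
public:
  // Getter and setter overloads, mirroring the continuous() pair.
  bool continuous() const noexcept { return continuous_; }
  void continuous(bool x) noexcept { continuous_ = x; }

  void set_path_count(std::size_t n) noexcept { paths_ = n; }
  void mark_driver_done() noexcept { driver_done_ = true; }

  // Assumed rule: a finished driver always ends the stream; otherwise the
  // stream stays open while paths exist or the continuous flag is set.
  bool stays_open() const noexcept {
    return !driver_done_ && (continuous_ || paths_ > 0);
  }

private:
  bool continuous_ = false; // the documented default
  bool driver_done_ = false;
  std::size_t paths_ = 0;
};
```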

◆ continuous() [2/2]

void caf::stream_manager::continuous ( bool  x)
noexcept

Sets whether this stream remains open even if no in- or outbound paths exist.

◆ deliver_handshake()

virtual void caf::stream_manager::deliver_handshake ( response_promise & rp,
stream_slot  slot,
message  handshake 
)
virtual

Sends a handshake to dest.

Precondition
dest != nullptr

◆ deregister_input_path()

virtual void caf::stream_manager::deregister_input_path ( inbound_path * x)
virtual noexcept

Informs the manager that an input path closes.

Note
The lifetime of inbound paths is managed by the downstream queue. This function is called from the destructor of inbound_path.

◆ done()

virtual bool caf::stream_manager::done ( ) const
pure virtual

Returns whether the manager has reached the end and can be discarded safely.

◆ downstream_demand()

virtual void caf::stream_manager::downstream_demand ( outbound_path * ptr,
long  demand 
)
protected virtual

Called whenever new credit becomes available.

The default implementation logs an error (sources are expected to override this hook).

◆ generate_messages()

virtual bool caf::stream_manager::generate_messages ( )
virtual

Tries to generate new messages for the stream.

This member function does nothing on stages and sinks, but can trigger a source to produce more messages.

◆ idle()

virtual bool caf::stream_manager::idle ( ) const
pure virtual noexcept

Returns whether the manager cannot make any progress on its own at the moment.

For example, a source is idle if it has filled its output buffer and there isn't any credit left.
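
The source example can be written as a small predicate. This is a sketch on a toy type: `toy_source` and its fields are hypothetical and only model the two conditions named in the sentence above.

```cpp
#include <cstddef>

// Hypothetical model of a source; not a CAF class.
struct toy_source {
  std::size_t buffered = 0;        // items currently in the output buffer
  std::size_t buffer_capacity = 0; // assumed fixed capacity of that buffer
  long credit = 0;                 // credit granted by downstream paths

  // Idle when the buffer is full and no credit is left to send anything.
  bool idle() const noexcept {
    return buffered >= buffer_capacity && credit <= 0;
  }
};
```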

◆ inbound_paths_idle()

bool caf::stream_manager::inbound_paths_idle ( ) const
noexcept

Queries whether all inbound paths are up-to-date and have non-zero credit.

A sink is idle if this function returns true.
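
A minimal sketch of such a query, assuming each path can report whether it is up-to-date and how much credit it holds. `toy_inbound` and `inbound_paths_idle_sketch` are hypothetical names, and the meaning of "up-to-date" is reduced to a plain flag here.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical stand-in for an inbound path; not the CAF inbound_path type.
struct toy_inbound {
  bool up_to_date; // assumed flag: no acknowledgement is outstanding
  int credit;      // credit currently assigned to the upstream
};

// True only if every path is up-to-date and has non-zero credit.
// Note that std::all_of returns true for an empty list.
bool inbound_paths_idle_sketch(const std::vector<toy_inbound>& paths) {
  return std::all_of(paths.begin(), paths.end(), [](const toy_inbound& p) {
    return p.up_to_date && p.credit > 0;
  });
}
```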

◆ input_closed()

virtual void caf::stream_manager::input_closed ( error  reason)
protected virtual

Called when in().closed() changes to true.

The default implementation does nothing.

◆ output_closed()

virtual void caf::stream_manager::output_closed ( error  reason)
protected virtual

Called when out().closed() changes to true.

The default implementation does nothing.

◆ push()

virtual void caf::stream_manager::push ( )
virtual

Pushes new data to downstream actors by sending batches.

The amount of pushed data is limited by the available credit.
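
To make the credit limit concrete, here is a sketch that drains a buffer into batches until the credit runs out. It is not CAF code: `toy_outbound`, `batch_size`, and `push_sketch` are hypothetical, and the sketch assumes that one unit of credit pays for one element.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical model of one outbound path; not the CAF outbound_path type.
struct toy_outbound {
  std::deque<int> buffer;     // elements waiting to be sent
  std::size_t credit = 0;     // assumed: one credit per element
  std::size_t batch_size = 1; // assumed maximum number of elements per batch
};

// Emits batches until either the buffer or the credit is exhausted.
std::vector<std::vector<int>> push_sketch(toy_outbound& out) {
  std::vector<std::vector<int>> batches;
  while (out.credit > 0 && !out.buffer.empty()) {
    std::size_t n
      = std::min({out.batch_size, out.credit, out.buffer.size()});
    if (n == 0)
      break; // a batch size of zero can never make progress
    auto last = out.buffer.begin() + static_cast<std::ptrdiff_t>(n);
    batches.emplace_back(out.buffer.begin(), last);
    out.buffer.erase(out.buffer.begin(), last);
    out.credit -= n;
  }
  return batches;
}
```

Elements that exceed the available credit stay in the buffer until more credit arrives.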

◆ register_input_path()

virtual void caf::stream_manager::register_input_path ( inbound_path * x)
virtual

Informs the manager that a new input path opens.

Note
The lifetime of inbound paths is managed by the downstream queue. This function is called from the constructor of inbound_path.
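
The notes on register_input_path and deregister_input_path describe a pattern where a path announces itself from its constructor and signs off from its destructor, while the manager only keeps non-owning pointers. A sketch of that pattern with toy types follows. `toy_manager`, `toy_registered_path`, and `path_count` are hypothetical and do not reproduce the CAF implementation.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

class toy_registered_path;

// Hypothetical manager that tracks paths without owning them.
class toy_manager {
public:
  void register_input_path(toy_registered_path* x) { paths_.push_back(x); }

  void deregister_input_path(toy_registered_path* x) noexcept {
    paths_.erase(std::remove(paths_.begin(), paths_.end(), x), paths_.end());
  }

  std::size_t path_count() const noexcept { return paths_.size(); }

private:
  std::vector<toy_registered_path*> paths_; // non-owning pointers
};

// Hypothetical path that registers in its constructor and deregisters in
// its destructor, so the manager's list never holds a dangling pointer.
class toy_registered_path {
public:
  explicit toy_registered_path(toy_manager& mgr) : mgr_(mgr) {
    mgr_.register_input_path(this);
  }
  ~toy_registered_path() { mgr_.deregister_input_path(this); }

  // Copying would register the same manager twice under a new address.
  toy_registered_path(const toy_registered_path&) = delete;
  toy_registered_path& operator=(const toy_registered_path&) = delete;

private:
  toy_manager& mgr_;
};
```

In this sketch, whoever owns the path objects decides their lifetime, and the manager's list follows automatically.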

◆ stop()

virtual void caf::stream_manager::stop ( )
virtual

Closes all output and input paths and sends the final result to the client.

Member Data Documentation

◆ continuous_

bool caf::stream_manager::continuous_
protected

Stores whether this stream shall remain open even if no in- or outbound paths exist.


The documentation for this class was generated from the following file: