libcaf  0.15.5
Public Types | Public Member Functions | Static Public Member Functions | Related Functions | List of all members
caf::scheduled_actor Class Reference

A cooperatively scheduled, event-based actor implementation. More...

#include <scheduled_actor.hpp>

Inheritance diagram for caf::scheduled_actor:
caf::resumable caf::io::abstract_broker caf::io::broker

Public Types

using stream_manager_ptr = intrusive_ptr< stream_manager >
 A reference-counting pointer to a stream_manager.
 
using streams_map = std::unordered_map< stream_id, stream_manager_ptr >
 A container for associating stream IDs to handlers.
 
using pending_response = std::pair< const message_id, behavior >
 The message ID of an outstanding response with its callback.
 
using pointer = scheduled_actor *
 A pointer to a scheduled actor.
 
using default_handler = std::function< result< message >(pointer, message_view &)>
 Function object for handling unmatched messages.
 
using error_handler = std::function< void(pointer, error &)>
 Function object for handling error messages.
 
using down_handler = std::function< void(pointer, down_msg &)>
 Function object for handling down messages.
 
using exit_handler = std::function< void(pointer, exit_msg &)>
 Function object for handling exit messages.
 
using exception_handler = std::function< error(pointer, std::exception_ptr &)>
 Function object for handling exit messages.
 
- Public Types inherited from caf::resumable
enum  resume_result {
  resume_later,
  awaiting_message,
  done,
  shutdown_execution_unit
}
 Denotes the state in which a resumable returned from its last call to resume. More...
 
enum  subtype_t {
  unspecified,
  scheduled_actor,
  io_actor,
  function_object
}
 Denotes common subtypes of resumable. More...
 

Public Member Functions

 scheduled_actor (actor_config &cfg)
 
void enqueue (mailbox_element_ptr ptr, execution_unit *eu) override
 
const char * name () const override
 
void launch (execution_unit *eu, bool lazy, bool hide) override
 
bool cleanup (error &&fail_state, execution_unit *host) override
 
subtype_t subtype () const override
 Returns a subtype hint for this object. More...
 
void intrusive_ptr_add_ref_impl () override
 Add a strong reference count to this object.
 
void intrusive_ptr_release_impl () override
 Remove a strong reference count from this object.
 
resume_result resume (execution_unit *, size_t) override
 Resume any pending computation until it is either finished or needs to be re-scheduled later. More...
 
virtual proxy_registryproxy_registry_ptr ()
 Returns a factory for proxies created and managed by this actor or nullptr. More...
 
void quit (error x=error{})
 Finishes execution of this actor after any currently running message handler is done. More...
 
void set_default_handler (default_handler fun)
 Sets a custom handler for unexpected messages.
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< result< message >type_erased_tuple &)> >::value >::type set_default_handler (F fun)
 Sets a custom handler for unexpected messages.
 
void set_error_handler (error_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_error_handler (T fun) -> decltype(fun(std::declval< error & >()))
 Sets a custom handler for error messages.
 
void set_down_handler (down_handler fun)
 Sets a custom handler for down messages.
 
template<class T >
auto set_down_handler (T fun) -> decltype(fun(std::declval< down_msg & >()))
 Sets a custom handler for down messages.
 
void set_exit_handler (exit_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg & >()))
 Sets a custom handler for exit messages.
 
void set_exception_handler (exception_handler fun)
 Sets a custom exception handler for this actor. More...
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type set_exception_handler (F f)
 Sets a custom exception handler for this actor. More...
 
stream_id make_stream_id ()
 Returns a new stream ID.
 
template<class Handle , class... Ts, class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
annotated_stream< typename stream_source_trait_t< Getter >::output, Ts... > make_source (const Handle &dest, std::tuple< Ts... > xs, Init init, Getter getter, ClosedPredicate pred, ResHandler res_handler, policy::arg< Scatterer > scatterer_type={})
 Creates a new stream source and starts streaming to dest. More...
 
template<class Handle , class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
stream< typename stream_source_trait_t< Getter >::output > make_source (const Handle &dest, Init init, Getter getter, ClosedPredicate pred, ResHandler res_handler, policy::arg< Scatterer > scatterer_type={})
 Creates a new stream source and starts streaming to dest. More...
 
template<class Init , class... Ts, class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
annotated_stream< typename stream_source_trait_t< Getter >::output, Ts... > make_source (std::tuple< Ts... > xs, Init init, Getter getter, ClosedPredicate pred, policy::arg< Scatterer > scatterer_type={})
 Creates a new stream source. More...
 
template<class Init , class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
stream< typename stream_source_trait_t< Getter >::output > make_source (Init init, Getter getter, ClosedPredicate pred, policy::arg< Scatterer > scatterer_type={})
 Creates a new stream source. More...
 
template<class In , class... Ts, class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>>
annotated_stream< typename stream_stage_trait_t< Fun >::output, Ts... > make_stage (const stream< In > &in, std::tuple< Ts... > xs, Init init, Fun fun, Cleanup cleanup, policy::arg< Gatherer, Scatterer > policies={})
 Creates a new stream stage. More...
 
template<class In , class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>>
stream< typename stream_stage_trait_t< Fun >::output > make_stage (const stream< In > &in, Init init, Fun fun, Cleanup cleanup, policy::arg< Gatherer, Scatterer > policies={})
 Creates a new stream stage. More...
 
template<class T , class In , class SuccessCallback , class... Ts>
stream_result< typename T::output_type > make_sink_impl (const stream< In > &in, SuccessCallback f, Ts &&...xs)
 Creates a new stream sink of type T. More...
 
template<class In , class Init , class Fun , class Finalize , class Gatherer = random_gatherer, class Scatterer = terminal_stream_scatterer>
stream_result< typename stream_sink_trait_t< Fun, Finalize >::output > make_sink (const stream< In > &in, Init init, Fun fun, Finalize finalize, policy::arg< Gatherer, Scatterer > policies={})
 Creates a new stream sink. More...
 
streams_mapstreams ()
 
void trigger_downstreams ()
 Tries to send more data on all downstream paths. More...
 

Static Public Member Functions

static void default_error_handler (pointer ptr, error &x)
 
static void default_down_handler (pointer ptr, down_msg &x)
 
static void default_exit_handler (pointer ptr, exit_msg &x)
 
static error default_exception_handler (pointer ptr, std::exception_ptr &x)
 

Related Functions

(Note that these are not member functions.)

result< messagereflect (scheduled_actor *, message_view &)
 
result< messagereflect_and_quit (scheduled_actor *, message_view &)
 
result< messageprint_and_drop (scheduled_actor *, message_view &)
 
result< messagedrop (scheduled_actor *, message_view &)
 

Detailed Description

A cooperatively scheduled, event-based actor implementation.

This is the recommended base class for user-defined actors.

Member Function Documentation

template<class In , class Init , class Fun , class Finalize , class Gatherer = random_gatherer, class Scatterer = terminal_stream_scatterer>
stream_result<typename stream_sink_trait_t<Fun, Finalize>::output> caf::scheduled_actor::make_sink ( const stream< In > &  in,
Init  init,
Fun  fun,
Finalize  finalize,
policy::arg< Gatherer, Scatterer >  policies = {} 
)

Creates a new stream sink.

Precondition
current_mailbox_element() is a stream_msg::open handshake
Parameters
inThe input of the sink.
initFunction object for initializing the state of the stage.
funFunction object for processing stream elements.
finalizeFunction object for producing the final result.
policiesSets the policies for up- and downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class T , class In , class SuccessCallback , class... Ts>
stream_result<typename T::output_type> caf::scheduled_actor::make_sink_impl ( const stream< In > &  in,
SuccessCallback  f,
Ts &&...  xs 
)

Creates a new stream sink of type T.

Precondition
current_mailbox_element() is a stream_msg::open handshake
Parameters
inThe input of the sink.
fCallback for initializing the object after successful creation.
xsParameter pack for creating the instance of T.
Returns
A stream object with a pointer to the generated stream_manager.
template<class Handle , class... Ts, class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
annotated_stream<typename stream_source_trait_t<Getter>::output, Ts...> caf::scheduled_actor::make_source ( const Handle &  dest,
std::tuple< Ts... >  xs,
Init  init,
Getter  getter,
ClosedPredicate  pred,
ResHandler  res_handler,
policy::arg< Scatterer >  scatterer_type = {} 
)

Creates a new stream source and starts streaming to dest.

Parameters
destActor handle to the stream destination.
xsUser-defined handshake payload.
initFunction object for initializing the state of the source.
getterFunction object for generating messages for the stream.
predPredicate returning true when the stream is done.
res_handlerFunction object for receiving the stream result.
scatterer_typeConfigures the policy for downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class Handle , class Init , class Getter , class ClosedPredicate , class ResHandler , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
stream<typename stream_source_trait_t<Getter>::output> caf::scheduled_actor::make_source ( const Handle &  dest,
Init  init,
Getter  getter,
ClosedPredicate  pred,
ResHandler  res_handler,
policy::arg< Scatterer >  scatterer_type = {} 
)

Creates a new stream source and starts streaming to dest.

Parameters
destActor handle to the stream destination.
initFunction object for initializing the state of the source.
getterFunction object for generating messages for the stream.
predPredicate returning true when the stream is done.
res_handlerFunction object for receiving the stream result.
scatterer_typeConfigures the policy for downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class Init , class... Ts, class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
annotated_stream<typename stream_source_trait_t<Getter>::output, Ts...> caf::scheduled_actor::make_source ( std::tuple< Ts... >  xs,
Init  init,
Getter  getter,
ClosedPredicate  pred,
policy::arg< Scatterer >  scatterer_type = {} 
)

Creates a new stream source.

Parameters
xsUser-defined handshake payload.
initFunction object for initializing the state of the source.
getterFunction object for generating messages for the stream.
predPredicate returning true when the stream is done.
scatterer_typeConfigures the policy for downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class Init , class Getter , class ClosedPredicate , class Scatterer = broadcast_scatterer< typename stream_source_trait_t<Getter>::output>>
stream<typename stream_source_trait_t<Getter>::output> caf::scheduled_actor::make_source ( Init  init,
Getter  getter,
ClosedPredicate  pred,
policy::arg< Scatterer >  scatterer_type = {} 
)

Creates a new stream source.

Parameters
initFunction object for initializing the state of the source.
getterFunction object for generating messages for the stream.
predPredicate returning true when the stream is done.
scatterer_typeConfigures the policy for downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class In , class... Ts, class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>>
annotated_stream<typename stream_stage_trait_t<Fun>::output, Ts...> caf::scheduled_actor::make_stage ( const stream< In > &  in,
std::tuple< Ts... >  xs,
Init  init,
Fun  fun,
Cleanup  cleanup,
policy::arg< Gatherer, Scatterer >  policies = {} 
)

Creates a new stream stage.

Precondition
current_mailbox_element() is a stream_msg::open handshake
Parameters
inThe input of the stage.
xsUser-defined handshake payload.
initFunction object for initializing the state of the stage.
funFunction object for processing stream elements.
cleanupFunction object for clearing the stage of the stage.
policiesSets the policies for up- and downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
template<class In , class Init , class Fun , class Cleanup , class Gatherer = random_gatherer, class Scatterer = broadcast_scatterer<typename stream_stage_trait_t<Fun>::output>>
stream<typename stream_stage_trait_t<Fun>::output> caf::scheduled_actor::make_stage ( const stream< In > &  in,
Init  init,
Fun  fun,
Cleanup  cleanup,
policy::arg< Gatherer, Scatterer >  policies = {} 
)

Creates a new stream stage.

Precondition
current_mailbox_element() is a stream_msg::open handshake
Parameters
inThe input of the stage.
initFunction object for initializing the state of the stage.
funFunction object for processing stream elements.
cleanupFunction object for clearing the stage of the stage.
policiesSets the policies for up- and downstream communication.
Returns
A stream object with a pointer to the generated stream_manager.
virtual proxy_registry* caf::scheduled_actor::proxy_registry_ptr ( )
virtual

Returns a factory for proxies created and managed by this actor or nullptr.

void caf::scheduled_actor::quit ( error  x = error{})

Finishes execution of this actor after any currently running message handler is done.

This member function clears the behavior stack of the running actor and invokes on_exit(). The actors does not finish execution if the implementation of on_exit() sets a new behavior. When setting a new behavior in on_exit(), one has to make sure to not produce an infinite recursion.

If on_exit() did not set a new behavior, the actor sends an exit message to all of its linked actors, sets its state to exited and finishes execution.

In case this actor uses the blocking API, this member function unwinds the stack by throwing an actor_exited exception.

Warning
This member function throws immediately in thread-based actors that do not use the behavior stack, i.e., actors that use blocking API calls such as receive().
resume_result caf::scheduled_actor::resume ( execution_unit ,
size_t  max_throughput 
)
overridevirtual

Resume any pending computation until it is either finished or needs to be re-scheduled later.

Implements caf::resumable.

Reimplemented in caf::io::abstract_broker.

void caf::scheduled_actor::set_exception_handler ( exception_handler  fun)

Sets a custom exception handler for this actor.

If multiple handlers are defined, only the functor that was added last is being executed.

template<class F >
std::enable_if< std::is_convertible< F, std::function<error (std::exception_ptr&)> >::value >::type caf::scheduled_actor::set_exception_handler ( f)

Sets a custom exception handler for this actor.

If multiple handlers are defined, only the functor that was added last is being executed.

subtype_t caf::scheduled_actor::subtype ( ) const
overridevirtual

Returns a subtype hint for this object.

This allows an execution unit to limit processing to a specific set of resumables and delegate other subtypes to dedicated workers.

Reimplemented from caf::resumable.

Reimplemented in caf::io::abstract_broker.

void caf::scheduled_actor::trigger_downstreams ( )

Tries to send more data on all downstream paths.

Use this function to manually trigger batches in a source after receiving more data to send.

Friends And Related Function Documentation

result< message > drop ( scheduled_actor ,
message_view  
)
related

Default handler function that simply drops messages.

result< message > print_and_drop ( scheduled_actor ,
message_view  
)
related

Default handler function that prints messages message via aout and drops them afterwards.

result< message > reflect ( scheduled_actor ,
message_view  
)
related

Default handler function that sends the message back to the sender.

result< message > reflect_and_quit ( scheduled_actor ,
message_view  
)
related

Default handler function that sends the message back to the sender and then quits.


The documentation for this class was generated from the following file: