libcaf  0.16.0
Public Types | Public Member Functions | Protected Member Functions | List of all members
caf::io::broker Class Reference

Describes a dynamically typed broker. More...

#include <broker.hpp>

Inheritance diagram for caf::io::broker:
caf::io::abstract_broker caf::scheduled_actor caf::resumable

Public Types

using super = extend< abstract_broker, broker >::with< mixin::sender, mixin::requester, mixin::behavior_changer >
 
using signatures = none_t
 
- Public Types inherited from caf::scheduled_actor
enum  message_category {
  message_category::ordinary,
  message_category::internal,
  message_category::skipped
}
 Categorizes incoming messages. More...
 
enum  activation_result {
  activation_result::success,
  activation_result::terminated,
  activation_result::skipped,
  activation_result::dropped
}
 Result of one-shot activations. More...
 
using super = local_actor
 Base type.
 
using stream_manager_map = std::map< stream_slot, stream_manager_ptr >
 Maps slot IDs to stream managers.
 
using default_queue = intrusive::drr_cached_queue< policy::normal_messages >
 Stores asynchronous messages with default priority.
 
using urgent_queue = intrusive::drr_cached_queue< policy::urgent_messages >
 Stores asynchronous messages with hifh priority.
 
using upstream_queue = intrusive::drr_queue< policy::upstream_messages >
 Stores upstream messages.
 
using downstream_queue = intrusive::wdrr_dynamic_multiplexed_queue< policy::downstream_messages >
 Stores downstream messages.
 
using mailbox_type = intrusive::fifo_inbox< mailbox_policy >
 A queue optimized for single-reader-many-writers.
 
using pending_response = std::pair< const message_id, behavior >
 The message ID of an outstanding response with its callback.
 
using pointer = scheduled_actor *
 A pointer to a scheduled actor.
 
using default_handler = std::function< result< message >(pointer, message_view &)>
 Function object for handling unmatched messages.
 
using error_handler = std::function< void(pointer, error &)>
 Function object for handling error messages.
 
using down_handler = std::function< void(pointer, down_msg &)>
 Function object for handling down messages.
 
using exit_handler = std::function< void(pointer, exit_msg &)>
 Function object for handling exit messages.
 
using exception_handler = std::function< error(pointer, std::exception_ptr &)>
 Function object for handling exit messages.
 
- Public Types inherited from caf::resumable
enum  resume_result {
  resume_later,
  awaiting_message,
  done,
  shutdown_execution_unit
}
 Denotes the state in which a resumable returned from its last call to resume. More...
 
enum  subtype_t {
  unspecified,
  scheduled_actor,
  io_actor,
  function_object
}
 Denotes common subtypes of resumable. More...
 

Public Member Functions

template<class F , class... Ts>
infer_handle_from_fun< F >::type fork (F fun, connection_handle hdl, Ts &&... xs)
 
void initialize () override
 
 broker (actor_config &cfg)
 
- Public Member Functions inherited from caf::io::abstract_broker
void enqueue (mailbox_element_ptr, execution_unit *) override
 
void enqueue (strong_actor_ptr, message_id, message, execution_unit *) override
 
void launch (execution_unit *eu, bool lazy, bool hide) override
 
bool cleanup (error &&reason, execution_unit *host) override
 
resume_result resume (execution_unit *, size_t) override
 Resume any pending computation until it is either finished or needs to be re-scheduled later. More...
 
template<class Handle >
void halt (Handle hdl)
 Suspends activities on hdl unconditionally.
 
template<class Handle >
void trigger (Handle hdl)
 Allows activities on hdl unconditionally (default).
 
template<class Handle >
void trigger (Handle hdl, size_t num_events)
 Allows num_events activities on hdl.
 
void configure_read (connection_handle hdl, receive_policy::config cfg)
 Modifies the receive policy for a given connection. More...
 
void ack_writes (connection_handle hdl, bool enable)
 Enables or disables write notifications for a given connection.
 
std::vector< char > & wr_buf (connection_handle hdl)
 Returns the write buffer for a given connection.
 
void write (connection_handle hdl, size_t bs, const void *buf)
 Writes data into the buffer for a given connection.
 
void flush (connection_handle hdl)
 Sends the content of the buffer for a given connection.
 
void ack_writes (datagram_handle hdl, bool enable)
 Enables or disables write notifications for a given datagram socket.
 
std::vector< char > & wr_buf (datagram_handle hdl)
 Returns the write buffer for a given sink.
 
void enqueue_datagram (datagram_handle, std::vector< char >)
 Enqueue a buffer to be sent as a datagram via a given endpoint.
 
void write (datagram_handle hdl, size_t data_size, const void *data)
 Writes data into the buffer of a given sink.
 
void flush (datagram_handle hdl)
 Sends the content of the buffer to a UDP endpoint.
 
middlemanparent ()
 Returns the middleman instance this broker belongs to.
 
void add_scribe (scribe_ptr ptr)
 Adds the unitialized scribe instance ptr to this broker.
 
connection_handle add_scribe (network::native_socket fd)
 Creates and assigns a new scribe from given native socked fd.
 
expected< connection_handleadd_tcp_scribe (const std::string &host, uint16_t port)
 Tries to connect to host on given port and creates a new scribe describing the connection afterwards. More...
 
void move_scribe (scribe_ptr ptr)
 Moves the initialized scribe instance ptr from another broker to this broker. More...
 
void add_doorman (doorman_ptr ptr)
 Adds a doorman instance to this broker.
 
accept_handle add_doorman (network::native_socket fd)
 Creates and assigns a new doorman from given native socked fd.
 
void move_doorman (doorman_ptr ptr)
 Adds a doorman instance to this broker.
 
expected< std::pair< accept_handle, uint16_t > > add_tcp_doorman (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false)
 Tries to open a local port and creates a doorman managing it on success. More...
 
void add_datagram_servant (datagram_servant_ptr ptr)
 Adds a datagram_servant to this broker.
 
void add_hdl_for_datagram_servant (datagram_servant_ptr ptr, datagram_handle hdl)
 Adds the datagram_servant under an additional hdl.
 
datagram_handle add_datagram_servant (network::native_socket fd)
 Creates and assigns a new datagram_servant from a given socket fd.
 
datagram_handle add_datagram_servant_for_endpoint (network::native_socket fd, const network::ip_endpoint &ep)
 Creates and assigns a new datagram_servant from a given socket fd for the remote endpoint ep. More...
 
expected< datagram_handleadd_udp_datagram_servant (const std::string &host, uint16_t port)
 Creates a new datagram_servant for the remote endpoint host and port. More...
 
expected< std::pair< datagram_handle, uint16_t > > add_udp_datagram_servant (uint16_t port=0, const char *in=nullptr, bool reuse_addr=false)
 Tries to open a local port and creates a datagram_servant managing it on success. More...
 
void move_datagram_servant (datagram_servant_ptr ptr)
 Moves an initialized datagram_servant instance ptr from another broker to this one. More...
 
std::string remote_addr (connection_handle hdl)
 Returns the remote address associated with hdl or empty string if hdl is invalid. More...
 
uint16_t remote_port (connection_handle hdl)
 Returns the remote port associated with hdl or 0 if hdl is invalid. More...
 
std::string local_addr (accept_handle hdl)
 Returns the local address associated with hdl or empty string if hdl is invalid. More...
 
uint16_t local_port (accept_handle hdl)
 Returns the local port associated with hdl or 0 if hdl is invalid.
 
accept_handle hdl_by_port (uint16_t port)
 Returns the handle associated with given local port or none.
 
datagram_handle datagram_hdl_by_port (uint16_t port)
 Returns the dgram handle associated with given local port or none.
 
std::string remote_addr (datagram_handle hdl)
 Returns the remote address associated with hdl or an empty string if hdl is invalid. More...
 
uint16_t remote_port (datagram_handle hdl)
 Returns the remote port associated with hdl or 0 if hdl is invalid. More...
 
uint16_t local_port (datagram_handle hdl)
 Returns the remote port associated with hdl or 0 if hdl is invalid. More...
 
bool remove_endpoint (datagram_handle hdl)
 Remove the endpoint hdl from the broker.
 
void close_all ()
 Closes all connections and acceptors.
 
template<class Handle >
bool close (Handle hdl)
 Closes the connection or acceptor identified by handle. More...
 
template<class Handle >
bool valid (Handle hdl)
 Checks whether hdl is assigned to broker.
 
const char * name () const override
 
subtype_t subtype () const override
 Returns a subtype hint for this object. More...
 
size_t num_connections () const
 Returns the number of open connections.
 
std::vector< connection_handleconnections () const
 Returns all handles of all scribe instances attached to this broker.
 
network::multiplexerbackend ()
 Returns the multiplexer running this broker.
 
- Public Member Functions inherited from caf::scheduled_actor
 scheduled_actor (actor_config &cfg)
 
void enqueue (mailbox_element_ptr ptr, execution_unit *eu) override
 
const char * name () const override
 
void launch (execution_unit *eu, bool lazy, bool hide) override
 
bool cleanup (error &&fail_state, execution_unit *host) override
 
void intrusive_ptr_add_ref_impl () override
 Add a strong reference count to this object.
 
void intrusive_ptr_release_impl () override
 Remove a strong reference count from this object.
 
virtual proxy_registryproxy_registry_ptr ()
 Returns a factory for proxies created and managed by this actor or nullptr. More...
 
void quit (error x=error{})
 Finishes execution of this actor after any currently running message handler is done. More...
 
mailbox_typemailbox () noexcept
 Returns the queue for storing incoming messages.
 
stream_manager_mapstream_managers () noexcept
 Returns map for all active streams.
 
stream_manager_mappending_stream_managers () noexcept
 Returns map for all pending streams.
 
void set_default_handler (default_handler fun)
 Sets a custom handler for unexpected messages.
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< result< message >type_erased_tuple &)> >::value >::type set_default_handler (F fun)
 Sets a custom handler for unexpected messages.
 
void set_error_handler (error_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_error_handler (T fun) -> decltype(fun(std::declval< error &>()))
 Sets a custom handler for error messages.
 
void set_down_handler (down_handler fun)
 Sets a custom handler for down messages.
 
template<class T >
auto set_down_handler (T fun) -> decltype(fun(std::declval< down_msg &>()))
 Sets a custom handler for down messages.
 
void set_exit_handler (exit_handler fun)
 Sets a custom handler for error messages.
 
template<class T >
auto set_exit_handler (T fun) -> decltype(fun(std::declval< exit_msg &>()))
 Sets a custom handler for exit messages.
 
void set_exception_handler (exception_handler fun)
 Sets a custom exception handler for this actor. More...
 
template<class F >
std::enable_if< std::is_convertible< F, std::function< error(std::exception_ptr &)> >::value >::type set_exception_handler (F f)
 Sets a custom exception handler for this actor. More...
 
template<class Driver , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t>
make_source_result_t< typename Driver::downstream_manager_type, Ts... > make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={})
 Creates a new stream source by instantiating the default source implementation with Driver. More...
 
template<class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
make_source_result_t< DownstreamManager, Ts... > make_source (std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 Creates a new stream source from given arguments. More...
 
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t<!detail::is_actor_handle< Init >::value &&Trait::valid, make_source_result_t< DownstreamManager > > make_source (Init init, Pull pull, Done done, Finalize finalize={}, policy::arg< DownstreamManager > token={})
 
template<class ActorHandle , class... Ts, class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< detail::is_actor_handle< ActorHandle >::value, make_source_result_t< DownstreamManager > > make_source (const ActorHandle &dest, std::tuple< Ts... > xs, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 Creates a new stream source and adds dest as first outbound path to it.
 
template<class ActorHandle , class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Pull>, class Trait = stream_source_trait_t<Pull>>
detail::enable_if_t< detail::is_actor_handle< ActorHandle >::value &&Trait::valid, make_source_result_t< DownstreamManager > > make_source (const ActorHandle &dest, Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager > token={})
 Creates a new stream source and adds dest as first outbound path to it.
 
template<class Driver , class Init , class Pull , class Done , class Finalize = unit_t>
Driver::source_ptr_type make_continuous_source (Init init, Pull pull, Done done, Finalize fin={})
 Creates a new continuous stream source by instantiating the default source implementation with `Driver. More...
 
template<class Init , class Pull , class Done , class Finalize = unit_t, class DownstreamManager = broadcast_downstream_manager< typename stream_source_trait_t<Pull>::output>>
stream_source_ptr< DownstreamManager > make_continuous_source (Init init, Pull pull, Done done, Finalize fin={}, policy::arg< DownstreamManager >={})
 Creates a new continuous stream source by instantiating the default source implementation with `Driver. More...
 
template<class Driver , class... Ts>
make_sink_result< typename Driver::input_type > make_sink (const stream< typename Driver::input_type > &src, Ts &&... xs)
 
template<class In , class Init , class Fun , class Finalize = unit_t, class Trait = stream_sink_trait_t<Fun>>
make_sink_result< In > make_sink (const stream< In > &in, Init init, Fun fun, Finalize fin={})
 
template<class Driver , class In , class... Ts, class... Us>
make_stage_result_t< In, typename Driver::downstream_manager_type, Ts... > make_stage (const stream< In > &src, std::tuple< Ts... > xs, Us &&... ys)
 
template<class In , class... Ts, class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager, Ts... > make_stage (const stream< In > &in, std::tuple< Ts... > xs, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={})
 
template<class In , class Init , class Fun , class Finalize = unit_t, class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
make_stage_result_t< In, DownstreamManager > make_stage (const stream< In > &in, Init init, Fun fun, Finalize fin={}, policy::arg< DownstreamManager > token={})
 
template<class Driver , class... Ts>
Driver::stage_ptr_type make_continuous_stage (Ts &&... xs)
 Returns a stream manager (implementing a continuous stage) without in- or outbound path. More...
 
template<class Init , class Fun , class Cleanup , class DownstreamManager = default_downstream_manager_t<Fun>, class Trait = stream_stage_trait_t<Fun>>
stream_stage_ptr< typename Trait::input, DownstreamManager > make_continuous_stage (Init init, Fun fun, Cleanup cleanup, policy::arg< DownstreamManager > token={})
 
void enqueue (strong_actor_ptr sender, message_id mid, message msg, execution_unit *host) override
 
virtual void enqueue (mailbox_element_ptr what, execution_unit *host)=0
 Enqueues a new message wrapped in a mailbox_element to the actor. More...
 

Protected Member Functions

virtual behavior make_behavior ()
 
- Protected Member Functions inherited from caf::io::abstract_broker
void init_broker ()
 
 abstract_broker (actor_config &cfg)
 
template<class Handle >
auto by_id (Handle hdl) -> optional< decltype(*ptr_of(hdl))>
 Returns a scribe or doorman identified by hdl.
 

Additional Inherited Members

- Static Public Member Functions inherited from caf::scheduled_actor
static void default_error_handler (pointer ptr, error &x)
 
static void default_down_handler (pointer ptr, down_msg &x)
 
static void default_exit_handler (pointer ptr, exit_msg &x)
 
static error default_exception_handler (pointer ptr, std::exception_ptr &x)
 
- Protected Types inherited from caf::io::abstract_broker
using doorman_map = std::unordered_map< accept_handle, intrusive_ptr< doorman > >
 
using scribe_map = std::unordered_map< connection_handle, intrusive_ptr< scribe > >
 
using datagram_servant_map = std::unordered_map< datagram_handle, intrusive_ptr< datagram_servant > >
 
- Protected Attributes inherited from caf::scheduled_actor
mailbox_type mailbox_
 Stores incoming messages.
 
detail::behavior_stack bhvr_stack_
 Stores user-defined callbacks for message handling.
 
uint64_t timeout_id_
 Identifies the timeout messages we are currently waiting for.
 
std::forward_list< pending_responseawaited_responses_
 Stores callbacks for awaited responses.
 
detail::unordered_flat_map< message_id, behaviormultiplexed_responses_
 Stores callbacks for multiplexed responses.
 
default_handler default_handler_
 Customization point for setting a default message callback.
 
error_handler error_handler_
 Customization point for setting a default error callback.
 
down_handler down_handler_
 Customization point for setting a default down_msg callback.
 
exit_handler exit_handler_
 Customization point for setting a default exit_msg callback.
 
stream_manager_map stream_managers_
 Stores stream managers for established streams.
 
stream_manager_map pending_stream_managers_
 Stores stream managers for pending streams, i.e., streams that have not yet received an ACK. More...
 
detail::tick_emitter stream_ticks_
 Controls batch and credit timeouts.
 
size_t max_batch_delay_ticks_
 Number of ticks per batch delay.
 
size_t credit_round_ticks_
 Number of ticks of each credit round.
 
detail::private_thread * private_thread_
 Pointer to a private thread object associated with a detached actor.
 
exception_handler exception_handler_
 Customization point for setting a default exception callback.
 

Detailed Description

Describes a dynamically typed broker.


The documentation for this class was generated from the following file: