55static cmode cmode_from_string(
const char* str)
57 auto str_view = std::string_view(str);
59 if (str_view.compare(
"dial") == 0)
61 else if (str_view.compare(
"listen") == 0)
64 throw std::runtime_error(std::format(
"Unable to parse {} to CMODE", str));
76 , socket_m{std::move(s)}
78 register_pipe_notify();
88 socket_m.set_recv_timeout(*opts.
recv_timeout).or_abort();
91 socket_m.set_send_timeout(*opts.
send_timeout).or_abort();
103 .recv_timeout = socket_m.get_recv_timeout().value_or_abort(),
104 .send_timeout = socket_m.get_send_timeout().value_or_abort(),
105 .req_resend_time = socket_m.get_req_resend_time().value_or_abort(),
106 .req_resend_tick = socket_m.get_req_resend_tick().value_or_abort(),
110 void dial(
const char* addr) { emplace_dialer(addr).start().or_abort(); }
112 void listen(
const char* addr) { emplace_listener(addr).start().or_abort(); }
135 template<ev::event_c event_t>
138 op_m.
send(router_m, *
this, p,
ev);
141 void recv() { op_m.recv(router_m, *
this); }
145 int id()
const {
return socket_m.id(); }
151 return socket_m.proto_name().value_or(
"<not-found>");
154 auto format_to(std::format_context& ctx)
const ->
decltype(ctx.out())
156 return std::format_to(ctx.out(),
"Socket #{}-{}",
id(),
proto_name());
160 void pipe_cb(nng_pipe cp, nng_pipe_ev
ev)
166 case NNG_PIPE_EV_ADD_PRE: {
173 case NNG_PIPE_EV_ADD_POST: {
176 router_m.queue_fire(ev::pipe_created{},
id(), *
this, net::pipe{pv});
180 case NNG_PIPE_EV_REM_POST: {
183 router_m.queue_fire(ev::pipe_removed{},
id(), *
this, net::pipe{pv});
187 case NNG_PIPE_EV_NUM: {
194 void register_pipe_notify()
196 static auto pipe_cb = [](nng_pipe p, nng_pipe_ev ev,
void* self) {
197 static_cast<socket*
>(self)->pipe_cb(p, ev);
209 listeners_m.push_back(
210 nngxx::make_listener(socket_m, addr).value_or_abort());
212 return listeners_m.back();
217 dialers_m.push_back(nngxx::make_dialer(socket_m, addr).value_or_abort());
219 return dialers_m.back();
222 ev::enqueuer& router_m;
225 std::vector<nngxx::dialer> dialers_m;
226 std::vector<nngxx::listener> listeners_m;
void queue_fire(event_t ev)
void send(ev::enqueuer &r, tool_t &t, pipe p, event_t ev)
socket(ev::enqueuer &r, nngxx::socket &&s)
Construct a socket.
void set_options(const socket_opt opts)
void send_aio(nngxx::aio_view &a)
void send(event_t ev, pipe p={})
void connect(const char *addr, const cmode mode)
const char * proto_name() const
auto format_to(std::format_context &ctx) const -> decltype(ctx.out())
void listen(const char *addr)
socket_opt options() const
void dial(const char *addr)
void recv_aio(nngxx::aio_view &a)
clev::own< nng_socket > socket
clev::own< nng_listener > listener
clev::own< nng_dialer > dialer
clev::iface< nng_pipe > pipe_view
clev::iface< nng_aio * > aio_view
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Represents the options that can be configured for a given socket.
std::optional< nng_duration > req_resend_time
NNG_OPT_REQ_RESENDTIME.
std::optional< nng_duration > recv_timeout
NNG_OPT_RECVTIMEO.
std::optional< nng_duration > req_resend_tick
NNG_OPT_REQ_RESENDTICK.
std::optional< nng_duration > send_timeout
NNG_OPT_SENDTIMEO.