| Coverage Report |
 |
|
|
 |
: /*
: Copyright (c) 2025 Giuseppe Roberti.
: All rights reserved.
:
: Redistribution and use in source and binary forms, with or without modification,
: are permitted provided that the following conditions are met:
:
: 1. Redistributions of source code must retain the above copyright notice, this
: list of conditions and the following disclaimer.
:
: 2. Redistributions in binary form must reproduce the above copyright notice,
: this list of conditions and the following disclaimer in the documentation and/or
: other materials provided with the distribution.
:
: 3. Neither the name of the copyright holder nor the names of its contributors
: may be used to endorse or promote products derived from this software without
: specific prior written permission.
:
: THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
: WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
: DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
: ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
: LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
: ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
: SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
: */
: #pragma once
:
: #include "nngxx/aio.h"
: #include "nngxx/ctx.h"
: #include "nngxx/dialer.h"
: #include "nngxx/listener.h"
: #include "nngxx/pipe.h"
:
: #include "pars/ev/enqueuer.h"
: #include "pars/net/op.h"
: #include "pars/net/socket_opt.h"
:
: #include <format>
: #include <string_view>
: #include <vector>
:
: namespace pars::net
: {
:
: enum class cmode
: {
: dial,
: listen
: };
:
: static cmode cmode_from_string(const char* str)
: {
0 / 1 : auto str_view = std::string_view(str);
:
0 / 1 : if (str_view.compare("dial") == 0)
0 / 1 : return cmode::dial;
0 / 1 : else if (str_view.compare("listen") == 0)
0 / 1 : return cmode::listen;
:
0 / 1 : throw std::runtime_error(std::format("Unable to parse {} to CMODE", str));
: }
:
: /**
: * @brief Represents an nng_socket
: */
: class socket
: {
: public:
: /// Construct a socket
: socket(ev::enqueuer& r, nngxx::socket&& s)
: : router_m{r}
: , socket_m{std::move(s)}
: {
0 / 1 : register_pipe_notify();
: }
:
0 / 1 : ~socket() { stop(); }
:
0 / 1 : operator tool_view() { return tool_view{socket_m}; }
:
: void set_options(const socket_opt opts)
: {
0 / 1 : if (opts.recv_timeout)
0 / 1 : socket_m.set_recv_timeout(*opts.recv_timeout).or_abort();
:
0 / 1 : if (opts.send_timeout)
0 / 1 : socket_m.set_send_timeout(*opts.send_timeout).or_abort();
:
0 / 1 : if (opts.req_resend_time)
0 / 1 : socket_m.set_req_resend_time(*opts.req_resend_time).or_abort();
:
0 / 1 : if (opts.req_resend_tick)
0 / 1 : socket_m.set_req_resend_tick(*opts.req_resend_tick).or_abort();
: }
:
: socket_opt options() const
: {
0 / 1 : return {
0 / 1 : .recv_timeout = socket_m.get_recv_timeout().value_or_abort(),
0 / 1 : .send_timeout = socket_m.get_send_timeout().value_or_abort(),
0 / 1 : .req_resend_time = socket_m.get_req_resend_time().value_or_abort(),
0 / 1 : .req_resend_tick = socket_m.get_req_resend_tick().value_or_abort(),
0 / 1 : };
: }
:
0 / 1 : void dial(const char* addr) { emplace_dialer(addr).start().or_abort(); }
:
0 / 1 : void listen(const char* addr) { emplace_listener(addr).start().or_abort(); }
:
: void connect(const char* addr, const cmode mode)
: {
0 / 1 : switch (mode)
: {
0 / 1 : case cmode::dial:
0 / 1 : dial(addr);
:
0 / 1 : break;
0 / 1 : case cmode::listen:
0 / 1 : listen(addr);
:
0 / 1 : break;
: }
: }
:
0 / 1 : nngxx::ctx make_ctx() { return nngxx::make_ctx(socket_m).value_or_abort(); }
:
0 / 1 : void send_aio(nngxx::aio_view& a) { socket_m.send(a); }
:
0 / 1 : void recv_aio(nngxx::aio_view& a) { socket_m.recv(a); }
:
: template<ev::event_c event_t>
: void send(event_t ev, pipe p = {})
: {
: op_m.send(router_m, *this, p, ev);
: }
:
0 / 1 : void recv() { op_m.recv(router_m, *this); }
:
0 / 1 : void stop() { op_m.stop(); }
:
0 / 1 : int id() const { return socket_m.id(); }
:
0 / 1 : int socket_id() const { return id(); }
:
: const char* proto_name() const
: {
0 / 1 : return socket_m.proto_name().value_or("<not-found>");
: }
:
: auto format_to(std::format_context& ctx) const -> decltype(ctx.out())
: {
0 / 1 : return std::format_to(ctx.out(), "Socket #{}-{}", id(), proto_name());
: }
:
: private:
: void pipe_cb(nng_pipe cp, nng_pipe_ev ev)
: {
0 / 1 : auto pv = nngxx::pipe_view{cp};
:
0 / 1 : switch (ev)
: {
0 / 1 : case NNG_PIPE_EV_ADD_PRE: {
0 / 1 : pars::debug(SL, lf::net, "Pipe 0x{:X} creating! [{}]", pv.id(), *this);
:
0 / 1 : router_m.queue_fire(ev::creating_pipe{}, id(), *this, net::pipe{pv});
: }
0 / 1 : break;
:
0 / 1 : case NNG_PIPE_EV_ADD_POST: {
0 / 1 : pars::debug(SL, lf::net, "Pipe 0x{:X} created! [{}]", pv.id(), *this);
:
0 / 1 : router_m.queue_fire(ev::pipe_created{}, id(), *this, net::pipe{pv});
: }
0 / 1 : break;
:
0 / 1 : case NNG_PIPE_EV_REM_POST: {
0 / 1 : pars::debug(SL, lf::net, "Pipe 0x{:X} removed! [{}]", pv.id(), *this);
:
0 / 1 : router_m.queue_fire(ev::pipe_removed{}, id(), *this, net::pipe{pv});
: }
0 / 1 : break;
:
0 / 1 : case NNG_PIPE_EV_NUM: {
0 / 1 : pars::debug(SL, lf::net, "Pipe 0x{:X} num notified", pv.id());
: }
0 / 1 : break;
: }
: }
:
: void register_pipe_notify()
: {
0 / 1 : static auto pipe_cb = [](nng_pipe p, nng_pipe_ev ev, void* self) {
0 / 1 : static_cast<socket*>(self)->pipe_cb(p, ev);
0 / 1 : };
:
0 / 1 : // NOTE: pass this, cant move socket
0 / 1 : socket_m.pipe_notify(nngxx::pipe_ev::add_pre, pipe_cb, this).or_abort();
0 / 1 : socket_m.pipe_notify(nngxx::pipe_ev::add_post, pipe_cb, this).or_abort();
0 / 1 : socket_m.pipe_notify(nngxx::pipe_ev::rem_post, pipe_cb, this).or_abort();
0 / 1 : socket_m.pipe_notify(nngxx::pipe_ev::num, pipe_cb, this).or_abort();
: }
:
: nngxx::listener& emplace_listener(const char* addr)
: {
0 / 1 : listeners_m.push_back(
0 / 1 : nngxx::make_listener(socket_m, addr).value_or_abort());
:
0 / 1 : return listeners_m.back();
: }
:
: nngxx::dialer& emplace_dialer(const char* addr)
: {
0 / 1 : dialers_m.push_back(nngxx::make_dialer(socket_m, addr).value_or_abort());
:
0 / 1 : return dialers_m.back();
: }
:
: ev::enqueuer& router_m;
: op op_m;
: nngxx::socket socket_m;
: std::vector<nngxx::dialer> dialers_m;
: std::vector<nngxx::listener> listeners_m;
: };
:
: } // namespace pars::net
 |
| Generated by: llvmcov2html |