pars 0.2.1.99
Loading...
Searching...
No Matches
socket.h
Go to the documentation of this file.
1/*
2Copyright (c) 2025 Giuseppe Roberti.
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without modification,
6are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice, this
9list of conditions and the following disclaimer.
10
112. Redistributions in binary form must reproduce the above copyright notice,
12this list of conditions and the following disclaimer in the documentation and/or
13other materials provided with the distribution.
14
153. Neither the name of the copyright holder nor the names of its contributors
16may be used to endorse or promote products derived from this software without
17specific prior written permission.
18
19THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29*/
30#pragma once
31
32#include "nngxx/aio.h"
33#include "nngxx/ctx.h"
34#include "nngxx/dialer.h"
35#include "nngxx/listener.h"
36#include "nngxx/pipe.h"
37
38#include "pars/ev/enqueuer.h"
39#include "pars/net/op.h"
40#include "pars/net/socket_opt.h"
41
42#include <format>
43#include <string_view>
44#include <vector>
45
46namespace pars::net
47{
48
49enum class cmode
50{
53};
54
55static cmode cmode_from_string(const char* str)
56{
57 auto str_view = std::string_view(str);
58
59 if (str_view.compare("dial") == 0)
60 return cmode::dial;
61 else if (str_view.compare("listen") == 0)
62 return cmode::listen;
63
64 throw std::runtime_error(std::format("Unable to parse {} to CMODE", str));
65}
66
70class socket
71{
72public:
75 : router_m{r}
76 , socket_m{std::move(s)}
77 {
78 register_pipe_notify();
79 }
80
81 ~socket() { stop(); }
82
83 operator tool_view() { return tool_view{socket_m}; }
84
85 void set_options(const socket_opt opts)
86 {
87 if (opts.recv_timeout)
88 socket_m.set_recv_timeout(*opts.recv_timeout).or_abort();
89
90 if (opts.send_timeout)
91 socket_m.set_send_timeout(*opts.send_timeout).or_abort();
92
93 if (opts.req_resend_time)
94 socket_m.set_req_resend_time(*opts.req_resend_time).or_abort();
95
96 if (opts.req_resend_tick)
97 socket_m.set_req_resend_tick(*opts.req_resend_tick).or_abort();
98 }
99
101 {
102 return {
103 .recv_timeout = socket_m.get_recv_timeout().value_or_abort(),
104 .send_timeout = socket_m.get_send_timeout().value_or_abort(),
105 .req_resend_time = socket_m.get_req_resend_time().value_or_abort(),
106 .req_resend_tick = socket_m.get_req_resend_tick().value_or_abort(),
107 };
108 }
109
110 void dial(const char* addr) { emplace_dialer(addr).start().or_abort(); }
111
112 void listen(const char* addr) { emplace_listener(addr).start().or_abort(); }
113
114 void connect(const char* addr, const cmode mode)
115 {
116 switch (mode)
117 {
118 case cmode::dial:
119 dial(addr);
120
121 break;
122 case cmode::listen:
123 listen(addr);
124
125 break;
126 }
127 }
128
129 nngxx::ctx make_ctx() { return nngxx::make_ctx(socket_m).value_or_abort(); }
130
131 void send_aio(nngxx::aio_view& a) { socket_m.send(a); }
132
133 void recv_aio(nngxx::aio_view& a) { socket_m.recv(a); }
134
135 template<ev::event_c event_t>
136 void send(event_t ev, pipe p = {})
137 {
138 op_m.send(router_m, *this, p, ev);
139 }
140
141 void recv() { op_m.recv(router_m, *this); }
142
143 void stop() { op_m.stop(); }
144
145 int id() const { return socket_m.id(); }
146
147 int socket_id() const { return id(); }
148
149 const char* proto_name() const
150 {
151 return socket_m.proto_name().value_or("<not-found>");
152 }
153
154 auto format_to(std::format_context& ctx) const -> decltype(ctx.out())
155 {
156 return std::format_to(ctx.out(), "Socket #{}-{}", id(), proto_name());
157 }
158
159private:
160 void pipe_cb(nng_pipe cp, nng_pipe_ev ev)
161 {
162 auto pv = nngxx::pipe_view{cp};
163
164 switch (ev)
165 {
166 case NNG_PIPE_EV_ADD_PRE: {
167 pars::debug(SL, lf::net, "Pipe 0x{:X} creating! [{}]", pv.id(), *this);
168
169 router_m.queue_fire(ev::creating_pipe{}, id(), *this, net::pipe{pv});
170 }
171 break;
172
173 case NNG_PIPE_EV_ADD_POST: {
174 pars::debug(SL, lf::net, "Pipe 0x{:X} created! [{}]", pv.id(), *this);
175
176 router_m.queue_fire(ev::pipe_created{}, id(), *this, net::pipe{pv});
177 }
178 break;
179
180 case NNG_PIPE_EV_REM_POST: {
181 pars::debug(SL, lf::net, "Pipe 0x{:X} removed! [{}]", pv.id(), *this);
182
183 router_m.queue_fire(ev::pipe_removed{}, id(), *this, net::pipe{pv});
184 }
185 break;
186
187 case NNG_PIPE_EV_NUM: {
188 pars::debug(SL, lf::net, "Pipe 0x{:X} num notified", pv.id());
189 }
190 break;
191 }
192 }
193
194 void register_pipe_notify()
195 {
196 static auto pipe_cb = [](nng_pipe p, nng_pipe_ev ev, void* self) {
197 static_cast<socket*>(self)->pipe_cb(p, ev);
198 };
199
200 // NOTE: pass this, cant move socket
201 socket_m.pipe_notify(nngxx::pipe_ev::add_pre, pipe_cb, this).or_abort();
202 socket_m.pipe_notify(nngxx::pipe_ev::add_post, pipe_cb, this).or_abort();
203 socket_m.pipe_notify(nngxx::pipe_ev::rem_post, pipe_cb, this).or_abort();
204 socket_m.pipe_notify(nngxx::pipe_ev::num, pipe_cb, this).or_abort();
205 }
206
207 nngxx::listener& emplace_listener(const char* addr)
208 {
209 listeners_m.push_back(
210 nngxx::make_listener(socket_m, addr).value_or_abort());
211
212 return listeners_m.back();
213 }
214
215 nngxx::dialer& emplace_dialer(const char* addr)
216 {
217 dialers_m.push_back(nngxx::make_dialer(socket_m, addr).value_or_abort());
218
219 return dialers_m.back();
220 }
221
222 ev::enqueuer& router_m;
223 op op_m;
224 nngxx::socket socket_m;
225 std::vector<nngxx::dialer> dialers_m;
226 std::vector<nngxx::listener> listeners_m;
227};
228
229} // namespace pars::net
void queue_fire(event_t ev)
Definition enqueuer.h:52
void send(ev::enqueuer &r, tool_t &t, pipe p, event_t ev)
Definition op.h:65
socket(ev::enqueuer &r, nngxx::socket &&s)
Construct a socket.
Definition socket.h:74
void set_options(const socket_opt opts)
Definition socket.h:85
void send_aio(nngxx::aio_view &a)
Definition socket.h:131
void send(event_t ev, pipe p={})
Definition socket.h:136
void connect(const char *addr, const cmode mode)
Definition socket.h:114
const char * proto_name() const
Definition socket.h:149
int socket_id() const
Definition socket.h:147
auto format_to(std::format_context &ctx) const -> decltype(ctx.out())
Definition socket.h:154
void listen(const char *addr)
Definition socket.h:112
socket_opt options() const
Definition socket.h:100
int id() const
Definition socket.h:145
void dial(const char *addr)
Definition socket.h:110
nngxx::ctx make_ctx()
Definition socket.h:129
void recv_aio(nngxx::aio_view &a)
Definition socket.h:133
Represents an nng_socket or nng_ctx view.
Definition tool_view.h:48
#define SL
Definition log.h:58
clev::own< nng_ctx > ctx
Definition ctx.h:39
clev::own< nng_socket > socket
Definition socket_decl.h:45
clev::own< nng_listener > listener
Definition listener.h:39
clev::own< nng_dialer > dialer
Definition dialer.h:39
clev::iface< nng_pipe > pipe_view
Definition pipe.h:37
clev::iface< nng_aio * > aio_view
Definition aio.h:37
@ net
Definition flags.h:44
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:129
Represents the options that can be configured for a given socket.
Definition socket_opt.h:48
std::optional< nng_duration > req_resend_time
NNG_OPT_REQ_RESENDTIME.
Definition socket_opt.h:56
std::optional< nng_duration > recv_timeout
NNG_OPT_RECVTIMEO.
Definition socket_opt.h:51
std::optional< nng_duration > req_resend_tick
NNG_OPT_REQ_RESENDTICK.
Definition socket_opt.h:57
std::optional< nng_duration > send_timeout
NNG_OPT_SENDTIMEO.
Definition socket_opt.h:52