pars 0.2.1
Loading...
Searching...
No Matches
op.h
Go to the documentation of this file.
1/*
2Copyright (c) 2025 Giuseppe Roberti.
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without modification,
6are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice, this
9list of conditions and the following disclaimer.
10
112. Redistributions in binary form must reproduce the above copyright notice,
12this list of conditions and the following disclaimer in the documentation and/or
13other materials provided with the distribution.
14
153. Neither the name of the copyright holder nor the names of its contributors
16may be used to endorse or promote products derived from this software without
17specific prior written permission.
18
19THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29*/
30#pragma once
31
32#include "nngxx/aio.h"
33#include "nngxx/err.h"
34#include "nngxx/msg.h"
35#include "nngxx/pipe.h"
36
37#include "pars/ev/enqueuer.h"
38#include "pars/ev/event.h"
39#include "pars/ev/serializer.h"
40#include "pars/fmt/helpers.h"
41#include "pars/net/dir.h"
42
43#include <spdlog/spdlog.h>
44
45#include <expected>
46#include <functional>
47
48namespace pars::net
49{
50
51using cb_f = std::function<void(clev::expected<void>, nngxx::msg)>;
52
53class op
54{
55public:
57 {
58 if (aio_m)
59 aio_m.wait();
60 }
61
62 explicit operator bool() { return static_cast<bool>(aio_m); }
63
64 template<ev::event_c event_t, tool_c tool_t>
65 void send(ev::enqueuer& r, tool_t& t, pipe p, event_t ev)
66 {
68
69 if (p)
70 m.set_pipe(p);
71
72 pars::debug(SL, lf::net, "{}: Send Message [{}]!", f::pntl{p, t}, m);
73
74 // replace the callback with the new one
75 cb_m = [&, p](clev::expected<void> res, nngxx::msg m) mutable {
76 if (res)
77 {
78 // NOTE: m is empty on success
79
80 pars::debug(SL, lf::net, "{}: Sent Event [{}]!", f::pntl{p, t}, ev);
81
82 r.queue_sent(std::move(ev), t.socket_id(), t, p);
83 }
84 else
85 {
86 // NOTE: m is not empty on failure
87
88 auto pv = m.get_pipe();
89
90 pars::err(SL, lf::net, "{}: Error Sending {}! [msg:{},err:{}]",
91 f::pntl{pv, t}, nametype(ev), m, res.error());
92
93 r.queue_fire(ev::network_error{res.error(), dir::out}, t.socket_id(), t,
94 pv);
95 }
96 };
97
98 // make aio - NOTE: pass this, cant move op
99 aio_m = nngxx::make_aio(op::send_cb, this).value_or_abort();
100
101 // start send
102 aio_m.set_msg(std::move(m));
103 t.send_aio(aio_m);
104 }
105
106 template<tool_c tool_t>
107 void recv(ev::enqueuer& r, tool_t& t)
108 {
109 pars::debug(SL, lf::net, "{}: Receive Message!", f::pntl{{}, t});
110
111 // replace the operation with the new one
112 cb_m = [&](clev::expected<void> res, nngxx::msg m) {
113 if (res)
114 {
115 // NOTE: m is not empty on success
116
117 auto pv = m.get_pipe();
118
119 pars::debug(SL, lf::net, "{}: Received Message! [{}]", f::pntl{pv, t},
120 m);
121
122 r.queue_received(std::move(m), t.socket_id(), t, pv);
123 }
124 else
125 {
126 // NOTE: m is empty on failure
127
128 auto pv = nngxx::pipe_view();
129
130 pars::err(SL, lf::net, "{}: Error Receiving! [{}]", f::pntl{pv, t},
131 res.error());
132
133 r.queue_fire(ev::network_error{res.error(), dir::in}, t.socket_id(), t,
134 pv);
135 }
136 };
137
138 // make aio - NOTE: pass this, cant move op
139 aio_m = nngxx::make_aio(op::recv_cb, this).value_or_abort();
140
141 // start recv
142 t.recv_aio(aio_m);
143 }
144
145 void sleep(nng_duration ms, std::function<void()> f)
146 {
147 cb_m = [&, f](clev::expected<void> res, nngxx::msg m) {
148 if (res)
149 {
150 // the sleep completed successfully, execute f
151 f();
152 }
153 };
154
155 // make aio - NOTE: pass this, cant move op
156 aio_m = nngxx::make_aio(op::sleep_cb, this).value_or_abort();
157
158 nngxx::sleep(ms, aio_m);
159 }
160
161 void reset_sleep(nng_duration ms)
162 {
163 stop();
164
165 // make aio - NOTE: pass this, cant move op
166 aio_m = nngxx::make_aio(op::sleep_cb, this).value_or_abort();
167
168 nngxx::sleep(ms, aio_m);
169 }
170
179 std::error_code result() const
180 {
181 return aio_m.result().error_or(nngxx::err::success);
182 }
183
194 void abort(nngxx::err err) { aio_m.abort(err); }
195
213 void cancel() { aio_m.cancel(); }
214
222 void wait() const { aio_m.wait(); }
223
234 void stop() { aio_m.stop(); }
235
236private:
237 static void send_cb(void* arg)
238 {
239 auto self = static_cast<op*>(arg);
240
241 // get the result
242 auto res = self->aio_m.result();
243
244 nngxx::msg msg;
245
246 if (!res)
247 {
248 // take ownership of the message
249 msg = self->aio_m.release_msg();
250 }
251
252 // execute the callback passing ownership of the message
253 self->cb_m(res, std::move(msg));
254 }
255
256 static void recv_cb(void* arg)
257 {
258 auto self = static_cast<op*>(arg);
259
260 // get the result
261 auto res = self->aio_m.result();
262
263 nngxx::msg msg;
264
265 if (res)
266 {
267 // take ownership of the message
268 msg = self->aio_m.release_msg();
269 }
270
271 // execute the callback passing ownership of the message
272 self->cb_m(res, std::move(msg));
273 }
274
275 static void sleep_cb(void* arg)
276 {
277 auto self = static_cast<op*>(arg);
278
279 auto res = self->aio_m.result();
280
281 if (res)
282 self->cb_m(res, nngxx::msg{});
283 }
284
285 nngxx::aio aio_m;
286 cb_f cb_m;
287};
288
289} // namespace pars::net
void queue_fire(event_t ev)
Definition enqueuer.h:52
void queue_sent(event_t ev, int s_id, tool_t &t, net::pipe p)
Definition enqueuer.h:92
void queue_received(nngxx::msg m, int s_id, tool_t &t, net::pipe p)
Definition enqueuer.h:98
void stop()
stop asynchronous I/O operation
Definition op.h:234
void send(ev::enqueuer &r, tool_t &t, pipe p, event_t ev)
Definition op.h:65
void recv(ev::enqueuer &r, tool_t &t)
Definition op.h:107
void reset_sleep(nng_duration ms)
Definition op.h:161
std::error_code result() const
return result of asynchronous operation
Definition op.h:179
void wait() const
wait for asynchronous I/O operation
Definition op.h:222
void sleep(nng_duration ms, std::function< void()> f)
Definition op.h:145
void abort(nngxx::err err)
abort asynchronous I/O operation
Definition op.h:194
void cancel()
cancel asynchronous I/O operation
Definition op.h:213
#define SL
Definition log.h:58
clev::own< nng_msg * > msg
Definition msg.h:39
err
Definition err.h:74
@ success
Definition err.h:75
clev::iface< nng_pipe > pipe_view
Definition pipe.h:37
clev::own< nng_aio * > aio
Definition aio.h:39
std::function< void(clev::expected< void >, nngxx::msg)> cb_f
Definition op.h:51
@ net
Definition flags.h:44
void err(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:211
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:129
std::string nametype()
Definition nametype.h:37
static nngxx::msg to_network(event_t &ev)
Definition serializer.h:53