pars 0.2.1
Loading...
Searching...
No Matches
server_backend.cpp
Go to the documentation of this file.
1/*
2Copyright (c) 2025 Giuseppe Roberti.
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without modification,
6are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice, this
9list of conditions and the following disclaimer.
10
112. Redistributions in binary form must reproduce the above copyright notice,
12this list of conditions and the following disclaimer in the documentation and/or
13other materials provided with the distribution.
14
153. Neither the name of the copyright holder nor the names of its contributors
16may be used to endorse or promote products derived from this software without
17specific prior written permission.
18
19THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29*/
30#include "event.h"
31#include "fib.h"
32
33namespace pars_example::apps
34{
35
36using namespace event;
37using namespace resource;
38
40class server_backend : public app::single<comp::backend>
41{
42private:
44
45 using parent_type = self_type;
46 using self = server_backend;
47
49
50 const milli cleanup_timeout{1000};
51 const milli rep_recv_timeout{-1};
52 const milli rep_send_timeout{1000};
53
55
56 component_type::connect_p connect_p;
57 int max_allowed{128};
58 int max_served{0};
59
61
62 std::atomic<int> tot_served{0};
63 app::state_machine<server_state> state = {server_state::creating};
65
67
68 using parent_type::parent_type;
69
71
72 void startup(int argc, char** argv) override
73 {
75
77 auto ts = state.tx(server_state::creating, server_state::initializing);
78
80 switch (argc)
81 {
82 case 5:
83 max_allowed = std::stoi(argv[4]);
84
85 [[fallthrough]];
86
87 case 4:
88 max_served = std::stoi(argv[3]);
89
90 [[fallthrough]];
91
92 case 3:
93 connect_p.service_cmode = net::cmode_from_string(argv[1]);
94
95 connect_p.service_addr = argv[2];
96
97 break;
98
99 default:
100 throw std::invalid_argument("Usage: ./server_backend service_cmode "
101 "service_addr [max_served [max_allowed]]");
102 }
103
105 hfs().on<fired, init>(&self::initialize, this);
106
107 hfs().on<fired, shutdown>(&self::terminate, this);
108
109 comp().rep().on<received, fib_requested>(&self::fire_compute, this);
110
111 comp().rep().on<fired, fib_requested>(&self::compute_answer, this);
112
113 comp().rep().on<sent, fib_computed>(&self::recv_next, this);
114
115 comp().rep().on<fired, creating_pipe>(&self::accept_pipe, this);
116
117 comp().rep().on<fired, pipe_created>(&self::log_new_pipe, this);
118
119 comp().rep().on<fired, pipe_removed>(&self::log_rem_pipe, this);
120
121 comp().rep().on<fired, network_error>(&self::close_pipe, this);
122
124 ts.commit();
125
126 pars::info(SL, "Application Started!");
127 }
128
130
132 void initialize(hf_arg<fired, init> fired)
133 {
134 auto ts = state.tx(server_state::initializing, server_state::running);
135
136 comp().init({.num_ctxs = max_allowed,
137 .rep_opts = {.recv_timeout = rep_recv_timeout.count(),
138 .send_timeout = rep_send_timeout.count()}});
139
140 comp().connect(connect_p);
141
142 ts.commit();
143
144 pars::info(SL, "Fired {}, Application Initialized!", fired.event());
145 }
146
148 void accept_pipe(hf_arg<fired, creating_pipe> fired)
149 {
150 state.ensure(server_state::running);
151
152 auto [ev, md] = fired.as_tuple();
153
154 auto resources_count = resources.count();
155
156 if (resources_count < max_allowed)
157 {
158 resources.emplace(md.pipe().id(), pipe_state::waiting_work);
159
160 pars::info(SL, "{}: Fired {}, Pipe Accepted! [# resources: {} < {}]", md,
161 ev, resources_count, max_allowed);
162 }
163 else
164 {
165 md.pipe().close().or_abort();
166
167 pars::info(SL, "{}: Fired {}, Pipe Rejected! [# resources: {} >= {}]", md,
168 ev, resources_count, max_allowed);
169 }
170 }
171
173 void log_new_pipe(hf_arg<fired, pipe_created> fired)
174 {
175 state.ensure(server_state::running);
176
177 auto [ev, md] = fired.as_tuple();
178
179 pars::info(SL, "{}: Fired {} [# resources: {}]", md, ev, resources.count());
180 }
181
183 void log_rem_pipe(hf_arg<fired, pipe_removed> fired)
184 {
185 state.ensure(server_state::running);
186
187 auto [ev, md] = fired.as_tuple();
188
189 pars::info(SL, "{}: Fired {} [# resources: {}]", md, ev, resources.count());
190 }
191
193 void fire_compute(hf_arg<received, fib_requested> recv)
194 {
195 state.ensure(server_state::running);
196
197 auto [ev, md] = recv.as_tuple();
198
199 auto locked = resources.locked_resource(md.pipe().id());
200
201 auto& pipe_resource = locked.resource();
202
204 if (pipe_resource.state.current() == pipe_state::working)
205 {
206 pars::info(SL, "{}: Received {}, Already Working: Returning ...", md, ev);
207
208 return;
209 }
210
211 auto ts =
212 pipe_resource.state.tx(pipe_state::waiting_work, pipe_state::working);
213
214 pipe_resource.save_tool(md.tool());
215
216 router().queue_fire(ev, md);
217
218 ts.commit();
219
220 pars::info(SL, "{}: Received {}, Fire {}!", md, ev, ev);
221 }
222
224 void compute_answer(hf_arg<fired, fib_requested> fired)
225 {
226 state.ensure(server_state::running);
227
228 auto [ev, md] = fired.as_tuple();
229
230 auto p = md.pipe();
231
232 auto locked = resources.locked_resource(p.id());
233
234 auto& pipe_resource = locked.resource();
235
236 auto ts = pipe_resource.state.tx(pipe_state::working, pipe_state::done);
237
239
240 std::size_t fib_n;
241
242 try
243 {
244 fib_n = compute::fib(ev.n, ev.use_fast_fib, md);
245 }
246 catch (const compute::stop_requested&)
247 {
248 locked.guard().unlock();
249
250 ts.rollback();
251
252 resources.delete_resource(p.id());
253
254 pars::info(SL, "{}: Fired {}, Stop Requested! [# resources: {}]", md, ev,
255 resources.count());
256
257 return;
258 }
259
261
262 auto out_ev = fib_computed{ev.work_id, fib_n};
263
264 auto& ctx = comp().rep().ctxs().of(pipe_resource.load_tool());
265
266 ctx.send(out_ev, p);
267
268 ts.commit();
269
270 pars::info(SL, "{}: Fired {}, Send {}!", md, ev, out_ev);
271 }
272
274 void recv_next(hf_arg<sent, fib_computed> sent)
275 {
276 state.ensure(server_state::running);
277
278 auto [ev, md] = sent.as_tuple();
279
280 auto& p = md.pipe();
281
282 resources.delete_resource(p.id());
283
284 if (++tot_served == max_served)
285 {
286 auto shutdown_ev = shutdown{};
287
288 router().queue_fire(shutdown_ev);
289
290 pars::info(SL, "{}: Sent {}, Fire {}! [{} succesfully served]", md, ev,
291 shutdown_ev, max_served);
292
293 return;
294 }
295
297
298 auto& ctx = comp().rep().ctxs().of(md.tool());
299
300 ctx.recv();
301
302 pars::info(SL, "{}: Sent {}, Receiving! [# served: {}]", md, ev,
303 tot_served.load());
304 }
305
307 void close_pipe(hf_arg<fired, network_error> fired)
308 {
309 state.ensure(server_state::running);
310
311 auto [ev, md] = fired.as_tuple();
312
313 auto& p = md.pipe();
314
315 resources.delete_resource(p.id());
316
317 p.close().or_abort();
318
319 auto& ctx = comp().rep().ctxs().of(md.tool());
320
321 ctx.recv();
322
323 auto& [e, dir] = ev;
324
325 pars::info(SL, "{}: Fired {} (during {}), Pipe Closed!", md, ev, dir);
326 }
327
329 void terminate(hf_arg<fired, shutdown> fired)
330 {
331 state.ensure(server_state::running);
332
334
335 pars::info(SL, "Fired {}, Graceful Terminated!", fired.event());
336
337 std::cout << "SHUTDOWN ..." << std::endl;
338 }
339};
340
341} // namespace pars_example::apps
342
/// Program entry point: runs the application and turns an escaping
/// std::exception into an error message plus a failure exit code.
///
/// @param argc number of command line arguments
/// @param argv command line arguments, forwarded to the application
/// @return the value returned by `app.exec(argc, argv)`, or EXIT_FAILURE when
///         a std::exception is caught
int main(int argc, char** argv)
{
  // NOTE(review): `app` is not declared in the visible part of this listing;
  // presumably a pars_example::apps::server_backend instance — confirm.

  try
  {
    return app.exec(argc, argv);
  }
  catch (const std::exception& e) // caught by const reference: only read here
  {
    std::cout << std::format("Error: {}", e.what()) << "\n";

    return EXIT_FAILURE;
  }
}
single< component_type > self_type
Definition single.h:47
ev::hf_registry & hfs()
Definition single.h:74
component_type & comp()
Definition single.h:70
void init(const init_p &params)
Definition backend.h:55
void connect(const connect_p &params)
Definition backend.h:68
net::rep & rep()
Definition backend.h:73
void queue_fire(event_t ev)
Definition enqueuer.h:52
context & of(const net::tool_view &t)
void on(void(class_t::*hf)(ev::hf_arg< kind_of, event_t >), class_t *self)
Definition rep.h:75
context_registry & ctxs()
Get the context_registry.
Definition rep.h:62
Runs the backend component as a single application (rep).
#define SL
Definition log.h:58
kind_of< event_t > hf_arg
Definition make_hf.h:46
void info(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:157
@ event
Definition flags.h:43
std::chrono::duration< nng_duration, std::milli > milli
Definition init.h:47
int main(int argc, char **argv)
state_type current() const
state_tx< state_type > tx(state_type to_state)
void on(void(class_t::*mem_fn)(hf_arg< kind_of, event_t >), class_t *self)
Definition hf_registry.h:74
void save_tool(net::tool_view t)
Definition common.h:85
app::state_machine< pipe_state > state
Definition common.h:74
const net::tool_view & load_tool() const
Definition common.h:87