pars 0.2.1.99
Loading...
Searching...
No Matches
runner.h
Go to the documentation of this file.
1/*
2Copyright (c) 2025 Giuseppe Roberti.
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without modification,
6are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice, this
9list of conditions and the following disclaimer.
10
112. Redistributions in binary form must reproduce the above copyright notice,
12this list of conditions and the following disclaimer in the documentation and/or
13other materials provided with the distribution.
14
153. Neither the name of the copyright holder nor the names of its contributors
16may be used to endorse or promote products derived from this software without
17specific prior written permission.
18
19THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29*/
30#pragma once
31
32#include "pars/ev/event.h"
33#include "pars/ev/hf_registry.h"
34#include "pars/ev/job.h"
35
36#include <format>
37#include <functional>
38#include <future>
39#include <thread>
40#include <unordered_map>
41
42namespace pars::ev
43{
44
45struct runner
46{
47public:
// Binds the runner to the handler registry used for dispatching; only a
// reference is kept (hf_registry_m is declared as hf_registry&).
49 : hf_registry_m{hfs}
50 {
51 }
52
53 std::size_t next_job_id() { return next_job_id_m++; }
54
55 void start_thread(std::size_t spec_hash,
56 std::packaged_task<void(std::stop_token, job)> task, job j)
57 {
58 auto j_id = j.id();
59
60 auto s_id = j.socket_id();
61
62 auto guard = std::lock_guard{mtx_m};
63
64 futures_m.emplace_back(j_id, s_id, spec_hash, task.get_future());
65
66 auto th_res = threads_m.try_emplace(j_id, std::move(task), std::move(j));
67
68 if (!th_res.second)
69 futures_m.pop_back();
70 }
71
// Reports how many job futures are currently tracked (futures_m.size()); the
// size is read while mtx_m is held.
73 {
74 auto guard = std::lock_guard{mtx_m};
75
76 return futures_m.size();
77 }
78
// Requests a stop on every stored thread whose stop token allows it, then
// keeps reaping futures until none is left, and finally clears the pipe table.
// mtx_m is held for the whole call.
80 {
81 auto guard = std::lock_guard{mtx_m};
82
// ask each stored thread to stop, when its stop token allows it
83 for (const auto& t : threads_m)
84 {
85 if (stop_possible(t.first))
86 request_stop(t.first);
87 }
88
// NOTE(review): process_futures() polls with a zero timeout, so this loop
// spins until every job has finished, with mtx_m still held. A job that needs
// mtx_m to complete (e.g. one calling start_thread or associate_job_to_pipe)
// could not make progress meanwhile — confirm this cannot happen.
89 while (!futures_m.empty())
90 process_futures();
91
92 pipe_jobs_m.clear();
93 }
94
95 bool can_exec(int s_id, std::size_t spec_hash)
96 {
97 auto lock = hf_registry_m.lock();
98
99 return hf_registry_m.has_handler_for(s_id, spec_hash);
100 }
101
/// Runs job @p j on the calling thread with the handler registered for the
/// job's socket id and spec hash.
///
/// When no handler is registered a message is logged and the job is dropped.
/// Otherwise finished futures are reaped first (process_futures(), under
/// mtx_m) and the handler is invoked with the job moved into it. Anything
/// thrown inside the try block, by the reaping or by the handler, is passed
/// to process_exception().
102 void exec(job j)
103 {
104 auto spec_hash = j.spec_hash();
105
106 auto s_id = j.socket_id();
107
// the registry lock covers the existence check and the handler lookup
108 auto lock = hf_registry_m.lock();
109
110 if (!hf_registry_m.has_handler_for(s_id, spec_hash))
111 {
113 "Unable to find handler for Spec 0x{:X} on Socket {}, skip "
114 "message ...",
115 spec_hash, s_id);
116
117 return;
118 }
119
120 auto& hf = hf_registry_m.handler_for(s_id, spec_hash);
121
// NOTE(review): hf is a reference obtained under the registry lock but it is
// used after the unlock below, and type_for() is also called unlocked —
// confirm the registry keeps handlers alive and stable while they run.
122 lock.unlock();
123
124 pars::debug(SL, lf::event, "Job #{}: Running Handler [{}]", j.id(),
125 demangle(hf_registry_m.type_for(spec_hash)->name()));
126
127 try
128 {
// inner scope: mtx_m is released before the handler is invoked
129 {
130 auto guard = std::lock_guard{mtx_m};
131
132 process_futures();
133 }
134
135 hf(std::move(j));
136 }
137 catch (...)
138 {
// s_id and spec_hash were copied out of j before it was moved
139 process_exception(s_id, spec_hash);
140 }
141 }
142
143 template<template<typename> typename kind_of, event_c event_t>
144 requires kind_c<kind_of>
145 void exec(kind_of<event_t> ke)
146 {
147 exec(make_job(next_job_id(), std::move(ke)));
148 }
149
150 void add_pipe(const net::pipe& p)
151 {
152 auto guard = std::lock_guard{mtx_m};
153
154 auto p_id = p.id();
155
156 pipe_jobs_m.insert({p_id, {}});
157 }
158
160 void remove_pipe(const net::pipe& p)
161 {
162 auto guard = std::lock_guard{mtx_m};
163
164 auto p_id = p.id();
165
166 if (!pipe_jobs_m.at(p_id).empty())
167 {
168 // stop all running jobs for pipe p
169 stop_runnings(pipe_jobs_m.at(p_id));
170 }
171
172 pipe_jobs_m.erase(p_id);
173 }
174
/// Records that job @p j_id belongs to pipe @p p_id, so that the job can be
/// stopped when the pipe is removed.
///
/// Nothing is recorded when the pipe is not tracked, e.g. because it was
/// already removed.
///
/// @param j_id job identifier; must be greater than 0
/// @param p_id pipe identifier; values <= 0 are silently ignored
/// @throws std::runtime_error when @p j_id is not greater than 0
void associate_job_to_pipe(const int j_id, const int p_id)
{
  // NOTE(review): the job counter starts at 0, yet id 0 is rejected here —
  // confirm callers never associate the very first job.
  if (j_id <= 0)
    throw std::runtime_error(std::format("Job #{}: invalid Job!", j_id));

  if (p_id <= 0)
    return;

  auto guard = std::lock_guard{mtx_m};

  // add j_id to the jobs related to p, unless p was removed with a previous
  // call to remove_pipe(p) (ie: pipe_jobs_m does not contain p.id())
  const auto entry = pipe_jobs_m.find(p_id);

  if (entry == pipe_jobs_m.end())
    return;

  auto& jobs = entry->second;

  // j_id > 0 was checked above, so the conversion cannot wrap
  jobs.push_back(static_cast<std::size_t>(j_id));

  pars::debug(SL, lf::event,
              "Job #{} pushed and associated to Pipe {:X} [# size: {}]",
              j_id, p_id, jobs.size());
}
196
197 void stop_thread(const int j_id)
198 {
199 auto guard = std::lock_guard(mtx_m);
200
201 if (stop_possible(j_id))
202 request_stop(j_id);
203 }
204
205private:
206 std::unordered_set<std::size_t> stop_runnings(std::vector<std::size_t> j_ids)
207 {
208 std::unordered_set<std::size_t> pending_jobs;
209
210 for (const auto& j_id : j_ids)
211 {
212 if (thread_running(j_id))
213 {
214 // stop running job, if possible
215 if (stop_possible(j_id))
216 request_stop(j_id);
217 }
218 else
219 {
220 pending_jobs.insert(j_id);
221 }
222 }
223
224 return pending_jobs;
225 }
226
/// Reaps finished jobs.
///
/// For every tracked future that is already ready: its result is collected
/// (an exception stored in it is forwarded to process_exception()), the job's
/// entry is erased from threads_m and the future is removed from futures_m.
/// Futures that are not ready yet are left in place.
///
/// Does not lock mtx_m itself; the visible callers take it before calling.
227 void process_futures()
228 {
// no increment in the loop header: erase() below supplies the next iterator
229 for (auto it = futures_m.begin(); it != futures_m.end();)
230 {
231 using namespace std::chrono_literals;
232
233 auto& [j_id, s_id, spec_hash, f] = *it;
234
// zero timeout: a pure readiness poll that never blocks
235 if (f.wait_for(0ms) == std::future_status::ready)
236 {
237 try
238 {
// get() rethrows whatever the task stored in the future
239 f.get();
240 }
241 catch (...)
242 {
244 "Job #{}: Throws, processing exceptions !!", j_id);
245
246 process_exception(s_id, spec_hash);
247 }
248
// destroying the std::jthread joins it; its future is already ready
249 threads_m.erase(j_id);
250
// logged before the erase below, so the count still includes this job
251 pars::debug(SL, lf::event, "Job #{}: Done! [# futures: {}]", j_id,
252 futures_m.size());
253
// j_id, s_id, spec_hash and f refer into *it and must not be used past here
254 it = futures_m.erase(it);
255 }
256 else
257 {
258 ++it;
259 }
260 }
261 }
262
263 void process_exception(auto s_id, auto spec_hash)
264 {
265 auto e_ptr = std::current_exception();
266
267 try
268 {
269 std::rethrow_exception(e_ptr);
270 }
271 catch (std::exception& e)
272 {
273 pars::err(SL, lf::event, "Handler for 0x{:X} throws: {}", spec_hash,
274 e.what());
275 }
276 catch (...)
277 {
278 pars::err(SL, lf::event, "Handler for 0x{:X} throws: Unknown Exception",
279 spec_hash);
280 }
281
282 auto e_hash = spec<fired<exception>>::hash;
283
284 auto lock = hf_registry_m.lock();
285
286 if (!hf_registry_m.has_handler_for(s_id, e_hash))
287 return;
288
289 auto& hf = hf_registry_m.handler_for(s_id, e_hash);
290
291 lock.unlock();
292
293 try
294 {
295 hf(make_job(next_job_id(), fired{exception{e_ptr}, {}}));
296 }
297 catch (std::exception& e)
298 {
299 pars::err(SL, lf::event, "Exception Handler for 0x{:X} throws: {}",
300 spec_hash, e.what());
301 }
302 catch (...)
303 {
305 "Exeption Handler for 0x{:X} throws: Unknown Exception",
306 spec_hash);
307 }
308 }
309
310 void request_stop(const std::size_t j_id)
311 {
312 threads_m[j_id].get_stop_source().request_stop();
313 }
314
315 bool thread_running(const std::size_t j_id)
316 {
317 return threads_m.contains(j_id);
318 }
319
320 bool stop_possible(const std::size_t j_id)
321 {
322 return threads_m[j_id].get_stop_token().stop_possible();
323 }
324
// Taken by the public members before they touch futures_m, threads_m or
// pipe_jobs_m.
325 std::mutex mtx_m;
326
// One record per started job: (job id, socket id, spec hash, future of the
// job's task), in the order start_thread() emplaces them.
327 using futures_value_type =
328 std::tuple<std::size_t, int, std::size_t, std::future<void>>;
329
// Futures of the jobs started with start_thread() and not reaped yet.
330 std::vector<futures_value_type>
331 futures_m;
333
// Job id -> thread running that job. Declared after futures_m, so on
// destruction the threads are destroyed (and joined) before the futures and
// the mutex.
334 std::unordered_map<std::size_t, std::jthread>
335 threads_m;
336
// Handler registry supplied to the constructor; held by reference.
337 hf_registry& hf_registry_m;
338
// Pipe id -> ids of the jobs associated with that pipe.
339 std::unordered_map<int, std::vector<std::size_t>>
340 pipe_jobs_m;
341
// Source of job ids handed out by next_job_id(); the first id is 0.
342 std::atomic<std::size_t> next_job_id_m{0};
343};
344
345} // namespace pars::ev
std::size_t id() const
Definition job.h:89
int socket_id() const
Definition job.h:91
std::size_t spec_hash() const
Definition job.h:93
int id() const noexcept
Definition pipe.h:58
#define SL
Definition log.h:58
fired(event_t, metadata< fired, event_t >) -> fired< event_t >
@ event
Definition flags.h:43
void err(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:211
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:129
void stop_all_threads()
Definition runner.h:79
runner(hf_registry &hfs)
Definition runner.h:48
void add_pipe(const net::pipe &p)
Definition runner.h:150
bool can_exec(int s_id, std::size_t spec_hash)
Definition runner.h:95
std::size_t next_job_id()
Definition runner.h:53
void exec(kind_of< event_t > ke)
Definition runner.h:145
void exec(job j)
Definition runner.h:102
auto count_threads()
Definition runner.h:72
void remove_pipe(const net::pipe &p)
stop all running and remove all pending jobs for pipe p
Definition runner.h:160
void associate_job_to_pipe(const int j_id, const int p_id)
Definition runner.h:175
void start_thread(std::size_t spec_hash, std::packaged_task< void(std::stop_token, job)> task, job j)
Definition runner.h:55
void stop_thread(const int j_id)
Definition runner.h:197