56 std::packaged_task<
void(std::stop_token,
job)> task,
job j)
62 auto guard = std::lock_guard{mtx_m};
64 futures_m.emplace_back(j_id, s_id, spec_hash, task.get_future());
66 auto th_res = threads_m.try_emplace(j_id, std::move(task), std::move(j));
74 auto guard = std::lock_guard{mtx_m};
76 return futures_m.size();
81 auto guard = std::lock_guard{mtx_m};
83 for (
const auto& t : threads_m)
85 if (stop_possible(t.first))
86 request_stop(t.first);
89 while (!futures_m.empty())
97 auto lock = hf_registry_m.lock();
99 return hf_registry_m.has_handler_for(s_id, spec_hash);
108 auto lock = hf_registry_m.lock();
110 if (!hf_registry_m.has_handler_for(s_id, spec_hash))
113 "Unable to find handler for Spec 0x{:X} on Socket {}, skip "
120 auto& hf = hf_registry_m.handler_for(s_id, spec_hash);
125 demangle(hf_registry_m.type_for(spec_hash)->name()));
130 auto guard = std::lock_guard{mtx_m};
139 process_exception(s_id, spec_hash);
143 template<
template<
typename>
typename kind_of,
event_c event_t>
152 auto guard = std::lock_guard{mtx_m};
156 pipe_jobs_m.insert({p_id, {}});
162 auto guard = std::lock_guard{mtx_m};
166 if (!pipe_jobs_m.at(p_id).empty())
169 stop_runnings(pipe_jobs_m.at(p_id));
172 pipe_jobs_m.erase(p_id);
178 throw std::runtime_error(std::format(
"Job #{}: invalid Job!", p_id));
183 auto guard = std::lock_guard{mtx_m};
187 if (pipe_jobs_m.contains(p_id))
189 pipe_jobs_m.at(p_id).push_back(j_id);
192 "Job #{} pushed and associated to Pipe {:X} [# size: {}]",
193 j_id, p_id, pipe_jobs_m.at(p_id).size());
199 auto guard = std::lock_guard(mtx_m);
201 if (stop_possible(j_id))
206 std::unordered_set<std::size_t> stop_runnings(std::vector<std::size_t> j_ids)
208 std::unordered_set<std::size_t> pending_jobs;
210 for (
const auto& j_id : j_ids)
212 if (thread_running(j_id))
215 if (stop_possible(j_id))
220 pending_jobs.insert(j_id);
227 void process_futures()
229 for (
auto it = futures_m.begin(); it != futures_m.end();)
231 using namespace std::chrono_literals;
233 auto& [j_id, s_id, spec_hash, f] = *it;
235 if (f.wait_for(0ms) == std::future_status::ready)
244 "Job #{}: Throws, processing exceptions !!", j_id);
246 process_exception(s_id, spec_hash);
249 threads_m.erase(j_id);
254 it = futures_m.erase(it);
263 void process_exception(
auto s_id,
auto spec_hash)
265 auto e_ptr = std::current_exception();
269 std::rethrow_exception(e_ptr);
271 catch (std::exception& e)
282 auto e_hash = spec<fired<exception>>::hash;
284 auto lock = hf_registry_m.lock();
286 if (!hf_registry_m.has_handler_for(s_id, e_hash))
289 auto& hf = hf_registry_m.handler_for(s_id, e_hash);
297 catch (std::exception& e)
300 spec_hash, e.what());
305 "Exeption Handler for 0x{:X} throws: Unknown Exception",
310 void request_stop(
const std::size_t j_id)
312 threads_m[j_id].get_stop_source().request_stop();
315 bool thread_running(
const std::size_t j_id)
317 return threads_m.contains(j_id);
320 bool stop_possible(
const std::size_t j_id)
322 return threads_m[j_id].get_stop_token().stop_possible();
327 using futures_value_type =
328 std::tuple<std::size_t, int, std::size_t, std::future<void>>;
330 std::vector<futures_value_type>
334 std::unordered_map<std::size_t, std::jthread>
337 hf_registry& hf_registry_m;
339 std::unordered_map<int, std::vector<std::size_t>>
342 std::atomic<std::size_t> next_job_id_m{0};