| Coverage Report |
 |
|
|
 |
: /*
: Copyright (c) 2025 Giuseppe Roberti.
: All rights reserved.
:
: Redistribution and use in source and binary forms, with or without modification,
: are permitted provided that the following conditions are met:
:
: 1. Redistributions of source code must retain the above copyright notice, this
: list of conditions and the following disclaimer.
:
: 2. Redistributions in binary form must reproduce the above copyright notice,
: this list of conditions and the following disclaimer in the documentation and/or
: other materials provided with the distribution.
:
: 3. Neither the name of the copyright holder nor the names of its contributors
: may be used to endorse or promote products derived from this software without
: specific prior written permission.
:
: THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
: WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
: DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
: ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
: (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
: LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
: ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
: (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
: SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
: */
: #pragma once
:
: #include "pars/ev/event.h"
: #include "pars/ev/hf_registry.h"
: #include "pars/ev/job.h"
:
: #include <format>
: #include <functional>
: #include <future>
: #include <thread>
: #include <unordered_map>
:
: namespace pars::ev
: {
:
: struct runner
: {
: public:
: runner(hf_registry& hfs)
: : hf_registry_m{hfs}
: {
: }
:
0 / 1 : std::size_t next_job_id() { return next_job_id_m++; }
:
: void start_thread(std::size_t spec_hash,
: std::packaged_task<void(std::stop_token, job)> task, job j)
: {
0 / 1 : auto j_id = j.id();
:
0 / 1 : auto s_id = j.socket_id();
:
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : futures_m.emplace_back(j_id, s_id, spec_hash, task.get_future());
:
0 / 1 : auto th_res = threads_m.try_emplace(j_id, std::move(task), std::move(j));
:
0 / 1 : if (!th_res.second)
0 / 1 : futures_m.pop_back();
: }
:
: auto count_threads()
: {
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : return futures_m.size();
: }
:
: void stop_all_threads()
: {
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : for (const auto& t : threads_m)
: {
0 / 1 : if (stop_possible(t.first))
0 / 1 : request_stop(t.first);
: }
:
0 / 1 : while (!futures_m.empty())
0 / 1 : process_futures();
:
0 / 1 : pipe_jobs_m.clear();
: }
:
: bool can_exec(int s_id, std::size_t spec_hash)
: {
0 / 1 : auto lock = hf_registry_m.lock();
:
0 / 1 : return hf_registry_m.has_handler_for(s_id, spec_hash);
: }
:
: void exec(job j)
: {
0 / 1 : auto spec_hash = j.spec_hash();
:
0 / 1 : auto s_id = j.socket_id();
:
0 / 1 : auto lock = hf_registry_m.lock();
:
0 / 1 : if (!hf_registry_m.has_handler_for(s_id, spec_hash))
: {
0 / 1 : pars::err(SL, lf::event,
0 / 1 : "Unable to find handler for Spec 0x{:X} on Socket {}, skip "
0 / 1 : "message ...",
0 / 1 : spec_hash, s_id);
:
0 / 1 : return;
: }
:
0 / 1 : auto& hf = hf_registry_m.handler_for(s_id, spec_hash);
:
0 / 1 : lock.unlock();
:
0 / 1 : pars::debug(SL, lf::event, "Job #{}: Running Handler [{}]", j.id(),
0 / 1 : demangle(hf_registry_m.type_for(spec_hash)->name()));
:
0 / 1 : try
: {
: {
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : process_futures();
: }
:
0 / 1 : hf(std::move(j));
: }
0 / 1 : catch (...)
: {
0 / 1 : process_exception(s_id, spec_hash);
: }
: }
:
: template<template<typename> typename kind_of, event_c event_t>
: requires kind_c<kind_of>
: void exec(kind_of<event_t> ke)
: {
: exec(make_job(next_job_id(), std::move(ke)));
: }
:
: void add_pipe(const net::pipe& p)
: {
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : auto p_id = p.id();
:
0 / 1 : pipe_jobs_m.insert({p_id, {}});
: }
:
: /// stop all running and remove all pending jobs for pipe p
: void remove_pipe(const net::pipe& p)
: {
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : auto p_id = p.id();
:
0 / 1 : if (!pipe_jobs_m.at(p_id).empty())
: {
0 / 1 : // stop all running jobs for pipe p
0 / 1 : stop_runnings(pipe_jobs_m.at(p_id));
: }
:
0 / 1 : pipe_jobs_m.erase(p_id);
: }
:
: void associate_job_to_pipe(const int j_id, const int p_id)
: {
0 / 1 : if (j_id <= 0)
0 / 1 : throw std::runtime_error(std::format("Job #{}: invalid Job!", p_id));
:
0 / 1 : if (p_id <= 0)
0 / 1 : return;
:
0 / 1 : auto guard = std::lock_guard{mtx_m};
:
0 / 1 : // add j_id to the jobs related to p, unless p was removed with a previous
0 / 1 : // call to remove_pipe(p) (ie: pipe_jobs_m does not contains p.id())
0 / 1 : if (pipe_jobs_m.contains(p_id))
: {
0 / 1 : pipe_jobs_m.at(p_id).push_back(j_id);
:
0 / 1 : pars::debug(SL, lf::event,
0 / 1 : "Job #{} pushed and associated to Pipe {:X} [# size: {}]",
0 / 1 : j_id, p_id, pipe_jobs_m.at(p_id).size());
: }
: }
:
: void stop_thread(const int j_id)
: {
0 / 1 : auto guard = std::lock_guard(mtx_m);
:
0 / 1 : if (stop_possible(j_id))
0 / 1 : request_stop(j_id);
: }
:
: private:
: std::unordered_set<std::size_t> stop_runnings(std::vector<std::size_t> j_ids)
: {
0 / 1 : std::unordered_set<std::size_t> pending_jobs;
:
0 / 1 : for (const auto& j_id : j_ids)
: {
0 / 1 : if (thread_running(j_id))
: {
0 / 1 : // stop running job, if possible
0 / 1 : if (stop_possible(j_id))
0 / 1 : request_stop(j_id);
: }
0 / 1 : else
: {
0 / 1 : pending_jobs.insert(j_id);
: }
: }
:
0 / 1 : return pending_jobs;
: }
:
: void process_futures()
: {
0 / 1 : for (auto it = futures_m.begin(); it != futures_m.end();)
: {
0 / 1 : using namespace std::chrono_literals;
:
0 / 1 : auto& [j_id, s_id, spec_hash, f] = *it;
:
0 / 1 : if (f.wait_for(0ms) == std::future_status::ready)
: {
0 / 1 : try
: {
0 / 1 : f.get();
: }
0 / 1 : catch (...)
: {
0 / 1 : pars::debug(SL, lf::event,
0 / 1 : "Job #{}: Throws, processing exceptions !!", j_id);
:
0 / 1 : process_exception(s_id, spec_hash);
: }
:
0 / 1 : threads_m.erase(j_id);
:
0 / 1 : pars::debug(SL, lf::event, "Job #{}: Done! [# futures: {}]", j_id,
0 / 1 : futures_m.size());
:
0 / 1 : it = futures_m.erase(it);
: }
0 / 1 : else
: {
0 / 1 : ++it;
: }
: }
: }
:
: void process_exception(auto s_id, auto spec_hash)
: {
0 / 1 : auto e_ptr = std::current_exception();
:
0 / 1 : try
: {
0 / 1 : std::rethrow_exception(e_ptr);
: }
0 / 1 : catch (std::exception& e)
: {
0 / 1 : pars::err(SL, lf::event, "Handler for 0x{:X} throws: {}", spec_hash,
0 / 1 : e.what());
: }
0 / 1 : catch (...)
: {
0 / 1 : pars::err(SL, lf::event, "Handler for 0x{:X} throws: Unknown Exception",
0 / 1 : spec_hash);
: }
:
0 / 1 : auto e_hash = spec<fired<exception>>::hash;
:
0 / 1 : auto lock = hf_registry_m.lock();
:
0 / 1 : if (!hf_registry_m.has_handler_for(s_id, e_hash))
0 / 1 : return;
:
0 / 1 : auto& hf = hf_registry_m.handler_for(s_id, e_hash);
:
0 / 1 : lock.unlock();
:
0 / 1 : try
: {
0 / 1 : hf(make_job(next_job_id(), fired{exception{e_ptr}, {}}));
: }
0 / 1 : catch (std::exception& e)
: {
0 / 1 : pars::err(SL, lf::event, "Exception Handler for 0x{:X} throws: {}",
0 / 1 : spec_hash, e.what());
: }
0 / 1 : catch (...)
: {
0 / 1 : pars::err(SL, lf::event,
0 / 1 : "Exeption Handler for 0x{:X} throws: Unknown Exception",
0 / 1 : spec_hash);
: }
: }
:
: void request_stop(const std::size_t j_id)
: {
0 / 1 : threads_m[j_id].get_stop_source().request_stop();
: }
:
: bool thread_running(const std::size_t j_id)
: {
0 / 1 : return threads_m.contains(j_id);
: }
:
: bool stop_possible(const std::size_t j_id)
: {
0 / 1 : return threads_m[j_id].get_stop_token().stop_possible();
: }
:
: std::mutex mtx_m; ///< protects futures_m, threads_m
:
: using futures_value_type =
: std::tuple<std::size_t, int, std::size_t, std::future<void>>;
:
: std::vector<futures_value_type>
: futures_m; ///< all job ids, socket ids, spec hashes
: ///< and futures from threads that are running
:
: std::unordered_map<std::size_t, std::jthread>
: threads_m; ///< the jthread for a given job id, if any
:
: hf_registry& hf_registry_m;
:
: std::unordered_map<int, std::vector<std::size_t>>
: pipe_jobs_m; ///< all queued ids of jobs for a given pipe id
:
: std::atomic<std::size_t> next_job_id_m{0};
: };
:
: } // namespace pars::ev
 |
| Generated by: llvmcov2html |