Coverage Report
Command: out/build/linux-debug-llvm/test/tests
Date: Wed Dec 17 12:00:02 2025 Instrumented lines: 114
Code covered: 0.0 % Executed lines: 0
             : /*
             : Copyright (c) 2025 Giuseppe Roberti.
             : All rights reserved.
             : 
             : Redistribution and use in source and binary forms, with or without modification,
             : are permitted provided that the following conditions are met:
             : 
             : 1. Redistributions of source code must retain the above copyright notice, this
             : list of conditions and the following disclaimer.
             : 
             : 2. Redistributions in binary form must reproduce the above copyright notice,
             : this list of conditions and the following disclaimer in the documentation and/or
             : other materials provided with the distribution.
             : 
             : 3. Neither the name of the copyright holder nor the names of its contributors
             : may be used to endorse or promote products derived from this software without
             : specific prior written permission.
             : 
             : THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
             : ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
             : WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
             : DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
             : ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
             : (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
             : LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
             : ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
             : (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
             : SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
             : */
             : #pragma once
             : 
             : #include "pars/ev/event.h"
             : #include "pars/ev/hf_registry.h"
             : #include "pars/ev/job.h"
             : 
             : #include <format>
             : #include <functional>
             : #include <future>
             : #include <thread>
             : #include <unordered_map>
             : 
             : namespace pars::ev
             : {
             : 
             : struct runner
             : {
             : public:
             :   runner(hf_registry& hfs)
             :     : hf_registry_m{hfs}
             :   {
             :   }
             : 
      0 / 1  :   std::size_t next_job_id() { return next_job_id_m++; }
             : 
             :   void start_thread(std::size_t spec_hash,
             :                     std::packaged_task<void(std::stop_token, job)> task, job j)
             :   {
      0 / 1  :     auto j_id = j.id();
             : 
      0 / 1  :     auto s_id = j.socket_id();
             : 
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     futures_m.emplace_back(j_id, s_id, spec_hash, task.get_future());
             : 
      0 / 1  :     auto th_res = threads_m.try_emplace(j_id, std::move(task), std::move(j));
             : 
      0 / 1  :     if (!th_res.second)
      0 / 1  :       futures_m.pop_back();
             :   }
             : 
             :   auto count_threads()
             :   {
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     return futures_m.size();
             :   }
             : 
             :   void stop_all_threads()
             :   {
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     for (const auto& t : threads_m)
             :     {
      0 / 1  :       if (stop_possible(t.first))
      0 / 1  :         request_stop(t.first);
             :     }
             : 
      0 / 1  :     while (!futures_m.empty())
      0 / 1  :       process_futures();
             : 
      0 / 1  :     pipe_jobs_m.clear();
             :   }
             : 
             :   bool can_exec(int s_id, std::size_t spec_hash)
             :   {
      0 / 1  :     auto lock = hf_registry_m.lock();
             : 
      0 / 1  :     return hf_registry_m.has_handler_for(s_id, spec_hash);
             :   }
             : 
             :   void exec(job j)
             :   {
      0 / 1  :     auto spec_hash = j.spec_hash();
             : 
      0 / 1  :     auto s_id = j.socket_id();
             : 
      0 / 1  :     auto lock = hf_registry_m.lock();
             : 
      0 / 1  :     if (!hf_registry_m.has_handler_for(s_id, spec_hash))
             :     {
      0 / 1  :       pars::err(SL, lf::event,
      0 / 1  :                 "Unable to find handler for Spec 0x{:X} on Socket {}, skip "
      0 / 1  :                 "message ...",
      0 / 1  :                 spec_hash, s_id);
             : 
      0 / 1  :       return;
             :     }
             : 
      0 / 1  :     auto& hf = hf_registry_m.handler_for(s_id, spec_hash);
             : 
      0 / 1  :     lock.unlock();
             : 
      0 / 1  :     pars::debug(SL, lf::event, "Job #{}: Running Handler [{}]", j.id(),
      0 / 1  :                 demangle(hf_registry_m.type_for(spec_hash)->name()));
             : 
      0 / 1  :     try
             :     {
             :       {
      0 / 1  :         auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :         process_futures();
             :       }
             : 
      0 / 1  :       hf(std::move(j));
             :     }
      0 / 1  :     catch (...)
             :     {
      0 / 1  :       process_exception(s_id, spec_hash);
             :     }
             :   }
             : 
             :   template<template<typename> typename kind_of, event_c event_t>
             :     requires kind_c<kind_of>
             :   void exec(kind_of<event_t> ke)
             :   {
             :     exec(make_job(next_job_id(), std::move(ke)));
             :   }
             : 
             :   void add_pipe(const net::pipe& p)
             :   {
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     auto p_id = p.id();
             : 
      0 / 1  :     pipe_jobs_m.insert({p_id, {}});
             :   }
             : 
             :   /// stop all running and remove all pending jobs for pipe p
             :   void remove_pipe(const net::pipe& p)
             :   {
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     auto p_id = p.id();
             : 
      0 / 1  :     if (!pipe_jobs_m.at(p_id).empty())
             :     {
      0 / 1  :       // stop all running jobs for pipe p
      0 / 1  :       stop_runnings(pipe_jobs_m.at(p_id));
             :     }
             : 
      0 / 1  :     pipe_jobs_m.erase(p_id);
             :   }
             : 
             :   void associate_job_to_pipe(const int j_id, const int p_id)
             :   {
      0 / 1  :     if (j_id <= 0)
      0 / 1  :       throw std::runtime_error(std::format("Job #{}: invalid Job!", p_id));
             : 
      0 / 1  :     if (p_id <= 0)
      0 / 1  :       return;
             : 
      0 / 1  :     auto guard = std::lock_guard{mtx_m};
             : 
      0 / 1  :     // add j_id to the jobs related to p, unless p was removed with a previous
      0 / 1  :     // call to remove_pipe(p) (ie: pipe_jobs_m does not contains p.id())
      0 / 1  :     if (pipe_jobs_m.contains(p_id))
             :     {
      0 / 1  :       pipe_jobs_m.at(p_id).push_back(j_id);
             : 
      0 / 1  :       pars::debug(SL, lf::event,
      0 / 1  :                   "Job #{} pushed and associated to Pipe {:X} [# size: {}]",
      0 / 1  :                   j_id, p_id, pipe_jobs_m.at(p_id).size());
             :     }
             :   }
             : 
             :   void stop_thread(const int j_id)
             :   {
      0 / 1  :     auto guard = std::lock_guard(mtx_m);
             : 
      0 / 1  :     if (stop_possible(j_id))
      0 / 1  :       request_stop(j_id);
             :   }
             : 
             : private:
             :   std::unordered_set<std::size_t> stop_runnings(std::vector<std::size_t> j_ids)
             :   {
      0 / 1  :     std::unordered_set<std::size_t> pending_jobs;
             : 
      0 / 1  :     for (const auto& j_id : j_ids)
             :     {
      0 / 1  :       if (thread_running(j_id))
             :       {
      0 / 1  :         // stop running job, if possible
      0 / 1  :         if (stop_possible(j_id))
      0 / 1  :           request_stop(j_id);
             :       }
      0 / 1  :       else
             :       {
      0 / 1  :         pending_jobs.insert(j_id);
             :       }
             :     }
             : 
      0 / 1  :     return pending_jobs;
             :   }
             : 
             :   void process_futures()
             :   {
      0 / 1  :     for (auto it = futures_m.begin(); it != futures_m.end();)
             :     {
      0 / 1  :       using namespace std::chrono_literals;
             : 
      0 / 1  :       auto& [j_id, s_id, spec_hash, f] = *it;
             : 
      0 / 1  :       if (f.wait_for(0ms) == std::future_status::ready)
             :       {
      0 / 1  :         try
             :         {
      0 / 1  :           f.get();
             :         }
      0 / 1  :         catch (...)
             :         {
      0 / 1  :           pars::debug(SL, lf::event,
      0 / 1  :                       "Job #{}: Throws, processing exceptions !!", j_id);
             : 
      0 / 1  :           process_exception(s_id, spec_hash);
             :         }
             : 
      0 / 1  :         threads_m.erase(j_id);
             : 
      0 / 1  :         pars::debug(SL, lf::event, "Job #{}: Done! [# futures: {}]", j_id,
      0 / 1  :                     futures_m.size());
             : 
      0 / 1  :         it = futures_m.erase(it);
             :       }
      0 / 1  :       else
             :       {
      0 / 1  :         ++it;
             :       }
             :     }
             :   }
             : 
             :   void process_exception(auto s_id, auto spec_hash)
             :   {
      0 / 1  :     auto e_ptr = std::current_exception();
             : 
      0 / 1  :     try
             :     {
      0 / 1  :       std::rethrow_exception(e_ptr);
             :     }
      0 / 1  :     catch (std::exception& e)
             :     {
      0 / 1  :       pars::err(SL, lf::event, "Handler for 0x{:X} throws: {}", spec_hash,
      0 / 1  :                 e.what());
             :     }
      0 / 1  :     catch (...)
             :     {
      0 / 1  :       pars::err(SL, lf::event, "Handler for 0x{:X} throws: Unknown Exception",
      0 / 1  :                 spec_hash);
             :     }
             : 
      0 / 1  :     auto e_hash = spec<fired<exception>>::hash;
             : 
      0 / 1  :     auto lock = hf_registry_m.lock();
             : 
      0 / 1  :     if (!hf_registry_m.has_handler_for(s_id, e_hash))
      0 / 1  :       return;
             : 
      0 / 1  :     auto& hf = hf_registry_m.handler_for(s_id, e_hash);
             : 
      0 / 1  :     lock.unlock();
             : 
      0 / 1  :     try
             :     {
      0 / 1  :       hf(make_job(next_job_id(), fired{exception{e_ptr}, {}}));
             :     }
      0 / 1  :     catch (std::exception& e)
             :     {
      0 / 1  :       pars::err(SL, lf::event, "Exception Handler for 0x{:X} throws: {}",
      0 / 1  :                 spec_hash, e.what());
             :     }
      0 / 1  :     catch (...)
             :     {
      0 / 1  :       pars::err(SL, lf::event,
      0 / 1  :                 "Exeption Handler for 0x{:X} throws: Unknown Exception",
      0 / 1  :                 spec_hash);
             :     }
             :   }
             : 
             :   void request_stop(const std::size_t j_id)
             :   {
      0 / 1  :     threads_m[j_id].get_stop_source().request_stop();
             :   }
             : 
             :   bool thread_running(const std::size_t j_id)
             :   {
      0 / 1  :     return threads_m.contains(j_id);
             :   }
             : 
             :   bool stop_possible(const std::size_t j_id)
             :   {
      0 / 1  :     return threads_m[j_id].get_stop_token().stop_possible();
             :   }
             : 
             :   std::mutex mtx_m; ///< protects futures_m, threads_m
             : 
             :   using futures_value_type =
             :     std::tuple<std::size_t, int, std::size_t, std::future<void>>;
             : 
             :   std::vector<futures_value_type>
             :     futures_m; ///< all job ids, socket ids, spec hashes
             :                ///< and futures from threads that are running
             : 
             :   std::unordered_map<std::size_t, std::jthread>
             :     threads_m; ///< the jthread for a given job id, if any
             : 
             :   hf_registry& hf_registry_m;
             : 
             :   std::unordered_map<int, std::vector<std::size_t>>
             :     pipe_jobs_m; ///< all queued ids of jobs for a given pipe id
             : 
             :   std::atomic<std::size_t> next_job_id_m{0};
             : };
             : 
             : } // namespace pars::ev
Generated by: llvmcov2html