pars 0.2.1
Loading...
Searching...
No Matches
pars::ev::runner Struct Reference

#include <runner.h>

Public Member Functions

 runner (hf_registry &hfs)
 
std::size_t next_job_id ()
 
void start_thread (std::size_t spec_hash, std::packaged_task< void(std::stop_token, job)> task, job j)
 
auto count_threads ()
 
void stop_all_threads ()
 
bool can_exec (int s_id, std::size_t spec_hash)
 
void exec (job j)
 
template<template< typename > typename kind_of, event_c event_t>
requires kind_c<kind_of>
void exec (kind_of< event_t > ke)
 
void add_pipe (const net::pipe &p)
 
void remove_pipe (const net::pipe &p)
 Stops all running jobs and removes all pending jobs for pipe p.
 
void associate_job_to_pipe (const int j_id, const int p_id)
 
void stop_thread (const int j_id)
 

Detailed Description

Definition at line 45 of file runner.h.

Constructor & Destructor Documentation

◆ runner()

pars::ev::runner::runner ( hf_registry & hfs)
inline

Definition at line 48 of file runner.h.

49 : hf_registry_m{hfs}
50 {
51 }

Member Function Documentation

◆ add_pipe()

void pars::ev::runner::add_pipe ( const net::pipe & p)
inline

Definition at line 150 of file runner.h.

151 {
152 auto guard = std::lock_guard{mtx_m};
153
154 auto p_id = p.id();
155
156 pipe_jobs_m.insert({p_id, {}});
157 }

References pars::net::pipe::id().

Here is the call graph for this function:

◆ associate_job_to_pipe()

void pars::ev::runner::associate_job_to_pipe ( const int j_id,
const int p_id )
inline

Definition at line 175 of file runner.h.

176 {
177 if (j_id <= 0)
178 throw std::runtime_error(std::format("Job #{}: invalid Job!", p_id));
179
180 if (p_id <= 0)
181 return;
182
183 auto guard = std::lock_guard{mtx_m};
184
185 // add j_id to the jobs related to p, unless p was removed with a previous
186 // call to remove_pipe(p) (ie: pipe_jobs_m does not contain p.id())
187 if (pipe_jobs_m.contains(p_id))
188 {
189 pipe_jobs_m.at(p_id).push_back(j_id);
190
191 pars::debug(SL, lf::event,
192 "Job #{} pushed and associated to Pipe {:X} [# size: {}]",
193 j_id, p_id, pipe_jobs_m.at(p_id).size());
194 }
195 }
#define SL
Definition log.h:58
@ event
Definition flags.h:43
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:129

References pars::debug(), pars::event, and SL.

Here is the call graph for this function:

◆ can_exec()

bool pars::ev::runner::can_exec ( int s_id,
std::size_t spec_hash )
inline

Definition at line 95 of file runner.h.

96 {
97 auto lock = hf_registry_m.lock();
98
99 return hf_registry_m.has_handler_for(s_id, spec_hash);
100 }

◆ count_threads()

auto pars::ev::runner::count_threads ( )
inline

Definition at line 72 of file runner.h.

73 {
74 auto guard = std::lock_guard{mtx_m};
75
76 return futures_m.size();
77 }

◆ exec() [1/2]

void pars::ev::runner::exec ( job j)
inline

Definition at line 102 of file runner.h.

103 {
104 auto spec_hash = j.spec_hash();
105
106 auto s_id = j.socket_id();
107
108 auto lock = hf_registry_m.lock();
109
110 if (!hf_registry_m.has_handler_for(s_id, spec_hash))
111 {
112 pars::err(SL, lf::event,
113 "Unable to find handler for Spec 0x{:X} on Socket {}, skip "
114 "message ...",
115 spec_hash, s_id);
116
117 return;
118 }
119
120 auto& hf = hf_registry_m.handler_for(s_id, spec_hash);
121
122 lock.unlock();
123
124 pars::debug(SL, lf::event, "Job #{}: Running Handler [{}]", j.id(),
125 demangle(hf_registry_m.type_for(spec_hash)->name()));
126
127 try
128 {
129 {
130 auto guard = std::lock_guard{mtx_m};
131
132 process_futures();
133 }
134
135 hf(std::move(j));
136 }
137 catch (...)
138 {
139 process_exception(s_id, spec_hash);
140 }
141 }
void err(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:211

References pars::debug(), pars::err(), pars::event, pars::ev::job::id(), SL, pars::ev::job::socket_id(), and pars::ev::job::spec_hash().

Referenced by exec().

Here is the call graph for this function:
Here is the caller graph for this function:

◆ exec() [2/2]

template<template< typename > typename kind_of, event_c event_t>
requires kind_c<kind_of>
void pars::ev::runner::exec ( kind_of< event_t > ke)
inline

Definition at line 145 of file runner.h.

146 {
147 exec(make_job(next_job_id(), std::move(ke)));
148 }
std::size_t next_job_id()
Definition runner.h:53
void exec(job j)
Definition runner.h:102

References exec(), and next_job_id().

Here is the call graph for this function:

◆ next_job_id()

std::size_t pars::ev::runner::next_job_id ( )
inline

Definition at line 53 of file runner.h.

53{ return next_job_id_m++; }

Referenced by exec().

Here is the caller graph for this function:

◆ remove_pipe()

void pars::ev::runner::remove_pipe ( const net::pipe & p)
inline

Stops all running jobs and removes all pending jobs for pipe p.

Definition at line 160 of file runner.h.

161 {
162 auto guard = std::lock_guard{mtx_m};
163
164 auto p_id = p.id();
165
166 if (!pipe_jobs_m.at(p_id).empty())
167 {
168 // stop all running jobs for pipe p
169 stop_runnings(pipe_jobs_m.at(p_id));
170 }
171
172 pipe_jobs_m.erase(p_id);
173 }

References pars::net::pipe::id().

Here is the call graph for this function:

◆ start_thread()

void pars::ev::runner::start_thread ( std::size_t spec_hash,
std::packaged_task< void(std::stop_token, job)> task,
job j )
inline

Definition at line 55 of file runner.h.

57 {
58 auto j_id = j.id();
59
60 auto s_id = j.socket_id();
61
62 auto guard = std::lock_guard{mtx_m};
63
64 futures_m.emplace_back(j_id, s_id, spec_hash, task.get_future());
65
66 auto th_res = threads_m.try_emplace(j_id, std::move(task), std::move(j));
67
68 if (!th_res.second)
69 futures_m.pop_back();
70 }

References pars::ev::job::id(), and pars::ev::job::socket_id().

Here is the call graph for this function:

◆ stop_all_threads()

void pars::ev::runner::stop_all_threads ( )
inline

Definition at line 79 of file runner.h.

80 {
81 auto guard = std::lock_guard{mtx_m};
82
83 for (const auto& t : threads_m)
84 {
85 if (stop_possible(t.first))
86 request_stop(t.first);
87 }
88
89 while (!futures_m.empty())
90 process_futures();
91
92 pipe_jobs_m.clear();
93 }

◆ stop_thread()

void pars::ev::runner::stop_thread ( const int j_id)
inline

Definition at line 197 of file runner.h.

198 {
199 auto guard = std::lock_guard(mtx_m);
200
201 if (stop_possible(j_id))
202 request_stop(j_id);
203 }

The documentation for this struct was generated from the following file: