pars 0.2.1
Loading...
Searching...
No Matches
dispatcher.h
Go to the documentation of this file.
1/*
2Copyright (c) 2025 Giuseppe Roberti.
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without modification,
6are permitted provided that the following conditions are met:
7
81. Redistributions of source code must retain the above copyright notice, this
9list of conditions and the following disclaimer.
10
112. Redistributions in binary form must reproduce the above copyright notice,
12this list of conditions and the following disclaimer in the documentation and/or
13other materials provided with the distribution.
14
153. Neither the name of the copyright holder nor the names of its contributors
16may be used to endorse or promote products derived from this software without
17specific prior written permission.
18
19THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
20ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
23ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
26ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29*/
30#pragma once
31
32#include "pars/ev/event.h"
33#include "pars/ev/hf_registry.h"
34#include "pars/ev/job.h"
35#include "pars/ev/kind.h"
36#include "pars/ev/runner.h"
37#include "pars/log.h"
38
39#include <condition_variable>
40#include <deque>
41#include <mutex>
42
43namespace pars::ev
44{
45
47{
48public:
50 : runner_m{r}
51 {
52 }
53
55
56 void run()
57 {
58 running_m = true;
59
60 queue_back(fired{init{}, {}});
61
62 for (;;)
63 {
64 auto lock = std::unique_lock{mtx_m};
65
66 cond_m.wait(lock, [&]() { return !queue_m.empty() || !running_m; });
67
68 if (!running_m)
69 {
70 cond_m.wait(lock, [&]() { return terminate_m; });
71
72 return;
73 }
74
75 // NOTE: exec is executed after mtx_m unlock
76
77 runner_m.exec(next_job(lock));
78 }
79 }
80
82 {
83 auto guard = std::lock_guard{mtx_m};
84
85 queue_m.clear();
86
87 runner_m.stop_all_threads();
88
89 running_m = false;
90
91 cond_m.notify_one();
92 }
93
95 {
96 auto guard = std::lock_guard{mtx_m};
97
98 if (running_m)
99 throw std::runtime_error("Call stop_running first!");
100
101 terminate_m = true;
102
103 cond_m.notify_one();
104 }
105
107 {
108 auto guard = std::lock_guard{mtx_m};
109
110 return running_m == false;
111 }
112
114
115 template<template<typename> typename kind_of, event_c event_t>
116 requires kind_c<kind_of>
117 void queue_back(kind_of<event_t> ke)
118 {
119 auto guard = std::lock_guard{mtx_m};
120
121 queue(std::move(ke),
122 std::bind(std::mem_fn<void(decltype(queue_m)::value_type&&)>(
123 &decltype(queue_m)::push_back),
124 &queue_m, std::placeholders::_1));
125 }
126
127 template<template<typename> typename kind_of, event_c event_t>
128 requires kind_c<kind_of>
129 void queue_front(kind_of<event_t> ke)
130 {
131 auto guard = std::lock_guard{mtx_m};
132
133 queue(std::move(ke),
134 std::bind(std::mem_fn<void(decltype(queue_m)::value_type&&)>(
135 &decltype(queue_m)::push_front),
136 &queue_m, std::placeholders::_1));
137 }
138
139private:
141
142 std::mutex mtx_m;
143 std::condition_variable cond_m;
144
146
147 job next_job(auto& lock)
148 {
149 auto j{std::move(queue_m.front())};
150
151 queue_m.pop_front();
152
153 // we would like to use exec(next_job(std::move(lock)))
154 // but it is implementation-defined wether the lock moved into next_job
155 // is destroyed before exec is executed or not
156 // https://eel.is/c++draft/expr.call#6.sentence-10
157
158 // for this reason, we are going to unlock here
159 lock.unlock();
160
161 return j;
162 }
163
164 bool terminate_m{false};
165 bool running_m{false};
166 runner& runner_m;
167
169
170 template<template<typename> typename kind_of, event_c event_t>
171 requires kind_c<kind_of>
172 void queue(kind_of<event_t> ke, auto push_fn)
173 {
174 if (!running_m)
175 return;
176
177 auto j_id = runner_m.next_job_id();
178
179 push_fn(make_job(j_id, std::move(ke)));
180
181 if constexpr (internal_event_c<event_t>)
182 {
183 pars::debug(SL, lf::event, "Job #{} pushed [# jobs: {}]", j_id,
184 queue_m.size());
185 }
186 else if constexpr (network_event_c<event_t>)
187 {
188 auto p_id = ke.md().pipe().id();
189
190 runner_m.associate_job_to_pipe(j_id, p_id);
191
193 "Job #{} pushed and associated with Pipe {:X} [# jobs: {}]",
194 j_id, p_id, queue_m.size());
195 }
196
197 cond_m.notify_one();
198 }
199
200 std::deque<job> queue_m;
201};
202
203} // namespace pars::ev
void queue_front(kind_of< event_t > ke)
Definition dispatcher.h:129
dispatcher(runner &r)
Definition dispatcher.h:49
void queue_back(kind_of< event_t > ke)
Definition dispatcher.h:117
#define SL
Definition log.h:58
@ event
Definition flags.h:43
void debug(spdlog::source_loc loc, pars::lf lf, spdlog::format_string_t< args_t... > fmt, args_t &&... args)
Definition log.h:129