FOUND Coverage Report


src/
File: common/pipeline/pipelines.hpp
Date: 2025-11-20 21:57:26
Lines:
61/61
100.0%
Functions:
155/155
100.0%
Branches:
31/31
100.0%

Line Branch Exec Source
1 #ifndef SRC_COMMON_PIPELINE_PIPELINES_HPP_
2 #define SRC_COMMON_PIPELINE_PIPELINES_HPP_
3
4 #include <optional>
5 #include <stdexcept>
6 #include <cassert>
7 #include <memory>
8 #include <type_traits>
9 #include <utility>
10
11 #include "common/pipeline/stages.hpp"
12
13 /// The default number of pipeline stages
14 #define DEFAULT_NUM_STAGES 10
15
16 namespace found {
17
18 /**
19 * A Pipeline<Input, Output, N> is an abstract Pipeline
20 * that takes an Input, outputs an Output, with N stages
21 *
22 * @param Input The input type
23 * @param Output The output type
24 * @param N The number of stages it uses
25 *
26 * @pre Output must be able to be default constructed
27 */
28 template<typename Input,
29 typename Output,
30 size_t N = DEFAULT_NUM_STAGES>
31 class Pipeline : public FunctionStage<Input, Output> {
32 public:
33 /**
34 * Runs a Pipeline
35 *
36 * @note Because Pipelines already construct their
37 * return type into the correct destination, there
38 * is no need to set the product to the result of the
39 * Run, so we optimize here. This override actually
40 * isn't necessary for code correctness, but is great
41 * for optimization.
42 */
43 38 void DoAction() override {
44
1/1
✓ Branch 2 taken 11 times.
38 this->Run(this->resource);
45 38 }
46
47 /**
48 * Runs this Pipeline
49 *
50 * @param input The input to the Pipeline
51 *
52 * @return The output of the Pipeline
53 *
54 * @post This method must also construct
55 * Output into the correct destination
56 * so that Pipeline::GetProduct() may be
57 * used
58 *
59 * @note In this function only, Pipeline::GetProduct()
60 * indicates whether storage is present or not for
61 * the output.
62 */
63 virtual Output Run(const Input &input) = 0;
64
65 protected:
66 /// The stages of this
67 Action *stages[N];
68 /// Ownership storage for the stages
69 std::unique_ptr<Action> ownedStages[N];
70 /// The number of stages
71 size_t size = 0;
72 /// Whether we're complete
73 bool ready = false;
74 /// The final product. This is only sometimes used
75 std::optional<Output> finalProduct;
76
77 /**
78 * Adds a stage to this pipeline, taking ownership of the provided stage.
79 *
80 * @param stage The stage to add to the pipeline (ownership transferred via unique_ptr)
81 *
82 * @throw std::invalid_argument iff this pipeline has already been completed
83 *
84 * @pre This method is called when the number of registered stages is
85 * less than N - 1
86 *
87 * @post The pipeline stores the stage internally, extending its lifetime to match
88 * the pipeline's lifetime.
89 */
90 236 inline void AddStageHelper(std::unique_ptr<Action> &&stage) {
91 assert(this->size < N);
92
3/3
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 116 times.
✓ Branch 4 taken 2 times.
236 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
93 232 this->stages[size] = stage.get();
94 232 this->ownedStages[size++] = std::move(stage);
95 232 }
96
97 /**
98 * Completes a pipeline with a final stage, taking ownership of it.
99 *
100 * @param stage The final stage to add (ownership transferred via unique_ptr)
101 *
102 * @throw std::invalid_argument iff this pipeline has already been completed
103 *
104 * @pre This method is called when the number of
105 * registered stages is less than N
106 * @pre The pipeline is not ready yet (this->ready == false)
107 *
108 * @post The pipeline is ready (this->ready == true) and retains the stage
109 * for the remainder of its lifetime.
110 */
111 22 inline void CompleteHelper(std::unique_ptr<Action> &&stage) {
112 assert(this->size < N);
113
3/3
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 13 times.
✓ Branch 4 taken 1 times.
22 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
114 20 this->stages[size] = stage.get();
115 20 this->ownedStages[size++] = std::move(stage);
116 20 this->ready = true;
117 20 }
118
119 /**
120 * Runs the entire pipeline
121 *
122 * @pre The pipeline is ready (this->ready == true)
123 */
124 128 inline void DoActionHelper() {
125
2/2
✓ Branch 0 taken 125 times.
✓ Branch 1 taken 64 times.
378 for (size_t i = 0; i < this->size; i++) {
126 250 this->stages[i]->DoAction();
127 }
128 128 }
129 };
130
131 /**
132 * SequentialPipeline is composite Stage (i.e. A Stage
133 * that is made up of many stages). You can
134 * add 1 to many stages into a SequentialPipeline so long
135 * as the Input to the first stage is the same
136 * Input to the SequentialPipeline, and the Output to the
137 * last stage is the Output of the SequentialPipeline.
138 *
139 * A SequentialPipeline runs by feeding the input to the first
140 * stage, taking its output and feeding it to the next
141 * stage, and so on until the last stage outputs
142 * the output.
143 *
144 * @param Input The SequentialPipeline's Input
145 * @param Output The SequentialPipeline's Output
146 * @param N Number of stages
147 *
148 * @pre Output must be able to be default constructed (i.e. output
149 * has trait Output::Output())
150 */
151 template<typename Input, typename Output, size_t N = DEFAULT_NUM_STAGES>
152 class SequentialPipeline : public Pipeline<Input, Output, N> {
153 public:
154 /**
155 * Constructs an empty SequentialPipeline
156 */
157 114 SequentialPipeline() = default;
158
159 /**
160 * Adds a stage to this pipeline
161 *
162 * @param stage The stage to add to the pipeline
163 *
164 * @return this, with the new stage added (for chaining)
165 *
166 * @throws invalid_argument iff this is the first time this
167 * method is called, and I does not match Input OR if the SequentialPipeline
168 * is already complete (aka this::Complete was called successfully)
169 *
170 * @pre Iff this method has already been called, O from the last
171 * parameter must match I of the next parameter
172 * @pre This method is called when the number of registered stages is
173 * less than N - 1
174 */
175 template<typename StageType>
176 226 SequentialPipeline &AddStage(std::unique_ptr<StageType> stage) {
177 static_assert(std::is_base_of<FunctionStage<typename StageType::InputType,
178 typename StageType::OutputType>, StageType>::value,
179 "Stage must derive from FunctionStage");
180 226 StageType *stagePtr = stage.get();
181
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 57 times.
226 if (this->size == 0) {
182 if (!std::is_same<Input, typename StageType::InputType>::value) {
183
1/1
✓ Branch 2 taken 1 times.
2 throw std::invalid_argument("The initial input type is not correct");
184 }
185 110 this->firstResource = reinterpret_cast<Input *>(&stagePtr->GetResource());
186 } else {
187 // Chain here, and blindly trust the user
188 114 *this->lastProduct = static_cast<void *>(&stagePtr->GetResource());
189 }
190 // Add to our list
191
1/1
✓ Branch 4 taken 111 times.
226 Pipeline<Input, Output, N>::AddStageHelper(std::move(stage));
192 // Now, reset the lastProduct to be of this stage
193 222 this->lastProduct = reinterpret_cast<void **>(&stagePtr->GetProduct());
194 // Return the pipeline for chaining
195 222 return *this;
196 }
197
198 /**
199 * Adds a stage to the pipeline and marks it as the last stage,
200 * preventing further manipulation of the SequentialPipeline
201 *
202 * @param stage The stage to add
203 *
204 * @return this, with the last stage added (to run this::Run)
205 *
206 * @throws invalid_argument iff this is the first time this
207 * method is called, and I does not match Input OR if the SequentialPipeline
208 * is already complete (aka this::Complete was called successfully)
209 *
210 * @pre This method is called when the number of registered stages is less
211 * than N
212 */
213 template<typename StageType>
214 112 SequentialPipeline &Complete(std::unique_ptr<StageType> stage) {
215 static_assert(std::is_same<Output, typename StageType::OutputType>::value,
216 "Final stage output must match pipeline output");
217 assert(this->size < N);
218
3/3
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 55 times.
✓ Branch 4 taken 1 times.
112 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
219
1/1
✓ Branch 4 taken 55 times.
110 this->AddStage(std::move(stage));
220 110 this->ready = true;
221 110 return *this;
222 }
223
224 /**
225 * Executes this SequentialPipeline
226 *
227 * @param input The input to this SequentialPipeline
228 *
229 * @return The output of the SequentialPipeline
230 *
231 * @note A SequentialPipeline runs by feeding the input to the first
232 * stage, taking its output and feeding it to the next
233 * stage, and so on until the last stage outputs
234 * the output.
235 *
236 * @pre The pipeline must have been completed successfully,
237 * i.e. before this::Run is called, this::Complete must have
238 * been called successfully
239 */
240 108 Output Run(const Input &input) override {
241 assert(!this->finalProduct);
242 // MANUAL VERIFICATION: The below branch is fully tested via pipeline-test.cpp
243
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 53 times.
108 if (!this->ready)
244
1/1
✓ Branch 2 taken 1 times.
2 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
245 // Don't use "Input resource" unless necessary
246 106 *this->firstResource = input;
247 // If this pipeline is not composed, construct output
248 // into ourself, otherwise construct into the outer pipeline
249
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 11 times.
106 if (this->product == nullptr) {
250 // engage our finalProduct
251 84 this->finalProduct.emplace();
252 84 *this->lastProduct = &this->finalProduct.value();
253 84 this->product = &this->finalProduct.value();
254 } else {
255 22 *this->lastProduct = this->product;
256 }
257 106 Pipeline<Input, Output, N>::DoActionHelper();
258 106 return *this->product;
259 }
260
261 private:
262 /// The pointer to the variable that will store the first input
263 Input *firstResource = nullptr;
264 /// A temporary variable that always points to the last Stage's product field
265 void **lastProduct = nullptr;
266 };
267
268 /**
269 * A ModifyingPipeline modifies a resource with a given
270 * set of stages
271 *
272 * @param T The resource to modify
273 * @param N The number of stages that modify the resource
274 */
275 template<typename T, size_t N = DEFAULT_NUM_STAGES>
276 class ModifyingPipeline : public Pipeline<T, T, N> {
277 public:
278 /**
279 * Constructs a ModifyingPipeline
280 */
281 31 ModifyingPipeline() = default;
282
283 /**
284 * Adds a stage to this
285 *
286 * @param stage The ModifyingStage<T> to add to this
287 *
288 * @return this, with the added stage
289 */
290 12 ModifyingPipeline &AddStage(std::unique_ptr<ModifyingStage<T>> stage) {
291 assert(this->size < N - 1);
292
1/1
✓ Branch 4 taken 5 times.
14 Pipeline<T, T, N>::AddStageHelper(std::move(stage));
293 10 return *this;
294 }
295
296 /**
297 * Completes a pipeline with a stage
298 *
299 * @param stage The stage to add
300 *
301 * @return this, with the completed pipeline
302 *
303 * @pre This method is called when the number of
304 * registered stages is less than N
305 */
306 22 ModifyingPipeline &Complete(std::unique_ptr<ModifyingStage<T>> stage) {
307 assert(this->size < N);
308
1/1
✓ Branch 4 taken 13 times.
24 Pipeline<T, T, N>::CompleteHelper(std::move(stage));
309 20 return *this;
310 }
311
312 /**
313 * Executes this Modifying Pipeline
314 *
315 * @param input The input to modify
316 *
317 * @return input, but modified by all
318 * the registered stages, in order
319 */
320 13 T Run(const T &input) override {
321 assert(!this->finalProduct);
322
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 6 times.
13 if (!this->ready)
323
1/1
✓ Branch 2 taken 1 times.
2 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
324 // Construct here if there is no next product,
325 // or construct there if there is
326
2/2
✓ Branch 0 taken 1 times.
✓ Branch 1 taken 5 times.
11 if (this->product == nullptr) {
327 // engage our finalProduct
328 3 this->finalProduct.emplace();
329 3 this->product = &this->finalProduct.value();
330 }
331 11 *this->product = input;
332
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 6 times.
27 for (size_t i = 0; i < this->size; i++) {
333 dynamic_cast<ModifyingStage<T> *>(this->stages[i])->SetResource(*this->product); // GCOVR_EXCL_LINE
334 }
335 11 Pipeline<T, T, N>::DoActionHelper();
336 11 return *this->product;
337 }
338 };
339
340 } // namespace found
341
342 #endif // SRC_COMMON_PIPELINE_PIPELINES_HPP_
343