FOUND
Loading...
Searching...
No Matches
pipelines.hpp
1#ifndef SRC_COMMON_PIPELINE_PIPELINES_HPP_
2#define SRC_COMMON_PIPELINE_PIPELINES_HPP_
3
4#include <optional>
5#include <stdexcept>
6#include <cassert>
7#include <memory>
8#include <type_traits>
9#include <utility>
10
11#include "common/pipeline/stages.hpp"
12
/// Default value of the pipelines' N template parameter, i.e. the default
/// size of the fixed array that stores a pipeline's stages.
14#define DEFAULT_NUM_STAGES 10
15
16namespace found {
17
28template<typename Input,
29 typename Output,
30 size_t N = DEFAULT_NUM_STAGES>
31class Pipeline : public FunctionStage<Input, Output> {
32 public:
43 void DoAction() override {
44 this->Run(this->resource);
45 }
46
63 virtual Output Run(const Input &input) = 0;
64
65 protected:
67 std::unique_ptr<Action> stages[N];
69 size_t size = 0;
71 bool ready = false;
73 std::optional<Output> finalProduct;
74
88 inline void AddStageHelper(std::unique_ptr<Action> &&stage) {
89 assert(this->size < N);
90 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
91 this->stages[size++] = std::move(stage);
92 }
93
108 inline void CompleteHelper(std::unique_ptr<Action> &&stage) {
109 assert(this->size < N);
110 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
111 this->stages[size++] = std::move(stage);
112 this->ready = true;
113 }
114
120 inline void DoActionHelper() {
121 for (size_t i = 0; i < this->size; i++) {
122 this->stages[i]->DoAction();
123 }
124 }
125};
126
147template<typename Input, typename Output, size_t N = DEFAULT_NUM_STAGES>
148class SequentialPipeline : public Pipeline<Input, Output, N> {
149 public:
154
171 template<typename I, typename O> SequentialPipeline &AddStage(std::unique_ptr<FunctionStage<I, O>> stage) {
172 FunctionStage<I, O> *stagePtr = stage.get();
173 if (this->size == 0) {
174 if (!std::is_same<Input, I>::value) {
175 throw std::invalid_argument("The initial input type is not correct");
176 }
177 this->firstResource = reinterpret_cast<Input *>(&stagePtr->GetResource());
178 } else {
179 // Chain here, and blindly trust the user
180 *this->lastProduct = static_cast<void *>(&stagePtr->GetResource());
181 }
182 // Add to our list
184 // Now, reset the lastProduct to be of this stage
185 this->lastProduct = reinterpret_cast<void **>(&stagePtr->GetProduct());
186 // Return the pipeline for chaining
187 return *this;
188 }
189
204 template<typename I> SequentialPipeline &Complete(std::unique_ptr<FunctionStage<I, Output>> stage) {
205 assert(this->size < N);
206 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
207 this->AddStage(std::move(stage));
208 this->ready = true;
209 return *this;
210 }
211
228 Output Run(const Input &input) override {
229 assert(!this->finalProduct);
230 // MANUAL VERIFICATION: The below branch is fully tested via pipeline-test.cpp
231 if (!this->ready)
232 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
233 // Don't use "Input resource" unless necessary
234 *this->firstResource = input;
235 // If this pipeline is not composed, construct output
236 // into ourself, otherwise construct into the outer pipeline
237 if (this->product == nullptr) {
238 // engage our finalProduct
239 this->finalProduct.emplace();
240 *this->lastProduct = &this->finalProduct.value();
241 this->product = &this->finalProduct.value();
242 } else {
243 *this->lastProduct = this->product;
244 }
246 return *this->product;
247 }
248
249 private:
251 Input *firstResource = nullptr;
253 void **lastProduct = nullptr;
254};
255
263template<typename T, size_t N = DEFAULT_NUM_STAGES>
264class ModifyingPipeline : public Pipeline<T, T, N> {
265 public:
269 ModifyingPipeline() = default;
270
279 assert(this->size < N - 1);
280 Pipeline<T, T, N>::AddStageHelper(std::move(stage));
281 return *this;
282 }
283
295 assert(this->size < N);
296 Pipeline<T, T, N>::CompleteHelper(std::move(stage));
297 return *this;
298 }
299
308 T Run(const T &input) override {
309 assert(!this->finalProduct);
310 if (!this->ready)
311 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
312 // Construct here if there is no next product,
313 // or construct there if there is
314 if (this->product == nullptr) { // GCOVR_EXCL_BR_LINE
315 // engage our finalProduct
316 this->finalProduct.emplace();
317 this->product = &this->finalProduct.value();
318 }
319 *this->product = input;
320 for (size_t i = 0; i < this->size; i++) {
321 dynamic_cast<ModifyingStage<T> *>(this->stages[i].get())
322 ->SetResource(*this->product); // GCOVR_EXCL_LINE
323 }
325 return *this->product;
326 }
327};
328
329} // namespace found
330
331#endif // SRC_COMMON_PIPELINE_PIPELINES_HPP_
A FunctionStage is a data structure that wraps a function taking a parameter of type Input and returning an Output.
Definition stages.hpp:57
Output * product
The pointer to the stored output for this.
Definition stages.hpp:97
Input & GetResource()
Returns the stored input of this.
Definition stages.hpp:83
Output *& GetProduct()
Returns the stored output of this.
Definition stages.hpp:91
Input resource
The stored input for this.
Definition stages.hpp:95
A ModifyingPipeline modifies a resource with a given set of stages.
Definition pipelines.hpp:264
ModifyingPipeline & Complete(std::unique_ptr< ModifyingStage< T > > stage)
Completes a pipeline with a stage.
Definition pipelines.hpp:294
ModifyingPipeline()=default
Constructs a ModifyingPipeline.
T Run(const T &input) override
Executes this Modifying Pipeline.
Definition pipelines.hpp:308
ModifyingPipeline & AddStage(std::unique_ptr< ModifyingStage< T > > stage)
Adds a stage to this.
Definition pipelines.hpp:278
ModifyingStage is a stage that modifies a resource.
Definition stages.hpp:106
A Pipeline<Input, Output, N> is an abstract Pipeline that takes an Input, outputs an Output,...
Definition pipelines.hpp:31
void AddStageHelper(std::unique_ptr< Action > &&stage)
Adds a stage to this pipeline, taking ownership of the provided stage.
Definition pipelines.hpp:88
void CompleteHelper(std::unique_ptr< Action > &&stage)
Completes a pipeline with a final stage, taking ownership of it.
Definition pipelines.hpp:108
void DoActionHelper()
Runs the entire pipeline.
Definition pipelines.hpp:120
std::optional< Output > finalProduct
The final product. This is only sometimes used.
Definition pipelines.hpp:73
size_t size
The number of stages.
Definition pipelines.hpp:69
std::unique_ptr< Action > stages[N]
Ownership storage for the stages.
Definition pipelines.hpp:67
virtual Output Run(const Input &input)=0
Runs this Pipeline.
void DoAction() override
Runs a Pipeline.
Definition pipelines.hpp:43
bool ready
Whether we're complete.
Definition pipelines.hpp:71
SequentialPipeline is a composite Stage (i.e. a Stage made up of other stages that are chained and run in sequence).
Definition pipelines.hpp:148
void ** lastProduct
A temporary variable that always points to the last Stage's product field.
Definition pipelines.hpp:253
SequentialPipeline & AddStage(std::unique_ptr< FunctionStage< I, O > > stage)
Adds a stage to this pipeline.
Definition pipelines.hpp:171
SequentialPipeline()=default
Constructs an empty SequentialPipeline.
SequentialPipeline & Complete(std::unique_ptr< FunctionStage< I, Output > > stage)
Adds a stage to the pipeline and marks it as the last stage, preventing further manipulation of the S...
Definition pipelines.hpp:204
Input * firstResource
The pointer to the variable that will store the first input.
Definition pipelines.hpp:251
Output Run(const Input &input) override
Executes this SequentialPipeline.
Definition pipelines.hpp:228