FOUND
Loading...
Searching...
No Matches
pipelines.hpp
1#ifndef SRC_COMMON_PIPELINE_PIPELINES_HPP_
2#define SRC_COMMON_PIPELINE_PIPELINES_HPP_
3
4#include <optional>
5#include <stdexcept>
6#include <cassert>
7
8#include "common/pipeline/stages.hpp"
9
11#define DEFAULT_NUM_STAGES 10
12
13namespace found {
14
25template<typename Input,
26 typename Output,
27 size_t N = DEFAULT_NUM_STAGES>
28class Pipeline : public FunctionStage<Input, Output> {
29 public:
40 void DoAction() override {
41 this->Run(this->resource);
42 }
43
60 virtual Output Run(const Input &input) = 0;
61
62 protected:
66 size_t size = 0;
68 bool ready = false;
70 std::optional<Output> finalProduct;
71
83 inline void AddStageHelper(Action &stage) {
84 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
85 this->stages[size++] = &stage;
86 }
87
102 inline void CompleteHelper(Action &stage) {
103 assert(this->size < N);
104 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
105 this->stages[size++] = &stage;
106 this->ready = true;
107 }
108
114 inline void DoActionHelper() {
115 for (size_t i = 0; i < this->size; i++) {
116 this->stages[i]->DoAction();
117 }
118 }
119};
120
141template<typename Input, typename Output, size_t N = DEFAULT_NUM_STAGES>
142class SequentialPipeline : public Pipeline<Input, Output, N> {
143 public:
148
165 template<typename I, typename O> SequentialPipeline &AddStage(FunctionStage<I, O> &stage) {
166 if (this->size == 0) {
167 if (!std::is_same<Input, I>::value) {
168 throw std::invalid_argument("The initial input type is not correct");
169 }
170 this->firstResource = reinterpret_cast<Input *>(&stage.GetResource());
171 } else {
172 // Chain here, and blindly trust the user
173 *this->lastProduct = static_cast<void *>(&stage.GetResource());
174 }
175 // Add to our list
177 // Now, reset the lastProduct to be of this stage
178 this->lastProduct = reinterpret_cast<void **>(&stage.GetProduct());
179 // Return the pipeline for chaining
180 return *this;
181 }
182
199 assert(this->size < N);
200 if (this->ready) throw std::invalid_argument("Pipeline is already ready");
201 this->AddStage(stage);
202 this->ready = true;
203 return *this;
204 }
205
222 Output Run(const Input &input) override {
223 assert(!this->finalProduct);
224 // MANUAL VERIFICATION: The below branch is fully tested via pipeline-test.cpp
225 if (!this->ready)
226 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
227 // Don't use "Input resource" unless necessary
228 *this->firstResource = input;
229 // If this pipeline is not composed, construct output
230 // into ourself, otherwise construct into the outer pipeline
231 if (this->product == nullptr) {
232 // engage our finalProduct
233 this->finalProduct.emplace();
234 *this->lastProduct = &this->finalProduct.value();
235 this->product = &this->finalProduct.value();
236 } else {
237 *this->lastProduct = this->product;
238 }
240 return *this->product;
241 }
242
243 private:
245 Input *firstResource = nullptr;
247 void **lastProduct = nullptr;
248};
249
257template<typename T, size_t N = DEFAULT_NUM_STAGES>
258class ModifyingPipeline : public Pipeline<T, T, N> {
259 public:
263 ModifyingPipeline() = default;
264
273 assert(this->size < N - 1);
275 return *this;
276 }
277
289 assert(this->size < N);
291 return *this;
292 }
293
302 T Run(const T &input) override {
303 assert(!this->finalProduct);
304 if (!this->ready)
305 throw std::runtime_error("This is an illegal action: the pipeline is not ready yet");
306 // Construct here if there is no next product,
307 // or construct there if there is
308 if (this->product == nullptr) {
309 // engage our finalProduct
310 this->finalProduct.emplace();
311 this->product = &this->finalProduct.value();
312 }
313 *this->product = input;
314 for (size_t i = 0; i < this->size; i++) {
315 dynamic_cast<ModifyingStage<T> *>(this->stages[i])->SetResource(*this->product);
316 }
318 return *this->product;
319 }
320};
321
322} // namespace found
323
324#endif // SRC_COMMON_PIPELINE_PIPELINES_HPP_
Action is an interface that wraps a function that does something.
Definition stages.hpp:17
virtual void DoAction()=0
Performs some action.
A FunctionStage is a data structure that wraps a function, and taking in parameter Input and returnin...
Definition stages.hpp:56
Output * product
The pointer to the stored output for this.
Definition stages.hpp:96
Input & GetResource()
Returns the stored input of this.
Definition stages.hpp:82
Output *& GetProduct()
Returns the stored output of this.
Definition stages.hpp:90
Input resource
The stored input for this.
Definition stages.hpp:94
A ModifyingPipeline modifies a resource with a given set of stages.
Definition pipelines.hpp:258
ModifyingPipeline & Complete(ModifyingStage< T > &stage)
Completes a pipeline with a stage.
Definition pipelines.hpp:288
ModifyingPipeline()=default
Constructs a ModifyingPipeline.
T Run(const T &input) override
Executes this Modifying Pipeline.
Definition pipelines.hpp:302
ModifyingPipeline & AddStage(ModifyingStage< T > &stage)
Adds a stage to this.
Definition pipelines.hpp:272
ModifyingStage is a stage that modifies a resource.
Definition stages.hpp:105
A Pipeline<Input, Output, N> is an abstract Pipeline that takes an Input, outputs an Output,...
Definition pipelines.hpp:28
void DoActionHelper()
Runs the entire pipeline.
Definition pipelines.hpp:114
std::optional< Output > finalProduct
The final product. This is only sometimes used.
Definition pipelines.hpp:70
size_t size
The number of stages.
Definition pipelines.hpp:66
void AddStageHelper(Action &stage)
Adds a stage to this pipeline.
Definition pipelines.hpp:83
virtual Output Run(const Input &input)=0
Runs this Pipeline.
void CompleteHelper(Action &stage)
Completes a pipeline with a stage.
Definition pipelines.hpp:102
Action * stages[N]
The stages of this.
Definition pipelines.hpp:64
void DoAction() override
Runs a Pipeline.
Definition pipelines.hpp:40
bool ready
Whether we're complete.
Definition pipelines.hpp:68
SequentialPipeline is composite Stage (i.e.
Definition pipelines.hpp:142
void ** lastProduct
A temporary variable that always points to the last Stage's product field.
Definition pipelines.hpp:247
SequentialPipeline & Complete(FunctionStage< I, Output > &stage)
Adds a stage to the pipeline and marks it as the last stage, preventing further manipulation of the S...
Definition pipelines.hpp:198
SequentialPipeline()=default
Constructs an empty SequentialPipeline.
SequentialPipeline & AddStage(FunctionStage< I, O > &stage)
Adds a stage to this pipeline.
Definition pipelines.hpp:165
Input * firstResource
The pointer to the variable that will store the first input.
Definition pipelines.hpp:245
Output Run(const Input &input) override
Executes this SequentialPipeline.
Definition pipelines.hpp:222