完成 Pipeline（代码源自虎哥博客，在此致谢）
2022/1/11 23:11:22
本文主要介绍如何完成一个 Pipeline（代码源自虎哥的博客，在此致谢），对大家解决编程问题具有一定的参考价值，需要的程序员们可以跟随小编一起学习！
当时想实践一下Pipeline的构建。
当时未能实现，原因主要在于：
1. ClickHouse架构认识不足。
2. CMakeLists.txt 功力不足。
两方面原因各占一半。
参见虎哥的博客: https://bohutang.me/2020/06/11/clickhouse-and-friends-processor/
1. Source
1 5 9 13 17 21 25 29 |
/// Source processor that produces one chunk with a single UInt64 column named
/// "number" holding the values 0, 1, ..., end - 1. Every later call to
/// generate() returns an empty Chunk.
class MySource : public ISource
{
public:
    String getName() const override { return "MySource"; }

    /// @param end_ number of rows to emit (values 0 .. end_ - 1).
    explicit MySource(UInt64 end_)
        : ISource(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}}))
        , end(end_)
    {
    }

private:
    UInt64 end;
    /// Set once the single data chunk has been returned.
    bool done = false;

    Chunk generate() override
    {
        /// After the data chunk has been handed out, return an empty Chunk.
        /// NOTE(review): presumably ISource treats this as end of stream — confirm against ISource.
        if (done)
            return Chunk();

        MutableColumns columns;
        columns.emplace_back(ColumnUInt64::create());
        /// The counter must be as wide as `end`. With an `unsigned int` counter
        /// (the former `0U`), the loop would wrap and never end when end > UINT32_MAX.
        for (UInt64 i = 0; i < end; ++i)
            columns[0]->insert(i);

        done = true;
        return Chunk(std::move(columns), end);
    }
};
2. MyAddTransformer
1 5 9 13 17 21 25 29 33 37 41 45 49 53 57 61 65 69 73 |
class MyAddTransformer : public IProcessor { public: String getName() const override { return "MyAddTransformer"; } MyAddTransformer() : IProcessor( {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})}, {Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})}) , input(inputs.front()) , output(outputs.front()) { } Status prepare() override { if (output.isFinished()) { input.close(); return Status::Finished; } if (!output.canPush()) { input.setNotNeeded(); return Status::PortFull; } if (has_process_data) { output.push(std::move(current_chunk)); has_process_data = false; } if (input.isFinished()) { output.finish(); return Status::Finished; } if (!input.hasData()) { input.setNeeded(); return Status::NeedData; } current_chunk = input.pull(false); return Status::Ready; } void work() override { auto num_rows = current_chunk.getNumRows(); auto result_columns = current_chunk.cloneEmptyColumns(); auto columns = current_chunk.detachColumns(); for (auto i = 0U; i < num_rows; i++) { auto val = columns[0]->getUInt(i); result_columns[0]->insert(val+1); } current_chunk.setColumns(std::move(result_columns), num_rows); has_process_data = true; } InputPort & getInputPort() { return input; } OutputPort & getOutputPort() { return output; } protected: bool has_input = false; bool has_process_data = false; Chunk current_chunk; InputPort & input; OutputPort & output; }; |
3. MySink
1 5 9 13 17 21 25 29 33 |
class MySink : public ISink { public: String getName() const override { return "MySinker"; } MySink() : ISink(Block({ColumnWithTypeAndName{ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "number"}})) { } private: WriteBufferFromFileDescriptor out{STDOUT_FILENO}; FormatSettings settings; void consume(Chunk chunk) override { size_t rows = chunk.getNumRows(); size_t columns = chunk.getNumColumns(); for (size_t row_num = 0; row_num < rows; ++row_num) { writeString("prefix-", out); for (size_t column_num = 0; column_num < columns; ++column_num) { if (column_num != 0) writeChar('\t', out); getPort() .getHeader() .getByPosition(column_num) .type->serializeAsText(*chunk.getColumns()[column_num], row_num, out, settings); } writeChar('\n', out); } out.next(); } }; |
4. DAG Scheduler
1 5 9 13 |
int main(int, char **) { auto source0 = std::make_shared<MySource>(5); auto add0 = std::make_shared<MyAddTransformer>(); auto sinker0 = std::make_shared<MySink>(); /// Connect. connect(source0->getPort(), add0->getInputPort()); connect(add0->getOutputPort(), sinker0->getPort()); std::vector<ProcessorPtr> processors = {source0, add0, sinker0}; PipelineExecutor executor(processors); executor.execute(1); }
|
这篇关于完成 Pipeline（代码源自虎哥博客）的文章就介绍到这儿，希望我们推荐的文章对大家有所帮助，也希望大家多多支持为之网！
- 2025-01-10百万架构师第十三课:源码分析:Spring 源码分析:Spring核心IOC容器及依赖注入原理|JavaGuide
- 2025-01-10便捷好用的电商API工具合集
- 2025-01-09必试!帮 J 人团队解决物流错发漏发的软件神器!
- 2025-01-09不容小觑!助力 J 人物流客服安抚情绪的软件!
- 2025-01-09为什么医疗团队协作离不开智能文档工具?
- 2025-01-09惊叹:J 人团队用啥软件让物流服务快又准?
- 2025-01-09如何利用数据分析工具优化项目资源分配?4种工具推荐
- 2025-01-09多学科协作难?这款文档工具可以帮你省心省力
- 2025-01-09团队中的技术项目经理TPM:工作内容与资源优化策略
- 2025-01-09JIT生产管理法:优化流程,提升竞争力的秘诀