随机读取MongoDB存储的文件
2021/8/3 19:09:25
本文主要是介绍随机读取MongoDB存储的文件,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
背景
最近所开发的项目使用MongoDB的GridFS保存文件,且文件大部分均大于500M,在程序运行过程中需要读取文件数据进行处理。但是MongoDB提供的mongocxx只能将整个文件下载下来或者暂时保存在内存中。每次处理数据都需要下载到本地是比较耗时的,且每次处理完还需要删除临时文件。
GridFS是将一个大文件分成多个chunk进行保存的,因此考虑每次进下载文件的一个chunk到内存中。
总体设计
为了对读取MongoDB中的文件与读取本地文件的接口统一起来,增加了一个抽象基类,对本地文件的读取也通过一个实现了该基类的类来进行。
整体有三个类,一个用于处理本地文件,一个用于处理MongoDB存储的文件还有一个定义了接口的基类.
基类 fileReader
// fileReader class FileReader{ public: /** * 禁止拷贝操作, 复制一个文件操作对象的副本是没有意义的 * 移动操作不需要禁止 */ FileReader(const FileReader&) = delete; FileReader& operator=(const FileReader&) = delete; FileReader(); /** * 析构函数 当用户没有显式调用closeFile关闭文件时 * 析构函数需要负责关闭文件,即做最后的一道保险 */ virtual ~FileReader(); /** * 打开文件 * 文件由构造函数指定 * @return 成功打开文件返回true,否则返回false */ virtual void openFile() = 0; /** * @return 返回文件是否已打开 **/ virtual const bool isOpen() const = 0; /** * 关闭已打开的文件 * @return 文件未打开或成功关闭文件返回true */ virtual const bool closeFile() = 0; /** * 从文件中读取一行数据,并以字符串常量返回 * 遇到"\n"时认为一行 * @return 用字符串表示的该函数数据 */ virtual const std::optional<std::string> getLine() = 0; virtual const bool getLine(char *s, const size_t n) = 0; /** * 从文件的当前位置读取n个字节的数据存储到buff中 * 并且读取文件的位置会向后移动n个字节 * @param buff 用于存放读取到的数据的字节数组,长度必须不小于n指定的大小 * @param n 指定要读取的字节数 * @return 读取失败时返回0,否则返回读到的字节数 */ virtual const size_t read(char* buff, const size_t n) = 0; /** * @return 文件的字节数 */ virtual const int getFileSize() const = 0; /** * @return 用字符串表示的文件类型,无法识别出文件类型时返回UNKNOW_TYPE. * UNKNOW_TYPE为静态字符串常量 */ virtual const std::string getFileType() const = 0; /** * @return 文件名 */ virtual const std::string getFileName() const = 0; /** * 设置要从输入流中读取的下一个字节数据的位置 * @param offset 根据dir设置的模式,从指定位置的偏移量 * @param dir 使用ios_base::seekdir作为类型,含义与标准输入输出一致 * @return 设置成功时返回true,设置的位置非法时返回false **/ virtual bool seekg(const size_t offset, std::ios_base::seekdir dir) = 0; /** * @return 返回当前在文件中的位置 **/ virtual const int tellg() = 0; /** * @return 到达文件末尾时返回true */ virtual const bool eof() = 0; protected: static constexpr char* UNKNOW_TYPE = (char *)"unknow"; };
接口参考std::fstream
,由于MongoDB保存的文件不支持直接修改,所以不提供写的接口。
由于对本地文件的操作就只是做一层简单的转发,在此不展开
MongoFileReader
接口不在此展开,仅展开部分头文件代码
class MongoFileReader: public FileReader{ public: explicit MongoFileReader(std::string_view dbName, std::string_view bucketName, std::string_view fileId); ~MongoFileReader(); public: //与接口一致,不展开 private: std::optional<bsoncxx::document::value> getChunkDoc_(error::PCError &e, const int n); private: static const std::string FILES_SUFFIX; static const std::string CHUNKS_SUFFIX; private: std::string dbName_; std::string bucketName_; //文件在数据库中的id(ObjectId) std::string fileId_; bool isOpen_; int64_t fileSize_; std::string fileName_; mutable std::string fileType_; bsoncxx::document::view metaDataDoc_; /** * 缓冲区大小由读取的文件的chunkSize来决定 * 因此无法使用固定大小的字节数据来作为缓冲区 * 使用unqiue_ptr来存储指向缓冲区的指针 * 缓冲区中存放一个chunck的数据 * 除了文件的最后一个块之外,所有的块大小均为chunckSize_ */ int32_t chunkSize_; const int lineBufferSize_; std::unique_ptr<uint8_t[]> buffPtr_; //chunckNo为当前chunck在此文件中的位置 int currChunkNo_; //当前读取的数据在文件中的位置 size_t currPos_; uint32_t currChunkSize_; int chunksCount_; };
实现
openFile
MongoFileReader在打开文件时需要获取到文件的一些相关信息并进行保存
void MongoFileReader::openFile(){ if(isOpen_){ return; } //获取bucketName.files的文档信息 error::PCError err; auto doc = MongoStorage::getDoc(err, dbName_, bucketName_ + FILES_SUFFIX, fileId_); if(err.type != error::ERROR_TYPE::SUCCESS){ isOpen_ = false; return; } //获取文件信息 mongoHelper::getInt64(doc, "length", fileSize_); mongoHelper::getInt32(doc, "chunkSize", chunkSize_); mongoHelper::getString(doc, "filename", fileName_); mongoHelper::getDoc(doc, "metadata", metaDataDoc_); //计算文件应该有多少个chunks chunksCount_ = (fileSize_ + chunkSize_ - 1) / chunkSize_; //加载第一个chunk的数据 currPos_ = 0; currChunkNo_ = 0; if(auto chunkDoc = getChunkDoc_(err, currChunkNo_); chunkDoc.has_value()){ isOpen_ = mongoHelper::getBinData(chunkDoc.value(), "data", buffPtr_, currChunkSize_); } return ; }
其中MongoStorage::getDoc
是封装的一个辅助函数,作用是从MongoDB中获取一个doc。MongoDB中的文件分成两部分,一部分是xxx.files,记录文件的文件名,大小,metadata等信息,另一部分是xxx.chunks,保存文件的内容。
在获取到文件的基本信息后,获取文件的第一个chunk并保存到buffer中。
getChunkDoc_
getChunkDoc_用来获取文件的一个chunk,n为xxx.chunks中每个文档的n一致
optional<bsoncxx::document::value> MongoFileReader::getChunkDoc_(error::PCError &err, const int n){ using bsoncxx::builder::basic::kvp; using bsoncxx::builder::basic::make_document; bsoncxx::builder::basic::document condition; try{ condition.append(kvp("files_id", bsoncxx::oid(this->fileId_)), kvp("n", n)); auto chunks = MongoStorage::getDocs(err, dbName_, bucketName_ + CHUNKS_SUFFIX, condition); if(err.type != error::ERROR_TYPE::SUCCESS || chunks.size() != 1){ return nullopt; } return chunks[0]; }catch(bsoncxx::exception &e){ return nullopt; } }
read
read函数的参数与功能与标准库中的read基本一致
const size_t MongoFileReader::read(char *buff, const size_t n){ auto posInChunk = currPos_ - (chunkSize_ * currChunkNo_); if(currChunkSize_ - posInChunk >= n){ //当前的缓冲区中的数据足够所需的 memcpy(buff, buffPtr_.get() + posInChunk, n); currPos_ += n; return n; } //当前的缓冲区中的数据不够,但是已经是最后的chunk了 if(currChunkNo_ + 1 == chunksCount_){ size_t lack = currChunkSize_ - posInChunk; memcpy(buff, buffPtr_.get() + posInChunk, lack); return lack; } //当前缓冲区中的数据不够,但还有后续的chunk uint32_t copyed = currChunkSize_ - posInChunk; memcpy(buff, buffPtr_.get() + posInChunk, copyed); auto nextChunkNo = currChunkNo_ + 1; uint32_t nextChunkSize = 0; unique_ptr<uint8_t[]> nextBuff; error::PCError err; while(copyed < n && nextChunkNo < chunksCount_){ if(auto chunkDoc = getChunkDoc_(err, nextChunkNo); chunkDoc.has_value()){ DAO::mongoHelper::getBinData(chunkDoc.value(), "data", nextBuff, nextChunkSize); }else{ throw std::runtime_error("cann't read next chunk"); } auto nextCopyBytes = min(nextChunkSize, (uint32_t)n - copyed); memcpy(buff + copyed, nextBuff.get(), nextCopyBytes); copyed += nextCopyBytes; } //没有发生异常,完成拷贝,更新数据 currPos_ += copyed; currChunkNo_ = nextChunkNo; currChunkSize_ = nextChunkSize; buffPtr_ = move(nextBuff); return copyed; }
getLine(char *s, const size_t n)
获取一行数据,实现思路与read
类似
const bool MongoFileReader::getLine(char *s, const size_t n){ if(!isOpen()){ throw runtime_error("not open file"); } if(currPos_ >= fileSize_){ return false; } auto posInChunk = currPos_ - (chunkSize_ * currChunkNo_); assert(posInChunk >= 0 ); std::unique_ptr<uint8_t[]> buffUp; //存放当前缓冲块的 uint8_t *buffP = buffPtr_.get(); //指向当前读取的缓冲块的指针 auto currChunkNo = currChunkNo_; //当前读取的数据的chunk auto currChunkSize = currChunkSize_; // 当前读取的块的大小 error::PCError err; for(size_t i = 0; i < n; ){ if(posInChunk < currChunkSize){ s[i] = buffP[posInChunk++]; //读到一行 if(s[i] == '\n' || s[i] == '\0'){ break; } ++i; }else if(currChunkNo + 1 == chunksCount_){ //到达文件末尾 s[i] = '\0'; break; }else{ //读取下一个块 currChunkNo += 1; posInChunk = 0; if(auto chunkDoc = getChunkDoc_(err, currChunkNo); chunkDoc.has_value()){ DAO::mongoHelper::getBinData(chunkDoc.value(), "data", buffUp, currChunkSize); buffP = buffUp.get(); }else{ throw std::runtime_error("cann't read next chunk"); } } } currChunkNo_ = currChunkNo; currPos_ = currChunkNo_ * chunkSize_ + posInChunk; currChunkSize_ = currChunkSize; // buffPtr_ = std::move(buff); if(buffPtr_.get() != buffP){ buffPtr_ = std::move(buffUp); } return true; }
getLine()
getLine的另一个版本,不需要调用者提供字符数组,直接返回字符串
内部实现也是通过调用getLine(char *s, const size_t n)
实现的
返回值使用optional是为了处理读到空行的情况
const std::optional<std::string> MongoFileReader::getLine(){ char buffer[lineBufferSize_]; auto currPos = tellg(); if(getLine(buffer, lineBufferSize_) == false){ return nullopt; } string line = buffer; for(int times = 1; line.size() == times * lineBufferSize_; times++){ if(getLine(buffer, lineBufferSize_) == false){ seekg(currPos, ios_base::beg); return nullopt; } line.append(buffer); } return line; }
seekg
计算要移动到的位置所在的块,如果所在块与当前的缓冲区中的块是同一块,则不需要从数据库获取。
bool MongoFileReader::seekg(const size_t offset, ios_base::seekdir dir){ size_t nextPos; if(dir == ios_base::beg){ nextPos = offset; }else if(dir == ios_base::cur){ nextPos = currPos_ + offset; }else{ nextPos = fileSize_ - offset; } if(nextPos < 0 || nextPos >= fileSize_){ return false; } auto nextChunkNo = nextPos / chunkSize_; error::PCError err; if(nextChunkNo != currChunkNo_){ //加载新的chunk if(auto chunkDoc = getChunkDoc_(err, nextChunkNo); chunkDoc.has_value()){ DAO::mongoHelper::getBinData(chunkDoc.value(), "data", buffPtr_, currChunkSize_); }else{ throw std::runtime_error("cann't read next chunk"); } } currPos_ = nextPos; currChunkNo_ = nextChunkNo; return true; }
其余实现较简单,不做展开
这篇关于随机读取MongoDB存储的文件的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-15SendGrid 的 Go 客户端库怎么实现同时向多个邮箱发送邮件?-icode9专业技术文章分享
- 2024-11-15SendGrid 的 Go 客户端库怎么设置header 和 标签tag 呢?-icode9专业技术文章分享
- 2024-11-12Cargo deny安装指路
- 2024-11-02MongoDB项目实战:从入门到初级应用
- 2024-11-01随时随地一键转录,Google Cloud 新模型 Chirp 2 让语音识别更上一层楼
- 2024-10-25Google Cloud动手实验详解:如何在Cloud Run上开发无服务器应用
- 2024-10-24AI ?先驱齐聚 BAAI 2024,发布大规模语言、多模态、具身、生物计算以及 FlagOpen 2.0 等 AI 模型创新成果。
- 2024-10-20goland工具下,如修改一个项目的标准库SDK的版本-icode9专业技术文章分享
- 2024-10-17Go学习:初学者的简单教程
- 2024-10-17Go学习:新手入门完全指南