基础实现思路和难度总结
本次实验根据拉取式模型的火山模型的自上而下遍历节点获取数据的基本模式
分别实现limit算子,排序算子,嵌套循环连接算子,归并连接算子。
框架基本认识
本次操作为Executor基类,每个Executor派生类都包含一个其对应的Operator,即在操作计划树的情况下,构建了一棵相应的操作Executor类,每个类的函数即拉取式模型给出的三步:
- 初始化Init()
- 读取一行 Next()
- 结束 默认析构
class Executor {
public:
explicit Executor(ExecutorContext &context, std::vector<std::shared_ptr<Executor>> children)
: context_(context), children_(std::move(children)) {}
virtual ~Executor() = default;
virtual void Init() = 0;
virtual std::shared_ptr<Record> Next() = 0;
protected:
ExecutorContext &context_;
std::vector<std::shared_ptr<Executor>> children_;
};
因此每个算子获取行数据即调用子Executor节点的Next(),再经过自己的处理,如果符合条件,就输出,不符合则再次运行,寻找下一个符合条件的,若没有则返回空指针。
其中表达式有一些对应的函数需要了解:
- Expr.Evaluate(record):这个函数通过输入记录来代入记录到表达式,其返回的值是Value类型,即值,Value由于类型问题,故用一个类来统一接口。Value.Less,Value.Equal 等等。
- Expr.EvaluateJoin(record1, record2):前面的表达式为连接表达式,类似于“table1.columnA = table2.columnB”的形式,需要传入两个记录数据,会分别代入左右,然后其会判断是否一致来返回JOIN条件是否符合。
Limit 算子
lab4/10-limit.test
初始化
void LimitExecutor::Init() {
children_[0]->Init();
curLimit_ = -1;
curOffset_ = -1;
isFinished_ = false;
}
limit算子即实现 LIMIT 和 OFFSET 的功能,故设置成员变量以表示当前状态,通过记录当前的OFFSET值和LIMIT值,在偏移了几个量后会进行变化。
Next
std::shared_ptr<Record> LimitExecutor::Next() {
if(isFinished_){
return nullptr;
}
if(not loaded_){
if(plan_->limit_offset_.has_value()){
curOffset_ = plan_->limit_offset_.value();
}
if(plan_->limit_count_.has_value()){
curLimit_ = plan_->limit_count_.value();
}
loaded_ = true;
}
auto res = children_[0]->Next();
if(res == nullptr){
isFinished_ = true;
return nullptr;
}
while(curOffset_ > 0){
curOffset_--;
return Next();
}
if(curLimit_ == -1){
return res;
}else if (curLimit_ > 0){
curLimit_--;
return res;
}else{
isFinished_ = true;
return nullptr;
}
}
先递减OFFSET,代表偏移值,过滤掉偏移值的前几行,再对LIMIT进行限制,limit = -1代表没有设立limit即没有限制,反之就根据限制数进行递减,当限制数为0时,该算子输出结束,设置结束状态。
排序算子
lab4/20-sort.test
对输入的数据进行排序,又因ORDERBY有设置的排序键,故要依照排序键进行排序,还有一点即排序需要获取所有数据后再进行,故该算子先读取所有,排完序再依次输出。
初始化
void OrderByExecutor::Init() {
children_[0]->Init();
index_ = 0;
records_.clear();
isSorted_ = false;
}
使用一个vector进行记录,使用一个index_表示当前输出的因该是下标为多少的排序后成员。
Next
std::shared_ptr<Record> OrderByExecutor::Next() {
using pRec = std::shared_ptr<huadb::Record>;
if(not isSorted_){
auto res = children_[0]->Next();
while(res != nullptr){
records_.push_back(res);
res = children_[0]->Next();
}
std::function<bool(pRec, pRec)> compare = [&](pRec r1, pRec r2){
for(auto [orderType, expr]: plan_->order_bys_){
auto value1 = expr->Evaluate(r1);
auto value2 = expr->Evaluate(r2);
if(value1.Equal(value2)){
continue;
}
if(orderType == OrderByType::ASC or orderType == OrderByType::DEFAULT){
return value1.Less(value2);
}else{
return value1.Greater(value2);
}
}
return false;
};
std::sort(records_.begin(), records_.end(), compare);
isSorted_ = true;
}
if(index_ >= records_.size()){
return nullptr;
}
return records_[index_++];
}
根据当前的index状态来输出元素即可,但会有多个排序条件,排序条件由OrderByOperator处理。
class OrderByOperator : public Operator {
public:
OrderByOperator(std::shared_ptr<ColumnList> column_list, std::shared_ptr<Operator> child,
std::vector<std::pair<OrderByType, std::shared_ptr<OperatorExpression>>> order_bys)
: Operator(OperatorType::ORDERBY, std::move(column_list), {std::move(child)}), order_bys_(std::move(order_bys)) {}
std::string ToString(size_t indent_num = 0) const override {
return fmt::format("{}Order:\\n{}", std::string(indent_num * 2, ' '), children_[0]->ToString(indent_num + 1));
}
std::vector<std::pair<OrderByType, std::shared_ptr<OperatorExpression>>> order_bys_;
};
其中用order_bys_来存储排序条件,用一个pair存储了排序是升序还是降序,排序用表达式。
在排序中,设立排序函数,通过遍历排序条件,一旦前面条件的表达式相等,就看后面的排序表达式,直到比较出来为止即可。
嵌套循环连接算子
lab4/30-nested-loop-join.test
本次只要求实现内连接即可。
初始化
void NestedLoopJoinExecutor::Init() {
children_[0]->Init();
children_[1]->Init();
child0_ = children_[0]->Next();
child1_ = nullptr;
}
通过记录两个指针来记录当前遍历到的次序。
Next
std::shared_ptr<Record> NestedLoopJoinExecutor::Next() {
if(child0_ == nullptr){
return nullptr;
}
child1_ = children_[1]->Next();
if(child1_ == nullptr){
children_[1]->Init();
child1_ = children_[1]->Next();
child0_ = children_[0]->Next();
}
if(child0_ == nullptr or child1_ == nullptr){
return nullptr;
}
if(plan_->join_condition_->EvaluateJoin(child0_, child1_) == Value(true)){
std::shared_ptr<Record> pRec0 = std::make_shared<Record>(*child0_);
std::shared_ptr<Record> pRec1 = std::make_shared<Record>(*child1_);
pRec0->Append(*pRec1);
return pRec0;
}else{
return Next();
}
}
如果连接条件符合,即进行连接,连接此处对记录进行了拷贝,产生了新的连接后数据再进行返回。
先让child1遍历完后,再让child0往后一格,child1复位。
从 NestedLoopJoinOperator 中获取连接条件,再借助EvaluateJoin来判断是否符合连接条件。
class NestedLoopJoinOperator : public Operator {
public:
NestedLoopJoinOperator(std::shared_ptr<ColumnList> column_list, std::shared_ptr<Operator> left,
std::shared_ptr<Operator> right, std::shared_ptr<OperatorExpression> join_condition,
JoinType join_type = JoinType::INNER)
: join_condition_(std::move(join_condition)),
join_type_(join_type),
Operator(OperatorType::NESTEDLOOP, std::move(column_list), {std::move(left), std::move(right)}) {}
std::string ToString(size_t indent_num = 0) const override {
return fmt::format("{}NestedLoopJoin: {}\\n{}\\n{}", std::string(indent_num * 2, ' '), join_condition_,
children_[0]->ToString(indent_num + 1), children_[1]->ToString(indent_num + 1));
}
std::shared_ptr<OperatorExpression> join_condition_;
JoinType join_type_;
};
归并连接算子
lab4/40-merge-join.test
初始化
void MergeJoinExecutor::Init() {
children_[0]->Init();
children_[1]->Init();
child0_ = children_[0]->Next();
child1_ = children_[1]->Next();
left_ = 0;
right_ = 0;
leftTemp.clear();
rightTemp.clear();
}
相同的记录child0_ 和child1_ ,不同的是,使用了两个vector和两个下标数据,这是因为归并连接两边的数据可能会出现在连接的键上重复,例如左边有两个在连接值上为1的右边有三个,即会产生6个数据,因此左右记录后像循环连接一样输出即可。
Next
std::shared_ptr<Record> MergeJoinExecutor::Next() {
while ((rightTemp.size() and leftTemp.size()) or (child0_ != nullptr && child1_ != nullptr)) {
if(rightTemp.size() and leftTemp.size()){
std::shared_ptr<Record> pRec0 = std::make_shared<Record>(*leftTemp[left_]);
std::shared_ptr<Record> pRec1 = std::make_shared<Record>(*rightTemp[right_]);
pRec0->Append(*pRec1);
right_++;
if(right_ >= rightTemp.size()){
left_++;
right_ = 0;
}
if(left_ >= leftTemp.size()){
left_ = 0;right_ = 0;
rightTemp.clear();
leftTemp.clear();
}
return pRec0;
}
if(child0_ == nullptr or child1_ == nullptr){
return nullptr;
}
auto leftValue = plan_->left_key_->Evaluate(child0_);
auto rightValue = plan_->right_key_->Evaluate(child1_);
if (leftValue.Equal(rightValue)) {
while(child1_ != nullptr and rightValue == plan_->right_key_->Evaluate(child1_)){
rightTemp.push_back(child1_);
child1_ = children_[1]->Next();
}
while(child0_ != nullptr and rightValue == plan_->right_key_->Evaluate(child0_)){
leftTemp.push_back(child0_);
child0_ = children_[0]->Next();
}
continue;
}
if (leftValue.Less(rightValue)) {
child0_ = children_[0]->Next();
}
else {
child1_ = children_[1]->Next();
}
}
return nullptr;
}
在两值相同的时候,直接把左右相同的值都存成一块,这样的话即将重复变为不重复的处理模式,而后再加上两块之间的组合模式输出即可。
同样,连接条件从MergeJoinOperator 中获取
class MergeJoinOperator : public Operator {
public:
MergeJoinOperator(std::shared_ptr<ColumnList> column_list, std::shared_ptr<Operator> left,
std::shared_ptr<Operator> right, std::shared_ptr<OperatorExpression> left_key,
std::shared_ptr<OperatorExpression> right_key, JoinType join_type = JoinType::INNER)
: left_key_(std::move(left_key)),
right_key_(std::move(right_key)),
join_type_(join_type),
Operator(OperatorType::MERGEJOIN, std::move(column_list), {std::move(left), std::move(right)}) {}
std::string ToString(size_t indent_num = 0) const override {
return fmt::format("{}MergeJoin: left={} right={}\\n{}\\n{}", std::string(indent_num * 2, ' '), left_key_, right_key_,
children_[0]->ToString(indent_num + 1), children_[1]->ToString(indent_num + 1));
}
std::shared_ptr<OperatorExpression> left_key_;
std::shared_ptr<OperatorExpression> right_key_;
JoinType join_type_;
};
至此,实验四完成
难度和坑的总结
算子的处理相对简单,上面的部分处理略有不当,例如将排序放到初始化内可能更符合逻辑,对于归并连接的暂时存储方案也是一时想出来的,并不一定为最优解,仍待优化。