基础实现思路和难度总结

先解决万圣节问题,后需要完成数据库并发的三种控制级别:读已提交,可重复读,可串行化。

本次实验使用MVCC多版本控制实现读已提交和可重复读,使用SS2PL严格两阶段锁来实现可串行化。

实验内容概述

本次实验的MVCC主要通过给数据记录(行)添加maxXid,minXid,cid。修改原来的tablePage添加数据方式以记录这三个值。

为了满足读已提交和可重复读,采用在seqScan扫描时添加可见性判断,参考以上三个值以判断事务是否可见。注意两者采用不同的活跃事务集,在两者条件下,使用不同的活跃事务集和不同的判断条件来判断可见性。

在实现两阶段锁时,先完成锁管理器,正确判断锁的升级,相斥条件。然后再在各个操作添加相应的锁,有表集锁和行级锁。SELECT有 for update和 for share。如果SELECT什么都不加,那就只给表级加意向锁,行级不加。

解决万圣节问题

10-halloween.test

添加记录头信息

对于版本控制MVCC,对于每个记录,本次的实验通过添加三个新的值,原本的deleted_(删除标签)将会被弃用,对于并行来说,不同事务看到不同的数据记录是不一样的,可能A可以访问,而B看到的就是已删除。

首先我们回顾xid代表了事务的id,以下是新的属性:

  • xmin_:代表插入该记录的事务id
  • xmax_:代表删除该记录的事务id
  • cid_:对于每个事务来说,一个事务有多个操作,比如有多次SELECT,多次UPDATE,或者是多种操作结合,因此对于每个操作,我们依旧进行隔离,使用一个cid代表commandID,代表事务内的操作id,同时,不同事务的操作id是独立的,没有比较的价值。

通过修改原来的table_page.cpp来完成,此处只展示修改后的代码。

//插入  InsertRecord函数
record->SetCid(cid);
record->SetXmin(xid);
record->SetXmax(NULL_XID);

// 删除  DeleteRecord函数
// header.SetDeleted(true); // 原来的,已删除
header.SetXmax(xid);

// 撤销删除  UndoDeleteRecord函数
// header.SetDeleted(false); // 原来的,已删除
header.SetXmax(NULL_XID);

回顾万圣节问题

UPDATE test SET id = id + 1

对于本次实验来说,所谓的更新操作有以下步骤:

  • 扫描所有数据,扫描到数据项
  • 删除原记录
  • 插入更新后的新记录,回到第一步继续扫描

万圣节问题也就是在最后一步插入后,第一步又读取到新插入的数据,也就是自己更新过的数据由于被新插入,以为自己没更新,进行无限循环次更新。

解决万圣节问题

导致问题的主要原因就是:这句操作A扫描了操作A插入的数据

解决方法:操作A看不到操作A更新后的数据(不可见)

对于原本的扫描,我们通过判断deleted_标记来判断是否可见,如果已删除,就是不可见,如果未删除,就是可见。

现在的扫描,我们判断Xmax_,如果是NULL_XID,就是未删除,否则就是已删除。

此时再多考虑一句,如果当前记录是本事务创建的,并且也是由本操作创建的,那么不可见。

std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
		if(record->Xmin_ == xid and record->cid_ == cid){
			return false;
		}
		if(record->Xmax_ == NULL_XID){
			return true;
		}else{
			return false;
		}
  };

通过设计这个判断是否可见函数,替代原来的判断是否删除的位置即可。

通过测试样例 10-halloween.test

通过MVCC完成并发控制可重复读隔离

20-mvcc_insert.test 21-mvcc_delete.test 22-mvcc_update.test 23-write_skew.test 30-repeatable_read.test

可重复读隔离

对于一个事务,从开始到完成的整个期间,它只能访问 在自己提交之前 已经commit的事务修改后的记录。

设置活跃事务集合

活跃事务集合也是本次要考虑的参考数据。对于可重复读隔离,用于判断的活跃事务集合为事务开始时依旧活跃(没有提交没有回滚)的事务id集合。

这个活跃事务集合是冻结的,一旦获取之后不会改变。对于这个活跃事务集合,通常是快照。

std::shared_ptr<Record> SeqScanExecutor::Next() {
  std::unordered_set<xid_t> active_xids;
  // 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
  xid_t xid = context_.GetXid();
  cid_t cid = context_.GetCid();
  TransactionManager &manager = context_.GetTransactionManager();
  IsolationLevel level = context_.GetIsolationLevel();
  
  if(level == IsolationLevel::REPEATABLE_READ){
    active_xids = manager.GetSnapshot(xid);
  }
  std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);
  return record;
}

数据记录可见性判断

按照隔离级别要求,我们修改可见性判断函数,函数不再是简单的直接判断是否删除,代码如下:

std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {

  xid_t del_xid = rec->GetXmax();
  if (del_xid != NULL_XID) {
    if (del_xid == xid && rec->GetCid() <= cid) {
      return false;
    }
    if (!active_xids.count(del_xid) and del_xid < xid) {
      return false;
    }
  }
  
  xid_t insert_xid = rec->GetXmin();
  if (insert_xid != NULL_XID) {
    if (insert_xid > xid){
      return false;
    }
    if (insert_xid == xid && rec->GetCid() >= cid) {
      return false;
    }
    if (active_xids.count(insert_xid)) {
      return false;
    }
  }else{
    return false;
  }
  
  return true;
  
};

对于可见性判断,我们将判断拆成两个步骤:检查是否插入,检查是否删除。

是否删除

  • XmaxID == NULL_XID:没有被删除,记录可见 END
  • XmaxID == xid:等于当前的id,代表是当前事务,比较cid
    • 记录cid ≥ 当前cid:还未删除,记录可见 END
    • 记录cid < 当前cid:已删除,记录不可见 END
  • XmaxID in active_xids:当前删除还未提交,故记录可见 END
  • XmaxID not in active_xids:不在活跃事务列表,要么已commit,要么是之后的新事务。
    • XmaxID ≥ xid:是新事务,新事务的删除不可见,故记录可见 END
    • XmaxID < xid:是已提交事务,事务的删除可见,故记录不可见 END

是否插入

  • XminID == NULL_XID:该记录未插入,不可见 END
  • XminID == xid:等于当前的id,代表是当前事务,比较cid
    • 记录cid > 当前cid:还未插入,不可见 END
    • 记录cid == 当前cid:万圣节问题,不可见 END
    • 记录cid < 当前cid:已经插入,可见 END
  • XminID in active_xids:当前删除还未提交,插入未生效,不可见
  • XminID not in active_xids:不在活跃事务列表,要么已commit,要么是之后的新事务。
    • XminID ≥ xid:是新事务,新事务的插入不可见,故记录不可见 END
    • XminID < xid:是已提交事务,事务的插入可见,故记录可见 END

通过以上的两次检查,如果两次检查都说明可见,则可见,若一者不可见,则不可见。

通过测试样例 20 - 30

实现读已提交隔离

40-read_committed.test

读已提交隔离

对于读已提交隔离,使用的活跃事务集合更加宽松,为实时的活跃事务集合。并且判断可见性和可重复读相似度很高。

设置活跃事务集合

读已提交隔离的活跃事务集合是实时的活跃事务集合,和读已提交是不一样的。

std::shared_ptr<Record> SeqScanExecutor::Next() {
  std::unordered_set<xid_t> active_xids;
  // 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
  xid_t xid = context_.GetXid();
  cid_t cid = context_.GetCid();
  TransactionManager &manager = context_.GetTransactionManager();
  IsolationLevel level = context_.GetIsolationLevel();
  
  if(level == IsolationLevel::REPEATABLE_READ){
    active_xids = manager.GetSnapshot(xid);
  } else if(level == IsolationLevel::READ_COMMITTED){
    active_xids = manager.GetActiveTransactions();
  }
  std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);
  return record;
}

记录可见性判断

相比可重复读来说,不需要判断xid > 当前xid,因为都会被活跃事务集合记录。

std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
  if(isolation_level == IsolationLevel::READ_COMMITTED){
    xid_t del_xid = rec->GetXmax();
    if (del_xid != NULL_XID) {
      if (del_xid == xid && rec->GetCid() <= cid) {
        return false;
      }
      
      if (!active_xids.count(del_xid) and del_xid != xid) {
        return false;
      }
    }
    xid_t insert_xid = rec->GetXmin();
    if (insert_xid != NULL_XID) {
      if (insert_xid == xid) {
        if(rec->GetCid() >= cid){
          return false;
        }else{
          return true;
        }
      }
      
      if (active_xids.count(insert_xid)) {
        return false;
      }
    } else {
      return false;
    }
    return true;
  } else { // if(isolation_level == IsolationLevel::REPEATABLE_READ)
	// 同上文,略
};

是否删除

  • XmaxID == NULL_XID:没有被删除,记录可见 END
  • XmaxID == xid:等于当前的id,代表是当前事务,比较cid
    • 记录cid ≥ 当前cid:还未删除,记录可见 END
    • 记录cid < 当前cid:已删除,记录不可见 END
  • XmaxID in active_xids:当前删除还未提交,故记录可见 END
  • XmaxID not in active_xids:不在活跃事务列表,是已提交事务,事务的删除可见,故记录不可见 END。(不同点)

是否插入

  • XminID == NULL_XID:该记录未插入,不可见 END
  • XminID == xid:等于当前的id,代表是当前事务,比较cid
    • 记录cid > 当前cid:还未插入,不可见 END
    • 记录cid == 当前cid:万圣节问题,不可见 END
    • 记录cid < 当前cid:已经插入,可见 END
  • XminID in active_xids:当前删除还未提交,插入未生效,不可见
  • XminID not in active_xids:不在活跃事务列表,是已提交事务,事务的插入可见,故记录可见 END(不同点)

通过以上的两次检查,如果两次检查都说明可见,则可见,若一者不可见,则不可见。

通过样例 40-read_committed.test

实现锁管理器

MVCC只能支持到可重复读,对于可串行化隔离,需要两阶段锁的帮助。

在完成这件事之前,我们要先完成锁的管理。

对于锁,有5种不同的锁,共享锁,互斥锁,意向共享锁,意向互斥锁,共享意向互斥锁

我们先完成锁的相斥性判断和升级判断

enum class LockType {
  S = 0,    // 共享锁
  X = 1,    // 互斥锁
  IS = 2,   // 意向共享锁
  IX = 3,   // 意向互斥锁
  SIX = 4,  // 共享意向互斥锁
};

bool LockManager::Compatible(LockType type_a, LockType type_b) const {
  // 判断锁是否相容
  // LAB 3 DONE
  const bool compatibility[5][5] = {
    {1, 0, 1, 0, 0},  // SHARED_LOCK
    {0, 0, 0, 0, 0},  // EXCLUSIVE_LOCK
    {1, 0, 1, 1, 1},  // INTENTION_SHARED_LOCK
    {0, 0, 1, 1, 0},  // INTENTION_EXCLUSIVE_LOCK
    {0, 0, 1, 0, 0}   // SHARED_INTENTION_EXCLUSIVE_LOCK
  };
  return compatibility[static_cast<int>(type_a)][static_cast<int>(type_b)];
}

LockType LockManager::Upgrade(LockType self, LockType other) const {
  // 升级锁类型
  // LAB 3 DONE
  const int upgrade[5][5] = {
    {0, 1, 0, 4, 4},  // SHARED_LOCK
    {1, 1, 1, 1, 1},  // EXCLUSIVE_LOCK
    {0, 1, 2, 3, 4},  // INTENTION_SHARED_LOCK
    {4, 1, 3, 3, 4},  // INTENTION_EXCLUSIVE_LOCK
    {4, 1, 4, 4, 4}   // SHARED_INTENTION_EXCLUSIVE_LOCK
  };
  return static_cast<LockType>(upgrade[static_cast<int>(self)][static_cast<int>(other)]);
}

此处的升级指的是同一个事务不同操作之间结合的升级。

例如操作1是SELECT,但操作2是UPDATE,故两者结合即是互斥锁。

结合规律条件已经很具体的写在上面了。

锁的数据结构管理

本次实验的粒度设置了两个锁记录,一个是 表锁,一个是 行锁。

struct Lock {
  xid_t xid;
  LockType lock_type;
};
// 新增的成员变量
std::map<oid_t, std::vector<Lock>> lock_table_;
std::map<std::pair<oid_t, Rid>, std::vector<Lock>> row_lock_table_;
std::map<xid_t, std::vector<oid_t>> xid_table_lock_;
std::map<xid_t, std::vector<std::pair<oid_t, Rid>>> xid_row_lock_;

先用一个map对应oid和vector代表 表锁,每一个表oid对于一个锁数组,记录了所有锁。同理,用pair将两个值作为key,就可以代表 行锁。

为了便于之后可以将某个xid的锁全部解锁,可以使用另外的两个表,分别代表xid在哪些表或者行上上了锁。

bool LockManager::LockTable(xid_t xid, LockType lock_type, oid_t oid) {
  // 对数据表加锁,成功加锁返回 true,如果数据表已被其他事务加锁,且锁的类型不相容,返回 false
  // 如果本事务已经持有该数据表的锁,根据需要升级锁的类型
  // LAB 3 DONE
  std::vector<Lock> &locks = lock_table_[oid];

  Lock *oriLock = nullptr;

  for(auto &lock: locks){
    if(lock.xid == xid){
      oriLock = &lock;
      break;
    }
  }

  if(oriLock != nullptr){
    LockType upgraded_lock = Upgrade(oriLock->lock_type, lock_type);
    for(auto &lock: locks){
      if(lock.xid != xid and not Compatible(lock.lock_type, upgraded_lock)){
        return false;
      }
    }
    oriLock->lock_type = upgraded_lock;
  }else{
    for(auto &lock: locks){
      if(not Compatible(lock.lock_type, lock_type)){
        return false;
      }
    }
    locks.push_back({xid, lock_type});
    xid_table_lock_[xid].push_back(oid);
  }
  return true;
}

bool LockManager::LockRow(xid_t xid, LockType lock_type, oid_t oid, Rid rid) {
  // 对数据行加锁,成功加锁返回 true,如果数据行已被其他事务加锁,且锁的类型不相容,返回 false
  // 如果本事务已经持有该数据行的锁,根据需要升级锁的类型
  // LAB 3 DONE
  std::pair<oid_t, Rid> row = {oid, rid};

  std::vector<Lock> &locks = row_lock_table_[row];

  Lock *oriLock = nullptr;
  for(auto &lock: locks){
    if(lock.xid == xid){
      oriLock = &lock;
      break;
    }
  }

  if(oriLock != nullptr){
    LockType upgraded_lock = Upgrade(oriLock->lock_type, lock_type);
    for(auto &lock: locks){
      if(lock.xid != xid and not Compatible(lock.lock_type, upgraded_lock)){
        return false;
      }
    }
    oriLock->lock_type = upgraded_lock;
  }else{
    for(auto &lock: locks){
      if(not Compatible(lock.lock_type, lock_type)){
        return false;
      }
    }
    locks.push_back({xid, lock_type});
    xid_row_lock_[xid].push_back(row);
  }
  return true;
}

void LockManager::ReleaseLocks(xid_t xid) {
  // 释放事务 xid 持有的所有锁
  // LAB 3 DONE
  if (xid_table_lock_.find(xid) != xid_table_lock_.end()) {
    for (oid_t oid : xid_table_lock_[xid]) {
      auto& locks = lock_table_[oid];
      locks.erase(std::remove_if(locks.begin(), locks.end(),
                                 [xid](const Lock& lock) { return lock.xid == xid; }),
                  locks.end());
    }
    xid_table_lock_.erase(xid);
  }

  if (xid_row_lock_.find(xid) != xid_row_lock_.end()) {
    for (const auto& row : xid_row_lock_[xid]) {
      auto& locks = row_lock_table_[row];
      locks.erase(std::remove_if(locks.begin(), locks.end(),
                                 [xid](const Lock& lock) { return lock.xid == xid; }),
                  locks.end());
    }
    xid_row_lock_.erase(xid);
  }
}

按照上面定义的数据结构,简单定义函数即可。

完成了锁的管理,接下来可以实现可串行化了

使用MVCC+SS2PL实现可串行化隔离

50-lock.test

60-mv2pl.test

MVCC作为基础,将两阶段锁添加上去,就可以实现可串行化,我们已经定义了所有锁的数据结构和锁的操作,接下来只要实施上即可。

SELECT加锁

对于可串行化隔离,SELECT有不同的加锁情况:

  • SELECT FOR SHARE:为表加上意向共享锁,数据行加上共享锁
  • SELECT FOR UPDATE:为表加上意向互斥锁,数据行加上互斥锁
  • SELECT:为表加上意向共享锁,不加行锁
std::shared_ptr<Record> SeqScanExecutor::Next() {
  std::unordered_set<xid_t> active_xids;
  // 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
  // 通过 context_ 获取正确的锁,加锁失败时抛出异常
  // LAB 3 BEGIN

  xid_t xid = context_.GetXid();
  cid_t cid = context_.GetCid();
  TransactionManager &manager = context_.GetTransactionManager();
  IsolationLevel level = context_.GetIsolationLevel();
  if(level == IsolationLevel::REPEATABLE_READ){
    active_xids = manager.GetSnapshot(xid);
  } else if(level == IsolationLevel::READ_COMMITTED){
    active_xids = manager.GetActiveTransactions();
  } else if(level == IsolationLevel::SERIALIZABLE){
    active_xids = manager.GetSnapshot(xid);
  }

  std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);

  if (!record) {
    return nullptr; // 扫描结束
  }

  if(level == IsolationLevel::SERIALIZABLE){

    LockManager &lockManager = context_.GetLockManager();
    bool success;
    success = lockManager.LockTable(xid, LockType::IS, scan_->table_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }
  }

  return record;
}

以上是最终的扫描函数,以下还有FOR SHARE 和 FOR UPDATE:

std::shared_ptr<Record> LockRowsExecutor::Next() {
  auto record = children_[0]->Next();
  if (record == nullptr) {
    return nullptr;
  }

  xid_t xid = context_.GetXid();
  LockManager &lockManager = context_.GetLockManager();
  // 根据 plan_ 的 lock type 获取正确的锁,加锁失败时抛出异常
  // LAB3 DONE
  if (plan_->GetLockType() == SelectLockType::SHARE){
    bool success;
    success = lockManager.LockTable(xid, LockType::IS, plan_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }

    success = lockManager.LockRow(xid, LockType::S, plan_->GetOid(), record->GetRid());
    if(not success){
      throw DbException("cannot add lock");
    }
  }
  if (plan_->GetLockType() == SelectLockType::UPDATE){
    bool success;
    success = lockManager.LockTable(xid, LockType::IX, plan_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }

    success = lockManager.LockRow(xid, LockType::X, plan_->GetOid(), record->GetRid());
    if(not success){
      throw DbException("cannot add lock");
    }
  }
  if (plan_->GetLockType() == SelectLockType::NOLOCK){
		;
  }
  return record;
}

INSERT 加锁

对于可串行化隔离,INSERT有:

  • 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> InsertExecutor::Next() {
  if (finished_) {
    return nullptr;
  }
  uint32_t count = 0;
  while (auto record = children_[0]->Next()) {
    std::vector<Value> values(column_list_.Length());
    const auto &insert_columns = plan_->GetInsertColumns().GetColumns();
    for (size_t i = 0; i < insert_columns.size(); i++) {
      auto column_index = column_list_.GetColumnIndex(insert_columns[i].GetName());
      values[column_index] = record->GetValue(i);
    }
    auto table_record = std::make_shared<Record>(std::move(values));
    // 通过 context_ 获取正确的锁,加锁失败时抛出异常
    // LAB 3 BEGIN
    LockManager &lockManager = context_.GetLockManager();
    xid_t xid = context_.GetXid();
    
    bool success;
    success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }

    auto rid = table_->InsertRecord(std::move(table_record), context_.GetXid(), context_.GetCid(), true);
    count++;

    success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), rid);
    if(not success){
      throw DbException("cannot add lock");
    }
  }
  finished_ = true;
  return std::make_shared<Record>(std::vector{Value(count)});
}

DELETE 加锁

对于可串行化隔离,DELETE有:

  • 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> DeleteExecutor::Next() {
  if (finished_) {
    return nullptr;
  }
  uint32_t count = 0;
  while (auto record = children_[0]->Next()) {
    // 通过 context_ 获取正确的锁,加锁失败时抛出异常
    
    LockManager &lockManager = context_.GetLockManager();
    xid_t xid = context_.GetXid();
    
    bool success;
    success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }

    success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), record->GetRid());
    if(not success){
      throw DbException("cannot add lock");
    }
    table_->DeleteRecord(record->GetRid(), context_.GetXid(), true);
    count++;
  }
  finished_ = true;
  return std::make_shared<Record>(std::vector{Value(count)});
}

UPDATE 加锁

对于可串行化隔离,UPDATE有:

  • 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> UpdateExecutor::Next() {
  if (finished_) {
    return nullptr;
  }
  uint32_t count = 0;
  while (auto record = children_[0]->Next()) {
    std::vector<Value> values;
    for (const auto &expr : plan_->update_exprs_) {
      values.push_back(expr->Evaluate(record));
    }
    auto new_record = std::make_shared<Record>(std::move(values));
    // 通过 context_ 获取正确的锁,加锁失败时抛出异常
    // LAB 3 BEGIN

    LockManager &lockManager = context_.GetLockManager();
    xid_t xid = context_.GetXid();
    
    bool success;
    success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
    if(not success){
      throw DbException("cannot add lock");
    }

    success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), record->GetRid());
    if(not success){
      throw DbException("cannot add lock");
    }
    
    auto rid = table_->UpdateRecord(record->GetRid(), context_.GetXid(), context_.GetCid(), new_record, true);
    count++;
  }
  finished_ = true;
  return std::make_shared<Record>(std::vector{Value(count)});
}

修改可见性函数

以下是最后的可见性函数

std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
  if(isolation_level == IsolationLevel::READ_COMMITTED){
    xid_t del_xid = rec->GetXmax();
    if (del_xid != NULL_XID) {
      if (del_xid == xid && rec->GetCid() <= cid) {
        return false;
      }
      
      if (!active_xids.count(del_xid) and del_xid != xid) {
        return false;
      }
    }
    xid_t insert_xid = rec->GetXmin();
    if (insert_xid != NULL_XID) {
      if (insert_xid == xid) {
        if(rec->GetCid() >= cid){
          return false;
        }else{
          return true;
        }
      }
      
      if (active_xids.count(insert_xid)) {
        return false;
      }
    } else {
      return false;
    }
    return true;
  } else { // if(isolation_level == IsolationLevel::REPEATABLE_READ)
    xid_t del_xid = rec->GetXmax();
    if (del_xid != NULL_XID) {
      if (del_xid == xid && rec->GetCid() <= cid) {
        return false;
      }
      if (!active_xids.count(del_xid) and del_xid < xid) {
        return false;
      }
    }
    xid_t insert_xid = rec->GetXmin();
    if (insert_xid != NULL_XID) {
      if (insert_xid > xid){
        return false;
      }
      if (insert_xid == xid && rec->GetCid() >= cid) {
        return false;
      }
      if (active_xids.count(insert_xid)) {
        return false;
      }
    }else{
      return false;
    }
    return true;
  }
};

通过样例50-lock.test 60-mv2pl.test

本次实验完成

难度和坑的总结

这个实验花费了很长时间,主要集中在部分问题上

  1. 在可重复读隔离级别,xid如果大于当前id,就和活跃事务相当。感觉可以把可重复读和读已提交两个实验任务的顺序对调,会好很多。
  2. 设置了新的数据项max min xid后,需要再调整之前的有个初始化定义点,在日志中插入的记录也要包含设置好的max min。
  3. 对于锁的升级尝试有所问题,最后在测试样例中不断debug,调整成功
  4. 可串行化的SELECT不带FOR是只给表加锁,不给数据行加锁。