基础实现思路和难度总结
先解决万圣节问题,后需要完成数据库并发的三种控制级别:读已提交,可重复读,可串行化。
本次实验使用MVCC多版本控制实现读已提交和可重复读,使用SS2PL严格两阶段锁来实现可串行化。
实验内容概述
本次实验的MVCC主要通过给数据记录(行)添加maxXid,minXid,cid。修改原来的tablePage添加数据方式以记录这三个值。
为了满足读已提交和可重复读,采用在seqScan扫描时添加可见性判断,参考以上三个值以判断事务是否可见。注意两者采用不同的活跃事务集,在两者条件下,使用不同的活跃事务集和不同的判断条件来判断可见性。
在实现两阶段锁时,先完成锁管理器,正确判断锁的升级,相斥条件。然后再在各个操作添加相应的锁,有表集锁和行级锁。SELECT有 for update和 for share。如果SELECT什么都不加,那就只给表级加意向锁,行级不加。
解决万圣节问题
10-halloween.test
添加记录头信息
对于版本控制MVCC,对于每个记录,本次的实验通过添加三个新的值,原本的deleted_(删除标签)将会被弃用,对于并行来说,不同事务看到不同的数据记录是不一样的,可能A可以访问,而B看到的就是已删除。
首先我们回顾xid代表了事务的id,以下是新的属性:
- xmin_:代表插入该记录的事务id
- xmax_:代表删除该记录的事务id
- cid_:对于每个事务来说,一个事务有多个操作,比如有多次SELECT,多次UPDATE,或者是多种操作结合,因此对于每个操作,我们依旧进行隔离,使用一个cid代表commandID,代表事务内的操作id,同时,不同事务的操作id是独立的,没有比较的价值。
通过修改原来的table_page.cpp来完成,此处只展示修改后的代码。
//插入 InsertRecord函数
record->SetCid(cid);
record->SetXmin(xid);
record->SetXmax(NULL_XID);
// 删除 DeleteRecord函数
// header.SetDeleted(true); // 原来的,已删除
header.SetXmax(xid);
// 撤销删除 UndoDeleteRecord函数
// header.SetDeleted(false); // 原来的,已删除
header.SetXmax(NULL_XID);
回顾万圣节问题
UPDATE test SET id = id + 1
对于本次实验来说,所谓的更新操作有以下步骤:
- 扫描所有数据,扫描到数据项
- 删除原记录
- 插入更新后的新记录,回到第一步继续扫描
万圣节问题也就是在最后一步插入后,第一步又读取到新插入的数据,也就是自己更新过的数据由于被新插入,以为自己没更新,进行无限循环次更新。
解决万圣节问题
导致问题的主要原因就是:这句操作A扫描了操作A插入的数据
解决方法:操作A看不到操作A更新后的数据(不可见)
对于原本的扫描,我们通过判断deleted_标记来判断是否可见,如果已删除,就是不可见,如果未删除,就是可见。
现在的扫描,我们判断Xmax_,如果是NULL_XID,就是未删除,否则就是已删除。
此时再多考虑一句,如果当前记录是本事务创建的,并且也是由本操作创建的,那么不可见。
std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
if(record->Xmin_ == xid and record->cid_ == cid){
return false;
}
if(record->Xmax_ == NULL_XID){
return true;
}else{
return false;
}
};
通过设计这个判断是否可见函数,替代原来的判断是否删除的位置即可。
通过测试样例 10-halloween.test
通过MVCC完成并发控制可重复读隔离
20-mvcc_insert.test 21-mvcc_delete.test 22-mvcc_update.test 23-write_skew.test 30-repeatable_read.test
可重复读隔离
对于一个事务,从开始到完成的整个期间,它只能访问 在自己提交之前 已经commit的事务修改后的记录。
设置活跃事务集合
活跃事务集合也是本次要考虑的参考数据。对于可重复读隔离,用于判断的活跃事务集合为事务开始时依旧活跃(没有提交没有回滚)的事务id集合。
这个活跃事务集合是冻结的,一旦获取之后不会改变。对于这个活跃事务集合,通常是快照。
std::shared_ptr<Record> SeqScanExecutor::Next() {
std::unordered_set<xid_t> active_xids;
// 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
xid_t xid = context_.GetXid();
cid_t cid = context_.GetCid();
TransactionManager &manager = context_.GetTransactionManager();
IsolationLevel level = context_.GetIsolationLevel();
if(level == IsolationLevel::REPEATABLE_READ){
active_xids = manager.GetSnapshot(xid);
}
std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);
return record;
}
数据记录可见性判断
按照隔离级别要求,我们修改可见性判断函数,函数不再是简单的直接判断是否删除,代码如下:
std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
xid_t del_xid = rec->GetXmax();
if (del_xid != NULL_XID) {
if (del_xid == xid && rec->GetCid() <= cid) {
return false;
}
if (!active_xids.count(del_xid) and del_xid < xid) {
return false;
}
}
xid_t insert_xid = rec->GetXmin();
if (insert_xid != NULL_XID) {
if (insert_xid > xid){
return false;
}
if (insert_xid == xid && rec->GetCid() >= cid) {
return false;
}
if (active_xids.count(insert_xid)) {
return false;
}
}else{
return false;
}
return true;
};
对于可见性判断,我们将判断拆成两个步骤:检查是否插入,检查是否删除。
是否删除
- XmaxID == NULL_XID:没有被删除,记录可见 END
- XmaxID == xid:等于当前的id,代表是当前事务,比较cid
- 记录cid ≥ 当前cid:还未删除,记录可见 END
- 记录cid < 当前cid:已删除,记录不可见 END
- XmaxID in active_xids:当前删除还未提交,故记录可见 END
- XmaxID not in active_xids:不在活跃事务列表,要么已commit,要么是之后的新事务。
- XmaxID ≥ xid:是新事务,新事务的删除不可见,故记录可见 END
- XmaxID < xid:是已提交事务,事务的删除可见,故记录不可见 END
是否插入
- XminID == NULL_XID:该记录未插入,不可见 END
- XminID == xid:等于当前的id,代表是当前事务,比较cid
- 记录cid > 当前cid:还未插入,不可见 END
- 记录cid == 当前cid:万圣节问题,不可见 END
- 记录cid < 当前cid:已经插入,可见 END
- XminID in active_xids:当前删除还未提交,插入未生效,不可见
- XminID not in active_xids:不在活跃事务列表,要么已commit,要么是之后的新事务。
- XminID ≥ xid:是新事务,新事务的插入不可见,故记录不可见 END
- XminID < xid:是已提交事务,事务的插入可见,故记录可见 END
通过以上的两次检查,如果两次检查都说明可见,则可见,若一者不可见,则不可见。
通过测试样例 20 - 30
实现读已提交隔离
40-read_committed.test
读已提交隔离
对于读已提交隔离,使用的活跃事务集合更加宽松,为实时的活跃事务集合。并且判断可见性和可重复读相似度很高。
设置活跃事务集合
读已提交隔离的活跃事务集合是实时的活跃事务集合,和读已提交是不一样的。
std::shared_ptr<Record> SeqScanExecutor::Next() {
std::unordered_set<xid_t> active_xids;
// 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
xid_t xid = context_.GetXid();
cid_t cid = context_.GetCid();
TransactionManager &manager = context_.GetTransactionManager();
IsolationLevel level = context_.GetIsolationLevel();
if(level == IsolationLevel::REPEATABLE_READ){
active_xids = manager.GetSnapshot(xid);
} else if(level == IsolationLevel::READ_COMMITTED){
active_xids = manager.GetActiveTransactions();
}
std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);
return record;
}
记录可见性判断
相比可重复读来说,不需要判断xid > 当前xid,因为都会被活跃事务集合记录。
std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
if(isolation_level == IsolationLevel::READ_COMMITTED){
xid_t del_xid = rec->GetXmax();
if (del_xid != NULL_XID) {
if (del_xid == xid && rec->GetCid() <= cid) {
return false;
}
if (!active_xids.count(del_xid) and del_xid != xid) {
return false;
}
}
xid_t insert_xid = rec->GetXmin();
if (insert_xid != NULL_XID) {
if (insert_xid == xid) {
if(rec->GetCid() >= cid){
return false;
}else{
return true;
}
}
if (active_xids.count(insert_xid)) {
return false;
}
} else {
return false;
}
return true;
} else { // if(isolation_level == IsolationLevel::REPEATABLE_READ)
// 同上文,略
};
是否删除
- XmaxID == NULL_XID:没有被删除,记录可见 END
- XmaxID == xid:等于当前的id,代表是当前事务,比较cid
- 记录cid ≥ 当前cid:还未删除,记录可见 END
- 记录cid < 当前cid:已删除,记录不可见 END
- XmaxID in active_xids:当前删除还未提交,故记录可见 END
- XmaxID not in active_xids:不在活跃事务列表,是已提交事务,事务的删除可见,故记录不可见 END。(不同点)
是否插入
- XminID == NULL_XID:该记录未插入,不可见 END
- XminID == xid:等于当前的id,代表是当前事务,比较cid
- 记录cid > 当前cid:还未插入,不可见 END
- 记录cid == 当前cid:万圣节问题,不可见 END
- 记录cid < 当前cid:已经插入,可见 END
- XminID in active_xids:当前删除还未提交,插入未生效,不可见
- XminID not in active_xids:不在活跃事务列表,是已提交事务,事务的插入可见,故记录可见 END(不同点)
通过以上的两次检查,如果两次检查都说明可见,则可见,若一者不可见,则不可见。
通过样例 40-read_committed.test
实现锁管理器
MVCC只能支持到可重复读,对于可串行化隔离,需要两阶段锁的帮助。
在完成这件事之前,我们要先完成锁的管理。
对于锁,有5种不同的锁,共享锁,互斥锁,意向共享锁,意向互斥锁,共享意向互斥锁
我们先完成锁的相斥性判断和升级判断
enum class LockType {
S = 0, // 共享锁
X = 1, // 互斥锁
IS = 2, // 意向共享锁
IX = 3, // 意向互斥锁
SIX = 4, // 共享意向互斥锁
};
bool LockManager::Compatible(LockType type_a, LockType type_b) const {
// 判断锁是否相容
// LAB 3 DONE
const bool compatibility[5][5] = {
{1, 0, 1, 0, 0}, // SHARED_LOCK
{0, 0, 0, 0, 0}, // EXCLUSIVE_LOCK
{1, 0, 1, 1, 1}, // INTENTION_SHARED_LOCK
{0, 0, 1, 1, 0}, // INTENTION_EXCLUSIVE_LOCK
{0, 0, 1, 0, 0} // SHARED_INTENTION_EXCLUSIVE_LOCK
};
return compatibility[static_cast<int>(type_a)][static_cast<int>(type_b)];
}
LockType LockManager::Upgrade(LockType self, LockType other) const {
// 升级锁类型
// LAB 3 DONE
const int upgrade[5][5] = {
{0, 1, 0, 4, 4}, // SHARED_LOCK
{1, 1, 1, 1, 1}, // EXCLUSIVE_LOCK
{0, 1, 2, 3, 4}, // INTENTION_SHARED_LOCK
{4, 1, 3, 3, 4}, // INTENTION_EXCLUSIVE_LOCK
{4, 1, 4, 4, 4} // SHARED_INTENTION_EXCLUSIVE_LOCK
};
return static_cast<LockType>(upgrade[static_cast<int>(self)][static_cast<int>(other)]);
}
此处的升级指的是同一个事务不同操作之间结合的升级。
例如操作1是SELECT,但操作2是UPDATE,故两者结合即是互斥锁。
结合规律条件已经很具体的写在上面了。
锁的数据结构管理
本次实验的粒度设置了两个锁记录,一个是 表锁,一个是 行锁。
struct Lock {
xid_t xid;
LockType lock_type;
};
// 新增的成员变量
std::map<oid_t, std::vector<Lock>> lock_table_;
std::map<std::pair<oid_t, Rid>, std::vector<Lock>> row_lock_table_;
std::map<xid_t, std::vector<oid_t>> xid_table_lock_;
std::map<xid_t, std::vector<std::pair<oid_t, Rid>>> xid_row_lock_;
先用一个map对应oid和vector
为了便于之后可以将某个xid的锁全部解锁,可以使用另外的两个表,分别代表xid在哪些表或者行上上了锁。
bool LockManager::LockTable(xid_t xid, LockType lock_type, oid_t oid) {
// 对数据表加锁,成功加锁返回 true,如果数据表已被其他事务加锁,且锁的类型不相容,返回 false
// 如果本事务已经持有该数据表的锁,根据需要升级锁的类型
// LAB 3 DONE
std::vector<Lock> &locks = lock_table_[oid];
Lock *oriLock = nullptr;
for(auto &lock: locks){
if(lock.xid == xid){
oriLock = &lock;
break;
}
}
if(oriLock != nullptr){
LockType upgraded_lock = Upgrade(oriLock->lock_type, lock_type);
for(auto &lock: locks){
if(lock.xid != xid and not Compatible(lock.lock_type, upgraded_lock)){
return false;
}
}
oriLock->lock_type = upgraded_lock;
}else{
for(auto &lock: locks){
if(not Compatible(lock.lock_type, lock_type)){
return false;
}
}
locks.push_back({xid, lock_type});
xid_table_lock_[xid].push_back(oid);
}
return true;
}
bool LockManager::LockRow(xid_t xid, LockType lock_type, oid_t oid, Rid rid) {
// 对数据行加锁,成功加锁返回 true,如果数据行已被其他事务加锁,且锁的类型不相容,返回 false
// 如果本事务已经持有该数据行的锁,根据需要升级锁的类型
// LAB 3 DONE
std::pair<oid_t, Rid> row = {oid, rid};
std::vector<Lock> &locks = row_lock_table_[row];
Lock *oriLock = nullptr;
for(auto &lock: locks){
if(lock.xid == xid){
oriLock = &lock;
break;
}
}
if(oriLock != nullptr){
LockType upgraded_lock = Upgrade(oriLock->lock_type, lock_type);
for(auto &lock: locks){
if(lock.xid != xid and not Compatible(lock.lock_type, upgraded_lock)){
return false;
}
}
oriLock->lock_type = upgraded_lock;
}else{
for(auto &lock: locks){
if(not Compatible(lock.lock_type, lock_type)){
return false;
}
}
locks.push_back({xid, lock_type});
xid_row_lock_[xid].push_back(row);
}
return true;
}
void LockManager::ReleaseLocks(xid_t xid) {
// 释放事务 xid 持有的所有锁
// LAB 3 DONE
if (xid_table_lock_.find(xid) != xid_table_lock_.end()) {
for (oid_t oid : xid_table_lock_[xid]) {
auto& locks = lock_table_[oid];
locks.erase(std::remove_if(locks.begin(), locks.end(),
[xid](const Lock& lock) { return lock.xid == xid; }),
locks.end());
}
xid_table_lock_.erase(xid);
}
if (xid_row_lock_.find(xid) != xid_row_lock_.end()) {
for (const auto& row : xid_row_lock_[xid]) {
auto& locks = row_lock_table_[row];
locks.erase(std::remove_if(locks.begin(), locks.end(),
[xid](const Lock& lock) { return lock.xid == xid; }),
locks.end());
}
xid_row_lock_.erase(xid);
}
}
按照上面定义的数据结构,简单定义函数即可。
完成了锁的管理,接下来可以实现可串行化了
使用MVCC+SS2PL实现可串行化隔离
50-lock.test
60-mv2pl.test
MVCC作为基础,将两阶段锁添加上去,就可以实现可串行化,我们已经定义了所有锁的数据结构和锁的操作,接下来只要实施上即可。
SELECT加锁
对于可串行化隔离,SELECT有不同的加锁情况:
- SELECT FOR SHARE:为表加上意向共享锁,数据行加上共享锁
- SELECT FOR UPDATE:为表加上意向互斥锁,数据行加上互斥锁
- SELECT:为表加上意向共享锁,不加行锁
std::shared_ptr<Record> SeqScanExecutor::Next() {
std::unordered_set<xid_t> active_xids;
// 根据隔离级别,获取活跃事务的 xid(通过 context_ 获取需要的信息)
// 通过 context_ 获取正确的锁,加锁失败时抛出异常
// LAB 3 BEGIN
xid_t xid = context_.GetXid();
cid_t cid = context_.GetCid();
TransactionManager &manager = context_.GetTransactionManager();
IsolationLevel level = context_.GetIsolationLevel();
if(level == IsolationLevel::REPEATABLE_READ){
active_xids = manager.GetSnapshot(xid);
} else if(level == IsolationLevel::READ_COMMITTED){
active_xids = manager.GetActiveTransactions();
} else if(level == IsolationLevel::SERIALIZABLE){
active_xids = manager.GetSnapshot(xid);
}
std::shared_ptr<Record> record = scan_->GetNextRecord(xid, level, cid, active_xids);
if (!record) {
return nullptr; // 扫描结束
}
if(level == IsolationLevel::SERIALIZABLE){
LockManager &lockManager = context_.GetLockManager();
bool success;
success = lockManager.LockTable(xid, LockType::IS, scan_->table_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
}
return record;
}
以上是最终的扫描函数,以下还有FOR SHARE 和 FOR UPDATE:
std::shared_ptr<Record> LockRowsExecutor::Next() {
auto record = children_[0]->Next();
if (record == nullptr) {
return nullptr;
}
xid_t xid = context_.GetXid();
LockManager &lockManager = context_.GetLockManager();
// 根据 plan_ 的 lock type 获取正确的锁,加锁失败时抛出异常
// LAB3 DONE
if (plan_->GetLockType() == SelectLockType::SHARE){
bool success;
success = lockManager.LockTable(xid, LockType::IS, plan_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
success = lockManager.LockRow(xid, LockType::S, plan_->GetOid(), record->GetRid());
if(not success){
throw DbException("cannot add lock");
}
}
if (plan_->GetLockType() == SelectLockType::UPDATE){
bool success;
success = lockManager.LockTable(xid, LockType::IX, plan_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
success = lockManager.LockRow(xid, LockType::X, plan_->GetOid(), record->GetRid());
if(not success){
throw DbException("cannot add lock");
}
}
if (plan_->GetLockType() == SelectLockType::NOLOCK){
;
}
return record;
}
INSERT 加锁
对于可串行化隔离,INSERT有:
- 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> InsertExecutor::Next() {
if (finished_) {
return nullptr;
}
uint32_t count = 0;
while (auto record = children_[0]->Next()) {
std::vector<Value> values(column_list_.Length());
const auto &insert_columns = plan_->GetInsertColumns().GetColumns();
for (size_t i = 0; i < insert_columns.size(); i++) {
auto column_index = column_list_.GetColumnIndex(insert_columns[i].GetName());
values[column_index] = record->GetValue(i);
}
auto table_record = std::make_shared<Record>(std::move(values));
// 通过 context_ 获取正确的锁,加锁失败时抛出异常
// LAB 3 BEGIN
LockManager &lockManager = context_.GetLockManager();
xid_t xid = context_.GetXid();
bool success;
success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
auto rid = table_->InsertRecord(std::move(table_record), context_.GetXid(), context_.GetCid(), true);
count++;
success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), rid);
if(not success){
throw DbException("cannot add lock");
}
}
finished_ = true;
return std::make_shared<Record>(std::vector{Value(count)});
}
DELETE 加锁
对于可串行化隔离,DELETE有:
- 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> DeleteExecutor::Next() {
if (finished_) {
return nullptr;
}
uint32_t count = 0;
while (auto record = children_[0]->Next()) {
// 通过 context_ 获取正确的锁,加锁失败时抛出异常
LockManager &lockManager = context_.GetLockManager();
xid_t xid = context_.GetXid();
bool success;
success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), record->GetRid());
if(not success){
throw DbException("cannot add lock");
}
table_->DeleteRecord(record->GetRid(), context_.GetXid(), true);
count++;
}
finished_ = true;
return std::make_shared<Record>(std::vector{Value(count)});
}
UPDATE 加锁
对于可串行化隔离,UPDATE有:
- 对表级加意向互斥锁,对数据行加互斥锁
std::shared_ptr<Record> UpdateExecutor::Next() {
if (finished_) {
return nullptr;
}
uint32_t count = 0;
while (auto record = children_[0]->Next()) {
std::vector<Value> values;
for (const auto &expr : plan_->update_exprs_) {
values.push_back(expr->Evaluate(record));
}
auto new_record = std::make_shared<Record>(std::move(values));
// 通过 context_ 获取正确的锁,加锁失败时抛出异常
// LAB 3 BEGIN
LockManager &lockManager = context_.GetLockManager();
xid_t xid = context_.GetXid();
bool success;
success = lockManager.LockTable(xid, LockType::IX, table_->GetOid());
if(not success){
throw DbException("cannot add lock");
}
success = lockManager.LockRow(xid, LockType::X, table_->GetOid(), record->GetRid());
if(not success){
throw DbException("cannot add lock");
}
auto rid = table_->UpdateRecord(record->GetRid(), context_.GetXid(), context_.GetCid(), new_record, true);
count++;
}
finished_ = true;
return std::make_shared<Record>(std::vector{Value(count)});
}
修改可见性函数
以下是最后的可见性函数
std::function<bool(std::shared_ptr<Record>)> isVisable = [&](std::shared_ptr<Record> rec) {
if(isolation_level == IsolationLevel::READ_COMMITTED){
xid_t del_xid = rec->GetXmax();
if (del_xid != NULL_XID) {
if (del_xid == xid && rec->GetCid() <= cid) {
return false;
}
if (!active_xids.count(del_xid) and del_xid != xid) {
return false;
}
}
xid_t insert_xid = rec->GetXmin();
if (insert_xid != NULL_XID) {
if (insert_xid == xid) {
if(rec->GetCid() >= cid){
return false;
}else{
return true;
}
}
if (active_xids.count(insert_xid)) {
return false;
}
} else {
return false;
}
return true;
} else { // if(isolation_level == IsolationLevel::REPEATABLE_READ)
xid_t del_xid = rec->GetXmax();
if (del_xid != NULL_XID) {
if (del_xid == xid && rec->GetCid() <= cid) {
return false;
}
if (!active_xids.count(del_xid) and del_xid < xid) {
return false;
}
}
xid_t insert_xid = rec->GetXmin();
if (insert_xid != NULL_XID) {
if (insert_xid > xid){
return false;
}
if (insert_xid == xid && rec->GetCid() >= cid) {
return false;
}
if (active_xids.count(insert_xid)) {
return false;
}
}else{
return false;
}
return true;
}
};
通过样例50-lock.test 60-mv2pl.test
本次实验完成
难度和坑的总结
这个实验花费了很长时间,主要集中在部分问题上
- 在可重复读隔离级别,xid如果大于当前id,就和活跃事务相当。感觉可以把可重复读和读已提交两个实验任务的顺序对调,会好很多。
- 设置了新的数据项max min xid后,需要再调整之前的有个初始化定义点,在日志中插入的记录也要包含设置好的max min。
- 对于锁的升级尝试有所问题,最后在测试样例中不断debug,调整成功
- 可串行化的SELECT不带FOR是只给表加锁,不给数据行加锁。