基础实现思路和难度总结
在存储实现的基础上添加事务管理,解决数据库崩溃后的恢复等情况。
数据类架构
对于table来说有Table 和 TablePage
- Table:是上层操作,负责调用TablePage处理页面内容,同时,增删改查操作的日志的管理也由Table完成。
- TablePage:是数据的底层操作,读入Page解析后,处理对Page内容,属性值的修改,对于本次实验,主要是给REDO和UNDO提供更接近底层的操作函数,可以不产生新的日志或者是减少需要的参数完成简单的功能。
对于log来说有 log_manager, log_records
- log_manager:日志管理者,负责读取日志,维护日志,还要能调用日志完成REDO UNDO等操作,是日志的直接管理者。
- log_records:是一个父类,有delete_log, insert_log, new_page_log等等的子类,有抽象函数UNDO和REDO,提供接口给log_manager以完成重做和撤回。子类要根据自己的情况,实现完成不同的REDO和UNDO虚函数实现。
事务回滚
lab20/10-rollback.test
日志追加
InsertRecord
对于日志而言,需要预写日志,即日志插入在数据写入之前。
对于日志插入,如果创建了新的页面,则需要创建页面日志,在创建页面日志之后。预写插入日志,再完成插入。offset 的逻辑通过储存原理可以得出,也就是模拟找到了这个插入的各个数据,再进行插入。
Rid Table::InsertRecord(std::shared_ptr<Record> record, xid_t xid, cid_t cid, bool write_log) {
if (record->GetSize() > MAX_RECORD_SIZE) {
throw DbException("Record size too large: " + std::to_string(record->GetSize()));
}
pageid_t curPageID = first_page_id_;
bool hasInserted = false;
std::shared_ptr<TablePage> curPageHandle;
slotid_t resSlotID = 0;
pageid_t resPageID = 0;
if(first_page_id_ == NULL_PAGE_ID){
std::shared_ptr<Page> newPgae = buffer_pool_.NewPage(db_oid_, oid_, 0);
if(write_log){
log_manager_.AppendNewPageLog(xid, oid_, NULL_PAGE_ID, 0);
}
first_page_id_ = 0;
curPageHandle = std::make_shared<TablePage>(newPgae);
curPageHandle->Init();
curPageHandle->SetNextPageId(NULL_PAGE_ID);
hasInserted = true;
}
while(not hasInserted){
std::shared_ptr<Page> page = buffer_pool_.GetPage(db_oid_, oid_, curPageID);
curPageHandle = std::make_shared<TablePage>(page);
if(curPageHandle->GetFreeSpaceSize() > record->GetSize()){
hasInserted = true;
resPageID = curPageID;
break;
}
if(curPageHandle->GetNextPageId() == NULL_PAGE_ID){
break;
}else{
curPageID = curPageHandle->GetNextPageId();
}
}
if(not hasInserted){
pageid_t newPageID = curPageID + 1;
if(write_log){
log_manager_.AppendNewPageLog(xid, oid_, curPageID, newPageID);
}
curPageHandle->SetNextPageId(newPageID);
std::shared_ptr<Page> newPage = buffer_pool_.NewPage(db_oid_, oid_, newPageID);
curPageHandle = std::make_shared<TablePage>(newPage);
curPageHandle->Init();
curPageHandle->SetNextPageId(NULL_PAGE_ID);
hasInserted = true;
resPageID = newPageID;
}
resSlotID = curPageHandle->GetRecordCount();
if(write_log){
db_size_t recordSize = record->GetSize();
db_size_t offset = curPageHandle->GetUpper() - recordSize;
char *buffer = new char[record->GetSize()];
db_size_t size = record->SerializeTo(buffer);
lsn_t curLsn = log_manager_.AppendInsertLog(xid, oid_, resPageID, resSlotID, offset, recordSize, buffer);
delete [] buffer;
curPageHandle->SetPageLSN(curLsn);
}
resSlotID = curPageHandle->InsertRecord(record, xid, cid);
return {resPageID, resSlotID};
}
DeleteRecord
对于删除操作而言会更加简单,只需要page_id_和slot_id_即可。
void Table::DeleteRecord(const Rid &rid, xid_t xid, bool write_log) {
std::shared_ptr<Page> page = buffer_pool_.GetPage(db_oid_, oid_, rid.page_id_);
TablePage pageHandle(page);
if(write_log){
lsn_t curLsn = log_manager_.AppendDeleteLog(xid, oid_, rid.page_id_, rid.slot_id_);
pageHandle.SetPageLSN(curLsn);
}
pageHandle.DeleteRecord(rid.slot_id_, xid);
}
Undo 日志读取
Rollback 函数(log/log_manager)
这个函数的参数是事务ID,任务是回滚这一个事务,对于事务回滚我们可以知道,为了避免从后往前遍历找 事务ID = 某个值 的所有日志,不如让日志有链表一样的指针,对于任意一个日志,如果它属于事务A,则它会记录自己的上一条事务A的日志LSN。
通过活跃事务表(时刻记录事务的最新日志LSN)找到最后一条日志,再通过前事务指针不断遍历,直到NULL_LSN。
对于获取日志内容:
通过判断 lsn 和 flushed_lsn_ 的大小关系,flushed_lsn_ 代表了最后一条没有写入磁盘的日志。也就是说:
如果 lsn < flushed_lsn_ :
日志在磁盘,使用 disk_.ReadLog(lastLSN, MAX_LOG_SIZE, logData);
如果 lsn ≥ flushed_lsn_ :
日志在缓存中,遍历log_buffer_找到日志。
获取到日志后对日志进行UNDO操作,并更新活跃事务表。
如果回滚完成,从活跃事务表中删除记录。
void LogManager::Rollback(xid_t xid) {
lsn_t lastLSN = att_[xid];
lsn_t prevLSN = NULL_LSN;
while(lastLSN != NULL_LSN){
std::shared_ptr<huadb::LogRecord> logRec;
if(lastLSN < flushed_lsn_){
char logData[MAX_LOG_SIZE];
disk_.ReadLog(lastLSN, MAX_LOG_SIZE, logData);
logRec = LogRecord::DeserializeFrom(lastLSN, logData);
}else{
for(auto pLog: log_buffer_){
if(pLog->GetLSN() == lastLSN){
logRec = pLog;
break;
}
}
}
prevLSN = logRec->GetPrevLSN();
logRec->Undo(*buffer_pool_, *catalog_, *this, prevLSN);
lastLSN = prevLSN;
att_[xid] = lastLSN;
}
att_.erase(xid);
}
实现日志的 Undo 操作
TablePage::UndoDeleteRecord
这个函数为了便于对应的回滚操作,对于UNDO 一个删除操作而言,和插入一个原来的记录不同,我们的DELETE操作并没有对数据进行真正的清空,而是在数据头上打了一个标记,这个标记说明了这个记录是否被删除。
因此,对于重新插入一个数据而言,我们有更好的undo方法,就是去除这个删除标记,就变成撤销删除了。
void TablePage::UndoDeleteRecord(slotid_t slot_id) {
Record header;
header.DeserializeHeaderFrom(page_data_ + slots_[slot_id].offset_);
header.SetDeleted(false);
header.SerializeHeaderTo(page_data_ + slots_[slot_id].offset_);
page_->SetDirty();
}
InsertLog::Undo
撤销一个插入操作也就是将插入的记录再删除。pageHandle的函数并不会记录日志而产生新的日志,新的日志产生都是由Table类完成的。
void InsertLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
TablePage pageHandle(page);
pageHandle.DeleteRecord(slot_id_, xid_);
}
DeleteLog::Undo
非常简单,就是使用上面我们简化过的恢复删除函数即可。
void DeleteLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
TablePage pageHandle(page);
pageHandle.UndoDeleteRecord(slot_id_);
}
NewPageLog::Undo
对于撤回一个新的页面,想不到很好的解决方法,就将该页清空,虽然可以让它的前一页的后置指针变成NULL指针,但是如果新建的是第一个page,就很难做到真正的删除,因此直接采用初始化,清空这个页面的所有内容。
void NewPageLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
TablePage pageHandle(page);
pageHandle.Init();
}
至此我们完成了所有的事务回滚函数,通过测试样例:
lab2/10-rollback.test
事务重做
lab2/20-recover.test
Redo 日志读取
LogManager::Redo
对于最基础的事务重做,只需要从头将所有事务都重做一遍即可,因此采用从LSN = 0 开始,next_lsn_代表了下一个日志(未生成)的LSN,所以执行到未生成的LSN即可。
需要注意的是,这里的LSN概念略有不同,这里的LSN并不是以1递增,此处的LSN恰好是日志在储存中的偏移量offset,它也是递增的,并且不需要维护123456,只需要offset还能更方便的访问,因此,遍历的时候,并不是curLSN ++:
而是curLSN += logRec->GetSize();
void LogManager::Redo() {
lsn_t curLSN = 0;
lsn_t end = next_lsn_;
while (curLSN < end) {
std::shared_ptr<huadb::LogRecord> logRec = nullptr;
if (curLSN <= flushed_lsn_) {
char logData[MAX_LOG_SIZE + 10];
disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
logRec = LogRecord::DeserializeFrom(curLSN, logData);
} else {
for (const auto& pLog : log_buffer_) {
if (pLog->GetLSN() == curLSN) {
logRec = pLog;
break;
}
}
}
logRec->Redo(*buffer_pool_, *catalog_, *this);
curLSN += logRec->GetSize();
}
}
实现日志的 Redo 操作
TablePage::RedoInsertRecord
和另一个函数同理,是提供了更方便的插入方式,以不需要更多的数据。
需要注意的是,这个redo和插入不同,插入会直接使用新的slot_id ,但重做日志不同,重做日志是不论这个位置是否有数据,都要进行覆写,因此,插入数据的时候需要维护好lower_ 和 upper_指针,关于这两者的信息,可以移步到关于数据库存储的博客里。
由于是重做插入,因此给的slot_id就是最新的,也就可以直接指定lower_ 的值,又offset是日志的参数,因此直接给upper_。
使用memcpy在对应位置写入数据即可。
void TablePage::RedoInsertRecord(slotid_t slot_id, char *raw_record, db_size_t page_offset, db_size_t record_size) {
*lower_ = PAGE_HEADER_SIZE + (slot_id + 1) * sizeof(Slot);
*upper_ = page_offset;
slots_[slot_id].offset_ = *upper_;
slots_[slot_id].size_ = record_size;
memcpy(page_data_ + page_offset, raw_record, record_size);
page_->SetDirty();
}
InsertLog::Redo
关于插入,有了我们上面刚实现的函数,直接找到对应的页面进行调用即可。
void InsertLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
// 如果 oid_ 不存在,表示该表已经被删除,无需 redo
if (!catalog.TableExists(oid_)) {
return;
}
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
TablePage pageHandle(page);
pageHandle.RedoInsertRecord(slot_id_, record_, page_offset_, record_size_);
}
DeleteLog::Redo
不过多赘述,就是调用删除日志的函数,重新删除一遍。
void DeleteLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
// 如果 oid_ 不存在,表示该表已经被删除,无需 redo
if (!catalog.TableExists(oid_)) {
return;
}
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
TablePage pageHandle(page);
pageHandle.DeleteRecord(slot_id_, xid_);
}
NewPageLog::Redo
对于重建新的页面,必须知道的是,页面是有编号连接的。类似于链表结构。因此,要重做新建页面,也就需要知道上一个页面,并设置NextPageId,然后对新页面进行初始化即可。
void NewPageLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
// 如果 oid_ 不存在,表示该表已经被删除,无需 redo
if (!catalog.TableExists(oid_)) {
return;
}
oid_t db_oid = catalog.GetDatabaseOid(GetOid());
std::shared_ptr<huadb::Page> page, lastPage;
if(page_id_ == 0){
page = buffer_pool.NewPage(db_oid, oid_, page_id_);
lastPage = nullptr;
}else{
page = buffer_pool.NewPage(db_oid, oid_, page_id_);
lastPage = buffer_pool.GetPage(db_oid, oid_, page_id_ - 1);
}
if(lastPage != nullptr){
TablePage pageHandle(lastPage);
pageHandle.SetNextPageId(page_id_);
}
TablePage pageHandle(page);
pageHandle.Init();
}
至此我们完成了所有的事务重做函数,通过测试样例:
lab2/20-recover.test
ARIES 恢复算法
lab2/30-aries.test
对于ARISE恢复算法,我们进行简单的复习:
ARISE恢复算法分成三个步骤:
- 分析:重建 脏页表 和 活跃事务表
- 重做:使用 脏页表 决定重做的起点
- 回滚:使用 活跃事务表 快速精准定位需要回滚的事务
这样一下就疏通了,我们对以上三个函数进行实现:
分析:LogManager::Analyze()
分析函数内先找到检查点,检查点机制缩短了了分析所需的时间。
检查点之前的所有记录都是已经落实完毕的,也就不需要被恢复。
现在来说明分析的行为步骤:
分析找到一个起点(检查点),开始一直往剩下的所有日志开始遍历。
- 对于活跃事务表att_:
活跃事务表记录了所有活跃的事务以及其最近的日志LSN
- 如果当前日志对应事务已经在活跃事务表内:
①是COMMIT日志,从活跃事务表中删除②否则更新活跃事务表当前日志LSN
- 如果当前日志对应事务不在活跃事务表内:
如果不是COMMIT日志,就加入活跃事务表,并以当前日志LSN作为对应值。
活跃事务表att_依据以上行为构建。
- 对于脏页表dpt_:
脏页表记录了哪些页面被日志修改过,并记录这个页面最早的修改日志LSN
- 如果当前日志对应页面在脏页表内:忽视
- 如果当前日志对应页面不在脏页表内,且当前日志是增删改日志:增加到dpt_日志中,进行记录。
脏页表dpt_依据以上行为构建。
SetNextXid也是需要注意的,在分析的时候遍历了所有日志,对于刚回复的数据库对事务id的管理出现了混乱,不知道哪个才是下一个事务ID,因此对这些事务ID中找到最大的+1就是下一个事务ID
void LogManager::Analyze() {
// 恢复 Master Record 等元信息
// 恢复故障时正在使用的数据库
if (disk_.FileExists(NEXT_LSN_NAME)) {
std::ifstream in(NEXT_LSN_NAME);
lsn_t next_lsn;
in >> next_lsn;
next_lsn_ = next_lsn;
} else {
next_lsn_ = FIRST_LSN;
}
flushed_lsn_ = next_lsn_ - 1;
lsn_t checkpoint_lsn = 0;
if (disk_.FileExists(MASTER_RECORD_NAME)) {
std::ifstream in(MASTER_RECORD_NAME);
in >> checkpoint_lsn;
}
// 重建 事务表 和 脏页表
xid_t max_xid = transaction_manager_.GetNextXid();
lsn_t curLSN = checkpoint_lsn;
lsn_t end = next_lsn_;
while (curLSN < end) {
std::shared_ptr<huadb::LogRecord> logRec = nullptr;
if (curLSN <= flushed_lsn_) {
char logData[MAX_LOG_SIZE + 10];
disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
logRec = LogRecord::DeserializeFrom(curLSN, logData);
} else {
for (const auto& pLog : log_buffer_) {
if (pLog->GetLSN() == curLSN) {
logRec = pLog;
break;
}
}
}
xid_t xid = logRec->GetXid();
max_xid = std::max(xid + 1, max_xid);
if(att_.count(xid)){
if(logRec->GetType() == LogType::COMMIT){
att_.erase(xid);
}else{
att_[xid] = logRec->GetLSN();
}
}else{
if(logRec->GetType() != LogType::COMMIT)
att_[xid] = logRec->GetLSN();
}
if(logRec->GetType() == LogType::DELETE){
std::shared_ptr<DeleteLog> curLog = std::dynamic_pointer_cast<DeleteLog>(logRec);
TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
if(not dpt_.count(page_id)){
dpt_[page_id] = curLog->GetLSN();
}
}else if(logRec->GetType() == LogType::INSERT){
std::shared_ptr<InsertLog> curLog = std::dynamic_pointer_cast<InsertLog>(logRec);
TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
if(not dpt_.count(page_id)){
dpt_[page_id] = curLog->GetLSN();
}
}else if(logRec->GetType() == LogType::NEW_PAGE){
std::shared_ptr<NewPageLog> curLog = std::dynamic_pointer_cast<NewPageLog>(logRec);
TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
if(not dpt_.count(page_id)){
dpt_[page_id] = curLog->GetLSN();
}
}
curLSN += logRec->GetSize();
}
transaction_manager_.SetNextXid(max_xid);
}
重做:LogManager::Redo()
重做函数利用了脏页表,但使用的方法也十分简单粗暴,它遍历脏页表中的所有页面,这些页面都是需要Redo的,因为他们是脏页,可能在数据库崩溃的时候丢失了这些脏页。所以对于重做,和原来的操作唯一的不同点就是起点不同,遍历所有脏页最早的修改LSN,找到这些LSN中最小的作为起点,开始全部往后遍历一遍。
void LogManager::Redo() {
lsn_t minRecLSN = 0;
if(dpt_.empty()){
minRecLSN = Checkpoint();
}else{
minRecLSN = next_lsn_;
for(auto [_, lsn]: dpt_){
minRecLSN = std::min(minRecLSN, lsn);
}
}
lsn_t curLSN = minRecLSN;
lsn_t end = next_lsn_;
while (curLSN < end) {
std::shared_ptr<huadb::LogRecord> logRec = nullptr;
if (curLSN <= flushed_lsn_) {
char logData[MAX_LOG_SIZE + 10];
disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
logRec = LogRecord::DeserializeFrom(curLSN, logData);
} else {
for (const auto& pLog : log_buffer_) {
if (pLog->GetLSN() == curLSN) {
logRec = pLog;
break;
}
}
}
logRec->Redo(*buffer_pool_, *catalog_, *this);
curLSN += logRec->GetSize();
}
}
回滚LogManager::Undo()
相当的方便,我们有了所有要回滚的事务ID,以及最新的对应日志LSN,每个日志还能找到自己事务的上一个LSN,按照上面的RollBack函数逻辑,直接调用即可。
void LogManager::Undo() {
for(auto[xid,_]: att_){
Rollback(xid);
}
}
至此我们完成了ARIES 恢复算法,通过测试样例:
lab2/30-aries.test
难度和坑的总结
被LSN卡了一会,没有意识到LSN并不是按照1递增,而是刚刚好依据日志大小也作为日志的偏移值,导致一开始使用++一直出现日志读取错误或者是下标错误。
在创建新页面的Redo中,对上一个页面的处理有失误,导致找了很久的BUG
建议:对于创建新页面的UNDO,第一个页面无法删除,进退两难