基础实现思路和难度总结

在存储实现的基础上添加事务管理,解决数据库崩溃后的恢复等情况。

数据类架构

对于table来说有Table 和 TablePage

  • Table:是上层操作,负责调用TablePage处理页面内容,同时,增删改查操作的日志的管理也由Table完成。
  • TablePage:是数据的底层操作,读入Page解析后,处理对Page内容,属性值的修改,对于本次实验,主要是给REDO和UNDO提供更接近底层的操作函数,可以不产生新的日志或者是减少需要的参数完成简单的功能。

对于log来说有 log_manager, log_records

  • log_manager:日志管理者,负责读取日志,维护日志,还要能调用日志完成REDO UNDO等操作,是日志的直接管理者。
  • log_records:是一个父类,有delete_log, insert_log, new_page_log等等的子类,有抽象函数UNDO和REDO,提供接口给log_manager以完成重做和撤回。子类要根据自己的情况,实现完成不同的REDO和UNDO虚函数实现。

事务回滚

lab20/10-rollback.test

日志追加

InsertRecord

对于日志而言,需要预写日志,即日志插入在数据写入之前。

对于日志插入,如果创建了新的页面,则需要创建页面日志,在创建页面日志之后。预写插入日志,再完成插入。offset 的逻辑通过储存原理可以得出,也就是模拟找到了这个插入的各个数据,再进行插入。

Rid Table::InsertRecord(std::shared_ptr<Record> record, xid_t xid, cid_t cid, bool write_log) {
  if (record->GetSize() > MAX_RECORD_SIZE) {
    throw DbException("Record size too large: " + std::to_string(record->GetSize()));
  }
  pageid_t curPageID = first_page_id_;
  bool hasInserted = false;
  std::shared_ptr<TablePage> curPageHandle;

  slotid_t resSlotID = 0;
  pageid_t resPageID = 0;

  if(first_page_id_ == NULL_PAGE_ID){
    std::shared_ptr<Page> newPgae = buffer_pool_.NewPage(db_oid_, oid_, 0);
    if(write_log){
      log_manager_.AppendNewPageLog(xid, oid_, NULL_PAGE_ID, 0);
    }
    first_page_id_ = 0;
    curPageHandle = std::make_shared<TablePage>(newPgae);
    curPageHandle->Init();
    curPageHandle->SetNextPageId(NULL_PAGE_ID);
    hasInserted = true;
  }

  while(not hasInserted){
    std::shared_ptr<Page> page = buffer_pool_.GetPage(db_oid_, oid_, curPageID);
    curPageHandle = std::make_shared<TablePage>(page);
    if(curPageHandle->GetFreeSpaceSize() > record->GetSize()){
      hasInserted = true;
      resPageID = curPageID;
      break;
    }
    if(curPageHandle->GetNextPageId() == NULL_PAGE_ID){
      break;
    }else{
      curPageID = curPageHandle->GetNextPageId();
    }
  }
  
  if(not hasInserted){
    pageid_t newPageID = curPageID + 1;

    if(write_log){
      log_manager_.AppendNewPageLog(xid, oid_, curPageID, newPageID);
    }
    curPageHandle->SetNextPageId(newPageID);
    std::shared_ptr<Page> newPage = buffer_pool_.NewPage(db_oid_, oid_, newPageID);

    curPageHandle = std::make_shared<TablePage>(newPage);
    curPageHandle->Init();
    curPageHandle->SetNextPageId(NULL_PAGE_ID);
    
    hasInserted = true;
    resPageID = newPageID;
  }

  resSlotID = curPageHandle->GetRecordCount();
  if(write_log){
    db_size_t recordSize = record->GetSize();
    db_size_t offset = curPageHandle->GetUpper() - recordSize;

    char *buffer = new char[record->GetSize()];
    db_size_t size = record->SerializeTo(buffer);

    lsn_t curLsn = log_manager_.AppendInsertLog(xid, oid_, resPageID, resSlotID, offset, recordSize, buffer);
    delete [] buffer;

    curPageHandle->SetPageLSN(curLsn);
  }
  resSlotID = curPageHandle->InsertRecord(record, xid, cid);

  return {resPageID, resSlotID};
}

DeleteRecord

对于删除操作而言会更加简单,只需要page_id_和slot_id_即可。

void Table::DeleteRecord(const Rid &rid, xid_t xid, bool write_log) {
  std::shared_ptr<Page> page = buffer_pool_.GetPage(db_oid_, oid_, rid.page_id_);
  TablePage pageHandle(page);

  if(write_log){
    lsn_t curLsn = log_manager_.AppendDeleteLog(xid, oid_, rid.page_id_, rid.slot_id_);
    pageHandle.SetPageLSN(curLsn);
  }
  pageHandle.DeleteRecord(rid.slot_id_, xid);
}

Undo 日志读取

Rollback 函数(log/log_manager)

这个函数的参数是事务ID,任务是回滚这一个事务,对于事务回滚我们可以知道,为了避免从后往前遍历找 事务ID = 某个值 的所有日志,不如让日志有链表一样的指针,对于任意一个日志,如果它属于事务A,则它会记录自己的上一条事务A的日志LSN。

通过活跃事务表(时刻记录事务的最新日志LSN)找到最后一条日志,再通过前事务指针不断遍历,直到NULL_LSN。

对于获取日志内容:

通过判断 lsn 和 flushed_lsn_ 的大小关系,flushed_lsn_ 代表了最后一条没有写入磁盘的日志。也就是说:

如果 lsn < flushed_lsn_ :

日志在磁盘,使用 disk_.ReadLog(lastLSN, MAX_LOG_SIZE, logData);

如果 lsn ≥ flushed_lsn_ :

日志在缓存中,遍历log_buffer_找到日志。

获取到日志后对日志进行UNDO操作,并更新活跃事务表。

如果回滚完成,从活跃事务表中删除记录。

void LogManager::Rollback(xid_t xid) {
  lsn_t lastLSN = att_[xid];
  lsn_t prevLSN = NULL_LSN;
  while(lastLSN != NULL_LSN){
    std::shared_ptr<huadb::LogRecord> logRec;
    if(lastLSN < flushed_lsn_){
      char logData[MAX_LOG_SIZE];
      disk_.ReadLog(lastLSN, MAX_LOG_SIZE, logData);
      logRec = LogRecord::DeserializeFrom(lastLSN, logData);
    }else{
      for(auto pLog: log_buffer_){
        if(pLog->GetLSN() == lastLSN){
          logRec = pLog;
          break;
        }
      }
    }
    prevLSN = logRec->GetPrevLSN();
    logRec->Undo(*buffer_pool_, *catalog_, *this, prevLSN);
    lastLSN = prevLSN;
    att_[xid] = lastLSN;
  }
  att_.erase(xid);
}

实现日志的 Undo 操作

TablePage::UndoDeleteRecord

这个函数为了便于对应的回滚操作,对于UNDO 一个删除操作而言,和插入一个原来的记录不同,我们的DELETE操作并没有对数据进行真正的清空,而是在数据头上打了一个标记,这个标记说明了这个记录是否被删除。

因此,对于重新插入一个数据而言,我们有更好的undo方法,就是去除这个删除标记,就变成撤销删除了。

void TablePage::UndoDeleteRecord(slotid_t slot_id) {
  Record header;
  header.DeserializeHeaderFrom(page_data_ + slots_[slot_id].offset_);
  header.SetDeleted(false);
  header.SerializeHeaderTo(page_data_ + slots_[slot_id].offset_);
  page_->SetDirty();
}

InsertLog::Undo

撤销一个插入操作也就是将插入的记录再删除。pageHandle的函数并不会记录日志而产生新的日志,新的日志产生都是由Table类完成的。

void InsertLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());
  std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
  TablePage pageHandle(page);
  pageHandle.DeleteRecord(slot_id_, xid_);
}

DeleteLog::Undo

非常简单,就是使用上面我们简化过的恢复删除函数即可。

void DeleteLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());
  std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
  TablePage pageHandle(page);
  pageHandle.UndoDeleteRecord(slot_id_);
}

NewPageLog::Undo

对于撤回一个新的页面,想不到很好的解决方法,就将该页清空,虽然可以让它的前一页的后置指针变成NULL指针,但是如果新建的是第一个page,就很难做到真正的删除,因此直接采用初始化,清空这个页面的所有内容。

void NewPageLog::Undo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager, lsn_t undo_next_lsn) {
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());

  std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
  TablePage pageHandle(page);
  pageHandle.Init();
}

至此我们完成了所有的事务回滚函数,通过测试样例:

lab2/10-rollback.test

事务重做

lab2/20-recover.test

Redo 日志读取

LogManager::Redo

对于最基础的事务重做,只需要从头将所有事务都重做一遍即可,因此采用从LSN = 0 开始,next_lsn_代表了下一个日志(未生成)的LSN,所以执行到未生成的LSN即可。

需要注意的是,这里的LSN概念略有不同,这里的LSN并不是以1递增,此处的LSN恰好是日志在储存中的偏移量offset,它也是递增的,并且不需要维护123456,只需要offset还能更方便的访问,因此,遍历的时候,并不是curLSN ++:

而是curLSN += logRec->GetSize();

void LogManager::Redo() {
  lsn_t curLSN = 0;
  lsn_t end = next_lsn_;
  while (curLSN < end) {
    std::shared_ptr<huadb::LogRecord> logRec = nullptr;
    if (curLSN <= flushed_lsn_) {
        char logData[MAX_LOG_SIZE + 10];
        disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
        logRec = LogRecord::DeserializeFrom(curLSN, logData);
    } else {
        for (const auto& pLog : log_buffer_) {
            if (pLog->GetLSN() == curLSN) {
                logRec = pLog;
                break;
            }
        }
    }
    logRec->Redo(*buffer_pool_, *catalog_, *this);
    curLSN += logRec->GetSize();
  }
}

实现日志的 Redo 操作

TablePage::RedoInsertRecord

和另一个函数同理,是提供了更方便的插入方式,以不需要更多的数据。

需要注意的是,这个redo和插入不同,插入会直接使用新的slot_id ,但重做日志不同,重做日志是不论这个位置是否有数据,都要进行覆写,因此,插入数据的时候需要维护好lower_ 和 upper_指针,关于这两者的信息,可以移步到关于数据库存储的博客里。

由于是重做插入,因此给的slot_id就是最新的,也就可以直接指定lower_ 的值,又offset是日志的参数,因此直接给upper_。

使用memcpy在对应位置写入数据即可。

void TablePage::RedoInsertRecord(slotid_t slot_id, char *raw_record, db_size_t page_offset, db_size_t record_size) {
  *lower_ = PAGE_HEADER_SIZE + (slot_id + 1) * sizeof(Slot);
  *upper_ = page_offset;
  slots_[slot_id].offset_ = *upper_;
  slots_[slot_id].size_ = record_size;
  memcpy(page_data_ + page_offset, raw_record, record_size);
  page_->SetDirty();
}

InsertLog::Redo

关于插入,有了我们上面刚实现的函数,直接找到对应的页面进行调用即可。

void InsertLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
  // 如果 oid_ 不存在,表示该表已经被删除,无需 redo
  if (!catalog.TableExists(oid_)) {
    return;
  }
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());
  std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
  TablePage pageHandle(page);
  pageHandle.RedoInsertRecord(slot_id_, record_, page_offset_, record_size_);
}

DeleteLog::Redo

不过多赘述,就是调用删除日志的函数,重新删除一遍。

void DeleteLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
  // 如果 oid_ 不存在,表示该表已经被删除,无需 redo
  if (!catalog.TableExists(oid_)) {
    return;
  }
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());
  std::shared_ptr<huadb::Page> page = buffer_pool.GetPage(db_oid, oid_, page_id_);
  TablePage pageHandle(page);
  pageHandle.DeleteRecord(slot_id_, xid_);
}

NewPageLog::Redo

对于重建新的页面,必须知道的是,页面是有编号连接的。类似于链表结构。因此,要重做新建页面,也就需要知道上一个页面,并设置NextPageId,然后对新页面进行初始化即可。

void NewPageLog::Redo(BufferPool &buffer_pool, Catalog &catalog, LogManager &log_manager) {
  // 如果 oid_ 不存在,表示该表已经被删除,无需 redo
  if (!catalog.TableExists(oid_)) {
    return;
  }
  oid_t db_oid = catalog.GetDatabaseOid(GetOid());

  std::shared_ptr<huadb::Page> page, lastPage;
  if(page_id_ == 0){
    page = buffer_pool.NewPage(db_oid, oid_, page_id_);
    lastPage = nullptr;
  }else{
    page = buffer_pool.NewPage(db_oid, oid_, page_id_);
    lastPage = buffer_pool.GetPage(db_oid, oid_, page_id_ - 1);
  }
  if(lastPage != nullptr){
    TablePage pageHandle(lastPage);
    pageHandle.SetNextPageId(page_id_);
  }
  TablePage pageHandle(page);
  pageHandle.Init();
}

至此我们完成了所有的事务重做函数,通过测试样例:

lab2/20-recover.test

ARIES 恢复算法

lab2/30-aries.test

对于ARISE恢复算法,我们进行简单的复习:

ARISE恢复算法分成三个步骤:

  • 分析:重建 脏页表 和 活跃事务表
  • 重做:使用 脏页表 决定重做的起点
  • 回滚:使用 活跃事务表 快速精准定位需要回滚的事务

这样一下就疏通了,我们对以上三个函数进行实现:

分析:LogManager::Analyze()

分析函数内先找到检查点,检查点机制缩短了了分析所需的时间。

检查点之前的所有记录都是已经落实完毕的,也就不需要被恢复。

现在来说明分析的行为步骤:

分析找到一个起点(检查点),开始一直往剩下的所有日志开始遍历。

  1. 对于活跃事务表att_:

活跃事务表记录了所有活跃的事务以及其最近的日志LSN

  • 如果当前日志对应事务已经在活跃事务表内:

①是COMMIT日志,从活跃事务表中删除②否则更新活跃事务表当前日志LSN

  • 如果当前日志对应事务不在活跃事务表内:

如果不是COMMIT日志,就加入活跃事务表,并以当前日志LSN作为对应值。

活跃事务表att_依据以上行为构建。

  1. 对于脏页表dpt_:

脏页表记录了哪些页面被日志修改过,并记录这个页面最早的修改日志LSN

  • 如果当前日志对应页面在脏页表内:忽视
  • 如果当前日志对应页面不在脏页表内,且当前日志是增删改日志:增加到dpt_日志中,进行记录。

脏页表dpt_依据以上行为构建。

SetNextXid也是需要注意的,在分析的时候遍历了所有日志,对于刚回复的数据库对事务id的管理出现了混乱,不知道哪个才是下一个事务ID,因此对这些事务ID中找到最大的+1就是下一个事务ID

void LogManager::Analyze() {
  // 恢复 Master Record 等元信息
  // 恢复故障时正在使用的数据库
  if (disk_.FileExists(NEXT_LSN_NAME)) {
    std::ifstream in(NEXT_LSN_NAME);
    lsn_t next_lsn;
    in >> next_lsn;
    next_lsn_ = next_lsn;
  } else {
    next_lsn_ = FIRST_LSN;
  }
  flushed_lsn_ = next_lsn_ - 1;
  lsn_t checkpoint_lsn = 0;

  if (disk_.FileExists(MASTER_RECORD_NAME)) {
    std::ifstream in(MASTER_RECORD_NAME);
    in >> checkpoint_lsn;
  }
  // 重建 事务表 和 脏页表
  xid_t max_xid = transaction_manager_.GetNextXid();

  lsn_t curLSN = checkpoint_lsn;
  lsn_t end = next_lsn_;
  while (curLSN < end) {
    std::shared_ptr<huadb::LogRecord> logRec = nullptr;
    if (curLSN <= flushed_lsn_) {
        char logData[MAX_LOG_SIZE + 10];
        disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
        logRec = LogRecord::DeserializeFrom(curLSN, logData);
    } else {
        for (const auto& pLog : log_buffer_) {
            if (pLog->GetLSN() == curLSN) {
                logRec = pLog;
                break;
            }
        }
    }
    xid_t xid = logRec->GetXid();
    max_xid = std::max(xid + 1, max_xid);

    if(att_.count(xid)){
      if(logRec->GetType() == LogType::COMMIT){
        att_.erase(xid);
      }else{
        att_[xid] = logRec->GetLSN();
      }
    }else{
      if(logRec->GetType() != LogType::COMMIT)
        att_[xid] = logRec->GetLSN();
    }

    if(logRec->GetType() == LogType::DELETE){
      std::shared_ptr<DeleteLog> curLog = std::dynamic_pointer_cast<DeleteLog>(logRec);
      TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
      if(not dpt_.count(page_id)){
        dpt_[page_id] = curLog->GetLSN();
      }
    }else if(logRec->GetType() == LogType::INSERT){
      std::shared_ptr<InsertLog> curLog = std::dynamic_pointer_cast<InsertLog>(logRec);
      TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
      if(not dpt_.count(page_id)){
        dpt_[page_id] = curLog->GetLSN();
      }
    }else if(logRec->GetType() == LogType::NEW_PAGE){
      std::shared_ptr<NewPageLog> curLog = std::dynamic_pointer_cast<NewPageLog>(logRec);
      TablePageid page_id = {curLog->GetOid(), curLog->GetPageId()};
      if(not dpt_.count(page_id)){
        dpt_[page_id] = curLog->GetLSN();
      }
    }
    curLSN += logRec->GetSize();
  }
  transaction_manager_.SetNextXid(max_xid);
}

重做:LogManager::Redo()

重做函数利用了脏页表,但使用的方法也十分简单粗暴,它遍历脏页表中的所有页面,这些页面都是需要Redo的,因为他们是脏页,可能在数据库崩溃的时候丢失了这些脏页。所以对于重做,和原来的操作唯一的不同点就是起点不同,遍历所有脏页最早的修改LSN,找到这些LSN中最小的作为起点,开始全部往后遍历一遍。

void LogManager::Redo() {
  lsn_t minRecLSN = 0;

  if(dpt_.empty()){
    minRecLSN = Checkpoint();
  }else{
    minRecLSN = next_lsn_;
    for(auto [_, lsn]: dpt_){
      minRecLSN = std::min(minRecLSN, lsn);
    }
  }

  lsn_t curLSN = minRecLSN;
  lsn_t end = next_lsn_;
  while (curLSN < end) {
    std::shared_ptr<huadb::LogRecord> logRec = nullptr;
    if (curLSN <= flushed_lsn_) {
        char logData[MAX_LOG_SIZE + 10];
        disk_.ReadLog(curLSN, MAX_LOG_SIZE, logData);
        logRec = LogRecord::DeserializeFrom(curLSN, logData);
    } else {
        for (const auto& pLog : log_buffer_) {
            if (pLog->GetLSN() == curLSN) {
                logRec = pLog;
                break;
            }
        }
    }
    logRec->Redo(*buffer_pool_, *catalog_, *this);
    curLSN += logRec->GetSize();
  }
}

回滚LogManager::Undo()

相当的方便,我们有了所有要回滚的事务ID,以及最新的对应日志LSN,每个日志还能找到自己事务的上一个LSN,按照上面的RollBack函数逻辑,直接调用即可。

void LogManager::Undo() {
  for(auto[xid,_]: att_){
    Rollback(xid);
  }
}

至此我们完成了ARIES 恢复算法,通过测试样例:

lab2/30-aries.test

难度和坑的总结

被LSN卡了一会,没有意识到LSN并不是按照1递增,而是刚刚好依据日志大小也作为日志的偏移值,导致一开始使用++一直出现日志读取错误或者是下标错误。

在创建新页面的Redo中,对上一个页面的处理有失误,导致找了很久的BUG

建议:对于创建新页面的UNDO,第一个页面无法删除,进退两难