整体概要
MiniOB主要执行逻辑
整个 SQL 请求的处理分为三个主要阶段:语法解析、语义绑定与执行调度。具体流程如下:
-
解析阶段
客户端传入的 SQL 文本首先由语法解析器处理,生成抽象语法表示
ParsedSql。 -
语义绑定阶段
解析结果经过语义检查和元数据绑定,构建出可执行的语句对象
Stmt,该对象明确了引用的表、列、类型等信息。 -
执行调度阶段
根据语句类型选择不同的执行路径:
-
DDL 语句(如 CREATE / DROP)
直接进入执行环节,无需查询优化器介入。
调用控制器(Commander)分发至对应的 Executor 完成操作。
-
DML 语句(如 SELECT / INSERT / UPDATE / DELETE)
需要经过优化器处理:
-
基于
Stmt构建逻辑计划树 -
执行逻辑优化(例如谓词下推、投影下推等)
-
转换为物理执行计划
采用火山模型的迭代器接口组织执行算子
优化后的执行器在
SqlResult上产出结果数据,最终由通信模块Communicator按协议格式化并返回客户端。 -
-
flowchart TD
A[SQL文本输入] --> B[语法解析 ParsedSql]
B --> C[语义绑定构建 Stmt]
C --> D{语句类型?}
D --> E[DDL]
D --> F[DML]
%% DDL 流程
E --> G[Commander 直接派发执行器]
G --> H[执行元数据操作 返回状态信息]
H --> I[Communicator 返回处理结果]
%% DML 流程
F --> J[构建逻辑计划树]
J --> K[逻辑优化 谓词/投影下推等]
K --> L[转换为物理计划 火山模型算子]
L --> M[执行数据算子 输出至 SqlResult]
M --> N[Communicator 格式化结果]
N --> O[返回结果至客户端]
从SQL执行流程解析sql相关结构 源码解析
我们先来确定数据库交互的角色:
- Client:发送sql语句的用户
- Server:处理sql语句的服务器内核
Communicator
在明确了数据库交互中的 Client 与 Server 角色之后,需要一个中间层来负责二者之间的通信。
在 miniob 中,这一职责由 Communicator 承担。
Communicator 的定位
Communicator 是数据库服务器中负责“网络通信与协议适配”的组件,其核心任务是:
- 从客户端接收原始请求数据;
- 按照指定通信协议解析请求;
- 将解析后的请求封装为服务器内部可处理的事件;
- 在 SQL 执行完成后,将结果重新编码并发送回客户端。
在执行流程中的位置:
在一次完整的 SQL 请求中,Communicator 的位置大致如下:
Client
↓
Communicator(read_event)
↓
SQL 执行流程
↓
Communicator(write_result)
↓
Client
也就是说,Communicator 是 Server 的入口和出口,而中间的 SQL 处理逻辑并不关心请求是从哪里来的,也不关心结果如何被发送。
核心接口
在 Server 侧,Communicator 向上层暴露了一组稳定接口,用于完成一次完整的请求–响应交互。
read_event:从 Client 读取一次请求,并解析为内部可处理的事件对象。
RC Communicator::read_event(SessionEvent *&event);
write_result:将 SQL 执行结果写回 Client。
RC Communicator::write_result(SessionEvent *event, bool &need_disconnect);
SessionEvent
对于一次sql的处理,我们通常会产生很多很多数据。例如SQL的纯文本信息,转化成结构树的结构信息,处理成功还是失败的错误信息,因此我们需要封装一个载体来存储这些数据来贯彻全流程
SessionEvent 用于描述一次完整的客户端 SQL 请求,是数据库内核中贯穿通信层与执行层的核心上下文对象。它封装了请求来源(Communicator)、会话信息(Session)、原始 SQL 语句以及执行结果与调试信息,使得 SQL 在解析、优化、执行和结果返回的各个阶段都能够围绕同一份请求状态进行处理,从而实现通信逻辑与执行逻辑的解耦。
请求处理
RC SqlTaskHandler::handle_event(Communicator *communicator)
{
SessionEvent *event = nullptr;
// 从客户端连接中读取一个请求事件
RC rc = communicator->read_event(event);
if (OB_FAIL(rc)) {
return rc; // 读取失败则直接返回错误码
}
if (nullptr == event) {
return RC::SUCCESS; // 没有事件则认为处理成功
}
// 会话阶段处理(例如更新会话状态、准备执行环境等)
session_stage_.handle_request2(event);
// 封装 SQL 执行事件(从请求事件中提取 SQL)
SQLStageEvent sql_event(event, event->query());
// 执行 SQL 处理
rc = handle_sql(&sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to handle sql. rc=%s", strrc(rc));
// 如果 SQL 执行失败,设置返回码到 SQLResult 里
event->sql_result()->set_return_code(rc);
}
bool need_disconnect = false;
// 将 SQL 执行结果写回客户端,同时确定是否需要断开连接
rc = communicator->write_result(event, need_disconnect);
LOG_INFO("write result return %s", strrc(rc));
// 清理 session 当前请求状态(结束本次 SQL 请求)
event->session()->set_current_request(nullptr);
Session::set_current_session(nullptr);
// 释放当前事件资源
delete event;
// 如需断开连接则返回内部错误,外层将根据返回值执行断连操作
if (need_disconnect) {
return RC::INTERNAL;
}
return RC::SUCCESS; // 正常处理成功
}
SQL的接受和处理——handle_sql
/**
* 处理一个SQL语句经历这几个阶段。
* 虽然看起来流程比较多,但是对于大多数SQL来说,更多的可以关注parse和executor阶段。
* 通常只有select、delete等带有查询条件的语句才需要进入optimize。
* 对于DDL语句,比如create table、create index等,没有对应的查询计划,可以直接搜索
* create_table_executor、create_index_executor来看具体的执行代码。
* select、delete等DML语句,会产生一些执行计划,如果感觉繁琐,可以跳过optimize直接看
* execute_stage中的执行,通过explain语句看需要哪些operator,然后找对应的operator来
* 调试或者看代码执行过程即可。
*/
RC SqlTaskHandler::handle_sql(SQLStageEvent *sql_event)
{
//查询缓存阶段
RC rc = query_cache_stage_.handle_request(sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to do query cache. rc=%s", strrc(rc));
return rc;
}
/**
* 解析阶段
*
对SQL语句进行词法分析和语法分析
将SQL文本转换为抽象语法树(AST)
识别SQL语句类型和结构
*/
rc = parse_stage_.handle_request(sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to do parse. rc=%s", strrc(rc));
return rc;
}
/**
* 解析绑定阶段
*
验证表名、字段名是否存在
检查数据类型是否匹配
将语法树中的标识符与数据库对象进行绑定
*/
rc = resolve_stage_.handle_request(sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to do resolve. rc=%s", strrc(rc));
return rc;
}
//优化阶段
rc = optimize_stage_.handle_request(sql_event);
if (rc != RC::UNIMPLEMENTED && rc != RC::SUCCESS) {
LOG_TRACE("failed to do optimize. rc=%s", strrc(rc));
return rc;
}
//执行阶段
rc = execute_stage_.handle_request(sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to do execute. rc=%s", strrc(rc));
return rc;
}
return rc;
}
每个阶段都有各自的Stage,作为工具类一样被使用,作为一个黑盒处理输入输出,将sql_event传入,将结果设置在sql_event之中。
语法解析/词法解析
纯sql字符串 → 字符串的语法树结构
词法解析和语法解析使用了第三方库:Flex + Bison
核心逻辑借助了 .y (Bison 语法文件) 以及 .l (Flex 词法文件)
词法文件规定了哪些字符串能够正常读取例如:
{DIGIT}+ yylval->number=atoi(yytext); RETURN_TOKEN(NUMBER);
{DIGIT}+{DOT}{DIGIT}+ yylval->floats=(float)(atof(yytext)); RETURN_TOKEN(FLOAT);
";" RETURN_TOKEN(SEMICOLON);
{DOT} RETURN_TOKEN(DOT);
EXIT RETURN_TOKEN(EXIT);
HELP RETURN_TOKEN(HELP);
语法文件规定了上述的所有词法如何解析成对象,例如:
select_stmt: /* select 语句的语法解析 */
SELECT expression_list FROM from_node from_list where group_by_opt having_opt opt_order_by opt_limit
{
$$ = new ParsedSqlNode(SCF_SELECT);
if ($2 != nullptr) {
$$->selection.expressions.swap(*$2);
delete $2;
}
if ($5 != nullptr) {
$$->selection.relations.swap(*$5);
delete $5;
}
$$->selection.relations.push_back(*$4);
std::reverse($$->selection.relations.begin(), $$->selection.relations.end());
delete $4;
if ($6 != nullptr) {
$$->selection.conditions = std::move(unique_ptr<Expression>($6));
}
if ($7 != nullptr) {
$$->selection.group_by.swap(*$7);
delete $7;
}
if ($8 != nullptr) {
$$->selection.having_conditions = std::move(unique_ptr<Expression>($8));
}
if ($9 != nullptr) {
$$->selection.order_by.swap(*$9);
delete $9;
}
if ($10 != nullptr) {
$$->selection.limit = std::make_unique<LimitSqlNode>(*$10);
delete $10;
}
}
;
通过读取内容构造出 ParsedSqlNode 其定义均存放在 sql/parser/parse_defs.h
其中包含了期望解析后的所有结构。例如:
struct SelectSqlNode
{
vector<unique_ptr<Expression>> expressions; ///< 查询的表达式
std::vector<InnerJoinSqlNode> relations; ///< 查询的表
unique_ptr<Expression> conditions; ///< 查询条件,使用AND串联起来多个条件
vector<unique_ptr<Expression>> group_by; ///< group by clause
unique_ptr<Expression> having_conditions;
vector<OrderBySqlNode> order_by;
unique_ptr<LimitSqlNode> limit;
};
因此解析阶段其行为即:读取SQL字符串,将字符串进行解析得到ParsedSqlNode 对象,将对象置入event。
其中这个阶段只涉及字符串的处理,值的合理性等,不考虑这个表是否存在,因此解析后类内的属性例如 表名,通常是 string 而并不是已经绑定的 Table * 。
解析绑定阶段
字符串的语法树结构 → 解析后的Stmt结构体
当前我们已经拥有对应的结构来容纳各个字符串,也确定了每个字符串其意义,下一步就是绑定字符串:将表名绑定到表对象,字段名绑定到对应column等等。(不存在即返回不存在错误)
解析后的对象如下:
/**
* @brief Stmt for Statement
* @ingroup Statement
* @details SQL解析后的语句,再进一步解析成Stmt,使用内部的数据结构来表示。
* 比如table_name,解析成具体的 Table对象,attr/field name解析成Field对象。
*/
class Stmt
{
public:
Stmt() = default;
virtual ~Stmt() = default;
virtual StmtType type() const = 0;
public:
static RC create_stmt(Db *db, ParsedSqlNode &sql_node, Stmt *&stmt);
private:
};
每种语句都有自己的Stmt,例如update语句
/**
* @brief 更新语句
* @ingroup Statement
*/
class UpdateStmt : public Stmt
{
public:
UpdateStmt() = default;
~UpdateStmt()
{
if (nullptr != filter_stmt_) {
delete filter_stmt_;
filter_stmt_ = nullptr;
}
}
StmtType type() const override { return StmtType::UPDATE; }
public:
static RC create(Db *db, UpdateSqlNode &update_sql, Stmt *&stmt);
public:
Table *table() const { return table_; }
FilterStmt *filter_stmt() const { return filter_stmt_; }
vector<pair<const FieldMeta*, unique_ptr<Expression>>> &kv_list() { return kv_list_; }
private:
Table *table_ = nullptr;
vector<pair<const FieldMeta*, unique_ptr<Expression>>> kv_list_;
FilterStmt *filter_stmt_ = nullptr;
};
根据语句特征进行拓展继承,例如update,需要表名,列值对,where条件,并且这里并不再是字符串类型,对于列名,通过绑定变为FieldMeta。
对于Expression需要说明:
Expression是一个树形结构,运算符例如加减乘除,或者聚合等等复杂都是一个节点分治,多元运算就有多个子Expression。
enum class ExprType
{
NONE,
STAR, ///< 星号,表示所有字段
UNBOUND_FIELD, ///< 未绑定的字段,需要在resolver阶段解析为FieldExpr
UNBOUND_AGGREGATION, ///< 未绑定的聚合函数,需要在resolver阶段解析为AggregateExpr
FIELD, ///< 字段。在实际执行时,根据行数据内容提取对应字段的值
VALUE, ///< 常量值
CAST, ///< 需要做类型转换的表达式
COMPARISON, ///< 需要做比较的表达式
CONJUNCTION, ///< 多个表达式使用同一种关系(AND或OR)来联结
ARITHMETIC, ///< 算术运算
AGGREGATION, ///< 聚合运算
SYSFUNCTION, ///< 函数(length、round、date_format)
SUBQUERY, ///< 子查询
EXPRLIST, ///< 表达式集合
};
这是所有的表达式类型。由于表达式的复杂性,因此其在语法解析的时候就会被使用,语法解析时,其Field类型均为 UNBOUND_FIELD 即只包含字符串的未绑定对象。
在绑定阶段需要将所有Expression中的 UNBOUND 表达式进行绑定,可以对树形结构进行遍历处理。
在这个阶段之后,所有的Expression 中就不再有 UNBOUND 对象了。
再像 FilterStmt ,where条件被很多语句都有使用,并且通常包含Expression对象,经常被复用因此提取出来作为一个完整的Stmt简化代码。
Stage关键代码:
ParsedSqlNode *sql_node = sql_event->sql_node().get();
Stmt *stmt = nullptr;
rc = Stmt::create_stmt(db, *sql_node, stmt);
if (rc != RC::SUCCESS && rc != RC::UNIMPLEMENTED) {
LOG_WARN("failed to create stmt. rc=%d:%s", rc, strrc(rc));
sql_result->set_return_code(rc);
return rc;
}
sql_event->set_stmt(stmt);
父类的 create_stmt 通过判断 StmtType 进行多态的调用。
通过处理后,得到了当前sql语句绑定对象后的数据结构。
优化阶段(生成计划节点,逻辑优化)
当语义分析(Analyze)完成后,系统根据 SQL Stmt 生成一个初始的逻辑执行计划树。计划树采用火山模型(Volcano Model)描述执行算子之间的数据流关系,以迭代器接口(open-next-close)为核心抽象。
每个计划节点(Plan)代表一个“逻辑操作”(Logical Operator),例如:
- Scan计划(表扫描)
- Filter计划(选择谓词)
- Join计划(连接操作)
- Project计划(字段选择)
- GroupAgg计划(分组聚合)
- Sort计划(排序)
- Limit计划(限制输出)
典型逻辑计划示例
SQL:
SELECT name FROM customers WHERE age > 30;
初始(未优化)逻辑计划树:
Project[name]
│
▼
Filter[age > 30]
│
▼
SeqScan[customers]
或者对于稍微复杂一点的,多个JOIN ON 条件,就会形成树状结构。
常见逻辑优化策略
优化器对逻辑计划进行变换以提升性能,主要包括:
-
谓词下推(Predicate Pushdown)
将 WHERE 条件尽量靠近数据源,以减少上层数据处理量
-
投影下推(Projection Pushdown)
去除不必要字段,减少列数据传输
-
连接顺序优化(Join Order Optimization)
使用基于贪心或代价模型评估连接顺序,例如启发式规则:
- 小表/高选择性过滤优先参与连接
- 依据统计信息选择左深树
-
消除冗余子表达式(CSE)
避免重复计算表达式或子查询
-
简化谓词表达式
如合并范围条件、消除永真与永假条件
逻辑计划节点不包含任何执行代码,其主要是火山模型的数据载体,其包含了所有物理计划执行的关键数据。
根据优化后,再转化成物理计划,物理计划节点类有完整的Next,open,close的函数定义,可以用于执行。物理计划树也就是最后运行的火山模型。
DDL语句的处理
对于DDL语句,例如删除表,建表,都是不需要优化的,因此此处会直接返回 RC::UNIMPLEMENTED 也就是为什么判断条件多一个判断是否是 RC::UNIMPLEMENTED 。
执行阶段
执行阶段的输入是可运行的火山模型物理计划树,只需要获取最顶上的schema,再对上面进行open和next执行。但在miniob中,执行阶段直接执行DDL语句但不直接执行DML语句:
RC ExecuteStage::handle_request(SQLStageEvent *sql_event)
{
RC rc = RC::SUCCESS;
const unique_ptr<PhysicalOperator> &physical_operator = sql_event->physical_operator();
// DDL 语句 physical_operator 是 null,会跳过
if (physical_operator != nullptr) {
// DDL 语句分支
return handle_request_with_physical_operator(sql_event);
}
// DDL 语句分支,只有DDL会执行这里
SessionEvent *session_event = sql_event->session_event();
Stmt *stmt = sql_event->stmt();
if (stmt != nullptr) {
// DDL 语句核心段
CommandExecutor command_executor;
rc = command_executor.execute(sql_event);
session_event->sql_result()->set_return_code(rc);
} else {
return RC::INTERNAL;
}
return rc;
}
DDL语句的执行
每个DDL都不依赖计划树和火山模型进行处理,因此miniob隔离出 executor 来负责DDL语句的执行,例如:
RC CreateTableExecutor::execute(SQLStageEvent *sql_event)
{
Stmt *stmt = sql_event->stmt();
Session *session = sql_event->session_event()->session();
ASSERT(stmt->type() == StmtType::CREATE_TABLE,
"create table executor can not run this command: %d",
static_cast<int>(stmt->type()));
CreateTableStmt *create_table_stmt = static_cast<CreateTableStmt *>(stmt);
const char *table_name = create_table_stmt->table_name().c_str();
RC rc = session->get_current_db()->create_table(table_name, create_table_stmt->attr_infos(), create_table_stmt->primary_keys(), create_table_stmt->storage_format());
return rc;
}
每个DDL语句都有自己的Executor,execute直接提取event中的Stmt数据,进行直接处理。
DDL的语句执行依赖 CommandExecutor ,会直接在上面标记的 rc = command_executor.execute(sql_event); 中判断Stmt的类型找到对应的Executor直接执行。
DML语句预准备
DML语句进入以下函数
RC ExecuteStage::handle_request_with_physical_operator(SQLStageEvent *sql_event)
{
// DML 语句核心段
RC rc = RC::SUCCESS;
unique_ptr<PhysicalOperator> &physical_operator = sql_event->physical_operator();
ASSERT(physical_operator != nullptr, "physical operator should not be null");
SqlResult *sql_result = sql_event->session_event()->sql_result();
sql_result->set_operator(std::move(physical_operator));
return rc;
}
可以看到,其只是将operator设置到了SqlResult当中,并没有执行(见以下补充)
void SqlResult::set_operator(unique_ptr<PhysicalOperator> oper)
{
ASSERT(operator_ == nullptr, "current operator is not null. Result is not closed?");
operator_ = std::move(oper);
operator_->tuple_schema(tuple_schema_);
}
而仅仅只是设置了tuple_schema,因此运行时错误无法在这里被捕捉。
结果输出阶段
在 handle_sql 之后有以下的代码
rc = handle_sql(&sql_event);
if (OB_FAIL(rc)) {
LOG_TRACE("failed to handle sql. rc=%s", strrc(rc));
event->sql_result()->set_return_code(rc);
}
bool need_disconnect = false;
rc = communicator->write_result(event, need_disconnect);
LOG_INFO("write result return %s", strrc(rc));
event->session()->set_current_request(nullptr);
Session::set_current_session(nullptr);
delete event;
if (need_disconnect) {
return RC::INTERNAL;
}
return RC::SUCCESS;
对于结果的输出可能有多种不同的输出方式,其均交给不同的 communicator 自己的执行来决定。
例如此处:
RC PlainCommunicator::write_tuple_result(SqlResult *sql_result)
{
RC rc = RC::SUCCESS;
Tuple *tuple = nullptr;
while (RC::SUCCESS == (rc = sql_result->next_tuple(tuple))) {
assert(tuple != nullptr);
int cell_num = tuple->cell_num();
for (int i = 0; i < cell_num; i++) {
if (i != 0) {
const char *delim = " | ";
rc = writer_->writen(delim, strlen(delim));
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
}
Value value;
rc = tuple->cell_at(i, value);
if (rc != RC::SUCCESS) {
LOG_WARN("failed to get tuple cell value. rc=%s", strrc(rc));
sql_result->close();
return rc;
}
string cell_str = value.to_string();
rc = writer_->writen(cell_str.data(), cell_str.size());
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
}
char newline = '\\n';
rc = writer_->writen(&newline, 1);
if (OB_FAIL(rc)) {
LOG_WARN("failed to send data to client. err=%s", strerror(errno));
sql_result->close();
return rc;
}
}
if (rc == RC::RECORD_EOF) {
rc = RC::SUCCESS;
}
sql_result->set_return_code(rc);
return rc;
}
其获取物理计划执行后,将结果以Tuple的载体获取,再将数据按文本提取,以 | 作为分隔符,将其输出。
至此SQL执行全流程完毕, communicator 自己将自己的数据输出,其中在 SELECT 遇到问题时,应该立即清除所有的缓冲区文本,将 FAILURE 写入输出流。
class SqlResult;
/**
* @brief 与客户端进行通讯
* @ingroup Communicator
* @details 使用简单的文本通讯协议,每个消息使用'\\0'结尾
*/
class PlainCommunicator : public Communicator
{
public:
PlainCommunicator();
virtual ~PlainCommunicator() = default;
RC read_event(SessionEvent *&event) override;
RC write_result(SessionEvent *event, bool &need_disconnect) override;
private:
RC write_state(SessionEvent *event, bool &need_disconnect);
RC write_debug(SessionEvent *event, bool &need_disconnect);
RC write_result_internal(SessionEvent *event, bool &need_disconnect);
RC write_tuple_result(SqlResult *sql_result);
RC write_chunk_result(SqlResult *sql_result);
protected:
vector<char> send_message_delimiter_; ///< 发送消息分隔符
vector<char> debug_message_prefix_; ///< 调试信息前缀
};
以上是一个PlainCommunicator 的示例。其核心逻辑如下:
函数整体功能概述
PlainCommunicator::write_result 用于将 SQL 执行结果通过网络 writer 发送给客户端,并在需要时断开连接。其内部主要流程:
- 调用
write_result_internal来输出真正的查询结果(表头、数据、状态信息等) - 如果仍然不需要断开连接,则尝试发送 Debug 信息
- 再发送消息分隔符(send_message_delimiter)
- 最后执行 flush 将缓存数据推送出去
只有在任何阶段出现错误时才会标记 need_disconnect = true 并提前返回。
核心逻辑 write_result_internal 的详细解析
该函数接收 SessionEvent,对其中的 SqlResult 进行解析与发送。
主要流程如下:
-
初始化
need_disconnect = true默认认为通讯一次即结束,只有成功处理并发送完毕才会重置为 false
-
获取 SQL 执行结果
SqlResult *sql_result = event->sql_result() -
若 SQL 执行失败或没有可执行算子(非 SELECT,如 INSERT / DELETE)
直接调用
write_state返回状态信息,结束通讯即:这类语句一般没有行与列,不需要返回 tuple
-
尝试
sql_result->open()执行算子准备阶段如果失败则关闭 result 并返回状态信息
-
输出表头
遍历
schema.cell_num()获取各列 alias用
" | "分隔,最后添加一个换行符 -
根据执行模式发送结果数据:
- 若为 Chunk 模式:调用
write_chunk_result - 否则调用
write_tuple_result
- 若为 Chunk 模式:调用
-
如果输出数据失败
清空 writer、关闭 result、返回状态信息
-
如果
cell_num == 0(例如 INSERT/DELETE)表示没有输出数据
→ 写状态信息并返回
-
如果成功输出结果则:
- 设置
need_disconnect = false(客户端可继续发送请求) - 关闭 result 并返回成功状态码
- 设置
总结流程图
write_result
├─ write_result_internal
│ ├─ 若失败或非 SELECT → write_state → need_disconnect =true
│ ├─ 输出列头
│ ├─ 输出 tuple/chunk 结果
│ ├─ 若 cell_num ==0 → write_state → need_disconnect =true
│ ├─ 成功 → need_disconnect =false
│ └─return rc
├─write_debug (若不需要断开)
├─ 写入消息分隔符
├─ flush
└─return rc