当前位置:首页 > 开发教程 > 数据库 >

Postgres中UPDATE更新语句源码分析

时间:2022-03-16 21:56 来源:未知 作者:才久居我心 收藏

这篇文章主要给大家介绍了关于Postgres中UPDATE更新语句源码分析的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

PG中UPDATE源码分析

本文主要描述SQL中UPDATE语句的源码分析,代码为PG13.3版本。

整体流程分析

update dtea set id = 1;这条最简单的Update语句进行源码分析(dtea不是分区表,不考虑并行等,没有建立任何索引),帮助我们理解update的大致流程。

SQL流程如下:

  • parser(语法解析,生成语法解析树UpdateStmt,检查是否有语法层面的错误)

  • analyze(语义分析, UpdateStmt转为查询树Query, 会查系统表检查有无语义方面的错误)

  • rewrite(规则重写, 根据规则rules重写查询树Query, 根据事先存储在系统表中的规则进行重写,没有的话不进行重写,另外加一句,视图的实现是根据规则系统实现的,也是在这里需要进行处理)

  • optimizer(优化器:逻辑优化、物理优化、生成执行计划, 由Query生成对应的执行计划PlannedStmt, 基于代价的优化器,由最佳路径Path生成最佳执行计划Plan)

  • executor(执行器,会有各种算子,依据执行计划进行处理,火山模型,一次一元组)

  • storage(存储引擎)。中间还有事务处理。事务处理部分的代码这里不再进行分析,免得将问题复杂化。存储引擎那部分也不进行分析,重点关注解析、优化、执行这三部分。

对应的代码:

exec_simple_query(constchar*query_string)
//-------解析器部分--------------
-->pg_parse_query(query_string);//生成语法解析树
-->pg_analyze_and_rewrite(parsetree,query_string,NULL,0,NULL);//生成查询树Query
-->parse_analyze(parsetree,query_string,paramTypes,numParams,queryEnv);//语义分析
-->pg_rewrite_query(query);//规则重写

//--------优化器----------
-->pg_plan_queries()

//--------执行器----------
-->PortalStart(portal,NULL,0,InvalidSnapshot);
-->PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);//执行器执行
-->PortalDrop(portal,false);

解析部分——生成语法解析树UpdateStmt

关键数据结构:UpdateStmtRangeVarResTarget:

/*UpdateStatement*/
typedefstructUpdateStmt
{
NodeTagtype;
RangeVar*relation;/*relationtoupdate*/
List*targetList;/*thetargetlist(ofResTarget)*///对应语句中的setid=0;信息在这里
Node*whereClause;/*qualifications*/
List*fromClause;/*optionalfromclauseformoretables*/
List*returningList;/*listofexpressionstoreturn*/
WithClause*withClause;/*WITHclause*/
}UpdateStmt;

//dtea表
typedefstructRangeVar
{
NodeTagtype;
char*catalogname;/*thecatalog(database)name,orNULL*/
char*schemaname;/*theschemaname,orNULL*/
char*relname;/*therelation/sequencename*/
boolinh;/*expandrelbyinheritancerecursivelyact
*onchildren*/
charrelpersistence;/*seeRELPERSISTENCE_*inpg_class.h*/
Alias*alias;/*tablealias&optionalcolumnaliases*/
intlocation;/*tokenlocation,or-1ifunknown*/
}RangeVar;

//setid=0;经transformTargetList()->transformTargetEntry,会转为TargetEntry
typedefstructResTarget
{
NodeTagtype;
char*name;/*columnnameorNULL*///idcolumn
List*indirection;/*subscripts,fieldnames,and'*',orNIL*/
Node*val;/*thevalueexpressiontocomputeorassign*///=1表达式节点存在这里
intlocation;/*tokenlocation,or-1ifunknown*/
}ResTarget;

用户输入的update语句update dtea set id = 1由字符串会转为可由数据库理解的内部数据结构语法解析树UpdateStmt。执行逻辑在pg_parse_query(query_string);中,需要理解flex与bison。

gram.y中Update语法的定义:

/*****************************************************************************
*QUERY:
*UpdateStmt(UPDATE)
*****************************************************************************/
//结合这条语句分析updatedteasetid=0;
UpdateStmt:opt_with_clauseUPDATErelation_expr_opt_alias
SETset_clause_listfrom_clausewhere_or_current_clausereturning_clause
{
UpdateStmt*n=makeNode(UpdateStmt);
n->relation=$3;
n->targetList=$5;
n->fromClause=$6;
n->whereClause=$7;
n->returningList=$8;
n->withClause=$1;
$$=(Node*)n;
}
;

set_clause_list:
set_clause{$$=$1;}
|set_clause_list','set_clause{$$=list_concat($1,$3);}
;
//对应的是setid=0
set_clause://id=0
set_target'='a_expr
{
$1->val=(Node*)$3;
$$=list_make1($1);
}
|'('set_target_list')''='a_expr
{
intncolumns=list_length($2);
inti=1;
ListCell*col_cell;

foreach(col_cell,$2)/*CreateaMultiAssignRefsourceforeachtarget*/
{
ResTarget*res_col=(ResTarget*)lfirst(col_cell);
MultiAssignRef*r=makeNode(MultiAssignRef);

r->source=(Node*)$5;
r->colno=i;
r->ncolumns=ncolumns;
res_col->val=(Node*)r;
i++;
}

$$=$2;
}
;

set_target:
ColIdopt_indirection
{
$$=makeNode(ResTarget);
$$->name=$1;
$$->indirection=check_indirection($2,yyscanner);
$$->val=NULL;/*upperproductionsetsthis*/
$$->location=@1;
}
;

set_target_list:
set_target{$$=list_make1($1);}
|set_target_list','set_target{$$=lappend($1,$3);}
;

解析部分——生成查询树Query

生成了UpdateStmt后, 会经由parse_analyze语义分析,生成查询树Query,以供后续优化器生成执行计划。主要代码在src/backent/parser/analyze.c

analyze.c : transform the raw parse tree into a query tree

parse_analyze()
-->transformTopLevelStmt(pstate,parseTree);
-->transformOptionalSelectInto(pstate,parseTree->stmt);
-->transformStmt(pstate,parseTree);
//transformsanupdatestatement
-->transformUpdateStmt(pstate,(UpdateStmt*)parseTree);//实际由UpdateStmt转为Query的处理函数

具体的我们看一下transformUpdateStmt函数实现:

/*transformUpdateStmt-transformsanupdatestatement*/
staticQuery*transformUpdateStmt(ParseState*pstate,UpdateStmt*stmt){
Query*qry=makeNode(Query);
ParseNamespaceItem*nsitem;
Node*qual;

qry->commandType=CMD_UPDATE;
pstate->p_is_insert=false;

/*processtheWITHclauseindependentlyofallelse*/
if(stmt->withClause){
qry->hasRecursive=stmt->withClause->recursive;
qry->cteList=transformWithClause(pstate,stmt->withClause);
qry->hasModifyingCTE=pstate->p_hasModifyingCTE;
}

qry->resultRelation=setTargetTable(pstate,stmt->relation,stmt->relation->inh,true,ACL_UPDATE);
nsitem=pstate->p_target_nsitem;

/*subqueriesinFROMcannotaccesstheresultrelation*/
nsitem->p_lateral_only=true;
nsitem->p_lateral_ok=false;

/*theFROMclauseisnon-standardSQLsyntax.WeusedtobeabletodothiswithREPLACEinPOSTQUELsowekeepthefeature.*/
transformFromClause(pstate,stmt->fromClause);

/*remainingclausescanreferencetheresultrelationnormally*/
nsitem->p_lateral_only=false;
nsitem->p_lateral_ok=true;

qual=transformWhereClause(pstate,stmt->whereClause,EXPR_KIND_WHERE,"WHERE");
qry->returningList=transformReturningList(pstate,stmt->returningList);

/*NowwearedonewithSELECT-likeprocessing,andcangetonwith
*transformingthetargetlisttomatchtheUPDATEtargetcolumns.*/
qry->targetList=transformUpdateTargetList(pstate,stmt->targetList);//处理SQL语句中的setid=1

qry->rtable=pstate->p_rtable;
qry->jointree=makeFromExpr(pstate->p_joinlist,qual);
qry->hasTargetSRFs=pstate->p_hasTargetSRFs;
qry->hasSubLinks=pstate->p_hasSubLinks;

assign_query_collations(pstate,qry);

returnqry;
}

这里面要重点关注一下transformTargetList,会将抽象语法树中的ResTarget转为查询器的TargetEntry

typedefstructTargetEntry
{
Exprxpr;
Expr*expr;/*expressiontoevaluate*/
AttrNumberresno;/*attributenumber(seenotesabove)*/
char*resname;/*nameofthecolumn(couldbeNULL)*/
Indexressortgroupref;/*nonzeroifreferencedbyasort/groupclause*/
Oidresorigtbl;/*OIDofcolumn'ssourcetable*/
AttrNumberresorigcol;/*column'snumberinsourcetable*/
boolresjunk;/*settotruetoeliminatetheattributefromfinaltargetlist*/
}TargetEntry;

对于其内部处理可参考源码src/backend/parser中的相关处理,这里不再细述。需要重点阅读一下README,PG源码中所有的README都是非常好的资料,一定要认真读。

优化器——生成执行计划

这块的内容很多,主要的逻辑是先进行逻辑优化,比如子查询、子链接、常量表达式、选择下推等等的处理,因为我们要分析的这条语句十分简单,所以逻辑优化的这部分都没有涉及到。物理优化,涉及到选择率,代价估计,索引扫描还是顺序扫描,选择那种连接方式,应用动态规划呢还是基因算法,选择nestloop-join、merge-join还是hash-join等。因为我们这个表没有建索引,更新单表也不涉及到多表连接,所以物理优化这块涉及的也不多。路径生成,生成最佳路径,再由最佳路径生成执行计划。

在路径生成这块,最基础的是对表的扫描方式,比如顺序扫描、索引扫描,再往上是连接方式,采用那种连接方式,再往上是比如排序、Limit等路径......,由底向上生成路径。我们要分析的语句很简单,没有其他处理,就顺序扫描再更新就可以了。

这里先不考虑并行执行计划。我们先看一下其执行计划结果:

postgres@postgres=#explainupdatedteasetid=0;
QUERYPLAN
--------------------------------------------------------------
Updateondtea(cost=0.00..19.00rows=900width=68)
->SeqScanondtea(cost=0.00..19.00rows=900width=68)
(2rows)

下面我们分析一下其执行计划的生成流程:

//由查询树Query-->Path-->Plan(PlannedStmt)
pg_plan_queries()
-->pg_plan_query()
-->planner()
-->standard_planner(Query*parse,constchar*query_string,intcursorOptions,ParamListInfoboundParams)
//由Query--->PlannerInfo
-->subquery_planner(glob,parse,NULL,false,tuple_fraction);//涉及到很多逻辑优化的内容,很多不列出
-->pull_up_sublinks(root);
-->pull_up_subqueries(root);//这里只列出几个重要的逻辑优化内容,其他的不再列出......
//如果是update/delete分区表继承表则走inheritance_planner(),其他情况走grouping_planner()
-->inheritance_planner()//update/delete分区表继承表的情况
-->grouping_planner()
-->grouping_planner()//非分区表、继承表的情况
-->preprocess_targetlist(root);//update虽然只更新一列,但是插入一条新元组的时候,需要知道其他列信息.
-->rewriteTargetListUD(parse,target_rte,target_relation);
-->expand_targetlist()
-->query_planner(root,standard_qp_callback,&qp_extra);//重要
-->add_base_rels_to_query()
-->deconstruct_jointree(root);
-->add_other_rels_to_query(root);//展开分区表到PlannerInfo中的相关字段中
-->expand_inherited_rtentry()
-->expand_planner_arrays(root,num_live_parts);
-->make_one_rel(root,joinlist);
-->set_base_rel_sizes(root);
-->set_rel_size();
-->set_append_rel_size(root,rel,rti,rte);//如果是分区表或者继承走这里,否则走下面
-->set_rel_size(root,childrel,childRTindex,childRTE);//处理子分区表
-->set_plain_rel_size(root,rel,rte);
-->set_plain_rel_size()//如果不是分区表或者继承
-->set_baserel_size_estimates()
-->set_base_rel_pathlists(root);
-->set_rel_pathlist(root,rel,rti,root->simple_rte_array[rti]);
-->set_append_rel_pathlist(root,rel,rti,rte);//生成各分区表的访问路径
-->make_rel_from_joinlist(root,joinlist);//动态规划还是基因规划
-->standard_join_search()//动态规划
-->geqo()//基因规划与动态规划二选一
-->apply_scanjoin_target_to_paths()
-->create_modifytable_path()
//由PlannerInfo--->RelOptInfo
-->fetch_upper_rel(root,UPPERREL_FINAL,NULL);
//由RelOptInfo--->Path
-->get_cheapest_fractional_path(final_rel,tuple_fraction);
//由PlannerInfo+Path--->Plan
-->create_plan(root,best_path);
//后续处理,由Plan--->PlannedStmt

核心数据结构:PlannedStmt、PlannerInfo、RelOptInfo(存储访问路径及其代价)、Path

Path:所有的路径都继承自Path,所以这个比较重要。

typedefstructPath
{
NodeTagtype;
NodeTagpathtype;/*tagidentifyingscan/joinmethod*/

RelOptInfo*parent;/*therelationthispathcanbuild*/
PathTarget*pathtarget;/*listofVars/Exprs,cost,width*/

ParamPathInfo*param_info;/*parameterizationinfo,orNULLifnone*/

boolparallel_aware;/*engageparallel-awarelogic*/
boolparallel_safe;/*OKtouseaspartofparallelplan*/
intparallel_workers;/*desired#ofworkers;0=notparallel*/

/*estimatedsize/costsforpath(seecostsize.cformoreinfo)*/
doublerows;/*estimatednumberofresulttuples*/
Coststartup_cost;/*costexpendedbeforefetchinganytuples*/
Costtotal_cost;/*totalcost(assumingalltuplesfetched)*/

List*pathkeys;/*sortorderingofpath'soutput*/
/*pathkeysisaListofPathKeynodes;seeabove*/
}Path;

/*ModifyTablePathrepresentsperformingINSERT/UPDATE/DELETEmodifications
*WerepresentmostthingsthatwillbeintheModifyTableplannode
*literally,exceptwehavechildPath(s)notPlan(s).Butanalysisofthe
*OnConflictExprisdeferredtocreateplan.c,asiscollectionofFDWdata.*/
typedefstructModifyTablePath
{
Pathpath;//可以看到ModifyTablePath继承自Path
CmdTypeoperation;/*INSERT,UPDATE,orDELETE*/
boolcanSetTag;/*dowesetthecommandtag/es_processed*/
IndexnominalRelation;/*ParentRTindexforuseofEXPLAIN*/
IndexrootRelation;/*RootRTindex,iftargetispartitioned*/
boolpartColsUpdated;/*somepartkeyinhierarchyupdated*/
List*resultRelations;/*integerlistofRTindexes*/
List*subpaths;/*Path(s)producingsourcedata*/
List*subroots;/*per-target-tablePlannerInfos*/
List*withCheckOptionLists;/*per-target-tableWCOlists*/
List*returningLists;/*per-target-tableRETURNINGtlists*/
List*rowMarks;/*PlanRowMarks(non-lockingonly)*/
OnConflictExpr*onconflict;/*ONCONFLICTclause,orNULL*/
intepqParam;/*IDofParamforEvalPlanQualre-eval*/
}ModifyTablePath;

生成update执行路径,最终都是要生成ModifyTablePath,本例中路径生成过程:Path-->ProjectionPath-->ModifyTablePath,也就是先顺序扫描表,再修改表。后面由路径生成执行计划。

/*create_modifytable_path
*CreatesapathnodethatrepresentsperformingINSERT/UPDATE/DELETEmods
*
*'rel'istheparentrelationassociatedwiththeresult
*'resultRelations'isanintegerlistofactualRTindexesoftargetrel(s)
*'subpaths'isalistofPath(s)producingsourcedata(oneperrel)
*'subroots'isalistofPlannerInfostructs(oneperrel)*/
ModifyTablePath*create_modifytable_path(PlannerInfo*root,RelOptInfo*rel,
CmdTypeoperation,boolcanSetTag,
IndexnominalRelation,IndexrootRelation,
boolpartColsUpdated,
List*resultRelations,List*subpaths,
List*subroots,
List*withCheckOptionLists,List*returningLists,
List*rowMarks,OnConflictExpr*onconflict,
intepqParam)
{
ModifyTablePath*pathnode=makeNode(ModifyTablePath);
doubletotal_size;
ListCell*lc;

Assert(list_length(resultRelations)==list_length(subpaths));
Assert(list_length(resultRelations)==list_length(subroots));
Assert(withCheckOptionLists==NIL||list_length(resultRelations)==list_length(withCheckOptionLists));
Assert(returningLists==NIL||list_length(resultRelations)==list_length(returningLists));

pathnode->path.pathtype=T_ModifyTable;
pathnode->path.parent=rel;

pathnode->path.pathtarget=rel->reltarget;/*pathtargetisnotinteresting,justmakeitminimallyvalid*/
/*Fornow,assumeweareaboveanyjoins,sonoparameterization*/
pathnode->path.param_info=NULL;
pathnode->path.parallel_aware=false;
pathnode->path.parallel_safe=false;
pathnode->path.parallel_workers=0;
pathnode->path.pathkeys=NIL;

/**Computecost&rowcountassumofsubpathcosts&rowcounts.
*
*Currently,wedon'tchargeanythingextrafortheactualtable
*modificationwork,norfortheWITHCHECKOPTIONSorRETURNING
*expressionsifany.Itwouldonlybewindowdressing,since
*ModifyTableisalwaysatop-levelnodeandthereisnowayforthe
*coststochangeanyhigher-levelplanningchoices.Butwemightwant
*tomakeitlookbettersometime.*/
pathnode->path.startup_cost=0;
pathnode->path.total_cost=0;
pathnode->path.rows=0;
total_size=0;
foreach(lc,subpaths)
{
Path*subpath=(Path*)lfirst(lc);

if(lc==list_head(subpaths))/*firstnode*/
pathnode->path.startup_cost=subpath->startup_cost;
pathnode->path.total_cost+=subpath->total_cost;
pathnode->path.rows+=subpath->rows;
total_size+=subpath->pathtarget->width*subpath->rows;
}

/*Setwidthtotheaveragewidthofthesubpathoutputs.XXXthisis
*totallywrong:weshouldreportzeroifnoRETURNING,elseanaverage
*oftheRETURNINGtlistwidths.Butit'swhathappenedhistorically,
*andimprovingitisataskforanotherday.*/
if(pathnode->path.rows>0)
total_size/=pathnode->path.rows;
pathnode->path.pathtarget->width=rint(total_size);

pathnode->operation=operation;
pathnode->canSetTag=canSetTag;
pathnode->nominalRelation=nominalRelation;
pathnode->rootRelation=rootRelation;
pathnode->partColsUpdated=partColsUpdated;
pathnode->resultRelations=resultRelations;
pathnode->subpaths=subpaths;
pathnode->subroots=subroots;
pathnode->withCheckOptionLists=withCheckOptionLists;
pathnode->returningLists=returningLists;
pathnode->rowMarks=rowMarks;
pathnode->onconflict=onconflict;
pathnode->epqParam=epqParam;

returnpathnode;
}

现在我们生成了最优的update路径,需要由路径生成执行计划:

Plan*create_plan(PlannerInfo*root,Path*best_path)
{
Plan*plan;
Assert(root->plan_params==NIL);/*plan_paramsshouldnotbeinuseincurrentquerylevel*/

/*Initializethismodule'sworkspaceinPlannerInfo*/
root->curOuterRels=NULL;
root->curOuterParams=NIL;

/*Recursivelyprocessthepathtree,demandingthecorrecttlistresult*/
plan=create_plan_recurse(root,best_path,CP_EXACT_TLIST);//实际实现是在这里

/**Makesurethetopmostplannode'stargetlistexposestheoriginal
*columnnamesandotherdecorativeinfo.Targetlistsgeneratedwithin
*theplannerdon'tbotherwiththatstuff,butwemusthaveitonthe
*top-leveltlistseenatexecutiontime.However,ModifyTableplan
*nodesdon'thaveatlistmatchingthequerytreetargetlist.*/
if(!IsA(plan,ModifyTable))
apply_tlist_labeling(plan->targetlist,root->processed_tlist);

/**AttachanyinitPlanscreatedinthisqueryleveltothetopmostplan
*node.(Inprincipletheinitplanscouldgoinanyplannodeator
*abovewherethey'rereferenced,butthereseemsnoreasontoputthem
*anylowerthanthetopmostnodeforthequerylevel.Also,see
*commentsforSS_finalize_planbeforeyoutrytochangethis.)*/
SS_attach_initplans(root,plan);

/*CheckwesuccessfullyassignedallNestLoopParamstoplannodes*/
if(root->curOuterParams!=NIL)
elog(ERROR,"failedtoassignallNestLoopParamstoplannodes");

/**Resetplan_paramstoensureparamIDsusedfornestloopparamsarenotre-usedlater*/
root->plan_params=NIL;

returnplan;
}

//由最佳路径生成最佳执行计划
staticModifyTable*create_modifytable_plan(PlannerInfo*root,ModifyTablePath*best_path)
{
ModifyTable*plan;
List*subplans=NIL;
ListCell*subpaths,
*subroots;

/*Buildtheplanforeachinputpath*/
forboth(subpaths,best_path->subpaths,subroots,best_path->subroots)
{
Path*subpath=(Path*)lfirst(subpaths);
PlannerInfo*subroot=(PlannerInfo*)lfirst(subroots);
Plan*subplan;

/*InaninheritedUPDATE/DELETE,referencetheper-childmodified
*subrootwhilecreatingPlansfromPathsforthechildrel.Thisis
*akluge,butotherwiseit'stoohardtoensurethatPlancreation
*functions(particularlyinFDWs)don'tdependonthecontentsof
*"root"matchingwhattheysawatPathcreationtime.Themain
*downsideisthatcreationfunctionsforPlansthatmightappear
*belowaModifyTablecannotexpecttomodifythecontentsof"root"
*andhaveit"stick"forsubsequentprocessingsuchassetrefs.c.
*That'snotgreat,butitseemsbetterthanthealternative.*/
subplan=create_plan_recurse(subroot,subpath,CP_EXACT_TLIST);

/*Transferresname/resjunklabeling,too,tokeepexecutorhappy*/
apply_tlist_labeling(subplan->targetlist,subroot->processed_tlist);

subplans=lappend(subplans,subplan);
}

plan=make_modifytable(root,best_path->operation,best_path->canSetTag,
best_path->nominalRelation,best_path->rootRelation,
best_path->partColsUpdated,best_path->resultRelations,
subplans,best_path->subroots,best_path->withCheckOptionLists,
best_path->returningLists,best_path->rowMarks,
best_path->onconflict,best_path->epqParam);

copy_generic_path_info(&plan->plan,&best_path->path);

returnplan;
}

最终的执行计划是ModifyTable:

/*----------------
*ModifyTablenode-
*Applyrowsproducedbysubplan(s)toresulttable(s),
*byinserting,updating,ordeleting.
*
*Iftheoriginallynamedtargettableisapartitionedtable,both
*nominalRelationandrootRelationcontaintheRTindexofthepartition
*root,whichisnototherwisementionedintheplan.OtherwiserootRelation
*iszero.However,nominalRelationwillalwaysbeset,asit'stherelthat
*EXPLAINshouldclaimistheINSERT/UPDATE/DELETEtarget.
*
*NotethatrowMarksandepqParamarepresumedtobevalidforallthe
*subplan(s);theycan'tcontainanyinfothatvariesacrosssubplans.
*----------------*/
typedefstructModifyTable
{
Planplan;
CmdTypeoperation;/*INSERT,UPDATE,orDELETE*/
boolcanSetTag;/*dowesetthecommandtag/es_processed*/
IndexnominalRelation;/*ParentRTindexforuseofEXPLAIN*/
IndexrootRelation;/*RootRTindex,iftargetispartitioned*/
boolpartColsUpdated;/*somepartkeyinhierarchyupdated*/
List*resultRelations;/*integerlistofRTindexes*/
intresultRelIndex;/*indexoffirstresultRelinplan'slist*/
introotResultRelIndex;/*indexofthepartitionedtableroot*/
List*plans;/*plan(s)producingsourcedata*/
List*withCheckOptionLists;/*per-target-tableWCOlists*/
List*returningLists;/*per-target-tableRETURNINGtlists*/
List*fdwPrivLists;/*per-target-tableFDWprivatedatalists*/
Bitmapset*fdwDirectModifyPlans;/*indicesofFDWDMplans*/
List*rowMarks;/*PlanRowMarks(non-lockingonly)*/
intepqParam;/*IDofParamforEvalPlanQualre-eval*/
OnConflictActiononConflictAction;/*ONCONFLICTaction*/
List*arbiterIndexes;/*ListofONCONFLICTarbiterindexOIDs*/
List*onConflictSet;/*SETforINSERTONCONFLICTDOUPDATE*/
Node*onConflictWhere;/*WHEREforONCONFLICTUPDATE*/
IndexexclRelRTI;/*RTIoftheEXCLUDEDpseudorelation*/
List*exclRelTlist;/*tlistoftheEXCLUDEDpseudorelation*/
}ModifyTable;

执行器

根据上面的执行计划,去执行。主要是各种算子的实现,其中要理解执行器的运行原理,主要是火山模型,一次一元组。我们看一下其调用过程。

CreatePortal("",true,true);
PortalDefineQuery(portal,NULL,query_string,commandTag,plantree_list,NULL);
PortalStart(portal,NULL,0,InvalidSnapshot);
PortalRun(portal,FETCH_ALL,true,true,receiver,receiver,&qc);
-->PortalRunMulti()
-->ProcessQuery()
-->ExecutorStart(queryDesc,0);
-->standard_ExecutorStart()
-->estate=CreateExecutorState();//创建EState
-->estate->es_output_cid=GetCurrentCommandId(true);//获得cid,后面更新的时候要用
-->InitPlan(queryDesc,eflags);
-->ExecInitNode(plan,estate,eflags);
-->ExecInitModifyTable()//初始化ModifyTableState
-->ExecutorRun(queryDesc,ForwardScanDirection,0L,true);
-->standard_ExecutorRun()
-->ExecutePlan()
-->ExecProcNode(planstate);//一次一元组火山模型
-->node->ExecProcNode(node);
-->ExecProcNodeFirst(PlanState*node)
-->node->ExecProcNode(node);
-->ExecModifyTable(PlanState*pstate)
-->ExecUpdate()
-->table_tuple_update(Relationrel,......)
-->rel->rd_tableam->tuple_update()
-->heapam_tuple_update(Relationrelation,......)
-->heap_update(relation,otid,tuple,cid,......)

-->ExecutorFinish(queryDesc);
-->ExecutorEnd(queryDesc);
PortalDrop(portal,false);

关键数据结构:

//ModifyTableStateinformation
typedefstructModifyTableState
{
PlanStateps;/*itsfirstfieldisNodeTag*/
CmdTypeoperation;/*INSERT,UPDATE,orDELETE*/
boolcanSetTag;/*dowesetthecommandtag/es_processed*/
boolmt_done;/*arewedone*/
PlanState**mt_plans;/*subplans(onepertargetrel)*/
intmt_nplans;/*numberofplansinthearray*/
intmt_whichplan;/*whichoneisbeingexecuted(0..n-1)*/
TupleTableSlot**mt_scans;/*inputtuplecorrespondingtounderlying
*plans*/
ResultRelInfo*resultRelInfo;/*per-subplantargetrelations*/
ResultRelInfo*rootResultRelInfo;/*roottargetrelation(partitioned
*tableroot)*/
List**mt_arowmarks;/*per-subplanExecAuxRowMarklists*/
EPQStatemt_epqstate;/*forevaluatingEvalPlanQualrechecks*/
boolfireBSTriggers;/*doweneedtofirestmttriggers*/

/*Slotforstoringtuplesintherootpartitionedtable'srowtypeduring
*anUPDATEofapartitionedtable.*/
TupleTableSlot*mt_root_tuple_slot;

structPartitionTupleRouting*mt_partition_tuple_routing;/*Tuple-routingsupportinfo*/

structTransitionCaptureState*mt_transition_capture;/*controlstransitiontablepopulationforspecifiedoperation*/

/*controlstransitiontablepopulationforINSERT...ONCONFLICTUPDATE*/
structTransitionCaptureState*mt_oc_transition_capture;

/*Perplanmapfortupleconversionfromchildtoroot*/
TupleConversionMap**mt_per_subplan_tupconv_maps;
}ModifyTableState;

核心执行算子实现:

/*----------------------------------------------------------------
*ExecModifyTable
*
*Performtablemodificationsasrequired,andreturnRETURNINGresults
*ifneeded.
*----------------------------------------------------------------*/
staticTupleTableSlot*ExecModifyTable(PlanState*pstate)
{
ModifyTableState*node=castNode(ModifyTableState,pstate);
PartitionTupleRouting*proute=node->mt_partition_tuple_routing;
EState*estate=node->ps.state;
CmdTypeoperation=node->operation;
ResultRelInfo*saved_resultRelInfo;
ResultRelInfo*resultRelInfo;
PlanState*subplanstate;
JunkFilter*junkfilter;
TupleTableSlot*slot;
TupleTableSlot*planSlot;
ItemPointertupleid;
ItemPointerDatatuple_ctid;
HeapTupleDataoldtupdata;
HeapTupleoldtuple;

CHECK_FOR_INTERRUPTS();

/*ThisshouldNOTgetcalledduringEvalPlanQual;weshouldhavepasseda
*subplantreetoEvalPlanQual,instead.Usearuntimetestnotjust
*Assertbecausethisconditioniseasytomissintesting.*/
if(estate->es_epq_active!=NULL)
elog(ERROR,"ModifyTableshouldnotbecalledduringEvalPlanQual");

/*Ifwe'vealreadycompletedprocessing,don'ttrytodomore.Weneed
*thistestbecauseExecPostprocessPlanmightcallusanextratime,and
*oursubplan'snodesaren'tnecessarilyrobustagainstbeingcalled
*extratimes.*/
if(node->mt_done)
returnNULL;

/*Onfirstcall,fireBEFORESTATEMENTtriggersbeforeproceeding.*/
if(node->fireBSTriggers)
{
fireBSTriggers(node);
node->fireBSTriggers=false;
}

/*Preloadlocalvariables*/
resultRelInfo=node->resultRelInfo+node->mt_whichplan;
subplanstate=node->mt_plans[node->mt_whichplan];
junkfilter=resultRelInfo->ri_junkFilter;

/*es_result_relation_infomustpointtothecurrentlyactiveresultrelationwhilewearewithinthisModifyTablenode.
*EventhoughModifyTablenodescan'tbenestedstatically,theycanbenested
*dynamically(sinceoursubplancouldincludeareferencetoamodifying
*CTE).Sowehavetosaveandrestorethecaller'svalue.*/
saved_resultRelInfo=estate->es_result_relation_info;
estate->es_result_relation_info=resultRelInfo;

/*Fetchrowsfromsubplan(s),andexecutetherequiredtablemodificationforeachrow.*/
for(;;)
{
/*Resettheper-output-tupleexprcontext.Thisisneededbecause
*triggersexpecttousethatcontextasworkspace.It'sabitugly
*todothisbelowthetopleveloftheplan,however.Wemightneedtorethinkthislater.*/
ResetPerTupleExprContext(estate);

/*Resetper-tuplememorycontextusedforprocessingonconflictand
*returningclauses,tofreeanyexpressionevaluationstorageallocatedinthepreviouscycle.*/
if(pstate->ps_ExprContext)
ResetExprContext(pstate->ps_ExprContext);

planSlot=ExecProcNode(subplanstate);
if(TupIsNull(planSlot))
{
/*advancetonextsubplanifany*/
node->mt_whichplan++;//分区表的update,每个分区分布对应一个subplan,当执行完一个分区再执行下一个分区
if(node->mt_whichplan<node->mt_nplans)
{
resultRelInfo++;
subplanstate=node->mt_plans[node->mt_whichplan];
junkfilter=resultRelInfo->ri_junkFilter;
estate->es_result_relation_info=resultRelInfo;
EvalPlanQualSetPlan(&node->mt_epqstate,subplanstate->plan,node->mt_arowmarks[node->mt_whichplan]);
/*Preparetoconverttransitiontuplesfromthischild.*/
if(node->mt_transition_capture!=NULL){
node->mt_transition_capture->tcs_map=tupconv_map_for_subplan(node,node->mt_whichplan);
}
if(node->mt_oc_transition_capture!=NULL){
node->mt_oc_transition_capture->tcs_map=tupconv_map_for_subplan(node,node->mt_whichplan);
}
continue;
}
else
break;
}

/*Ensureinputtupleistherightformatforthetargetrelation.*/
if(node->mt_scans[node->mt_whichplan]->tts_ops!=planSlot->tts_ops){
ExecCopySlot(node->mt_scans[node->mt_whichplan],planSlot);
planSlot=node->mt_scans[node->mt_whichplan];
}

/*IfresultRelInfo->ri_usesFdwDirectModifyistrue,allweneedtodohereiscomputetheRETURNINGexpressions.*/
if(resultRelInfo->ri_usesFdwDirectModify)
{
Assert(resultRelInfo->ri_projectReturning);
slot=ExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelInfo->ri_RelationDesc),NULL,planSlot);

estate->es_result_relation_info=saved_resultRelInfo;
returnslot;
}

EvalPlanQualSetSlot(&node->mt_epqstate,planSlot);
slot=planSlot;

tupleid=NULL;
oldtuple=NULL;
if(junkfilter!=NULL)
{
/*extractthe'ctid'or'wholerow'junkattribute.*/
if(operation==CMD_UPDATE||operation==CMD_DELETE)
{
charrelkind;
Datumdatum;
boolisNull;

relkind=resultRelInfo->ri_RelationDesc->rd_rel->relkind;
if(relkind==RELKIND_RELATION||relkind==RELKIND_MATVIEW)
{
datum=ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
/*shouldn'tevergetanullresult...*/
if(isNull)
elog(ERROR,"ctidisNULL");

tupleid=(ItemPointer)DatumGetPointer(datum);
tuple_ctid=*tupleid;/*besurewedon'tfreectid!!*/
tupleid=&tuple_ctid;
}
/*Usethewholerowattribute,whenavailable,toreconstructtheoldrelationtuple.*/
elseif(AttributeNumberIsValid(junkfilter->jf_junkAttNo))
{
datum=ExecGetJunkAttribute(slot,junkfilter->jf_junkAttNo,&isNull);
/*shouldn'tevergetanullresult...*/
if(isNull)
elog(ERROR,"wholerowisNULL");

oldtupdata.t_data=DatumGetHeapTupleHeader(datum);
oldtupdata.t_len=HeapTupleHeaderGetDatumLength(oldtupdata.t_data);
ItemPointerSetInvalid(&(oldtupdata.t_self));
/*Historically,viewtriggersseeinvalidt_tableOid.*/
oldtupdata.t_tableOid=(relkind==RELKIND_VIEW)InvalidOid:RelationGetRelid(resultRelInfo->ri_RelationDesc);
oldtuple=&oldtupdata;
}
else
Assert(relkind==RELKIND_FOREIGN_TABLE);
}

/*applythejunkfilterifneeded.*/
if(operation!=CMD_DELETE)
slot=ExecFilterJunk(junkfilter,slot);
}

switch(operation)
{
caseCMD_INSERT:
if(proute)/*Preparefortupleroutingifneeded.*/
slot=ExecPrepareTupleRouting(node,estate,proute,resultRelInfo,slot);
slot=ExecInsert(node,slot,planSlot,NULL,estate->es_result_relation_info,estate,node->canSetTag);
if(proute)/*RevertExecPrepareTupleRouting'sstatechange.*/
estate->es_result_relation_info=resultRelInfo;
break;
caseCMD_UPDATE:
slot=ExecUpdate(node,tupleid,oldtuple,slot,planSlot,
&node->mt_epqstate,estate,node->canSetTag);
break;
caseCMD_DELETE:
slot=ExecDelete(node,tupleid,oldtuple,planSlot,
&node->mt_epqstate,estate,
true,node->canSetTag,false/*changingPart*/,NULL,NULL);
break;
default:
elog(ERROR,"unknownoperation");
break;
}

/*IfwegotaRETURNINGresult,returnittocaller.We'llcontinuetheworkonnextcall.*/
if(slot){
estate->es_result_relation_info=saved_resultRelInfo;
returnslot;
}
}

estate->es_result_relation_info=saved_resultRelInfo;/*Restorees_result_relation_infobeforeexiting*/
fireASTriggers(node);/*We'redone,butfireAFTERSTATEMENTtriggersbeforeexiting.*/

node->mt_done=true;

returnNULL;
}

我们看一下具体执行Update的实现

```c++
/*----------------------------------------------------------------
*ExecUpdate
*
*note:wecan'trunUPDATEquerieswithtransactionsoffbecauseUPDATEsareactuallyINSERTsandour
*scanwillmistakenlyloopforever,updatingthetupleitjustinserted..Thisshouldbefixedbutuntilit
*is,wedon'twanttogetstuckinaninfiniteloopwhichcorruptsyourdatabase..
*
*Whenupdatingatable,tupleididentifiesthetupletoupdateandoldtupleisNULL.
*
*ReturnsRETURNINGresultifany,otherwiseNULL.
*----------------------------------------------------------------*/
staticTupleTableSlot*
ExecUpdate(ModifyTableState*mtstate,
ItemPointertupleid,
HeapTupleoldtuple,
TupleTableSlot*slot,
TupleTableSlot*planSlot,
EPQState*epqstate,
EState*estate,
boolcanSetTag)
{
ResultRelInfo*resultRelInfo;
RelationresultRelationDesc;
TM_Resultresult;
TM_FailureDatatmfd;
List*recheckIndexes=NIL;
TupleConversionMap*saved_tcs_map=NULL;

/*aborttheoperationifnotrunningtransactions*/
if(IsBootstrapProcessingMode())
elog(ERROR,"cannotUPDATEduringbootstrap");

ExecMaterializeSlot(slot);

/*getinformationonthe(current)resultrelation*/
resultRelInfo=estate->es_result_relation_info;
resultRelationDesc=resultRelInfo->ri_RelationDesc;

/*BEFOREROWUPDATETriggers*/
if(resultRelInfo->ri_TrigDesc&&resultRelInfo->ri_TrigDesc->trig_update_before_row)
{
if(!ExecBRUpdateTriggers(estate,epqstate,resultRelInfo,tupleid,oldtuple,slot))
returnNULL;/*"donothing"*/
}

/*INSTEADOFROWUPDATETriggers*/
if(resultRelInfo->ri_TrigDesc&&resultRelInfo->ri_TrigDesc->trig_update_instead_row)
{
if(!ExecIRUpdateTriggers(estate,resultRelInfo,oldtuple,slot))
returnNULL;/*"donothing"*/
}
elseif(resultRelInfo->ri_FdwRoutine)
{
/*Computestoredgeneratedcolumns*/
if(resultRelationDesc->rd_att->constr&&resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate,slot,CMD_UPDATE);

/*updateinforeigntable:lettheFDWdoit*/
slot=resultRelInfo->ri_FdwRoutine->ExecForeignUpdate(estate,resultRelInfo,slot,planSlot);

if(slot==NULL)/*"donothing"*/
returnNULL;

/*AFTERROWTriggersorRETURNINGexpressionsmightreferencethe
*tableoidcolumn,so(re-)initializetts_tableOidbeforeevaluatingthem.*/
slot->tts_tableOid=RelationGetRelid(resultRelationDesc);
}
else
{
LockTupleModelockmode;
boolpartition_constraint_failed;
boolupdate_indexes;

/*Constraintsmightreferencethetableoidcolumn,so(re-)initialize
*tts_tableOidbeforeevaluatingthem.*/
slot->tts_tableOid=RelationGetRelid(resultRelationDesc);

/*Computestoredgeneratedcolumns*/
if(resultRelationDesc->rd_att->constr&&resultRelationDesc->rd_att->constr->has_generated_stored)
ExecComputeStoredGenerated(estate,slot,CMD_UPDATE);

/*
*CheckanyRLSUPDATEWITHCHECKpolicies
*
*IfwegenerateanewcandidatetupleafterEvalPlanQualtesting,we
*mustloopbackhereandrecheckanyRLSpoliciesandconstraints.
*(Wedon'tneedtoredotriggers,however.IfthereareanyBEFORE
*triggersthentrigger.cwillhavedonetable_tuple_locktolockthe
*correcttuple,sothere'snoneedtodothemagain.)*/
lreplace:;

/*ensureslotisindependent,considere.g.EPQ*/
ExecMaterializeSlot(slot);

/*Ifpartitionconstraintfails,thisrowmightgetmovedtoanother
*partition,inwhichcaseweshouldchecktheRLSCHECKpolicyjust
*beforeinsertingintothenewpartition,ratherthandoingithere.
*Thisisbecauseatriggeronthatpartitionmightagainchangethe
*row.SoskiptheWCOchecksifthepartitionconstraintfails.*/
partition_constraint_failed=resultRelInfo->ri_PartitionCheck&&!ExecPartitionCheck(resultRelInfo,slot,estate,false);

if(!partition_constraint_failed&&resultRelInfo->ri_WithCheckOptions!=NIL)
{
/*ExecWithCheckOptions()willskipanyWCOswhicharenotofthekindwearelookingforatthispoint.*/
ExecWithCheckOptions(WCO_RLS_UPDATE_CHECK,resultRelInfo,slot,estate);
}

/*Ifapartitioncheckfailed,trytomovetherowintotherightpartition.*/
if(partition_constraint_failed)
{
booltuple_deleted;
TupleTableSlot*ret_slot;
TupleTableSlot*orig_slot=slot;
TupleTableSlot*epqslot=NULL;
PartitionTupleRouting*proute=mtstate->mt_partition_tuple_routing;
intmap_index;
TupleConversionMap*tupconv_map;

/*DisallowanINSERTONCONFLICTDOUPDATEthatcausesthe
*originalrowtomigratetoadifferentpartition.Maybethis
*canbeimplementedsomeday,butitseemsafringefeaturewith
*littleredeemingvalue.*/
if(((ModifyTable*)mtstate->ps.plan)->onConflictAction==ONCONFLICT_UPDATE)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("invalidONUPDATEspecification"),
errdetail("Theresulttuplewouldappearinadifferentpartitionthantheoriginaltuple.")));

/*WhenanUPDATEisrunonaleafpartition,wewillnothave
*partitiontupleroutingsetup.Inthatcase,failwith
*partitionconstraintviolationerror.*/
if(proute==NULL)
ExecPartitionCheckEmitError(resultRelInfo,slot,estate);

/*Rowmovement,part1.Deletethetuple,butskipRETURNING
*processing.WewanttoreturnrowsfromINSERT.*/
ExecDelete(mtstate,tupleid,oldtuple,planSlot,epqstate,estate,false,false/*canSetTag*/,true/*changingPart*/,&tuple_deleted,&epqslot);

/*ForsomereasonifDELETEdidn'thappen(e.g.triggerprevented
*it,oritwasalreadydeletedbyself,oritwasconcurrently
*deletedbyanothertransaction),thenweshouldskiptheinsert
*aswell;otherwise,anUPDATEcouldcauseanincreaseinthe
*totalnumberofrowsacrossallpartitions,whichisclearlywrong.
*
*ForanormalUPDATE,thecasewherethetuplehasbeenthe
*subjectofaconcurrentUPDATEorDELETEwouldbehandledby
*theEvalPlanQualmachinery,butforanUPDATEthatwe've
*translatedintoaDELETEfromthispartitionandanINSERTinto
*someotherpartition,that'snotavailable,becauseCTIDchains
*can'tspan relationboundaries.Wemimicthesemanticstoa
*limitedextentbyskippingtheINSERTiftheDELETEfailsto
*findatuple.Thisensuresthattwoconcurrentattemptsto
*UPDATEthesametupleatthesametimecan'tturnonetuple
*intotwo,andthatanUPDATEofajust-deletedtuplecan'tresurrectit.*/
if(!tuple_deleted)
{
/*
*epqslotwillbetypicallyNULL.ButwhenExecDelete()
*findsthatanothertransactionhasconcurrentlyupdatedthe
*samerow,itre-fetchestherow,skipsthedelete,and
*epqslotissettothere-fetchedtupleslot.Inthatcase,
*weneedtodoallthechecksagain.
*/
if(TupIsNull(epqslot))
returnNULL;
else
{
slot=ExecFilterJunk(resultRelInfo->ri_junkFilter,epqslot);
gotolreplace;
}
}

/*Updatessetthetransitioncapturemaponlywhenanewsubplan
*ischosen.Butforinserts,itissetforeachrow.Soafter
*INSERT,weneedtorevertbacktothemapcreatedforUPDATE;
*otherwisethenextUPDATEwillincorrectlyusetheonecreated
*forINSERT.SofirstsavetheonecreatedforUPDATE.*/
if(mtstate->mt_transition_capture)
saved_tcs_map=mtstate->mt_transition_capture->tcs_map;

/*resultRelInfoisoneoftheper-subplanresultRelInfos.Sowe
*shouldconvertthetupleintoroot'stupledescriptor,since
*ExecInsert()startsthesearchfromroot.Thetupleconversion
*maplistisintheorderofmtstate->resultRelInfo[],soto
*retrievetheoneforthisresultRel,weneedtoknowthe
*positionoftheresultRelinmtstate->resultRelInfo[].*/
map_index=resultRelInfo-mtstate->resultRelInfo;
Assert(map_index>=0&&map_index<mtstate->mt_nplans);
tupconv_map=tupconv_map_for_subplan(mtstate,map_index);
if(tupconv_map!=NULL)
slot=execute_attr_map_slot(tupconv_map->attrMap,slot,mtstate->mt_root_tuple_slot);

/*Preparefortuplerouting,makingitlooklikewe'reinsertingintotheroot.*/
Assert(mtstate->rootResultRelInfo!=NULL);
slot=ExecPrepareTupleRouting(mtstate,estate,proute,mtstate->rootResultRelInfo,slot);

ret_slot=ExecInsert(mtstate,slot,planSlot,
orig_slot,resultRelInfo,
estate,canSetTag);

/*RevertExecPrepareTupleRouting'snodechange.*/
estate->es_result_relation_info=resultRelInfo;
if(mtstate->mt_transition_capture)
{
mtstate->mt_transition_capture->tcs_original_insert_tuple=NULL;
mtstate->mt_transition_capture->tcs_map=saved_tcs_map;
}

returnret_slot;
}

/*Checktheconstraintsofthetuple.We'vealreadycheckedthe
*partitionconstraintabove;however,wemuststillensurethetuple
*passesallotherconstraints,sowewillcallExecConstraints()and
*haveitvalidateallremainingchecks.*/
if(resultRelationDesc->rd_att->constr)
ExecConstraints(resultRelInfo,slot,estate);

/*replacetheheaptuple
*
*Note:ifes_crosscheck_snapshotisn'tInvalidSnapshot,wecheck
*thattherowtobeupdatedisvisibletothatsnapshot,andthrowa
*can't-serializeerrorifnot.Thisisaspecial-casebehavior
*neededforreferentialintegrityupdatesintransaction-snapshotmodetransactions.*/
result=table_tuple_update(resultRelationDesc,tupleid,slot,estate->es_output_cid,
estate->es_snapshot,estate->es_crosscheck_snapshot,true/*waitforcommit*/,&tmfd,&lockmode,&update_indexes);

switch(result)
{
caseTM_SelfModified:

/*Thetargettuplewasalreadyupdatedordeletedbythe
*currentcommand,orbyalatercommandinthecurrent
*transaction.TheformercaseispossibleinajoinUPDATE
*wheremultipletuplesjointothesametargettuple.This
*isprettyquestionable,butPostgreshasalwaysallowedit:
*wejustexecutethefirstupdateactionandignore
*additionalupdateattempts.
*
*Thelattercasearisesifthetupleismodifiedbya
*commandinaBEFOREtrigger,orperhapsbyacommandina
*volatilefunctionusedinthequery.Insuchsituationswe
*shouldnotignoretheupdate,butitisequallyunsafeto
*proceed.Wedon'twanttodiscardtheoriginalUPDATE
*whilekeepingthetriggeredactionsbasedonit;andwe
*havenoprincipledwaytomergethisupdatewiththe
*previousones.Sothrowinganerroristheonlysafe
*course.
*
*Ifatriggeractuallyintendsthistypeofinteraction,it
*canre-executetheUPDATE(assumingitcanfigureouthow)
*andthenreturnNULLtocanceltheouterupdate.*/
if(tmfd.cmax!=estate->es_output_cid)
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tupletobeupdatedwasalreadymodifiedbyanoperationtriggeredbythecurrentcommand"),
errhint("ConsiderusinganAFTERtriggerinsteadofaBEFOREtriggertopropagatechangestootherrows.")));

/*Else,alreadyupdatedbyself;nothingtodo*/
returnNULL;

caseTM_Ok:
break;

caseTM_Updated:
{
TupleTableSlot*inputslot;
TupleTableSlot*epqslot;

if(IsolationUsesXactSnapshot())
ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("couldnotserializeaccessduetoconcurrentupdate")));

/*Alreadyknowthatwe'regoingtoneedtodoEPQ,sofetchtupledirectlyintotherightslot.*/
inputslot=EvalPlanQualSlot(epqstate,resultRelationDesc,resultRelInfo->ri_RangeTableIndex);

result=table_tuple_lock(resultRelationDesc,tupleid,estate->es_snapshot,inputslot,estate->es_output_cid,lockmode,LockWaitBlock,TUPLE_LOCK_FLAG_FIND_LAST_VERSION,&tmfd);

switch(result)
{
caseTM_Ok:
Assert(tmfd.traversed);
epqslot=EvalPlanQual(epqstate,resultRelationDesc,resultRelInfo->ri_RangeTableIndex,inputslot);
if(TupIsNull(epqslot))
/*Tuplenotpassingqualsanymore,exiting...*/
returnNULL;

slot=ExecFilterJunk(resultRelInfo->ri_junkFilter,epqslot);
gotolreplace;

caseTM_Deleted:
/*tuplealreadydeleted;nothingtodo*/
returnNULL;

caseTM_SelfModified:

/*
*Thiscanbereachedwhenfollowinganupdatechainfromatupleupdatedbyanothersession,
*reachingatuplethatwasalreadyupdatedinthistransaction.Ifpreviouslymodifiedby
*thiscommand,ignoretheredundantupdate,otherwiseerrorout.
*
*SeealsoTM_SelfModifiedresponsetotable_tuple_update()above.*/
if(tmfd.cmax!=estate->es_output_cid)
ereport(ERROR,(errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION),
errmsg("tupletobeupdatedwasalreadymodifiedbyanoperationtriggeredbythecurrentcommand"),errhint("ConsiderusinganAFTERtriggerinsteadofaBEFOREtriggertopropagatechangestootherrows.")));
returnNULL;

default:
/*seetable_tuple_lockcallinExecDelete()*/
elog(ERROR,"unexpectedtable_tuple_lockstatus:%u",result);
returnNULL;
}
}

break;

caseTM_Deleted:
if(IsolationUsesXactSnapshot())
ereport(ERROR,(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),errmsg("couldnotserializeaccessduetoconcurrentdelete")));
/*tuplealreadydeleted;nothingtodo*/
returnNULL;

default:
elog(ERROR,"unrecognizedtable_tuple_updatestatus:%u",
result);
returnNULL;
}

/*insertindexentriesfortupleifnecessary*/
if(resultRelInfo->ri_NumIndices>0&&update_indexes)
recheckIndexes=ExecInsertIndexTuples(slot,estate,false,NULL,NIL);
}

if(canSetTag)
(estate->es_processed)++;

/*AFTERROWUPDATETriggers*/
ExecARUpdateTriggers(estate,resultRelInfo,tupleid,oldtuple,slot,recheckIndexes,mtstate->operation==CMD_INSERTmtstate->mt_oc_transition_capture:mtstate->mt_transition_capture);

list_free(recheckIndexes);

/*CheckanyWITHCHECKOPTIONconstraintsfromparentviews.Weare
*requiredtodothisaftertestingallconstraintsanduniqueness
*violationspertheSQLspec,sowedoitafteractuallyupdatingthe
*recordintheheapandallindexes.
*
*ExecWithCheckOptions()willskipanyWCOswhicharenotofthekindwe
*arelookingforatthispoint.*/
if(resultRelInfo->ri_WithCheckOptions!=NIL)
ExecWithCheckOptions(WCO_VIEW_CHECK,resultRelInfo,slot,estate);

if(resultRelInfo->ri_projectReturning)/*ProcessRETURNINGifpresent*/
returnExecProcessReturning(resultRelInfo->ri_projectReturning,RelationGetRelid(resultRelationDesc),slot,planSlot);

returnNULL;
}

再往下就是涉及到存储引擎的部分了,我们重点看一下其对外的接口输入参数。重点是这4个参数:

  • relation - table to be modified (caller must hold suitable lock) (要更新的那个表)

  • otid - TID of old tuple to be replaced (要更新的元组ID,对应的是老的元组,更新后相当于是插入一条新元组,老元组的tid值要更新为新的tid值)

  • slot - newly constructed tuple data to store (新元组的值)

  • cid - update command ID (used for visibility test, and stored into cmax/cmin if successful) (cid值,事务相关) 执行器层面的更新算子是建立在存储引擎提供的底层table_tuple_update接口之上的。是我们编写ExecUpdate以及ExecModifyTable的基础。

/*
*Updateatuple.

*Inputparameters:
*relation-tabletobemodified(callermustholdsuitablelock)
*otid-TIDofoldtupletobereplaced
*slot-newlyconstructedtupledatatostore
*cid-updatecommandID(usedforvisibilitytest,andstoredintocmax/cminifsuccessful)
*crosscheck-ifnotInvalidSnapshot,alsocheckoldtupleagainstthis
*wait-trueifshouldwaitforanyconflictingupdatetocommit/abort

*Outputparameters:
*tmfd-filledinfailurecases(seebelow)
*lockmode-filledwithlockmodeacquiredontuple
*update_indexes-insuccesscasesthisissettotrueifnewindexentriesarerequiredforthistuple
*
*Normal,successfulreturnvalueisTM_Ok,whichmeanswedidactuallyupdateit.*/
staticinlineTM_Result
table_tuple_update(Relationrel,ItemPointerotid,TupleTableSlot*slot,CommandIdcid,
Snapshotsnapshot,Snapshotcrosscheck,boolwait,TM_FailureData*tmfd,LockTupleMode*lockmode,bool*update_indexes)
{
returnrel->rd_tableam->tuple_update(rel,otid,slot,cid,
snapshot,crosscheck,wait,tmfd,lockmode,update_indexes);
}

事务

这一块主要是要理解PG中update语句并不是原地更新元组,而是插入一条新元组。因为PG实现MVCC与Mysql,Oracle的实现方式有所不同,并不是通过undo日志实现的,相当于把undo日志记录到了原有的表中,并不是单独存放在一个地方。具体的不再细述,内容太多了,以后再分析事务部分。

好了,内容很多,分析源码的时候,涉及到的知识点以及逻辑是非常多的,我们最好每次分析只抓一个主干,不然每个都分析,最后就会比较乱。就先分析到这里吧。

总结

到此这篇关于Postgres中UPDATE更新语句源码分析的文章就介绍到这了,更多相关Postgres中UPDATE源码内容请搜索源码搜藏网以前的文章或继续浏览下面的相关文章希望大家以后多多支持源码搜藏网!


数据库阅读排行

最新文章