博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Mongodb源码分析--更新记录
阅读量:4319 次
发布时间:2019-06-06

本文共 12277 字,大约阅读时间需要 40 分钟。

    在之前的 中,介绍了assembleResponse函数(位于instance.cpp第224行),它会根据op操作枚举类型来调用相应的crud操作,枚举类型定义如下:
     
enum
 Operations {
        opReply 
=
 
1
,     
/*
 reply. responseTo is set. 
*/
        dbMsg 
=
 
1000
,    
/*
 generic msg command followed by a string 
*/
        dbUpdate 
=
 
2001
/*
 更新对象 
*/
        dbInsert 
=
 
2002
,
        
//
dbGetByOID = 2003,
        dbQuery 
=
 
2004
,
        dbGetMore 
=
 
2005
,
        dbDelete 
=
 
2006
,
        dbKillCursors 
=
 
2007
    };
    可以看到dbUpdate = 2001 为更新操作枚举值,下面我们看一下assembleResponse在确定是更新操作时调用的方法,如下:
 
   
//
instance.cpp文件第224行
    assembleResponse( Message 
&
m, DbResponse 
&
dbresponse, 
const
 SockAddr 
&
client ) {
    .....
            
try
 {
                
if
 ( op 
==
 dbInsert ) {  
//
添加记录操作
                    receivedInsert(m, currentOp);
                }
                
else
 
if
 ( op 
==
 dbUpdate ) { 
//
更新记录
                    receivedUpdate(m, currentOp);
                }
                
else
 
if
 ( op 
==
 dbDelete ) { 
//
删除记录
                    receivedDelete(m, currentOp);
                }
                
else
 
if
 ( op 
==
 dbKillCursors ) { 
//
删除Cursors(游标)对象
                    currentOp.ensureStarted();
                    logThreshold 
=
 
10
;
                    ss 
<<
 
"
killcursors 
"
;
                    receivedKillCursors(m);
                }
                
else
 {
                    mongo::log() 
<<
 
"
    operation isn't supported: 
"
 
<<
 op 
<<
 endl;
                    currentOp.done();
                    log 
=
 
true
;
                }
            }
          .....
        }
    }
    从上面代码可以看出,系统在确定dbUpdate操作时,调用了receivedUpdate()方法(位于instance.cpp文件第570行),下面是该方法的定义:

 

void
 receivedUpdate(Message
&
 m, CurOp
&
 op) {       
        DbMessage d(m);
//
初始化数据库格式的消息
        
const
 
char
 
*
ns 
=
 d.getns();
//
获取名空间,用于接下来insert数据
        assert(
*
ns);
        
//
因为CUD操作在主库中操作,所以这里断言名空间包含的db信息中是不是主库,即"master"
        uassert( 
10054
 ,  
"
not master
"
, isMasterNs( ns ) );
        op.debug().str 
<<
 ns 
<<
 
'
 
'
;
        
//
获取标志位信息(标识更新一条或多条等)关于消息结构体。有关消息结构参见我的这篇文章:
        
//
http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
        
int
 flags 
=
 d.pullInt();        
        
//
获取"更新消息"结构体中的selector(也就是要更新的数据条件,相关于where)
        BSONObj query 
=
 d.nextJsObj();
        assert( d.moreJSObjs() );
        assert( query.objsize() 
<
 m.header()
->
dataLen() );
        BSONObj toupdate 
=
 d.nextJsObj();
//
要更新的记录
        uassert( 
10055
 , 
"
update object too large
"
, toupdate.objsize() 
<=
 BSONObjMaxUserSize);
        assert( toupdate.objsize() 
<
 m.header()
->
dataLen() );
        assert( query.objsize() 
+
 toupdate.objsize() 
<
 m.header()
->
dataLen() );
        
//
标识是否为upsert方式,即:如果存在就更新,如果不存在就插入
        
bool
 upsert 
=
 flags 
&
 UpdateOption_Upsert;
        
//
是否更新所有满足条件(where)的记录
        
bool
 multi 
=
 flags 
&
 UpdateOption_Multi;
        
//
是否更新所有节点(sharding状态)
        
bool
 broadcast 
=
 flags 
&
 UpdateOption_Broadcast;
        {
            
string
 s 
=
 query.toString();
            
/*
 todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.
               instead, let's just story the query BSON in the debug object, and it can toString()
               lazily
            
*/
            op.debug().str 
<<
 
"
 query: 
"
 
<<
 s;
            op.setQuery(query);
        }
        writelock lk;
        
//
 如果不更新所有节点(sharding)且当前物理结点是shard 状态时
        
if
 ( 
!
 broadcast 
&&
 handlePossibleShardedMessage( m , 
0
 ) )
            
return
;
        
//
if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
        Client::Context ctx( ns );
        UpdateResult res 
=
 updateObjects(ns, toupdate, query, upsert, multi, 
true
, op.debug() );
//
更新对象
        lastError.getSafe()
->
recordUpdate( res.existing , res.num , res.upserted ); 
//
 for getlasterror
    }
     上面的方法中,主要是对消息进行折包解析,找出要更新的数据记录及相应查询条件,以及更新方式(即upsert),然后再在“写锁”环境下执行更新数据操作。
   
     最终上面代码会调用 updateObjects()方法,该方法定义如下:

 

   
//
update.cpp 文件第1279行
   UpdateResult updateObjects(
const
 
char
 
*
ns, 
const
 BSONObj
&
 updateobj, BSONObj patternOrig, 
bool
 upsert, 
bool
 multi, 
bool
 logop , OpDebug
&
 debug ) {
        
//
断言记录的ns是否在"保留的$集合"中
        uassert( 
10155
 , 
"
cannot update reserved $ collection
"
, strchr(ns, 
'
$
'
==
 
0
 );
        
if
 ( strstr(ns, 
"
.system.
"
) ) {
         
/*
 dm: it's very important that system.indexes is never updated as IndexDetails has pointers into it 
*/
            uassert( 
10156
 , str::stream() 
<<
 
"
cannot update system collection: 
"
 
<<
 ns 
<<
 
"
 q: 
"
 
<<
 patternOrig 
<<
 
"
 u: 
"
 
<<
 updateobj , legalClientSystemNS( ns , 
true
 ) );
        }
        
return
 _updateObjects(
false
, ns, updateobj, patternOrig, upsert, multi, logop, debug);
    }

    上面方法对要更新的ns进行判断,以避免因更新保留的集合而对系统结构造成损坏,如果一切正常,则调用 _updateObjects方法,如下:

    
//
update.cpp 文件第1027行
    UpdateResult _updateObjects(
bool
 god, 
const
 
char
 
*
ns, 
const
 BSONObj
&
 updateobj, BSONObj patternOrig, 
bool
 upsert, 
bool
 multi, 
bool
 logop , OpDebug
&
 debug, RemoveSaver
*
 rs ) {
        DEBUGUPDATE( 
"
update: 
"
 
<<
 ns 
<<
 
"
 update: 
"
 
<<
 updateobj 
<<
 
"
 query: 
"
 
<<
 patternOrig 
<<
 
"
 upsert: 
"
 
<<
 upsert 
<<
 
"
 multi: 
"
 
<<
 multi );
        Client
&
 client 
=
 cc();
        
int
 profile 
=
 client.database()
->
profile;
        StringBuilder
&
 ss 
=
 debug.str;
        
if
 ( logLevel 
>
 
2
 )
            ss 
<<
 
"
 update: 
"
 
<<
 updateobj.toString();
        
/*
 idea with these here it to make them loop invariant for multi updates, and thus be a bit faster for that case 
*/
        
/*
 NOTE: when yield() is added herein, these must be refreshed after each call to yield! 
*/
        NamespaceDetails 
*
=
 nsdetails(ns); 
//
 can be null if an upsert...
        NamespaceDetailsTransient 
*
nsdt 
=
 
&
NamespaceDetailsTransient::get_w(ns);
        
/*
 end note 
*/
        auto_ptr
<
ModSet
>
 mods;
//
定义存储修改信息操作(如$inc, $set, $push,)的集合实例
        
bool
 isOperatorUpdate 
=
 updateobj.firstElement().fieldName()[
0
==
 
'
$
'
;
        
int
 modsIsIndexed 
=
 
false
//
 really the # of indexes
        
if
 ( isOperatorUpdate ) {
            
if
( d 
&&
 d
->
indexBuildInProgress ) {
//
如果正在构建索引
                
set
<
string
>
 bgKeys;
              d
->
inProgIdx().keyPattern().getFieldNames(bgKeys);
//
获取当前对象的所有字段(field)信息
              mods.reset( 
new
 ModSet(updateobj, nsdt
->
indexKeys(), 
&
bgKeys));
//
为mods绑定操作信息
            }
            
else
 {
                mods.reset( 
new
 ModSet(updateobj, nsdt
->
indexKeys()) );
//
为mods绑定操作信息;
            }
            modsIsIndexed 
=
 mods
->
isIndexed();
        }
        
//
upsert:如果存在就更新,如果不存在就插入
        
if
!
upsert 
&&
 
!
multi 
&&
 isSimpleIdQuery(patternOrig) 
&&
 d 
&&
 
!
modsIsIndexed ) {
            
int
 idxNo 
=
 d
->
findIdIndex();
            
if
( idxNo 
>=
 
0
 ) {
                ss 
<<
 
"
 byid 
"
;
                
//
根据id更新记录信息
                
return
 _updateById(isOperatorUpdate, idxNo, mods.
get
(), profile, d, nsdt, god, ns, updateobj, patternOrig, logop, debug);
            }
        }
        
set
<
DiskLoc
>
 seenObjects;
        
int
 numModded 
=
 
0
;
        
long
 
long
 nscanned 
=
 
0
;
        MatchDetails details;
        
//
构造“更新操作”实例对象并用其构造游标操作(符)实例
        shared_ptr
<
 MultiCursor::CursorOp 
>
 opPtr( 
new
 UpdateOp( mods.
get
() 
&&
 mods
->
hasDynamicArray() ) );
        
//
构造MultiCursor查询游标(参见其构造方法中的 nextClause()语句)
        shared_ptr
<
 MultiCursor 
>
 c( 
new
 MultiCursor( ns, patternOrig, BSONObj(), opPtr, 
true
 ) );
        auto_ptr
<
ClientCursor
>
 cc;
        
while
 ( c
->
ok() ) {
//
遍历(下面的c->advance()调用)游标指向的记录信息
            nscanned
++
;
            
bool
 atomic 
=
 c
->
matcher()
->
docMatcher().atomic();
            
//
 并将其与更新操作中的条件进行匹配
            
if
 ( 
!
 c
->
matcher()
->
matches( c
->
currKey(), c
->
currLoc(), 
&
details ) ) {
                c
->
advance();
//
将游标跳转到下一条记录
                
if
 ( nscanned 
%
 
256
 
==
 
0
 
&&
 
!
 atomic ) {
                    
if
 ( cc.
get
() 
==
 
0
 ) {
                        shared_ptr
<
 Cursor 
>
 cPtr 
=
 c;
                        cc.reset( 
new
 ClientCursor( QueryOption_NoCursorTimeout , cPtr , ns ) );
                    }
                    
if
 ( 
!
 cc
->
yield
() ) {
                        cc.release();
                        
//
 TODO should we assert or something?
                        
break
;
                    }
                    
if
 ( 
!
c
->
ok() ) {
                        
break
;
                    }
                }
                
continue
;
            }
            Record 
*
=
 c
->
_current();
//
游标当前所指向的记录
            DiskLoc loc 
=
 c
->
currLoc();
//
游标当前所指向的记录所在地址
            
//
 TODO Maybe this is unnecessary since we have seenObjects
            
if
 ( c
->
getsetdup( loc ) ) {
//
判断当前记录是否是重复
                c
->
advance();
                
continue
;
            }
            BSONObj js(r);
            BSONObj pattern 
=
 patternOrig;
            
if
 ( logop ) {
//
记录日志
                BSONObjBuilder idPattern;
                BSONElement id;
                
//
 NOTE: If the matching object lacks an id, we'll log
                
//
 with the original pattern.  This isn't replay-safe.
                
//
 It might make sense to suppress the log instead
                
//
 if there's no id.
                
if
 ( js.getObjectID( id ) ) {
                    idPattern.append( id );
                    pattern 
=
 idPattern.obj();
                }
                
else
 {
                    uassert( 
10157
 ,  
"
multi-update requires all modified objects to have an _id
"
 , 
!
 multi );
                }
            }
            
if
 ( profile )
                ss 
<<
 
"
 nscanned:
"
 
<<
 nscanned;
            ......   
            uassert( 
10158
 ,  
"
multi update only works with $ operators
"
 , 
!
 multi );
       
//
查看更新记录操作的时间戳,本人猜测这么做可能因为mongodb会采用最后更新时间戳解决分布式系统
       
//
一致性的问题,
也就是通常使用的Last write wins准则,有关信息可参见这篇文章:
       
//
http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer
            BSONElementManipulator::lookForTimestamps( updateobj );
            checkNoMods( updateobj );
            
//
更新记录
            theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);
            
if
 ( logop ) {
//
记录日志操作
                DEV 
if
( god ) log() 
<<
 
"
REALLY??
"
 
<<
 endl; 
//
 god doesn't get logged, this would be bad.
                logOp(
"
u
"
, ns, updateobj, 
&
pattern );
            }
            
return
 UpdateResult( 
1
 , 
0
 , 
1
 );
//
返回操作结果
        }
        
if
 ( numModded )
            
return
 UpdateResult( 
1
 , 
1
 , numModded );
        ......
        
return
 UpdateResult( 
0
 , 
0
 , 
0
 );
    }

     上面的代码主要执行构造更新消息中的查询条件(selector)游标,并将“游标指向”的记录遍历出来与查询条件进行匹配,如果匹配命中,则进行更新。(有关游标的构造和继承实现体系,mongodb做的有些复杂,很难一句说清,我会在本系列后面另用篇幅进行说明)
    
    注意上面代码段中的这行代码:
 

   theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);

    该方法会执行最终更新操作,其定义如下:

   

//
pdfile.cpp 文件934行
      
const
 DiskLoc DataFileMgr::updateRecord(
        
const
 
char
 
*
ns,
        NamespaceDetails 
*
d,
        NamespaceDetailsTransient 
*
nsdt,
        Record 
*
toupdate, 
const
 DiskLoc
&
 dl,
        
const
 
char
 
*
_buf, 
int
 _len, OpDebug
&
 debug,  
bool
 god) {
        StringBuilder
&
 ss 
=
 debug.str;
        dassert( toupdate 
==
 dl.rec() );
        BSONObj objOld(toupdate);
        BSONObj objNew(_buf);
        DEV assert( objNew.objsize() 
==
 _len );
        DEV assert( objNew.objdata() 
==
 _buf );
        
//
如果_buf中不包含_id,但要更新的记录(toupdate)有_id
        
if
!
objNew.hasElement(
"
_id
"
&&
 objOld.hasElement(
"
_id
"
) ) {
            
/*
 add back the old _id value if the update removes it.  Note this implementation is slow
               (copies entire object multiple times), but this shouldn't happen often, so going for simple
               code, not speed.
            
*/
            BSONObjBuilder b;
            BSONElement e;
            assert( objOld.getObjectID(e) );
//
获取对象objOld的ID并绑定到e
            b.append(e); 
//
 为了最好的性能,先放入_id
            b.appendElements(objNew);
            objNew 
=
 b.obj();
        }
        
/*
重复key检查
*/
        vector
<
IndexChanges
>
 changes;
        
bool
 changedId 
=
 
false
;
        
//
获取要修改的索引信息(包括要移除和添加的index key,并将结果返回给changes)
        getIndexChanges(changes, 
*
d, objNew, objOld, changedId);
        
//
断言是否要修改_id索引
        uassert( 
13596
 , str::stream() 
<<
 
"
cannot change _id of a document old:
"
 
<<
 objOld 
<<
 
"
 new:
"
 
<<
 objNew , 
!
 changedId );
        dupCheck(changes, 
*
d, dl);
//
重复key检查,如果重复则通过断言终止当前程序
        
//
如果要更新的记录比最终要插入的记录尺寸小
        
if
 ( toupdate
->
netLength() 
<
 objNew.objsize() ) {
            
//
 如不合适,则重新分配
            uassert( 
10003
 , 
"
failing update: objects in a capped ns cannot grow
"
!
(d 
&&
 d
->
capped));
            d
->
paddingTooSmall();
            
if
 ( cc().database()
->
profile )
                ss 
<<
 
"
 moved 
"
;
          
//
删除指定的记录(record),删除操作详见我的这篇文章:
          
//
http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html
            deleteRecord(ns, toupdate, dl);
            
//
插入新的BSONObj信息,插入操作详见我的这篇文章:
            
//
http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
            
return
 insert(ns, objNew.objdata(), objNew.objsize(), god);
        }
        nsdt
->
notifyOfWriteOp();
        d
->
paddingFits();
        
/*
 如果有要修改的索引 
*/
        {
            unsigned keyUpdates 
=
 
0
;
            
int
 z 
=
 d
->
nIndexesBeingBuilt();
//
获取索引(包括正在构建)数
            
for
 ( 
int
 x 
=
 
0
; x 
<
 z; x
++
 ) {
                IndexDetails
&
 idx 
=
 d
->
idx(x);
                
//
遍历当前更新记录要修改(移除)的索引键信息
                
for
 ( unsigned i 
=
 
0
; i 
<
 changes[x].removed.size(); i
++
 ) {
                    
try
 {
                        
//
移除当前记录在索引b树中相应信息(索引键)
                        idx.head.btree()
->
unindex(idx.head, idx, 
*
changes[x].removed[i], dl);
                    }
                    
catch
 (AssertionException
&
) {
                        ss 
<<
 
"
 exception update unindex 
"
;
                        problem() 
<<
 
"
 caught assertion update unindex 
"
 
<<
 idx.indexNamespace() 
<<
 endl;
                    }
                }
                assert( 
!
dl.isNull() );
                
//
获取指定名称(key)下的子对象
                BSONObj idxKey 
=
 idx.info.obj().getObjectField(
"
key
"
);
                Ordering ordering 
=
 Ordering::make(idxKey);
//
生成排序方式
                keyUpdates 
+=
 changes[x].added.size();
                
                
//
遍历当前更新记录要修改(插入)的索引键信息
                
for
 ( unsigned i 
=
 
0
; i 
<
 changes[x].added.size(); i
++
 ) {
                    
try
 {
                        
//
之前做了dupCheck()操作,所以这里不用担心重复key的问题
                        
//
在b树中添加索引键信息,有关该方法的定义参见我的这篇文章
                        
//
http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
                        idx.head.btree()
->
bt_insert(
                            idx.head,
                            dl, 
*
changes[x].added[i], ordering, 
/*
dupsAllowed
*/
true
, idx);
                    }
                    
catch
 (AssertionException
&
 e) {
                        ss 
<<
 
"
 exception update index 
"
;
                        problem() 
<<
 
"
 caught assertion update index 
"
 
<<
 idx.indexNamespace() 
<<
 
"
 
"
 
<<
 e 
<<
 endl;
                    }
                }
            }
            
if
( keyUpdates 
&&
 cc().database()
->
profile )
                ss 
<<
 
'
\n
'
 
<<
 keyUpdates 
<<
 
"
 key updates 
"
;
        }
        
//
  update in place
        
int
 sz 
=
 objNew.objsize();
        
//
将新修改的记录信息复制到旧记录(toupdate)所在位置
        memcpy(getDur().writingPtr(toupdate
->
data, sz), objNew.objdata(), sz);
        
return
 dl;
    }

 

    上面代码段主要先对B树索引进行修改(这里采用先移除再重建方式),之后直接更新旧记录在内存中的数据,最终完成了记录的更新操作。
    最后,用一张时序图回顾一下更新记录时mongodb服务端代码的执行流程:
   
    好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍Mongodb的游标(cursor)设计体系和实现方式。

 

 

    参考链接:
    http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html
    http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html
    http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
    http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer

 

    原文链接:

    作者: daizhj, 代震军   
    微博: http://t.sina.com.cn/daizhj
    Tags: mongodb,c++,source code

 

转载于:https://www.cnblogs.com/daizhj/archive/2011/04/11/mongodb_update_recode_source_code.html

你可能感兴趣的文章
转自 zera php中extends和implements的区别
查看>>
Array.of使用实例
查看>>
【Luogu】P2498拯救小云公主(spfa)
查看>>
如何获取网站icon
查看>>
几种排序写法
查看>>
java 多线程的应用场景
查看>>
dell support
查看>>
转:Maven项目编译后classes文件中没有dao的xml文件以及没有resources中的配置文件的问题解决...
查看>>
MTK android 设置里 "关于手机" 信息参数修改
查看>>
单变量微积分笔记6——线性近似和二阶近似
查看>>
补几天前的读书笔记
查看>>
HDU 1829/POJ 2492 A Bug's Life
查看>>
CKplayer:视频推荐和分享插件设置
查看>>
CentOS系统将UTC时间修改为CST时间
查看>>
redis常见面试题
查看>>
导航控制器的出栈
查看>>
玩转CSS3,嗨翻WEB前端,CSS3伪类元素详解/深入浅出[原创][5+3时代]
查看>>
iOS 9音频应用播放音频之播放控制暂停停止前进后退的设置
查看>>
Delphi消息小记
查看>>
HNOI2016
查看>>