在之前的文章中,介绍了assembleResponse函数(位于instance.cpp第224行),它会根据op操作枚举类型来调用相应的crud操作,枚举类型定义如下: 可以看到dbUpdate = 2001 为更新操作枚举值,下面我们看一下assembleResponse在确定是更新操作时调用的方法,如下: 从上面代码可以看出,系统在确定dbUpdate操作时,调用了receivedUpdate()方法(位于instance.cpp文件第570行),下面是该方法的定义:
/**
 * Operation codes that assembleResponse() dispatches on.
 * Each enumerator keeps its explicit numeric value; 2003 is intentionally
 * left unused (its enumerator is commented out).
 */
enum Operations {
    opReply = 1,          /* reply. responseTo is set. */
    dbMsg = 1000,         /* generic msg command followed by a string */
    dbUpdate = 2001,      /* update an object */
    dbInsert = 2002,
    // dbGetByOID = 2003,
    dbQuery = 2004,
    dbGetMore = 2005,
    dbDelete = 2006,
    dbKillCursors = 2007
};
// instance.cpp文件第224行 assembleResponse( Message & m, DbResponse & dbresponse, const SockAddr & client ) { ..... try { if ( op == dbInsert ) { // 添加记录操作 receivedInsert(m, currentOp); } else if ( op == dbUpdate ) { // 更新记录 receivedUpdate(m, currentOp); } else if ( op == dbDelete ) { // 删除记录 receivedDelete(m, currentOp); } else if ( op == dbKillCursors ) { // 删除Cursors(游标)对象 currentOp.ensureStarted(); logThreshold = 10 ; ss << " killcursors " ; receivedKillCursors(m); } else { mongo::log() << " operation isn't supported: " << op << endl; currentOp.done(); log = true ; } } ..... } }
/**
 * Handles a dbUpdate message: decodes the request carried by the message
 * and applies it through updateObjects().
 *
 * @param m   incoming message; holds the namespace, the flags word, the
 *            selector document and the update document, in that order
 * @param op  current-operation record; receives the debug text and the query
 */
void receivedUpdate(Message& m, CurOp& op) {
    DbMessage d(m);              // wrap the raw message for sequential decoding
    const char* ns = d.getns();  // namespace targeted by this update
    assert(*ns);                 // the namespace string must not be empty

    // Refuse the write unless isMasterNs() accepts this namespace.
    uassert(10054, " not master ", isMasterNs(ns));
    op.debug().str << ns << ' ';

    // Flags word (upsert / multi / broadcast bits are tested below).
    // For the message layout see:
    // http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html
    int flags = d.pullInt();

    // Selector: the condition choosing which documents to update
    // (comparable to a WHERE clause).
    BSONObj query = d.nextJsObj();
    assert(d.moreJSObjs());
    assert(query.objsize() < m.header()->dataLen());

    // The document describing the update to apply.
    BSONObj toupdate = d.nextJsObj();
    uassert(10055, " update object too large ", toupdate.objsize() <= BSONObjMaxUserSize);
    assert(toupdate.objsize() < m.header()->dataLen());
    assert(query.objsize() + toupdate.objsize() < m.header()->dataLen());

    // upsert: update when a match exists, otherwise insert.
    bool upsert = flags & UpdateOption_Upsert;
    // multi: update every document that matches the selector.
    bool multi = flags & UpdateOption_Multi;
    // broadcast: when set, the sharding check below is skipped.
    bool broadcast = flags & UpdateOption_Broadcast;

    {
        string s = query.toString();
        /* todo: we shouldn't do all this ss stuff when we don't need it, it will slow us down.
           instead, let's just store the query BSON in the debug object, and it can toString()
           lazily
        */
        op.debug().str << " query: " << s;
        op.setQuery(query);
    }

    writelock lk;

    // Unless this is a broadcast update, give handlePossibleShardedMessage()
    // the chance to take over; when it returns true nothing more is done here.
    if (!broadcast && handlePossibleShardedMessage(m, 0))
        return;

    // if this ever moves to outside of lock, need to adjust check Client::Context::_finishInit
    Client::Context ctx(ns);

    // Perform the update; the literal `true` is the logop argument.
    UpdateResult res = updateObjects(ns, toupdate, query, upsert, multi, true, op.debug());

    // Record the outcome for getlasterror.
    lastError.getSafe()->recordUpdate(res.existing, res.num, res.upserted);
}
// update.cpp 文件第1279行 UpdateResult updateObjects( const char * ns, const BSONObj & updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug & debug ) { // 断言记录的ns是否在"保留的$集合"中 uassert( 10155 , " cannot update reserved $ collection " , strchr(ns, ' $ ' ) == 0 ); if ( strstr(ns, " .system. " ) ) { /* dm: it's very important that system.indexes is never updated as IndexDetails has pointers into it */ uassert( 10156 , str::stream() << " cannot update system collection: " << ns << " q: " << patternOrig << " u: " << updateobj , legalClientSystemNS( ns , true ) ); } return _updateObjects( false , ns, updateobj, patternOrig, upsert, multi, logop, debug); }
// update.cpp 文件第1027行 UpdateResult _updateObjects( bool god, const char * ns, const BSONObj & updateobj, BSONObj patternOrig, bool upsert, bool multi, bool logop , OpDebug & debug, RemoveSaver * rs ) { DEBUGUPDATE( " update: " << ns << " update: " << updateobj << " query: " << patternOrig << " upsert: " << upsert << " multi: " << multi ); Client & client = cc(); int profile = client.database() -> profile; StringBuilder & ss = debug.str; if ( logLevel > 2 ) ss << " update: " << updateobj.toString(); /* idea with these here it to make them loop invariant for multi updates, and thus be a bit faster for that case */ /* NOTE: when yield() is added herein, these must be refreshed after each call to yield! */ NamespaceDetails * d = nsdetails(ns); // can be null if an upsert... NamespaceDetailsTransient * nsdt = & NamespaceDetailsTransient::get_w(ns); /* end note */ auto_ptr < ModSet > mods; // 定义存储修改信息操作(如$inc, $set, $push,)的集合实例 bool isOperatorUpdate = updateobj.firstElement().fieldName()[ 0 ] == ' $ ' ; int modsIsIndexed = false ; // really the # of indexes if ( isOperatorUpdate ) { if ( d && d -> indexBuildInProgress ) { // 如果正在构建索引 set < string > bgKeys; d -> inProgIdx().keyPattern().getFieldNames(bgKeys); // 获取当前对象的所有字段(field)信息 mods.reset( new ModSet(updateobj, nsdt -> indexKeys(), & bgKeys)); // 为mods绑定操作信息 } else { mods.reset( new ModSet(updateobj, nsdt -> indexKeys()) ); // 为mods绑定操作信息; } modsIsIndexed = mods -> isIndexed(); } // upsert:如果存在就更新,如果不存在就插入 if ( ! upsert && ! multi && isSimpleIdQuery(patternOrig) && d && ! modsIsIndexed ) { int idxNo = d -> findIdIndex(); if ( idxNo >= 0 ) { ss << " byid " ; // 根据id更新记录信息 return _updateById(isOperatorUpdate, idxNo, mods. get (), profile, d, nsdt, god, ns, updateobj, patternOrig, logop, debug); } } set < DiskLoc > seenObjects; int numModded = 0 ; long long nscanned = 0 ; MatchDetails details; // 构造“更新操作”实例对象并用其构造游标操作(符)实例 shared_ptr < MultiCursor::CursorOp > opPtr( new UpdateOp( mods. 
get () && mods -> hasDynamicArray() ) ); // 构造MultiCursor查询游标(参见其构造方法中的 nextClause()语句) shared_ptr < MultiCursor > c( new MultiCursor( ns, patternOrig, BSONObj(), opPtr, true ) ); auto_ptr < ClientCursor > cc; while ( c -> ok() ) { // 遍历(下面的c->advance()调用)游标指向的记录信息 nscanned ++ ; bool atomic = c -> matcher() -> docMatcher().atomic(); // 并将其与更新操作中的条件进行匹配 if ( ! c -> matcher() -> matches( c -> currKey(), c -> currLoc(), & details ) ) { c -> advance(); // 将游标跳转到下一条记录 if ( nscanned % 256 == 0 && ! atomic ) { if ( cc. get () == 0 ) { shared_ptr < Cursor > cPtr = c; cc.reset( new ClientCursor( QueryOption_NoCursorTimeout , cPtr , ns ) ); } if ( ! cc -> yield () ) { cc.release(); // TODO should we assert or something? break ; } if ( ! c -> ok() ) { break ; } } continue ; } Record * r = c -> _current(); // 游标当前所指向的记录 DiskLoc loc = c -> currLoc(); // 游标当前所指向的记录所在地址 // TODO Maybe this is unnecessary since we have seenObjects if ( c -> getsetdup( loc ) ) { // 判断当前记录是否是重复 c -> advance(); continue ; } BSONObj js(r); BSONObj pattern = patternOrig; if ( logop ) { // 记录日志 BSONObjBuilder idPattern; BSONElement id; // NOTE: If the matching object lacks an id, we'll log // with the original pattern. This isn't replay-safe. // It might make sense to suppress the log instead // if there's no id. if ( js.getObjectID( id ) ) { idPattern.append( id ); pattern = idPattern.obj(); } else { uassert( 10157 , " multi-update requires all modified objects to have an _id " , ! multi ); } } if ( profile ) ss << " nscanned: " << nscanned; ...... uassert( 10158 , " multi update only works with $ operators " , ! 
multi ); // 查看更新记录操作的时间戳,本人猜测这么做可能因为mongodb会采用最后更新时间戳解决分布式系统 // 一致性的问题, 也就是通常使用的Last write wins准则,有关信息可参见这篇文章: // http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer BSONElementManipulator::lookForTimestamps( updateobj ); checkNoMods( updateobj ); // 更新记录 theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god); if ( logop ) { // 记录日志操作 DEV if ( god ) log() << " REALLY?? " << endl; // god doesn't get logged, this would be bad. logOp( " u " , ns, updateobj, & pattern ); } return UpdateResult( 1 , 0 , 1 ); // 返回操作结果 } if ( numModded ) return UpdateResult( 1 , 1 , numModded ); ...... return UpdateResult( 0 , 0 , 0 ); }
theDataFileMgr.updateRecord(ns, d, nsdt, r, loc , updateobj.objdata(), updateobj.objsize(), debug, god);
该方法会执行最终更新操作,其定义如下:
// pdfile.cpp 文件934行 const DiskLoc DataFileMgr::updateRecord( const char * ns, NamespaceDetails * d, NamespaceDetailsTransient * nsdt, Record * toupdate, const DiskLoc & dl, const char * _buf, int _len, OpDebug & debug, bool god) { StringBuilder & ss = debug.str; dassert( toupdate == dl.rec() ); BSONObj objOld(toupdate); BSONObj objNew(_buf); DEV assert( objNew.objsize() == _len ); DEV assert( objNew.objdata() == _buf ); // 如果_buf中不包含_id,但要更新的记录(toupdate)有_id if ( ! objNew.hasElement( " _id " ) && objOld.hasElement( " _id " ) ) { /* add back the old _id value if the update removes it. Note this implementation is slow (copies entire object multiple times), but this shouldn't happen often, so going for simple code, not speed. */ BSONObjBuilder b; BSONElement e; assert( objOld.getObjectID(e) ); // 获取对象objOld的ID并绑定到e b.append(e); // 为了最好的性能,先放入_id b.appendElements(objNew); objNew = b.obj(); } /* 重复key检查 */ vector < IndexChanges > changes; bool changedId = false ; // 获取要修改的索引信息(包括要移除和添加的index key,并将结果返回给changes) getIndexChanges(changes, * d, objNew, objOld, changedId); // 断言是否要修改_id索引 uassert( 13596 , str::stream() << " cannot change _id of a document old: " << objOld << " new: " << objNew , ! changedId ); dupCheck(changes, * d, dl); // 重复key检查,如果重复则通过断言终止当前程序 // 如果要更新的记录比最终要插入的记录尺寸小 if ( toupdate -> netLength() < objNew.objsize() ) { // 如不合适,则重新分配 uassert( 10003 , " failing update: objects in a capped ns cannot grow " , ! 
(d && d -> capped)); d -> paddingTooSmall(); if ( cc().database() -> profile ) ss << " moved " ; // 删除指定的记录(record),删除操作详见我的这篇文章: // http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html deleteRecord(ns, toupdate, dl); // 插入新的BSONObj信息,插入操作详见我的这篇文章: // http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html return insert(ns, objNew.objdata(), objNew.objsize(), god); } nsdt -> notifyOfWriteOp(); d -> paddingFits(); /* 如果有要修改的索引 */ { unsigned keyUpdates = 0 ; int z = d -> nIndexesBeingBuilt(); // 获取索引(包括正在构建)数 for ( int x = 0 ; x < z; x ++ ) { IndexDetails & idx = d -> idx(x); // 遍历当前更新记录要修改(移除)的索引键信息 for ( unsigned i = 0 ; i < changes[x].removed.size(); i ++ ) { try { // 移除当前记录在索引b树中相应信息(索引键) idx.head.btree() -> unindex(idx.head, idx, * changes[x].removed[i], dl); } catch (AssertionException & ) { ss << " exception update unindex " ; problem() << " caught assertion update unindex " << idx.indexNamespace() << endl; } } assert( ! dl.isNull() ); // 获取指定名称(key)下的子对象 BSONObj idxKey = idx.info.obj().getObjectField( " key " ); Ordering ordering = Ordering::make(idxKey); // 生成排序方式 keyUpdates += changes[x].added.size(); // 遍历当前更新记录要修改(插入)的索引键信息 for ( unsigned i = 0 ; i < changes[x].added.size(); i ++ ) { try { // 之前做了dupCheck()操作,所以这里不用担心重复key的问题 // 在b树中添加索引键信息,有关该方法的定义参见我的这篇文章 // http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html idx.head.btree() -> bt_insert( idx.head, dl, * changes[x].added[i], ordering, /* dupsAllowed */ true , idx); } catch (AssertionException & e) { ss << " exception update index " ; problem() << " caught assertion update index " << idx.indexNamespace() << " " << e << endl; } } } if ( keyUpdates && cc().database() -> profile ) ss << ' \n ' << keyUpdates << " key updates " ; } // update in place int sz = objNew.objsize(); // 将新修改的记录信息复制到旧记录(toupdate)所在位置 memcpy(getDur().writingPtr(toupdate -> data, sz), objNew.objdata(), sz); return dl; }
参考链接: http://www.cnblogs.com/daizhj/archive/2011/03/30/1999699.html http://www.cnblogs.com/daizhj/archive/2011/04/06/mongodb_delete_recode_source_code.html http://www.cnblogs.com/daizhj/archive/2011/04/02/2003335.html http://blog.mongodb.org/post/520888030/on-distributed-consistency-part-5-many-writer
原文链接:
作者: daizhj, 代震军 微博: http://t.sina.com.cn/daizhj Tags: mongodb,c++,source code