// Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value){ WriteBatch batch; batch.Put(key, value); returnWrite(opt, &batch); }
MutexLock l(&mutex_); writers_.push_back(&w); while (!w.done && &w != writers_.front()) { w.cv.Wait(); } if (w.done) { return w.status; }
// May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock // during this phase since &w is currently responsible for logging // and protects against concurrent loggers and concurrent writes // into mem_. { mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); bool sync_error = false; if (status.ok() && options.sync) { status = logfile_->Sync(); if (!status.ok()) { sync_error = true; } } if (status.ok()) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); if (sync_error) { // The state of the log file is indeterminate: the log record we // just added may or may not show up when the DB is re-opened. // So we force the DB into a mode where all future writes fail. RecordBackgroundError(status); } } if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence); }
while (true) { Writer* ready = writers_.front(); writers_.pop_front(); if (ready != &w) { ready->status = status; ready->done = true; ready->cv.Signal(); } if (ready == last_writer) break; }
// Notify new head of write queue if (!writers_.empty()) { writers_.front()->cv.Signal(); }
return status; }
再来看一下 Writer 的定义：
// Information kept for every waiting writer structDBImpl::Writer { Status status; WriteBatch* batch; bool sync; bool done; port::CondVar cv;