5 Star 15 Fork 1

Gitee 极速下载 / Badger

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/dgraph-io/badger
value.go 32.87 KB
一键复制 编辑 原始数据 按行查看 历史
* Copyright 2017 Dgraph Labs, Inc. and Contributors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package badger
import (
otrace "go.opencensus.io/trace"
// maxVlogFileSize is the maximum size of the vlog file which can be created. Vlog Offset is of
// uint32, so limiting at max uint32.
var maxVlogFileSize uint32 = math.MaxUint32
// Values have their first byte being byteData or byteDelete. This helps us distinguish between
// a key that has never been seen and a key that has been explicitly deleted.
const (
bitDelete byte = 1 << 0 // Set if the key has been deleted.
bitValuePointer byte = 1 << 1 // Set if the value is NOT stored directly next to key.
bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
// Set if item shouldn't be discarded via compactions (used by merge operator)
bitMergeEntry byte = 1 << 3
// The MSB 2 bits are for transactions.
bitTxn byte = 1 << 6 // Set if the entry is part of a txn.
bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.
mi int64 = 1 << 20 //nolint:unused
// size of vlog header.
// +----------------+------------------+
// | keyID(8 bytes) | baseIV(12 bytes)|
// +----------------+------------------+
vlogHeaderSize = 20
var errStop = errors.New("Stop iteration")
var errTruncate = errors.New("Do truncate")
type logEntry func(e Entry, vp valuePointer) error
type safeRead struct {
k []byte
v []byte
recordOffset uint32
lf *logFile
// hashReader implements io.Reader, io.ByteReader interfaces. It also keeps track of the number
// bytes read. The hashReader writes to h (hash) what it reads from r.
type hashReader struct {
r io.Reader
h hash.Hash32
bytesRead int // Number of bytes read.
func newHashReader(r io.Reader) *hashReader {
hash := crc32.New(y.CastagnoliCrcTable)
return &hashReader{
r: r,
h: hash,
// Read reads len(p) bytes from the reader. Returns the number of bytes read, error on failure.
func (t *hashReader) Read(p []byte) (int, error) {
n, err := t.r.Read(p)
if err != nil {
return n, err
t.bytesRead += n
return t.h.Write(p[:n])
// ReadByte reads exactly one byte from the reader. Returns error on failure.
func (t *hashReader) ReadByte() (byte, error) {
b := make([]byte, 1)
_, err := t.Read(b)
return b[0], err
// Sum32 returns the sum32 of the underlying hash.
func (t *hashReader) Sum32() uint32 {
return t.h.Sum32()
// Entry reads an entry from the provided reader. It also validates the checksum for every entry
// read. Returns error on failure.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
tee := newHashReader(reader)
var h header
hlen, err := h.DecodeFrom(tee)
if err != nil {
return nil, err
if h.klen > uint32(1<<16) { // Key length must be below uint16.
return nil, errTruncate
kl := int(h.klen)
if cap(r.k) < kl {
r.k = make([]byte, 2*kl)
vl := int(h.vlen)
if cap(r.v) < vl {
r.v = make([]byte, 2*vl)
e := &Entry{}
e.offset = r.recordOffset
e.hlen = hlen
buf := make([]byte, h.klen+h.vlen)
if _, err := io.ReadFull(tee, buf[:]); err != nil {
if err == io.EOF {
err = errTruncate
return nil, err
if r.lf.encryptionEnabled() {
if buf, err = r.lf.decryptKV(buf[:], r.recordOffset); err != nil {
return nil, err
e.Key = buf[:h.klen]
e.Value = buf[h.klen:]
var crcBuf [crc32.Size]byte
if _, err := io.ReadFull(reader, crcBuf[:]); err != nil {
if err == io.EOF {
err = errTruncate
return nil, err
crc := y.BytesToU32(crcBuf[:])
if crc != tee.Sum32() {
return nil, errTruncate
e.meta = h.meta
e.UserMeta = h.userMeta
e.ExpiresAt = h.expiresAt
return e, nil
func (vlog *valueLog) rewrite(f *logFile) error {
for _, fid := range vlog.filesToBeDeleted {
if fid == f.fid {
return errors.Errorf("value log file already marked for deletion fid: %d", fid)
maxFid := vlog.maxFid
y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
vlog.opt.Infof("Rewriting fid: %d", f.fid)
wb := make([]*Entry, 0, 1000)
var size int64
y.AssertTrue(vlog.db != nil)
var count, moved int
fe := func(e Entry) error {
if count%100000 == 0 {
vlog.opt.Debugf("Processing entry %d", count)
vs, err := vlog.db.get(e.Key)
if err != nil {
return err
if discardEntry(e, vs, vlog.db) {
return nil
// Value is still present in value log.
if len(vs.Value) == 0 {
return errors.Errorf("Empty value: %+v", vs)
var vp valuePointer
// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
if vp.Fid > f.fid {
return nil
// If the entry found from the LSM Tree points to an offset greater than the one
// read from vlog, don't do anything.
if vp.Offset > e.offset {
return nil
// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
// insert them back into the DB.
// NOTE: It might be possible that the entry read from the LSM Tree points to
// an older vlog file. See the comments in the else part.
if vp.Fid == f.fid && vp.Offset == e.offset {
// This new entry only contains the key, and a pointer to the value.
ne := new(Entry)
// Remove only the bitValuePointer and transaction markers. We
// should keep the other bits.
ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
ne.UserMeta = e.UserMeta
ne.ExpiresAt = e.ExpiresAt
ne.Key = append([]byte{}, e.Key...)
ne.Value = append([]byte{}, e.Value...)
es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
// Consider size of value as well while considering the total size
// of the batch. There have been reports of high memory usage in
// rewrite because we don't consider the value size. See #1292.
es += int64(len(e.Value))
// Ensure length and size of wb is within transaction limits.
if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
size+es >= vlog.opt.maxBatchSize {
if err := vlog.db.batchSet(wb); err != nil {
return err
size = 0
wb = wb[:0]
wb = append(wb, ne)
size += es
} else { //nolint:staticcheck
// It might be possible that the entry read from LSM Tree points to
// an older vlog file. This can happen in the following situation.
// Assume DB is opened with
// numberOfVersionsToKeep=1
// Now, if we have ONLY one key in the system "FOO" which has been
// updated 3 times and the same key has been garbage collected 3
// times, we'll have 3 versions of the movekey
// for the same key "FOO".
// NOTE: moveKeyi is the gc'ed version of the original key with version i
// We're calling the gc'ed keys as moveKey to simplify the
// explanantion. We used to add move keys but we no longer do that.
// Assume we have 3 move keys in L0.
// - moveKey1 (points to vlog file 10),
// - moveKey2 (points to vlog file 14) and
// - moveKey3 (points to vlog file 15).
// Also, assume there is another move key "moveKey1" (points to
// vlog file 6) (this is also a move Key for key "FOO" ) on upper
// levels (let's say 3). The move key "moveKey1" on level 0 was
// inserted because vlog file 6 was GCed.
// Here's what the arrangement looks like
// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
// L1 => ....
// L2 => ....
// L3 => (moveKey1 => vlog6)
// When L0 compaction runs, it keeps only moveKey3 because the number of versions
// to keep is set to 1. (we've dropped moveKey1's latest version)
// The new arrangement of keys is
// L0 => ....
// L1 => (moveKey3 => vlog15)
// L2 => ....
// L3 => (moveKey1 => vlog6)
// Now if we try to GC vlog file 10, the entry read from vlog file
// will point to vlog10 but the entry read from LSM Tree will point
// to vlog6. The move key read from LSM tree will point to vlog6
// because we've asked for version 1 of the move key.
// This might seem like an issue but it's not really an issue
// because the user has set the number of versions to keep to 1 and
// the latest version of moveKey points to the correct vlog file
// and offset. The stale move key on L3 will be eventually dropped
// by compaction because there is a newer versions in the upper
// levels.
return nil
_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
return fe(e)
if err != nil {
return err
batchSize := 1024
var loops int
for i := 0; i < len(wb); {
if batchSize == 0 {
vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
return ErrNoRewrite
end := i + batchSize
if end > len(wb) {
end = len(wb)
if err := vlog.db.batchSet(wb[i:end]); err != nil {
if err == ErrTxnTooBig {
// Decrease the batch size to half.
batchSize = batchSize / 2
return err
i += batchSize
vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
vlog.opt.Infof("Removing fid: %d", f.fid)
var deleteFileNow bool
// Entries written to LSM. Remove the older file now.
// Just a sanity-check.
if _, ok := vlog.filesMap[f.fid]; !ok {
return errors.Errorf("Unable to find fid: %d", f.fid)
if vlog.iteratorCount() == 0 {
delete(vlog.filesMap, f.fid)
deleteFileNow = true
} else {
vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
if deleteFileNow {
if err := vlog.deleteLogFile(f); err != nil {
return err
return nil
func (vlog *valueLog) incrIteratorCount() {
func (vlog *valueLog) iteratorCount() int {
return int(vlog.numActiveIterators.Load())
func (vlog *valueLog) decrIteratorCount() error {
num := vlog.numActiveIterators.Add(-1)
if num != 0 {
return nil
lfs := make([]*logFile, 0, len(vlog.filesToBeDeleted))
for _, id := range vlog.filesToBeDeleted {
lfs = append(lfs, vlog.filesMap[id])
delete(vlog.filesMap, id)
vlog.filesToBeDeleted = nil
for _, lf := range lfs {
if err := vlog.deleteLogFile(lf); err != nil {
return err
return nil
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
if lf == nil {
return nil
defer lf.lock.Unlock()
// Delete fid from discard stats as well.
vlog.discardStats.Update(lf.fid, -1)
return lf.Delete()
func (vlog *valueLog) dropAll() (int, error) {
// If db is opened in InMemory mode, we don't need to do anything since there are no vlog files.
if vlog.db.opt.InMemory {
return 0, nil
// We don't want to block dropAll on any pending transactions. So, don't worry about iterator
// count.
var count int
deleteAll := func() error {
defer vlog.filesLock.Unlock()
for _, lf := range vlog.filesMap {
if err := vlog.deleteLogFile(lf); err != nil {
return err
vlog.filesMap = make(map[uint32]*logFile)
vlog.maxFid = 0
return nil
if err := deleteAll(); err != nil {
return count, err
vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
if _, err := vlog.createVlogFile(); err != nil { // Called while writes are stopped.
return count, err
return count, nil
func (db *DB) valueThreshold() int64 {
return db.threshold.valueThreshold.Load()
type valueLog struct {
dirPath string
// guards our view of which files exist, which to be deleted, how many active iterators
filesLock sync.RWMutex
filesMap map[uint32]*logFile
maxFid uint32
filesToBeDeleted []uint32
// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
numActiveIterators atomic.Int32
db *DB
writableLogOffset atomic.Uint32 // read by read, written by write
numEntriesWritten uint32
opt Options
garbageCh chan struct{}
discardStats *discardStats
func vlogFilePath(dirPath string, fid uint32) string {
return fmt.Sprintf("%s%s%06d.vlog", dirPath, string(os.PathSeparator), fid)
func (vlog *valueLog) fpath(fid uint32) string {
return vlogFilePath(vlog.dirPath, fid)
func (vlog *valueLog) populateFilesMap() error {
vlog.filesMap = make(map[uint32]*logFile)
files, err := os.ReadDir(vlog.dirPath)
if err != nil {
return errFile(err, vlog.dirPath, "Unable to open log dir.")
found := make(map[uint64]struct{})
for _, file := range files {
if !strings.HasSuffix(file.Name(), ".vlog") {
fsz := len(file.Name())
fid, err := strconv.ParseUint(file.Name()[:fsz-5], 10, 32)
if err != nil {
return errFile(err, file.Name(), "Unable to parse log id.")
if _, ok := found[fid]; ok {
return errFile(err, file.Name(), "Duplicate file found. Please delete one.")
found[fid] = struct{}{}
lf := &logFile{
fid: uint32(fid),
path: vlog.fpath(uint32(fid)),
registry: vlog.db.registry,
vlog.filesMap[uint32(fid)] = lf
if vlog.maxFid < uint32(fid) {
vlog.maxFid = uint32(fid)
return nil
func (vlog *valueLog) createVlogFile() (*logFile, error) {
fid := vlog.maxFid + 1
path := vlog.fpath(fid)
lf := &logFile{
fid: fid,
path: path,
registry: vlog.db.registry,
writeAt: vlogHeaderSize,
opt: vlog.opt,
err := lf.open(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 2*vlog.opt.ValueLogFileSize)
if err != z.NewFile && err != nil {
return nil, err
vlog.filesMap[fid] = lf
y.AssertTrue(vlog.maxFid < fid)
vlog.maxFid = fid
// writableLogOffset is only written by write func, by read by Read func.
// To avoid a race condition, all reads and updates to this variable must be
// done via atomics.
vlog.numEntriesWritten = 0
return lf, nil
func errFile(err error, path string, msg string) error {
return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
// init initializes the value log struct. This initialization needs to happen
// before compactions start.
func (vlog *valueLog) init(db *DB) {
vlog.opt = db.opt
vlog.db = db
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
if vlog.opt.InMemory {
vlog.dirPath = vlog.opt.ValueDir
vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
lf, err := InitDiscardStats(vlog.opt)
vlog.discardStats = lf
// See TestPersistLFDiscardStats for purpose of statement below.
func (vlog *valueLog) open(db *DB) error {
// We don't need to open any vlog files or collect stats for GC if DB is opened
// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
if db.opt.InMemory {
return nil
if err := vlog.populateFilesMap(); err != nil {
return err
// If no files are found, then create a new file.
if len(vlog.filesMap) == 0 {
if vlog.opt.ReadOnly {
return nil
_, err := vlog.createVlogFile()
return y.Wrapf(err, "Error while creating log file in valueLog.open")
fids := vlog.sortedFids()
for _, fid := range fids {
lf, ok := vlog.filesMap[fid]
// Just open in RDWR mode. This should not create a new log file.
lf.opt = vlog.opt
if err := lf.open(vlog.fpath(fid), os.O_RDWR,
2*vlog.opt.ValueLogFileSize); err != nil {
return y.Wrapf(err, "Open existing file: %q", lf.path)
// We shouldn't delete the maxFid file.
if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
vlog.opt.Infof("Deleting empty file: %s", lf.path)
if err := lf.Delete(); err != nil {
return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
delete(vlog.filesMap, fid)
if vlog.opt.ReadOnly {
return nil
// Now we can read the latest value log file, and see if it needs truncation. We could
// technically do this over all the value log files, but that would mean slowing down the value
// log open.
last, ok := vlog.filesMap[vlog.maxFid]
lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
func(_ Entry, vp valuePointer) error {
return nil
if err != nil {
return y.Wrapf(err, "while iterating over: %s", last.path)
if err := last.Truncate(int64(lastOff)); err != nil {
return y.Wrapf(err, "while truncating last value log file: %s", last.path)
// Don't write to the old log file. Always create a new one.
if _, err := vlog.createVlogFile(); err != nil {
return y.Wrapf(err, "Error while creating log file in valueLog.open")
return nil
func (vlog *valueLog) Close() error {
if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
return nil
vlog.opt.Debugf("Stopping garbage collection of values.")
var err error
for id, lf := range vlog.filesMap {
lf.lock.Lock() // We won’t release the lock.
offset := int64(-1)
if !vlog.opt.ReadOnly && id == vlog.maxFid {
offset = int64(vlog.woffset())
if terr := lf.Close(offset); terr != nil && err == nil {
err = terr
if vlog.discardStats != nil {
if terr := vlog.discardStats.Close(-1); terr != nil && err == nil {
err = terr
return err
// sortedFids returns the file id's not pending deletion, sorted. Assumes we have shared access to
// filesMap.
func (vlog *valueLog) sortedFids() []uint32 {
toBeDeleted := make(map[uint32]struct{})
for _, fid := range vlog.filesToBeDeleted {
toBeDeleted[fid] = struct{}{}
ret := make([]uint32, 0, len(vlog.filesMap))
for fid := range vlog.filesMap {
if _, ok := toBeDeleted[fid]; !ok {
ret = append(ret, fid)
sort.Slice(ret, func(i, j int) bool {
return ret[i] < ret[j]
return ret
type request struct {
// Input values
Entries []*Entry
// Output values and wait group stuff below
Ptrs []valuePointer
Wg sync.WaitGroup
Err error
ref atomic.Int32
func (req *request) reset() {
req.Entries = req.Entries[:0]
req.Ptrs = req.Ptrs[:0]
req.Wg = sync.WaitGroup{}
req.Err = nil
func (req *request) IncrRef() {
func (req *request) DecrRef() {
nRef := req.ref.Add(-1)
if nRef > 0 {
req.Entries = nil
func (req *request) Wait() error {
err := req.Err
req.DecrRef() // DecrRef after writing to DB.
return err
type requests []*request
func (reqs requests) DecrRef() {
for _, req := range reqs {
func (reqs requests) IncrRef() {
for _, req := range reqs {
// sync function syncs content of latest value log file to disk. Syncing of value log directory is
// not required here as it happens every time a value log file rotation happens(check createVlogFile
// function). During rotation, previous value log file also gets synced to disk. It only syncs file
// if fid >= vlog.maxFid. In some cases such as replay(while opening db), it might be called with
// fid < vlog.maxFid. To sync irrespective of file id just call it with math.MaxUint32.
func (vlog *valueLog) sync() error {
if vlog.opt.SyncWrites || vlog.opt.InMemory {
return nil
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
// Sometimes it is possible that vlog.maxFid has been increased but file creation
// with same id is still in progress and this function is called. In those cases
// entry for the file might not be present in vlog.filesMap.
if curlf == nil {
return nil
err := curlf.Sync()
return err
func (vlog *valueLog) woffset() uint32 {
return vlog.writableLogOffset.Load()
// validateWrites will check whether the given requests can fit into 4GB vlog file.
// NOTE: 4GB is the maximum size we can create for vlog because value pointer offset is of type
// uint32. If we create more than 4GB, it will overflow uint32. So, limiting the size to 4GB.
func (vlog *valueLog) validateWrites(reqs []*request) error {
vlogOffset := uint64(vlog.woffset())
for _, req := range reqs {
// calculate size of the request.
size := estimateRequestSize(req)
estimatedVlogOffset := vlogOffset + size
if estimatedVlogOffset > uint64(maxVlogFileSize) {
return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
estimatedVlogOffset, maxVlogFileSize)
if estimatedVlogOffset >= uint64(vlog.opt.ValueLogFileSize) {
// We'll create a new vlog file if the estimated offset is greater or equal to
// max vlog size. So, resetting the vlogOffset.
vlogOffset = 0
// Estimated vlog offset will become current vlog offset if the vlog is not rotated.
vlogOffset = estimatedVlogOffset
return nil
// estimateRequestSize returns the size that needed to be written for the given request.
func estimateRequestSize(req *request) uint64 {
size := uint64(0)
for _, e := range req.Entries {
size += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
return size
// write is thread-unsafe by design and should not be called concurrently.
func (vlog *valueLog) write(reqs []*request) error {
if vlog.db.opt.InMemory {
return nil
// Validate writes before writing to vlog. Because, we don't want to partially write and return
// an error.
if err := vlog.validateWrites(reqs); err != nil {
return y.Wrapf(err, "while validating writes")
maxFid := vlog.maxFid
curlf := vlog.filesMap[maxFid]
defer func() {
if vlog.opt.SyncWrites {
if err := curlf.Sync(); err != nil {
vlog.opt.Errorf("Error while curlf sync: %v\n", err)
write := func(buf *bytes.Buffer) error {
if buf.Len() == 0 {
return nil
n := uint32(buf.Len())
endOffset := vlog.writableLogOffset.Add(n)
// Increase the file size if we cannot accommodate this entry.
// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
if int(endOffset) >= len(curlf.Data) {
if err := curlf.Truncate(int64(endOffset)); err != nil {
return err
start := int(endOffset - n)
y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))
return nil
toDisk := func() error {
if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
if err := curlf.doneWriting(vlog.woffset()); err != nil {
return err
newlf, err := vlog.createVlogFile()
if err != nil {
return err
curlf = newlf
return nil
buf := new(bytes.Buffer)
for i := range reqs {
b := reqs[i]
b.Ptrs = b.Ptrs[:0]
var written, bytesWritten int
valueSizes := make([]int64, 0, len(b.Entries))
for j := range b.Entries {
e := b.Entries[j]
valueSizes = append(valueSizes, int64(len(e.Value)))
if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
b.Ptrs = append(b.Ptrs, valuePointer{})
var p valuePointer
p.Fid = curlf.fid
p.Offset = vlog.woffset()
// We should not store transaction marks in the vlog file because it will never have all
// the entries in a transaction. If we store entries with transaction marks then value
// GC will not be able to iterate on the entire vlog file.
// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
// in a temporary variable and reassign it after writing to the value log.
tmpMeta := e.meta
e.meta = e.meta &^ (bitTxn | bitFinTxn)
plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
if err != nil {
return err
// Restore the meta.
e.meta = tmpMeta
p.Len = uint32(plen)
b.Ptrs = append(b.Ptrs, p)
if err := write(buf); err != nil {
return err
bytesWritten += buf.Len()
// No need to flush anything, we write to file directly via mmap.
y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))
vlog.numEntriesWritten += uint32(written)
// We write to disk here so that all entries that are part of the same transaction are
// written to the same vlog file.
if err := toDisk(); err != nil {
return err
return toDisk()
// Gets the logFile and acquires and RLock() for the mmap. You must call RUnlock on the file
// (if non-nil)
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
defer vlog.filesLock.RUnlock()
ret, ok := vlog.filesMap[vp.Fid]
if !ok {
// log file has gone away, we can't do anything. Return.
return nil, errors.Errorf("file with ID: %d not found", vp.Fid)
// Check for valid offset if we are reading from writable log.
maxFid := vlog.maxFid
// In read-only mode we don't need to check for writable offset as we are not writing anything.
// Moreover, this offset is not set in readonly mode.
if !vlog.opt.ReadOnly && vp.Fid == maxFid {
currentOffset := vlog.woffset()
if vp.Offset >= currentOffset {
return nil, errors.Errorf(
"Invalid value pointer offset: %d greater than current offset: %d",
vp.Offset, currentOffset)
return ret, nil
// Read reads the value log at a given location.
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
buf, lf, err := vlog.readValueBytes(vp)
// log file is locked so, decide whether to lock immediately or let the caller to
// unlock it, after caller uses it.
cb := vlog.getUnlockCallback(lf)
if err != nil {
return nil, cb, err
if vlog.opt.VerifyValueChecksum {
hash := crc32.New(y.CastagnoliCrcTable)
if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
// Fetch checksum from the end of the buffer.
checksum := buf[len(buf)-crc32.Size:]
if hash.Sum32() != y.BytesToU32(checksum) {
return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
var h header
headerLen := h.Decode(buf)
kv := buf[headerLen:]
if lf.encryptionEnabled() {
kv, err = lf.decryptKV(kv, vp.Offset)
if err != nil {
return nil, cb, err
if uint32(len(kv)) < h.klen+h.vlen {
vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
return nil, nil, errors.Errorf("Invalid read: Len: %d read at:[%d:%d]",
len(kv), h.klen, h.klen+h.vlen)
return kv[h.klen : h.klen+h.vlen], cb, nil
// getUnlockCallback will returns a function which unlock the logfile if the logfile is mmaped.
// otherwise, it unlock the logfile and return nil.
func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
if lf == nil {
return nil
return lf.lock.RUnlock
// readValueBytes return vlog entry slice and read locked log file. Caller should take care of
// logFile unlocking.
func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
lf, err := vlog.getFileRLocked(vp)
if err != nil {
return nil, nil, err
buf, err := lf.read(vp)
y.NumReadsVlogAdd(vlog.db.opt.MetricsEnabled, 1)
y.NumBytesReadsVlogAdd(vlog.db.opt.MetricsEnabled, int64(len(buf)))
return buf, lf, err
func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
defer vlog.filesLock.RUnlock()
// Pick a candidate that contains the largest amount of discardable data
fid, discard := vlog.discardStats.MaxDiscard()
// MaxDiscard will return fid=0 if it doesn't have any discard data. The
// vlog files start from 1.
if fid == 0 {
vlog.opt.Debugf("No file with discard stats")
return nil
lf, ok := vlog.filesMap[fid]
// This file was deleted but it's discard stats increased because of compactions. The file
// doesn't exist so we don't need to do anything. Skip it and retry.
if !ok {
vlog.discardStats.Update(fid, -1)
goto LOOP
// We have a valid file.
fi, err := lf.Fd.Stat()
if err != nil {
vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fi, err)
return nil
if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
discard, thr, fi.Name())
return nil
if fid < vlog.maxFid {
vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
lf, ok := vlog.filesMap[fid]
return lf
// Don't randomly pick any value log file.
return nil
func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
if vs.Version != y.ParseTs(e.Key) {
// Version not found. Discard.
return true
if isDeletedOrExpired(vs.Meta, vs.ExpiresAt) {
return true
if (vs.Meta & bitValuePointer) == 0 {
// Key also stores the value in LSM. Discard.
return true
if (vs.Meta & bitFinTxn) > 0 {
// Just a txn finish entry. Discard.
return true
return false
func (vlog *valueLog) doRunGC(lf *logFile) error {
_, span := otrace.StartSpan(context.Background(), "Badger.GC")
span.Annotatef(nil, "GC rewrite for: %v", lf.path)
defer span.End()
if err := vlog.rewrite(lf); err != nil {
return err
// Remove the file from discardStats.
vlog.discardStats.Update(lf.fid, -1)
return nil
func (vlog *valueLog) waitOnGC(lc *z.Closer) {
defer lc.Done()
<-lc.HasBeenClosed() // Wait for lc to be closed.
// Block any GC in progress to finish, and don't allow any more writes to runGC by filling up
// the channel of size 1.
vlog.garbageCh <- struct{}{}
func (vlog *valueLog) runGC(discardRatio float64) error {
select {
case vlog.garbageCh <- struct{}{}:
// Pick a log file for GC.
defer func() {
lf := vlog.pickLog(discardRatio)
if lf == nil {
return ErrNoRewrite
return vlog.doRunGC(lf)
return ErrRejected
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
if vlog.opt.InMemory {
for fid, discard := range stats {
vlog.discardStats.Update(fid, discard)
// The following is to coordinate with some test cases where we want to
// verify that at least one iteration of updateDiscardStats has been completed.
type vlogThreshold struct {
logger Logger
percentile float64
valueThreshold atomic.Int64
valueCh chan []int64
clearCh chan bool
closer *z.Closer
// Metrics contains a running log of statistics like amount of data stored etc.
vlMetrics *z.HistogramData
func initVlogThreshold(opt *Options) *vlogThreshold {
getBounds := func() []float64 {
mxbd := opt.maxValueThreshold
mnbd := float64(opt.ValueThreshold)
y.AssertTruef(mxbd >= mnbd, "maximum threshold bound is less than the min threshold")
size := math.Min(mxbd-mnbd+1, 1024.0)
bdstp := (mxbd - mnbd) / size
bounds := make([]float64, int64(size))
for i := range bounds {
if i == 0 {
bounds[0] = mnbd
if i == int(size-1) {
bounds[i] = mxbd
bounds[i] = bounds[i-1] + bdstp
return bounds
lt := &vlogThreshold{
logger: opt.Logger,
percentile: opt.VLogPercentile,
valueCh: make(chan []int64, 1000),
clearCh: make(chan bool, 1),
closer: z.NewCloser(1),
vlMetrics: z.NewHistogramData(getBounds()),
return lt
func (v *vlogThreshold) Clear(opt Options) {
v.clearCh <- true
func (v *vlogThreshold) update(sizes []int64) {
v.valueCh <- sizes
func (v *vlogThreshold) close() {
func (v *vlogThreshold) listenForValueThresholdUpdate() {
defer v.closer.Done()
for {
select {
case <-v.closer.HasBeenClosed():
case val := <-v.valueCh:
for _, e := range val {
// we are making it to get Options.VlogPercentile so that values with sizes
// in range of Options.VlogPercentile will make it to the LSM tree and rest to the
// value log file.
p := int64(v.vlMetrics.Percentile(v.percentile))
if v.valueThreshold.Load() != p {
if v.logger != nil {
v.logger.Infof("updating value of threshold to: %d", p)
case <-v.clearCh:
