5 Star 15 Fork 1

Gitee 极速下载 / Badger

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/dgraph-io/badger
克隆/下载
value.go 32.87 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197
/*
* Copyright 2017 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package badger
import (
"bytes"
"context"
"fmt"
"hash"
"hash/crc32"
"io"
"math"
"os"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/pkg/errors"
otrace "go.opencensus.io/trace"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/z"
)
// maxVlogFileSize is the maximum size of a vlog file that can be created. Offsets into a vlog
// file are stored as uint32, so the limit is math.MaxUint32.
var maxVlogFileSize uint32 = math.MaxUint32
// Bit flags and sizes used by the value log. The bit* constants are single-bit masks that this
// file combines in, and tests against, an entry's meta byte. bitDelete helps distinguish a key
// that has been explicitly deleted from a key that has never been seen.
const (
	bitDelete                 byte = 1 << 0 // Set if the key has been deleted.
	bitValuePointer           byte = 1 << 1 // Set if the value is NOT stored directly next to key.
	bitDiscardEarlierVersions byte = 1 << 2 // Set if earlier versions can be discarded.
	// Set if item shouldn't be discarded via compactions (used by merge operator)
	bitMergeEntry byte = 1 << 3
	// The MSB 2 bits are for transactions.
	bitTxn    byte = 1 << 6 // Set if the entry is part of a txn.
	bitFinTxn byte = 1 << 7 // Set if the entry is to indicate end of txn in value log.

	// mi is 1<<20 (1048576). It is not referenced in this file, hence the nolint directive.
	mi int64 = 1 << 20 //nolint:unused

	// size of vlog header.
	// +----------------+------------------+
	// | keyID(8 bytes) |  baseIV(12 bytes)|
	// +----------------+------------------+
	vlogHeaderSize = 20
)
// errStop is a sentinel error. NOTE(review): judging by its message it is used to end an
// iteration early, but no user of it is visible in this part of the file — confirm.
var errStop = errors.New("Stop iteration")

// errTruncate is the sentinel returned by safeRead.Entry when an entry cannot be trusted: the
// key length is implausibly large, the data ends early (io.EOF), or the checksum does not match.
var errTruncate = errors.New("Do truncate")
// logEntry is a callback that receives one Entry together with a valuePointer and returns an
// error. Callbacks with this signature are handed to logFile.iterate elsewhere in this file.
type logEntry func(e Entry, vp valuePointer) error
// safeRead holds the state needed by its Entry method to decode and checksum-verify entries
// from a log file.
type safeRead struct {
	// k and v are buffers whose capacity Entry grows to at least the key and value length of
	// the entry being read.
	k []byte
	v []byte

	// recordOffset is copied into every decoded Entry's offset and is passed to decryptKV.
	recordOffset uint32
	// lf is the log file consulted for encryptionEnabled and decryptKV.
	lf *logFile
}
// hashReader implements the io.Reader and io.ByteReader interfaces. Everything it reads from r
// is also written to the hash h, and it keeps track of the number of bytes read.
type hashReader struct {
	r         io.Reader   // Source of the data.
	h         hash.Hash32 // Running checksum over the data read so far.
	bytesRead int         // Number of bytes read.
}
// newHashReader returns a hashReader that reads from r and maintains a CRC32 checksum, built
// from y.CastagnoliCrcTable, over everything read.
func newHashReader(r io.Reader) *hashReader {
	hr := new(hashReader)
	hr.r = r
	hr.h = crc32.New(y.CastagnoliCrcTable)
	return hr
}
// Read reads up to len(p) bytes from the underlying reader into p, adds the bytes that were
// read to the running hash and to bytesRead, and returns the number of bytes read together with
// any error.
//
// An io.Reader may return n > 0 together with a non-nil error (for example the final chunk of
// data along with io.EOF). Those n bytes are valid data, so they are hashed and counted before
// the error is handed back; otherwise the checksum would silently miss them.
func (t *hashReader) Read(p []byte) (int, error) {
	n, err := t.r.Read(p)
	if n > 0 {
		t.bytesRead += n
		// Keep the reader's own error if there is one; only surface a hash write error when
		// the read itself succeeded.
		if _, werr := t.h.Write(p[:n]); werr != nil && err == nil {
			err = werr
		}
	}
	return n, err
}
// ReadByte reads a single byte through Read, so the byte is also hashed and counted. It returns
// the byte together with whatever error Read reported.
func (t *hashReader) ReadByte() (byte, error) {
	var one [1]byte
	_, err := t.Read(one[:])
	return one[0], err
}
// Sum32 returns the Sum32 of the underlying hash, i.e. the checksum over all bytes that have
// been written to it so far.
func (t *hashReader) Sum32() uint32 {
	return t.h.Sum32()
}
// Entry decodes one entry from reader and validates its checksum.
//
// The header and the key/value payload are read through a hashReader; the trailing CRC32 is
// then read directly from reader and compared against the hash of the preceding bytes. When the
// log file has encryption enabled, the payload is decrypted before it is split into key and
// value.
//
// It returns errTruncate when the header announces a key longer than 1<<16 bytes, when a read
// hits io.EOF, or when the checksum does not match. Any other read or decryption error is
// returned unchanged.
func (r *safeRead) Entry(reader io.Reader) (*Entry, error) {
	hr := newHashReader(reader)

	var hdr header
	headerLen, err := hdr.DecodeFrom(hr)
	if err != nil {
		return nil, err
	}
	if hdr.klen > uint32(1<<16) { // Reject implausibly long keys.
		return nil, errTruncate
	}

	// Make sure the reader's k and v buffers can hold this entry's key and value.
	if keyLen := int(hdr.klen); cap(r.k) < keyLen {
		r.k = make([]byte, 2*keyLen)
	}
	if valLen := int(hdr.vlen); cap(r.v) < valLen {
		r.v = make([]byte, 2*valLen)
	}

	// Key and value are stored back to back; read them in one go.
	payload := make([]byte, hdr.klen+hdr.vlen)
	if _, err := io.ReadFull(hr, payload); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	if r.lf.encryptionEnabled() {
		if payload, err = r.lf.decryptKV(payload, r.recordOffset); err != nil {
			return nil, err
		}
	}
	key, value := payload[:hdr.klen], payload[hdr.klen:]

	// The stored checksum is not part of the hashed data, so bypass the hashReader here.
	var stored [crc32.Size]byte
	if _, err := io.ReadFull(reader, stored[:]); err != nil {
		if err == io.EOF {
			err = errTruncate
		}
		return nil, err
	}
	if y.BytesToU32(stored[:]) != hr.Sum32() {
		return nil, errTruncate
	}

	return &Entry{
		Key:       key,
		Value:     value,
		offset:    r.recordOffset,
		hlen:      headerLen,
		meta:      hdr.meta,
		UserMeta:  hdr.userMeta,
		ExpiresAt: hdr.expiresAt,
	}, nil
}
// rewrite garbage-collects the value log file f. It iterates over every entry in f, looks the
// key up in the DB, and writes the entries that the LSM tree still points at (same fid and
// offset) back through vlog.db.batchSet. Afterwards f is either deleted right away (no active
// iterators) or appended to filesToBeDeleted.
//
// It returns an error if f is already marked for deletion, if iteration or a batchSet fails, if
// f is missing from filesMap, or if deleting the file fails. ErrNoRewrite is returned if the
// final batch size shrinks to zero.
func (vlog *valueLog) rewrite(f *logFile) error {
	vlog.filesLock.RLock()
	for _, fid := range vlog.filesToBeDeleted {
		if fid == f.fid {
			vlog.filesLock.RUnlock()
			return errors.Errorf("value log file already marked for deletion fid: %d", fid)
		}
	}
	maxFid := vlog.maxFid
	// The file with the highest fid must never be rewritten.
	y.AssertTruef(f.fid < maxFid, "fid to move: %d. Current max fid: %d", f.fid, maxFid)
	vlog.filesLock.RUnlock()

	vlog.opt.Infof("Rewriting fid: %d", f.fid)
	// wb accumulates the entries to write back; size tracks their estimated total size.
	wb := make([]*Entry, 0, 1000)
	var size int64

	y.AssertTrue(vlog.db != nil)
	var count, moved int
	// fe is invoked for each entry read from f and decides whether it has to be moved.
	fe := func(e Entry) error {
		count++
		if count%100000 == 0 {
			vlog.opt.Debugf("Processing entry %d", count)
		}

		vs, err := vlog.db.get(e.Key)
		if err != nil {
			return err
		}
		if discardEntry(e, vs, vlog.db) {
			return nil
		}

		// Value is still present in value log.
		if len(vs.Value) == 0 {
			return errors.Errorf("Empty value: %+v", vs)
		}
		var vp valuePointer
		vp.Decode(vs.Value)

		// If the entry found from the LSM Tree points to a newer vlog file, don't do anything.
		if vp.Fid > f.fid {
			return nil
		}
		// If the entry found from the LSM Tree points to an offset greater than the one
		// read from vlog, don't do anything.
		if vp.Offset > e.offset {
			return nil
		}
		// If the entry read from LSM Tree and vlog file point to the same vlog file and offset,
		// insert them back into the DB.
		// NOTE: It might be possible that the entry read from the LSM Tree points to
		// an older vlog file. See the comments in the else part.
		if vp.Fid == f.fid && vp.Offset == e.offset {
			moved++
			// This new entry only contains the key, and a pointer to the value.
			ne := new(Entry)
			// Remove only the bitValuePointer and transaction markers. We
			// should keep the other bits.
			ne.meta = e.meta &^ (bitValuePointer | bitTxn | bitFinTxn)
			ne.UserMeta = e.UserMeta
			ne.ExpiresAt = e.ExpiresAt
			// Copy key and value so the new entry does not alias the iterated entry's slices.
			ne.Key = append([]byte{}, e.Key...)
			ne.Value = append([]byte{}, e.Value...)
			es := ne.estimateSizeAndSetThreshold(vlog.db.valueThreshold())
			// Consider size of value as well while considering the total size
			// of the batch. There have been reports of high memory usage in
			// rewrite because we don't consider the value size. See #1292.
			es += int64(len(e.Value))

			// Ensure length and size of wb is within transaction limits.
			if int64(len(wb)+1) >= vlog.opt.maxBatchCount ||
				size+es >= vlog.opt.maxBatchSize {
				if err := vlog.db.batchSet(wb); err != nil {
					return err
				}
				size = 0
				wb = wb[:0]
			}
			wb = append(wb, ne)
			size += es
		} else { //nolint:staticcheck
			// It might be possible that the entry read from LSM Tree points to
			// an older vlog file. This can happen in the following situation.
			// Assume DB is opened with
			// numberOfVersionsToKeep=1
			//
			// Now, if we have ONLY one key in the system "FOO" which has been
			// updated 3 times and the same key has been garbage collected 3
			// times, we'll have 3 versions of the movekey
			// for the same key "FOO".
			//
			// NOTE: moveKeyi is the gc'ed version of the original key with version i
			// We're calling the gc'ed keys as moveKey to simplify the
			// explanation. We used to add move keys but we no longer do that.
			//
			// Assume we have 3 move keys in L0.
			// - moveKey1 (points to vlog file 10),
			// - moveKey2 (points to vlog file 14) and
			// - moveKey3 (points to vlog file 15).
			//
			// Also, assume there is another move key "moveKey1" (points to
			// vlog file 6) (this is also a move Key for key "FOO" ) on upper
			// levels (let's say 3). The move key "moveKey1" on level 0 was
			// inserted because vlog file 6 was GCed.
			//
			// Here's what the arrangement looks like
			// L0 => (moveKey1 => vlog10), (moveKey2 => vlog14), (moveKey3 => vlog15)
			// L1 => ....
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// When L0 compaction runs, it keeps only moveKey3 because the number of versions
			// to keep is set to 1. (we've dropped moveKey1's latest version)
			//
			// The new arrangement of keys is
			// L0 => ....
			// L1 => (moveKey3 => vlog15)
			// L2 => ....
			// L3 => (moveKey1 => vlog6)
			//
			// Now if we try to GC vlog file 10, the entry read from vlog file
			// will point to vlog10 but the entry read from LSM Tree will point
			// to vlog6. The move key read from LSM tree will point to vlog6
			// because we've asked for version 1 of the move key.
			//
			// This might seem like an issue but it's not really an issue
			// because the user has set the number of versions to keep to 1 and
			// the latest version of moveKey points to the correct vlog file
			// and offset. The stale move key on L3 will be eventually dropped
			// by compaction because there is a newer versions in the upper
			// levels.
		}
		return nil
	}

	_, err := f.iterate(vlog.opt.ReadOnly, 0, func(e Entry, vp valuePointer) error {
		return fe(e)
	})
	if err != nil {
		return err
	}

	// Write out whatever is still buffered in wb, in slices of at most batchSize entries. The
	// batch size is halved each time the DB reports ErrTxnTooBig.
	batchSize := 1024
	var loops int
	for i := 0; i < len(wb); {
		loops++
		if batchSize == 0 {
			vlog.db.opt.Warningf("We shouldn't reach batch size of zero.")
			return ErrNoRewrite
		}
		end := i + batchSize
		if end > len(wb) {
			end = len(wb)
		}
		if err := vlog.db.batchSet(wb[i:end]); err != nil {
			if err == ErrTxnTooBig {
				// Decrease the batch size to half.
				batchSize = batchSize / 2
				continue
			}
			return err
		}
		i += batchSize
	}
	vlog.opt.Infof("Processed %d entries in %d loops", len(wb), loops)
	vlog.opt.Infof("Total entries: %d. Moved: %d", count, moved)
	vlog.opt.Infof("Removing fid: %d", f.fid)
	var deleteFileNow bool
	// Entries written to LSM. Remove the older file now.
	{
		vlog.filesLock.Lock()
		// Just a sanity-check.
		if _, ok := vlog.filesMap[f.fid]; !ok {
			vlog.filesLock.Unlock()
			return errors.Errorf("Unable to find fid: %d", f.fid)
		}
		if vlog.iteratorCount() == 0 {
			delete(vlog.filesMap, f.fid)
			deleteFileNow = true
		} else {
			// Iterators are active, so only record the fid; decrIteratorCount deletes the
			// recorded files once the iterator count drops to zero.
			vlog.filesToBeDeleted = append(vlog.filesToBeDeleted, f.fid)
		}
		vlog.filesLock.Unlock()
	}

	if deleteFileNow {
		if err := vlog.deleteLogFile(f); err != nil {
			return err
		}
	}
	return nil
}
// incrIteratorCount atomically adds one to the number of active iterators.
func (vlog *valueLog) incrIteratorCount() {
	vlog.numActiveIterators.Add(1)
}
// iteratorCount returns the current number of active iterators, loaded atomically.
func (vlog *valueLog) iteratorCount() int {
	return int(vlog.numActiveIterators.Load())
}
// decrIteratorCount atomically subtracts one from the number of active iterators. When the
// count reaches zero, every file listed in filesToBeDeleted is removed from filesMap (under
// filesLock) and then deleted; the first deletion error is returned and stops the loop.
func (vlog *valueLog) decrIteratorCount() error {
	if remaining := vlog.numActiveIterators.Add(-1); remaining != 0 {
		return nil
	}

	// Detach the pending files from the map while holding the lock; the actual deletion
	// happens after the lock is released.
	vlog.filesLock.Lock()
	pending := vlog.filesToBeDeleted
	doomed := make([]*logFile, 0, len(pending))
	for i := range pending {
		fid := pending[i]
		doomed = append(doomed, vlog.filesMap[fid])
		delete(vlog.filesMap, fid)
	}
	vlog.filesToBeDeleted = nil
	vlog.filesLock.Unlock()

	for i := range doomed {
		if err := vlog.deleteLogFile(doomed[i]); err != nil {
			return err
		}
	}
	return nil
}
// deleteLogFile deletes lf while holding its write lock, and also removes its fid from the
// discard stats. A nil lf is a no-op. It returns the error from lf.Delete.
func (vlog *valueLog) deleteLogFile(lf *logFile) error {
	if lf != nil {
		lf.lock.Lock()
		defer lf.lock.Unlock()

		// Delete fid from discard stats as well.
		vlog.discardStats.Update(lf.fid, -1)
		return lf.Delete()
	}
	return nil
}
// dropAll deletes every value log file in filesMap, resets filesMap and maxFid, and then
// creates a fresh value log file. It returns the number of files that were deleted, along with
// the first error hit while deleting or while creating the new file. In InMemory mode there are
// no vlog files, so it returns (0, nil) immediately.
//
// Pending transactions are deliberately not waited for, so the iterator count is ignored.
func (vlog *valueLog) dropAll() (int, error) {
	if vlog.db.opt.InMemory {
		return 0, nil
	}

	var deleted int
	// removeFiles runs under filesLock for its whole duration.
	removeFiles := func() error {
		vlog.filesLock.Lock()
		defer vlog.filesLock.Unlock()

		for _, lf := range vlog.filesMap {
			err := vlog.deleteLogFile(lf)
			if err != nil {
				return err
			}
			deleted++
		}
		vlog.filesMap = make(map[uint32]*logFile)
		vlog.maxFid = 0
		return nil
	}
	if err := removeFiles(); err != nil {
		return deleted, err
	}

	vlog.db.opt.Infof("Value logs deleted. Creating value log file: 1")
	_, err := vlog.createVlogFile() // Called while writes are stopped.
	return deleted, err
}
// valueThreshold returns the current value threshold, loaded atomically from
// db.threshold.valueThreshold.
func (db *DB) valueThreshold() int64 {
	return db.threshold.valueThreshold.Load()
}
// valueLog holds the state of the value log: the set of vlog files, the write position in the
// newest file, and the bookkeeping used by garbage collection.
type valueLog struct {
	// dirPath is the directory that holds the vlog files (set from Options.ValueDir in init).
	dirPath string

	// guards our view of which files exist, which to be deleted, how many active iterators
	filesLock sync.RWMutex
	// filesMap maps a file id to its log file.
	filesMap map[uint32]*logFile
	// maxFid is the highest file id in use; the file with this id is the one written to.
	maxFid uint32
	// filesToBeDeleted lists file ids whose deletion is deferred while iterators are active.
	filesToBeDeleted []uint32
	// A refcount of iterators -- when this hits zero, we can delete the filesToBeDeleted.
	numActiveIterators atomic.Int32

	db *DB
	// writableLogOffset is the offset in the newest file at which the next write goes.
	writableLogOffset atomic.Uint32 // read by read, written by write
	// numEntriesWritten counts entries written to the newest file; createVlogFile resets it.
	numEntriesWritten uint32
	opt               Options

	// garbageCh has capacity one and is used as a token so that only one GC runs at a time.
	garbageCh    chan struct{}
	discardStats *discardStats
}
// vlogFilePath returns the path of the vlog file with the given fid inside dirPath: dirPath,
// the OS path separator, then the fid zero-padded to at least six digits and the ".vlog"
// suffix.
func vlogFilePath(dirPath string, fid uint32) string {
	name := fmt.Sprintf("%06d.vlog", fid)
	return dirPath + string(os.PathSeparator) + name
}
// fpath returns the path of the vlog file with the given fid inside vlog.dirPath.
func (vlog *valueLog) fpath(fid uint32) string {
	return vlogFilePath(vlog.dirPath, fid)
}
// populateFilesMap rebuilds vlog.filesMap from the "*.vlog" files found in vlog.dirPath and
// raises vlog.maxFid to the highest file id seen. The log files are only registered here, not
// opened.
//
// It returns an error if the directory cannot be read, if a file name does not parse as a
// 32-bit decimal id, or if two files map to the same id (e.g. "1.vlog" and "000001.vlog").
func (vlog *valueLog) populateFilesMap() error {
	vlog.filesMap = make(map[uint32]*logFile)

	files, err := os.ReadDir(vlog.dirPath)
	if err != nil {
		return errFile(err, vlog.dirPath, "Unable to open log dir.")
	}

	found := make(map[uint64]struct{})
	for _, file := range files {
		name := file.Name()
		if !strings.HasSuffix(name, ".vlog") {
			continue
		}
		fid, err := strconv.ParseUint(strings.TrimSuffix(name, ".vlog"), 10, 32)
		if err != nil {
			return errFile(err, name, "Unable to parse log id.")
		}
		if _, ok := found[fid]; ok {
			// err is nil at this point, so build a real cause instead of reporting
			// "Error=<nil>".
			dup := fmt.Errorf("file id %d is used by more than one file", fid)
			return errFile(dup, name, "Duplicate file found. Please delete one.")
		}
		found[fid] = struct{}{}

		id := uint32(fid) // Safe: ParseUint was limited to 32 bits.
		vlog.filesMap[id] = &logFile{
			fid:      id,
			path:     vlog.fpath(id),
			registry: vlog.db.registry,
		}
		if vlog.maxFid < id {
			vlog.maxFid = id
		}
	}
	return nil
}
// createVlogFile creates the log file with id maxFid+1, registers it in filesMap, makes it the
// new maxFid, and resets the write offset to vlogHeaderSize and the written-entry counter to
// zero. The file is opened with O_CREATE|O_EXCL, so an already existing file is an error; a
// z.NewFile result from open is treated as success.
func (vlog *valueLog) createVlogFile() (*logFile, error) {
	nextFid := vlog.maxFid + 1
	filePath := vlog.fpath(nextFid)

	lf := new(logFile)
	lf.fid = nextFid
	lf.path = filePath
	lf.registry = vlog.db.registry
	lf.writeAt = vlogHeaderSize
	lf.opt = vlog.opt

	const flags = os.O_RDWR | os.O_CREATE | os.O_EXCL
	if err := lf.open(filePath, flags, 2*vlog.opt.ValueLogFileSize); err != nil && err != z.NewFile {
		return nil, err
	}

	vlog.filesLock.Lock()
	vlog.filesMap[nextFid] = lf
	y.AssertTrue(vlog.maxFid < nextFid)
	vlog.maxFid = nextFid
	// writableLogOffset is written by the write path and read by the read path, so it is
	// only ever accessed through atomics.
	vlog.writableLogOffset.Store(vlogHeaderSize)
	vlog.numEntriesWritten = 0
	vlog.filesLock.Unlock()

	return lf, nil
}
// errFile builds an error of the form "<msg>. Path=<path>. Error=<err>".
//
// A non-nil err is wrapped with %w, so callers can still reach the underlying cause with
// errors.Is / errors.As. A nil err is formatted with %v, which keeps the text ("Error=<nil>")
// identical to what this function has always produced.
func errFile(err error, path string, msg string) error {
	if err == nil {
		return fmt.Errorf("%s. Path=%s. Error=%v", msg, path, err)
	}
	return fmt.Errorf("%s. Path=%s. Error=%w", msg, path, err)
}
// init initializes the value log struct from db. This initialization needs to happen before
// compactions start.
//
// In InMemory mode only the db and opt fields are set, because no files or directories exist
// on disk. Otherwise it also sets the value directory, creates the GC token channel, and loads
// the discard stats; a failure to load them panics via y.Check.
func (vlog *valueLog) init(db *DB) {
	vlog.db, vlog.opt = db, db.opt
	if vlog.opt.InMemory {
		return
	}

	vlog.garbageCh = make(chan struct{}, 1) // Only allow one GC at a time.
	vlog.dirPath = vlog.opt.ValueDir

	stats, err := InitDiscardStats(vlog.opt)
	y.Check(err)
	vlog.discardStats = stats

	// See TestPersistLFDiscardStats for purpose of statement below.
	db.logToSyncChan(endVLogInitMsg)
}
// open discovers and opens the existing value log files and prepares the value log for use.
//
// In InMemory mode it does nothing. If no vlog file exists, a new one is created (unless the
// DB is read-only). Otherwise every file is opened in ascending fid order, files that contain
// nothing but the header are deleted (except the newest), and — when not read-only — the
// newest file is iterated, truncated to the offset returned by that iteration, and a new file
// is created for subsequent writes.
func (vlog *valueLog) open(db *DB) error {
	// We don't need to open any vlog files or collect stats for GC if DB is opened
	// in InMemory mode. InMemory mode doesn't create any files/directories on disk.
	if db.opt.InMemory {
		return nil
	}

	if err := vlog.populateFilesMap(); err != nil {
		return err
	}
	// If no files are found, then create a new file.
	if len(vlog.filesMap) == 0 {
		if vlog.opt.ReadOnly {
			return nil
		}
		_, err := vlog.createVlogFile()
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	fids := vlog.sortedFids()
	for _, fid := range fids {
		lf, ok := vlog.filesMap[fid]
		y.AssertTrue(ok)

		// Just open in RDWR mode. This should not create a new log file.
		lf.opt = vlog.opt
		if err := lf.open(vlog.fpath(fid), os.O_RDWR,
			2*vlog.opt.ValueLogFileSize); err != nil {
			return y.Wrapf(err, "Open existing file: %q", lf.path)
		}
		// We shouldn't delete the maxFid file.
		// A size equal to vlogHeaderSize means the file holds no entries.
		if lf.size.Load() == vlogHeaderSize && fid != vlog.maxFid {
			vlog.opt.Infof("Deleting empty file: %s", lf.path)
			if err := lf.Delete(); err != nil {
				return y.Wrapf(err, "while trying to delete empty file: %s", lf.path)
			}
			delete(vlog.filesMap, fid)
		}
	}

	if vlog.opt.ReadOnly {
		return nil
	}
	// Now we can read the latest value log file, and see if it needs truncation. We could
	// technically do this over all the value log files, but that would mean slowing down the value
	// log open.
	last, ok := vlog.filesMap[vlog.maxFid]
	y.AssertTrue(ok)
	// The callback does nothing; only the offset returned by iterate is of interest here.
	lastOff, err := last.iterate(vlog.opt.ReadOnly, vlogHeaderSize,
		func(_ Entry, vp valuePointer) error {
			return nil
		})
	if err != nil {
		return y.Wrapf(err, "while iterating over: %s", last.path)
	}
	if err := last.Truncate(int64(lastOff)); err != nil {
		return y.Wrapf(err, "while truncating last value log file: %s", last.path)
	}

	// Don't write to the old log file. Always create a new one.
	if _, err := vlog.createVlogFile(); err != nil {
		return y.Wrapf(err, "Error while creating log file in valueLog.open")
	}
	return nil
}
// Close closes every log file in filesMap and then the discard stats, and returns the first
// error encountered (later errors are dropped). It is a no-op for a nil receiver, a value log
// without a DB, or InMemory mode.
//
// Each file's write lock is taken and intentionally never released. The newest file is closed
// with the current write offset when the DB is not read-only; every other file is closed with
// offset -1.
func (vlog *valueLog) Close() error {
	if vlog == nil || vlog.db == nil || vlog.db.opt.InMemory {
		return nil
	}

	vlog.opt.Debugf("Stopping garbage collection of values.")

	var firstErr error
	remember := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}

	for fid, lf := range vlog.filesMap {
		lf.lock.Lock() // We won't release the lock.

		offset := int64(-1)
		if fid == vlog.maxFid && !vlog.opt.ReadOnly {
			offset = int64(vlog.woffset())
		}
		remember(lf.Close(offset))
	}

	if vlog.discardStats != nil {
		vlog.db.captureDiscardStats()
		remember(vlog.discardStats.Close(-1))
	}
	return firstErr
}
// sortedFids returns, in ascending order, the ids of all files in filesMap that are not listed
// in filesToBeDeleted. The caller must hold at least shared access to filesMap.
func (vlog *valueLog) sortedFids() []uint32 {
	pending := make(map[uint32]struct{})
	for i := range vlog.filesToBeDeleted {
		pending[vlog.filesToBeDeleted[i]] = struct{}{}
	}

	fids := make([]uint32, 0, len(vlog.filesMap))
	for fid := range vlog.filesMap {
		if _, skip := pending[fid]; skip {
			continue
		}
		fids = append(fids, fid)
	}

	sort.Slice(fids, func(a, b int) bool {
		return fids[a] < fids[b]
	})
	return fids
}
// request carries a batch of entries to be written, together with the results of that write
// and the synchronization state used to wait for it.
type request struct {
	// Input values
	Entries []*Entry
	// Output values and wait group stuff below
	// Ptrs is filled by valueLog.write with one valuePointer per entry, in entry order.
	Ptrs []valuePointer
	// Wg is what Wait blocks on.
	Wg sync.WaitGroup
	// Err is the error that Wait returns.
	Err error
	// ref is a reference count; DecrRef returns the request to requestPool once it is no
	// longer positive.
	ref atomic.Int32
}
// reset returns the request to its initial state: Entries and Ptrs are emptied while keeping
// their backing arrays, the wait group and error are cleared, and the reference count is set
// to zero.
func (req *request) reset() {
	req.ref.Store(0)
	req.Err = nil
	req.Wg = sync.WaitGroup{}
	req.Ptrs = req.Ptrs[:0]
	req.Entries = req.Entries[:0]
}
// IncrRef atomically adds one to the request's reference count.
func (req *request) IncrRef() {
	req.ref.Add(1)
}
// DecrRef atomically subtracts one from the request's reference count. Once the count is no
// longer positive, the request drops its entries and is put back into requestPool.
func (req *request) DecrRef() {
	if req.ref.Add(-1) > 0 {
		return
	}
	req.Entries = nil
	requestPool.Put(req)
}
// Wait blocks until the request's wait group is done, then releases one reference and returns
// the request's error. The error is read before the reference is released, because releasing
// it may hand the request back to the pool.
func (req *request) Wait() error {
	req.Wg.Wait()
	// The return expression below is evaluated before the deferred DecrRef runs.
	defer req.DecrRef() // DecrRef after writing to DB.
	return req.Err
}
// requests is a list of requests whose reference counts can be adjusted together.
type requests []*request
// DecrRef calls DecrRef on every request in the list, in order.
func (reqs requests) DecrRef() {
	for i := range reqs {
		reqs[i].DecrRef()
	}
}
// IncrRef calls IncrRef on every request in the list, in order.
func (reqs requests) IncrRef() {
	for i := range reqs {
		reqs[i].IncrRef()
	}
}
// sync syncs the content of the latest value log file (the one with id maxFid) to disk.
//
// It does nothing when SyncWrites is enabled (write already syncs in that case) or in InMemory
// mode. It also returns nil when maxFid has no entry in filesMap, which can happen while the
// file for a freshly increased maxFid is still being created.
//
// The file's read lock is acquired before filesLock is released, and is held for the duration
// of the Sync call.
func (vlog *valueLog) sync() error {
	if vlog.opt.InMemory || vlog.opt.SyncWrites {
		return nil
	}

	vlog.filesLock.RLock()
	latest := vlog.filesMap[vlog.maxFid]
	if latest == nil {
		// The file for maxFid is not registered (yet); nothing to sync.
		vlog.filesLock.RUnlock()
		return nil
	}
	latest.lock.RLock()
	vlog.filesLock.RUnlock()

	syncErr := latest.Sync()
	latest.lock.RUnlock()
	return syncErr
}
// woffset returns the current writable log offset, loaded atomically.
func (vlog *valueLog) woffset() uint32 {
	return vlog.writableLogOffset.Load()
}
// validateWrites checks whether the given requests fit into vlog files without any offset
// exceeding maxVlogFileSize. The limit exists because value pointer offsets are uint32, so a
// larger file would overflow them.
//
// Starting from the current write offset, each request's estimated size is added. An offset
// above maxVlogFileSize is an error. An offset at or above Options.ValueLogFileSize means the
// file would be rotated after that request, so the running offset restarts at zero.
func (vlog *valueLog) validateWrites(reqs []*request) error {
	offset := uint64(vlog.woffset())
	rotateAt := uint64(vlog.opt.ValueLogFileSize)

	for i := range reqs {
		next := offset + estimateRequestSize(reqs[i])
		switch {
		case next > uint64(maxVlogFileSize):
			return errors.Errorf("Request size offset %d is bigger than maximum offset %d",
				next, maxVlogFileSize)
		case next >= rotateAt:
			// A new vlog file would be created here, so start counting from scratch.
			offset = 0
		default:
			// No rotation: the estimate becomes the current offset.
			offset = next
		}
	}
	return nil
}
// estimateRequestSize returns an upper estimate of the number of bytes needed to write the
// given request: for every entry, maxHeaderSize plus the key length, the value length, and the
// CRC32 size.
func estimateRequestSize(req *request) uint64 {
	var total uint64
	for i := range req.Entries {
		e := req.Entries[i]
		total += uint64(maxHeaderSize + len(e.Key) + len(e.Value) + crc32.Size)
	}
	return total
}
// write appends the entries of the given requests to the current value log file and fills each
// request's Ptrs with one valuePointer per entry (a zero valuePointer for entries that skip the
// value log). It is thread-unsafe by design and should not be called concurrently.
//
// The requests are validated up front so that a write is not started if it cannot fit. After
// each request, the file is rotated if it exceeded the configured size or entry count, which
// keeps all entries of one request in the same file. With SyncWrites enabled, the file that is
// current when write returns is synced. In InMemory mode write does nothing.
func (vlog *valueLog) write(reqs []*request) error {
	if vlog.db.opt.InMemory {
		return nil
	}
	// Validate writes before writing to vlog. Because, we don't want to partially write and return
	// an error.
	if err := vlog.validateWrites(reqs); err != nil {
		return y.Wrapf(err, "while validating writes")
	}

	vlog.filesLock.RLock()
	maxFid := vlog.maxFid
	curlf := vlog.filesMap[maxFid]
	vlog.filesLock.RUnlock()

	// The closure reads curlf when it runs, so after a rotation it syncs the newest file.
	defer func() {
		if vlog.opt.SyncWrites {
			if err := curlf.Sync(); err != nil {
				vlog.opt.Errorf("Error while curlf sync: %v\n", err)
			}
		}
	}()

	// write copies the buffer into the current file's data at the current write offset,
	// growing the file first if needed, and advances the offset.
	write := func(buf *bytes.Buffer) error {
		if buf.Len() == 0 {
			return nil
		}

		n := uint32(buf.Len())
		endOffset := vlog.writableLogOffset.Add(n)
		// Increase the file size if we cannot accommodate this entry.
		// [Aman] Should this be >= or just >? Doesn't make sense to extend the file if it big enough already.
		if int(endOffset) >= len(curlf.Data) {
			if err := curlf.Truncate(int64(endOffset)); err != nil {
				return err
			}
		}

		start := int(endOffset - n)
		y.AssertTrue(copy(curlf.Data[start:], buf.Bytes()) == int(n))

		curlf.size.Store(endOffset)
		return nil
	}

	// toDisk rotates to a new vlog file once the current one is over the size or entry limit.
	toDisk := func() error {
		if vlog.woffset() > uint32(vlog.opt.ValueLogFileSize) ||
			vlog.numEntriesWritten > vlog.opt.ValueLogMaxEntries {
			if err := curlf.doneWriting(vlog.woffset()); err != nil {
				return err
			}

			newlf, err := vlog.createVlogFile()
			if err != nil {
				return err
			}
			curlf = newlf
		}
		return nil
	}

	buf := new(bytes.Buffer)
	for i := range reqs {
		b := reqs[i]
		b.Ptrs = b.Ptrs[:0]
		var written, bytesWritten int
		valueSizes := make([]int64, 0, len(b.Entries))
		for j := range b.Entries {
			buf.Reset()

			e := b.Entries[j]
			valueSizes = append(valueSizes, int64(len(e.Value)))
			if e.skipVlogAndSetThreshold(vlog.db.valueThreshold()) {
				b.Ptrs = append(b.Ptrs, valuePointer{})
				continue
			}
			var p valuePointer

			p.Fid = curlf.fid
			p.Offset = vlog.woffset()

			// We should not store transaction marks in the vlog file because it will never have all
			// the entries in a transaction. If we store entries with transaction marks then value
			// GC will not be able to iterate on the entire vlog file.
			// But, we still want the entry to stay intact for the memTable WAL. So, store the meta
			// in a temporary variable and reassign it after writing to the value log.
			tmpMeta := e.meta
			e.meta = e.meta &^ (bitTxn | bitFinTxn)
			plen, err := curlf.encodeEntry(buf, e, p.Offset) // Now encode the entry into buffer.
			// Restore the meta before looking at err, so that a failed encode does not leave
			// the caller's entry with its transaction bits stripped.
			e.meta = tmpMeta
			if err != nil {
				return err
			}
			p.Len = uint32(plen)
			b.Ptrs = append(b.Ptrs, p)
			if err := write(buf); err != nil {
				return err
			}
			written++
			bytesWritten += buf.Len()
			// No need to flush anything, we write to file directly via mmap.
		}
		y.NumWritesVlogAdd(vlog.opt.MetricsEnabled, int64(written))
		y.NumBytesWrittenVlogAdd(vlog.opt.MetricsEnabled, int64(bytesWritten))

		vlog.numEntriesWritten += uint32(written)
		vlog.db.threshold.update(valueSizes)
		// We write to disk here so that all entries that are part of the same transaction are
		// written to the same vlog file.
		if err := toDisk(); err != nil {
			return err
		}
	}
	return toDisk()
}
// getFileRLocked looks up the log file that vp points into and returns it with its read lock
// held. The caller must call RUnlock on the returned file's lock when it is non-nil.
//
// It returns an error if no file with vp.Fid exists, or — when the DB is not read-only and vp
// points into the newest file — if vp.Offset is at or beyond the current write offset. In
// read-only mode the write offset is not set, so that check is skipped.
func (vlog *valueLog) getFileRLocked(vp valuePointer) (*logFile, error) {
	vlog.filesLock.RLock()
	defer vlog.filesLock.RUnlock()

	lf, found := vlog.filesMap[vp.Fid]
	if !found {
		// log file has gone away, we can't do anything. Return.
		return nil, errors.Errorf("file with ID: %d not found", vp.Fid)
	}

	// Only the newest file is still being written, so only it needs the offset check.
	if vp.Fid == vlog.maxFid && !vlog.opt.ReadOnly {
		if writeOffset := vlog.woffset(); vp.Offset >= writeOffset {
			return nil, errors.Errorf(
				"Invalid value pointer offset: %d greater than current offset: %d",
				vp.Offset, writeOffset)
		}
	}

	lf.lock.RLock()
	return lf, nil
}
// Read reads the value log at the location given by vp and returns the value bytes together
// with a callback that releases the read lock on the underlying log file.
//
// Callback contract: whenever the returned callback is non-nil, the log file's read lock is
// still held and the caller must invoke the callback once it is done with the returned slice.
// Whenever the callback is nil, no lock is held. On the checksum and "Invalid read" error
// paths the lock is released here and a nil callback is returned; on the read and decryption
// error paths the callback is returned alongside the error.
//
// With Options.VerifyValueChecksum set, the trailing CRC32 of the record is verified and a
// mismatch is reported as y.ErrChecksumMismatch.
//
// TODO: Make this read private.
func (vlog *valueLog) Read(vp valuePointer, _ *y.Slice) ([]byte, func(), error) {
	buf, lf, err := vlog.readValueBytes(vp)
	// log file is locked so, decide whether to lock immediately or let the caller to
	// unlock it, after caller uses it.
	cb := vlog.getUnlockCallback(lf)
	if err != nil {
		return nil, cb, err
	}

	if vlog.opt.VerifyValueChecksum {
		// A record shorter than a checksum cannot be verified; report it instead of
		// panicking on the slice expressions below.
		if len(buf) < crc32.Size {
			runCallback(cb)
			return nil, nil, errors.Errorf(
				"Invalid read: Len: %d is too short to hold a checksum for vp: %+v",
				len(buf), vp)
		}
		hash := crc32.New(y.CastagnoliCrcTable)
		if _, err := hash.Write(buf[:len(buf)-crc32.Size]); err != nil {
			runCallback(cb)
			return nil, nil, y.Wrapf(err, "failed to write hash for vp %+v", vp)
		}
		// Fetch checksum from the end of the buffer.
		checksum := buf[len(buf)-crc32.Size:]
		if hash.Sum32() != y.BytesToU32(checksum) {
			runCallback(cb)
			return nil, nil, y.Wrapf(y.ErrChecksumMismatch, "value corrupted for vp: %+v", vp)
		}
	}
	var h header
	headerLen := h.Decode(buf)
	kv := buf[headerLen:]
	if lf.encryptionEnabled() {
		kv, err = lf.decryptKV(kv, vp.Offset)
		if err != nil {
			return nil, cb, err
		}
	}
	// Compare in uint64 so that a corrupt header whose klen+vlen overflows uint32 is rejected
	// rather than slipping past the check.
	if uint64(len(kv)) < uint64(h.klen)+uint64(h.vlen) {
		vlog.db.opt.Errorf("Invalid read: vp: %+v", vp)
		// Release the file's read lock: the caller receives a nil callback and therefore has
		// no way to release it.
		runCallback(cb)
		return nil, nil, errors.Errorf("Invalid read: Len: %d read at:[%d:%d]",
			len(kv), h.klen, h.klen+h.vlen)
	}
	return kv[h.klen : h.klen+h.vlen], cb, nil
}
// getUnlockCallback returns a function that releases the read lock on lf, or nil when lf is
// nil.
func (vlog *valueLog) getUnlockCallback(lf *logFile) func() {
	if lf != nil {
		return lf.lock.RUnlock
	}
	return nil
}
// readValueBytes returns the raw vlog bytes that vp points at, together with the log file they
// were read from. The file is returned with its read lock held, even when the read itself
// fails, so the caller is responsible for unlocking it. If the file cannot be obtained at all,
// a nil file is returned and no lock is held.
//
// The vlog read metrics are updated for every read that reaches the file.
func (vlog *valueLog) readValueBytes(vp valuePointer) ([]byte, *logFile, error) {
	lf, err := vlog.getFileRLocked(vp)
	if err != nil {
		return nil, nil, err
	}

	data, readErr := lf.read(vp)

	metricsOn := vlog.db.opt.MetricsEnabled
	y.NumReadsVlogAdd(metricsOn, 1)
	y.NumBytesReadsVlogAdd(metricsOn, int64(len(data)))
	return data, lf, readErr
}
// pickLog chooses the value log file to garbage-collect: the file with the largest amount of
// discardable data according to the discard stats. It returns nil when there is no candidate,
// when the candidate cannot be stat'ed, when its discardable bytes are below
// discardRatio * file size, or when the candidate is the newest file (which is never picked).
//
// Stats entries that refer to files no longer present in filesMap are dropped along the way.
// filesLock is held in shared mode for the whole call.
func (vlog *valueLog) pickLog(discardRatio float64) *logFile {
	vlog.filesLock.RLock()
	defer vlog.filesLock.RUnlock()

	for {
		// Pick a candidate that contains the largest amount of discardable data
		fid, discard := vlog.discardStats.MaxDiscard()

		// MaxDiscard will return fid=0 if it doesn't have any discard data. The
		// vlog files start from 1.
		if fid == 0 {
			vlog.opt.Debugf("No file with discard stats")
			return nil
		}

		lf, ok := vlog.filesMap[fid]
		if !ok {
			// This file was deleted but its discard stats increased because of compactions.
			// The file doesn't exist so we don't need to do anything. Drop its stats and
			// retry with the next candidate.
			vlog.discardStats.Update(fid, -1)
			continue
		}

		// We have a valid file.
		fi, err := lf.Fd.Stat()
		if err != nil {
			// Log the file id; fi is not usable when Stat fails.
			vlog.opt.Errorf("Unable to get stats for value log fid: %d err: %+v", fid, err)
			return nil
		}
		if thr := discardRatio * float64(fi.Size()); float64(discard) < thr {
			vlog.opt.Debugf("Discard: %d less than threshold: %.0f for file: %s",
				discard, thr, fi.Name())
			return nil
		}

		// Don't randomly pick any value log file: the newest file is still being written.
		if fid >= vlog.maxFid {
			return nil
		}
		vlog.opt.Infof("Found value log max discard fid: %d discard: %d\n", fid, discard)
		return lf
	}
}
// discardEntry reports whether the vlog entry e can be dropped during a rewrite, given vs, the
// value struct that the caller looked up for e's key. The db parameter is not used.
func discardEntry(e Entry, vs y.ValueStruct, db *DB) bool {
	switch {
	case vs.Version != y.ParseTs(e.Key):
		// Version not found. Discard.
		return true
	case isDeletedOrExpired(vs.Meta, vs.ExpiresAt):
		return true
	case vs.Meta&bitValuePointer == 0:
		// Key also stores the value in LSM. Discard.
		return true
	case vs.Meta&bitFinTxn != 0:
		// Just a txn finish entry. Discard.
		return true
	}
	return false
}
// doRunGC rewrites lf inside a "Badger.GC" trace span. If the rewrite succeeds, lf's fid is
// removed from the discard stats; otherwise the rewrite error is returned.
func (vlog *valueLog) doRunGC(lf *logFile) error {
	_, span := otrace.StartSpan(context.Background(), "Badger.GC")
	span.Annotatef(nil, "GC rewrite for: %v", lf.path)
	defer span.End()

	err := vlog.rewrite(lf)
	if err == nil {
		// Remove the file from discardStats.
		vlog.discardStats.Update(lf.fid, -1)
	}
	return err
}
// waitOnGC blocks until lc has been closed and then occupies the single slot of garbageCh.
// Taking the slot waits for any GC in progress to finish and keeps runGC from starting another
// one. lc.Done is called on return.
func (vlog *valueLog) waitOnGC(lc *z.Closer) {
	defer lc.Done()

	closed := lc.HasBeenClosed()
	<-closed // Wait for lc to be closed.

	// The slot is never given back, so no further GC can start.
	vlog.garbageCh <- struct{}{}
}
// runGC runs one round of value log garbage collection. Only one GC may run at a time: if the
// slot in garbageCh is already taken, ErrRejected is returned immediately. If no log file
// qualifies for the given discardRatio, ErrNoRewrite is returned. Otherwise the result of
// doRunGC on the picked file is returned.
func (vlog *valueLog) runGC(discardRatio float64) error {
	// Try to take the GC slot without blocking.
	select {
	case vlog.garbageCh <- struct{}{}:
	default:
		return ErrRejected
	}
	// Give the slot back when this round is over.
	defer func() {
		<-vlog.garbageCh
	}()

	// Pick a log file for GC.
	lf := vlog.pickLog(discardRatio)
	if lf == nil {
		return ErrNoRewrite
	}
	return vlog.doRunGC(lf)
}
// updateDiscardStats applies every (fid, discard) pair in stats to the discard stats. It does
// nothing in InMemory mode.
func (vlog *valueLog) updateDiscardStats(stats map[uint32]int64) {
	if !vlog.opt.InMemory {
		for fid := range stats {
			vlog.discardStats.Update(fid, stats[fid])
		}
		// The following is to coordinate with some test cases where we want to
		// verify that at least one iteration of updateDiscardStats has been completed.
		vlog.db.logToSyncChan(updateDiscardStatsMsg)
	}
}
// vlogThreshold tracks the sizes of written values in a histogram and derives the current
// value threshold from a percentile of that histogram.
type vlogThreshold struct {
	// logger may be nil; it is only used to report threshold changes.
	logger Logger
	// percentile is the percentile of the histogram that becomes the threshold.
	percentile float64
	// valueThreshold is the current threshold; it is read and written atomically.
	valueThreshold atomic.Int64
	// valueCh carries batches of value sizes to listenForValueThresholdUpdate.
	valueCh chan []int64
	// clearCh tells listenForValueThresholdUpdate to clear the histogram.
	clearCh chan bool
	closer  *z.Closer
	// Metrics contains a running log of statistics like amount of data stored etc.
	vlMetrics *z.HistogramData
}
// initVlogThreshold builds a vlogThreshold from opt. The histogram gets at most 1024 bucket
// bounds: the first is opt.ValueThreshold, the last is opt.maxValueThreshold, and every bound
// in between is the previous bound plus a fixed step. The threshold starts at
// opt.ValueThreshold.
//
// It asserts that opt.maxValueThreshold is not below opt.ValueThreshold.
func initVlogThreshold(opt *Options) *vlogThreshold {
	upper := opt.maxValueThreshold
	lower := float64(opt.ValueThreshold)
	y.AssertTruef(upper >= lower, "maximum threshold bound is less than the min threshold")

	count := math.Min(upper-lower+1, 1024.0)
	step := (upper - lower) / count
	bounds := make([]float64, int64(count))
	lastIdx := int(count - 1)
	for i := range bounds {
		switch {
		case i == 0:
			bounds[i] = lower
		case i == lastIdx:
			bounds[i] = upper
		default:
			// Accumulate from the previous bound rather than multiplying, so the values
			// are built up step by step.
			bounds[i] = bounds[i-1] + step
		}
	}

	th := &vlogThreshold{
		logger:     opt.Logger,
		percentile: opt.VLogPercentile,
		valueCh:    make(chan []int64, 1000),
		clearCh:    make(chan bool, 1),
		closer:     z.NewCloser(1),
		vlMetrics:  z.NewHistogramData(bounds),
	}
	th.valueThreshold.Store(opt.ValueThreshold)
	return th
}
// Clear resets the threshold to opt.ValueThreshold and then signals the listener, through
// clearCh, to clear the histogram. clearCh has a buffer of one, so the send blocks if an
// earlier clear signal has not been received yet.
func (v *vlogThreshold) Clear(opt Options) {
	v.valueThreshold.Store(opt.ValueThreshold)
	v.clearCh <- true
}
// update hands a batch of value sizes to the listener through valueCh. The send blocks if the
// channel's buffer is full.
func (v *vlogThreshold) update(sizes []int64) {
	v.valueCh <- sizes
}
// close calls SignalAndWait on the closer; listenForValueThresholdUpdate returns once the
// closer has been closed and reports completion through closer.Done.
func (v *vlogThreshold) close() {
	v.closer.SignalAndWait()
}
// listenForValueThresholdUpdate is the event loop of the vlogThreshold. It runs until the
// closer has been closed and calls closer.Done on return.
//
// For every batch of value sizes received on valueCh, the sizes are added to the histogram and
// the threshold is set to the configured percentile of the histogram; a change is logged when a
// logger is set. With that, values whose size falls within Options.VLogPercentile go to the
// LSM tree and the rest go to the value log file. A message on clearCh clears the histogram.
func (v *vlogThreshold) listenForValueThresholdUpdate() {
	defer v.closer.Done()

	for {
		select {
		case <-v.closer.HasBeenClosed():
			return

		case <-v.clearCh:
			v.vlMetrics.Clear()

		case sizes := <-v.valueCh:
			for i := range sizes {
				v.vlMetrics.Update(sizes[i])
			}
			next := int64(v.vlMetrics.Percentile(v.percentile))
			if next == v.valueThreshold.Load() {
				continue // Threshold unchanged.
			}
			if v.logger != nil {
				v.logger.Infof("updating value of threshold to: %d", next)
			}
			v.valueThreshold.Store(next)
		}
	}
}
Go
1
https://gitee.com/mirrors/Badger.git
git@gitee.com:mirrors/Badger.git
mirrors
Badger
Badger
main

搜索帮助