5 Star 15 Fork 1

Gitee 极速下载 / Badger

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/dgraph-io/badger
克隆/下载
managed_db_test.go 20.95 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834
package badger
import (
"fmt"
"math"
"math/rand"
"os"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/dgraph-io/badger/v4/y"
"github.com/dgraph-io/ristretto/z"
)
// val returns a buffer of random bytes: 8 KiB when large is true
// (forces entries into the value log), 16 bytes otherwise.
func val(large bool) []byte {
	size := 16
	if large {
		size = 8192
	}
	buf := make([]byte, size)
	rand.Read(buf)
	return buf
}
// numKeys counts the keys visible through a read-only View transaction
// at the latest committed version.
func numKeys(db *DB) int {
	total := 0
	err := db.View(func(txn *Txn) error {
		it := txn.NewIterator(DefaultIteratorOptions)
		defer it.Close()
		for it.Rewind(); it.Valid(); it.Next() {
			total++
		}
		return nil
	})
	y.Check(err)
	return total
}
// numKeysManaged counts the keys visible in managed-transaction mode
// when reading at the given timestamp.
func numKeysManaged(db *DB, readTs uint64) int {
	txn := db.NewTransactionAt(readTs, false)
	defer txn.Discard()

	it := txn.NewIterator(DefaultIteratorOptions)
	defer it.Close()

	total := 0
	for it.Rewind(); it.Valid(); it.Next() {
		total++
	}
	return total
}
// TestDropAllManaged verifies DropAll in managed-transaction mode: the
// call is idempotent, writes at lower timestamps succeed after the drop,
// and the value log replays correctly (preserving badgerHead) on reopen.
func TestDropAllManaged(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opts := getTestOptions(dir)
	opts.managedTxns = true
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	const count = uint64(10000)
	// write commits one key per timestamp in [first, first+count).
	write := func(db *DB, first uint64) {
		var pending sync.WaitGroup
		for ts := first; ts < first+count; ts++ {
			pending.Add(1)
			txn := db.NewTransactionAt(math.MaxUint64, true)
			require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", int(ts))), val(true))))
			require.NoError(t, txn.CommitAt(ts, func(err error) {
				require.NoError(t, err)
				pending.Done()
			}))
		}
		pending.Wait()
	}

	write(db, count)
	require.Equal(t, int(count), numKeysManaged(db, math.MaxUint64))

	// DropAll must be idempotent.
	require.NoError(t, db.DropAll())
	require.NoError(t, db.DropAll())
	require.Equal(t, 0, numKeysManaged(db, math.MaxUint64))

	// Writes must still work afterwards, even at lower timestamps.
	write(db, 1)
	require.Equal(t, int(count), numKeysManaged(db, math.MaxUint64))
	require.NoError(t, db.Close())

	// Reopen: value log replay must preserve badgerHead.
	opts.managedTxns = true
	reopened, err := Open(opts)
	require.NoError(t, err)
	require.Equal(t, int(count), numKeysManaged(reopened, math.MaxUint64))
	require.NoError(t, reopened.Close())
}
// TestDropAll verifies DropAll in normal (non-managed) mode: the DB
// empties, accepts new writes afterwards, and replays its value log
// correctly across a close/reopen cycle.
func TestDropAll(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	const total = 10000
	fill := func(db *DB) {
		wb := db.NewWriteBatch()
		for i := 0; i < total; i++ {
			require.NoError(t, wb.Set([]byte(key("key", i)), val(true)))
		}
		require.NoError(t, wb.Flush())
	}

	fill(db)
	require.Equal(t, total, numKeys(db))
	require.NoError(t, db.DropAll())
	require.Equal(t, 0, numKeys(db))

	// The DB must remain writable after a drop.
	fill(db)
	require.Equal(t, total, numKeys(db))
	require.NoError(t, db.Close())

	// Reopen to make sure the value log replays correctly.
	reopened, err := Open(opts)
	require.NoError(t, err)
	require.Equal(t, total, numKeys(reopened))
	require.NoError(t, reopened.Close())
}
// TestDropAllTwice ensures DropAll is idempotent: calling it a second
// time on an already-empty DB must succeed, in both disk and in-memory
// modes.
func TestDropAllTwice(t *testing.T) {
	test := func(t *testing.T, opts Options) {
		db, err := Open(opts)
		require.NoError(t, err)
		// Close exactly once via this deferred call. The original also
		// called db.Close() explicitly at the end of test(), which made
		// the deferred Close a redundant second call.
		defer func() {
			require.NoError(t, db.Close())
		}()
		N := uint64(10000)
		populate := func(db *DB) {
			writer := db.NewWriteBatch()
			for i := uint64(0); i < N; i++ {
				require.NoError(t, writer.Set([]byte(key("key", int(i))), val(false)))
			}
			require.NoError(t, writer.Flush())
		}
		populate(db)
		require.Equal(t, int(N), numKeys(db))
		require.NoError(t, db.DropAll())
		require.Equal(t, 0, numKeys(db))
		// Call DropAll again.
		require.NoError(t, db.DropAll())
	}
	t.Run("disk mode", func(t *testing.T) {
		dir, err := os.MkdirTemp("", "badger-test")
		require.NoError(t, err)
		defer removeDir(dir)
		opts := getTestOptions(dir)
		opts.ValueLogFileSize = 5 << 20
		test(t, opts)
	})
	t.Run("InMemory mode", func(t *testing.T) {
		opts := getTestOptions("")
		opts.InMemory = true
		test(t, opts)
	})
}
// TestDropAllWithPendingTxn verifies that DropAll can run while a
// read-write transaction still holds an open iterator. The reader
// goroutine loops over its snapshot until DropAll invalidates the value
// pointers it reads, at which point it logs the error and exits;
// DropAll itself must succeed despite the pending transaction.
func TestDropAllWithPendingTxn(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, db.Close())
	}()
	N := uint64(10000)
	populate := func(db *DB) {
		writer := db.NewWriteBatch()
		for i := uint64(0); i < N; i++ {
			require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
		}
		require.NoError(t, writer.Flush())
	}
	populate(db)
	require.Equal(t, int(N), numKeys(db))
	// Open the transaction before DropAll so it pins the pre-drop snapshot.
	txn := db.NewTransaction(true)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		itr := txn.NewIterator(DefaultIteratorOptions)
		defer itr.Close()
		var keys []string
		for {
			var count int
			for itr.Rewind(); itr.Valid(); itr.Next() {
				count++
				item := itr.Item()
				keys = append(keys, string(item.KeyCopy(nil)))
				_, err := item.ValueCopy(nil)
				if err != nil {
					// Expected once DropAll has deleted the value log files.
					t.Logf("Got error during value copy: %v", err)
					return
				}
			}
			t.Logf("Got number of keys: %d\n", count)
			// Re-read every key through Get as well; these lookups should
			// also start failing once the drop happens.
			for _, key := range keys {
				item, err := txn.Get([]byte(key))
				if err != nil {
					t.Logf("Got error during key lookup: %v", err)
					return
				}
				if _, err := item.ValueCopy(nil); err != nil {
					t.Logf("Got error during second value copy: %v", err)
					return
				}
			}
		}
	}()
	// Do not cancel txn.
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Give the reader goroutine a head start before dropping everything.
		time.Sleep(2 * time.Second)
		require.NoError(t, db.DropAll())
	}()
	wg.Wait()
}
// TestDropReadOnly verifies that DropAll panics when the DB was opened
// in read-only mode (dropping data would violate the read-only contract).
func TestDropReadOnly(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	N := uint64(1000)
	populate := func(db *DB) {
		writer := db.NewWriteBatch()
		for i := uint64(0); i < N; i++ {
			require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
		}
		require.NoError(t, writer.Flush())
	}
	populate(db)
	require.Equal(t, int(N), numKeys(db))
	require.NoError(t, db.Close())

	// Reopen in read-only mode; DropAll must refuse to run.
	opts.ReadOnly = true
	db2, err := Open(opts)
	// acquireDirectoryLock returns ErrWindowsNotSupported on Windows. It can be ignored safely.
	if runtime.GOOS == "windows" {
		// require.Equal's signature is (t, expected, actual); the expected
		// sentinel goes first so failure messages read correctly.
		require.Equal(t, ErrWindowsNotSupported, err)
	} else {
		require.NoError(t, err)
		require.Panics(t, func() { _ = db2.DropAll() })
		require.NoError(t, db2.Close())
	}
}
// TestWriteAfterClose verifies that an Update on a closed DB fails with
// ErrDBClosed instead of panicking or silently succeeding.
func TestWriteAfterClose(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	const total = 1000
	wb := db.NewWriteBatch()
	for i := 0; i < total; i++ {
		require.NoError(t, wb.Set([]byte(key("key", i)), val(true)))
	}
	require.NoError(t, wb.Flush())
	require.Equal(t, total, numKeys(db))
	require.NoError(t, db.Close())

	// Any write attempted after Close must surface ErrDBClosed.
	err = db.Update(func(txn *Txn) error {
		return txn.SetEntry(NewEntry([]byte("a"), []byte("b")))
	})
	require.Equal(t, ErrDBClosed, err)
}
// TestDropAllRace runs DropAll concurrently with a goroutine that keeps
// committing managed writes, checking that the two do not deadlock or
// corrupt the DB. Exact counts cannot be asserted (writes race with the
// drop), so the test only checks that the key count decreased.
func TestDropAllRace(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.managedTxns = true
	db, err := Open(opts)
	require.NoError(t, err)
	N := 10000
	// Start a goroutine to keep trying to write to DB while DropAll happens.
	closer := z.NewCloser(1)
	go func() {
		defer closer.Done()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		i := N + 1 // Writes would happen above N.
		var errors atomic.Int32
		for {
			select {
			case <-ticker.C:
				i++
				txn := db.NewTransactionAt(math.MaxUint64, true)
				require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
				// Commit errors are expected while the drop is in flight;
				// count them instead of failing the test.
				if err := txn.CommitAt(uint64(i), func(err error) {
					if err != nil {
						errors.Add(1)
					}
				}); err != nil {
					errors.Add(1)
				}
			case <-closer.HasBeenClosed():
				// The following causes a data race.
				// t.Logf("i: %d. Number of (expected) write errors: %d.\n", i, errors)
				return
			}
		}
	}()
	// Seed keys 1..N synchronously before racing the drop.
	var wg sync.WaitGroup
	for i := 1; i <= N; i++ {
		wg.Add(1)
		txn := db.NewTransactionAt(math.MaxUint64, true)
		require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
		require.NoError(t, txn.CommitAt(uint64(i), func(err error) {
			require.NoError(t, err)
			wg.Done()
		}))
	}
	wg.Wait()
	before := numKeysManaged(db, math.MaxUint64)
	require.True(t, before > N)
	require.NoError(t, db.DropAll())
	closer.SignalAndWait()
	after := numKeysManaged(db, math.MaxUint64)
	t.Logf("Before: %d. After dropall: %d\n", before, after)
	require.True(t, after < before)
	// Check the Close error, consistent with TestDropPrefixRace; the
	// original dropped it on the floor.
	require.NoError(t, db.Close())
}
// TestDropPrefix checks DropPrefix against progressively wider prefixes,
// verifies that dropping an already-deleted prefix is harmless, that the
// DB stays writable afterwards, and that reopen replays the value log.
func TestDropPrefix(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)

	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	const total = 10000
	fill := func(db *DB) {
		wb := db.NewWriteBatch()
		for i := 0; i < total; i++ {
			require.NoError(t, wb.Set([]byte(key("key", i)), val(true)))
		}
		require.NoError(t, wb.Flush())
	}

	fill(db)
	require.Equal(t, total, numKeys(db))

	// Narrow prefixes first: "key000" matches 10 keys, "key00" another 90.
	require.NoError(t, db.DropPrefix([]byte("key000")))
	require.Equal(t, total-10, numKeys(db))
	require.NoError(t, db.DropPrefix([]byte("key00")))
	require.Equal(t, total-100, numKeys(db))

	// Drop each thousand-key bucket; the running count must track it.
	remaining := total
	for i := 0; i < 10; i++ {
		require.NoError(t, db.DropPrefix([]byte(fmt.Sprintf("key%d", i))))
		remaining -= 1000
		require.Equal(t, remaining, numKeys(db))
	}

	// Dropping prefixes that no longer match anything is a no-op.
	require.NoError(t, db.DropPrefix([]byte("key1")))
	require.Equal(t, 0, numKeys(db))
	require.NoError(t, db.DropPrefix([]byte("key")))
	require.Equal(t, 0, numKeys(db))

	// The DB must remain writable after the drops.
	fill(db)
	require.Equal(t, total, numKeys(db))
	require.NoError(t, db.DropPrefix([]byte("key")))
	require.Equal(t, 0, numKeys(db))
	require.NoError(t, db.Close())

	// Reopen to make sure the value log replays correctly.
	reopened, err := Open(opts)
	require.NoError(t, err)
	require.Equal(t, 0, numKeys(reopened))
	require.NoError(t, reopened.Close())
}
// TestDropPrefixWithPendingTxn verifies that DropPrefix can run while a
// read-write transaction still holds an open iterator. The reader
// goroutine loops over its snapshot until a drop invalidates the value
// pointers it reads, then logs the error and exits; the DropPrefix
// calls themselves must succeed despite the pending transaction.
func TestDropPrefixWithPendingTxn(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, db.Close())
	}()
	N := uint64(10000)
	populate := func(db *DB) {
		writer := db.NewWriteBatch()
		for i := uint64(0); i < N; i++ {
			require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
		}
		require.NoError(t, writer.Flush())
	}
	populate(db)
	require.Equal(t, int(N), numKeys(db))
	// Open the transaction before the drops so it pins the pre-drop snapshot.
	txn := db.NewTransaction(true)
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		itr := txn.NewIterator(DefaultIteratorOptions)
		defer itr.Close()
		var keys []string
		for {
			var count int
			for itr.Rewind(); itr.Valid(); itr.Next() {
				count++
				item := itr.Item()
				keys = append(keys, string(item.KeyCopy(nil)))
				_, err := item.ValueCopy(nil)
				if err != nil {
					// Expected once a drop deletes the underlying values.
					t.Logf("Got error during value copy: %v", err)
					return
				}
			}
			t.Logf("Got number of keys: %d\n", count)
			// Re-read every key through Get as well; these lookups should
			// also start failing once the drops happen.
			for _, key := range keys {
				item, err := txn.Get([]byte(key))
				if err != nil {
					t.Logf("Got error during key lookup: %v", err)
					return
				}
				if _, err := item.ValueCopy(nil); err != nil {
					t.Logf("Got error during second value copy: %v", err)
					return
				}
			}
		}
	}()
	// Do not cancel txn.
	go func() {
		defer wg.Done()
		// Give the reader goroutine a head start, then drop successively
		// narrower and wider prefixes.
		time.Sleep(2 * time.Second)
		require.NoError(t, db.DropPrefix([]byte("key0")))
		require.NoError(t, db.DropPrefix([]byte("key00")))
		require.NoError(t, db.DropPrefix([]byte("key")))
	}()
	wg.Wait()
}
// TestDropPrefixReadOnly verifies that DropPrefix panics when the DB was
// opened in read-only mode (dropping data would violate the read-only
// contract).
func TestDropPrefixReadOnly(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.ValueLogFileSize = 5 << 20
	db, err := Open(opts)
	require.NoError(t, err)

	N := uint64(1000)
	populate := func(db *DB) {
		writer := db.NewWriteBatch()
		for i := uint64(0); i < N; i++ {
			require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
		}
		require.NoError(t, writer.Flush())
	}
	populate(db)
	require.Equal(t, int(N), numKeys(db))
	require.NoError(t, db.Close())

	// Reopen in read-only mode; DropPrefix must refuse to run.
	opts.ReadOnly = true
	db2, err := Open(opts)
	// acquireDirectoryLock returns ErrWindowsNotSupported on Windows. It can be ignored safely.
	if runtime.GOOS == "windows" {
		// require.Equal's signature is (t, expected, actual); the expected
		// sentinel goes first so failure messages read correctly.
		require.Equal(t, ErrWindowsNotSupported, err)
	} else {
		require.NoError(t, err)
		require.Panics(t, func() { _ = db2.DropPrefix([]byte("key0")) })
		require.NoError(t, db2.Close())
	}
}
// TestDropPrefixRace runs DropPrefix concurrently with a goroutine that
// keeps committing managed writes, checking that the two do not deadlock
// or corrupt the DB. Exact counts cannot be asserted (writes race with
// the drops), so the test only checks that the key count decreased.
func TestDropPrefixRace(t *testing.T) {
	dir, err := os.MkdirTemp("", "badger-test")
	require.NoError(t, err)
	defer removeDir(dir)
	opts := getTestOptions(dir)
	opts.managedTxns = true
	db, err := Open(opts)
	require.NoError(t, err)
	N := 10000
	// Start a goroutine to keep trying to write to DB while DropPrefix happens.
	closer := z.NewCloser(1)
	go func() {
		defer closer.Done()
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		i := N + 1 // Writes would happen above N.
		var errors atomic.Int32
		for {
			select {
			case <-ticker.C:
				i++
				txn := db.NewTransactionAt(math.MaxUint64, true)
				require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
				// Commit errors are expected while a drop is in flight;
				// count them instead of failing the test.
				if err := txn.CommitAt(uint64(i), func(err error) {
					if err != nil {
						errors.Add(1)
					}
				}); err != nil {
					errors.Add(1)
				}
			case <-closer.HasBeenClosed():
				// The following causes a data race.
				// t.Logf("i: %d. Number of (expected) write errors: %d.\n", i, errors)
				return
			}
		}
	}()
	// Seed keys 1..N synchronously before racing the drops.
	var wg sync.WaitGroup
	for i := 1; i <= N; i++ {
		wg.Add(1)
		txn := db.NewTransactionAt(math.MaxUint64, true)
		require.NoError(t, txn.SetEntry(NewEntry([]byte(key("key", i)), val(false))))
		require.NoError(t, txn.CommitAt(uint64(i), func(err error) {
			require.NoError(t, err)
			wg.Done()
		}))
	}
	wg.Wait()
	before := numKeysManaged(db, math.MaxUint64)
	require.True(t, before > N)
	require.NoError(t, db.DropPrefix([]byte("key00")))
	require.NoError(t, db.DropPrefix([]byte("key1")))
	require.NoError(t, db.DropPrefix([]byte("key")))
	closer.SignalAndWait()
	after := numKeysManaged(db, math.MaxUint64)
	t.Logf("Before: %d. After dropprefix: %d\n", before, after)
	require.True(t, after < before)
	require.NoError(t, db.Close())
}
// TestWriteBatchManagedMode checks NewWriteBatchAt: every write flushed
// through the batch lands at commit timestamp 1, the first chunk of keys
// is deleted again within the same batch, and iteration then sees
// exactly the surviving keys in order.
func TestWriteBatchManagedMode(t *testing.T) {
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%10d", i))
	}
	val := func(i int) []byte {
		return []byte(fmt.Sprintf("%128d", i))
	}
	opt := DefaultOptions("")
	opt.managedTxns = true
	opt.BaseTableSize = 1 << 20 // This would create multiple transactions in write batch.
	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
		batch := db.NewWriteBatchAt(1)
		defer batch.Cancel()

		const total, deleted = 50000, 1000
		begin := time.Now()
		for i := 0; i < total; i++ {
			require.NoError(t, batch.Set(key(i), val(i)))
		}
		for i := 0; i < deleted; i++ {
			require.NoError(t, batch.Delete(key(i)))
		}
		require.NoError(t, batch.Flush())
		t.Logf("Time taken for %d writes (w/ test options): %s\n", total+deleted, time.Since(begin))

		require.NoError(t, db.View(func(txn *Txn) error {
			itr := txn.NewIterator(DefaultIteratorOptions)
			defer itr.Close()
			want := deleted
			for itr.Rewind(); itr.Valid(); itr.Next() {
				item := itr.Item()
				require.Equal(t, string(key(want)), string(item.Key()))
				require.Equal(t, item.Version(), uint64(1))
				got, err := item.ValueCopy(nil)
				require.NoError(t, err)
				require.Equal(t, val(want), got)
				want++
			}
			require.Equal(t, total, want)
			return nil
		}))
	})
}
// TestWriteBatchManaged checks NewManagedWriteBatch with per-entry
// timestamps: sets at ts=1, deletes at ts=2, and verifies the iterator
// then sees exactly the surviving keys, all at version 1.
func TestWriteBatchManaged(t *testing.T) {
	key := func(i int) []byte {
		return []byte(fmt.Sprintf("%10d", i))
	}
	val := func(i int) []byte {
		return []byte(fmt.Sprintf("%128d", i))
	}
	opt := DefaultOptions("")
	opt.managedTxns = true
	opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
	runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
		batch := db.NewManagedWriteBatch()
		defer batch.Cancel()

		const total, deleted = 50000, 1000
		begin := time.Now()
		for i := 0; i < total; i++ {
			require.NoError(t, batch.SetEntryAt(&Entry{Key: key(i), Value: val(i)}, 1))
		}
		for i := 0; i < deleted; i++ {
			require.NoError(t, batch.DeleteAt(key(i), 2))
		}
		require.NoError(t, batch.Flush())
		t.Logf("Time taken for %d writes (w/ test options): %s\n", total+deleted, time.Since(begin))

		require.NoError(t, db.View(func(txn *Txn) error {
			itr := txn.NewIterator(DefaultIteratorOptions)
			defer itr.Close()
			want := deleted
			for itr.Rewind(); itr.Valid(); itr.Next() {
				item := itr.Item()
				require.Equal(t, string(key(want)), string(item.Key()))
				require.Equal(t, item.Version(), uint64(1))
				got, err := item.ValueCopy(nil)
				require.NoError(t, err)
				require.Equal(t, val(want), got)
				want++
			}
			require.Equal(t, total, want)
			return nil
		}))
	})
}
// TestWriteBatchDuplicate checks how each write-batch flavor handles
// repeated writes of the same key: plain and timestamped batches keep a
// single version, while a managed batch with distinct per-entry
// timestamps keeps them all.
func TestWriteBatchDuplicate(t *testing.T) {
	const dupes = 10
	k := []byte("key")
	v := []byte("val")

	// verify asserts the number of stored versions of k and their exact
	// version numbers (newest first, as AllVersions iterates).
	verify := func(t *testing.T, db *DB, want int, versions []int) {
		require.NoError(t, db.View(func(txn *Txn) error {
			iopt := DefaultIteratorOptions
			iopt.AllVersions = true
			itr := txn.NewIterator(iopt)
			defer itr.Close()
			seen := 0
			for itr.Rewind(); itr.Valid(); itr.Next() {
				item := itr.Item()
				require.Equal(t, k, item.Key())
				require.Equal(t, uint64(versions[seen]), item.Version())
				require.NoError(t, item.Value(func(val []byte) error {
					require.Equal(t, v, val)
					return nil
				}))
				seen++
			}
			require.Equal(t, want, seen)
			return nil
		}))
	}

	t.Run("writebatch", func(t *testing.T) {
		opt := DefaultOptions("")
		opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
		runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
			wb := db.NewWriteBatch()
			defer wb.Cancel()
			// Repeated writes to one key collapse to a single version.
			for i := 0; i < dupes; i++ {
				require.NoError(t, wb.SetEntry(&Entry{Key: k, Value: v}))
			}
			require.NoError(t, wb.Flush())
			verify(t, db, 1, []int{1})
		})
	})
	t.Run("writebatch at", func(t *testing.T) {
		opt := DefaultOptions("")
		opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
		opt.managedTxns = true
		runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
			wb := db.NewWriteBatchAt(10)
			defer wb.Cancel()
			// Repeated writes again collapse, landing at the batch's ts.
			for i := 0; i < dupes; i++ {
				require.NoError(t, wb.SetEntry(&Entry{Key: k, Value: v}))
			}
			require.NoError(t, wb.Flush())
			verify(t, db, 1, []int{10})
		})
	})
	t.Run("managed writebatch", func(t *testing.T) {
		opt := DefaultOptions("")
		opt.managedTxns = true
		opt.BaseTableSize = 1 << 15 // This would create multiple transactions in write batch.
		runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
			wb := db.NewManagedWriteBatch()
			defer wb.Cancel()
			// Distinct timestamps keep every version of the key.
			for ts := uint64(1); ts <= dupes; ts++ {
				require.NoError(t, wb.SetEntryAt(&Entry{Key: k, Value: v}, ts))
			}
			require.NoError(t, wb.Flush())
			verify(t, db, dupes, []int{10, 9, 8, 7, 6, 5, 4, 3, 2, 1})
		})
	})
}
// TestZeroDiscardStats checks that value-log discard statistics are
// zeroed when the data they refer to disappears — either because a vlog
// file is rewritten, or because DropAll wipes the DB.
func TestZeroDiscardStats(t *testing.T) {
	N := uint64(10000)
	populate := func(t *testing.T, db *DB) {
		writer := db.NewWriteBatch()
		for i := uint64(0); i < N; i++ {
			require.NoError(t, writer.Set([]byte(key("key", int(i))), val(true)))
		}
		require.NoError(t, writer.Flush())
	}
	t.Run("after rewrite", func(t *testing.T) {
		opts := getTestOptions("")
		opts.ValueLogFileSize = 5 << 20
		// Small thresholds/table size so the data spreads over several
		// vlog files, giving us more than one fid to work with.
		opts.ValueThreshold = 1 << 10
		opts.MemTableSize = 1 << 15
		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
			populate(t, db)
			require.Equal(t, int(N), numKeys(db))
			// Seed a non-zero discard stat for every vlog file; compaction
			// would normally do this.
			fids := db.vlog.sortedFids()
			for _, fid := range fids {
				db.vlog.discardStats.Update(fid, 1)
			}
			// Ensure we have some valid fids.
			require.True(t, len(fids) > 2)
			fid := fids[0]
			require.NoError(t, db.vlog.rewrite(db.vlog.filesMap[fid]))
			// All data should still be present.
			require.Equal(t, int(N), numKeys(db))
			db.vlog.discardStats.Iterate(func(id, val uint64) {
				// Vlog with id=fid has been re-written, it's discard stats should be zero.
				if uint32(id) == fid {
					require.Zero(t, val)
				}
			})
		})
	})
	t.Run("after dropall", func(t *testing.T) {
		opts := getTestOptions("")
		opts.ValueLogFileSize = 5 << 20
		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
			populate(t, db)
			require.Equal(t, int(N), numKeys(db))
			// Fill discard stats. Normally these are filled by compaction.
			fids := db.vlog.sortedFids()
			for _, fid := range fids {
				db.vlog.discardStats.Update(fid, 1)
			}
			db.vlog.discardStats.Iterate(func(id, val uint64) { require.NotZero(t, val) })
			require.NoError(t, db.DropAll())
			require.Equal(t, 0, numKeys(db))
			// We've deleted everything. DS should be zero.
			db.vlog.discardStats.Iterate(func(id, val uint64) { require.Zero(t, val) })
		})
	})
}
Go
1
https://gitee.com/mirrors/Badger.git
git@gitee.com:mirrors/Badger.git
mirrors
Badger
Badger
main

搜索帮助