Change CDBTransaction to compare keys by their serialized form

...instead of comparing by using the keys < operator.
There are 2 reasons for this:
1. This better mimics the behavior of CDBWrapper. Otherwise there is
   a chance of discrepancy when it comes to key equality.
2. Next commit will introduce CDBTransactionIterator, which relies on
   CDBTransaction and CDBWrapper being compatible in the way keys are
   compared.
This commit is contained in:
Alexander Block 2019-04-04 09:58:13 +02:00
parent 5482083eba
commit 6d1599bc68

View File

@ -395,162 +395,111 @@ protected:
Parent &parent;
CommitTarget &commitTarget;
struct KeyHolder {
virtual ~KeyHolder() = default;
virtual bool Less(const KeyHolder &b) const = 0;
virtual void Erase(CommitTarget &commitTarget) = 0;
};
typedef std::unique_ptr<KeyHolder> KeyHolderPtr;
template <typename K>
struct KeyHolderImpl : KeyHolder {
KeyHolderImpl(const K &_key)
: key(_key) {
struct DataStreamCmp {
static bool less(const CDataStream& a, const CDataStream& b) {
return std::lexicographical_compare(a.begin(), a.end(), b.begin(), b.end());
}
virtual bool Less(const KeyHolder &b) const {
auto *b2 = dynamic_cast<const KeyHolderImpl<K>*>(&b);
return key < b2->key;
bool operator()(const CDataStream& a, const CDataStream& b) const {
return less(a, b);
}
virtual void Erase(CommitTarget &commitTarget) {
commitTarget.Erase(key);
}
K key;
};
struct KeyValueHolder {
virtual ~KeyValueHolder() = default;
virtual void Write(CommitTarget &parent) = 0;
struct ValueHolder {
virtual ~ValueHolder() = default;
virtual void Write(const CDataStream& ssKey, CommitTarget &parent) = 0;
};
typedef std::unique_ptr<KeyValueHolder> KeyValueHolderPtr;
typedef std::unique_ptr<ValueHolder> ValueHolderPtr;
template <typename K, typename V>
struct KeyValueHolderImpl : KeyValueHolder {
KeyValueHolderImpl(const KeyHolderImpl<K> &_key, const V &_value)
: key(_key),
value(_value) { }
KeyValueHolderImpl(const KeyHolderImpl<K> &_key, V &&_value)
: key(_key),
value(std::forward<V>(_value)) { }
virtual void Write(CommitTarget &commitTarget) {
template <typename V>
struct ValueHolderImpl : ValueHolder {
ValueHolderImpl(const V &_value) : value(_value) { }
virtual void Write(const CDataStream& ssKey, CommitTarget &commitTarget) {
// we're moving the value instead of copying it. This means that Write() can only be called once per
// KeyValueHolderImpl instance. Commit() clears the write maps, so this ok.
commitTarget.Write(key.key, std::move(value));
// ValueHolderImpl instance. Commit() clears the write maps, so this ok.
commitTarget.Write(ssKey, std::move(value));
}
const KeyHolderImpl<K> &key;
V value;
};
struct keyCmp {
bool operator()(const KeyHolderPtr &a, const KeyHolderPtr &b) const {
return a->Less(*b);
}
};
typedef std::map<KeyHolderPtr, KeyValueHolderPtr, keyCmp> KeyValueMap;
typedef std::map<std::type_index, KeyValueMap> TypeKeyValueMap;
TypeKeyValueMap writes;
TypeKeyValueMap deletes;
template <typename K>
KeyValueMap *getMapForType(TypeKeyValueMap &m, bool create) {
auto it = m.find(typeid(K));
if (it != m.end()) {
return &it->second;
}
if (!create)
return nullptr;
auto it2 = m.emplace(typeid(K), KeyValueMap());
return &it2.first->second;
template<typename K>
static CDataStream KeyToDataStream(const K& key) {
CDataStream ssKey(SER_DISK, CLIENT_VERSION);
ssKey.reserve(DBWRAPPER_PREALLOC_KEY_SIZE);
ssKey << key;
return ssKey;
}
template <typename K>
KeyValueMap *getWritesMap(bool create) {
return getMapForType<K>(writes, create);
}
typedef std::map<CDataStream, ValueHolderPtr, DataStreamCmp> WritesMap;
typedef std::set<CDataStream, DataStreamCmp> DeletesSet;
template <typename K>
KeyValueMap *getDeletesMap(bool create) {
return getMapForType<K>(deletes, create);
}
template <typename K, typename KV>
void writeImpl(KeyHolderImpl<K>* k, KV&& kv) {
auto k2 = KeyHolderPtr(k);
KeyValueMap *ds = getDeletesMap<K>(false);
if (ds)
ds->erase(k2);
KeyValueMap *ws = getWritesMap<K>(true);
ws->erase(k2);
ws->emplace(std::make_pair(std::move(k2), std::forward<KV>(kv)));
}
WritesMap writes;
DeletesSet deletes;
public:
CDBTransaction(Parent &_parent, CommitTarget &_commitTarget) : parent(_parent), commitTarget(_commitTarget) {}
template <typename K, typename V>
void Write(const K& key, const V& v) {
auto k = new KeyHolderImpl<K>(key);
auto kv = std::make_unique<KeyValueHolderImpl<K, V>>(*k, v);
writeImpl(k, std::move(kv));
Write(KeyToDataStream(key), v);
}
template <typename K, typename V>
void Write(const K& key, V&& v) {
auto k = new KeyHolderImpl<K>(key);
auto kv = std::make_unique<KeyValueHolderImpl<K, typename std::remove_reference<V>::type>>(*k, std::forward<V>(v));
writeImpl(k, std::move(kv));
template <typename V>
void Write(const CDataStream& ssKey, const V& v) {
deletes.erase(ssKey);
auto it = writes.emplace(ssKey, nullptr).first;
it->second = std::make_unique<ValueHolderImpl<V>>(v);
}
template <typename K, typename V>
bool Read(const K& key, V& value) {
KeyHolderPtr k(new KeyHolderImpl<K>(key));
return Read(KeyToDataStream(key), value);
}
KeyValueMap *ds = getDeletesMap<K>(false);
if (ds && ds->count(k))
template <typename V>
bool Read(const CDataStream& ssKey, V& value) {
if (deletes.count(ssKey)) {
return false;
KeyValueMap *ws = getWritesMap<K>(false);
if (ws) {
auto it = ws->find(k);
if (it != ws->end()) {
auto *impl = dynamic_cast<KeyValueHolderImpl<K, V> *>(it->second.get());
if (!impl)
return false;
value = impl->value;
return true;
}
}
return parent.Read(key, value);
auto it = writes.find(ssKey);
if (it != writes.end()) {
auto *impl = dynamic_cast<ValueHolderImpl<V> *>(it->second.get());
if (!impl) {
throw std::runtime_error("Read called with V != previously written type");
}
value = impl->value;
return true;
}
return parent.Read(ssKey, value);
}
template <typename K>
bool Exists(const K& key) {
KeyHolderPtr k(new KeyHolderImpl<K>(key));
return Exists(KeyToDataStream(key));
}
KeyValueMap *ds = getDeletesMap<K>(false);
if (ds && ds->count(k))
bool Exists(const CDataStream& ssKey) {
if (deletes.count(ssKey)) {
return false;
}
KeyValueMap *ws = getWritesMap<K>(false);
if (ws && ws->count(k))
if (writes.count(ssKey)) {
return true;
}
return parent.Exists(key);
return parent.Exists(ssKey);
}
template <typename K>
void Erase(const K& key) {
KeyHolderPtr k(new KeyHolderImpl<K>(key));
return Erase(KeyToDataStream(key));
}
KeyValueMap *ws = getWritesMap<K>(false);
if (ws)
ws->erase(k);
KeyValueMap *ds = getDeletesMap<K>(true);
ds->emplace(std::move(k), nullptr);
void Erase(const CDataStream& ssKey) {
writes.erase(ssKey);
deletes.emplace(ssKey);
}
void Clear() {
@ -559,15 +508,11 @@ public:
}
void Commit() {
for (auto &p : deletes) {
for (auto &p2 : p.second) {
p2.first->Erase(commitTarget);
}
for (const auto &k : deletes) {
commitTarget.Erase(k);
}
for (auto &p : writes) {
for (auto &p2 : p.second) {
p2.second->Write(commitTarget);
}
p.second->Write(p.first, commitTarget);
}
Clear();
}