diff --git a/cds/container/wf_hashtable.h b/cds/container/wf_hashtable.h
new file mode 100644
index 000000000..532b48835
--- /dev/null
+++ b/cds/container/wf_hashtable.h
@@ -0,0 +1,558 @@
+#ifndef CDSLIB_CONTAINER_EAWfPAD_HASHTABLE_H
+#define CDSLIB_CONTAINER_EAWfPAD_HASHTABLE_H
+
+#include <atomic>
+#include <cstdlib>
+#include <stdexcept>
+
+#define INITIAL_BOUND 5
+#define RESIZE_FACTOR 1.5
+#define LOAD_FACTOR 0.75
+
+namespace cds {
+    namespace container {
+
+        /*
+            Efficient Almost wait-free Parallel Accessible Dynamic Hashtables
+            Gao, Groote, Hesselink (2003)
+
+            Open-addressing table mapping int keys to non-owning T* values.
+            At most P processes obtain a handle through getProcess() and run
+            every operation through it. Table slots are indexed 0..2P-1 and
+            -1 in next[] means "no successor table".
+
+            NOTE(review): a replaced cell is deleted right after the CAS that
+            unlinks it, with no deferred reclamation; confirm this is safe
+            against concurrent readers.
+        */
+        template <typename T>
+        class WfHashtable
+        {
+        protected:
+            typedef enum { EMPTY, DEL, VALUE, OLDV } eType;
+
+            // One cell of a table: a state tag, the key and the stored pointer.
+            template <typename U>
+            struct EValue {
+
+            private:
+                eType type;
+                int address;
+                U* value;
+
+            public:
+                EValue() : type(EMPTY), address(0), value(nullptr) {}
+
+                EValue(int address, U* value) : type(VALUE), address(address), value(value) {}
+
+                // ----------- Setters -----------
+
+                void setValue(int address, U* value) {
+                    this->address = address;
+                    this->value = value;
+                }
+
+                void setDel() {
+                    type = DEL;
+                }
+
+                void setOld() {
+                    type = OLDV;
+                }
+
+                void setDone() {
+                    type = OLDV;
+                    value = nullptr;
+                }
+
+                // ----------- Getters -----------
+
+                int ADR() {
+                    return address;
+                }
+
+                // Stored pointer, or nullptr for a deleted cell.
+                U* val() {
+                    return type == DEL ? nullptr : value;
+                }
+
+                bool empty() {
+                    return type == EMPTY;
+                }
+
+                bool del() {
+                    return type == DEL;
+                }
+
+                bool oldp() {
+                    return type == OLDV;
+                }
+
+                // True for an old cell that no longer carries a value.
+                bool done() {
+                    return oldp() && val() == nullptr;
+                }
+            };
+
+            struct Hashtable {
+                int size;              // size of the hashtable
+                std::atomic<int> occ;  // number of occupied positions in the table
+                std::atomic<int> dels; // number of deleted positions
+                int bound;             // the maximal number of places that can be occupied before refreshing the table
+                std::atomic<EValue<T>*>* table;
+
+                Hashtable(int size, int bound) : size(size), occ(0), dels(0), bound(bound) {
+                    table = new std::atomic<EValue<T>*>[size];
+                    for (int i = 0; i < size; ++i) {
+                        table[i] = new EValue<T>();
+                    }
+                }
+
+                ~Hashtable() {
+                    for (int i = 0; i < size; ++i) {
+                        delete table[i].load();
+                    }
+                    delete[] table;
+                }
+            };
+
+            int P;                       // maximal number of processes
+            std::atomic<Hashtable*>* H;  // 0..2P-1
+            std::atomic<int> currInd;    // index of the currently valid hashtable
+            std::atomic<int>* busy;      // per table: number of processes that are using it
+            std::atomic<int>* next;      // per table: index of the table its contents are being copied to, -1 if none
+            std::atomic<int>* prot;      // per table: guards busy[i], next[i] and H[i] against being reused
+                                         // for a new table before all processes have discarded them
+
+            std::atomic<int> numProc; // number of processes currently using hashtable
+            std::atomic<bool> consist;
+
+        public:
+            // Per-process handle through which all table operations are performed.
+            // U is unused; the handle always operates on the enclosing WfHashtable<T>.
+            template <typename U>
+            class WfHashtableProcess {
+            protected:
+                WfHashtable<T>* wh;
+                int index; // index of the hashtable currently used by the process
+
+            public:
+
+                WfHashtableProcess(WfHashtable<T>* wh) {
+                    this->wh = wh;
+                    getAccess();
+                }
+
+                ~WfHashtableProcess() {
+                    releaseAccess(index);
+                    wh->numProc--;
+                }
+
+                // ----------- HASHTABLE METHODS -----------
+
+                // Returns the pointer stored under key a, or nullptr when the
+                // key is absent or deleted.
+                T* find(int a) {
+                    check_consistency();
+                    Hashtable* h = wh->H[index];
+                    int l = h->size;
+                    int n = 0;
+
+                    for (;;) {
+                        int k = key(a, l, n);
+                        EValue<T> r = *h->table[k].load();
+                        if (r.done()) {
+                            // The cell has been migrated: move to the current table
+                            // and restart probing, as del/insert/assign do.
+                            refresh();
+                            h = wh->H[index];
+                            l = h->size;
+                            n = 0;
+                            continue;
+                        }
+                        if (r.empty() || a == r.ADR()) {
+                            return r.val();
+                        }
+                        n++;
+                    }
+                }
+
+                // Marks key a as deleted. Returns true if this call deleted it,
+                // false if the key is absent or already deleted.
+                bool del(int a)
+                {
+                    check_consistency();
+                    Hashtable* h = wh->H[index];
+                    int l = h->size;
+                    int n = 0;
+
+                    for (;;) {
+                        int k = key(a, l, n);
+                        EValue<T>* r = h->table[k].load();
+                        if (r->oldp()) {
+                            // Restart in the current table; r may belong to a table
+                            // that refresh() released, so it is not read again.
+                            refresh();
+                            h = wh->H[index];
+                            l = h->size;
+                            n = 0;
+                            continue;
+                        }
+                        if (r->empty()) {
+                            return false;
+                        }
+                        if (r->ADR() != a) {
+                            n++;
+                            continue;
+                        }
+                        if (r->del()) {
+                            // Already deleted: counting it again would inflate dels.
+                            return false;
+                        }
+                        EValue<T>* newValue = new EValue<T>(a, nullptr);
+                        newValue->setDel();
+                        if (std::atomic_compare_exchange_strong(&h->table[k], &r, newValue)) {
+                            delete r;
+                            h->dels++;
+                            return true;
+                        }
+                        // The cell changed under us; examine the same slot again.
+                        delete newValue;
+                    }
+                }
+
+                // Inserts (a, v) unless key a already occupies a cell. Returns
+                // true if the pair was inserted.
+                bool insert(int a, T* v) {
+                    check_consistency();
+                    Hashtable* h = wh->H[index];
+                    if (h->occ.load() > h->bound) {
+                        newTable();
+                        h = wh->H[index];
+                    }
+                    int l = h->size;
+                    int n = 0;
+
+                    for (;;) {
+                        int k = key(a, l, n);
+                        EValue<T>* r = h->table[k].load();
+                        if (r->oldp()) {
+                            refresh();
+                            h = wh->H[index];
+                            l = h->size;
+                            n = 0;
+                            continue;
+                        }
+                        if (r->empty()) {
+                            EValue<T>* newValue = new EValue<T>(a, v);
+                            if (std::atomic_compare_exchange_strong(&h->table[k], &r, newValue)) {
+                                delete r;
+                                h->occ++;
+                                return true;
+                            }
+                            // Lost the race for this slot; examine it again.
+                            delete newValue;
+                            continue;
+                        }
+                        if (a == r->ADR()) {
+                            return false;
+                        }
+                        n++;
+                    }
+                }
+
+                // Stores v under key a, overwriting any previous value.
+                void assign(int a, T* v) {
+                    check_consistency();
+                    Hashtable* h = wh->H[index];
+                    if (h->occ.load() > h->bound) {
+                        newTable();
+                        h = wh->H[index];
+                    }
+                    int l = h->size;
+                    int n = 0;
+
+                    for (;;) {
+                        int k = key(a, l, n);
+                        EValue<T>* r = h->table[k].load();
+                        if (r->oldp()) {
+                            refresh();
+                            h = wh->H[index];
+                            l = h->size;
+                            n = 0;
+                            continue;
+                        }
+                        if (!r->empty() && a != r->ADR()) {
+                            n++;
+                            continue;
+                        }
+                        EValue<T>* newValue = new EValue<T>(a, v);
+                        if (std::atomic_compare_exchange_strong(&h->table[k], &r, newValue)) {
+                            // Inspect the replaced cell before freeing it: occ only
+                            // grows when the cell held no value.
+                            const bool hadNoValue = (r->val() == nullptr);
+                            delete r;
+                            if (hadNoValue) {
+                                h->occ++;
+                            }
+                            return;
+                        }
+                        delete newValue;
+                    }
+                }
+
+            protected:
+
+                // ----------- ACCESS METHODS -----------
+
+                // Registers this process on the current table, retrying while
+                // currInd changes underneath it.
+                void getAccess() {
+                    while (true) {
+                        index = wh->currInd;
+                        wh->prot[index]++;
+                        if (index == wh->currInd) {
+                            wh->busy[index]++;
+                            if (index == wh->currInd) return;
+                            else releaseAccess(index);
+                        }
+                        else {
+                            wh->prot[index]--;
+                        }
+                    }
+                }
+
+                // Drops one busy and one prot count of table i and frees the
+                // table once its busy count reaches zero.
+                void releaseAccess(int i) {
+                    Hashtable* h = wh->H[i];
+                    wh->busy[i]--;
+                    if (h != nullptr && wh->busy[i] == 0) {
+                        Hashtable* null_ptr = nullptr;
+                        if (std::atomic_compare_exchange_strong(&wh->H[i], &h, null_ptr)) {
+                            delete h;
+                        }
+                    }
+                    wh->prot[i]--;
+                }
+
+                // Throws std::logic_error when consist is false, which
+                // ~WfHashtable sets before tearing the tables down.
+                void check_consistency() {
+                    if (!wh->consist.load()) {
+                        throw std::logic_error("Container is inconsistent");
+                    }
+                }
+
+                // ----------- HASHING METHODS -----------
+
+                static unsigned int hash(unsigned int x) {
+                    x = ((x >> 16) ^ x) * 0x45d9f3b;
+                    x = ((x >> 16) ^ x) * 0x45d9f3b;
+                    x = (x >> 16) ^ x;
+                    return x;
+                }
+
+                // Slot of key a in a table of size l at probe step n.
+                static int key(int a, int l, int n) {
+                    return (hash(a) + n) % l;
+                }
+
+                // ----------- MIGRATION METHODS -----------
+
+                // Installs a fresh table of size s and bound b in slot i.
+                void allocate(int i, int s, int b) {
+                    Hashtable* tmp = new Hashtable(s, b);
+                    std::atomic_exchange(&wh->H[i], tmp);
+                    if (wh->H[i] != tmp) delete tmp;
+                }
+
+                // Claims a free slot for a larger successor table, announces it
+                // in next[index] and then helps with the migration.
+                void newTable() {
+                    while (wh->next[index] == -1) {
+                        int i = rand() % (2 * wh->P);
+                        int unclaimed = 0;
+                        if (std::atomic_compare_exchange_strong(&wh->prot[i], &unclaimed, 1)) {
+                            wh->busy[i] = 1;
+                            Hashtable* cur = wh->H[index].load();
+                            int bound = (int)((cur->bound - cur->dels.load() + 2 * wh->P) * RESIZE_FACTOR);
+                            int size = (int)((bound + 2 * wh->P) / LOAD_FACTOR);
+                            allocate(i, size, bound);
+                            wh->next[i] = -1;
+                            // -1, not 0, marks "no successor": 0 is a valid table index.
+                            int none = -1;
+                            if (!std::atomic_compare_exchange_strong(&wh->next[index], &none, i)) {
+                                releaseAccess(i);
+                            }
+                        }
+                    }
+                    refresh();
+                }
+
+                // Helps copy the current table into its successor and tries to
+                // make the successor current.
+                void migrate() {
+                    int i = wh->next[index];
+                    if (i < 0) return; // no successor has been announced
+                    wh->prot[i]++;
+                    if (index != wh->currInd) {
+                        wh->prot[i]--;
+                    }
+                    else {
+                        wh->busy[i]++;
+                        Hashtable* h = wh->H[i];
+                        if (index == wh->currInd) {
+                            moveContents(wh->H[index], h);
+                            int temp = index;
+                            if (std::atomic_compare_exchange_strong(&wh->currInd, &temp, i)) {
+                                // The old table gives up the counts it held as
+                                // the current table.
+                                wh->busy[index]--;
+                                wh->prot[index]--;
+                            }
+                        }
+                        releaseAccess(i);
+                    }
+                }
+
+                // Re-attaches to the current table if this process is behind,
+                // otherwise helps the pending migration.
+                void refresh() {
+                    if (index != wh->currInd) {
+                        releaseAccess(index);
+                        getAccess();
+                    }
+                    else {
+                        migrate();
+                    }
+                }
+
+                // While this table is still current: marks each cell of 'from'
+                // old, copies its value into 'to' and then marks the cell done.
+                void moveContents(Hashtable* from, Hashtable* to) {
+                    int* toBeMoved = new int[from->size];
+                    for (int j = 0; j < from->size; ++j) {
+                        toBeMoved[j] = j;
+                    }
+                    int toBeMovedSize = from->size;
+                    while (wh->currInd == index && toBeMovedSize > 0) {
+                        int i = toBeMoved[toBeMovedSize - 1];
+                        EValue<T>* v = from->table[i];
+                        if (from->table[i].load()->done()) {
+                            // more efficient strategy could be implemented
+                            toBeMovedSize--;
+                        }
+                        else {
+                            EValue<T>* tmp = new EValue<T>(v->ADR(), v->val());
+                            tmp->setOld();
+                            if (std::atomic_compare_exchange_strong(&from->table[i], &v, tmp)) {
+                                if (v->val() != nullptr) moveElement(v->ADR(), v->val(), to);
+                                delete v;
+                                from->table[i].load()->setDone();
+                                toBeMovedSize--;
+                            }
+                            else delete tmp;
+                        }
+                    }
+                    delete[] toBeMoved; // allocated with new[]
+                }
+
+                // Copies (a, v) into 'to' unless key a is already there or the
+                // migration has finished.
+                void moveElement(int a, T* v, Hashtable* to) {
+                    int n = 0;
+                    bool b = false;
+                    const int m = to->size;
+                    EValue<T>* w;
+                    do {
+                        int k = key(a, m, n);
+                        w = to->table[k];
+                        if (w->val() == nullptr) {
+                            EValue<T>* newValue = new EValue<T>(a, v);
+                            b = std::atomic_compare_exchange_strong(&to->table[k], &w, newValue);
+                            if (b) delete w;
+                            else delete newValue;
+                        }
+                        else {
+                            n++;
+                        }
+
+                    } while (!(b || a == w->ADR() || wh->currInd != index));
+                    if (b) to->occ++;
+                }
+            };
+
+            // ----------- WfHashtable API -----------
+
+            typedef WfHashtableProcess<T> process;
+
+            // Creates a table for at most P processes with the given initial
+            // size and bound.
+            WfHashtable(int P, int size, int bound) {
+                this->P = P;
+                H = new std::atomic<Hashtable*>[2 * P];
+                busy = new std::atomic<int>[2 * P];
+                prot = new std::atomic<int>[2 * P];
+                next = new std::atomic<int>[2 * P];
+                for (int i = 0; i < 2 * P; ++i) {
+                    H[i] = nullptr;
+                    busy[i] = 0;
+                    prot[i] = 0;
+                    next[i] = -1;
+                }
+
+                numProc = 0;
+                currInd = 0;
+                H[0] = new Hashtable(size, bound);
+                // The current table holds one busy and one prot count of its own
+                // (released in migrate()), so it is not freed when the last
+                // process detaches and its slot is never handed out by newTable().
+                busy[0] = 1;
+                prot[0] = 1;
+                consist = true;
+            }
+
+            WfHashtable(int P) : WfHashtable(P, INITIAL_BOUND + 2 * P + 1, INITIAL_BOUND) {}
+
+            ~WfHashtable() {
+                std::atomic_store(&consist, false);
+                for (int i = 0; i < 2 * P; ++i) {
+                    delete H[i].load();
+                }
+                delete[] H;
+                delete[] busy;
+                delete[] prot;
+                delete[] next;
+            }
+
+            // Returns a new process handle owned by the caller (release it with
+            // delete). Throws std::logic_error when P handles already exist.
+            WfHashtableProcess<T>* getProcess() {
+                int curNumProc = numProc;
+                // A failed CAS reloads curNumProc, so the limit is always checked
+                // against the value the CAS will compare with.
+                while (curNumProc < P) {
+                    if (std::atomic_compare_exchange_strong(&numProc, &curNumProc, curNumProc + 1)) {
+                        return new WfHashtableProcess<T>(this);
+                    }
+                }
+                throw std::logic_error("Max number of processes exceeded");
+            }
+
+            // Number of cells of the current table that are neither empty nor deleted.
+            int size() {
+                int result = 0;
+                Hashtable* cur = H[currInd.load()];
+                for (int i = 0; i < cur->size; ++i) {
+                    EValue<T>* cell = cur->table[i].load();
+                    if (!cell->empty() && !cell->del()) result++;
+                }
+                return result;
+            }
+
+        };
+    }
+}
+
+#endif // #ifndef CDSLIB_CONTAINER_EAWfPAD_HASHTABLE_H
diff --git a/test/unit/map/CMakeLists.txt b/test/unit/map/CMakeLists.txt
index 35d5d4218..a973efa26 100644
--- a/test/unit/map/CMakeLists.txt
+++ b/test/unit/map/CMakeLists.txt
@@ -122,7 +122,15 @@ add_executable(${UNIT_MAP_SPLIT_LAZY} ${UNIT_MAP_SPLIT_LAZY_SOURCES})
 target_link_libraries(${UNIT_MAP_SPLIT_LAZY} ${CDS_TEST_LIBRARIES})
 add_test(NAME ${UNIT_MAP_SPLIT_LAZY} COMMAND ${UNIT_MAP_SPLIT_LAZY} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
 
-
+# WF_Hashtable unit test
+set(UNIT_MAP_WF_HASHTABLE unit-map-wf-hashtable)
+set(UNIT_MAP_WF_HASHTABLE_SOURCES
+    ../main.cpp
+    wf_hashtable.cpp
+)
+add_executable(${UNIT_MAP_WF_HASHTABLE} ${UNIT_MAP_WF_HASHTABLE_SOURCES})
+target_link_libraries(${UNIT_MAP_WF_HASHTABLE} ${CDS_TEST_LIBRARIES})
+add_test(NAME ${UNIT_MAP_WF_HASHTABLE} COMMAND ${UNIT_MAP_WF_HASHTABLE} WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH})
 
 add_custom_target( unit-map
     DEPENDS
@@ -134,4 +142,5 @@ add_custom_target( unit-map
         ${UNIT_MAP_SPLIT_MICHAEL}
         ${UNIT_MAP_SPLIT_ITERABLE}
         ${UNIT_MAP_SPLIT_LAZY}
+        ${UNIT_MAP_WF_HASHTABLE}
 )
diff --git a/test/unit/map/wf_hashtable.cpp b/test/unit/map/wf_hashtable.cpp
new file mode 100644
index 000000000..879c1d4c9
--- /dev/null
+++ b/test/unit/map/wf_hashtable.cpp
@@ -0,0 +1,115 @@
+#include <gtest/gtest.h>
+#include <cds/container/wf_hashtable.h>
+
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+namespace {
+
+    namespace cc = cds::container;
+    typedef cc::WfHashtable<double>::process* Proc;
+
+    class WfHashtableTest : public ::testing::Test
+    {
+    protected:
+        void SetUp()
+        {
+            table = new cc::WfHashtable<double>(10);
+        }
+
+        void TearDown()
+        {
+            delete table;
+        }
+
+        cc::WfHashtable<double>* table;
+    };
+
+    TEST_F(WfHashtableTest, process_constructor_destructor)
+    {
+        Proc p = table->getProcess();
+        delete p;
+
+        // Keep the handles so they can be released at the end of the test.
+        std::vector<Proc> procs;
+        for (int i = 0; i < 9; ++i) {
+            procs.push_back(table->getProcess());
+        }
+
+        ASSERT_NO_THROW(procs.push_back(table->getProcess()));
+
+        ASSERT_THROW(p = table->getProcess(), std::logic_error);
+
+        for (size_t i = 0; i < procs.size(); ++i) {
+            delete procs[i];
+        }
+    }
+
+    TEST_F(WfHashtableTest, insert_assign_find_delete)
+    {
+        Proc p = table->getProcess();
+
+        double* v = new double[10000];
+        for (int i = 0; i < 10000; ++i) {
+            v[i] = (double)i;
+            p->insert(i, &v[i]);
+        }
+
+        ASSERT_EQ(10000, table->size());
+
+        ASSERT_EQ(5000.0, *p->find(5000));
+
+        double newValue = 5.0;
+        p->assign(5000, &newValue);
+        ASSERT_EQ(5.0, *p->find(5000));
+
+        p->del(5000);
+        ASSERT_TRUE(p->find(5000) == nullptr);
+
+        for (int i = 0; i < 10000; ++i) {
+            p->del(i);
+        }
+        ASSERT_EQ(0, table->size());
+
+        delete p;
+        delete[] v; // allocated with new[]
+    }
+
+    TEST_F(WfHashtableTest, null_value)
+    {
+        Proc p = table->getProcess();
+        p->insert(100, nullptr);
+        p->insert(200, nullptr);
+        p->insert(300, nullptr);
+        ASSERT_EQ(3, table->size());
+
+        double* value = p->find(200);
+        ASSERT_TRUE(value == nullptr);
+
+        p->del(200);
+        p->del(300);
+        ASSERT_EQ(1, table->size());
+
+        delete p;
+    }
+
+    TEST(TestObjectValue, read_write)
+    {
+        cc::WfHashtable<std::string>* table = new cc::WfHashtable<std::string>(10);
+        cc::WfHashtable<std::string>::process* p = table->getProcess();
+
+        std::string str1("hello libcds!");
+        p->insert(2017, &str1);
+        std::string* str_ref = p->find(2017);
+        ASSERT_EQ(0, str_ref->compare("hello libcds!"));
+
+        str1.append(" happy new year!");
+        str_ref = p->find(2017);
+        ASSERT_EQ(0, str_ref->compare("hello libcds! happy new year!"));
+
+        delete p;
+        delete table;
+    }
+
+} // namespace