Skip to content
11 changes: 9 additions & 2 deletions lib/Desktop-File-Interface/desktopFileInterface.c
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
#if defined(_WIN32) || defined(_WIN64)
#include <io.h>
#else
#include <unistd.h>
#endif

#include "desktopFileInterface.h"

typedef struct {
Expand Down Expand Up @@ -159,12 +165,13 @@ char *tempFilePath(void) {

#else
/* POSIX systems */
snprintf(tempPathBuffer, sizeof(tempPathBuffer),
"/tmp/embeddb_%luXXXXXX", (unsigned long)rand());
snprintf(tempPathBuffer, sizeof(tempPathBuffer), "/tmp/embeddb_XXXXXX");

int fd = mkstemp(tempPathBuffer);
if (fd >= 0) {
close(fd);
} else {
perror("mkstemp failed");
}
#endif

Expand Down
3 changes: 2 additions & 1 deletion platformio.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
;
; Please visit documentation for the other options and examples
; https://docs.platformio.org/page/projectconf.html

[env:desktop]
platform = native
test_build_src = true
test_framework = unity
build_src_filter =
+<**/*.c>
+<**/*.cpp>
Expand All @@ -28,6 +28,7 @@ build_type = debug
[env:desktop-dist]
platform = native
test_build_src = true
test_framework = unity
build_src_filter =
+<**/*.c>
+<**/*.cpp>
Expand Down
39 changes: 34 additions & 5 deletions src/benchmarks/sortBenchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@
#endif

void insertData(embedDBState* state, const char* filename);
void insertNValues(embedDBState* state, int32_t n) {
int32_t key = 0, value = 0;

for (int32_t i = 0; i < n; i++) {
if (i % 10 == 0) {
value = 0;
}
embedDBPut(state, &key, &value);
key++;
value++;
}
}
void sort_order_last(int32_t numValues, embedDBState* stateUWA, embedDBSchema* baseSchema);
void sort_order_first(int32_t numValues, embedDBState* stateUWA, embedDBSchema* baseSchema);

Expand All @@ -67,7 +79,7 @@ int sortQueryBenchmark() {
stateUWA->eraseSizeInPages = 4;
stateUWA->numDataPages = 20000;
stateUWA->numIndexPages = 1000;
stateUWA->numSplinePoints = 30;
stateUWA->numSplinePoints = 120;

if (STORAGE_TYPE == 1) {
printf("Dataflash is not currently supported. Defaulting to SD card interface.");
Expand All @@ -92,16 +104,24 @@ int sortQueryBenchmark() {
return -1;
}

stateUWA->rules = NULL;
stateUWA->numRules = 0;

int8_t colSizes[] = {4, 4, 4, 4};
int8_t colSignedness[] = {embedDB_COLUMN_UNSIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED, embedDB_COLUMN_SIGNED};
ColumnType colTypes[] = {embedDB_COLUMN_UINT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32, embedDB_COLUMN_INT32};
embedDBSchema* baseSchema = embedDBCreateSchema(4, colSizes, colSignedness, colTypes);

// Insert data
const char datafileName[] = "data/uwa500K.bin";
insertData(stateUWA, datafileName);
// insertData(stateUWA, datafileName);
insertNValues(stateUWA, 500000);

#ifdef ARDUINO
uint32_t start_time, end_time;
#else
struct timespec start_time, end_time;
#endif
int32_t num_values[] = {100, 1000, 10000, 100000, 500000};

printf("\nProjection followed by Sort\n");
Expand All @@ -111,13 +131,22 @@ int sortQueryBenchmark() {

// Repeat each run for consistency
for (int j = 0; j < 1; j++) {
#ifdef ARDUINO
start_time = clock();
#else
clock_gettime(CLOCK_MONOTONIC, &start_time);
#endif

sort_order_last(num_values[i], stateUWA, baseSchema);

#ifdef ARDUINO
end_time = clock();
uint32_t elapsed_time = end_time - start_time;
printf("\tElapsed time: %u ms\n", elapsed_time);
#else
clock_gettime(CLOCK_MONOTONIC, &end_time);
double elapsed_ms = time_diff_ms(start_time, end_time);
printf("\tElapsed time: %.3f ms\n", elapsed_ms);
double elapsed_time = time_diff_ms(start_time, end_time);
printf("\tElapsed time: %.3f ms\n", elapsed_time);
#endif
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/desktopMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
#elif WHICH_PROGRAM == 3
#include "benchmarks/queryInterfaceBenchmark.h"
#elif WHICH_PROGRAM == 4
#include "benchmarks/activeRulesBenchmark.h"
#include "benchmarks/sortBenchmark.h"
#elif WHICH_PROGRAM == 5
#include "benchmarks/activeRulesBenchmark.h"
#endif

int main() {
Expand Down
4 changes: 4 additions & 0 deletions src/dueMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ static ArduinoOutStream cout(Serial);
#include "benchmarks/variableDataBenchmark.h"
#elif WHICH_PROGRAM == 3
#include "benchmarks/queryInterfaceBenchmark.h"
#elif WHICH_PROGRAM == 4
#include "benchmarks/sortBenchmark.h"
#endif

#define ENABLE_DEDICATED_SPI 1
Expand Down Expand Up @@ -109,6 +111,8 @@ void setup() {
test_vardata();
#elif WHICH_PROGRAM == 3
advancedQueryExample();
#elif WHICH_PROGRAM == 4
sortQueryBenchmark();
#endif
}

Expand Down
8 changes: 4 additions & 4 deletions src/query-interface/activeRules.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void executeRules(embedDBState* state, void* key, void* data) {
case GET_CUSTOM:
handleCustomQuery(state, state->rules[i], key, data);
break;
default:
default:;
#ifdef PRINT_ERRORS
printf("ERROR: Unsupported rule type\n");
#endif
Expand Down Expand Up @@ -177,7 +177,7 @@ embedDBOperator* createOperator(embedDBState* state, activeRule* rule, void*** a
case GET_MIN:
aggFunc = createMinAggregate(rule->colNum, rule->schema->columnSizes[rule->colNum]);
break;
default:
default:;
#ifdef PRINT_ERRORS
printf("ERROR: Unsupported rule type\n");
#endif
Expand Down Expand Up @@ -223,7 +223,7 @@ void executeComparison(activeRule* rule, void* aggregateValue, Comparator compar
case NotEqual:
if (comparisonResult != 0) rule->callback(aggregateValue, data, rule->context);
break;
default:
default:;
#ifdef PRINT_ERRORS
printf("ERROR: Unsupported operation\n");
#endif
Expand Down Expand Up @@ -266,7 +266,7 @@ void handleCustomQuery(embedDBState* state, activeRule* rule, void* key, void* d
case DBDOUBLE:
executeComparison(rule, result, doubleComparator, data);
break;
default:
default:;
#ifdef PRINT_ERRORS
printf("ERROR: Unsupported return type\n");
#endif
Expand Down
82 changes: 33 additions & 49 deletions src/query-interface/sort/adaptive_sort.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@

// #define DEBUG 1
// #define DEBUG_OUTPUT 1
// #define DEBUG_READ 1
// #define DEBUG_READ 1
// #define DEBUG_HEAP 0
// #define ADAPTIVE_SORT_PRINT
// #define ADAPTIVE_SORT_PRINT_FINISH
// #define ADAPTIVE_SORT_PRINT_FINISH
#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) || defined(DEBUG_HEAP) || defined(ADAPTIVE_SORT_PRINT) || defined(ADAPTIVE_SORT_PRINT_FINISH)
#include "debug_print.h"
#else
Expand Down Expand Up @@ -141,14 +141,16 @@ int adaptive_sort(
int16_t i, status;
int32_t numSublist = 0;
void* addr;
int8_t startNewSublist = 0;
int32_t numShiftOutOutput = 0, numShiftIntoOutput = 0, numShiftOtherBlock = 0;
int16_t cpuPenalty = 3;

/* Distribution estimation variables */
int16_t avgDistinct = 0; /* Average # of distinct values per run. Multiplied by 10 so can do integer rather than float operations. */
/* Note: Could be int8_t as larger than 255 is above cutoff for using MinSort. */
uint8_t numDistinctInRun = 0; /* Number of distinct values in current run */
uint16_t numDistinctInRun = 0; /* Number of distinct values in current run */

int optimistic = false;
int optimistic = true;
if (optimistic) {
// Do FLASH MinSort init first
#ifdef DEBUG
Expand All @@ -165,17 +167,17 @@ int adaptive_sort(
avgDistinct = 16;

int16_t numPasses = (int)ceil(log(es->num_pages / bufferSizeInBlocks) / log(bufferSizeInBlocks));
int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10;
int32_t nobSortCost = (numPasses * (10 + writeToReadRatio)) / 10;

#ifdef DEBUG
#ifdef ADAPTIVE_SORT_PRINT
debug_log("Adaptive calculation.\n");
debug_log("NOB sort cost. # runs: %d", numSublist);
debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost);
debug_log("MinSort cost. Num sublists: %d ", numSublist);
debug_log(" Avg. distinct/sublist: %d\n", avgDistinct / 10);
#endif

if (avgDistinct < nobSortCost)
if (avgDistinct * cpuPenalty < nobSortCost)
// if (true)
{
#ifdef DEBUG
Expand Down Expand Up @@ -368,6 +370,12 @@ int adaptive_sort(
sublistSize, outputCount, heapSize, listSize, recordsLeft);
#endif

if (startNewSublist) {
outputCount = 0;
haveOutputKey = 0;
sublistSize = 0;
startNewSublist = 0;
}
// Read in page
addr = buffer + es->headerSize;
for (i = 0; i < tuplesPerPage; i++) {
Expand Down Expand Up @@ -430,41 +438,6 @@ int adaptive_sort(

// Swap output records into output buffer from heap if smaller than records currently there. (I/O block is id zero)
for (i = 0; i < tuplesPerPage; i++) {
/*
* HEAP-EMPTY START NEW RUN TRANSITION
*/
if (heapSize == 0) {
if (listSize > 0) {
// Finish current run and start a new one
numSublist++;
metric->num_runs++;

sublistSize = 0;
outputCount = 0;
haveOutputKey = 0;

#ifdef DEBUG
debug_log("DEBUG: Heap empty → starting new run, promoting list (%d records)\n",
listSize);
#endif

// Promote frozen list to heap
for (int32_t k = listSize - 1; k >= 0; k--) {
shiftUp_rev(buffer + heapStartOffset,
buffer + es->page_size + k * es->record_size,
heapSize, es, metric);
heapSize++;
}
listSize = 0;

// Restart filling the output page for the new run
i = -1;
continue;
} else {
// No heap, no list → nothing left to output
break;
}
}
#ifdef DEBUG
if (i < 3 || i == tuplesPerPage - 1) { // Only log first 3 and last iteration
debug_log(" Inner loop i=%d: recordsRead=%d, outputCount=%d, recordsLeft=%d, heapSize=%d\n",
Expand All @@ -473,6 +446,16 @@ int adaptive_sort(
#endif
// Check if we've read all records from the current page
if (recordsRead == 0 || i >= recordsRead) {
heapVal = buffer + heapStartOffset;
if (haveOutputKey && heapSize > 0 &&
es->compare_fcn(heapVal + es->key_offset,
lastOutputKey + es->key_offset) < 0) {
numSublist++;
metric->num_runs++;
numDistinctInRun = 1;
startNewSublist = 1;
break;
}
// Check if there are any records left
if (recordsLeft <= 0) {
break;
Expand Down Expand Up @@ -540,7 +523,7 @@ int adaptive_sort(
memcpy(buffer + es->headerSize + i * es->record_size, buffer + heapStartOffset, es->record_size); /* Heap into input/output block */
metric->num_memcpys += 2;
// Determine if the value is different than the last one to estimate the number of distinct values
if (numDistinctInRun < 255 && haveOutputKey) {
if (haveOutputKey) {
// Value is different
metric->num_compar++;
if (es->compare_fcn(lastOutputKey + es->key_offset, inputVal + es->key_offset) < 0)
Expand Down Expand Up @@ -659,7 +642,6 @@ int adaptive_sort(
#endif
numDistinctInRun = 0;
} /* end pessmistic */

#ifdef DEBUG
debug_log("\n=== REPLACEMENT SELECTION COMPLETE ===\n");
debug_log("Number of sublists created: %d\n", numSublist);
Expand All @@ -674,8 +656,8 @@ int adaptive_sort(
uint32_t blockIdx = *((uint32_t*)buffer);
uint16_t count = *((uint16_t*)(buffer + BLOCK_COUNT_OFFSET));

debug_log("Block %d: blockIdx=%u, count=%u, first 10 values:", debugBlock, blockIdx, count);
for (int v = 0; v < count && v < 10; v++) {
debug_log("Block %d: blockIdx=%u, count=%u,", debugBlock, blockIdx, count);
for (int v = 0; v < count && v < 63; v++) {
debug_log(" %d", *(int32_t*)(buffer + es->headerSize + v * es->record_size + es->key_offset));
}
if (count > 10) debug_log(" ...");
Expand Down Expand Up @@ -707,18 +689,20 @@ int adaptive_sort(

int16_t numPasses = (int)ceil(log(numSublist) / log(bufferSizeInBlocks));
int32_t nobSortCost = numPasses * (10 + writeToReadRatio) / 10;
int32_t minSortCost = avgDistinct / 10 * cpuPenalty;

#ifdef ADAPTIVE_SORT_PRINT
debug_log("Adaptive calculation.\n");
debug_log("NOB sort cost. # runs: %d", numSublist);
debug_log(" # passes: %d cost: %d\n", numPasses, nobSortCost);
debug_log("MinSort cost. Num sublists: %d ", numSublist);
debug_log(" Avg. distinct/sublist: %d\n", avgDistinct / 10);

debug_log("Sublist version possible: %d\n", sublistVersionPossible);
debug_log("Buffer size bytes: %d Sort key size: %d\n", bufferSizeBytes, SORT_KEY_SIZE);
#endif

// Make decision to use either no output buffer sort or MinSort
if (avgDistinct / 10 < nobSortCost) {
if (minSortCost < nobSortCost) {
/* */
/* MinSort */
/* */
Expand Down Expand Up @@ -1479,7 +1463,7 @@ int adaptive_sort(
/* end of run */
}

if (record2[0] != -1) { /* Tuples in output block to write out */
if (record2[0] > 0) { /* Tuples in output block to write out */
// fseek(outputFile, lastWritePos, SEEK_SET);
// if (0 == fwrite(buffer + OUTPUT_BLOCK_ID * es->page_size, (size_t)es->page_size, 1, outputFile))
// { /* File write error - arduino prints 1st value nmemb times if nmemb != 1 */
Expand Down
8 changes: 5 additions & 3 deletions src/query-interface/sort/flash_minsort.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ This is no output sort with block headers and iterator input. Heap used when mov
// #define DEBUG 1
// #define DEBUG_OUTPUT 1
// #define DEBUG_READ 1
#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ)
// #define MINSORT_END 1
#if defined(DEBUG) || defined(DEBUG_OUTPUT) || defined(DEBUG_READ) || defined(MINSORT_END)
#include "debug_print.h"
#else
#ifndef debug_log
Expand Down Expand Up @@ -502,8 +503,9 @@ int flash_minsort(
clock_t end = clock();
#endif

#ifdef DEBUG
debug_log("Complete. Comparisons: %d MemCopies: %d\n", metric->num_compar, metric->num_memcpys);
#ifdef MINSORT_END
debug_log("Complete. Comparisons: %d Reads: %d Writes: %d MemCopies: %d\n", metric->num_compar, metric->num_reads,
metric->num_writes, metric->num_memcpys);
#endif

return 0; // Successful completion
Expand Down
Loading
Loading