Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions code/common/checkinputs.q
Original file line number Diff line number Diff line change
Expand Up @@ -226,3 +226,5 @@ checkpostback:{[dict;parameter]
checktimeout:{[dict;parameter]
checktype[-16h;dict;parameter];
:dict};

checkprocs:{[dict;parameter]:.checkinputs.checktype[-11 11h;dict;parameter];};
2 changes: 1 addition & 1 deletion code/dataaccess/dataaccessutils.q
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ readtableproperties:{[tablepropertiepath]
.lg.o[`readtableproperties;"loading table properties"];
table:`tablename`proctype xkey readcsv[tablepropertiepath;"ssssstsss"]; //read in table from file
alltable:?[table;enlist(in;`proctype;enlist`all`);0b;()]; //find any instance of the use "all" or blank for proctype
table:table,![alltable;();0b;(enlist`proctype)!enlist(enlist `hdb)],![alltable;();0b;(enlist`proctype)!enlist(enlist `rdb)]; //join rdb and hdb entries for any "all" or blank entries
table:table,raze{![x;();0b;(enlist`proctype)!enlist(enlist y)]}[alltable]each`rdb`hdb`idb; //join rdb, idb and hdb entries for any "all" or blank entries
table:![table;enlist(in;`proctype;enlist`all`);0b;`symbol$()]; //remove "all" or blank entries from table
table:?[table;$[.proc.proctype=`gateway;();enlist(=;`proctype;`.proc.proctype)];0b;()];
table:update .eodtime.datatimezone ^ datatimezone, .eodtime.rolltimeoffset ^ rolltimeoffset,.eodtime.rolltimezone^rolltimezone from table;
Expand Down
23 changes: 11 additions & 12 deletions code/gateway/dataaccess.q
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ partdict:{[input]
tabname:input[`tablename];
// Remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// Only target specified procs if defined
if[`procs in key input;servers:select from servers where servertype in ((),input`procs)];
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// Create a dictionary of the attributes against servertypes
Expand All @@ -162,6 +164,8 @@ partdict:{[input]
procdict:@[procdict;key procdict;{[x;tabname]if[99h=type x;:x[tabname]];:x}[;tabname]];
// returns the dictionary as min date/ max date
procdict:asc @[procdict;key procdict;{:(min x; max x)}];
// Let idb take precedence over rdb to prevent data duplication
if[all `rdb`idb in key procdict;procdict:delete rdb from procdict];
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is something we want to do by default in TorQ - it may be appropriate in some individual TorQ-based systems, but in general, I don't think we want to exclude all RDB data just because there is an IDB. A typical pattern we expect people to use would be having e.g. most recent hour in RDB and everything up until last write in IDB. If we only return data from IDB, we'll be missing anything received since last write, even though that data would be available in RDB.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this hardcoded removal of rdb if idb is present. As discussed if client has idb/rdb up with full day's worth of data and queries both they will get duplication but that is the correct thing to do here. They can easily dedeupe by using proc arg in input dict

// prevents overlap if more than one process contains a specified date
if[1<count procdict;
procdict:{:$[y~`date$();x;$[within[x 0;(min y;max y)];(1+max[y];x 1);x]]}':[procdict]];
Expand All @@ -177,6 +181,8 @@ adjustqueries:{[options;part]
tabname:options[`tablename];
// remove duplicate servertypes from the gw.servers
servers:select from .gw.servers where i=(first;i)fby servertype;
// only target specified procs if defined
if[`procs in key options;servers:select from servers where servertype in ((),options`procs)];
// extract the procs which have the table defined
servers:select from servers where {[x;tabname]tabname in @[x;`tables]}[;tabname] each attributes;
// create a dictionary of the attributes against servertypes
Expand All @@ -191,18 +197,11 @@ adjustqueries:{[options;part]
partitions:possparts{(min x;max x)}'[partitions];
partitions:`timestamp$partitions;

// adjust the times to account for period end time when int partitioned
c:first[partitions`hdb],-1+ first[partitions`rdb];
d:first[partitions`rdb],options `endtime;
partitions:@[@[partitions;`hdb;:;c];`rdb;:;d];

// if start/end time not a date, then adjust dates parameter for the correct types
if[not a:-12h~tp:type start:options`starttime;
// converts partitions dictionary to timestamps/datetimes
partitions:$[-15h~tp;"z"$;]{(0D+x 0;x[1]+1D-1)}'[partitions];
// convert first and last timestamp to start and end time
partitions:@[partitions;f;:;(start;partitions[f:first key partitions;1])];
partitions:@[partitions;l;:;(partitions[l:last key partitions;0];options`endtime)]];
// adjust the query times accordingly
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment is very vague - what does "accordingly" mean in this context?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

options:@[@[options;`starttime;:;"p"$options`starttime];`endtime;:;$[-14h~type et:options`endtime;-1+1D+et;"p"$et]]; //ensure st/et are timestamps; if date adjust endtime
partitions:{x[0],x[1]+1D-1}each partitions; //create dict of datetime coverage for each process
partitions:@[partitions;f;:;(st:"p"$options`starttime;min(et:"p"$options`endtime;partitions[f:first key partitions;1]))]; //amend query datetimes on hdb
partitions:@[partitions;l;:;(max(st;partitions[l:last key partitions;0]);et)]; //amend query datetimes on rdb/idb
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this file uses comments above lines of code, please keep that consistent

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


// adjust map reducable aggregations to get correct components
if[(1<count partitions)&`aggregations in key options;
Expand Down
4 changes: 3 additions & 1 deletion code/processes/idb.q
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ init:{[]
.lg.o[`init;"registering IDBs on WDB process..."];
/-send sync message to WDB to register the existing IDBs.
@[w;(`.servers.registerfromdiscovery;`idb;0b);{.lg.e[`connection;"Failed to register IDB with WDB."];'x}];
/-dataaccess initialisation must be done after wdb loaded.
if[&[`dataaccess in key .proc.params;.proc.proctype=`idb];.dataaccess.init[]];
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jonathonmcmurray as discussed last Fri probably a cleaner way to do this...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need to check the proctype? if loading this file, surely it's an idb, regardless of proctype?

Suggested change
if[&[`dataaccess in key .proc.params;.proc.proctype=`idb];.dataaccess.init[]];
if[`dataaccess in key .proc.params;.dataaccess.init[]];

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes of course!

.lg.o[`init; "Initialisation of the IDB is done."];
}

Expand All @@ -101,7 +103,7 @@ init:{[]
reload:.idb.intradayreload;

/-Get the relevant IDB attributes
.proc.getattributes:{`partition`tables!(.idb.currentpartition;tables[])};
.proc.getattributes:{`partition`tables!(enlist .idb.currentpartition;tables[])};

.idb.init[];

Expand Down