609 lines
17 KiB
JavaScript
609 lines
17 KiB
JavaScript
"use strict";
|
|
|
|
const {
|
|
convertSize,
|
|
max,
|
|
min
|
|
} = require("./utils");
|
|
|
|
/**
 * Builds a random picker over a snapshot of `data`.
 *
 * @param {Iterable} data - Items to draw from; copied, never mutated.
 * @param {boolean} [replacement=true] - When `true` (or not a boolean) every
 *   draw comes from the full pool. When `false`, drawn items are removed and
 *   the pool is refilled from `data` once it runs empty.
 * @returns {{next: Function}} Object whose `next()` returns one random item
 *   (`undefined` when the pool is empty).
 */
const Randomizer = (data, replacement) => {
    let pool = [...data];

    // Anything other than an explicit boolean means "sample with replacement".
    const withReplacement = typeof replacement === "boolean" ? replacement : true;

    const next = () => {
        const position = Math.floor(Math.random() * pool.length);

        if (withReplacement) {
            return pool[position];
        }

        const [picked] = pool.splice(position, 1);

        // Start a new round as soon as the current one is exhausted.
        if (pool.length === 0) {
            pool = [...data];
        }

        return picked;
    };

    return { next };
};
|
|
|
|
/**
 * Creates a fresh, zeroed result record for one operation.
 *
 * @param {string} type - One of "stats", "distribute", "insert",
 *   "insertFile", "select", "update", "delete" or "aggregate".
 * @returns {object|undefined} A new result object whose `start` is the
 *   current timestamp, or `undefined` for any other type.
 */
const Result = (type) => {
    const started = Date.now();

    // One factory per operation type; each call yields brand-new arrays/objects.
    const templates = new Map([
        ["stats", () => ({
            size: 0,
            lines: 0,
            records: 0,
            errors: [],
            blanks: 0,
            created: undefined,
            modified: undefined,
            start: started,
            end: undefined,
            elapsed: 0
        })],
        ["distribute", () => ({
            stores: undefined,
            records: 0,
            errors: [],
            start: started,
            end: undefined,
            elapsed: undefined
        })],
        ["insert", () => ({
            inserted: 0,
            start: started,
            end: undefined,
            elapsed: 0
        })],
        ["insertFile", () => ({
            lines: 0,
            inserted: 0,
            errors: [],
            blanks: 0,
            start: started,
            end: undefined
        })],
        ["select", () => ({
            lines: 0,
            selected: 0,
            ignored: 0,
            errors: [],
            blanks: 0,
            start: started,
            end: undefined,
            elapsed: 0,
            data: []
        })],
        ["update", () => ({
            lines: 0,
            selected: 0,
            updated: 0,
            unchanged: 0,
            errors: [],
            blanks: 0,
            start: started,
            end: undefined,
            elapsed: 0,
            data: [],
            records: []
        })],
        ["delete", () => ({
            lines: 0,
            deleted: 0,
            retained: 0,
            errors: [],
            blanks: 0,
            start: started,
            end: undefined,
            elapsed: 0,
            data: [],
            records: []
        })],
        ["aggregate", () => ({
            lines: 0,
            aggregates: {},
            indexed: 0,
            unindexed: 0,
            errors: [],
            blanks: 0,
            start: started,
            end: undefined,
            elapsed: 0
        })]
    ]);

    const build = templates.get(type);

    return build ? build() : undefined;
};
|
|
|
|
/**
 * Creates the empty summary object that a reduction of `type` accumulates into.
 *
 * "drop" gets its own shape; every other type starts from `Result(type)`,
 * with extra fields added for "stats" and "aggregate". A `details` property
 * (initially `undefined`) is always appended last.
 *
 * @param {string} type - Operation type being reduced.
 * @returns {object} The initial summary object.
 */
const Reduce = (type) => {
    let summary;

    if (type === "drop") {
        summary = {
            dropped: false,
            start: Date.now(),
            end: 0,
            elapsed: 0
        };
    } else {
        summary = Result(type);

        if (type === "stats") {
            // Per-store record-count statistics; m2 is the running sum of squared deviations.
            Object.assign(summary, {
                stores: 0,
                min: undefined,
                max: undefined,
                mean: undefined,
                var: undefined,
                std: undefined,
                m2: 0
            });
        } else if (type === "aggregate") {
            summary.data = [];
        }
    }

    summary.details = undefined;

    return summary;
};
|
|
|
|
/**
 * Creates a line-by-line processor for one store and one operation type.
 *
 * @param {string} type - Operation type; selects the result shape and the
 *   per-record handler ("stats", "select", "update", "delete", "aggregate").
 * @param {...Function} functions - User callbacks forwarded positionally to
 *   the per-record handler (selecter, then updater/projecter/indexer
 *   depending on `type`).
 * @returns {{next: Function, return: Function}} `next(line, writer)` consumes
 *   one raw line; `return()` stamps `end`/`elapsed` and yields the results.
 */
const Handler = (type, ...functions) => {
    const summary = Result(type);
    const [first, second, third] = functions;

    // Per-type routing for successfully parsed records; other types are counted only.
    const routes = new Map([
        ["stats", (parsed) => statsHandler(parsed, summary)],
        ["select", (parsed) => selectHandler(parsed, first, second, summary)],
        ["update", (parsed, writer) => updateHandler(parsed, first, second, writer, summary)],
        ["delete", (parsed, writer) => deleteHandler(parsed, first, writer, summary)],
        ["aggregate", (parsed) => aggregateHandler(parsed, first, second, third, summary)]
    ]);

    const next = (line, writer) => {
        const parsed = new Record(line);

        summary.lines++;

        if (parsed.length === 0) {
            summary.blanks++;
            return;
        }

        if (parsed.data) {
            const route = routes.get(type);
            if (route) {
                route(parsed, writer);
            }
            return;
        }

        // Unparseable line: report it with its 1-based line number.
        summary.errors.push({ error: parsed.error, line: summary.lines, data: parsed.source });

        // update/delete rewrite the store, so the bad line is passed through untouched.
        if (type !== "update" && type !== "delete") {
            return;
        }

        if (writer) {
            writer.write(parsed.source + "\n");
        } else {
            summary.data.push(parsed.source);
        }
    };

    const finish = () => {
        summary.end = Date.now();
        summary.elapsed = summary.end - summary.start;
        return summary;
    };

    return {
        next,
        return: finish
    };
};
|
|
|
|
/**
 * Counts one valid record towards the stats of a store.
 *
 * @param {object} record - The parsed record (not inspected).
 * @param {object} results - Stats result object; its `records` counter is incremented.
 * @returns {object} The same `results` object.
 */
const statsHandler = (record, results) => {
    const tally = results;

    tally.records++;

    return tally;
};
|
|
|
|
/**
 * Applies a select operation to one record.
 *
 * @param {object} record - Record exposing `select`, `project` and `data`.
 * @param {Function} selecter - Predicate handed to `record.select`.
 * @param {Function} [projecter] - Optional projection handed to `record.project`;
 *   when omitted the full `record.data` is collected.
 * @param {object} results - Select result; `data`, `selected` and `ignored` are updated.
 */
const selectHandler = (record, selecter, projecter, results) => {
    if (!record.select(selecter)) {
        results.ignored++;
        return;
    }

    const row = projecter ? record.project(projecter) : record.data;

    results.data.push(row);
    results.selected++;
};
|
|
|
|
/**
 * Applies an update operation to one record and emits the (possibly changed)
 * record as a JSON line.
 *
 * @param {object} record - Record exposing `select`, `update` and `data`.
 * @param {Function} selecter - Predicate handed to `record.select`.
 * @param {Function} updater - Transformation handed to `record.update`.
 * @param {object} [writer] - When given, receives the JSON line via `write()`;
 *   otherwise the JSON string is appended to `results.data`.
 * @param {object} results - Update result; counters, `records` and `data` are updated.
 */
const updateHandler = (record, selecter, updater, writer, results) => {
    const matched = record.select(selecter);

    if (matched) {
        results.selected++;
    }

    // A record counts as updated only if it matched AND the updater succeeded.
    if (matched && record.update(updater)) {
        results.updated++;
        results.records.push(record.data);
    } else {
        results.unchanged++;
    }

    // Every record is written back, changed or not.
    const serialized = JSON.stringify(record.data);

    if (writer) {
        writer.write(serialized + "\n");
    } else {
        results.data.push(serialized);
    }
};
|
|
|
|
/**
 * Applies a delete operation to one record: matching records are collected
 * as deleted, all others are emitted again as a JSON line.
 *
 * @param {object} record - Record exposing `select` and `data`.
 * @param {Function} selecter - Predicate handed to `record.select`.
 * @param {object} [writer] - When given, receives retained lines via `write()`;
 *   otherwise the JSON string is appended to `results.data`.
 * @param {object} results - Delete result; counters, `records` and `data` are updated.
 */
const deleteHandler = (record, selecter, writer, results) => {
    if (record.select(selecter)) {
        results.deleted++;
        results.records.push(record.data);
        return;
    }

    results.retained++;

    const serialized = JSON.stringify(record.data);

    if (writer) {
        writer.write(serialized + "\n");
    } else {
        results.data.push(serialized);
    }
};
|
|
|
|
/**
 * Folds one record into the per-index aggregates of a store.
 *
 * Records rejected by the selecter are skipped. Selected records whose index
 * is falsy are counted as `unindexed`. All others increment the group's
 * `count`, and every defined field of the projection (or of the full record
 * when no projecter is given) is added to that group's per-field statistics.
 *
 * @param {object} record - Record exposing `select`, `index`, `project` and `data`.
 * @param {Function} selecter - Predicate handed to `record.select`.
 * @param {Function} indexer - Grouping function handed to `record.index`.
 * @param {Function} [projecter] - Optional projection handed to `record.project`.
 * @param {object} results - Aggregate result; `aggregates`, `indexed` and
 *   `unindexed` are updated.
 */
const aggregateHandler = (record, selecter, indexer, projecter, results) => {
    // Group and field names come from user data, so only OWN properties count:
    // a key such as "constructor" or "toString" must not resolve to Object.prototype.
    const hasOwn = (object, key) => Object.prototype.hasOwnProperty.call(object, key);

    if (!record.select(selecter)) {
        return;
    }

    const index = record.index(indexer);

    if (!index) {
        results.unindexed++;
        return;
    }

    if (hasOwn(results.aggregates, index)) {
        results.aggregates[index].count++;
    } else {
        results.aggregates[index] = {
            count: 1,
            aggregates: {}
        };
    }

    const group = results.aggregates[index];
    const projected = projecter ? record.project(projecter) : record.data;

    // record.project() yields undefined when the projecter throws; the record
    // still counts for its group but contributes no field statistics.
    const projection = projected === undefined || projected === null ? {} : projected;

    for (const field of Object.keys(projection)) {
        const value = projection[field];

        if (value === undefined) {
            continue;
        }

        if (hasOwn(group.aggregates, field)) {
            accumulateAggregate(group.aggregates[field], value);
            continue;
        }

        const entry = {
            min: value,
            max: value,
            count: 1
        };

        // Numeric fields additionally seed the running sum / mean / m2.
        if (typeof value === "number") {
            entry.sum = value;
            entry.mean = value;
            entry.m2 = 0;
        }

        group.aggregates[field] = entry;
    }

    results.indexed++;
};
|
|
|
|
/**
 * Adds one more value to an existing per-field aggregate.
 *
 * @param {object} index - Aggregate entry (`min`, `max`, `count` and, for
 *   numeric fields, `sum`, `mean`, `m2`); updated in place.
 * @param {*} projection - The new field value.
 * @returns {object} The same aggregate entry.
 */
const accumulateAggregate = (index, projection) => {
    index.min = min(index.min, projection);
    index.max = max(index.max, projection);
    index.count++;

    if (typeof projection !== "number") {
        return index;
    }

    // Welford's algorithm: update the mean, then add the product of the
    // deviations from the old and the new mean to m2.
    const fromOldMean = projection - index.mean;

    index.sum += projection;
    index.mean += fromOldMean / index.count;
    index.m2 += fromOldMean * (projection - index.mean);

    return index;
};
|
|
|
|
/**
 * One line of a store, parsed from JSON.
 *
 * After construction `source` holds the trimmed line, `length` its character
 * count, and either `data` holds the parsed value (with `error` empty) or
 * `data` is `undefined` and `error` holds the parser's message.
 */
class Record {
    /**
     * @param {string} record - Raw line text.
     */
    constructor(record) {
        this.source = record.trim();
        this.length = this.source.length;
        this.data = {};
        this.error = "";

        try {
            this.data = JSON.parse(this.source);
        } catch (e) {
            this.data = undefined;
            this.error = e.message;
        }
    }

    /**
     * Tests the record against a user predicate.
     *
     * @param {Function} selecter - Called with `data`.
     * @returns {boolean} The predicate's result; `false` if the predicate throws.
     * @throws {TypeError} If the predicate returns a non-boolean.
     */
    select(selecter) {
        let result;

        try {
            result = selecter(this.data);
        } catch {
            return false;
        }

        if (typeof result !== "boolean") {
            throw new TypeError("Selecter must return a boolean");
        }

        return result;
    }

    /**
     * Replaces `data` with the value returned by a user updater.
     *
     * @param {Function} updater - Called with `data`.
     * @returns {boolean} `true` when `data` was replaced; `false` if the updater throws.
     * @throws {TypeError} If the updater returns `null` or a non-object.
     */
    update(updater) {
        let result;

        try {
            result = updater(this.data);
        } catch {
            return false;
        }

        // typeof null is "object", so null needs its own check; otherwise the
        // record would be overwritten with null.
        if (result === null || typeof result !== "object") {
            throw new TypeError("Updater must return an object");
        }

        this.data = result;

        return true;
    }

    /**
     * Maps the record through a user projection.
     *
     * @param {Function} projecter - Called with `data`.
     * @returns {object|undefined} The projection; `undefined` if the projecter throws.
     * @throws {TypeError} If the projecter returns `null`, an array or a non-object.
     */
    project(projecter) {
        let result;

        try {
            result = projecter(this.data);
        } catch {
            return undefined;
        }

        if (result === null || Array.isArray(result) || typeof result !== "object") {
            throw new TypeError("Projecter must return an object");
        }

        return result;
    }

    /**
     * Computes the grouping key of the record.
     *
     * @param {Function} indexer - Called with `data`.
     * @returns {*} The indexer's result; `undefined` if the indexer throws.
     */
    index(indexer) {
        try {
            return indexer(this.data);
        } catch {
            return undefined;
        }
    }
}
|
|
|
|
/**
 * Combines the per-store results of one operation into a single summary.
 *
 * @param {string} type - Operation type ("stats", "insert", "select",
 *   "update", "delete", "aggregate", "drop", ...).
 * @param {Array<object>} results - Per-store result objects; kept on the
 *   summary as `details`.
 * @returns {object} The summary created by `Reduce(type)` with all counters,
 *   timings and type-specific fields filled in.
 */
const Reducer = (type, results) => {
    const summary = Reduce(type);
    const merged = {};
    let position = 0;

    // Type-specific merge step applied to each per-store result.
    const mergers = new Map([
        ["stats", (result) => {
            statsReducer(summary, result, position);
            summary.stores++;
            position++;
        }],
        ["insert", (result) => insertReducer(summary, result)],
        ["select", (result) => selectReducer(summary, result)],
        ["update", (result) => updateReducer(summary, result)],
        ["delete", (result) => deleteReducer(summary, result)],
        ["aggregate", (result) => aggregateReducer(summary, result, merged)]
    ]);
    const merge = mergers.get(type);

    for (const result of results) {
        if (merge) {
            merge(result);
        }

        if (type === "drop") {
            summary.dropped = true;
        } else if (type !== "insert") {
            // Line-based operations share these counters.
            summary.lines += result.lines;
            summary.errors = summary.errors.concat(result.errors);
            summary.blanks += result.blanks;
        }

        summary.start = min(summary.start, result.start);
        summary.end = max(summary.end, result.end);
    }

    if (type === "stats") {
        const variance = summary.m2 / (results.length);

        summary.size = convertSize(summary.size);
        summary.var = variance;
        summary.std = Math.sqrt(variance);
        delete summary.m2;
    } else if (type === "aggregate") {
        // Flatten the index -> field -> stats map into arrays, hiding the internal m2.
        for (const [index, group] of Object.entries(merged)) {
            const flattened = [];

            for (const [field, data] of Object.entries(group.aggregates)) {
                delete data.m2;
                flattened.push({ field: field, data: data });
            }

            summary.data.push({
                index: index,
                count: group.count,
                aggregates: flattened
            });
        }

        delete summary.aggregates;
    }

    summary.elapsed = summary.end - summary.start;
    summary.details = results;

    return summary;
};
|
|
|
|
/**
 * Folds the stats of one store into the overall stats summary.
 *
 * Besides summing `size` and `records`, it maintains the running minimum,
 * maximum, mean and m2 (sum of squared deviations) of the per-store record
 * counts, and the earliest `created` / latest `modified` timestamps.
 *
 * @param {object} reduce - Stats summary; updated in place.
 * @param {object} result - Stats result of a single store.
 * @param {number} i - Zero-based position of `result` among the stores
 *   reduced so far.
 */
const statsReducer = (reduce, result, i) => {
    reduce.size += result.size;
    reduce.records += result.records;
    reduce.min = min(reduce.min, result.records);
    reduce.max = max(reduce.max, result.records);

    // The first store seeds the mean, which makes its own delta zero.
    if (reduce.mean === undefined) reduce.mean = result.records;

    // Welford's algorithm: the divisor is the number of samples INCLUDING the
    // current one, i.e. i + 1 for a zero-based position.
    const delta1 = result.records - reduce.mean;
    reduce.mean += delta1 / (i + 1);
    const delta2 = result.records - reduce.mean;
    reduce.m2 += delta1 * delta2;

    reduce.created = min(reduce.created, result.created);
    reduce.modified = max(reduce.modified, result.modified);
};
|
|
|
|
/**
 * Adds one store's inserted-record count to the summary.
 *
 * @param {object} reduce - Insert summary; `inserted` is increased.
 * @param {object} result - Insert result of a single store.
 */
const insertReducer = (reduce, result) => {
    const { inserted } = result;

    reduce.inserted += inserted;
};
|
|
|
|
/**
 * Merges one store's select result into the summary.
 *
 * @param {object} reduce - Select summary; counters are increased and the
 *   store's rows are appended to `data`.
 * @param {object} result - Select result of a single store; its `data`
 *   property is removed once the rows have been moved to the summary.
 */
const selectReducer = (reduce, result) => {
    const { selected, ignored, data } = result;

    reduce.selected += selected;
    reduce.ignored += ignored;
    reduce.data = reduce.data.concat(data);

    delete result.data;
};
|
|
|
|
/**
 * Adds one store's update counters to the summary.
 *
 * @param {object} reduce - Update summary; `selected`, `updated` and
 *   `unchanged` are increased.
 * @param {object} result - Update result of a single store.
 */
const updateReducer = (reduce, result) => {
    for (const counter of ["selected", "updated", "unchanged"]) {
        reduce[counter] += result[counter];
    }
};
|
|
|
|
/**
 * Adds one store's delete counters to the summary.
 *
 * @param {object} reduce - Delete summary; `deleted` and `retained` are increased.
 * @param {object} result - Delete result of a single store.
 */
const deleteReducer = (reduce, result) => {
    for (const counter of ["deleted", "retained"]) {
        reduce[counter] += result[counter];
    }
};
|
|
|
|
/**
 * Merges one store's aggregate result into the cross-store aggregates.
 *
 * @param {object} reduce - Aggregate summary; `indexed` and `unindexed` are increased.
 * @param {object} result - Aggregate result of a single store; its
 *   `aggregates` property is removed after merging.
 * @param {object} aggregates - Accumulator keyed by index, then by field;
 *   updated in place.
 */
const aggregateReducer = (reduce, result, aggregates) => {
    // Index and field names come from user data, so only OWN properties count:
    // a key such as "constructor" or "toString" must not resolve to Object.prototype.
    const hasOwn = (object, key) => Object.prototype.hasOwnProperty.call(object, key);

    reduce.indexed += result.indexed;
    reduce.unindexed += result.unindexed;

    for (const index of Object.keys(result.aggregates)) {
        const partial = result.aggregates[index];

        if (hasOwn(aggregates, index)) {
            aggregates[index].count += partial.count;
        } else {
            aggregates[index] = {
                count: partial.count,
                aggregates: {}
            };
        }

        const group = aggregates[index];

        for (const field of Object.keys(partial.aggregates)) {
            const resultObject = partial.aggregates[field];

            if (hasOwn(group.aggregates, field)) {
                reduceAggregate(group.aggregates[field], resultObject);
                continue;
            }

            // First sighting of this field: copy the store's statistics.
            const entry = {
                min: resultObject["min"],
                max: resultObject["max"],
                count: resultObject["count"]
            };

            // Only numeric fields carry m2; derive population and sample
            // variance / standard deviation from it.
            if (resultObject["m2"] !== undefined) {
                const m2 = resultObject["m2"];
                const count = resultObject["count"];

                entry["sum"] = resultObject["sum"];
                entry["mean"] = resultObject["mean"];
                entry["varp"] = m2 / count;
                entry["vars"] = m2 / (count - 1);
                entry["stdp"] = Math.sqrt(m2 / count);
                entry["stds"] = Math.sqrt(m2 / (count - 1));
                entry["m2"] = m2;
            }

            group.aggregates[field] = entry;
        }
    }

    delete result.aggregates;
};
|
|
|
|
/**
 * Merges one store's statistics for a field into the accumulated statistics
 * of the same field.
 *
 * @param {object} aggregate - Accumulated entry; updated in place.
 * @param {object} result - The store's entry for the same field.
 */
const reduceAggregate = (aggregate, result) => {
    const accumulated = aggregate.count;
    const incoming = result.count;
    const n = accumulated + incoming;

    aggregate.min = min(aggregate.min, result.min);
    aggregate.max = max(aggregate.max, result.max);

    // Parallel version of Welford's algorithm. This must run before the mean
    // is refreshed below, because delta is taken against the previous mean.
    if (result.m2 !== undefined) {
        const delta = result.mean - aggregate.mean;
        const m2 = aggregate.m2 + result.m2 + (Math.pow(delta, 2) * ((accumulated * incoming) / n));

        aggregate.m2 = m2;
        aggregate.varp = m2 / n;
        aggregate.vars = m2 / (n - 1);
        aggregate.stdp = Math.sqrt(m2 / n);
        aggregate.stds = Math.sqrt(m2 / (n - 1));
    }

    if (result.sum !== undefined) {
        aggregate.sum += result.sum;
        aggregate.mean = aggregate.sum / n;
    }

    aggregate.count = n;
};
|
|
|
|
// Public API of this module.
Object.assign(exports, {
    Randomizer,
    Result,
    Reduce,
    Handler,
    Reducer
});
|