From 960b5babede63780bf91ce8fe8658e8c41925ecb Mon Sep 17 00:00:00 2001 From: Sam Wilkins Date: Wed, 18 Sep 2019 04:22:54 -0400 Subject: final batching expansion --- src/client/northstar/utils/Extensions.ts | 28 ++++- .../util/Import & Export/DirectoryImportBox.tsx | 2 +- src/client/util/UtilExtensions.ts | 135 +++++++++++++++++---- 3 files changed, 138 insertions(+), 27 deletions(-) (limited to 'src/client') diff --git a/src/client/northstar/utils/Extensions.ts b/src/client/northstar/utils/Extensions.ts index 65af7ea87..3722bfd52 100644 --- a/src/client/northstar/utils/Extensions.ts +++ b/src/client/northstar/utils/Extensions.ts @@ -23,7 +23,6 @@ String.prototype.Truncate = function (length: number, replacement: string): Stri interface BatchContext { completedBatches: number; remainingBatches: number; - isFullBatch: boolean; } type BatchConverterSync = (batch: I[], context: BatchContext) => O[]; type BatchHandlerSync = (batch: I[], context: BatchContext) => void; @@ -31,20 +30,43 @@ type BatchConverterAsync = (batch: I[], context: BatchContext) => Promise< type BatchHandlerAsync = (batch: I[], context: BatchContext) => Promise; type BatchConverter = BatchConverterSync | BatchConverterAsync; type BatchHandler = BatchHandlerSync | BatchHandlerAsync; -type Batcher = number | ((element: I, accumulator: A) => boolean | Promise); +type FixedBatcher = { batchSize: number } | { batchCount: number, mode?: Mode }; +interface PredicateBatcher { + executor: (element: I, accumulator: A | undefined) => A | undefined; + initial: A; +} +interface PredicateBatcherAsync { + executor: (element: I, accumulator: A | undefined) => Promise; + initial: A; +} +type Batcher = FixedBatcher | PredicateBatcher; +type BatcherAsync = Batcher | PredicateBatcherAsync; interface Array { - batch(batchSize: undefined): T[][]; + fixedBatch(batcher: FixedBatcher): T[][]; + predicateBatch(batcher: PredicateBatcher): T[][]; + predicateBatchAsync(batcher: PredicateBatcherAsync): Promise; + batch(batcher: Batcher): T[][]; + batchAsync(batcher: BatcherAsync): Promise; + batchedForEach(batcher: Batcher, handler: BatchHandlerSync): void; batchedMap(batcher: Batcher, handler: BatchConverterSync): O[]; + batchedForEachAsync(batcher: Batcher, handler: BatchHandler): Promise; batchedMapAsync(batcher: Batcher, handler: BatchConverter): Promise; + batchedForEachInterval(batcher: Batcher, handler: BatchHandler, interval: number): Promise; batchedMapInterval(batcher: Batcher, handler: BatchConverter, interval: number): Promise; + lastElement(): T; } +Array.prototype.fixedBatch = extensions.FixedBatch; +Array.prototype.predicateBatch = extensions.PredicateBatch; +Array.prototype.predicateBatchAsync = extensions.PredicateBatchAsync; Array.prototype.batch = extensions.Batch; +Array.prototype.batchAsync = extensions.BatchAsync; + Array.prototype.batchedForEach = extensions.ExecuteInBatches; Array.prototype.batchedMap = extensions.ConvertInBatches; Array.prototype.batchedForEachAsync = extensions.ExecuteInBatchesAsync; diff --git a/src/client/util/Import & Export/DirectoryImportBox.tsx b/src/client/util/Import & Export/DirectoryImportBox.tsx index a86ef9a31..e3958e3a4 100644 --- a/src/client/util/Import & Export/DirectoryImportBox.tsx +++ b/src/client/util/Import & Export/DirectoryImportBox.tsx @@ -103,7 +103,7 @@ export default class DirectoryImportBox extends React.Component runInAction(() => this.phase = `Internal: uploading ${this.quota - this.completed} files to Dash...`); - const uploads = await validated.batchedMapAsync(15, async batch => { + const uploads = await validated.batchedMapAsync({ batchSize: 15 }, async batch => { const formData = new FormData(); const parameters = { method: 'POST', body: formData }; diff --git a/src/client/util/UtilExtensions.ts b/src/client/util/UtilExtensions.ts index 792cd00a1..8bd8fd581 100644 --- a/src/client/util/UtilExtensions.ts +++ b/src/client/util/UtilExtensions.ts @@ -1,24 +1,118 @@ -module.exports.Batch = function (batchSize: number): T[][] { +module.exports.Batch = function (batcher: Batcher): T[][] { + if ("executor" in batcher) { + return this.predicateBatch(batcher); + } else { + return this.fixedBatch(batcher); + } +}; + +module.exports.BatchAsync = async function (batcher: BatcherAsync): Promise { + if ("executor" in batcher) { + return this.predicateBatchAsync(batcher); + } else { + return this.fixedBatch(batcher); + } +}; + +module.exports.PredicateBatch = function (batcher: PredicateBatcher): T[][] { + const batches: T[][] = []; + let batch: T[] = []; + const { executor, initial } = batcher; + let accumulator: A | undefined = initial; + for (let element of this) { + if ((accumulator = executor(element, accumulator)) !== undefined) { + batch.push(element); + } else { + batches.push(batch); + batch = [element]; + } + } + batches.push(batch); + return batches; +}; + +module.exports.PredicateBatchAsync = async function (batcher: PredicateBatcherAsync): Promise { + const batches: T[][] = []; + let batch: T[] = []; + const { executor, initial } = batcher; + let accumulator: A | undefined = initial; + for (let element of this) { + if ((accumulator = await executor(element, accumulator)) !== undefined) { + batch.push(element); + } else { + batches.push(batch); + batch = [element]; + } + } + batches.push(batch); + return batches; +}; + +enum Mode { + Balanced, + Even +} + +module.exports.FixedBatch = function (batcher: FixedBatcher): T[][] { const batches: T[][] = []; + const length = this.length; let i = 0; - while (i < this.length) { - const cap = Math.min(i + batchSize, this.length); - batches.push(this.slice(i, cap)); - i = cap; + if ("batchSize" in batcher) { + const { batchSize } = batcher; + while (i < this.length) { + const cap = Math.min(i + batchSize, length); + batches.push(this.slice(i, i = cap)); + } + } else if ("batchCount" in batcher) { + let { batchCount, mode } = batcher; + const resolved = mode || Mode.Balanced; + if (batchCount < 1) { + throw new Error("Batch count must be a positive integer!"); + } + if (batchCount === 1) { + return [this]; + } + if (batchCount >= this.length) { + return this.map((element: T) => [element]); + } + + let length = this.length; + let size: number; + + if (length % batchCount === 0) { + size = Math.floor(length / batchCount); + while (i < length) { + batches.push(this.slice(i, i += size)); + } + } else if (resolved === Mode.Balanced) { + while (i < length) { + size = Math.ceil((length - i) / batchCount--); + batches.push(this.slice(i, i += size)); + } + } else { + batchCount--; + size = Math.floor(length / batchCount); + if (length % size === 0) { + size--; + } + while (i < size * batchCount) { + batches.push(this.slice(i, i += size)); + } + batches.push(this.slice(size * batchCount)); + } } return batches; }; -module.exports.ExecuteBatches = function (batchSize: number, handler: BatchHandlerSync): void { +module.exports.ExecuteBatches = function (batcher: Batcher, handler: BatchHandlerSync): void { if (this.length) { let completed = 0; - const batches = this.batch(batchSize); + const batches = this.batch(batcher); const quota = batches.length; for (let batch of batches) { const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; handler(batch, context); completed++; @@ -26,19 +120,18 @@ module.exports.ExecuteBatches = function (batchSize: number, handler: BatchHa } }; -module.exports.ConvertInBatches = function (batchSize: number, handler: BatchConverterSync): O[] { +module.exports.ConvertInBatches = function (batcher: Batcher, handler: BatchConverterSync): O[] { if (!this.length) { return []; } let collector: O[] = []; let completed = 0; - const batches = this.batch(batchSize); + const batches = this.batch(batcher); const quota = batches.length; for (let batch of batches) { const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; collector.push(...handler(batch, context)); completed++; @@ -46,16 +139,15 @@ module.exports.ConvertInBatches = function (batchSize: number, handler: Ba return collector; }; -module.exports.ExecuteInBatchesAsync = async function (batchSize: number, handler: BatchHandler): Promise { +module.exports.ExecuteInBatchesAsync = async function (batcher: BatcherAsync, handler: BatchHandler): Promise { if (this.length) { let completed = 0; - const batches = this.batch(batchSize); + const batches = await this.batchAsync(batcher); const quota = batches.length; for (let batch of batches) { const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; await handler(batch, context); completed++; @@ -63,19 +155,18 @@ module.exports.ExecuteInBatchesAsync = async function (batchSize: number, han } }; -module.exports.ConvertInBatchesAsync = async function (batchSize: number, handler: BatchConverter): Promise { +module.exports.ConvertInBatchesAsync = async function (batcher: BatcherAsync, handler: BatchConverter): Promise { if (!this.length) { return []; } let collector: O[] = []; let completed = 0; - const batches = this.batch(batchSize); + const batches = await this.batchAsync(batcher); const quota = batches.length; for (let batch of batches) { const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; collector.push(...(await handler(batch, context))); completed++; @@ -83,11 +174,11 @@ module.exports.ConvertInBatchesAsync = async function (batchSize: number, return collector; }; -module.exports.ExecuteInBatchesAtInterval = async function (batchSize: number, handler: BatchHandler, interval: number): Promise { +module.exports.ExecuteInBatchesAtInterval = async function (batcher: BatcherAsync, handler: BatchHandler, interval: number): Promise { if (!this.length) { return; } - const batches = this.batch(batchSize); + const batches = await this.batchAsync(batcher); const quota = batches.length; return new Promise(async resolve => { const iterator = batches[Symbol.iterator](); @@ -100,7 +191,6 @@ module.exports.ExecuteInBatchesAtInterval = async function (batchSize: number const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; await handler(batch, context); resolve(); @@ -114,12 +204,12 @@ module.exports.ExecuteInBatchesAtInterval = async function (batchSize: number }); }; -module.exports.ConvertInBatchesAtInterval = async function (batchSize: number, handler: BatchConverter, interval: number): Promise { +module.exports.ConvertInBatchesAtInterval = async function (batcher: BatcherAsync, handler: BatchConverter, interval: number): Promise { if (!this.length) { return []; } let collector: O[] = []; - const batches = this.batch(batchSize); + const batches = await this.batchAsync(batcher); const quota = batches.length; return new Promise(async resolve => { const iterator = batches[Symbol.iterator](); @@ -132,7 +222,6 @@ module.exports.ConvertInBatchesAtInterval = async function (batchSize: num const context: BatchContext = { completedBatches: completed, remainingBatches: quota - completed, - isFullBatch: batch.length === batchSize }; collector.push(...(await handler(batch, context))); resolve(); -- cgit v1.2.3-70-g09d2