aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/client/northstar/utils/Extensions.ts10
-rw-r--r--src/client/util/UtilExtensions.ts22
2 files changed, 24 insertions, 8 deletions
diff --git a/src/client/northstar/utils/Extensions.ts b/src/client/northstar/utils/Extensions.ts
index ad3e06806..8254c775c 100644
--- a/src/client/northstar/utils/Extensions.ts
+++ b/src/client/northstar/utils/Extensions.ts
@@ -31,13 +31,19 @@ type BatchHandlerAsync<I> = (batch: I[], context: BatchContext) => Promise<void>
type BatchConverter<I, O> = BatchConverterSync<I, O> | BatchConverterAsync<I, O>;
type BatchHandler<I> = BatchHandlerSync<I> | BatchHandlerAsync<I>;
type FixedBatcher = { batchSize: number } | { batchCount: number, mode?: Mode };
+interface ExecutorResult<A> {
+ updated: A;
+ makeNextBatch: boolean;
+}
interface PredicateBatcher<I, A> {
- executor: (element: I, accumulator: A | undefined) => A | undefined;
+ executor: (element: I, accumulator: A) => ExecutorResult<A>;
initial: A;
+ persistAccumulator?: boolean;
}
interface PredicateBatcherAsync<I, A> {
- executor: (element: I, accumulator: A | undefined) => Promise<A | undefined>;
+ executor: (element: I, accumulator: A) => Promise<ExecutorResult<A>>;
initial: A;
+ persistAccumulator?: boolean;
}
type Batcher<I, A> = FixedBatcher | PredicateBatcher<I, A>;
type BatcherAsync<I, A> = Batcher<I, A> | PredicateBatcherAsync<I, A>;
diff --git a/src/client/util/UtilExtensions.ts b/src/client/util/UtilExtensions.ts
index da47fc1bc..1447e37cb 100644
--- a/src/client/util/UtilExtensions.ts
+++ b/src/client/util/UtilExtensions.ts
@@ -17,14 +17,19 @@ module.exports.BatchAsync = async function <T, A>(batcher: BatcherAsync<T, A>):
module.exports.PredicateBatch = function <T, A>(batcher: PredicateBatcher<T, A>): T[][] {
const batches: T[][] = [];
let batch: T[] = [];
- const { executor, initial } = batcher;
- let accumulator: A | undefined = initial;
+ const { executor, initial, persistAccumulator } = batcher;
+ let accumulator = initial;
for (let element of this) {
- if ((accumulator = executor(element, accumulator)) !== undefined) {
+ const { updated, makeNextBatch } = executor(element, accumulator);
+ accumulator = updated;
+ if (!makeNextBatch) {
batch.push(element);
} else {
batches.push(batch);
batch = [element];
+ if (!persistAccumulator) {
+ accumulator = initial;
+ }
}
}
batches.push(batch);
@@ -34,14 +39,19 @@ module.exports.PredicateBatch = function <T, A>(batcher: PredicateBatcher<T, A>)
module.exports.PredicateBatchAsync = async function <T, A>(batcher: PredicateBatcherAsync<T, A>): Promise<T[][]> {
const batches: T[][] = [];
let batch: T[] = [];
- const { executor, initial } = batcher;
- let accumulator: A | undefined = initial;
+ const { executor, initial, persistAccumulator } = batcher;
+ let accumulator: A = initial;
for (let element of this) {
- if ((accumulator = await executor(element, accumulator)) !== undefined) {
+ const { updated, makeNextBatch } = await executor(element, accumulator);
+ accumulator = updated;
+ if (!makeNextBatch) {
batch.push(element);
} else {
batches.push(batch);
batch = [element];
+ if (!persistAccumulator) {
+ accumulator = initial;
+ }
}
}
batches.push(batch);