-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathforeach.js
More file actions
86 lines (83 loc) · 2.5 KB
/
foreach.js
File metadata and controls
86 lines (83 loc) · 2.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { axios } from '@pipedream/platform';
/**
 * "Foreach" action: POSTs every record of an input collection to a
 * processing workflow URL, either strictly one at a time (collecting each
 * response) or concurrently in throttled batches (discarding responses).
 */
export default defineComponent({
  name: "Foreach",
  description: "Runs a sub-workflow for each value of an array",
  key: "foreach",
  version: "0.1.0",
  type: "action",
  props: {
    // Accepts an array, a JSON string, or a single value (see `run`).
    records: {
      type: "any",
      label: "Records to loop",
      description: "The array of records to send to processing workflow",
    },
    workflowUrl: {
      type: "string",
      label: "Processing workflow URL",
      description: "The HTTP endpoint to connect to that's processing single individual records from this workflow"
    },
    apiToken: {
      type: "string",
      label: "API Bearer token",
      description: "Will be added as a Bearer token in the Authorization header in calls to the workflow URL. Should match the processing workflow.",
      secret: true
    },
    enableConcurrency: {
      type: "boolean",
      label: "Enable Concurrency",
      description: "Whether to send requests concurrently or one at a time. (Default: false)",
      default: false,
    },
    // Only consulted when `enableConcurrency` is true.
    batchSize: {
      type: "integer",
      label: "Batch Size",
      description: "The size of each batch before waiting.",
      optional: true,
    },
    // Only consulted when `enableConcurrency` is true.
    batchInterval: {
      type: "integer",
      label: "Batch Interval (ms)",
      description: "The time, in milliseconds, to wait between each batch.",
      optional: true,
    },
  },
  methods: {
    /**
     * Resolves after roughly `ms` milliseconds.
     *
     * @param {number} ms - Time to wait, in milliseconds.
     * @returns {Promise<void>}
     */
    delay(ms) {
      return new Promise((resolve) => setTimeout(resolve, ms));
    },
  },
  /**
   * Sends each record to `workflowUrl` as the body of a POST request carrying
   * `apiToken` as a Bearer token.
   *
   * - Sequential mode (`enableConcurrency` falsy): every request is awaited
   *   before the next one is sent; the responses are returned in order and a
   *   failing request rejects the run.
   * - Concurrent mode: requests are dispatched without waiting; after every
   *   `batchSize` requests the run waits for the batch to settle and pauses
   *   for whatever remains of `batchInterval`. Responses and individual
   *   failures are not reported, and nothing is returned.
   *
   * @param {object} context
   * @param {object} context.$ - Step context forwarded to `axios`.
   * @returns {Promise<Array|undefined>} Responses in sequential mode,
   *   `undefined` in concurrent mode.
   * @throws {SyntaxError} If `records` is a string that is not valid JSON.
   * @throws {Error} If concurrency is enabled without both a batch size and
   *   a batch interval.
   */
  async run({ $ }) {
    // Normalize into a local array; the component's props are never reassigned.
    let records = this.records;
    if (typeof records === "string") {
      records = JSON.parse(records);
    }
    if (!Array.isArray(records)) {
      records = [records];
    }

    const { enableConcurrency, batchSize, batchInterval } = this;

    // Throttling settings only matter when requests are fired without
    // waiting for each result, i.e. in concurrent mode.
    if (enableConcurrency && !(batchSize && batchInterval)) {
      throw new Error("Batch Size and Interval are required if not waiting for results.");
    }

    const send = (record) =>
      axios($, {
        url: this.workflowUrl,
        method: "POST",
        data: record,
        headers: { Authorization: `Bearer ${this.apiToken}` },
      });

    if (!enableConcurrency) {
      const results = [];
      for (const record of records) {
        results.push(await send(record));
      }
      return results;
    }

    let inFlight = [];
    for (let i = 0; i < records.length; i++) {
      inFlight.push(send(records[i]));

      // Count requests already dispatched so each batch holds exactly
      // `batchSize` of them.
      const sentCount = i + 1;
      if (sentCount % batchSize !== 0) {
        continue;
      }

      const waitStart = Date.now();
      // allSettled: one failing record must not abort the remaining ones.
      await Promise.allSettled(inFlight);
      inFlight = [];

      // No pause is needed once the final record has been sent.
      if (sentCount < records.length) {
        const elapsed = Date.now() - waitStart;
        await this.delay(Math.max(0, batchInterval - elapsed));
      }
    }

    // Settle the trailing partial batch so no request is left floating when
    // the run finishes.
    await Promise.allSettled(inFlight);
  },
});