Processing Data in Parallel

When you have a large set of records to process, processing them in parallel can speed up the computation. This tutorial shows how to split the data into manageable chunks and process those chunks simultaneously.

For this example, we'll fetch a "large" dataset from the internet - here we'll pull down 500 "comment" records from the JSONPlaceholder API: https://jsonplaceholder.typicode.com/comments

Example Integration

Split the data into manageable chunks

Once we've fetched our data, we can use the Collection Tools component's Chunks action to split the data into manageable chunks. Here, we split our 500 records into 10 groups of 50 records each.

Configuring the Chunks action to split the data into 10 groups of 50 records each.

If your data is not evenly divisible by the number of elements you specify, the last chunk will contain the remaining elements. For example, if you have 108 records, and split them into chunks of 25, you'll get 4 chunks of 25 records, and 1 chunk of 8 records.
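The splitting behavior described above can be sketched in plain Python. This only illustrates the arithmetic and is not the Collection Tools component's implementation; `chunk_records` is a hypothetical helper name.

```python
def chunk_records(records, chunk_size):
    """Split records into consecutive chunks of at most chunk_size items."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]


records = list(range(108))  # stand-in for 108 records
chunks = chunk_records(records, 25)
sizes = [len(chunk) for chunk in chunks]
print(sizes)  # four chunks of 25 and a final chunk of 8
```

The last chunk simply holds whatever is left over, so no records are dropped or padded.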

If we open the Chunks action's results, we can see 10 groups of 50 records each.

The results of the Chunks action, showing 10 groups of 50 records each.

Loop over each chunk

Next, add a Loop Over Items action to your integration and configure it to loop over the chunks you generated.

Configuring the Loop Over Items action to loop over the chunks generated in the previous step.

Send each chunk to a sibling flow

We need to send each chunk of records to a sibling flow. To do that, we'll use cross-flow invocations.

We'll add an Invoke Flow step to our integration and select a sibling flow to send our chunk to.

First, add a sibling flow that has a Cross-Flow Trigger.

Then, select the sibling flow for your Invoke Flow step's Flow Name input. For the Data input of the Invoke Flow action, reference the Loop Over Items action's currentItem property. That property represents the current chunk of records, so the step sends that chunk to the sibling flow.

Invoke a sibling flow by sending the current chunk of records to the sibling flow's webhook URL.

If we open the Process Records flow after running a test of our parent flow, we can see that Process Records was invoked ten times, and each invocation received a chunk of 50 records.

The Process Records flow was invoked 10 times, and each invocation received a chunk of 50 records.

What makes this parallel?

By default, Prismatic executions are asynchronous, meaning that our main flow will not wait for an invocation of the Process Records flow to complete before beginning the next invocation. This allows us to process multiple chunks of records simultaneously, effectively processing data in parallel.
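As a loose analogy outside of Prismatic, the same fire-and-continue pattern can be sketched with a Python thread pool. This is not how the platform schedules executions; `process_chunk` is a hypothetical stand-in for the sibling flow, and the sleep simulates a slow API call.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_chunk(chunk):
    """Hypothetical stand-in for the work done by the sibling flow."""
    time.sleep(0.1)  # simulate a slow downstream API call
    return len(chunk)


chunks = [list(range(50)) for _ in range(10)]  # 10 chunks of 50 records

with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
    # submit() returns immediately, so the loop does not wait for one
    # chunk to finish before handing off the next one.
    futures = [pool.submit(process_chunk, chunk) for chunk in chunks]
    processed = [future.result() for future in futures]

print(sum(processed))  # total number of records handled across all chunks
```

Because each hand-off returns right away, the chunks overlap in time instead of running one after another.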

Process each chunk in the sibling flow

Now that records are being sent to the sibling flow, we can process each chunk of records in parallel. Your integration will require some business logic and will reach out to some APIs to fetch or update records. For this example, we'll simply capitalize the body of each comment.
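The per-chunk transformation might look like the following sketch. It assumes that "capitalize" means upper-casing the whole body, and that each record has a `body` field as JSONPlaceholder comments do; `capitalize_bodies` is a hypothetical name.

```python
def capitalize_bodies(comments):
    """Return copies of the comments with each body upper-cased.

    Assumption: "capitalize" is interpreted as upper-casing the full text.
    """
    return [{**comment, "body": comment["body"].upper()} for comment in comments]


chunk = [
    {"id": 1, "email": "a@example.com", "body": "first comment"},
    {"id": 2, "email": "b@example.com", "body": "second comment"},
]
result = capitalize_bodies(chunk)
print(result[0]["body"])
```

The function returns new dictionaries rather than mutating its input, which keeps the original chunk available for logging or retries.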

(Optional) Aggregate the results

If your integration is mono-directional (i.e. it pulls data from a source and sends the data to a destination), you may not need to aggregate the results. But if your integration is bi-directional, or if you need to aggregate the results for any other reason, you can fetch the results of the parallelized invocations using the execution IDs that your Invoke Flow step returned.

If you look at the step results of the Loop Over Items action, you'll see that it returns an array of execution IDs from the Invoke Flow step.

The Loop Over Items action returns an array of execution IDs from the Invoke Flow step.

We can loop over these execution IDs and fetch the results of each invocation. To do that, we'll:

  • Loop over the execution IDs
    • Loop up to 10 times using the Loop N Times action
    • Check if the execution is finished by querying the Prismatic API
      • If it's finished, fetch the step results of a step in the sibling flow. Break the inner loop.
      • If it's not finished, sleep for a few seconds and check again
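The nested loops above can be sketched as follows. This is a minimal illustration of the control flow, not Prismatic's implementation: `check_execution` and `fetch_results` are hypothetical callables standing in for the API query and the step-result download, and the status dictionary shape is an assumption.

```python
import time


def wait_for_result(execution_id, check_execution, fetch_results,
                    max_attempts=10, delay_seconds=3.0, sleep=time.sleep):
    """Poll one execution up to max_attempts times (the inner loop)."""
    for _ in range(max_attempts):
        status = check_execution(execution_id)
        if status.get("endedAt"):  # finished: fetch results and break out
            return fetch_results(status)
        sleep(delay_seconds)  # not finished: wait, then check again
    return None  # still running after max_attempts checks


def collect_results(execution_ids, check_execution, fetch_results, **options):
    """Outer loop: wait for each execution ID in turn."""
    return [wait_for_result(execution_id, check_execution, fetch_results, **options)
            for execution_id in execution_ids]


# Fake callables for demonstration: the execution "finishes" on the third check.
calls = {"exec-1": 0}

def fake_check(execution_id):
    calls[execution_id] += 1
    ended = "2024-01-01T00:00:00Z" if calls[execution_id] >= 3 else None
    return {"id": execution_id, "endedAt": ended}

def fake_fetch(status):
    return ["RESULT FOR " + status["id"]]

results = collect_results(["exec-1"], fake_check, fake_fetch, sleep=lambda s: None)
print(results)
```

Capping the inner loop keeps a stuck execution from blocking the aggregation forever; a `None` entry marks an execution that did not finish in time.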

Fetch step results from the Prismatic API

We can use the Prismatic component's Raw GraphQL Request action to query the Prismatic API for the step results of a step in the sibling flow.

query myGetExecutionResults($executionId: ID!, $stepName: String!) {
  executionResult(id: $executionId) {
    id
    endedAt
    stepResults(displayStepName: $stepName) {
      nodes {
        resultsUrl
      }
    }
  }
}

Fetch data from the Prismatic API

If the execution is finished (indicated by endedAt having a value), we can use the resultsUrl that we received from the Prismatic API to fetch the step results of a step in the sibling flow, and then break the loop. If the execution is not finished, we'll sleep and then check again.
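The finished-or-not check can be sketched as a small function over the query's response. The field names come from the query above; the surrounding `data` wrapper is the standard GraphQL response envelope and is an assumption about what your step actually receives. `extract_results_url` is a hypothetical name, and the URL is a placeholder.

```python
def extract_results_url(response):
    """Return resultsUrl if the execution has ended, otherwise None."""
    execution = (response.get("data") or {}).get("executionResult") or {}
    if not execution.get("endedAt"):
        return None  # still running: the caller should sleep and retry
    nodes = (execution.get("stepResults") or {}).get("nodes") or []
    return nodes[0]["resultsUrl"] if nodes else None


finished = {
    "data": {
        "executionResult": {
            "id": "exec-1",
            "endedAt": "2024-01-01T00:00:00Z",
            "stepResults": {"nodes": [{"resultsUrl": "https://example.com/results/abc"}]},
        }
    }
}
running = {"data": {"executionResult": {"id": "exec-2", "endedAt": None,
                                        "stepResults": {"nodes": []}}}}

print(extract_results_url(finished))
print(extract_results_url(running))
```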

Fetch results from S3

A few notes:

  • Step results are stored as binary files in S3, and are packed using MessagePack. Since they are binary files, we need to set the GET Request action's Response Type to Binary.
  • We need to unpack the step results using the MessagePack Decompress action.
  • We can either process the step results in the loop, or we can use the Persist Data component's Execution - Append Value to List action to aggregate the results into a single array. That's what we do in the example integration here.

Process the results

Finally, we can load the array of step results using an Execution - Get Value action. The results will be an array of arrays, so we can use the Collection Tools component's Flatten action to flatten the array of arrays into a single array. When we do that in our example, we get an array of 500 records, each with a capitalized body.
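Flattening can be illustrated with a short Python sketch. It mirrors what the Flatten action is described as doing, one level deep, and is not the component's own code; the sample records are made up.

```python
from itertools import chain


def flatten(list_of_lists):
    """Flatten one level of nesting: [[a, b], [c]] becomes [a, b, c]."""
    return list(chain.from_iterable(list_of_lists))


# Ten per-execution result arrays of 50 made-up records each.
per_execution = [
    [{"id": record_id, "body": "SAMPLE BODY"} for record_id in range(start, start + 50)]
    for start in range(0, 500, 50)
]
all_records = flatten(per_execution)
print(len(all_records))
```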

Flatten the array of step results into a single array.

We can then proceed to do work on each record in the array of results.

Limitations and considerations

There are several limitations and considerations to keep in mind when processing data in parallel:

Simultaneous Execution Limit. The number of concurrent executions your organization can run is determined by your pricing plan. If you try to run more than that many executions at once, you may receive a 429 Too Many Requests error, and will need to handle that in your integration.
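One common way to handle a 429 response is to retry with exponential backoff. The sketch below is a generic pattern rather than a Prismatic feature: `invoke` is a hypothetical callable that returns a status code and a body, and the `executionId` key in the sample body is illustrative only.

```python
import time


def invoke_with_retry(invoke, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call invoke() until it returns a non-429 status, doubling the wait each time."""
    for attempt in range(max_attempts):
        status_code, body = invoke()
        if status_code != 429:
            return status_code, body
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt)  # 1s, 2s, 4s, ...
    raise RuntimeError("still rate limited after %d attempts" % max_attempts)


# Demonstration with canned responses: two 429s, then success.
responses = iter([(429, None), (429, None), (200, {"executionId": "exec-1"})])
delays = []
status_code, body = invoke_with_retry(lambda: next(responses), sleep=delays.append)
print(status_code, delays)
```

Passing `sleep` as a parameter makes the retry logic testable without real waiting.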

Execution Time Limit. An execution can run for up to 15 minutes. If your execution takes longer than 15 minutes, it will be terminated. When sizing chunks of records, consider how many can be processed within 15 minutes.

Payload Size Limit. A webhook request can be up to about 6MB in size. If a chunk of records exceeds 6MB, you will need to use smaller chunks.
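A rough way to guard against oversized chunks is to measure each chunk's serialized size and halve it until it fits. This sketch assumes the payload is JSON and treats "about 6MB" as 6 MiB; both are assumptions, and `split_until_small` is a hypothetical helper.

```python
import json

MAX_PAYLOAD_BYTES = 6 * 1024 * 1024  # assumption: "about 6MB" taken as 6 MiB


def payload_size(chunk):
    """Size in bytes of the chunk when serialized as UTF-8 JSON."""
    return len(json.dumps(chunk).encode("utf-8"))


def split_until_small(chunk, limit=MAX_PAYLOAD_BYTES):
    """Recursively halve a chunk until each piece serializes under limit.

    A single record larger than limit is returned as-is; it cannot be split.
    """
    if payload_size(chunk) <= limit or len(chunk) <= 1:
        return [chunk]
    middle = len(chunk) // 2
    return split_until_small(chunk[:middle], limit) + split_until_small(chunk[middle:], limit)


# Demonstration with a small artificial limit so the split is visible.
records = [{"id": i, "body": "x" * 100} for i in range(50)]
pieces = split_until_small(records, limit=2000)
print([len(piece) for piece in pieces])
```

In practice you may want a margin below the limit, since request headers and any envelope around the data also count toward the request size.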

Rate limits. The APIs you integrate with may have rate limits, and parallelizing requests may exceed those limits. Be sure to check the rate limits of the APIs you integrate with.

For information on Prismatic integration limits, see Integration Limits.