Processing Data in Parallel

When you have a large set of records to process, you may want to process the data in parallel to speed up the computation. This tutorial shows how to do that by splitting the data into manageable chunks and processing the chunks simultaneously.

For this example, we'll fetch a "large" dataset from the internet - here we'll pull down 500 "comment" records from the JSONPlaceholder API: https://jsonplaceholder.typicode.com/comments

Example Integration

Split the data into manageable chunks

Once we've fetched our data, we can use the Collection Tools component's Chunks action to split the data into manageable chunks. Here, we split our 500 records into 10 groups of 50 records each.

If your data is not evenly divisible by the number of elements you specify, the last chunk will contain the remaining elements. For example, if you have 123 records, and split them into chunks of 25, you'll get 4 chunks of 25 records, and 1 chunk of 23 records.
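
To make the chunking behavior concrete, here is a minimal Python sketch of the same idea. The `chunk` helper is a hypothetical stand-in written for illustration; it is not the Chunks action's implementation.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def chunk(items: Sequence[T], size: int) -> List[List[T]]:
    """Split items into consecutive chunks of at most `size` elements.

    Hypothetical helper for illustration; the last chunk holds the remainder.
    """
    if size <= 0:
        raise ValueError("size must be positive")
    return [list(items[i:i + size]) for i in range(0, len(items), size)]


records = list(range(123))
chunks = chunk(records, 25)
print([len(c) for c in chunks])  # [25, 25, 25, 25, 23]
```

With 500 records and a chunk size of 50, the same helper would produce 10 chunks of 50 records each.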

If we open the Chunks action's results, we can see 10 groups of 50 records each.

Loop over each chunk

Next, add a Loop Over Items action to your integration and configure it to loop over the chunks you generated.

Send each chunk to a sibling flow

We need to send each chunk of records to a sibling flow. We can do that by sending the chunk as an HTTP payload to the sibling flow's webhook URL using the HTTP component's POST action.

The webhook URLs of your instance's flows are available through your trigger's payload. To find the sibling flow's URL:

  1. Add a new flow called "Process Records"
  2. Run a test execution to see the new flow's webhook URL

Feed the webhook URL into an HTTP POST action's URL input. For the Data input, select the Loop Over Items action's currentItem property. It represents the current chunk of records, so the step will send that chunk to the sibling flow on each iteration.

If we open the Process Records flow, we can see that 10 invocations were made, and each invocation received a chunk of 50 records.

What makes this parallel?

By default, Prismatic executions are asynchronous, meaning that our main flow will not wait for an invocation of the Process Records flow to complete before beginning the next invocation. This allows us to process multiple chunks of records simultaneously, effectively processing data in parallel.
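
As a loose local analogy (not a description of how Prismatic schedules executions), the sketch below submits ten chunks to a thread pool without waiting for each one to finish before submitting the next. The `process_chunk` function and the 0.1 second delay are assumptions made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process_chunk(chunk):
    """Hypothetical stand-in for the work done by the sibling flow."""
    time.sleep(0.1)  # simulate slow work such as an API call
    return len(chunk)


chunks = [list(range(50)) for _ in range(10)]

with ThreadPoolExecutor(max_workers=10) as pool:
    # submit() returns immediately, much like the HTTP POST returning an
    # execution ID before the sibling flow has finished its work.
    futures = [pool.submit(process_chunk, c) for c in chunks]
    processed = sum(f.result() for f in futures)

print(processed)  # 500
```

Because the chunks overlap in time, the total runtime is close to the time of one chunk rather than the sum of all ten.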

Process each chunk in the sibling flow

Now that records are being sent to the sibling flow, we can process each chunk of records in parallel. Your integration will require some business logic and will reach out to some APIs to fetch or update records. For this example, we'll simply capitalize the body of each comment.
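
A minimal sketch of that per-chunk transformation is shown here in Python. It assumes "capitalize" means converting the whole body to upper case, and it assumes each record carries a `body` field, as JSONPlaceholder comments do. The `capitalize_bodies` name is hypothetical.

```python
def capitalize_bodies(chunk):
    """Return a new list of comments with each body upper-cased.

    Assumption: every record is a dict with a string "body" field.
    The input records are not modified.
    """
    return [{**comment, "body": comment["body"].upper()} for comment in chunk]


sample_chunk = [
    {"id": 1, "email": "a@example.com", "body": "first comment"},
    {"id": 2, "email": "b@example.com", "body": "second comment"},
]
print(capitalize_bodies(sample_chunk)[0]["body"])  # FIRST COMMENT
```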

(Optional) Aggregate the results

If your integration is mono-directional (i.e. it pulls data from a source and sends the data to a destination), you may not need to aggregate the results. But, if your integration is bi-directional or if you need to aggregate the results for any other reason, you can fetch the results of the parallelized invocations using the execution IDs that your HTTP POST action returned.

If you look at the step results of the Loop Over Items action, you'll see that it returns an array of execution IDs from the HTTP POST action.

We can loop over these execution IDs and fetch the results of each invocation. To do that, we'll:

  • Loop over the execution IDs
    • Loop up to 10 times using the Loop N Times action
    • Check if the execution is finished by querying the Prismatic API
      • If it's finished, fetch the step results of a step in the sibling flow. Break the inner loop.
      • If it's not finished, sleep for a few seconds and check again
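
The polling steps above can be sketched in Python as follows. The `is_finished` and `fetch_result` callables are hypothetical placeholders for the Prismatic API query and the step-result download; the attempt count and delay are illustrative defaults, not recommended values.

```python
import time
from typing import Any, Callable, Optional


def wait_for_result(
    execution_id: str,
    is_finished: Callable[[str], bool],
    fetch_result: Callable[[str], Any],
    max_attempts: int = 10,
    delay_seconds: float = 3.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[Any]:
    """Poll one execution up to max_attempts times.

    Returns the fetched result as soon as the execution is finished,
    or None if it never finished within the allowed attempts.
    """
    for _ in range(max_attempts):
        if is_finished(execution_id):
            return fetch_result(execution_id)  # "break" out of the inner loop
        sleep(delay_seconds)
    return None


# Demo with fakes: the execution reports finished on the third check.
checks = {"count": 0}


def fake_is_finished(_execution_id: str) -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


result = wait_for_result(
    "example-execution-id",
    fake_is_finished,
    lambda execution_id: {"execution": execution_id},
    sleep=lambda _seconds: None,  # skip real sleeping in the demo
)
print(result)  # {'execution': 'example-execution-id'}
```

Calling this once per execution ID reproduces the outer loop from the list above.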

Fetch step results from the Prismatic API

We can use the Prismatic component's Raw GraphQL Request action to query the Prismatic API for the step results of a step in the sibling flow.

query myGetExecutionResults($executionId: ID!, $stepName: String!) {
  executionResult(id: $executionId) {
    id
    endedAt
    stepResults(displayStepName: $stepName) {
      nodes {
        resultsUrl
      }
    }
  }
}

If the execution is finished (indicated by whether or not endedAt has a value), we can use the resultsUrl that we received from the Prismatic API to fetch the step results of a step in the sibling flow, and then break the loop. If the execution is not finished, we'll sleep and then check again.
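
The finished check can be sketched in Python as below. It assumes the response follows the usual GraphQL shape, with the query's fields nested under a top-level `data` key; the exact shape returned by the Raw GraphQL Request action may differ. The `results_url_if_finished` name and the sample URL are hypothetical.

```python
from typing import Optional


def results_url_if_finished(response: dict) -> Optional[str]:
    """Return the first step result URL if the execution has ended.

    Assumption: `response` looks like {"data": {"executionResult": {...}}}.
    Returns None while endedAt is empty or when no step results matched.
    """
    execution = response["data"]["executionResult"]
    if not execution.get("endedAt"):
        return None  # still running: sleep and check again
    nodes = execution["stepResults"]["nodes"]
    return nodes[0]["resultsUrl"] if nodes else None


running = {
    "data": {
        "executionResult": {
            "id": "abc",
            "endedAt": None,
            "stepResults": {"nodes": []},
        }
    }
}
finished = {
    "data": {
        "executionResult": {
            "id": "abc",
            "endedAt": "2024-01-01T00:00:00Z",
            "stepResults": {"nodes": [{"resultsUrl": "https://example.com/results"}]},
        }
    }
}
print(results_url_if_finished(running))   # None
print(results_url_if_finished(finished))  # https://example.com/results
```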

A few notes:

  • Step results are stored as binary files in S3, and are compressed using MessagePack. Since they are binary files, we need to set the GET Request action's Response Type to Binary.
  • We need to decompress the step results using the MessagePack Decompress action.
  • We can either process the step results inside the loop, or append them to a list using the Persist Data component's Execution - Append Value to List action so that all results end up in a single array. That's what we do in the example integration here.

Process the results

Finally, we can load the array of step results using an Execution - Get Value action. The results will be an array of arrays, so we can use the Collection Tools component's Flatten action to flatten the array of arrays into a single array. When we do that in our example, we get an array of 500 records, each with a capitalized body.
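
For readers who want to see the flattening step outside the designer, here is a small Python sketch. The `flatten` helper is a hypothetical equivalent written for illustration, and it flattens only one level of nesting.

```python
from itertools import chain


def flatten(nested):
    """Flatten a list of lists by one level into a single list."""
    return list(chain.from_iterable(nested))


# Ten aggregated chunks of 50 processed records each.
aggregated = [[{"id": c * 50 + i, "body": "EXAMPLE"} for i in range(50)] for c in range(10)]
all_records = flatten(aggregated)
print(len(all_records))  # 500
```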

We can then proceed to do work on each record in the array of results.

Limitations and considerations

There are several limitations and considerations to keep in mind when processing data in parallel:

Simultaneous Execution Limit. By default, Prismatic limits the number of simultaneous executions your organization can have to 250 (25 for professional plans). If you try to run more than that many executions at once, you may receive a 429 Too Many Requests error, and will need to handle that in your integration.
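
One common way to handle a 429 response is to retry with exponential backoff. The sketch below shows the idea in Python. The `send` callable is a hypothetical stand-in for the HTTP POST to the sibling flow, and the attempt count and delays are illustrative assumptions rather than documented values.

```python
import time
from typing import Callable


def post_with_retry(
    send: Callable[[], int],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call send() until it returns a status other than 429.

    Waits base_delay, then 2x, 4x, ... between attempts. Returns the last
    status code seen, which is 429 if every attempt was rejected.
    """
    status = 429
    for attempt in range(max_attempts):
        status = send()
        if status != 429:
            return status
        if attempt < max_attempts - 1:
            sleep(base_delay * 2 ** attempt)
    return status


# Demo with a fake sender that is rejected twice and then accepted.
responses = iter([429, 429, 200])
delays = []
final_status = post_with_retry(lambda: next(responses), sleep=delays.append)
print(final_status, delays)  # 200 [1.0, 2.0]
```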

Execution Time Limit. An execution can run for up to 15 minutes. If your execution takes longer than 15 minutes, it will be terminated. When sizing chunks of records, consider how many can be processed within 15 minutes.

Payload size limit. A webhook request can be up to about 6MB in size. If a chunk of records exceeds 6MB, you will need to use smaller chunks.
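
A rough way to pick a chunk size that fits is to measure the serialized size of each chunk and halve the chunk size until every chunk is under the limit. The Python sketch below assumes the payload is sent as JSON; the helper names are hypothetical, and the byte limit is passed in explicitly because the exact limit is only approximate.

```python
import json


def payload_size_bytes(chunk) -> int:
    """Size of the chunk when serialized as UTF-8 encoded JSON."""
    return len(json.dumps(chunk).encode("utf-8"))


def fit_chunk_size(records, size: int, limit_bytes: int) -> int:
    """Halve `size` until the largest chunk fits within limit_bytes.

    Stops at 1; a single record larger than the limit cannot be fixed
    by chunking alone.
    """
    def largest_chunk(n: int) -> int:
        return max(
            (payload_size_bytes(records[i:i + n]) for i in range(0, len(records), n)),
            default=0,
        )

    while size > 1 and largest_chunk(size) > limit_bytes:
        size //= 2
    return size


# Tiny demo: eight small records and an artificially low 500-byte limit.
demo_records = [{"body": "x" * 100} for _ in range(8)]
print(fit_chunk_size(demo_records, 8, 500))  # 4
```

In practice you would leave some headroom below the limit, since headers and encoding overhead also count toward the request size.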

Rate limits. The APIs you integrate with may have rate limits, and parallelizing requests may exceed those limits. Be sure to check the rate limits of the APIs you integrate with.

For information on Prismatic integration limits, see Integration Limits.