Handling Large Files in Custom Components
If you need to process large files in your integration, it's easy to exhaust the available memory and run into out-of-memory (OOM) errors. For example, if you pull down a 100 MB file from an SFTP server, deserialize the CSV to a JavaScript object, map each row to a new format, serialize each row, and so on, you can end up with a dozen copies of the data in memory and exceed the 1 GB of memory that the integration runner has by default.
Rather than loading an entire large file at once, it's often better to load and process smaller portions of the file at a time. That way, you can load a few kilobytes of a file, or a few rows of a CSV, process those, and then move to the next set of bytes or rows. If done correctly, a step can process very large files with only a few megabytes of memory.
Processing large files a small portion at a time is generally accomplished in NodeJS with streams.
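As a minimal illustration of the idea, here is a sketch that copies a large local file with Node's built-in fs streams and stream.pipeline; only a small buffer is held in memory at any moment (the file paths are hypothetical):
import { createReadStream, createWriteStream } from "fs";
import { pipeline } from "stream/promises";

// Copy a file chunk-by-chunk; only one small chunk is buffered at a time
const copyLargeFile = async (sourcePath: string, destinationPath: string) => {
  await pipeline(
    createReadStream(sourcePath), // Reads the file in small chunks (~64 KB by default)
    createWriteStream(destinationPath), // Writes each chunk as it arrives
  );
};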
Let's look at a couple of examples.
Streaming a large file from HTTP to SFTP
Suppose that your integration needs to pull down a file from an HTTP endpoint, and save that file to an SFTP server. If your file is large (say, 200MB in size), and you use several steps to accomplish your goal, you can end up using well over 1 GB of memory:
- The HTTP step will use 200MB when downloading the file
- The HTTP step will use 200MB+ when serializing and persisting the step result
- Depending on the output format, the HTTP step may use another 200MB+ to deserialize the file to JSON, etc.
- The SFTP step will use 200MB when converting the file's contents to a format that SFTP understands
If you download the file from the HTTP endpoint a few KB at a time, and stream those bytes directly to the SFTP server, your step will only use a few MB of memory at a time - the entire file will never be loaded into memory at once.
┌────────┐       ┌────────┐       ┌────────┐
│  HTTP  │──────►│  Your  │──────►│  SFTP  │
│ Server │ chunk │ action │ chunk │ Server │
└────────┘       └────────┘       └────────┘
In the example below, the axios.get function takes a parameter, { responseType: "stream" }. That causes response.data to be of type stream.Readable. That stream can be passed to an SFTP client's .put function. When it detects a readable stream, ssh2-sftp-client pipes the stream to the SFTP server as chunks are received.
The example below is split across three files: actions.ts, inputs.ts, and connections.ts.
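// actions.ts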
import { action, util, ConnectionError } from "@prismatic-io/spectral";
import { connectionInput, sftpPathInput, sourceUrlInput } from "./inputs";
import axios from "axios";
import SFTPClient from "ssh2-sftp-client";
const uploadFileFromUrl = action({
display: {
label: "Upload file from URL",
description: "Upload a file from a URL to an SFTP server",
},
inputs: {
connection: connectionInput,
sourceUrl: sourceUrlInput,
sftpPath: sftpPathInput,
},
perform: async (context, params) => {
const sftpClient = new SFTPClient();
const { username, password, host, port, timeout } =
params.connection.fields;
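// Connect to the SFTP server using the connection's credentials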
try {
await sftpClient.connect({
username: util.types.toString(username),
password: util.types.toString(password),
host: util.types.toString(host),
port: util.types.toInt(port),
readyTimeout: util.types.toInt(timeout) || 3000,
});
} catch (err) {
throw new ConnectionError(
params.connection,
`Unable to connect to SFTP server. ${err}`,
);
}
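// Request the source file as a readable stream so it is downloaded in chunks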
const response = await axios.get(params.sourceUrl, {
responseType: "stream",
});
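// Stream the file to the SFTP server, then close the connection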
try {
const result = await sftpClient.put(response.data, params.sftpPath);
return { data: result };
} finally {
await sftpClient.end();
}
},
});
export default { uploadFileFromUrl };
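// inputs.ts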
import { input, util } from "@prismatic-io/spectral";
export const connectionInput = input({
label: "Connection",
type: "connection",
required: true,
});
export const sourceUrlInput = input({
label: "Source File URL",
type: "string",
clean: util.types.toString,
required: true,
example: "https://files.example.com/my-file.pdf",
});
export const sftpPathInput = input({
label: "Destination File Path",
type: "string",
clean: util.types.toString,
required: true,
example: "/path/to/my-file.pdf",
});
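// connections.ts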
import { connection } from "@prismatic-io/spectral";
export const basic = connection({
key: "basic",
display: {
label: "Basic Username/Password",
description: "Basic Username and Password connection",
},
inputs: {
username: { label: "Username", type: "string", required: true },
password: { label: "Password", type: "password", required: true },
host: { label: "Host", type: "string", required: true },
port: { label: "Port", type: "string", default: "22", required: true },
timeout: { label: "Timeout (ms)", type: "string", default: "3000", required: false }, // Read as readyTimeout by the action above
},
});
export default [basic];
You can extend the HTTP call to be authenticated, include query parameters, etc. As long as you specify { responseType: "stream" }, your response will be a readable stream.
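For example, the axios.get call in the action above could be swapped for something like this sketch; the Authorization header and query parameter here are hypothetical, and apiKey would need to be added as an additional input:
const response = await axios.get(params.sourceUrl, {
  headers: { Authorization: `Bearer ${params.apiKey}` }, // Hypothetical auth header
  params: { version: "latest" }, // Hypothetical query parameter
  responseType: "stream", // response.data is still a stream.Readable
});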
Similar concepts can be applied to stream a file from HTTP to Dropbox, Google Drive, Azure Files, or most other file storage systems - most NodeJS file storage libraries accept streams as inputs or have writeable stream functions.
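As a rough sketch (not part of the example above), streaming an HTTP download into Google Drive with the googleapis library could look like the following; the OAuth2 credentials are assumed to already exist and are elided here:
import axios from "axios";
import { google } from "googleapis";

const uploadUrlToGoogleDrive = async (sourceUrl: string, fileName: string) => {
  // Assumes an OAuth2 client that has already been authorized for Drive access
  const auth = new google.auth.OAuth2();
  auth.setCredentials({ access_token: "<access token>" }); // Hypothetical token
  const drive = google.drive({ version: "v3", auth });

  // Download the source file as a readable stream
  const response = await axios.get(sourceUrl, { responseType: "stream" });

  // googleapis accepts a readable stream as the media body and uploads it in chunks
  return drive.files.create({
    requestBody: { name: fileName },
    media: { body: response.data },
  });
};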
Streaming and processing a large CSV from Amazon S3
In this example, suppose you host large CSV files in Amazon S3 that represent transactions. These files are formatted like this:
id,product,quantity,price
1,widgets,5,100
2,gadgets,10,3.5
3,whatsits,1,200
.
.
.
But there are thousands of records, the file is hundreds of MB in size, and it cannot be loaded into memory all at once. You want to find the total price of the transactions (the sum of quantity x price for every row) and return just that total. For the three rows shown above, that would be 5 x 100 + 10 x 3.5 + 1 x 200 = 735.
- First, we'll use the AWS SDK to fetch an object from Amazon S3. The resulting object's .Body property is an instance of stream.Readable.
- Then, we'll stream the readable file into a popular CSV parser, PapaParse. PapaParse accepts streams and provides a callback function, step, which is run whenever a line of the CSV stream is processed. As we process each record, we'll add quantity x price to the total price.
- Finally, we'll return the total price as the step's result. Because we're not returning the entire file that was read, the runner does not spend time and memory serializing the file as a step result.
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { action, input, util } from "@prismatic-io/spectral";
import { parse } from "papaparse";
import { Readable } from "stream";
interface CsvRecord {
data: {
id: string;
product: string;
quantity: number;
price: number;
};
}
export const processLargeCsvFromS3 = action({
display: {
label: "Process Large CSV from S3",
description: "Find the total price of many transactions in a CSV file",
},
inputs: {
connection: input({
label: "Connection",
type: "connection",
required: true,
}),
bucket: input({
label: "Bucket Name",
type: "string",
required: true,
clean: util.types.toString,
}),
objectKey: input({
label: "Object Key",
type: "string",
required: true,
clean: util.types.toString,
}),
},
perform: async (context, params) => {
// Initialize an Amazon S3 client
const s3 = new S3Client({
region: "us-east-2",
credentials: {
accessKeyId: util.types.toString(params.connection.fields.accessKeyId),
secretAccessKey: util.types.toString(
params.connection.fields.secretAccessKey,
),
},
});
// Initialize an accumulator
let total = 0;
// Fetch an object from Amazon S3.
// The returned item.Body is stream.Readable
const command = new GetObjectCommand({
Bucket: params.bucket,
Key: params.objectKey,
});
const item = await s3.send(command);
// Parse the stream as it is read from S3, running "step" for each record read
await new Promise((resolve) => {
parse(item.Body as Readable, {
header: true,
dynamicTyping: true,
// As each line in the CSV is read, function "step" is called
step: (record: CsvRecord, parser) => {
parser.pause(); // Pause the parser while work is done
total += record.data.quantity * record.data.price;
parser.resume(); // Re-enable the CSV parser after work is complete
},
// When the stream ends, run complete
complete: () => {
resolve(null);
},
});
});
return { data: total };
},
});
export default { processLargeCsvFromS3 };
The parser.pause() and parser.resume() calls above are not necessary for this example, but if you need to do work on each record that you read (for example, transform the data and send it to an API), pausing the CSV parser while that work is done can help you avoid overwhelming the API you're sending data to.
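For instance, a sketch of that pattern might look like this; sendRecordToApi is a hypothetical helper that posts one record to an external API:
import { parse } from "papaparse";
import { Readable } from "stream";

// Hypothetical helper that sends a single record to an external API
const sendRecordToApi = async (record: unknown): Promise<void> => {
  // e.g. await httpClient.post("https://api.example.com/transactions", record);
};

const sendEachRecord = async (csvStream: Readable): Promise<void> => {
  await new Promise<void>((resolve, reject) => {
    parse(csvStream, {
      header: true,
      dynamicTyping: true,
      step: (record, parser) => {
        parser.pause(); // Stop reading from the stream while the API call is in flight
        sendRecordToApi(record.data)
          .then(() => parser.resume()) // Continue parsing once the record has been sent
          .catch(reject);
      },
      complete: () => resolve(),
      error: (err) => reject(err),
    });
  });
};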
A CSV file can readily be processed as a stream because it can be read line-by-line. Other formats, like JSON or XML, have beginning and ending brackets or tags that may require you to load the file in its entirety.
If you are dealing with JSON, consider JSONL format, which can be read line-by-line. If you are parsing XML, you can parse the XML file by streaming data into node-xml-stream-parser and looking for specific XML tags.
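For example, a JSONL version of the transactions file above could be totaled with Node's readline module, reading one line at a time; the URL and field names here mirror the earlier CSV example and are hypothetical:
import axios from "axios";
import { createInterface } from "readline";

const totalJsonlTransactions = async (sourceUrl: string): Promise<number> => {
  // Download the JSONL file as a readable stream
  const response = await axios.get(sourceUrl, { responseType: "stream" });

  // readline yields the stream one line at a time
  const lines = createInterface({ input: response.data, crlfDelay: Infinity });

  let total = 0;
  for await (const line of lines) {
    if (!line.trim()) continue; // Skip blank lines
    const record = JSON.parse(line); // e.g. {"id":1,"product":"widgets","quantity":5,"price":100}
    total += record.quantity * record.price;
  }
  return total;
};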