Writing Custom Triggers

Integrations are usually triggered on a schedule (instances of the integration run every X minutes, or at a particular time of day) or via webhook (some outside system sends JSON data to a unique URL and an instance processes the data that was sent). The vast majority of integrations built in Prismatic start with a schedule trigger or webhook trigger. There are situations, though, where neither the schedule trigger nor the standard webhook trigger is suitable. That's where writing your own trigger comes in handy.

Triggers are custom bits of code that are similar to actions. They give you fine-grained control over how a webhook's payload is presented to the rest of the steps of an integration and what HTTP response is returned to whatever invoked the trigger's webhook URL.

Similar to an action, a trigger is composed of display information, a perform function and inputs. Additionally, you specify whether your trigger can be invoked synchronously (synchronousResponseSupport) and whether it supports scheduling (scheduleSupport).
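To make the two support flags concrete, here is a small sketch. It assumes each flag takes one of three string values, "invalid", "valid", or "required"; only "invalid" appears in the examples on this page, so the other two values and their meanings are an assumption about Spectral's option type. describeSupport is a hypothetical helper and not part of Spectral.

```typescript
// Assumed option values; only "invalid" is shown elsewhere on this page.
type TriggerOptionChoice = "invalid" | "valid" | "required";

interface TriggerSupport {
  synchronousResponseSupport: TriggerOptionChoice;
  scheduleSupport: TriggerOptionChoice;
}

// Hypothetical helper that spells out what a combination of flags permits.
function describeSupport(support: TriggerSupport): string {
  const describe = (name: string, choice: TriggerOptionChoice): string => {
    switch (choice) {
      case "invalid":
        return `${name} not allowed`;
      case "valid":
        return `${name} optional`;
      case "required":
        return `${name} required`;
    }
  };
  return [
    describe("synchronous response", support.synchronousResponseSupport),
    describe("schedule", support.scheduleSupport),
  ].join("; ");
}

// Both options disabled, as in the trigger examples on this page.
console.log(
  describeSupport({
    synchronousResponseSupport: "invalid",
    scheduleSupport: "invalid",
  }),
);
```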

Suppose, for example, a third-party app can be configured to send CSV data via webhook and requires that the webhook echo a header, x-confirmation-code, back in plaintext to confirm it got the payload. The default webhook trigger accepts JSON, and responds with an execution ID, so it's not suitable for integrating with this third-party app.

This trigger returns an HTTP 200 and echoes a particular header back to the system invoking the webhook. It then parses the CSV payload into an object that subsequent steps can reference through the trigger's results.body.data:

import {
  input,
  trigger,
  TriggerPayload,
  HttpResponse,
  util,
} from "@prismatic-io/spectral";
import papaparse from "papaparse"; // CSV library

export const csvTrigger = trigger({
  display: {
    label: "CSV Webhook",
    description:
      "Accepts and parses CSV data into a referenceable object and returns a plaintext ACK to the webhook caller.",
  },
  perform: async (context, payload, { hasHeader }) => {
    // Create a custom HTTP response that echoes a header,
    // x-confirmation-code, that was received as part of
    // the webhook invocation
    const response: HttpResponse = {
      statusCode: 200,
      contentType: "text/plain; charset=utf-8",
      body: payload.headers["x-confirmation-code"],
    };

    // Create a copy of the webhook payload, deserialize
    // the CSV raw body, and add the deserialized object
    // to the trigger's outputs
    const finalPayload: TriggerPayload = { ...payload };

    const parseResult = papaparse.parse(
      util.types.toString(payload.rawBody.data),
      {
        // Treat the first row as column names when hasHeader is true
        header: util.types.toBool(hasHeader),
      },
    );

    finalPayload.body.data = parseResult.data;

    // Return the modified trigger payload and custom HTTP response
    return Promise.resolve({
      payload: finalPayload,
      response,
    });
  },
  inputs: {
    // Declare if the incoming CSV has header information
    hasHeader: input({
      label: "CSV Has Header",
      type: "boolean",
      default: "false",
    }),
  },
  synchronousResponseSupport: "invalid", // Do not allow synchronous invocations
  scheduleSupport: "invalid", // Do not allow scheduled invocations
});

export default { csvTrigger };

Notice a few things about this example:

  • The trigger's form is very similar to that of an action definition.
  • The response contains an HTTP statusCode, body, and contentType to be returned to the webhook caller.
  • The second argument to the perform function - payload - contains the same information that a standard webhook trigger returns. In this example rawBody.data is expected to hold the raw CSV text, and the trigger replaces the body.data key of the payload with the deserialized version of that CSV data.
  • inputs work the same way that they work for actions - you define a series of inputs, and they're passed in as the third parameter of the perform function.
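The payload-in, payload-plus-response-out flow described in the points above can be sketched without any dependencies. This is a simplified illustration, not the Spectral API: SketchPayload and SketchResponse are stand-in types that model only the fields used here, csvPerform is a hypothetical name, and parseSimpleCsv is a deliberately naive replacement for papaparse (no quoting or escaping).

```typescript
// Simplified stand-ins for Spectral's TriggerPayload and HttpResponse.
// Only the fields used in this sketch are modeled.
interface SketchPayload {
  headers: Record<string, string>;
  rawBody: { data: unknown };
  body: { data: unknown };
}

interface SketchResponse {
  statusCode: number;
  contentType: string;
  body: string;
}

// Naive CSV parser for illustration only: no quoting or escaping support.
function parseSimpleCsv(text: string, hasHeader: boolean): unknown[] {
  const rows = text
    .split(/\r?\n/)
    .filter((line) => line.length > 0)
    .map((line) => line.split(","));
  if (!hasHeader) return rows;

  const [header, ...records] = rows;
  if (!header) return [];
  // Turn each row into an object keyed by the header row's column names
  return records.map((cells) => {
    const record: Record<string, string> = {};
    header.forEach((name, i) => {
      record[name] = cells[i] ?? "";
    });
    return record;
  });
}

// Hypothetical equivalent of the trigger's perform function.
function csvPerform(payload: SketchPayload, hasHeader: boolean) {
  // Echo the confirmation header back to the caller as plain text
  const response: SketchResponse = {
    statusCode: 200,
    contentType: "text/plain; charset=utf-8",
    body: payload.headers["x-confirmation-code"] ?? "",
  };
  // Copy the payload and replace body.data with the parsed CSV
  const finalPayload: SketchPayload = {
    ...payload,
    body: {
      ...payload.body,
      data: parseSimpleCsv(String(payload.rawBody.data), hasHeader),
    },
  };
  return { payload: finalPayload, response };
}
```

Unlike the trigger above, this sketch copies body before replacing body.data, so the incoming payload object is left unmodified.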

Instance deploy and delete events for triggers

In addition to a perform function, a trigger can define onInstanceDeploy and onInstanceDelete functions. These functions are called when an instance is deployed or deleted, respectively. They are handy for creating or deleting resources in a third-party system that are associated with an instance (such as file directories or webhooks).

This example trigger will create a webhook in a third-party app when an instance is deployed, storing the webhook ID in persistent data, and delete the webhook when the instance is deleted:

import { input, trigger } from "@prismatic-io/spectral";
import axios from "axios";

const acmeWebhookTrigger = trigger({
  display: {
    label: "Acme Webhook Trigger",
    description: "Acme will notify your app when certain events occur in Acme",
  },
  scheduleSupport: "invalid",
  synchronousResponseSupport: "invalid",
  inputs: {
    connection: input({
      label: "Acme Connection",
      type: "connection",
      required: true,
    }),
    events: input({
      type: "string",
      label: "Events",
      comments:
        "The events that would cause an Acme webhook request to be sent to this flow",
      collection: "valuelist",
      model: [
        { label: "Lead Created", value: "lead_created" },
        { label: "Lead Updated", value: "lead_updated" },
        { label: "Lead Deleted", value: "lead_deleted" },
      ],
    }),
  },

  /** Run when a trigger is invoked. This function could contain additional logic for verifying HMAC signatures, etc. */
  perform: async (_context, payload, _params) => {
    return Promise.resolve({ payload });
  },

  /** Run when an instance with this trigger is deployed */
  onInstanceDeploy: async (context, params) => {
    // Get the current flow's webhook URL
    const flowWebhookUrl = context.webhookUrls[context.flow.name];

    // Create a webhook in Acme
    const { data } = await axios.post(
      "https://api.acme.com/webhooks",
      {
        endpoint: flowWebhookUrl,
        events: params.events,
      },
      {
        headers: {
          Authorization: `Bearer ${params.connection.token?.access_token}`,
        },
      },
    );

    // Store the webhook ID in the instance state
    return {
      crossFlowState: { [`${context.flow.name}-webhook-id`]: data.id },
    };
  },

  /** Run when an instance with this trigger is removed */
  onInstanceDelete: async (context, params) => {
    // Get the webhook ID from the instance state
    const webhookId = context.crossFlowState[`${context.flow.name}-webhook-id`];

    // Delete the webhook from Acme
    await axios.delete(`https://api.acme.com/webhooks/${webhookId}`, {
      headers: {
        Authorization: `Bearer ${params.connection.token?.access_token}`,
      },
    });
  },
});
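
The comment on perform above mentions verifying HMAC signatures. As a rough sketch of what that check might look like, the following uses Node's built-in crypto module. Acme is fictional, so the header name x-acme-signature, the hex-encoded HMAC-SHA256 scheme, and the shared secret are all assumptions; the function names are hypothetical, and a real sender's documentation should dictate the details.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical header name; the sending app decides the real one.
const SIGNATURE_HEADER = "x-acme-signature";

// Compute a hex-encoded HMAC-SHA256 of the raw request body.
function computeSignature(secret: string, rawBody: string): string {
  return createHmac("sha256", secret).update(rawBody, "utf8").digest("hex");
}

// Return true only when the received signature matches the expected one.
function isSignatureValid(
  secret: string,
  rawBody: string,
  headers: Record<string, string>,
): boolean {
  const received = headers[SIGNATURE_HEADER];
  if (!received) return false;

  const expected = Buffer.from(computeSignature(secret, rawBody), "hex");
  const actual = Buffer.from(received, "hex");

  // timingSafeEqual throws if lengths differ, so check length first
  return expected.length === actual.length && timingSafeEqual(expected, actual);
}
```

Inside a trigger's perform function, a check like this could run against the raw body and headers of the payload, throwing an error when it returns false so that unsigned requests never reach the rest of the flow.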

Ensure your onInstanceDeploy function is idempotent

Either the external third-party API or your trigger should be designed to be idempotent, meaning that if onInstanceDeploy runs twice for the same instance, it won't cause any problems (for example, it won't register a duplicate webhook).
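
One way to get that idempotency on the trigger side is to look for an existing webhook before creating a new one. The sketch below illustrates the idea with an in-memory stand-in for the third-party API; FakeAcmeApi, its list and create methods, and ensureWebhook are all hypothetical names, and a real API may offer a different way to find existing webhooks.

```typescript
interface Webhook {
  id: string;
  endpoint: string;
}

// In-memory stand-in for a third-party webhook API (hypothetical).
class FakeAcmeApi {
  private webhooks: Webhook[] = [];
  private nextId = 1;

  async list(): Promise<Webhook[]> {
    return [...this.webhooks];
  }

  async create(endpoint: string): Promise<Webhook> {
    const webhook: Webhook = { id: `wh_${this.nextId++}`, endpoint };
    this.webhooks.push(webhook);
    return webhook;
  }
}

// Create a webhook for the endpoint only if one does not already exist.
// Calling this repeatedly with the same endpoint yields the same ID.
async function ensureWebhook(
  api: FakeAcmeApi,
  endpoint: string,
): Promise<string> {
  const existing = (await api.list()).find((w) => w.endpoint === endpoint);
  if (existing) return existing.id;
  const created = await api.create(endpoint);
  return created.id;
}
```

An onInstanceDeploy function built this way can run any number of times for the same flow webhook URL and still leave exactly one webhook registered.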

To test your trigger's onInstanceDeploy and onInstanceDelete functions in the integration designer, open the Test Runner drawer and click Test Deploy or Test Delete within the Trigger tab.

Warning

Note that onInstanceDeploy and onInstanceDelete functions do not have access to flow-specific persisted data. Both functions should read and write data at the crossFlowState level. You can store unique data for each flow using key names that include the flow name in order to generate unique persisted data keys, like ${context.flow.name}-webhook-id in the example above.

Polling triggers

Polling triggers are used when you want to be notified of changes in an external app, but the app does not support webhooks. The trigger's job is to fetch any new data since the last time it ran.

The context passed to a pollingTrigger's perform function has an additional property, context.polling, which exposes a few functions:

  • context.polling.getState() fetches existing state. This state might include an "Updated At" timestamp or other API cursor.
  • context.polling.invokeAction() can invoke an existing action (if one exists) to fetch data from the external app.
  • context.polling.setState() sets state for the next execution to load.

Example polling trigger

import { pollingTrigger } from "@prismatic-io/spectral";
import { getPaginationFromResults, foundNewResults } from "./somewhere";
// listProducts is an existing action; this import path is a placeholder
import { listProducts } from "./somewhere";

type MyPollingState = {
  limit?: number;
  page?: number;
};

const myPollingTrigger = pollingTrigger({
  pollAction: listProducts,
  perform: async (context, payload, params) => {
    const pollState: MyPollingState = context.polling.getState();
    let polledNoChanges = true;

    // Params will hint on both trigger inputs & associated action inputs
    const limit = pollState.limit ?? params.limit;
    const pageToFetch = pollState.page ?? params.page;

    // Will hint only on action inputs. Can replace params if needed.
    // Await the call so that results holds the action's output
    const results = await context.polling.invokeAction({
      ...params,
      limit,
      page: pageToFetch,
    });

    // Save the cursor that the next execution should start from
    const newPollState = getPaginationFromResults(results);
    context.polling.setState(newPollState);

    if (foundNewResults(results)) {
      polledNoChanges = false;
    }

    return Promise.resolve({
      payload: { ...payload, body: { data: results } },
      // True if no changes found, False if we found data to act on
      polledNoChanges,
    });
  },
});

Note that if you return polledNoChanges: true, the runner will immediately stop and your flow will not continue to run. Use this property if you checked for new changes, but found none.
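
The getState / setState cycle and the polledNoChanges flag can also be illustrated with a timestamp cursor instead of pagination. The following is a self-contained simulation rather than Spectral code: FakePollingContext stands in for context.polling, and ProductRecord, CursorState, and pollOnce are hypothetical names. It assumes every record carries an ISO 8601 UTC timestamp in a uniform format, so that string comparison matches chronological order.

```typescript
interface ProductRecord {
  id: string;
  updatedAt: string; // ISO 8601 UTC, e.g. "2024-01-02T00:00:00Z"
}

interface CursorState {
  updatedSince?: string;
}

// Minimal stand-in for the getState/setState pair on context.polling.
class FakePollingContext {
  private state: CursorState = {};

  getState(): CursorState {
    return this.state;
  }

  setState(next: CursorState): void {
    this.state = next;
  }
}

// Run one poll: return records newer than the stored cursor and
// advance the cursor to the newest timestamp seen.
function pollOnce(polling: FakePollingContext, allRecords: ProductRecord[]) {
  const { updatedSince } = polling.getState();

  const fresh = allRecords.filter(
    (record) => updatedSince === undefined || record.updatedAt > updatedSince,
  );

  if (fresh.length > 0) {
    const newest = fresh.reduce(
      (max, record) => (record.updatedAt > max ? record.updatedAt : max),
      "",
    );
    polling.setState({ updatedSince: newest });
  }

  return {
    body: { data: fresh },
    // True when nothing new was found, so the flow should not continue
    polledNoChanges: fresh.length === 0,
  };
}
```

Polling the same data twice in a row yields polledNoChanges: true on the second run, which is the case where the flow stops without executing further steps.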

Add a trigger to your component

Once you've written a trigger, you can add it to an existing component the same way you add an action to your component, but using the triggers key:

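import { component } from "@prismatic-io/spectral";
// The actions' import path is a placeholder; adjust to where your actions live
import { improperFormatName, properFormatName } from "./actions";
import { csvTrigger } from "./csvTrigger";

export default component({
  key: "format-name",
  public: false,
  display: {
    label: "Format Name",
    description: "Format a person's name given a first, middle, and last name",
    iconPath: "icon.png",
  },
  actions: {
    improperFormatName,
    properFormatName,
  },
  triggers: { csvTrigger },
});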