Custom Triggers
Writing Custom Triggers
Integrations are usually triggered on a schedule (meaning instances of the integration run every X minutes, or at a particular time of day) or via webhook (meaning some outside system sends JSON data to a unique URL and an instance processes the data that was sent). The vast majority of integrations built in Prismatic start with a schedule trigger or webhook trigger. There are situations, though, where neither the schedule nor the standard webhook trigger are suitable for one reason or another. That's where writing your own triggers come in handy.
Triggers are custom bits of code that are similar to actions. They give you fine-grained control over how a webhook's payload is presented to the rest of the steps of an integration and what HTTP response is returned to whatever invoked the trigger's webhook URL.
Similar to an action, a trigger is comprised of display
information, a perform
function and inputs
.
Additionally, you specify if your trigger can be invoked synchronously (synchronousResponseSupport
) and if your trigger supports scheduling (scheduleSupport
).
Suppose, for example, a third-party app can be configured to send CSV data via webhook and requires that the webhook echo a header, x-confirmation-code
, back in plaintext to confirm it got the payload.
The default webhook trigger accepts JSON, and responds with an execution ID, so it's not suitable for integrating with this third-party app.
This trigger will return an HTTP 200 and echo a particular header back to the system invoking the webhook, and then it'll parse the CSV payload into an object so that subsequent steps can reference through the trigger's results.body.data
:
import {
input,
trigger,
TriggerPayload,
HttpResponse,
util,
} from "@prismatic-io/spectral";
import papaparse from "papaparse"; // CSV Library
export const csvTrigger = trigger({
display: {
label: "CSV Webhook",
description:
"Accepts and parses CSV data into a referenceable object and returns a plaintext ACK to the webhook caller.",
},
perform: async (context, payload, { hasHeader }) => {
// Create a custom HTTP response that echos a header,
// x-confirmation-code, that was received as part of
// the webhook invocation
const response: HttpResponse = {
statusCode: 200,
contentType: "text/plain; charset=utf-8",
body: payload.headers["x-confirmation-code"],
};
// Create a copy of the webhook payload, deserialize
// the CSV raw body, and add the deserialized object
// to the object to the trigger's outputs
const finalPayload: TriggerPayload = { ...payload };
const parseResult = papaparse.parse(
util.types.toString(payload.rawBody.data),
{
header: util.types.toBool(hasHeader),
},
);
finalPayload.body.data = parseResult.data;
// Return the modified trigger payload and custom HTTP response
return Promise.resolve({
payload: finalPayload,
response,
});
},
inputs: {
// Declare if the incoming CSV has header information
hasHeader: input({
label: "CSV Has Header",
type: "boolean",
default: "false",
}),
},
synchronousResponseSupport: "invalid", // Do not allow synchronous invocations
scheduleSupport: "invalid", // Do not allow scheduled invocations
});
export default { csvTrigger };
Notice a few things about this example:
- The
trigger
's form is very similar to that of anaction
definition. - The
response
contains an HTTPstatusCode
,body
, andcontentType
to be returned to the webhook caller. - The second argument to the
perform
function -payload
- contains the same information that a standard webhook trigger returns. TherawBody.data
presumably contains some CSV text - thebody.data
key of the payload is replaced by the deserialized version of the CSV data. inputs
work the same way that they work for actions - you define a series ofinput
s, and they're passed in as the third parameter of theperform
function.
Instance deploy and delete events for triggers
Similar to a perform
function, a trigger can also define onInstanceDeploy
and onInstanceDelete
functions.
These functions are called when an instance is deployed or deleted, respectively.
They are handy for creating or deleting resources in a third-party system that are associated with an instance (like file directories, webhooks, etc).
This example trigger will create a webhook in a third-party app when an instance is deployed, storing the webhook ID in persistent data, and delete the webhook when the instance is deleted:
const acmeWebhookTrigger = trigger({
display: {
label: "Acme Webhook Trigger",
description: "Acme will notify your app when certain events occur in Acme",
},
scheduleSupport: "invalid",
synchronousResponseSupport: "invalid",
inputs: {
connection: input({
label: "Acme Connection",
type: "connection",
required: true,
}),
events: input({
type: "string",
label: "Events",
comments:
"The events that would cause an Acme webhook request to be sent to this flow",
collection: "valuelist",
model: [
{ label: "Lead Created", value: "lead_created" },
{ label: "Lead Updated", value: "lead_updated" },
{ label: "Lead Deleted", value: "lead_deleted" },
],
}),
},
/** Run when a trigger is invoked. This function could contain additional logic for verifying HMAC signatures, etc. */
perform: async (_context, payload, _params) => {
return Promise.resolve({ payload });
},
/** Run when an instance with this trigger is deployed */
onInstanceDeploy: async (context, params) => {
// Get the current flow's webhook URL
const flowWebhookUrl = context.webhookUrls[context.flow.name];
// Create a webhook in Acme
const { data } = await axios.post(
"https://api.acme.com/webhooks",
{
endpoint: flowWebhookUrl,
events: params.events,
},
{
headers: {
Authorization: `Bearer ${params.connection.token?.access_token}`,
},
},
);
// Store the webhook ID in the instance state
return {
crossFlowState: { [`${context.flow.name}-webhook-id`]: data.id },
};
},
/** Run when an instance with this trigger is removed */
onInstanceDelete: async (context, params) => {
// Get the webhook ID from the instance state
const webhookId = context.crossFlowState[`${context.flow.name}-webhook-id`];
// Delete the webhook from Acme
await axios.delete(`https://api.acme.com/webhooks/${webhookId}`, {
headers: {
Authorization: `Bearer ${params.connection.token?.access_token}`,
},
});
},
});
Either the external third-party API, or your trigger, should be designed to be idempotent - meaning that if the onInstanceDeploy
is created twice, it won't cause any problems.
To test your trigger's onInstanceDeploy
and onInstanceDelete
functions in the integration designer, open the Test Runner drawer and click Test Deploy or Test Delete within the Trigger tab.
Note that onInstanceDeploy
and onInstanceDelete
functions do not have access to flow-specific persisted data.
Both functions should read and write data at the crossFlowState
level.
You can store unique data for each flow using key names that include the flow name in order to generate unique persisted data keys, like ${context.flow.name}-webhook-id
in the example above.
Polling triggers
Polling triggers are used when you want to be notified of changes in an external app, but the app does not support webhooks. The trigger's job is to fetch any new data since the last time it ran.
Your pollingTrigger
has an additional parameter, context.polling
, which has a few functions:
context.polling.getState()
will fetch existing state. This state might include an "Update At" timestamp or other API cursor.context.polling.invokeAction()
can invoke an existing action (if one exists) to fetch data from the external app.context.polling.setState()
sets state for the next execution to load.
import { getPaginationFromResults, foundNewResults } from "./somewhere";
type MyPollingState = {
limit?: number;
page?: number;
};
const myPollingTrigger = pollingTrigger({
pollAction: listProducts,
perform: async (context, payload, params) => {
const pollState: MyPollingState = context.polling.getState();
let polledNoChanges = true;
// Params will hint on both trigger inputs & associated action inputs
const limit = pollState.limit ?? params.limit;
const pageToFetch = pollState.page ?? params.page;
// Will hint only on action inputs. Can replace params if needed
const results = context.polling.invokeAction({
...params,
page: pageToFetch,
});
const newPollState = getPaginationFromResults(results);
context.polling.setState(newPollState);
if (foundNewResults(results)) {
polledNoChanges = false;
}
return Promise.resolve({
payload: { ...payload, body: { data: results } },
// True if no changes found, False if we found data to act on
polledNoChanges,
});
},
});
Note that if you return polledNoChanges: true
, the runner will immediately stop and your flow will not continue to run.
Use this property if you checked for new changes, but found none.
Add a trigger to your component
Once you've written a trigger, you can add it to an existing component the same way you add an action to your component, but using the triggers
key:
import { csvTrigger } from "./csvTrigger";
export default component({
key: "format-name",
public: false,
display: {
label: "Format Name",
description: "Format a person's name given a first, middle, and last name",
iconPath: "icon.png",
},
actions: {
improperFormatName,
properFormatName,
},
triggers: { csvTrigger },
});