Custom Triggers
Writing custom triggers
Integrations are usually triggered on a schedule (meaning instances of the integration run every X minutes, or at a particular time of day) or via webhook (meaning some outside system sends JSON data to a unique URL and an instance processes the data that was sent). The vast majority of integrations built in Prismatic start with a schedule trigger or webhook trigger. There are situations, though, where neither the schedule trigger nor the standard webhook trigger is suitable. That's where writing your own trigger comes in handy.
Triggers are custom bits of code that are similar to actions. They give you fine-grained control over how a webhook's payload is presented to the rest of the steps of an integration and what HTTP response is returned to whatever invoked the trigger's webhook URL.
Similar to an action, a trigger is comprised of display information, a perform function and inputs.
Additionally, you specify if your trigger can be invoked synchronously (synchronousResponseSupport) and if your trigger supports scheduling (scheduleSupport).
Suppose, for example, a third-party app can be configured to send CSV data via webhook and requires that the webhook echo a header, x-confirmation-code, back in plaintext to confirm it got the payload.
The default webhook trigger accepts JSON and responds with an execution ID, so it's not suitable for integrating with this third-party app.
The trigger below returns an HTTP 200 and echoes that header back to the system invoking the webhook. It then parses the CSV payload into an object so that subsequent steps can reference it through the trigger's results.body.data:
import {
input,
trigger,
TriggerPayload,
HttpResponse,
util,
} from "@prismatic-io/spectral";
import papaparse from "papaparse"; // CSV Library
export const csvTrigger = trigger({
display: {
label: "CSV Webhook",
description:
"Accepts and parses CSV data into a referenceable object and returns a plaintext ACK to the webhook caller.",
},
perform: async (context, payload, { hasHeader }) => {
// Create a custom HTTP response that echoes a header,
// x-confirmation-code, that was received as part of
// the webhook invocation
const response: HttpResponse = {
statusCode: 200,
contentType: "text/plain; charset=utf-8",
body: payload.headers["x-confirmation-code"],
};
// Create a copy of the webhook payload, deserialize
// the raw CSV body, and add the deserialized object
// to the trigger's outputs
const finalPayload: TriggerPayload = { ...payload };
const parseResult = papaparse.parse(
util.types.toString(payload.rawBody.data),
{
header: util.types.toBool(hasHeader),
},
);
finalPayload.body.data = parseResult.data;
// Return the modified trigger payload and custom HTTP response
return Promise.resolve({
payload: finalPayload,
response,
});
},
inputs: {
// Declare if the incoming CSV has header information
hasHeader: input({
label: "CSV Has Header",
type: "boolean",
default: "false",
}),
},
synchronousResponseSupport: "invalid", // Do not allow synchronous invocations
scheduleSupport: "invalid", // Do not allow scheduled invocations
});
export default { csvTrigger };
Notice a few things about this example:
- The trigger's form is very similar to that of an action definition.
- The response contains an HTTP statusCode, body, and contentType to be returned to the webhook caller.
- The second argument to the perform function, payload, contains the same information that a standard webhook trigger returns. The rawBody.data presumably contains some CSV text, and the body.data key of the payload is replaced by the deserialized version of the CSV data.
- inputs work the same way that they work for actions: you define a series of inputs, and they're passed in as the third parameter of the perform function.
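To make the effect of the hasHeader input more concrete, the sketch below shows how the shape of the parsed data changes with and without a header row. It is a simplified stand-in for a real CSV library, not the papaparse implementation: parseSimpleCsv is a hypothetical helper that only splits on newlines and commas, and it ignores quoting and escaping.

```typescript
// Simplified stand-in for a CSV library: splits on newlines and commas only.
// It does not handle quoted fields, escaped commas, or embedded newlines.
type CsvRow = string[] | Record<string, string>;

function parseSimpleCsv(text: string, hasHeader: boolean): CsvRow[] {
  const rows = text
    .split(/\r?\n/)
    .filter((line) => line.length > 0)
    .map((line) => line.split(","));

  if (!hasHeader || rows.length === 0) {
    // Without a header row, every line becomes an array of cell values
    return rows;
  }

  // With a header row, the first line supplies the keys for each record
  const header = rows[0] ?? [];
  const records = rows.slice(1);
  return records.map((cells) => {
    const record: Record<string, string> = {};
    header.forEach((key, i) => {
      record[key] = cells[i] ?? "";
    });
    return record;
  });
}

const csv = "first,last\nAda,Lovelace\nAlan,Turing";

// Array of objects keyed by the header row
const withHeader = parseSimpleCsv(csv, true);

// Array of arrays, including the first line as ordinary data
const withoutHeader = parseSimpleCsv(csv, false);
```

Steps that follow the trigger would see one of these two shapes under results.body.data, so it is worth deciding up front which one downstream steps expect.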
Instance deploy and delete events for triggers
In addition to a perform function, a trigger can also define onInstanceDeploy and onInstanceDelete functions.
These functions are called when an instance is deployed or deleted, respectively.
They are handy for creating or deleting resources in a third-party system that are associated with an instance (like webhooks, file directories, etc).
Adding a trigger to your component
Once you've written a trigger, you can add it to an existing component the same way you add an action to your component, but using the triggers key:
import { component } from "@prismatic-io/spectral";
import { csvTrigger } from "./csvTrigger";
export default component({
key: "format-name",
public: false,
display: {
label: "Format Name",
description: "Format a person's name given a first, middle, and last name",
iconPath: "icon.png",
},
actions: {
improperFormatName,
properFormatName,
},
triggers: { csvTrigger },
});
App event triggers
It's common for users to want to know when records are created or updated in a third-party app. There are a couple of ways you can achieve this:
- An event-based system uses webhooks to notify your flow whenever something happens.
- A trigger polls the third-party API for changes on a time interval.
Generally speaking, webhook triggers are preferable over polling triggers as they provide near real-time updates.
App event webhook triggers
An app event webhook trigger takes advantage of onInstanceDeploy and onInstanceDelete functions (described above).
When a customer configures and deploys an instance of your integration, the onInstanceDeploy function configures a webhook.
When the instance is removed, the onInstanceDelete function removes the webhook.
Example app event trigger using webhooks
This example trigger will create a webhook in a third-party app when an instance is deployed, storing the webhook ID in persistent data, and delete the webhook when the instance is deleted:
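import { input, trigger } from "@prismatic-io/spectral";
import axios from "axios"; // HTTP client used to call the Acme API
const acmeWebhookTrigger = trigger({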
display: {
label: "Acme Webhook Trigger",
description: "Acme will notify your app when certain events occur in Acme",
},
scheduleSupport: "invalid",
synchronousResponseSupport: "invalid",
inputs: {
connection: input({
label: "Acme Connection",
type: "connection",
required: true,
}),
events: input({
type: "string",
label: "Events",
comments:
"The events that would cause an Acme webhook request to be sent to this flow",
collection: "valuelist",
model: [
{ label: "Lead Created", value: "lead_created" },
{ label: "Lead Updated", value: "lead_updated" },
{ label: "Lead Deleted", value: "lead_deleted" },
],
}),
},
/** Run when a trigger is invoked. This function could contain additional logic for verifying HMAC signatures, etc. */
perform: async (_context, payload, _params) => {
return Promise.resolve({ payload });
},
/** Run when an instance with this trigger is deployed */
onInstanceDeploy: async (context, params) => {
// Get the current flow's webhook URL
const flowWebhookUrl = context.webhookUrls[context.flow.name];
// Create a webhook in Acme
const { data } = await axios.post(
"https://api.acme.com/webhooks",
{
endpoint: flowWebhookUrl,
events: params.events,
},
{
headers: {
Authorization: `Bearer ${params.connection.token?.access_token}`,
},
},
);
// Store the webhook ID in cross-flow persisted state
return {
crossFlowState: { [`${context.flow.name}-webhook-id`]: data.id },
};
},
/** Run when an instance with this trigger is removed */
onInstanceDelete: async (context, params) => {
// Get the webhook ID from cross-flow persisted state
const webhookId = context.crossFlowState[`${context.flow.name}-webhook-id`];
// Delete the webhook from Acme
await axios.delete(`https://api.acme.com/webhooks/${webhookId}`, {
headers: {
Authorization: `Bearer ${params.connection.token?.access_token}`,
},
});
},
});
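The comment on the perform function above mentions verifying HMAC signatures. As a rough sketch of what that check could look like, the snippet below assumes a hypothetical scheme in which the sender signs the raw request body with a shared secret using HMAC-SHA256 and sends the hex digest in a header. The function name, the secret, and the signing scheme are all assumptions for illustration; the real scheme depends on the third-party app.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical scheme: the sender signs the raw request body with a shared
// secret using HMAC-SHA256 and sends the hex-encoded digest in a header.
function isValidSignature(
  rawBody: string,
  signatureHex: string,
  secret: string,
): boolean {
  const expected = createHmac("sha256", secret).update(rawBody, "utf8").digest();
  const received = Buffer.from(signatureHex, "hex");
  // timingSafeEqual throws when lengths differ, so compare lengths first
  return received.length === expected.length && timingSafeEqual(received, expected);
}

// Example values standing in for a real webhook request
const secret = "example-shared-secret";
const body = '{"event":"lead_created"}';
const goodSignature = createHmac("sha256", secret).update(body, "utf8").digest("hex");

const accepted = isValidSignature(body, goodSignature, secret);
const rejected = isValidSignature(body + " ", goodSignature, secret);
```

Inside a trigger, a check like this would typically run against the raw body and headers of the incoming payload, and the perform function would throw an error when the signature does not match so the request is not processed.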
Either the external third-party API or your trigger should be designed to be idempotent, meaning that if onInstanceDeploy is called twice, it won't cause any problems (for example, it won't register a duplicate webhook).
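One way to make the deploy step idempotent on the trigger side is to check persisted state for an existing webhook ID before creating a new one. The sketch below illustrates that idea in isolation. ensureWebhook and fakeCreateWebhook are hypothetical names, and the plain object stands in for cross-flow state; this is not Spectral's API.

```typescript
type CrossFlowState = Record<string, unknown>;

// Hypothetical helper: `createWebhook` stands in for whatever API call
// registers a webhook in the third-party app and returns its ID.
async function ensureWebhook(
  state: CrossFlowState,
  stateKey: string,
  createWebhook: () => Promise<string>,
): Promise<CrossFlowState> {
  const existingId = state[stateKey];
  if (typeof existingId === "string" && existingId.length > 0) {
    // A webhook was already registered by an earlier deploy; reuse it
    return state;
  }
  const id = await createWebhook();
  return { ...state, [stateKey]: id };
}

// Simulate two deploys in a row against a fake API that counts its calls
let createCalls = 0;
const fakeCreateWebhook = async (): Promise<string> => {
  createCalls += 1;
  return `webhook-${createCalls}`;
};

async function simulateTwoDeploys(): Promise<CrossFlowState> {
  const key = "Flow 1-webhook-id";
  const afterFirst = await ensureWebhook({}, key, fakeCreateWebhook);
  // The second deploy finds the stored ID and skips the API call
  return ensureWebhook(afterFirst, key, fakeCreateWebhook);
}
```

A stored ID can go stale if someone deletes the webhook directly in the third-party app, so a more defensive version might also confirm that the webhook still exists before reusing it.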
To test your trigger's onInstanceDeploy and onInstanceDelete functions in the integration designer, open the Test Runner drawer and click Test Deploy or Test Delete within the Trigger tab.
Note that onInstanceDeploy and onInstanceDelete functions do not have access to flow-specific persisted data.
Both functions should read and write data at the crossFlowState level.
You can store unique data for each flow using key names that include the flow name in order to generate unique persisted data keys, like ${context.flow.name}-webhook-id in the example above.
App event polling triggers
Polling triggers are used when you want to be notified of changes in an external app, but the app does not support webhooks. The trigger's job is to fetch any new data since the last time it ran.
A pollingTrigger is similar to a standard trigger that supports running on a schedule.
Its perform function receives an additional parameter, context.polling, which has a few functions:
- context.polling.getState() will fetch existing poll state.
- context.polling.invokeAction() can invoke an existing component's action (if one exists) to fetch data from the external app. This is handy if you don't want to duplicate logic in your trigger and an action.
- context.polling.setState() sets state for the next execution to load.
Generally, a polling trigger's perform function will follow these steps:
- Get current poll state from context.polling.getState(). This state will represent a cursor of some kind, depending on the API you're working with. If the API is paginated with numbered pages, your state may represent the number of the last page you fetched. If records in the API have "updated at" timestamps, this state may represent the most recent timestamp you've processed.
- Fetch new records. Using the cursor you loaded, fetch records that you have not yet processed. You can either use context.polling.invokeAction() to run an action that fetches new data, or you can implement the logic yourself. If the API uses numbered pagination, fetch lastPage + 1. If the API uses "updated at" timestamps, query for records where updated_at > ${previous_updated_at}. Implementations will differ depending on the service you're integrating with.
- Update poll state using context.polling.setState(). Save the newest page number or "updated at" timestamp that you fetched.
- Return new records for the flow to process. If no new records were found, return polledNoChanges: true, which will cause the execution to stop immediately.
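The steps above can be sketched independently of any particular API. The example below is a minimal illustration using an in-memory array in place of an external service and an "updated at" timestamp as the cursor. pollOnce, fetchUpdatedSince, and the record shape are hypothetical names chosen for this sketch, not part of Spectral.

```typescript
interface PollState {
  cursor?: string;
}

interface PersonRecord {
  id: number;
  updatedAt: string;
}

interface PollResult {
  records: PersonRecord[];
  nextState: PollState;
  polledNoChanges: boolean;
}

// Stand-in for an external API: returns records updated after the cursor.
// ISO 8601 UTC timestamps in an identical format sort correctly as strings.
function fetchUpdatedSince(all: PersonRecord[], cursor: string): PersonRecord[] {
  return all.filter((record) => record.updatedAt > cursor);
}

function pollOnce(state: PollState, source: PersonRecord[]): PollResult {
  // Step 1: load the cursor (an empty string means "fetch everything")
  const cursor = state.cursor ?? "";

  // Step 2: fetch records that are newer than the cursor
  const records = fetchUpdatedSince(source, cursor);
  if (records.length === 0) {
    // Nothing new: keep the old state and signal that the flow can stop
    return { records, nextState: state, polledNoChanges: true };
  }

  // Step 3: advance the cursor to the newest timestamp that was fetched
  const newest = records.reduce(
    (max, record) => (record.updatedAt > max ? record.updatedAt : max),
    cursor,
  );

  // Step 4: return the new records for the flow to process
  return { records, nextState: { cursor: newest }, polledNoChanges: false };
}

const source: PersonRecord[] = [
  { id: 1, updatedAt: "2024-01-01T00:00:00Z" },
  { id: 2, updatedAt: "2024-01-02T00:00:00Z" },
];

// The first run picks up both records; the second run finds nothing new
const firstRun = pollOnce({}, source);
const secondRun = pollOnce(firstRun.nextState, source);
```

In a real polling trigger the state would be loaded and saved through context.polling.getState() and context.polling.setState() rather than passed in and returned, but the cursor logic is the same.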
Example PostgreSQL polling trigger
This example polling trigger connects to a PostgreSQL database and queries a table called people which has columns firstname TEXT, lastname TEXT and updated_at TIMESTAMP.
While PostgreSQL can send a webhook request when data changes through a combination of a PostgreSQL TRIGGER function and an HTTP plugin, implementing webhooks in your database can slow the database down considerably, since every INSERT or UPDATE waits for an HTTP request. Polling makes more sense when looking for updates in a PostgreSQL database.
The first time this polling trigger runs, it finds MAX(updated_at)::TEXT.
We cast the timestamp to TEXT so that it can be stored in persisted state readily, and so that the cursor keeps the timestamp's full microsecond precision (a timestamp converted to a JavaScript Date keeps only milliseconds).
On subsequent runs, we load the cursor (previous timestamp) that was found, and execute "SELECT firstname, lastname FROM people WHERE updated_at > ${cursor}", fetching any record that has an updated_at timestamp greater than the previous timestamp.
import { pollingTrigger } from "@prismatic-io/spectral";
import { connectionInput } from "./inputs";
import { createDB } from "./client";
export const pollPeople = pollingTrigger({
display: {
label: "Poll people table for changes",
description: "Fetch any updated records in the Acme people table",
},
inputs: {
postgresConnection: connectionInput,
},
perform: async (context, payload, params) => {
const db = createDB(params.postgresConnection);
const state = context.polling.getState();
const cursorQuery = "SELECT MAX(updated_at)::TEXT AS cursor FROM people";
try {
if (!state?.cursor) {
// No previous cursor was found. This is the first time this
// trigger has run, so fetch an initial cursor and then exit
const { cursor: newCursor } = await db.one(cursorQuery);
context.polling.setState({ cursor: newCursor });
context.logger.log(
`First time running. Next time records with "updated_at" greater than "${newCursor}" will be fetched.`,
);
return {
payload,
polledNoChanges: true,
};
}
// The trigger has run previously. Fetch results since it last ran
const result = await db.tx(async (task) => {
return {
recordsQuery: await task.manyOrNone(
"SELECT firstname, lastname FROM people WHERE updated_at > ${cursor}",
{ cursor: state.cursor },
),
cursorQuery: await task.one(cursorQuery), // Also fetch new cursor in the same transaction
};
});
const newCursor = result.cursorQuery.cursor;
const records = result.recordsQuery;
context.polling.setState({ cursor: newCursor });
if (records.length > 0) {
// If any new records were found, return them
return {
payload: { ...payload, body: { data: records } },
polledNoChanges: false,
};
} else {
// If no results were found, return nothing and exit
return { payload, polledNoChanges: true };
}
} finally {
await db.$pool.end();
}
},
});
Note that if you return polledNoChanges: true, the runner will immediately stop and your flow will not continue to run.
Use this property if you checked for new changes, but found none.
Example polling trigger using existing action
In this example, imagine you already have a custom component with an action listProducts that returns a result like this:
{
"products": [
{"id": 123, "color": "red", "name": "Widget"},
{"id": 456, "color": "red", "name": "Gadget"}
],
"page_info": {
"limit": 100,
"page": 20
}
}
You can leverage this already-existing action in a polling trigger using the pollAction property and the context.polling.invokeAction() function:
import { pollingTrigger } from "@prismatic-io/spectral";
import { listProducts } from "./actions";
import { connectionInput } from "./inputs";
interface MyPollingState {
limit?: number;
page?: number;
}
interface Product {
id: number;
color: string;
name: string;
}
interface ListProductsResult {
products: Product[];
page_info: {
limit: number;
page: number;
};
}
const myPollingTrigger = pollingTrigger({
display: {
label: "Poll products API for changes",
description: "Fetch new products from Acme",
},
pollAction: listProducts,
inputs: { connection: connectionInput },
perform: async (context, payload, params) => {
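// On the first run no state exists yet, so start from page 0
const { limit, page: oldPage = 0 } = (context.polling.getState() ??
{}) as MyPollingState;
const { data } = (await context.polling.invokeAction({
connection: params.connection,
limit,
page: oldPage + 1, // Fetch the next page of results
})) as { data: ListProductsResult };
const { page: newPage } = data.page_info;
const { products } = data;
if (products.length) {
// Some products were found. Save the page that was just fetched so
// the next execution requests the page after it
context.polling.setState({ limit, page: newPage });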
return {
payload: { ...payload, body: { data: products } },
polledNoChanges: false,
};
} else {
return {
payload,
polledNoChanges: true,
};
}
},
});