Definition
The $externalFunction stage triggers processes in a specific AWS Lambda resource. Your request to the AWS Lambda process can be either synchronous or asynchronous.
Create an AWS Lambda and Authenticate with Unified AWS Access
To call an AWS Lambda resource from within your Atlas Stream Processing pipeline, your AWS Lambda must be deployed to the same AWS region in which your Atlas Stream Processing instance is deployed. To learn more about deploying an AWS Lambda resource, see the AWS documentation.
Create an AWS Lambda Function.
Create a Lambda function with either the AWS CLI or the AWS UI.
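As a rough illustration of what such a function might look like, the sketch below shows a Node.js handler. The event shape (a document with a numeric randomArray field) and the returned fields are assumptions chosen to mirror the customized request example later on this page; they are not required by Atlas Stream Processing.

```javascript
// Hypothetical Lambda handler for the Node.js runtime.
// Assumption: the incoming event is a JSON document that may carry a
// numeric array in `randomArray`.
const handler = async (event) => {
  const values = Array.isArray(event.randomArray) ? event.randomArray : [];
  const sum = values.reduce((total, n) => total + n, 0);
  // The returned object is serialized to JSON as the function's response.
  return { success: true, sum };
};

// CommonJS export. If this file is saved as index.js, the Lambda handler
// setting would be "index.handler".
module.exports = { handler };
```

A handler like this returns a small JSON document that a synchronous request could store in the field named by as.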
Configure Unified AWS Access.
Note
The procedure described here covers only the basic setup flow in the Atlas UI. To learn more, see the Set Up Unified AWS Access documentation.
Required Access
To set up unified AWS access, you must have Organization Owner or Project Owner access to the project.
Prerequisites
An Atlas account.
Note
Your AWS IAM policy must include the lambda:InvokeFunction action. You must replace the placeholder ExternalId and Resource values with your own, which are available through the Unified AWS Access configuration process. Note that the Resource value in this example includes a wildcard that matches any Lambda function with a name that begins with function-.
Add Trust Relationships to an Existing Role in the Atlas UI
Next, you must enable your self-managed AWS IAM role to execute your AWS Lambda resource.
permission-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction"
      ],
      "Resource": "arn:aws:lambda:us-east-1:257394458927:function:<function-name>"
    }
  ]
}
Navigate to the AWS IAM integration page in your Atlas project and click on the Authorize an AWS IAM role button.
Create a new role (or modify an existing role) with the role-trust-policy.json that is shown in the modal.
Once the role is created (or the existing role is updated with the new trust policy), paste the role's ARN in the modal.
In the AWS console, go to IAM > Roles and select your role.
In the permissions tab, add a new "inline permission" to allow this role to invoke your Lambda function(s). The example permission-policy.json provided above adds the permission to run any Lambda function with the name <function-name>.
Finally, navigate to your Atlas Stream Processing Instance, add a new AWS Lambda connection, and choose the AWS IAM Role ARN that you configured in the previous step.
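If you generate the inline permission policy programmatically, a helper along the following lines could produce the same document as permission-policy.json for a given function. This is a minimal sketch: buildInvokePolicy is a hypothetical name, the ARN check is deliberately simplified (standard aws partition only), and the account ID in the usage line is a made-up placeholder.

```javascript
// Build an inline permission policy that allows invoking one Lambda function.
// `buildInvokePolicy` is a hypothetical helper, not part of any AWS or Atlas SDK.
function buildInvokePolicy(functionArn) {
  // Simplified sanity check: arn:aws:lambda:<region>:<12-digit account>:function:<name>
  if (!/^arn:aws:lambda:[^:]+:\d{12}:function:.+$/.test(functionArn)) {
    throw new Error(`Not a Lambda function ARN: ${functionArn}`);
  }
  return {
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Action: ["lambda:InvokeFunction"],
        Resource: functionArn,
      },
    ],
  };
}

// Placeholder ARN for illustration only.
const policy = buildInvokePolicy(
  "arn:aws:lambda:us-east-1:123456789012:function:my-function"
);
console.log(JSON.stringify(policy, null, 2));
```

The printed JSON can be pasted into the inline permission editor for the role in the AWS console.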
Connect your Atlas Stream Processing Instance to your AWS Lambda Function
In order to send a request to your AWS Lambda resource from within your Atlas Stream Processing pipeline, you must first add your AWS Lambda resource as a connection in your Atlas Stream Processing resource.
You can add your AWS Lambda resource as a connection through the Atlas UI, the Atlas CLI, or the Atlas API, as shown in the following example.
Replace the roleArn placeholder in the example with the ARN from your AWS IAM configuration.
curl --user "username:password" --digest \
  --header "Content-Type: application/json" \
  --header "Accept: application/vnd.atlas.2023-02-01+json" \
  --include \
  --data '{"name": "TestAWSLambdaConnection","type": "AWSLambda","aws": {"roleArn": "arn:aws:iam::<aws_account>:role/<role_name>"}}' \
  --request POST "https://cloud.mongodb.com/api/atlas/v2/groups/<group_id>/streams/<tenant_name>/connections"
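The same request can be assembled in a script before it is handed to an HTTP client. The sketch below only builds the URL, headers, and body used in the curl example; buildLambdaConnectionRequest is a hypothetical helper, the IDs in the usage lines are placeholders, and HTTP digest authentication is left to whichever client you use.

```javascript
// Assemble the pieces of the "create connection" request shown in the curl example.
// This does not send anything; authentication is intentionally out of scope.
function buildLambdaConnectionRequest({ groupId, tenantName, connectionName, roleArn }) {
  const url =
    "https://cloud.mongodb.com/api/atlas/v2/groups/" +
    `${encodeURIComponent(groupId)}/streams/${encodeURIComponent(tenantName)}/connections`;
  return {
    method: "POST",
    url,
    headers: {
      "Content-Type": "application/json",
      Accept: "application/vnd.atlas.2023-02-01+json",
    },
    body: JSON.stringify({
      name: connectionName,
      type: "AWSLambda",
      aws: { roleArn },
    }),
  };
}

// Placeholder values for illustration only.
const request = buildLambdaConnectionRequest({
  groupId: "my-group-id",
  tenantName: "my-tenant",
  connectionName: "TestAWSLambdaConnection",
  roleArn: "arn:aws:iam::123456789012:role/my-role",
});
console.log(request.url);
console.log(request.body);
```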
Syntax
Minimal request
The following example shows the required fields for a minimal request.
{
  $externalFunction: {
    connectionName: "myLambdaConnection",
    functionName: "arn:aws:lambda:region:account-id:function:function-name",
    as: "response",
  }
}
Customized request
In addition to the required fields illustrated above, the following customized example specifies error handling, synchronous execution, and a preprocessed Atlas Stream Processing document as the payload.
{
  $externalFunction: {
    connectionName: "myLambdaConnection",
    functionName: "arn:aws:lambda:region:account-id:function:function-name",
    execution: "sync",
    as: "response",
    onError: "fail",
    payload: [
      { $replaceRoot: { newRoot: "$fullDocument.payloadToSend" } },
      { $addFields: { sum: { $sum: "$randomArray" } } },
      { $project: { success: 1, sum: 1 } },
    ],
  }
}
Note
The onError field defines behavior for API-level errors for both synchronous and asynchronous requests to your AWS Lambda resource, as well as for AWS Lambda function errors for synchronous requests.
The $externalFunction stage takes a document with the following fields:
Field | Type | Necessity | Description |
---|---|---|---|
connectionName | string | Required | Label that identifies the connection in the Connection Registry, to which the request is sent. |
functionName | string | Required | The full AWS ARN or the name of the AWS Lambda function to be triggered. |
execution | enum | Optional | Parameter that specifies whether the AWS Lambda function should be called synchronously or asynchronously. Accepted values are "sync" and "async". Defaults to "sync". |
as | string | Optional | Name of the field for the REST API response. If the endpoint returns 0 bytes, the operator doesn't set the as field. |
onError | string | Optional | Behavior when the operator encounters an error. Accepted values are "dlq" (send the affected document to the dead letter queue), "ignore" (ignore the affected document), and "fail" (terminate the stream processor). Defaults to "dlq". |
payload | array | Optional | Custom inner pipeline that allows you to customize the request body sent to the API endpoint. |
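To catch field mistakes before a pipeline is submitted, the field rules above can be checked client-side. The following is a minimal sketch under stated assumptions: buildExternalFunctionStage is a hypothetical helper, and the accepted execution values ("sync", "async") and onError values ("dlq", "ignore", "fail") are treated as assumptions wherever this page does not spell them out.

```javascript
// Assumed accepted values; adjust if the service accepts others.
const EXECUTION_MODES = ["sync", "async"];
const ON_ERROR_MODES = ["dlq", "ignore", "fail"];

// Hypothetical helper: validates options and returns an $externalFunction stage.
function buildExternalFunctionStage(options) {
  const { connectionName, functionName, execution, as, onError, payload } = options;

  // connectionName and functionName are the two required string fields.
  for (const [key, value] of Object.entries({ connectionName, functionName })) {
    if (typeof value !== "string" || value.length === 0) {
      throw new Error(`${key} is required and must be a non-empty string`);
    }
  }
  if (execution !== undefined && !EXECUTION_MODES.includes(execution)) {
    throw new Error(`execution must be one of: ${EXECUTION_MODES.join(", ")}`);
  }
  if (onError !== undefined && !ON_ERROR_MODES.includes(onError)) {
    throw new Error(`onError must be one of: ${ON_ERROR_MODES.join(", ")}`);
  }
  if (as !== undefined && typeof as !== "string") {
    throw new Error("as must be a string");
  }
  if (payload !== undefined && !Array.isArray(payload)) {
    throw new Error("payload must be an array of pipeline stages");
  }

  // Copy only the fields that were provided so that server-side defaults apply.
  const spec = { connectionName, functionName };
  if (execution !== undefined) spec.execution = execution;
  if (as !== undefined) spec.as = as;
  if (onError !== undefined) spec.onError = onError;
  if (payload !== undefined) spec.payload = payload;
  return { $externalFunction: spec };
}

const stage = buildExternalFunctionStage({
  connectionName: "myLambdaConnection",
  functionName: "arn:aws:lambda:region:account-id:function:function-name",
  execution: "sync",
  as: "response",
  payload: [{ $project: { success: 1, sum: 1 } }],
});
console.log(JSON.stringify(stage, null, 2));
```

Leaving optional fields out of the returned document, rather than filling in defaults locally, keeps the helper from drifting out of step with the service's own defaults.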
Behavior
The $externalFunction stage sends a request to the specified AWS Lambda resource. If the request is synchronous, the response is returned in the specified field and the pipeline continues processing. If the request is asynchronous, the pipeline continues processing without waiting for the response.
If a synchronous request fails, the pipeline's error handling behavior is determined by the onError field. If the request is asynchronous, the onError field applies only to AWS API errors, as opposed to AWS Lambda function errors. If the onError field is not specified, the default behavior is to send the affected document to the dead letter queue.
onError Value | Behavior |
---|---|
dlq | The affected document is sent to the dead letter queue. |
ignore | The affected document is ignored. |
fail | The stream processor is terminated on error. |
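The interaction between execution mode, error type, and onError can be easier to follow as a small decision function. The sketch below is a local model of the rules described above, not the service's implementation: the function names, the "api" and "function" error labels, the returned action strings, and the onError values "dlq", "ignore", and "fail" are all assumptions made for illustration.

```javascript
// Does onError govern this error at all?
// API-level errors are covered for both execution modes; Lambda function
// errors are covered only when the request is synchronous.
function onErrorApplies(execution, errorKind) {
  if (errorKind === "api") return true;
  return errorKind === "function" && execution === "sync";
}

// Map an onError setting to the action described in the table above.
const ACTIONS = new Map([
  ["dlq", "send-to-dead-letter-queue"],
  ["ignore", "ignore-document"],
  ["fail", "terminate-processor"],
]);

function resolveErrorAction(onError, execution, errorKind) {
  if (!onErrorApplies(execution, errorKind)) return "not-applicable";
  // An unspecified onError falls back to the dead letter queue.
  const mode = onError === undefined ? "dlq" : onError;
  if (!ACTIONS.has(mode)) {
    throw new Error(`Unknown onError value: ${mode}`);
  }
  return ACTIONS.get(mode);
}

console.log(resolveErrorAction("fail", "sync", "function"));     // terminate-processor
console.log(resolveErrorAction(undefined, "async", "api"));      // send-to-dead-letter-queue
console.log(resolveErrorAction("ignore", "async", "function"));  // not-applicable
```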
If the pipeline can't convert the document to proper JSON, or the configured pipeline does not create a valid BSON object as a product of the final stage, the document is sent to the dead letter queue regardless of the onError setting.
Errors arising from incorrect configuration of the $externalFunction operator itself, such as invalid expressions, do not trigger the onError behavior. Instead, the stream processor fails with an error message. The Atlas Stream Processing pipeline retries failed requests that may be the result of transient errors.