chrismitchellonline

AWS DynamoDB Streams With Lambda

2022-10-20

In this article we’ll look at how to use AWS DynamoDB Streams to trigger a Lambda function when data is inserted into a DynamoDB table. We’ll walk through a real-world example of a users table where we enrich records with additional data on each insert.

Building and Deploying Infrastructure with CDK

We’ll use AWS CDK to build our infrastructure and deploy it to our AWS environment, with TypeScript as the CDK language. Run the cdk init script to create a new project:

cdk init --language typescript

By default CDK uses the current folder name as its naming convention, in my case generating the file lib/dynamo-db-streams-cdk-stack.ts. We’ll add our infrastructure code there.

DynamoDB CDK

First we’ll add the DynamoDB table:

    const dynamodb = cdk.aws_dynamodb;
    const userLoginTable = new dynamodb.Table(this, 'UserLoginTable', {
        partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },
        stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
        tableName: "user-login-table"
    });

This construct creates a DynamoDB table named user-login-table, enables streams that send NEW_AND_OLD_IMAGES, and sets a partition key of userId with a string type.

For more information about the other options available when creating a DynamoDB table with CDK, see the AWS Documentation.

Lambda Function CDK

Next create the Lambda function construct:

    const lambda = cdk.aws_lambda;
    const userLoginTableLambdaTrigger = new lambda.Function(this, "UserLoginTableLambdaTrigger", {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset(path.join(__dirname, '../src/lambda/user-login-table-trigger')),
      functionName: "user-login-table-trigger"
    });

This construct creates a Lambda function named user-login-table-trigger with the Node.js 16 runtime. We load the function’s source code using the lambda.Code.fromAsset method, pointing it at a local source directory. To do this, we’ll need to import the path library:

import * as path from "path";

Later in this article we’ll add our Lambda source code.

For more information about the options available when creating a Lambda function with CDK, see the AWS Documentation.

Add Stream Event to Lambda

We’ll use CDK’s CfnEventSourceMapping construct to trigger the Lambda function when data in DynamoDB changes:

    new lambda.CfnEventSourceMapping(this, "UserLoginTableStreamTrigger", {
        functionName: userLoginTableLambdaTrigger.functionName,
        batchSize: 5,
        eventSourceArn: userLoginTable.tableStreamArn,
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT"]}',
          }],
        },
        startingPosition: "TRIM_HORIZON",
    });

We set the function name using the Lambda function object we created earlier (userLoginTableLambdaTrigger.functionName), and the eventSourceArn comes from the DynamoDB table object (userLoginTable.tableStreamArn). We also set filter criteria to limit when our Lambda function is invoked; in this example we only want to invoke it when a new record is written to DynamoDB.
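As an aside, CfnEventSourceMapping is the low-level (L1) construct; aws-cdk-lib also offers a higher-level alternative, the DynamoEventSource class from aws-cdk-lib/aws-lambda-event-sources. A rough sketch of an equivalent mapping (not what this article deploys), reusing the lambda alias and the objects defined above:

```typescript
import { DynamoEventSource } from "aws-cdk-lib/aws-lambda-event-sources";

// Sketch: the same mapping expressed with the L2 event source construct,
// reusing the lambda alias (cdk.aws_lambda) and objects defined above.
userLoginTableLambdaTrigger.addEventSource(
  new DynamoEventSource(userLoginTable, {
    startingPosition: lambda.StartingPosition.TRIM_HORIZON,
    batchSize: 5,
    filters: [
      lambda.FilterCriteria.filter({
        eventName: lambda.FilterRule.isEqual("INSERT"),
      }),
    ],
  })
);
```

A nicety of the L2 construct is that it should also take care of granting the function permission to read the stream.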

For more information about how to manage streaming data to Lambda, see the AWS Documentation.

Lambda Permissions

By default CDK creates a Lambda role with the correct execution and logging policies already attached. In this exercise we also need to read and write data from DynamoDB, as well as read DynamoDB streams. Using the DynamoDB table object we created earlier, userLoginTable, we can use helper methods to wire up these permissions by passing in our Lambda function object:

userLoginTable.grantStreamRead(userLoginTableLambdaTrigger);
userLoginTable.grantReadWriteData(userLoginTableLambdaTrigger);

This ensures the role Lambda executes under has the correct permissions to interact with DynamoDB and its streamed data.

Lambda Source Code

One last task before we can deploy our CDK stack is to add the source code for our Lambda function. For now we’ll create a simple “Hello World” Lambda and deploy that, and later we’ll look at a more useful example.

For our Lambda source code, create a directory at src/lambda/user-login-table-trigger; this path must match the one in our Lambda construct. In this directory, add an index.js file with the following JavaScript:

exports.handler = async (event) => {

    console.log("Hello World, here is our event:");
    console.log(JSON.stringify(event));

    return;
};

Notice that we log the event passed to Lambda with console.log(JSON.stringify(event)). This event represents the data sent by the DynamoDB stream and contains the inserted record, the keys present on the table, and metadata describing the record. We’ll use this later when we process the data.
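To give a sense of the shape of this data, here is an abbreviated example of what an INSERT event from the stream looks like (the field values here are illustrative):

```json
{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "Keys": {
          "userId": { "S": "1234" }
        },
        "NewImage": {
          "userId": { "S": "1234" },
          "latlng": { "S": "40.714224,-73.961452" }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      }
    }
  ]
}
```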

Finalize CDK

Here is our final CDK class:

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as path from "path";

export class DynamoDbStreamsCdkStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    const dynamodb = cdk.aws_dynamodb;
    const userLoginTable = new dynamodb.Table(this, 'UserLoginTable', {
      partitionKey: { name: 'userId', type: dynamodb.AttributeType.STRING },    
      stream: dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
      tableName: "user-login-table"
    });

    const lambda = cdk.aws_lambda;
    const userLoginTableLambdaTrigger = new lambda.Function(this, "UserLoginTableLambdaTrigger", {
      runtime: lambda.Runtime.NODEJS_16_X,
      handler: 'index.handler',
      code: lambda.Code.fromAsset(path.join(__dirname, '../src/lambda/user-login-table-trigger')),
      functionName: "user-login-table-trigger"
    });

    new lambda.CfnEventSourceMapping(this, "UserLoginTableStreamTrigger", {
        functionName: userLoginTableLambdaTrigger.functionName,
        batchSize: 5,
        eventSourceArn: userLoginTable.tableStreamArn,
        filterCriteria: {
          filters: [{
            pattern: '{"eventName": ["INSERT"]}',
          }],
        },
        startingPosition: "TRIM_HORIZON",
    });

    userLoginTable.grantStreamRead(userLoginTableLambdaTrigger);
    userLoginTable.grantReadWriteData(userLoginTableLambdaTrigger);

  }
}

We can run the CDK deploy command to create the infrastructure:

cdk deploy

When complete, you’ll see a small green output message similar to the following:

DynamoDbStreamsCdkStack: creating CloudFormation changeset...

 ✅  DynamoDbStreamsCdkStack

✨  Deployment time: 43.11s

At this point all of our infrastructure, triggers, and permissions should be in place. Writing an item to the user-login-table DynamoDB table will invoke our user-login-table-trigger Lambda function. Currently our function doesn’t do much, so let’s look at a real-world example of a Lambda function that updates our table when a record is inserted.

DynamoDB Streams Real World Example

Currently we have a DynamoDB table that we can write userIds to. Let’s say we want to know where a user logs in from. A web or mobile application can give us the userId and a latitude/longitude value at login, but ultimately we want to store the actual address for that latitude and longitude. We can get it by calling an external API service such as the Google Maps API to do the address conversion. Instead of doing this in the same call from the web application, we’ll offload the work to our DynamoDB Streams Lambda trigger to populate this data.

Updated Lambda Function

Here is the updated Lambda function that will do our address lookup based on a latitude/longitude value pair. Add the following code to src/lambda/user-login-table-trigger/index.js:

const axios = require("axios");
const AWS = require("aws-sdk");
/**
 * Called from DDB trigger. Use event data to convert lat/long to Address and store
 * back in DDB as array
 * @param {Object} event 
 * @returns 
 */
exports.handler = async (event) => {
    
    console.log("Hello World, here is our event:");    
    console.log(JSON.stringify(event));

    for (const record of event.Records) {
        const userId = record.dynamodb.Keys.userId.S;
        const latlng = record.dynamodb.NewImage.latlng.S;

        const apiKey = 'xxx';
        const url = "https://maps.googleapis.com/maps/api/geocode/json?latlng=" + latlng + "&key="+apiKey;
        const response = await axios.get(url)
        const address = response.data.results[0].formatted_address;        
        
        const dynamoDB = new AWS.DynamoDB.DocumentClient();
        try{
            await dynamoDB.update({
                TableName: "user-login-table",
                Key: {
                  userId: userId,
                },
                UpdateExpression: `set address = :address`,
                ExpressionAttributeValues: {
                  ":address": address,
                },
                
            }).promise()
        }
        catch(error){
            console.log(error);
            //continue to process the rest of the records
        }
               
    }    

    return;
}

First off, we’ll need to install the axios npm package, which lets us easily make promise-based HTTP requests. Because lambda.Code.fromAsset uploads the directory as-is, install it inside the Lambda source directory (src/lambda/user-login-table-trigger) so node_modules gets bundled with the deployed code:

npm install axios

In our Lambda function the first thing we do is log the event coming from DynamoDB. We don’t always have to log this data, but it’s helpful for understanding what data comes from DynamoDB Streams:

    console.log("Hello World, here is our event:");    
    console.log(JSON.stringify(event));

From our event object we can see that DynamoDB Streams always sends records in an array, even if there is only one record. The batch size is determined by the CfnEventSourceMapping object we created with CDK earlier; in this example it’s up to 5 records per batch. We set up a for loop to iterate through the records:

    for (const record of event.Records) {

Now that we have individual records, we can extract the data that was inserted into DynamoDB, in this case the userId and latlng values. Note that stream records wrap each attribute in a type descriptor, so we read the .S (string) value:

const userId = record.dynamodb.Keys.userId.S;
const latlng = record.dynamodb.NewImage.latlng.S;
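If you end up extracting more fields, it can help to factor this into a small helper. Here’s a sketch (extractUserLogin is a hypothetical name, not part of the article’s code):

```javascript
// Hypothetical helper: pull the userId key and latlng attribute out of a
// single DynamoDB stream record, unwrapping the "S" (string) type descriptors.
function extractUserLogin(record) {
    return {
        userId: record.dynamodb.Keys.userId.S,
        latlng: record.dynamodb.NewImage.latlng.S,
    };
}

// Abbreviated example of a stream record:
const record = {
    dynamodb: {
        Keys: { userId: { S: "1234" } },
        NewImage: { latlng: { S: "40.714224,-73.961452" } },
    },
};

console.log(extractUserLogin(record));
// logs { userId: '1234', latlng: '40.714224,-73.961452' }
```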

And then use this data to call Google Maps API to return an address from our latlng data:

    const apiKey = 'xxx';
    const url = "https://maps.googleapis.com/maps/api/geocode/json?latlng=" + latlng + "&key="+apiKey;
    const response = await axios.get(url)
    const address = response.data.results[0].formatted_address;
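String concatenation works fine here, but you could also let URLSearchParams handle the query-string encoding. A small sketch (buildGeocodeUrl is a hypothetical helper name):

```javascript
// Hypothetical helper: build the Geocoding API URL, letting URLSearchParams
// take care of percent-encoding the query parameters.
function buildGeocodeUrl(latlng, apiKey) {
    const params = new URLSearchParams({ latlng: latlng, key: apiKey });
    return `https://maps.googleapis.com/maps/api/geocode/json?${params}`;
}

console.log(buildGeocodeUrl("40.714224,-73.961452", "xxx"));
```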

Note: I’m hiding the apiKey value here; you’ll need to generate your own key from your Google account.

Now we have an address string that we can write back to DynamoDB using the DocumentClient object’s update method:

        const dynamoDB = new AWS.DynamoDB.DocumentClient();
        try{
            await dynamoDB.update({
                TableName: "user-login-table",
                Key: {
                  userId: userId,
                },
                UpdateExpression: `set address = :address`,
                ExpressionAttributeValues: {
                  ":address": address,
                },
                
            }).promise()
        }
        catch(error){
            console.log(error);
            //continue to process the rest of the records
        }

And now our original record will be updated with an address field.

Testing

At this point we would begin writing our front-end application to post the data that tracks our user logins, but for now we’ll test by using the AWS CLI to write data to DynamoDB and verify that our trigger and its update logic are working.

Write a record to DynamoDB using the AWS CLI with the following command. Notice we are writing the following values:

  • userId: 1234
  • latlng: 40.714224,-73.961452

aws dynamodb put-item --table-name=user-login-table --item '{"userId":{"S":"1234"},"latlng":{"S":"40.714224,-73.961452"}}'

After writing this data we can retrieve all of the records from our table with a scan. (Typically we would avoid scan operations in DynamoDB, but this table only has one record and is only for demonstration.)

After just a few moments we should see the updated address information:

aws dynamodb scan --table-name user-login-table

Notice that the address value has been added from the Lambda invocation:

{
    "Items": [
        {
            "latlng": {
                "S": "40.714224,-73.961452"
            },
            "address": {
                "S": "277 Bedford Ave, Brooklyn, NY 11211, USA"
            },
            "userId": {
                "S": "1234"
            }
        }
    ],
    "Count": 1,
    "ScannedCount": 1,
    "ConsumedCapacity": null
}
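Since we know the key we just wrote, a targeted get-item would avoid the scan entirely:

```shell
aws dynamodb get-item --table-name user-login-table --key '{"userId":{"S":"1234"}}'
```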

Conclusion

Hopefully you found this method of updating DynamoDB with Streams and Lambda, and the real-world example, helpful. There are countless other uses for streaming data out of DynamoDB; it’s particularly helpful for offloading the longer load times associated with external API calls. Feel free to comment below with questions or other use cases you’ve found!
