Use Case:-
I am sure you all remember how much boilerplate code we used to write when we convert the DynamoDB stream to the business object. Here is a small snippet of the GoLang program.
m := &EntitySearch{}
for name, value := range recordImage {
if name == "ExternalId" {
m.ExternalId = value.String()
} else if name == "Source" {
m.Source = value.String()
} else if name == "EntityId" {
m.EntityId = value.String()
} else if name == "EntityType" {
m.EntityType = value.String()
}
During ReInvent 2022, the introduction of EventBridge pipes brought about a significant transformation in how we utilize the DynamoDB stream by converting it into business objects. This breakthrough has nearly eliminated the need for writing Lambda code for the conversion process. But what if you have a DynamoDB List/Map object in your stream? EventBridge pipes don't support all JSON Path.
What if you have List or Map objects like this respectively?
"Skills": [
"AWS",
"GoLang",
"Java",
"TypeScript"
]
{
"orderItems": [
{
"itemId": "1234567890",
"quantity": 1,
"price": 10.00
},
{
"itemId": "9876543210",
"quantity": 2,
"price": 5.00
}
]
}
if you’ve played with EventBridge Pipes you know that you can do a bit of a transform in target if you try to access Skills objects like this "$.dynamodb.NewImage.Skills.L[*].S", you will get an error
Here is the solution where I used the Pipeline Enrichment step with the Step Function. Step Function support fully JSON path.
The Architecture:
Step Functions:-
You can download the source code from here, which has been implemented using AWS CDK and SAM, whichever approach you prefer.
CDK Deploy:-
1) cd cdk
2) npm install
3) cdk deploy --all -a "npx ts-node bin/app.ts" --profile <your profile name>
SAM Deploy:-
1) cd sam
2) sam deploy --stack-name EventBridgePipeEnrich --capabilities CAPABILITY_NAMED_IAM --guided --profile <profile name>
Insert Data in DynamoDB:-
1) cd dynamodb
2) ./dynamodb.sh
You will see the below record in the DynamoDB table.
Here are the final Cloudwatch logs after an Enrichment.
Understand State Machine:-
I am utilizing straightforward Step Functions with an Inline Map to iterate over a DynamoDB Stream. It's worth noting that in this case, I am using the expression 'Skills.$' to refer to the 'Skills' attribute within the 'NewImage' object of the DynamoDB record, where the values are stored as a list of strings ('L[*].S').
{
"Comment": "A description of my state machine",
"StartAt": "DynamoDB Map",
"States": {
"DynamoDB Map": {
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Transform",
"States": {
"Transform": {
"Type": "Pass",
"Parameters": {
"details": {
"meta-data": {
"correlationId.$": "$.eventID",
"eventName.$": "$.eventName"
},
"data": {
"PK.$": "$.dynamodb.Keys.PK.S",
"SK.$": "$.dynamodb.Keys.SK.S",
"Skills.$": "$.dynamodb.NewImage.Skills.L[*].S",
"Contact": {
"Home.$": "$.dynamodb.NewImage.Contact.M.Home.S",
"Phone.$": "$.dynamodb.NewImage.Contact.M.Phone.S"
}
}
}
},
"InputPath": "$",
"End": true
}
}
},
"End": true
}
}
}
DynamoDB table and Stream:-
To ensure that EventBridge pipes can read it, we must activate the DynamoDB stream. Reading Both the new and the old images of the item.
this._table = new dynamodb.Table(this, id, {
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
removalPolicy: cdk.RemovalPolicy.DESTROY,
partitionKey: { name: 'PK', type: dynamodb.AttributeType.STRING },
sortKey: {
name: 'SK', type: dynamodb.AttributeType.STRING },
tableName: `Employee`,
stream: StreamViewType.NEW_AND_OLD_IMAGES
});
EventBridge Pipes:-
Here is how I set up EventBridge pipes.
const pipe = new pipes.CfnPipe(this, 'pipe', {
name: 'employee-app-pipe',
roleArn: props.employeePipeRole.roleArn,
source: props.table.tableStreamArn!,
target: props.employeeEventBus.targetArn,
sourceParameters: {
dynamoDbStreamParameters: {
startingPosition: 'LATEST',
batchSize: 1,
deadLetterConfig: {
arn: props.employeeDLQueue.queueArn,
},
maximumRetryAttempts: 1,
},
filterCriteria: {
filters: [{
pattern: '{ "eventName": ["INSERT","MODIFY"] }',
}],
}
},
enrichment: props.employeeAppStateMachine.stateMachine.attrArn,
targetParameters: {
eventBridgeEventBusParameters: {
detailType: 'EmployeeDetailsChanged',
source: 'employee-app',
},
},
});
Reading LATEST from the DynamoDB stream.
Reading 1 Item in batch due to Sample Example.
Make sure you set "maximumRetryAttempts" value. By default value is -1 and it will keep retrying for the next 24 hours. you can read more here.
For Enrichment steps, call Step Functions props.employeeAppStateMachine.stateMachine.attrArn
Wrap-Up:-
In the provided example, we demonstrate the utilization of EventBridge Enrichment for processing complex JSON structures, specifically DynamoDB List/Map data types. This approach is necessary because EventBridge pipes lack complete support for JSON path expressions. Employing this example will prove useful when working with intricate JSON structures.
I hope you enjoy reading this post, and I sincerely hope it proves beneficial to you during your implementation process.