Managing file dependencies in event-driven architectures with AWS Step Functions
This article is authored by Mark Johnson, Enterprise Support Leader, Serverless, and Linda Hayes, Technical Account Manager, Serverless.
Amazon S3 is a widely used object storage service for file management. With Amazon S3 Event Notifications or Amazon EventBridge, you can build workloads that use an event-driven architecture (EDA), reacting to events generated when objects in S3 buckets are created or changed.
EDA enables asynchronous communication between system components, promoting a decoupled design in which each part functions independently. However, some scenarios inadvertently introduce dependencies between events, leading to a tightly coupled architecture. This blog post illustrates a frequent instance of such coupling and demonstrates how AWS Step Functions can resolve it.
Overview
In this scenario, a company is divided into two independent teams—the Sales team and the Warehouse team. Each is tasked with uploading a monthly data file to an S3 bucket for processing. The uploads generate events that trigger subsequent processes. The Warehouse file processing involves cleaning the data and merging it with Shipping team data, while the Sales file processing correlates the data with the previously combined Warehouse and Shipping data. This correlation is essential for analysts to conduct forecasting and derive other insights.
For the correlation to be successful, the Warehouse file must be processed before the Sales file. Given the autonomous nature of the teams, no coordination exists to guarantee that the Warehouse file is processed prior to the Sales file upload.
In such cases, the Aggregator pattern can be employed. This pattern gathers and stores events, subsequently triggering a new event based on these combined events. In the example at hand, the combined events consist of the processed Warehouse file and the newly uploaded Sales file.
The requirements of the aggregator pattern include:
- Correlation: A method to group related events, achieved through a unique identifier in the file name.
- Event aggregator: A stateful repository for the events.
- Completion check and trigger: A mechanism to ascertain when the combined events have been received and a way to publish the resulting event.
Architecture Overview
The architecture integrates several AWS services:
- Amazon DynamoDB serves as the event aggregator.
- Step Functions orchestrate the workflow.
- AWS Lambda is utilized to parse the file name and extract the correlation identifier.
- AWS Serverless Application Model (AWS SAM) is employed for infrastructure as code and deployment.
Here’s how the workflow unfolds:
- File upload: The Sales and Warehouse teams upload their respective files to S3.
- EventBridge: An ObjectCreated event is dispatched to EventBridge, where a rule is configured to target the main workflow.
- Main state machine: This orchestrates the aggregator operations and file processing, separating the aggregator logic from the workflows of individual files.
- File parser and correlation: A Lambda function executes the business logic to identify the file type and its corresponding identifier.
- Stateful store: A DynamoDB table retains vital information about the file, such as its name, type, and processing status. The state machine interacts with the DynamoDB table for reading and writing tasks, including storing task tokens.
- File processing: Based on file type and preconditions, the state machines specific to each file type are executed.
- Task Token & Callback: A task token is generated when an attempt is made to process the dependent file before the independent one. The Step Functions’ “Wait for a Callback” pattern allows the execution of the dependent file to resume once the independent file is processed.
Walkthrough
Prerequisites for deployment:
- AWS CLI and AWS SAM CLI installed.
- An AWS account.
- Appropriate permissions to manage AWS resources.
- Git installed.
To implement the example, please refer to the instructions in the GitHub repository. This walkthrough illustrates the scenario where the dependent file (Sales file) is uploaded before the independent file (Warehouse file).
The workflow initiates with the uploading of the Sales file to its designated S3 bucket. It is assumed that the Sales and Warehouse teams operate independently, hence the use of separate S3 buckets. Sample files can be found in the code repository.
Upon uploading the file to S3, an event is sent to EventBridge, which triggers the aggregator state machine. The event pattern for the EventBridge rule is as follows:
{
  "detail-type": ["Object Created"],
  "source": ["aws.s3"],
  "detail": {
    "bucket": {
      "name": ["sales-mfu-eda-09092023", "warehouse-mfu-eda-09092023"]
    },
    "reason": ["PutObject"]
  }
}
The aggregator state machine begins by calling the file parser Lambda function, which identifies the file type and extracts the identifier used to correlate the files. In this example, the file name encodes both the file type and the correlation identifier (the year_month). If you represent the file type and correlation identifier differently, adjust this function accordingly.
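For illustration, a minimal version of that parsing logic might look like the following Python handler. The naming convention and field names here are assumptions for the sketch; the repository encodes the type and year_month in the file name, but its precise format may differ.

```python
# Sketch of a file-parser Lambda handler (hypothetical naming convention:
# <file_type>_<year>_<month>.<ext>, e.g. "sales_2024_01.csv").
import os

def lambda_handler(event, context):
    # EventBridge delivers the S3 object key in the event detail.
    key = event["detail"]["object"]["key"]
    name, _ext = os.path.splitext(os.path.basename(key))

    # Split into the file type and the year_month correlation identifier.
    file_type, year, month = name.split("_")
    return {
        "fileType": file_type,               # "sales" or "warehouse"
        "correlationId": f"{year}_{month}",  # e.g. "2024_01"
    }
```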
The next step involves inserting an event record into the event aggregator DynamoDB table, which features a composite primary key with the correlation identifier as the partition key and the file type as the sort key. The processing status of the file is tracked to provide feedback on the workflow state.
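Expressed with boto3, that write might look roughly like the sketch below; the table and attribute names are illustrative, not necessarily those used in the repository.

```python
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("event-aggregator")  # illustrative table name

# Record the newly arrived file. correlation_id is the partition key and
# file_type the sort key, so both monthly files share one item collection.
table.put_item(
    Item={
        "correlation_id": "2024_01",
        "file_type": "sales",
        "file_name": "sales_2024_01.csv",
        "status": "PENDING",
    }
)
```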
Following the file type determination, the state machine routes to the appropriate branch. In this instance, the Sales branch is executed. Here, the state machine queries DynamoDB for the status of the (independent) Warehouse file using the correlation identifier. Based on the query's outcome, the state machine determines whether the corresponding Warehouse file has already been processed.
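In the sample, the state machine can perform this lookup through a direct DynamoDB integration; expressed in Python for illustration (continuing the illustrative table from the previous sketch):

```python
# Look up the independent (Warehouse) file for the same period.
response = table.get_item(
    Key={"correlation_id": "2024_01", "file_type": "warehouse"}
)
warehouse = response.get("Item")

# The Sales branch may proceed only if the Warehouse file has finished;
# otherwise it must pause and wait for a callback.
warehouse_ready = warehouse is not None and warehouse["status"] == "COMPLETED"
```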
Since the Warehouse file has not been processed yet, the waitForTaskToken integration pattern is employed. The state machine pauses at this step and generates a task token, which an external service can use to resume the execution. The Sales record in the DynamoDB table is updated with this task token.
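Step Functions exposes the generated token to the integration as $$.Task.Token. Stored via an equivalent boto3 call, that update might look like the following sketch (attribute names again illustrative):

```python
# The state machine passes the token in, e.g. via "$$.Task.Token".
task_token = event["taskToken"]

# Persist the token on the waiting Sales record so the Warehouse workflow
# can later resume this paused execution.
table.update_item(
    Key={"correlation_id": "2024_01", "file_type": "sales"},
    UpdateExpression="SET task_token = :t, #s = :s",
    ExpressionAttributeNames={"#s": "status"},  # "status" is reserved in DynamoDB
    ExpressionAttributeValues={":t": task_token, ":s": "WAITING"},
)
```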
Next, navigate to the S3 console and upload the sample Warehouse file to its designated bucket. This action invokes a new instance of the Step Functions workflow, which proceeds through the alternative branch after the file type selection step. In this branch, the Warehouse state machine is executed, and the processing status of the file is updated in DynamoDB.
Once the status of the Warehouse file transitions to “Completed,” the Warehouse state machine checks DynamoDB for a pending Sales file. If one exists, it retrieves the stored task token and calls the SendTaskSuccess API. This resumes the waiting execution, which then processes the Sales file and updates its processing status in DynamoDB.
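As a final sketch, the callback side could be expressed in Python as follows; the repository may implement it as a direct SDK integration from the state machine instead.

```python
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("event-aggregator")  # illustrative table name
sfn = boto3.client("stepfunctions")

# After marking the Warehouse file "Completed", look for a Sales file
# parked waiting on it for the same period.
response = table.get_item(
    Key={"correlation_id": "2024_01", "file_type": "sales"}
)
sales = response.get("Item")

if sales and "task_token" in sales:
    # Resume the paused execution that is holding this task token.
    sfn.send_task_success(
        taskToken=sales["task_token"],
        output='{"warehouseStatus": "COMPLETED"}',
    )
```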
Conclusion
This article illustrates how to manage file dependencies within event-driven architectures. You can customize the sample in the code repository to fit your unique use case. For additional guidance on resolving event dependencies with aggregators, check out the blog post: Moving to event-driven architectures with serverless event aggregators.
To delve further into event-driven architectures, visit the event-driven architecture section on Serverless Land.