Time to write some code
Contents
Time to write some code#
We are going to create a simple CR(UD) application using DynamoDB, based on serverless architecture.
We are going to add a DynamoDB Cloudformation definition and 2 endpoints for
POST and GET our data from our serverless API.
DynamoDB#
Add this section to your serverless.yml
at the bottom
resources:
Resources:
UsersTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: userId
AttributeType: S
KeySchema:
- AttributeName: userId
KeyType: HASH
ProvisionedThroughput:
ReadCapacityUnits: 1
WriteCapacityUnits: 1
TableName: Users
Extending the API#
Let’s change this:
functions:
hello:
handler: handler.hello
events:
- http:
path: /
method: get
to this:
functions:
get-user:
handler: handler.get_user
events:
- http:
path: /users/{user_id}
method: get
create-user:
handler: handler.create_user
events:
- http:
path: /users
method: post
Bonus: if you have time, why don't you try to implement UPDATE and DELETE ?
Add Requirements file#
Craete a file called requirements.txt
and paste this inside
boto3==1.17.64
requests==2.28.1
Let’s implement our app#
Edit this file: handler.py
.
Add the DynamoDB client:
import boto3
import json
USERS_TABLE = "Users"
dynamodb_client = boto3.client(
'dynamodb', region_name='us-east-1', endpoint_url='http://localstack:4566'
)
def _verify_response(response):
if "Error" in response:
print(response)
raise Exception(f"HTTPStatusCode: {response['ResponseMetadata']['HTTPStatusCode']}"
f"message={response['Error']['Message']}")
And now add the api functions:
def get_user(event, context):
user_id = event['pathParameters']["user_id"]
result = dynamodb_client.get_item(
TableName=USERS_TABLE, Key={'userId': {'S': user_id}}
)
_verify_response(result)
item = result.get('Item')
if not item:
return {"statusCode": 404, "body": json.dumps({})}
body = {'user_id': item.get('userId').get('S'), 'name': item.get('name').get('S')}
response = {"statusCode": 200, "body": json.dumps(body)}
return response
def create_user(event, context):
print(f"{event=}")
request = json.loads(event["body"])
user_id = request["user_id"]
user_name = request["user_name"]
result = dynamodb_client.put_item(
TableName=USERS_TABLE, Item={'userId': {'S': user_id}, 'name': {'S': user_name}}
)
_verify_response(result)
body = {
"message": "Successfuly added a new user!"
}
response = {"statusCode": 201, "body": json.dumps(body)}
return response