Go · AWS · Lambda · Terraform · Event Architecture · Event Driven

Event Driven Architecture (AWS)

Amazon’s cloud platform (AWS) allows for an event-driven architecture, that is, a series of services can be linked to one another via events, with the events emitted by one service being processed by another service.

The glue between the two services, the processing stage, is a Lambda function.

A simple example of the event-driven architecture might look like this:

Some data is written to an S3 bucket (how doesn’t matter for this demonstration), which generates an event. That event is consumed by a Lambda, which processes the ‘raw’ data written to the bucket and writes the transformed data to a second S3 bucket.

Why would this happen?

If we call the first bucket “REST API requests” and the second “uniform requests”, and understand that other sources, such as “gRPC requests”, might exist, each with their own Lambda for processing, then the picture becomes clearer: the first bucket is a log of incoming requests in one format, which is processed into a uniform format for handling by the next phase, which could be a business-logic Lambda, and so on.
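
To make the “uniform format” idea concrete, here is a minimal sketch of the kind of transformation such a Lambda might perform. The restRequest and uniformRequest types and their fields are invented for illustration; they are not part of the example repository.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// restRequest is a hypothetical shape for a logged REST API request.
type restRequest struct {
	Method string `json:"method"`
	Path   string `json:"path"`
	Body   string `json:"body"`
}

// uniformRequest is a hypothetical common format shared by every source.
type uniformRequest struct {
	Source    string `json:"source"`
	Operation string `json:"operation"`
	Payload   string `json:"payload"`
}

// toUniform converts a raw REST request record into the uniform format.
func toUniform(raw []byte) ([]byte, error) {
	var in restRequest
	if err := json.Unmarshal(raw, &in); err != nil {
		return nil, fmt.Errorf("decoding rest request: %w", err)
	}
	return json.Marshal(uniformRequest{
		Source:    "rest",
		Operation: strings.ToLower(in.Method) + " " + in.Path,
		Payload:   in.Body,
	})
}

func main() {
	uniform, err := toUniform([]byte(`{"method":"POST","path":"/orders","body":"{}"}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(uniform))
}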

The starting point is the architectural discussion; fortunately, we’ve already had that. This is going to be S3 to Lambda to S3, but most AWS services generate events that can be used this way.

The full code can be found in the Event Driven Lambda Repo.

Lambda code

A Go Lambda is a fairly trivial piece of code: it’s the definition of a handler for an event, which calls whatever other code is needed.

The following has a handler called by main; that handler calls two functions, the downloader (the data from the upstream S3 bucket needs to be downloaded to the Lambda) and the uploader (the data will be uploaded to the downstream S3 bucket).

In between the two functions would normally be a call to the business logic, which isn’t necessary for this trivial example.

// Package main
package main

import (
	"context"
	"fmt"
	"log"
	"os"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func storeLocally(sess *session.Session, s3Event events.S3Event) ([]string, error) {
	// loop over all the records in the received event
	// for the purposes of this example we're expecting one or many
	// records per event
	filenames := []string{}
	log.Print("About to loop over records")
	for _, record := range s3Event.Records {
		// create a file with the same name as the file that will be retrieved
		log.Printf("Creating file locally %s", record.S3.Object.Key)
		file, err := os.Create("/tmp/" + record.S3.Object.Key)
		if err != nil {
			log.Printf("Error creating file %v", err)
			return []string{}, err
		}

		defer file.Close()

		log.Printf("Downloading from %s", record.S3.Bucket.Name)
		downloader := s3manager.NewDownloader(sess)
		// download the object, and store it in the local file
		_, err = downloader.Download(file,
			&s3.GetObjectInput{
				Bucket: &record.S3.Bucket.Name,
				Key:    &record.S3.Object.Key,
			})
		if err != nil {
			log.Printf("Error downloading file %v", err)
			return []string{}, err
		}
		filenames = append(filenames, "/tmp/"+record.S3.Object.Key)
	}
	log.Printf("have %d filenames %s", len(filenames), filenames[0])
	return filenames, nil
}

func upload(sess *session.Session, filenames []string, bucket string) error {
	uploader := s3manager.NewUploader(sess)

	log.Print("Uploads")
	for _, filename := range filenames {
		file, err := os.Open(filename)
		if err != nil {
			log.Printf("Open File error %v", err)
			return fmt.Errorf("uploadFile::: Unable to open file %s, %v", filename, err)
		}

		defer file.Close()

		// use the file's base name as the object key so the local /tmp/
		// prefix doesn't end up in the destination object's key
		key := filepath.Base(filename)
		_, err = uploader.Upload(&s3manager.UploadInput{
			Bucket: &bucket,
			Key:    &key,
			Body:   file,
		})

		if err != nil {
			// Print the error and exit.
			log.Printf("Upload error %v", err)
			return fmt.Errorf("Unable to upload %q to %q, %v", filename, bucket, err)
		}
	}
	log.Print("Uploads complete")
	return nil
}

// s3 event handler
func handler(ctx context.Context, s3Event events.S3Event) {
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))
	filenames, err := storeLocally(sess, s3Event)
	if err != nil {
		log.Printf("ERROR unable to store files locally with error %v", err)
		return
	}
	// I have chosen to download from one s3, then upload the data to another s3
	// in two steps, instead of using the CopyObject API
	// https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
	// because this intermediate step is where processing would occur, for
	// example, if the upstream object was YAML, and the downstream object
	// needed to be JSON, the transformation would occur here.
	targetBucket := os.Getenv("DST_BUCKET")

	err = upload(sess, filenames, targetBucket)
	if err != nil {
		log.Printf("ERROR unable to upload files with error %v", err)
	}
}

func main() {
	// Make the handler available for Remote Procedure Call by AWS Lambda
	lambda.Start(handler)
}
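
As the comment in the handler notes, the step between storeLocally and upload is where the real processing would live. A minimal sketch of such a hook, invented for illustration (it is not part of the repository, uses a placeholder upper-casing transform, and would need "bytes" added to the import list), might look like:

// transformFiles applies the business logic to each downloaded file,
// rewriting it in place before it is uploaded downstream.
func transformFiles(filenames []string) error {
	for _, filename := range filenames {
		data, err := os.ReadFile(filename)
		if err != nil {
			return fmt.Errorf("reading %s: %w", filename, err)
		}
		// placeholder transformation; a real Lambda might convert
		// YAML to JSON here, as the handler comment suggests
		if err := os.WriteFile(filename, bytes.ToUpper(data), 0644); err != nil {
			return fmt.Errorf("writing %s: %w", filename, err)
		}
	}
	return nil
}

It would be called in the handler between storeLocally and upload, with its error handled in the same way as the others.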

The next task is to create the resources in the cloud, which means the infrastructure itself needs to be defined.

Because it’s the 21st century, that infrastructure is going to be managed as code. There are a few possibilities here: CloudFormation templates are the official AWS way, and Terraform is one of the many third-party options.

For me, Terraform is the hands-down best. Its many providers mean that the configuration is simple to adapt for other IaaS, PaaS, and SaaS offerings.

main.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 3.27"
    }
  }
  required_version = ">= 0.14.9"
}

// Environment variables
variable "aws_region" {
  default     = "us-west-2"
  description = "AWS deployment region"
  type        = string
}

variable "env_name" {
  default     = "Event-Driven-Go-Example"
  description = "Terraform environment name"
  type        = string
}

// Build the lambda file
resource "null_resource" "build_lambda_exec" {
  // run this when the main.go changes
  triggers = {
    source_code_hash = "${filebase64sha256("${path.module}/lambda/main.go")}"
  }
  provisioner "local-exec" {
    command     = "${path.module}/build.sh"
    working_dir = path.module
  }
}

// Lambda Zip file
data "archive_file" "s3_copy_lambda_function" {
  source_dir  = "${path.module}/lambda/"
  output_path = "${path.module}/lambda.zip"
  type        = "zip"
}

provider "aws" {
  profile = "default"
  region  = var.aws_region
}

// policy for the lambda
resource "aws_iam_policy" "lambda_policy" {
  name = "iam_for_${var.env_name}_lambda"

  policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
  {
      "Action": [
        "s3:ListBucket",
        "s3:GetObject",
        "s3:CopyObject",
        "s3:HeadObject"
      ],
      "Effect": "Allow",
      "Resource" : [
        "arn:aws:s3:::${aws_s3_bucket.my_producer.id}",
        "arn:aws:s3:::${aws_s3_bucket.my_producer.id}/*"
      ]
  },
   {
      "Action": [
        "s3:ListBucket",
        "s3:PutObject",
        "s3:PutObjectAcl",
        "s3:CopyObject",
        "s3:HeadObject"
      ],
      "Effect": "Allow",
      "Resource": [
        "arn:aws:s3:::${aws_s3_bucket.my_consumer.id}",
        "arn:aws:s3:::${aws_s3_bucket.my_consumer.id}/*"
      ]
    },
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Effect": "Allow",
      "Resource": "*"
    }
  ]
}
EOF
}

// AWS IAM Role for the lambda
resource "aws_iam_role" "s3_copy_function" {
  name = "app-${var.env_name}-lambda"

  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Effect": "Allow"
    }
  ]
}
EOF
}

// Attach the policy to the role
resource "aws_iam_role_policy_attachment" "terraform_lambda_iam_policy_basic_execution" {
  role       = aws_iam_role.s3_copy_function.id
  policy_arn = aws_iam_policy.lambda_policy.arn
}

// Resources

// Random string generator
resource "random_string" "unique_name" {
  length  = 8
  special = false
  upper   = false
  lower   = true
  number  = false
}

// Bucket Producer of events to lambda
resource "aws_s3_bucket" "my_producer" {
  bucket = "${random_string.unique_name.id}-my-producer"
}

// Bucket Consumer of data from lambda
resource "aws_s3_bucket" "my_consumer" {
  bucket = "${random_string.unique_name.id}-my-consumer"
  acl    = "private"
}

// allow bucket to notify lambda of events
resource "aws_lambda_permission" "allow_bucket" {
  statement_id  = "AllowExecutionFromS3Bucket"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.s3_copy_lambda_function.arn
  principal     = "s3.amazonaws.com"
  source_arn    = aws_s3_bucket.my_producer.arn // which bucket is going to call our lambda
}

// Lambda function
resource "aws_lambda_function" "s3_copy_lambda_function" {
  filename      = "lambda.zip"
  function_name = "example_lambda_name"
  role          = aws_iam_role.s3_copy_function.arn
  handler       = "s3_lambda"
  runtime       = "go1.x"
  // Set environment variables for the lambda code
  environment {
    variables = {
      DST_BUCKET = aws_s3_bucket.my_consumer.id
      REGION     = "${var.aws_region}"
    }
  }
}


resource "aws_s3_bucket_notification" "bucket_notification" {
  // bucket that sends the notification
  bucket = aws_s3_bucket.my_producer.id

  // The lambda function that will be notified
  lambda_function {
    lambda_function_arn = aws_lambda_function.s3_copy_lambda_function.arn
    events              = ["s3:ObjectCreated:*"]
  }

  depends_on = [aws_lambda_permission.allow_bucket]
}

The final piece of code is a small script that Terraform is configured to call, which builds the Lambda binary and puts it into a zip file (Lambda code needs to be packaged as a zip file before it can be deployed). Note that the CGO_ENABLED environment variable needs to be set to 0 to avoid libc compatibility issues.

build.sh


#!/bin/bash

GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o s3_lambda lambda/main.go
zip lambda.zip s3_lambda

This leaves one last thing. Deployment (oh, sure, testing too if you insist!)

To deploy, it’s assumed that the build system has the AWS CLI set up and configured properly, as well as Terraform, Go, and Bash.

terraform init # installs the plugins requested in the .tf file
terraform validate # a quick check of the configuration for obvious errors
terraform apply -auto-approve # create, change, or remove the AWS resources needed to match the desired state

Testing

This isn’t a scripted test. All that’s required, though, is to log into the AWS console and upload a file (I have created a foo.json in my repository for this purpose) to the S3 bucket that produces the notifications for the Lambda to react to.

The consumer S3 bucket will then have the object copied to it.
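
The same test can also be scripted rather than done through the console. A minimal sketch using the same SDK as the Lambda, assuming the producer bucket name (which includes the random prefix generated by Terraform) has been substituted in, might look like:

package main

import (
	"log"
	"os"

	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func main() {
	// the real bucket name includes the random prefix generated by terraform
	bucket := "REPLACE_ME-my-producer"

	file, err := os.Open("foo.json")
	if err != nil {
		log.Fatalf("unable to open foo.json: %v", err)
	}
	defer file.Close()

	sess := session.Must(session.NewSessionWithOptions(session.Options{
		SharedConfigState: session.SharedConfigEnable,
	}))

	// uploading the object triggers the s3:ObjectCreated event,
	// which in turn invokes the Lambda
	key := "foo.json"
	_, err = s3manager.NewUploader(sess).Upload(&s3manager.UploadInput{
		Bucket: &bucket,
		Key:    &key,
		Body:   file,
	})
	if err != nil {
		log.Fatalf("upload failed: %v", err)
	}
	log.Print("uploaded foo.json to the producer bucket")
}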

Clean up

The main.tf above does NOT force-delete the S3 buckets if they have objects in them. I was concerned that someone might blindly copy the file and accidentally end up in a situation where production data has left the building :)

So, manually delete the objects in the producer and consumer buckets, then run the following command.

terraform destroy -auto-approve

Summary

Event-driven, serverless architecture using AWS, Go, and Terraform is a very simple affair, and allows developers to create powerful, scalable systems.
