Implementing Email Notifications via AWS SQS → SNS → Lambda → SES Pipeline


4 views

When designing serverless notification systems, a common requirement is to process queue messages and trigger email notifications. The specific question here is whether we can reverse the typical SNS→SQS pattern to create an SQS→SNS→Lambda workflow that ultimately sends emails via SES.

While SNS can publish to SQS queues (a push model), the reverse isn't natively supported. SQS operates on a pull model where consumers retrieve messages. However, we can architect a solution using Lambda event source mappings:

// Sample CloudFormation template snippet
Resources:
  MyQueue:
    Type: AWS::SQS::Queue
  QueueProcessorFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Runtime: nodejs18.x
      Code:
        ZipFile: |
          const AWS = require('aws-sdk');
          const sns = new AWS.SNS();
          const ses = new AWS.SES();
          
          exports.handler = async (event) => {
            for (const record of event.Records) {
              await sns.publish({
                TopicArn: process.env.SNS_TOPIC_ARN,
                Message: record.body
              }).promise();
            }
          };
  SQSEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      EventSourceArn: !GetAtt MyQueue.Arn
      FunctionName: !GetAtt QueueProcessorFunction.Arn
      BatchSize: 10

Instead of direct SQS→SNS, consider these approaches:

  1. Lambda Poller Pattern:
    // Lambda function processing SQS and publishing to SNS
    const processQueue = async () => {
      const sqs = new AWS.SQS();
      const params = {
        QueueUrl: process.env.QUEUE_URL,
        MaxNumberOfMessages: 10
      };
      
      const data = await sqs.receiveMessage(params).promise();
      if (data.Messages) {
        // Process and publish to SNS
        await Promise.all(data.Messages.map(processMessage));
      }
    };
    
  2. Step Functions Integration:
    // State machine definition that orchestrates the flow
    {
      "StartAt": "ProcessQueue",
      "States": {
        "ProcessQueue": {
          "Type": "Task",
          "Resource": "arn:aws:states:::sqs:receiveMessage",
          "Next": "PublishToSNS"
        },
        "PublishToSNS": {
          "Type": "Task",
          "Resource": "arn:aws:states:::sns:publish",
          "Next": "TriggerEmailLambda"
        }
      }
    }
    

When implementing this pattern:

  • Ensure proper error handling for failed SNS publishes
  • Implement dead-letter queues for both SQS and Lambda
  • Monitor message age in the queue to detect processing delays
  • Consider message visibility timeouts and Lambda timeout alignment

Here's a complete example using CDK (TypeScript):

import * as cdk from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subscriptions from 'aws-cdk-lib/aws-sns-subscriptions';

export class NotificationStack extends cdk.Stack {
  constructor(scope: cdk.App, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Create resources
    const queue = new sqs.Queue(this, 'EmailQueue');
    const topic = new sns.Topic(this, 'NotificationTopic');
    
    // Lambda that will process SNS messages and send emails
    const emailLambda = new lambda.Function(this, 'EmailHandler', {
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline(
        const AWS = require('aws-sdk');
        const ses = new AWS.SES();
        
        exports.handler = async (event) => {
          await ses.sendEmail({
            Destination: { ToAddresses: [event.Records[0].Sns.Message.email] },
            Message: { Body: { Text: { Data: event.Records[0].Sns.Message.content } } },
            Source: 'notifications@example.com'
          }).promise();
        };
      )
    });
    
    // Connect the pieces
    topic.addSubscription(new subscriptions.LambdaSubscription(emailLambda));
    
    // Lambda that polls SQS and publishes to SNS
    const queueProcessor = new lambda.Function(this, 'QueueProcessor', {
      runtime: lambda.Runtime.NODEJS_18_X,
      handler: 'index.handler',
      code: lambda.Code.fromInline(
        const AWS = require('aws-sdk');
        const sns = new AWS.SNS();
        
        exports.handler = async (event) => {
          await Promise.all(event.Records.map(record => {
            return sns.publish({
              TopicArn: process.env.TOPIC_ARN,
              Message: record.body
            }).promise();
          }));
        };
      ),
      environment: {
        TOPIC_ARN: topic.topicArn
      }
    });
    
    queue.grantConsumeMessages(queueProcessor);
    topic.grantPublish(queueProcessor);
  }
}

Many developers wonder about the reverse flow of SQS→SNS→Lambda compared to the more common SNS→SQS pattern. The core question is whether we can:

  1. Push messages to SQS first
  2. Have SNS consume from SQS
  3. Trigger Lambda via SNS subscription
  4. Process emails through SES

AWS doesn't natively support SNS consuming directly from SQS. The standard integration is unidirectional:


// Standard SNS→SQS integration
const params = {
  TopicArn: 'arn:aws:sns:us-east-1:123456789012:MyTopic',
  Protocol: 'sqs',
  Endpoint: 'arn:aws:sqs:us-east-1:123456789012:MyQueue'
};
sns.subscribe(params, (err, data) => {...});

Here are two practical approaches to achieve the desired workflow:

Option 1: Lambda Polling SQS

The most straightforward solution:


// Lambda function code
const AWS = require('aws-sdk');
const ses = new AWS.SES();

exports.handler = async (event) => {
  for (const record of event.Records) {
    const message = JSON.parse(record.body);
    await ses.sendEmail({
      Source: 'sender@example.com',
      Destination: { ToAddresses: [message.recipient] },
      Message: { /* email content */ }
    }).promise();
  }
};

Option 2: SQS→SNS Bridge Lambda

For cases where you need SNS's fan-out capability:


// Bridge Lambda function
exports.handler = async (event) => {
  const sns = new AWS.SNS();
  for (const record of event.Records) {
    await sns.publish({
      TopicArn: 'arn:aws:sns:us-east-1:123456789012:EmailTopic',
      Message: record.body
    }).promise();
  }
};
  • Set proper DLQ configurations for both SQS and Lambda
  • Implement message visibility timeouts matching your processing time
  • Consider batch sizes when processing SQS messages

When choosing between the options:

Approach Latency Cost Complexity
Direct SQS→Lambda Lower Lower Simpler
SQS→Bridge→SNS→Lambda Higher Higher More complex