Tuesday, March 31, 2020

Implement a Whitelist in Terraform

Sometimes you need a Terraform variable that can only take a value from a list of acceptable values. In my case, it was a list of DNS names that had to be approved by a security team and stored in a file in an S3 bucket.
The difficulty is making Terraform fail if you use a value that is not in the list. There are several ways to do that; here is mine:

data "aws_s3_bucket_object" "white_list" {
  bucket = "my-bucket"
  key    = "my_white_list"
}

locals {
  value_to_check = "SomeValue"

  white_list = split(
    " ",
    replace(data.aws_s3_bucket_object.white_list.body, "/\\s+/", " "),
  )
  allowed = zipmap(local.white_list, local.white_list)[local.value_to_check]
}

The data block fetches my file from S3, but you could just as well use the file function, or even a hardcoded list.
Then I set the value to check, which is hardcoded here for the example, but it will typically be calculated or retrieved from somewhere else. I then build a Terraform list from the file by collapsing every run of whitespace (including line breaks) into a single space and splitting on that space.
Finally, here is my way of making Terraform fail. I create a map from the white list, using the zipmap function, so that every entry is both a key and its own value, and look up the value to check in it. If the value is not in the map, the lookup fails and Terraform stops with an error.
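
For readers more at home in Python, the same lookup trick can be sketched as follows. This is only an analogue of the Terraform logic above, not something Terraform runs; the function name check_allowed and the sample DNS names are made up for the illustration.

```python
import re


def check_allowed(body: str, value: str) -> str:
    # Collapse runs of whitespace, then split: the counterpart of
    # replace() followed by split() in the Terraform locals.
    white_list = re.sub(r"\s+", " ", body).split(" ")
    # Counterpart of zipmap(list, list): every entry maps to itself.
    lookup = dict(zip(white_list, white_list))
    # The failed lookup is what stops the run for a value outside the list.
    return lookup[value]


# Hypothetical file content with two allowed names.
print(check_allowed("a.example.com\nb.example.com", "b.example.com"))
```

A value that is missing from the list raises a KeyError here, which plays the same role as the Terraform error.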

Thursday, March 26, 2020

Mock boto3 services not handled by moto

We have a pattern to mock AWS services in our Python lambdas. First, in our lambda code, we initialize boto3 clients with properties:

import os
import boto3

@property
def REGION_NAME():
    return os.getenv("REGION", "eu-west-3")

@property
def SQS():
    return boto3.client(service_name="sqs", region_name=REGION_NAME.fget())

def lambda_handler(event, context):
    sqs_client = SQS.fget()
As you can see, the region is also a property. The reason is that all clients in moto are declared in the us-east-1 region, so the tests must be able to override it.
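
Here is a minimal sketch of why the .fget() call works, using only the standard library. At module level, the decorator simply wraps the function in a property object, and fget is the original function, so the environment is read again on every call. The variable names default_region and patched_region are only there for the illustration.

```python
import os


@property
def REGION_NAME():
    return os.getenv("REGION", "eu-west-3")


# Outside a class there is no attribute access to trigger the property,
# so REGION_NAME is a plain property object wrapping the function.
print(type(REGION_NAME).__name__)

# The environment is read at call time, not at import time.
os.environ.pop("REGION", None)
default_region = REGION_NAME.fget()   # falls back to "eu-west-3"

os.environ["REGION"] = "us-east-1"
patched_region = REGION_NAME.fget()   # now "us-east-1"
```

That late evaluation is what lets a test change the region after the lambda module has been imported.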
Our unit tests are all in a test sub-folder of our lambda. To write the tests, we usually follow this pattern:
from moto import mock_sqs
from pytest import fixture

from ..mylambda import (
    lambda_handler,
    SQS
)

@fixture(autouse=True)
def prepare_test_env(monkeypatch):
    monkeypatch.setenv("REGION", "us-east-1")   

@mock_sqs
def test_mylambda():
    SQS.fget().create_queue(QueueName='MyQueue')
    
    #do the test...
From the lambda in the parent folder, we import the functions to test as well as the properties. For this to work, we have to create an empty __init__.py file in that directory. We then patch the environment variables, including the region, which needs to be set to us-east-1. Our test function has to mock the boto3 client, using the appropriate moto decorator. Then we write our test code.

In some cases, the boto3 client has no mock in moto. That is the case for Step Functions, for instance (I know it is in preparation, but it is not ready at the time of writing). For those cases, we use the following pattern:

import boto3

@property
def SFN():
    return boto3.client("stepfunctions")
In our lambda, nothing changes. We are still using a property. However, in the unit test, we have to use a patch:

from unittest.mock import (
    PropertyMock,
    patch
)
from ..mylambda import lambda_handler

def test_lambda():
    with patch('mylambda.mylambda.SFN', new_callable=PropertyMock) as mock_stepfunctions:
        # Example call with an empty event and no context
        lambda_handler({}, None)

        mock_stepfunctions.fget().start_execution.assert_called_with(
            stateMachineArn="MyMachine", 
            name="State-Machine-0", 
            input="{}"
        )
    
We patch the property SFN with a PropertyMock. Because the with statement binds the mock to a name, we can then use it to assert that the client was called with the correct parameters.
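
The same pattern can be tried without any AWS access. The sketch below is an assumption-laden stand-in: it builds a fake mylambda module in memory with types.ModuleType and uses patch.object instead of a dotted path, and the handler body is invented for the example.

```python
import types
from unittest.mock import PropertyMock, patch

# Hypothetical stand-in for the real lambda module.
mylambda = types.ModuleType("mylambda")


@property
def SFN():
    # The real code would return boto3.client("stepfunctions") here.
    raise RuntimeError("no real client in this sketch")


def lambda_handler(event, context):
    client = mylambda.SFN.fget()
    client.start_execution(
        stateMachineArn="MyMachine", name="State-Machine-0", input="{}"
    )


mylambda.SFN = SFN
mylambda.lambda_handler = lambda_handler


def run_test():
    # While the patch is active, mylambda.SFN is the PropertyMock itself.
    with patch.object(mylambda, "SFN", new_callable=PropertyMock) as mock_sfn:
        mylambda.lambda_handler({}, None)
        start_execution = mock_sfn.fget().start_execution
        start_execution.assert_called_with(
            stateMachineArn="MyMachine", name="State-Machine-0", input="{}"
        )
        return start_execution.call_count


calls = run_test()
```

Every call to fget() on the mock returns the same child mock, which is why the assertion sees the call made inside the handler.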

Tuesday, March 24, 2020

Use Terraform output in Jenkins file

In a Jenkins pipeline file, you might have several Terraform stacks running in different stages. Making them communicate is usually pretty easy, using a data source or the remote state. However, making a Terraform step communicate with another step that runs shell commands, for instance, requires a bit more work. Of course, you have the terraform output command, but there is a small glitch: storing the output in a variable also stores a newline character at its end.
So here is a command that helps work around that problem:

def BUCKET_NAME = ''

pipeline {
    stages {
        stage('Terraform') {
            steps {
                sh "terraform init"
                sh "terraform apply"
                script {
                    BUCKET_NAME = sh (script: 'terraform output bucket_name | xargs echo -n', returnStdout: true)
                }
            }
        }
        stage('Another') {
            steps {
                sh "echo ${BUCKET_NAME}"
            }
        }
    }
}

Sunday, March 22, 2020

Mock SQL connection in Python

I have some Python code that uses the pyodbc library to send SQL queries to an MSSQL database. I would like to unit test it, but I do not want to install the pyodbc library on my testing machine. Fortunately, Python allows us to easily mock things, even modules.
The first hurdle is to stop Python from looking for the pyodbc library when it reaches the import statement. For modules, Python has quite a simple mechanism: it stores them in the dict sys.modules the first time it meets an import. The solution is to insert a Mock in this dict before we import the file to test. For instance, we have this line at the beginning of the file we want to test:

import pyodbc
In our test file, we will add these lines:

import sys
from unittest.mock import MagicMock

mock_pyodbc = MagicMock()
sys.modules['pyodbc'] = mock_pyodbc

import module_to_test
We insert a MagicMock into the modules dict. After that, we simply import our own module. As far as Python is concerned, pyodbc is already imported, so it does not look for it again.
The second part is to check that the code under test executes the query we expect. The good thing with Mocks is that we do not have to implement any code. We let them handle everything. Here is the code we are testing:

connection = pyodbc.connect(
    connection_string, autocommit=True, timeout=ODBC_CONNECTION_TIMEOUT
)
with connection.cursor() as cursor:
    cursor.execute(query)
Every time a method is called, the Mock returns another Mock object. The reason we chose a MagicMock is that it also handles the magic methods, such as __enter__() here, which is called because of the with construct.
So how do we test that our query is called? Here is the testing line:
# Check that the correct query is being executed
mock_pyodbc.connect().cursor().__enter__().execute.assert_called_with(expected_query)
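
Putting the pieces together, here is a self-contained sketch that runs without pyodbc installed. The function run_query, the connection string and the timeout value are hypothetical stand-ins for the real code under test.

```python
import sys
from unittest.mock import MagicMock

# Register the fake module before anything imports pyodbc.
mock_pyodbc = MagicMock()
sys.modules["pyodbc"] = mock_pyodbc

import pyodbc  # resolves to the MagicMock, no driver needed


def run_query(connection_string, query):
    # Stand-in for the code under test.
    connection = pyodbc.connect(connection_string, autocommit=True, timeout=30)
    with connection.cursor() as cursor:
        cursor.execute(query)


run_query("DSN=example", "SELECT 1")

# Check the connection arguments first: the chained call on the next
# assertion invokes connect() again and would overwrite the recorded call.
mock_pyodbc.connect.assert_called_with("DSN=example", autocommit=True, timeout=30)

# Calling the mocks again returns the same child mocks the code used.
mock_pyodbc.connect().cursor().__enter__().execute.assert_called_with("SELECT 1")
```

In a real test suite, it is probably worth removing the entry from sys.modules afterwards so the fake module does not leak into other tests.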


Saturday, March 21, 2020

Send Batch of Messages to SQS

We have some simple code that can send up to several thousand messages to an SQS queue. Using Python and boto3, the code looks like this:

import boto3

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME)
for message in messages:
    # boto3 only accepts keyword arguments
    queue.send_message(MessageBody=message)

When you have a lot of messages in an array, you can send them in batches of up to 10 messages, using the SendMessageBatch operation (send_message_batch on a boto3 client, send_messages on a Queue resource). When doing this, there are two problems to solve: creating the batches of 10 messages, and generating an ID for each message. AWS requires this ID so that it can send back a response listing the messages that failed or succeeded, identified by their ID.

Here is the new code:

sqs = boto3.resource('sqs')
queue = sqs.get_queue_by_name(QueueName=SQS_QUEUE_NAME)

for i in range(0, len(messages), 10):
    chunk = messages[i:i+10]
    queue.send_messages(Entries=[
        {
            "Id": "MSG" + str(id), 
            "MessageBody": message
        } for id, message in zip(range(10), chunk)
    ])

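Our loop now jumps to every tenth message. Inside the loop, we create a chunk of up to 10 messages, and use the send_messages method, the Queue resource's counterpart of send_message_batch, to send it. You can see that we use a list comprehension that runs over both the chunk and a range to generate the ID. The ID only has to be unique within a batch, so reusing MSG0 to MSG9 for every chunk is fine.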
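
The batching logic can be checked on its own, without touching SQS. The sketch below uses hypothetical helpers, build_batches and failed_ids, and a hand-written response trimmed down to the Successful and Failed lists that the batch call returns.

```python
def build_batches(messages, batch_size=10):
    """Split messages into lists of batch entries, at most batch_size each."""
    batches = []
    for start in range(0, len(messages), batch_size):
        chunk = messages[start:start + batch_size]
        batches.append([
            {"Id": "MSG" + str(index), "MessageBody": body}
            for index, body in enumerate(chunk)
        ])
    return batches


def failed_ids(response):
    """Return the Ids listed under "Failed" in a batch response."""
    return [entry["Id"] for entry in response.get("Failed", [])]


batches = build_batches(["message %d" % n for n in range(25)])
print([len(batch) for batch in batches])  # [10, 10, 5]

# Trimmed, hand-written response; real entries carry more fields.
sample_response = {"Successful": [{"Id": "MSG0"}], "Failed": [{"Id": "MSG1"}]}
print(failed_ids(sample_response))  # ['MSG1']
```

Each list in batches could then be passed as Entries to queue.send_messages, and the IDs returned by failed_ids used to decide which messages to retry.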

Monday, March 9, 2020

Terraform: move resource between state files

When you have several Terraform stacks to handle, you might realize that one resource was created in the wrong stack. The easiest way to move it is usually to remove it from one stack, apply, then add it to the other stack, and apply again. But for some resources, this solution is difficult to implement.
In my case, it was an S3 bucket containing several big files. It would have been a long process to back up the files, delete them from the bucket, then restore them to the destination bucket. So here is a way to move a resource between stacks without destroying it.

First, you have to pull your destination state file locally. Say you want to move your module my_bucket from a stack in folderA to another stack in folderB:

cd folderB
terraform state pull > folderB.state

Second step, you have to move your resource to its new destination:

cd ../folderA
terraform state mv -state-out ../folderB/folderB.state module.my_bucket module.my_bucket

The mv command takes the source and destination name of your resource as parameters, so it is possible to rename your resource as you move it. As the final step, you push your destination state file to its remote location:

cd ../folderB
terraform state push folderB.state