Wednesday, September 22, 2021

TLRU Cache in Python

Implementing an LRU cache in Python is easy: you just use the @lru_cache decorator from the functools module. For TLRU, where items also have an expiry time, the standard library offers no equivalent. So here is my implementation:

from collections import OrderedDict
import time


class TLRU:
    def __init__(self, func=None, maxsize=128, ttl=120):
        self.func = func
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache = OrderedDict()
        self.decorator_with_parameters = func is None

    def get_value(self, key):
        result = self.cache.get(key)
        if not result:
            return None

        value, expires_at = result

        if expires_at < time.time():
            del self.cache[key]
            return None

        self.cache.move_to_end(key)
        return value


    def put_value(self, key, value):
        if len(self.cache) >= self.maxsize:
            self.cache.popitem(False)
        self.cache[key] = (value, time.time() + self.ttl)

    def _call(self, *args):
        value = self.get_value(args)
        if value:
            return value

        value = self.func(*args)
        self.put_value(args, value)

        return value

    def __call__(self, *args):
        if self.decorator_with_parameters:
            self.func = args[0]
            return self._call

        return self._call(*args)

    def clear_cache(self):
        self.cache.clear()

Now for an explanation of the code, for those interested.

Our cache implementation uses the OrderedDict class from the collections module. It behaves like a dict, but keeps the items in insertion order.
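
As a small illustration of the two OrderedDict operations the cache relies on, here is a sketch with made-up keys (the names cache, order_after_touch and evicted_key are only for this example):

```python
from collections import OrderedDict

cache = OrderedDict()
cache["a"] = 1
cache["b"] = 2
cache["c"] = 3

# Moving "a" to the end marks it as the most recently used key.
cache.move_to_end("a")
order_after_touch = list(cache)

# popitem(last=False) pops from the front: the least recently used key.
evicted_key, evicted_value = cache.popitem(last=False)
remaining = list(cache)
```

After these calls, "b" is the evicted key, because "a" was moved behind it.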

Here is the implementation of the value retrieval method:

    def get_value(self, key):
        result = self.cache.get(key)
        if not result:
            return None

        value, expires_at = result

        if expires_at < time.time():
            del self.cache[key]
            return None

        self.cache.move_to_end(key)
        return value

It first looks for the value in the cache. If it does not find it, it returns None immediately. Items in our cache are tuples packing both the value and its expiry time, a timestamp in seconds. So our second action is to check whether the value has expired. If it has, we remove it from the cache and answer that we do not have it. If the value is still fresh, we push it to the bottom of our OrderedDict using the move_to_end method.
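
To see the expiry logic in isolation, here is a sketch of the same lookup as a free function. The lookup name and the now parameter are assumptions added for this example, so that the clock can be controlled; the class itself always uses time.time():

```python
import time
from collections import OrderedDict


def lookup(cache, key, now=None):
    """Return the cached value, or None if it is missing or expired."""
    now = time.time() if now is None else now
    entry = cache.get(key)
    if entry is None:
        return None

    value, expires_at = entry
    if expires_at < now:
        del cache[key]          # expired: drop it and report a miss
        return None

    cache.move_to_end(key)      # fresh: mark as most recently used
    return value


cache = OrderedDict()
cache["a"] = ("fresh", 100.0)   # expires at t=100
cache["b"] = ("stale", 10.0)    # expires at t=10

fresh = lookup(cache, "a", now=50.0)
stale = lookup(cache, "b", now=50.0)
```

At t=50 the first lookup returns the value, while the second one removes the entry and returns None.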

Storing a value goes like this:

    def put_value(self, key, value):
        if len(self.cache) >= self.maxsize:
            self.cache.popitem(False)
        self.cache[key] = (value, time.time() + self.ttl)

We do two things here. First, if the cache has reached its maximum size, we remove the oldest entry. Passing False to the popitem method (its last parameter) makes it pop from the front, so the OrderedDict behaves like a FIFO queue. Then, we compute the expiry time and pack it together with the value inside our cache.

Here is the method that wraps our cached function:

    def _call(self, *args):
        value = self.get_value(args)
        if value:
            return value

        value = self.func(*args)
        self.put_value(args, value)

        return value

It looks for our value in the cache. If we find it, we just return it. Otherwise, we call the wrapped function and store the result in the cache before returning it.
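
One thing to be aware of: the wrapper tests the cached value for truthiness, so falsy results such as 0, None or an empty string are treated as misses and recomputed on every call. If that matters, a sentinel object is one possible way around it. The sketch below is a variant, not the class above: SentinelTLRU and _MISSING are names invented for this example, and it only supports the decorator form without parameters.

```python
import time
from collections import OrderedDict

_MISSING = object()  # hypothetical sentinel: can never be a cached value


class SentinelTLRU:
    def __init__(self, func, maxsize=128, ttl=120):
        self.func = func
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache = OrderedDict()

    def __call__(self, *args):
        entry = self.cache.get(args, _MISSING)
        if entry is not _MISSING:
            value, expires_at = entry
            if expires_at >= time.time():
                self.cache.move_to_end(args)
                return value            # hit, even if value is 0 or None
            del self.cache[args]        # expired

        value = self.func(*args)
        if len(self.cache) >= self.maxsize:
            self.cache.popitem(last=False)
        self.cache[args] = (value, time.time() + self.ttl)
        return value


calls = []


@SentinelTLRU
def zero(x):
    calls.append(x)
    return 0


first = zero(1)
second = zero(1)
```

Here the wrapped function runs only once, although its result is falsy.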

Now for the trickier parts. First the constructor:

    def __init__(self, func=None, maxsize=128, ttl=120):
        self.func = func
        self.maxsize = maxsize
        self.ttl = ttl
        self.cache = OrderedDict()
        self.decorator_with_parameters = func is None

It doesn't look tricky, but you have to know that decorators work in two different ways, depending on whether you use them with parameters or not:

  • Decorators without parameters call the constructor with the wrapped function as the first parameter.
  • Decorators with parameters call the constructor with those parameters only, without the wrapped function.

That is why the func parameter is optional. We detect which case is running by checking if func is None.

Now for the function called by the decorator pattern:

    def __call__(self, *args):
        if self.decorator_with_parameters:
            self.func = args[0]
            return self._call

        return self._call(*args)

Again, two cases:

  • Decorators without parameters call this function with the wrapped function's parameters, expecting the value as a result.
  • Decorators with parameters call this function with the wrapped function's reference, expecting a reference to the wrapper as a result.

So if our decorator has parameters, we store the wrapped function, which is the first argument of the method, into the func attribute, then return a reference to the wrapper. Otherwise, we simply call the wrapper immediately.
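
To make the two call paths visible, here is a sketch of a stripped-down decorator class that only records what it receives. Trace, events and the label parameter are hypothetical names for this illustration; the dispatch logic mirrors the constructor and __call__ shown above.

```python
events = []


class Trace:
    def __init__(self, func=None, label="default"):
        self.func = func
        self.with_parameters = func is None
        events.append(("__init__", "no function yet" if func is None else func.__name__))

    def _call(self, *args):
        return self.func(*args)

    def __call__(self, *args):
        if self.with_parameters:
            # Decorator used with parameters: args[0] is the wrapped function.
            self.func = args[0]
            events.append(("__call__", "got function " + args[0].__name__))
            return self._call
        # Decorator used without parameters: args are the function's arguments.
        events.append(("__call__", "got arguments " + repr(args)))
        return self._call(*args)


@Trace
def plain(v, y):
    return v * y


@Trace(label="x")
def configured(v, y):
    return v * y


plain_result = plain(2, 3)
configured_result = configured(2, 3)
```

With parameters, __call__ runs once at decoration time and later calls go straight to _call. Without parameters, every call to the decorated function goes through __call__.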

You can use it like this:

@TLRU
def calc(v, y):
    print("Calculating", v, y)
    return v * y

Or with parameters:

@TLRU(maxsize=10000, ttl=360)
def calc(v, y):
    print("Calculating", v, y)
    return v * y


Friday, August 13, 2021

Find reasons of failed DynamoDB Transaction with boto3

When working with transactions in DynamoDB, you pack several actions into one query. When one of them fails, all of them are rolled back. But how do you know which one failed? Reading the documentation, you find the following note:

If using Java, DynamoDB lists the cancellation reasons on the CancellationReasons property. This property is not set for other languages. Transaction cancellation reasons are ordered in the order of requested items, if an item has no error it will have NONE code and Null message.

Why only in Java? I'm working in Python, using the boto3 library. Do I still have a way to access the information? As it turns out, there is a workaround. When I print the message of my exception, I see the following:

botocore.errorfactory.TransactionCanceledException: An error occurred (TransactionCanceledException) when calling the TransactWriteItems operation: Transaction cancelled, please refer cancellation reasons for specific reasons [None, ConditionalCheckFailed, None]

 The reasons array is printed at the end of the message. If it is already there, why not have it in a more accessible format? From there, it is not difficult to extract it:

import re

def reasons(e):
    m = re.search("\\[(.*)\\]", str(e))
    reasons = m.group(1).split(", ")
    return [None if reason == "None" else reason for reason in reasons]
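
The helper above assumes the message always ends with a bracketed list. A slightly more defensive variant could look like the sketch below. The name cancellation_reasons is made up for this example, and returning an empty list when nothing matches is an arbitrary choice of mine, not boto3 behaviour:

```python
import re


def cancellation_reasons(error):
    """Parse the trailing '[...]' list of the exception message, if present."""
    match = re.search(r"\[([^\[\]]*)\]\s*$", str(error))
    if match is None:
        return []  # assumption: no list in the message means no reasons
    parts = (part.strip() for part in match.group(1).split(","))
    return [None if part == "None" else part for part in parts]


message = (
    "An error occurred (TransactionCanceledException) when calling the "
    "TransactWriteItems operation: Transaction cancelled, please refer "
    "cancellation reasons for specific reasons "
    "[None, ConditionalCheckFailed, None]"
)
parsed = cancellation_reasons(Exception(message))
unparsed = cancellation_reasons(Exception("something else went wrong"))
```

It only matches a list at the very end of the message, so brackets appearing earlier in the text are ignored.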

Now I can use it like this:

import boto3

client = boto3.client("dynamodb")

try:
    client.transact_write_items(
        TransactItems=[
            {
                "ConditionCheck": {... }
            },
            {
                "Put": {
                    "Item": {...},
                    "ConditionExpression": ...
                }
            },
            {
                "Put": {...}
            },
        ]
    )
except client.exceptions.TransactionCanceledException as e:
    if reasons(e)[0] is not None:
        raise MissingItemError()
    raise OperationFailedError()

I hope this array will be added to the exception's response structure as a normal field in a future version. At least, the feature request exists in boto3.

Wednesday, April 28, 2021

Java Haiku: Forever


public class Forever {
    public void forever() {
        ever = true;
        for(;ever;);
    }
}

 

Thursday, April 15, 2021

Creating AWS resources in pytest fixtures with moto

 When unit testing functions that access AWS resources, the easiest way is to use the moto library. For instance, I have to test several functions that access an SQS queue, so I proceed this way:

import boto3
from moto import mock_sqs

def create_queue():
    client = boto3.client("sqs")
    queue_url = client.create_queue(QueueName="queue")["QueueUrl"]
    sqs = boto3.resource("sqs")
    return sqs.Queue(queue_url)

@mock_sqs
def test_something():
    queue = create_queue()
    ...

Since I had this create_queue() call at the beginning of most of my test functions, I wanted to make it a fixture. So I tried it this way:

import boto3
from moto import mock_sqs
from pytest import fixture

@fixture
@mock_sqs
def queue():
    client = boto3.client("sqs")
    queue_url = client.create_queue(QueueName="queue")["QueueUrl"]
    sqs = boto3.resource("sqs")
    return sqs.Queue(queue_url)

@mock_sqs
def test_something(queue):
    ...

Unfortunately, this raised an error in my test functions stating that the queue does not exist. The reason is that the @mock_sqs decorator on the fixture destroys my SQS queue as soon as execution leaves the queue() function.
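
The lifetime problem can be reproduced without moto at all. In the sketch below, fake_mock is a hypothetical stand-in for a mock that is only active between start and stop. It compares a decorated function, where the mock is already gone when the caller gets the resource, with a generator that keeps the mock alive while it is suspended at the yield:

```python
from contextlib import contextmanager

active = {"mock": False}


@contextmanager
def fake_mock():
    # Hypothetical stand-in for a mock: active only inside the block.
    active["mock"] = True
    try:
        yield
    finally:
        active["mock"] = False


@fake_mock()
def decorated_fixture():
    return "queue"              # the mock stops as soon as we return


def yielding_fixture():
    with fake_mock():
        yield "queue"           # the mock stays active while suspended here


resource = decorated_fixture()
alive_after_return = active["mock"]

gen = yielding_fixture()
resource = next(gen)
alive_while_suspended = active["mock"]

next(gen, None)                 # resume the generator: teardown runs
alive_after_teardown = active["mock"]
```

pytest drives a yielding fixture in the same spirit: it runs the code up to the yield before the test and the rest after it.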

The solution is simple: do not use the mock as a decorator, but trigger it when the fixture is initializing and destroy it when it terminates. That means using a yield within the fixture to return the queue:

import boto3
from moto import mock_sqs
from pytest import fixture

@fixture
def queue():
    # keep a single mock instance so that start() and stop() are paired
    mock = mock_sqs()
    mock.start()

    client = boto3.client("sqs")
    queue_url = client.create_queue(QueueName="queue")["QueueUrl"]
    sqs = boto3.resource("sqs")
    yield sqs.Queue(queue_url)

    mock.stop()

def test_something(queue):
    ...

Notice that we have to drop the decorator on the test function as well.

Thanks for the answers to this moto issue.

Monday, February 15, 2021

Airflow: Glue Operator loses its region name

 The other day, I tried to run an AWS Glue script from our Airflow instance. Nothing fancy, it would just convert a parquet file to CSV between two S3 buckets. Looking for an operator to use, I found that there is indeed a Glue Operator. It looks pretty easy to configure, so I tried it out:

parquet2csv_glue = AwsGlueJobOperator(
    task_id='parquet2csv_glue',
    dag=parquet2csv_dag,
    job_name='glue-parquet2csv',
    region_name='eu-west-3',
    aws_conn_id=None,
    script_args={
        "--SOURCE_PATH": input_path,
        "--TARGET_PATH": output_path,
    }
)

After triggering the DAG, it would fail, telling me that I need to set the region name. Well, I thought I did!

It probably comes from the fact that recent boto3 versions require the region to be set, while it was optional in the past. As Airflow is open source, I decided to have a look at the code. As it turns out, the bug was not too hard to spot. The Glue Operator creates a Glue Hook, which declares this constructor:

class AwsGlueJobHook(AwsBaseHook):
    def __init__(
        self,
        s3_bucket: Optional[str] = None,
        job_name: Optional[str] = None,
        desc: Optional[str] = None,
        concurrent_run_limit: int = 1,
        script_location: Optional[str] = None,
        retry_limit: int = 0,
        num_of_dpus: int = 10,
        region_name: Optional[str] = None,
        iam_role_name: Optional[str] = None,
        *args,
        **kwargs,
    ):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.num_of_dpus = num_of_dpus
        self.region_name = region_name
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name
        self.s3_glue_logs = 'logs/glue-logs/'
        kwargs['client_type'] = 'glue'
        super().__init__(*args, **kwargs)

The Hook derives from a base class, AwsBaseHook, that handles the common connection part for all AWS Hooks. The call to the constructor of the super class does not forward the region name. It should probably be called like this:

        super().__init__(region_name=region_name, *args, **kwargs)
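
The effect can be reproduced with plain classes. In this sketch, BaseHook is a hypothetical stand-in for AwsBaseHook, assuming the base constructor takes region_name as a keyword and assigns it to self.region_name; BuggyHook and FixedHook are equally made-up names:

```python
from typing import Optional


class BaseHook:
    """Hypothetical stand-in for AwsBaseHook (assumed behaviour)."""

    def __init__(self, region_name: Optional[str] = None, **kwargs):
        self.region_name = region_name


class BuggyHook(BaseHook):
    def __init__(self, region_name: Optional[str] = None, **kwargs):
        self.region_name = region_name
        # region_name is not forwarded, so the base class resets it to None
        super().__init__(**kwargs)


class FixedHook(BaseHook):
    def __init__(self, region_name: Optional[str] = None, **kwargs):
        super().__init__(region_name=region_name, **kwargs)


buggy_region = BuggyHook(region_name="eu-west-3").region_name
fixed_region = FixedHook(region_name="eu-west-3").region_name
```

Under that assumption, the buggy variant ends up with no region even though one was passed in.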

I opened a bug report. But in the meantime, I still needed my code to work, so I came up with quite an ugly patch. There is probably a better way, but since I could see that the boto3 session was created in the AwsBaseHook class, and since our Airflow instance runs on an EC2 instance whose profile I can just inherit, I made this simple workaround:

import boto3
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

# ugly patch to resolve the bug in the Glue Hook
def _get_credentials(self, region_name):
    return boto3.session.Session(region_name="eu-west-3"), None

AwsBaseHook._get_credentials = _get_credentials

Hopefully, it won't stay here long...