Skip to main content

Command Palette

Search for a command to run...

Logging RedisCluster on ElasticAPM

Updated
2 min read
S

I have been working in the rapidly changing field of software development for over 19 years as a software engineer, architect, and DevOps engineer. I am an expert in designing and implementing scalable architectures that handle large transaction volumes and massive request loads for e-commerce and messaging services. I am skilled at using open-source technologies like Node.js, Nginx, Redis, Cassandra, ELK and Kafka to secure site reliability. I am also proficient in C#, Java, JavaScript, Python and Go.

The ElasticAPM agent for Python doesn't support RedisCluster properly, so custom instrumentation must be implemented to enable RedisCluster logging.

Because ElasticAPM lets custom instrumentation hook into method calls and build span data for logging, I created a redisclusterInstrumentation.py file to support RedisCluster logging.

# -*- coding: utf-8 -*-
from __future__ import absolute_import

from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import capture_span

class RedisClusterInstrumentation(AbstractInstrumentedModule):
    """Record an ElasticAPM span for each call made through a RedisCluster client.

    Instruments ``execute_command`` and ``pipeline`` on both ``RedisCluster``
    and ``StrictRedisCluster`` from ``rediscluster.client``.
    """

    name = "rediscluster"

    instrument_list = [
        ("rediscluster.client", "RedisCluster.execute_command"),
        ("rediscluster.client", "RedisCluster.pipeline"),
        ("rediscluster.client", "StrictRedisCluster.execute_command"),
        ("rediscluster.client", "StrictRedisCluster.pipeline"),
    ]

    def call(self, module, method, wrapped, instance, args, kwargs):
        """Invoke the wrapped client method inside a ``cache.rediscluster`` span.

        Span naming:
          * ``pipeline()`` calls are named ``PIPELINE BEGIN``;
          * other calls are named after the first one or two positional
            arguments (for ``execute_command`` presumably the command name
            and its first argument, e.g. the key);
          * with no positional arguments, the name comes from
            ``get_wrapped_name``.

        Returns:
            Whatever the wrapped method returns.
        """
        if method in ("StrictRedisCluster.pipeline", "RedisCluster.pipeline"):
            wrapped_name = "PIPELINE BEGIN"
        elif args:
            # At most the first two positional arguments go into the span name.
            wrapped_name = " ".join(map(str, args[:2]))
        else:
            wrapped_name = self.get_wrapped_name(wrapped, instance, method)

        # The span's db.type names the database category; "sql" is reserved
        # for SQL databases, so Redis commands are reported as "redis".
        extra = {"db": {"type": "redis", "statement": " ".join(map(str, args))}}
        with capture_span(wrapped_name, "cache.rediscluster", extra=extra, leaf=True):
            return wrapped(*args, **kwargs)

class RedisClusterPipelineInstrumentation(AbstractInstrumentedModule):
    """Record an ElasticAPM span for each call made on a RedisCluster pipeline.

    Instruments ``execute_command`` and ``execute`` on
    ``StrictClusterPipeline`` from ``rediscluster.pipeline``.
    """

    name = "rediscluster"

    instrument_list = [
        ("rediscluster.pipeline", "StrictClusterPipeline.execute_command"),
        ("rediscluster.pipeline", "StrictClusterPipeline.execute"),
    ]

    def call(self, module, method, wrapped, instance, args, kwargs):
        """Invoke the wrapped pipeline method inside a ``cache.rediscluster`` span.

        Span naming:
          * ``execute()`` calls are named ``PIPELINE EXECUTE``;
          * other calls are named ``PIPELINE`` followed by the first one or
            two positional arguments (presumably the queued command name and
            its first argument);
          * with no positional arguments, the name comes from
            ``get_wrapped_name``.

        Returns:
            Whatever the wrapped method returns.
        """
        if method == "StrictClusterPipeline.execute":
            wrapped_name = "PIPELINE EXECUTE"
        elif args:
            # At most the first two positional arguments go into the span name.
            wrapped_name = "PIPELINE " + " ".join(map(str, args[:2]))
        else:
            wrapped_name = self.get_wrapped_name(wrapped, instance, method)

        # The span's db.type names the database category; "sql" is reserved
        # for SQL databases, so Redis commands are reported as "redis".
        extra = {"db": {"type": "redis", "statement": " ".join(map(str, args))}}
        with capture_span(wrapped_name, "cache.rediscluster", extra=extra, leaf=True):
            return wrapped(*args, **kwargs)

Next, the redisclusterInstrumentation module needs to be registered in base.py.

from elasticapm.instrumentation import register
from frameworks.redisclusterInstrumentation import RedisClusterInstrumentation, RedisClusterPipelineInstrumentation

# Register both custom instrumentation classes with ElasticAPM by dotted class path.
for _instrumentation_path in (
    'frameworks.redisclusterInstrumentation.RedisClusterInstrumentation',
    'frameworks.redisclusterInstrumentation.RedisClusterPipelineInstrumentation',
):
    register.register(_instrumentation_path)
del _instrumentation_path  # keep the loop variable out of the module namespace

Finally, we can see RedisCluster logging working in ElasticAPM.