Logging RedisCluster on ElasticAPM
I have been working at the rapid changing software development field over 19 years as a software engineer, architect, devops. I am an expert in designing and implementing scalable architectures to deal with large transactions and massive requests on e-commerce and messaging services. I am skillful at using opensource technologies like Node.js, Nginx, Redis, Cassandra, ELK and Kafka to secure site reliability. I also have proficiency in C#, Java, Javascript, Python and Go.
ElasticAPM agent for python doesn't support RedisCluster properly so custom instrumentation should be implemented to enable RedisCluster logging.
- apm 4.x instrumentation : https://github.com/elastic/apm-agent-python/tree/4.x/elasticapm/instrumentation/packages
Because ElasticAPM can hook call stack to build span data for logging with custom instrumentation, I created redisclusterInstrumentation.py file to support RedisCluster logging.
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import capture_span
class RedisClusterInstrumentation(AbstractInstrumentedModule):
name = "rediscluster"
instrument_list = [("rediscluster.client", "RedisCluster.execute_command"),
("rediscluster.client", "RedisCluster.pipeline"),
("rediscluster.client", "StrictRedisCluster.execute_command"),
("rediscluster.client", "StrictRedisCluster.pipeline")]
def call(self, module, method, wrapped, instance, args, kwargs):
if method == "StrictRedisCluster.pipeline" or method == "RedisCluster.pipeline":
wrapped_name = "PIPELINE BEGIN"
else:
if len(args) > 1:
wrapped_name = "{} {}".format(args[0], args[1])
elif len(args) == 1:
wrapped_name = "{}".format(args[0])
else:
wrapped_name = self.get_wrapped_name(wrapped, instance, method)
extra = {"db": {"type": "sql", "statement": ' '.join(map(str, args)) }}
with capture_span(wrapped_name, "cache.rediscluster", extra=extra, leaf=True):
return wrapped(*args, **kwargs)
class RedisClusterPipelineInstrumentation(AbstractInstrumentedModule):
name = "rediscluster"
instrument_list = [("rediscluster.pipeline", "StrictClusterPipeline.execute_command"),
("rediscluster.pipeline", "StrictClusterPipeline.execute")]
def call(self, module, method, wrapped, instance, args, kwargs):
if method == "StrictClusterPipeline.execute":
wrapped_name = "PIPELINE EXECUTE"
else:
if len(args) > 1:
wrapped_name = "PIPELINE {} {}".format(args[0], args[1])
elif len(args) == 1:
wrapped_name = "PIPELINE {}".format(args[0])
else:
wrapped_name = self.get_wrapped_name(wrapped, instance, method)
extra = {"db": {"type": "sql", "statement": ' '.join(map(str, args)) }}
with capture_span(wrapped_name, "cache.rediscluster", extra=extra, leaf=True):
return wrapped(*args, **kwargs)
Adding redisclusterInstumentation module to base.py needed.
from elasticapm.instrumentation import register
from frameworks.redisclusterInstrumentation import RedisClusterInstrumentation, RedisClusterPipelineInstrumentation
register.register('frameworks.redisclusterInstrumentation.RedisClusterInstrumentation')
register.register('frameworks.redisclusterInstrumentation.RedisClusterPipelineInstrumentation')
Finally, we can see RedisCluster logging on ElasticAPM working.