Application centric Cloudwatch logging in AWS Lambda functions with python3+ runtimes


Application-centric logging is a system in which one or more components all direct their log entries to a single logger. In the AWS context, this could mean an application composed of one or more AWS Lambda functions, each logging to a single application-wide AWS CloudWatch log stream. By “single”, I mean single to the application, not to each function.

In Lambda functions with Python runtimes, the default mode of logging is one log stream per Lambda. We can do this via the print() function or the logging module. But there are sometimes situations where multiple Lambdas are co-operating to solve a larger problem, where the co-operation is synchronous, and where it would be of value to be able to view a unified log stream for events across multiple Lambdas.

How do we achieve this, with a simple client interface? In this post, I present a solution.

A Solution

Include the following Python 3 script, saved with the filename “custom_logging.py”, in the deployment package of your AWS Lambda function.

##  CustomLogging class
import json
import logging
import sys
import time

import boto3

def coerceLoggingType( logType):
  """Normalise a logging level given as an int, a level name, or nothing.

  None and the empty string map to logging.INFO. A string is upper-cased and
  looked up as an attribute of the logging module; unknown names fall back to
  logging.INFO. Any other value (normally an int level) is returned unchanged.
  """
  if (logType is None) or (logType == ''):
    return logging.INFO
  if isinstance( logType, str):
    level = getattr( logging, logType.upper(), logging.INFO)
    # The lookup can also hit attributes of the logging module that are not
    # levels (e.g. 'basic_format' resolves to the BASIC_FORMAT string). Such a
    # value would break the later numeric level comparisons, so reject it.
    if isinstance( level, bool) or not isinstance( level, int):
      level = logging.INFO
    return level
  return logType

# Label of the stock prefix format used when a logger is given no msgFormat.
defaultFormat = '#standard'

# Stock message-prefix templates, selectable by label. The placeholders are
# filled in with str.format(): {level} is the level name, {func} the
# configured function name and {caller} the name of the calling function.
stockFormats = {
  '#standard': '{level}: {func}: {caller}: ',
  '#short'   : '{level}: {func}: ',
  '#simple'  : '{func}: '}

# Display names for the standard logging levels. CRITICAL is included because
# the loggers in this module expose a critical() method.
levelNames = {
  logging.DEBUG   : 'DEBUG',
  logging.INFO    : 'INFO',
  logging.WARNING : 'WARNING',
  logging.ERROR   : 'ERROR',
  logging.CRITICAL: 'CRITICAL'}

# Third-party loggers whose level is set separately by setupCanonicalLogLevels().
botoLoggers = ['boto', 'boto3', 'botocore', 'urllib3']

def _json_formatter( obj):
  """Formatter for unserialisable values."""
  return str( obj)

class JsonFormatter( logging.Formatter):
  """AWS Lambda Logging formatter.

  Formats the log message as a JSON encoded string.  If the message is a
  dict it will be used directly.  If the message can be parsed as JSON, then
  the parsed value is used in the output record.
  """

  def __init__( self, **kwargs):
    """Create the formatter.

    The only recognised keyword is 'json_default': a callable handed to
    json.dumps() as its `default` hook for values json cannot serialise.
    When absent, _json_formatter is used. Other keywords are ignored.
    """
    super().__init__()
    # Output field name -> %-style template evaluated against the record.
    self.format_dict = {
      'timestamp': '%(asctime)s',
      'level': '%(levelname)s',
      'location': '%(name)s.%(funcName)s:%(lineno)d'}
    if 'json_default' in kwargs:
      self.default_json_formatter = kwargs.pop( 'json_default')
    else:
      self.default_json_formatter = _json_formatter

  def format( self, record):
    """Return the log record rendered as a JSON object string.

    The object carries 'timestamp', 'level', 'location' and 'message', plus
    'exception' when the record has exception text.
    """
    record_dict = record.__dict__.copy()
    record_dict['asctime'] = self.formatTime( record)
    log_dict = {
      k: v % record_dict
      for k, v in self.format_dict.items() if v}
    if isinstance( record_dict['msg'], dict):
      log_dict['message'] = record_dict['msg']
    else:
      log_dict['message'] = record.getMessage()
      # Attempt to decode the message as JSON, if so, merge it with the
      # overall message for clarity.
      try:
        log_dict['message'] = json.loads( log_dict['message'])
      except ( TypeError, ValueError):
        # Not JSON: keep the plain text message.
        pass
    if record.exc_info:
      # Cache the traceback text to avoid converting it multiple times
      # (it's constant anyway)
      # from logging.Formatter:format
      if not record.exc_text:
        record.exc_text = self.formatException( record.exc_info)
    if record.exc_text:
      log_dict['exception'] = record.exc_text
    return json.dumps( log_dict, default=self.default_json_formatter)

def setupCanonicalLogLevels( logger, level, fmt, formatter_cls=JsonFormatter, boto_level=None, **kwargs):
  """Set levels and formatters for a standard logger, the root logger and the boto loggers.

  logger        - a logging.Logger; anything else raises Exception.
  level         - level applied to `logger` and to the root logger.
  fmt           - %-style format string. When given, it is passed to
                  logging.basicConfig() and applied to every root handler.
  formatter_cls - formatter class instantiated with **kwargs for every root
                  handler when `fmt` is None. Pass None to leave the handlers'
                  formatters untouched.
  boto_level    - level for the loggers named in botoLoggers; defaults to `level`.
  """
  if not isinstance( logger, logging.Logger):
    raise Exception( 'Wrong class of logger passed to setupCanonicalLogLevels().')
  logger.setLevel( level)
  logging.root.setLevel( level)
  if fmt is not None:
    logging.basicConfig( format=fmt)
    fmtObj = logging.Formatter( fmt)
  else:
    fmtObj = None
  # An explicit format string wins over the formatter class.
  for handler in logging.root.handlers:
    if fmtObj is not None:
      handler.setFormatter( fmtObj)
    elif formatter_cls is not None:
      handler.setFormatter( formatter_cls( **kwargs))
  if boto_level is None:
    boto_level = level
  for loggerId in botoLoggers:
    logging.getLogger( loggerId).setLevel( boto_level)
class NullLogger():
  """Logger that discards every message.

  It offers the same methods as the other loggers in this module so that
  client code can log unconditionally.
  """

  def __init__( self):
    pass

  def purge( self):
    """Do nothing; there is no buffer."""
    pass

  def log( self, level, msg, withPurge=False):
    """Discard the message."""
    pass

  def debug( self, msg, withPurge=False):
    pass

  def info( self, msg, withPurge=False):
    pass

  def warning( self, msg, withPurge=False):
    pass

  def critical( self, msg, withPurge=False):
    pass

  def error( self, msg, withPurge=False):
    pass

  def exception( self, msg, withPurge=False):
    pass

  def classCode( self):
    """Return the label identifying this logger class."""
    return '#null'

  def isPurgeable( self):
    """Return False: nothing is buffered."""
    return False
class PrintLogger():
  """Logger that writes messages with print() when they reach a threshold level.

  Messages are printed as given, without any prefix. Nothing is buffered, so
  the withPurge arguments are accepted only for interface compatibility.
  """

  def __init__( self, threshold):
    """threshold is a logging level constant or its name (e.g. 'DEBUG').

    None, the empty string and unknown names are treated as logging.INFO.
    """
    # Level names are accepted because configuration records may carry the
    # level as text; a str threshold would break the comparison in log().
    if (threshold is None) or (threshold == ''):
      threshold = logging.INFO
    elif isinstance( threshold, str):
      threshold = getattr( logging, threshold.upper(), logging.INFO)
      if not isinstance( threshold, int):
        threshold = logging.INFO
    self.threshold = threshold

  def purge( self):
    """Do nothing; there is no buffer."""
    pass

  def log( self, level, msg, withPurge=False):
    """Print msg if level is at or above the threshold."""
    if level >= self.threshold:
      print( msg)

  def debug( self, msg, withPurge=False):
    self.log( logging.DEBUG, msg, False)

  def info( self, msg, withPurge=False):
    self.log( logging.INFO, msg, False)

  def warning( self, msg, withPurge=False):
    self.log( logging.WARNING, msg, False)

  def critical( self, msg, withPurge=False):
    self.log( logging.CRITICAL, msg, False)

  def error( self, msg, withPurge=False):
    self.log( logging.ERROR, msg, False)

  def exception( self, msg, withPurge=False):
    """Log msg at ERROR level."""
    self.log( logging.ERROR, msg, False)

  def classCode( self):
    """Return the label identifying this logger class."""
    return '#print'

  def isPurgeable( self):
    """Return False: nothing is buffered."""
    return False
def createPolymorphicLogger( logClass, logGroup, logStream, logLevel = logging.INFO, functionName = None, msgFormat = None):
  """Factory: build the logger selected by logClass.

  logClass is one of:
    'cloud-watch'       - a CustomLogging instance posting to CloudWatch.
    '#print'            - a PrintLogger with logLevel as its threshold.
    '#null' or None     - a NullLogger.
    '#standard-logger'  - a logging.Logger named logStream, configured by
                          setupCanonicalLogLevels() with boto loggers at ERROR.
    a dict with a 'logging' key - a configuration record of the shape
        {'logging': {'class': ..., 'level': ...,
                     'cloud-watch': {'group': ..., 'stream': ...,
                                     'function': ..., 'format': ...}}}
        Values present in the record override the corresponding arguments;
        msgFormat defaults to '#mini' for this form.
    a dict with a 'class' key   - the inner part of the record above.

  Raises Exception for any other logClass.
  """
  if logClass == 'cloud-watch':
    return CustomLogging( logGroup, logStream, logLevel, functionName, msgFormat)
  elif logClass == '#print':
    return PrintLogger( logLevel)
  elif (logClass == '#null') or (logClass is None):
    return NullLogger()
  elif isinstance( logClass, dict) and ('logging' in logClass):
    # Tolerate an explicit null for either section of the record.
    loggingParams    = logClass.get( 'logging') or {}
    cloudWatchParams = loggingParams.get( 'cloud-watch') or {}
    if msgFormat is None:
      msgFormat = '#mini'
    actualLogClass  = loggingParams.get( 'class')
    logGroup     = cloudWatchParams.get( 'group'   , logGroup)
    logStream    = cloudWatchParams.get( 'stream'  , logStream)
    logLevel     =    loggingParams.get( 'level'   , logLevel)
    functionName = cloudWatchParams.get( 'function', functionName)
    msgFormat    = cloudWatchParams.get( 'format'  , msgFormat)
    # Re-dispatch on the class named inside the record.
    return createPolymorphicLogger( actualLogClass, logGroup, logStream, logLevel, functionName, msgFormat)
  elif isinstance( logClass, dict) and ('class' in logClass):
    canonicalLogClassRecord = {'logging': logClass}
    return createPolymorphicLogger( canonicalLogClassRecord, logGroup, logStream, logLevel, functionName, msgFormat)
  elif logClass == '#standard-logger':
    logger = logging.getLogger( name=logStream)
    if msgFormat is None:
      msgFormat = '[%(levelname)s] %(message)s'
    setupCanonicalLogLevels( logger, logLevel, msgFormat, JsonFormatter, logging.ERROR)
    return logger
  else:
    raise Exception( f'Unrecognised log class {logClass}')
def getClassCode( logger):
  """Return the class label of a logger.

  None gives '#null', a logging.Logger gives '#standard-logger', and any other
  object gives whatever its classCode() method returns. An object whose
  classCode() is missing or fails gives '#unrecognised'.
  """
  code = '#null'
  if isinstance( logger, logging.Logger):
    code = '#standard-logger'
  elif logger is not None:
    try:
      code = logger.classCode()
    except Exception:
      # Best effort: any object that cannot report a code is unrecognised.
      code = '#unrecognised'
  return code
def isLoggerPurgeable( logger):
  """Return True if the logger buffers messages and so needs purge() calls.

  None and logging.Logger instances are never purgeable. Any other object is
  asked through its isPurgeable() method; an object whose isPurgeable() is
  missing or fails is treated as not purgeable.
  """
  result = False
  if (not isinstance( logger, logging.Logger)) and (logger is not None):
    try:
      result = logger.isPurgeable()
    except Exception:
      # Best effort: an object that cannot answer is not purgeable.
      result = False
  return result
class CustomLogging:
  """Buffered logger that posts messages to a single AWS CloudWatch log stream.

  Messages are collected in memory and sent with put_log_events when the
  buffer is purged. The log group and log stream are created on demand.
  """

  def __init__( self, logGroup, logStream, logLevel = logging.INFO, functionName = None, msgFormat = None):
    """ logGroup is the name of the CloudWatch log group. If None, the messages pass to print.
        logStream is the name of the stream. It is required. It is a string. There is no embedded date processing.
        logLevel is one of the logging level constants or its string equivalent. Posts below this level will be swallowed.
        functionName is the name of the lambda.
        msgFormat determines the logged message prefix. It is either a format string, a label or a function.
          If it is a format string, the following substitution identifiers are available:
            {level}  The message log level.
            {func}   The passed functionName
            {caller} The python caller function name
          If it is a label, it is one of:
            #standard   - '{level}: {func}: {caller}: '. This is the default.
            #short      - '{level}: {func}: '
            #simple     - '{func}: '
            #mini       - the level (WARNING and above only) followed by the function name.
          If it is a function (or callable object), it must be a function that returns a prefix string with
            the following input parameters in order:
              level           - passed message level
              functionName    - constructed function name
              caller          - invoker caller name
              logMsg          - passed message
        EXAMPLE USAGE 1:
          import custom_logging, logging
          logger = CustomLogging( '/aws/ec2/prod/odin', '2022-06-29-MLC_DAILY-143', logging.INFO, 'CoolLambdaFunc', '#mini')
          logger.info( 'Hello friend! This is an info')
          logger.error( 'I broke it!')
        EXAMPLE USAGE 2:
          import custom_logging, logging
          logger = CustomLogging( None, None, logging.DEBUG, 'CoolLambdaFunc', '#mini')
          logger.info( 'This is the same as print')
        EXAMPLE USAGE 3:
          import custom_logging, logging
          logger = CustomLogging( None, None, logging.WARNING, 'CoolLambdaFunc', '{caller} | {level} !! {func}: ')
        EXAMPLE USAGE 4:
          import custom_logging, logging
          def colourMePink( level, functionName, caller, logMsg):
            sLevel = logging.getLevelName( level)
            if level == logging.DEBUG:
              prefix = '{level}: {func}: {caller}: '.format( level = sLevel, func = functionName, caller = caller)
            elif  level == logging.INFO:
              prefix = ''
            else:
              prefix = '{level}: '.format( level = sLevel)
            return prefix
          logger = CustomLogging( None, None, logging.INFO, None, colourMePink)
    """
    # NOTE(review): the region is hard-coded; confirm it suits every deployment.
    self.logs           = boto3.client( 'logs', region_name='ap-southeast-2')
    self.logEvents      = []
    self.functionName = functionName
    if self.functionName is None:
      self.functionName = ''
    self.logGroup       = logGroup
    self.logStream      = logStream
    self.msgFormat = msgFormat
    if self.msgFormat is None:
      self.msgFormat = defaultFormat
    # Resolve a label into its template string or formatting method.
    if isinstance( self.msgFormat, str) and (self.msgFormat in stockFormats):
      self.msgFormat = stockFormats[self.msgFormat]
    elif self.msgFormat == '#mini':
      self.msgFormat = self._miniFormat
    self.logLevel       = coerceLoggingType( logLevel)
    self.sequenceToken  = None
    # False forces a sequence token lookup before the first post.
    self.sequenceTokenIsValid = False
    self.maxEventsInBuffer = 20
    self.maxBufferAgeMs = 60000 # 1 minute.

  def _formatMessage( self, caller, logType, logMsg):
    """Return logMsg as a string, preceded by the prefix that msgFormat produces."""
    prefix = ''
    if caller is None:
      try:
        # Frame 0 is this method, 1 is put() and 2 is the public wrapper
        # (info(), error(), ...), so frame 3 is the client code.
        caller = sys._getframe( 3).f_code.co_name
      except ValueError:
        # The call stack is shallower than expected.
        caller = ''
    sLevel = levelNames.get( logType, str( logType))
    if isinstance( self.msgFormat, str):
      prefix = self.msgFormat.format( level = sLevel, func = self.functionName, caller = caller)
    elif callable( self.msgFormat):
      prefix = self.msgFormat( logType, self.functionName, caller, logMsg)
    return prefix + str( logMsg)

  def _miniFormat( self, level, functionName, caller, logMsg):
    """Prefix builder for the '#mini' label: level name (WARNING and above only), then function name."""
    prefix = ''
    if level >= logging.WARNING:
      # .get() so that a level outside the table cannot raise KeyError.
      prefix = levelNames.get( level, str( level)) + ': '
    if functionName != '':
      prefix = prefix + functionName + ': '
    return prefix

  def _getSequenceToken( self):
    """Fetch the upload sequence token of the log stream into self.sequenceToken.

    Returns 'group-not-found', 'stream-not-found-or-virgin-stream' or None (token found).
    """
    self.sequenceToken = None
    self.sequenceTokenIsValid = True
    try:
      response = self.logs.describe_log_streams( logGroupName=self.logGroup, logStreamNamePrefix=self.logStream)
    except self.logs.exceptions.ResourceNotFoundException:
      return 'group-not-found'
    # The query matches by prefix, so other streams can be returned as well.
    # Only the stream with exactly our name may supply the token.
    for stream in response.get( 'logStreams', []):
      if stream.get( 'logStreamName') == self.logStream:
        self.sequenceToken = stream.get( 'uploadSequenceToken') or None
        break
    if self.sequenceToken is None:
      return 'stream-not-found-or-virgin-stream'
    return None

  def put( self, logMsg, logType = logging.INFO, withPurge=False, callFunc = None):
    """Log a message at level logType.

    Messages below the logger's level are dropped. Without a log group the raw
    message is printed. Otherwise the formatted message is buffered, and the
    buffer is purged when withPurge is set, when it holds maxEventsInBuffer
    events, or when its oldest event is at least maxBufferAgeMs old.
    callFunc overrides the caller name used in the message prefix.
    """
    logType = coerceLoggingType( logType)
    if self.logLevel <= logType:
      if self.logGroup is not None:
        timestamp = int( round( time.time() * 1000))
        message = self._formatMessage( callFunc, logType, logMsg)
        logEvent = {'timestamp': timestamp, 'message': message}
        # A DEBUG-level logger also echoes every message to stdout.
        if self.logLevel == logging.DEBUG:
          print( message)
        self.logEvents.append( logEvent)
        count = len( self.logEvents)
        if withPurge or \
           (count >= self.maxEventsInBuffer) or \
           ((count >= 1) and ((timestamp - self.logEvents[0]['timestamp']) >= self.maxBufferAgeMs)):
          self.purge()
      else:
        print( logMsg)

  def classCode( self):
    """Return the label identifying this logger class."""
    return 'cloud-watch'

  def _primitive_put_log_events( self):
    """Make one put_log_events attempt with the buffered events.

    Returns None when the events need no further posting, otherwise
    'invalid-sequence-token' or 'stream-not-found'.
    """
    event_log = {
      'logGroupName' : self.logGroup,
      'logStreamName': self.logStream,
      'logEvents'    : self.logEvents}
    if self.sequenceToken is not None:
      event_log['sequenceToken'] = self.sequenceToken
    try:
      response = self.logs.put_log_events( **event_log)
      self.sequenceToken = response.get( 'nextSequenceToken')
      self.sequenceTokenIsValid = True
      result = None
    except self.logs.exceptions.ResourceAlreadyExistsException:
      self.sequenceTokenIsValid = False
      result = None
    except self.logs.exceptions.DataAlreadyAcceptedException:
      # The batch was already stored; treat as success but distrust the token.
      self.sequenceTokenIsValid = False
      result = None
    except self.logs.exceptions.InvalidSequenceTokenException:
      self.sequenceTokenIsValid = False
      result = 'invalid-sequence-token'
    except self.logs.exceptions.ResourceNotFoundException:
      self.sequenceTokenIsValid = True
      self.sequenceToken = None
      result = 'stream-not-found'
    return result

  def _primitive_create_log_stream( self):
    """Create the log stream. Returns None on success or 'group-not-found'."""
    self.sequenceTokenIsValid = True
    self.sequenceToken = None
    try:
      self.logs.create_log_stream( logGroupName=self.logGroup, logStreamName=self.logStream)
      result = None
    except self.logs.exceptions.ResourceAlreadyExistsException:
      # The stream exists but supplied no token. Leave the (empty) token marked
      # valid so that the next step posts without one. Marking it invalid here
      # would send the state machine back to the token lookup, which fails the
      # same way again, and the retries would be exhausted without ever posting.
      result = None
    except self.logs.exceptions.ResourceNotFoundException:
      result = 'group-not-found'
    return result

  def _primitive_create_log_group( self):
    """Create the log group; an already existing group is not an error."""
    self.sequenceTokenIsValid = True
    self.sequenceToken = None
    try:
      self.logs.create_log_group( logGroupName=self.logGroup)
    except self.logs.exceptions.ResourceAlreadyExistsException:
      pass

  def _robust_put_log_events( self):
    """Post the buffered events, creating the group/stream and refreshing the token as needed.

    A small state machine, limited to 100 transitions. Raises Exception when
    the events could not be posted within that limit.
    """
    status = 'hungry'
    for _ in range( 100):
      if status == 'group-not-found':
        self._primitive_create_log_group()
        status = 'stream-not-found'
      elif status == 'stream-not-found':
        status = self._primitive_create_log_stream()
        if status is None:
          status = 'hungry'
      elif status == 'invalid-sequence-token':
        getSequenceResult = self._getSequenceToken()
        # getSequenceResult == 'group-not-found' | 'stream-not-found-or-virgin-stream' | None
        if getSequenceResult == 'group-not-found':
          status = 'group-not-found'
        elif getSequenceResult == 'stream-not-found-or-virgin-stream':
          status = 'stream-not-found'
        else:
          status = 'ready'
      elif status == 'hungry':
        if not self.sequenceTokenIsValid:
          status = 'invalid-sequence-token'
        else:
          status = 'ready'
      elif status == 'ready':
        status = self._primitive_put_log_events()
        if status is None:
          status = 'done'
      if status == 'done':
        break
    if status != 'done':
      raise Exception( 'Failed to post to CloudWatch Logs.')

  def purge( self):
    """Send all buffered events to CloudWatch and empty the buffer.

    Logging must never break the application: on failure the unsent events
    and the error are printed, and the buffer is emptied regardless.
    """
    if len( self.logEvents) > 0:
      try:
        self._robust_put_log_events()
      except Exception as ex:
        print( self.logEvents)
        print( ex)
      self.logEvents = []

  def log( self, level, msg, withPurge=False):
    """Log msg at the given level."""
    self.put( msg, level, withPurge, None)

  def debug( self, msg, withPurge=False):
    self.put( msg, logging.DEBUG, withPurge, None)

  def info( self, msg, withPurge=False):
    self.put( msg, logging.INFO, withPurge, None)

  def warning( self, msg, withPurge=False):
    self.put( msg, logging.WARNING, withPurge, None)

  def error( self, msg, withPurge=False):
    self.put( msg, logging.ERROR, withPurge, None)

  def critical( self, msg, callFunc = None):
    """Log msg at CRITICAL level and always purge. callFunc overrides the reported caller name."""
    self.put( msg, logging.CRITICAL, True, callFunc)

  def exception( self, msg, withPurge=True):
    """Log msg at ERROR level and always purge, whatever withPurge says."""
    # Call put() directly: going through log() would add a stack frame and
    # make the reported caller name 'exception'.
    self.put( msg, logging.ERROR, True, None)

  def isPurgeable( self):
    """Return True: this logger buffers messages."""
    return True

  def __del__( self):
    """Flush any buffered events when the logger is released."""
    # getattr guard: __init__ may have failed before logEvents was assigned.
    if getattr( self, 'logEvents', None):
      self.purge()

How to use

Import custom_logging. In your lambda code, where you need application-centric logging, invoke the factory method createPolymorphicLogger() to create a logger. Then send all your application-centric log events to this logger, instead of print().

The logger is going to have the following public methods.

  • purge()
  • log( level, msg, withPurge=False)
  • debug/info/warning/critical/error/exception( msg, withPurge=False)

Use the log() method to log a string message. ‘level’ is one of the usual logging levels: DEBUG, INFO etc. For performance reasons, messages are buffered before actually sending to CloudWatch. The buffer is purged when either: (A) the buffer gets too long; or (B) the buffer ages out (1 minute); or (C) the withPurge parameter is explicitly set to True. Invoking the purge() method or releasing the custom logger class instance will also do it.

The debug() etc methods are short hand for the log() method when the level is fixed.

How to configure it

Refer to the inline comments.

