{"id":384,"date":"2024-04-14T15:10:10","date_gmt":"2024-04-14T05:10:10","guid":{"rendered":"https:\/\/seanbdurkin.id.au\/pascaliburnus2\/?p=384"},"modified":"2024-04-14T16:28:55","modified_gmt":"2024-04-14T06:28:55","slug":"application-centric-cloudwatch-logging-in-aws-lambda-functions-with-python3-runtimes","status":"publish","type":"post","link":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/archives\/384","title":{"rendered":"Application centric Cloudwatch logging in AWS Lambda functions with python3+ runtimes"},"content":{"rendered":"\n<figure class=\"wp-block-image size-full\"><a href=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?ssl=1\"><img data-recalc-dims=\"1\" loading=\"lazy\" decoding=\"async\" width=\"640\" height=\"184\" data-attachment-id=\"385\" data-permalink=\"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/archives\/384\/lambda-to-cw\" data-orig-file=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?fit=651%2C187\" data-orig-size=\"651,187\" data-comments-opened=\"0\" data-image-meta=\"{&quot;aperture&quot;:&quot;0&quot;,&quot;credit&quot;:&quot;&quot;,&quot;camera&quot;:&quot;&quot;,&quot;caption&quot;:&quot;&quot;,&quot;created_timestamp&quot;:&quot;0&quot;,&quot;copyright&quot;:&quot;&quot;,&quot;focal_length&quot;:&quot;0&quot;,&quot;iso&quot;:&quot;0&quot;,&quot;shutter_speed&quot;:&quot;0&quot;,&quot;title&quot;:&quot;&quot;,&quot;orientation&quot;:&quot;0&quot;}\" data-image-title=\"lambda-to-cw\" data-image-description=\"\" data-image-caption=\"\" data-medium-file=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?fit=300%2C86\" data-large-file=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?fit=640%2C184\" src=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?resize=640%2C184&#038;ssl=1\" alt=\"\" class=\"wp-image-385\" srcset=\"https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?w=651 651w, https:\/\/i0.wp.com\/seanbdurkin.id.au\/pascaliburnus2\/wp-content\/uploads\/2024\/04\/lambda-to-cw.jpg?resize=300%2C86 300w\" sizes=\"auto, (max-width: 640px) 100vw, 640px\" \/><\/a><\/figure>\n\n\n\n<h2 class=\"wp-block-heading\">Abstract<\/h2>\n\n\n\n<p>Application centric logging is a system where there are one or more components all directing thier log entries to a single logger. In the AWS context, this could mean an application composed of one or more AWS Lamba functions each logging to a single application-wide AWS CloudWatch log stream. By &#8220;single&#8221;, I mean single to the application, not to each function.<\/p>\n\n\n\n<p>In lambda functions with python runtimes, the default mode of logging is one log stream per lambda. We can do this via the print() function or the logging module. But there are sometimes situations where mulltiple lambdas are co-operating to solve a larger problem, where the co-operation is synchronous, and where it would be of value to be able to view a unified log stream for events accross multiple lambdas.<\/p>\n\n\n\n<p>How do we achieve this, with a simple client interface? In this post, I present a solution.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">A Solution<\/h2>\n\n\n\n<p>Include in the packaging of your AWS Lambda function, the following python 3 script with filename &#8220;custom_logging.py&#8221;.<\/p>\n\n\n\n<p><\/p>\n\n\n<div class=\"wp-block-syntaxhighlighter-code \"><pre class=\"brush: python; title: ; notranslate\" title=\"\">\n################################################################################\n##  CustomLogging class\n################################################################################\nimport boto3\nimport time, sys\nimport logging \n\ndef coerceLoggingType( logType):\n  if (logType is None) or (logType == &#039;&#039;):\n    logType = logging.INFO\n  elif isinstance( logType, str):\n    logType = getattr( logging, logType.upper(), logging.INFO)\n  return logType\n\nglobal stockFormats\nglobal defaultFormat\nglobal levelNames\n\ndefaultFormat = &#039;#standard&#039;\n\nstockFormats = {\n  &#039;#standard&#039;: &#039;{level}: {func}: {caller}: &#039;,\n  &#039;#short&#039;   : &#039;{level}: {func}: &#039;,\n  &#039;#simple&#039;  : &#039;{func}: &#039;} \n\nlevelNames = {\n  logging.DEBUG   : &#039;DEBUG&#039;,\n  logging.INFO    : &#039;INFO&#039;,\n  logging.WARNING : &#039;WARNING&#039;,\n  logging.ERROR   : &#039;ERROR&#039;,\n  logging.CRITICAL: &#039;CRITICAL&#039;}\n\nbotoLoggers = &#x5B;&#039;boto&#039;, &#039;boto3&#039;, &#039;botocore&#039;, &#039;urllib3&#039;]\n\ndef _json_formatter( obj):\n  &quot;&quot;&quot;Formatter for unserialisable values.&quot;&quot;&quot;\n  return str( obj)\n\nclass JsonFormatter( logging.Formatter):\n  &quot;&quot;&quot;AWS Lambda Logging formatter.\n  Formats the log message as a JSON encoded string.  If the message is a\n  dict it will be used directly.  If the message can be parsed as JSON, then\n  the parse d value is used in the output record.\n  &quot;&quot;&quot;\n  def __init__( self, **kwargs):\n    super( JsonFormatter, self).__init__()\n    self.format_dict = {\n      &#039;timestamp&#039;: &#039;%(asctime)s&#039;,\n      &#039;level&#039;: &#039;%(levelname)s&#039;,\n      &#039;location&#039;: &#039;%(name)s.%(funcName)s:%(lineno)d&#039;}\n    self.format_dict.update(kwargs)\n    self.default_json_formatter = kwargs.pop( &#039;json_default&#039;, _json_formatter)\n\n  def format( self, record):\n    record_dict = record.__dict__.copy()\n    record_dict&#x5B;&#039;asctime&#039;] = self.formatTime( record)\n    log_dict = {\n      k: v % record_dict\n      for k, v in self.format_dict.items() if v}\n    if isinstance( record_dict&#x5B;&#039;msg&#039;], dict):\n      log_dict&#x5B;&#039;message&#039;] = record_dict&#x5B;&#039;msg&#039;]\n    else:\n      log_dict&#x5B;&#039;message&#039;] = record.getMessage()\n    # Attempt to decode the message as JSON, if so, merge it with the\n    # overall message for clarity.\n    try:\n      log_dict&#x5B;&#039;message&#039;] = json.loads( log_dict&#x5B;&#039;message&#039;])\n    except ( TypeError, ValueError):\n      pass\n    if record.exc_info:\n      # Cache the traceback text to avoid converting it multiple times\n      # (it&#039;s constant anyway)\n      # from logging.Formatter:format\n      if not record.exc_text:\n        record.exc_text = self.formatException( record.exc_info)\n    if record.exc_text:\n      log_dict&#x5B;&#039;exception&#039;] = record.exc_text\n    json_record = json.dumps( log_dict, default=self.default_json_formatter)\n    if hasattr( json_record, &#039;decode&#039;):  # pragma: no cover\n      json_record = json_record.decode( &#039;utf-8&#039;)\n    return json_record\n\ndef setupCanonicalLogLevels( logger, level, fmt, formatter_cls=JsonFormatter, boto_level=None, **kwargs):\n  if not isinstance( logger, logging.Logger):\n    raise Exception( &#039;Wrong class of logger passed to setupCanonicalLogLevels().&#039;)\n  if logger is not None:\n    logger.setLevel( level)\n  logging.root.setLevel( level)\n  if fmt is not None:\n    logging.basicConfig( format=fmt)\n    fmtObj = logging.Formatter( fmt)\n  else:\n    fmtObj = None\n  for handler in logging.root.handlers:\n    try:\n      if fmtObj is not None:\n        handler.setFormatter( fmtObj)\n      elif formatter_cls is not None:\n        handler.setFormatter( formatter_cls( **kwargs))\n    except:\n      pass\n  if boto_level is None:\n    boto_level = level\n  for loggerId in botoLoggers:\n    try:\n      logging.getLogger( loggerId).setLevel( boto_level)\n    except:\n      pass\n \n \nclass NullLogger():\n  def __init__( self):\n    pass\n \n  def purge( self):\n    pass\n \n  def log( self, level, msg, withPurge=False):\n    pass\n \n  def debug( self, msg, withPurge=False):\n    pass\n \n  def info( self, msg, withPurge=False):\n    pass\n \n  def warning( self, msg, withPurge=False):\n    pass\n \n  def critical( self, msg, withPurge=False):\n    pass\n \n  def error( self, msg, withPurge=False):\n    pass\n \n  def exception( self, msg, withPurge=False):\n    pass\n \n  def classCode( self):\n    return &#039;#null&#039;\n \n  def isPurgeable( self):\n    return False\n \nclass PrintLogger():\n  def __init__( self, threshold):\n    self.threshold = threshold\n \n  def purge( self):\n    pass\n \n  def log( self, level, msg, withPurge=False):\n    if level &gt;= self.threshold:\n      print( msg)\n \n  def debug( self, msg, withPurge=False):\n    self.log( logging.DEBUG, msg, False)\n \n  def info( self, msg, withPurge=False):\n    self.log( logging.INFO, msg, False)\n \n  def warning( self, msg, withPurge=False):\n    self.log( logging.WARNING, msg, False)\n \n  def critical( self, msg, withPurge=False):\n    self.log( logging.CRITICAL, msg, False)\n \n  def error( self, msg, withPurge=False):\n    self.log( logging.ERROR, msg, False)\n \n  def exception( self, msg, withPurge=False):\n    self.log( logging.ERROR, msg, False)\n \n  def classCode( self):\n    return &#039;#print&#039;\n \n  def isPurgeable( self):\n    return False\n \ndef createPolymorphicLogger( logClass, logGroup, logStream, logLevel = logging.INFO, functionName = None, msgFormat = None):\n  if logClass == &#039;cloud-watch&#039;:\n    return CustomLogging( logGroup, logStream, logLevel, functionName, msgFormat)\n  elif logClass == &#039;#print&#039;:\n    return PrintLogger( logLevel)\n  elif (logClass == &#039;#null&#039;) or (logClass is None):\n    return NullLogger()\n  elif isinstance( logClass, dict) and (&#039;logging&#039; in logClass):\n    loggingParams    = logClass.get( &#039;logging&#039;, {})\n    cloudWatchParams = loggingParams.get( &#039;cloud-watch&#039;, {})\n    if msgFormat is None:\n      msgFormat = &#039;#mini&#039;\n    actualLogClass  = loggingParams.get( &#039;class&#039;)\n    logGroup     = cloudWatchParams.get( &#039;group&#039;   , logGroup)\n    logStream    = cloudWatchParams.get( &#039;stream&#039;  , logStream)\n    logLevel     =    loggingParams.get( &#039;level&#039;   , logLevel)\n    functionName = cloudWatchParams.get( &#039;function&#039;, functionName)\n    msgFormat    = cloudWatchParams.get( &#039;format&#039;  , msgFormat)\n    return createLogger( actualLogClass, logGroup, logStream, logLevel, functionName, msgFormat)\n  elif isinstance( logClass, dict) and (&#039;class&#039; in logClass):\n    canonicalLogClassRecord = {&#039;logging&#039;: logClass}\n    return createLogger( canonicalLogClassRecord, logGroup, logStream, logLevel, functionName, msgFormat)\n  elif logClass == &#039;#standard-logger&#039;:\n    logger = logging.getLogger( name=logStream)\n    if msgFormat is None:\n      msgFormat = &#039;&#x5B;%(levelname)s] %(message)s&#039;\n    setupCanonicalLogLevels( logger, logLevel, msgFormat, JsonFormatter, logging.ERROR)\n    return logger\n  else:\n    raise Exception( f&#039;Unrecognised log class {logClass}&#039;)\n \ndef getClassCode( logger):\n  code = &#039;#null&#039;\n  if isinstance( logger, logging.Logger):\n    code = &#039;#standard-logger&#039;\n  elif logger is not None:\n    try:\n      code = logger.classCode()\n    except:\n      code = &#039;#unrecognised&#039;\n  return code\n \ndef isLoggerPurgeable( logger):\n  result = False\n  if (not isinstance( logger, logging.Logger)) and (logger is not None):\n    try:\n      result = logger.isPurgeable()\n    except:\n      pass\n  return result\n \nclass CustomLogging:\n  def __init__( self, logGroup, logStream, logLevel = logging.INFO, functionName = None, msgFormat = None):\n    &quot;&quot;&quot; logGroup is the name of the CloudWatch log group. If none, the messages passes to print.\n        logStream is the name of the stream. It is required. It is a string. There is no embedded date processing.\n        logLevel is one of the logging level constants or its string equivalent. Posts below this level will be swallowed.\n        functionName is the name of the lambda.\n        msgFormat determines the logged message prefix. It is either a format string, a label or a function.\n          If it is a format string, the following substitution identifiers:\n            {level}  The message log level.\n            {func}   The passed functionName\n            {caller} The python caller function name\n          If it is a label, is one of:\n            #standard   - This is the default.\n            #short\n            #simple\n            #mini\n          If it is a function (or callable object), it must be a function that returns a prefix string with\n            the following input parameters in order:\n              level           - passed message level\n              functionName  - constructed function name\n              caller          - invoker caller name\n              logMsg          - passed message\n             \n        EXAMPLE USAGE 1:\n          import custom_logging, logging\n         \n          logger = CustomLogging( &#039;\/aws\/ec2\/prod\/odin&#039;, &#039;2022-06-29-MLC_DAILY-143&#039;, logging.INFO, &#039;CoolLambdaFunc&#039;, &#039;#mini&#039;)\n          logger.info( &#039;Hello friend! This is an info&#039;)\n          logger.error( &#039;I broke it!&#039;)\n          logger.purge()\n       \n        \n        EXAMPLE USAGE 2:\n          import custom_logging, logging\n         \n          logger = CustomLogging( None, None, logging.DEBUG, &#039;CoolLambdaFunc&#039;, &#039;#mini&#039;)\n          logger.info( &#039;This is the same as print&#039;)\n      \n        \n        EXAMPLE USAGE 3:\n          import custom_logging, logging\n         \n          logger = CustomLogging( None, None, logging.WARNING, &#039;CoolLambdaFunc&#039;, &#039;{caller} | {level} !! {func}: &#039;)\n          \n       \n        \n        EXAMPLE USAGE 3:\n          import custom_logging, logging\n         \n          def colourMePink( level, functionName, caller, logMsg):\n            if level == logging.DEBUG:\n              prefix = &#039;{level}: {func}: {caller}: &#039;.format( level = sLevel, func = functionName, caller = caller)\n            elif  level == logging.INFO:\n              prefix = &#039;&#039;\n            else:\n              prefix = &#039;{level}: &#039;.format( level = sLevel)\n            return prefix\n         \n          logger = CustomLogging( None, None, logging.INFO, None, colourMePink)\n          \n    &quot;&quot;&quot;\n    self.logs           = boto3.client( &#039;logs&#039;, region_name=&#039;ap-southeast-2&#039;)\n    self.logEvents      = &#x5B;]\n    self.functionName = functionName\n    if self.functionName is None:\n      self.functionName = &#039;&#039;\n    self.logGroup       = logGroup\n    self.logStream      = logStream\n    self.msgFormat = msgFormat\n    if self.msgFormat is None:\n      self.msgFormat = defaultFormat\n    if isinstance( self.msgFormat, str) and (self.msgFormat in stockFormats):\n      self.msgFormat = stockFormats&#x5B;self.msgFormat]\n    elif self.msgFormat == &#039;#mini&#039;:\n      self.msgFormat = self._miniFormat\n    self.logLevel       = coerceLoggingType( logLevel)\n    self.sequenceToken  = None\n    self.sequenceTokenIsValid = False\n    self.maxEventsInBuffer = 20\n    self.maxBufferAgeMs = 60000 # 1 minute.\n \n  def _formatMessage( self, caller, logType, logMsg):\n    prefix = &#039;&#039;\n    if caller is None:\n      try:\n        caller = sys._getframe(3).f_code.co_name\n      except:\n        caller = &#039;&#039;\n    sLevel = levelNames.get( logType, str( logType))\n    if isinstance( self.msgFormat, str):\n      prefix = self.msgFormat.format( level = sLevel, func = self.functionName, caller = caller)\n    elif callable( self.msgFormat):\n      prefix = self.msgFormat( logType, self.functionName, caller, logMsg)\n    return prefix + str( logMsg)\n \n  def _miniFormat( self, level, functionName, caller, logMsg):\n    prefix = &#039;&#039;\n    if level &gt;= logging.WARNING:\n      prefix = levelNames&#x5B; level] + &#039;: &#039;\n    if functionName != &#039;&#039;:\n      prefix = prefix + functionName + &#039;: &#039;\n    return prefix\n \n  def _getSequenceToken( self):\n    self.sequenceToken = None\n    self.sequenceTokenIsValid = True\n    try:\n      response = self.logs.describe_log_streams( logGroupName=self.logGroup, logStreamNamePrefix=self.logStream)\n    except self.logs.exceptions.ResourceNotFoundException:\n      return &#039;group-not-found&#039;\n    try:\n      if &#039;uploadSequenceToken&#039; in response&#x5B;&#039;logStreams&#039;]&#x5B;0]:\n        self.sequenceToken = response&#x5B;&#039;logStreams&#039;]&#x5B;0]&#x5B;&#039;uploadSequenceToken&#039;]\n      if self.sequenceToken == &#039;&#039;:\n        self.sequenceToken = None\n    except:\n      pass\n    if self.sequenceToken is None:\n      return &#039;stream-not-found-or-virgin-stream&#039;\n    else:\n      return None\n \n  def put( self, logMsg, logType = logging.INFO, withPurge=False, callFunc = None):\n    logType = coerceLoggingType( logType)\n    if self.logLevel &lt;= logType:\n      if self.logGroup is not None:\n        timestamp = int( round( time.time() * 1000))\n        message = self._formatMessage( callFunc, logType, logMsg)\n        logEvent = {&#039;timestamp&#039;: timestamp, &#039;message&#039;: message}\n        if self.logLevel == logging.DEBUG:\n         print( message)\n        self.logEvents.append( logEvent)\n        count = len( self.logEvents)\n        if withPurge or \\\n           (count &gt;= self.maxEventsInBuffer) or \\\n           ((count &gt;= 1) and ((timestamp - self.logEvents&#x5B;0]&#x5B;&#039;timestamp&#039;]) &gt;= self.maxBufferAgeMs)):\n          self.purge()\n      else:\n        print( logMsg)\n \n  def classCode( self):\n    return &#039;cloud-watch&#039;\n \n  def _primitive_put_log_events( self):\n    event_log = {\n      &#039;logGroupName&#039; : self.logGroup,\n      &#039;logStreamName&#039;: self.logStream,\n      &#039;logEvents&#039;    : self.logEvents}\n    if self.sequenceToken is not None:\n      event_log&#x5B;&#039;sequenceToken&#039;] = self.sequenceToken\n    try:\n      response = self.logs.put_log_events( **event_log)\n      self.sequenceToken = response.get( &#039;nextSequenceToken&#039;)\n      self.sequenceTokenIsValid = True\n      result = None\n    except self.logs.exceptions.ResourceAlreadyExistsException:\n      self.sequenceTokenIsValid = False\n      result = None\n    except self.logs.exceptions.DataAlreadyAcceptedException:\n      self.sequenceTokenIsValid = False\n      result = None\n    except self.logs.exceptions.InvalidSequenceTokenException:\n      self.sequenceTokenIsValid = False\n      result = &#039;invalid-sequence-token&#039;\n    except self.logs.exceptions.ResourceNotFoundException:\n      self.sequenceTokenIsValid = True\n      self.sequenceToken = None\n      result = &#039;stream-not-found&#039;\n    return result\n \n  def _primitive_create_log_stream( self):\n    self.sequenceTokenIsValid = True\n    self.sequenceToken = None\n    try:\n      self.logs.create_log_stream( logGroupName=self.logGroup, logStreamName=self.logStream)\n      result = None\n    except self.logs.exceptions.ResourceAlreadyExistsException:\n      self.sequenceTokenIsValid = False\n      result = None\n    except self.logs.exceptions.ResourceNotFoundException:\n      result = &#039;group-not-found&#039;\n    return result\n \n  def _primitive_create_log_group( self):\n   self.sequenceTokenIsValid = True\n    self.sequenceToken = None\n    try:\n      self.logs.create_log_group( logGroupName=self.logGroup)\n    except self.logs.exceptions.ResourceAlreadyExistsException:\n      pass\n \n  def _robust_put_log_events( self):\n    status = &#039;hungry&#039;\n    for tryCount in range( 100):\n      if status == &#039;group-not-found&#039;:\n        self._primitive_create_log_group()\n        status = &#039;stream-not-found&#039;\n      elif status == &#039;stream-not-found&#039;:\n        status = self._primitive_create_log_stream()\n        if status is None:\n          status = &#039;hungry&#039;\n      elif status == &#039;invalid-sequence-token&#039;:\n        getSequenceResult = self._getSequenceToken()\n        # getSequenceResult == &#039;group-not-found&#039; | &#039;stream-not-found-or-virgin-stream&#039; | None\n        if getSequenceResult == &#039;group-not-found&#039;:\n          status = &#039;group-not-found&#039;\n        elif getSequenceResult == &#039;stream-not-found-or-virgin-stream&#039;:\n          status = &#039;stream-not-found&#039;\n        else:\n          status = &#039;ready&#039;\n      elif status == &#039;hungry&#039;:\n        if not self.sequenceTokenIsValid:\n          status = &#039;invalid-sequence-token&#039;\n        else:\n          status = &#039;ready&#039;\n      elif status == &#039;ready&#039;:\n        status = self._primitive_put_log_events()\n        if status is None:\n          status = &#039;done&#039;\n      if status == &#039;done&#039;:\n        break\n    if status != &#039;done&#039;:\n      raise Exception( &#039;Failed to post to CloudWatch Logs.&#039;)\n \n  def purge( self):\n    if len( self.logEvents) &gt; 0:\n      try:\n        self._robust_put_log_events()\n      except Exception as ex:\n        print( self.logEvents)\n        print( ex)\n      self.logEvents = &#x5B;]\n \n  def log( self, level, msg, withPurge=False):\n    self.put( msg, level, withPurge, None)\n \n  def debug( self, msg, withPurge=False):\n    self.put( msg, logging.DEBUG, withPurge, None)\n \n  def info( self, msg, withPurge=False):\n    self.put( msg, logging.INFO, withPurge, None)\n \n  def warning( self, msg, withPurge=False):\n    self.put( msg, logging.WARNING, withPurge, None)\n \n  def error( self, msg, withPurge=False):\n    self.put( msg, logging.ERROR, withPurge, None)\n \n  def critical( self, msg, callFunc = None):\n    self.put( msg, logging.CRITICAL, True, callFunc)\n \n  def exception( self, msg, withPurge=True):\n    self.log( logging.ERROR, msg, True)\n \n  def isPurgeable( self):\n    return True\n \n  def __del__( self):\n    try:\n      self.purge()\n    except:\n      pass\n<\/pre><\/div>\n\n\n<h2 class=\"wp-block-heading\">How to use<\/h2>\n\n\n\n<p>Import custom_logging. In your lambda code, where you need application-centric logging, invoke the factory method <code>createPolymorphicLogger()<\/code> to create a logger. Then send all your application-centric log events to this logger, instead of print().<\/p>\n\n\n\n<p>The logger is going to have the following public methods.<\/p>\n\n\n\n<ul class=\"wp-block-list\">\n<li>purge()<\/li>\n\n\n\n<li>log( level, msg, withPurge=False)<\/li>\n\n\n\n<li>debug\/info\/warning\/critical\/error\/exception( msg, withPurge=False)<\/li>\n<\/ul>\n\n\n\n<p>Use the log() method to log a string message. &#8216;level&#8217; is one of the usual logging levels: DEBUG, INFO etc. For performance reasons, messages are buffered before actually sending to CloudWatch. The buffer is purged when either: (A) the buffer gets too long; or (B) the buffer ages out (1 minute); or (C) the withPurge parameter is explicitly set to True. Invoking the purge() method or releasing the custom logger class instance will also  do it.<\/p>\n\n\n\n<p>The debug() etc methods are short hand for the log() method when the level is fixed.<\/p>\n\n\n\n<h2 class=\"wp-block-heading\">How to configure it<\/h2>\n\n\n\n<p>Refer to the inline comments.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>Abstract Application centric logging is a system where there are one or more components all directing thier log entries to a single logger. In the AWS context, this could mean an application composed of one or more AWS Lamba functions &hellip; <a href=\"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/archives\/384\">Continue reading <span class=\"meta-nav\">&rarr;<\/span><\/a><\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"jetpack_post_was_ever_published":false,"_jetpack_newsletter_access":"","_jetpack_dont_email_post_to_subs":false,"_jetpack_newsletter_tier_id":0,"_jetpack_memberships_contains_paywalled_content":false,"_jetpack_memberships_contains_paid_content":false,"footnotes":"","jetpack_publicize_message":"","jetpack_publicize_feature_enabled":true,"jetpack_social_post_already_shared":true,"jetpack_social_options":{"image_generator_settings":{"template":"highway","default_image_id":0,"font":"","enabled":false},"version":2}},"categories":[16],"tags":[18,19],"class_list":["post-384","post","type-post","status-publish","format-standard","hentry","category-python","tag-logging","tag-python"],"jetpack_publicize_connections":[],"jetpack_featured_media_url":"","jetpack_shortlink":"https:\/\/wp.me\/p2QXbt-6c","jetpack_sharing_enabled":true,"_links":{"self":[{"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/posts\/384","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/comments?post=384"}],"version-history":[{"count":6,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/posts\/384\/revisions"}],"predecessor-version":[{"id":392,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/posts\/384\/revisions\/392"}],"wp:attachment":[{"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/media?parent=384"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/categories?post=384"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/seanbdurkin.id.au\/pascaliburnus2\/wp-json\/wp\/v2\/tags?post=384"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}