| Server IP : www.new.bangkokfinder.com / Your IP : 162.158.170.163 Web Server : nginx/1.20.1 System : Linux new 4.15.0-159-generic #167-Ubuntu SMP Tue Sep 21 08:55:05 UTC 2021 x86_64 User : bangkokfinder ( 1000) PHP Version : 7.4.33 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : ON Directory : /usr/lib/python3/dist-packages/twisted/logger/ |
Upload File : |
# -*- test-case-name: twisted.logger.test.test_buffer -*-
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
"""
Log observer that maintains a buffer.
"""
from collections import deque
from zope.interface import implementer
from ._observer import ILogObserver
_DEFAULT_BUFFER_MAXIMUM = 64 * 1024
@implementer(ILogObserver)
class LimitedHistoryLogObserver(object):
"""
L{ILogObserver} that stores events in a buffer of a fixed size::
>>> from twisted.logger import LimitedHistoryLogObserver
>>> history = LimitedHistoryLogObserver(5)
>>> for n in range(10): history({'n': n})
...
>>> repeats = []
>>> history.replayTo(repeats.append)
>>> len(repeats)
5
>>> repeats
[{'n': 5}, {'n': 6}, {'n': 7}, {'n': 8}, {'n': 9}]
>>>
"""
def __init__(self, size=_DEFAULT_BUFFER_MAXIMUM):
"""
@param size: The maximum number of events to buffer. If L{None}, the
buffer is unbounded.
@type size: L{int}
"""
self._buffer = deque(maxlen=size)
def __call__(self, event):
self._buffer.append(event)
def replayTo(self, otherObserver):
"""
Re-play the buffered events to another log observer.
@param otherObserver: An observer to replay events to.
@type otherObserver: L{ILogObserver}
"""
for event in self._buffer:
otherObserver(event)