Skip to content

Module

pgactivity

pgactivity.context

context(**metadata)

Bases: ContextDecorator

A context manager that adds additional metadata to SQL statements.

Once any code has entered pgactivity.context, every subsequent entrance of pgactivity.context adds to the current context, overwriting any keys that are already set.

To add context only if a parent has already entered pgactivity.context, one can call pgactivity.context as a function without entering it. The metadata set in the function call will be part of the context if pgactivity.context has previously been entered. Otherwise it will be ignored.

Parameters:

Name Type Description Default
metadata dict

Metadata that should be attached to the activity context

{}
Example

Here we track a "key" with a value of "value":

with pgactivity.context(key='value'):
    # Do things..
    # All SQL will have a {'key': 'value'} metadata comment.
    # Nesting will add additional metadata to the current
    # context

# Add metadata if a parent piece of code has already entered
# pgactivity.context
pgactivity.context(key='value')
Source code in pgactivity/runtime.py
def __init__(self, **metadata):
    """Record the supplied metadata and merge it into any active context.

    Args:
        **metadata: Key/value pairs to attach to the activity context.
    """
    self._pre_execute_hook = None
    self.metadata = metadata

    # Without a ``value`` on ``_context`` there is no context to merge
    # into, so a bare call is a no-op beyond storing the metadata.
    if not hasattr(_context, "value"):
        return

    _context.value.update(**self.metadata)

pgactivity.cancel

cancel(*pids: int, using: str = DEFAULT_DB_ALIAS) -> List[int]

Cancel activity using the Postgres pg_cancel_backend function.

Parameters:

Name Type Description Default
*pids int

The process ID(s) to cancel.

()
using str

The database to use.

DEFAULT_DB_ALIAS

Returns:

Type Description
List[int]

Canceled process IDs

Source code in pgactivity/core.py
def cancel(*pids: int, using: str = DEFAULT_DB_ALIAS) -> List[int]:
    """Cancel running activity via Postgres's ``pg_cancel_backend``.

    Args:
        *pids: Backend process IDs whose activity should be canceled.
        using: Alias of the database to run against.

    Returns:
        The process IDs that were canceled.
    """
    canceled_pids = _pg_backend_method("cancel", pids, using)
    return canceled_pids

pgactivity.pid

pid(using: str = DEFAULT_DB_ALIAS) -> int

Get the current backend process ID.

Parameters:

Name Type Description Default
using str

The database to use.

DEFAULT_DB_ALIAS

Returns:

Type Description
int

The current backend process ID

Source code in pgactivity/core.py
def pid(using: str = DEFAULT_DB_ALIAS) -> int:
    """Return the process ID of the backend serving the connection.

    Args:
        using: Alias of the database connection to query.

    Returns:
        The value reported by ``pg_backend_pid()``.
    """
    connection = connections[using]

    with connection.cursor() as cursor:
        cursor.execute("SELECT pg_backend_pid()")
        row = cursor.fetchone()

    # The first column of the fetched row is the PID.
    return row[0]

pgactivity.terminate

terminate(*pids: int, using: str = DEFAULT_DB_ALIAS) -> List[int]

Terminate activity using the Postgres pg_terminate_backend function.

Parameters:

Name Type Description Default
*pids int

The process ID(s) to terminate.

()
using str

The database to use.

DEFAULT_DB_ALIAS

Returns:

Type Description
List[int]

Terminated process IDs

Source code in pgactivity/core.py
def terminate(*pids: int, using: str = DEFAULT_DB_ALIAS) -> List[int]:
    """Terminate activity using the Postgres ``pg_terminate_backend`` function.

    Args:
        *pids: The process ID(s) to terminate.
        using: The database to use.

    Returns:
        Terminated process IDs
    """
    # Delegates to the shared backend helper, selecting the "terminate" method.
    return _pg_backend_method("terminate", pids, using)

pgactivity.timeout

timeout(
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    *,
    using: str = DEFAULT_DB_ALIAS,
    **timedelta_kwargs: int
)

Set the statement timeout as a decorator or context manager.

A value of None will set an infinite statement timeout. A value of less than a millisecond is not permitted.

Nested invocations will successfully apply and rollback the timeout to the previous value.

Parameters:

Name Type Description Default
timeout Union[timedelta, int, float, None]

The number of seconds as an integer or float. Use a timedelta object to precisely specify the timeout interval. Use None for an infinite timeout.

_unset
using str

The database to use.

DEFAULT_DB_ALIAS
**timedelta_kwargs int

Keyword arguments to directly supply to datetime.timedelta to create an interval. E.g. pgactivity.timeout(seconds=1, milliseconds=100) will create a timeout of 1100 milliseconds.

{}

Raises:

Type Description
OperationalError

When a timeout occurs

TypeError

When the timeout interval is an incorrect type

Source code in pgactivity/core.py
@contextlib.contextmanager
def timeout(
    timeout: Union[dt.timedelta, int, float, None] = _unset,
    *,
    using: str = DEFAULT_DB_ALIAS,
    **timedelta_kwargs: int,
):
    """Set the statement timeout as a decorator or context manager.

    A value of ``None`` will set an infinite statement timeout.
    A value of less than a millisecond is not permitted.

    Nested invocations will successfully apply and rollback the timeout to
    the previous value.

    Args:
        timeout: The number of seconds as an integer or float. Use a timedelta
            object to precisely specify the timeout interval. Use ``None`` for
            an infinite timeout.
        using: The database to use.
        **timedelta_kwargs: Keyword arguments to directly supply to
            datetime.timedelta to create an interval. E.g.
            `pgactivity.timeout(seconds=1, milliseconds=100)`
            will create a timeout of 1100 milliseconds. When supplied, these
            take precedence over ``timeout``.

    Raises:
        django.db.utils.OperationalError: When a timeout occurs
        TypeError: When the timeout interval is an incorrect type
        ValueError: When neither ``timeout`` nor ``timedelta_kwargs`` is
            supplied, or when the interval is rejected as too small.
    """
    # Keyword arguments win over the positional value when both are given.
    if timedelta_kwargs:
        timeout = dt.timedelta(**timedelta_kwargs)
    # ``_unset`` (not ``None``) marks "no argument", because ``None`` is a
    # valid request for an infinite timeout.
    elif timeout is _unset:
        raise ValueError("Must supply a value to pgactivity.timeout")

    if timeout is not None:
        # NOTE(review): ``_cast_timeout`` is defined elsewhere; presumably it
        # normalizes numbers to a ``timedelta`` and raises the documented
        # ``TypeError`` -- confirm.
        timeout = _cast_timeout(timeout)

        # A falsy result after casting is rejected as too small.
        if not timeout:
            raise ValueError(
                "Must supply value greater than a millisecond to pgactivity.timeout"
                " or use ``None`` to reset the timeout."
            )
    else:
        # ``None`` becomes a zero interval, i.e. a statement_timeout of 0 below.
        timeout = dt.timedelta()

    # First use: no previous timeout has been recorded on ``_timeout`` yet.
    if not hasattr(_timeout, "value"):
        _timeout.value = None

    # Remember the enclosing timeout so it can be restored on exit (nesting).
    old_timeout = _timeout.value
    # The active timeout is tracked in whole milliseconds.
    _timeout.value = int(timeout.total_seconds() * 1000)

    try:
        with connections[using].cursor() as cursor:
            # ``_timeout.value`` is always an int here, so interpolating it
            # into the SQL string cannot inject arbitrary text.
            cursor.execute(f"SELECT set_config('statement_timeout', '{_timeout.value}', false)")
            # The wrapped body runs while this cursor is still open.
            yield
    finally:
        # Restore the bookkeeping first, then push the restored value to
        # Postgres on a fresh cursor.
        _timeout.value = old_timeout

        with connections[using].cursor() as cursor:
            # NOTE(review): presumably skipped because no statement can run in
            # an errored transaction -- confirm against
            # ``_is_transaction_errored``.
            if not _is_transaction_errored(cursor):
                if _timeout.value is None:
                    # No enclosing timeout was recorded: pass NULL instead of
                    # a concrete value.
                    cursor.execute("SELECT set_config('statement_timeout', NULL, false)")
                else:
                    cursor.execute(
                        f"SELECT set_config('statement_timeout', '{_timeout.value}', false)"
                    )

pgactivity.contrib

pgactivity.contrib.execute_from_command_line

execute_from_command_line(
    *args: Any,
    ignore_commands: Union[List[str], None] = None,
    exec_func: Union[Callable, None] = None,
    **kwargs: Any
)

A drop-in replacement for Django's execute_from_command_line that attaches the command name as context. Can be used in a manage.py file.

Parameters:

Name Type Description Default
*args Any

Args for exec_func

()
ignore_commands Union[List[str], None]

Command names that should be ignored. Both "runserver" and "runserver_plus" are always added to this value.

None
exec_func Union[Callable, None]

The function to execute. Defaults to Django's execute_from_command_line function.

None
**kwargs Any

Kwargs for exec_func

{}
Source code in pgactivity/contrib.py
def execute_from_command_line(
    *args: Any,
    ignore_commands: Union[List[str], None] = None,
    exec_func: Union[Callable, None] = None,
    **kwargs: Any,
):
    """
    A drop-in replacement for Django's ``execute_from_command_line`` that attaches
    the command name as context. Can be used in a manage.py file.

    Arguments:
        *args: Args for `exec_func`
        ignore_commands: Command names that should be ignored. Both
            "runserver" and "runserver_plus" are always added to this value.
        exec_func: The function to execute. Defaults to Django's
            `execute_from_command_line` function.
        **kwargs: Kwargs for `exec_func`

    Returns:
        The return value of `exec_func`.
    """
    exec_func = exec_func or django_execute_from_command_line
    # Copy into a new list so the caller's list is never mutated.
    ignore_commands = list(ignore_commands or []) + ["runserver", "runserver_plus"]

    # The command name is the first argument after the script name.
    if len(sys.argv) > 1 and sys.argv[1] not in ignore_commands:
        activity_context = runtime.context(command=sys.argv[1])
    else:  # pragma: no cover
        # An empty ExitStack acts as a do-nothing context manager.
        activity_context = contextlib.ExitStack()

    with activity_context:
        # Propagate the result so custom exec functions keep their return value.
        return exec_func(*args, **kwargs)

pgactivity.middleware

pgactivity.middleware.ActivityMiddleware

ActivityMiddleware(get_response: Callable)

Annotates the url/method in the pgactivity context.

Source code in pgactivity/middleware.py
def ActivityMiddleware(get_response: Callable):
    """
    Build a middleware that records each request's path and HTTP method
    in the pgactivity context while the response is produced.
    """

    def middleware(request):
        request_metadata = {"url": request.path, "method": request.method}

        # The context is active only for the duration of the downstream call.
        with runtime.context(**request_metadata):
            response = get_response(request)

        return response

    return middleware

pgactivity.models

pgactivity.models.JSONField

Bases: JSONField

A JSONField that has a stable import path.

Useful for migrations since the JSONField path changed in a Django upgrade.

pgactivity.models.NoObjectsManager

Bases: Manager

Django's dumpdata and other commands will try to dump PG models. This manager is set as the default manager on PG models to prevent that.

pgactivity.models.PGActivity

Bases: PGTable

Wraps Postgres's pg_stat_activity view.

Attributes:

Name Type Description
start DateTimeField

The start of the query.

duration DurationField

The duration of the query.

query TextField

The SQL.

context JSONField

Context tracked by pgactivity.context.

state CharField

The state of the query. One of ACTIVE, IDLE, IDLE_IN_TRANSACTION, IDLE_IN_TRANSACTION_(ABORTED), FASTPATH_FUNCTION_CALL, or DISABLED.

xact_start DateTimeField

Time when the current transaction was started, or null if no transaction is active.

backend_start DateTimeField

Time when this process was started.

state_change DateTimeField

Time when the state was last changed.

wait_event_type CharField

Type of event for which the backend is waiting, or null. See the possible values at https://www.postgresql.org/docs/current/monitoring-stats.html#WAIT-EVENT-TABLE. Note that values are in snake case.

wait_event CharField

Wait event name if the backend is currently waiting. See the possible values at https://www.postgresql.org/docs/current/monitoring-stats.html#WAIT-EVENT-ACTIVITY-TABLE. Note that values are in snake case.

backend_xid CharField

Top-level transaction identifier of this backend, if any.

backend_xmin CharField

The current backend's xmin horizon, if any.

backend_type CharField

One of LAUNCHER, AUTOVACUUM_WORKER, LOGICAL_REPLICATION_LAUNCHER, LOGICAL_REPLICATION_WORKER, PARALLEL_WORKER, BACKGROUND_WRITER, CLIENT_BACKEND, CHECKPOINTER, ARCHIVER, STARTUP, WALRECEIVER, WALSENDER, or WALWRITER.

client_addr CharField

IP address of the client connected to this backend, if any.

client_hostname CharField

Host name of the connected client, as reported by a reverse DNS lookup of client_addr.

client_port IntegerField

TCP port number that the client is using for communication with this backend, or -1 if a Unix socket is used.

pgactivity.models.PGActivityQuerySet

PGActivityQuerySet(model=None, query=None, using=None, hints=None)

Bases: PGTableQuerySet

The Queryset for the PGActivity model.

Source code in pgactivity/models.py
def __init__(self, model=None, query=None, using=None, hints=None):
    """Initialize the queryset, defaulting to a ``PGTableQuery`` for ``model``."""
    # Only build the custom query when the caller did not provide one.
    resolved_query = PGTableQuery(model) if query is None else query
    super().__init__(model, resolved_query, using, hints)

cancel

cancel() -> List[int]

Cancel filtered activity.

Source code in pgactivity/models.py
def cancel(self) -> List[int]:
    """Cancel the activity matched by this queryset.

    The ``id`` values of the matched rows are passed to ``core.cancel``
    as process IDs, using this queryset's database.
    """
    id_query = self.values_list("id", flat=True)
    return core.cancel(*list(id_query), using=self.db)

config

config(name: str, **overrides: Any) -> models.QuerySet

Use a config name from settings.PGACTIVITY_CONFIGS to apply filters. Config overrides can be provided in the keyword arguments.

Parameters:

Name Type Description Default
name str

Name of the config. Must be a key from settings.PGACTIVITY_CONFIGS.

required
**overrides Any

Any overrides to apply to the final config dictionary.

{}

Returns:

Name Type Description
dict QuerySet

The queryset with the config's database, process IDs, and filters applied

Source code in pgactivity/models.py
def config(self, name: str, **overrides: Any) -> models.QuerySet:
    """
    Use a config name from ``settings.PGACTIVITY_CONFIGS``
    to apply filters. Config overrides can be provided
    in the keyword arguments.

    Args:
        name: Name of the config. Must be a key from ``settings.PGACTIVITY_CONFIGS``.
        **overrides: Any overrides to apply to the final config dictionary.

    Returns:
        A queryset with the config's database, process IDs, and filters applied.

    Raises:
        ValueError: If a filter string does not contain ``=``.
    """
    # ``config`` here is the module-level name, not this method.
    cfg = config.get(name, **overrides)

    qset = self.using(cfg.get("database", DEFAULT_DB_ALIAS))

    # Treat an explicit ``None`` like a missing key, as is done for "filters".
    qset = qset.pid(*(cfg.get("pids") or []))

    for f in cfg.get("filters") or []:
        # Filters are "lookup=value" strings; split only on the first "=" so
        # the value itself may contain "=".
        key, val = f.split("=", 1)
        qset = qset.filter(**{key: val})

    return qset

terminate

terminate() -> List[int]

Terminate filtered activity.

Source code in pgactivity/models.py
def terminate(self) -> List[int]:
    """Terminate the activity matched by this queryset.

    The ``id`` values of the matched rows are passed to ``core.terminate``
    as process IDs, using this queryset's database.
    """
    id_query = self.values_list("id", flat=True)
    return core.terminate(*list(id_query), using=self.db)

pgactivity.models.PGTableQueryCompiler

Bases: SQLCompiler

as_sql

as_sql(*args, **kwargs)

Return a CTE for the pg_stat_activity to facilitate queries

Source code in pgactivity/models.py
def as_sql(self, *args, **kwargs):
    """
    Prefix the compiled statement with a ``WITH`` clause built from
    ``get_ctes()`` and return it together with the query parameters.
    """
    # The CTEs are gathered before the main statement is compiled.
    with_clause = f"WITH {', '.join(self.get_ctes())}"

    compiled_sql, params = super().as_sql(*args, **kwargs)
    return with_clause + compiled_sql, params

pgactivity.models.PGTableQuerySet

PGTableQuerySet(model=None, query=None, using=None, hints=None)

Bases: QuerySet

The base queryset for PG* models.

Allows for the process IDs to be set on the query compiler, making the query much more efficient.

Source code in pgactivity/models.py
def __init__(self, model=None, query=None, using=None, hints=None):
    """Initialize the queryset with a ``PGTableQuery`` unless a query is given."""
    if query is not None:
        super().__init__(model, query, using, hints)
        return

    # No query supplied: fall back to the custom query type for this model.
    super().__init__(model, PGTableQuery(model), using, hints)

pid

pid(*pids)

Set the PIDs to filter against

Source code in pgactivity/models.py
def pid(self, *pids):
    """Return a clone of this queryset whose query carries the given PIDs.

    Each value is coerced with ``int`` before being stored on the clone's
    ``query.pids``.
    """
    clone = self._clone()
    clone.query.pids = list(map(int, pids))
    return clone