# Source code for aio_pika.exceptions

import asyncio

import pamqp.exceptions
from aiormq.exceptions import (
    AMQPChannelError,
    AMQPConnectionError,
    AMQPError,
    AMQPException,
    AuthenticationError,
    ChannelClosed,
    ChannelInvalidStateError,
    ChannelNotFoundEntity,
    ChannelPreconditionFailed,
    ConnectionClosed,
    DeliveryError,
    DuplicateConsumerTag,
    IncompatibleProtocolError,
    InvalidFrameError,
    MethodNotImplemented,
    ProbableAuthenticationError,
    ProtocolSyntaxError,
    PublishError,
)


# Exception classes bundled under a single name; a tuple of classes can be
# passed directly to ``except`` or ``isinstance``.
# NOTE(review): presumably the set of errors that callers treat as a failed or
# lost connection -- confirm against the places that reference this name.
CONNECTION_EXCEPTIONS = (
    AMQPError,
    ConnectionError,  # builtin; already a subclass of OSError, listed explicitly
    OSError,
    RuntimeError,
    StopAsyncIteration,
    pamqp.exceptions.PAMQPException,
)


class MessageProcessError(AMQPError):
    """AMQP error whose ``reason`` template is ``"%s: %r"``.

    NOTE(review): the template has two placeholders, so it is presumably
    formatted with two arguments by the ``AMQPError`` base class -- confirm
    against aiormq.
    """

    reason = "%s: %r"


class QueueEmpty(AMQPError, asyncio.QueueEmpty):
    """Error deriving from both ``AMQPError`` and ``asyncio.QueueEmpty``.

    Because it inherits from both bases, handlers written as
    ``except AMQPError`` and as ``except asyncio.QueueEmpty`` each catch it.
    """


# Names exported by ``from <this module> import *``.
__all__ = (
    "AMQPChannelError",
    "AMQPConnectionError",
    "AMQPError",
    "AMQPException",
    "AuthenticationError",
    "CONNECTION_EXCEPTIONS",
    "ChannelClosed",
    "ChannelInvalidStateError",
    "ChannelNotFoundEntity",
    "ChannelPreconditionFailed",
    "ConnectionClosed",
    "DeliveryError",
    "DuplicateConsumerTag",
    "IncompatibleProtocolError",
    "InvalidFrameError",
    "MessageProcessError",
    "MethodNotImplemented",
    "ProbableAuthenticationError",
    "ProtocolSyntaxError",
    "PublishError",
    "QueueEmpty",
)