import asyncio
import pamqp.exceptions
from aiormq.exceptions import (
AMQPChannelError,
AMQPConnectionError,
AMQPError,
AMQPException,
AuthenticationError,
ChannelClosed,
ChannelInvalidStateError,
ChannelNotFoundEntity,
ChannelPreconditionFailed,
ConnectionClosed,
DeliveryError,
DuplicateConsumerTag,
IncompatibleProtocolError,
InvalidFrameError,
MethodNotImplemented,
ProbableAuthenticationError,
ProtocolSyntaxError,
PublishError,
)
CONNECTION_EXCEPTIONS = (
AMQPError,
ConnectionError,
OSError,
RuntimeError,
StopAsyncIteration,
pamqp.exceptions.PAMQPException,
)
[docs]
class MessageProcessError(AMQPError):
reason = "%s: %r"
[docs]
class QueueEmpty(AMQPError, asyncio.QueueEmpty):
pass
__all__ = (
"AMQPChannelError",
"AMQPConnectionError",
"AMQPError",
"AMQPException",
"AuthenticationError",
"CONNECTION_EXCEPTIONS",
"ChannelClosed",
"ChannelInvalidStateError",
"ChannelNotFoundEntity",
"ChannelPreconditionFailed",
"ConnectionClosed",
"DeliveryError",
"DuplicateConsumerTag",
"IncompatibleProtocolError",
"InvalidFrameError",
"MessageProcessError",
"MethodNotImplemented",
"ProbableAuthenticationError",
"ProtocolSyntaxError",
"PublishError",
"QueueEmpty",
)