Skip to content

Home

The Mister Webhooks API is organized around the MisterWebhooksConsumer class. Its sole purpose is to support writing event consumers easily. The result is a minimal API.

Here's an example of the smallest possible Mister Webhooks client:

from mister_webhooks import ConnectionProfile, MisterWebhooksConsumer
import sys, pprint

if __name__ == "__main__":
    topic, profile_path = sys.argv[1:]

    profile = ConnectionProfile.from_file(profile_path)
    consumer = MisterWebhooksConsumer(topic, profile)

    consumer.run(pprint.pprint)

This program takes the topic to consume from as its first command-line argument and the path to the connection profile file as its second. It loads the connection profile from disk, uses it to create a consumer, and then runs the consumer loop, pretty-printing every webhook event as it comes in.

ConnectionProfile dataclass

ConnectionProfile represents the data within a Mister Webhooks connection profile. A ConnectionProfile is usually created using one of the static methods, and then used to instantiate a MisterWebhooksConsumer.

Attributes:

Name Type Description
consumer_name str

Name of this consumer.

auth_mechanism Literal

'plain'): SASL authentication mechanism this consumer uses.

auth_secret str

(str): The secret to be used in SASL authentication.

kafka_bootstrap str

(str): Kafka DNS domain & port to use to find brokers.

Source code in python/mister_webhooks/connection_profile.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@dataclass
class ConnectionProfile:
    """ConnectionProfile represents the data within a Mister Webhooks connection profile. A ConnectionProfile is usually
    created using one of the static methods, and then used to instantiate a MisterWebhooksConsumer.

    Attributes:
      consumer_name (str): Name of this consumer.
      auth_mechanism (Literal: 'plain'): SASL authentication mechanism this consumer uses.
      auth_secret: (str): The secret to be used in SASL authentication.
      kafka_bootstrap: (str): Kafka DNS domain & port to use to find brokers.
    """

    consumer_name: str
    auth_mechanism: Literal["plain"]
    auth_secret: str
    kafka_bootstrap: str

    @staticmethod
    def from_file(path) -> "ConnectionProfile":
        """from_file reads a ConnectionProfile from a Mister Webhooks connection profile file."""
        with io.open(path, "r") as f:
            data = json.load(f)

            return ConnectionProfile(
                data["consumer_name"],
                data["auth"]["mechanism"],
                data["auth"]["secret"],
                data["kafka"]["bootstrap"],
            )

from_file(path) staticmethod

from_file reads a ConnectionProfile from a Mister Webhooks connection profile file.

Source code in python/mister_webhooks/connection_profile.py
24
25
26
27
28
29
30
31
32
33
34
35
@staticmethod
def from_file(path) -> "ConnectionProfile":
    """from_file reads a ConnectionProfile from a Mister Webhooks connection profile file."""
    with io.open(path, "r") as f:
        data = json.load(f)

        return ConnectionProfile(
            data["consumer_name"],
            data["auth"]["mechanism"],
            data["auth"]["secret"],
            data["kafka"]["bootstrap"],
        )

HTTPMethod

Bases: Enum

HTTPMethod represents the HTTP method used to transmit the webhook event.

Only the document manipulation methods are encoded here, namely:

  • GET
  • HEAD
  • POST
  • PUT
  • PATCH
  • DELETE
Source code in python/mister_webhooks/webhook_event.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class HTTPMethod(Enum):
    """HTTPMethod represents the HTTP method used to transmit the webhook event.

    Only the document manipulation methods are encoded here, namely:

    - GET
    - HEAD
    - POST
    - PUT
    - PATCH
    - DELETE

    """

    GET = auto
    HEAD = auto
    POST = auto
    PUT = auto
    DELETE = auto
    PATCH = auto

MisterWebhooksConsumer

MisterWebhooksConsumer is a message consumer for Mister Webhooks events.

Parameters:

Name Type Description Default
topic str

The Mister Webhooks topic to consume from.

required
profile ConnectionProfile

The connection profile to connect with.

required
Source code in python/mister_webhooks/client.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class MisterWebhooksConsumer:
    """MisterWebhooksConsumer is a message consumer for Mister Webhooks events.

    Parameters:
        topic (str): The Mister Webhooks topic to consume from.
        profile (ConnectionProfile): The connection profile to connect with.
    """

    kafkaConsumer: KafkaConsumer

    def __init__(self, topic: str, profile: ConnectionProfile) -> None:
        auth = {}

        match profile.auth_mechanism:
            case "plain":
                auth: Dict[str, str] = {
                    "sasl_mechanism": "PLAIN",
                    "sasl_plain_username": profile.consumer_name,
                    "sasl_plain_password": profile.auth_secret,
                }

        self.kafkaConsumer = KafkaConsumer(
            topic,
            group_id=profile.consumer_name,
            bootstrap_servers=[profile.kafka_bootstrap],
            enable_auto_commit=False,
            allow_auto_create_topics=False,
            security_protocol="SASL_SSL",
            ssl_context=new_ssl_client_context(),
            **auth,
        )

    def run(self, cb: Callable[[WebhookEvent], None]) -> None:
        """
        Run runs the consumer and passes each incoming webhook event to the provided callback. It automatically
        takes care of offset bookkeeping when the callback runs without error. All exceptions thrown by the
        callback are re-raised.

        Parameters:
            cb (Callable[[WebhookEvent], None]): The callback to run on each message.
        """
        for msg in self.kafkaConsumer:
            decoded: None | WebhookEvent = decode_message(msg)
            match decoded:
                case None:
                    continue
                case x:
                    cb(x)
                    self.kafkaConsumer.commit(
                        offsets=MisterWebhooksConsumer.__messageCommitOffset(msg)
                    )

    @staticmethod
    def __messageCommitOffset(
        msg: ConsumerRecord,
    ) -> Dict[TopicPartition, OffsetAndMetadata]:
        return {
            TopicPartition(topic=msg.topic, partition=msg.partition): OffsetAndMetadata(
                offset=msg.offset + 1, metadata=None, leader_epoch=msg.leader_epoch
            )
        }

run(cb)

Run runs the consumer and passes each incoming webhook event to the provided callback. It automatically takes care of offset bookkeeping when the callback runs without error. All exceptions thrown by the callback are re-raised.

Parameters:

Name Type Description Default
cb Callable[[WebhookEvent], None]

The callback to run on each message.

required
Source code in python/mister_webhooks/client.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def run(self, cb: Callable[[WebhookEvent], None]) -> None:
    """
    Run runs the consumer and passes each incoming webhook event to the provided callback. It automatically
    takes care of offset bookkeeping when the callback runs without error. All exceptions thrown by the
    callback are re-raised.

    Parameters:
        cb (Callable[[WebhookEvent], None]): The callback to run on each message.
    """
    for msg in self.kafkaConsumer:
        decoded: None | WebhookEvent = decode_message(msg)
        match decoded:
            case None:
                continue
            case x:
                cb(x)
                self.kafkaConsumer.commit(
                    offsets=MisterWebhooksConsumer.__messageCommitOffset(msg)
                )

WebhookEvent dataclass

WebhookEvent represents a received webhook event

Source code in python/mister_webhooks/webhook_event.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
@dataclass
class WebhookEvent:
    "WebhookEvent represents a received webhook event"

    method: HTTPMethod
    """HTTP method used to transmit the webhook event"""

    timestamp: datetime
    """Time at which the webhook event occurred"""

    headers: Dict[str, List[bytes]]
    """HTTP headers captured from the webhook request"""

    payload: Any
    """Webhook request body"""

    @staticmethod
    def decode(
        envelopeType: int, timestamp: datetime, raw: ByteString
    ) -> "WebhookEvent":
        match envelopeType:
            case 0x80:
                try:
                    envelope: KafkaMessageEnvelopeV1 = envelope_reader.read(
                        avro.io.BinaryDecoder(io.BytesIO(raw))
                    )  # type: ignore
                except Exception as exc:
                    raise EnvelopeDecodeError(exc)

                try:
                    return WebhookEvent._from_kafka_message_envelope_v1(
                        timestamp, envelope
                    )
                except Exception as exc:
                    raise PayloadDecodeError(exc)

            case unknown:
                raise ValueError(
                    "envelope type 0x%2x is unsupported, please upgrade mister-webhooks-client"
                    % unknown
                )

    @staticmethod
    def _from_kafka_message_envelope_v1(
        timestamp: datetime, envelope: KafkaMessageEnvelopeV1
    ) -> "WebhookEvent":
        match envelope["encoding"]:
            case "CBOR":
                body = cbor2.loads(envelope["payload"])
            case "JSON":
                body = json.loads(envelope["payload"])

        return WebhookEvent(
            HTTPMethod[envelope["method"]],
            timestamp,
            {k: vs for (k, vs) in envelope["headers"]},
            body,
        )

headers instance-attribute

HTTP headers captured from the webhook request

method instance-attribute

HTTP method used to transmit the webhook event

payload instance-attribute

Webhook request body

timestamp instance-attribute

Time at which the webhook event occurred