Programming language: Go
Tags: Messaging    
Latest version: v3.0.0

README

Rabbus 🚌 ✨

  • A tiny wrapper over AMQP exchanges and queues.
  • In-memory retries with exponential backoff for sending messages.
  • Protects producer calls with a circuit breaker.
  • Automatic reconnect to the RabbitMQ broker when the connection is lost.
  • Go channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening to RabbitMQ messages.

Emit

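package main

import (
    "context"
    "time"

    "github.com/rafaeljesus/rabbus"
)

// rabbusDsn is an assumed local broker address; replace it with your own.
const rabbusDsn = "amqp://localhost:5672"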

func main() {
    timeout := time.After(time.Second * 3)
    cbStateChangeFunc := func(name, from, to string) {
        // do something when state is changed
    }
    r, err := rabbus.New(
        rabbusDsn,
        rabbus.Durable(true),
        rabbus.Attempts(5),
        rabbus.Sleep(time.Second*2),
        rabbus.Threshold(3),
        rabbus.OnStateChange(cbStateChangeFunc),
    )
    if err != nil {
        // handle error
        return
    }

    defer func() {
        if err := r.Close(); err != nil {
            // handle error
        }
    }()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go r.Run(ctx)

    msg := rabbus.Message{
        Exchange: "test_ex",
        Kind:     "topic",
        Key:      "test_key",
        Payload:  []byte(`foo`),
    }

    r.EmitAsync() <- msg

    // one message was emitted, so wait for a single outcome
    select {
    case <-r.EmitOk():
        // message was sent
    case <-r.EmitErr():
        // failed to send message
    case <-timeout:
        // handle timeout error
    }
}

Listen

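package main

import (
    "context"
    "time"

    "github.com/rafaeljesus/rabbus"
)

// rabbusDsn is an assumed local broker address; replace it with your own.
const rabbusDsn = "amqp://localhost:5672"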

func main() {
    timeout := time.After(time.Second * 3)
    cbStateChangeFunc := func(name, from, to string) {
        // do something when state is changed
    }
    r, err := rabbus.New(
        rabbusDsn,
        rabbus.Durable(true),
        rabbus.Attempts(5),
        rabbus.Sleep(time.Second*2),
        rabbus.Threshold(3),
        rabbus.OnStateChange(cbStateChangeFunc),
    )
    if err != nil {
        // handle error
        return
    }

    defer func() {
        if err := r.Close(); err != nil {
            // handle error
        }
    }()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go r.Run(ctx)

    messages, err := r.Listen(rabbus.ListenConfig{
        Exchange:    "events_ex",
        Kind:        "topic",
        Key:         "events_key",
        Queue:       "events_q",
        DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
        BindArgs:    rabbus.NewBindArgs().With("baz", "qux"),
    })
    if err != nil {
        // handle errors during adding listener
    }
    defer close(messages)

    go func() {
        for m := range messages {
            m.Ack(false)
        }
    }()

    // keep consuming until the timeout fires
    <-timeout
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael