Popularity
4.5
Declining
Activity
0.0
Stable
97
9
23

Programming language: Go
License: MIT License
Tags: Messaging    
Latest version: v3.0.0

rabbus alternatives and similar packages

Based on the "Messaging" category.
Alternatively, view rabbus alternatives based on common mentions on social networks and blogs.

Do you think we are missing an alternative of rabbus or a related project?

Add another 'Messaging' Package

README

Rabbus 🚌 ✨

  • A tiny wrapper over amqp exchanges and queues.
  • In memory retries with exponential backoff for sending messages.
  • Protect producer calls with circuit breaker.
  • Automatic reconnect to RabbitMQ broker when connection is lost.
  • Go channel API.

Installation

go get -u github.com/rafaeljesus/rabbus

Usage

The rabbus package exposes an interface for emitting and listening RabbitMQ messages.

Emit

import (
    "context"
    "time"

    "github.com/rafaeljesus/rabbus"
)

func main() {
    timeout := time.After(time.Second * 3)
    cbStateChangeFunc := func(name, from, to string) {
        // do something when state is changed
    }
    r, err := rabbus.New(
        rabbusDsn,
        rabbus.Durable(true),
        rabbus.Attempts(5),
        rabbus.Sleep(time.Second*2),
        rabbus.Threshold(3),
        rabbus.OnStateChange(cbStateChangeFunc),
    )
    if err != nil {
        // handle error
    }

    defer func(r Rabbus) {
        if err := r.Close(); err != nil {
            // handle error
        }
    }(r)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go r.Run(ctx)

    msg := rabbus.Message{
        Exchange: "test_ex",
        Kind:     "topic",
        Key:      "test_key",
        Payload:  []byte(`foo`),
    }

    r.EmitAsync() <- msg

    for {
        select {
        case <-r.EmitOk():
            // message was sent
        case <-r.EmitErr():
            // failed to send message
        case <-timeout:
            // handle timeout error
        }
    }
}

Listen

import (
    "context"
    "encoding/json"
    "time"

    "github.com/rafaeljesus/rabbus"
)

func main() {
    timeout := time.After(time.Second * 3)
    cbStateChangeFunc := func(name, from, to string) {
        // do something when state is changed
    }
    r, err := rabbus.New(
        rabbusDsn,
        rabbus.Durable(true),
        rabbus.Attempts(5),
        rabbus.Sleep(time.Second*2),
        rabbus.Threshold(3),
        rabbus.OnStateChange(cbStateChangeFunc),
    )
    if err != nil {
        // handle error
    }

    defer func(r Rabbus) {
        if err := r.Close(); err != nil {
            // handle error
        }
    }(r)

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    go r.Run(ctx)

    messages, err := r.Listen(rabbus.ListenConfig{
        Exchange:    "events_ex",
        Kind:        "topic",
        Key:         "events_key",
        Queue:       "events_q",
        DeclareArgs: rabbus.NewDeclareArgs().WithMessageTTL(15 * time.Minute).With("foo", "bar"),
        BindArgs:    rabbus.NewBindArgs().With("baz", "qux"),
    })
    if err != nil {
        // handle errors during adding listener
    }
    defer close(messages)

    go func(messages chan ConsumerMessage) {
        for m := range messages {
            m.Ack(false)
        }
    }(messages)
}

Contributing

  • Fork it
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create new Pull Request

Badges

Build Status Go Report Card Go Doc


GitHub @rafaeljesus  ·  Medium @_jesus_rafael  ·  Twitter @_jesus_rafael