Popularity
2.3
Stable
Activity
0.0
Stable
28
2
8

Programming language: Go
License: MIT License
Tags: Messaging    
Latest version: v0.1.14

ami alternatives and similar packages

Based on the "Messaging" category.
Alternatively, view ami alternatives based on common mentions on social networks and blogs.

Do you think we are missing an alternative of ami or a related project?

Add another 'Messaging' Package

README

Ami

Go client to reliable queues based on Redis Cluster Streams.

Godoc Coverage Status Go Report Card Go

Consume/produce performance

Performance is dependent from:

  • Redis Cluster nodes count;
  • ping RTT from client to Redis Cluster master nodes;
  • network speed between nodes;
  • message sizes;
  • Ami configuration.

As example, 10-nodes Redis Cluster with half of nodes in other datacenter (50 msec ping), 1 master/1 slave, with message "{}" got:

$ go run examples/performance/main.go
Produced 1000000 in 3.423883 sec, rps 292066.022156
Consumed 151000 in 1.049238 sec, rps 143913.931722
Acked 151000 in 0.973587 sec, rps 155096.612263

Producer example

    type errorLogger struct{}

    func (l *errorLogger) AmiError(err error) {
        println("Got error from Ami:", err.Error())
    }

    pr, err := ami.NewProducer(
        ami.ProducerOptions{
            ErrorNotifier:     &errorLogger{},
            Name:              "ruthie",
            PendingBufferSize: 10000000,
            PipeBufferSize:    50000,
            PipePeriod:        time.Microsecond * 1000,
            ShardsCount:       10,
        },
        &redis.ClusterOptions{
            Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
            ReadTimeout:  time.Second * 60,
            WriteTimeout: time.Second * 60,
        },
    )
    if err != nil {
        panic(err)
    }

    for i := 0; i < 10000; i++ {
        pr.Send("{}")
    }

    pr.Close()

Consumer example

    type errorLogger struct{}

    func (l *errorLogger) AmiError(err error) {
        println("Got error from Ami:", err.Error())
    }

    cn, err := ami.NewConsumer(
        ami.ConsumerOptions{
            Consumer:          "alice",
            ErrorNotifier:     &errorLogger{},
            Name:              "ruthie",
            PendingBufferSize: 10000000,
            PipeBufferSize:    50000,
            PipePeriod:        time.Microsecond * 1000,
            PrefetchCount:     100,
            ShardsCount:       10,
        },
        &redis.ClusterOptions{
            Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
            ReadTimeout:  time.Second * 60,
            WriteTimeout: time.Second * 60,
        },
    )
    if err != nil {
        panic(err)
    }

    c := cn.Start()

    wg := sync.WaitGroup{}
    wg.Add(1)

    go func() {
        for {
            m, more := <-c
            if !more {
                break
            }
            println("Got", m.Body, "ID", m.ID)
            cn.Ack(m)
        }
        wg.Done()
    }()

    time.Sleep(time.Second)

    cn.Stop()
    wg.Wait()

    cn.Close()