Popularity
2.7
Growing
Activity
8.5
Growing
30
3
5

Programming language: Go

arpc alternatives and similar packages

Based on the "Distributed Systems" category

Do you think we are missing an alternative to arpc or a related project?

Add another 'Distributed Systems' Package

README

ARPC - More Effective Network Communication

GoDoc | MIT licensed | Build Status | Go Report Card | Coverage Status

Contents

Features

  • [x] Two-Way Calling
  • [x] Two-Way Notify
  • [x] Sync and Async Calling
  • [x] Sync and Async Response
  • [x] Batch Write | Writev | net.Buffers
  • [x] Broadcast
Pattern | Interactive Directions | Description
call | two-way: c -> s, s -> c | request and response
notify | two-way: c -> s, s -> c | request without response

Performance

  • simple echo load testing
Framework | Protocol | Codec | Configuration | Connection Num | Goroutine Num | Qps
arpc | tcp/localhost | encoding/json | os: VMWare Ubuntu 18.04, cpu: AMD 3500U 4c8t, mem: 2G | 8 | 10 | 80-100k
grpc | http2/localhost | protobuf | os: VMWare Ubuntu 18.04, cpu: AMD 3500U 4c8t, mem: 2G | 8 | 10 | 20-30k

Header Layout

  • LittleEndian
cmd | async | methodlen | null | bodylen | sequence | method | body
1 byte | 1 byte | 1 byte | 1 byte | 4 bytes | 8 bytes | 0 or methodlen bytes | ...

Installation

  1. Get and install arpc
$ go get -u github.com/lesismal/arpc
  2. Import in your code:
import "github.com/lesismal/arpc"

Quick start

package main

import "github.com/lesismal/arpc"

// main creates an arpc server, registers a handler for the "/echo" route,
// and then calls Run with the address ":8888".
func main() {
    srv := arpc.NewServer()

    // echo binds the request payload into a string and writes that same
    // string back. When Bind reports an error, nothing is written.
    echo := func(ctx *arpc.Context) {
        var payload string
        if err := ctx.Bind(&payload); err != nil {
            return
        }
        ctx.Write(payload)
    }
    srv.Handler.Handle("/echo", echo)

    srv.Run(":8888")
}
package main

import (
    "log"
    "net"
    "time"

    "github.com/lesismal/arpc"
)

// main creates an arpc client for localhost:8888, issues a single "/echo"
// call with the payload "hello", and logs the response or the failure.
func main() {
    // dial is handed to arpc.NewClient; it opens a TCP connection with a
    // 3 second dial timeout.
    dial := func() (net.Conn, error) {
        return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
    }

    client, err := arpc.NewClient(dial)
    if err != nil {
        panic(err)
    }

    client.Run()
    // NOTE: log.Fatalf below exits the process, so this deferred Stop only
    // runs when the call succeeds and main returns normally.
    defer client.Stop()

    var (
        request  = "hello"
        response = ""
    )

    // The last argument is the timeout passed to Call (5 seconds).
    if err = client.Call("/echo", &request, &response, time.Second*5); err != nil {
        log.Fatalf("Call failed: %v", err)
    }
    log.Printf("Call Response: \"%v\"", response)
}

API Examples

Register Routers

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })
handler.Handle("method", func(ctx *arpc.Context) { ... })

Client Call, CallAsync, Notify

  1. Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
  2. CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}

timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
    response := &Echo{}
    ctx.Bind(response)
    ... 
}, timeout)

// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallAsyncWith(ctx, "/call/echo", request, func(ctx *arpc.Context) {
//  response := &Echo{}
//  ctx.Bind(response)
//  ... 
// })
  3. Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)

Server Call, CallAsync, Notify

  1. Get client and keep it in your application
// client holds the most recent *arpc.Client seen by the "/route" handler so
// that the server side can later initiate calls on it.
//
// NOTE(review): client is assigned inside the handler and the disconnect
// callback and read from the polling goroutine below, with no lock or atomic
// visible in this snippet. This looks like a data race; confirm and guard
// the variable (e.g. with a mutex) in real code.
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
    // Remember the client that sent this request.
    client = ctx.Client
    // Release the stored client once it disconnects so the loop below stops using it.
    client.OnDisconnected(func(c *arpc.Client){
        client = nil
    })
})

// Once per second, if a client is currently stored, call it from the server side.
go func() {
    for {
        time.Sleep(time.Second)
        // NOTE(review): client may be set to nil by the disconnect callback
        // between this check and the calls below. Confirm whether a local
        // copy should be taken first.
        if client != nil {
            client.Call(...)
            client.CallAsync(...)
            client.Notify(...)
        }
    }
}()
  2. Then Call/CallAsync/Notify

Broadcast - Notify

var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})

// broadcast builds a single "/broadcast" notify message and pushes it to
// every client currently stored in clientMap.
//
// NOTE(review): i is not declared inside this snippet. It must be supplied
// by the surrounding code (e.g. a package-level counter). Confirm.
func broadcast() {
    msg := arpc.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i), nil)

    // Hold the read lock while ranging over clientMap. The unlock is deferred
    // so the lock is released even if PushMsg panics.
    mux.RLock()
    defer mux.RUnlock()

    for client := range clientMap {
        client.PushMsg(msg, arpc.TimeZero)
    }
}

Async Response

var handler arpc.Handler

// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler

// asyncResponse writes data as the response on ctx by calling ctx.Write.
// The "/echo" handler in this example starts it with `go`, so the write is
// performed on a separate goroutine instead of inline in the handler.
func asyncResponse(ctx *arpc.Context, data interface{}) {
    ctx.Write(data)
}

handler.Handle("/echo", func(ctx *arpc.Context) {
    req := ...
    err := ctx.Bind(req)
    if err == nil {
        go asyncResponse(ctx, req)
    }
})

Handle New Connection

// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
    ...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
    ...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
    ...
})

Handle Disconnected

// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
    ...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
    ...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
    ...
})

Handle Client's send queue overstock

// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
    ...
})

// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
    ...
})

// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
    ...
})

Custom Net Protocol

// server
var ln net.Listener = ...
svr := arpc.NewServer()
svr.Serve(ln)

// client
dialer := func() (net.Conn, error) { 
    return ... 
}
client, err := arpc.NewClient(dialer)

Custom Codec

var codec arpc.Codec = ...

// package
arpc.DefaultCodec = codec

// server
svr := arpc.NewServer()
svr.Codec = codec

// client
client, err := arpc.NewClient(...)
client.Codec = codec

Custom Logger

var logger arpc.Logger = ...
arpc.SetLogger(logger) // arpc.DefaultLogger = logger

Custom operations before conn's recv and send

// Register a hook on the package-level default handler that receives the
// connection and returns an error. Per this section's title it is meant to
// run before the connection's recv.
arpc.DefaultHandler.BeforeRecv(func(conn net.Conn) error {
    // ...
    return nil // placeholder: report no error
})

// Register a hook on the package-level default handler that receives the
// connection and returns an error. Per this section's title it is meant to
// run before the connection's send.
arpc.DefaultHandler.BeforeSend(func(conn net.Conn) error {
    // ...
    return nil // placeholder: report no error
})

Custom arpc.Client's Reader from wrapping net.Conn

// Install a wrapper on the package-level default handler that turns the raw
// net.Conn into the io.Reader to read from (e.g. a buffered reader).
arpc.DefaultHandler.SetReaderWrapper(func(conn net.Conn) io.Reader {
    // ...
    return conn // placeholder: net.Conn already satisfies io.Reader
})

Custom arpc.Client's send queue capacity

arpc.DefaultHandler.SetSendQueueSize(4096)


*Note that all licence references and agreements mentioned in the arpc README section above are relevant to that project's source code only.