gpool alternatives and similar packages
Based on the "Goroutines" category.
- ants: 🐜🐜🐜 ants is a high-performance and low-cost goroutine pool in Go.
- goworker: goworker is a Go-based background worker that runs 10 to 100,000* times faster than Ruby-based workers.
- pool: a limited consumer goroutine or unlimited goroutine pool for easier goroutine handling and cancellation.
- Goflow: Simple way to control goroutines execution order based on dependencies.
- artifex: Simple in-memory job queue for Golang using worker-based dispatching.
- go-workers: 👷 Library for safely running groups of workers concurrently or consecutively that require input and output through channels.
- async: A safe way to execute functions asynchronously, recovering them in case of panic. It also provides an error stack that aims to make failure causes easier to discover.
- go-actor: A tiny library for writing concurrent programs in Go using the actor model.
- gollback: Go asynchronous simple function utilities, for managing execution of closures and callbacks.
- semaphore: 🚦 Semaphore pattern implementation with timeout of lock/unlock operations.
- Hunch: Hunch provides functions like All, First, Retry, Waterfall etc., that make asynchronous flow control more intuitive.
- go-do-work: Dynamically resizable pools of goroutines which can queue an infinite number of jobs.
- gowl: Gowl is a process management and process monitoring tool at once. An infinite worker pool gives you the ability to control the pool and processes and monitor their status.
- routine: go routine control, abstraction of the Main and some useful Executors. If you don't know how to manage goroutines, use it.
- kyoo: Unlimited job queue for go, using a pool of concurrent workers processing the job queue entries.
- go-waitgroup: A sync.WaitGroup with error handling and concurrency control.
- channelify: Make functions return a channel for parallel processing via go routines.
- execpool: A pool that spins up a given number of processes in advance and attaches stdin and stdout when needed. Very similar to FastCGI but works for any command.
- conexec: A concurrent toolkit to help execute funcs concurrently in an efficient and safe way. It supports specifying the overall timeout to avoid blocking.
- concurrency-limiter: Concurrency limiter with support for timeouts, dynamic priority and context cancellation of goroutines.
- hands: Hands is a process controller used to control the execution and return strategies of multiple goroutines.
- queue: package queue gives you a queue group accessibility. Helps you to limit goroutines, wait for the end of all the goroutines and much more.
- async-job: AsyncJob is an asynchronous queue job manager with light code, clear and speed. I hope so! 😬
README
gpool - a generic context-aware resizable goroutines pool to bound concurrency.
Installation
$ go get github.com/sherifabdlnaby/gpool
import "github.com/sherifabdlnaby/gpool"
Introduction
gpool manages a resizable pool of context-aware goroutines to bound concurrency. A Job is enqueued to the pool, and only N jobs can be processing at the same time.
A Job is simply a func(){}. When you Enqueue(..) a job, the call returns once the job has started processing. If the pool is full, it blocks until one of the following happens:
- the pool has room for the job.
- the job's context is canceled.
- the pool is stopped.
The pool can be resized with Resize(), which is safe to call concurrently. Enlarging the pool unblocks any blocked enqueues that now fit. Shrinking the pool does not affect jobs that are already processing or waiting.
Enqueuing a job returns nil once the job starts, ErrPoolClosed if the pool is closed, or the context's error if the job's context is canceled while the call is blocked waiting for the pool.
Stopping the pool with pool.Stop() waits for all processing jobs to finish before returning. It also unblocks any blocked enqueues, which return ErrPoolClosed.
Start, Stop, and Resize(N) are all concurrency-safe and can be called from multiple goroutines. Repeated calls to Start or Stop have no effect unless the two are called alternately.
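The blocking rules above can be illustrated with a buffered channel used as a semaphore. The following is a minimal standard-library sketch of that idea, not gpool's actual implementation: boundedPool, newBoundedPool, enqueue, stop, and errPoolClosed are hypothetical names chosen for this example, and resizing is left out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errPoolClosed is a hypothetical stand-in for gpool's ErrPoolClosed.
var errPoolClosed = errors.New("pool is closed")

// boundedPool is an illustrative pool that runs at most cap(slots) jobs at once.
type boundedPool struct {
	slots  chan struct{} // one token per running job
	done   chan struct{} // closed when the pool is stopped
	mu     sync.Mutex    // guards closed and orders wg.Add before wg.Wait
	closed bool
	wg     sync.WaitGroup
}

func newBoundedPool(n int) *boundedPool {
	return &boundedPool{slots: make(chan struct{}, n), done: make(chan struct{})}
}

// enqueue blocks until the job has started, ctx is canceled, or the pool is stopped.
func (p *boundedPool) enqueue(ctx context.Context, job func()) error {
	select {
	case p.slots <- struct{}{}: // the pool has room for the job
	case <-ctx.Done():
		return ctx.Err()
	case <-p.done:
		return errPoolClosed
	}
	p.mu.Lock()
	if p.closed { // stop won the race: give the slot back
		p.mu.Unlock()
		<-p.slots
		return errPoolClosed
	}
	p.wg.Add(1)
	p.mu.Unlock()
	go func() {
		defer p.wg.Done()
		defer func() { <-p.slots }() // free the slot when the job returns
		job()
	}()
	return nil
}

// stop unblocks waiting enqueues and waits for running jobs. Repeated calls are no-ops.
func (p *boundedPool) stop() {
	p.mu.Lock()
	if !p.closed {
		p.closed = true
		close(p.done)
	}
	p.mu.Unlock()
	p.wg.Wait()
}

// demo fills a pool of size 1, then shows a timed-out enqueue and an enqueue after stop.
func demo() (bool, bool, bool) {
	p := newBoundedPool(1)
	release := make(chan struct{})

	// The first job starts immediately and holds the only slot.
	started := p.enqueue(context.Background(), func() { <-release }) == nil

	// The second enqueue finds the pool full and gives up when its context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	timedOut := errors.Is(p.enqueue(ctx, func() {}), context.DeadlineExceeded)

	close(release) // let the first job finish
	p.stop()
	closedErr := errors.Is(p.enqueue(context.Background(), func() {}), errPoolClosed)
	return started, timedOut, closedErr
}

func main() {
	fmt.Println(demo())
}
```

The three-way select is what gives the enqueue its three exits: a free slot, a canceled context, or a stopped pool.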
Usage
Create new pool
pool, err := gpool.NewPool(concurrency)
Enqueue a job
job := func() {
	time.Sleep(2000 * time.Millisecond)
	fmt.Println("did some work")
}
// Enqueue Job
err := pool.Enqueue(ctx, job)
A call to pool.Enqueue() blocks while the pool is full. It returns nil once job has started processing, ctx.Err() if the context was canceled while waiting, or ErrPoolClosed if the pool was stopped or was never started.
Resize the pool
err = pool.Resize(size)
Resize changes the size of the pool while it is live. If the new size is larger, enqueues blocked in other goroutines are unblocked to fill the new capacity. If the new size is smaller, new enqueues block until the number of running jobs drops below the new size.
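A fixed-capacity channel cannot change its limit, so a resizable bound needs a different mechanism. One possible approach, shown here only as an assumption about how such behavior could be built and not as gpool's own code, is a counter guarded by a mutex and a condition variable. The names resizableLimiter, acquire, tryAcquire, release, and resize are hypothetical, and context cancellation is omitted to keep the sketch short.

```go
package main

import (
	"fmt"
	"sync"
)

// resizableLimiter is a hypothetical counting semaphore whose limit can change at run time.
type resizableLimiter struct {
	mu      sync.Mutex
	cond    *sync.Cond
	limit   int // maximum number of concurrent holders
	running int // current number of holders
}

func newResizableLimiter(limit int) *resizableLimiter {
	l := &resizableLimiter{limit: limit}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until fewer than limit slots are in use, then takes one.
func (l *resizableLimiter) acquire() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.running >= l.limit {
		l.cond.Wait()
	}
	l.running++
}

// tryAcquire takes a slot only if one is free and never blocks.
func (l *resizableLimiter) tryAcquire() bool {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.running >= l.limit {
		return false
	}
	l.running++
	return true
}

// release returns a slot and wakes waiters so they can re-check the limit.
func (l *resizableLimiter) release() {
	l.mu.Lock()
	l.running--
	l.mu.Unlock()
	l.cond.Broadcast()
}

// resize changes the limit. Growing wakes blocked callers; shrinking never
// interrupts current holders, it only delays new ones.
func (l *resizableLimiter) resize(limit int) {
	l.mu.Lock()
	l.limit = limit
	l.mu.Unlock()
	l.cond.Broadcast()
}

// demoResize walks through growing and then shrinking a limiter of size 1.
func demoResize() (bool, bool, bool) {
	l := newResizableLimiter(1)
	l.acquire()                      // running = 1, limit = 1
	fullBefore := !l.tryAcquire()    // no room yet

	acquired := make(chan struct{})
	go func() {
		l.acquire() // waits until the limit grows
		close(acquired)
	}()
	l.resize(2)
	<-acquired // running = 2
	unblockedAfterGrow := true

	l.resize(1) // the two current holders keep running
	l.release() // running = 1, which still equals the new limit
	fullAfterShrink := !l.tryAcquire()
	return fullBefore, unblockedAfterGrow, fullAfterShrink
}

func main() {
	fmt.Println(demoResize())
}
```

Waiters re-check the condition in a loop after every wake-up, which is why the same Broadcast serves both release and resize.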
Stop the pool
pool.Stop()
- All blocked/waiting enqueues return immediately.
- Stop() blocks until all running jobs are done.
Different types of "Enqueues"
- Enqueue(ctx, job) returns once the job has started executing (not after the job finishes or returns).
- EnqueueAndWait(ctx, job) returns once the job has started and finished executing.
- TryEnqueue(job) does not block if the pool is full. It returns true once the job has started executing and false if the pool is full.
- TryEnqueueAndWait(job) does not block if the pool is full. It returns true once the job has started and finished executing and false if the pool is full.
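The difference between the "try" and "and wait" variants can be sketched with the standard library alone. The example below is an illustration under assumptions, not gpool's source: slots, tryRun, and tryRunAndWait are hypothetical names. A select with a default case gives the non-blocking behavior, and a channel closed when the job returns gives the wait.

```go
package main

import "fmt"

// slots is a hypothetical semaphore; its capacity plays the role of the pool size.
type slots chan struct{}

// tryRun starts job in a goroutine if a slot is free and reports whether it started.
// It never blocks. The returned channel is closed when the job has finished.
func (s slots) tryRun(job func()) (<-chan struct{}, bool) {
	select {
	case s <- struct{}{}: // took a free slot
	default:
		return nil, false // pool is full
	}
	finished := make(chan struct{})
	go func() {
		defer close(finished)  // runs last: signal completion
		defer func() { <-s }() // runs first: free the slot
		job()
	}()
	return finished, true
}

// tryRunAndWait behaves like tryRun but also waits for the job to finish.
func (s slots) tryRunAndWait(job func()) bool {
	done, ok := s.tryRun(job)
	if ok {
		<-done
	}
	return ok
}

// demoTry exercises both helpers against a pool of size 1.
func demoTry() (bool, bool, bool, int) {
	s := make(slots, 1)
	release := make(chan struct{})

	done, first := s.tryRun(func() { <-release }) // takes the only slot
	_, second := s.tryRun(func() {})              // pool full: false, without blocking

	close(release)
	<-done // the first job has finished and released its slot

	result := 0
	third := s.tryRunAndWait(func() { result = 1337 }) // returns after the job ran
	return first, second, third, result
}

func main() {
	fmt.Println(demoTry())
}
```

Because the slot is released before the completion channel is closed, a caller that has waited for a job can rely on its slot being free again.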
Benchmarks
$ go test -bench=. -cpu=2 -benchmem
goos: darwin
goarch: amd64
pkg: github.com/sherifabdlnaby/gpool
BenchmarkThroughput/PoolSize[2]-2 853724 730 ns/op 125 B/op 2 allocs/op
BenchmarkThroughput/PoolSize[10]-2 3647638 329 ns/op 10 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[100]-2 4869789 248 ns/op 0 B/op 0 allocs/op
BenchmarkThroughput/PoolSize[1000]-2 4320566 280 ns/op 0 B/op 0 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[2]-2 530328 2146 ns/op 275 B/op 5 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[100]-2 10000 112478 ns/op 12737 B/op 239 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[2]BulkJobs[1000]-2 1069 1109384 ns/op 126943 B/op 2380 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[2]-2 1417808 844 ns/op 58 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[100]-2 30556 39187 ns/op 1764 B/op 33 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10]BulkJobs[1000]-2 3012 385652 ns/op 15737 B/op 295 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[2]-2 1986571 609 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[100]-2 41235 29004 ns/op 37 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[100]BulkJobs[1000]-2 4080 290791 ns/op 188 B/op 4 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[2]-2 1963564 605 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[100]-2 42000 28442 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[1000]BulkJobs[1000]-2 4333 284865 ns/op 17 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[2]-2 1963168 611 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[100]-2 42238 28419 ns/op 16 B/op 1 allocs/op
BenchmarkBulkJobs_UnderLimit/PoolSize[10000]BulkJobs[1000]-2 4171 283981 ns/op 29 B/op 1 allocs/op
BenchmarkThroughput/PoolSize[S] = enqueue async jobs (without waiting for the result) in a pool of size S.
BenchmarkBulkJobs_UnderLimit/PoolSize[S]BulkJobs[J] = enqueue J jobs at a time in a pool of size S, where J < S.
Examples
Example 1 - Simple Job Enqueue
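package main

import (
	"context"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

func main() {
	concurrency := 2

	// Create and start pool.
	pool, err := gpool.NewPool(concurrency)
	if err != nil {
		log.Fatalf("Could not create pool. Error: [%s]", err.Error())
	}
	defer pool.Stop()

	// Create JOB. It sends its result on resultChan1 after two seconds.
	resultChan1 := make(chan int)
	ctx := context.Background()
	job := func() {
		time.Sleep(2000 * time.Millisecond)
		resultChan1 <- 1337
	}

	// Enqueue Job. Enqueue returns as soon as the job has started.
	err1 := pool.Enqueue(ctx, job)
	if err1 != nil {
		log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
		return
	}
	log.Printf("Job Enqueued and started processing")

	// Block until the job delivers its result.
	log.Printf("Job Done, Received: %v", <-resultChan1)
}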
Example 2 - Enqueue A Job with Timeout
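package main

import (
	"context"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

func main() {
	concurrency := 2

	// Create and start pool.
	pool, err := gpool.NewPool(concurrency)
	if err != nil {
		log.Fatalf("Could not create pool. Error: [%s]", err.Error())
	}
	defer pool.Stop()

	// Create JOB. The channel is unbuffered, so the job blocks until its result is read.
	resultChan := make(chan int)
	ctx := context.Background()
	job := func() {
		resultChan <- 1337
	}

	// Enqueue 2 Jobs to fill pool (Will not finish unless we pull result from resultChan)
	_ = pool.Enqueue(ctx, job)
	_ = pool.Enqueue(ctx, job)

	// Keep the cancel function and call it to release the timer's resources.
	ctxWithTimeout, cancel := context.WithTimeout(ctx, 1000*time.Millisecond)
	defer cancel()

	// Will block for 1 second only because of Timeout
	err1 := pool.Enqueue(ctxWithTimeout, job)
	if err1 != nil {
		log.Printf("Job was not enqueued. Error: [%s]", err1.Error())
	}

	// Drain the two jobs that did start.
	log.Printf("Job 1 Done, Received: %v", <-resultChan)
	log.Printf("Job 2 Done, Received: %v", <-resultChan)
}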
Example 3
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/sherifabdlnaby/gpool"
)

// Number of workers, i.e. the number of concurrent jobs the pool allows.
const concurrency = 2

func main() {
	pool, _ := gpool.NewPool(concurrency)
	defer pool.Stop()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for i := 0; i < 10; i++ {
			// Small Interval for more readable output
			time.Sleep(500 * time.Millisecond)

			go func(i int) {
				// Buffered so the job can finish even if nobody reads the result.
				x := make(chan int, 1)

				log.Printf("Job [%v] Enqueueing", i)
				err := pool.Enqueue(ctx, func() {
					time.Sleep(2000 * time.Millisecond)
					x <- i
				})
				if err != nil {
					log.Printf("Job [%v] was not enqueued. [%s]", i, err.Error())
					return
				}

				log.Printf("Job [%v] Enqueue-ed ", i)
				log.Printf("Job [%v] Receieved, Result: [%v]", i, <-x)
			}(i)
		}
	}()

	// Uncomment to demonstrate ctx cancel of jobs.
	//time.Sleep(100 * time.Millisecond)
	//cancel()

	time.Sleep(5000 * time.Millisecond)

	// Stop blocks until running jobs finish and makes blocked enqueues return an error.
	fmt.Println("Stopping...")
	pool.Stop()
	fmt.Println("Stopped")

	fmt.Println("Sleeping for couple of seconds so canceled job have a chance to print out their status")
	time.Sleep(4000 * time.Millisecond)
}
Output
2019/01/08 20:15:39 Job [0] Enqueueing
2019/01/08 20:15:39 Job [0] Enqueue-ed
2019/01/08 20:15:39 Job [1] Enqueueing
2019/01/08 20:15:39 Job [1] Enqueue-ed
2019/01/08 20:15:40 Job [2] Enqueueing
2019/01/08 20:15:40 Job [3] Enqueueing
2019/01/08 20:15:41 Job [0] Receieved, Result: [0]
2019/01/08 20:15:41 Job [2] Enqueue-ed
2019/01/08 20:15:41 Job [4] Enqueueing
2019/01/08 20:15:41 Job [3] Enqueue-ed
2019/01/08 20:15:41 Job [1] Receieved, Result: [1]
2019/01/08 20:15:41 Job [5] Enqueueing
2019/01/08 20:15:42 Job [6] Enqueueing
2019/01/08 20:15:42 Job [7] Enqueueing
2019/01/08 20:15:43 Job [4] Enqueue-ed
2019/01/08 20:15:43 Job [2] Receieved, Result: [2]
2019/01/08 20:15:43 Job [8] Enqueueing
Stopping...
2019/01/08 20:15:43 Job [7] was not enqueued. [pool is closed]
2019/01/08 20:15:43 Job [5] was not enqueued. [pool is closed]
2019/01/08 20:15:43 Job [6] was not enqueued. [pool is closed]
2019/01/08 20:15:43 Job [3] Receieved, Result: [3]
2019/01/08 20:15:43 Job [8] was not enqueued. [pool is closed]
2019/01/08 20:15:43 Job [9] Enqueueing
Stopped
2019/01/08 20:15:45 Job [4] Receieved, Result: [4]
Sleeping for couple of seconds so canceled job have a chance to print out their status
2019/01/08 20:15:45 Job [9] was not enqueued. [pool is closed]
Process finished with exit code 0
License
MIT License Copyright (c) 2019 Sherif Abdel-Naby
Contribution
PRs are open and welcome.