Description
Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?
Go Typed Redis Streams alternatives and similar packages
Based on the "Text Processing" category.
Alternatively, view gtrs alternatives based on common mentions on social networks and blogs.
-
goldmark
:trophy: A markdown parser written in Go. Easy to extend, standard(CommonMark) compliant, well structured. -
bluemonday
bluemonday: a fast golang HTML sanitizer (inspired by the OWASP Java HTML Sanitizer) to scrub user generated content of XSS -
omniparser
omniparser: a native Golang ETL streaming parser and transform library for CSV, JSON, XML, EDI, text, etc. -
html-to-markdown
โ๏ธ Convert HTML to Markdown. Even works with entire websites and can be extended through rules. -
mxj
Decode / encode XML to/from map[string]interface{} (or JSON); extract values with dot-notation paths and wildcards. Replaces x2j and j2x packages. -
go-pkg-rss
DISCONTINUED. This package reads RSS and Atom feeds and provides a caching mechanism that adheres to the feed specs. -
go-edlib
๐ String comparison and edit distance algorithms library, featuring : Levenshtein, LCS, Hamming, Damerau levenshtein (OSA and Adjacent transpositions algorithms), Jaro-Winkler, Cosine, etc... -
goq
A declarative struct-tag-based HTML unmarshaling or scraping package for Go built on top of the goquery library -
go-pkg-xmlx
DISCONTINUED. Extension to the standard Go XML package. Maintains a node tree that allows forward/backwards browsing and exposes some simple single/multi-node search functions. -
github_flavored_markdown
GitHub Flavored Markdown renderer with fenced code block highlighting, clickable header anchor links. -
pagser
Pagser is a simple, extensible, configurable parse and deserialize html page to struct based on goquery and struct tags for golang crawler
InfluxDB - Purpose built for real-time analytics at any scale.
Do you think we are missing an alternative of Go Typed Redis Streams or a related project?
Popular Comparisons
README
Go Typed Redis Streams
Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?
consumer := NewGroupConsumer[MyType](...)
for msg := range consumer.Chan() {
// Handle mssage
consumer.Ack(msg)
}
Wait...it is! ๐ฅ
Quickstart
Define a type that represents your stream data. It'll be parsed automatically with all field names converted to snake case. Missing fields will be skipped silently. You can also use the ConvertibleFrom
and ConvertibleTo
interfaces to do custom parsing.
// maps to {"name": , "priority": }
type Event struct {
Name string
Priority int
}
Consumers
Consumers allow reading redis streams through Go channels. Specify context, a redis client and where to start reading. Make sure to specify StreamConsumerConfig
, if you don't like the default ones or want optimal performance. New entries are fetched asynchronously to provide a fast flow ๐
consumer := NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})
for msg := range cs.Chan() {
if msg.Err != nil {
continue
}
var event Event = msg.Data
}
Don't forget to Close()
the consumer. If you want to start reading again where you left off, you can save the last StreamIDs.
ids := cs.Close()
Group Consumers
They work just like regular consumers and allow sending acknowledgements asynchronously. Beware to use Ack
only if you keep processing new messages - that is inside a consuming loop or from another goroutine. Even though this introduces a two-sided depdendecy, the consumer is avoids deadlocks.
cs := NewGroupConsumer[Event](ctx, rdb, "group", "consumer", "stream", ">")
for msg := range cs.Chan() {
cs.Ack(msg)
}
Stopped processing? Check your errors ๐
// Wait for all acknowledgements to complete
errors := cs.AwaitAcks()
// Acknowledgements that were not sent yet or their errors were not consumed
remaining := cs.Close()
Error handling
This is where the simplicity fades a litte, but only a little :) The channel provides not just values, but also errors. Those can be only of three types:
ReadError
reports a failed XRead/XReadGroup request. Consumer will close the channel after this errorAckError
reports a failed XAck requestParseError
speaks for itself
Consumers don't send errors on cancellation and immediately close the channel.
switch errv := msg.Err.(type) {
case nil: // This interface-nil comparison in safe
fmt.Println("Got", msg.Data)
case ReadError:
fmt.Println("ReadError caused by", errv.Err)
return // last message in channel
case AckError:
fmt.Printf("Ack failed %v-%v caused by %v\n", msg.Stream, msg.ID, errv.Err)
case ParseError:
fmt.Println("Failed to parse", errv.Data)
}
All those types are wrapping errors. For example, ParseError
can be unwrapped to:
- Find out why the default parser failed via
FieldParseError
(e.g. assigning string to int field) - Catch custom errors from
ConvertibleFrom
var fpe FieldParseError
if errors.As(msg.Err, &fpe) {
fmt.Printf("Failed to parse field %v because %v", fpe.Field, fpe.Err)
}
errors.Is(msg.Err, errMyTypeFailedToParse)
Streams
Streams are simple wrappers for basic redis commands on a stream.
stream := NewStream[Event](rdb, "my-stream")
stream.Add(ctx, Event{
Kind: "Example event",
Priority: 1,
})
Installation
go get github.com/dranikpg/gtrs
Gtrs is still in its early stages and might change in further releases.
Examples
- This is a small example for reading from three consumers in parallel and handling all types of errors.
Performance
go test -run ^$ -bench BenchmarkConsumer -cpu=1
The iteration cost on a mocked client is about 500-700 ns depending on buffer sizes, which gives it a throughput close to 2 million entries a second ๐. Getting bad results? Make sure to set large buffer sizes in the options.