Description
Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?
Go Typed Redis Streams alternatives and similar packages
Based on the "Text Processing" category.
Alternatively, view gtrs alternatives based on common mentions on social networks and blogs.
-
micro-editor
A modern and intuitive terminal-based text editor -
sh
A shell parser, formatter, and interpreter with bash support; includes shfmt -
go-humanize
Go Humans! (formatters for units to human friendly sizes) -
goldmark
:trophy: A markdown parser written in Go. Easy to extend, standard(CommonMark) compliant, well structured. -
bluemonday
bluemonday: a fast golang HTML sanitizer (inspired by the OWASP Java HTML Sanitizer) to scrub user generated content of XSS -
commonregex
๐ซ A collection of common regular expressions for Go -
mxj
Decode / encode XML to/from map[string]interface{} (or JSON); extract values with dot-notation paths and wildcards. Replaces x2j and j2x packages. -
Dataflow kit
Extract structured data from web sites. Web sites scraping. -
xpath
XPath package for Golang, supports HTML, XML, JSON document query. -
htmlquery
htmlquery is golang XPath package for HTML query. -
omniparser
omniparser: a native Golang ETL streaming parser and transform library for CSV, JSON, XML, EDI, text, etc. -
html-to-markdown
โ๏ธ Convert HTML to Markdown. Even works with entire websites and can be extended through rules. -
Koazee
A StreamLike, Immutable, Lazy Loading and smart Golang Library to deal with slices. -
go-pkg-rss
This package reads RSS and Atom feeds and provides a caching mechanism that adheres to the feed specs. -
go-edlib
๐ String comparison and edit distance algorithms library, featuring : Levenshtein, LCS, Hamming, Damerau levenshtein (OSA and Adjacent transpositions algorithms), Jaro-Winkler, Cosine, etc... -
gotabulate
Gotabulate - Easily pretty-print your tabular data with Go -
goribot
A simple golang spider/scraping framework,build a spider in 3 lines. -
goq
A declarative struct-tag-based HTML unmarshaling or scraping package for Go built on top of the goquery library -
xquery
XQuery lets you extract data from HTML/XML documents using XPath expression. -
github_flavored_markdown
GitHub Flavored Markdown renderer with fenced code block highlighting, clickable header anchor links. -
go-pkg-xmlx
Extension to the standard Go XML package. Maintains a node tree that allows forward/backwards browsing and exposes some simple single/multi-node search functions. -
gospider
โก Light weight Golang spider framework | ่ฝป้็ Golang ็ฌ่ซๆกๆถ -
strutil-go
Golang metrics for calculating string similarity and other string utility functions -
editorconfig-core-go
EditorConfig Core written in Go -
shell2telegram
Telegram bot constructor from command-line -
did
A golang package to work with Decentralized Identifiers (DIDs) -
regroup
Match regex group into go struct using struct tags and automatic parsing -
go-fixedwidth
Encoding and decoding for fixed-width formatted data -
go-zero-width
Zero-width character detection and removal for Go -
cat
Extract text from plaintext, .docx, .odt and .rtf files. Pure go. -
align
A general purpose application and library for aligning text. -
pagser
Pagser is a simple, extensible, configurable parse and deserialize html page to struct based on goquery and struct tags for golang crawler
Static code analysis for 29 languages.
Do you think we are missing an alternative of Go Typed Redis Streams or a related project?
Popular Comparisons
README
Go Typed Redis Streams
Effectively reading Redis streams requires some work: counting ids, prefetching and buffering, asynchronously sending acknowledgements and parsing entries. What if it was just the following?
consumer := NewGroupConsumer[MyType](...)
for msg := range consumer.Chan() {
// Handle mssage
consumer.Ack(msg)
}
Wait...it is! ๐ฅ
Quickstart
Define a type that represents your stream data. It'll be parsed automatically with all field names converted to snake case. Missing fields will be skipped silently. You can also use the ConvertibleFrom
and ConvertibleTo
interfaces to do custom parsing.
// maps to {"name": , "priority": }
type Event struct {
Name string
Priority int
}
Consumers
Consumers allow reading redis streams through Go channels. Specify context, a redis client and where to start reading. Make sure to specify StreamConsumerConfig
, if you don't like the default ones or want optimal performance. New entries are fetched asynchronously to provide a fast flow ๐
consumer := NewConsumer[Event](ctx, rdb, StreamIDs{"my-stream": "$"})
for msg := range cs.Chan() {
if msg.Err != nil {
continue
}
var event Event = msg.Data
}
Don't forget to Close()
the consumer. If you want to start reading again where you left off, you can save the last StreamIDs.
ids := cs.Close()
Group Consumers
They work just like regular consumers and allow sending acknowledgements asynchronously. Beware to use Ack
only if you keep processing new messages - that is inside a consuming loop or from another goroutine. Even though this introduces a two-sided depdendecy, the consumer is avoids deadlocks.
cs := NewGroupConsumer[Event](ctx, rdb, "group", "consumer", "stream", ">")
for msg := range cs.Chan() {
cs.Ack(msg)
}
Stopped processing? Check your errors ๐
// Wait for all acknowledgements to complete
errors := cs.AwaitAcks()
// Acknowledgements that were not sent yet or their errors were not consumed
remaining := cs.Close()
Error handling
This is where the simplicity fades a litte, but only a little :) The channel provides not just values, but also errors. Those can be only of three types:
ReadError
reports a failed XRead/XReadGroup request. Consumer will close the channel after this errorAckError
reports a failed XAck requestParseError
speaks for itself
Consumers don't send errors on cancellation and immediately close the channel.
switch errv := msg.Err.(type) {
case nil: // This interface-nil comparison in safe
fmt.Println("Got", msg.Data)
case ReadError:
fmt.Println("ReadError caused by", errv.Err)
return // last message in channel
case AckError:
fmt.Printf("Ack failed %v-%v caused by %v\n", msg.Stream, msg.ID, errv.Err)
case ParseError:
fmt.Println("Failed to parse", errv.Data)
}
All those types are wrapping errors. For example, ParseError
can be unwrapped to:
- Find out why the default parser failed via
FieldParseError
(e.g. assigning string to int field) - Catch custom errors from
ConvertibleFrom
var fpe FieldParseError
if errors.As(msg.Err, &fpe) {
fmt.Printf("Failed to parse field %v because %v", fpe.Field, fpe.Err)
}
errors.Is(msg.Err, errMyTypeFailedToParse)
Streams
Streams are simple wrappers for basic redis commands on a stream.
stream := NewStream[Event](rdb, "my-stream")
stream.Add(ctx, Event{
Kind: "Example event",
Priority: 1,
})
Installation
go get github.com/dranikpg/gtrs
Gtrs is still in its early stages and might change in further releases.
Examples
- This is a small example for reading from three consumers in parallel and handling all types of errors.
Performance
go test -run ^$ -bench BenchmarkConsumer -cpu=1
The iteration cost on a mocked client is about 500-700 ns depending on buffer sizes, which gives it a throughput close to 2 million entries a second ๐. Getting bad results? Make sure to set large buffer sizes in the options.