Programming language: Go
License: MIT License
Tags: Stream Processing    
Latest version: v0.9.8




Machine is a library for creating data workflows. These workflows can be very concise or quite complex, and can even contain cycles for flows that need retry or self-healing mechanisms.


Add the primary library to your project:

  go get github.com/whitaker-io/machine/v2

The main function types are:

// Applicative is a function that is applied to payload and used for transformations
type Applicative[T any] func(d T) T

// Test is a function used in composition of And/Or operations and used to
// filter results down different branches with transformations
type Test[T any] func(d T) (T, error)

// Filter is a function that can be used to filter the payload.
type Filter[T any] func(d T) bool

// Transform is a function used by the Y Combinator to perform a recursion
// on the payload.
// Example:
// func(f Applicative[int]) Applicative[int] {
//   return func(x int) int {
//       if x < 1 {
//           return 1
//       } else {
//           return x * f(x-1)
//       }
//   }
// }
type Transform[T any] func(d Applicative[T]) Applicative[T]
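
To make these types concrete, here is a minimal, self-contained sketch that declares local copies of three of them and uses a Transform to compute a factorial. The helper `fix` is hypothetical and is not part of the library; it only illustrates how a Transform can be turned into a recursive Applicative, and the library's own Y method may work differently.

```go
package main

import "fmt"

// Local copies of the function types shown above, so this sketch
// compiles without importing the library.
type Applicative[T any] func(d T) T
type Filter[T any] func(d T) bool
type Transform[T any] func(d Applicative[T]) Applicative[T]

// fix is a hypothetical helper (not a library function). It returns an
// Applicative whose recursive calls are routed back through t.
func fix[T any](t Transform[T]) Applicative[T] {
	var self Applicative[T]
	self = func(d T) T { return t(self)(d) }
	return self
}

// factorial is the Transform from the comment above.
func factorial(f Applicative[int]) Applicative[int] {
	return func(x int) int {
		if x < 1 {
			return 1
		}
		return x * f(x-1)
	}
}

func main() {
	double := Applicative[int](func(d int) int { return d * 2 })
	isEven := Filter[int](func(d int) bool { return d%2 == 0 })
	fact := fix[int](factorial)

	fmt.Println(double(21)) // 42
	fmt.Println(isEven(3))  // false
	fmt.Println(fact(5))    // 120
}
```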

These are used in the Builder type provided by the Stream type:

// Stream is a representation of a data stream and its associated logic.
// The Builder method is the entrypoint into creating the data processing flow.
// All branches of the Stream are required to end in an OutputTo call.
type Stream[T any] interface {
    Start(ctx context.Context, input chan T) error
    Builder() Builder[T]
}

// Builder is the interface provided for creating a data processing stream.
type Builder[T any] interface {
    Then(a Applicative[T]) Builder[T]
    Y(x Transform[T]) Builder[T]
    Or(x ...Test[T]) (Builder[T], Builder[T])
    And(x ...Test[T]) (Builder[T], Builder[T])
    Filter(f Filter[T]) (Builder[T], Builder[T])
    Duplicate() (Builder[T], Builder[T])
    Loop(x Filter[T]) (loop, out Builder[T])
    Distribute(Edge[T]) Builder[T]
    OutputTo(x chan T)
}

Distribute is a special method used for fan-out operations. It takes an instance of Edge[T] and is most typically used to distribute work via a Pub/Sub. The Edge[T] interface is as follows:

// Edge is an interface that is used for transferring data between vertices
type Edge[T any] interface {
    ReceiveOn(ctx context.Context, channel chan T)
    Send(payload T)
}

The Send method is used for data leaving the associated vertex and the ReceiveOn method is used by the following vertex to receive data. The context.Context used is the same as the one used to start the Stream.
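
As an illustration of the Send/ReceiveOn contract, the following is a minimal in-process sketch of an Edge backed by a buffered channel. The names `memoryEdge`, `newMemoryEdge`, and `roundTrip` are hypothetical, and the sketch assumes that ReceiveOn may return immediately and forward payloads in the background; the library may expect different behaviour, and a real Pub/Sub edge would replace the internal channel with a broker client.

```go
package main

import (
	"context"
	"fmt"
)

// Edge mirrors the interface shown above.
type Edge[T any] interface {
	ReceiveOn(ctx context.Context, channel chan T)
	Send(payload T)
}

// memoryEdge is a hypothetical in-process Edge backed by a buffered channel.
type memoryEdge[T any] struct {
	queue chan T
}

func newMemoryEdge[T any](size int) *memoryEdge[T] {
	return &memoryEdge[T]{queue: make(chan T, size)}
}

// Send queues a payload leaving the upstream vertex.
// It blocks while the buffer is full.
func (e *memoryEdge[T]) Send(payload T) { e.queue <- payload }

// ReceiveOn forwards queued payloads to the downstream channel
// until ctx is cancelled (assumption: non-blocking call).
func (e *memoryEdge[T]) ReceiveOn(ctx context.Context, channel chan T) {
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case p := <-e.queue:
				select {
				case channel <- p:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
}

// roundTrip pushes values through a memoryEdge and collects them downstream.
func roundTrip(values ...string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var edge Edge[string] = newMemoryEdge[string](len(values))
	out := make(chan string)
	edge.ReceiveOn(ctx, out)

	for _, v := range values {
		edge.Send(v)
	}
	got := make([]string, 0, len(values))
	for range values {
		got = append(got, <-out)
	}
	return got
}

func main() {
	fmt.Println(roundTrip("a", "b", "c")) // [a b c]
}
```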

You can also set up Telemetry and other options by passing in the Option type:

// Option type for holding machine settings.
type Option[T any] struct {
    // FIFO controls the processing order of the payloads
    // If set to true the system will wait for one payload
    // to be processed before starting the next.
    FIFO bool `json:"fifo,omitempty"`
    // BufferSize sets the buffer size on the edge channels between the
    // vertices. This setting can be useful when processing large amounts
    // of data with FIFO turned on.
    BufferSize int `json:"buffer_size,omitempty"`
    // Telemetry provides the ability to enable and configure telemetry
    Telemetry Telemetry[T] `json:"telemetry,omitempty"`
    // PanicHandler is a function that is called when a panic occurs
    PanicHandler func(err error, payload T) `json:"-"`
    // DeepCopyBetweenVerticies controls whether DeepCopy is performed between vertices.
    // This is useful if the functions applied are holding copies of the payload for
    // longer than they process it. DeepCopy must be set when this is enabled.
    DeepCopyBetweenVerticies bool `json:"deep_copy_between_vetricies,omitempty"`
    // DeepCopy is a function to perform a deep copy of the payload
    DeepCopy func(T) T `json:"-"`
}
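
The DeepCopy field expects a function of shape func(T) T. As a rough sketch of what such a function might look like, the example below copies a hypothetical payload type `event` that holds a slice and a map, so that changes made to the copy do not leak back into the original. The type and function names are illustrative assumptions, not part of the library.

```go
package main

import "fmt"

// event is a hypothetical payload type with reference-typed fields.
type event struct {
	ID   int
	Tags []string
	Meta map[string]string
}

// deepCopyEvent has the shape func(T) T expected by the DeepCopy field.
// It duplicates the slice and map so the copy shares no backing storage.
func deepCopyEvent(e event) event {
	out := event{ID: e.ID}
	out.Tags = append([]string(nil), e.Tags...)
	out.Meta = make(map[string]string, len(e.Meta))
	for k, v := range e.Meta {
		out.Meta[k] = v
	}
	return out
}

func main() {
	orig := event{ID: 1, Tags: []string{"a"}, Meta: map[string]string{"k": "v"}}
	cp := deepCopyEvent(orig)

	// Mutating the copy leaves the original untouched.
	cp.Tags[0] = "changed"
	cp.Meta["k"] = "changed"
	fmt.Println(orig.Tags[0], orig.Meta["k"]) // a v
}
```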

// Telemetry type for holding telemetry settings.
type Telemetry[T any] interface {
    IncrementPayloadCount(vertexName string)
    IncrementErrorCount(vertexName string)
    Duration(vertexName string, duration time.Duration)
    RecordPayload(vertexName string, payload T)
    RecordError(vertexName string, payload T, err error)
}
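
As a sketch of how the Telemetry interface might be satisfied, the example below keeps per-vertex counters in memory. The type `counterTelemetry` and its constructor are hypothetical; a production implementation would more likely forward these calls to a metrics or tracing backend, and here RecordPayload and RecordError are deliberately left as no-ops.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Telemetry mirrors the interface shown above.
type Telemetry[T any] interface {
	IncrementPayloadCount(vertexName string)
	IncrementErrorCount(vertexName string)
	Duration(vertexName string, duration time.Duration)
	RecordPayload(vertexName string, payload T)
	RecordError(vertexName string, payload T, err error)
}

// counterTelemetry is a hypothetical implementation that keeps
// per-vertex counters in memory, guarded by a mutex.
type counterTelemetry[T any] struct {
	mu       sync.Mutex
	payloads map[string]int
	errors   map[string]int
	elapsed  map[string]time.Duration
}

// Compile-time check that counterTelemetry satisfies Telemetry.
var _ Telemetry[int] = (*counterTelemetry[int])(nil)

func newCounterTelemetry[T any]() *counterTelemetry[T] {
	return &counterTelemetry[T]{
		payloads: map[string]int{},
		errors:   map[string]int{},
		elapsed:  map[string]time.Duration{},
	}
}

func (c *counterTelemetry[T]) IncrementPayloadCount(vertexName string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.payloads[vertexName]++
}

func (c *counterTelemetry[T]) IncrementErrorCount(vertexName string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.errors[vertexName]++
}

// Duration accumulates the total processing time per vertex.
func (c *counterTelemetry[T]) Duration(vertexName string, d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.elapsed[vertexName] += d
}

// RecordPayload and RecordError are no-ops in this sketch.
func (c *counterTelemetry[T]) RecordPayload(vertexName string, payload T)          {}
func (c *counterTelemetry[T]) RecordError(vertexName string, payload T, err error) {}

func main() {
	tel := newCounterTelemetry[int]()
	tel.IncrementPayloadCount("parse")
	tel.IncrementPayloadCount("parse")
	tel.IncrementErrorCount("parse")
	tel.Duration("parse", 3*time.Millisecond)

	fmt.Println(tel.payloads["parse"], tel.errors["parse"], tel.elapsed["parse"]) // 2 1 3ms
}
```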

🤝 Contributing

Contributions, issues, and feature requests are welcome. Feel free to check the issues page if you want to contribute. [Check the contributing guide](./CONTRIBUTING.md).


👤 Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker
