beam package - github.com/apache/beam/sdks/v2/go/pkg/beam - Go Packages

Package beam is an implementation of the Apache Beam (https://beam.apache.org) programming model in Go. Beam provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.

For more on the Beam model see: https://beam.apache.org/documentation/programming-guide

For design choices this implementation makes see: https://s.apache.org/beam-go-sdk-design-rfc

package main

import (
	"context"
	"fmt"
	"regexp"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/direct"
)

func main() {
	// In order to start creating the pipeline for execution, a Pipeline object is needed.
	p := beam.NewPipeline()
	s := p.Root()

	// The pipeline object encapsulates all the data and steps in your processing task.
	// It is the basis for creating the pipeline's data sets as PCollections and its operations
	// as transforms.

	// The PCollection abstraction represents a potentially distributed,
	// multi-element data set. You can think of a PCollection as “pipeline” data;
	// Beam transforms use PCollection objects as inputs and outputs. As such, if
	// you want to work with data in your pipeline, it must be in the form of a
	// PCollection.

	// Transformations are applied in a scoped fashion to the pipeline. The scope
	// can be obtained from the pipeline object.

	// Start by reading text from input files, and receiving a PCollection.
	lines := textio.Read(s, "protocol://path/file*.txt")

	// Transforms are added to the pipeline so they are part of the work to be
	// executed.  Since this transform has no PCollection as an input, it is
	// considered a 'root transform'.

	// A pipeline can have multiple root transforms.
	moreLines := textio.Read(s, "protocol://other/path/file*.txt")

	// Further transforms can be applied, creating an arbitrary, acyclic graph.
	// Subsequent transforms (and the intermediate PCollections they produce) are
	// attached to the same pipeline.
	all := beam.Flatten(s, lines, moreLines)
	wordRegexp := regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
	words := beam.ParDo(s, func(line string, emit func(string)) {
		for _, word := range wordRegexp.FindAllString(line, -1) {
			emit(word)
		}
	}, all)
	formatted := beam.ParDo(s, strings.ToUpper, words)
	textio.Write(s, "protocol://output/path", formatted)

	// Applying a transform adds it to the pipeline, rather than executing it
	// immediately.  Once the whole pipeline of transforms is constructed, the
	// pipeline can be executed by a PipelineRunner.  The direct runner executes the
	// transforms directly, sequentially, in this one process, which is useful for
	// unit tests and simple experiments:
	if _, err := direct.Execute(context.Background(), p); err != nil {
		fmt.Printf("Pipeline failed: %v", err)
	}
}
package main

import (
	"context"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
)

func ctxWithPtransformID(id string) context.Context {
	ctx := context.Background()
	ctx = metrics.SetBundleID(ctx, "exampleBundle")
	ctx = metrics.SetPTransformID(ctx, id)
	return ctx
}

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {

	// Metrics can be declared outside DoFns, and used inside.
	outside := beam.NewCounter("example.namespace", "count")

	extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
		// They can be defined at time of use within a DoFn, if necessary.
		inside := beam.NewDistribution("example.namespace", "characters")
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
			outside.Inc(ctx, 1)
			inside.Update(ctx, int64(len(word)))
		}
	}
	ctx := ctxWithPtransformID("example")
	extractWordsDofn(ctx, "this has six words in it", func(string) {})
	extractWordsDofn(ctx, "this has seven words in it, see?", func(string) {})

	metrics.DumpToOutFromContext(ctx)
}
Output:

PTransformID: "example"
	example.namespace.characters - count: 13 sum: 43 min: 2 max: 5
	example.namespace.count - value: 13
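
The figures in this output can be re-derived without a runner. The following is a minimal, stdlib-only sketch and not the SDK's metrics implementation: the stats type and the observe function are hypothetical names introduced here. It applies the same regular expression to the same two lines and tracks the count, sum, min and max of the word lengths.

```go
package main

import (
	"fmt"
	"regexp"
)

// wordPattern is the same pattern the example above uses to find words.
var wordPattern = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// stats is a hypothetical stand-in for the values a distribution reports.
type stats struct {
	count, sum, min, max int64
}

// observe folds the length of every word found in the lines into a stats value.
func observe(lines ...string) stats {
	var st stats
	for _, line := range lines {
		for _, word := range wordPattern.FindAllString(line, -1) {
			n := int64(len(word))
			if st.count == 0 || n < st.min {
				st.min = n
			}
			if n > st.max {
				st.max = n
			}
			st.count++
			st.sum += n
		}
	}
	return st
}

func main() {
	st := observe("this has six words in it", "this has seven words in it, see?")
	// Prints: count: 13 sum: 43 min: 2 max: 5
	fmt.Printf("count: %d sum: %d min: %d max: %d\n", st.count, st.sum, st.min, st.max)
}
```

The counter value of 13 is the same word count: six words from the first line plus seven from the second.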
package main

import (
	"context"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

func main() {

	// Metric proxies can be used in multiple DoFns
	c := beam.NewCounter("example.reusable", "count")

	extractWordsDofn := func(ctx context.Context, line string, emit func(string)) {
		for _, word := range wordRE.FindAllString(line, -1) {
			emit(word)
			c.Inc(ctx, 1)
		}
	}

	extractRunesDofn := func(ctx context.Context, line string, emit func(rune)) {
		for _, r := range line {
			emit(r)
			c.Inc(ctx, 1)
		}
	}
	ctx = metrics.SetBundleID(ctx, "exampleBundle")
	extractWordsDofn(metrics.SetPTransformID(ctx, "extract1"), "this has six words in it", func(string) {})

	extractRunesDofn(metrics.SetPTransformID(ctx, "extract2"), "seven thousand", func(rune) {})

	metrics.DumpToOutFromContext(ctx)
}
Output:

PTransformID: "extract1"
	example.reusable.count - value: 6
PTransformID: "extract2"
	example.reusable.count - value: 14

These are the reflect.Type instances of the universal types, which are used when binding actual types to "generic" DoFns that use Universal Types.

EnableSchemas is a temporary configuration variable to use Beam Schema encoding by default instead of JSON. It will first be set to true by default, and will eventually be removed.

Only users who rely on the default JSON marshalling behaviour should set this explicitly, and they should file an issue on the Beam repo so the underlying problem can be resolved. https://github.com/apache/beam/issues/new/choose

EventTimeType is the reflect.Type of EventTime.

PipelineOptions are global options for the active pipeline. Options can be defined any time before execution and are re-created by the harness on remote execution workers. Global options should be used sparingly.

CrossLanguage is a low-level transform for executing cross-language transforms written in other SDKs. Because this is low-level, it is recommended to use one of the higher-level IO-specific wrappers where available. These can be found in the pkg/beam/io/xlang subdirectory. CrossLanguage is useful for executing cross-language transforms which do not have any existing IO wrappers.

Usage requires an address for an expansion service accessible during pipeline construction, a URN identifying the desired transform, an optional payload with configuration information, and input and output names. It outputs a map of named output PCollections.

For more information on expansion services and other aspects of cross-language transforms in general, refer to the Beam programming guide: https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines

Payload

Payloads are configuration data that some cross-language transforms require for expansion. Consult the documentation of the transform in the source SDK to find out what payload data it requires. If no payload is required, pass in nil.

CrossLanguage accepts payloads as a []byte containing an encoded ExternalConfigurationPayload protobuf. The helper function beam.CrossLanguagePayload is the recommended way to easily encode a standard Go struct for use as a payload.

Inputs and Outputs

Like most transforms, any input PCollections must be provided. Unlike most transforms, output types must be provided because Go cannot infer output types from external transforms.

Inputs and outputs to a cross-language transform may be either named or unnamed. Named inputs/outputs are used when there is more than one input/output, and are provided as maps with names as keys. Unnamed inputs/outputs are used when there is only one, and a map can be quickly constructed with the UnnamedInput and UnnamedOutput functions.

An example of defining named inputs and outputs:

namedInputs := map[string]beam.PCollection{"pcol1": pcol1, "pcol2": pcol2}
namedOutputTypes := map[string]typex.FullType{
    "main": typex.New(reflectx.String),
    "side": typex.New(reflectx.Int64),
}

CrossLanguage outputs a map of PCollections with associated names. These names will match those from provided named outputs. If the beam.UnnamedOutput method was used, the PCollection can be retrieved with beam.UnnamedOutputTag().

An example of retrieving named outputs from a call to CrossLanguage:

outputs := beam.CrossLanguage(...)
mainPcol := outputs["main"]
sidePcol := outputs["side"]

Example

This example shows using CrossLanguage to execute the Prefix cross-language transform using an expansion service running on localhost:8099. Prefix requires a payload containing a prefix to prepend to every input string.

type prefixPayload struct {
    Data string
}
encodedPl := beam.CrossLanguagePayload(prefixPayload{Data: "foo"})
urn := "beam:transforms:xlang:test:prefix"
expansionAddr := "localhost:8099"
outputType := beam.UnnamedOutput(typex.New(reflectx.String))
input := beam.UnnamedInput(inputPcol)
outs := beam.CrossLanguage(s, urn, encodedPl, expansionAddr, input, outputType)
outPcol := outs[beam.UnnamedOutputTag()]

Alternative Expansion Handlers

The xlangx.RegisterHandler function can be used to register alternative expansion handlers to a namespace, for use with this function. This allows for custom handling of expansion addresses or starting up expansion services automatically beneath the CrossLanguage call.

In addition, URNs can be bound to specific expansion addresses using xlangx.RegisterOverrideForUrn. This allows specific overrides to be tested, or other custom implementations to be used instead.

To ignore overrides regardless of URN, wrap the expansion address in a call to xlangx.Require, which forces expansion using the given address.

func CrossLanguagePayload(pl any) []byte

CrossLanguagePayload encodes a native Go struct into a payload for cross-language transforms. Payloads are []byte encoded ExternalConfigurationPayload protobufs. In order to fill the contents of the protobuf, the provided struct will be converted to a row-encoded representation with an accompanying schema, so the input struct must be compatible with schemas.

See https://beam.apache.org/documentation/programming-guide/#schemas for basic information on schemas, and pkg/beam/core/runtime/graphx/schema for details on schemas in the Go SDK.

Example:

type stringPayload struct {
    Data string
}
encodedPl := beam.CrossLanguagePayload(stringPayload{Data: "foo"})

ExternalTagged defines an external PTransform, and allows re-specifying the tags for the input and output PCollections. The interpretation of this primitive is runner specific. The runner is responsible for parsing the payload based on the URN provided to implement the behavior of the operation. Transform libraries should expose an API that captures the user's intent and serialize the payload as a byte slice that the runner will deserialize.

Use ExternalTagged if the runner will need to associate the PTransform's local PCollection tags with values in the payload. Otherwise, prefer External.

Init is the hook that all user code must call after flags processing and other static initialization, for now.

Initialized exposes the initialization status for runners.

MustTaggedN returns the input, but panics if err != nil.

func NewPipelineWithRoot() (*Pipeline, Scope)

NewPipelineWithRoot creates a new empty pipeline and its root scope.

func ParDo0(s Scope, dofn any, col PCollection, opts ...Option)

ParDo0 inserts a ParDo transform with zero outputs into the pipeline.

func ParDo2(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection)

ParDo2 inserts a ParDo with 2 outputs into the pipeline.
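
The number in a ParDoN name is the number of outputs the DoFn emits to. As a rough, stdlib-only sketch of the shape such a function takes, the hypothetical splitByLength below has two emitter parameters, one per output. It is invoked directly with plain closures, the way the metrics examples above call their DoFns, rather than through a pipeline.

```go
package main

import "fmt"

// splitByLength is a hypothetical two-output DoFn. Each emitter parameter
// stands for one output: words of up to three letters go to short, the
// rest go to long.
func splitByLength(word string, short, long func(string)) {
	if len(word) <= 3 {
		short(word)
		return
	}
	long(word)
}

func main() {
	var shortWords, longWords []string
	for _, w := range []string{"this", "has", "six", "words", "in", "it"} {
		splitByLength(w,
			func(s string) { shortWords = append(shortWords, s) },
			func(s string) { longWords = append(longWords, s) })
	}
	fmt.Println(shortWords, longWords)
}
```

In a pipeline, the same function would be handed to the transform as beam.ParDo2(s, splitByLength, words), where words is assumed to be a PCollection of strings; the call returns two PCollections.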

func ParDo3(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection)

ParDo3 inserts a ParDo with 3 outputs into the pipeline.

func ParDo4(s Scope, dofn any, col PCollection, opts ...Option) (PCollection, PCollection, PCollection, PCollection)

ParDo4 inserts a ParDo with 4 outputs into the pipeline.

ParDo5 inserts a ParDo with 5 outputs into the pipeline.

ParDo6 inserts a ParDo with 6 outputs into the pipeline.

ParDo7 inserts a ParDo with 7 outputs into the pipeline.

RegisterCoder registers a user defined coder for a given type, and will be used if there is no existing beam coder for that type. Must be called prior to beam.Init(), preferably in an init() function.

The coder used for a given type follows this ordering:

  1. Coders for known Beam types
  2. Coders registered for specific types
  3. Coders registered for interface types
  4. Default coder (JSON)

Coders for interface types are iterated over to check if a type satisfies them, and the most recent one registered will be used.

Repeated registrations of the same type override prior ones.
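
The lookup order can be pictured with a toy model. The sketch below is an assumption-laden illustration, not the SDK's implementation: the registry and ifaceCoder types, the lookup method and the coder names are all hypothetical, and coders are reduced to plain strings. It shows the four tiers being consulted in order, with the most recently registered matching interface coder winning.

```go
package main

import (
	"fmt"
	"reflect"
)

// ifaceCoder pairs an interface type with a (hypothetical) coder name.
type ifaceCoder struct {
	iface reflect.Type
	name  string
}

// registry is a simplified model of the coder lookup tiers.
type registry struct {
	known      map[reflect.Type]string // tier 1: known Beam types
	specific   map[reflect.Type]string // tier 2: coders for specific types
	interfaces []ifaceCoder            // tier 3: interface coders, in registration order
}

// lookup walks the tiers in order and returns the first match.
func (r *registry) lookup(t reflect.Type) string {
	if name, ok := r.known[t]; ok {
		return name
	}
	if name, ok := r.specific[t]; ok {
		return name
	}
	// Iterate newest-first so the most recent registration wins.
	for i := len(r.interfaces) - 1; i >= 0; i-- {
		if t.Implements(r.interfaces[i].iface) {
			return r.interfaces[i].name
		}
	}
	return "json" // tier 4: default coder
}

// celsius is a user type that satisfies fmt.Stringer.
type celsius float64

func (c celsius) String() string { return fmt.Sprintf("%.1fC", float64(c)) }

// demo resolves a known type, an interface-matched type and an unmatched type.
func demo() []string {
	stringer := reflect.TypeOf((*fmt.Stringer)(nil)).Elem()
	r := &registry{
		known:    map[reflect.Type]string{reflect.TypeOf(""): "string"},
		specific: map[reflect.Type]string{},
		interfaces: []ifaceCoder{
			{stringer, "stringer-v1"},
			{stringer, "stringer-v2"},
		},
	}
	return []string{
		r.lookup(reflect.TypeOf("")),
		r.lookup(reflect.TypeOf(celsius(0))),
		r.lookup(reflect.TypeOf(struct{}{})),
	}
}

func main() {
	fmt.Println(demo())
}
```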

RegisterCoder additionally registers the type, and coder functions as per RegisterType and RegisterFunction to avoid redundant calls.

Supported Encoder Signatures

func(T) []byte
func(reflect.Type, T) []byte
func(T) ([]byte, error)
func(reflect.Type, T) ([]byte, error)

Supported Decoder Signatures

func([]byte) T
func(reflect.Type, []byte) T
func([]byte) (T, error)
func(reflect.Type, []byte) (T, error)

where T is the matching user type.
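
As an illustration of these signatures, here is a minimal stdlib-only sketch of an encoder and decoder pair using the error-returning forms func(T) ([]byte, error) and func([]byte) (T, error). The point type, the function names and the fixed 8-byte big-endian layout are assumptions made for the example.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// point is a hypothetical user type used only for illustration.
type point struct {
	X, Y int32
}

// encodePoint has the shape func(T) ([]byte, error).
func encodePoint(p point) ([]byte, error) {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint32(buf[0:4], uint32(p.X))
	binary.BigEndian.PutUint32(buf[4:8], uint32(p.Y))
	return buf, nil
}

// decodePoint has the shape func([]byte) (T, error).
func decodePoint(b []byte) (point, error) {
	if len(b) != 8 {
		return point{}, fmt.Errorf("point: want 8 bytes, got %d", len(b))
	}
	return point{
		X: int32(binary.BigEndian.Uint32(b[0:4])),
		Y: int32(binary.BigEndian.Uint32(b[4:8])),
	}, nil
}

func main() {
	b, err := encodePoint(point{X: 3, Y: -4})
	if err != nil {
		panic(err)
	}
	p, err := decodePoint(b)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```

Such a pair would then be passed to RegisterCoder from an init() function together with the reflect.Type of point. The RegisterCoder parameter list is not shown in this section, so check its signature before relying on that call shape.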

func RegisterDoFn(dofn any)

RegisterDoFn is a convenience function to handle registering a DoFn and all related types. Use this instead of calling RegisterType or RegisterFunction. Like all the Register* functions, RegisterDoFn should be called in `init()` only.

In particular, it will call RegisterFunction for functional DoFns, and RegisterType for the parameter and return types of that function. Structural DoFns will have RegisterType called for themselves and for their parameter and return types.

RegisterDoFn will panic if the argument type is not a DoFn.

Usage:

   func init() {
       beam.RegisterDoFn(FunctionalDoFn)
       beam.RegisterDoFn(reflect.TypeOf((*StructuralDoFn)(nil)).Elem())
   }
func RegisterFunction(fn any)

RegisterFunction allows function registration. It is beneficial for performance and is needed for functions -- such as custom coders -- serialized during unit tests, where the underlying symbol table is not available. It should be called in `init()` only.

func RegisterInit(hook func())

RegisterInit registers an Init hook. Hooks are expected to be able to figure out whether they apply on their own, notably if invoked in a remote execution environment. They are all executed regardless of the runner.

RegisterRunner associates the name with the supplied runner, making it available to execute a pipeline via Run.

RegisterSchemaProvider allows pipeline authors to provide special handling to convert types to schema representations, when those types are used as fields in types being encoded as schema rows.

At present, the only supported provider interface is SchemaProvider, though this may change in the future.

Providers only need to support a limited set of types for conversion, specifically a single struct type or a pointer to struct type, or an interface type, which they are registered with.

Providers have three tasks with respect to a given supported logical type:

  • Producing schema representative types for their logical types.
  • Producing schema encoders for values of that type, writing beam schema encoded bytes for a value, matching the schema representative type.
  • Producing schema decoders for values of that type, reading beam schema encoded bytes, and producing a value of that type.

Representative Schema types must be structs with only exported fields.

A provider should be thread safe, but it's not required that a produced encoder or decoder is thread safe, since a separate encoder or decoder will be used for simultaneously executed bundles.

If the supported type is an interface, that interface must have a non-empty method set. That is, it cannot be the empty interface.

RegisterSchemaProvider must be called before beam.Init(), and conventionally is called in a package init() function.

package main

import (
	"bytes"
	"fmt"
	"io"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
	"github.com/google/go-cmp/cmp"
)

// RegisterSchemaProvider must be called before beam.Init, and conventionally in a package init block.
func init() {
	beam.RegisterSchemaProvider(reflect.TypeOf((*Alphabet)(nil)).Elem(), &AlphabetProvider{})
	// TODO(BEAM-9615): Registering a self encoding type causes a cycle. Needs resolving.
	// beam.RegisterType(reflect.TypeOf((*Cyrillic)(nil)))
	beam.RegisterType(reflect.TypeOf((*Latin)(nil)))
	beam.RegisterType(reflect.TypeOf((*Ελληνικά)(nil)))
}

type Alphabet interface {
	alphabet() string
}

type Cyrillic struct {
	A, B int
}

func (*Cyrillic) alphabet() string {
	return "Cyrillic"
}

type Latin struct {
	// Unexported fields are not serializable by beam schemas by default
	// so we need to handle this ourselves.
	c uint64
	d *float32
}

func (*Latin) alphabet() string {
	return "Latin"
}

type Ελληνικά struct {
	q string
	G func() string
}

func (*Ελληνικά) alphabet() string {
	return "Ελληνικά"
}

// AlphabetProvider provides encodings for types that implement the Alphabet interface.
type AlphabetProvider struct {
	enc *coder.RowEncoderBuilder
	dec *coder.RowDecoderBuilder
}

var (
	typeCyrillic = reflect.TypeOf((*Cyrillic)(nil))
	typeLatin    = reflect.TypeOf((*Latin)(nil))
	typeΕλληνικά = reflect.TypeOf((*Ελληνικά)(nil))
)

func (p *AlphabetProvider) FromLogicalType(rt reflect.Type) (reflect.Type, error) {
	// FromLogicalType produces schema representative types, which match the encoders
	// and decoders that this function generates for this type.
	// While this example uses statically assigned schema representative types, it's
	// possible to generate the returned reflect.Type dynamically instead, using the
	// reflect package.
	switch rt {
	// The Cyrillic type is able to be encoded by default, so we simply use it directly
	// as its own representative type.
	case typeCyrillic:
		return typeCyrillic, nil
	case typeLatin:
		// The Latin type only has unexported fields, so we need to make the equivalent
		// have exported fields.
		return reflect.TypeOf((*struct {
			C uint64
			D *float32
		})(nil)).Elem(), nil
	case typeΕλληνικά:
		return reflect.TypeOf((*struct{ Q string })(nil)).Elem(), nil
	}
	return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}

// BuildEncoder returns beam schema encoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildEncoder(rt reflect.Type) (func(any, io.Writer) error, error) {
	switch rt {
	case typeCyrillic:
		if p.enc == nil {
			p.enc = &coder.RowEncoderBuilder{}
		}
		// Since Cyrillic is by default encodable, defer to the standard schema row encoder for the type.
		return p.enc.Build(rt)
	case typeLatin:
		return func(iface any, w io.Writer) error {
			v := iface.(*Latin)
			// Beam Schema Rows have a header that indicates which fields, if any, are nil.
			if err := coder.WriteRowHeader(2, func(i int) bool {
				if i == 1 {
					return v.d == nil
				}
				return false
			}, w); err != nil {
				return err
			}
			// Afterwards, each field is encoded using the appropriate helper.
			if err := coder.EncodeVarUint64(v.c, w); err != nil {
				return err
			}
			// Nil fields have nothing written for them other than the header.
			if v.d != nil {
				if err := coder.EncodeDouble(float64(*v.d), w); err != nil {
					return err
				}
			}
			return nil
		}, nil
	case typeΕλληνικά:
		return func(iface any, w io.Writer) error {
			// Since the representation for Ελληνικά never has nil fields
			// we can use the simple header helper.
			if err := coder.WriteSimpleRowHeader(1, w); err != nil {
				return err
			}
			v := iface.(*Ελληνικά)
			if err := coder.EncodeStringUTF8(v.q, w); err != nil {
				return fmt.Errorf("encoding string field Q: %v", err)
			}
			return nil
		}, nil
	}
	return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}

// BuildDecoder returns beam schema decoder functions for types with the Alphabet interface.
func (p *AlphabetProvider) BuildDecoder(rt reflect.Type) (func(io.Reader) (any, error), error) {
	switch rt {
	case typeCyrillic:
		if p.dec == nil {
			p.dec = &coder.RowDecoderBuilder{}
		}
		// Since Cyrillic is by default encodable, defer to the standard schema row decoder for the type.
		return p.dec.Build(rt)
	case typeLatin:
		return func(r io.Reader) (any, error) {
			// Since the d field can be nil, we use the header to get the nil bits.
			n, nils, err := coder.ReadRowHeader(r)
			if err != nil {
				return nil, err
			}
			// Header returns the number of fields, so we check if it has what we
			// expect. This allows schemas to evolve if necessary.
			if n != 2 {
				return nil, fmt.Errorf("expected 2 fields, but got %v", n)
			}
			c, err := coder.DecodeVarUint64(r)
			if err != nil {
				return nil, err
			}
			// Check if the field is nil before trying to decode a value for it.
			var d *float32
			if !coder.IsFieldNil(nils, 1) {
				f, err := coder.DecodeDouble(r)
				if err != nil {
					return nil, err
				}
				f32 := float32(f)
				d = &f32
			}
			return &Latin{
				c: c,
				d: d,
			}, nil
		}, nil
	case typeΕλληνικά:
		return func(r io.Reader) (any, error) {
			// Since the representation for Ελληνικά never has nil fields
			// we can use the simple header helper. Returns an error if
			// something unexpected occurs.
			if err := coder.ReadSimpleRowHeader(1, r); err != nil {
				return nil, err
			}
			q, err := coder.DecodeStringUTF8(r)
			if err != nil {
				return nil, fmt.Errorf("decoding string field Q: %v", err)
			}
			return &Ελληνικά{
				q: q,
			}, nil
		}, nil
	}
	return nil, fmt.Errorf("Unknown Alphabet: %v", rt)
}

// Schema providers work on fields of schema encoded types.
type translation struct {
	C *Cyrillic
	L *Latin
	E *Ελληνικά
}

func main() {
	f := float32(42.789)
	want := translation{
		C: &Cyrillic{A: 123, B: 456},
		L: &Latin{c: 789, d: &f},
		E: &Ελληνικά{q: "testing"},
	}
	rt := reflect.TypeOf((*translation)(nil)).Elem()
	enc, err := coder.RowEncoderForStruct(rt)
	if err != nil {
		panic(err)
	}
	dec, err := coder.RowDecoderForStruct(rt)
	if err != nil {
		panic(err)
	}
	var buf bytes.Buffer
	if err := enc(want, &buf); err != nil {
		panic(err)
	}
	got, err := dec(&buf)
	if err != nil {
		panic(err)
	}
	if d := cmp.Diff(want, got,
		cmp.AllowUnexported(Latin{}, Ελληνικά{})); d != "" {
		fmt.Printf("diff in schema encoding translation: (-want,+got)\n%v\n", d)
	} else {
		fmt.Println("No diffs!")
	}
}
Output:

No diffs!

RegisterSchemaProviderWithURN is for internal use only. Users are recommended to use beam.RegisterSchemaProvider() instead. RegisterSchemaProviderWithURN registers a new schema provider for a new logical type defined in pkg/beam/model/pipeline_v1/schema.pb.go.

RegisterSchemaProviderWithURN must be called before beam.Init(), and conventionally is called in a package init() function.

RegisterType inserts "external" types into a global type registry to bypass serialization and preserve full method information. It should be called in `init()` only. TODO(wcn): the canonical definition of "external" is in v1.proto. We need user facing copy for this important concept.

TryCrossLanguage coordinates the core functions required to execute the cross-language transform. See CrossLanguage for user documentation.

TryExternalTagged attempts to perform the work of ExternalTagged, returning an error indicating why the operation failed.

UnnamedInput is a helper function for passing single unnamed inputs to beam.CrossLanguage.

Example:

beam.CrossLanguage(s, urn, payload, addr, beam.UnnamedInput(input), outputs)

UnnamedOutput is a helper function for passing single unnamed output types to beam.CrossLanguage. The associated output can be accessed with beam.UnnamedOutputTag.

Example:

resultMap := beam.CrossLanguage(s, urn, payload, addr, inputs, beam.UnnamedOutput(output))
result := resultMap[beam.UnnamedOutputTag()]
func UnnamedOutputTag() string

UnnamedOutputTag provides the output tag used for an output passed to beam.UnnamedOutput. Needed to retrieve the unnamed output PCollection from the result of beam.CrossLanguage.

ValidateKVType panics if the type of the PCollection is not KV<A,B>. It returns (A,B).

ValidateNonCompositeType panics if the type of the PCollection is a composite type. It returns the type.

BundleFinalization represents the parameter used to register callbacks to be run once the runner has durably persisted output for a bundle. See typex.BundleFinalization for more details.

Coder defines how to encode and decode values of type 'A' into byte streams. Coders are attached to PCollections of the same type. For PCollections consumed by GBK, the attached coders are required to be deterministic.

DecodeCoder decodes a coder. Any custom coder function symbol must be resolvable via the runtime.GlobalSymbolResolver. The types must be encodable.

func NewCoder(t FullType) Coder

NewCoder infers a Coder for any bound full type.

func (c Coder) IsValid() bool

IsValid returns true iff the Coder is valid. Any use of an invalid Coder will result in a panic.

func (c Coder) Type() FullType

Type returns the full type 'A' of elements the coder can encode and decode. 'A' must be a concrete full type, such as int or KV<int,string>.

Counter is a metric that can be incremented and decremented, and is aggregated by the sum.

Counters are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.

func NewCounter(namespace, name string) Counter

NewCounter returns the Counter with the given namespace and name.

Dec decrements the counter by the given amount. The context must be provided by the framework, or the value will not be recorded.

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
	c := beam.NewCounter("example", "size")
	c.Dec(ctx, int64(len("foobar")))
}

Inc increments the counter by the given amount. The context must be provided by the framework, or the value will not be recorded.

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
	c := beam.NewCounter("example", "size")
	c.Inc(ctx, int64(len("foobar")))
}

Distribution is a metric that records various statistics about the distribution of reported values.

Distributions are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.

func NewDistribution(namespace, name string) Distribution

NewDistribution returns the Distribution with the given namespace and name.

Update adds an observation to this distribution. The context must be provided by the framework, or the value will not be recorded.

package main

import (
	"context"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
	t := time.Millisecond * 42
	d := beam.NewDistribution("example", "latency_micros")
	d.Update(ctx, int64(t/time.Microsecond))
}

ElementDecoder encapsulates being able to decode an element from a reader.

NewElementDecoder returns an ElementDecoder for the given type.

ElementEncoder encapsulates being able to encode an element into a writer.

NewElementEncoder returns a new encoding function for the given type.

type EncodedCoder struct {
	
	Coder Coder
}

EncodedCoder is a serialization wrapper around a coder for convenience.

MarshalJSON returns the JSON encoding of this value.

UnmarshalJSON sets the state of this instance from the passed in JSON.

EncodedFunc is a serialization wrapper around a function for convenience.

MarshalJSON returns the JSON encoding of this value.

UnmarshalJSON sets the state of this instance from the passed in JSON.

EncodedType is a serialization wrapper around a type for convenience.

MarshalJSON returns the JSON encoding of this value.

UnmarshalJSON sets the state of this instance from the passed in JSON.

EventTime represents the time of the event that generated an element. This is distinct from the time when an element is processed.

FullType represents the tree structure of data types processed by the graph. It allows representation of composite types, such as KV<int, string> or CoGBK<int, int>, as well as "generic" versions of such types, such as KV<int,T> or CoGBK<X,Y>, where the free "type variables" are the fixed universal types: T, X, etc.

Gauge is a metric that can have its new value set, and is aggregated by taking the last reported value.

Gauges are safe to use in multiple bundles simultaneously, but not generally threadsafe. Your DoFn needs to manage the thread safety of Beam metrics for any additional concurrency it uses.
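
The difference between the two aggregation rules, sum for a counter and last reported value for a gauge, can be seen in a toy model. The counter and gauge types below are hypothetical stand-ins written for this sketch, not the beam.Counter and beam.Gauge types, and they ignore contexts, bundles and concurrency entirely.

```go
package main

import "fmt"

// counter models sum aggregation only.
type counter struct{ sum int64 }

func (c *counter) Inc(v int64) { c.sum += v }

// gauge models last-value aggregation only.
type gauge struct{ last int64 }

func (g *gauge) Set(v int64) { g.last = v }

// report feeds the same sequence of values to a counter and a gauge
// and returns what each one would end up reporting.
func report(values ...int64) (sum, last int64) {
	var c counter
	var g gauge
	for _, v := range values {
		c.Inc(v)
		g.Set(v)
	}
	return c.sum, g.last
}

func main() {
	sum, last := report(5, 7, 42)
	fmt.Println("counter:", sum, "gauge:", last)
}
```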

func NewGauge(namespace, name string) Gauge

NewGauge returns the Gauge with the given namespace and name.

Set sets the current value for this gauge. The context must be provided by the framework, or the value will not be recorded.

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// A beam_test global context var to improve how the examples look.
var ctx = context.Background()

func main() {
	g := beam.NewGauge("example", "progress")
	g.Set(ctx, 42)
}

MetricResult represents a single metric value, for use in writing predicate functions to query PipelineResults.

type Option interface {
	
}

Option is an optional value or context to a transformation, used at pipeline construction time. The primary use case is providing side inputs.

type PCollection struct {
	
}

PCollection is an immutable collection of values of type 'A', which must be a concrete type, such as int or KV<int,string>. A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like textio.Read), and can be passed as the inputs of other PTransforms. Some root transforms produce bounded PCollections and others produce unbounded ones.

Each element in a PCollection has an associated timestamp. Sources assign timestamps to elements when they create PCollections, and other PTransforms propagate these timestamps from their input to their output implicitly or explicitly.

Additionally, each element is assigned to a set of windows. By default, all elements are assigned into a single default window, GlobalWindow.

func AddFixedKey(s Scope, col PCollection) PCollection

AddFixedKey adds a fixed key (0) to every element.

func CoGroupByKey(s Scope, cols ...PCollection) PCollection

CoGroupByKey inserts a CoGBK transform into the pipeline.

func Combine(s Scope, combinefn any, col PCollection, opts ...Option) PCollection

Combine inserts a global Combine transform into the pipeline. It expects a PCollection<T> as input where T is a concrete type. Combine supports TypeDefinition options for binding generic types in combinefn.

func CombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) PCollection

CombinePerKey inserts a GBK and per-key Combine transform into the pipeline. It expects a PCollection<KV<K,T>>. The CombineFn may optionally take a key parameter. CombinePerKey supports TypeDefinition options for binding generic types in combinefn.
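
To make the per-key semantics concrete, here is a minimal plain-Go model that uses only the standard library, not the Beam API. The names `kv`, `combinePerKey` and `sumInts` are hypothetical, and the sketch assumes the simplest form of combine function, a binary merge `func(a, b T) T`. It ignores windowing and distribution entirely.

```go
package main

import "fmt"

// kv is a hypothetical stand-in for a KV<string,int> element.
type kv struct {
	Key   string
	Value int
}

// sumInts plays the role of a binary merge function.
func sumInts(a, b int) int { return a + b }

// combinePerKey folds all values that share a key using merge.
func combinePerKey(in []kv, merge func(a, b int) int) map[string]int {
	out := make(map[string]int)
	seen := make(map[string]bool)
	for _, e := range in {
		if !seen[e.Key] {
			// The first value for a key starts the accumulation.
			out[e.Key] = e.Value
			seen[e.Key] = true
			continue
		}
		out[e.Key] = merge(out[e.Key], e.Value)
	}
	return out
}

func main() {
	in := []kv{{"a", 1}, {"b", 10}, {"a", 2}, {"b", 20}, {"a", 3}}
	totals := combinePerKey(in, sumInts)
	fmt.Println(totals["a"], totals["b"]) // prints "6 30"
}
```

Because the merge function is applied pairwise, it should be associative and commutative so that the result does not depend on the order in which values arrive.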

func Create(s Scope, values ...any) PCollection

Create inserts a fixed non-empty set of values into the pipeline. The values must be of the same type 'A' and the returned PCollection is of type A.

The returned PCollections can be used as any other PCollections. The values are JSON-coded. Each runner may place limits on the sizes of the values and Create should generally only be used for small collections.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	beam.Create(s, 5, 6, 7, 8, 9)               // PCollection<int>
	beam.Create(s, []int{5, 6}, []int{7, 8, 9}) // PCollection<[]int>
	beam.Create(s, []int{5, 6, 7, 8, 9})        // PCollection<[]int>
	beam.Create(s, "a", "b", "c")               // PCollection<string>
}

func CreateList(s Scope, list any) PCollection

CreateList inserts a fixed set of values into the pipeline from a slice or array. Unlike Create this supports the creation of an empty PCollection.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	beam.CreateList(s, []int{5, 6, 7, 8, 9}) // PCollection<int>
}

func DropKey(s Scope, col PCollection) PCollection

DropKey drops the key for an input PCollection<KV<A,B>>. It returns a PCollection<B>.

func DropValue(s Scope, col PCollection) PCollection

DropValue drops the value for an input PCollection<KV<A,B>>. It returns a PCollection<A>.

func Explode(s Scope, col PCollection) PCollection

Explode is a PTransform that takes a single PCollection<[]A> and returns a PCollection<A> containing all the elements for each incoming slice.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	d := beam.Create(s, []int{1, 2, 3, 4, 5}) // PCollection<[]int>
	beam.Explode(s, d)                        // PCollection<int>
}

External defines a Beam external transform. The interpretation of this primitive is runner specific. The runner is responsible for parsing the payload based on the URN provided to implement the behavior of the operation. Transform libraries should expose an API that captures the user's intent and serialize the payload as a byte slice that the runner will deserialize.

Use ExternalTagged if the runner will need to associate the PTransforms local PCollection tags with values in the payload.

func Flatten(s Scope, cols ...PCollection) PCollection

Flatten is a PTransform that takes multiple PCollections of type 'A' and returns a single PCollection of type 'A' containing all the elements in all the input PCollections. The name "Flatten" suggests taking a list of lists and flattening them into a single list.

By default, the Coder of the output PCollection is the same as the Coder of the first PCollection.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var s = beam.Scope{}

func main() {
	a := textio.Read(s, "...some file path...") // PCollection<string>
	b := textio.Read(s, "...some other file path...")
	c := textio.Read(s, "...some third file path...")

	beam.Flatten(s, a, b, c) // PCollection<string>
}

func GroupByKey(s Scope, a PCollection) PCollection

GroupByKey is a PTransform that takes a PCollection of type KV<A,B>, groups the values by key and windows, and returns a PCollection of type GBK<A,B> representing a map from each distinct key and window of the input PCollection to an iterable over all the values associated with that key in the input per window. Each key in the output PCollection is unique within each window.

GroupByKey is analogous to converting a multi-map into a uni-map, and related to GROUP BY in SQL. It corresponds to the "shuffle" step between the Mapper and the Reducer in the MapReduce framework.

Two keys of type A are compared for equality by first encoding each of the keys using the Coder of the keys of the input PCollection, and then comparing the encoded bytes. This admits efficient parallel evaluation. Note that this requires that the Coder of the keys be deterministic.
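
The effect of comparing encoded bytes can be sketched in plain Go with the standard library only. This is a model, not Beam's implementation: `encodeKey`, `kv` and `groupByEncodedKey` are hypothetical names, and the fixed-width big-endian encoding stands in for whatever deterministic Coder the key type actually uses.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeKey is a hypothetical deterministic coder for int64 keys:
// equal keys always produce identical bytes.
func encodeKey(k int64) []byte {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, uint64(k))
	return buf
}

// kv is a hypothetical stand-in for a KV<int64,string> element.
type kv struct {
	Key   int64
	Value string
}

// groupByEncodedKey groups values by their encoded key bytes; the keys
// themselves are never compared directly.
func groupByEncodedKey(in []kv) map[string][]string {
	groups := make(map[string][]string)
	for _, e := range in {
		enc := string(encodeKey(e.Key)) // encoded bytes are the grouping identity
		groups[enc] = append(groups[enc], e.Value)
	}
	return groups
}

func main() {
	groups := groupByEncodedKey([]kv{{1, "a"}, {2, "b"}, {1, "c"}})
	fmt.Println(len(groups))                  // prints "2"
	fmt.Println(groups[string(encodeKey(1))]) // prints "[a c]"
}
```

If the encoding were not deterministic, two equal keys could produce different bytes and end up in different groups, which is why the requirement above matters.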

By default, input and output PCollections share a key Coder and iterable values in the input and output PCollection share an element Coder.

GroupByKey is a key primitive in data-parallel processing, since it is the main way to efficiently bring associated data together into one location. It is also a key determiner of the performance of a data-parallel pipeline.

See CoGroupByKey for a way to group multiple input PCollections by a common key at once.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	type Doc struct{}
	var urlDocPairs beam.PCollection             // PCollection<KV<string, Doc>>
	urlToDocs := beam.GroupByKey(s, urlDocPairs) // PCollection<CoGBK<string, Doc>>

	// CoGBK parameters receive an iterator function with all values associated
	// with the same key.
	beam.ParDo0(s, func(key string, values func(*Doc) bool) {
		var cur Doc
		for values(&cur) {
			// ... process all docs having that url ...
		}
	}, urlToDocs) // PCollection<KV<string, []Doc>>
}

func Impulse(s Scope) PCollection

Impulse emits a single empty []byte into the global window. The resulting PCollection is a singleton of type []byte.

The purpose of Impulse is to trigger another transform, such as ones that take all information as side inputs.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	beam.Impulse(s) // PCollection<[]byte>
}

func ImpulseValue(s Scope, value []byte) PCollection

ImpulseValue emits the supplied byte slice into the global window. The resulting PCollection is a singleton of type []byte.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	beam.ImpulseValue(s, []byte{}) // PCollection<[]byte>
}

Must returns the input, but panics if err != nil.

MustN returns the input, but panics if err != nil.

func ParDo(s Scope, dofn any, col PCollection, opts ...Option) PCollection

ParDo is the core element-wise PTransform in Apache Beam, invoking a user-specified function on each of the elements of the input PCollection to produce zero or more output elements, all of which are collected into the output PCollection. Use one of the ParDo variants for a different number of output PCollections. The PCollections do not need to have the same types.

Elements are processed independently, and possibly in parallel across distributed cloud resources. The ParDo processing style is similar to what happens inside the "Mapper" or "Reducer" class of a MapReduce-style algorithm.

DoFns

The function to use to process each element is specified by a DoFn, either as a single function or as a struct with methods, notably ProcessElement. The struct may also define Setup, StartBundle, FinishBundle and Teardown methods. The struct is JSON-serialized and may contain construction-time values.

Functions and types used as DoFns must be registered with beam using the beam `register` package, so they may execute on distributed workers. Functions must not be anonymous or closures, or they will fail at execution time.

Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of "bundles". These are farmed off to distributed worker machines (or locally on a local runner instance). For each bundle of input elements processing proceeds as follows:

  • If the DoFn is a struct, a fresh instance of it is created on a worker from its JSON serialization, and the Setup method is called on this instance, if present. A runner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by returning an error) will never be reused.
  • The DoFn's StartBundle method, if provided, is called to initialize it.
  • The DoFn's ProcessElement method is called on each of the input elements in the bundle.
  • The DoFn's FinishBundle method, if provided, is called to complete its work. After FinishBundle is called, the framework will not again invoke ProcessElement or FinishBundle until a new call to StartBundle has occurred.
  • If any of Setup, StartBundle, ProcessElement or FinishBundle methods return an error, the Teardown method, if provided, will be called on the DoFn instance.
  • If a runner will no longer use a DoFn, the Teardown method, if provided, will be called on the discarded instance.
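
The call order described above can be mimicked with a toy driver in plain Go. This is only an illustration of the sequence on the success path, not the Beam harness: `lenFn`, `runBundles` and `order` are hypothetical names, error handling is omitted, and the driver assumes a single instance reused for every bundle.

```go
package main

import (
	"fmt"
	"strings"
)

// lenFn is a hypothetical struct DoFn; calls records the order of method calls.
type lenFn struct {
	calls []string
}

func (f *lenFn) Setup()       { f.calls = append(f.calls, "Setup") }
func (f *lenFn) StartBundle() { f.calls = append(f.calls, "StartBundle") }
func (f *lenFn) ProcessElement(word string, emit func(int)) {
	f.calls = append(f.calls, "ProcessElement")
	emit(len(word))
}
func (f *lenFn) FinishBundle() { f.calls = append(f.calls, "FinishBundle") }
func (f *lenFn) Teardown()     { f.calls = append(f.calls, "Teardown") }

// order returns the recorded calls as one space-separated string.
func (f *lenFn) order() string { return strings.Join(f.calls, " ") }

// runBundles is a toy driver: Setup once, then StartBundle, ProcessElement
// per element, and FinishBundle for each bundle, then Teardown when the
// instance is discarded.
func runBundles(f *lenFn, bundles [][]string) []int {
	var out []int
	emit := func(n int) { out = append(out, n) }
	f.Setup()
	for _, bundle := range bundles {
		f.StartBundle()
		for _, w := range bundle {
			f.ProcessElement(w, emit)
		}
		f.FinishBundle()
	}
	f.Teardown()
	return out
}

func main() {
	f := &lenFn{}
	out := runBundles(f, [][]string{{"a", "bb"}, {"ccc"}})
	fmt.Println(out) // prints "[1 2 3]"
	fmt.Println(f.order())
}
```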

Each of the calls to any of the DoFn's processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in an output PCollection.

For example:

func stringLen(word string) int { return len(word) }
func init() { register.Function1x1(stringLen) }

words := beam.ParDo(s, &Foo{...}, ...)
lengths := beam.ParDo(s, stringLen, words)

Each output element has the same timestamp and is in the same windows as its corresponding input element. The timestamp can be accessed and/or emitted by including an EventTime-typed parameter. The name of the function or struct is used as the DoFn name. Function literals do not have stable names and should thus not be used in production code.

Side Inputs

While a ParDo processes elements from a single "main input" PCollection, it can take additional "side input" PCollections. Side inputs let a DoFn access PCollections computed by earlier pipeline operations: they are passed in to the ParDo transform using SideInput options, the form of the matching DoFn parameter determines how each one is accessed, and their contents are accessible to each of the DoFn operations. For example:

func filterLessThanCutoff(word string, cutoff int, emit func(string)) {
	if len(word) < cutoff {
		emit(word)
	}
}
func init() { register.Function3x0(filterLessThanCutoff) }

words := ...
cutoff := ...  // Singleton PCollection<int>
smallWords := beam.ParDo(s, filterLessThanCutoff, words, beam.SideInput{Input: cutoff})

Additional Outputs

Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specify 2 outputs. For example:

func partitionAtCutoff(word string, cutoff int, small, big func(string)) {
	if len(word) < cutoff {
		small(word)
	} else {
		big(word)
	}
}
func init() { register.Function4x0(partitionAtCutoff) }

words := ...
cutoff := ...  // Singleton PCollection<int>
small, big := beam.ParDo2(s, partitionAtCutoff, words, beam.SideInput{Input: cutoff})

By default, the Coder for the elements of each output PCollection is inferred from the concrete type.

No Global Shared State

There are three main ways to initialize the state of a DoFn instance processing a bundle:

  • Define public instance variable state. This state will be automatically JSON serialized and then deserialized in the DoFn instances created for bundles. This method is good for state known when the original DoFn is created in the main program, if it's not overly large. This is not suitable for any state which must only be used for a single bundle, as DoFns may be used to process multiple bundles.

  • Compute the state as a singleton PCollection and pass it in as a side input to the DoFn. This is good if the state needs to be computed by the pipeline, or if the state is very large and so is best read from file(s) rather than sent as part of the DoFn's serialized state.

  • Initialize the state in each DoFn instance, in a StartBundle method. This is good if the initialization doesn't depend on any information known only by the main program or computed by earlier pipeline operations, but is the same for all instances of this DoFn for all program executions, say setting up empty caches or initializing constant data.
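
The first and third options can be illustrated with the standard library alone. The sketch below assumes the serialization behaves like `encoding/json`, where only exported fields are marshaled. `filterFn` and `roundTrip` are hypothetical names, and `roundTrip` merely stands in for shipping the DoFn to a worker.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// filterFn is a hypothetical struct DoFn.
type filterFn struct {
	Cutoff int            `json:"cutoff"` // exported: survives serialization
	cache  map[string]int // unexported: not serialized, rebuilt per bundle
}

// StartBundle initializes state that must not outlive a single bundle.
func (f *filterFn) StartBundle() { f.cache = make(map[string]int) }

// roundTrip mimics sending the DoFn to a worker as JSON and decoding it there.
func roundTrip(f *filterFn) (*filterFn, error) {
	data, err := json.Marshal(f)
	if err != nil {
		return nil, err
	}
	var out filterFn
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return &out, nil
}

func main() {
	orig := &filterFn{Cutoff: 5, cache: map[string]int{"stale": 1}}
	worker, err := roundTrip(orig)
	if err != nil {
		panic(err)
	}
	fmt.Println(worker.Cutoff, worker.cache == nil) // prints "5 true"

	worker.StartBundle()
	fmt.Println(len(worker.cache)) // prints "0"
}
```

The construction-time value arrives intact, while the unexported cache is dropped and has to be recreated on the worker.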

ParDo operations are intended to be able to run in parallel across multiple worker machines. This precludes easy sharing and updating of mutable state across those machines. There is no support in the Beam model for communicating and synchronizing updates to shared state across worker machines, so programs should not access any mutable global variable state in their DoFn, without understanding that the Go processes for the main program and workers will each have their own independent copy of such state, and there won't be any automatic copying of that state across those processes. All information should be communicated to DoFn instances via main and side inputs and serialized state, and all output should be communicated from a DoFn instance via output PCollections, in the absence of external communication mechanisms written by user code.

Splittable DoFns

Splittable DoFns are DoFns that are able to split work within an element, as opposed to only at element boundaries like normal DoFns. This is useful for DoFns that emit many outputs per input element and can distribute that work among multiple workers. The most common examples of this are sources.

In order to split work within an element, splittable DoFns use the concept of restrictions, which are objects that are associated with an element and describe a portion of work on that element. For example, a restriction associated with a filename might describe what byte range within that file to process. In addition to restrictions, splittable DoFns also rely on restriction trackers to track progress and perform splits on a restriction currently being processed. See the `RTracker` interface in core/sdf/sdf.go for more details.

Splitting

Splitting means taking one restriction and splitting it into two or more that cover the entire input space of the original one. In other words, processing all the split restrictions should produce identical output to processing the original one.
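
As a rough illustration of that covering property, the following plain-Go sketch splits a half-open offset range into contiguous parts. The `restriction` type and `splitEvenly` function are hypothetical and written only for this example; they are not the SDK's own restriction types.

```go
package main

import "fmt"

// restriction is a hypothetical half-open offset range [Start, End).
type restriction struct{ Start, End int64 }

// splitEvenly splits r into at most n contiguous, non-overlapping parts
// that together cover r exactly.
func splitEvenly(r restriction, n int64) []restriction {
	size := r.End - r.Start
	if n < 1 || size <= 0 {
		return []restriction{r}
	}
	if n > size {
		n = size // never produce empty parts
	}
	parts := make([]restriction, 0, n)
	start := r.Start
	for i := int64(0); i < n; i++ {
		// Integer division spreads the remainder so part sizes differ by at most one.
		end := r.Start + size*(i+1)/n
		parts = append(parts, restriction{start, end})
		start = end
	}
	return parts
}

func main() {
	fmt.Println(splitEvenly(restriction{0, 10}, 3)) // prints "[{0 3} {3 6} {6 10}]"
}
```

Each part begins where the previous one ends, so processing all parts visits exactly the offsets of the original range.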

Splitting occurs in two stages. The initial splitting occurs before any restrictions have started processing. This step is used to split large restrictions into smaller ones that can then be distributed among multiple workers for processing. Initial splitting is user-defined and optional.

Dynamic splitting occurs during the processing of a restriction in runners that have implemented it. If there are available workers, runners may split the unprocessed portion of work from a busy worker and shard it to available workers in order to better distribute work. With unsplittable DoFns this can only occur on element boundaries, but for splittable DoFns this split can land within a restriction and will require splitting that restriction.

  • Note: Dataflow is currently the only runner with support for both initial and dynamic splitting. Other runners do not support dynamic splitting, and stragglers will therefore not be split during execution with liquid sharding.
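
A dynamic split can be pictured with a toy tracker in plain Go. The sketch is loosely in the spirit of a restriction tracker but is not the SDK's interface: `tracker`, `tryClaim` and `trySplit` are hypothetical, and the fraction is assumed to apply to the unclaimed remainder of the restriction.

```go
package main

import "fmt"

// restriction is a hypothetical half-open offset range [Start, End).
type restriction struct{ Start, End int64 }

// tracker is a toy restriction tracker that records the last claimed position.
type tracker struct {
	rest    restriction
	claimed int64 // last claimed position; Start-1 means nothing claimed yet
}

func newTracker(r restriction) *tracker {
	return &tracker{rest: r, claimed: r.Start - 1}
}

// tryClaim claims pos if it lies inside the current restriction.
func (t *tracker) tryClaim(pos int64) bool {
	if pos < t.rest.Start || pos >= t.rest.End {
		return false
	}
	t.claimed = pos
	return true
}

// trySplit cuts the unclaimed remainder at the given fraction. The tracker
// keeps the primary part and the residual is returned for another worker.
func (t *tracker) trySplit(fraction float64) (restriction, bool) {
	if fraction < 0 {
		fraction = 0
	}
	next := t.claimed + 1
	remaining := t.rest.End - next
	if remaining <= 0 {
		return restriction{}, false
	}
	splitAt := next + int64(fraction*float64(remaining))
	if splitAt >= t.rest.End {
		return restriction{}, false // nothing left to hand off
	}
	residual := restriction{splitAt, t.rest.End}
	t.rest.End = splitAt
	return residual, true
}

func main() {
	t := newTracker(restriction{0, 100})
	for pos := int64(0); pos < 20; pos++ {
		t.tryClaim(pos)
	}
	residual, ok := t.trySplit(0.5)
	fmt.Println(t.rest, residual, ok) // prints "{0 60} {60 100} true"
	fmt.Println(t.tryClaim(60))       // prints "false"
}
```

After the split, the original worker can no longer claim positions that were handed off, which is what keeps the two halves from overlapping.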

Splittable DoFn Methods

Making a splittable DoFn requires the following methods to be implemented on a DoFn in addition to the usual DoFn requirements. In the following method signatures `elem` represents the main input elements to the DoFn, and should match the types used in ProcessElement. `restriction` represents the user-defined restriction, and can be any type as long as it is consistent throughout all the splittable DoFn methods:

  • `CreateInitialRestriction(context.Context?, elem) (restriction, error?)` CreateInitialRestriction creates an initial restriction encompassing an entire element. The restriction created stays associated with the element it describes.
  • `SplitRestriction(context.Context?, elem, restriction) ([]restriction, error?)` SplitRestriction takes an element and its initial restriction, and optionally performs an initial split on it, returning a slice of all the split restrictions. If no splits are desired, the method returns a slice containing only the original restriction. This method will always be called on each newly created restriction before it is processed.
  • `RestrictionSize(context.Context?, elem, restriction) (float64, error?)` RestrictionSize returns a cheap size estimation for a restriction. This size is an abstract non-negative scalar value that represents how much work a restriction takes compared to other restrictions in the same DoFn. For example, a size of 200 represents twice as much work as a size of 100, but the numbers do not represent anything on their own. Size is used by runners to estimate work for dynamic work rebalancing. Must be thread safe. Will be invoked concurrently during bundle processing due to runner initiated splitting and progress estimation.
  • `CreateTracker(context.Context?, restriction) (restrictionTracker, error?)` CreateTracker creates and returns a restriction tracker (a concrete type implementing the `sdf.RTracker` interface) given a restriction. The restriction tracker is used to track progress processing a restriction, and to allow for dynamic splits. This method is called on each restriction right before processing begins.
  • `ProcessElement(context.Context?, sdf.RTracker, elem, func emit(output)) (sdf.ProcessContinuation?, error?)` For splittable DoFns, ProcessElement requires a restriction tracker before inputs, and generally requires emits to be used for outputs, since restrictions will generally produce multiple outputs. For more details on processing restrictions in a splittable DoFn, see `sdf.RTracker`. ProcessElement can optionally return a `sdf.ProcessContinuation` to signal to the runner that processing should be resumed at a later time, if not all data within the restriction can be processed within the lifetime of a single bundle. The runner tries to respect the resume time, however it is not guaranteed.

A splittable DoFn can also implement the following optional method:

  • `TruncateRestriction(context.Context?, sdf.RTracker, elem) (restriction, error?)` TruncateRestriction is triggered when a pipeline starts to drain on runners that support pipeline draining. It helps finish the pipeline faster by truncating the restriction. If not implemented, the default behavior for bounded restrictions is to process the remainder of the restriction, and for unbounded restrictions to process until the next SDF-initiated checkpoint or runner-initiated split occurs.

Fault Tolerance

In a distributed system, things can fail: machines can crash, machines can be unable to communicate across the network, etc. While individual failures are rare, the larger the job, the greater the chance that something, somewhere, will fail. Beam runners may strive to mask such failures by retrying failed DoFn bundles. This means that a DoFn instance might process a bundle partially, then crash for some reason, then be rerun (often as a new process) on that same bundle and on the same elements as before. Sometimes two or more DoFn instances will be running on the same bundle simultaneously, with the system taking the results of the first instance to complete successfully. Consequently, the code in a DoFn needs to be written such that these duplicate (sequential or concurrent) executions do not cause problems. If the outputs of a DoFn are a pure function of its inputs, then this requirement is satisfied. However, if a DoFn's execution has external side-effects, such as performing updates to external HTTP services, then the DoFn's code needs to take care to ensure that those updates are idempotent and that concurrent updates are acceptable. This property can be difficult to achieve, so it is advisable to strive to keep DoFns as pure functions as much as possible.
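
The difference between an idempotent and a non-idempotent side effect under retries can be shown with a small plain-Go model. Everything here is hypothetical (`record`, `appendLog`, `keyedStore`, `processBundle`); the two sinks merely stand in for external services.

```go
package main

import "fmt"

// record is a hypothetical element with a stable identifier.
type record struct {
	ID    string
	Value int
}

// appendLog models a non-idempotent side effect: every write adds an entry.
type appendLog struct{ entries []record }

func (l *appendLog) write(r record) { l.entries = append(l.entries, r) }

// keyedStore models an idempotent side effect: writing the same record
// again leaves the store unchanged.
type keyedStore struct{ rows map[string]int }

func (s *keyedStore) write(r record) { s.rows[r.ID] = r.Value }

// processBundle applies write to every record in the bundle.
func processBundle(bundle []record, write func(record)) {
	for _, r := range bundle {
		write(r)
	}
}

func main() {
	bundle := []record{{"a", 1}, {"b", 2}}
	journal := &appendLog{}
	store := &keyedStore{rows: map[string]int{}}

	// Simulate a retry: the same bundle is processed twice.
	for attempt := 0; attempt < 2; attempt++ {
		processBundle(bundle, journal.write)
		processBundle(bundle, store.write)
	}
	fmt.Println(len(journal.entries), len(store.rows)) // prints "4 2"
}
```

Keying each write by a value derived from the element is one common way to make a retried bundle harmless.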

Optimization

Beam runners may choose to apply optimizations to a pipeline before it is executed. A key optimization, fusion, relates to ParDo operations. If one ParDo operation produces a PCollection that is then consumed as the main input of another ParDo operation, the two ParDo operations will be fused together into a single ParDo operation and run in a single pass; this is "producer-consumer fusion". Similarly, if two or more ParDo operations have the same PCollection main input, they will be fused into a single ParDo that makes just one pass over the input PCollection; this is "sibling fusion".

If after fusion there are no more unfused references to a PCollection (e.g., one between a producer ParDo and a consumer ParDo), the PCollection itself is "fused away" and won't ever be written to disk, saving all the I/O and space expense of constructing it.
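
Producer-consumer fusion is essentially function composition. The plain-Go sketch below is only an analogy, not how any runner implements it: `unfused` materializes the intermediate collection, while `fused` applies both steps in one pass. Both function names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// unfused runs two element-wise steps as separate passes and materializes
// the intermediate collection between them.
func unfused(words []string) []int {
	upper := make([]string, 0, len(words)) // intermediate collection
	for _, w := range words {
		upper = append(upper, strings.ToUpper(w))
	}
	lengths := make([]int, 0, len(upper))
	for _, u := range upper {
		lengths = append(lengths, len(u))
	}
	return lengths
}

// fused composes both steps so each element flows through them in one pass
// and the intermediate collection is never built.
func fused(words []string) []int {
	lengths := make([]int, 0, len(words))
	for _, w := range words {
		lengths = append(lengths, len(strings.ToUpper(w)))
	}
	return lengths
}

func main() {
	words := []string{"beam", "go"}
	fmt.Println(unfused(words), fused(words)) // prints "[4 2] [4 2]"
}
```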

When Beam runners apply fusion optimization, it is essentially "free" to write ParDo operations in a very modular, composable style, each ParDo operation doing one clear task, and stringing together sequences of ParDo operations to get the desired overall effect. Such programs can be easier to understand, easier to unit-test, easier to extend and evolve, and easier to reuse in new programs. The predefined library of PTransforms that come with Beam makes heavy use of this modular, composable style, trusting to the runner to "flatten out" all the compositions into highly optimized stages.

See https://beam.apache.org/documentation/programming-guide/#pardo for the web documentation for ParDo.

Optionally, a ParDo transform can produce zero or multiple output PCollections. Note the use of ParDo2 to specify 2 outputs.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	var words beam.PCollection  // PCollection<string>
	var cutoff beam.PCollection // Singleton PCollection<int>
	small, big := beam.ParDo2(s, func(word string, cutoff int, small, big func(string)) {
		if len(word) < cutoff {
			small(word)
		} else {
			big(word)
		}
	}, words, beam.SideInput{Input: cutoff})

	_, _ = small, big
}

func ParDoN(s Scope, dofn any, col PCollection, opts ...Option) []PCollection

ParDoN inserts a ParDo with any number of outputs into the pipeline.

Partition takes a PCollection<T> and a PartitionFn, uses the PartitionFn to split the elements of the input PCollection into N partitions, and returns a []PCollection<T> that bundles N PCollection<T>s containing the split elements.

A PartitionFn has the signature `func(T) int`.

func lenToTen(s string) int {
	if len(s) > 9 {
		return 10
	}
	return len(s)
}

// Partition functions must be registered with Beam, and must not be closures.
func init() { register.Function1x1(lenToTen) }

// The number of partitions goes up to 11 since we can return 0 through 10
wordsByLength := beam.Partition(s, 11, lenToTen, inputStrings)

T is permitted to be a KV.
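
The routing that the snippet above asks for can be modeled in plain Go without Beam. The `partition` helper is hypothetical, and returning an error for an out-of-range index is an assumption of this sketch, not a statement about how Beam reacts.

```go
package main

import "fmt"

// lenToTen maps a string to a partition index in 0..10.
func lenToTen(s string) int {
	if len(s) > 9 {
		return 10
	}
	return len(s)
}

// partition is a plain-Go model that routes each element to the bucket
// chosen by fn. Rejecting out-of-range indexes is an assumption of this sketch.
func partition(n int, fn func(string) int, in []string) ([][]string, error) {
	out := make([][]string, n)
	for _, e := range in {
		i := fn(e)
		if i < 0 || i >= n {
			return nil, fmt.Errorf("partition %d out of range [0,%d) for %q", i, n, e)
		}
		out[i] = append(out[i], e)
	}
	return out, nil
}

func main() {
	parts, err := partition(11, lenToTen, []string{"a", "to", "be", "pipelines!"})
	if err != nil {
		panic(err)
	}
	fmt.Println(parts[1], parts[2], parts[10]) // prints "[a] [to be] [pipelines!]"
}
```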

func Reshuffle(s Scope, col PCollection) PCollection

Reshuffle copies a PCollection of the same kind and using the same element coder, and maintains the same windowing information. Importantly, it allows the result PCollection to be processed with a different sharding, in a different stage than the input PCollection.

For example, if a computation needs a lot of parallelism but produces only a small amount of output data, then the computation producing the data can run with as much parallelism as needed, while the output file is written with a smaller amount of parallelism, using the following pattern:

pc := bigHairyComputationNeedingParallelism(scope) // PCollection<string>
resharded := beam.Reshuffle(scope, pc)                // PCollection<string>

Another use case is when one has a non-deterministic DoFn followed by one that performs externally-visible side effects. Inserting a Reshuffle between these DoFns ensures that retries of the second DoFn will always be the same, which is necessary to make side effects idempotent.

A Reshuffle will force a break in the optimized pipeline. Consequently, this operation should be used sparingly, only after determining that the pipeline without reshuffling is broken in some way and performing an extra operation is worth the cost.

func Seq(s Scope, col PCollection, dofns ...any) PCollection

Seq is a convenience helper to chain single-input/single-output ParDos together in a sequence.

package main

import (
	"math"
	"strconv"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

var s = beam.Scope{}

func main() {
	a := textio.Read(s, "...some file path...") // PCollection<string>

	beam.Seq(s, a,
		strconv.Atoi, // string to int
		func(i int) float64 { return float64(i) }, // int to float64
		math.Signbit, // float64 to bool
	) // PCollection<bool>
}

func SwapKV(s Scope, col PCollection) PCollection

SwapKV swaps the key and value for an input PCollection<KV<A,B>>. It returns a PCollection<KV<B,A>>.

func TryCoGroupByKey(s Scope, cols ...PCollection) (PCollection, error)

TryCoGroupByKey inserts a CoGBK transform into the pipeline. Returns an error on failure.

func TryCombine(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)

TryCombine attempts to insert a global Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.

func TryCombinePerKey(s Scope, combinefn any, col PCollection, opts ...Option) (PCollection, error)

TryCombinePerKey attempts to insert a per-key Combine transform into the pipeline. It may fail for multiple reasons, notably that the combinefn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollection.

TryCreate inserts a fixed non-empty set of values into the pipeline. The values must be of the same type.

TryCreateList inserts a fixed set of values into the pipeline from a slice or array. The values must be of the same type. Unlike TryCreate this supports the creation of an empty PCollection.

TryExternal attempts to perform the work of External, returning an error indicating why the operation failed.

func TryFlatten(s Scope, cols ...PCollection) (PCollection, error)

TryFlatten merges incoming PCollections of type 'A' to a single PCollection of type 'A'. Returns an error indicating the set of PCollections that could not be flattened.

TryGroupByKey inserts a GBK transform into the pipeline. Returns an error on failure.

TryParDo attempts to insert a ParDo transform into the pipeline. It may fail for multiple reasons, notably that the dofn is not valid or cannot be bound -- due to type mismatch, say -- to the incoming PCollections.

TryReshuffle inserts a Reshuffle into the pipeline, and returns an error if the PCollection cannot be reshuffled.

TryWindowInto attempts to insert a WindowInto transform.

WindowInto applies the windowing strategy to each element.

func (p PCollection) Coder() Coder

Coder returns the coder for the collection. The Coder is of type 'A'.

IsValid returns true iff the PCollection is valid and part of a Pipeline. Any use of an invalid PCollection will result in a panic.

SetCoder sets the coder for the collection. The Coder must be of type 'A'.

func (p PCollection) Type() FullType

Type returns the full type 'A' of the elements. 'A' must be a concrete type, such as int or KV<int,string>.

PaneInfo represents a PaneInfo that provides information about current firing when triggers are used.

Pipeline manages a directed acyclic graph of primitive PTransforms, and the PCollections that the PTransforms consume and produce. Each Pipeline is self-contained and isolated from any other Pipeline. The Pipeline owns the PCollections and PTransforms and they can be used by that Pipeline only. Pipelines can safely be executed concurrently.

func NewPipeline() *Pipeline

NewPipeline creates a new empty pipeline.

Build validates the Pipeline and returns a lower-level representation for execution. It is called by runners only.

func (p *Pipeline) Root() Scope

Root returns the root scope of the pipeline.

PipelineResult is the result of beamx.RunWithMetrics.

Run executes the pipeline using the selected registered runner. It is customary to define a "runner" with no default as a flag to let users control runner selection.

SchemaProvider specializes schema handling for complex types, including conversion to a valid schema base type.

In particular, schema providers are intended to handle schemas for interface types.

Separating out the acting type from the provider implementation is good.

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure. For pipeline updates, the scope chain forms a unique name. The scope chain can also be used for monitoring and visualization purposes.

func (s Scope) IsValid() bool

IsValid returns true iff the Scope is valid. Any use of an invalid Scope will result in a panic.

Scope returns a sub-scope with the given name. The name provided may be augmented to ensure uniqueness.

WithContext creates a named subscope with an attached context for the represented composite transform. Values from that context may be extracted and added to the composite PTransform or generate a new environment for scoped transforms.

If you're not sure whether these apply to your transform, use Scope instead, and do not set a context.

type SideInput struct {
	Input PCollection
}

SideInput provides a view of the given PCollection to the transformation.

package main

import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
)

var s = beam.Scope{}

func main() {
	// words and sample are PCollection<string>
	var words, sample beam.PCollection
	// analyzeFn emits values from the primary based on the singleton side input.
	analyzeFn := func(primary string, side string, emit func(string)) {}
	// Use beam.SideInput to declare that the sample PCollection is the side input.
	beam.ParDo(s, analyzeFn, words, beam.SideInput{Input: sample})
}

T is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

TypeDefinition provides construction-time type information that the platform cannot infer, such as structured storage sources. These types are universal types that appear as output only. Types that are inferrable should not be conveyed via this mechanism.

U is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

V is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

W is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

Window represents the aggregation window of this element. An element can be a part of multiple windows, based on the element's event time.

type WindowIntoOption interface {
	
}

AllowedLateness configures for how long data may arrive after the end of a window.

func PanesAccumulate() WindowIntoOption

PanesAccumulate applies an Accumulating AccumulationMode to the window. After a pane fires, already processed elements will accumulate and elements will be repeated in subsequent firings for the window.

func PanesDiscard() WindowIntoOption

PanesDiscard applies a Discarding AccumulationMode to the window. After a pane fires, already processed elements will be discarded and not included in later firings for the window.
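
The two accumulation modes can be contrasted with a small plain-Go model. It is not the Beam windowing machinery: `firePanes` is a hypothetical function that assumes a trigger firing once after each batch of elements arrives in a single window.

```go
package main

import "fmt"

// firePanes models the panes emitted for one window when a trigger fires
// after each batch. With accumulate set, earlier elements are repeated in
// later panes; otherwise they are discarded after each firing.
func firePanes(batches [][]int, accumulate bool) [][]int {
	var panes [][]int
	var buffer []int
	for _, batch := range batches {
		buffer = append(buffer, batch...)
		pane := make([]int, len(buffer))
		copy(pane, buffer) // copy so later appends do not alter this pane
		panes = append(panes, pane)
		if !accumulate {
			buffer = nil // discarding mode: drop elements that already fired
		}
	}
	return panes
}

func main() {
	batches := [][]int{{1, 2}, {3}}
	fmt.Println(firePanes(batches, true))  // prints "[[1 2] [1 2 3]]"
	fmt.Println(firePanes(batches, false)) // prints "[[1 2] [3]]"
}
```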

Trigger applies the given trigger to the window.

X is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

Y is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.

Z is a Universal Type used to represent "generic" types in DoFn and PCollection signatures. Each universal type is distinct from all others.