graph package - github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph - Go Packages
Package graph is the internal representation of the Beam execution plan. This package is used by the public-facing Beam package to organize the user's pipeline into a connected graph structure. This graph is a precise, strongly typed representation of the user's intent. It allows the runtime to verify the typing of collections, and it tracks data dependencies so that an optimizer can schedule the work.
- Constants
- Variables
- func Bounded(ns []*Node) bool
- func CoGBKMainInput(components int) func(*config)
- func IsLifecycleMethod(n string) bool
- func NewNamespaceGenerator(n int) func() string
- func NodeTypes(list []*Node) []typex.FullType
- func NumMainInputs(num mainInputs) func(*config)
- type CombineFn
- func (f *CombineFn) AddInputFn() *funcx.Fn
- func (f *CombineFn) CompactFn() *funcx.Fn
- func (f *CombineFn) CreateAccumulatorFn() *funcx.Fn
- func (f *CombineFn) ExtractOutputFn() *funcx.Fn
- func (f *CombineFn) MergeAccumulatorsFn() *funcx.Fn
- func (f *CombineFn) Name() string
- func (f *CombineFn) SetupFn() *funcx.Fn
- func (f *CombineFn) TeardownFn() *funcx.Fn
- type DoFn
- func (f *DoFn) Annotations() map[string][]byte
- func (f *DoFn) FinishBundleFn() *funcx.Fn
- func (f *DoFn) IsSplittable() bool
- func (f *DoFn) Name() string
- func (f *DoFn) OnTimerFn() (*funcx.Fn, bool)
- func (f *DoFn) PipelineState() []state.PipelineState
- func (f *DoFn) PipelineTimers() ([]timers.PipelineTimer, []string)
- func (f *DoFn) ProcessElementFn() *funcx.Fn
- func (f *DoFn) SetupFn() *funcx.Fn
- func (f *DoFn) StartBundleFn() *funcx.Fn
- func (f *DoFn) TeardownFn() *funcx.Fn
- type DynFn
- type ExpandedTransform
- type ExternalTransform
- type Fn
- type Graph
- func (g *Graph) Build() ([]*MultiEdge, []*Node, error)
- func (g *Graph) NewEdge(parent *Scope) *MultiEdge
- func (g *Graph) NewNode(t typex.FullType, w *window.WindowingStrategy, bounded bool) *Node
- func (g *Graph) NewScope(parent *Scope, name string) *Scope
- func (g *Graph) Root() *Scope
- func (g *Graph) String() string
- type Inbound
- type InputKind
- type MultiEdge
- func NewCoGBK(g *Graph, s *Scope, ns []*Node) (*MultiEdge, error)
- func NewCombine(g *Graph, s *Scope, u *CombineFn, in *Node, ac *coder.Coder, ...) (*MultiEdge, error)
- func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool))
- func NewExternal(g *Graph, s *Scope, payload *Payload, in []*Node, out []typex.FullType, ...) *MultiEdge
- func NewFlatten(g *Graph, s *Scope, in []*Node) (*MultiEdge, error)
- func NewImpulse(g *Graph, s *Scope, value []byte) *MultiEdge
- func NewParDo(g *Graph, s *Scope, u *DoFn, in []*Node, rc *coder.Coder, ...) (*MultiEdge, error)
- func NewReshuffle(g *Graph, s *Scope, in *Node) (*MultiEdge, error)
- func NewTaggedExternal(g *Graph, s *Scope, payload *Payload, ins []*Inbound, outs []*Outbound, ...) *MultiEdge
- func NewWindowInto(g *Graph, s *Scope, ws *window.WindowingStrategy, in *Node) *MultiEdge
- type Node
- type Opcode
- type Outbound
- type Payload
- type Scope
- type SplittableDoFn
- func (f *SplittableDoFn) CreateInitialRestrictionFn() *funcx.Fn
- func (f *SplittableDoFn) CreateTrackerFn() *funcx.Fn
- func (f *SplittableDoFn) CreateWatermarkEstimatorFn() *funcx.Fn
- func (f *SplittableDoFn) HasTruncateRestriction() bool
- func (f *SplittableDoFn) InitialWatermarkEstimatorStateFn() *funcx.Fn
- func (f *SplittableDoFn) IsStatefulWatermarkEstimating() bool
- func (f *SplittableDoFn) IsWatermarkEstimating() bool
- func (f *SplittableDoFn) Name() string
- func (f *SplittableDoFn) RestrictionSizeFn() *funcx.Fn
- func (f *SplittableDoFn) RestrictionT() reflect.Type
- func (f *SplittableDoFn) SplitRestrictionFn() *funcx.Fn
- func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn
- func (f *SplittableDoFn) WatermarkEstimatorStateFn() *funcx.Fn
- func (f *SplittableDoFn) WatermarkEstimatorStateT() reflect.Type
- func (f *SplittableDoFn) WatermarkEstimatorT() reflect.Type
const (
	MainUnknown mainInputs = -1
	MainSingle  mainInputs = 1
	MainKv      mainInputs = 2
)
The following constants prefixed with "Main" represent valid numbers of DoFn main inputs for DoFn construction and validation.
CombinePerKeyScope is the Go SDK canonical name for the combine composite scope. With Beam Portability, "primitive" composite transforms like combine have their URNs & payloads attached to a high level scope, with a default representation beneath. The use of this const permits the translation layer to confirm the SDK expects this combine to be liftable by a runner and should set this scope's URN and Payload accordingly.
func CoGBKMainInput
func CoGBKMainInput(components int) func(*config)
CoGBKMainInput is an optional config to NewDoFn which specifies the number of components of a CoGBK input to the DoFn being created, allowing for more complete validation.
Example usage:
var col beam.PCollection
graph.NewDoFn(fn, graph.CoGBKMainInput(len(col.Type().Components())))
IsLifecycleMethod returns true if the passed-in string is one of the method names used by the Go SDK as DoFn or CombineFn lifecycle methods. These are the only methods that need shims generated for them.
NewNamespaceGenerator returns a function that generates a random string of n alphabetic characters.
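A name check like this can be sketched as a set lookup. The following is a standalone illustration, not the package's implementation: the helper isLifecycleMethod is hypothetical, and the set of names is an assumption assembled from the DoFn and CombineFn accessors listed on this page, so the real set may be larger or smaller.

```go
package main

import "fmt"

// lifecycleMethods is an assumed set of method names, collected from the
// accessor names on this page. The set used by the real package may differ.
var lifecycleMethods = map[string]struct{}{
	"Setup":             {},
	"StartBundle":       {},
	"ProcessElement":    {},
	"FinishBundle":      {},
	"Teardown":          {},
	"OnTimer":           {},
	"CreateAccumulator": {},
	"AddInput":          {},
	"MergeAccumulators": {},
	"ExtractOutput":     {},
	"Compact":           {},
}

// isLifecycleMethod reports whether n is in the assumed set above.
func isLifecycleMethod(n string) bool {
	_, ok := lifecycleMethods[n]
	return ok
}

func main() {
	fmt.Println(isLifecycleMethod("ProcessElement"))
	fmt.Println(isLifecycleMethod("helperMethod"))
}
```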
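The generator shape described here (a function that returns a function) might look like the sketch below. It is written against the standard library only; the lowercase name newNamespaceGenerator and the choice of alphabet are assumptions for illustration, and the real function may pick characters differently.

```go
package main

import (
	"fmt"
	"math/rand"
)

// letters is the assumed alphabet for generated strings.
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

// newNamespaceGenerator returns a closure that yields a fresh random
// string of n letters each time it is called.
func newNamespaceGenerator(n int) func() string {
	return func() string {
		b := make([]byte, n)
		for i := range b {
			b[i] = letters[rand.Intn(len(letters))]
		}
		return string(b)
	}
}

func main() {
	gen := newNamespaceGenerator(8)
	fmt.Println(gen(), gen())
}
```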
NodeTypes returns the fulltypes of the supplied slice of nodes.
func NumMainInputs
func NumMainInputs(num mainInputs) func(*config)
NumMainInputs is an optional config to NewDoFn which specifies the number of main inputs to the DoFn being created, allowing for more complete validation. Valid inputs are the package constants of type mainInputs.
Example usage:
graph.NewDoFn(fn, graph.NumMainInputs(graph.MainKv))
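Both CoGBKMainInput and NumMainInputs return a func(*config), which is the functional-options pattern. The sketch below shows how such options are typically applied. It is self-contained and does not use the Beam package: the config fields and the newConfig helper are hypothetical, since the real config type is unexported and its fields are not shown on this page.

```go
package main

import "fmt"

type mainInputs int

const (
	mainUnknown mainInputs = -1
	mainSingle  mainInputs = 1
	mainKv      mainInputs = 2
)

// config holds optional settings. Both field names are assumptions.
type config struct {
	numMainIn       mainInputs
	coGBKComponents int
}

// numMainInputs returns an option that records the number of main inputs.
func numMainInputs(num mainInputs) func(*config) {
	return func(c *config) { c.numMainIn = num }
}

// coGBKMainInput returns an option that records the CoGBK component count.
func coGBKMainInput(components int) func(*config) {
	return func(c *config) { c.coGBKComponents = components }
}

// newConfig starts from defaults and applies each option in order.
func newConfig(opts ...func(*config)) config {
	c := config{numMainIn: mainUnknown}
	for _, opt := range opts {
		opt(&c)
	}
	return c
}

func main() {
	c := newConfig(numMainInputs(mainKv), coGBKMainInput(3))
	fmt.Println(c.numMainIn, c.coGBKComponents)
}
```

With this pattern a caller that passes no options gets the best-effort default (mainUnknown in this sketch), and new options can be added without changing the constructor's signature.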
CombineFn represents a CombineFn.
NewCombineFn constructs a CombineFn from the given value, if possible.
AddInputFn returns the "AddInput" function, if present.
CompactFn returns the "Compact" function, if present.
CreateAccumulatorFn returns the "CreateAccumulator" function, if present.
ExtractOutputFn returns the "ExtractOutput" function, if present.
MergeAccumulatorsFn returns the "MergeAccumulators" function. If it is the only method present, then InputType == AccumulatorType == OutputType.
Name returns the name of the function or struct.
SetupFn returns the "Setup" function, if present.
TeardownFn returns the "Teardown" function, if present.
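To make the method names above concrete, here is a standalone sketch of a struct that carries the four core combine methods and computes a mean. It only exercises the methods directly; it is not registered with Beam, and the types meanFn and meanAccum are hypothetical names chosen for the example.

```go
package main

import "fmt"

// meanAccum is a hypothetical accumulator: a running sum and count.
type meanAccum struct {
	sum   float64
	count int
}

// meanFn is a hypothetical combiner with the method names listed above.
type meanFn struct{}

// CreateAccumulator returns an empty accumulator.
func (meanFn) CreateAccumulator() meanAccum { return meanAccum{} }

// AddInput folds one input value into the accumulator.
func (meanFn) AddInput(a meanAccum, v float64) meanAccum {
	return meanAccum{sum: a.sum + v, count: a.count + 1}
}

// MergeAccumulators combines two partial accumulators.
func (meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
	return meanAccum{sum: a.sum + b.sum, count: a.count + b.count}
}

// ExtractOutput turns the accumulator into the final mean.
func (meanFn) ExtractOutput(a meanAccum) float64 {
	if a.count == 0 {
		return 0
	}
	return a.sum / float64(a.count)
}

func main() {
	var fn meanFn
	// Two partial accumulators, as if built on different workers.
	left := fn.AddInput(fn.AddInput(fn.CreateAccumulator(), 1), 2)
	right := fn.AddInput(fn.CreateAccumulator(), 6)
	fmt.Println(fn.ExtractOutput(fn.MergeAccumulators(left, right)))
}
```

Here the input, accumulator, and output types all differ, which is why the sketch needs more than MergeAccumulators alone.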
DoFn represents a DoFn.
AsDoFn converts a Fn to a DoFn, if possible. numMainIn specifies how many main inputs are expected in the DoFn's method signatures. Valid inputs are the package constants of type mainInputs. If that number is MainUnknown then validation is done by best effort and may miss some edge cases.
NewDoFn constructs a DoFn from the given value, if possible.
Annotations returns the optional annotations of the DoFn, if present.
FinishBundleFn returns the "FinishBundle" function, if present.
IsSplittable returns whether the DoFn is a valid Splittable DoFn.
Name returns the name of the function or struct.
OnTimerFn returns the "OnTimer" function and a bool indicating whether the function is defined for the DoFn.
PipelineState returns a list of PipelineState objects used to access/mutate global pipeline state (if any).
PipelineTimers returns the list of PipelineTimer objects defined for the DoFn.
SetupFn returns the "Setup" function, if present.
StartBundleFn returns the "StartBundle" function, if present.
TeardownFn returns the "Teardown" function, if present.
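The "if present" wording on these accessors reflects that every DoFn method other than ProcessElement is optional. One way to detect optional methods on a struct is reflection, sketched below with the standard library only. The helper methodIfPresent and the type wordLenFn are hypothetical, and the sketch does not claim to mirror how the package itself discovers methods.

```go
package main

import (
	"fmt"
	"reflect"
)

// wordLenFn is a hypothetical DoFn-like struct that defines only
// Setup and ProcessElement.
type wordLenFn struct{ setupCalls int }

// Setup counts how often it has been invoked.
func (f *wordLenFn) Setup() { f.setupCalls++ }

// ProcessElement returns the length of the word.
func (f *wordLenFn) ProcessElement(word string) int { return len(word) }

// methodIfPresent returns the named method of v and whether v defines it.
func methodIfPresent(v any, name string) (reflect.Value, bool) {
	m := reflect.ValueOf(v).MethodByName(name)
	return m, m.IsValid()
}

func main() {
	fn := &wordLenFn{}
	names := []string{"Setup", "StartBundle", "ProcessElement", "FinishBundle", "Teardown"}
	for _, name := range names {
		_, ok := methodIfPresent(fn, name)
		fmt.Printf("%s present: %v\n", name, ok)
	}
}
```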
DynFn is a generator for dynamically-created functions:
gen: (name string, t reflect.Type, []byte) -> func : T
where the generated function, fn : T, is re-created at runtime. This concept allows serialization of dynamically-generated functions, which do not have a valid (unique) symbol such as one created via reflect.MakeFunc.
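The idea can be illustrated with reflect.MakeFunc: a function built this way has no symbol to look up, so it is described by a (name, type, data) triple and rebuilt from that triple wherever it is needed. The sketch below is a simplified assumption of that flow. The gen function, its return type, and the restriction to func() string are choices made for the example and are not the package's actual generator signature.

```go
package main

import (
	"fmt"
	"reflect"
)

// stringFnType is the reflect.Type of func() string.
var stringFnType = reflect.TypeOf((func() string)(nil))

// gen is a hypothetical generator in the shape described above: given a
// name, a function type, and an opaque payload, it builds a function of
// that type. This sketch only supports func() string.
func gen(name string, t reflect.Type, data []byte) any {
	impl := func(args []reflect.Value) []reflect.Value {
		return []reflect.Value{reflect.ValueOf(name + ":" + string(data))}
	}
	return reflect.MakeFunc(t, impl).Interface()
}

func main() {
	// The triple (name, type, data) is what would be serialized; calling
	// gen again with the same triple re-creates an equivalent function.
	fn := gen("greet", stringFnType, []byte("hello")).(func() string)
	fmt.Println(fn())
}
```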
type ExpandedTransform
ExpandedTransform stores the expansion response associated with each ExternalTransform.
Components and Transform fields are purposely typed as any to avoid unnecessary proto-related imports into graph.
ExternalTransform represents the cross-language transform in and out of the pipeline graph. It is associated with each MultiEdge and its Inbound and Outbound links. It also stores the associated expansion response within the Expanded field.
WithNamedInputs adds a map (tag -> index of Inbound in MultiEdge.Input) of named inputs corresponding to ExternalTransform's InputsMap.
WithNamedOutputs adds a map (tag -> index of Outbound in MultiEdge.Output) of named outputs corresponding to ExternalTransform's OutputsMap.
Fn holds either a function or struct receiver.
NewFn pre-processes a function, dynamic function or struct for graph construction.
Name returns the name of the function or struct.
Graph represents an in-progress deferred execution graph and is easily translatable to the model graph. This graph representation allows precise control over scope and connectivity.
New returns an empty graph with the scope set to the root.
Build performs finalization on the graph. It verifies the correctness of the graph structure, typechecks the plan, and returns the edges and nodes in the graph.
NewEdge creates a new edge of the graph in the supplied scope.
NewNode creates a new node in the graph of the supplied fulltype.
NewScope creates and returns a new scope that is a child of the supplied scope.
Inbound represents an inbound data link from a Node.
NamedInboundLinks returns an array of new Inbound links and a map (tag -> index of Inbound in MultiEdge.Input) of corresponding indices with respect to their names.
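The return shape (a slice of links plus a tag -> index map into that slice) can be sketched with plain strings standing in for nodes and links. The helper namedLinks is hypothetical, and sorting the tags to get deterministic indices is an assumption of this sketch, not a documented property of NamedInboundLinks.

```go
package main

import (
	"fmt"
	"sort"
)

// namedLinks flattens a tag -> node-name map into a slice of links and a
// tag -> index map that points into that slice.
func namedLinks(named map[string]string) ([]string, map[string]int) {
	tags := make([]string, 0, len(named))
	for tag := range named {
		tags = append(tags, tag)
	}
	// Sort so indices do not depend on map iteration order (assumption).
	sort.Strings(tags)

	links := make([]string, 0, len(tags))
	index := make(map[string]int, len(tags))
	for i, tag := range tags {
		links = append(links, named[tag])
		index[tag] = i
	}
	return links, index
}

func main() {
	links, index := namedLinks(map[string]string{"right": "node-2", "left": "node-1"})
	fmt.Println(links, index)
}
```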
InputKind represents the role of the input and its shape.
const (
	Main      InputKind = "Main"
	Singleton InputKind = "Singleton"
	Slice     InputKind = "Slice"
	Map       InputKind = "Map"
	MultiMap  InputKind = "MultiMap"
	Iter      InputKind = "Iter"
	ReIter    InputKind = "ReIter"
)
Valid input kinds.
Bind returns the inbound, outbound and underlying output types for a Fn, when bound to the underlying input types. The complication of bind is primarily that UserFns have loose signatures and bind must produce valid type information for the execution plan.
For example,
func (t EventTime, k typex.X, v int, emit func(string, typex.X))
or
func (context.Context, k typex.X, v int) (string, typex.X, error)
are UserFns that may take one or two incoming fulltypes: either KV<X,int> or X with a singleton side input of type int. For the purpose of the shape of data processing, the two forms are equivalent. The non-data types, context.Context and error, are not part of the data signature and come into play only at runtime.
If either was bound to the input type [KV<string,int>], bind would return:
inbound:  [Main: KV<X,int>]
outbound: [KV<string,X>]
output:   [KV<string,string>]
Note that it propagates the assignment of X to string in the output type.
If either was instead bound to the input fulltypes [float, int], the result would be:
inbound:  [Main: X, Singleton: int]
outbound: [KV<string,X>]
output:   [KV<string,float>]
Here, the inbound shape and output types are different from before.
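The propagation of X from the input type to the output type can be sketched as a small match-and-substitute step. This is a toy model only: the typ struct, the helpers bindVars and substitute, and the rule that X, Y, and Z are the type variables are all assumptions for illustration, and the real bind logic also handles side inputs, emitters, and other cases.

```go
package main

import "fmt"

// typ is a toy full type: a leaf such as "string" or "X", or a KV pair.
type typ struct {
	name string // leaf name, or "KV"
	kv   []typ  // exactly two components when name == "KV"
}

func leaf(n string) typ { return typ{name: n} }
func kv(k, v typ) typ   { return typ{name: "KV", kv: []typ{k, v}} }

func (t typ) String() string {
	if t.name == "KV" {
		return "KV<" + t.kv[0].String() + "," + t.kv[1].String() + ">"
	}
	return t.name
}

// isVar treats X, Y and Z as type variables (an assumption of this sketch).
func isVar(t typ) bool { return t.name == "X" || t.name == "Y" || t.name == "Z" }

// bindVars matches a declared type against a concrete type and records
// the assignment of each type variable in env.
func bindVars(declared, concrete typ, env map[string]typ) error {
	switch {
	case isVar(declared):
		if prev, ok := env[declared.name]; ok && prev.String() != concrete.String() {
			return fmt.Errorf("%s bound to both %s and %s", declared.name, prev, concrete)
		}
		env[declared.name] = concrete
		return nil
	case declared.name == "KV" && concrete.name == "KV":
		if err := bindVars(declared.kv[0], concrete.kv[0], env); err != nil {
			return err
		}
		return bindVars(declared.kv[1], concrete.kv[1], env)
	case declared.name == concrete.name:
		return nil
	default:
		return fmt.Errorf("cannot bind %s to %s", declared, concrete)
	}
}

// substitute replaces every bound type variable in t with its assignment.
func substitute(t typ, env map[string]typ) typ {
	if isVar(t) {
		if c, ok := env[t.name]; ok {
			return c
		}
		return t
	}
	if t.name == "KV" {
		return kv(substitute(t.kv[0], env), substitute(t.kv[1], env))
	}
	return t
}

func main() {
	env := map[string]typ{}
	inbound := kv(leaf("X"), leaf("int"))
	if err := bindVars(inbound, kv(leaf("string"), leaf("int")), env); err != nil {
		fmt.Println("bind failed:", err)
		return
	}
	// The outbound type KV<string,X> becomes the concrete output type.
	fmt.Println(substitute(kv(leaf("string"), leaf("X")), env))
}
```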
MultiEdge represents a primitive data processing operation. Each non-user code operation may be implemented by either the harness or the runner.
NewCoGBK inserts a new CoGBK edge into the graph.
NewCombine inserts a new Combine edge into the graph. Combines cannot have side input.
func NewCrossLanguage(g *Graph, s *Scope, ext *ExternalTransform, ins []*Inbound, outs []*Outbound) (*MultiEdge, func(*Node, bool))
NewCrossLanguage inserts a cross-language External transform using initialized input and output nodes.
NewExternal inserts an External transform. The system makes no assumptions about what this transform might do.
NewFlatten inserts a new Flatten edge in the graph. Flatten output type is the shared input type.
NewImpulse inserts a new Impulse edge into the graph. It must use the built-in bytes coder.
NewParDo inserts a new ParDo edge into the graph.
NewReshuffle inserts a new Reshuffle edge into the graph.
func NewTaggedExternal(g *Graph, s *Scope, payload *Payload, ins []*Inbound, outs []*Outbound, bounded bool) *MultiEdge
NewTaggedExternal inserts an External transform with tagged inbound and outbound connections. The system makes no assumptions about what this transform might do.
NewWindowInto inserts a new WindowInto edge into the graph.
ID returns the graph-local identifier for the edge.
Name returns a not-necessarily-unique name for the edge.
Node is a typed connector describing the data type and encoding. A node may have multiple inbound and outbound connections. The underlying type must be a complete type, i.e., it must not include any type variables.
ID returns the graph-local identifier for the node.
Type returns the underlying full type of the data, such as KV<int,string>.
WindowingStrategy returns the window applied to the data.
Opcode represents a primitive Beam instruction kind.
Outbound represents an outbound data link to a Node.
NamedOutboundLinks returns an array of new Outbound links and a map (tag -> index of Outbound in MultiEdge.Output) of corresponding indices with respect to their names.
Payload represents an external payload.
Scope is a syntactic Scope, such as arising from a composite Transform. It has no semantic meaning at execution time. Used by monitoring.
ID returns the graph-local identifier for the scope.
SplittableDoFn represents a DoFn implementing SDF methods.
func (f *SplittableDoFn) CreateInitialRestrictionFn() *funcx.Fn
CreateInitialRestrictionFn returns the "CreateInitialRestriction" function, if present.
CreateTrackerFn returns the "CreateTracker" function, if present.
func (f *SplittableDoFn) CreateWatermarkEstimatorFn() *funcx.Fn
CreateWatermarkEstimatorFn returns the "CreateWatermarkEstimator" function, if present.
func (f *SplittableDoFn) HasTruncateRestriction() bool
HasTruncateRestriction returns whether the DoFn implements a custom truncate restriction function.
func (f *SplittableDoFn) InitialWatermarkEstimatorStateFn() *funcx.Fn
InitialWatermarkEstimatorStateFn returns the "InitialWatermarkEstimatorState" function, if present.
func (f *SplittableDoFn) IsStatefulWatermarkEstimating() bool
IsStatefulWatermarkEstimating returns whether the DoFn implements custom watermark state.
func (f *SplittableDoFn) IsWatermarkEstimating() bool
IsWatermarkEstimating returns whether the DoFn implements a custom watermark estimator.
Name returns the name of the function or struct.
RestrictionSizeFn returns the "RestrictionSize" function, if present.
RestrictionT returns the restriction type from the SDF.
SplitRestrictionFn returns the "SplitRestriction" function, if present.
func (f *SplittableDoFn) TruncateRestrictionFn() *funcx.Fn
TruncateRestrictionFn returns the "TruncateRestriction" function, if present.
func (f *SplittableDoFn) WatermarkEstimatorStateFn() *funcx.Fn
WatermarkEstimatorStateFn returns the "WatermarkEstimatorState" function, if present.
WatermarkEstimatorStateT returns the type of the watermark estimator state from the SDF.
WatermarkEstimatorT returns the type of the watermark estimator from the SDF.
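As a rough illustration of what the restriction-related SDF methods named above might do, the sketch below defines a struct with CreateInitialRestriction, SplitRestriction, and RestrictionSize over a simple offset range. It is standalone and not registered with Beam: the types readFn and offsetRange are hypothetical, the element is assumed to be a total length, and tracker and watermark methods are left out.

```go
package main

import "fmt"

// offsetRange is a hypothetical restriction covering [Start, End).
type offsetRange struct{ Start, End int64 }

// readFn is a hypothetical SDF-like struct. ChunkSize bounds each split.
type readFn struct{ ChunkSize int64 }

// CreateInitialRestriction covers the whole element, here a total length.
func (fn readFn) CreateInitialRestriction(total int64) offsetRange {
	return offsetRange{Start: 0, End: total}
}

// SplitRestriction cuts the restriction into chunks of at most ChunkSize.
func (fn readFn) SplitRestriction(total int64, r offsetRange) []offsetRange {
	if fn.ChunkSize <= 0 {
		return []offsetRange{r}
	}
	var out []offsetRange
	for s := r.Start; s < r.End; s += fn.ChunkSize {
		e := s + fn.ChunkSize
		if e > r.End {
			e = r.End
		}
		out = append(out, offsetRange{Start: s, End: e})
	}
	return out
}

// RestrictionSize estimates the work in a restriction as its length.
func (fn readFn) RestrictionSize(total int64, r offsetRange) float64 {
	return float64(r.End - r.Start)
}

func main() {
	fn := readFn{ChunkSize: 4}
	initial := fn.CreateInitialRestriction(10)
	for _, r := range fn.SplitRestriction(10, initial) {
		fmt.Println(r, fn.RestrictionSize(10, r))
	}
}
```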