Softens the GBEK determinism requirement by damccorm · Pull Request #36495 · apache/beam

This downgrades the determinism requirement for GBEK (GroupByEncryptedKey) coders from an error to a warning. This matches what GBK does today, which matters because users should be able to add the --gbek pipeline option and have their existing pipelines work unchanged.
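As a rough illustration of the behavior change, here is a minimal sketch of a check that either raises or warns when a coder is not deterministic. It is not the code in this PR: `ToyCoder`, `check_coder`, and the `strict` flag are hypothetical names made up for the example, and only the method name `is_deterministic` mirrors the one on Beam coders.

```python
import warnings


class ToyCoder:
    """Hypothetical stand-in for a coder; this is not Beam's Coder class."""

    def __init__(self, name, deterministic):
        self.name = name
        self.deterministic = deterministic

    def is_deterministic(self):
        return self.deterministic


def check_coder(coder, step_label, strict):
    """Return True if the coder is deterministic, False otherwise.

    With strict=True a non-deterministic coder raises (the old behavior
    described above). With strict=False it only emits a warning and lets
    the caller continue (the behavior this change moves to).
    """
    if coder.is_deterministic():
        return True
    message = (
        f"The coder {coder.name!r} used by {step_label!r} is not "
        "deterministic. This may result in incorrect pipeline output.")
    if strict:
        raise TypeError(message)
    warnings.warn(message, RuntimeWarning)
    return False
```

With `strict=False`, a pipeline that previously aborted at this check keeps running, and the warning still tells the user that adding a type hint or a deterministic custom coder is advisable.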

Today, some of our built-in Beam transforms fail when the requirement is enforced as an error. For example, without this change, testDataframeSum fails with:

java.lang.RuntimeException: Traceback (most recent call last):
  File "apache_beam/coders/coder_impl.py", line 540, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
  File "apache_beam/coders/coder_impl.py", line 460, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 481, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 544, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode 'BlockManager
Items: Index(['b'], dtype='object')
Axis 1: Index([100], dtype='int64', name='a')
NumpyBlock: slice(0, 1, 1), 1 x 1, dtype: int32' of type '<class 'pandas.core.internals.managers.BlockManager'>', please provide a type hint for the input of 'GroupByEncryptedKey Group by encrypted keyThe key coder is not deterministic. This may result in incorrect pipeline output. This can be fixed by adding a type hint to the operation preceding the GroupByKey step, and for custom key classes, by writing a deterministic custom Coder. Please see the documentation for more details.'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1673, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "/usr/local/lib/python3.13/site-packages/apache_beam/transforms/util.py", line 444, in process
    encoded_value = self.value_coder.encode(v)
  File "/usr/local/lib/python3.13/site-packages/apache_beam/coders/coders.py", line 459, in encode
    return self.get_impl().encode(value)
           ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^
  File "apache_beam/coders/coder_impl.py", line 237, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 240, in apache_beam.coders.coder_impl.StreamCoderImpl.encode
  File "apache_beam/coders/coder_impl.py", line 1120, in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 481, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
  File "apache_beam/coders/coder_impl.py", line 542, in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_special_deterministic
TypeError: Unable to deterministically encode '     b
a     
100  3' of type '<class 'pandas.core.frame.DataFrame'>', please provide a type hint for the input of 'GroupByEncryptedKey Group by encrypted keyThe key coder is not deterministic. This may result in incorrect pipeline output. This can be fixed by adding a type hint to the operation preceding the GroupByKey step, and for custom key classes, by writing a deterministic custom Coder. Please see the documentation for more details.'

During handling of the above exception, another exception occurred:

I'd assume other DataFrame tests fail similarly.
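For context on why the error message warns about "incorrect pipeline output", the sketch below shows how a non-deterministic encoding can split equal keys into separate groups. It assumes grouping compares encoded key bytes; it uses only the standard library, and `encode_deterministic` and `group_by_encoded_key` are hypothetical helpers, not Beam APIs.

```python
import json
import pickle

key_a = {"a": 1, "b": 2}
key_b = {"b": 2, "a": 1}  # equal to key_a, but built in a different order

# pickle writes dict items in insertion order, so two equal dicts can
# serialize to different byte strings.
pickled_a = pickle.dumps(key_a)
pickled_b = pickle.dumps(key_b)


def encode_deterministic(key):
    """Encode a JSON-compatible key so that equal keys give equal bytes."""
    return json.dumps(key, sort_keys=True, separators=(",", ":")).encode("utf-8")


def group_by_encoded_key(pairs, encode):
    """Group values by the bytes produced by `encode(key)`."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(encode(key), []).append(value)
    return groups


pairs = [(key_a, "x"), (key_b, "y")]
split = group_by_encoded_key(pairs, pickle.dumps)            # two groups
merged = group_by_encoded_key(pairs, encode_deterministic)   # one group
```

Here `split` ends up with two groups even though the keys compare equal, while `merged` has one. That is the failure mode the warning is about, and it is why a deterministic custom coder or a type hint remains the recommended fix even once the check no longer raises.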
