Fix PulsarIO by Abacn · Pull Request #36141 · apache/beam
The main issues with the current `PulsarIO.read` are:

* It is based on the Pulsar Reader instead of the Pulsar Consumer, so it does not support acknowledgement.
* The `while()` block in the reader DoFn never returns until the topic is terminated, which in practice means the pipeline gets stuck.
* The restriction is on `publishTime`, and `tryClaim` assumes that `publishTime` is ordered. This is not true: the reader returns messages ordered by `messageId`, so `publishTime` is the wrong choice of restriction. This is currently unresolved.
* The `PulsarMessage` coder implementation dropped the message, which causes data loss whenever `PulsarIO.read` is not immediately followed by a mapping step.
* Tests are defunct and errors are suppressed, so they succeed spuriously.

The current `PulsarIO.write` is even more primitive: pipeline expansion actually fails, and it is not idempotent.

Major fixes include:

* Allow the Pulsar reader to have a timeout.
* Fix `PulsarMessage` and its coder to include the serializable fields from the message.
* Fix the mock client/reader and add a full read pipeline in the tests.
* Fix the issues that prevented `PulsarIO.write` from expanding. It now works minimally, that is, it publishes every message received (at least once).
* Add working integration tests for read and write.

This makes `PulsarIO.read` minimally functional, although it won't split and can only run single-threaded. Going forward, we should re-implement the reader DoFn based on the Pulsar Consumer. Thus, rename the current DoFn to `NaiveReadFromPulsarDoFn`.
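The reader-timeout fix can be illustrated with a small, self-contained Java sketch. This is not the PulsarIO code: `FakeReader`, `QueueReader`, and `readUntilIdle` are hypothetical names, and the in-memory reader only stands in for a reader that returns `null` when no message arrives within the timeout. The point is that the loop exits after one idle read (or a message cap) instead of blocking until the topic is terminated.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TimeoutReadLoop {

  // Hypothetical stand-in for a reader: returns the next message, or null when no
  // message arrives within the timeout. Not a Pulsar or Beam API.
  interface FakeReader {
    String readNext(Duration timeout);
  }

  // In-memory reader over a fixed list. The topic is never "terminated": once the
  // queue is drained, every call behaves like a timeout and returns null.
  static final class QueueReader implements FakeReader {
    private final Queue<String> queue;

    QueueReader(List<String> messages) {
      this.queue = new ArrayDeque<>(messages);
    }

    @Override
    public String readNext(Duration timeout) {
      // A real reader would wait up to `timeout`; this sketch returns immediately.
      return queue.poll();
    }
  }

  // Reads until one call times out or maxMessages have been read, then returns, so the
  // calling DoFn gets control back instead of looping until topic termination.
  static List<String> readUntilIdle(FakeReader reader, Duration timeout, int maxMessages) {
    List<String> out = new ArrayList<>();
    while (out.size() < maxMessages) {
      String msg = reader.readNext(timeout);
      if (msg == null) {
        break; // idle timeout reached: stop reading
      }
      out.add(msg);
    }
    return out;
  }

  public static void main(String[] args) {
    FakeReader reader = new QueueReader(List.of("m1", "m2", "m3"));
    System.out.println(readUntilIdle(reader, Duration.ofSeconds(1), 100)); // [m1, m2, m3]
    // A second call on the drained reader returns immediately with an empty batch.
    System.out.println(readUntilIdle(reader, Duration.ofSeconds(1), 100)); // []
  }
}
```

Returning early only unblocks the DoFn. A loop like this still reads from a single reader, so it neither splits nor acknowledges messages; that is what a Consumer-based DoFn would address.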