Fix PulsarIO by Abacn · Pull Request #36141 · apache/beam

@Abacn

The main issues with the current PulsarIO.read are:

* It is based on the Pulsar Reader instead of the Pulsar Consumer, and
  therefore does not support acknowledgement

* The while() block in the reader DoFn never returns until topic termination,
  which basically means the pipeline gets stuck

* The restriction is on publishTime, and tryClaim assumes messages arrive
  ordered by it. This is not true: the reader returns messages ordered by
  messageId. Restricting on publishTime was the wrong choice. Currently unresolved

* PulsarMessage's coder implementation drops the message. This causes
  data loss if PulsarIO.read is not immediately followed by a mapping step

* Tests are defunct and errors are suppressed, making them succeed spuriously
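
The publishTime ordering problem in the list above can be illustrated with a small sketch. It does not use the Beam or Pulsar APIs: `MonotonicTracker` is a hypothetical stand-in that assumes a tracker which, like an offset-range tracker, only accepts strictly increasing positions, and the publish times are made-up values listed in messageId order.

```java
// Sketch only: MonotonicTracker is a hypothetical stand-in, not the Beam class.
class PublishTimeClaimSketch {

    /** Accepts claims only in strictly increasing order, up to an exclusive end. */
    static final class MonotonicTracker {
        private final long to;
        private long lastAttempted = Long.MIN_VALUE;

        MonotonicTracker(long to) {
            this.to = to;
        }

        boolean tryClaim(long position) {
            if (position <= lastAttempted) {
                throw new IllegalStateException(
                    "position " + position + " claimed after " + lastAttempted);
            }
            lastAttempted = position;
            return position < to; // false once the end of the restriction is reached
        }
    }

    /**
     * Claims each publish time in the order given (the messageId order).
     * Returns the index of the first message whose claim breaks the ordering
     * assumption, or -1 if no claim does before the restriction ends.
     */
    static int firstOutOfOrderClaim(long[] publishTimes, long to) {
        MonotonicTracker tracker = new MonotonicTracker(to);
        for (int i = 0; i < publishTimes.length; i++) {
            try {
                if (!tracker.tryClaim(publishTimes[i])) {
                    return -1; // restriction exhausted without an ordering violation
                }
            } catch (IllegalStateException e) {
                return i;
            }
        }
        return -1;
    }
}
```

With publish times `{100, 105, 103, 110}` in messageId order, the third claim (index 2) is rejected because 103 comes after 105, which is the situation the reader can produce when it returns messages ordered by messageId.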

The current PulsarIO.write is even more primitive: pipeline expansion actually fails, and it is not idempotent.

Major fixes include

* Allow Pulsar reader to have a timeout

* Fix PulsarMessage and its coder to include serializable fields from the message

* Fix mock client/reader and add a full read pipeline in test

* Fix issues that prevented PulsarIO.write from expanding. It now works minimally,
  that is, it publishes every message received (at least once).

* Working integration tests for read and write
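
The reader timeout fix listed above can be pictured as a bounded read loop. The following is a minimal sketch, not the code in this PR: a `BlockingQueue` stands in for the Pulsar reader, and `poll(timeout, unit)` plays the role of a timed read that returns null when nothing arrives in time (the Pulsar client's `Reader.readNext(int timeout, TimeUnit unit)` behaves this way). The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch only: the queue is a stand-in for a Pulsar reader.
class BoundedReadLoopSketch {

    /**
     * Reads messages until none arrives within the timeout, then returns
     * what was collected instead of blocking until topic termination.
     */
    static List<String> readUntilIdle(BlockingQueue<String> source, long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        while (true) {
            String message;
            try {
                // Timed read: returns null if no message shows up in time.
                message = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                break;
            }
            if (message == null) {
                break; // timed out: give control back to the caller
            }
            batch.add(message);
        }
        return batch;
    }
}
```

In a DoFn, returning at the timeout is what would let the runner checkpoint and reschedule the work, rather than having the while() loop hold the bundle open indefinitely.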

This makes PulsarIO.read minimally functional, although it won't split
and can only run on a single thread.

Going forward, we should re-implement the reader DoFn based on the Pulsar consumer.
Thus, rename the current DoFn to "NaiveReadFromPulsarDoFn"