[BEAM-14539] Ensure that the print stream can handle larger byte arra… · apache/beam@999bcea
@@ -17,6 +17,7 @@
1717 */
1818package org.apache.beam.runners.dataflow.worker.logging;
191920+import java.io.ByteArrayOutputStream;
2021import java.io.IOException;
2122import java.io.OutputStream;
2223import java.io.PrintStream;
@@ -26,7 +27,6 @@
2627import java.nio.charset.Charset;
2728import java.nio.charset.CharsetDecoder;
2829import java.nio.charset.CoderMalfunctionError;
29-import java.nio.charset.CoderResult;
3030import java.nio.charset.CodingErrorAction;
3131import java.util.Formatter;
3232import java.util.Locale;
@@ -35,6 +35,7 @@
3535import java.util.logging.Level;
3636import java.util.logging.LogRecord;
3737import java.util.logging.Logger;
38+import javax.annotation.concurrent.GuardedBy;
3839import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
39404041/**
@@ -45,14 +46,16 @@
4546"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
4647})
4748class JulHandlerPrintStreamAdapterFactory {
48-private static final AtomicBoolean outputWarning = new AtomicBoolean(false);
49+private static final AtomicBoolean OUTPUT_WARNING = new AtomicBoolean(false);
50+51+@VisibleForTesting
52+static final String LOGGING_DISCLAIMER =
53+String.format(
54+"Please use a logger instead of System.out or System.err.%n"
55+ + "Please switch to using org.slf4j.Logger.%n"
56+ + "See: https://cloud.google.com/dataflow/pipelines/logging");
49575058private static class JulHandlerPrintStream extends PrintStream {
51-private static final String LOGGING_DISCLAIMER =
52-String.format(
53-"Please use a logger instead of System.out or System.err.%n"
54- + "Please switch to using org.slf4j.Logger.%n"
55- + "See: https://cloud.google.com/dataflow/pipelines/logging");
5659// This limits the number of bytes which we buffer in case we don't have a flush.
5760private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars
5861@@ -61,12 +64,19 @@ private static class JulHandlerPrintStream extends PrintStream {
61646265private final Handler handler;
6366private final String loggerName;
64-private final StringBuilder buffer;
6567private final Level messageLevel;
68+69+@GuardedBy("this")
70+private final StringBuilder buffer;
71+72+@GuardedBy("this")
6673private final CharsetDecoder decoder;
74+75+@GuardedBy("this")
6776private final CharBuffer decoded;
68-private int carryOverBytes;
69-private byte[] carryOverByteArray;
77+78+@GuardedBy("this")
79+private ByteArrayOutputStream carryOverBytes;
70807181private JulHandlerPrintStream(
7282Handler handler, String loggerName, Level logLevel, Charset charset)
@@ -90,8 +100,7 @@ public void write(int i) throws IOException {
90100 .newDecoder()
91101 .onMalformedInput(CodingErrorAction.REPLACE)
92102 .onUnmappableCharacter(CodingErrorAction.REPLACE);
93-this.carryOverByteArray = new byte[6];
94-this.carryOverBytes = 0;
103+this.carryOverBytes = new ByteArrayOutputStream();
95104this.decoded = CharBuffer.allocate(BUFFER_LIMIT);
96105 }
97106@@ -129,47 +138,55 @@ public synchronized void write(int i) {
129138130139@Override
131140public void write(byte[] a, int offset, int length) {
141+if (length == 0) {
142+return;
143+ }
144+132145ByteBuffer incoming = ByteBuffer.wrap(a, offset, length);
146+assert incoming.hasArray();
147+148+String msg = null;
133149// Consume the added bytes, flushing on decoded newlines or if we hit
134150// the buffer limit.
135-String msg = null;
136-synchronized (decoder) {
137-decoded.clear();
138-boolean flush = false;
151+synchronized (this) {
152+int startLength = buffer.length();
153+139154try {
140155// Process any remaining bytes from last time by adding a byte at a time.
141-while (carryOverBytes > 0 && incoming.hasRemaining()) {
142-carryOverByteArray[carryOverBytes++] = incoming.get();
143-ByteBuffer wrapped = ByteBuffer.wrap(carryOverByteArray, 0, carryOverBytes);
156+while (carryOverBytes.size() > 0 && incoming.hasRemaining()) {
157+carryOverBytes.write(incoming.get());
158+ByteBuffer wrapped =
159+ByteBuffer.wrap(carryOverBytes.toByteArray(), 0, carryOverBytes.size());
144160decoder.decode(wrapped, decoded, false);
145161if (!wrapped.hasRemaining()) {
146-carryOverBytes = 0;
162+carryOverBytes.reset();
147163 }
148164 }
149-carryOverBytes = 0;
150-if (incoming.hasRemaining()) {
151-CoderResult result = decoder.decode(incoming, decoded, false);
152-if (result.isOverflow()) {
153-flush = true;
154- }
155-// Keep the unread bytes.
156-assert (incoming.remaining() <= carryOverByteArray.length);
157-while (incoming.hasRemaining()) {
158-carryOverByteArray[carryOverBytes++] = incoming.get();
159- }
165+166+// Append chunks while we are hitting the decoded buffer limit
167+while (decoder.decode(incoming, decoded, false).isOverflow()) {
168+decoded.flip();
169+buffer.append(decoded);
170+decoded.clear();
160171 }
161- } catch (CoderMalfunctionError error) {
162-decoder.reset();
163-carryOverBytes = 0;
164-error.printStackTrace();
165- }
166-decoded.flip();
167-synchronized (this) {
168-int startLength = buffer.length();
172+173+// Append the partial chunk
174+decoded.flip();
169175buffer.append(decoded);
170-if (flush || buffer.indexOf("\n", startLength) >= 0) {
176+decoded.clear();
177+178+// Check to see if we should output this message
179+if (buffer.length() > BUFFER_LIMIT || buffer.indexOf("\n", startLength) >= 0) {
171180msg = flushBufferToString();
172181 }
182+183+// Keep all unread bytes.
184+carryOverBytes.write(
185+incoming.array(), incoming.arrayOffset() + incoming.position(), incoming.remaining());
186+ } catch (CoderMalfunctionError error) {
187+decoder.reset();
188+carryOverBytes.reset();
189+error.printStackTrace();
173190 }
174191 }
175192publishIfNonEmpty(msg);
@@ -386,7 +403,7 @@ private void publishIfNonEmpty(String message) {
386403return;
387404 }
388405if (logger.isLoggable(messageLevel)) {
389-if (outputWarning.compareAndSet(false, true)) {
406+if (OUTPUT_WARNING.compareAndSet(false, true)) {
390407LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);
391408log.setLoggerName(loggerName);
392409handler.publish(log);
@@ -411,8 +428,7 @@ static PrintStream create(
411428 }
412429 }
413430414-@VisibleForTesting
415431static void reset() {
416-outputWarning.set(false);
432+OUTPUT_WARNING.set(false);
417433 }
418434}