[BEAM-14539] Ensure that the print stream can handle larger byte arra… · apache/beam@999bcea

@@ -17,6 +17,7 @@

1717

*/

1818

package org.apache.beam.runners.dataflow.worker.logging;

191920+

import java.io.ByteArrayOutputStream;

2021

import java.io.IOException;

2122

import java.io.OutputStream;

2223

import java.io.PrintStream;

@@ -26,7 +27,6 @@

2627

import java.nio.charset.Charset;

2728

import java.nio.charset.CharsetDecoder;

2829

import java.nio.charset.CoderMalfunctionError;

29-

import java.nio.charset.CoderResult;

3030

import java.nio.charset.CodingErrorAction;

3131

import java.util.Formatter;

3232

import java.util.Locale;

@@ -35,6 +35,7 @@

3535

import java.util.logging.Level;

3636

import java.util.logging.LogRecord;

3737

import java.util.logging.Logger;

38+

import javax.annotation.concurrent.GuardedBy;

3839

import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

39404041

/**

@@ -45,14 +46,16 @@

4546

"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)

4647

})

4748

class JulHandlerPrintStreamAdapterFactory {

48-

private static final AtomicBoolean outputWarning = new AtomicBoolean(false);

49+

private static final AtomicBoolean OUTPUT_WARNING = new AtomicBoolean(false);

50+51+

@VisibleForTesting

52+

static final String LOGGING_DISCLAIMER =

53+

String.format(

54+

"Please use a logger instead of System.out or System.err.%n"

55+

+ "Please switch to using org.slf4j.Logger.%n"

56+

+ "See: https://cloud.google.com/dataflow/pipelines/logging");

49575058

private static class JulHandlerPrintStream extends PrintStream {

51-

private static final String LOGGING_DISCLAIMER =

52-

String.format(

53-

"Please use a logger instead of System.out or System.err.%n"

54-

+ "Please switch to using org.slf4j.Logger.%n"

55-

+ "See: https://cloud.google.com/dataflow/pipelines/logging");

5659

// This limits the number of bytes which we buffer in case we don't have a flush.

5760

private static final int BUFFER_LIMIT = 1 << 10; // 1024 chars

5861

@@ -61,12 +64,19 @@ private static class JulHandlerPrintStream extends PrintStream {

61646265

private final Handler handler;

6366

private final String loggerName;

64-

private final StringBuilder buffer;

6567

private final Level messageLevel;

68+69+

@GuardedBy("this")

70+

private final StringBuilder buffer;

71+72+

@GuardedBy("this")

6673

private final CharsetDecoder decoder;

74+75+

@GuardedBy("this")

6776

private final CharBuffer decoded;

68-

private int carryOverBytes;

69-

private byte[] carryOverByteArray;

77+78+

@GuardedBy("this")

79+

private ByteArrayOutputStream carryOverBytes;

70807181

private JulHandlerPrintStream(

7282

Handler handler, String loggerName, Level logLevel, Charset charset)

@@ -90,8 +100,7 @@ public void write(int i) throws IOException {

90100

.newDecoder()

91101

.onMalformedInput(CodingErrorAction.REPLACE)

92102

.onUnmappableCharacter(CodingErrorAction.REPLACE);

93-

this.carryOverByteArray = new byte[6];

94-

this.carryOverBytes = 0;

103+

this.carryOverBytes = new ByteArrayOutputStream();

95104

this.decoded = CharBuffer.allocate(BUFFER_LIMIT);

96105

}

97106

@@ -129,47 +138,55 @@ public synchronized void write(int i) {

129138130139

@Override

131140

public void write(byte[] a, int offset, int length) {

141+

if (length == 0) {

142+

return;

143+

}

144+132145

ByteBuffer incoming = ByteBuffer.wrap(a, offset, length);

146+

assert incoming.hasArray();

147+148+

String msg = null;

133149

// Consume the added bytes, flushing on decoded newlines or if we hit

134150

// the buffer limit.

135-

String msg = null;

136-

synchronized (decoder) {

137-

decoded.clear();

138-

boolean flush = false;

151+

synchronized (this) {

152+

int startLength = buffer.length();

153+139154

try {

140155

// Process any remaining bytes from last time by adding a byte at a time.

141-

while (carryOverBytes > 0 && incoming.hasRemaining()) {

142-

carryOverByteArray[carryOverBytes++] = incoming.get();

143-

ByteBuffer wrapped = ByteBuffer.wrap(carryOverByteArray, 0, carryOverBytes);

156+

while (carryOverBytes.size() > 0 && incoming.hasRemaining()) {

157+

carryOverBytes.write(incoming.get());

158+

ByteBuffer wrapped =

159+

ByteBuffer.wrap(carryOverBytes.toByteArray(), 0, carryOverBytes.size());

144160

decoder.decode(wrapped, decoded, false);

145161

if (!wrapped.hasRemaining()) {

146-

carryOverBytes = 0;

162+

carryOverBytes.reset();

147163

}

148164

}

149-

carryOverBytes = 0;

150-

if (incoming.hasRemaining()) {

151-

CoderResult result = decoder.decode(incoming, decoded, false);

152-

if (result.isOverflow()) {

153-

flush = true;

154-

}

155-

// Keep the unread bytes.

156-

assert (incoming.remaining() <= carryOverByteArray.length);

157-

while (incoming.hasRemaining()) {

158-

carryOverByteArray[carryOverBytes++] = incoming.get();

159-

}

165+166+

// Append chunks while we are hitting the decoded buffer limit

167+

while (decoder.decode(incoming, decoded, false).isOverflow()) {

168+

decoded.flip();

169+

buffer.append(decoded);

170+

decoded.clear();

160171

}

161-

} catch (CoderMalfunctionError error) {

162-

decoder.reset();

163-

carryOverBytes = 0;

164-

error.printStackTrace();

165-

}

166-

decoded.flip();

167-

synchronized (this) {

168-

int startLength = buffer.length();

172+173+

// Append the partial chunk

174+

decoded.flip();

169175

buffer.append(decoded);

170-

if (flush || buffer.indexOf("\n", startLength) >= 0) {

176+

decoded.clear();

177+178+

// Check to see if we should output this message

179+

if (buffer.length() > BUFFER_LIMIT || buffer.indexOf("\n", startLength) >= 0) {

171180

msg = flushBufferToString();

172181

}

182+183+

// Keep all unread bytes.

184+

carryOverBytes.write(

185+

incoming.array(), incoming.arrayOffset() + incoming.position(), incoming.remaining());

186+

} catch (CoderMalfunctionError error) {

187+

decoder.reset();

188+

carryOverBytes.reset();

189+

error.printStackTrace();

173190

}

174191

}

175192

publishIfNonEmpty(msg);

@@ -386,7 +403,7 @@ private void publishIfNonEmpty(String message) {

386403

return;

387404

}

388405

if (logger.isLoggable(messageLevel)) {

389-

if (outputWarning.compareAndSet(false, true)) {

406+

if (OUTPUT_WARNING.compareAndSet(false, true)) {

390407

LogRecord log = new LogRecord(Level.WARNING, LOGGING_DISCLAIMER);

391408

log.setLoggerName(loggerName);

392409

handler.publish(log);

@@ -411,8 +428,7 @@ static PrintStream create(

411428

}

412429

}

413430414-

@VisibleForTesting

415431

static void reset() {

416-

outputWarning.set(false);

432+

OUTPUT_WARNING.set(false);

417433

}

418434

}