Pool table instances to make PureJavaSnappy thread safe by bokken · Pull Request #271 · xerial/snappy-java

@@ -1,26 +1,41 @@ package org.xerial.snappy.pure;
import org.xerial.snappy.SnappyApi; import static org.xerial.snappy.pure.UnsafeUtil.getAddress; import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET;
import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentLinkedDeque;
import static org.xerial.snappy.pure.UnsafeUtil.getAddress; import static sun.misc.Unsafe.ARRAY_BYTE_BASE_OFFSET; import org.xerial.snappy.SnappyApi;
/** * A pure-java Snappy implementation using https://github.com/airlift/aircompressor */ public class PureJavaSnappy implements SnappyApi { private final short[] table = new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE]; /** * Using a {@link ConcurrentLinkedDeque}, with values constantly popped and pushed from the head, leads to the fewest * {@code short[]} instances remaining live over time. */ private final static ConcurrentLinkedDeque<SoftReference<short[]>> CACHED_TABLES = new ConcurrentLinkedDeque<>();
private final static int MAX_OUTPUT_LENGTH = Integer.MAX_VALUE;
@Override public long rawCompress(long inputAddr, long inputSize, long destAddr) throws IOException { return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table); final short[] table = getTable(); try { return SnappyRawCompressor.compress(null, inputAddr, inputSize, null, destAddr, MAX_OUTPUT_LENGTH, table); } finally { returnTable(table); } }
@Override Expand Down Expand Up @@ -76,16 +91,24 @@ else if (compressed.hasArray()) { // collected in a block, and technically, the JVM is allowed to eliminate these locks. synchronized (input) { synchronized (compressed) { int written = SnappyRawCompressor.compress( inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit, table); compressed.position(compressed.position() + written); return written; final short[] table = getTable(); try { int written = SnappyRawCompressor.compress( inputBase, inputAddress, inputLimit, outputBase, outputAddress, outputLimit, table); compressed.position(compressed.position() + written); return written; } finally { returnTable(table); } } } } Expand All @@ -99,7 +122,15 @@ public int rawCompress(Object input, int inputOffset, int inputByteLength, Objec long outputAddress = ARRAY_BYTE_BASE_OFFSET + outputOffset; long outputLimit = outputAddress + MAX_OUTPUT_LENGTH;
return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table); final short[] table = getTable(); try { return SnappyRawCompressor.compress(input, inputAddress, inputLimit, output, outputAddress, outputLimit, table); } finally { returnTable(table); } }
@Override Expand Down Expand Up @@ -241,4 +272,38 @@ public void arrayCopy(Object src, int offset, int byteLength, Object dest, int d { System.arraycopy(src, offset, dest, dOffset, byteLength); }
private static short[] getTable() { SoftReference<short[]> existingRef; while((existingRef = CACHED_TABLES.poll()) != null) { short[] table = existingRef.get(); if (table != null) { //purge oldest entries have lost references SoftReference<short[]> entry; boolean lastEmpty = true; while (lastEmpty && (entry = CACHED_TABLES.peekLast()) != null) { if (entry.get() == null) { CACHED_TABLES.removeLastOccurrence(entry); } else { lastEmpty = false; } }
return table; } } return new short[SnappyRawCompressor.MAX_HASH_TABLE_SIZE]; }
private static void returnTable(short[] table) { CACHED_TABLES.addFirst(new SoftReference<short[]>(table)); } }