diff --git a/src/main/java/net/spy/memcached/FillWriteBufferStatus.java b/src/main/java/net/spy/memcached/FillWriteBufferStatus.java
new file mode 100644
index 000000000..87b44e4d1
--- /dev/null
+++ b/src/main/java/net/spy/memcached/FillWriteBufferStatus.java
@@ -0,0 +1,90 @@
+/**
+ * (c) Copyright 2019 freiheit.com technologies GmbH
+ *
+ * Created on 2019-09-26 by Marco Kortkamp (marco.kortkamp@freiheit.com)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+package net.spy.memcached;
+
+import net.spy.memcached.ops.OperationState;
+
+/**
+ * Status, which tells if a fillWriteBuffer method has
+ * successfully been completed or not.
+ *
+ * @author Marco Kortkamp (marco.kortkamp@freiheit.com)
+ */
+public enum FillWriteBufferStatus {
+  /** The operation was found in the expected WRITING state. */
+  SUCCESS,
+  /**
+   * An operation may be marked as canceled (by an async callback),
+   * before a fillWriteBuffer completed successfully.
+   * <p>
+   * This has been observed when large values are written to memcached
+   * at a very high frequency and the payload contains
+   * the error-code ERR_2BIG.
+   *
+   * @see Bug-Report
+   */
+  OP_STATUS_IS_COMPLETED,
+  /**
+   * The residual entries are just for paranoia. If the operation
+   * state can switch to completed, it might switch to another state
+   * as well, although this has not been observed yet (TBMK).
+   */
+  OP_STATUS_IS_WRITE_QUEUED,
+  OP_STATUS_IS_READING,
+  OP_STATUS_IS_RETRY
+  ;
+
+  /**
+   * Maps the state an operation was found in to the matching status.
+   *
+   * @param opState the state of the operation, must not be null
+   * @return SUCCESS for WRITING, otherwise the constant named after the state
+   * @throws IllegalArgumentException if the state has no matching status
+   */
+  public static FillWriteBufferStatus forOperationState(final OperationState opState) {
+    switch (opState) {
+      case WRITE_QUEUED:
+        return OP_STATUS_IS_WRITE_QUEUED;
+      case WRITING:
+        return SUCCESS;
+      case READING:
+        return OP_STATUS_IS_READING;
+      case COMPLETE:
+        return OP_STATUS_IS_COMPLETED;
+      case RETRY:
+        return OP_STATUS_IS_RETRY;
+      default:
+        // Fail loudly instead of returning null, which would only surface later as an NPE.
+        throw new IllegalArgumentException("Unsupported operation state: " + opState);
+    }
+  }
+
+  /**
+   * @return true only for SUCCESS
+   */
+  public boolean isSuccess() {
+    return this == SUCCESS;
+  }
+}
diff --git a/src/main/java/net/spy/memcached/MemcachedConnection.java b/src/main/java/net/spy/memcached/MemcachedConnection.java
index 47f432fb1..5b0aff4fa 100644
--- a/src/main/java/net/spy/memcached/MemcachedConnection.java
+++ b/src/main/java/net/spy/memcached/MemcachedConnection.java
@@ -39,12 +39,15 @@
 import net.spy.memcached.ops.OperationStatus;
 import net.spy.memcached.ops.TapOperation;
 import net.spy.memcached.ops.VBucketAware;
+import net.spy.memcached.protocol.TCPMemcachedNodeImpl;
 import net.spy.memcached.protocol.binary.BinaryOperationFactory;
 import net.spy.memcached.protocol.binary.MultiGetOperationImpl;
 import net.spy.memcached.protocol.binary.TapAckOperationImpl;
 import net.spy.memcached.util.StringUtils;
+import sun.nio.ch.IOUtil;
 
 import java.io.IOException;
+import java.lang.annotation.Native;
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -806,12 +809,12 @@ public void complete() {
    * @throws IOException can be raised during writing failures.
    */
   private void handleWrites(final MemcachedNode node) throws IOException {
-    node.fillWriteBuffer(shouldOptimize);
+    FillWriteBufferStatus bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
     boolean canWriteMore = node.getBytesRemainingToWrite() > 0;
-    while (canWriteMore) {
+    while (canWriteMore && bufferFilledStatus.isSuccess()) {
       int wrote = node.writeSome();
       metrics.updateHistogram(OVERALL_AVG_BYTES_WRITE_METRIC, wrote);
-      node.fillWriteBuffer(shouldOptimize);
+      bufferFilledStatus = node.fillWriteBuffer(shouldOptimize);
       canWriteMore = wrote > 0 && node.getBytesRemainingToWrite() > 0;
     }
   }
@@ -1458,23 +1461,30 @@ public void run() {
     while (running) {
       try {
         handleIO();
-      } catch (IOException e) {
+      } catch (final IOException e) {
+        logRunException(e);
+      } catch (final CancelledKeyException e) {
         logRunException(e);
-      } catch (CancelledKeyException e) {
+      } catch (final ClosedSelectorException e) {
         logRunException(e);
-      } catch (ClosedSelectorException e) {
+      } catch (final IllegalStateException e) {
         logRunException(e);
-      } catch (IllegalStateException e) {
+      } catch (final ConcurrentModificationException e) {
         logRunException(e);
-      } catch (ConcurrentModificationException e) {
+      } catch (final NullPointerException e) {
         logRunException(e);
+      } catch (final Exception e) {
+        // Catch Exception rather than Throwable, so that errors such as OutOfMemoryError still propagate.
+        logException(e);
       }
     }
     getLogger().info("Shut down memcached client");
   }
+
+
 
   /**
-   * Log a exception to different levels depending on the state.
+   * Log an exception to different levels depending on the state.
    *
    * Exceptions get logged at debug level when happening during shutdown, but
    * at warning level when operating normally.
@@ -1489,6 +1499,19 @@ private void logRunException(final Exception e) {
     }
   }
 
+  /**
+   * Logs an unexpected exception: at warn level during shutdown, at error level otherwise.
+   *
+   * @param e the exception to log.
+   */
+  private void logException(final Exception e) {
+    if (shutDown) {
+      getLogger().warn("Exception occurred during shutdown", e);
+    } else {
+      getLogger().error("Unexpected problem handling memcached IO", e);
+    }
+  }
+
   /**
    * Returns whether the connection is shut down or not.
    *
diff --git a/src/main/java/net/spy/memcached/MemcachedNode.java b/src/main/java/net/spy/memcached/MemcachedNode.java
index c86f96e70..e00bf2cbd 100644
--- a/src/main/java/net/spy/memcached/MemcachedNode.java
+++ b/src/main/java/net/spy/memcached/MemcachedNode.java
@@ -56,18 +56,19 @@ public interface MemcachedNode {
    */
   void setupResend();
 
+  /**
+   * Transition the current write item into a read state.
+   */
+  void transitionWriteItem();
+
   /**
    * Fill the write buffer with data from the next operations in the queue.
    *
    * @param optimizeGets if true, combine sequential gets into a single
    *          multi-key get
+   * @return a status indicating whether filling the write buffer completed successfully.
    */
-  void fillWriteBuffer(boolean optimizeGets);
-
-  /**
-   * Transition the current write item into a read state.
-   */
-  void transitionWriteItem();
+  FillWriteBufferStatus fillWriteBuffer(boolean optimizeGets);
 
   /**
    * Get the operation at the top of the queue that is requiring input.
diff --git a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
index d3ae9c497..5b6de4556 100644
--- a/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
+++ b/src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
@@ -62,7 +62,7 @@ public void copyInputQueue() {
     throw new UnsupportedOperationException();
   }
 
-  public void fillWriteBuffer(boolean optimizeGets) {
+  public FillWriteBufferStatus fillWriteBuffer(boolean optimizeGets) {
     throw new UnsupportedOperationException();
   }
 
diff --git a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
index 52deeb04b..8c6368804 100644
--- a/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
+++ b/src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
@@ -35,10 +35,11 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import net.spy.memcached.ConnectionFactory;
 import net.spy.memcached.FailureMode;
+import net.spy.memcached.FillWriteBufferStatus;
 import net.spy.memcached.MemcachedConnection;
 import net.spy.memcached.MemcachedNode;
 import net.spy.memcached.compat.SpyObject;
 import net.spy.memcached.ops.Operation;
 import net.spy.memcached.ops.OperationState;
@@ -192,23 +193,26 @@ private boolean preparePending() {
    *
    * @see net.spy.memcached.MemcachedNode#fillWriteBuffer(boolean)
    */
-  public final void fillWriteBuffer(boolean shouldOptimize) {
+  public final FillWriteBufferStatus fillWriteBuffer(boolean shouldOptimize) {
     if (toWrite == 0 && readQ.remainingCapacity() > 0) {
       getWbuf().clear();
       Operation o=getNextWritableOp();
 
       while(o != null && toWrite < getWbuf().capacity()) {
         synchronized(o) {
-          assert o.getState() == OperationState.WRITING;
-
+          final OperationState oState = o.getState();
           ByteBuffer obuf = o.getBuffer();
-          assert obuf != null : "Didn't get a write buffer from " + o;
+          // These cases may happen due to a race condition. See FillWriteBufferNPETest.
+          if (oState != OperationState.WRITING || obuf == null) {
+            return logCleanUpAndReturnStatus(FillWriteBufferStatus.forOperationState(oState), o, obuf);
+          }
+
           int bytesToCopy = Math.min(getWbuf().remaining(), obuf.remaining());
           byte[] b = new byte[bytesToCopy];
           obuf.get(b);
           getWbuf().put(b);
           getLogger().debug("After copying stuff from %s: %s", o, getWbuf());
-          if (!o.getBuffer().hasRemaining()) {
+          if (!obuf.hasRemaining()) {
             o.writeComplete();
             transitionWriteItem();
 
@@ -230,9 +234,37 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
     } else {
       getLogger().debug("Buffer is full, skipping");
     }
+    return FillWriteBufferStatus.SUCCESS;
+  }
+
+  /**
+   * Logs why the current write operation is skipped, drops it via
+   * removeCurrentWriteOp() and returns the given status.
+   *
+   * @param status the status to hand back to the caller
+   * @param op the operation that is dropped
+   * @param obuf the write buffer of the operation, may be null
+   * @return the given status
+   */
+  private FillWriteBufferStatus logCleanUpAndReturnStatus(
+      final FillWriteBufferStatus status, final Operation op,
+      final ByteBuffer obuf) {
+    if (getLogger().isDebugEnabled()) {
+      getLogger().debug("fillWriteBuffer not finished successfully."
+          + " FillWriteBufferStatus: " + status
+          + " ByteBuffer: " + obuf
+          + " Operation: " + op
+          + ". Removing operation.");
+    }
+    // Flip instead of clear: bytes copied into the buffer for earlier
+    // operations in this pass must still be sent, and the buffer has to be
+    // left in read mode exactly like on the regular exit path.
+    // NOTE(review): assumes toWrite already counts exactly those bytes - confirm.
+    wbuf.flip();
+    removeCurrentWriteOp();
+    return status;
   }
 
-
   private Operation getNextWritableOp() {
     Operation o = getCurrentWriteOp();
     while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
@@ -485,8 +517,9 @@ public final int getReconnectCount() {
   @Override
   public final String toString() {
     int sops = 0;
-    if (getSk() != null && getSk().isValid()) {
-      sops = getSk().interestOps();
+    final SelectionKey sk = getSk();
+    if (sk != null && sk.isValid()) {
+      sops = sk.interestOps();
     }
     int rsize = readQ.size() + (optimizedOp == null ? 0 : 1);
     int wsize = writeQ.size();
diff --git a/src/main/java/net/spy/memcached/protocol/binary/OperationImpl.java b/src/main/java/net/spy/memcached/protocol/binary/OperationImpl.java
index 6552dd1ce..10cdcf6dd 100644
--- a/src/main/java/net/spy/memcached/protocol/binary/OperationImpl.java
+++ b/src/main/java/net/spy/memcached/protocol/binary/OperationImpl.java
@@ -209,9 +209,22 @@ protected void finishedPayload(byte[] pl) throws IOException {
         && !getState().equals(OperationState.COMPLETE)) {
       transitionState(OperationState.RETRY);
     } else {
-      getCallback().receivedStatus(status);
-      transitionState(OperationState.COMPLETE);
+      setOpStateAndTriggerCallback(status, errorCode);
+    }
+  }
+
+  /**
+   * Marks the operation as complete and then reports the status to the callback.
+   *
+   * @param status the status handed to the callback
+   * @param errorCode the received error code, only used for trace logging
+   */
+  private void setOpStateAndTriggerCallback(final OperationStatus status, final int errorCode) {
+    if (getLogger().isTraceEnabled()) {
+      getLogger().trace("Received error-code: " + errorCode);
     }
+    transitionState(OperationState.COMPLETE);
+    getCallback().receivedStatus(status);
   }
 
   /**
diff --git a/src/test/java/net/spy/memcached/MockMemcachedNode.java b/src/test/java/net/spy/memcached/MockMemcachedNode.java
index b35c32949..5202fd405 100644
--- a/src/test/java/net/spy/memcached/MockMemcachedNode.java
+++ b/src/test/java/net/spy/memcached/MockMemcachedNode.java
@@ -79,8 +79,8 @@ public void setupResend() {
     // noop
   }
 
-  public void fillWriteBuffer(boolean optimizeGets) {
-    // noop
+  public FillWriteBufferStatus fillWriteBuffer(boolean optimizeGets) {
+    return FillWriteBufferStatus.SUCCESS;
   }
 
   public void transitionWriteItem() {
diff --git a/src/test/manual/net/spy/memcached/test/FillWriteBufferNPETest.java b/src/test/manual/net/spy/memcached/test/FillWriteBufferNPETest.java
new file mode 100644
index 000000000..2899c1b51
--- /dev/null
+++ b/src/test/manual/net/spy/memcached/test/FillWriteBufferNPETest.java
@@ -0,0 +1,87 @@
+/**
+ * (c) Copyright 2019 freiheit.com technologies GmbH
+ *
+ * Created on 2019-09-25 by Marco Kortkamp (marco.kortkamp@freiheit.com)
+ *
+ * Permission is hereby granted, free of charge, to any
person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
+ * IN THE SOFTWARE.
+ */
+
+package net.spy.memcached.test;
+
+import net.spy.memcached.BinaryConnectionFactory;
+import net.spy.memcached.MemcachedClient;
+import net.spy.memcached.transcoders.SerializingTranscoder;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+
+/**
+ * A manual test which tries to trigger an NPE in the fillWriteBuffer
+ * method, when large values are passed to memcached with very high
+ * frequency.
+ *
+ * The initial version of this test was provided in the following bug report.
+ * @see Bug Report
+ *
+ * @author Marco Kortkamp (marco.kortkamp@freiheit.com)
+ */
+public class FillWriteBufferNPETest {
+  private static final String MEMCACHED_HOST = "localhost";
+  private static final int MEMCACHED_PORT = 11211;
+  private static final int SLEEP_MS = 1;
+
+  /**
+   * In order to run this test you need a memcached instance.
+   * It can e.g. be started by:
+   * $ memcached -d -p 11211 -u memcached -m 64 -c 1024
+   *
+   * Experiments were conducted with the VM-Options "-Xmx4G".
+   */
+  public static void main(final String[] args) throws IOException, InterruptedException {
+    final MemcachedClient client = new MemcachedClient(new BinaryConnectionFactory(),
+        Collections.singletonList(new InetSocketAddress(MEMCACHED_HOST, MEMCACHED_PORT))
+    );
+
+    final SerializingTranscoder transcoder = new SerializingTranscoder();
+    transcoder.setCompressionThreshold(Integer.MAX_VALUE);
+
+    final byte[] data = new byte[2 * 1024 * 1024];
+    int i = 0;
+    try {
+      while (true) {
+        client.set("test", 60, data, transcoder);
+        /*
+         * If this sleep is not in, the memcached occasionally closes
+         * the socket with "java.io.IOException: Connection reset by peer"
+         * and during the reconnect phase the while loop shovels us into
+         * a "java.lang.OutOfMemoryError: Java heap space".
+         * I'm not sure if this is a remaining issue.
+         */
+        Thread.sleep(SLEEP_MS);
+        if (i > 0 && i % 100 == 0) {
+          System.out.println(i);
+        }
+        i++;
+      }
+    } catch (final Throwable t) {
+      t.printStackTrace();
+    }
+  }
+}