-
Notifications
You must be signed in to change notification settings - Fork 588
HDDS-14166. Get rid of byte array operations from RDBBatchOperation for PUT and DELETE #9552
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
swamirishi
wants to merge
8
commits into
apache:master
Choose a base branch
from
swamirishi:HDDS-14166_Alt
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f47ebe4
HDDS-14166. Completely get rid of byte array operations from RDBBatch…
swamirishi 61091f2
HDDS-14166. Address review comments
swamirishi 95cfe2b
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi d9b1480
HDDS-14166. Add test cases
swamirishi aa1841a
HDDS-14166. Rename class
swamirishi 48639d2
HDDS-14166. Update java doc
swamirishi 58b9cd5
HDDS-14166. Address review comments
swamirishi 3b90398
Merge remote-tracking branch 'apache/master' into HEAD
swamirishi File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -49,6 +49,7 @@ public final class RDBBatchOperation implements BatchOperation { | |
| static final Logger LOG = LoggerFactory.getLogger(RDBBatchOperation.class); | ||
|
|
||
| private static final AtomicInteger BATCH_COUNT = new AtomicInteger(); | ||
| private static final CodecBufferCodec DIRECT_CODEC_BUFFER_CODEC = CodecBufferCodec.get(true); | ||
|
|
||
| private final String name = "Batch-" + BATCH_COUNT.getAndIncrement(); | ||
|
|
||
|
|
@@ -135,16 +136,26 @@ public void close() { | |
| } | ||
| } | ||
|
|
||
| private abstract static class Op implements Closeable { | ||
| private abstract static class SingleKeyOp extends Op { | ||
|
Comment on lines
-138
to
+139
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move SingleKeyOp to below Op. |
||
| private final CodecBuffer keyBuffer; | ||
| private final Bytes keyBytes; | ||
|
|
||
| private Op(Bytes keyBytes) { | ||
| this.keyBytes = keyBytes; | ||
| private SingleKeyOp(CodecBuffer keyBuffer) { | ||
| this.keyBuffer = Objects.requireNonNull(keyBuffer); | ||
| this.keyBytes = Bytes.newBytes(keyBuffer); | ||
| } | ||
|
|
||
| abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; | ||
| CodecBuffer getKeyBuffer() { | ||
| return keyBuffer; | ||
| } | ||
|
|
||
| abstract int keyLen(); | ||
| Bytes getKeyBytes() { | ||
| return keyBytes; | ||
| } | ||
|
|
||
| int keyLen() { | ||
| return getKeyBuffer().readableBytes(); | ||
| } | ||
|
|
||
| int valLen() { | ||
| return 0; | ||
|
|
@@ -155,100 +166,75 @@ int totalLength() { | |
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (keyBytes != null) { | ||
| keyBytes.close(); | ||
| boolean closeImpl() { | ||
| if (super.closeImpl()) { | ||
| IOUtils.close(LOG, keyBuffer, keyBytes); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Delete operation to be applied to a {@link ColumnFamily} batch. | ||
| */ | ||
| private static final class DeleteOp extends Op { | ||
| private final byte[] key; | ||
| private abstract static class Op implements Closeable { | ||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
|
|
||
| private DeleteOp(byte[] key, Bytes keyBytes) { | ||
| super(Objects.requireNonNull(keyBytes, "keyBytes == null")); | ||
| this.key = Objects.requireNonNull(key, "key == null"); | ||
| } | ||
| abstract void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException; | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchDelete(batch, this.key); | ||
| abstract int totalLength(); | ||
|
|
||
| boolean closeImpl() { | ||
| return closed.compareAndSet(false, true); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.length; | ||
| public final void close() { | ||
| closeImpl(); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. | ||
| * Delete operation to be applied to a {@link ColumnFamily} batch. | ||
| */ | ||
| private final class PutOp extends Op { | ||
| private final CodecBuffer key; | ||
| private final CodecBuffer value; | ||
| private final AtomicBoolean closed = new AtomicBoolean(false); | ||
| private static final class DeleteOp extends SingleKeyOp { | ||
|
|
||
| private PutOp(CodecBuffer key, CodecBuffer value, Bytes keyBytes) { | ||
| super(keyBytes); | ||
| this.key = key; | ||
| this.value = value; | ||
| private DeleteOp(CodecBuffer key) { | ||
| super(key); | ||
| } | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchPut(batch, key.asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public int valLen() { | ||
| return value.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| if (closed.compareAndSet(false, true)) { | ||
| key.release(); | ||
| value.release(); | ||
| } | ||
| super.close(); | ||
| family.batchDelete(batch, this.getKeyBuffer().asReadOnlyByteBuffer()); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the byte array api. | ||
| * Put operation to be applied to a {@link ColumnFamily} batch using the CodecBuffer api. | ||
| */ | ||
| private static final class ByteArrayPutOp extends Op { | ||
| private final byte[] key; | ||
| private final byte[] value; | ||
| private final class PutOp extends SingleKeyOp { | ||
| private final CodecBuffer value; | ||
|
|
||
| private ByteArrayPutOp(byte[] key, byte[] value, Bytes keyBytes) { | ||
| super(keyBytes); | ||
| this.key = Objects.requireNonNull(key, "key == null"); | ||
| private PutOp(CodecBuffer key, CodecBuffer value) { | ||
| super(Objects.requireNonNull(key, "key == null")); | ||
| this.value = Objects.requireNonNull(value, "value == null"); | ||
| } | ||
|
|
||
| @Override | ||
| public void apply(ColumnFamily family, ManagedWriteBatch batch) throws RocksDatabaseException { | ||
| family.batchPut(batch, key, value); | ||
| family.batchPut(batch, getKeyBuffer().asReadOnlyByteBuffer(), value.asReadOnlyByteBuffer()); | ||
| } | ||
|
|
||
| @Override | ||
| public int keyLen() { | ||
| return key.length; | ||
| public int valLen() { | ||
| return value.readableBytes(); | ||
| } | ||
|
|
||
| @Override | ||
| public int valLen() { | ||
| return value.length; | ||
| boolean closeImpl() { | ||
| if (super.closeImpl()) { | ||
| IOUtils.close(LOG, value); | ||
| return true; | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -271,7 +257,7 @@ private class FamilyCache { | |
| * It supports operations such as additions and deletions while maintaining the ability to overwrite | ||
| * existing entries when necessary. | ||
| */ | ||
| private final Map<Bytes, Op> ops = new HashMap<>(); | ||
| private final Map<Bytes, SingleKeyOp> ops = new HashMap<>(); | ||
| private boolean isCommit; | ||
|
|
||
| private long batchSize; | ||
|
|
@@ -312,7 +298,7 @@ void clear() { | |
| } | ||
|
|
||
| private void deleteIfExist(Bytes key) { | ||
| final Op previous = ops.remove(key); | ||
| final SingleKeyOp previous = ops.remove(key); | ||
| if (previous != null) { | ||
| previous.close(); | ||
| discardedSize += previous.totalLength(); | ||
|
|
@@ -322,8 +308,9 @@ private void deleteIfExist(Bytes key) { | |
| } | ||
| } | ||
|
|
||
| void overwriteIfExists(Bytes key, Op op) { | ||
| void overwriteIfExists(SingleKeyOp op) { | ||
| Preconditions.checkState(!isCommit, "%s is already committed.", this); | ||
| Bytes key = op.getKeyBytes(); | ||
| deleteIfExist(key); | ||
| batchSize += op.totalLength(); | ||
| Op overwritten = ops.put(key, op); | ||
|
|
@@ -336,21 +323,12 @@ void overwriteIfExists(Bytes key, Op op) { | |
|
|
||
| void put(CodecBuffer key, CodecBuffer value) { | ||
| putCount++; | ||
| // always release the key with the value | ||
| Bytes keyBytes = Bytes.newBytes(key); | ||
| overwriteIfExists(keyBytes, new PutOp(key, value, keyBytes)); | ||
| overwriteIfExists(new PutOp(key, value)); | ||
| } | ||
|
|
||
| void put(byte[] key, byte[] value) { | ||
| putCount++; | ||
| Bytes keyBytes = new Bytes(key); | ||
| overwriteIfExists(keyBytes, new ByteArrayPutOp(key, value, keyBytes)); | ||
| } | ||
|
|
||
| void delete(byte[] key) { | ||
| void delete(CodecBuffer key) { | ||
| delCount++; | ||
| Bytes keyBytes = new Bytes(key); | ||
| overwriteIfExists(keyBytes, new DeleteOp(key, keyBytes)); | ||
| overwriteIfExists(new DeleteOp(key)); | ||
| } | ||
|
|
||
| String putString(int keySize, int valueSize) { | ||
|
|
@@ -378,14 +356,8 @@ void put(ColumnFamily f, CodecBuffer key, CodecBuffer value) { | |
| .put(key, value); | ||
| } | ||
|
|
||
| void put(ColumnFamily f, byte[] key, byte[] value) { | ||
| name2cache.computeIfAbsent(f.getName(), k -> new FamilyCache(f)) | ||
| .put(key, value); | ||
| } | ||
|
|
||
| void delete(ColumnFamily family, byte[] key) { | ||
| name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)) | ||
| .delete(key); | ||
| void delete(ColumnFamily family, CodecBuffer key) { | ||
| name2cache.computeIfAbsent(family.getName(), k -> new FamilyCache(family)).delete(key); | ||
| } | ||
|
|
||
| /** Prepare batch write for the entire cache. */ | ||
|
|
@@ -461,6 +433,10 @@ public void close() { | |
| } | ||
|
|
||
| public void delete(ColumnFamily family, byte[] key) { | ||
| opCache.delete(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key)); | ||
| } | ||
|
|
||
| public void delete(ColumnFamily family, CodecBuffer key) { | ||
| opCache.delete(family, key); | ||
| } | ||
|
|
||
|
|
@@ -469,6 +445,7 @@ public void put(ColumnFamily family, CodecBuffer key, CodecBuffer value) { | |
| } | ||
|
|
||
| public void put(ColumnFamily family, byte[] key, byte[] value) { | ||
| opCache.put(family, key, value); | ||
| opCache.put(family, DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(key), | ||
| DIRECT_CODEC_BUFFER_CODEC.fromPersistedFormat(value)); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method currently only used in test. What is it for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was planning to use this for SnapshotDiff Algorithm.
https://issues.apache.org/jira/browse/HDDS-14031(WIP)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then, add this method in HDDS-14031 so it can be properly reviewed.