Skip to content
This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

Support vanilla spark 3.1.1 #33

Open
wants to merge 35 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
2d763b2
Update README.md
JustDoCoder Apr 15, 2021
e06463b
Update pom.xml
JustDoCoder Apr 16, 2021
358c891
Update MemoryConsumer.java
JustDoCoder Apr 16, 2021
a46dddc
Update PMemManagerInitializer.java
JustDoCoder Apr 16, 2021
832cd47
Update TaskMemoryManager.java
JustDoCoder Apr 16, 2021
1b59a13
Update BytesToBytesMap.java
JustDoCoder Apr 16, 2021
1320ad1
Update MemoryAllocator.java
JustDoCoder Apr 16, 2021
3ae5228
Update MemoryBlock.java
JustDoCoder Apr 16, 2021
6350d1b
Update PMemReader.java
JustDoCoder Apr 16, 2021
6a0703f
Update PMemReaderForUnsafeExternalSorter.java
JustDoCoder Apr 16, 2021
8d852bf
Update PMemSpillWriterFactory.java
JustDoCoder Apr 16, 2021
b29b62f
Update PMemSpillWriterType.java
JustDoCoder Apr 16, 2021
0aa0fa8
Update PMemWriter.java
JustDoCoder Apr 16, 2021
9e0c90f
Update SortedIteratorForSpills.java
JustDoCoder Apr 16, 2021
1fa1706
Update SortedPMemPageSpillWriter.java
JustDoCoder Apr 16, 2021
e64a414
Update SpillWriterForUnsafeSorter.java
JustDoCoder Apr 16, 2021
715bab3
Update UnsafeExternalSorter.java
JustDoCoder Apr 16, 2021
485cc28
Update UnsafeInMemorySorter.java
JustDoCoder Apr 16, 2021
400ca01
Update UnsafeSorterPMemSpillWriter.java
JustDoCoder Apr 16, 2021
68ddd0d
Update UnsafeSorterPMemSpillWriter.java
JustDoCoder Apr 16, 2021
facb72e
Update UnsafeSorterSpillReader.java
JustDoCoder Apr 16, 2021
c33de3c
Update UnsafeSorterSpillWriter.java
JustDoCoder Apr 16, 2021
3d2b365
Update UnsafeSorterStreamSpillReader.java
JustDoCoder Apr 16, 2021
1834c8d
Update UnsafeSorterStreamSpillWriter.java
JustDoCoder Apr 16, 2021
e6f9698
Update package.scala
JustDoCoder Apr 16, 2021
d1ac9c0
Update ExtendedMemoryPool.scala
JustDoCoder Apr 16, 2021
3484c92
Update MemoryManager.scala
JustDoCoder Apr 16, 2021
ff98519
Update StorageMemoryPool.scala
JustDoCoder Apr 16, 2021
8d96ae9
Update UnifiedMemoryManager.scala
JustDoCoder Apr 16, 2021
4c8864d
Update MemoryStore.scala
JustDoCoder Apr 16, 2021
1204cc1
Update BlockManager.scala
JustDoCoder Apr 16, 2021
99d75df
Update PMemBlockObjectWriter.scala
JustDoCoder Apr 16, 2021
30f5d5f
Update StorageLevel.scala
JustDoCoder Apr 16, 2021
3b5f617
Update SparkEnv.scala
JustDoCoder Apr 16, 2021
0c983ea
Update TestMemoryManager.scala
JustDoCoder Apr 16, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ You will find jar files under oap-common/target and oap-spark/target.

To enable rdd cache on Intel Optane PMem, you need to add the following configurations to `spark-defaults.conf`
```
spark.memory.pmem.extension.enabled true
spark.memory.pmem.initial.path [Your Optane PMem paths separated with comma]
spark.memory.pmem.initial.size [Your Optane PMem size in GB]
spark.memory.pmem.usable.ratio [from 0 to 1, 0.85 is recommended]
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<junit.version>4.12</junit.version>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.internal.version>3.0.0</spark.internal.version>
<spark.internal.version>3.1.1</spark.internal.version>
<maven-patch-plugin.version>1.2</maven-patch-plugin.version>
<maven-resource-plugin.version>2.6</maven-resource-plugin.version>
<jetty.version>9.4.39.v20210325</jetty.version>
Expand Down
114 changes: 95 additions & 19 deletions src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,12 @@ public void remove() {
}

private void handleFailedDelete() {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
if (spillWriters.size() > 0) {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
}
}
}
}
Expand All @@ -406,17 +408,10 @@ private void handleFailedDelete() {
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
 * The returned iterator is thread-safe. However if the map is modified while iterating over it,
 * the behavior of the returned iterator is undefined.
*/
public MapIterator iterator() {
  // Hand out a fresh Location cursor (consistent with destructiveIterator and
  // the thread-safety documented above) instead of sharing the map's `loc`,
  // which lookup() mutates.
  return new MapIterator(numValues, new Location(), false);
}

/**
 * Returns a thread safe iterator that iterates over the entries of this map.
 */
public MapIterator safeIterator() {
  return new MapIterator(numValues, new Location(), false);
}

Expand All @@ -427,19 +422,82 @@ public MapIterator safeIterator() {
*
* For efficiency, all calls to `next()` will return the same {@link Location} object.
*
 * The returned iterator is thread-safe. However if the map is modified while iterating over it,
 * the behavior of the returned iterator is undefined.
*/
public MapIterator destructiveIterator() {
  updatePeakMemoryUsed();
  // Use a dedicated Location cursor so the returned iterator does not share
  // state with the map's `loc` used by lookup(). (The scraped diff showed two
  // consecutive return statements; this keeps the new-Location version.)
  return new MapIterator(numValues, new Location(), true);
}

/**
 * Iterator for the entries of this map. This is to first iterate over key indices in
 * `longArray` then accessing values in `dataPages`. NOTE: this is different from `MapIterator`
 * in the sense that key index is preserved here
 * (See `UnsafeHashedRelation` for example of usage).
 */
public final class MapIteratorWithKeyIndex implements Iterator<Location> {

/**
 * The index in `longArray` where the key is stored.
 */
private int keyIndex = 0;

// Count of values still to be returned; starts from the map's numValues.
private int numRecords;
// Single shared cursor: every call to next() returns this same Location.
private final Location loc;

private MapIteratorWithKeyIndex() {
this.numRecords = numValues;
this.loc = new Location();
}

@Override
public boolean hasNext() {
return numRecords > 0;
}

@Override
public Location next() {
// If the current key has another value, stay on the same keyIndex;
// otherwise scan `longArray` for the next occupied slot. Two array entries
// are used per key, and a zero at the even entry marks an empty slot —
// NOTE(review): the scan has no bounds check; it relies on `numRecords`
// keys actually being present so it terminates before the end of the array.
if (!loc.isDefined() || !loc.nextValue()) {
while (longArray.get(keyIndex * 2) == 0) {
keyIndex++;
}
// Position the shared cursor on this key's first value, then advance.
loc.with(keyIndex, 0, true);
keyIndex++;
}
numRecords--;
return loc;
}
}

/**
 * Returns an iterator for iterating over the entries of this map,
 * by first iterating over the key index inside hash map's `longArray`.
 *
 * For efficiency, all calls to `next()` will return the same {@link Location} object.
 *
 * The returned iterator is NOT thread-safe. If the map is modified while iterating over it,
 * the behavior of the returned iterator is undefined.
 */
public MapIteratorWithKeyIndex iteratorWithKeyIndex() {
// Each call allocates a new iterator with its own Location cursor.
return new MapIteratorWithKeyIndex();
}

/**
 * The maximum number of allowed keys index.
 *
 * The value of allowed keys index is in the range of [0, maxNumKeysIndex - 1].
 */
public int maxNumKeysIndex() {
// `longArray` holds two entries per key slot, so the number of key slots is
// half the array size.
return (int) (longArray.size() / 2);
}

/**
* Looks up a key, and return a {@link Location} handle that can be used to test existence
* and read/write values.
*
 * This function always returns the same {@link Location} instance to avoid object allocation.
 * This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength) {
safeLookup(keyBase, keyOffset, keyLength, loc,
Expand All @@ -451,7 +509,8 @@ public Location lookup(Object keyBase, long keyOffset, int keyLength) {
* Looks up a key, and return a {@link Location} handle that can be used to test existence
* and read/write values.
*
 * This function always returns the same {@link Location} instance to avoid object allocation.
 * This function is not thread-safe.
*/
public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
safeLookup(keyBase, keyOffset, keyLength, loc, hash);
Expand Down Expand Up @@ -606,6 +665,14 @@ public boolean isDefined() {
return isDefined;
}

/**
 * Returns index for key.
 *
 * Only valid while this Location is defined, i.e. after a successful
 * lookup or iteration step has positioned it on a key.
 */
public int getKeyIndex() {
assert (isDefined);
return pos;
}

/**
* Returns the base object for key.
*/
Expand Down Expand Up @@ -743,14 +810,23 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff
longArray.set(pos * 2 + 1, keyHashcode);
isDefined = true;

// If the map has reached its growth threshold, try to grow it.
if (numKeys >= growthThreshold) {
// We use two array entries per key, so the array size is twice the capacity.
// We should compare the current capacity of the array, instead of its size.
if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
if (longArray.size() / 2 < MAX_CAPACITY) {
try {
growAndRehash();
} catch (SparkOutOfMemoryError oom) {
canGrowArray = false;
}
} else {
// The map is already at MAX_CAPACITY and cannot grow. Instead, we prevent it from
// accepting any more new elements to make sure we don't exceed the load factor. If we
// need to spill later, this allows UnsafeKVExternalSorter to reuse the array for
// sorting.
canGrowArray = false;
}
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ public int getRecordLength() {
// Key prefix of the current record. NOTE(review): presumably updated by
// loadNext() — that code is outside this diff hunk, confirm in full source.
public long getKeyPrefix() {
return keyPrefix;
}

@Override
public long getCurrentPageNumber() {
// This reader does not track memory-page numbers; the Spark 3.1.1 iterator
// interface requires the method, so fail fast if it is ever called.
throw new UnsupportedOperationException();
}

@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ public void loadNext() {
// Number of records handled by this reader. NOTE(review): `numRecords` is
// initialized outside this diff hunk — confirm its meaning in full source.
public int getNumRecords() {
return numRecords;
}

@Override
public long getCurrentPageNumber() {
// This reader does not track memory-page numbers; the Spark 3.1.1 iterator
// interface requires the method, so fail fast if it is ever called.
throw new UnsupportedOperationException();
}

/**
* load more PMem records in the buffer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public SortedPMemPageSpillReader() throws IOException{
numRecordsOnDisk = diskSpillWriter.recordsSpilled();
}
}

@Override
public long getCurrentPageNumber() {
// This reader does not track memory-page numbers; the Spark 3.1.1 iterator
// interface requires the method, so fail fast if it is ever called.
throw new UnsupportedOperationException();
}

@Override
public boolean hasNext() {
return curNumOfRec < numRecordsOnPMem + numRecordsOnDisk;
Expand Down
Loading