Skip to content

Commit e65018a

Browse files
timlee0119 authored and jiangxb1987 committed
[SPARK-56511][CORE] Fix NPE in ShuffleInMemorySorter.getMemoryUsage after failed reset
### What changes were proposed in this pull request?

Null-check `array` in `ShuffleInMemorySorter.getMemoryUsage()`.

When `ShuffleExternalSorter.spill()` → `ShuffleInMemorySorter.reset()` → `MemoryConsumer.allocateArray()` throws OOM, `ShuffleInMemorySorter.array` is left null. The OOM propagates to `UnsafeShuffleWriter.write()`'s finally block, which calls `ShuffleExternalSorter.cleanupResources()` → `freeMemory()` → `updatePeakMemoryUsed()` → `ShuffleInMemorySorter.getMemoryUsage()` → NPE on `array.size()`.

Example stack trace we see in prod:

```
java.lang.NullPointerException
    at org.apache.spark.shuffle.sort.ShuffleInMemorySorter.getMemoryUsage(ShuffleInMemorySorter.java:131)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.getMemoryUsage(ShuffleExternalSorter.java:349)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:472)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:297)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:213)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:87)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:82)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    ...
```

Returning 0 for `ShuffleInMemorySorter.getMemoryUsage()` is correct: when `array` is null the pointer array was already freed by `ShuffleInMemorySorter.reset()` and never reallocated — the actual memory usage IS zero. The value is only consumed by `ShuffleExternalSorter.updatePeakMemoryUsed()` for bookkeeping.

### Why are the changes needed?

Without the fix, the NPE in `cleanupResources()` prevents `ShuffleInMemorySorter.free()` and page cleanup from running, causing a memory leak on top of the original OOM.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Unit test (`testGetMemoryUsageAfterFree`) verifying `ShuffleInMemorySorter.getMemoryUsage()` returns 0 after `free()`
- Integration test in `ShuffleExternalSorterSuite`: constrains memory so `ShuffleInMemorySorter.reset()` → `allocateArray()` fails with OOM, then verifies `ShuffleExternalSorter.cleanupResources()` does not throw

Before fix: `NullPointerException: Cannot invoke "LongArray.size()" because "this.array" is null at ShuffleExternalSorter.cleanupResources`

After fix: test passes.

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes #55373 from timlee0119/fix-shuffle-sorter-npe.

Authored-by: Tim Lee <tim.lee@databricks.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
1 parent 57b6522 commit e65018a

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ public boolean hasSpaceForAnotherRecord() {
128128
}
129129

130130
public long getMemoryUsage() {
131+
if (array == null) {
132+
return 0L;
133+
}
131134
return array.size() * 8;
132135
}
133136

core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ public void testBasicSorting() {
120120
Assertions.assertFalse(iter.hasNext());
121121
}
122122

123+
@Test
124+
public void testGetMemoryUsageAfterFree() {
125+
final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
126+
consumer, 100, shouldUseRadixSort());
127+
Assertions.assertTrue(sorter.getMemoryUsage() > 0);
128+
sorter.free();
129+
Assertions.assertEquals(0, sorter.getMemoryUsage());
130+
}
131+
123132
@Test
124133
public void testSortingManyNumbers() {
125134
ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4, shouldUseRadixSort());

core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleExternalSorterSuite.scala

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,4 +114,93 @@ class ShuffleExternalSorterSuite extends SparkFunSuite with LocalSparkContext wi
114114
condition = "UNABLE_TO_ACQUIRE_MEMORY",
115115
parameters = Map("requestedBytes" -> "800", "receivedBytes" -> "400"))
116116
}
117+
118+
test("cleanupResources should not NPE when reset fails to reallocate array") {
  // Regression test for the following failure sequence:
  //   1. insertRecord() triggers spill -> reset() -> array = null -> allocateArray() throws OOM
  //   2. OOM propagates out of insertRecord()
  //   3. UnsafeShuffleWriter's finally block calls cleanupResources()
  //   4. cleanupResources() -> freeMemory() -> updatePeakMemoryUsed() -> getMemoryUsage()
  //      -> inMemSorter.getMemoryUsage() -> NPE because inMemSorter.array is still null
  //
  // The root cause: reset() sets array = null, then allocateArray() fails. The sorter is left
  // with inMemSorter != null but inMemSorter.array == null. cleanupResources() calls
  // freeMemory() which calls getMemoryUsage() before reaching inMemSorter.free().
  val conf = new SparkConf()
    .setMaster("local[1]")
    .setAppName("ShuffleExternalSorterSuite")
    .set(IS_TESTING, true)
    .set(TEST_MEMORY, 1600L)
    .set(MEMORY_FRACTION, 0.9999)

  sc = new SparkContext(conf)
  val memoryManager = UnifiedMemoryManager(conf, 1)

  // Flipped to true once the pointer array is full, so that only the allocation performed
  // during reset() is starved. A var is required because the flag is toggled mid-test and
  // read from inside the TaskMemoryManager override below.
  var shouldStealMemory = false

  // Override acquireExecutionMemory to steal freed memory during reset()'s allocateArray(),
  // forcing the allocation to fail with OOM.
  val taskMemoryManager = new TaskMemoryManager(memoryManager, 0) {
    override def acquireExecutionMemory(required: Long, consumer: MemoryConsumer): Long = {
      if (shouldStealMemory) {
        val freeBytes = memoryManager.maxHeapMemory - memoryManager.executionMemoryUsed
        if (freeBytes > 400) {
          // The memory manager's acquireExecutionMemory is looked up reflectively by name.
          // Fail with a clear message (rather than an opaque empty-collection error) if it
          // cannot be found.
          val acquireMethod = memoryManager.getClass.getMethods
            .find(_.getName == "acquireExecutionMemory")
            .getOrElse(fail("acquireExecutionMemory not found on the memory manager"))
          // Grab everything except 400 bytes on behalf of another task attempt (id 1), so
          // the 800-byte pointer array requested below cannot be satisfied.
          acquireMethod.invoke(
            memoryManager,
            JLong.valueOf(freeBytes - 400),
            JLong.valueOf(1L),
            MemoryMode.ON_HEAP)
        }
      }
      super.acquireExecutionMemory(required, consumer)
    }
  }

  val taskContext = mock[TaskContext]
  val taskMetrics = new TaskMetrics
  when(taskContext.taskMetrics()).thenReturn(taskMetrics)
  val sorter = new ShuffleExternalSorter(
    taskMemoryManager,
    sc.env.blockManager,
    taskContext,
    100, // initialSize: ShuffleInMemorySorter needs 800 bytes (100 * 8)
    1,
    conf,
    new ShuffleWriteMetrics)

  // Reads the sorter's private `inMemSorter` field. Re-read on every call so the assertions
  // after the failure observe the field's current value rather than a stale reference.
  def currentInMemSorter: ShuffleInMemorySorter = {
    val field = sorter.getClass.getDeclaredField("inMemSorter")
    field.setAccessible(true)
    field.get(sorter).asInstanceOf[ShuffleInMemorySorter]
  }

  // Fill the pointer array until there's no space for another record.
  val inMemSorter = currentInMemSorter
  val bytes = new Array[Byte](1)
  while (inMemSorter.hasSpaceForAnotherRecord) {
    sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
  }

  // Enable memory stealing so that when spill -> reset() -> allocateArray() runs, the freed
  // memory is consumed before allocateArray can use it, causing OOM.
  shouldStealMemory = true

  // insertRecord triggers spill -> reset() -> array = null -> allocateArray fails -> OOM
  intercept[SparkOutOfMemoryError] {
    sorter.insertRecord(bytes, Platform.BYTE_ARRAY_OFFSET, 1, 0)
  }

  // Verify the broken state: inMemSorter != null but inMemSorter.array == null
  val sorterAfterFailure = currentInMemSorter
  assert(sorterAfterFailure != null, "inMemSorter should still be non-null")
  val arrayField = classOf[ShuffleInMemorySorter].getDeclaredField("array")
  arrayField.setAccessible(true)
  assert(arrayField.get(sorterAfterFailure) == null,
    "inMemSorter.array should be null (reset freed it, allocateArray failed)")

  // With array == null the in-memory sorter must report zero usage instead of throwing.
  assert(sorterAfterFailure.getMemoryUsage === 0L)

  // Without the fix, this NPEs in:
  //   cleanupResources -> freeMemory -> updatePeakMemoryUsed -> getMemoryUsage
  //   -> inMemSorter.getMemoryUsage -> array.size() -> NPE
  sorter.cleanupResources()
}
117206
}

0 commit comments

Comments
 (0)