Poison

EMR-OSS #557

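Our daily offline data sync job copies online business data into an offline Hive data warehouse. The warehouse is stored on Alibaba Cloud OSS, and Spark reads and writes OSS through the EMR-OSS connector. Very rarely, about once every few months, a SQL statement would hang. It always happened in the early morning. Each time the sync-delay alert fired, I killed the SQL, manually corrected the affected table data and re-ran the sync. When I later ran the same SQL during the day, I could never reproduce the problem. I even wrote a for loop that generated files of different sizes and read each one with the connector, hoping one would trigger the exception. It was far too slow: after several days on multiple threads, not a single file had triggered it. Then, early one morning last week, the problem came back, and this time I preserved the scene.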

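I copied the data involved in the production failure to a test environment, and the problem reproduced reliably there. Here is a screenshot of the reproduction: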

Thrift Server

The symptom is simply that the SQL hangs. The err log of the corresponding Executor contains the following exception:
ArrayIndexOutOfBoundsException

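The error is an ArrayIndexOutOfBoundsException. The exception in the screenshot gives off = 1842697, length = 254456 and buffer length = 2097152. Adding off and length gives off + length = 1842697 + 254456 = 2097153, exactly one byte more than buffer length. What puzzled me was the hang itself. An out-of-bounds exception should have made the SQL fail with an error, not hang. So I took a thread dump of that Executor: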

Thread dump

The thread dump shows that the thread executing the SQL is sleeping, and its sleep is exactly what keeps me from sleeping…

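The Executor's err log and the thread dump both point to the same class, com.aliyun.fs.oss.nat.BufferReader. Our production environment runs version 1.8 of the EMR-OSS connector. The source of this class is in BufferReader.java, if you would like to look at it first.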

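At this point the problem was narrowed down to the BufferReader class. I wanted to reproduce it locally, where debugging and verification are much easier. As anyone familiar with Spark knows, a directory usually contains many files, so first I needed to find which file made BufferReader throw and then sleep. I took a heap dump of the Executor and located the thread in it. The key field of the sleeping BufferReader instance identified the file.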

Heap Dump

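The file is named part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000. I then killed the hung SQL and re-ran it several times, and every run hung. In every heap dump, the key of the sleeping BufferReader instance was part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000. That made me even more suspicious of this file: what about it drives BufferReader into an index-out-of-bounds error and then a sleep?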

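So I tried to reproduce the problem locally by reading part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 with the EMR-OSS connector. The screenshots above show that the EMR-OSS entry point in the sleeping thread's stack is com.aliyun.fs.oss.nat.NativeOssFileSystem.NativeOssFsInputStream.read(). I adapted the unit tests in TestAliyunOSSInputStream.java to read part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 locally. Unfortunately, the file was read with no exception and no sleep.

The sleeping stack frame, however, was stuck in the read call at line 576 of NativeOssFileSystem.java (NativeOssFileSystem.java:576). The source of that line is return bufferReader.read(b, off, len), so it calls the overload of BufferReader.read that takes arguments. I began to suspect that only read calls with particular argument values trigger the exception. I tried to recover the arguments of that read frame from the heap dump, but MAT did not show method arguments for stack frames. From what I could find, a heap dump does not include the arguments of the methods a thread is executing (please correct me if I am wrong); see for example Eclipse Community Forums: Memory Analyzer (MAT) » How to get executing stack and local variable information?.

To check the idea, I used Arthas to trace the BufferReader method calls on part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 and captured the call record. These are the calls from the last time the file was opened until the sleep: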

BufferReader bufferReader = new BufferReader(store, "part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000", conf, 1);
bufferReader.seek(4);
bufferReader.read(b, 0, 8388608); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 1048576, 7340032); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 2097152, 6291456); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 3145728, 5242880); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 4194304, 4194304); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 5242880, 3145728); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 6291456, 2097152); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 7340032, 1048576); // b.size = 8388608
bufferReader.read(b, 0, 8388608); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 1048576, 7340032); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 2097152, 6291456); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 3145728, 5242880); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 4194304, 4194304); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 5242880, 3145728); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 6291456, 2097152); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 7340032, 1048576); // b.size = 8388608
bufferReader.read(b, 0, 8388608); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 1048576, 7340032); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 2097152, 6291456); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 3145728, 5242880); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 4194304, 4194304); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 5242880, 3145728); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 6291456, 2097152); // b.size = 8388608
bufferReader.available();
bufferReader.read(b, 7340032, 1048576); // b.size = 8388608
bufferReader.read(b, 0, 8387176); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 1048576, 7338600); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 2097152, 6290024); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 3145728, 5241448); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 4194304, 4192872); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 5242880, 3144296); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 6291456, 2095720); // b.size = 8387176
bufferReader.available();
bufferReader.read(b, 7340032, 1047144); // b.size = 8387176
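The trace's regular shape fits a readFully-style caller. Such a caller keeps calling read(b, off + done, len - done) until the chunk is full, and each call gets back at most what is left of the current 1 MiB half. The sketch below replays the traced chunk sizes against a toy reader that has only that one property. The class names, the HALF constant and the loop are hypothetical stand-ins, not Parquet's or EMR-OSS's code, and the interleaved available() calls are left out.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadPatternSketch {
    // Assumed half-buffer size: bufferSize (2 MiB) / 2 for this ~32 MiB file.
    static final int HALF = 1024 * 1024;

    /** Toy stand-in for BufferReader: one call never returns more than is left in the current half. */
    static class HalfLimitedReader {
        long consumed = 0;                           // bytes handed out so far
        final List<int[]> calls = new ArrayList<>(); // recorded (off, len) arguments

        int read(byte[] b, int off, int len) {
            calls.add(new int[] {off, len});
            int leftInHalf = HALF - (int) (consumed % HALF);
            int n = Math.min(len, leftInHalf);
            consumed += n; // copying into b is omitted; only the call pattern matters here
            return n;
        }
    }

    /** Hypothetical readFully loop: keep calling read() until len bytes have arrived. */
    static void readFully(HalfLimitedReader in, byte[] b, int len) {
        int done = 0;
        while (done < len) {
            done += in.read(b, done, len - done); // always > 0 for this toy reader
        }
    }

    /** Replays the chunk sizes seen in the trace: three 8388608-byte chunks, then 8387176. */
    static HalfLimitedReader replay() {
        HalfLimitedReader in = new HalfLimitedReader();
        int[] chunkSizes = {8388608, 8388608, 8388608, 8387176};
        for (int size : chunkSizes) {
            readFully(in, new byte[size], size);
        }
        return in;
    }

    public static void main(String[] args) {
        for (int[] c : replay().calls) {
            System.out.println("read(b, " + c[0] + ", " + c[1] + ")");
        }
    }
}
```

Running main prints the same 32 (off, len) pairs as the trace. The last one is read(b, 7340032, 1047144), the last call recorded before the sleep.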

The traced call chain looks very regular. Replaying exactly this sequence in a unit test reproduced the exception. The log output:

2021-11-27 21:36:36,587 INFO  nat.NativeOssFileSystem (NativeOssFileSystem.java:open(435)) - Opening '/tmp/part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000' for reading
2021-11-27 21:36:37,217 INFO nat.BufferReader (BufferReader.java:updateInnerStream(357)) - Closed previous input stream.
2021-11-27 21:36:37,217 INFO nat.BufferReader (BufferReader.java:updateInnerStream(359)) - Opening key 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000' for reading at position '4'.
2021-11-27 21:36:38,982 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.13
2021-11-27 21:36:41,051 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.25
2021-11-27 21:36:42,384 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.38
2021-11-27 21:36:43,927 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.5
2021-11-27 21:36:45,997 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.63
2021-11-27 21:36:47,445 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.75
2021-11-27 21:36:49,223 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000 [4:...]' is 0.88
java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: length == 261253 off == 1835900 buffer length == 2097152
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.fetchData(BufferReader.java:575)
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.execute(BufferReader.java:479)
at com.aliyun.fs.oss.utils.Task.run(Task.java:43)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.lang.ArrayIndexOutOfBoundsException: length == 261253 off == 1835900 buffer length == 2097152
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:161)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:200)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
at java.base/java.util.zip.CheckedInputStream.read(CheckedInputStream.java:83)
at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.aliyun.oss.event.ProgressInputStream.read(ProgressInputStream.java:118)
at java.base/java.util.zip.CheckedInputStream.read(CheckedInputStream.java:83)
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.fetchData(BufferReader.java:561)
... 5 more
2021-11-27 21:37:01,021 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:37:11,266 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:37:21,513 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:37:31,795 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:37:42,049 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:37:52,305 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 21:38:02,549 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3

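The log shows that an asynchronous reader thread hit an ArrayIndexOutOfBoundsException. Meanwhile, BufferReader keeps waiting for the asynchronous threads to finish reading, and the WARN lines keep reporting "has completed 3". Clearly, BufferReader will never get there…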
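The hang comes from the shape of the wait loop. read() only watches the ready1 counter, and ready1.addAndGet(1) runs only after fetchData() returns. A reader that dies inside fetchData() therefore never increments the counter, so the counter stays at 3 and the loop sleeps forever. Below is a minimal sketch of that pattern using plain JDK concurrency classes. The failure slot is a hypothetical addition (BufferReader 1.8 has nothing like it) that shows how recording the worker's exception would let the consumer fail instead of hanging. The 20-poll cap exists only so the demo terminates.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class StuckWaitSketch {
    static final int STREAMS = 4; // mirrors the default fs.oss.reader.concurrent.number

    /**
     * Runs STREAMS workers. The last one throws before counting itself as ready,
     * the way a reader dies inside fetchData(). Returns how many workers the
     * consumer saw as ready, or -1 if the recorded failure was noticed instead.
     */
    static int runScenario(boolean checkFailure) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(STREAMS);
        AtomicInteger ready = new AtomicInteger();                    // plays the role of ready1
        AtomicReference<Throwable> failure = new AtomicReference<>(); // hypothetical, not in BufferReader 1.8
        for (int id = 0; id < STREAMS; id++) {
            final boolean fails = id == STREAMS - 1;
            pool.execute(() -> {
                try {
                    if (fails) {
                        throw new ArrayIndexOutOfBoundsException("simulated overflow in fetchData");
                    }
                    ready.incrementAndGet(); // like ready1.addAndGet(1) after a successful fetch
                } catch (RuntimeException e) {
                    failure.set(e);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS); // every worker has finished or died by now

        // Consumer side: BufferReader polls ready1 with no limit; here we stop after 20 polls.
        for (int i = 0; i < 20 && ready.get() != STREAMS; i++) {
            if (checkFailure && failure.get() != null) {
                return -1; // a real reader would throw an IOException wrapping failure.get()
            }
            Thread.sleep(10);
        }
        return ready.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("counter only:       " + runScenario(false)); // 3, stuck short of 4
        System.out.println("with failure check: " + runScenario(true));  // -1
    }
}
```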

One line in the log caught my attention:

2021-11-27 21:36:37,217 INFO  nat.BufferReader (BufferReader.java:updateInnerStream(359)) - Opening key 'part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000' for reading at position '4'.

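Reading starts at position 4, which matches bufferReader.seek(4) in the code. My earlier read code never called seek(), and it never reproduced the exception, so I started to suspect a connection. I commented out the bufferReader.seek(4) line and re-ran the unit test, and sure enough, there was no exception.

That raised two questions. Why does calling seek(4) lead to the index exception, and why does reading the file call seek(4) at all? I went back to the thread dump taken during the hang and saw that the read was issued by classes in the parquet package. I checked the Hive table that owns the file, and it is indeed stored as Parquet. According to the Apache Parquet documentation, a Parquet file is laid out as follows: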

4-byte magic number "PAR1"
<Column 1 Chunk 1 + Column Metadata>
<Column 2 Chunk 1 + Column Metadata>
...
<Column N Chunk 1 + Column Metadata>
<Column 1 Chunk 2 + Column Metadata>
<Column 2 Chunk 2 + Column Metadata>
...
<Column N Chunk 2 + Column Metadata>
...
<Column 1 Chunk M + Column Metadata>
<Column 2 Chunk M + Column Metadata>
...
<Column N Chunk M + Column Metadata>
File Metadata
4-byte length in bytes of file metadata
4-byte magic number "PAR1"
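Parquet readers find the footer from the tail of the file. The last 4 bytes are the magic number, and the 4 bytes before them hold the file-metadata length (little-endian, per the Parquet format specification). The column chunks begin right after the leading magic, at offset 4, which is consistent with the seek(4) in the trace. The sketch below checks both magic numbers and decodes the footer length from an in-memory byte array. The helper names and the fake file are illustrative only; this is not parquet-mr's reader.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ParquetTailSketch {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    /** Checks both magic numbers and returns the file-metadata length stored before the tail magic. */
    static int footerLength(byte[] file) {
        int n = file.length;
        if (n < 12
                || !Arrays.equals(Arrays.copyOfRange(file, 0, 4), MAGIC)
                || !Arrays.equals(Arrays.copyOfRange(file, n - 4, n), MAGIC)) {
            throw new IllegalArgumentException("not a Parquet file");
        }
        // The 4-byte little-endian length sits just before the trailing "PAR1".
        return ByteBuffer.wrap(file, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
    }

    /** Exclusive end of the column-chunk region, which starts at offset 4. */
    static int columnDataEnd(byte[] file) {
        return file.length - 8 - footerLength(file);
    }

    /** Builds a fake file: magic, column bytes, metadata bytes, metadata length, magic. */
    static byte[] fakeFile(int columnBytes, int metadataBytes) {
        ByteBuffer buf = ByteBuffer.allocate(4 + columnBytes + metadataBytes + 4 + 4)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.put(MAGIC).put(new byte[columnBytes]).put(new byte[metadataBytes]);
        buf.putInt(metadataBytes).put(MAGIC);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] file = fakeFile(10, 5);
        System.out.println("metadata length: " + footerLength(file));        // 5
        System.out.println("column data: [4, " + columnDataEnd(file) + ")"); // [4, 14)
    }
}
```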

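So the first 4 bytes are the magic number, followed by the data of each column chunk, which fits the call chain we captured. Next I checked the size of part-00028-e2cd9234-e0f8-488b-8ae7-1853e227bb01-c000. It is 33554434 bytes, two bytes more than 32 MiB. Reading the whole file without seek() reads 33554434 bytes. Calling seek(4) first to skip the first 4 bytes reads 33554434 - 4 = 33554430 bytes, two bytes less than 32 MiB.

To test this idea, I generated a 33554430-byte file and read it in one go with instream.read(new byte[33554430], 0, 33554430);. The exception reproduced, this time without any seek(4) call. Output: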

2021-11-27 22:01:36,758 INFO  nat.NativeOssFileSystem (NativeOssFileSystem.java:open(435)) - Opening '/tmp/33554430' for reading
2021-11-27 22:01:39,007 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.13
2021-11-27 22:01:40,441 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.25
2021-11-27 22:01:42,380 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.38
2021-11-27 22:01:44,223 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.5
2021-11-27 22:01:45,659 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.63
2021-11-27 22:01:47,298 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.75
2021-11-27 22:01:49,048 INFO nat.BufferReader (BufferReader.java:progressPrint(434)) - Current progress of reading '33554430 [0:...]' is 0.88
java.io.IOException: java.lang.ArrayIndexOutOfBoundsException: length == 261253 off == 1835900 buffer length == 2097152
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.fetchData(BufferReader.java:575)
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.execute(BufferReader.java:479)
at com.aliyun.fs.oss.utils.Task.run(Task.java:43)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.lang.ArrayIndexOutOfBoundsException: length == 261253 off == 1835900 buffer length == 2097152
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:161)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:139)
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:200)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:137)
at java.base/java.util.zip.CheckedInputStream.read(CheckedInputStream.java:83)
at java.base/java.io.FilterInputStream.read(FilterInputStream.java:133)
at com.aliyun.oss.event.ProgressInputStream.read(ProgressInputStream.java:118)
at java.base/java.util.zip.CheckedInputStream.read(CheckedInputStream.java:83)
at com.aliyun.fs.oss.nat.BufferReader$ConcurrentReader.fetchData(BufferReader.java:561)
... 5 more
2021-11-27 22:02:00,711 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 22:02:10,948 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 22:02:21,197 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 22:02:31,458 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
2021-11-27 22:02:41,720 WARN nat.BufferReader (BufferReader.java:read(284)) - waiting for fetching oss data at half-1, has completed 3
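Why 33554430 bytes in particular? The sketch below redoes BufferReader's sizing arithmetic (prepareBeforeFetch() and fetchData() in version 1.8) for the final half, with the default 4 concurrent streams. It rests on two assumptions. First, the power-of-two round-up is written with Integer.highestOneBit instead of the source's Math.log/Math.pow, which is assumed to give the same result for these sizes. Second, the last-reader branch of fetchData() is assumed to give reader N-1 everything the other readers did not take, mirroring the preRead branch. Class and method names are hypothetical.

```java
public class BufferSizingSketch {
    // Default of fs.oss.reader.concurrent.number in BufferReader 1.8.
    static final int CONCURRENT_STREAMS = 4;

    /** Where the last reader writes during the final fetch. */
    static final class LastHalf {
        final int bufferSize;
        final int halfIndex;        // value of halfFetched for the final fetch
        final int lastReaderLength; // bytes requested by reader CONCURRENT_STREAMS - 1
        final int writeStart;       // first buffer index that reader writes
        final int writeEnd;         // exclusive end of that write

        LastHalf(int bufferSize, int halfIndex, int lastReaderLength, int writeStart) {
            this.bufferSize = bufferSize;
            this.halfIndex = halfIndex;
            this.lastReaderLength = lastReaderLength;
            this.writeStart = writeStart;
            this.writeEnd = writeStart + lastReaderLength;
        }

        boolean overflows() {
            return writeEnd > bufferSize;
        }

        @Override
        public String toString() {
            return "half " + halfIndex + ": last reader writes [" + writeStart + ", " + writeEnd
                    + ") into a " + bufferSize + "-byte buffer, overflow=" + overflows();
        }
    }

    /** bufferSize as chosen in prepareBeforeFetch(), rounded up to a power of two. */
    static int bufferSize(long lengthToFetch) {
        int size = lengthToFetch < 16 * 1024 * 1024 ? 1024 * 1024
                : lengthToFetch > 1024 * 1024 * 1024 ? 64 * 1024 * 1024
                : (int) (lengthToFetch / 16);
        // Integer form of the source's Math.log/Math.pow round-up (assumed equivalent here).
        return size <= 1 ? 1 : Integer.highestOneBit(size - 1) << 1;
    }

    static LastHalf lastHalf(long lengthToFetch) {
        int bufferSize = bufferSize(lengthToFetch);
        int half = bufferSize / 2;
        int splitSize = half / CONCURRENT_STREAMS; // each reader's slot inside one half
        int halfIndex = (int) ((lengthToFetch + half - 1) / half) - 1;
        int remaining = (int) (lengthToFetch - (long) halfIndex * half);
        int perReader = remaining / CONCURRENT_STREAMS; // integer division, as in fetchData()
        // Assumption: the last reader takes everything the other readers did not.
        int lastReaderLength = remaining - perReader * (CONCURRENT_STREAMS - 1);
        int halfStart = halfIndex % 2 == 1 ? half : 0; // odd indices are fetched into half-1
        int writeStart = halfStart + (CONCURRENT_STREAMS - 1) * splitSize;
        return new LastHalf(bufferSize, halfIndex, lastReaderLength, writeStart);
    }

    public static void main(String[] args) {
        System.out.println(lastHalf(33554434L - 4)); // after seek(4): overflow=true
        System.out.println(lastHalf(33554434L));     // whole file: overflow=false
    }
}
```

For 33554430 bytes the last half is a half-1 (index 31). The last reader's slot starts at 1835008, and that reader is asked for 262145 bytes, one more than the 262144-byte slot. Its write therefore ends at 2097153, one past the end of the 2 MiB buffer. These figures agree with both exceptions. Locally, 1835900 - 1835008 = 892 and 892 + 261253 = 262145. In production, 1842697 - 1835008 = 7689 and 7689 + 254456 = 262145. Without seek(4), the same file has 33554434 bytes to fetch, its last half is a half-0 holding just 2 bytes, and nothing overflows.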

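The reproduction without seek(4) shows that the exception is not directly caused by seek(4). It depends on the length being read: reading a 33554430-byte file directly triggers it, and 33554430 is 32 MiB minus two bytes, very close to 32 MiB. So I read the BufferReader code. It has almost no comments, so I worked through the class under a debugger. My annotated copy is below: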

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.aliyun.fs.oss.nat;

import com.aliyun.fs.oss.common.NativeFileSystemStore;
import com.aliyun.fs.oss.utils.Task;
import com.aliyun.fs.oss.utils.TaskEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;

import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

/**
* thread1 thread2 thread2^m
* | | |
* / / /
* [0][1][2][3][4][5][6][7]...|...[2^n-6][2^n-5][2^n-4][2^n-3][2^n-2][2^n-1]
*/
public class BufferReader {
public static final Log LOG = LogFactory.getLog(BufferReader.class);

private NativeFileSystemStore store;
private int concurrentStreams;
private TaskEngine taskEngine;
private Configuration conf;
private int bufferSize;
private String key;
private byte[] buffer = null;
private Task[] readers;
private int[] splitContentSize;
private AtomicInteger halfReading = new AtomicInteger(0); // whether half-0 or half-1 should be read next
private AtomicInteger ready0 = new AtomicInteger(0);
private AtomicInteger ready1 = new AtomicInteger(0);
private boolean closed = false;
private int cacheIdx = 0; // index into the half (half-0 or half-1) of buffer currently being read, exclusive
private int splitSize = 0;
private long fileContentLength;
private long pos = 0;
private boolean squeezed0 = false; // whether half-0 has been squeezed (compacted)
private boolean squeezed1 = false; // whether half-1 has been squeezed (compacted)
private int realContentSize = 0;
private double lastProgress = 0.0d; // progress at the last print; used to decide whether progress has moved by 10% and should be printed again
private AtomicInteger halfConsuming = new AtomicInteger(1); // starts at 1 so that at least one half is fetched first
private int algorithmVersion; // version 1 reads in parallel with multiple threads; version 2 reads via com.aliyun.fs.oss.common.NativeFileSystemStore.retrieve(java.lang.String, long)
private InputStream in;
private long lengthToFetch; // number of bytes to fetch; after a seek, the seek position has been subtracted
private long instreamStart = 0;

public BufferReader(NativeFileSystemStore store, String key,
Configuration conf, int algorithmVersion) throws IOException {
this.store = store;
this.key = key;
this.conf = conf;
this.fileContentLength = store.retrieveMetadata(key).getLength(); // size of the file behind this key
this.algorithmVersion = algorithmVersion;
if (store.retrieveMetadata(key).getLength() < 5 * 1024 * 1024) {
// if the file is smaller than 5 MB, use algorithm version 2, i.e. read directly via com.aliyun.fs.oss.common.NativeFileSystemStore.retrieve(java.lang.String, long)
this.algorithmVersion = 2;
}
prepareBeforeFetch();
}

private void prepareBeforeFetch() throws IOException {
if (algorithmVersion == 1) {
this.lengthToFetch = fileContentLength - pos; // without a seek this is the file size; after a seek it is the file size minus the seek position
this.bufferSize = lengthToFetch < 16 * 1024 * 1024 ? 1024 * 1024 :
(lengthToFetch > 1024 * 1024 * 1024 ? 64 * 1024 * 1024 :
(int) (lengthToFetch / 16)); // buffer size from the length: 1 MB below 16 MB, 64 MB above 1 GB, otherwise length / 16
if (Math.log(bufferSize) / Math.log(2) != 0) {
// round bufferSize up to a power of 2 (note: the condition above is true for any bufferSize > 1, so this always runs)
int power = (int) Math.ceil(Math.log(bufferSize) / Math.log(2));
this.bufferSize = (int) Math.pow(2, power);
}

if (buffer == null) {
buffer = new byte[bufferSize];
}
this.concurrentStreams = conf.getInt("fs.oss.reader.concurrent.number", 4); // 4 concurrent streams by default
if ((Math.log(concurrentStreams) / Math.log(2)) != 0) {
// round the configured number of concurrent streams up to a power of 2 (the same condition is true for any value > 1)
int power = (int) Math.ceil(Math.log(concurrentStreams) / Math.log(2));
this.concurrentStreams = (int) Math.pow(2, power);
}
this.readers = new ConcurrentReader[concurrentStreams]; // holds the ConcurrentReader instances
this.splitContentSize = new int[concurrentStreams * 2]; // every stream fills one slot in each of the two halves, hence concurrentStreams * 2 entries
this.splitSize = bufferSize / concurrentStreams / 2; // size of one reader's slot in a half: bufferSize / number of streams / 2

initializeTaskEngine(); // create the concurrent readers and submit them to the thread pool to fetch asynchronously
} else {
in = store.retrieve(key, pos);
}
}

private void initializeTaskEngine() {
for (int i = 0; i < concurrentStreams; i++) {
try {
readers[i] = new ConcurrentReader(i);
} catch (FileNotFoundException e) {
LOG.error(e);
}
}
this.taskEngine = new TaskEngine(Arrays.asList(this.readers),
concurrentStreams, concurrentStreams);
this.taskEngine.executeTask();
}

public void close() {
LOG.info("Closing input stream for '" + key + "'.");
closed = true;
try {
if (algorithmVersion == 1) {
taskEngine.shutdown();
} else {
if (in != null) {
in.close();
in = null;
}
}
} catch (IOException e) {
LOG.error("Failed to close input stream.", e);
} finally {
buffer = null;
}
}

public synchronized int read() throws IOException {
if (algorithmVersion == 1) {
while (true) {
if (halfReading.get() == 0) {
int i = 0;
while (!(ready0.get() == concurrentStreams)) { // wait until every reader thread has filled its slot of half-0
i++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("Something wrong, keep waiting.");
}
if (i % 100 == 0) {
LOG.warn("waiting for fetching oss data at half-0, has completed "
+ ready0.get());
}
}
if (!squeezed0) { // half-0 has not been squeezed yet, so squeeze (compact) it now
realContentSize = squeeze(); // squeeze: sum the half-0 sizes of all readers into realContentSize
squeezed0 = true; // mark half-0 as squeezed
squeezed1 = false; // mark half-1 as not squeezed
progressPrint(); // print progress if it is due
}

// read data from buffer half-0
if (pos >= fileContentLength) {
close();
return -1;
} else if (cacheIdx < realContentSize) {
// return the next byte of the squeezed half-0
int ret = buffer[cacheIdx];
cacheIdx++;
pos++;
return ret;
} else {
ready0.set(0); // cacheIdx >= realContentSize: half-0 has been fully consumed, so reset ready0
halfReading.set(1); // switch to half-1 for the next read
cacheIdx = 0; // reset cacheIdx so that half-1 is read from its beginning
halfConsuming.addAndGet(1); // bump halfConsuming so that the readers keep fetching
}
} else {
int i = 0;
while (!(ready1.get() == concurrentStreams)) { // wait until every reader thread has filled its slot of half-1
i++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("Something wrong, keep waiting.");
}
if (i % 100 == 0) {
LOG.warn("waiting for fetching oss data at half-1, has completed "
+ ready1.get());
}
}
if (!squeezed1) { // half-1 has not been squeezed yet, so squeeze (compact) it now
realContentSize = squeeze(); // squeeze: sum the half-1 sizes of all readers into realContentSize
squeezed0 = false; // mark half-0 as not squeezed
squeezed1 = true; // mark half-1 as squeezed
progressPrint(); // print progress if it is due
}

// read data from buffer half-1
if (pos >= fileContentLength) {
close();
return -1;
} else if (cacheIdx < realContentSize) {
int ret = buffer[bufferSize / 2 + cacheIdx];
cacheIdx++;
return ret;
} else {
ready1.set(0);
halfReading.set(0);
cacheIdx = 0;
halfConsuming.addAndGet(1);
}
}
}
} else {
int result = in.read();
if (result != -1) {
pos++;
}

return result;
}
}

public synchronized int read(byte[] b, int off, int len) throws IOException {
if (algorithmVersion == 1) {
while (true) {
if (halfReading.get() == 0) {
int j = 0;
while (!(ready0.get() == concurrentStreams)) { // wait until every reader thread has filled its slot of half-0
j++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("Something wrong, keep waiting.");
}
if (j % 100 == 0) {
LOG.warn("waiting for fetching oss data at half-0, has completed "
+ ready0.get());
}
}
if (!squeezed0) { // half-0 has not been squeezed yet, so squeeze (compact) it now
realContentSize = squeeze(); // squeeze: sum the half-0 sizes of all readers into realContentSize
squeezed0 = true; // mark half-0 as squeezed
squeezed1 = false; // mark half-1 as not squeezed
progressPrint(); // print progress if it is due
}

// read data from buffer half-0
int size = 0;
if (pos >= fileContentLength) {
close();
return -1;
} else if (cacheIdx < realContentSize) {
// copy from the squeezed half-0 into b
for (int i = 0; i < len && cacheIdx < realContentSize; i++) {
b[off + i] = buffer[cacheIdx];
cacheIdx++;
pos++;
size++;
}
return size;
} else {
ready0.set(0); // cacheIdx >= realContentSize: half-0 has been fully consumed, so reset ready0
halfReading.set(1); // switch to half-1 for the next read
cacheIdx = 0; // reset cacheIdx so that half-1 is read from its beginning
halfConsuming.addAndGet(1); // bump halfConsuming so that the readers keep fetching
}
} else {
int j = 0;
while (!(ready1.get() == concurrentStreams)) { // wait until every reader thread has filled its slot of half-1
j++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("Something wrong, keep waiting.");
}
if (j % 100 == 0) {
LOG.warn("waiting for fetching oss data at half-1, has completed "
+ ready1.get());
}
}
if (!squeezed1) { // half-1 has not been squeezed yet, so squeeze (compact) it now
realContentSize = squeeze(); // squeeze: sum the half-1 sizes of all readers into realContentSize
squeezed0 = false; // mark half-0 as not squeezed
squeezed1 = true; // mark half-1 as squeezed
progressPrint(); // print progress if it is due
}

// read data from buffer half-1
int size = 0;
if (pos >= fileContentLength) {
close();
return -1;
} else if (cacheIdx < realContentSize) {
for (int i = 0; i < len && cacheIdx < realContentSize; i++) {
b[off + i] = buffer[bufferSize / 2 + cacheIdx];
cacheIdx++;
pos++;
size++;
}
return size;
} else {
ready1.set(0);
halfReading.set(0);
cacheIdx = 0;
halfConsuming.addAndGet(1);
}
}
}
} else {
int result = in.read(b, off, len);
if (result > 0) {
pos += result;
}

return result;
}
}

public synchronized void seek(long newpos) throws IOException {
if (newpos < 0) {
throw new EOFException("negative seek position: " + newpos);
} else if (newpos > fileContentLength) {
throw new EOFException("Cannot seek after EOF, contentLength:" +
fileContentLength + " position:" + newpos);
}

if (pos != newpos) {
// the seek is attempting to move to the current position
updateInnerStream(newpos);
}
}

private synchronized void updateInnerStream(long newpos) throws IOException {
this.pos = newpos;
this.instreamStart = newpos;
try {
if (algorithmVersion == 1) {
closed = true;
taskEngine.shutdown();
closed = false;
} else {
if (in != null) {
in.close();
in = null;
}
}
} catch (IOException e) {
LOG.error("Failed to close input stream.", e);
}
LOG.info("Closed previous input stream.");
reset();
LOG.info("Opening key '" + key + "' for reading at position '"
+ newpos + "'.");
prepareBeforeFetch();
}

private void reset() {
halfReading.set(0);
ready0.set(0);
ready1.set(0);
cacheIdx = 0;
squeezed0 = false;
squeezed1 = false;
realContentSize = 0;
lastProgress = 0.0d;
halfConsuming.set(1);
}

private int squeeze() {
int totalSize = 0;
int begin;
if (halfReading.get() == 0) {
// half-0 is the one being read
for (int i = 0; i < concurrentStreams; i++) {
totalSize += splitContentSize[i]; // add up the half-0 sizes of all readers
}
begin = 0;

int cacheIdx;
if (totalSize != bufferSize / 2) {
// the readers' half-0 sizes do not add up to bufferSize / 2, typically at the tail of the file where less than a full half is left
cacheIdx = splitContentSize[0];
for (int i = 1; i < concurrentStreams; i++) {
for (int j = 0; j < splitContentSize[i]; j++) {
buffer[begin + cacheIdx] = buffer[begin + splitSize * i + j]; // shift data left to close the gaps between reader slots
cacheIdx++;
}
}
}
} else {
// half-1 is the one being read
for (int i = 0; i < concurrentStreams; i++) {
totalSize += splitContentSize[concurrentStreams + i];
}
begin = bufferSize / 2;

int cacheIdx;
if (totalSize != bufferSize / 2) {
// the readers' half-1 sizes do not add up to bufferSize / 2, typically at the tail of the file where less than a full half is left
cacheIdx = splitContentSize[concurrentStreams];
for (int i = 1; i < concurrentStreams; i++) {
for (int j = 0; j < splitContentSize[concurrentStreams + i]; j++) {
buffer[begin + cacheIdx] = buffer[begin + splitSize * i + j]; // shift data left to close the gaps between reader slots
cacheIdx++;
}
}
}
}

return totalSize;
}

public long getPos() {
return pos;
}

public synchronized int available() throws IOException {
long remaining = fileContentLength - pos;
if (remaining > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
return (int)remaining;
}

private void progressPrint() {
long hasRead = pos + realContentSize - instreamStart;
double currentProgress = hasRead >= lengthToFetch ? 1.0d :
(double) hasRead / lengthToFetch;
if (currentProgress - lastProgress >= 0.1 || currentProgress == 1.0d) { // print when progress has moved by at least 10% since the last print, or when reading is complete
BigDecimal b = new BigDecimal(currentProgress);
LOG.info("Current progress of reading '" + key +
" [" + instreamStart + ":...]' is " +
b.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue());
lastProgress = currentProgress;
}
}

private class ConcurrentReader extends Task {
private final Log LOG = LogFactory.getLog(ConcurrentReader.class);
int halfFetched = 1; // number of halves fetched; starts at 1 to count the first half fetched by preRead
private Boolean preRead = true;
private int readerId = -1; // id of this reader
private boolean half0Completed = false; // together with half1Completed, decides whether half-0 or half-1 is filled next
private boolean half1Completed = false; // together with half0Completed, decides whether half-0 or half-1 is filled next
private int half0StartPos = -1;
private int half1StartPos = -1;
private int length = -1;
private boolean _continue = true;

public ConcurrentReader(int readerId) throws FileNotFoundException {
assert (bufferSize % 2 == 0);
assert (concurrentStreams % 2 == 0);
this.readerId = readerId; // id of this reader
this.length = bufferSize / (2 * concurrentStreams); // size of this reader's slot in each half
assert (concurrentStreams * length * 2 == bufferSize);

this.half0StartPos = readerId * length; // start index of this reader's half-0 slot
this.half1StartPos = bufferSize / 2 + readerId * length; // start index of this reader's half-1 slot; half-1 starts at the middle of buffer
}

@Override
public void execute(TaskEngine engineRef) throws IOException {
int i = 0;
while (!closed && _continue) {
if (preRead) {
// preRead starts out true, so on the first pass each reader fills half-0 from its own half0StartPos
// fetch oss data for half-0 at the first time, as there is
// no data in buffer.
_continue = fetchData(half0StartPos);
half0Completed = true; // mark half-0 as filled
half1Completed = false; // mark half-1 as not filled, so it is fetched next
ready0.addAndGet(1); // one more reader has its half-0 slot ready
preRead = false; // stop pre-reading so the loop does not enter this branch again
} else if ((halfFetched <= halfConsuming.get())
&& (halfFetched % 2 == 1) && !half1Completed) {
// fetch oss data for half-1
_continue = fetchData(half1StartPos);
half1Completed = true;
half0Completed = false; // mark half-0 as not filled, so it is fetched next
ready1.addAndGet(1); // one more reader has its half-1 slot ready
halfFetched++; // one more half fetched
} else if (halfFetched <= halfConsuming.get()
&& (halfFetched % 2 == 0) && !half0Completed) {
// fetch oss data for half-0
_continue = fetchData(half0StartPos);
half0Completed = true;
half1Completed = false; // 标记 half-1 未读取完成,以便下次读取 half-1
ready0.addAndGet(1); // 增加可用的 half-0 数量
halfFetched++; // 增加已经读取的 half 数量
} else {
// 当读取的 half 块数量 halfFetched 大于 halfConsuming 时,说明此时有剩余,将不进行预读,等待数据被消耗后再读取
i++;
// waiting for `halfReading` block data to be consumed
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
if (i % 600 == 0) {
LOG.info("[ConcurrentReader-" + readerId + "] waiting for " +
"consuming cached data.");
}
}
}
}

    // startPos is an index into the buffer array
    private boolean fetchData(int startPos) throws IOException {
      boolean _continue = true; // whether reading should continue after this call
      if (startPos == half0StartPos) {
        splitContentSize[readerId] = 0; // reset the byte count of the half being fetched
      } else {
        splitContentSize[concurrentStreams + readerId] = 0; // reset the byte count of the half being fetched
      }
      long newPos;
      int fetchLength; // number of bytes to fetch for this half
      if (preRead && bufferSize / 2 >= lengthToFetch) {
        _continue = false;
        fetchLength = (int) lengthToFetch / concurrentStreams;
        newPos = instreamStart + fetchLength * readerId;
        if (readerId == (concurrentStreams - 1)) {
          fetchLength = (int) lengthToFetch - fetchLength *
              (concurrentStreams - 1);
        }
      } else if (preRead) {
        fetchLength = bufferSize / (2 * concurrentStreams); // size of this reader's half-0 slot
        newPos = instreamStart + fetchLength * readerId; // offset in the OSS object to read from, shifted by readerId
      } else if ((long) (halfFetched + 1) * bufferSize / 2 >= lengthToFetch) { // the next half would start at or past lengthToFetch, so this is the last half
        _continue = false; // last half: stop reading after this one
        fetchLength =
            (int) (lengthToFetch - (long) halfFetched * bufferSize / 2) /
            concurrentStreams; // a non-zero remainder leaves 1 to concurrentStreams - 1 extra bytes, and the buffer space reserved for half-1 must be able to hold them
        newPos =
            instreamStart + (long) halfFetched * bufferSize / 2 +
            readerId * fetchLength;
        if (readerId == (concurrentStreams - 1)) {
          fetchLength =
              (int) (lengthToFetch - (long) halfFetched * bufferSize / 2 -
              (fetchLength * (concurrentStreams - 1))); // the last reader reads everything left over by the division
        }
      } else {
        fetchLength = bufferSize / (2 * concurrentStreams); // length of one half slot
        newPos = instreamStart + (long) halfFetched * bufferSize / 2 +
            readerId * fetchLength; // offset in the OSS object of this reader's block in the current half
      }
      InputStream in;
      try {
        in = store.retrieve(key, newPos, fetchLength);
      } catch (Exception e) {
        LOG.warn(e.getMessage(), e);
        throw new IOException("[ConcurrentReader-" + readerId + "] Cannot " +
            "open oss input stream");
      }

      int off = startPos; // index in buffer where writing starts, i.e. the start of this reader's half-0 or half-1 slot
      int tries = 10;
      int result;
      boolean retry = true;
      int hasRead = 0;
      do {
        try {
          result = in.read(buffer, off, fetchLength - hasRead);
          if (result > 0) {
            off += result;
            hasRead += result;
          } else if (result == -1) {
            break;
          }
          retry = hasRead < fetchLength;
        } catch (EOFException e0) {
          LOG.warn(e0.getMessage(), e0);
          throw e0;
        } catch (Exception e1) {
          tries--;
          if (tries == 0) {
            throw new IOException(e1);
          }

          try {
            Thread.sleep(100);
          } catch (InterruptedException e2) {
            LOG.warn(e2.getMessage());
          }
          if (in != null) {
            try {
              in.close();
            } catch (Exception e) {
              // do nothing
            } finally {
              in = null;
            }
          }
          try {
            in = store.retrieve(key, newPos, fetchLength);
          } catch (Exception e) {
            LOG.warn(e.getMessage(), e);
            throw new IOException("[ConcurrentReader-" + readerId + "] " +
                "Cannot open oss input stream", e);
          }
          off = startPos;
          hasRead = 0;
        }
      } while (tries > 0 && retry);
      in.close();
      if (startPos == half0StartPos) {
        splitContentSize[readerId] = hasRead; // record the bytes read in this reader's half-0 entry of splitContentSize
      } else {
        splitContentSize[concurrentStreams + readerId] = hasRead; // record the bytes read in this reader's half-1 entry of splitContentSize
      }

      return _continue;
    }
  }
}

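The core idea of BufferReader is as follows. It creates a buffer whose size depends on the file size; for a file of 33554430 bytes the buffer is 2097152 bytes, i.e. 2 MiB. With the default configuration it creates 4 ConcurrentReader tasks, i.e. 4 threads that read the file asynchronously, which we will call reader-0, reader-1, reader-2 and reader-3. Each thread is responsible for two halves, which we will call half-0 and half-1: its slot in the first half of the buffer and its slot in the second half. The buffer therefore holds at most 8 halves, and each half is given 2097152 / 8 = 262144 bytes of buffer space, which corresponds to the splitSize field in the code. The diagram below shows the part of the buffer each reader is responsible for (rNhM stands for half-M of reader-N):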

-------------------------- buffer --------------------------
r0h0 r1h0 r2h0 r3h0 r0h1 r1h1 r2h1 r3h1
-------------------------- buffer --------------------------
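
To make the layout concrete, here is a small standalone sketch that recomputes, with the same formulas as the ConcurrentReader constructor above, where each reader's half-0 and half-1 slots start in buffer, and how many halves a 33554430-byte read is split into. The values bufferSize = 2097152 and concurrentStreams = 4 are the ones from this example; the class BufferLayout and its helper methods are hypothetical and not part of EMR-OSS.

```java
public class BufferLayout {
  // Same formula as `length` in the ConcurrentReader constructor.
  static int slotLength(int bufferSize, int concurrentStreams) {
    return bufferSize / (2 * concurrentStreams);
  }

  // Same formula as half0StartPos: slots in the first half of buffer.
  static int half0StartPos(int bufferSize, int concurrentStreams, int readerId) {
    return readerId * slotLength(bufferSize, concurrentStreams);
  }

  // Same formula as half1StartPos: slots in the second half of buffer.
  static int half1StartPos(int bufferSize, int concurrentStreams, int readerId) {
    return bufferSize / 2 + readerId * slotLength(bufferSize, concurrentStreams);
  }

  // Number of halves (bufferSize / 2 bytes of the file each) needed to cover lengthToFetch.
  static long halfCount(long lengthToFetch, int bufferSize) {
    long half = bufferSize / 2;
    return (lengthToFetch + half - 1) / half; // ceiling division
  }

  public static void main(String[] args) {
    int bufferSize = 2097152;  // buffer size for the 33554430-byte file
    int concurrentStreams = 4; // default number of readers
    int length = slotLength(bufferSize, concurrentStreams);
    for (int r = 0; r < concurrentStreams; r++) {
      int h0 = half0StartPos(bufferSize, concurrentStreams, r);
      int h1 = half1StartPos(bufferSize, concurrentStreams, r);
      System.out.printf("r%dh0 [%d, %d)  r%dh1 [%d, %d)%n",
          r, h0, h0 + length, r, h1, h1 + length);
    }
    System.out.println("halves: " + halfCount(33554430L, bufferSize));
  }
}
```

The last line printed for the slots is r3h1 [1835008, 2097152): reader-3's half-1 slot is the only one that ends exactly at the end of buffer. The file needs 32 halves, and since halves alternate between half-0 and half-1, each reader fills 16 of each.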

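So for a file of 33554430 bytes, each reader has to go through 16 rounds, i.e. fill half-0 16 times and half-1 16 times. While debugging locally, I found that the exception was always thrown by reader-3, and always while it was reading half-1, i.e. the rightmost slot of the buffer in the diagram. What is wrong with that slot? This brings us to the most important piece of code, the one that computes for each half how many bytes to fetch from the stream. The lines directly responsible for the out-of-bounds access are: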

else if ((long) (halfFetched + 1) * bufferSize / 2 >= lengthToFetch) {
  _continue = false;
  fetchLength =
      (int) (lengthToFetch - (long) halfFetched * bufferSize / 2) /
      concurrentStreams;
  newPos =
      instreamStart + (long) halfFetched * bufferSize / 2 +
      readerId * fetchLength;
  if (readerId == (concurrentStreams - 1)) {
    fetchLength =
        (int) (lengthToFetch - (long) halfFetched * bufferSize / 2 -
        (fetchLength * (concurrentStreams - 1)));
  }
}

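This branch is taken when the next half would start at or beyond lengthToFetch, i.e. when the half about to be read is the last one; in this example, that is when reader-3 is about to read its final half-1. With the 2 MiB buffer each reader alternates between half-0 and half-1, so by the time the final half-1 is read, 31 halves have already been fetched (halfFetched = 31), and the fetchLength for reader-3's half-1 is now computed. From the code above, for reader-0, reader-1 and reader-2, fetchLength = (int) (lengthToFetch - (long) halfFetched * bufferSize / 2) / concurrentStreams = (33554430 - 31 * 2097152 / 2) / 4 = 1048574 / 4 = 262143 (integer division truncates 262143.5). For reader-3, fetchLength is then recomputed as fetchLength = (int) (lengthToFetch - (long) halfFetched * bufferSize / 2 - (fetchLength * (concurrentStreams - 1))) = 33554430 - 31 * 2097152 / 2 - 262143 * (4 - 1) = 262145. Why is reader-3 treated differently? Dividing the remaining length by the number of readers may leave a remainder. The first readers each read the truncated quotient, the 262143 bytes computed above, and all the bytes left over by the division go to reader-3, which therefore reads 262145 bytes. That is more than the splitSize reserved for each half, 262144 bytes. So when reader-3 reads from the stream with fetchLength = 262145, the underlying stream checks before reading whether buffer can hold the requested length; the check fails and throws the ArrayIndexOutOfBoundsException we saw earlier. The values in the exception match: reader-3's half-1 slot starts at 2097152 / 2 + 3 * 262144 = 1835008, off = 1842697 means 7689 bytes had already been read into it, and the remaining length is 262145 - 7689 = 254456, so off + length = 2097153, one byte past the end of buffer.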
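
The arithmetic above can be replayed outside EMR-OSS. The sketch below copies only the last-half branch of fetchData (no I/O) and prints, for each reader, the fetchLength and the range of buffer it would write for the final half-1, assuming the values of this example (lengthToFetch = 33554430, bufferSize = 2097152, concurrentStreams = 4, halfFetched = 31). LastHalfFetch is a hypothetical name.

```java
public class LastHalfFetch {
  // Mirrors the "last half" branch of fetchData quoted above, without any I/O.
  static int fetchLength(long lengthToFetch, int bufferSize, int concurrentStreams,
                         int halfFetched, int readerId) {
    // Cast first, then integer division: the fractional part is dropped.
    int fetchLength =
        (int) (lengthToFetch - (long) halfFetched * bufferSize / 2) / concurrentStreams;
    if (readerId == concurrentStreams - 1) {
      // The last reader also takes the remainder of the division.
      fetchLength = (int) (lengthToFetch - (long) halfFetched * bufferSize / 2
          - (fetchLength * (concurrentStreams - 1)));
    }
    return fetchLength;
  }

  public static void main(String[] args) {
    long lengthToFetch = 33554430L;
    int bufferSize = 2097152;
    int concurrentStreams = 4;
    int halfFetched = 31;
    int slot = bufferSize / (2 * concurrentStreams); // 262144 bytes per slot
    for (int r = 0; r < concurrentStreams; r++) {
      int len = fetchLength(lengthToFetch, bufferSize, concurrentStreams, halfFetched, r);
      int start = bufferSize / 2 + r * slot; // half1StartPos, as in the constructor
      System.out.printf("reader-%d: fetchLength=%d, writes buffer[%d, %d) of %d bytes%n",
          r, len, start, start + len, bufferSize);
    }
  }
}
```

Readers 0 to 2 get 262143 bytes and stay inside their slots, while reader-3 gets 262145 bytes and would write buffer[1835008, 2097153), one index past the last valid position 2097151.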

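At this point the cause of the problem is clear, and there are several ways to fix it. The simplest, in my opinion, is to allocate concurrentStreams - 1 extra bytes when creating buffer. When the remaining length is not divisible by concurrentStreams, the remainder is at most concurrentStreams - 1 bytes, and in the failing case those bytes are read by the last reader into its half-1 slot, which ends exactly at the end of buffer. Alternatively, the design of BufferReader as a whole could be adjusted to solve the problem; I will not go through those options here.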
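
A minimal sketch of the proposed fix, assuming nothing changes except the length of the array allocated for buffer. java.io.ByteArrayInputStream stands in for the stream returned by store.retrieve (an assumption for illustration only); its read(byte[], int, int) throws IndexOutOfBoundsException when len is greater than b.length - off, which mimics the check that failed here. PaddedBufferDemo, allocateBuffer and tryRead are hypothetical names, not EMR-OSS APIs, and this is not necessarily how PR #557 implements the fix.

```java
import java.io.ByteArrayInputStream;

public class PaddedBufferDemo {
  // Hypothetical helper: optionally reserve concurrentStreams - 1 extra bytes at the end.
  static byte[] allocateBuffer(int bufferSize, int concurrentStreams, boolean padded) {
    return new byte[padded ? bufferSize + concurrentStreams - 1 : bufferSize];
  }

  // Reads fetchLength bytes into buffer starting at off, like the loop in fetchData.
  static boolean tryRead(byte[] buffer, int off, int fetchLength) {
    // Stand-in for the OSS stream: fetchLength bytes of zeros.
    ByteArrayInputStream in = new ByteArrayInputStream(new byte[fetchLength]);
    try {
      int hasRead = 0;
      while (hasRead < fetchLength) {
        int result = in.read(buffer, off + hasRead, fetchLength - hasRead);
        if (result == -1) {
          break;
        }
        hasRead += result;
      }
      return hasRead == fetchLength;
    } catch (IndexOutOfBoundsException e) {
      // Same failure mode as the ArrayIndexOutOfBoundsException in the Executor log.
      return false;
    }
  }

  public static void main(String[] args) {
    int bufferSize = 2097152, concurrentStreams = 4;
    int off = 1835008;        // start of reader-3's half-1 slot
    int fetchLength = 262145; // reader-3's fetchLength for the last half
    System.out.println("original buffer: "
        + tryRead(allocateBuffer(bufferSize, concurrentStreams, false), off, fetchLength));
    System.out.println("padded buffer:   "
        + tryRead(allocateBuffer(bufferSize, concurrentStreams, true), off, fetchLength));
  }
}
```

This prints false for the original 2097152-byte buffer and true for the padded 2097155-byte one. The slot start positions are unchanged; the extra bytes only give the last slot room for the remainder.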

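This problem reminded me of a point from my earlier Binary Search article: even the smallest pieces of code are hard to get right. Part of the reason is that unit tests cannot cover every possible input, so when designing a program we should consider the range of inputs as thoroughly as we can to avoid problems like this one.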
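
In that spirit, the last-half arithmetic is small enough to check exhaustively instead of by sampling. The sketch below assumes bufferSize stays 2097152 and concurrentStreams stays 4 (in the real code bufferSize is derived from the file size, so other files use other values). It tries every possible remaining length R of the last half, from 1 to bufferSize / 2, applies the same division as fetchData, and records the values of R for which the last reader's fetchLength exceeds its 262144-byte slot. RemainderScan is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class RemainderScan {
  // Remaining lengths R of the last half for which the last reader overflows its slot.
  static List<Integer> overflowingRemainders(int bufferSize, int concurrentStreams) {
    int slot = bufferSize / (2 * concurrentStreams);
    List<Integer> hits = new ArrayList<>();
    for (int r = 1; r <= bufferSize / 2; r++) {
      int others = r / concurrentStreams;              // what each earlier reader fetches
      int last = r - others * (concurrentStreams - 1); // what the last reader fetches
      if (last > slot) {
        hits.add(r);
      }
    }
    return hits;
  }

  // Largest number of bytes the last reader asks for beyond the end of its slot.
  static int maxOverrun(int bufferSize, int concurrentStreams) {
    int slot = bufferSize / (2 * concurrentStreams);
    int max = 0;
    for (int r = 1; r <= bufferSize / 2; r++) {
      int last = r - (r / concurrentStreams) * (concurrentStreams - 1);
      max = Math.max(max, last - slot);
    }
    return max;
  }

  public static void main(String[] args) {
    System.out.println(overflowingRemainders(2097152, 4));
    System.out.println(maxOverrun(2097152, 4));
  }
}
```

It prints [1048571, 1048574, 1048575] and 2. Only 3 of the 1048576 possible remainders overflow (this example's R = 33554430 - 31 * 1048576 = 1048574 is one of them), which is consistent with how rarely the problem showed up. In this configuration the overrun is never more than 2 bytes, so the concurrentStreams - 1 = 3 extra bytes proposed above are enough.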

Reference

Fix ArrayIndexOutOfBoundsException in BufferReader by tianshuang · Pull Request #557 · aliyun/aliyun-emapreduce-datasources · GitHub