Skip to content

Commit 44711e3

Browse files
committed
GH-49697: [C++][CI] Check IPC file body bounds are in sync with decoder outcome
1 parent 35fb62e commit 44711e3

File tree

3 files changed

+10
-5
lines changed

3 files changed

+10
-5
lines changed

cpp/src/arrow/ipc/message.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -423,10 +423,12 @@ static Result<std::unique_ptr<Message>> ReadMessageInternal(
423423
body, file->ReadAt(offset + metadata_length, decoder.next_required_size()));
424424
}
425425

426-
if (body->size() < decoder.next_required_size()) {
427-
return Status::IOError("Expected to be able to read ",
428-
decoder.next_required_size(),
429-
" bytes for message body, got ", body->size());
426+
if (body->size() != decoder.next_required_size()) {
427+
// The streaming decoder got out of sync with the actual advertised
428+
// metadata and body size, which signals an invalid IPC file.
429+
return Status::IOError("Invalid IPC file: advertised body size is ", body->size(),
430+
", but message decoder expects to read ",
431+
decoder.next_required_size(), " bytes instead");
430432
}
431433
RETURN_NOT_OK(decoder.Consume(body));
432434
return result;

cpp/src/arrow/ipc/reader.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
624624
FieldVector filtered_fields;
625625
std::shared_ptr<Schema> filtered_schema;
626626

627+
// TODO factor this out?
627628
for (int i = 0; i < schema->num_fields(); ++i) {
628629
const Field& field = *schema->field(i);
629630
if (!inclusion_mask || (*inclusion_mask)[i]) {
@@ -645,6 +646,8 @@ Result<std::shared_ptr<RecordBatch>> LoadRecordBatchSubset(
645646
}
646647
}
647648

649+
// TODO factor out these steps?
650+
648651
// Dictionary resolution needs to happen on the unfiltered columns,
649652
// because fields are mapped structurally (by path in the original schema).
650653
RETURN_NOT_OK(ResolveDictionaries(columns, *context.dictionary_memo,

0 commit comments

Comments
 (0)