Add support for orc format by MehulBatra · Pull Request #790 · apache/iceberg-python
Expand Up
@@ -168,6 +168,7 @@
ICEBERG_SCHEMA = b"iceberg.schema"
# The "PARQUET:" prefix marks this metadata key as Parquet-specific; the value stored under it is the field ID.
PYARROW_PARQUET_FIELD_ID_KEY = b"PARQUET:field_id"
# Field-metadata key under which the field ID is looked up for ORC data.
PYARROW_ORC_FIELD_ID_KEY = b"iceberg.id"
PYARROW_FIELD_DOC_KEY = b"doc"
LIST_ELEMENT_NAME = "element"
MAP_KEY_NAME = "key"
Expand Down
Expand Up
@@ -627,6 +628,8 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
def _get_file_format(file_format: FileFormat, **kwargs: Dict[str, Any]) -> ds.FileFormat:
    """Build the PyArrow dataset file format matching an Iceberg file format.

    Args:
        file_format: The Iceberg file format to translate.
        **kwargs: Options forwarded to ``ds.ParquetFileFormat``. They are not
            passed to ``ds.OrcFileFormat``, which is constructed without
            arguments.

    Returns:
        The ``ds.FileFormat`` instance for the requested format.

    Raises:
        ValueError: If ``file_format`` is neither Parquet nor ORC.
    """
    if file_format == FileFormat.PARQUET:
        return ds.ParquetFileFormat(**kwargs)
    if file_format == FileFormat.ORC:
        # The ORC format is created with defaults; kwargs only apply to Parquet.
        return ds.OrcFileFormat()
    raise ValueError(f"Unsupported file format: {file_format}")
Expand Down Expand Up @@ -799,11 +802,12 @@ def primitive(self, primitive: pa.DataType) -> T:
def _get_field_id(field: pa.Field) -> Optional[int]:
    """Return the field ID stored in a PyArrow field's metadata, if any.

    The ORC key is consulted first and the Parquet key second, so a field that
    carries both resolves to the ORC value.

    Args:
        field: The PyArrow field whose metadata is inspected.

    Returns:
        The field ID parsed as an integer, or ``None`` when the field has no
        metadata or neither key holds a non-empty value.

    Raises:
        ValueError: If the stored value is not a valid integer literal.
    """
    metadata = field.metadata
    if not metadata:
        return None
    # Order matters: the ORC key takes precedence over the Parquet key.
    for key in (PYARROW_ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY):
        if field_id_str := metadata.get(key):
            return int(field_id_str.decode())
    return None
class _HasIds(PyArrowSchemaVisitor[bool]): Expand Down Expand Up @@ -912,6 +916,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: return TimestamptzType() elif primitive.tz is None: return TimestampType() if primitive.unit == "ns": if primitive.tz == "UTC": return TimestamptzType() elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive): return BinaryType() elif pa.types.is_fixed_size_binary(primitive): Expand Down Expand Up @@ -972,8 +979,11 @@ def _task_to_table( name_mapping: Optional[NameMapping] = None, ) -> Optional[pa.Table]: _, _, path = PyArrowFileIO.parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) with fs.open_input_file(path) as fin: if task.file.file_format == FileFormat.PARQUET: arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) if task.file.file_format == FileFormat.ORC: arrow_format = ds.OrcFileFormat() # currently ORC doesn't support any fragment scan options fragment = arrow_format.make_fragment(fin) physical_schema = fragment.physical_schema file_schema = pyarrow_to_schema(physical_schema, name_mapping) Expand Down
Expand Down Expand Up @@ -799,11 +802,12 @@ def primitive(self, primitive: pa.DataType) -> T:
def _get_field_id(field: pa.Field) -> Optional[int]:
    """Return the field ID stored in a PyArrow field's metadata, if any.

    The ORC key is consulted first and the Parquet key second, so a field that
    carries both resolves to the ORC value.

    Args:
        field: The PyArrow field whose metadata is inspected.

    Returns:
        The field ID parsed as an integer, or ``None`` when the field has no
        metadata or neither key holds a non-empty value.

    Raises:
        ValueError: If the stored value is not a valid integer literal.
    """
    metadata = field.metadata
    if not metadata:
        return None
    # Order matters: the ORC key takes precedence over the Parquet key.
    for key in (PYARROW_ORC_FIELD_ID_KEY, PYARROW_PARQUET_FIELD_ID_KEY):
        if field_id_str := metadata.get(key):
            return int(field_id_str.decode())
    return None
class _HasIds(PyArrowSchemaVisitor[bool]): Expand Down Expand Up @@ -912,6 +916,9 @@ def primitive(self, primitive: pa.DataType) -> PrimitiveType: return TimestamptzType() elif primitive.tz is None: return TimestampType() if primitive.unit == "ns": if primitive.tz == "UTC": return TimestamptzType() elif pa.types.is_binary(primitive) or pa.types.is_large_binary(primitive): return BinaryType() elif pa.types.is_fixed_size_binary(primitive): Expand Down Expand Up @@ -972,8 +979,11 @@ def _task_to_table( name_mapping: Optional[NameMapping] = None, ) -> Optional[pa.Table]: _, _, path = PyArrowFileIO.parse_location(task.file.file_path) arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) with fs.open_input_file(path) as fin: if task.file.file_format == FileFormat.PARQUET: arrow_format = ds.ParquetFileFormat(pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8)) if task.file.file_format == FileFormat.ORC: arrow_format = ds.OrcFileFormat() # currently ORC doesn't support any fragment scan options fragment = arrow_format.make_fragment(fin) physical_schema = fragment.physical_schema file_schema = pyarrow_to_schema(physical_schema, name_mapping) Expand Down