POC: Encryption read support for REST catalog by smaheshwar-pltr · Pull Request #3221 · apache/iceberg-python

@pytest.mark.integration
def test_read_encrypted_table_via_spark(session_catalog: Catalog) -> None:
    """Read the table ``default.test_encrypted_spark_read`` and verify its rows.

    The test configures an in-memory KMS through the catalog properties, loads
    the table, checks that the table metadata and its current snapshot carry
    encryption information, then scans the table and compares the result with
    the three expected rows.

    Args:
        session_catalog: Catalog fixture used to load the table. Its
            ``properties`` mapping is modified for the duration of the test
            and restored to its previous state afterwards.
    """
    table_name = "default.test_encrypted_spark_read"

    # Configure KMS via py-kms-impl property with the same master keys as Java's UnitestKMS
    kms_properties = {
        "py-kms-impl": "pyiceberg.encryption.kms.InMemoryKms",
        "encryption.kms.key.keyA": b"0123456789012345".hex(),
        "encryption.kms.key.keyB": b"1123456789012345".hex(),
    }

    # Remember what each key held before the test so the catalog object can be
    # put back exactly as it was; the sentinel marks keys that were absent.
    # NOTE(review): the fixture name suggests it is shared between tests, in
    # which case leaving these entries behind would affect later tests.
    catalog_properties = session_catalog.properties
    absent = object()
    previous_values = {key: catalog_properties.get(key, absent) for key in kms_properties}

    for key, value in kms_properties.items():
        catalog_properties[key] = value

    try:
        tbl = session_catalog.load_table(table_name)

        # Verify the table has encryption metadata
        assert tbl.metadata.properties.get("encryption.key-id") == "keyA"
        assert len(tbl.metadata.encryption_keys) > 0, "Expected encryption keys in table metadata"

        # The row assertions below require data, so a current snapshot must
        # exist; asserting it here keeps the key_id check from being skipped.
        current_snapshot_id = tbl.metadata.current_snapshot_id
        assert current_snapshot_id is not None, "Expected a current snapshot on the table"

        snapshot = tbl.metadata.snapshot_by_id(current_snapshot_id)
        assert snapshot is not None
        assert snapshot.key_id is not None, "Expected key_id on snapshot"

        # Read the encrypted data via PyIceberg
        result = tbl.scan().to_arrow()

        # Verify the data matches what Spark wrote
        assert result.num_rows == 3, f"Expected 3 rows, got {result.num_rows}"

        # Sort by id for deterministic comparison
        result = result.sort_by("id")

        ids = result.column("id").to_pylist()
        data = result.column("data").to_pylist()
        values = result.column("value").to_pylist()

        assert ids == [1, 2, 3], f"Expected ids [1,2,3], got {ids}"
        assert data == ["alice", "bob", "charlie"], f"Expected data ['alice','bob','charlie'], got {data}"
        assert values == [1.0, 2.0, 3.0], f"Expected values [1.0,2.0,3.0], got {values}"
    finally:
        # Undo the property changes even when an assertion above fails.
        for key, previous in previous_values.items():
            if previous is absent:
                catalog_properties.pop(key, None)
            else:
                catalog_properties[key] = previous