Fix TSan data race with FFI tests by myrrc · Pull Request #7244 · vortex-data/vortex

There are TSan data race reports like

https://github.com/vortex-data/vortex/actions/runs/23804465237/job/69373850775?pr=7018

Significant parts of the stack trace:

Read of size 8 at 0xffff888084a0 by thread T40:

#8 vx_array_sink_open_file::{closure#0}::{closure#0} vortex-ffi/src/sink.rs:75
#9 <vortex_io::runtime::handle::Handle>::spawn::<vx_array_sink_open_file::{closure#0}::{closure#0}, ...> vortex-io/src/runtime/handle.rs:73
#31 vx_array_sink_close::{closure#0} vortex-ffi/src/sink.rs:110
#33 vx_array_sink_close vortex-ffi/src/sink.rs:106
#34 test_sink_basic_workflow vortex-ffi/src/sink.rs:166

Previous write of size 8 at 0xffff888084a0 by thread T42:

#28 <vortex_io::runtime::current::CurrentThreadRuntime>::block_on::<vx_array_sink_close::{closure#0}::{closure#0}, ...> vortex-io/src/runtime/current.rs:102
#29 vx_array_sink_close::{closure#0} vortex-ffi/src/sink.rs:110
#31 vx_array_sink_close vortex-ffi/src/sink.rs:106
#32 test_sink_multiple_arrays

Location is heap block of size 112 at 0xffff88808490 allocated by thread T42:

#13 <vortex_file::writer::VortexWriteOptions>::write::<&mut async_fs::File, ...>::{closure#0} vortex-file/src/writer.rs:137
#14 vx_array_sink_open_file::{closure#0}::{closure#0} vortex-ffi/src/sink.rs:75
#15 <vortex_io::runtime::handle::Handle>::spawn::<vx_array_sink_open_file::{closure#0}::{closure#0}, ...>::{closure#0} vortex-io/src/runtime/handle.rs:73
#37 vx_array_sink_close::{closure#0} vortex-ffi/src/sink.rs:110
#40 test_sink_multiple_arrays vortex-ffi/src/sink.rs:208

This happens in the sink FFI tests, but can be narrowed down to:

fn test(path: String) {
    let session = VortexSession::default().with_handle(RUNTIME.handle());
    let dtype = DType::Primitive(vortex::dtype::PType::I32, false.into());
    let (mut sink, rx) = mpsc::channel(32);
    let array_stream = ArrayStreamAdapter::new(dtype.clone(), rx.into_stream());

    let array = PrimitiveArray::new(buffer![3i32], Validity::NonNullable);
    RUNTIME.block_on(async move {
        sink.send(Ok(array.into())).await.unwrap();
        sink.close_channel();

        let mut file = File::create(path).await.unwrap();
        session
            .write_options()
            .write(&mut file, array_stream)
            .await.unwrap();
        file.sync_all()
            .await
            .map_err(|e| vortex_err!("sync error: {e}")).unwrap();
    });
}
#[test] fn test_f1() { test("1".to_string()); }
#[test] fn test_f2() { test("2".to_string()); }

The underlying issue is that RUNTIME in the FFI crate is a smol::Executor. Because we
don't have a background thread pool and we use CurrentThreadRuntime, tasks run on the
host program's threads, in our case cargo test's worker threads. If test_f1 and
test_f2 are scheduled on different threads, this triggers TSan in vortex-io's spawn()
(case 1, oneshot) and in vortex-file's write_internal (case 2, kanal::bounded_async),
neither of which protects its state against consumers reading from multiple different
threads.

The solution here is to benchmark replacing oneshot with the futures-rs oneshot
channel.