bump object_store, arrow (52) and datafusion (39) by houqp · Pull Request #341 · roapi/roapi

1,342 changes: 850 additions & 492 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions columnq/Cargo.toml

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "columnq"
version = "0.9.0"
version = "0.9.1"
homepage = "https://github.com/roapi/roapi"
license = "MIT"
authors = ["QP Hou <dave2008713@gmail.com>"]
Expand All @@ -13,9 +13,9 @@ path = "src/lib.rs"
[dependencies]
# pulling arrow-schema manually to enable the serde feature.
# TODO: add serde feature in datafusion to avoid this workaround
arrow-schema = { version = "51", features = ["serde"] }
arrow-schema = { version = "52", features = ["serde"] }

datafusion = "37"
datafusion = "39"
object_store = { version = "0", features = ["aws", "gcp", "azure"] }
percent-encoding = "2.2.0"
url = "2.2"
Expand Down Expand Up @@ -48,15 +48,15 @@ hyper-rustls = { version = "0.25", default-features = false, optional = true }
tokio-postgres = { version = "0.7.8", optional = true }

[dependencies.deltalake]
version = "0.17.3"
version = "0.18.1"
# git = "https://github.com/delta-io/delta-rs.git"
# rev = "63c14b3716428ff65e01404c6f7e62f341c98f05"
features = ["datafusion", "s3", "gcs", "azure"]
default-features = false

[dependencies.connectorx]
git = "https://github.com/roapi/connector-x.git"
rev = "0732ad7efb08fdb4c08793f8942ed2a76406f92a"
rev = "f7ba1c38130e554cdb7dc4e04d7a166e3286d4e7"
version = "0.3.3-alpha.1"
features = ["default", "dst_arrow"]
optional = true
Expand Down

1 change: 1 addition & 0 deletions columnq/src/columnq.rs

Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl ColumnQ {

Ok(())
}

pub fn register_object_storage(
&mut self,
url: &Url,
Expand Down

14 changes: 5 additions & 9 deletions columnq/src/table/csv.rs

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use datafusion::arrow;
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
Expand Down Expand Up @@ -43,18 +42,17 @@ pub async fn to_datafusion_table(
.option
.clone()
.unwrap_or_else(|| TableLoadOption::csv(TableOptionCsv::default()));
if opt
let opt = opt
.as_csv()
.expect("Invalid table format option, expect csv")
.use_memory_table
{
.expect("Invalid table format option, expect csv");
if opt.use_memory_table {
return to_mem_table(t, dfctx).await;
}
let table_url =
ListingTableUrl::parse(t.get_uri_str()).with_context(|_| table::ListingTableUriSnafu {
uri: t.get_uri_str().to_string(),
})?;
let mut options = ListingOptions::new(Arc::new(CsvFormat::default()));
let mut options = ListingOptions::new(Arc::new(opt.as_df_csv_format()));
if let Some(partition_cols) = t.datafusion_partition_cols() {
options = options.with_table_partition_cols(partition_cols)
}
Expand Down Expand Up @@ -96,12 +94,10 @@ pub async fn to_mem_table(
let schema_ref: arrow::datatypes::SchemaRef = match &t.schema {
Some(s) => Arc::new(s.into()),
None => {
let fmt = opt.as_arrow_csv_format();
let schemas = partitions_from_table_source!(
t,
|r| {
let fmt = arrow::csv::reader::Format::default()
.with_delimiter(delimiter)
.with_header(has_header);
let (schema, record_count) = fmt
.infer_schema(r, None)
.context(InferSchemaSnafu)
Expand Down

4 changes: 2 additions & 2 deletions columnq/src/table/excel.rs

Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ mod tests {
infer_schema(&range, &TableOptionExcel::default(), &Some(table_schema)).unwrap();

assert_eq!(
schema.all_fields(),
schema.flattened_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
Expand Down Expand Up @@ -727,7 +727,7 @@ option:
let rb = excel_range_to_record_batch(range, &TableOptionExcel::default(), shema).unwrap();

assert_eq!(
rb.schema().all_fields(),
rb.schema().flattened_fields(),
vec![
&Field::new("float_column", DataType::Float64, true),
&Field::new("integer_column", DataType::Int64, true),
Expand Down

29 changes: 20 additions & 9 deletions columnq/src/table/mod.rs

Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use std::io::Read;
use std::path::Path;
use std::sync::Arc;

use datafusion::datasource::TableProvider;

use datafusion::arrow;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::datasource::TableProvider;
use serde::de::{Deserialize, Deserializer};
use serde_derive::Deserialize;
use snafu::prelude::*;
Expand Down Expand Up @@ -216,6 +216,18 @@ impl TableOptionCsv {
pub fn default_use_memory_table() -> bool {
true
}

pub fn as_arrow_csv_format(&self) -> arrow::csv::reader::Format {
arrow::csv::reader::Format::default()
.with_delimiter(self.delimiter)
.with_header(self.has_header)
}

pub fn as_df_csv_format(&self) -> CsvFormat {
CsvFormat::default()
.with_has_header(self.has_header)
.with_delimiter(self.delimiter)
}
}

impl Default for TableOptionCsv {
Expand Down Expand Up @@ -604,14 +616,13 @@ pub async fn datafusion_get_or_infer_schema(
.expect("Failed to create file url"),
)
.context(InferSchemaSnafu)?;
let inferred_schema = listing_options
.infer_schema(&dfctx.state(), &file_url)
.await
.context(InferSchemaSnafu)?;
schemas.push(
Arc::into_inner(
listing_options
.infer_schema(&dfctx.state(), &file_url)
.await
.context(InferSchemaSnafu)?,
)
.expect("Failed to unwrap schemaref into schema on merge"),
Arc::into_inner(inferred_schema)
.expect("Failed to unwrap schemaref into schema on merge"),
);
}
Arc::new(arrow::datatypes::Schema::try_merge(schemas).context(MergeSchemaSnafu)?)
Expand Down

14 changes: 7 additions & 7 deletions roapi/Cargo.toml

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "roapi"
version = "0.12.0"
version = "0.12.1"
authors = ["QP Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/roapi/roapi"
license = "MIT"
Expand Down Expand Up @@ -45,9 +45,9 @@ thiserror = "1"
snafu = "0"

# flight-sql
arrow-flight = { version = "51", features = ["flight-sql-experimental"] }
arrow-flight = { version = "52", features = ["flight-sql-experimental"] }
tonic = { version = "0.11", features = ["tls"] }
prost = "0"
prost = "0.12"
futures = "0"
# TODO: remove once_cell dependency
once_cell = "*"
Expand All @@ -56,13 +56,13 @@ uuid = "1"

[dependencies.convergence]
version = "0"
git = "https://github.com/returnString/convergence.git"
rev = "8360bb4f6ee3778f4a4951026a2e1a3cdb6f4df7"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"

[dependencies.convergence-arrow]
version = "0"
git = "https://github.com/returnString/convergence.git"
rev = "8360bb4f6ee3778f4a4951026a2e1a3cdb6f4df7"
git = "https://github.com/roapi/convergence.git"
rev = "40c5fca38d83611f6c941c9ffe86b597c2e5851b"

[features]
default = ["rustls", "snmalloc"]
Expand Down

6 changes: 3 additions & 3 deletions roapi/src/server/flight_sql.rs

Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use arrow_flight::sql::{
CommandGetDbSchemas, CommandGetExportedKeys, CommandGetImportedKeys, CommandGetPrimaryKeys,
CommandGetSqlInfo, CommandGetTableTypes, CommandGetTables, CommandGetXdbcTypeInfo,
CommandPreparedStatementQuery, CommandPreparedStatementUpdate, CommandStatementQuery,
CommandStatementSubstraitPlan, CommandStatementUpdate, Nullable, ProstMessageExt, Searchable,
SqlInfo, TicketStatementQuery, XdbcDataType,
CommandStatementSubstraitPlan, CommandStatementUpdate, DoPutPreparedStatementResult, Nullable,
ProstMessageExt, Searchable, SqlInfo, TicketStatementQuery, XdbcDataType,
};
use arrow_flight::{
flight_service_server::FlightService, Action, FlightDescriptor, FlightEndpoint, FlightInfo,
Expand Down Expand Up @@ -800,7 +800,7 @@ impl<H: RoapiContext> FlightSqlService for RoapiFlightSqlService<H> {
&self,
_query: CommandPreparedStatementQuery,
_request: Request<PeekableFlightDataStream>,
) -> Result<Response<<Self as FlightService>::DoPutStream>, Status> {
) -> Result<DoPutPreparedStatementResult, Status> {
Err(Status::unimplemented(
"do_put_prepared_statement_query not implemented",
))
Expand Down

2 changes: 1 addition & 1 deletion roapi/tests/flight_sql_test.rs

Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ async fn test_flight_sql_get_tables() {
let schema_bytes = schema_arr.value(0);
let schema = try_schema_from_ipc_buffer(schema_bytes).expect("Invalid schema data");
assert_eq!(
schema.all_fields(),
schema.flattened_fields(),
vec![
&Field::new("city", DataType::Utf8, true),
&Field::new("lat", DataType::Float64, true),
Expand Down

11 changes: 9 additions & 2 deletions roapi/tests/postgres_test.rs

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ async fn test_postgres_count() {
.unwrap();

match &rows[0] {
tokio_postgres::SimpleQueryMessage::RowDescription(_) => {}
_ => {
panic!("expect row description from query result.");
}
}

match &rows[1] {
tokio_postgres::SimpleQueryMessage::Row(row) => {
assert_eq!(row.get(0).unwrap(), "132");
}
Expand All @@ -34,7 +41,7 @@ async fn test_postgres_count() {
}
}

match &rows[1] {
match &rows[2] {
tokio_postgres::SimpleQueryMessage::CommandComplete(modified) => {
assert_eq!(modified, &1);
}
Expand All @@ -43,5 +50,5 @@ async fn test_postgres_count() {
}
}

assert_eq!(rows.len(), 2);
assert_eq!(rows.len(), 3);
}