support catalog queries by houqp · Pull Request #258 · roapi/roapi

@@ -1,32 +1,35 @@ use std::convert::TryFrom; use crate::io::BlobStoreType; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc;
use datafusion::arrow; use datafusion::arrow::array::as_string_array; use datafusion::arrow::array::StringArray; use datafusion::datasource::object_store::{ObjectStoreRegistry, ObjectStoreProvider}; use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry}; use datafusion::error::{DataFusionError, Result as DatafusionResult}; pub use datafusion::execution::context::SessionConfig; use datafusion::execution::context::SessionContext; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::collect;
use object_store::aws::AmazonS3Builder; use object_store::gcp::GoogleCloudStorageBuilder; use object_store::azure::MicrosoftAzureBuilder; use crate::error::{ColumnQError, QueryError}; use crate::query; use crate::table::{self, KeyValueSource, TableSource}; use object_store::aws::AmazonS3Builder; use object_store::azure::MicrosoftAzureBuilder; use object_store::gcp::GoogleCloudStorageBuilder; use url::Url;
pub struct ColumnQObjectStoreProvider {} impl ObjectStoreProvider for ColumnQObjectStoreProvider { fn get_by_url(&self, url: &Url) -> DatafusionResult<Arc<dyn object_store::ObjectStore>> { match url.host_str() { None => Err(DataFusionError::Execution(format!("Missing bucket name: {}", url.as_str()))), None => Err(DataFusionError::Execution(format!( "Missing bucket name: {}", url.as_str() ))), Some(host) => { let url_schema = url.scheme(); match BlobStoreType::try_from(url_schema) { Expand All @@ -41,28 +44,28 @@ impl ObjectStoreProvider for ColumnQObjectStoreProvider { Ok(s3) => Ok(Arc::new(s3)), Err(err) => Err(DataFusionError::External(Box::new(err))), } }, } BlobStoreType::GCS => { let gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(host); let gcs_builder = GoogleCloudStorageBuilder::from_env().with_bucket_name(host); match gcs_builder.build() { Ok(gcs) => Ok(Arc::new(gcs)), Err(err) => Err(DataFusionError::External(Box::new(err))), }
}, } BlobStoreType::Azure => { let azure_builder = MicrosoftAzureBuilder::from_env().with_container_name(host); let azure_builder = MicrosoftAzureBuilder::from_env().with_container_name(host); match azure_builder.build() { Ok(azure) => Ok(Arc::new(azure)), Err(err) => Err(DataFusionError::External(Box::new(err))), } }, } _ => Err(DataFusionError::Execution(format!( "Unsupported scheme: {url_schema}" ))), } }, }
} } } Expand All @@ -76,13 +79,15 @@ pub struct ColumnQ {
impl ColumnQ { pub fn new() -> Self { Self::new_with_config(SessionConfig::default()) Self::new_with_config(SessionConfig::from_env().with_information_schema(true)) }
pub fn new_with_config(config: SessionConfig) -> Self { let object_store_provider = ColumnQObjectStoreProvider {}; let object_store_registry = ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider))); let rn_config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); let object_store_registry = ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider))); let rn_config = RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)); let runtime_env = RuntimeEnv::new(rn_config).unwrap(); let dfctx = SessionContext::with_config_rt(config, Arc::new(runtime_env));
Expand Down Expand Up @@ -204,10 +209,10 @@ impl Default for ColumnQ {
#[cfg(test)] mod tests { use tempfile::Builder; use std::fs::File; use std::io::Write; use std::{env, str::FromStr}; use std::fs::File; use tempfile::Builder;
use datafusion::datasource::object_store::ObjectStoreProvider; use url::Url; Expand All @@ -225,8 +230,7 @@ mod tests { assert!(err.to_string().contains("Generic S3 error: Missing region"));
env::set_var("AWS_REGION", "us-east-1"); let res = provider .get_by_url(&Url::from_str(host_url).unwrap()); let res = provider.get_by_url(&Url::from_str(host_url).unwrap()); let msg = match res { Err(e) => format!("{e}"), Ok(_) => "".to_string(), Expand All @@ -251,16 +255,16 @@ mod tests { let host_url = "gs://bucket_name/path"; let provider = ColumnQObjectStoreProvider {};
let tmp_dir = Builder::new() .prefix("columnq.test.gcs") .tempdir()?; let tmp_dir = Builder::new().prefix("columnq.test.gcs").tempdir()?; let tmp_gcs_path = tmp_dir.path().join("service_account.json"); let mut tmp_gcs = File::create(tmp_gcs_path.clone())?; writeln!(tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"#)?; writeln!( tmp_gcs, r#"{{"gcs_base_url": "http://localhost:4443", "disable_oauth": true, "client_email": "", "private_key": ""}}"# )?; env::set_var("GOOGLE_SERVICE_ACCOUNT", tmp_gcs_path);
let res = provider .get_by_url(&Url::from_str(host_url).unwrap()); let res = provider.get_by_url(&Url::from_str(host_url).unwrap()); let msg = match res { Err(e) => format!("{e}"), Ok(_) => "".to_string(), Expand All @@ -281,8 +285,7 @@ mod tests { env::set_var("AZURE_STORAGE_ACCOUNT_NAME", "devstoreaccount1"); env::set_var("AZURE_STORAGE_ACCOUNT_KEY", "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==");
let res = provider .get_by_url(&Url::from_str(host_url).unwrap()); let res = provider.get_by_url(&Url::from_str(host_url).unwrap()); let msg = match res { Err(e) => format!("{e}"), Ok(_) => "".to_string(), Expand Down