feat: Snowflake source. fetch MAX in a single query (#5387) · feast-dev/feast@b49cea1

@@ -245,6 +245,38 @@ def get_table_column_names_and_types(

245245

"The following source:\n" + query + "\n ... is empty"

246246

)

247247248+

high_precision_number_columns = [

249+

col["column_name"]

250+

for col in metadata

251+

if col["type_code"] == 0 and col["scale"] == 0 and col["precision"] > 19

252+

]

253+254+

if high_precision_number_columns:

255+

max_selects = [

256+

f'MAX("{col}") AS "{col}"' for col in high_precision_number_columns

257+

]

258+

query = (

259+

f"SELECT {', '.join(max_selects)} FROM {self.get_table_query_string()}"

260+

)

261+262+

with GetSnowflakeConnection(config.offline_store) as conn:

263+

result = execute_snowflake_statement(conn, query).fetch_pandas_all()

264+265+

for col in high_precision_number_columns:

266+

max_value = result[col].iloc[0]

267+

if max_value is not None:

268+

str_length = len(str(int(max_value)))

269+

for row in metadata:

270+

if row["column_name"] == col:

271+

if str_length <= 9:

272+

row["snowflake_type"] = "NUMBER32"

273+

elif str_length <= 19:

274+

row["snowflake_type"] = "NUMBER64"

275+

else:

276+

raise NotImplementedError(

277+

f"Number in column {col} larger than INT64 is not supported"

278+

)

279+248280

for row in metadata:

249281

if row["type_code"] == 0:

250282

if row["scale"] == 0:

@@ -253,34 +285,7 @@ def get_table_column_names_and_types(

253285

elif row["precision"] <= 18: # max precision size to ensure INT64

254286

row["snowflake_type"] = "NUMBER64"

255287

else:

256-

column = row["column_name"]

257-258-

with GetSnowflakeConnection(config.offline_store) as conn:

259-

query = f'SELECT MAX("{column}") AS "{column}" FROM {self.get_table_query_string()}'

260-

result = execute_snowflake_statement(

261-

conn, query

262-

).fetch_pandas_all()

263-

if (

264-

result.dtypes[column].name

265-

in python_int_to_snowflake_type_map

266-

):

267-

row["snowflake_type"] = python_int_to_snowflake_type_map[

268-

result.dtypes[column].name

269-

]

270-

else:

271-

if len(result) > 0:

272-

max_value = result.iloc[0][0]

273-

if max_value is not None and len(str(max_value)) <= 9:

274-

row["snowflake_type"] = "NUMBER32"

275-

continue

276-

elif (

277-

max_value is not None and len(str(max_value)) <= 18

278-

):

279-

row["snowflake_type"] = "NUMBER64"

280-

continue

281-

raise NotImplementedError(

282-

"NaNs or Numbers larger than INT64 are not supported"

283-

)

288+

continue

284289

else:

285290

row["snowflake_type"] = "NUMBERwSCALE"

286291