FEAT: Complex Data Type Support - DATETIMEOFFSET by gargsaumya · Pull Request #243 · microsoft/mssql-python

Expand Up @@ -9,7 +9,7 @@ """
import pytest from datetime import datetime, date, time from datetime import datetime, date, time, timedelta, timezone import time as time_module import decimal from contextlib import closing Expand Down Expand Up @@ -6472,7 +6472,7 @@ def test_only_null_and_empty_binary(cursor, db_connection): finally: drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit()
# ---------------------- VARCHAR(MAX) ----------------------
def test_varcharmax_short_fetch(cursor, db_connection): Expand Down Expand Up @@ -7560,6 +7560,169 @@ def test_decimal_separator_calculations(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") db_connection.commit()
def test_datetimeoffset_read_write(cursor, db_connection): """Test reading and writing timezone-aware DATETIMEOFFSET values.""" try: test_cases = [ # Valid timezone-aware datetimes datetime(2023, 10, 26, 10, 30, 0, tzinfo=timezone(timedelta(hours=5, minutes=30))), datetime(2023, 10, 27, 15, 45, 10, 123456, tzinfo=timezone(timedelta(hours=-8))), datetime(2023, 10, 28, 20, 0, 5, 987654, tzinfo=timezone.utc) ]
cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for i, dt in enumerate(test_cases): cursor.execute(insert_stmt, i, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;") for i, dt in enumerate(test_cases): row = cursor.fetchone() assert row is not None fetched_id, fetched_dt = row assert fetched_dt.tzinfo is not None expected_utc = dt.astimezone(timezone.utc) fetched_utc = fetched_dt.astimezone(timezone.utc) # Ignore sub-microsecond differences expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000) fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000) assert fetched_utc == expected_utc finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_max_min_offsets(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET with maximum and minimum allowed offsets (+14:00 and -14:00). Uses fetchone() for retrieval. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
test_cases = [ (1, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=14)))), # max offset (2, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=-14)))), # min offset ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for row_id, dt in test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;")
for expected_id, expected_dt in test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare in UTC to avoid offset differences expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_invalid_offsets(cursor, db_connection): """Verify driver rejects offsets beyond ±14 hours.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_invalid_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 1, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=15))))
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 2, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=-15)))) finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_invalid_offsets;") db_connection.commit()
def test_datetimeoffset_dst_transitions(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET values around DST transitions. Ensures that driver handles DST correctly and does not crash. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_dst_transitions (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
# Example DST transition dates (replace with actual region offset if needed) dst_test_cases = [ (1, datetime(2025, 3, 9, 1, 59, 59, tzinfo=timezone(timedelta(hours=-5)))), # Just before spring forward (2, datetime(2025, 3, 9, 3, 0, 0, tzinfo=timezone(timedelta(hours=-4)))), # Just after spring forward (3, datetime(2025, 11, 2, 1, 59, 59, tzinfo=timezone(timedelta(hours=-4)))), # Just before fall back (4, datetime(2025, 11, 2, 1, 0, 0, tzinfo=timezone(timedelta(hours=-5)))), # Just after fall back ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_dst_transitions (id, dto_column) VALUES (?, ?);" for row_id, dt in dst_test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_dst_transitions ORDER BY id;")
for expected_id, expected_dt in dst_test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare UTC time to avoid issues due to offsets changing in DST expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;") db_connection.commit()
def test_datetimeoffset_leap_second(cursor, db_connection): """Ensure driver handles leap-second-like microsecond edge cases without crashing.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_leap_second (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
leap_second_sim = datetime(2023, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) cursor.execute("INSERT INTO #pytest_datetimeoffset_leap_second (id, dto_column) VALUES (?, ?);", 1, leap_second_sim) db_connection.commit()
row = cursor.execute("SELECT dto_column FROM #pytest_datetimeoffset_leap_second;").fetchone() assert row[0].tzinfo is not None finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_leap_second;") db_connection.commit()
def test_datetimeoffset_malformed_input(cursor, db_connection): """Verify driver raises error for invalid datetimeoffset strings.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_malformed_input (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_malformed_input (id, dto_column) VALUES (?, ?);", 1, "2023-13-45 25:61:00 +99:99") # invalid string finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_malformed_input;") db_connection.commit()
def test_lowercase_attribute(cursor, db_connection): """Test that the lowercase attribute properly converts column names to lowercase"""
Expand Down