FEAT: Complex Data Type Support - DATETIMEOFFSET by gargsaumya · Pull Request #243 · microsoft/mssql-python
Expand Up
@@ -9,7 +9,7 @@
"""
import pytest from datetime import datetime, date, time from datetime import datetime, date, time, timedelta, timezone import time as time_module import decimal from contextlib import closing Expand Down Expand Up @@ -6472,7 +6472,7 @@ def test_only_null_and_empty_binary(cursor, db_connection): finally: drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit()
# ---------------------- VARCHAR(MAX) ----------------------
def test_varcharmax_short_fetch(cursor, db_connection): Expand Down Expand Up @@ -7560,6 +7560,169 @@ def test_decimal_separator_calculations(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") db_connection.commit()
def test_datetimeoffset_read_write(cursor, db_connection): """Test reading and writing timezone-aware DATETIMEOFFSET values.""" try: test_cases = [ # Valid timezone-aware datetimes datetime(2023, 10, 26, 10, 30, 0, tzinfo=timezone(timedelta(hours=5, minutes=30))), datetime(2023, 10, 27, 15, 45, 10, 123456, tzinfo=timezone(timedelta(hours=-8))), datetime(2023, 10, 28, 20, 0, 5, 987654, tzinfo=timezone.utc) ]
cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for i, dt in enumerate(test_cases): cursor.execute(insert_stmt, i, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;") for i, dt in enumerate(test_cases): row = cursor.fetchone() assert row is not None fetched_id, fetched_dt = row assert fetched_dt.tzinfo is not None expected_utc = dt.astimezone(timezone.utc) fetched_utc = fetched_dt.astimezone(timezone.utc) # Ignore sub-microsecond differences expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000) fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000) assert fetched_utc == expected_utc finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_max_min_offsets(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET with maximum and minimum allowed offsets (+14:00 and -14:00). Uses fetchone() for retrieval. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
test_cases = [ (1, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=14)))), # max offset (2, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=-14)))), # min offset ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for row_id, dt in test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;")
for expected_id, expected_dt in test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare in UTC to avoid offset differences expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_invalid_offsets(cursor, db_connection): """Verify driver rejects offsets beyond ±14 hours.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_invalid_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 1, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=15))))
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 2, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=-15)))) finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_invalid_offsets;") db_connection.commit()
def test_datetimeoffset_dst_transitions(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET values around DST transitions. Ensures that driver handles DST correctly and does not crash. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_dst_transitions (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
# Example DST transition dates (replace with actual region offset if needed) dst_test_cases = [ (1, datetime(2025, 3, 9, 1, 59, 59, tzinfo=timezone(timedelta(hours=-5)))), # Just before spring forward (2, datetime(2025, 3, 9, 3, 0, 0, tzinfo=timezone(timedelta(hours=-4)))), # Just after spring forward (3, datetime(2025, 11, 2, 1, 59, 59, tzinfo=timezone(timedelta(hours=-4)))), # Just before fall back (4, datetime(2025, 11, 2, 1, 0, 0, tzinfo=timezone(timedelta(hours=-5)))), # Just after fall back ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_dst_transitions (id, dto_column) VALUES (?, ?);" for row_id, dt in dst_test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_dst_transitions ORDER BY id;")
for expected_id, expected_dt in dst_test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare UTC time to avoid issues due to offsets changing in DST expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;") db_connection.commit()
def test_datetimeoffset_leap_second(cursor, db_connection): """Ensure driver handles leap-second-like microsecond edge cases without crashing.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_leap_second (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
leap_second_sim = datetime(2023, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) cursor.execute("INSERT INTO #pytest_datetimeoffset_leap_second (id, dto_column) VALUES (?, ?);", 1, leap_second_sim) db_connection.commit()
row = cursor.execute("SELECT dto_column FROM #pytest_datetimeoffset_leap_second;").fetchone() assert row[0].tzinfo is not None finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_leap_second;") db_connection.commit()
def test_datetimeoffset_malformed_input(cursor, db_connection): """Verify driver raises error for invalid datetimeoffset strings.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_malformed_input (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_malformed_input (id, dto_column) VALUES (?, ?);", 1, "2023-13-45 25:61:00 +99:99") # invalid string finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_malformed_input;") db_connection.commit()
def test_lowercase_attribute(cursor, db_connection): """Test that the lowercase attribute properly converts column names to lowercase"""
Expand Down
import pytest from datetime import datetime, date, time from datetime import datetime, date, time, timedelta, timezone import time as time_module import decimal from contextlib import closing Expand Down Expand Up @@ -6472,7 +6472,7 @@ def test_only_null_and_empty_binary(cursor, db_connection): finally: drop_table_if_exists(cursor, "#pytest_null_empty_binary") db_connection.commit()
# ---------------------- VARCHAR(MAX) ----------------------
def test_varcharmax_short_fetch(cursor, db_connection): Expand Down Expand Up @@ -7560,6 +7560,169 @@ def test_decimal_separator_calculations(cursor, db_connection): cursor.execute("DROP TABLE IF EXISTS #pytest_decimal_calc_test") db_connection.commit()
def test_datetimeoffset_read_write(cursor, db_connection): """Test reading and writing timezone-aware DATETIMEOFFSET values.""" try: test_cases = [ # Valid timezone-aware datetimes datetime(2023, 10, 26, 10, 30, 0, tzinfo=timezone(timedelta(hours=5, minutes=30))), datetime(2023, 10, 27, 15, 45, 10, 123456, tzinfo=timezone(timedelta(hours=-8))), datetime(2023, 10, 28, 20, 0, 5, 987654, tzinfo=timezone.utc) ]
cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for i, dt in enumerate(test_cases): cursor.execute(insert_stmt, i, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;") for i, dt in enumerate(test_cases): row = cursor.fetchone() assert row is not None fetched_id, fetched_dt = row assert fetched_dt.tzinfo is not None expected_utc = dt.astimezone(timezone.utc) fetched_utc = fetched_dt.astimezone(timezone.utc) # Ignore sub-microsecond differences expected_utc = expected_utc.replace(microsecond=int(expected_utc.microsecond / 1000) * 1000) fetched_utc = fetched_utc.replace(microsecond=int(fetched_utc.microsecond / 1000) * 1000) assert fetched_utc == expected_utc finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_max_min_offsets(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET with maximum and minimum allowed offsets (+14:00 and -14:00). Uses fetchone() for retrieval. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_read_write (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
test_cases = [ (1, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=14)))), # max offset (2, datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone(timedelta(hours=-14)))), # min offset ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_read_write (id, dto_column) VALUES (?, ?);" for row_id, dt in test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_read_write ORDER BY id;")
for expected_id, expected_dt in test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare in UTC to avoid offset differences expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_read_write;") db_connection.commit()
def test_datetimeoffset_invalid_offsets(cursor, db_connection): """Verify driver rejects offsets beyond ±14 hours.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_invalid_offsets (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 1, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=15))))
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_invalid_offsets (id, dto_column) VALUES (?, ?);", 2, datetime(2025, 1, 1, 12, 0, tzinfo=timezone(timedelta(hours=-15)))) finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_invalid_offsets;") db_connection.commit()
def test_datetimeoffset_dst_transitions(cursor, db_connection): """ Test inserting and retrieving DATETIMEOFFSET values around DST transitions. Ensures that driver handles DST correctly and does not crash. """ try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_dst_transitions (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
# Example DST transition dates (replace with actual region offset if needed) dst_test_cases = [ (1, datetime(2025, 3, 9, 1, 59, 59, tzinfo=timezone(timedelta(hours=-5)))), # Just before spring forward (2, datetime(2025, 3, 9, 3, 0, 0, tzinfo=timezone(timedelta(hours=-4)))), # Just after spring forward (3, datetime(2025, 11, 2, 1, 59, 59, tzinfo=timezone(timedelta(hours=-4)))), # Just before fall back (4, datetime(2025, 11, 2, 1, 0, 0, tzinfo=timezone(timedelta(hours=-5)))), # Just after fall back ]
insert_stmt = "INSERT INTO #pytest_datetimeoffset_dst_transitions (id, dto_column) VALUES (?, ?);" for row_id, dt in dst_test_cases: cursor.execute(insert_stmt, row_id, dt) db_connection.commit()
cursor.execute("SELECT id, dto_column FROM #pytest_datetimeoffset_dst_transitions ORDER BY id;")
for expected_id, expected_dt in dst_test_cases: row = cursor.fetchone() assert row is not None, f"No row fetched for id {expected_id}." fetched_id, fetched_dt = row
assert fetched_id == expected_id, f"ID mismatch: expected {expected_id}, got {fetched_id}" assert fetched_dt.tzinfo is not None, f"Fetched datetime object is naive for id {fetched_id}"
# Compare UTC time to avoid issues due to offsets changing in DST expected_utc = expected_dt.astimezone(timezone.utc).replace(tzinfo=None) fetched_utc = fetched_dt.astimezone(timezone.utc).replace(tzinfo=None) assert fetched_utc == expected_utc, ( f"Value mismatch for id {expected_id}: expected UTC {expected_utc}, got {fetched_utc}" )
finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_dst_transitions;") db_connection.commit()
def test_datetimeoffset_leap_second(cursor, db_connection): """Ensure driver handles leap-second-like microsecond edge cases without crashing.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_leap_second (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
leap_second_sim = datetime(2023, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc) cursor.execute("INSERT INTO #pytest_datetimeoffset_leap_second (id, dto_column) VALUES (?, ?);", 1, leap_second_sim) db_connection.commit()
row = cursor.execute("SELECT dto_column FROM #pytest_datetimeoffset_leap_second;").fetchone() assert row[0].tzinfo is not None finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_leap_second;") db_connection.commit()
def test_datetimeoffset_malformed_input(cursor, db_connection): """Verify driver raises error for invalid datetimeoffset strings.""" try: cursor.execute("CREATE TABLE #pytest_datetimeoffset_malformed_input (id INT PRIMARY KEY, dto_column DATETIMEOFFSET);") db_connection.commit()
with pytest.raises(Exception): cursor.execute("INSERT INTO #pytest_datetimeoffset_malformed_input (id, dto_column) VALUES (?, ?);", 1, "2023-13-45 25:61:00 +99:99") # invalid string finally: cursor.execute("DROP TABLE IF EXISTS #pytest_datetimeoffset_malformed_input;") db_connection.commit()
def test_lowercase_attribute(cursor, db_connection): """Test that the lowercase attribute properly converts column names to lowercase"""
Expand Down