Skip to content

Commit 6c2ff2b

Browse files
authored
Postgres Canary Test Env (#19)
* Mock data generator for local canary
* Add destination schema and user for transfer
* Update transfer library to use dates
* Fix bool col
* Fixing canary transfer
* Update date types for sql destinations
1 parent a780493 commit 6c2ff2b

File tree

10 files changed

+1253
-13
lines changed

10 files changed

+1253
-13
lines changed

data-transfer/pontoon/pontoon/base.py

Lines changed: 7 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -8,7 +8,7 @@
88
import json
99
import time
1010
from uuid import UUID
11-
from datetime import datetime, timedelta, timezone
11+
from datetime import datetime, timedelta, timezone, date
1212
from decimal import Decimal
1313
from abc import ABC, abstractmethod
1414
from typing import List, Dict, Tuple, Generator, Any
@@ -50,7 +50,7 @@ class Stream:
5050
str: pa.string(),
5151
bool: pa.bool_(),
5252
bytes: pa.binary(),
53-
datetime.date: pa.date32(),
53+
date: pa.date32(),
5454
datetime.time: pa.time64('us'),
5555
datetime: pa.timestamp('us', tz='UTC'),
5656
type(None): pa.null() # NoneType corresponds to NULL
@@ -60,10 +60,12 @@ class Stream:
6060
PY_CONVERSION_MAP = {
6161
UUID: str,
6262
Decimal: float,
63+
dict: str, # JSONB and other dict-like types get converted to string
64+
date: date, # Date type maps to itself (already in PY_TO_PYARROW_MAP)
6365
'TIMESTAMP_NTZ': datetime,
6466
'TIMESTAMP_LTZ': datetime,
6567
'TIMESTAMP_TZ': datetime,
66-
'DATE': datetime.date,
68+
'DATE': date, # DATE string keys map to the Python date type
6769
'TIME': datetime.time
6870
}
6971

@@ -154,6 +156,8 @@ def convert(val):
154156
py_type = type(val)
155157
if py_type is datetime:
156158
return val.astimezone(timezone.utc)
159+
elif py_type is date:
160+
return val # date objects should be passed through as-is
157161
fn = type_map.get(py_type)
158162
return fn(val) if fn else val
159163

data-transfer/pontoon/pontoon/cache/sqlite_cache.py

Lines changed: 37 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import sqlite3
22
import pyarrow as pa
3-
from datetime import datetime
3+
from datetime import datetime, date
44
from typing import List, Dict, Tuple, Generator, Any
55
from pontoon.base import Cache, Namespace, Stream, Record
66

@@ -11,9 +11,17 @@ def adapt_datetime_iso(val):
1111
def convert_datetime(val):
1212
return datetime.fromisoformat(val.decode())
1313

14+
def adapt_date_iso(val):
15+
return val.isoformat()
16+
17+
def convert_date(val):
18+
return date.fromisoformat(val.decode())
19+
1420

1521
sqlite3.register_adapter(datetime, adapt_datetime_iso)
1622
sqlite3.register_converter("datetime", convert_datetime)
23+
sqlite3.register_adapter(date, adapt_date_iso)
24+
sqlite3.register_converter("date", convert_date)
1725

1826

1927
class SqliteCache(Cache):
@@ -48,7 +56,9 @@ def _arrow_to_sqlite_type(arrow_type):
4856
return "BLOB"
4957
elif pa.types.is_boolean(arrow_type):
5058
return "INTEGER"
51-
elif pa.types.is_date(arrow_type) or pa.types.is_timestamp(arrow_type):
59+
elif pa.types.is_date(arrow_type):
60+
return "date"
61+
elif pa.types.is_timestamp(arrow_type):
5262
return "datetime"
5363
elif pa.types.is_decimal(arrow_type):
5464
return "TEXT"
@@ -93,8 +103,31 @@ def _insert_rows_to_stream(self, stream:Stream, records:List[Record]):
93103

94104

95105
def _rows_to_records(self, stream:Stream, rows):
96-
# covert a sqlite row back into a record
97-
return [Record(list(row)) for row in rows]
106+
# convert a sqlite row back into a record with proper type conversion
107+
records = []
108+
for row in rows:
109+
converted_data = []
110+
for i, value in enumerate(row):
111+
# Get the expected type from the schema
112+
expected_type = stream.schema.types[i]
113+
114+
# Convert value based on expected type
115+
if pa.types.is_boolean(expected_type) and isinstance(value, int):
116+
# Convert SQLite integer (0/1) back to boolean
117+
converted_data.append(bool(value))
118+
elif pa.types.is_date(expected_type) and isinstance(value, str):
119+
# Convert date string back to date object
120+
converted_data.append(date.fromisoformat(value))
121+
elif pa.types.is_timestamp(expected_type) and isinstance(value, str):
122+
# Convert datetime string back to datetime object
123+
converted_data.append(datetime.fromisoformat(value))
124+
else:
125+
# Keep the value as-is for other types
126+
converted_data.append(value)
127+
128+
records.append(Record(converted_data))
129+
130+
return records
98131

99132

100133
def write(self, stream:Stream, records:List[Record]):

data-transfer/pontoon/pontoon/destination/postgres_destination.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -141,6 +141,12 @@ def write(self, ds:Dataset, progress_callback = None):
141141
if callable(progress_callback):
142142
progress.subscribe(progress_callback)
143143

144+
# Check if there are any records to process
145+
stream_size = ds.size(stream)
146+
if stream_size == 0:
147+
progress.message("No records to process for this stream")
148+
continue
149+
144150
with self._connect() as conn:
145151
# create target table for the stream if it doesn't exist
146152
table = SQLDestination.create_table_if_not_exists(conn, stream)

data-transfer/pontoon/pontoon/destination/sql_destination.py

Lines changed: 28 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,8 @@ class SQLDestination(Destination):
2424
pa.binary(): String,
2525
pa.bool_(): Boolean,
2626
pa.timestamp('us', tz='UTC'): DateTime(True),
27-
pa.date32(): DateTime,
28-
pa.date64(): DateTime
27+
pa.date32(): Date,
28+
pa.date64(): Date,
2929
}
3030

3131

@@ -39,7 +39,7 @@ class SQLDestination(Destination):
3939
SmallInteger: pa.int64(),
4040
Numeric: pa.float64(),
4141
Boolean: pa.bool_(),
42-
Date: pa.timestamp('us', tz='UTC'),
42+
Date: pa.date32(),
4343
Time: pa.timestamp('us', tz='UTC'),
4444
DateTime: pa.timestamp('us', tz='UTC'),
4545
}
@@ -102,6 +102,29 @@ def table_ddl_to_schema(cols) -> pa.Schema:
102102
return pa.schema(fields)
103103

104104

105+
@staticmethod
106+
def schemas_compatible(stream_schema: pa.Schema, existing_schema: pa.Schema) -> bool:
107+
"""
108+
Compare two schemas for compatibility, ignoring column order.
109+
Returns True if the schemas are compatible (same column names and types).
110+
"""
111+
# Convert schemas to dictionaries for easier comparison
112+
stream_fields = {field.name: field.type for field in stream_schema}
113+
existing_fields = {field.name: field.type for field in existing_schema}
114+
115+
# Check if all columns exist in both schemas with matching types
116+
if set(stream_fields.keys()) != set(existing_fields.keys()):
117+
return False
118+
119+
# Check if all column types match
120+
for col_name, stream_type in stream_fields.items():
121+
existing_type = existing_fields[col_name]
122+
if stream_type != existing_type:
123+
return False
124+
125+
return True
126+
127+
105128
@staticmethod
106129
def create_table_if_not_exists(conn, stream:Stream, override_name:str = None):
107130
# create a table for this stream if it doesn't already exist
@@ -118,8 +141,8 @@ def create_table_if_not_exists(conn, stream:Stream, override_name:str = None):
118141
table = Table(name, metadata_obj, schema=stream.schema_name, autoload_with=insp)
119142
existing_schema = SQLDestination.table_ddl_to_schema(table.columns)
120143

121-
# if not, we can't write to it
122-
if not existing_schema.equals(stream.schema):
144+
# Use flexible schema comparison that ignores column order
145+
if not SQLDestination.schemas_compatible(stream.schema, existing_schema):
123146
raise ValueError(f"Existing schema for stream {name} does not match.")
124147

125148
else:

0 commit comments

Comments
 (0)