Skip to content

Commit 7b8cf23

Browse files
committed
Use distinct worker ID + pre-queuing validation
1 parent 89a2d7e commit 7b8cf23

File tree

7 files changed

+44
-9
lines changed

7 files changed

+44
-9
lines changed

.devcontainer/.dev_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ kafka_enable_dlq: True
88
input_config_path: input_dummy.yaml
99
original_aem_pack_topic: original-aempacks
1010
derived_aem_pack_topic: derived-aempacks
11+
worker_id: test-worker

src/ets/adapters/outbound/incoming_aem_pack_queue.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ def __init__(
3737
self,
3838
*,
3939
collection: AsyncCollection,
40-
service_instance_id: str,
40+
worker_id: str,
4141
):
4242
self._collection = collection
43-
self._service_instance_id = service_instance_id
43+
self._worker_id = worker_id
4444

4545
async def queue(self, aem_pack: AEMPack) -> None:
4646
"""Upsert an AEMPack into the queue."""
@@ -87,15 +87,15 @@ async def claim_next(self) -> IncomingAEMPack | None:
8787
# Check for packs abandoned by a previous crash of this instance
8888
doc = await self._collection.find_one(
8989
{
90-
PROCESSOR_FIELD: self._service_instance_id,
90+
PROCESSOR_FIELD: self._worker_id,
9191
PROCESSED_AT_FIELD: None,
9292
}
9393
)
9494
if not doc:
9595
# No abandoned packs; try to claim a fresh one
9696
doc = await self._collection.find_one_and_update(
9797
filter={PROCESSOR_FIELD: None, PROCESSED_AT_FIELD: None},
98-
update={"$set": {PROCESSOR_FIELD: self._service_instance_id}},
98+
update={"$set": {PROCESSOR_FIELD: self._worker_id}},
9999
return_document=ReturnDocument.AFTER,
100100
)
101101
if not doc:
@@ -107,7 +107,7 @@ async def claim_next(self) -> IncomingAEMPack | None:
107107
},
108108
update={
109109
"$set": {
110-
PROCESSOR_FIELD: self._service_instance_id,
110+
PROCESSOR_FIELD: self._worker_id,
111111
PROCESSED_AT_FIELD: None,
112112
NEEDS_REPROCESSING_FIELD: False,
113113
}

src/ets/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,7 @@ class Config(
4545
default=60,
4646
description="Seconds to sleep when no unprocessed AEMPacks are found.",
4747
)
48+
worker_id: str = Field(
49+
default=...,
50+
description="Unique identifier for a service instance used specifically for the reclamation logic.",
51+
)

src/ets/core/aem_pack_registry.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
from metldata import get_transformation_registry
2525
from metldata.transform.handling import TransformationHandler
2626
from pydantic import UUID4, BaseModel, ConfigDict
27+
from schemapack import SchemaPackValidator
28+
from schemapack.exceptions import ValidationError
2729
from schemapack.spec.datapack import DataPack
2830
from schemapack.spec.schemapack import SchemaPack
2931

@@ -64,6 +66,30 @@ def __init__(
6466

6567
async def queue_unprocessed(self, aem_pack: AEMPack):
6668
"""Fetch new AEMPacks via event subscriber and put them into the queue for processing."""
69+
# For now, just load. Reloading will be dealt with in the config lock + update ticket
70+
config = await self._config_loader.load_config_from_db()
71+
72+
# Ensure model name exists
73+
matching_model = None
74+
for model in config.models:
75+
if model.name == aem_pack.model_name:
76+
matching_model = model
77+
break
78+
else:
79+
model_lookup_error = ValueError(
80+
f"No model with name {aem_pack.model_name} registered for AEMPack with id {aem_pack.id}."
81+
)
82+
log.error(model_lookup_error)
83+
raise model_lookup_error
84+
85+
# Validate DataPack against corresponding SchemaPack
86+
validator = SchemaPackValidator(schemapack=matching_model.schema_)
87+
try:
88+
validator.validate(datapack=aem_pack.data)
89+
except ValidationError as error:
90+
log.error(error)
91+
raise
92+
6793
await self._incoming_aem_pack_queue.queue(aem_pack)
6894

6995
async def process_aem_packs(self) -> None:
@@ -146,9 +172,12 @@ def _traverse_graph(
146172
routes_by_input.setdefault(route.input_model_name, []).append(route)
147173

148174
if not models_by_name.get(incoming.model_name):
149-
raise ValueError(
175+
model_lookup_error = ValueError(
150176
f"No model with name {incoming.model_name} registered for AEMPack with id {incoming.id}."
177+
+ "This means a previously existing model vanished, which should not happen."
151178
)
179+
log.critical(model_lookup_error)
180+
raise model_lookup_error
152181

153182
# Relies on dict insertion-order guarantee.
154183
# Avoid copying or re-sorting this dict, as that would break the traversal order.

src/ets/inject.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ async def prepare_aem_pack_registry(
9494
)
9595
incoming_aem_pack_queue = IncomingAEMPackQueue(
9696
collection=mongo_client[config.db_name][INCOMING_AEM_PACK_COLLECTION],
97-
service_instance_id=config.service_instance_id,
97+
worker_id=config.worker_id,
9898
)
9999

100100
yield AEMPackRegistry(

tests/fixtures/test_config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ kafka_enable_dlq: True
77
input_config_path: tests/fixtures/input_configs/manager/valid/basic_config.yaml
88
original_aem_pack_topic: original-aempacks
99
derived_aem_pack_topic: derived-aempacks
10+
worker_id: test-worker

tests/test_aem_pack_registry/test_queue_and_claim.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ async def test_abandoned_pack_reclaimed_by_same_instance(joint_fixture: JointFix
104104
# Simulate a previous crash: the doc is already claimed by this instance
105105
await joint_fixture.incoming_aem_pack_collection.update_one(
106106
{"_id": ingress.id},
107-
{"$set": {PROCESSOR_FIELD: joint_fixture.config.service_instance_id}},
107+
{"$set": {PROCESSOR_FIELD: joint_fixture.config.worker_id}},
108108
)
109109

110110
await _assert_pack_claimed_during_processing(registry, ingress.id)
@@ -184,7 +184,7 @@ async def test_concurrent_queue_publishes_and_leaves_for_reprocessing(
184184
# Processor is preserved so in-flight instance can complete; needs_reprocessing signals v2 is pending
185185
raw = await joint_fixture.incoming_aem_pack_collection.find_one({"_id": aem_id})
186186
assert raw is not None
187-
assert raw["processor"] == joint_fixture.config.service_instance_id
187+
assert raw["processor"] == joint_fixture.config.worker_id
188188
assert raw["needs_reprocessing"] is True
189189

190190
# Process v1 — should still publish

0 commit comments

Comments
 (0)