Ik zit met een, volgens mij, eerder conceptuele vraag dan een praktische. Het scenario is relatief simpel en ik kan zelf ook een aantal oplossing bedenken, maar ik ben eigenlijk op zoek naar verschillende meningen en/of praktische ervaring. Bekijk het als een poging om mezelf te verbeteren en mijn horizon te verbreden.
Laat ons beginnen met een beetje context: Ik heb verschillende bronnen die data aanleveren. Elke bron levert andere data aan. De bedoeling is om data uit elke bron te transformeren tot één enkele output. Het komt dus eigenlijk simpelweg neer op een ETL proces.
Om dit te faciliteren heb ik in assembly A de volgende interfaces en classes.
(Let op, dit is versimpelde representatie puur om het probleem te schetsen)
Alles heeft zo mooi zijn eigen verantwoordelijkheid. Ook kan ik zo met behulp van dependency injection heel eenvoudig een nieuw ETL processing job maken door gewoon een andere implementatie te voorzien voor beide interfaces. Dit is dan per bron een eigen assembly.
Het feitelijke probleem treed op wanneer we spreken van een implementatie van IItemExtractor waarbij we gebruik maken van bijvoorbeeld een Azure Storage Queue.
In dit geval moet elk bericht dat van de queue gehaald wordt ook expliciet verwijderd worden, anders blijft dit "oneindig" lang bestaan en bijgevolg meerdere keren verwerkt worden.
MAAR nu is de vereiste uiteraard dat dit enkel zou mogen wanneer zeker is dat het bericht van de queue met succes getransformeerd én geladen is.
De mogelijkheden die ik kan bedenken om dit op een zo generiek mogelijke manier te voorzien zonder al te veel overhead te forceren daar waar niet nodig:
1. Een interface IPostProcessable voorzien en dan met pattern matching potentieel de PostProcess methode aan te roepen.
De Run methode zou dan zoiets worden:
2. Een interface IPostProcessor voorzien, net zoals de IItemExtractor en IItemTransformer, die gewoonweg een implementatie vereist.
De ProcessingJob zou dan zoiets worden:
3. De Extract methode van IItemExtractor zo aanpassen dat deze een IPostProcessable<T> interface teruggeeft die het feitelijke item "wrapped" samen met een PostProcess methode:
De Run methode zou dan zoiets worden:
En dan nu de vraag: Welke oplossing is volgens jullie de beste en waarom? Of is er nog een andere manier die mij een Eureka momentje kan bezorgen?
Ik ben oprecht benieuwd naar jullie inzichten, waarvoor mijn dank bij voorbaat!

Laat ons beginnen met een beetje context: Ik heb verschillende bronnen die data aanleveren. Elke bron levert andere data aan. De bedoeling is om data uit elke bron te transformeren tot één enkele output. Het komt dus eigenlijk simpelweg neer op een ETL proces.
Om dit te faciliteren heb ik in assembly A de volgende interfaces en classes.
(Let op, dit is versimpelde representatie puur om het probleem te schetsen)
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| public interface IItemExtractor<T> { IAsyncEnumerable<T> Extract(CancellationToken cancellationToken = default); } public interface IItemTransformer<T> { ValueTask<MyTargetObject> Transform(T item, CancellationToken cancellationToken = default); } public class MyTargetObject { public int MyProperty { get; set; } public string SomethingElse { get; set; } } public class ProcessingJob<T> { private readonly IItemExtractor<T> loader; private readonly IItemTransformer<T> transformer; public ProcessingJob(IItemExtractor<T> loader, IItemTransformer<T> transformer) { this.loader = loader; this.transformer = transformer; } public async Task Run(CancellationToken cancellationToken) { await foreach (var item in this.loader.Extract(cancellationToken)) { var transformedItem = await this.transformer.Transform(item, cancellationToken); await Load(transformedItem); } } private Task Load(MyTargetObject transformedItem) { // Imagine an actual implementation here. Yes this belongs in a separate class, I know. This is only for explanatory purposes! :) return Task.CompletedTask; } } |
Alles heeft zo mooi zijn eigen verantwoordelijkheid. Ook kan ik zo met behulp van dependency injection heel eenvoudig een nieuw ETL processing job maken door gewoon een andere implementatie te voorzien voor beide interfaces. Dit is dan per bron een eigen assembly.
Het feitelijke probleem treed op wanneer we spreken van een implementatie van IItemExtractor waarbij we gebruik maken van bijvoorbeeld een Azure Storage Queue.
In dit geval moet elk bericht dat van de queue gehaald wordt ook expliciet verwijderd worden, anders blijft dit "oneindig" lang bestaan en bijgevolg meerdere keren verwerkt worden.
MAAR nu is de vereiste uiteraard dat dit enkel zou mogen wanneer zeker is dat het bericht van de queue met succes getransformeerd én geladen is.
De mogelijkheden die ik kan bedenken om dit op een zo generiek mogelijke manier te voorzien zonder al te veel overhead te forceren daar waar niet nodig:
1. Een interface IPostProcessable voorzien en dan met pattern matching potentieel de PostProcess methode aan te roepen.
C#:
1
2
3
4
| public interface IPostProcessable { Task PostProcess(CancellationToken cancellationToken); } |
De Run methode zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
| public async Task Run(CancellationToken cancellationToken) { await foreach (var item in this.loader.Extract(cancellationToken)) { var transformedItem = await this.transformer.Transform(item, cancellationToken); await Load(transformedItem); if(item is IPostProcessable postProcessable) { await postProcessable.PostProcess(cancellationToken); } } } |
2. Een interface IPostProcessor voorzien, net zoals de IItemExtractor en IItemTransformer, die gewoonweg een implementatie vereist.
C#:
1
2
3
4
| public interface IPostProcessor<T> { Task PostProcess(T item, CancellationToken cancellationToken = default); } |
De ProcessingJob zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| public class ProcessingJob<T> { private readonly IItemExtractor<T> loader; private readonly IItemTransformer<T> transformer; private readonly IItemPostProcessor<T> postProcessor; public ProcessingJob(IItemExtractor<T> loader, IItemTransformer<T> transformer, IItemPostProcessor<T> postProcessor) { this.loader = loader; this.transformer = transformer; this.postProcessor = postProcessor; } public async Task Run(CancellationToken cancellationToken) { await foreach (var item in this.loader.Extract(cancellationToken)) { var transformedItem = await this.transformer.Transform(item, cancellationToken); await Load(transformedItem); await this.postProcessor.PostProcess(item, cancellationToken); } } private Task Load(MyTargetObject transformedItem) { // Imagine an actual implementation here. Yes this belongs in a separate class, I know. This // is only for explanatory purposes! :) return Task.CompletedTask; } } |
3. De Extract methode van IItemExtractor zo aanpassen dat deze een IPostProcessable<T> interface teruggeeft die het feitelijke item "wrapped" samen met een PostProcess methode:
C#:
1
2
3
4
5
6
7
8
9
10
11
| public interface IPostProcessable<T> { T Value { get; } Task PostProcess(CancellationToken cancellation = default); } public interface IItemExtractor<T> { IAsyncEnumerable<IPostProcessable<T>> Extract(CancellationToken cancellationToken = default); } |
De Run methode zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
| public async Task Run(CancellationToken cancellationToken) { await foreach (var item in this.loader.Extract(cancellationToken)) { var transformedItem = await this.transformer.Transform(item.Value, cancellationToken); await Load(transformedItem); await item.PostProcess(cancellationToken); } } |
En dan nu de vraag: Welke oplossing is volgens jullie de beste en waarom? Of is er nog een andere manier die mij een Eureka momentje kan bezorgen?
Ik ben oprecht benieuwd naar jullie inzichten, waarvoor mijn dank bij voorbaat!