Toon posts:

[C#] Wat is de beste implementatie?

Pagina: 1
Acties:

Onderwerpen

Vraag


  • Diumelia
  • Registratie: Augustus 2010
  • Laatst online: 05-02 13:18
Ik zit met een, volgens mij, eerder conceptuele vraag dan een praktische. Het scenario is relatief simpel en ik kan zelf ook een aantal oplossing bedenken, maar ik ben eigenlijk op zoek naar verschillende meningen en/of praktische ervaring. Bekijk het als een poging om mezelf te verbeteren en mijn horizon te verbreden. :)

Laat ons beginnen met een beetje context: Ik heb verschillende bronnen die data aanleveren. Elke bron levert andere data aan. De bedoeling is om data uit elke bron te transformeren tot één enkele output. Het komt dus eigenlijk simpelweg neer op een ETL proces.

Om dit te faciliteren heb ik in assembly A de volgende interfaces en classes.
(Let op, dit is versimpelde representatie puur om het probleem te schetsen)
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public interface IItemExtractor<T>
{
    IAsyncEnumerable<T> Extract(CancellationToken cancellationToken = default);
}

public interface IItemTransformer<T>
{
    ValueTask<MyTargetObject> Transform(T item, CancellationToken cancellationToken = default);
}

public class MyTargetObject
{
    public int MyProperty { get; set; }
    public string SomethingElse { get; set; }
}

public class ProcessingJob<T>
{
    private readonly IItemExtractor<T> loader;
    private readonly IItemTransformer<T> transformer;

    public ProcessingJob(IItemExtractor<T> loader, IItemTransformer<T> transformer)
    {
        this.loader = loader;
        this.transformer = transformer;
    }

    public async Task Run(CancellationToken cancellationToken)
    {
        await foreach (var item in this.loader.Extract(cancellationToken))
        {
            var transformedItem = await this.transformer.Transform(item, cancellationToken);
            await Load(transformedItem);
        }
    }

    private Task Load(MyTargetObject transformedItem)
    {
        // Imagine an actual implementation here. Yes this belongs in a separate class, I know. This is only for explanatory purposes! :)
        return Task.CompletedTask;
    }
}


Alles heeft zo mooi zijn eigen verantwoordelijkheid. Ook kan ik zo met behulp van dependency injection heel eenvoudig een nieuw ETL processing job maken door gewoon een andere implementatie te voorzien voor beide interfaces. Dit is dan per bron een eigen assembly.

Het feitelijke probleem treed op wanneer we spreken van een implementatie van IItemExtractor waarbij we gebruik maken van bijvoorbeeld een Azure Storage Queue.
In dit geval moet elk bericht dat van de queue gehaald wordt ook expliciet verwijderd worden, anders blijft dit "oneindig" lang bestaan en bijgevolg meerdere keren verwerkt worden.
MAAR nu is de vereiste uiteraard dat dit enkel zou mogen wanneer zeker is dat het bericht van de queue met succes getransformeerd én geladen is.

De mogelijkheden die ik kan bedenken om dit op een zo generiek mogelijke manier te voorzien zonder al te veel overhead te forceren daar waar niet nodig:

1. Een interface IPostProcessable voorzien en dan met pattern matching potentieel de PostProcess methode aan te roepen.
C#:
1
2
3
4
public interface IPostProcessable
{
    Task PostProcess(CancellationToken cancellationToken);
}


De Run methode zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
public async Task Run(CancellationToken cancellationToken)
    {
        await foreach (var item in this.loader.Extract(cancellationToken))
        {
            var transformedItem = await this.transformer.Transform(item, cancellationToken);
            await Load(transformedItem);

            if(item is IPostProcessable postProcessable)
            {
                await postProcessable.PostProcess(cancellationToken);
            }
        }
    }


2. Een interface IPostProcessor voorzien, net zoals de IItemExtractor en IItemTransformer, die gewoonweg een implementatie vereist.
C#:
1
2
3
4
public interface IPostProcessor<T>
{
    Task PostProcess(T item, CancellationToken cancellationToken = default);
}


De ProcessingJob zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ProcessingJob<T>
{
    private readonly IItemExtractor<T> loader;
    private readonly IItemTransformer<T> transformer;
    private readonly IItemPostProcessor<T> postProcessor;

    public ProcessingJob(IItemExtractor<T> loader, IItemTransformer<T> transformer, IItemPostProcessor<T> postProcessor)
    {
        this.loader = loader;
        this.transformer = transformer;
        this.postProcessor = postProcessor;
    }

    public async Task Run(CancellationToken cancellationToken)
    {
        await foreach (var item in this.loader.Extract(cancellationToken))
        {
            var transformedItem = await this.transformer.Transform(item, cancellationToken);
            await Load(transformedItem);

            await this.postProcessor.PostProcess(item, cancellationToken);
        }
    }

    private Task Load(MyTargetObject transformedItem)
    {
        // Imagine an actual implementation here. Yes this belongs in a separate class, I know. This
        // is only for explanatory purposes! :)
        return Task.CompletedTask;
    }
}


3. De Extract methode van IItemExtractor zo aanpassen dat deze een IPostProcessable<T> interface teruggeeft die het feitelijke item "wrapped" samen met een PostProcess methode:
C#:
1
2
3
4
5
6
7
8
9
10
11
public interface IPostProcessable<T>
{
    T Value { get; }

    Task PostProcess(CancellationToken cancellation = default);
}

public interface IItemExtractor<T>
{
    IAsyncEnumerable<IPostProcessable<T>> Extract(CancellationToken cancellationToken = default);
}


De Run methode zou dan zoiets worden:
C#:
1
2
3
4
5
6
7
8
9
public async Task Run(CancellationToken cancellationToken)
    {
        await foreach (var item in this.loader.Extract(cancellationToken))
        {
            var transformedItem = await this.transformer.Transform(item.Value, cancellationToken);
            await Load(transformedItem);
            await item.PostProcess(cancellationToken);
        }
    }


En dan nu de vraag: Welke oplossing is volgens jullie de beste en waarom? Of is er nog een andere manier die mij een Eureka momentje kan bezorgen?

Ik ben oprecht benieuwd naar jullie inzichten, waarvoor mijn dank bij voorbaat!

Alle reacties


  • Merethil
  • Registratie: December 2008
  • Laatst online: 23:52
Ik vraag me af hoe je het nu doet met andere bronnen - als je bv een bestand binnenkrijgt, gooi je die dan al na de extract-stap weg? Als je een stagingtabel binnenkrijgt in een db, vink je de data dan al af na de extract?

Het lijkt me dat je eigenlijk altijd een stap ná je load wil om dingen af te ronden: files opruimen, tabel(len) bijwerken, bron (via callback URL ofzo) informeren dat zending met ID XYZ is verwerkt etc.
En toch tenminste wat logging, al zou die net zo goed kunnen in een (abstract) parent class/via aspects omdat die per ETL-proces niet echt zal veranderen natuurlijk.

Mijn insteek zou dus optie twee zijn.

[Voor 3% gewijzigd door Merethil op 10-01-2023 02:28]



Tweakers maakt gebruik van cookies

Tweakers plaatst functionele en analytische cookies voor het functioneren van de website en het verbeteren van de website-ervaring. Deze cookies zijn noodzakelijk. Om op Tweakers relevantere advertenties te tonen en om ingesloten content van derden te tonen (bijvoorbeeld video's), vragen we je toestemming. Via ingesloten content kunnen derde partijen diensten leveren en verbeteren, bezoekersstatistieken bijhouden, gepersonaliseerde content tonen, gerichte advertenties tonen en gebruikersprofielen opbouwen. Hiervoor worden apparaatgegevens, IP-adres, geolocatie en surfgedrag vastgelegd.

Meer informatie vind je in ons cookiebeleid.

Sluiten

Toestemming beheren

Hieronder kun je per doeleinde of partij toestemming geven of intrekken. Meer informatie vind je in ons cookiebeleid.

Functioneel en analytisch

Deze cookies zijn noodzakelijk voor het functioneren van de website en het verbeteren van de website-ervaring. Klik op het informatie-icoon voor meer informatie. Meer details

janee

    Relevantere advertenties

    Dit beperkt het aantal keer dat dezelfde advertentie getoond wordt (frequency capping) en maakt het mogelijk om binnen Tweakers contextuele advertenties te tonen op basis van pagina's die je hebt bezocht. Meer details

    Tweakers genereert een willekeurige unieke code als identifier. Deze data wordt niet gedeeld met adverteerders of andere derde partijen en je kunt niet buiten Tweakers gevolgd worden. Indien je bent ingelogd, wordt deze identifier gekoppeld aan je account. Indien je niet bent ingelogd, wordt deze identifier gekoppeld aan je sessie die maximaal 4 maanden actief blijft. Je kunt deze toestemming te allen tijde intrekken.

    Ingesloten content van derden

    Deze cookies kunnen door derde partijen geplaatst worden via ingesloten content. Klik op het informatie-icoon voor meer informatie over de verwerkingsdoeleinden. Meer details

    janee