This article gives an example of creating and executing an ETL task that removes duplicate values from the output data.
The repository must contain three tables: T_Source, T_Destination and T_Duplicate. The tables must be identical in structure and must contain a field with the VALUE identifier; this field is used to check for duplicates. The repository must also contain an ETL task with the ETLTASKS identifier. Executing the example below creates four objects in the ETL task: a repository source, the Delete Duplicates converter and two consumers (unique data goes to the first consumer, and duplicates go to the second one). The required properties and links are set for all objects.
After the objects are created and saved, the ETL task is executed. Code that is repeated for different objects is placed into separate procedures and functions.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Sub UserProc;
//Entry point of the Fore example.
//Adds a repository provider, a duplicates removal transformer and two repository consumers
//to the ETL task with the ETLTASKS identifier, links them, saves the task and executes it.
//Requires the T_Source, T_Destination and T_Duplicate objects to exist in the repository.
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer, DuplicateConsumer: IEtlPlainDataConsumer;
MetabaseConsumer, MetabaseConsumer1: IDtMetabaseConsumer;
Deduplicate: IEtlPlainDataDeduplicate;
Index: IEtlPlainIndex;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, DedupInFields, DedupOutFields, DedupOutDIFields, ConsFields, DupConsFields: IEtlPlainFields;
Begin
//Open the ETL task for editing; the repository is taken from MetabaseClass.Active
MB := MetabaseClass.Active;
ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;
//Create a provider that reads from the T_Source repository object
EtlProvider := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit;
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById("T_Source").Bind As IDatasetModel;
//NOTE(review): presumably fills the provider output fields from the dataset - confirm
EtlProvider.FillDefault;
//Create a visual object of provider (left of the transformer)
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create a consumer that writes to the T_Destination repository object
EtlConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit;
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById("T_Destination").Bind As IDatasetModel;
//Create a visual object of consumer (right of the transformer)
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create a consumer for duplicates that writes to the T_Duplicate repository object
DuplicateConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
DuplicateConsumer := DuplicateConsumer.Edit;
DuplicateConsumer.Id := "Duplicate_Consumer";
DuplicateConsumer.Name := "Duplicates export";
MetabaseConsumer1 := DuplicateConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer1.Dataset := MB.ItemById("T_Duplicate").Bind As IDatasetModel;
//Create a visual object of duplicates consumer (same X as the first consumer, Y is 20)
CreateWX(ETLTask, DuplicateConsumer, 60, 20);
//Create the "Duplicates Removal" object
Deduplicate := ETLTask.Create(EtlObjectType.PlainDataDeduplicate) As IEtlPlainDataDeduplicate;
Deduplicate := Deduplicate.Edit;
Deduplicate.Id := "Deduplicate_Transform";
Deduplicate.Name := "Duplicates removal";
Deduplicate.Rule := EtlAgregateFormula.Min; //Rule for handling duplicates. NOTE(review): the original note reads "Record satisfies condition", yet the member used is Min - confirm
//Expression is set to the constant "True"; NOTE(review): presumably the condition evaluated by the rule - confirm
Deduplicate.Expression.AsString := "True";
//Create a visual object of transformer (placed at the origin, between provider and consumers)
CreateWX(ETLTask, Deduplicate, 0, 0);
//Fill field lists of all objects based on provider field list
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Provider output
DedupInFields := Deduplicate.PlainInput.Fields; //Transformer input
DedupOutFields := Deduplicate.PlainOutput.Fields; //Transformer output
DedupOutDIFields := Deduplicate.DuplicatePlainOutput.Fields; //Output for saving duplicates
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
DupConsFields := DuplicateConsumer.PlainInput.Fields; //Input of duplicates consumer
//Fill lists of fields in data-flow order: each list is filled from the list that feeds it
FillFields(DedupInFields, ProvFields);
FillFields(DedupOutFields, DedupInFields);
FillFields(DedupOutDIFields, DedupInFields);
FillFields(ConsFields, DedupOutFields);
FillFields(DupConsFields, DedupOutDIFields);
//Set up transformer index: one index field bound to the VALUE input field,
//which is the field used to check for duplicates
Index := Deduplicate.Index.Edit;
Index.AddField;
Index.PlainInputField(0, 0) := DedupInFields.FindById("VALUE");
Index.Save;
//Save all objects before they are linked
EtlProvider.Save;
Deduplicate.Save;
EtlConsumer.Save;
DuplicateConsumer.Save;
//Link of transformer with all objects
//Create links: provider -> transformer, transformer -> consumer, transformer duplicates output -> duplicates consumer
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Deduplicate.PlainInput);
Link1 := CreateLink(ETLTask, Deduplicate.PlainOutput, EtlConsumer.PlainInput);
Link2 := CreateLink(ETLTask, Deduplicate.DuplicatePlainOutput, DuplicateConsumer.PlainInput);
//Create visual links; shapes are looked up by ETL object identifier,
//because CreateWX assigns the ETL object identifier to the shape it creates
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Deduplicate.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(Deduplicate.Id), Shapes.FindById(EtlConsumer.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Deduplicate.Id), Shapes.FindById(DuplicateConsumer.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save;
//Execute task
ETLTask.Execute(Null);
End Sub UserProc;
//Create visual objects
Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
//Adds a rectangle shape that represents ETLObject to the workspace of ETLTask.
//The shape receives the identifier of the ETL object and is pinned at (XPosition, YPosition).
Var
    EtlExt: IWxEtlObject;
    Box: IWxRectangle;
Begin
    //Wrap the ETL object so that it can be attached to the shape
    EtlExt := New WxEtlObject.Create;
    EtlExt.EtlObject := ETLObject;
    //Create the shape and give it the identifier of the ETL object
    Box := ETLTask.Workspace.CreateRectangle;
    Box.Id := ETLObject.Id;
    //Appearance: text below the picture, picture top margin of -10
    Box.Style.TextPosition := WxTextPosition.Bottom;
    Box.Style.PictureMarginTop := -10;
    //Position of the shape on the workspace
    Box.PinPosition := New GxPointF.Create(XPosition, YPosition);
    //Attach the ETL object wrapper to the shape
    Box.Extension := EtlExt As IWxShapeExtension;
End Sub CreateWX;
//Fill lists of input/output fields
Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
//Opens the Fields list for editing, fills it from FieldsSource and saves the result.
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit;
    Editable.Fill(FieldsSource);
    Editable.Save;
End Sub FillFields;
//Create a link between objects
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
//Creates a plain link in ETLTask that goes from Output to Input,
//applies default link settings and returns the new link.
Var
    NewLink: IEtlPlainLink;
Begin
    NewLink := ETLTask.CreatePlainLink;
    //Endpoints of the link
    NewLink.SourceObjectOutput := Output;
    NewLink.DestinationObjectInput := Input;
    //Default settings are applied after both endpoints are set
    NewLink.FillDefault;
    Return NewLink;
End Function CreateLink;
//Create visual links
Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
//Draws a connector from Shape1 to Shape2 on the workspace of ETLTask
//and attaches the ETL link object to that connector.
Var
    LinkExt: IWxEtlObject;
    Connector: IWxLink;
Begin
    //Wrap the ETL link so that it can be attached to the connector
    LinkExt := New WxEtlObject.Create;
    LinkExt.EtlObject := Link;
    //Create the connector between the two shapes
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    //Line caps: flat at the beginning, filled 30-degree arrow at the end
    Connector.Style.LinePenBeginWxCap := WxLineCap.Flat;
    Connector.Style.LinePenEndWxCap := WxLineCap.Arrow30DegreeFilled;
    //Attach the ETL link wrapper to the connector
    Connector.Extension := LinkExt As IWxShapeExtension;
End Sub CreateWXLink;
The procedure given below is the entry point for the .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
Public Shared Sub Main(Params: StartParams);
//Entry point of the Fore.NET example.
//Adds a repository provider, a duplicates removal transformer and two repository consumers
//to the ETL task with the ETLTASKS identifier, links them, saves the task and executes it.
//Requires the T_Source, T_Destination and T_Duplicate objects to exist in the repository.
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer, DuplicateConsumer: IEtlPlainDataConsumer;
MetabaseConsumer, MetabaseConsumer1: IDtMetabaseConsumer;
Deduplicate: IEtlPlainDataDeduplicate;
Index: IEtlPlainIndex;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, DedupInFields, DedupOutFields, DedupOutDIFields, ConsFields, DupConsFields: IEtlPlainFields;
Begin
//Open the ETL task for editing; the repository is taken from the start parameters
MB := Params.Metabase;
ETLTask := MB.ItemById["ETLTASKS"].Edit() As IEtlTask;
//Create a provider that reads from the T_Source repository object
EtlProvider := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit();
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById["T_Source"].Bind() As IDatasetModel;
//NOTE(review): presumably fills the provider output fields from the dataset - confirm
EtlProvider.FillDefault();
//Create a visual object of provider (left of the transformer)
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create a consumer that writes to the T_Destination repository object
EtlConsumer := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit();
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById["T_Destination"].Bind() As IDatasetModel;
//Create a visual object of consumer (right of the transformer)
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create a consumer for duplicates that writes to the T_Duplicate repository object
DuplicateConsumer := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
DuplicateConsumer := DuplicateConsumer.Edit();
DuplicateConsumer.Id := "Duplicate_Consumer";
DuplicateConsumer.Name := "Duplicates export";
MetabaseConsumer1 := DuplicateConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer1.Dataset := MB.ItemById["T_Duplicate"].Bind() As IDatasetModel;
//Create a visual object of duplicates consumer (same X as the first consumer, Y is 20)
CreateWX(ETLTask, DuplicateConsumer, 60, 20);
//Create the "Duplicates Removal" transformer
Deduplicate := ETLTask.Create(EtlObjectType.eotPlainDataDeduplicate) As IEtlPlainDataDeduplicate;
Deduplicate := Deduplicate.Edit();
Deduplicate.Id := "Deduplicate_Transform";
Deduplicate.Name := "Duplicates removal";
Deduplicate.Rule := EtlAgregateFormula.eafMin; //Rule for handling duplicates. NOTE(review): the original note reads "Record satisfies condition", yet the member used is eafMin - confirm
//Expression is set to the constant "True"; NOTE(review): presumably the condition evaluated by the rule - confirm
Deduplicate.Expression.AsString := "True";
//Create a visual object of transformer (placed at the origin, between provider and consumers)
CreateWX(ETLTask, Deduplicate, 0, 0);
//Fill lists of fields of all objects based on the provider field list
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Provider output
DedupInFields := Deduplicate.PlainInput.Fields; //Transformer input
DedupOutFields := Deduplicate.PlainOutput.Fields; //Transformer output
DedupOutDIFields := Deduplicate.DuplicatePlainOutput.Fields; //Output for saving duplicates
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
DupConsFields := DuplicateConsumer.PlainInput.Fields; //Input of duplicates consumer
//Fill lists in data-flow order: each list is filled from the list that feeds it
FillFields(DedupInFields, ProvFields);
FillFields(DedupOutFields, DedupInFields);
FillFields(DedupOutDIFields, DedupInFields);
FillFields(ConsFields, DedupOutFields);
FillFields(DupConsFields, DedupOutDIFields);
//Transformer index setup: one index field bound to the VALUE input field,
//which is the field used to check for duplicates
Index := Deduplicate.Index.Edit();
Index.AddField();
Index.PlainInputField[0, 0] := DedupInFields.FindById("VALUE");
Index.Save();
//Save all objects before they are linked
EtlProvider.Save();
Deduplicate.Save();
EtlConsumer.Save();
DuplicateConsumer.Save();
//Link of transformer with all objects
//Create links: provider -> transformer, transformer -> consumer, transformer duplicates output -> duplicates consumer
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Deduplicate.PlainInput);
Link1 := CreateLink(ETLTask, Deduplicate.PlainOutput, EtlConsumer.PlainInput);
Link2 := CreateLink(ETLTask, Deduplicate.DuplicatePlainOutput, DuplicateConsumer.PlainInput);
//Create visual links; shapes are looked up by ETL object identifier,
//because CreateWX assigns the ETL object identifier to the shape it creates
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Deduplicate.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(Deduplicate.Id), Shapes.FindById(EtlConsumer.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Deduplicate.Id), Shapes.FindById(DuplicateConsumer.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save();
//Execute task
ETLTask.Execute(Null);
End Sub;
//Create visual objects
Public Shared Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
//Adds a rectangle shape that represents ETLObject to the workspace of ETLTask.
//The shape receives the identifier of the ETL object and is pinned at (XPosition, YPosition).
Var
    EtlExt: IWxEtlObject;
    Pin: GxPointF;
    Box: IWxRectangle;
Begin
    //Objects that do not depend on the workspace are created first
    Pin := New GxPointFClass();
    EtlExt := New WxEtlObject();
    //Create the shape and give it the identifier of the ETL object
    Box := ETLTask.Workspace.CreateRectangle();
    Box.Id := ETLObject.Id;
    EtlExt.EtlObject := ETLObject;
    //Appearance: text below the picture, picture top margin of -10
    Box.Style.TextPosition := WxTextPosition.wtpBottom;
    Box.Style.PictureMarginTop := -10;
    //Position of the shape on the workspace
    Pin.Create(XPosition, YPosition);
    Box.PinPosition := Pin;
    //Attach the ETL object wrapper to the shape
    Box.Extension := EtlExt As IWxShapeExtension;
End Sub;
//Create visual links
Public Shared Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
//Draws a connector from Shape1 to Shape2 on the workspace of ETLTask
//and attaches the ETL link object to that connector.
Var
    LinkExt: IWxEtlObject;
    Connector: IWxLink;
Begin
    LinkExt := New WxEtlObject();
    //Create the connector between the two shapes
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    //Line caps: flat at the beginning, filled 30-degree arrow at the end
    Connector.Style.LinePenBeginWxCap := WxLineCap.wlcFlat;
    Connector.Style.LinePenEndWxCap := WxLineCap.wlcArrow30DegreeFilled;
    //Attach the ETL link wrapper to the connector
    LinkExt.EtlObject := Link;
    Connector.Extension := LinkExt As IWxShapeExtension;
End Sub;
//Fill lists of input/output fields
Public Shared Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
//Opens the Fields list for editing, fills it from FieldsSource and saves the result.
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit();
    Editable.Fill(FieldsSource);
    Editable.Save();
End Sub;
//Create a link between objects
Public Shared Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
//Creates a plain link in ETLTask that goes from Output to Input,
//applies default link settings and returns the new link.
Var
    NewLink: IEtlPlainLink;
Begin
    NewLink := ETLTask.CreatePlainLink();
    //Endpoints of the link
    NewLink.SourceObjectOutput := Output;
    NewLink.DestinationObjectInput := Input;
    //Default settings are applied after both endpoints are set
    NewLink.FillDefault();
    Return NewLink;
End Function;
See also:
Examples | IEtlPlainDataDeduplicate