This article describes an example of creating and executing an ETL task with data union from different sources.
The repository must contain three tables: T_SourceOne, T_SourceTwo and T_Destination. The tables must be identical in structure. The repository must also contain an ETL task with the ETLTASKS identifier. Executing the example below creates four objects in the ETL task: two repository sources, the Union transformer and a repository consumer. The required properties and links are set for all objects.
After the objects are created and saved, the ETL task is executed. Code that is repeated for different objects is placed into separate procedures and functions.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
//Entry point of the example: builds an ETL chain "two providers -> Union -> consumer"
//inside the ETL task with the ETLTASKS identifier, saves the task and executes it.
//Uses the CreateWX, FillFields, CreateLink and CreateWXLink helpers.
Sub UserProc;
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProviderOne, EtlProviderTwo: IEtlPlainDataProvider;
MetabaseProviderOne, MetabaseProviderTwo: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
Union: IEtlPlainDataUnion;
UnionInputs: IEtlPlainInputs;
InputOne, InputTwo: IEtlPlainInput;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvOneFields, ProvTwoFields, UnionInOneFields, UnionInTwoFields, UnionOutFields, ConsFields: IEtlPlainFields;
Mapper: IEtlPlainFieldsMapper;
Field: IEtlPlainField;
Begin
//Open the ETL task with the ETLTASKS identifier for editing
MB := MetabaseClass.Active;
ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;
//Create first provider that reads the T_SourceOne table
EtlProviderOne := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProviderOne := EtlProviderOne.Edit;
EtlProviderOne.Id := "Metabase_Provider1";
EtlProviderOne.Name := "Import from table 1";
MetabaseProviderOne := EtlProviderOne.Provider As IDtMetabaseProvider;
MetabaseProviderOne.Dataset := MB.ItemById("T_SourceOne").Bind As IDatasetModel;
//FillDefault is called after the dataset is assigned
//(presumably it derives the default settings from the dataset - confirm against the API reference)
EtlProviderOne.FillDefault;
//Create a visual object of provider
CreateWX(ETLTask, EtlProviderOne, -60, 0);
//Create second provider that reads the T_SourceTwo table
EtlProviderTwo := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProviderTwo := EtlProviderTwo.Edit;
EtlProviderTwo.Id := "Metabase_Provider2";
EtlProviderTwo.Name := "Import from table 2";
MetabaseProviderTwo := EtlProviderTwo.Provider As IDtMetabaseProvider;
MetabaseProviderTwo.Dataset := MB.ItemById("T_SourceTwo").Bind As IDatasetModel;
EtlProviderTwo.FillDefault;
//Create a visual object of provider
CreateWX(ETLTask, EtlProviderTwo, -60, 20);
//Create a consumer that writes to the T_Destination table
EtlConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit;
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById("T_Destination").Bind As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create the "Union" transformer
Union := ETLTask.Create(EtlObjectType.PlainDataUnion) As IEtlPlainDataUnion;
Union := Union.Edit;
Union.Id := "Union_Transform";
Union.Name := "Union";
//Create two inputs of the transformer, one per provider
UnionInputs := Union.PlainInputs;
UnionInputs.Add;
UnionInputs.Add;
InputOne := UnionInputs.Item(0);
InputTwo := UnionInputs.Item(1);
//Create a visual object of transformer
CreateWX(ETLTask, Union, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvOneFields := EtlProviderOne.PlainOutput.Fields; //Output of first provider
ProvTwoFields := EtlProviderTwo.PlainOutput.Fields; //Output of second provider
UnionInOneFields := InputOne.Fields; //First input of transformer
UnionInTwoFields := InputTwo.Fields; //Second input of transformer
UnionOutFields := Union.PlainOutput.Fields; //Transformer output
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
//Fill lists of fields along the chain: provider -> transformer input -> transformer output -> consumer input
FillFields(UnionInOneFields, ProvOneFields);
FillFields(UnionInTwoFields, ProvTwoFields);
//The transformer output is filled from the first input only
FillFields(UnionOutFields, UnionInOneFields);
FillFields(ConsFields, UnionOutFields);
//Set up conditions and link fields
//Mapper 0: each output field gets the expression "<first input id>.<field id>"
Mapper := Union.Mapper(0);
For Each Field In UnionOutFields Do
Mapper.Map(Field).AsString := InputOne.Id + "." + Field.Id;
End For;
//Mapper 1: each output field gets the expression "<second input id>.<field id>"
Mapper := Union.Mapper(1);
For Each Field In UnionOutFields Do
Mapper.Map(Field).AsString := InputTwo.Id + "." + Field.Id;
End For;
//Save all objects
EtlProviderOne.Save;
EtlProviderTwo.Save;
Union.Save;
EtlConsumer.Save;
//Link of transformer with all objects
//Create links
Link := CreateLink(ETLTask, EtlProviderOne.PlainOutput, InputOne);
Link1 := CreateLink(ETLTask, EtlProviderTwo.PlainOutput, InputTwo);
Link2 := CreateLink(ETLTask, Union.PlainOutput, EtlConsumer.PlainInput);
//Create visual links
//Shapes are looked up by object identifier: CreateWX assigns each shape the identifier of its ETL object
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProviderOne.Id), Shapes.FindById(Union.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(EtlProviderTwo.Id), Shapes.FindById(Union.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Union.Id), Shapes.FindById(EtlConsumer.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save;
//Execute task; Null is passed as the Execute argument
ETLTask.Execute(Null);
End Sub UserProc;
//Create a rectangle in the task workspace that displays the specified ETL object.
//ETLTask - task whose workspace receives the shape;
//ETLObject - ETL object shown by the shape; the shape gets the same identifier;
//XPosition, YPosition - coordinates passed to the shape pin position.
Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    Rect: IWxRectangle;
    EtlExtension: IWxEtlObject;
Begin
    //Prepare the extension that binds a shape to the ETL object
    EtlExtension := New WxEtlObject.Create;
    EtlExtension.EtlObject := ETLObject;
    //Create the shape; its identifier matches the ETL object identifier
    Rect := ETLTask.Workspace.CreateRectangle;
    Rect.Id := ETLObject.Id;
    //Set up the look and position of the shape
    Rect.Style.TextPosition := WxTextPosition.Bottom;
    Rect.Style.PictureMarginTop := -10;
    Rect.PinPosition := New GxPointF.Create(XPosition, YPosition);
    //Attach the extension to the shape
    Rect.Extension := EtlExtension As IWxShapeExtension;
End Sub CreateWX;
//Fill a list of input/output fields based on another list of fields.
//Fields - list to be filled; it is opened for editing, filled and saved;
//FieldsSource - list of fields passed to the Fill method.
Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit;
    Editable.Fill(FieldsSource);
    Editable.Save;
End Sub FillFields;
//Create a link in the ETL task from an object output to an object input.
//ETLTask - task in which the link is created;
//Output - output set as the link source;
//Input - input set as the link destination.
//Returns the created link after FillDefault has been called for it.
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    PlainLink: IEtlPlainLink;
Begin
    PlainLink := ETLTask.CreatePlainLink;
    //Set both ends of the link before calling FillDefault
    PlainLink.SourceObjectOutput := Output;
    PlainLink.DestinationObjectInput := Input;
    PlainLink.FillDefault;
    Return PlainLink;
End Function CreateLink;
//Create a visual link between two workspace shapes and bind it to an ETL link.
//ETLTask - task whose workspace contains the shapes;
//Shape1, Shape2 - shapes passed to AutoLinkShapes in this order;
//Link - ETL link displayed by the visual link.
Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    Connector: IWxLink;
    LinkExtension: IWxEtlObject;
Begin
    //Prepare the extension that binds the visual link to the ETL link
    LinkExtension := New WxEtlObject.Create;
    LinkExtension.EtlObject := Link;
    //Create the visual link and set up the line caps
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    Connector.Style.LinePenBeginWxCap := WxLineCap.Flat;
    Connector.Style.LinePenEndWxCap := WxLineCap.Arrow30DegreeFilled;
    //Attach the extension to the visual link
    Connector.Extension := LinkExtension As IWxShapeExtension;
End Sub CreateWXLink;
The procedure below is the entry point of a .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
//Entry point of the example: builds an ETL chain "two providers -> Union -> consumer"
//inside the ETL task with the ETLTASKS identifier, saves the task and executes it.
//Params - start parameters; the repository is taken from Params.Metabase.
//Uses the CreateWX, FillFields, CreateLink and CreateWXLink helpers.
Public Shared Sub Main(Params: StartParams);
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProviderOne, EtlProviderTwo: IEtlPlainDataProvider;
MetabaseProviderOne, MetabaseProviderTwo: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
Union: IEtlPlainDataUnion;
UnionInputs: IEtlPlainInputs;
InputOne, InputTwo: IEtlPlainInput;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvOneFields, ProvTwoFields, UnionInOneFields, UnionInTwoFields, UnionOutFields, ConsFields: IEtlPlainFields;
Mapper: IEtlPlainFieldsMapper;
Field: IEtlPlainField;
Begin
//Open the ETL task with the ETLTASKS identifier for editing
MB := Params.Metabase;
ETLTask := MB.ItemById["ETLTASKS"].Edit() As IEtlTask;
//Create first provider that reads the T_SourceOne table
EtlProviderOne := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProviderOne := EtlProviderOne.Edit();
EtlProviderOne.Id := "Metabase_Provider1";
EtlProviderOne.Name := "Import from table 1";
MetabaseProviderOne := EtlProviderOne.Provider As IDtMetabaseProvider;
MetabaseProviderOne.Dataset := MB.ItemById["T_SourceOne"].Bind() As IDatasetModel;
//FillDefault is called after the dataset is assigned
//(presumably it derives the default settings from the dataset - confirm against the API reference)
EtlProviderOne.FillDefault();
//Create a visual object of provider
CreateWX(ETLTask, EtlProviderOne, -60, 0);
//Create second provider that reads the T_SourceTwo table
EtlProviderTwo := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProviderTwo := EtlProviderTwo.Edit();
EtlProviderTwo.Id := "Metabase_Provider2";
EtlProviderTwo.Name := "Import from table 2";
MetabaseProviderTwo := EtlProviderTwo.Provider As IDtMetabaseProvider;
MetabaseProviderTwo.Dataset := MB.ItemById["T_SourceTwo"].Bind() As IDatasetModel;
EtlProviderTwo.FillDefault();
//Create a visual object of provider
CreateWX(ETLTask, EtlProviderTwo, -60, 20);
//Create a consumer that writes to the T_Destination table
EtlConsumer := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit();
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById["T_Destination"].Bind() As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create the "Union" transformer
Union := ETLTask.Create(EtlObjectType.eotPlainDataUnion) As IEtlPlainDataUnion;
Union := Union.Edit();
Union.Id := "Union_Transform";
Union.Name := "Union";
//Create two inputs of the transformer, one per provider
UnionInputs := Union.PlainInputs;
UnionInputs.Add();
UnionInputs.Add();
InputOne := UnionInputs.Item[0];
InputTwo := UnionInputs.Item[1];
//Create a visual object of transformer
CreateWX(ETLTask, Union, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvOneFields := EtlProviderOne.PlainOutput.Fields; //First provider output
ProvTwoFields := EtlProviderTwo.PlainOutput.Fields; //Second provider output
UnionInOneFields := InputOne.Fields; //First input of transformer
UnionInTwoFields := InputTwo.Fields; //Second input of transformer
UnionOutFields := Union.PlainOutput.Fields; //Transformer output
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
//Fill lists of fields along the chain: provider -> transformer input -> transformer output -> consumer input
FillFields(UnionInOneFields, ProvOneFields);
FillFields(UnionInTwoFields, ProvTwoFields);
//The transformer output is filled from the first input only
FillFields(UnionOutFields, UnionInOneFields);
FillFields(ConsFields, UnionOutFields);
//Set up conditions and link fields
//Mapper 0: each output field gets the expression "<first input id>.<field id>"
Mapper := Union.Mapper[0];
For Each Field In UnionOutFields Do
Mapper.Map[Field].AsString := InputOne.Id + "." + Field.Id;
End For;
//Mapper 1: each output field gets the expression "<second input id>.<field id>"
Mapper := Union.Mapper[1];
For Each Field In UnionOutFields Do
Mapper.Map[Field].AsString := InputTwo.Id + "." + Field.Id;
End For;
//Save all objects
EtlProviderOne.Save();
EtlProviderTwo.Save();
Union.Save();
EtlConsumer.Save();
//Link of transformer with all objects
//Create links
Link := CreateLink(ETLTask, EtlProviderOne.PlainOutput, InputOne);
Link1 := CreateLink(ETLTask, EtlProviderTwo.PlainOutput, InputTwo);
Link2 := CreateLink(ETLTask, Union.PlainOutput, EtlConsumer.PlainInput);
//Create visual links
//Shapes are looked up by object identifier: CreateWX assigns each shape the identifier of its ETL object
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProviderOne.Id), Shapes.FindById(Union.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(EtlProviderTwo.Id), Shapes.FindById(Union.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Union.Id), Shapes.FindById(EtlConsumer.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save();
//Execute task; Null is passed as the Execute argument
ETLTask.Execute(Null);
End Sub;
//Create a rectangle in the task workspace that displays the specified ETL object.
//ETLTask - task whose workspace receives the shape;
//ETLObject - ETL object shown by the shape; the shape gets the same identifier;
//XPosition, YPosition - coordinates passed to the shape pin position.
Public Shared Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    PinPoint: GxPointF = New GxPointFClass();
    Rect: IWxRectangle;
    EtlExtension: IWxEtlObject = New WxEtlObject();
Begin
    //Create the shape; its identifier matches the ETL object identifier
    Rect := ETLTask.Workspace.CreateRectangle();
    Rect.Id := ETLObject.Id;
    //Bind the extension to the ETL object
    EtlExtension.EtlObject := ETLObject;
    //Set up the look of the shape
    Rect.Style.TextPosition := WxTextPosition.wtpBottom;
    Rect.Style.PictureMarginTop := -10;
    //Initialize the point and use it as the pin position
    PinPoint.Create(XPosition, YPosition);
    Rect.PinPosition := PinPoint;
    //Attach the extension to the shape
    Rect.Extension := EtlExtension As IWxShapeExtension;
End Sub;
//Create a visual link between two workspace shapes and bind it to an ETL link.
//ETLTask - task whose workspace contains the shapes;
//Shape1, Shape2 - shapes passed to AutoLinkShapes in this order;
//Link - ETL link displayed by the visual link.
Public Shared Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    Connector: IWxLink;
    LinkExtension: IWxEtlObject = New WxEtlObject();
Begin
    //Create the visual link and set up the line caps
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    Connector.Style.LinePenBeginWxCap := WxLineCap.wlcFlat;
    Connector.Style.LinePenEndWxCap := WxLineCap.wlcArrow30DegreeFilled;
    //Bind the extension to the ETL link and attach it to the visual link
    LinkExtension.EtlObject := Link;
    Connector.Extension := LinkExtension As IWxShapeExtension;
End Sub;
//Fill a list of input/output fields based on another list of fields.
//Fields - list to be filled; it is opened for editing, filled and saved;
//FieldsSource - list of fields passed to the Fill method.
Public Shared Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit();
    Editable.Fill(FieldsSource);
    Editable.Save();
End Sub;
//Create a link in the ETL task from an object output to an object input.
//ETLTask - task in which the link is created;
//Output - output set as the link source;
//Input - input set as the link destination.
//Returns the created link after FillDefault has been called for it.
Public Shared Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    PlainLink: IEtlPlainLink;
Begin
    PlainLink := ETLTask.CreatePlainLink();
    //Set both ends of the link before calling FillDefault
    PlainLink.SourceObjectOutput := Output;
    PlainLink.DestinationObjectInput := Input;
    PlainLink.FillDefault();
    Return PlainLink;
End Function;
See also: