This article describes an example of creating and executing an ETL task with data split.
The repository must contain three tables: T_Source, T_DestinationOne and T_DestinationTwo. Tables must be identical in structure, there is a field with the ID identifier, records are split by values of this field. The repository must also contain an ETL task with the ETLTASKS identifier. On executing the example presented below four objects are created in the ETL task: repository source, the Split transformer and two repository consumers. The required properties are defined for all objects, links are set, split conditions and linking of consumers for each condition are set:
After objects are created and saved, the ETL task is executed. The similar code applied to different objects is placed into separate procedures or functions.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Sub UserProc;
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumerOne, EtlConsumerTwo: IEtlPlainDataConsumer;
MetabaseConsumerOne, MetabaseConsumerTwo: IDtMetabaseConsumer;
Split: IEtlPlainDataSplit;
SplitOutputs: IEtlPlainOutputs;
OutputOne, OutputTwo: IEtlPlainOutput;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, SplitInFields, SplitOutOneFields, SplitOutTwoFields, ConsOneFields, ConsTwoFields: IEtlPlainFields;
Begin
//Open ETL task
MB := MetabaseClass.Active;
ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;
//Create a provider
EtlProvider := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit;
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById("T_Source").Bind As IDatasetModel;
EtlProvider.FillDefault;
//Create a visual object of provider
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create first consumer
EtlConsumerOne := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumerOne := EtlConsumerOne.Edit;
EtlConsumerOne.Id := "Metabase_Consumer1";
EtlConsumerOne.Name := "Export to table 1";
MetabaseConsumerOne := EtlConsumerOne.Consumer As IDtMetabaseConsumer;
MetabaseConsumerOne.Dataset := MB.ItemById("T_DestinationOne").Bind As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumerOne, 60, 0);
//Create second consumer
EtlConsumerTwo := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumerTwo := EtlConsumerTwo.Edit;
EtlConsumerTwo.Id := "Metabase_Consumer2";
EtlConsumerTwo.Name := "Export to table 2";
MetabaseConsumerTwo := EtlConsumerTwo.Consumer As IDtMetabaseConsumer;
MetabaseConsumerTwo.Dataset := MB.ItemById("T_DestinationTwo").Bind As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumerTwo, 60, 20);
//Create the "Split" transformer
Split := ETLTask.Create(EtlObjectType.PlainDataSplit) As IEtlPlainDataSplit;
Split := Split.Edit;
Split.Id := "Split_Transform";
Split.Name := "Split";
//Create two outputs
SplitOutputs := Split.PlainOutputs;
SplitOutputs.Add;
SplitOutputs.Add;
OutputOne := SplitOutputs.Item(0);
OutputTwo := SplitOutputs.Item(1);
//Create a visual object of transformer
CreateWX(ETLTask, Split, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Provider output
SplitInFields := Split.PlainInput.Fields; //Transformer input
SplitOutOneFields := OutputOne.Fields; //First output of transformer
SplitOutTwoFields := OutputTwo.Fields; //Second output of transformer
ConsOneFields := EtlConsumerOne.PlainInput.Fields; //Input of first consumer
ConsTwoFields := EtlConsumerTwo.PlainInput.Fields; //Input of second consumer
//Fill lists of fields
FillFields(SplitInFields, ProvFields);
FillFields(SplitOutOneFields, SplitInFields);
FillFields(SplitOutTwoFields, SplitInFields);
FillFields(ConsOneFields, SplitOutOneFields);
FillFields(ConsTwoFields, SplitOutTwoFields);
//Set up conditions and link fields
Split.Add;
Split.Condition(0).AsString := Split.PlainInput.Id + ".ID<=100";
Split.Add;
Split.Condition(1).AsString := Split.PlainInput.Id + ".ID>100";
Mapper := Split.Mapper(0);
For Each Field In SplitOutOneFields Do
Mapper.Map(Field).AsString := Split.PlainInput.Id + "." + Field.Id;
End For;
Mapper := Split.Mapper(1);
For Each Field In SplitOutTwoFields Do
Mapper.Map(Field).AsString := Split.PlainInput.Id + "." + Field.Id;
End For;
//Save all objects
EtlProvider.Save;
Split.Save;
EtlConsumerOne.Save;
EtlConsumerTwo.Save;
//Link of transformer with all objects
//Create links
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Split.PlainInput);
Link1 := CreateLink(ETLTask, OutputOne, EtlConsumerOne.PlainInput);
Link2 := CreateLink(ETLTask, OutputTwo, EtlConsumerTwo.PlainInput);
//Create visual links
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Split.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(Split.Id), Shapes.FindById(EtlConsumerOne.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Split.Id), Shapes.FindById(EtlConsumerTwo.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save;
//Execute task
ETLTask.Execute(Null);
End Sub UserProc;
//Create visual objects
Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
WxRect: IWxRectangle;
WxETLObj: IWxEtlObject;
Begin
WxRect := ETLTask.Workspace.CreateRectangle;
WxRect.Id := ETLObject.Id;
WxETLObj := New WxEtlObject.Create;
WxETLObj.EtlObject := ETLObject;
WxRect.Style.TextPosition := WxTextPosition.Bottom;
WxRect.Style.PictureMarginTop := -10;
WxRect.PinPosition := New GxPointF.Create(XPosition, YPosition);
WxRect.Extension := WxETLObj As IWxShapeExtension;
End Sub CreateWX;
//Fill lists of input/output fields
Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Begin
Fields := Fields.Edit;
Fields.Fill(FieldsSource);
Fields.Save;
End Sub FillFields;
//Create a link between objects
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
Link: IEtlPlainLink;
Begin
Link := ETLTask.CreatePlainLink;
Link.SourceObjectOutput := Output;
Link.DestinationObjectInput := Input;
Link.FillDefault;
Return Link;
End Function CreateLink;
//Create visual links
Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
WLink: IWxLink;
WxETLLink: IWxEtlObject;
Begin
//Create visual link objects
WLink := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
WLink.Style.LinePenBeginWxCap := WxLineCap.Flat;
WLink.Style.LinePenEndWxCap := WxLineCap.Arrow30DegreeFilled;
WxETLLink := New WxEtlObject.Create;
WxETLLink.EtlObject := Link;
WLink.Extension := WxETLLink As IWxShapeExtension;
End Sub CreateWXLink;
The specified procedure is an entry point for .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
Public Shared Sub Main(Params: StartParams);
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumerOne, EtlConsumerTwo: IEtlPlainDataConsumer;
MetabaseConsumerOne, MetabaseConsumerTwo: IDtMetabaseConsumer;
Split: IEtlPlainDataSplit;
SplitOutputs: IEtlPlainOutputs;
OutputOne, OutputTwo: IEtlPlainOutput;
Link, Link1, Link2: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, SplitInFields, SplitOutOneFields, SplitOutTwoFields, ConsOneFields, ConsTwoFields: IEtlPlainFields;
Field: IEtlPlainField;
Mapper: IEtlPlainFieldsMapper;
Begin
//Open ETL task
MB := Params.Metabase;
ETLTask := MB.ItemById["ETLTASKS"].Edit() As IEtlTask;
//Create a provider
EtlProvider := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit();
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById["T_Source"].Bind() As IDatasetModel;
EtlProvider.FillDefault();
//Create a visual object of provider
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create first consumer
EtlConsumerOne := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumerOne := EtlConsumerOne.Edit();
EtlConsumerOne.Id := "Metabase_Consumer1";
EtlConsumerOne.Name := "Export to table 1";
MetabaseConsumerOne := EtlConsumerOne.Consumer As IDtMetabaseConsumer;
MetabaseConsumerOne.Dataset := MB.ItemById["T_DestinationOne"].Bind() As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumerOne, 60, 0);
//Create second consumer
EtlConsumerTwo := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumerTwo := EtlConsumerTwo.Edit();
EtlConsumerTwo.Id := "Metabase_Consumer2";
EtlConsumerTwo.Name := "Export to table 2";
MetabaseConsumerTwo := EtlConsumerTwo.Consumer As IDtMetabaseConsumer;
MetabaseConsumerTwo.Dataset := MB.ItemById["T_DestinationTwo"].Bind() As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumerTwo, 60, 20);
//Create the "Split" transformer
Split := ETLTask.Create(EtlObjectType.eotPlainDataSplit) As IEtlPlainDataSplit;
Split := Split.Edit();
Split.Id := "Split_Transform";
Split.Name := "Split";
//Create twio outputs
SplitOutputs := Split.PlainOutputs;
SplitOutputs.Add();
SplitOutputs.Add();
OutputOne := SplitOutputs.Item[0];
OutputTwo := SplitOutputs.Item[1];
//Create a visual object of transformer
CreateWX(ETLTask, Split, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Source output
SplitInFields := Split.PlainInput.Fields; //Transformer output
SplitOutOneFields := OutputOne.Fields; //First output of transformer
SplitOutTwoFields := OutputTwo.Fields; //Second output of transformer
ConsOneFields := EtlConsumerOne.PlainInput.Fields; //Input of first consumer
ConsTwoFields := EtlConsumerTwo.PlainInput.Fields; //Input of second consumer
//Fill lists of fields
FillFields(SplitInFields, ProvFields);
FillFields(SplitOutOneFields, SplitInFields);
FillFields(SplitOutTwoFields, SplitInFields);
FillFields(ConsOneFields, SplitOutOneFields);
FillFields(ConsTwoFields, SplitOutTwoFields);
//Set up conditions and link fields
Split.Add();
Split.Condition[0].AsString := Split.PlainInput.Id + ".ID<=100";
Split.Add();
Split.Condition[1].AsString := Split.PlainInput.Id + ".ID>100";
Mapper := Split.Mapper[0];
For Each Field In SplitOutOneFields Do
Mapper.Map[Field].AsString := Split.PlainInput.Id + "." + Field.Id;
End For;
Mapper := Split.Mapper[1];
For Each Field In SplitOutTwoFields Do
Mapper.Map[Field].AsString := Split.PlainInput.Id + "." + Field.Id;
End For;
//Save all objects
EtlProvider.Save();
Split.Save();
EtlConsumerOne.Save();
EtlConsumerTwo.Save();
//Link of transformer with all objects
//Create links
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Split.PlainInput);
Link1 := CreateLink(ETLTask, OutputOne, EtlConsumerOne.PlainInput);
Link2 := CreateLink(ETLTask, OutputTwo, EtlConsumerTwo.PlainInput);
//Create visual links
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Split.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(Split.Id), Shapes.FindById(EtlConsumerOne.Id), Link1);
CreateWXLink(ETLTask, Shapes.FindById(Split.Id), Shapes.FindById(EtlConsumerTwo.Id), Link2);
//Save task
(ETLTask As IMetabaseObject).Save();
//Execute task
ETLTask.Execute(Null);
End Sub;
//Create visual objects
Public Shared Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
GxPointFCls: GxPointF = New GxPointFClass();
WxRect: IWxRectangle;
WxETLObj: IWxEtlObject = New WxEtlObject();
Begin
WxRect := ETLTask.Workspace.CreateRectangle();
WxRect.Id := ETLObject.Id;
WxETLObj.EtlObject := ETLObject;
WxRect.Style.TextPosition := WxTextPosition.wtpBottom;
WxRect.Style.PictureMarginTop := -10;
GxPointFCls.Create(XPosition, YPosition);
WxRect.PinPosition := GxPointFCls;
WxRect.Extension := WxETLObj As IWxShapeExtension;
End Sub;
//Create visual links
Public Shared Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
WLink: IWxLink;
WxETLLink: IWxEtlObject = New WxEtlObject();
Begin
//Create visual link objects
WLink := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
WLink.Style.LinePenBeginWxCap := WxLineCap.wlcFlat;
WLink.Style.LinePenEndWxCap := WxLineCap.wlcArrow30DegreeFilled;
WxETLLink.EtlObject := Link;
WLink.Extension := WxETLLink As IWxShapeExtension;
End Sub;
//Fill lists of input/output fields
Public Shared Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Begin
Fields := Fields.Edit();
Fields.Fill(FieldsSource);
Fields.Save();
End Sub;
//Create a link between objects
Public Shared Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
Link: IEtlPlainLink;
Begin
Link := ETLTask.CreatePlainLink();
Link.SourceObjectOutput := Output;
Link.DestinationObjectInput := Input;
Link.FillDefault();
Return Link;
End Function;
See also: