This article gives an example of creating and executing an ETL task that uses a user-defined data transformation algorithm.
The repository must contain two tables, T_Source and T_Destination, and a unit with the USER_PLAIN_MODULE identifier that contains the UserSubChangeValues macro. The structure of the tables must satisfy the implemented data transformation algorithm. The repository must also contain an ETL task with the ETLTASKS identifier. Executing the example below creates three objects in the ETL task: a repository source, the User Algorithm transformer and a repository consumer. The required properties and links are set for all objects.
After the objects are created and saved, the ETL task is executed. Code that is applied in the same way to different objects is placed in separate procedures and functions.
The macro that implements the required class is given in the example for IEtlCustomUser.Transform.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
//Entry point of the example. Adds three objects to the ETL task with the
//ETLTASKS identifier: a repository provider bound to T_Source, a "User Algorithm"
//transformer that refers to the UserSubChangeValues macro of the USER_PLAIN_MODULE
//unit, and a repository consumer bound to T_Destination. The objects are then
//linked, the task is saved and executed.
//Uses the CreateWX, FillFields, CreateLink and CreateWXLink routines of this unit.
Sub UserProc;
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
UserAlg: IEtlPlainDataUser;
UserInput: IEtlPlainInput;
UserOutput: IEtlPlainOutput;
Link, Link1: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, UserInFields, UserOutFields, ConsFields: IEtlPlainFields;
Begin
//Open the ETL task for editing in the active repository
MB := MetabaseClass.Active;
ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;
//Create a provider and switch it to edit mode
EtlProvider := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit;
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
//The provider reads the T_Source repository object
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById("T_Source").Bind As IDatasetModel;
//FillDefault is called only after the dataset has been assigned
EtlProvider.FillDefault;
//Create a visual object of provider at X = -60, Y = 0
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create a consumer and switch it to edit mode
EtlConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit;
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
//The consumer writes to the T_Destination repository object
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById("T_Destination").Bind As IDatasetModel;
//Create a visual object of consumer at X = 60, Y = 0
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create the "User Algorithm" transformer and switch it to edit mode
UserAlg := ETLTask.Create(EtlObjectType.PlainDataUser) As IEtlPlainDataUser;
UserAlg := UserAlg.Edit;
UserAlg.Id := "UserAlg_Transform";
UserAlg.Name := "User Algorithm";
//Unit and macro name that the transformer refers to
UserAlg.Module := MB.ItemById("USER_PLAIN_MODULE");
UserAlg.ForeSub := "UserSubChangeValues";
//Add one input and one output, then take them by index 0
UserAlg.PlainInputs.Add;
UserAlg.PlainOutputs.Add;
UserInput := UserAlg.PlainInputs.Item(0);
UserOutput := UserAlg.PlainOutputs.Item(0);
//Create a visual object of transformer at X = 0, Y = 0
CreateWX(ETLTask, UserAlg, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Provider output
UserInFields := UserInput.Fields; //Transformer input
UserOutFields := UserOutput.Fields; //Transformer output
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
//Fill lists of fields along the chain:
//provider output -> transformer input -> transformer output -> consumer input
FillFields(UserInFields, ProvFields);
FillFields(UserOutFields, UserInFields);
FillFields(ConsFields, UserOutFields);
//Save all objects; links are created only after this point
EtlProvider.Save;
UserAlg.Save;
EtlConsumer.Save;
//Link of transformer with all objects
//Create links: provider output -> transformer input, transformer output -> consumer input
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, UserInput);
Link1 := CreateLink(ETLTask, UserOutput, EtlConsumer.PlainInput);
//Create visual links; shapes are found by the identifiers of the ETL objects,
//because CreateWX gives each shape the identifier of its ETL object
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(UserAlg.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(UserAlg.Id), Shapes.FindById(EtlConsumer.Id), Link1);
//Save task
(ETLTask As IMetabaseObject).Save;
//Execute task; Null is passed as the only argument
ETLTask.Execute(Null);
End Sub UserProc;
//Create a rectangle shape on the workspace of the ETL task and attach the ETL object to it.
//ETLTask - task whose workspace receives the shape;
//ETLObject - ETL object shown by the shape, its identifier is also used as the shape identifier;
//XPosition, YPosition - coordinates passed to the pin position of the shape.
Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    Shape: IWxRectangle;
    Ext: IWxEtlObject;
Begin
    //Shape with the same identifier as the ETL object
    Shape := ETLTask.Workspace.CreateRectangle;
    Shape.Id := ETLObject.Id;
    //Appearance and position of the shape
    Shape.Style.TextPosition := WxTextPosition.Bottom;
    Shape.Style.PictureMarginTop := -10;
    Shape.PinPosition := New GxPointF.Create(XPosition, YPosition);
    //Extension that refers to the ETL object is assigned last
    Ext := New WxEtlObject.Create;
    Ext.EtlObject := ETLObject;
    Shape.Extension := Ext As IWxShapeExtension;
End Sub CreateWX;
//Fill the Fields list from the FieldsSource list.
//Fill is called on the instance returned by Edit, which is then saved.
Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit;
    Editable.Fill(FieldsSource);
    Editable.Save;
End Sub FillFields;
//Create a plain link in the ETL task that goes from Output to Input.
//Returns the created link after FillDefault has been called for it.
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    PlainLink: IEtlPlainLink;
Begin
    PlainLink := ETLTask.CreatePlainLink;
    //Both ends are set before FillDefault is called
    PlainLink.SourceObjectOutput := Output;
    PlainLink.DestinationObjectInput := Input;
    PlainLink.FillDefault;
    Return PlainLink;
End Function CreateLink;
//Draw a link between two shapes on the workspace of the ETL task and attach the ETL link to it.
//Shape1 - shape where the line begins (flat cap);
//Shape2 - shape where the line ends (filled arrow cap);
//Link - ETL link shown by the line.
Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    Connector: IWxLink;
    Ext: IWxEtlObject;
Begin
    //Extension that refers to the ETL link
    Ext := New WxEtlObject.Create;
    Ext.EtlObject := Link;
    //Visual link between the shapes and its line caps
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    Connector.Style.LinePenBeginWxCap := WxLineCap.Flat;
    Connector.Style.LinePenEndWxCap := WxLineCap.Arrow30DegreeFilled;
    Connector.Extension := Ext As IWxShapeExtension;
End Sub CreateWXLink;
The Main procedure below is the entry point of the .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
//Entry point of the example. Adds three objects to the ETL task with the
//ETLTASKS identifier: a repository provider bound to T_Source, a "User Algorithm"
//transformer that refers to the UserSubChangeValues macro of the USER_PLAIN_MODULE
//unit, and a repository consumer bound to T_Destination. The objects are then
//linked, the task is saved and executed.
//Params - start parameters; the repository is taken from Params.Metabase.
//Uses the CreateWX, FillFields, CreateLink and CreateWXLink methods of this class.
Public Shared Sub Main(Params: StartParams);
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
UserAlg: IEtlPlainDataUser;
UserInput: IEtlPlainInput;
UserOutput: IEtlPlainOutput;
Link, Link1: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, UserInFields, UserOutFields, ConsFields: IEtlPlainFields;
Begin
//Open the ETL task for editing in the repository passed in the start parameters
MB := Params.Metabase;
ETLTask := MB.ItemById["ETLTASKS"].Edit() As IEtlTask;
//Create a provider and switch it to edit mode
EtlProvider := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit();
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
//The provider reads the T_Source repository object
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById["T_Source"].Bind() As IDatasetModel;
//FillDefault is called only after the dataset has been assigned
EtlProvider.FillDefault();
//Create a visual object of provider at X = -60, Y = 0
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create a consumer and switch it to edit mode
EtlConsumer := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit();
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
//The consumer writes to the T_Destination repository object
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById["T_Destination"].Bind() As IDatasetModel;
//Create a visual object of consumer at X = 60, Y = 0
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create the "User Algorithm" transformer and switch it to edit mode
UserAlg := ETLTask.Create(EtlObjectType.eotPlainDataUser) As IEtlPlainDataUser;
UserAlg := UserAlg.Edit();
UserAlg.Id := "UserAlg_Transform";
UserAlg.Name := "User algorithm";
//Unit and macro name that the transformer refers to
UserAlg.Module := MB.ItemById["USER_PLAIN_MODULE"];
UserAlg.ForeSub := "UserSubChangeValues";
//Add one input and one output, then take them by index 0
UserAlg.PlainInputs.Add();
UserAlg.PlainOutputs.Add();
UserInput := UserAlg.PlainInputs.Item[0];
UserOutput := UserAlg.PlainOutputs.Item[0];
//Create a visual object of transformer at X = 0, Y = 0
CreateWX(ETLTask, UserAlg, 0, 0);
//Fill lists of fields of all objects based on the list of provider fields
//Get list of fields
ProvFields := EtlProvider.PlainOutput.Fields; //Provider output
UserInFields := UserInput.Fields; //Transformer input
UserOutFields := UserOutput.Fields; //Transformer output
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
//Fill lists of fields along the chain:
//provider output -> transformer input -> transformer output -> consumer input
FillFields(UserInFields, ProvFields);
FillFields(UserOutFields, UserInFields);
FillFields(ConsFields, UserOutFields);
//Save all objects; links are created only after this point
EtlProvider.Save();
UserAlg.Save();
EtlConsumer.Save();
//Link of transformer with all objects
//Create links: provider output -> transformer input, transformer output -> consumer input
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, UserInput);
Link1 := CreateLink(ETLTask, UserOutput, EtlConsumer.PlainInput);
//Create visual links; shapes are found by the identifiers of the ETL objects,
//because CreateWX gives each shape the identifier of its ETL object
Shapes := ETLTask.Workspace.Shapes;
CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(UserAlg.Id), Link);
CreateWXLink(ETLTask, Shapes.FindById(UserAlg.Id), Shapes.FindById(EtlConsumer.Id), Link1);
//Save task
(ETLTask As IMetabaseObject).Save();
//Execute task; Null is passed as the only argument
ETLTask.Execute(Null);
End Sub;
//Create a rectangle shape on the workspace of the ETL task and attach the ETL object to it.
//ETLTask - task whose workspace receives the shape;
//ETLObject - ETL object shown by the shape, its identifier is also used as the shape identifier;
//XPosition, YPosition - coordinates passed to the pin position of the shape.
Public Shared Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    Pin: GxPointF = New GxPointFClass();
    Shape: IWxRectangle;
    Ext: IWxEtlObject = New WxEtlObject();
Begin
    //Shape with the same identifier as the ETL object
    Shape := ETLTask.Workspace.CreateRectangle();
    Shape.Id := ETLObject.Id;
    //Appearance and position of the shape
    Shape.Style.TextPosition := WxTextPosition.wtpBottom;
    Shape.Style.PictureMarginTop := -10;
    Pin.Create(XPosition, YPosition);
    Shape.PinPosition := Pin;
    //Extension that refers to the ETL object is assigned last
    Ext.EtlObject := ETLObject;
    Shape.Extension := Ext As IWxShapeExtension;
End Sub;
//Draw a link between two shapes on the workspace of the ETL task and attach the ETL link to it.
//Shape1 - shape where the line begins (flat cap);
//Shape2 - shape where the line ends (filled arrow cap);
//Link - ETL link shown by the line.
Public Shared Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    Connector: IWxLink;
    Ext: IWxEtlObject = New WxEtlObject();
Begin
    //Extension that refers to the ETL link
    Ext.EtlObject := Link;
    //Visual link between the shapes and its line caps
    Connector := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    Connector.Style.LinePenBeginWxCap := WxLineCap.wlcFlat;
    Connector.Style.LinePenEndWxCap := WxLineCap.wlcArrow30DegreeFilled;
    Connector.Extension := Ext As IWxShapeExtension;
End Sub;
//Fill the Fields list from the FieldsSource list.
//Fill is called on the instance returned by Edit, which is then saved.
Public Shared Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Var
    Editable: IEtlPlainFields;
Begin
    Editable := Fields.Edit();
    Editable.Fill(FieldsSource);
    Editable.Save();
End Sub;
//Create a plain link in the ETL task that goes from Output to Input.
//Returns the created link after FillDefault has been called for it.
Public Shared Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    PlainLink: IEtlPlainLink;
Begin
    PlainLink := ETLTask.CreatePlainLink();
    //Both ends are set before FillDefault is called
    PlainLink.SourceObjectOutput := Output;
    PlainLink.DestinationObjectInput := Input;
    PlainLink.FillDefault();
    Return PlainLink;
End Function;
See also: