This article describes an example of creating and executing an ETL task with source data filtering.
The repository must contain two tables, T_Source and T_Destination; the tables must be identical in structure and must contain the ID field. Values of this field are used to set up the filtering condition. The repository must also contain an ETL task with the ETLTASKS identifier. When the example below is executed, three objects are created in the ETL task: a repository source, the Filtering transformer and a repository consumer. The required properties are defined for all objects, links are set and the filtering condition is defined:
After the objects are created and saved, the ETL task is executed. Code that is repeated for different objects is placed in separate procedures and functions.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
// Entry point of the example: builds a provider -> filter -> consumer chain
// in the ETL task with the ETLTASKS identifier, saves the task and executes it.
Sub UserProc;
Var
    MB: IMetabase;
    ETLTask: IEtlTask;
    EtlProvider: IEtlPlainDataProvider;
    MetabaseProvider: IDtMetabaseProvider;
    EtlConsumer: IEtlPlainDataConsumer;
    MetabaseConsumer: IDtMetabaseConsumer;
    Filter: IEtlPlainDataFilter;
    Link, Link1: IEtlPlainLink;
    Shapes: IWxShapes;
    ProvFields, FilterInFields, FilterOutFields, ConsFields: IEtlPlainFields;
Begin
    // Open the ETL task for editing
    MB := MetabaseClass.Active;
    ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;

    // Create the provider that reads the T_Source table
    EtlProvider := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
    EtlProvider := EtlProvider.Edit;
    EtlProvider.Id := "Metabase_Provider";
    EtlProvider.Name := "Import from table";
    MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
    MetabaseProvider.Dataset := MB.ItemById("T_Source").Bind As IDatasetModel;
    EtlProvider.FillDefault;
    // Create the visual object of the provider
    CreateWX(ETLTask, EtlProvider, -60, 0);

    // Create the consumer that writes to the T_Destination table
    EtlConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
    EtlConsumer := EtlConsumer.Edit;
    EtlConsumer.Id := "Metabase_Consumer";
    EtlConsumer.Name := "Export to the table";
    MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
    MetabaseConsumer.Dataset := MB.ItemById("T_Destination").Bind As IDatasetModel;
    // Create the visual object of the consumer
    CreateWX(ETLTask, EtlConsumer, 60, 0);

    // Create the Filtering transformer
    Filter := ETLTask.Create(EtlObjectType.PlainDataFilter) As IEtlPlainDataFilter;
    Filter := Filter.Edit;
    Filter.Id := "Filter_Transform";
    Filter.Name := "Filtering";
    // Create the visual object of the transformer
    CreateWX(ETLTask, Filter, 0, 0);

    // Fill the field lists of all objects based on the provider fields.
    // Get the field lists
    ProvFields := EtlProvider.PlainOutput.Fields; // Provider output
    FilterInFields := Filter.PlainInput.Fields; // Transformer input
    FilterOutFields := Filter.PlainOutput.Fields; // Transformer output
    ConsFields := EtlConsumer.PlainInput.Fields; // Consumer input
    // Fill the lists along the chain: provider -> filter input -> filter output -> consumer
    FillFields(FilterInFields, ProvFields);
    FillFields(FilterOutFields, FilterInFields);
    FillFields(ConsFields, FilterOutFields);

    // Set up filtering parameters: skip all records where ID < 30
    Filter.FilterIf.AsString := Filter.PlainInput.Id + ".ID<30";
    Filter.KeepIfFulfillCondition := False;

    // Save all objects
    EtlProvider.Save;
    Filter.Save;
    EtlConsumer.Save;

    // Link the transformer with the provider and the consumer.
    // Create links
    Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Filter.PlainInput);
    Link1 := CreateLink(ETLTask, Filter.PlainOutput, EtlConsumer.PlainInput);
    // Create visual links between the shapes found by object identifiers
    Shapes := ETLTask.Workspace.Shapes;
    CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Filter.Id), Link);
    CreateWXLink(ETLTask, Shapes.FindById(Filter.Id), Shapes.FindById(EtlConsumer.Id), Link1);

    // Save the task
    (ETLTask As IMetabaseObject).Save;
    // Execute the task
    ETLTask.Execute(Null);
End Sub UserProc;

// Create a visual object for an ETL object.
// ETLTask - task whose workspace receives the shape;
// ETLObject - ETL object displayed by the shape (its Id is reused as the shape Id);
// XPosition, YPosition - coordinates assigned to the shape's PinPosition.
Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    WxRect: IWxRectangle;
    WxETLObj: IWxEtlObject;
Begin
    WxRect := ETLTask.Workspace.CreateRectangle;
    // The shape gets the same Id as the ETL object so that it can be found later via Shapes.FindById
    WxRect.Id := ETLObject.Id;
    WxETLObj := New WxEtlObject.Create;
    WxETLObj.EtlObject := ETLObject;
    WxRect.Style.TextPosition := WxTextPosition.Bottom;
    WxRect.Style.PictureMarginTop := -10;
    WxRect.PinPosition := New GxPointF.Create(XPosition, YPosition);
    WxRect.Extension := WxETLObj As IWxShapeExtension;
End Sub CreateWX;

// Fill a list of input/output fields.
// Fields - list to be filled; FieldsSource - list the fields are taken from.
Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Begin
    Fields := Fields.Edit;
    Fields.Fill(FieldsSource);
    Fields.Save;
End Sub FillFields;
// Create a link between objects.
// ETLTask - task in which the link is created;
// Output - output of the source object; Input - input of the destination object.
// Returns the created link with default settings applied.
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    Link: IEtlPlainLink;
Begin
    Link := ETLTask.CreatePlainLink;
    Link.SourceObjectOutput := Output;
    Link.DestinationObjectInput := Input;
    Link.FillDefault;
    Return Link;
End Function CreateLink;

// Create a visual link.
// ETLTask - task whose workspace receives the visual link;
// Shape1, Shape2 - shapes to connect (link start and link end);
// Link - ETL link displayed by the visual link.
Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    WLink: IWxLink;
    WxETLLink: IWxEtlObject;
Begin
    // Create the visual object of the link
    WLink := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    // Flat cap at the start of the line, filled arrow at the end
    WLink.Style.LinePenBeginWxCap := WxLineCap.Flat;
    WLink.Style.LinePenEndWxCap := WxLineCap.Arrow30DegreeFilled;
    WxETLLink := New WxEtlObject.Create;
    WxETLLink.EtlObject := Link;
    WLink.Extension := WxETLLink As IWxShapeExtension;
End Sub CreateWXLink;
The specified procedure is the entry point of the .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
// Entry point of the example: builds a provider -> filter -> consumer chain
// in the ETL task with the ETLTASKS identifier, saves the task and executes it.
Public Shared Sub Main(Params: StartParams);
Var
    MB: IMetabase;
    ETLTask: IEtlTask;
    EtlProvider: IEtlPlainDataProvider;
    MetabaseProvider: IDtMetabaseProvider;
    EtlConsumer: IEtlPlainDataConsumer;
    MetabaseConsumer: IDtMetabaseConsumer;
    Filter: IEtlPlainDataFilter;
    Link, Link1: IEtlPlainLink;
    Shapes: IWxShapes;
    ProvFields, FilterInFields, FilterOutFields, ConsFields: IEtlPlainFields;
Begin
    // Open the ETL task for editing
    MB := Params.Metabase;
    ETLTask := MB.ItemById["ETLTASKS"].Edit() As IEtlTask;

    // Create the provider that reads the T_Source table
    EtlProvider := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
    EtlProvider := EtlProvider.Edit();
    EtlProvider.Id := "Metabase_Provider";
    EtlProvider.Name := "Import from the table";
    MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
    MetabaseProvider.Dataset := MB.ItemById["T_Source"].Bind() As IDatasetModel;
    EtlProvider.FillDefault();
    // Create the visual object of the provider
    CreateWX(ETLTask, EtlProvider, -60, 0);

    // Create the consumer that writes to the T_Destination table
    EtlConsumer := ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
    EtlConsumer := EtlConsumer.Edit();
    EtlConsumer.Id := "Metabase_Consumer";
    EtlConsumer.Name := "Export to the table";
    MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
    MetabaseConsumer.Dataset := MB.ItemById["T_Destination"].Bind() As IDatasetModel;
    // Create the visual object of the consumer
    CreateWX(ETLTask, EtlConsumer, 60, 0);

    // Create the Filtering transformer
    Filter := ETLTask.Create(EtlObjectType.eotPlainDataFilter) As IEtlPlainDataFilter;
    Filter := Filter.Edit();
    Filter.Id := "Filter_Transform";
    Filter.Name := "Filtering";
    // Create the visual object of the transformer
    CreateWX(ETLTask, Filter, 0, 0);

    // Fill the field lists of all objects based on the provider fields.
    // Get the field lists
    ProvFields := EtlProvider.PlainOutput.Fields; // Provider output
    FilterInFields := Filter.PlainInput.Fields; // Transformer input
    FilterOutFields := Filter.PlainOutput.Fields; // Transformer output
    ConsFields := EtlConsumer.PlainInput.Fields; // Consumer input
    // Fill the lists along the chain: provider -> filter input -> filter output -> consumer
    FillFields(FilterInFields, ProvFields);
    FillFields(FilterOutFields, FilterInFields);
    FillFields(ConsFields, FilterOutFields);

    // Set up filtering parameters: skip all records for which ID < 30
    Filter.FilterIf.AsString := Filter.PlainInput.Id + ".ID<30";
    Filter.KeepIfFulfillCondition := False;

    // Save all objects
    EtlProvider.Save();
    Filter.Save();
    EtlConsumer.Save();

    // Link the transformer with the provider and the consumer.
    // Create links
    Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Filter.PlainInput);
    Link1 := CreateLink(ETLTask, Filter.PlainOutput, EtlConsumer.PlainInput);
    // Create visual links between the shapes found by object identifiers
    Shapes := ETLTask.Workspace.Shapes;
    CreateWXLink(ETLTask, Shapes.FindById(EtlProvider.Id), Shapes.FindById(Filter.Id), Link);
    CreateWXLink(ETLTask, Shapes.FindById(Filter.Id), Shapes.FindById(EtlConsumer.Id), Link1);

    // Save the task
    (ETLTask As IMetabaseObject).Save();
    // Execute the task
    ETLTask.Execute(Null);
End Sub;

// Create a visual object for an ETL object.
// ETLTask - task whose workspace receives the shape;
// ETLObject - ETL object displayed by the shape (its Id is reused as the shape Id);
// XPosition, YPosition - coordinates assigned to the shape's PinPosition.
Public Shared Sub CreateWX(ETLTask: IEtlTask; ETLObject: IEtlObject; XPosition: Integer; YPosition: Integer);
Var
    GxPointFCls: GxPointF = New GxPointFClass();
    WxRect: IWxRectangle;
    WxETLObj: IWxEtlObject = New WxEtlObject();
Begin
    WxRect := ETLTask.Workspace.CreateRectangle();
    // The shape gets the same Id as the ETL object so that it can be found later via Shapes.FindById
    WxRect.Id := ETLObject.Id;
    WxETLObj.EtlObject := ETLObject;
    WxRect.Style.TextPosition := WxTextPosition.wtpBottom;
    WxRect.Style.PictureMarginTop := -10;
    GxPointFCls.Create(XPosition, YPosition);
    WxRect.PinPosition := GxPointFCls;
    WxRect.Extension := WxETLObj As IWxShapeExtension;
End Sub;

// Create a visual link.
// ETLTask - task whose workspace receives the visual link;
// Shape1, Shape2 - shapes to connect (link start and link end);
// Link - ETL link displayed by the visual link.
Public Shared Sub CreateWXLink(ETLTask: IEtlTask; Shape1: IWxShape; Shape2: IWxShape; Link: IEtlPlainLink);
Var
    WLink: IWxLink;
    WxETLLink: IWxEtlObject = New WxEtlObject();
Begin
    // Create the visual object of the link
    WLink := ETLTask.Workspace.AutoLinkShapes(Shape1, Shape2);
    // Flat cap at the start of the line, filled arrow at the end
    WLink.Style.LinePenBeginWxCap := WxLineCap.wlcFlat;
    WLink.Style.LinePenEndWxCap := WxLineCap.wlcArrow30DegreeFilled;
    WxETLLink.EtlObject := Link;
    WLink.Extension := WxETLLink As IWxShapeExtension;
End Sub;

// Fill a list of input/output fields.
// Fields - list to be filled; FieldsSource - list the fields are taken from.
Public Shared Sub FillFields(Fields, FieldsSource: IEtlPlainFields);
Begin
    Fields := Fields.Edit();
    Fields.Fill(FieldsSource);
    Fields.Save();
End Sub;

// Create a link between objects.
// ETLTask - task in which the link is created;
// Output - output of the source object; Input - input of the destination object.
// Returns the created link with default settings applied.
Public Shared Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
    Link: IEtlPlainLink;
Begin
    Link := ETLTask.CreatePlainLink();
    Link.SourceObjectOutput := Output;
    Link.DestinationObjectInput := Input;
    Link.FillDefault();
    Return Link;
End Function;
See also:
Examples | IEtlPlainDataFilter
Fore example