FetchBlockSize: Integer;
FetchBlockSize: integer;
The FetchBlockSize property determines the number of records processed in one block when partial data sampling is used in the data source.
This property is relevant if the IEtlTask.PartialFetch property is set to True.
If the IEtlPlainDataProvider.FetchBlockSize property is set to any value, it overrides the value of the IEtlTask.DefaultBlockSize property.
Executing the example requires that the repository contains:
An ETL task with the T_ETL identifier.
A data source table with the IMPORT_TABLE identifier containing several tens of thousands of records.
A data consumer table with the EXPORT_TABLE identifier.
Each table contains three identical fields.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
// Rebuilds the T_ETL task: clears the workspace, inserts records into
// IMPORT_TABLE using quick insert, creates a repository provider
// (FetchBlockSize = 200) and a repository consumer, and links them.
Sub UserProc;
Var
    MB: IMetabase;
    MObj: IMetabaseObject;
    EtlTask: IEtlTask;
    StaleObj: IEtlObject;
    EtlProvider: IEtlPlainDataProvider;
    EtlConsumer: IEtlPlainDataConsumer;
    MBProvider: IDtMetabaseProvider;
    MBConsumer: IDtMetabaseConsumer;
    TabProv, TabCons: IDatasetModel;
    DataSetInstance: IDataSetInstance;
    Fields: IDatasetInstanceFields;
    DBU: IDatasetBatchUpdate;
    Vals: Array[0..2] Of Variant;
    i, j: integer;
    PlainLink: IEtlPlainLink;
    ProvPlainOutput: IEtlPlainOutput;
    ConsPlainInput: IEtlPlainInput;
    WLink: IWxLink;
    WxETLLink: IWxEtlObject;
    WxProvider, WxConsumer: IWxShape;
Begin
    MB := MetabaseClass.Active;
    MObj := MB.ItemById("T_ETL").Edit;
    EtlTask := MObj As IEtlTask;
    // FetchBlockSize is relevant only when partial fetch is enabled
    EtlTask.PartialFetch := True;
    // Remove visual objects of the ETL task; the workspace must be empty
    // because the shapes created below are addressed by indexes 0 and 1
    If EtlTask.Workspace.Shapes.Count > 0 Then
        j := EtlTask.Workspace.Shapes.Count - 1;
        For i := j To 0 Step - 1 Do
            EtlTask.Workspace.Shapes.Item(i).Delete;
        End For;
    End If;
    // Remove ETL objects left by a previous run. Both removals are done here:
    // after the shapes are deleted a later "Shapes.Count > 0" check is never
    // true, so a stale consumer would otherwise stay in the task.
    // NOTE(review): FindById is assumed to return Null when nothing is found - confirm
    StaleObj := EtlTask.FindById("MB_Provider");
    If StaleObj <> Null Then
        StaleObj.Remove;
    End If;
    StaleObj := EtlTask.FindById("MB_Consumer");
    If StaleObj <> Null Then
        StaleObj.Remove;
    End If;
    // Get data source table
    TabProv := MB.ItemById("IMPORT_TABLE").Bind As IDatasetModel;
    DataSetInstance := MB.ItemById("IMPORT_TABLE").Open(Null) As IDataSetInstance;
    // Get collection of values of data source current record fields
    Fields := DataSetInstance.Fields;
    // Create an object for source data update
    DBU := DataSetInstance.CreateBatchUpdate;
    // Set data update method in table - quick insert
    DBU.BatchCommand := DataSetBatchCommand.Insert;
    DBU.FastInsert := True;
    // Process the records, the first field values of which are greater than 1
    DBU.KeyFieldNames := Fields.Item(0).Id;
    DBU.FailPolicy := DatasetFailPolicy.CommitByRow;
    DBU.BeginUpdate;
    While Not DataSetInstance.Eof Do
        i := Fields.Item(0).Value As Integer;
        If i > 1 Then
            Vals[0] := i;
            Vals[1] := Fields.Item(1).Value;
            Vals[2] := (i - (Fields.Item(1).Value As Integer)) * 10;
            DBU.AddRecord(Vals);
        End If;
        DataSetInstance.Next;
    End While;
    // Apply update
    DBU.Commit;
    // Create the "Repository Source" object
    EtlProvider := EtlTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
    EtlProvider := EtlProvider.Edit;
    EtlProvider.Id := "MB_Provider";
    EtlProvider.Name := "Repository data source";
    EtlProvider.FetchBlockSize := 200;
    // Set up "Repository Source"
    MBProvider := EtlProvider.Provider As IDtMetabaseProvider;
    MBProvider.Dataset := TabProv;
    MBProvider.FieldsFromFile;
    EtlProvider.FillDefault;
    ProvPlainOutput := EtlProvider.PlainOutput;
    EtlProvider.Save;
    // Get data consumer table
    TabCons := MB.ItemById("EXPORT_TABLE").Bind As IDatasetModel;
    // Create the "Repository Consumer" object
    EtlConsumer := EtlTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
    EtlConsumer := EtlConsumer.Edit;
    EtlConsumer.Id := "MB_Consumer";
    EtlConsumer.Name := "Repository data consumer";
    // Set up "Repository Consumer"
    MBConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
    MBConsumer.Dataset := TabCons;
    EtlConsumer.FillDefault;
    ConsPlainInput := EtlConsumer.PlainInput;
    EtlConsumer.Save;
    // Create visual objects
    CreateWXP(EtlProvider, EtlTask);
    CreateWXC(EtlConsumer, EtlTask);
    // Create a link and its visual view
    PlainLink := EtlTask.CreatePlainLink;
    PlainLink.DestinationObjectInput := ConsPlainInput;
    PlainLink.SourceObjectOutput := ProvPlainOutput;
    PlainLink.FillDefault;
    // Source rectangle
    WxProvider := EtlTask.Workspace.Shapes.Item(0);
    // Consumer rectangle
    WxConsumer := EtlTask.Workspace.Shapes.Item(1);
    WLink := EtlTask.Workspace.AutoLinkShapes(WxProvider, WxConsumer);
    WxETLLink := New WxEtlObject.Create;
    WxETLLink.EtlObject := PlainLink;
    WLink.Extension := WxETLLink As IWxShapeExtension;
    // Save changes
    MObj.Save;
End Sub UserProc;
// Create a visual object of data source
Sub CreateWXP(CopyObj: IEtlPlainDataProvider; Etltask: IEtltask);
Var
    Rect: IWxRectangle;
    EtlExt: IWxEtlObject;
Begin
    // Wrap the provider in a workspace extension object
    EtlExt := New WxEtlObject.Create;
    EtlExt.EtlObject := CopyObj;
    // Add a rectangle to the task workspace; it takes the provider identifier
    Rect := EtlTask.Workspace.CreateRectangle;
    Rect.Id := CopyObj.Id;
    // Appearance and position of the rectangle
    Rect.Style.TextPosition := WxTextPosition.Bottom;
    Rect.Style.PictureMarginTop := -10;
    Rect.PinPosition := New GxPointF.Create(20, 20);
    // Bind the rectangle to the provider
    Rect.Extension := EtlExt As IWxShapeExtension;
End Sub CreateWXP;
// Create a visual object of data consumer
Sub CreateWXC(CopyObj: IEtlPlainDataConsumer; Etltask: IEtltask);
Var
    Rect: IWxRectangle;
    EtlExt: IWxEtlObject;
Begin
    // Wrap the consumer in a workspace extension object
    EtlExt := New WxEtlObject.Create;
    EtlExt.EtlObject := CopyObj;
    // Add a rectangle to the task workspace; it takes the consumer identifier
    Rect := EtlTask.Workspace.CreateRectangle;
    Rect.Id := CopyObj.Id;
    // Appearance and position of the rectangle
    Rect.Style.TextPosition := WxTextPosition.Bottom;
    Rect.Style.PictureMarginTop := -10;
    Rect.PinPosition := New GxPointF.Create(40, 40);
    // Bind the rectangle to the consumer
    Rect.Extension := EtlExt As IWxShapeExtension;
End Sub CreateWXC;
After executing the example the following is created in the ETL task:
A repository data source with set parameters. For the table specified as a data source the "quick insert" table data update method is applied.
A repository data consumer with set parameters.
After the example is executed, running the specified ETL task takes less time.
The requirements and result of the Fore.NET example execution match those of the Fore example.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
…
// Rebuilds the T_ETL task: clears the workspace, inserts records into
// IMPORT_TABLE using quick insert, creates a repository provider
// (FetchBlockSize = 200) and a repository consumer, and links them.
Public Shared Sub Main(Params: StartParams);
Var
    MB: IMetabase;
    MObj: IMetabaseObject;
    EtlTask: IEtlTask;
    StaleObj: IEtlObject;
    EtlProvider: IEtlPlainDataProvider;
    EtlConsumer: IEtlPlainDataConsumer;
    MBProvider: IDtMetabaseProvider;
    MBConsumer: IDtMetabaseConsumer;
    TabProv, TabCons: IDatasetModel;
    DataSetInstance: IDataSetInstance;
    Fields: IDatasetInstanceFields;
    DBU: IDatasetBatchUpdate;
    Vals: Array[0..2] Of object;
    i, j: integer;
    PlainLink: IEtlPlainLink;
    ProvPlainOutput: IEtlPlainOutput;
    ConsPlainInput: IEtlPlainInput;
    WLink: IWxLink;
    WxETLLink: WxEtlObject = New WxEtlObjectClass();
    WxProvider, WxConsumer: IWxShape;
Begin
    MB := Params.Metabase;
    MObj := MB.ItemById["T_ETL"].Edit();
    EtlTask := MObj As IEtlTask;
    // FetchBlockSize is relevant only when partial fetch is enabled
    EtlTask.PartialFetch := True;
    // Remove visual objects of the ETL task; the workspace must be empty
    // because the shapes created below are addressed by indexes 0 and 1
    If EtlTask.Workspace.Shapes.Count > 0 Then
        j := EtlTask.Workspace.Shapes.Count - 1;
        For i := j To 0 Step - 1 Do
            EtlTask.Workspace.Shapes.Item[i].Delete();
        End For;
    End If;
    // Remove ETL objects left by a previous run. Both removals are done here:
    // after the shapes are deleted a later "Shapes.Count > 0" check is never
    // true, so a stale consumer would otherwise stay in the task.
    // NOTE(review): FindById is assumed to return Null when nothing is found - confirm
    StaleObj := EtlTask.FindById("MB_Provider");
    If StaleObj <> Null Then
        StaleObj.Remove();
    End If;
    StaleObj := EtlTask.FindById("MB_Consumer");
    If StaleObj <> Null Then
        StaleObj.Remove();
    End If;
    // Get data source table
    TabProv := MB.ItemById["IMPORT_TABLE"].Bind() As IDatasetModel;
    DataSetInstance := MB.ItemById["IMPORT_TABLE"].Open(Null) As IDataSetInstance;
    // Get collection of values of data source current record fields
    Fields := DataSetInstance.Fields;
    // Create an object for source data update
    DBU := DataSetInstance.CreateBatchUpdate();
    // Set data update method in table - quick insert
    DBU.BatchCommand := DataSetBatchCommand.dbcInsert;
    DBU.FastInsert := True;
    // Process the records, the first field values of which are greater than 1
    DBU.KeyFieldNames := Fields.Item[0].Id;
    DBU.FailPolicy := DatasetFailPolicy.dfpCommitByRow;
    DBU.BeginUpdate();
    While Not DataSetInstance.Eof() Do
        i := Fields.Item[0].Value As Integer;
        If i > 1 Then
            Vals[0] := i;
            Vals[1] := Fields.Item[1].Value;
            Vals[2] := (i - (Fields.Item[1].Value As Integer)) * 10;
            DBU.AddRecord(Vals);
        End If;
        DataSetInstance.Next();
    End While;
    // Apply update
    DBU.Commit();
    // Create the "Repository Source" object
    EtlProvider := EtlTask.Create(EtlObjectType.eotPlainDataMetabaseProvider) As IEtlPlainDataProvider;
    EtlProvider := EtlProvider.Edit();
    EtlProvider.Id := "MB_Provider";
    EtlProvider.Name := "Repository data source";
    EtlProvider.FetchBlockSize := 200;
    // Set up "Repository Source"
    MBProvider := EtlProvider.Provider As IDtMetabaseProvider;
    MBProvider.Dataset := TabProv;
    MBProvider.FieldsFromFile();
    EtlProvider.FillDefault();
    ProvPlainOutput := EtlProvider.PlainOutput;
    EtlProvider.Save();
    // Get data consumer table
    TabCons := MB.ItemById["EXPORT_TABLE"].Bind() As IDatasetModel;
    // Create the "Repository Consumer" object
    EtlConsumer := EtlTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
    EtlConsumer := EtlConsumer.Edit();
    EtlConsumer.Id := "MB_Consumer";
    EtlConsumer.Name := "Repository data consumer";
    // Set up "Repository Consumer"
    MBConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
    MBConsumer.Dataset := TabCons;
    EtlConsumer.FillDefault();
    ConsPlainInput := EtlConsumer.PlainInput;
    EtlConsumer.Save();
    // Create visual objects
    CreateWXP(EtlProvider, EtlTask);
    CreateWXC(EtlConsumer, EtlTask);
    // Create a link and its visual view (a single link, as in the Fore example)
    PlainLink := EtlTask.CreatePlainLink();
    PlainLink.DestinationObjectInput := ConsPlainInput;
    PlainLink.SourceObjectOutput := ProvPlainOutput;
    PlainLink.FillDefault();
    // Source rectangle
    WxProvider := EtlTask.Workspace.Shapes.Item[0];
    // Consumer rectangle
    WxConsumer := EtlTask.Workspace.Shapes.Item[1];
    WLink := EtlTask.Workspace.AutoLinkShapes(WxProvider, WxConsumer);
    WxETLLink.EtlObject := PlainLink;
    WLink.Extension := WxETLLink As IWxShapeExtension;
    // Save changes
    MObj.Save();
End Sub;
// Create a visual object of data source
Public Shared Sub CreateWXP(CopyObj: IEtlPlainDataProvider; Etltask: IEtltask);
Var
    WxDataTrans: IWxRectangle;
    WxETLDataTrans: WxEtlObject = New WxEtlObjectClass();
    OutPoint: GxPointF = New GxPointFClass();
Begin
    // Add a rectangle to the task workspace; it takes the provider identifier
    WxDataTrans := EtlTask.Workspace.CreateRectangle();
    WxDataTrans.Id := CopyObj.Id;
    WxETLDataTrans.EtlObject := CopyObj;
    WxDataTrans.Style.TextPosition := WxTextPosition.wtpBottom;
    WxDataTrans.Style.PictureMarginTop := -10;
    // NOTE(review): the point is read from PinPosition and then re-initialized
    // with Create(20, 20); this moves the shape only if PinPosition returns
    // a live reference - confirm
    OutPoint := WxDataTrans.PinPosition;
    OutPoint.Create(20, 20);
    // Bind the rectangle to the provider
    WxDataTrans.Extension := WxETLDataTrans As IWxShapeExtension;
End Sub CreateWXP;
// Create a visual object of data consumer
Public Shared Sub CreateWXC(CopyObj: IEtlPlainDataConsumer; Etltask: IEtltask);
Var
    WxDataTrans: IWxRectangle;
    WxETLDataTrans: WxEtlObject = New WxEtlObjectClass();
    OutPoint: GxPointF = New GxPointFClass();
Begin
    // Add a rectangle to the task workspace; it takes the consumer identifier
    WxDataTrans := EtlTask.Workspace.CreateRectangle();
    WxDataTrans.Id := CopyObj.Id;
    WxETLDataTrans.EtlObject := CopyObj;
    WxDataTrans.Style.TextPosition := WxTextPosition.wtpBottom;
    WxDataTrans.Style.PictureMarginTop := -10;
    // NOTE(review): the point is read from PinPosition and then re-initialized
    // with Create(40, 40); this moves the shape only if PinPosition returns
    // a live reference - confirm
    OutPoint := WxDataTrans.PinPosition;
    OutPoint.Create(40, 40);
    // Bind the rectangle to the consumer
    WxDataTrans.Extension := WxETLDataTrans As IWxShapeExtension;
End Sub CreateWXC;
See also: