Transaccional hacia Inteligencia de Negocio
Introducción
Todos los desarrolladores de sistemas Transaccionales(OLTP) han tenido que realizar determinados procesos para la alimentación de las bases de datos de Inteligencia de Negocio(BI). Este tipo de procesos, pueden ser más o menos complicados dependiendo del gerente del sistema OLTP y del gerente del sistema de BI. Me explico: Normalmente, la información necesaria en ambos tipos de sistema no suele ser la misma, así que, ya sea el equipo del OLTP o ya sea el del BI, deberá analizar, seleccionar y trasvasar la información disponible desde un sistema a otro. Y dependiendo de los gerentes, esta tarea la realizará uno u otro equipo.
En el caso de que dicha tarea la deban realizar el equipo del OLTP, normalmente, se le ha de añadir un requisito adicional: la eficiencia. Ello suele ser debido a que los sistemas OLTP suelen estar muy ocupados durante el día con el trabajo online, y por las noches, con cargas de datos y realización de informes variados. Así, la ventana de tiempo dada para este tipo de procesos suele ser muy pequeña, por lo que se ha de optimizar al máximo todo el proceso de trasvase.
Aquí se va a describir una metodología de trabajo usando las librerías y procedimientos almacenados de Insert Bulk Bestia de forma que se minimice el tiempo de análisis, selección y trasmisión de la información desde un sistema a otro.
El código mostrado en este capítulo se presenta a modo de ejemplo de utilización de la librería SQLServerAsyncExecProcedures incluida en Insert Bulk Bestia. InforCustom no se hace responsable del uso de dicho código.
Planteamiento del Problema
Se desea realizar un proceso diario para enviar determinada información desde un sistema OLTP a otro sistema para BI (ambos se encuentran en hosts diferentes). Dicha información debe quedar marcada indicando para cada registro si es nuevo, se ha modificado o eliminado desde la última vez que se ejecutó el proceso.
Para ello se crearán una serie de tablas repositorio en ambos sistemas, y en BI se creará una tabla, "ACTUALIZACIONES", donde quedará registrado cada ejecución del proceso. BI chequeará dicha tabla y cuando aparezca un nuevo registro, deberá procesar la información trasvasada.
Nota: Todos los objetos creados se harán en el esquema "rca".
También se parte de la hipótesis de que en el OLTP se encuentran instaladas las librerías de Insert Bulk Bestia y en el sistema de BI no.
Repositorio en OLTP
En la base de datos del sistema OLTP se va a crear un repositorio de tablas con la información que se desea transmitir, y asociado a cada tabla existirá otra tabla donde se almacenará un par de HASHs de la información. Estos dos HASHs van a facilitar y acelerar el proceso de comparación de la información con respecto a si se hiciese con querys que comparasen campo a campo, junto a las querys necesarias para averiguar cuáles son los registros nuevos y los eliminados.
Además del repositorio, se va a incluir en el código una Secuencia utilizada para dar nombre a las sucesivas ejecuciones y una función de cálculo de HASH para aquellas versiones de SQL Server (2014 e inferiores) cuya función HASHBYTES está limitada a 4000 caracteres (NVARCHAR). Veámoslo con código (por brevedad, sólo se van a incluir un par de tablas de ejemplo):
IF EXISTS(select NULL from sys.sequences where object_id = object_id('rca.SEQ_PARALELO'))
DROP SEQUENCE rca.SEQ_PARALELO
GO
CREATE SEQUENCE rca.SEQ_PARALELO
AS [bigint]
START WITH 1
INCREMENT BY 1
MINVALUE 1
MAXVALUE 9223372036854775807
CACHE 10
GO
--SQL Server 2014 y versiones anteriores:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.fnHashBytes') IS NOT NULL
DROP FUNCTION rca.fnHashBytes
GO
CREATE FUNCTION rca.fnHashBytes (
@EncryptAlgorithm NVARCHAR(50),
@DataToEncrypt nvarchar(max))
RETURNS VARBINARY(MAX) WITH SCHEMABINDING
AS BEGIN
DECLARE @Index INT = 1, @DataToEncryptLength INT
DECLARE @EncryptedResult varbinary(max) = NULL
IF @DataToEncrypt IS NOT NULL
BEGIN
SELECT @EncryptedResult = 0x, @DataToEncryptLength = DATALENGTH(@DataToEncrypt)
WHILE @Index <= @DataToEncryptLength
BEGIN
SELECT @EncryptedResult = @EncryptedResult + HASHBYTES(@EncryptAlgorithm,
SUBSTRING(@DataToEncrypt, @Index, 4000)), @Index = @Index + 4000
END
END
RETURN @EncryptedResult
END
GO
Como se puede observar, se utiliza el esquema "rca" para definir los objetos utilizados para el repositorio.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC,
ACT_CD_TIPO ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1_HASH') IS NOT NULL
DROP TABLE rca.Tabla1_HASH
GO
CREATE TABLE rca.Tabla1_HASH (
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
HASH_OLD varbinary(max) NULL,
HASH_NEW varbinary(max) NULL,
CONSTRAINT Tabla1_HASH_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
ACT_CD_TIPO ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2_HASH') IS NOT NULL
DROP TABLE rca.Tabla2_HASH
GO
CREATE TABLE rca.Tabla2_HASH (
Campo1 varchar(16) NOT NULL,
HASH_OLD varbinary(max) NULL,
HASH_NEW varbinary(max) NULL,
CONSTRAINT Tabla2_HASH_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
Se va a suponer que tanto "Tabla1" como "Tabla2" sólo contienen los campos que se desean trasvasar. La información va a provenir de las tablas "Tabla1Org" y "Tabla2Org" respectivamente, las cuales contendrán mucha más información, pero que no es necesaria para BI. "Tabla1" tiene una clave primaria que se compone de dos campos y "Tabla2" de un solo campo.
Ambas tablas contienen el campo ACT_CD_TIPO que forma parte de la clave principal. Durante el procesado de la información, los registros se van a duplicar, y este campo se utilizará para distinguir los registros nuevos de los antiguos.
El campo HASH_OLD contendrá el HASH del registro de la última ejecución del proceso, mientras que el campo y HASH_NEW contendrá el HASH del registro de la ejecución actual. Si HASH_OLD es NULL, querrá decir que el registro es nuevo, si HAS_NEW es NULL querrá decir que el registro se ha eliminado de la tabla original. Y si ambos campos no son NULL y no coinciden querrá decir que el registro ha sido modificado.
Procedimientos Almacenados en OLTP
En el OLTP se van a crear una serie de procedimientos almacenados para la actualización de las tablas del repositorio que se han creado.
Se quiere destacar que en las tablas del repositorio no se ha creado ninguna Foreing Key. Sin embargo, si las hubiese, antes de la actualización de las tablas habría que eliminarlas. La actualización de las tablas del repositorio se va a realizar en paralelo, por lo que no habrá ningún orden preestablecido. Así, en primer lugar, se van a crear dos procedimientos almacenados para eliminar y recrear dichas Foreing Keys. A continuación se presenta una aproximación a dichos procedimientos almacenados:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkLocales') IS NOT NULL
DROP PROCEDURE rca.EliminaFkLocales
GO
CREATE PROCEDURE rca.EliminaFkLocales
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar todas las FKs de las tablas del
repositorio para su carga
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'Tabla1_Tabla2_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
Tabla1_Tabla2_FK
EXECUTE ibb.Log_Errores_Inserta N'Elimnadas Foreing
Keys Locales.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR eliminando
las Foreing Keys Locales.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkLocales') IS NOT NULL
DROP PROCEDURE rca.CreaFkLocales
GO
CREATE PROCEDURE rca.CreaFkLocales
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Crear todas las FKs necesarias
--En nuestro ejemplo, no hay FKs entre tablas del
repositorio
--IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys
WHERE name = 'Tabla1_Tabla2_FK')
-- ALTER TABLE rca.Tabla1 ADD CONSTRAINT
Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
-- REFERENCES rca.Tabla2 (Campo1)
EXECUTE ibb.Log_Errores_Inserta N'Creadas Foreing
Keys Locales.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR creando las
Foreing Keys Locales.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
A continuación, vendrían los procedimientos almacenados encargados de actualizar las tablas del repositorio. Los que se muestran aquí, se encargan de actualizar el contenido de las tablas "rca.Tabla1", "rca.Tabla1_HASH", "rca.Tabla2" y "rca.Tabla2_HASH".
Estos procedimientos almacenados se ejecutarán en paralelo. Por ello, son importantes las siguientes instrucciones:
- Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
- El objetivo de la ejecución en paralelo es la de a aprovechar al máximo todas las CPUs disponibles. Por ello, no se desea que las Querys se ejecuten en paralelo (OPTION(MAXDOP 1)). MERGE suele ser una instrucción candidata a ser ejecutada en paralelo, por lo que es muy importante indicarle que no lo haga.
Algoritmo:
- Actualizar el campo ACT_CD_TIPO a 2 para todos los registros en la tabla del repositorio (Tabla1).
-
Insertar todos los registros procedentes de la tabla original del OLTP (Tabla1Org) en la tabla del
repositorio (Tabla1). El campo ACT_CD_TIPO se pone a 0 para indicar que son los registros insertados.
En esta Query es donde se selecciona y transforma adecuadamente la información procedente del OLTP para cubrir las necesidades de BI. -
Eliminar los registros viejos:
- Tabla1 contiene los registros de la anterior ejecución marcados con ACT_CD_TIPO a 2 y TODOS los registros del OLTP marcados con ACT_CD_TIPO a 0.
- El DELETE eliminará los registros que ya existían en Tabla1 y que siguen existiendo en el OLTP. Para ello se usa la clave de Tabla1.
- De esta forma, los registros con ACT_CD_TIPO = 2 serán los registros que han desaparecido del OLTP, y los registros con ACT_CD_TIPO = 0 contendrán la información existente en el OLTP.
-
Actualización de Tabla1_HASH:
- Se calcula el HASH de cada registro y se compara con el HASH contenido en el campo HASH_NEW.
- Si el registro existe en ambas tablas (Tabla1 y Tabla1_HASH), pero el HASH calculado no coincide con HASH_NEW, se actualiza el campo HASH_NEW con el HASH calculado. Esos serán los registros cuya información ha sido modificada en el OLTP. Para estos registros, HASH_OLD y HASH_NEW no coincidirán. Se verá por qué más adelante.
- Si el registro no existe en Tabla1_HASH, se inserta en dicha tabla. Serán los registros nuevos. Estos registros tendrán el campo HAS_OLD a NULL.
- Si el registro no existe en Tabla1, en Tabla1_HASH se actualiza el campo HASH_NEW a NULL. Estos registros serán los eliminados y el campo HASH_OLD no será NULL.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla1') IS NOT NULL
DROP PROCEDURE rca.RellenaTabla1
GO
CREATE PROCEDURE rca.RellenaTabla1
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se
desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Marcar todos los registros como 2
UPDATE rca.Tabla1
SET ACT_CD_TIPO = 2
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Insertar Todos los registros procedentes de la
Tabla Original marcados como 0
INSERT INTO rca.Tabla1
SELECT Tb1Org.Campo1, Tb1Org.Campo2,
Tb1Org.Campo3, 0 AS ACT_CD_TIPO
FROM dbo.Tabla1Org Tb1Org
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Eliminar los registros viejos:
-- Si registro1 existe en Tabla1 y TablaOrg1,
después del INSERT anterior Tabla1
-- contiene: registro1-2 (del UPDATE) y
registro1-0 (del INSERT). Con este DELETE
-- se eliminará registro1-2, quedando el nuevo
registro.
DELETE Tb1Del
FROM rca.Tabla1 Tb1Del
INNER JOIN rca.Tabla1 Tb1Ins
ON Tb1Del.Campo1
= Tb1Ins.Campo1 AND Tb1Del.Campo2 = Tb1Ins.Campo2
WHERE Tb1Del.ACT_CD_TIPO = 2 AND
Tb1Ins.ACT_CD_TIPO = 0
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Actualización de Tabla1_HASH
-- Se calcula el HASH de cada registro y se
compara con el campo HASH_NEW
-- Si el registro existe en ambas
tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
-- Si el registro no existe
Tabla1_HASH, se inserta
-- Si el registro no existe Tabla1,
se anula el campo HAS_NEW
MERGE rca.Tabla1_HASH TablaHash USING
(SELECT
MBT.Campo1, MBT.Campo2,
--SQL Server 2014 y versiones anteriores:
-- Aquí se está utilizando la función de SQL Server HASHBYTES,
porque se supone que
-- la cadena de caracteres que se va a formar no superará los 4000
caracteres. Si se
-- superase dicha longitud se debería usar la función
rca.fnHashBytes
HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml
auto)) as HASH_NEW
FROM
rca.Tabla1 AS MBT
WHERE
MBT.ACT_CD_TIPO = 0
) AS Hases_Nuevos
ON
TablaHash.Campo1 = Hases_Nuevos.Campo1 AND TablaHash.Campo2 =
Hases_Nuevos.Campo2
WHEN MATCHED AND TablaHash.HASH_NEW
<> Hases_Nuevos.HASH_NEW THEN --Modificados
UPDATE SET TablaHash.HASH_NEW
= Hases_Nuevos.HASH_NEW
WHEN NOT MATCHED BY TARGET THEN --Nuevos
INSERT(Campo1, Campo2,
HASH_NEW)
VALUES(Hases_Nuevos.Campo1,
Hases_Nuevos.Campo2, Hases_Nuevos.HASH_NEW)
WHEN NOT MATCHED BY SOURCE THEN
--Borrados
UPDATE SET HASH_NEW = NULL
OPTION(MAXDOP 1); --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--MERGE es una instrucción candidata a
ser paralelizada por SQL Server. Como ya se hacen muchas cosas
--en paralelo, se le indica que no la
paralelice para no bloquear las CPUs
--Puede ser conveniente actualizar los índices y las
estadísticas por si hay muchas modificaciones
ALTER INDEX ALL ON rca.Tabla1_HASH REBUILD WITH
(MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.Tabla1_HASH WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP =
1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla1 WITH COLUMNS
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla1 y Tabla1_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla1 y Tabla1_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla2') IS NOT NULL
DROP PROCEDURE rca.RellenaTabla2
GO
CREATE PROCEDURE rca.RellenaTabla2
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se
desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Marcar todos los registros como 2
UPDATE rca.Tabla2
SET ACT_CD_TIPO = 2
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Insertar Todos los registros procedentes de la
Tabla Original marcados como 0
INSERT INTO rca.Tabla2
SELECT Tb2Org.Campo1, Tb2Org.Campo2,
Tb2Org.Campo3, 0 AS ACT_CD_TIPO
FROM dbo.Tabla2Org Tb2Org
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Eliminar los registros viejos:
-- Si registro1 existe en Tabla2 y Tabla2Org,
después del INSERT anterior Tabla1
-- contiene: registro1-2 (del UPDATE) y
registro1-0 (del INSERT). Con este DELETE
-- se eliminará registro1-2, quedando el nuevo
registro.
DELETE Tb2Del
FROM rca.Tabla2 Tb2Del
INNER JOIN rca.Tabla2 Tb2Ins
ON Tb2Del.Campo1
= Tb2Ins.Campo1
WHERE Tb2Del.ACT_CD_TIPO = 2 AND
Tb2Ins.ACT_CD_TIPO = 0
OPTION(MAXDOP 1) --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--Actualización de Tabla2_HASH
-- Se calcula el HASH de cada registro y se
compara con el campo HASH_NEW
-- Si el registro existe en ambas
tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
-- Si el registro no existe
Tabla1_HASH, se inserta
-- Si el registro no existe Tabla1,
se anula el campo HAS_NEW
MERGE rca.Tabla2_HASH TablaHash USING
(SELECT
MBT.Campo1,
--SQL Server 2014 y versiones anteriores:
-- Aquí se está utilizando la función de SQL Server HASHBYTES,
porque se supone que
-- la cadena de caracteres que se va a formar no superará los 4000
caracteres. Si se
-- superase dicha longitud se debería usar la función
rca.fnHashBytes
HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml
auto)) as HASH_NEW
FROM
rca.Tabla2 AS MBT
WHERE
MBT.ACT_CD_TIPO = 0
) AS Hases_Nuevos
ON
TablaHash.Campo1 = Hases_Nuevos.Campo1
WHEN MATCHED AND TablaHash.HASH_NEW
<> Hases_Nuevos.HASH_NEW THEN --Modificados
UPDATE SET TablaHash.HASH_NEW
= Hases_Nuevos.HASH_NEW
WHEN NOT MATCHED BY TARGET THEN --Nuevos
INSERT(Campo1, HASH_NEW)
VALUES(Hases_Nuevos.Campo1,
Hases_Nuevos.HASH_NEW)
WHEN NOT MATCHED BY SOURCE THEN
--Borrados
UPDATE SET HASH_NEW = NULL
OPTION(MAXDOP 1); --Como se ejecutará en
paralelo, no hemos de bloquear las CPUs
--MERGE es una instrucción candidata a
ser paralelizada por SQL Server. Como ya se hacen muchas cosas
--en paralelo, se le indica que no la
paralelice para no bloquear las CPUs
--Puede ser conveniente actualizar los índices y las
estadísticas por si hay muchas modificaciones
ALTER INDEX ALL ON rca.Tabla2_HASH REBUILD WITH
(MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.Tabla2_HASH WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP =
1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla2 WITH COLUMNS
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla2 y Tabla2_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla2 y Tabla2_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Repositorio en BI
Al igual que se han creado una serie de tablas en el OLTP, se crearán sus réplicas en BI de acuerdo con los requisitos planteados al inicio.
A continuación, se muestra las réplicas de "Tabla1" y "Tabla2". Se les ha añadido un campo nuevo, ACT_CD_ACTUALIZACION, que se corresponde con la clave de la tabla rca.ACTUALIZACIONES. Ambas tablas contienen el campo ACT_CD_TIPO que indicará lo que ha sucedido con el registro desde la última vez que se ejecutó el proceso. Contendrá los valores: 0-Insert, 1-Update, 2-Delete.
La tabla rca.ACTUALIZACIONES va a contener un registro por cada vez que se haya intentado realizar el proceso de trasvase de datos. Si el proceso se ha realizado con éxito, el campo ACT_FH_FIN tendrá un valor, y si ha habido errores, este campo contendrá un NULL.
Al igual que en el OLTP, no se han definido Foreing Keys entre las tablas del repositorio, sin embargo, si se han definido las FKs que relacionan dichas tablas con la tabla rca.ACTUALIZACIONES.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
ACT_CD_ACTUALIZACION bigint NOT NULL,
CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
Campo2 ASC,
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
Campo1 varchar(16) NOT NULL,
Campo2 smallint NOT NULL,
Campo3 varchar(55) NOT NULL,
ACT_CD_TIPO smallint NOT NULL,
ACT_CD_ACTUALIZACION bigint NOT NULL,
CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
(
Campo1 ASC,
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ACTUALIZACIONES') IS NOT NULL
DROP TABLE rca.ACTUALIZACIONES
GO
CREATE TABLE rca.ACTUALIZACIONES(
ACT_CD_ACTUALIZACION bigint NOT NULL,
ACT_FH_INICIO datetime2(0) NOT NULL,
ACT_FH_FIN datetime2(0) NULL,
ACT_DURACION nvarchar(50) NULL,
CONSTRAINT ACTUALIZACIONES_PK PRIMARY KEY CLUSTERED
(
ACT_CD_ACTUALIZACION ASC
)WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF,
IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON
[PRIMARY]
) ON [PRIMARY]
GO
CREATE NONCLUSTERED INDEX RCA_ACTUALIZACIONES_FIN_NULO_IDX ON
rca.ACTUALIZACIONES
(
ACT_FH_FIN ASC
)
INCLUDE(ACT_CD_ACTUALIZACION)
WITH (PAD_INDEX = ON, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB =
OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON,
ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 100) ON [PRIMARY]
GO
ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN
KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE
CASCADE
GO
ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN
KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE
CASCADE
GO
Procedimientos Almacenados en BI
También va a ser necesario la creación de determinados procedimientos almacenados del lado de BI:
- Limpieza de las tablas del repositorio y de la tabla rca.ACTUALIZACIONES de posibles ejecuciones anteriores que no finalizaron correctamente: (ACT_FH_FIN IS NULL, o bien, ACT_CD_ACTUALIZACION no existe en rca.ACTUALIZACIONES). También inserta en rca.ACTUALIZACIONES un nuevo registro para la ejecución en curso.
- También se crea un procedimiento almacenado para actualizar la fecha de finalización del proceso, indicando, además, que el proceso se ha ejecutado correctamente.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.LimpiaTablasRemotas') IS NOT NULL
DROP PROCEDURE rca.LimpiaTablasRemotas
GO
CREATE PROCEDURE rca.LimpiaTablasRemotas
@fechaInicioProceso NVARCHAR(50),
@IdParalelo BIGINT,
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1 y Tabla2 aquellos registros que
se hayan podido insertar de un proceso anterior que haya
--podido fallar (ACT_FH_FIN IS NULL o bien
ACT_CD_ACTUALIZACION no existe en rca.ACTUALIZACIONES)
DELETE rca.Tabla1
WHERE ACT_CD_ACTUALIZACION IN (SELECT
ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
OR ACT_CD_ACTUALIZACION NOT
IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)
DELETE rca.Tabla2
WHERE ACT_CD_ACTUALIZACION IN (SELECT
ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
OR ACT_CD_ACTUALIZACION NOT
IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)
--Hay un DELETE CASCADE, pero por tema de
prestaciones se realizan las querys anteriores.
DELETE rca.ACTUALIZACIONES
WHERE ACT_FH_FIN IS NULL
--Se crea el nuevo registro para la ejecución en
curso
INSERT INTO rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION, ACT_FH_INICIO)
SELECT @IdParalelo,
TRY_CONVERT(datetime2(0), @fechaInicioProceso, 121)
--Por prestaciones, puede ser conveniente la
actualización de los índices y estadísticas.
ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH
(MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.UpdateFinActualizacion') IS NOT NULL
DROP PROCEDURE rca.UpdateFinActualizacion
GO
CREATE PROCEDURE rca.UpdateFinActualizacion
@IdParalelo bigint,
@Duracion nvarchar(50),
@fFin nvarchar(50),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
UPDATE rca.ACTUALIZACIONES
SET ACT_FH_FIN =
TRY_CONVERT(datetime2(0), @fFin, 121), ACT_DURACION = @Duracion
WHERE ACT_CD_ACTUALIZACION = @IdParalelo
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
En segundo lugar, se crean los procedimientos almacenados para eliminar y crear los Foreing Keys en el servidor remoto. Aunque no se definan FKs entre las tablas del repositorio, si se han definido FKs entre dichas tablas y la tabla rca.ACTUALIZACIONES.
También se ha creado un procedimiento almacenado para la regeneración de los índices y estadísticas de las tablas de BI.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkRemotas') IS NOT NULL
DROP PROCEDURE rca.EliminaFkRemotas
GO
CREATE PROCEDURE rca.EliminaFkRemotas
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Eliminar todas las FKs de las tablas del
repositorio para su carga
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'Tabla1_Tabla2_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
Tabla1_Tabla2_FK
--Eliminar todas las FKs contra la tabla de
rca.ACTUALIZACIONES
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla1_FK')
ALTER TABLE rca.Tabla1 DROP CONSTRAINT
ACTUALIZACIONES_Tabla1_FK
IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla2_FK')
ALTER TABLE rca.Tabla2 DROP CONSTRAINT
ACTUALIZACIONES_Tabla2_FK
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkRemotas') IS NOT NULL
DROP PROCEDURE rca.CreaFkRemotas
GO
CREATE PROCEDURE rca.CreaFkRemotas
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Crear todas las FKs necesarias
--En el ejemplo no hay ninguna FKs entre las tablas
del repositorio
--IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys
WHERE name = 'Tabla1_Tabla2_FK')
--ALTER TABLE rca.Tabla1 ADD CONSTRAINT
Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
-- REFERENCES rca.Tabla2 (Campo1)
--Crear todas las FKs contra la tabla de
rca.ACTUALIZACIONES
IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla1_FK')
ALTER TABLE rca.Tabla1 ADD CONSTRAINT
ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION) ON DELETE CASCADE
IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE
name = 'ACTUALIZACIONES_Tabla2_FK')
ALTER TABLE rca.Tabla2 ADD CONSTRAINT
ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
REFERENCES rca.ACTUALIZACIONES
(ACT_CD_ACTUALIZACION) ON DELETE CASCADE
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RegeneraIndicesRemotos') IS NOT NULL
DROP PROCEDURE rca.RegeneraIndicesRemotos
GO
CREATE PROCEDURE rca.RegeneraIndicesRemotos
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
SET @error = 0
BEGIN TRY
--Si hay muchas actualizaciones en el repositorio,
puede ser conveniente la actualización
--de los índices y las estadísticas
ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP =
0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla1 WITH COLUMNS
ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP =
0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
UPDATE STATISTICS rca.Tabla2 WITH COLUMNS
ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH
(MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE =
OFF)
UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
END TRY
BEGIN CATCH
--Sería conveniente crear toda la estructura de
ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
SET @error = 1
END CATCH
END
GO
Procedimientos Almacenados para el Trasvase de Información
En el OLTP, se crean los procedimientos almacenados para el trasvase de información. Se supone que se está utilizando un Servidor Vinculado llamado RCA, y una base de datos de destino también llamada RCA. Las tablas de destino, como ya se ha comentado, se encuentran en el esquema rca.
Evidentemente, el uso de un Servidor Vinculado no es lo más apropiado para la realización de este tipo de cosas. Sin embargo, si éste es uno de los requisitos, una de las formas de acelerar el proceso es el uso de las librerías 'SQLServerAsyncExecProcedures' de forma que se optimicen al máximo los recursos disponibles, y sobre todo, la transmisión de datos a través de la red, ya que éste suele ser el cuello de botella.
Se puede observar que con el algoritmo utilizado hasta ahora no es necesario que la información viaje en los dos sentidos, es decir, en el OLTP se conoce la información que se ha de enviar a BI, y no es necesario que la información de BI deba viajar al OLTP para realizar comparaciones. Evidentemente, esto genera una gran eficiencia, ya que el cuello de botella suele estar en la red.
Así, a continuación, se muestran los procedimientos almacenados que trasvasan los datos para Tabla1 y Tabla2:
- Si hay transacciones, éstas deberán promocionar a transacciones distribuidas: SET REMOTE_PROC_TRANSACTIONS ON.
- Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
- En BI, se han de insertar los registros nuevos (HASH_OLD IS NULL), los modificados y los eliminados (HASH_NEW != HASH_OLD).
- El campo ACT_CD_TIPO contendrá los valores: 0-Insert(HASH_OLD IS NULL), 1-Update(HASH_NEW != HASH_OLD), 2-Delete(HASH_NEW IS NULL).
- El campo ACT_CD_ACTUALIZACION contendrá el valor apropiado como ya se verá mas adelante.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla1') IS NOT NULL
DROP PROCEDURE rca.Traspasa_Tabla1
GO
CREATE PROCEDURE rca.Traspasa_Tabla1
@idParalelo BIGINT,
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Insert de los Nuevos (HASH_OLD Es nulo) y
Modificados (HASH_NEW != HASH_OLD)
INSERT INTO RCA.RCA.rca.Tabla1
SELECT tb1.Campo1, tb1.Campo2,
tb1.Campo3,
IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)),
@idParalelo
FROM rca.Tabla1 tb1
INNER JOIN
rca.Tabla1_HASH ExpH
ON
ExpH.Campo1 = tb1.Campo1 AND ExpH.Campo2 = tb1.Campo2
WHERE ExpH.HASH_OLD IS NULL
OR ExpH.HASH_NEW != ExpH.HASH_OLD
EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado
Tabla1.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando
Tabla1.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla2') IS NOT NULL
DROP PROCEDURE rca.Traspasa_Tabla2
GO
CREATE PROCEDURE rca.Traspasa_Tabla2
@idParalelo BIGINT,
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Insert de los Nuevos (HASH_OLD Es nulo) y
Modificados (HASH_NEW != HASH_OLD)
INSERT INTO RCA.RCA.rca.Tabla2
SELECT tb1.Campo1, tb1.Campo2,
tb1.Campo3,
IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)),
@idParalelo
FROM rca.Tabla2 tb1
INNER JOIN
rca.Tabla2_HASH ExpH
ON
ExpH.Campo1 = tb1.Campo1
WHERE ExpH.HASH_OLD IS NULL
OR ExpH.HASH_NEW != ExpH.HASH_OLD
EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado
Tabla2.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando
Tabla2.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Procedimientos Almacenados para el COMMIT
Hasta ahora, todos los procedimientos almacenados no necesitan transacciones, ya que, en caso de error el algoritmo se recupera de errores. Por ejemplo, si falla en la actualización de las tablas del OLTP o la generación de los HASH_NEWs, en la próxima ejecución del proceso se recuperará de estos errores. Si falla en la transmisión de las tablas desde el OLTP a BI, el procedimiento almacenado 'RCA.RCA.rca.LimpiaTablasRemotas' recuperará las tablas remotas.
Sin embargo, los procedimientos almacenados que aquí se describen son que los realmente hacen el COMMIT del proceso, es decir, son procedimientos almacenados que necesitan hacerse dentro de una transacción, ya que en caso de fallo, el proceso no podrá recuperarse. Las transacciones pueden ser de cualquier nivel de aislamiento, ya que cada procedimiento almacenado no necesita "ver" los datos del resto de procedimientos almacenados.
Los procedimientos almacenados 'rca.ActualizaHash_Tabla1' y 'rca.ActualizaHash_Tabla2' preparan las tablas Tabla1_HASH y Tabla2_HASH para la próxima ejecución del proceso:
- Eliminan de Tabla1_HASH los registros que han desaparecido en el OLTP (HASH_NEW IS NULL).
- Eliminan de Tabla1 aquellos registros que no existen en Tabla1_HASH.
- Actualiza en Tabla1_HASH el campo HASH_OLD con el contenido de HASH_NEW.
En este paso crítico también se ha de llamar al procedimiento remoto 'RCA.RCA.rca.UpdateFinActualizacion', que actualiza el campo 'ACT_FH_FIN' indicando que el proceso se ha ejecutado correctamente. Si falla este método, el proceso tampoco se podrá recuperar en la próxima ejecución. En este otro caso, la transacción que se asocie a este procedimiento almacenado va a ser distribuida, por lo que se tendrá que habilitar MS DTC en ambos servidores.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla1') IS NOT NULL
DROP PROCEDURE rca.ActualizaHash_Tabla1
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla1
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1_HASH aquellos registros cuyo
HAS_NEW es nulo (eliminados)
DELETE rca.Tabla1_HASH
WHERE HASH_NEW IS NULL
--Eliminar de Tabla1 aquellos registros que no
existan en Tabla1_HASH
DELETE Tbl
FROM rca.Tabla1 Tbl
LEFT JOIN rca.Tabla1_HASH
TblHs
ON Tbl.Campo1 =
TblHs.Campo1 AND Tbl.Campo2 = TblHs.Campo2
WHERE TblHs.Campo1 IS NULL AND
TblHs.Campo2 IS NULL
--Actualizar los HASH_OLD con los HAS_NEW
UPDATE rca.Tabla1_HASH
SET HASH_OLD = HASH_NEW
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla1_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla1_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla2') IS NOT NULL
DROP PROCEDURE rca.ActualizaHash_Tabla2
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla2
@NombreParalelo NVARCHAR(256),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Eliminar de Tabla1_HASH aquellos registros cuyo
HAS_NEW es nulo (eliminados)
DELETE rca.Tabla2_HASH
WHERE HASH_NEW IS NULL
--Eliminar de Tabla1 aquellos registros que no
existan en Tabla1_HASH
DELETE Tbl
FROM rca.Tabla2 Tbl
LEFT JOIN rca.Tabla2_HASH
TblHs
ON Tbl.Campo1 =
TblHs.Campo1
WHERE TblHs.Campo1 IS NULL
--Actualizar los HASH_OLD con los HAS_NEW
UPDATE rca.Tabla2_HASH
SET HASH_OLD = HASH_NEW
EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado
Tabla2_HASH.',
0, @NombreParalelo, @inicio, @nameDB,
@esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
Tabla2_HASH.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Para este punto crítico existen dos posibilidades:
- La primera es la de ejecutar en paralelo cada uno de estos procedimientos almacenados con transacciones independientes. Si todos los procedimientos se ejecutan sin errores se hará un commit de todas las transacciones. Si alguno falla, se hará un rollback. Conviene destacar que la transacción necesaria para ejecutar 'RCA.RCA.rca.UpdateFinActualizacion' debe ser una transacción distribuida, por lo que se necesita tener habilitado MS DTC en ambos servidores. La experiencia dice que esta posibilidad tiene una alta probabilidad de que las transacciones fallen, no los procedimientos almacenados, sino las propias transacciones.
- La segunda es crear un nuevo procedimiento almacenado que llame a todos estos procedimientos almacenados de actualización de HASHs, junto con el procedimiento almacenado 'RCA.RCA.rca.UpdateFinActualizacion' y ponerlo en una transacción. La transacción, evidentemente, va a promocionar a una transacción distribuida, por lo que se necesita tener habilitado MS DTC en ambos servidores.
Como aquí se está presentando un ejemplo demostrativo, se mostrará el código para ambas soluciones. En función de las pruebas de ensayo y error, ya se decidirá por una u otra.
Si se opta por la segunda posibilidad, habrá que crear el siguiente procedimiento almacenado:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Actualiza_Hash_Todas_Tablas_Secuencial') IS NOT NULL
DROP PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
GO
CREATE PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
@NombreParalelo NVARCHAR(256),
@IdParalelo bigint,
@inicioProceso DATETIME2(7),
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)
SET @error = 0
BEGIN TRY
EXEC rca.ActualizaHash_Tabla1 @NombreParalelo, @error
out
IF @error = 0
EXEC rca.ActualizaHash_Tabla2
@NombreParalelo, @error out
--Y así con el resto de tablas
--Por último, actualizar la fecha de finalización del
proceso
IF @error = 0
BEGIN
SELECT @fFin = SYSDATETIME(), @Duracion =
ibb.TimeSpanToNow(@inicioProceso)
EXEC RCA.RCA.rca.UpdateFinActualizacion
@IdParalelo, @Duracion, @fFin, @error out
END
IF @error = 0
EXECUTE ibb.Log_Errores_Inserta N'Se ha
actualizado el HASH de todas las Tablas.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
ELSE
EXECUTE ibb.Log_Errores_Inserta N'ERROR
actualizando el HASH de todas las Tablas.',
10, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando
el HASH de todas las Tablas.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Creación del Paralelo y Ejecución
Por último, se detalla el procedimiento almacenado que crea el 'Paralelo' y lo ejecuta.
- Si hay transacciones, éstas deberán promocionar a transacciones distribuidas: SET REMOTE_PROC_TRANSACTIONS ON.
- Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
- Si hay transacciones abiertas en la llamada a este procedimiento remoto, se aborta la ejecución.
- Con el fin de identificar cada ejecución por un nombre distinto, se va a crear dicho nombre a partir de una secuencia. Este valor, también será utilizado para identificar una nueva ejecución del proceso en BI (ACT_CD_ACTUALIZACION).
-
El Paralelo se va a componer de 3 Lotes de Trabajos:
-
1º Actualización de las tablas del repositorio del OLTP. Sin transacciones.
Trabajos iniciales ejecutados secuencialmente:- Crear el nuevo registro en rca.ACTUALIZACIONES y eliminar datos de ejecuciones pasadas fallidas.
- Si hay FKs en el repositorio del OLTP: Eliminarlas.
- Eliminar las FKs del repositorio de BI.
-
Actualizar las tablas del repositorio del OLTP en paralelo.
Nota: en los procedimientos almacenados va incluido la regeneración de índices y estadísticas, si no, habría que hacer un procedimiento almacenado para ello y ejecutarlo como trabajos Finales.
- Si hay FKs en el repositorio del OLTP: Crearlas
-
2º Trasvase de Información desde el repositorio del OLTP al repositorio de BI. Sin transacciones.
Trabajos a realizar en paralelo:- Traspasar la información desde el repositorio del OLTP a BI en paralelo.
- Crear las FKs en BI.
- Regenerar los índices en BI.
-
3º Actualización de HAHS_OLD por HASH_NEW y poner indicador de fin del trabajo. Con transacciones.
PRIMERA OPCIÓN: Ejecutar en paralelo y con transacciones la actualización de los HASH y la Fecha de Finalización
Trabajos a realizar en paralelo:- Actualizar los HASHs de las tablas en el repositorio del OLTP.
- Poner la fecha de finalización en el repositorio de BI.
- Se han creado tres lotes de trabajos, y después de experimentar varias veces se obtienen los grados de paralelismo óptimos. En el código se indica que para el primer lote el grado de paralelismo es de 6(6 procesos simultáneos), para el segundo 20(transmisión de la información) y para el tercero 15(si se opta por la PRIMERA OPCION)
- Se indica que las transacciones a crear serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta una transacción será de 5 minutos. Este valor deberá ser tal que sea el tiempo total de duración del 3º lote, ya que es el único que tiene transacciones.
SEGUNDA OPCIÓN: Ejecutar el procedimiento almacenado "rca.Actualiza_Hash_Todas_Tablas_Secuencial" dentro de una transacción
Opciones de ejecución para el paralelo:- Se han creado dos lotes de trabajos, y después de experimentar varias veces se obtienen los grados de paralelismo óptimos. En el código se indica que para el primer lote el grado de paralelismo es de 6(6 procesos simultáneos), y para el segundo 20(transmisión de la información). No hay tercer lote.
- Se indica que las transacciones a crear serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta una transacción será de 5 minutos. Sin embargo, esta parametrización no tiene efecto, ya que el tercer lote de trabajos no se ha creado y el resto de lotes no tienen transacciones.
Si el paralelo se ha ejecutado correctamente:- Abrir una transacción
- Ejecutar el procedimiento almacenado creado expresamente para la SEGUNDA OPCIÓN.
- Si se ha ejecutado correctamente, finalizar la transacción, en caso contrario, deshacerla.
-
1º Actualización de las tablas del repositorio del OLTP. Sin transacciones.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTablasParalelo') IS NOT NULL
DROP PROCEDURE rca.RellenaTablasParalelo
GO
CREATE PROCEDURE rca.RellenaTablasParalelo
@error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--SET IMPLICIT_TRANSACTIONS OFF
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON
DECLARE @NumTranAntes INT = @@TRANCOUNT
DECLARE @idParalelo bigint, @NombreParalelo varchar(256) = NULL,
@Parametros NVARCHAR(MAX) = NULL, @Trabajo NVARCHAR(MAX) = NULL
DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)
--Variables para LOG
DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME =
DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID),
@procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
SET @error = 0
BEGIN TRY
--Si hay transacciones abiertas, el sistema no
funcionará
IF @NumTranAntes > 0
BEGIN
SET @error = 1
EXECUTE ibb.Log_Errores_Inserta N'El
trasvase de tablas no puede funcionar con una transacción abierta.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
RETURN
END
--Con el fin de identificar cada ejecución por un
nombre distinto, se va a crear dicho nombre a partir de una secuencia:
SELECT @idParalelo = NEXT VALUE FOR rca.SEQ_PARALELO,
@NombreParalelo = CONCAT('ParaleloRepositorio', @idParalelo)
--El Paralelo se va a componer de 3 Lotes de
Trabajos:
-- 1º Actualización de las tablas del
repositorio local. Sin transacciones
-- 2º Trasvase de Información desde el
repositorio local al remoto. Sin transacciones
-- 3º Actualización de HAHS_OLD por HASH_NEW y
poner indicador de fin del trabajo. Con transacciones
--Lote de Trabajos 1º:
-- 1º Crear el nuevo registro en
rca.ACTUALIZACIONES y eliminar datos de ejecuciones pasadas fallidas
-- 2º Si hay FKs locales en el respositorio:
Eliminarlas.
-- 3º Eliminar las FKs remotas.
SET @Trabajo = CONCAT(N'EXEC
SGR.rca.LimpiaTablasRemotas N''', TRY_CONVERT(nvarchar, @inicio, 121),
N''', ', @idParalelo, N', @error out')
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Limpia Tablas Remotas', 0, @Trabajo
SELECT @Parametros = CONCAT(N'N''', @NombreParalelo,
N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.EliminaFkLocales ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Elimina FKs Locales', 0, @Trabajo
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Iniciales', N'Elimina FKs Remotas', 0, N'EXEC SGR.rca.EliminaFkRemotas
@error out'
-- 4º Actualizar las tablas del repositorio
local en paralelo.
-- Nota: en los procedimientos
almacenados va incluido la regeneración de índices y estadísticas, si no,
habría
-- que hacer un
procedimiento almacenado para ello y ejecutarlo como trabajos Finales
SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla1 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Paralelo', N'Carga Tabla1', 0, @Trabajo
SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla2 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Paralelo', N'Carga Tabla2', 0, @Trabajo
-- 5º Si hay FKs locales en el repositorio:
Crearlas
SET @Trabajo = CONCAT(N'EXEC rca.CreaFkLocales ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1,
N'Finales', N'Crea FKs Locales', 0, @Trabajo
--Lote de Trabajos 2º:
-- 1º Traspasar la información desde el local
al remoto en paralelo
-- 2º Crear las FKs remotas
-- 3º Regenerar los índices remotos
SELECT @Parametros = CONCAT(@idParalelo, N', N''',
@NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC
rca.Traspasa_Tabla1 ', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Paralelo', N'Traspasa Tabla1', 0, @Trabajo
SELECT @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla2
', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Paralelo', N'Traspasa Tabla2', 0, @Trabajo
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Finales', N'Crea FKs Remotas', 0, N'EXEC SGR.rca.CreaFkRemotas @error
out'
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2,
N'Finales', N'Regenera Índices Remotos', 0, N'EXEC
SGR.rca.RegeneraIndicesRemotos @error out'
--PRIMERA OPCIÓN: Ejecutar en paralelo y con
transacciones la actualización de los HASH y la Fecha de Finalización
--Lote de trabajos 3º, con transacciones:
-- 1º Actualizar los HASHs de las tablas
-- 2º Poner la fecha de finalización en el
remoto
SELECT @Parametros = CONCAT(N'''', @NombreParalelo,
N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla1 ',
@Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'ActualizaHash Tabla1', 1, @Trabajo
SELECT @Trabajo = CONCAT(N'EXEC
rca.ActualizaHash_Tabla2 ', @Parametros)
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'ActualizaHash Tabla2', 1, @Trabajo
SELECT @Duracion = ibb.TimeSpanToNow(@inicio),
@Trabajo = CONCAT(N'EXEC SGR.rca.UpdateFinActualizacion ', @idParalelo,
N', ''', @Duracion, N''', ''',
TRY_CONVERT(nvarchar, SYSDATETIME(), 121), N''', @error out')
EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3,
N'Paralelo', N'Actualiza Fecha Fin', 1, @Trabajo
--Opciones para la ejecución de los Trabajos
-- Se han creado tres lotes de trabajos, y
después de experimentar varias veces comprobamos que los siguientes
grados
-- de paralelismo son los óptimos
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 1, 6
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 2, 20
EXEC ibbclr.Parallel_SetOption_MaxTaskBatch
@NombreParalelo, 3, 15
-- Se indica que las transacciones a crear
serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta
una
-- Transacción será de 5 minutos. Este valor
deberá ser tal que sea el tiempo total de duración del 3º lote, ya que
es el
-- único que tiene transacciones
EXEC ibbclr.Parallel_SetOptions @NombreParalelo,
N'ReadCommitted', 5
--Ejecución del Paralelo
EXEC @error = ibbclr.Parallel_Execute @NombreParalelo
----SEGUNDA OPCION: Ejecutar el procedimiento
almacenado rca.Actualiza_Hash_Todas_Tablas_Secuencial dentro de una
transacción
--BEGIN TRAN
--EXEC rca.Actualiza_Hash_Todas_Tablas_Secuencial
@NombreParalelo, @idParalelo, @inicio, @error out
--IF @error = 0
-- COMMIT
--ELSE
-- ROLLBACK
IF @error = 0
EXECUTE ibb.Log_Errores_Inserta N'Se ha
ejecutado el Paralelo.',
0, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
ELSE
EXECUTE ibb.Log_Errores_Inserta N'ERROR
ejecutando el Paralelo.',
10, @NombreParalelo, @inicio,
@nameDB, @esquema, @procedimiento
END TRY
BEGIN CATCH
DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(),
@ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) =
ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(),
@ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT =
ERROR_STATE()
EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el
Paralelo.',
10, @NombreParalelo, NULL, @nameDB,
@esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA,
@ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD,
@ERR_SQL_ESTADO
SET @error = @ERR_SQL_NUM_ERROR
END CATCH
END
GO
Una vez programado todo, la ejecución es bastante sencilla:
DECLARE @error int
EXEC rca.RellenaTablasParalelo @error out
En la tabla 'ibb.Log_Errores' aparecerán los mensajes de LOG junto con los mensajes de ERROR si los hubiese. Los mensajes son bastante largos, y contienen saltos de línea, por lo que se recomienda activar la opción de conservar CR/LF de SSMS, para poder copiarlos y pegarlos donde se puedan leer completos.
No entra dentro de los objetivos de este capítulo, pero al algoritmo que aquí se describe se le puede añadir funcnionalidad para recuperación ante desastres, es decir, recuparación de las tablas del repositorio del OLTP desde las tablas del repositorio de BI, y viceversa.