Friday, August 27, 2010

Data Change Tracking via Oracle Streams

Recently I got an assignment to implement a facility that captures changes in a table as soon as they appear there and propagates them through EMS to the subscribers. Here I’d like to describe how we implemented this mechanism.

We considered several solutions that looked useful, such as Change Notification, Change Data Capture, a trigger-based mechanism, etc., but settled on Streams, as it offered the best balance of speed, maintainability, and resource consumption.

In a nutshell


The capture process picks up the changes that satisfy basic table rules and passes them on to the apply process, which uses a hand-written DML handler. The handler takes each LCR (logical change record), extracts the necessary information (such as the PK ID), puts it into a user-defined queue, and then simply discards the LCR without any further processing. With more complicated structures (which is my case), I put the captured changes into an interim cache collection and publish them to the queue from a pre-commit handler (also hand-written).

The paragraph above might be a bit of a mouthful for someone who isn’t really into Streams, so if you’re interested, the example below walks through it step by step.

Preparation


First of all, the schema you want the Streams code to run under must be granted Streams administrator privileges:
BEGIN
  dbms_streams_auth.grant_admin_privilege(grantee => 'SCHEMA_NAME');
END;
/
Secondly, the user must be granted the DBA role explicitly. Once that’s done, we can start setting up the test environment.
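A minimal sketch of that grant, assuming it is run from a suitably privileged account (for example SYSTEM) and that SCHEMA_NAME is the same placeholder as in the previous step:

```sql
-- Run as a privileged user; SCHEMA_NAME is the placeholder schema from the previous step
GRANT DBA TO SCHEMA_NAME;
```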

Now let’s create the test table whose changes we’re going to capture:
CREATE TABLE t1
(
  id      NUMBER(15) CONSTRAINT pk_t1 PRIMARY KEY,
  f1      NUMBER(15) NOT NULL,
  padding VARCHAR2(500) DEFAULT rpad('x', 500)
);
I’m going to show you how to capture the distinct values of the f1 column in the table above. All the f1 values will be published into the following queue:
BEGIN
  dbms_aqadm.create_queue_table(queue_table        => 'ct_t1_tbl',
                                queue_payload_type => 'sys.aq$_jms_text_message',
                                multiple_consumers => FALSE);
  dbms_aqadm.create_queue(queue_name  => 'ct_t1',
                          queue_table => 'ct_t1_tbl');
  dbms_aqadm.start_queue(queue_name => 'ct_t1');

  COMMIT;
END;
/
Messages from that queue can be consumed from either PL/SQL or Java code.
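For instance, a PL/SQL consumer could look like the sketch below. It assumes the ct_t1 queue created above (a single-consumer queue, so no consumer name is needed) and reads the 'entity' string property that the publishing procedure in the package sets. It is an illustration, not part of the original setup:

```sql
-- Sketch: dequeue one message from ct_t1 without waiting
-- (use SET SERVEROUTPUT ON in SQL*Plus to see the output)
DECLARE
  e_no_message EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_no_message, -25228); -- raised when the queue is empty and wait = no_wait

  l_dequeue_options dbms_aq.dequeue_options_t;
  l_msg_props       dbms_aq.message_properties_t;
  l_msg             sys.aq$_jms_text_message;
  l_msg_id          RAW(16);
  l_text            VARCHAR2(4000);
BEGIN
  l_dequeue_options.wait := dbms_aq.no_wait;

  dbms_aq.dequeue(queue_name         => 'ct_t1',
                  dequeue_options    => l_dequeue_options,
                  message_properties => l_msg_props,
                  payload            => l_msg,
                  msgid              => l_msg_id);

  l_msg.get_text(l_text);
  dbms_output.put_line('entity=' || l_msg.get_string_property('entity') || ', text=' || l_text);

  COMMIT; -- the dequeue becomes permanent only on commit
EXCEPTION
  WHEN e_no_message THEN
    dbms_output.put_line('Queue ct_t1 is empty');
END;
/
```

A Java consumer would typically use the JMS API over AQ instead; the PL/SQL form is just the quickest way to peek at what has been published.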

The Queue


Streams uses a (buffered) queue to pass changes from the capture process to the apply process. Let’s create it:
BEGIN
  dbms_streams_adm.set_up_queue(queue_table => 'STRM_QT',
                                queue_name  => 'STRM_Q',
                                queue_user  => 'STREAMS_ADMIN', -- put here your user
                                COMMENT     => 'The Streams queue');
END;
/
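To confirm that both queues exist, a quick dictionary query like the one below can help. It is a sketch that assumes everything is created in the current schema; AQ stores unquoted queue names in upper case, and user_queues also lists the exception queues AQ creates alongside them:

```sql
-- Sketch: check that the Streams queue and the subscriber queue are present and enabled
SELECT name, queue_table, queue_type, enqueue_enabled, dequeue_enabled
  FROM user_queues
 WHERE name IN ('STRM_Q', 'CT_T1');
```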

The Capture

Before creating the capture process, we need to prepare the table for instantiation:
BEGIN
  dbms_capture_adm.prepare_table_instantiation(table_name => 'T1');
END;
/
This step is necessary: Oracle records the SCN starting from which it can capture changes to the table. It also automatically adds supplemental log groups for the PK. To check whether the table is ready for capture, query the dba_capture_prepared_tables view.
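A minimal version of that check, assuming the table name T1 is unique enough across schemas for a test database:

```sql
-- Sketch: T1 should be listed once prepare_table_instantiation has run
SELECT table_owner, table_name, scn, timestamp
  FROM dba_capture_prepared_tables
 WHERE table_name = 'T1';
```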

Since we’re going to capture the f1 column, which isn’t part of the PK, we have to add a supplemental log group for it manually, so that its value is written to the redo log every time DML occurs against the table:
ALTER TABLE T1 ADD SUPPLEMENTAL LOG GROUP GR1 (F1) ALWAYS;

To check if the column is being logged, examine the views user_log_groups and user_log_group_columns.
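Those two views can be joined, as in the sketch below. The outer join is there on the assumption that key-level supplemental logging (such as the PK logging added during instantiation) shows up as a log group without explicit column rows:

```sql
-- Sketch: list T1's supplemental log groups and the columns they cover
SELECT g.log_group_name, g.log_group_type, g.always, c.column_name
  FROM user_log_groups g
  LEFT JOIN user_log_group_columns c
    ON c.log_group_name = g.log_group_name
   AND c.table_name     = g.table_name
 WHERE g.table_name = 'T1'
 ORDER BY g.log_group_name, c.position;
```

GR1 should be reported with ALWAYS logging and F1 as its only column.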

Now we can create the capture process, having first built the LogMiner dictionary:
DECLARE
  l_first_scn NUMBER;
BEGIN
  dbms_capture_adm.build(l_first_scn);
  dbms_capture_adm.create_capture(queue_name                => 'STRM_Q',
                                  capture_name              => 'CPT01',
                                  start_scn                 => l_first_scn,
                                  first_scn                 => l_first_scn,
                                  checkpoint_retention_time => 1);
  dbms_streams_adm.add_table_rules(table_name   => 'T1',
                                   streams_type => 'CAPTURE',
                                   streams_name => 'CPT01',
                                   queue_name   => 'STRM_Q',
                                   include_dml  => TRUE,
                                   include_ddl  => FALSE);
END;
/
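To verify that the capture process was registered, a query along these lines can be used (a sketch; a freshly created capture process is normally not running until it is explicitly started):

```sql
-- Sketch: CPT01 is expected to stay DISABLED until start_capture is called
SELECT capture_name, queue_name, status, start_scn, first_scn
  FROM dba_capture
 WHERE capture_name = 'CPT01';
```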

The DML and Pre-Commit Handlers


All the custom logic is contained in the following package:
CREATE OR REPLACE PACKAGE strms IS
  PROCEDURE dml_handler(p_lcr IN sys.anydata);
  PROCEDURE precommit_handler(commit_number IN NUMBER);
END strms;
/

CREATE OR REPLACE PACKAGE BODY strms IS

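  -- the cache of f1 values seen in the current transaction, keyed by f1
  -- (a PLS_INTEGER key limits f1 values to the +/-2^31 range)
  TYPE t_cache IS TABLE OF VARCHAR2(1) INDEX BY PLS_INTEGER;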
  l_cache t_cache;

  ---------------------------------------------------------------------------------------------------

  PROCEDURE publish2aq
  (
    p_entity_id IN NUMBER,
    p_msg       IN VARCHAR2
  ) IS
    l_msg           sys.aq$_jms_text_message;
    l_queue_options dbms_aq.enqueue_options_t;
    l_msg_props     dbms_aq.message_properties_t;
    -- publish into the subscriber queue, not into the Streams queue STRM_Q
    l_queue_name    VARCHAR2(64) := 'CT_T1';
    msg_id          RAW(16);
  BEGIN
    -- Build a JMS text message so that JMS clients can consume it
    l_msg := sys.aq$_jms_text_message.construct();
  
    l_msg.set_string_property('entity', to_char(p_entity_id));
    l_msg.set_text(p_msg);
  
    dbms_aq.enqueue(queue_name         => l_queue_name,
                    enqueue_options    => l_queue_options,
                    message_properties => l_msg_props,
                    payload            => l_msg,
                    msgid              => msg_id);
  END publish2aq;

  ---------------------------------------------------------------------------------------------------

  PROCEDURE dml_handler(p_lcr IN sys.anydata) IS
    lcr sys.lcr$_row_record;
    rc  PLS_INTEGER;
  
    l_owner     VARCHAR2(30);
    l_object    VARCHAR2(30);
    l_id        NUMBER(15);
    l_operation VARCHAR2(1);
    res         PLS_INTEGER;
  BEGIN
    rc := p_lcr.getobject(lcr);
  
    l_operation := substr(lcr.get_command_type(), 1, 1);
    l_owner     := lcr.get_object_owner();
    l_object    := lcr.get_object_name();
  
    -- if the handler fired for another object, just return
    IF l_owner != 'STREAMS_ADMIN' OR l_object != 'T1' THEN
      RETURN;
    END IF;
  
    res := sys.anydata.getnumber(lcr.get_value(value_type  => CASE l_operation WHEN 'D' THEN 'OLD' ELSE 'NEW' END,
                                               column_name => 'F1'),
                                 l_id);
  
    -- Putting into the cache
    IF NOT l_cache.exists(l_id) THEN
      -- In my real case I store the operation performed on the row. When several operations
      -- hit the same row within one transaction, I compute the resulting net operation.
      -- This example is kept simple, so that code isn't included
      l_cache(l_id) := l_operation;
    END IF;
  
  END dml_handler;

  ---------------------------------------------------------------------------------------------------

  PROCEDURE precommit_handler(commit_number IN NUMBER) IS
    idx PLS_INTEGER;
  BEGIN
    -- If there's nothing to publish, just return
    IF l_cache.count = 0 THEN
      RETURN;
    END IF;
  
    idx := l_cache.first;
    WHILE idx IS NOT NULL LOOP
      -- Any other publishing mechanism could be used here
      publish2aq(idx, to_char(idx));
      idx := l_cache.next(idx);
    END LOOP;
  
    l_cache.delete;
  
  END precommit_handler;

END strms;
/

The Apply


Now let’s create the apply process and tie it to the handlers:
BEGIN
  dbms_apply_adm.create_apply(queue_name     => 'STRM_Q',
                              apply_name     => 'APL01',
                              apply_user     => 'STREAMS_ADMIN',
                              apply_captured => TRUE);
  dbms_apply_adm.alter_apply(apply_name        => 'APL01',
                             precommit_handler => 'STREAMS_ADMIN.strms.PRECOMMIT_HANDLER');
  dbms_apply_adm.set_parameter(apply_name => 'APL01',
                               parameter  => 'disable_on_error',
                               VALUE      => 'N');
  FOR i IN 1 .. 4 LOOP
    dbms_apply_adm.set_dml_handler(object_name    => 'T1',
                                   object_type    => 'TABLE',
                                   operation_name => CASE i WHEN 1 THEN 'INSERT'
                                                            WHEN 2 THEN 'UPDATE'
                                                            WHEN 3 THEN 'DELETE'
                                                            ELSE 'LOB_UPDATE' END,
                                   user_procedure => 'STREAMS_ADMIN.strms.DML_HANDLER',
                                   assemble_lobs  => TRUE);
  END LOOP;
END;
/
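Before starting anything, it may be worth checking that the apply process picked up the pre-commit handler and that all four DML handlers are registered. A sketch using the standard Streams dictionary views:

```sql
-- Sketch: the apply process and its pre-commit handler
SELECT apply_name, queue_name, status, precommit_handler
  FROM dba_apply
 WHERE apply_name = 'APL01';

-- Sketch: one row per operation (INSERT, UPDATE, DELETE, LOB_UPDATE) is expected for T1
SELECT object_owner, object_name, operation_name, user_procedure, assemble_lobs
  FROM dba_apply_dml_handlers
 WHERE object_name = 'T1';
```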
All that’s left is to start Streams:
BEGIN
  dbms_capture_adm.start_capture(capture_name => 'CPT01');
  dbms_apply_adm.start_apply(apply_name => 'APL01');
END;
/
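A quick smoke test could look like the sketch below. The ids and f1 values are arbitrary test data, and it assumes publish2aq enqueues into ct_t1, the subscriber queue created in the Preparation step. Since the pre-commit handler deduplicates f1 values per transaction, the two rows with f1 = 10 should produce a single message:

```sql
-- Sketch: one transaction touching two distinct f1 values (10 and 20)
INSERT INTO t1 (id, f1) VALUES (1, 10);
INSERT INTO t1 (id, f1) VALUES (2, 10);
INSERT INTO t1 (id, f1) VALUES (3, 20);
COMMIT;

-- Once capture and apply have caught up (this may take a few seconds),
-- each distinct f1 value should appear as one message in the subscriber queue
SELECT queue, msg_state, enq_time
  FROM aq$ct_t1_tbl
 ORDER BY enq_time;
```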