We’ve considered a bunch of solutions we found useful like Change Notification, Change Data Capture, Trigger based mechanism, etc., but decided to stop on the Streams as it was the winner in correlation between speed, maintainability, and resource consummation.
In a nutshell
Capture catches the changes satisfying the basic table filter rules. Afterwards it pushes them down to Apply which uses manually written DML Handler. The Handler is written in the way, when it takes LCR, extracts the necessary information (like PK ID or so), piles it up into a user defined queue, and simply ignores the LCR without any further processing. In case with more complicated structures (which is the case for me), I put the caught changes into an interim cache collection and publish them into the queue using a pre-commit handler (also manually written).
The paragraph above might seem a tad mouthful for a person who isn’t really into the Streams, ergo see further example to comprehend (if you’re interested).
Preparation
First of all the schema you want to the Streams code to work under must be a granted with Streams administrator permissions:
BEGIN dbms_streams_auth.grant_admin_privilege(grantee => 'SCHEMA_NAME'); END; /Secondly the user must be granted with the DBA role explicitly. Once it’s done, we can start setting up the test environment.
Now let’s create the test table the changes in which we’re going to catch:
CREATE TABLE t1 ( id NUMBER(15) PRIMARY KEY pk_t1, f1 NUMBER(15) NOT NULL, padding rpad('x', 500) );I’m going to show you how to catch all the unique values in the f1 field for the table above. I will publish all the f1 values into the queue below:
BEGIN dbms_aqadm.create_queue_table(queue_table => 'ct_t1_tbl', queue_payload_type => 'sys.aq$_jms_text_message', multiple_consumers => FALSE); dbms_aqadm.create_queue(queue_name => 'ct_t1', queue_table => 'ct_t1_tbl'); dbms_aqadm.start_queue(queue_name => 'ct_t1'); COMMIT; END; /The messages from that queue can be picked up from within either Oracle or Java code.
The Queue
The Streams uses a (buffered) queue to pass the changes over between capture and apply. Let’s create it:
BEGIN dbms_streams_adm.set_up_queue(queue_table => 'STRM_QT', queue_name => 'STRM_Q', queue_user => 'STREAMS_ADMIN', -- put here your user COMMENT => 'The Streams queue'); END; /
The Capture
Before we create the capture, we need to instantiate the table:BEGIN dbms_capture_adm.prepare_table_instantiation(table_name => 'T1'); END; /The step above is necessary. Oracle scans the table and jots down the SCN starting from which it can capture the changes. Also it will automatically add supplemental log groups for the PK. To check if the table is capture ready examine the dictionary dba_capture_prepared_tables.
As we’re going to capture the f1 field which isn’t present in the PK we have to add manually a supplemental log group for that column so it would appear in the redo log every time DML occurs against the table:
ALTER TABLE T1 ADD SUPPLEMENTAL LOG GROUP GR1 (F1) ALWAYS;
To check if the column is being logged, examine the views user_log_groups and user_log_group_columns.
Now we can create the capture having built the logminer dictionary before of course:
DECLARE l_first_scn NUMBER; BEGIN dbms_capture_adm.build(l_first_scn); dbms_capture_adm.create_capture(queue_name => 'STRM_Q', capture_name => 'CPT01', start_scn => l_first_scn, first_scn => l_first_scn, checkpoint_retention_time => 1); dbms_streams_adm.add_table_rules(table_name => 'T1', streams_type => 'CAPTURE', streams_name => 'CPT01', queue_name => 'STRM_Q', include_dml => TRUE, include_ddl => FALSE); END; /
The DML and Pre-Commit Handlers
The whole peculiar logic is concluded in the next package:
CREATE OR REPLACE PACKAGE strms IS PROCEDURE dml_handler(p_lcr IN sys.anydata); PROCEDURE precommit_handler(commit_number IN NUMBER); END strms; / CREATE OR REPLACE PACKAGE BODY strms IS -- the cache structure for LCRs TYPE t_cache IS TABLE OF VARCHAR2(1) INDEX BY PLS_INTEGER; l_cache t_cache; --------------------------------------------------------------------------------------------------- PROCEDURE publish2aq ( p_entity_id IN NUMBER, p_msg IN VARCHAR2 ) IS l_msg sys.aq$_jms_text_message; l_queue_options dbms_aq.enqueue_options_t; l_msg_props dbms_aq.message_properties_t; l_queue_name VARCHAR2(64) := 'STRM_Q'; msg_id RAW(16); BEGIN -- Ensure what is sent will be a JMS message l_msg := sys.aq$_jms_text_message.construct(); l_msg.set_string_property('entity', p_entity_id); l_msg.set_text(p_msg); dbms_aq.enqueue(queue_name => l_queue_name, enqueue_options => l_queue_options, message_properties => l_msg_props, payload => l_msg, msgid => msg_id); END publish2aq; --------------------------------------------------------------------------------------------------- PROCEDURE dml_handler(p_lcr IN sys.anydata) IS lcr sys.lcr$_row_record; rc PLS_INTEGER; l_owner VARCHAR2(30); l_object VARCHAR2(30); l_entity NUMBER(15); l_id NUMBER(15); l_table_id NUMBER(15); l_hierarchy_level NUMBER(2); l_operation VARCHAR2(1); res PLS_INTEGER; BEGIN rc := p_lcr.getobject(lcr); l_operation := substr(lcr.get_command_type(), 1, 1); l_owner := lcr.get_object_owner(); l_object := lcr.get_object_name(); -- if the proc has fired against another object, just returning IF l_owner != 'STERAMS_ADMIN' OR l_object != 'T1' THEN RETURN; END IF; res := sys.anydata.getnumber(lcr.get_value(value_type => CASE l_operation WHEN 'D' THEN 'OLD' ELSE 'NEW' END, column_name => 'F1'), l_id); -- Putting into the cache IF NOT l_cache.exists(l_id) THEN -- in my case I store the operation performed on the row. In case when within a transaction -- it was performed couple of operations against one row I calculated the result operation. -- This example isn't supposed to be so complicated, so I haven't included such code l_cache(l_id) := l_operation; END IF; END dml_handler; --------------------------------------------------------------------------------------------------- PROCEDURE precommit_handler(commit_number IN NUMBER) IS idx PLS_INTEGER; BEGIN -- If there's nothing to publish just returning IF l_cache.count = 0 THEN RETURN; END IF; idx := l_cache.first; WHILE idx IS NOT NULL LOOP -- May be any other publishing way publish2aq(idx, idx); idx := l_cache.next(idx); END LOOP; l_cache.delete; END precommit_handler; END strms; /
The Apply
Now let’s create the apply process and tie it with the handlers:
BEGIN dbms_apply_adm.create_apply(queue_name => 'STRM_Q', apply_name => 'APL01', apply_user => 'STERAMS_ADMIN', apply_captured => TRUE); dbms_apply_adm.alter_apply(apply_name => 'APL01', precommit_handler => 'STERAMS_ADMIN..strms.PRECOMMIT_HANDLER'); dbms_apply_adm.set_parameter(apply_name => 'APL01', parameter => 'disable_on_error', VALUE => 'N'); FOR i IN 1 .. 4 LOOP dbms_apply_adm.set_dml_handler(object_name => 'T1', object_type => 'TABLE', operation_name => CASE i WHEN 1 THEN 'INSERT' WHEN 2 THEN 'UPDATE' WHEN 3 THEN 'DELETE' ELSE 'LOB_UPDATE' END, user_procedure => 'STREAMS_ADMIN..strms.DML_HANDLER', assemble_lobs => TRUE); END LOOP; END; /It’s left only to start the Streams:
BEGIN dbms_capture_adm.start_capture(capture_name => 'CPT01'); dbms_apply_adm.start_apply(apply_name => 'APL01'); END; /
No comments:
Post a Comment