Provides Mysql specific functionality for database replication
Adds a big (8 byte value), auto-incrementing primary key column to the specified table.
table_name: name of the target table
key_name: name of the primary key column
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 239 def add_big_primary_key(table_name, key_name) execute( alter table #{table_name} add column #{key_name} bigint not null auto_increment primary key) end
Removes the custom sequence setup for the specified table. If no more rubyrep sequences are left, removes the sequence table.
rep_prefix: not used (necessary) for the Postgres
table_name: name of the table
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 249 def clear_sequence_setup(rep_prefix, table_name) sequence_table_name = "#{rep_prefix}_sequences" if tables.include?(sequence_table_name) trigger_name = "#{rep_prefix}_#{table_name}_sequence" trigger_row = select_one( select * from information_schema.triggers where trigger_schema = database() and trigger_name = '#{trigger_name}') if trigger_row execute "DROP TRIGGER `#{trigger_name}`" execute "delete from #{sequence_table_name} where name = '#{table_name}'" unless select_one("select * from #{sequence_table_name}") # no more sequences left --> delete sequence table drop_table sequence_table_name.to_sym end end end end
Creates or replaces the replication trigger function. See create_replication_trigger for a descriptions of the params hash.
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 10 def create_or_replace_replication_trigger_function(params) execute( DROP PROCEDURE IF EXISTS `#{params[:trigger_name]}`;) activity_check = "" if params[:exclude_rr_activity] then activity_check = DECLARE active INT; SELECT count(*) INTO active FROM #{params[:activity_table]}; IF active <> 0 THEN LEAVE p; END IF; end execute( CREATE PROCEDURE `#{params[:trigger_name]}`(change_key varchar(2000), change_new_key varchar(2000), change_type varchar(1)) p: BEGIN #{activity_check} INSERT INTO #{params[:log_table]}(change_table, change_key, change_new_key, change_type, change_time) VALUES('#{params[:table]}', change_key, change_new_key, change_type, now()); END;) end
Creates a trigger to log all changes for the given table. params is a hash with all necessary information:
:trigger_name: name of the trigger
:table: name of the table that should be monitored
:keys: array of names of the key columns of the monitored table
:log_table: name of the table receiving all change notifications
:activity_table: name of the table receiving the rubyrep activity information
:key_sep: column seperator to be used in the key column of the log table
:exclude_rr_activity: if true, the trigger will check and filter out changes initiated by RubyRep
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 57 def create_replication_trigger(params) create_or_replace_replication_trigger_function params %(insert update delete).each do |action| execute( DROP TRIGGER IF EXISTS `#{params[:trigger_name]}_#{action}`;) # The created triggers can handle the case where the trigger procedure # is updated (that is: temporarily deleted and recreated) while the # trigger is running. # For that an MySQL internal exception is raised if the trigger # procedure cannot be found. The exception is caught by an trigger # internal handler. # The handler causes the trigger to retry calling the # trigger procedure several times with short breaks in between. trigger_var = action == 'delete' ? 'OLD' : 'NEW' if action == 'update' call_statement = "CALL `#{params[:trigger_name]}`(#{key_clause('OLD', params)}, #{key_clause('NEW', params)}, '#{action[0,1].upcase}');" else call_statement = "CALL `#{params[:trigger_name]}`(#{key_clause(trigger_var, params)}, null, '#{action[0,1].upcase}');" end execute( CREATE TRIGGER `#{params[:trigger_name]}_#{action}` AFTER #{action} ON `#{params[:table]}` FOR EACH ROW BEGIN DECLARE number_attempts INT DEFAULT 0; DECLARE failed INT; DECLARE CONTINUE HANDLER FOR 1305 BEGIN DO SLEEP(0.05); SET failed = 1; SET number_attempts = number_attempts + 1; END; REPEAT SET failed = 0; #{call_statement} UNTIL failed = 0 OR number_attempts >= 40 END REPEAT; END;) end end
Removes a trigger and related trigger procedure.
trigger_name: name of the trigger
table_name: name of the table for which the trigger exists
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 103 def drop_replication_trigger(trigger_name, table_name) %(insert update delete).each do |action| execute "DROP TRIGGER `#{trigger_name}_#{action}`;" end execute "DROP PROCEDURE `#{trigger_name}`;" end
Returns true if the named trigger exists for the named table.
trigger_name: name of the trigger
table_name: name of the table
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 113 def replication_trigger_exists?(trigger_name, table_name) !select_all("select 1 from information_schema.triggers where trigger_schema = database() and trigger_name = '#{trigger_name}_insert' and event_object_table = '#{table_name}'").empty? end
Returns all unadjusted sequences of the given table. Parameters:
rep_prefix: The prefix put in front of all replication related database objects as specified via Configuration#options. Is used to create the sequences table.
table_name: name of the table
Return value: a hash with
key: sequence name
value: a hash with
:increment: current sequence increment
:value: current value
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 129 def sequence_values(rep_prefix, table_name) # check if the table has an auto_increment column, return if not sequence_row = select_one( show columns from `#{table_name}` where extra = 'auto_increment') return {} unless sequence_row column_name = sequence_row['Field'] # check if the sequences table exists, create if necessary sequence_table_name = "#{rep_prefix}_sequences" unless tables.include?(sequence_table_name) create_table "#{sequence_table_name}".to_sym, :id => false, :options => 'ENGINE=MyISAM' do |t| t.column :name, :string t.column :current_value, :integer t.column :increment, :integer t.column :offset, :integer end ActiveRecord::Base.connection.execute( ALTER TABLE "#{sequence_table_name}" ADD CONSTRAINT #{sequence_table_name}_pkey PRIMARY KEY (name)) rescue nil end sequence_row = select_one("select current_value, increment, offset from #{sequence_table_name} where name = '#{table_name}'") if sequence_row == nil current_max = select_one( select max(`#{column_name}`) as current_max from `#{table_name}`)['current_max'].to_i return {column_name => { :increment => 1, :value => current_max } } else return {column_name => { :increment => sequence_row['increment'].to_i, :value => sequence_row['offset'].to_i } } end end
Ensures that the sequences of the named table (normally the primary key column) are generated with the correct increment and offset.
rep_prefix: not used (necessary) for the Postgres
table_name: name of the table (not used for Postgres)
increment: increment of the sequence
offset: offset
left_sequence_values:
hash as returned by #sequence_values for the left database
right_sequence_values:
hash as returned by #sequence_values for the right database
adjustment_buffer:
the "gap" that is created during sequence update to avoid concurrency problems
an increment of 2 and offset of 1 will lead to generation of odd
numbers.
# File lib/rubyrep/replication_extenders/mysql_replication.rb, line 187 def update_sequences( rep_prefix, table_name, increment, offset, left_sequence_values, right_sequence_values, adjustment_buffer) return if left_sequence_values.empty? column_name = left_sequence_values.keys[0] # check if the sequences table exists, create if necessary sequence_table_name = "#{rep_prefix}_sequences" current_max = [left_sequence_values[column_name][:value], right_sequence_values[column_name][:value]].max + adjustment_buffer new_start = current_max - (current_max % increment) + increment + offset sequence_row = select_one("select current_value, increment, offset from #{sequence_table_name} where name = '#{table_name}'") if sequence_row == nil # no sequence exists yet for the table, create it and the according # sequence trigger execute( insert into #{sequence_table_name}(name, current_value, increment, offset) values('#{table_name}', #{new_start}, #{increment}, #{offset})) trigger_name = "#{rep_prefix}_#{table_name}_sequence" execute( DROP TRIGGER IF EXISTS `#{trigger_name}`;) execute( CREATE TRIGGER `#{trigger_name}` BEFORE INSERT ON `#{table_name}` FOR EACH ROW BEGIN IF NEW.`#{column_name}` = 0 THEN UPDATE #{sequence_table_name} SET current_value = LAST_INSERT_ID(current_value + increment) WHERE name = '#{table_name}'; SET NEW.`#{column_name}` = LAST_INSERT_ID(); END IF; END;) elsif sequence_row['increment'].to_i != increment or sequence_row['offset'].to_i != offset # sequence exists but with incorrect values; update it execute( update #{sequence_table_name} set current_value = #{new_start}, increment = #{increment}, offset = #{offset} where name = '#{table_name}') end end
Generated with the Darkfish Rdoc Generator 2.