Drizzled Public API Documentation

create_replication.cc
1 /* - mode: c; c-basic-offset: 2; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
3  *
4  * Copyright (C) 2010 Brian Aker
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
19  */
20 
21 #include <config.h>
22 
23 #include <drizzled/message.h>
24 #include "read_replication.h"
25 #include "create_replication.h"
26 
27 #ifdef UNIV_NONINL
28 #include "dict0crea.ic"
29 #endif
30 
31 #include "btr0pcur.h"
32 #include "btr0btr.h"
33 #include "page0page.h"
34 #include "mach0data.h"
35 #include "dict0boot.h"
36 #include "dict0dict.h"
37 #include "que0que.h"
38 #include "row0ins.h"
39 #include "row0mysql.h"
40 #include "pars0pars.h"
41 #include "trx0roll.h"
42 #include "usr0sess.h"
43 #include "ut0vec.h"
44 #include "row0merge.h"
45 #include "row0mysql.h"
46 
/**
 * Create the SYS_REPLICATION_LOG table if it is not already in the data
 * dictionary.
 *
 * The existence check runs under dict_sys->mutex; when the table is found the
 * function returns DB_SUCCESS without creating anything. Otherwise it runs an
 * internal SQL procedure that creates the table, a unique clustered PRIMARY
 * index on (ID, SEGID) and a secondary COMMIT_IDX index on (COMMIT_ID, ID).
 *
 * @return DB_SUCCESS when the table already exists or was created;
 *         DB_MUST_GET_MORE_FILE_SPACE when creation failed with
 *         DB_OUT_OF_FILE_SPACE or DB_TOO_MANY_CONCURRENT_TRXS (any other
 *         creation error trips the ut_a() assertion below).
 */
47 UNIV_INTERN ulint dict_create_sys_replication_log(void)
48 {
49  dict_table_t* table1;
50  ulint error;
51  trx_t *trx;
52
53  mutex_enter(&(dict_sys->mutex));
54
55  table1 = dict_table_get_low("SYS_REPLICATION_LOG");
56
// NOTE(review): original line 57 is missing from this listing; it presumably
// calls trx_sys_read_commit_id() (listed in the cross-references at the end of
// this page) — confirm against the real source.
58
59  if (table1)
60  {
61  mutex_exit(&(dict_sys->mutex));
62
63  return(DB_SUCCESS);
64  }
65
// The dictionary mutex is released here and row_mysql_lock_data_dictionary()
// is only taken below, so the existence check and the CREATE are not atomic.
// NOTE(review): presumably only called once during startup — confirm.
66  mutex_exit(&(dict_sys->mutex));
67
// NOTE(review): original line 68 is missing; `trx` is used below without a
// visible assignment — presumably `trx = trx_allocate_for_mysql();`.
69
70  trx->op_info= "creating replication sys table";
71
72  row_mysql_lock_data_dictionary(trx);
73
// NOTE(review): original line 74 is missing; `info` is passed to
// que_eval_sql() without a visible declaration — presumably
// `pars_info_t *info= pars_info_create();`.
75
76
// Run the table + index creation as a single internal SQL procedure.
77  error = que_eval_sql(info,
78  "PROCEDURE CREATE_SYS_REPLICATION_LOG_PROC () IS\n"
79  "BEGIN\n"
80  "CREATE TABLE SYS_REPLICATION_LOG(ID INT(8), SEGID INT, COMMIT_ID INT(8), END_TIMESTAMP INT(8), ORIGINATING_SERVER_UUID BLOB, ORIGINATING_COMMIT_ID INT(8), MESSAGE_LEN INT, MESSAGE BLOB);\n"
81  "CREATE UNIQUE CLUSTERED INDEX PRIMARY ON SYS_REPLICATION_LOG (ID, SEGID);\n"
82  "CREATE INDEX COMMIT_IDX ON SYS_REPLICATION_LOG (COMMIT_ID, ID);\n"
83  "END;\n"
84  , FALSE, trx);
85
86
87
88  if (error != DB_SUCCESS)
89  {
90  fprintf(stderr, "InnoDB: error %lu in creation.\n", (ulong) error);
91
// Only these two failures are treated as recoverable; anything else aborts.
92  ut_a(error == DB_OUT_OF_FILE_SPACE || error == DB_TOO_MANY_CONCURRENT_TRXS);
93
// NOTE(review): "tablespace is full" is printed even when the error was
// DB_TOO_MANY_CONCURRENT_TRXS.
94  fprintf(stderr,
95  "InnoDB: creation failed\n"
96  "InnoDB: tablespace is full\n"
97  "InnoDB: dropping incompletely created SYS_REPLICATION_LOG table.\n");
98
// Remove whatever part of the table was created before the failure.
99  row_drop_table_for_mysql("SYS_REPLICATION_LOG", trx, TRUE);
100
101  error = DB_MUST_GET_MORE_FILE_SPACE;
102  }
103
// NOTE(review): original lines 104 and 106 are missing; they presumably call
// trx_commit_for_mysql(trx) and row_mysql_unlock_data_dictionary(trx) (the
// dictionary lock taken on line 72 has no visible release) — confirm.
105
107
108  trx_free_for_mysql(trx);
109
110  return(error);
111 }
112 
113 UNIV_INTERN int read_replication_log_table_message(const char* table_name, drizzled::message::Table *table_message)
114 {
115  std::string search_string(table_name);
116  boost::algorithm::to_lower(search_string);
117 
118  if (search_string.compare("sys_replication_log") != 0)
119  return -1;
120 
121  drizzled::message::Engine *engine= table_message->mutable_engine();
122  engine->set_name("InnoDB");
123  table_message->set_name("SYS_REPLICATION_LOG");
124  table_message->set_schema("DATA_DICTIONARY");
125  table_message->set_type(drizzled::message::Table::STANDARD);
126  table_message->set_creation_timestamp(0);
127  table_message->set_update_timestamp(0);
128 
129  drizzled::message::Table::TableOptions *options= table_message->mutable_options();
130  options->set_collation_id(drizzled::my_charset_bin.number);
131  options->set_collation(drizzled::my_charset_bin.name);
132  drizzled::message::set_is_replicated(*table_message, false);
133 
134  drizzled::message::Table::Field *field= table_message->add_field();
135  field->set_name("ID");
136  field->set_type(drizzled::message::Table::Field::BIGINT);
137 
138  field= table_message->add_field();
139  field->set_name("SEGID");
140  field->set_type(drizzled::message::Table::Field::INTEGER);
141 
142  field= table_message->add_field();
143  field->set_name("COMMIT_ID");
144  field->set_type(drizzled::message::Table::Field::BIGINT);
145 
146  field= table_message->add_field();
147  field->set_name("END_TIMESTAMP");
148  field->set_type(drizzled::message::Table::Field::BIGINT);
149 
150  field= table_message->add_field();
151  field->set_name("ORIGINATING_SERVER_UUID");
152  field->set_type(drizzled::message::Table::Field::BLOB);
153 
154  field= table_message->add_field();
155  field->set_name("ORIGINATING_COMMIT_ID");
156  field->set_type(drizzled::message::Table::Field::BIGINT);
157 
158  field= table_message->add_field();
159  field->set_name("MESSAGE_LEN");
160  field->set_type(drizzled::message::Table::Field::INTEGER);
161 
162  field= table_message->add_field();
163  field->set_name("MESSAGE");
164  field->set_type(drizzled::message::Table::Field::BLOB);
165  drizzled::message::Table::Field::StringFieldOptions *stropt= field->mutable_string_options();
166  stropt->set_collation_id(drizzled::my_charset_bin.number);
167  stropt->set_collation(drizzled::my_charset_bin.name);
168 
169  drizzled::message::Table::Index *index= table_message->add_indexes();
170  index->set_name("PRIMARY");
171  index->set_is_primary(true);
172  index->set_is_unique(true);
173  index->set_type(drizzled::message::Table::Index::BTREE);
174  index->set_key_length(12);
175  drizzled::message::Table::Index::IndexPart *part= index->add_index_part();
176  part->set_fieldnr(0);
177  part->set_compare_length(8);
178  part= index->add_index_part();
179  part->set_fieldnr(1);
180  part->set_compare_length(4);
181 
182  index= table_message->add_indexes();
183  index->set_name("COMMIT_IDX");
184  index->set_is_primary(false);
185  index->set_is_unique(false);
186  index->set_type(drizzled::message::Table::Index::BTREE);
187  index->set_key_length(16);
188  part= index->add_index_part();
189  part->set_fieldnr(2);
190  part->set_compare_length(8);
191  part= index->add_index_part();
192  part->set_fieldnr(0);
193  part->set_compare_length(8);
194 
195  return 0;
196 }
197 
198 extern dtuple_t* row_get_prebuilt_insert_row(row_prebuilt_t* prebuilt);
199 
200 ulint insert_replication_message(const char *message, size_t size,
201  trx_t *trx, uint64_t trx_id,
202  uint64_t end_timestamp, bool is_end_segment,
203  uint32_t seg_id, const char *server_uuid,
204  bool use_originating_server_uuid,
205  const char *originating_server_uuid,
206  uint64_t originating_commit_id)
207 {
208  ulint error;
209  row_prebuilt_t* prebuilt; /* For reading rows */
210  dict_table_t *table;
211  que_thr_t* thr;
212  byte* data;
213 
214  table = dict_table_get("SYS_REPLICATION_LOG",TRUE);
215 
216  prebuilt = row_create_prebuilt(table);
217 
218  if (prebuilt->trx != trx)
219  {
220  row_update_prebuilt_trx(prebuilt, trx);
221  }
222 
223  /* DDL operations create table/drop table call
224  * innobase_commit_low() which will commit the trx
225  * that leaves the operation of committing to the
226  * log in a new trx. If that is the case we need
227  * to keep track and commit the trx later in this
228  * function.
229  */
230  bool is_started= true;
231  if (trx->conc_state == TRX_NOT_STARTED)
232  {
233  is_started= false;
234  }
235 
236  dtuple_t* dtuple= row_get_prebuilt_insert_row(prebuilt);
237  dfield_t *dfield;
238 
239  dfield = dtuple_get_nth_field(dtuple, 0);
240  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
241  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&trx_id, 8, dict_table_is_comp(prebuilt->table));
242  dfield_set_data(dfield, data, 8);
243 
244  dfield = dtuple_get_nth_field(dtuple, 1);
245 
246  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
247  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&seg_id, 4, dict_table_is_comp(prebuilt->table));
248  dfield_set_data(dfield, data, 4);
249 
250  uint64_t commit_id= 0;
251  if (is_end_segment)
252  {
253  commit_id= trx_sys_commit_id.increment();
254  }
255 
256  dfield = dtuple_get_nth_field(dtuple, 2);
257  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
258  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&commit_id, 8, dict_table_is_comp(prebuilt->table));
259  dfield_set_data(dfield, data, 8);
260 
261  dfield = dtuple_get_nth_field(dtuple, 3);
262  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
263  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&end_timestamp, 8, dict_table_is_comp(prebuilt->table));
264  dfield_set_data(dfield, data, 8);
265 
266  if (not use_originating_server_uuid)
267  {
268  /* This transaction originated from this server, rather then being
269  replicated to this server reset the values to reflect that */
270  originating_server_uuid= server_uuid;
271  originating_commit_id= commit_id;
272  }
273 
274  dfield = dtuple_get_nth_field(dtuple, 4);
275  dfield_set_data(dfield, originating_server_uuid, 36);
276 
277  dfield = dtuple_get_nth_field(dtuple, 5);
278  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 8));
279  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&originating_commit_id, 8, dict_table_is_comp(prebuilt->table));
280  dfield_set_data(dfield, data, 8);
281 
282  dfield = dtuple_get_nth_field(dtuple, 6);
283  data= static_cast<byte*>(mem_heap_alloc(prebuilt->heap, 4));
284  row_mysql_store_col_in_innobase_format(dfield, data, TRUE, (byte*)&size, 4, dict_table_is_comp(prebuilt->table));
285  dfield_set_data(dfield, data, 4);
286 
287  dfield = dtuple_get_nth_field(dtuple, 7);
288  dfield_set_data(dfield, message, size);
289 
290  ins_node_t* node = prebuilt->ins_node;
291 
292  thr = que_fork_get_first_thr(prebuilt->ins_graph);
293 
294  if (prebuilt->sql_stat_start) {
295  node->state = INS_NODE_SET_IX_LOCK;
296  prebuilt->sql_stat_start = FALSE;
297  } else {
298  node->state = INS_NODE_ALLOC_ROW_ID;
299  }
300 
302 
303 //run_again:
304  thr->run_node = node;
305  thr->prev_node = node;
306 
307  row_ins_step(thr);
308 
309  error = trx->error_state;
310 
312  row_prebuilt_free(prebuilt, FALSE);
313 
314  if (! is_started)
315  {
317  }
318 
319  return error;
320 }
321 
/**
 * Start a forward scan of SYS_REPLICATION_LOG.
 *
 * Starts a mini-transaction in state->mtr and, under dict_sys->mutex, looks up
 * the table and the first index on its index list (presumably the clustered
 * PRIMARY index on (ID, SEGID) — confirm). It then opens state->pcur at the
 * left-hand end of that index with BTR_SEARCH_LEAF latching. The
 * mini-transaction is left open; replication_read_next() restarts it between
 * rows and replication_read_deinit() commits it.
 *
 * @return the scan state to pass to replication_read_next() and
 *         replication_read_deinit().
 */
322 UNIV_INTERN read_replication_state_st *replication_read_init(void)
323 {
// NOTE(review): original line 324 is missing from this listing. `state` is
// used below without a visible declaration; since replication_read_deinit()
// releases it with `delete`, it is presumably
// `read_replication_state_st *state= new read_replication_state_st;`.
325
326  mutex_enter(&(dict_sys->mutex));
327
328  mtr_start(&state->mtr);
// NOTE(review): the lookup result is not checked. dict_create_sys_replication_log()
// tests the same call with `if (table1)`, so it can presumably return NULL,
// which the next line would dereference.
329  state->sys_tables= dict_table_get_low("SYS_REPLICATION_LOG");
330  state->sys_index= UT_LIST_GET_FIRST(state->sys_tables->indexes);
331
332  mutex_exit(&(dict_sys->mutex));
333
// from_left=TRUE: position at the low end of the index; do_init=TRUE:
// initialise the cursor as part of opening it.
334  btr_pcur_open_at_index_side(TRUE, state->sys_index, BTR_SEARCH_LEAF, &state->pcur, TRUE, &state->mtr);
335
336  return state;
337 }
338 
339 UNIV_INTERN void replication_read_deinit(struct read_replication_state_st *state)
340 {
341  btr_pcur_close(&state->pcur);
342  mtr_commit(&state->mtr);
343  delete state;
344 }
345 
346 UNIV_INTERN struct read_replication_return_st replication_read_next(struct read_replication_state_st *state)
347 {
348  struct read_replication_return_st ret;
349  const rec_t *rec;
350 
351  btr_pcur_move_to_next_user_rec(&state->pcur, &state->mtr);
352 
353  rec= btr_pcur_get_rec(&state->pcur);
354 
355  while (btr_pcur_is_on_user_rec(&state->pcur))
356  {
357  const byte* field;
358  ulint len;
359 
360  // Is the row deleted? If so go fetch the next
361  if (rec_get_deleted_flag(rec, 0))
362  continue;
363 
364  // Store transaction id
365  field = rec_get_nth_field_old(rec, 0, &len);
366  byte idbyte[8];
367  convert_to_mysql_format(idbyte, field, 8);
368  ret.id= *(uint64_t *)idbyte;
369 
370  // Store segment id
371  field = rec_get_nth_field_old(rec, 1, &len);
372  byte segbyte[4];
373  convert_to_mysql_format(segbyte, field, 4);
374  ret.seg_id= *(uint32_t *)segbyte;
375 
376  field = rec_get_nth_field_old(rec, 4, &len);
377  byte commitbyte[8];
378  convert_to_mysql_format(commitbyte, field, 8);
379  ret.commit_id= *(uint64_t *)commitbyte;
380 
381  field = rec_get_nth_field_old(rec, 5, &len);
382  byte timestampbyte[8];
383  convert_to_mysql_format(timestampbyte, field, 8);
384  ret.end_timestamp= *(uint64_t *)timestampbyte;
385 
386  field = rec_get_nth_field_old(rec, 6, &len);
387  ret.originating_server_uuid= (char *)field;
388 
389  field = rec_get_nth_field_old(rec, 7, &len);
390  byte originatingcommitbyte[8];
391  convert_to_mysql_format(originatingcommitbyte, field, 8);
392  ret.originating_commit_id= *(uint64_t *)originatingcommitbyte;
393 
394  // Handler message
395  field = rec_get_nth_field_old(rec, 9, &len);
396  ret.message= (char *)field;
397  ret.message_length= len;
398 
399  // @todo double check that "field" will continue to be value past this
400  // point.
401  btr_pcur_store_position(&state->pcur, &state->mtr);
402  mtr_commit(&state->mtr);
403 
404  mtr_start(&state->mtr);
405 
406  btr_pcur_restore_position(BTR_SEARCH_LEAF, &state->pcur, &state->mtr);
407 
408  return ret;
409  }
410 
411  /* end of index */
412  memset(&ret, 0, sizeof(ret));
413 
414  return ret;
415 }
416 
417 UNIV_INTERN void convert_to_mysql_format(byte* out, const byte* in, int len)
418 {
419  byte *ptr;
420  ptr = out + len;
421 
422  for (;;) {
423  ptr--;
424  *ptr = *in;
425  if (ptr == out) {
426  break;
427  }
428  in++;
429  }
430 
431  out[len - 1] = (byte) (out[len - 1] ^ 128);
432 
433 }
UNIV_INTERN void que_thr_stop_for_mysql_no_error(que_thr_t *thr, trx_t *trx)
Definition: que0que.cc:1033
UNIV_INTERN void trx_free_for_mysql(trx_t *trx)
Definition: trx0trx.cc:342
dict_table_t * table
Definition: row0mysql.h:592
UNIV_INLINE ibool btr_pcur_is_on_user_rec(const btr_pcur_t *cursor)
ins_node_t * ins_node
Definition: row0mysql.h:646
UNIV_INLINE ibool dict_table_is_comp(const dict_table_t *table)
que_fork_t * ins_graph
Definition: row0mysql.h:665
unsigned sql_stat_start
Definition: row0mysql.h:596
que_node_t * run_node
Definition: que0que.h:376
ulint conc_state
Definition: trx0trx.h:480
UNIV_INTERN byte * row_mysql_store_col_in_innobase_format(dfield_t *dfield, byte *buf, ibool row_format_col, const byte *mysql_data, ulint col_len, ulint comp)
Definition: row0mysql.cc:301
UNIV_INTERN ulint trx_commit_for_mysql(trx_t *trx)
Definition: trx0trx.cc:1596
UNIV_INLINE dict_table_t * dict_table_get_low(const char *table_name)
UNIV_INLINE void btr_pcur_close(btr_pcur_t *cursor)
UNIV_INTERN void trx_sys_read_commit_id(void)
Definition: trx0sys.cc:697
UNIV_INLINE ulint rec_get_deleted_flag(const rec_t *rec, ulint comp)
UNIV_INTERN void mtr_commit(mtr_t *mtr) __attribute__((nonnull))
Definition: mtr0mtr.cc:247
UNIV_INLINE void dfield_set_data(dfield_t *field, const void *data, ulint len)
const char * op_info
Definition: trx0trx.h:477
UNIV_INTERN void row_prebuilt_free(row_prebuilt_t *prebuilt, ibool dict_locked)
Definition: row0mysql.cc:712
UNIV_INTERN void btr_pcur_store_position(btr_pcur_t *cursor, mtr_t *mtr)
Definition: btr0pcur.cc:89
#define ut_a(EXPR)
Definition: ut0dbg.h:105
UNIV_INTERN pars_info_t * pars_info_create(void)
Definition: pars0pars.cc:1938
UNIV_INLINE void * mem_heap_alloc(mem_heap_t *heap, ulint n)
UNIV_INTERN ulint que_eval_sql(pars_info_t *info, const char *sql, ibool reserve_dict_mutex, trx_t *trx)
Definition: que0que.cc:1392
UNIV_INLINE que_thr_t * que_fork_get_first_thr(que_fork_t *fork)
#define UT_LIST_GET_FIRST(BASE)
Definition: ut0lst.h:224
UNIV_INTERN row_prebuilt_t * row_create_prebuilt(dict_table_t *table)
Definition: row0mysql.cc:650
dict_sys_t * dict_sys
Definition: dict0dict.cc:63
UNIV_INTERN trx_t * trx_allocate_for_mysql(void)
Definition: trx0trx.cc:199
UNIV_INTERN void que_thr_move_to_run_state_for_mysql(que_thr_t *thr, trx_t *trx)
Definition: que0que.cc:1001
que_node_t * prev_node
Definition: que0que.h:379
UNIV_INTERN que_thr_t * row_ins_step(que_thr_t *thr)
Definition: row0ins.cc:2414
drizzled::atomic< uint64_t > trx_sys_commit_id
Definition: trx0sys.cc:96
UNIV_INLINE void mtr_start(mtr_t *mtr) __attribute__((nonnull))
UNIV_INLINE ibool btr_pcur_move_to_next_user_rec(btr_pcur_t *cursor, mtr_t *mtr)
UNIV_INTERN dict_table_t * dict_table_get(const char *table_name, ibool inc_mysql_count)
Definition: dict0dict.cc:746
mem_heap_t * heap
Definition: row0mysql.h:643
UNIV_INTERN void row_update_prebuilt_trx(row_prebuilt_t *prebuilt, trx_t *trx)
Definition: row0mysql.cc:798
UNIV_INLINE void btr_pcur_open_at_index_side(ibool from_left, dict_index_t *index, ulint latch_mode, btr_pcur_t *pcur, ibool do_init, mtr_t *mtr)
ulint error_state
Definition: trx0trx.h:601
UNIV_INTERN void row_mysql_unlock_data_dictionary(trx_t *trx)
Definition: row0mysql.cc:1790
UNIV_INTERN int row_drop_table_for_mysql(const char *name, trx_t *trx, ibool drop_db)
Definition: row0mysql.cc:3033