VStream

Change event streams

Vitess Gateways (vtgate) provide a VStream service that allows clients to subscribe to a change event stream for a set of tables.

Use Cases #

  • Change Data Capture (CDC): VStream can be used to capture changes to a table and send them to a downstream system. This is useful for building real-time data pipelines.

Overview #

VStream can copy the current contents of a table, since you will often not have binary logs going back to the table's creation, and then stream new changes to the table from that point on. The initial copy phase can be resumed if it is interrupted for any reason. VStream also handles resharding events automatically. If the VStream is connected throughout, it transitions from the old shards to the new ones when traffic is switched (SwitchTraffic or ReverseTraffic). If you were not connected but reconnect after traffic is switched and before the old shards are removed, it catches up on any changes missed on the old shards, then transitions to the new shards and continues streaming all changes made there.

Events in the stream are MySQL row-based binary log events with extended metadata. They can be processed by event bridges that support Vitess, such as Debezium. Other products, such as Airbyte, can also be used with custom Vitess connectors.

We recommend Debezium as it has native Vitess support and has been used in production environments by many Vitess users.

API Details #

VStream is a gRPC method that is part of the vtgate service and is accessible via a vtgate process's --grpc_port.

RPC Parameters #

Context #

Type Context
Required
Default none

In addition to the typical Context usage, it can contain a custom key-value pair where the key is 1 and the value is a CallerID. The vtgate propagates this value to the source vttablet processes to identify the originating client for the request. It is primarily informational and is not meant to be secure: the client can put whatever information it wants in the CallerID fields and the servers will trust it. Tablets may use it for monitoring, metrics, and logging. It can, however, also be used for other purposes, such as denying the client access to tables during a migration (MoveTables or Reshard).

TabletType #

Type TabletType
Required
Default UNKNOWN (you must specify a valid type)

The tablet type to use when selecting stream source tablets.

VGtid #

Type VGtid
Required

The keyspace, shard, and GTID position list to start streaming from. If no ShardGtid.Gtid value is provided then a table copy phase will be initiated for the tables matched by the provided filter on the given shard.

If the ShardGtid.Shard value is omitted, this means that all shards in the keyspace specified in the ShardGtid.Keyspace value are included. Additionally, if the ShardGtid.Keyspace value has a / prefix, you can use regular expressions such as /.* to include all keyspaces.
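
The rules above for reading a ShardGtid can be sketched in plain Go. This is an illustration only: shardGtid is a local stand-in for the three fields discussed, not the generated binlogdatapb.ShardGtid type, and the way the regular expression is matched (strip the / prefix, then an unanchored match) is an assumption rather than a statement of vtgate's implementation.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// shardGtid is a hypothetical stand-in for the ShardGtid fields described above.
type shardGtid struct {
	Keyspace string
	Shard    string
	Gtid     string
}

// matchKeyspaces returns the known keyspaces selected by sg.Keyspace.
// A leading "/" marks the rest of the value as a regular expression
// (assumed matching behaviour); otherwise the name must match exactly.
func matchKeyspaces(sg shardGtid, known []string) ([]string, error) {
	if !strings.HasPrefix(sg.Keyspace, "/") {
		for _, ks := range known {
			if ks == sg.Keyspace {
				return []string{ks}, nil
			}
		}
		return nil, nil
	}
	re, err := regexp.Compile(strings.TrimPrefix(sg.Keyspace, "/"))
	if err != nil {
		return nil, err
	}
	var out []string
	for _, ks := range known {
		if re.MatchString(ks) {
			out = append(out, ks)
		}
	}
	return out, nil
}

// describe summarises how the Shard and Gtid fields are read:
// an empty Shard means all shards, an empty Gtid means a copy phase first.
func describe(sg shardGtid) string {
	shards := "shard " + sg.Shard
	if sg.Shard == "" {
		shards = "all shards"
	}
	mode := "stream from " + sg.Gtid
	if sg.Gtid == "" {
		mode = "copy tables, then stream"
	}
	return shards + ": " + mode
}

func main() {
	known := []string{"commerce", "customer"} // example keyspace names
	sg := shardGtid{Keyspace: "/.*"}
	ks, err := matchKeyspaces(sg, known)
	if err != nil {
		panic(err)
	}
	fmt.Println(ks, describe(sg))
}
```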

Filter #

Type Filter
Required

The tables to subscribe to change events from, in the keyspace(s) and shard(s) contained in the provided VGtid, and any query predicates to use when filtering the rows for which change events are generated.

VStreamFlags #

Cells #

Type string
Default ""

If specified, these cells (comma-separated list) are used when selecting stream source tablets. When no value is specified the vtgate will default to looking for source tablets within its own local cell.

CellPreference #

Type string
Default ""

If specified, this determines which cells to give preference to during tablet selection. By default, preferlocalwithalias is used in order to give preference to the caller's local cell and then any alias its cell belongs to. If onlyspecified is given, then only tablets within the specified Cells field value will be considered.
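
As a rough illustration of how Cells and CellPreference interact, the sketch below derives an ordered list of candidate cells from the two flag values. The function candidateCells and the cell names are hypothetical, cell alias resolution is left out because it needs topology data, and the ordering is a simplification of the documented rules rather than vtgate's actual tablet picker.

```go
package main

import (
	"fmt"
	"strings"
)

// candidateCells returns the cells to search, in preference order.
// Hypothetical helper: it only models the rules described above.
func candidateCells(cells, preference, localCell string) []string {
	// Parse the comma-separated Cells value, ignoring blanks.
	var listed []string
	for _, c := range strings.Split(cells, ",") {
		if c = strings.TrimSpace(c); c != "" {
			listed = append(listed, c)
		}
	}
	// No Cells value: fall back to the vtgate's own local cell.
	if len(listed) == 0 {
		return []string{localCell}
	}
	// onlyspecified: consider nothing outside the listed cells.
	if preference == "onlyspecified" {
		return listed
	}
	// Default (preferlocalwithalias): local cell first, then the rest.
	// Alias cells are omitted in this simplified sketch.
	out := []string{localCell}
	for _, c := range listed {
		if c != localCell {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	fmt.Println(candidateCells("", "", "zone1"))
	fmt.Println(candidateCells("zone2,zone3", "onlyspecified", "zone1"))
	fmt.Println(candidateCells("zone2, zone1", "preferlocalwithalias", "zone1"))
}
```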

HeartbeatInterval #

Type unsigned integer
Default 0 (none)

How frequently, in seconds, to send heartbeat events to the client when there are no other events in the stream to send.

IncludeReshardJournalEvents #

Type bool
Default false

When enabled the vtgate will include reshard journal events in the stream along with all other events.

MinimizeSkew #

Type bool
Default false

When enabled, the vtgate keeps the events in the stream roughly time aligned. Because it aggregates streams from each of the shards involved, it uses the event timestamps to keep the maximum time skew between the source shard streams under 10 minutes. When it detects skew between the source streams, it pauses sending events to the client and allows the lagging shard(s) to catch up.
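
The skew check can be pictured with a small standalone sketch. It is not vtgate's code: shardsToPause, the map of latest timestamps per shard, and the choice to pause at exactly the 10-minute mark are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// maxSkewSeconds is the 10-minute bound mentioned above, in seconds.
const maxSkewSeconds = 10 * 60

// shardsToPause returns the shards whose latest event timestamp (Unix
// seconds) is ahead of the slowest shard by at least limit seconds.
// Holding those streams back lets the lagging shard(s) catch up.
func shardsToPause(latest map[string]int64, limit int64) []string {
	first := true
	var slowest int64
	for _, ts := range latest {
		if first || ts < slowest {
			slowest = ts
			first = false
		}
	}
	var ahead []string
	for shard, ts := range latest {
		if ts-slowest >= limit {
			ahead = append(ahead, shard)
		}
	}
	sort.Strings(ahead) // map iteration order is random; sort for stable output
	return ahead
}

func main() {
	latest := map[string]int64{"-80": 100, "80-": 800} // example timestamps
	fmt.Println(shardsToPause(latest, maxSkewSeconds))
}
```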

There is no strict ordering of events across shards and the client will need to examine the event timestamps.

StopOnReshard #

Type bool
Default false

When enabled the vtgate will send a reshard journal event to the client along with an EOF error in the VStreamReader.Recv response and stop sending any further events.

TabletOrder #

Type string
Default ""

This replaces the in_order hint (e.g. "in_order:REPLICA,PRIMARY") previously used to specify tablet type order during source tablet selection.

RPC Response #

The VStream gRPC returns a VStreamReader and a non-nil error if the stream could not be initialized. Call the Recv method on that VStreamReader in a for loop; responses are sent when available. Each response consists of the following two values:

  • An array of VEvent objects — the new messages to process in the stream
  • An error — an error that, if non-nil, indicates the stream has been closed (EOF) or an error occurred
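
The receive loop described above might look like the following minimal sketch. To keep it runnable without Vitess, event and eventReader are hypothetical stand-ins for VEvent and VStreamReader, and sliceReader is a fake that replays fixed batches before reporting io.EOF. A real client would call Recv on the reader returned by the VStream call instead.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// event is a stand-in for a VEvent; only a type label is kept here.
type event struct{ Type string }

// eventReader mirrors the shape of the Recv call described above.
type eventReader interface {
	Recv() ([]event, error)
}

// sliceReader is a fake reader: it replays batches, then returns io.EOF.
type sliceReader struct{ batches [][]event }

func (r *sliceReader) Recv() ([]event, error) {
	if len(r.batches) == 0 {
		return nil, io.EOF
	}
	b := r.batches[0]
	r.batches = r.batches[1:]
	return b, nil
}

// consume calls Recv until the stream ends and returns the number of
// events handled. Any events in a response are handled before its error
// is inspected. io.EOF is treated as a clean close; any other error is
// returned to the caller.
func consume(r eventReader, handle func(event)) (int, error) {
	n := 0
	for {
		evs, err := r.Recv()
		for _, ev := range evs {
			handle(ev)
			n++
		}
		if errors.Is(err, io.EOF) {
			return n, nil
		}
		if err != nil {
			return n, err
		}
	}
}

func main() {
	r := &sliceReader{batches: [][]event{
		{{Type: "BEGIN"}, {Type: "ROW"}},
		{{Type: "COMMIT"}},
	}}
	n, err := consume(r, func(ev event) { fmt.Println(ev.Type) })
	fmt.Println(n, err)
}
```

Handling the events before checking the error is a defensive choice so that nothing delivered alongside a final error is dropped.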

API Types #

Example Usage #

You can find a full example Go client here.

Below is a snippet showing how to use the VStream API in Go:

gconn, err := vtgateconn.Dial(ctx, grpcAddress)
if err != nil {
    t.Fatal(err)
}
defer gconn.Close()

// lastPK is id1=4
lastPK := sqltypes.Result{
    Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
    Rows:   [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
}
tableLastPK := []*binlogdatapb.TableLastPK{{
    TableName: "t1",
    Lastpk:    sqltypes.ResultToProto3(&lastPK),
}}

var shardGtids []*binlogdatapb.ShardGtid
var vgtid = &binlogdatapb.VGtid{}
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
    Keyspace: "ks",
    Shard:    "-80",
    Gtid:     "MySQL56/89f66ef2-863a-11ed-9bdf-3d270fd3f552:1-30219",
    TablePKs: tableLastPK,
})
shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
    Keyspace: "ks",
    Shard:    "80-",
    Gtid:     "MySQL56/2174b383-5441-11e8-b90a-c80aa9429562:1-29516,24da167-0c0c-11e8-8442-00059a3c7b00:1-19",
    TablePKs: tableLastPK,
})
vgtid.ShardGtids = shardGtids
filter := &binlogdatapb.Filter{
    Rules: []*binlogdatapb.Rule{{
        Match:  "t1",
        Filter: "select * from t1",
    }},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
    t.Fatal(err)
}

var evs []*binlogdatapb.VEvent
for {
    e, err := reader.Recv()
    ...

Copy All Tables From All Shards in the ks Keyspace #

Below is a snippet in Go that demonstrates how to copy from all shards by omitting ShardGtid.Shard:

vgtid := &binlogdatapb.VGtid{
  ShardGtids: []*binlogdatapb.ShardGtid{{
    Keyspace: "ks",
    Gtid: "",
  }},
}
filter := &binlogdatapb.Filter{
  Rules: []*binlogdatapb.Rule{{
    Match: "/.*",
  }},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)

Copy All Tables From All Shards in All Keyspaces #

Below is a snippet in Go that demonstrates how to copy from all keyspaces by specifying /.* as the value for ShardGtid.Keyspace:

vgtid := &binlogdatapb.VGtid{
  ShardGtids: []*binlogdatapb.ShardGtid{{
    Keyspace: "/.*",
    Gtid: "",
  }},
}
filter := &binlogdatapb.Filter{
  Rules: []*binlogdatapb.Rule{{
    Match: "/.*",
  }},
}
flags := &vtgatepb.VStreamFlags{}
reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)

Copying from all keyspaces can generate a significant amount of load and potentially impact production traffic. Therefore, please exercise caution when using regular expressions in production.

Debugging #

There is also an SQL interface that can be used for testing and debugging from a vtgate. Here's an example:

$ mysql --quick <vtgate params>

mysql> SET WORKLOAD=OLAP;

mysql> VSTREAM * FROM commerce.corder\G
*************** 1. row ***************
         op: +
   order_id: 1
customer_id: 1
        sku: NULL
      price: 10
************** 2. row ***************
         op: *
   order_id: 1
customer_id: 1
        sku: NULL
      price: 7
************** 3. row ***************
         op: -
   order_id: 1
customer_id: 1
        sku: NULL
      price: 7

Monitoring #

VTGates publish vstream metrics listed here.

More Reading #