Schema Registry is the core component in the Kafka ecosystem for managing data formats. This article takes a close look at how Schema Registry works, how schemas evolve, and how to use it in practice.
1. Schema Registry Basics
1.1 Why a Schema Registry?
The problem:
graph TB
subgraph Problem
P1[Producer A: sends JSON]
P2[Producer B: sends differently-shaped JSON]
C1[Consumer: cannot parse]
end
P1 --> C1
P2 --> C1
style C1 fill:#f96,stroke:#333
The solution:
graph TB
subgraph Solution
P1[Producer A]
P2[Producer B]
SR[Schema Registry]
C1[Consumer]
end
P1 -->|register schema| SR
P2 -->|register schema| SR
SR -->|fetch schema| C1
P1 -->|send data| C1
P2 -->|send data| C1
1.2 Core Concepts
| Concept | Description |
|---|---|
| Schema | Data format definition (Avro/Protobuf/JSON Schema) |
| Subject | Logical grouping of schemas (usually named after the topic) |
| Version | Schema version number, auto-incremented |
| Compatibility | Compatibility level (Backward/Forward/Full) |
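The subject name is derived from the topic and/or the record type by a naming strategy. A small illustrative helper (not part of any client API) sketches the three strategies Confluent serializers support:

```python
# Sketch of the three Confluent subject naming strategies.
# Illustrative helper only; real clients configure this via
# the *.subject.name.strategy serializer properties.
def subject_name(strategy: str, topic: str, record_fullname: str) -> str:
    """Derive the subject under which a value schema is registered."""
    if strategy == "TopicNameStrategy":        # default: one schema type per topic
        return f"{topic}-value"
    if strategy == "RecordNameStrategy":       # one subject per record type, any topic
        return record_fullname
    if strategy == "TopicRecordNameStrategy":  # record type scoped to one topic
        return f"{topic}-{record_fullname}"
    raise ValueError(f"unknown strategy: {strategy}")

print(subject_name("TopicNameStrategy", "order-topic", "com.example.orders.Order"))
# order-topic-value
```

`TopicNameStrategy` explains the `order-value` subject used throughout this article: topic `order` plus the `-value` suffix (key schemas use `-key`).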
1.3 Architecture
graph TB
subgraph Producers
P1[Producer 1]
P2[Producer 2]
end
subgraph "Schema Registry"
SR[Schema Registry Cluster]
K1[Kafka _schemas topic]
end
subgraph Consumers
C1[Consumer 1]
C2[Consumer 2]
end
P1 -->|register schema| SR
P2 -->|register schema| SR
SR -->|store| K1
SR -->|serve schemas| C1
SR -->|serve schemas| C2
2. Data Format Comparison
2.1 Avro
Schema definition:
{
"type": "record",
"name": "Order",
"namespace": "com.example.orders",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "create_time", "type": "long"}
]
}
Characteristics:
- Compact, efficient binary format
- Strongly typed; requires a schema
- Rich set of data types
- Well-defined schema evolution rules
2.2 Protobuf
Schema definition:
syntax = "proto3";
package orders;
message Order {
string order_id = 1;
string user_id = 2;
double amount = 3;
int64 create_time = 4;
}
Characteristics:
- Developed by Google
- Binary format
- Excellent multi-language support
- Excellent performance
2.3 JSON Schema
Schema definition:
{
"type": "object",
"properties": {
"order_id": {"type": "string"},
"user_id": {"type": "string"},
"amount": {"type": "number"},
"create_time": {"type": "integer"}
},
"required": ["order_id", "user_id", "amount"]
}
Characteristics:
- Built on JSON
- Human-readable
- Broad tooling support
- Larger payloads
2.4 Format Comparison
| Feature | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| Format | Binary | Binary | JSON |
| Size | Small | Smallest | Large |
| Performance | High | Highest | Medium |
| Readability | Low | Low | High |
| Evolution support | Excellent | Excellent | Good |
| Ecosystem | Kafka-native | Multi-language | Web-friendly |
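The size gap in the table comes mostly from field names: text formats repeat them in every message, while schema-based binary formats write only the values. A simplified sketch (hand-rolled length-prefixed encoding, not real Avro wire format) makes the difference concrete for the Order record:

```python
import json
import struct

order = {"order_id": "order-123", "user_id": "u-42", "amount": 99.99,
         "create_time": 1700000000000}

# Text encoding: every message carries the field names again.
json_bytes = json.dumps(order).encode("utf-8")

# Schema-based binary encoding (simplified illustration, not Avro's actual
# wire format): only values are written; the schema supplies the names.
def encode_binary(o: dict) -> bytes:
    out = b""
    for s in (o["order_id"], o["user_id"]):
        raw = s.encode("utf-8")
        out += struct.pack(">H", len(raw)) + raw    # length-prefixed strings
    out += struct.pack(">d", o["amount"])           # 8-byte double
    out += struct.pack(">q", o["create_time"])      # 8-byte long
    return out

binary = encode_binary(order)
print(len(json_bytes), len(binary))  # the binary form is several times smaller
```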
3. Deployment and Configuration
3.1 Installation
# 1. Download Confluent Platform
wget https://packages.confluent.io/archive/7.4/confluent-7.4.0.tar.gz
tar -xzf confluent-7.4.0.tar.gz
# 2. Configure Schema Registry
cat > $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties << EOF
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
EOF
# 3. Start Schema Registry
$CONFLUENT_HOME/bin/schema-registry-start \
$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties &
3.2 Docker Deployment
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:7.4.0
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
4. REST API
4.1 Registering a Schema
# Register a new schema
curl -X POST http://localhost:8081/subjects/order-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
}'
# Response
{
"subject": "order-value",
"version": 1,
"id": 1,
"schema": "..."
}
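The registration body must carry the schema as an escaped JSON string, which is easy to get wrong by hand. A small Python sketch shows how calling `json.dumps` twice handles the escaping reliably (the schema content matches the curl example above):

```python
import json

avro_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_id", "type": "string"},
        {"name": "amount", "type": "double"},
    ],
}

# Inner dumps turns the schema into a string; outer dumps escapes it
# into the {"schema": "..."} envelope the REST API expects.
body = json.dumps({"schema": json.dumps(avro_schema)})
print(body[:40])
```

POST `body` to `/subjects/<subject>/versions` with `Content-Type: application/vnd.schemaregistry.v1+json`, exactly as the curl command above does.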
4.2 Querying Schemas
# Get the latest version
curl -X GET http://localhost:8081/subjects/order-value/versions/latest
# Get a specific version
curl -X GET http://localhost:8081/subjects/order-value/versions/1
# Get a schema by its global ID
curl -X GET http://localhost:8081/schemas/ids/1
4.3 Deleting Schemas
# Delete a specific version
curl -X DELETE http://localhost:8081/subjects/order-value/versions/1
# Delete all versions of a subject
curl -X DELETE http://localhost:8081/subjects/order-value
4.4 Compatibility Checks
# Test whether a candidate schema is compatible with the latest version
curl -X POST http://localhost:8081/compatibility/subjects/order-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":[\"null\",\"string\"],\"default\":null}]}"
}'
# Response (the added field carries a default, so the check passes;
# adding a required field without a default would return false under BACKWARD)
{"is_compatible":true}
5. Compatibility
5.1 Compatibility Levels
| Level | Meaning | Typical use |
|---|---|---|
| BACKWARD | New schema can read old data | Default, recommended |
| BACKWARD_TRANSITIVE | Backward compatible with all registered versions | Strict scenarios |
| FORWARD | Old schema can read new data | Less common |
| FORWARD_TRANSITIVE | Forward compatible with all registered versions | Less common |
| FULL | Both backward and forward compatible | Strictest |
| NONE | No compatibility checks | Development only |
5.2 Configuring Compatibility
# Global configuration
curl -X PUT http://localhost:8081/config \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'
# Subject-level configuration (overrides the global setting)
curl -X PUT http://localhost:8081/config/order-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'
5.3 Compatibility Rules
Adding a field (compatible when it has a default):
// Old schema
{"name": "Order", "fields": [
{"name": "order_id", "type": "string"}
]}
// New schema (adds an optional field with a default)
{"name": "Order", "fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": ["null", "string"], "default": null}
]}
Deleting a field (breaks FORWARD and FULL compatibility; BACKWARD actually allows it, since a new reader simply skips the removed field in old data):
// Old schema
{"name": "Order", "fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "string"}
]}
// New schema (field removed) - incompatible under FORWARD/FULL
{"name": "Order", "fields": [
{"name": "order_id", "type": "string"}
]}
Changing a field's type (incompatible):
// Old schema
{"name": "amount", "type": "int"}
// New schema - incompatible: Avro permits promotions such as int -> long
// or int -> double, but there is no int -> string promotion
{"name": "amount", "type": "string"}
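The rules above can be condensed into a toy BACKWARD check: the new (reader) schema can decode old data iff every field it declares either existed in the old schema with the same or a promotable type, or carries a default. This is a minimal sketch for flat records only; real Avro schema resolution covers unions, nesting, aliases, and more:

```python
# Avro's numeric type promotions (writer type -> reader type).
PROMOTIONS = {("int", "long"), ("int", "float"), ("int", "double"),
              ("long", "float"), ("long", "double"), ("float", "double")}

def is_backward_compatible(old_schema: dict, new_schema: dict) -> bool:
    """Toy BACKWARD check for flat Avro records (illustrative, not exhaustive)."""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    for f in new_schema["fields"]:
        old = old_fields.get(f["name"])
        if old is None:
            if "default" not in f:      # new required field: old data lacks it
                return False
            continue
        if old["type"] != f["type"] and (old["type"], f["type"]) not in PROMOTIONS:
            return False                # e.g. int -> string is not promotable
    return True                         # deleted fields are fine: reader skips them

v1 = {"fields": [{"name": "order_id", "type": "string"}]}
v2 = {"fields": [{"name": "order_id", "type": "string"},
                 {"name": "user_id", "type": ["null", "string"], "default": None}]}
print(is_backward_compatible(v1, v2))  # True: the new field has a default
```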
6. Client Usage
6.1 Java Producer
// Maven dependency
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.4.0</version>
</dependency>
// Producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("auto.register.schemas", true);
// Create the producer
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);
// Build the message
Schema schema = SchemaBuilder.record("Order")
.fields()
.requiredString("order_id")
.requiredDouble("amount")
.endRecord();
GenericData.Record record = new GenericData.Record(schema);
record.put("order_id", "order-123");
record.put("amount", 99.99);
// Send it
ProducerRecord<String, GenericRecord> msg =
new ProducerRecord<>("order-topic", "order-123", record);
producer.send(msg);
6.2 Java Consumer
// Consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");
// Create the consumer
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("order-topic"));
// Poll loop
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericData.Record value = record.value();
String orderId = value.get("order_id").toString();
Double amount = (Double) value.get("amount");
System.out.printf("order_id=%s, amount=%.2f%n", orderId, amount);
}
}
6.3 Python Client
# Install the client library:
# pip install confluent-kafka
from confluent_kafka import DeserializingConsumer, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
# Schema Registry client
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
# Schema definition
order_schema_str = '''
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
'''
# Producer: the serializer config keys require SerializingProducer
# (the plain Producer does not accept value.serializer)
order_serializer = AvroSerializer(
schema_registry_client,
order_schema_str
)
producer = SerializingProducer({
'bootstrap.servers': 'localhost:9092',
'value.serializer': order_serializer
})
producer.produce(
topic='order-topic',
value={'order_id': 'order-123', 'amount': 99.99}
)
producer.flush()
# Consumer: likewise, deserializer config keys require DeserializingConsumer
order_deserializer = AvroDeserializer(
schema_registry_client,
order_schema_str
)
consumer = DeserializingConsumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-consumer-group',
'auto.offset.reset': 'earliest',
'value.deserializer': order_deserializer
})
consumer.subscribe(['order-topic'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
order = msg.value()
print(f"order_id={order['order_id']}, amount={order['amount']}")
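Under the hood, all these serializers produce the same Confluent wire format: one magic byte (0), the schema ID as a 4-byte big-endian integer, then the serialized record. A consumer uses that ID to fetch the right schema from the registry. A minimal sketch of the framing:

```python
import struct

def wrap(schema_id: int, payload: bytes) -> bytes:
    # Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID,
    # then the serialized record bytes.
    return struct.pack(">bI", 0, schema_id) + payload

def unwrap(message: bytes) -> tuple[int, bytes]:
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError("not Confluent wire format")
    return schema_id, message[5:]

# b"\x12order-123" is the Avro binary encoding of the string "order-123"
msg = wrap(1, b"\x12order-123")
print(unwrap(msg))  # (1, b'\x12order-123')
```

This framing is why a plain (non-registry-aware) deserializer sees five bytes of "garbage" before the record: it is reading the header as data.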
7. Hands-On Examples
7.1 Order System Schema Evolution
Version 1:
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"}
]
}
Version 2 (adds the user ID):
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": ["null", "string"], "default": null}
]
}
Version 3 (adds the order status):
{
"type": "record",
"name": "Order",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "user_id", "type": ["null", "string"], "default": null},
{"name": "status", "type": {"type": "enum", "name": "OrderStatus", "symbols": ["CREATED", "PAID", "SHIPPED"]}, "default": "CREATED"}
]
}
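Because every added field carries a default, a consumer already upgraded to v3 can still read records produced under v1: Avro schema resolution fills the missing fields from the reader's defaults. A simplified sketch for flat records (real resolution is recursive and type-aware):

```python
# Reader-side fields of schema v3, as declared above.
v3_fields = [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "user_id", "type": ["null", "string"], "default": None},
    {"name": "status", "type": "OrderStatus", "default": "CREATED"},
]

def resolve(old_record: dict, reader_fields: list) -> dict:
    """Fill fields absent from old data with the reader schema's defaults."""
    out = {}
    for f in reader_fields:
        if f["name"] in old_record:
            out[f["name"]] = old_record[f["name"]]
        else:
            out[f["name"]] = f["default"]  # assumes every added field has one
    return out

v1_record = {"order_id": "order-123", "amount": 99.99}
print(resolve(v1_record, v3_fields))
# {'order_id': 'order-123', 'amount': 99.99, 'user_id': None, 'status': 'CREATED'}
```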
7.2 Managing Schemas Across Multiple Topics
#!/bin/bash
# Bulk schema registration script
SCHEMA_REGISTRY="http://localhost:8081"
SCHEMAS_DIR="./schemas"
# Map of subjects to record types
declare -A topics=(
["order-value"]="Order"
["user-value"]="User"
["payment-value"]="Payment"
)
for topic in "${!topics[@]}"; do
entity=${topics[$topic]}
schema_file="$SCHEMAS_DIR/${entity}.avsc"
echo "Registering schema for $topic..."
# jq -Rs reads the file as one raw string and emits it JSON-escaped
# (more robust than sed: it also handles backslashes and newlines)
curl -X POST "$SCHEMA_REGISTRY/subjects/${topic}/versions" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $(jq -Rs . < "$schema_file")}"
echo ""
done
echo "Schema registration complete"
7.3 Rolling Back a Schema Version
#!/bin/bash
# Schema version rollback script
SCHEMA_REGISTRY="http://localhost:8081"
SUBJECT=$1
TARGET_VERSION=$2
# Fetch the target version's schema (no -r flag: keep jq's JSON quoting
# so the value can be embedded directly as a JSON string)
schema=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects/$SUBJECT/versions/$TARGET_VERSION" | jq '.schema')
# Register it again as a new version
curl -X POST "$SCHEMA_REGISTRY/subjects/$SUBJECT/versions" \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d "{\"schema\": $schema}"
echo "Rolled back to the schema of version $TARGET_VERSION (registered as a new version)"
8. Monitoring and Operations
8.1 Monitoring Metrics
| Metric | Description |
|---|---|
| schema-registry:jetty-metrics:connections | Connection count |
| schema-registry:jetty-metrics:requests | Request count |
| schema-registry:jetty-metrics:errors | Error count |
| schema-registry:storage:adapter-kafka:latency | Kafka store latency |
8.2 Backup and Restore
#!/bin/bash
# Schema backup script
SCHEMA_REGISTRY="http://localhost:8081"
BACKUP_DIR="/backup/schema-registry"
DATE=$(date +%Y%m%d)
# List all subjects
subjects=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects" | jq -r '.[]')
# Back up every subject
for subject in $subjects; do
echo "Backing up $subject..."
# List all versions
versions=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects/$subject/versions" | jq -r '.[]')
for version in $versions; do
curl -s -X GET "$SCHEMA_REGISTRY/subjects/$subject/versions/$version" > \
"$BACKUP_DIR/${subject}_v${version}_$DATE.json"
done
done
echo "Backup complete"
Summary
What Schema Registry covers:
- Data formats: Avro, Protobuf, JSON Schema
- Compatibility: BACKWARD, FORWARD, FULL
- REST API: registering, querying, and deleting schemas
- Clients: Java, Python, and other languages
- Operations: monitoring, backup, version management
Core practices:
- Manage data formats through Schema Registry
- Choose an appropriate compatibility level
- Follow the schema evolution rules
- Back up schema data regularly
References
- Schema Registry official documentation
- Apache Avro official documentation
- Kafka: The Definitive Guide, Chapter 6