
Kafka Schema Registry: Data Format Management in Practice

Schema Registry is the core component in the Kafka ecosystem for managing data formats. This article takes a close look at how Schema Registry works, how data formats evolve over time, and how to use it in practice.

1. Schema Registry Basics

1.1 Why Do We Need a Schema Registry?

The Problem

graph TB
    subgraph Problem
        P1[Producer A: sends JSON]
        P2[Producer B: sends JSON in a different shape]
        C1[Consumer: cannot parse]
    end
    
    P1 --> C1
    P2 --> C1
    
    style C1 fill:#f96,stroke:#333

The Solution

graph TB
    subgraph Solution
        P1[Producer A]
        P2[Producer B]
        SR[Schema Registry]
        C1[Consumer]
    end
    
    P1 -->|register schema| SR
    P2 -->|register schema| SR
    SR -->|fetch schema| C1
    P1 -->|send data| C1
    P2 -->|send data| C1

1.2 Core Concepts

| Concept | Description |
|---|---|
| Schema | The data format definition (Avro/Protobuf/JSON Schema) |
| Subject | A logical grouping of schemas (usually named after the topic) |
| Version | The schema version number, incremented automatically |
| Compatibility | The compatibility level (Backward/Forward/Full) |
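One practical detail behind these concepts: Confluent serializers do not ship the schema with every message. They prepend a 5-byte header, a zero magic byte plus the 4-byte big-endian schema ID, and consumers use that ID to fetch the schema from the registry. A minimal Python sketch of parsing that frame:

```python
import struct

def parse_confluent_header(message: bytes):
    """Split a Confluent-framed Kafka message into (schema_id, payload).

    Wire format: 1 magic byte (0x0) + 4-byte big-endian schema ID + serialized data.
    """
    if len(message) < 5 or message[0] != 0:
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", message[1:5])[0]
    return schema_id, message[5:]

# Build an example frame with schema ID 1 and a dummy payload
framed = b"\x00" + struct.pack(">I", 1) + b"avro-bytes"
sid, payload = parse_confluent_header(framed)
print(sid, payload)
```

This is why a consumer without access to the registry cannot decode the value even though it "has the bytes": the payload is meaningless without the schema the ID points at.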

1.3 Architecture

graph TB
    subgraph Producer
        P1[Producer 1]
        P2[Producer 2]
    end
    
    subgraph Schema Registry
        SR[Schema Registry Cluster]
        K1[Kafka _schemas Topic]
    end
    
    subgraph Consumer
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    P1 -->|register schema| SR
    P2 -->|register schema| SR
    SR -->|store| K1
    SR -->|serve schemas| C1
    SR -->|serve schemas| C2

2. Data Format Comparison

2.1 Avro

Schema Definition

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "create_time", "type": "long"}
  ]
}
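To see why Avro's binary form is compact, here is a hand-rolled, purely illustrative encoder for the Order record above, following Avro's binary encoding rules: strings are a zigzag-varint length plus UTF-8 bytes, longs are zigzag varints, doubles are 8 little-endian IEEE-754 bytes, and field names never appear on the wire:

```python
import json
import struct

def zigzag_varint(n: int) -> bytes:
    """Encode a signed integer as an Avro zigzag varint."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # more bytes follow
        else:
            out.append(b)
            return bytes(out)

def encode_order(order: dict) -> bytes:
    """Hand-rolled Avro binary encoding of the Order record:
    fields are concatenated in schema order, with no names on the wire."""
    buf = bytearray()
    for key in ("order_id", "user_id"):        # string: varint length + UTF-8 bytes
        s = order[key].encode("utf-8")
        buf += zigzag_varint(len(s)) + s
    buf += struct.pack("<d", order["amount"])  # double: 8 bytes little-endian
    buf += zigzag_varint(order["create_time"]) # long: zigzag varint
    return bytes(buf)

order = {"order_id": "order-123", "user_id": "u-1",
         "amount": 99.99, "create_time": 1700000000000}
avro_bytes = encode_order(order)
json_bytes = json.dumps(order).encode("utf-8")
print(f"Avro: {len(avro_bytes)} bytes, JSON: {len(json_bytes)} bytes")
```

For this record the Avro body is 28 bytes, roughly a third of the equivalent JSON, and the gap widens as field names get longer. Real producers use the Avro library rather than code like this, of course.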

Characteristics

  - Compact binary encoding; the schema is referenced by Registry ID rather than shipped with every message
  - First-class schema evolution support (defaults, unions)
  - The de-facto standard serialization format in the Kafka ecosystem

2.2 Protobuf

Schema Definition

syntax = "proto3";
package orders;

message Order {
  string order_id = 1;
  string user_id = 2;
  double amount = 3;
  int64 create_time = 4;
}

Characteristics

  - Compact binary encoding keyed by field numbers rather than names
  - Strong multi-language code generation via protoc
  - Widely used beyond Kafka, e.g. as the wire format of gRPC

2.3 JSON Schema

Schema Definition

{
  "type": "object",
  "properties": {
    "order_id": {"type": "string"},
    "user_id": {"type": "string"},
    "amount": {"type": "number"},
    "create_time": {"type": "integer"}
  },
  "required": ["order_id", "user_id", "amount"]
}

Characteristics

  - Plain-text JSON: human-readable and easy to debug
  - No code generation required; a natural fit for web clients
  - Largest payloads and slowest parsing of the three formats

2.4 Format Comparison

| Feature | Avro | Protobuf | JSON Schema |
|---|---|---|---|
| Encoding | Binary | Binary | JSON text |
| Size | Smallest | Small | Largest |
| Performance | High | High | Low |
| Readability | Low | Low | High |
| Compatibility | Excellent | Excellent | Good |
| Ecosystem | Kafka-native | Multi-language | Web-friendly |

3. Deployment and Configuration

3.1 Installation

# 1. Download Confluent Platform
wget https://packages.confluent.io/archive/7.4/confluent-7.4.0.tar.gz
tar -xzf confluent-7.4.0.tar.gz
export CONFLUENT_HOME=$PWD/confluent-7.4.0

# 2. Configure Schema Registry
cat > $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties << EOF
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
EOF

# 3. Start Schema Registry
$CONFLUENT_HOME/bin/schema-registry-start \
  $CONFLUENT_HOME/etc/schema-registry/schema-registry.properties &

3.2 Docker Deployment

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

4. REST API

4.1 Registering a Schema

# Register a new schema
curl -X POST http://localhost:8081/subjects/order-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"}]}"
  }'

# Response
{
  "subject": "order-value",
  "version": 1,
  "id": 1,
  "schema": "..."
}

4.2 Querying Schemas

# Get the latest version
curl -X GET http://localhost:8081/subjects/order-value/versions/latest

# Get a specific version
curl -X GET http://localhost:8081/subjects/order-value/versions/1

# Get a schema by its global ID
curl -X GET http://localhost:8081/schemas/ids/1

4.3 Deleting Schemas

# Soft-delete a specific version
curl -X DELETE http://localhost:8081/subjects/order-value/versions/1

# Soft-delete all versions of a subject
curl -X DELETE http://localhost:8081/subjects/order-value

# Append ?permanent=true to hard-delete after a soft delete

4.4 Compatibility Checks

# Test a candidate schema against the latest registered version
curl -X POST http://localhost:8081/compatibility/subjects/order-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":\"string\"}]}"
  }'

# Response
{"is_compatible":true}

5. Compatibility

5.1 Compatibility Levels

| Level | Meaning | Typical Use |
|---|---|---|
| BACKWARD | The new schema can read data written with the previous schema | Default, recommended |
| BACKWARD_TRANSITIVE | The new schema can read data written with all previous schemas | Strict scenarios |
| FORWARD | The previous schema can read data written with the new schema | Less common |
| FORWARD_TRANSITIVE | All previous schemas can read data written with the new schema | Less common |
| FULL | Backward and forward compatible at once | Most strict |
| NONE | No compatibility checks | Development only |

5.2 Configuring Compatibility

# Global default
curl -X PUT http://localhost:8081/config \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "BACKWARD"}'

# Per-subject (i.e. per-topic) override
curl -X PUT http://localhost:8081/config/order-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

5.3 Compatibility Rules

Adding a field (compatible when it has a default)

// Old schema
{"name": "Order", "fields": [
  {"name": "order_id", "type": "string"}
]}

// New schema (adds an optional field with a default)
{"name": "Order", "fields": [
  {"name": "order_id", "type": "string"},
  {"name": "user_id", "type": ["null", "string"], "default": null}
]}

Removing a field (incompatible under FORWARD/FULL)

// Old schema
{"name": "Order", "fields": [
  {"name": "order_id", "type": "string"},
  {"name": "user_id", "type": "string"}
]}

// New schema (drops user_id) - allowed under BACKWARD, rejected under FORWARD/FULL
{"name": "Order", "fields": [
  {"name": "order_id", "type": "string"}
]}

Changing a field type (incompatible)

// Old schema
{"name": "amount", "type": "int"}

// New schema (changes the type) - incompatible
{"name": "amount", "type": "string"}

Avro does permit a few numeric promotions (int to long/float/double, long to float/double, float to double), but int to string is not one of them.

6. Client Usage

6.1 Java Producer

// Maven dependency
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.4.0</version>
</dependency>

// Producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
props.put("auto.register.schemas", true);

// Create the producer
KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props);

// Build the message (GenericRecord, no generated classes required)
Schema schema = SchemaBuilder.record("Order")
    .fields()
    .requiredString("order_id")
    .requiredDouble("amount")
    .endRecord();

GenericData.Record record = new GenericData.Record(schema);
record.put("order_id", "order-123");
record.put("amount", 99.99);

// Send
ProducerRecord<String, GenericRecord> msg = 
    new ProducerRecord<>("order-topic", "order-123", record);
producer.send(msg);

6.2 Java Consumer

// Consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", "http://localhost:8081");

// Create the consumer
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("order-topic"));

// Poll loop
while (true) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        GenericRecord value = record.value();
        String orderId = value.get("order_id").toString();
        Double amount = (Double) value.get("amount");
        
        System.out.printf("order_id=%s, amount=%.2f%n", orderId, amount);
    }
}

6.3 Python Client

# Install the client library
# pip install confluent-kafka

from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer

# Schema Registry client
schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})

# Schema definition
order_schema_str = '''
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
'''

# Producer: a plain Producer ignores 'value.serializer',
# so use SerializingProducer instead
order_serializer = AvroSerializer(
    schema_registry_client,
    order_schema_str
)

producer = SerializingProducer({
    'bootstrap.servers': 'localhost:9092',
    'value.serializer': order_serializer
})

producer.produce(
    topic='order-topic',
    value={'order_id': 'order-123', 'amount': 99.99}
)
producer.flush()

# Consumer: DeserializingConsumer applies 'value.deserializer' automatically
order_deserializer = AvroDeserializer(
    schema_registry_client,
    order_schema_str
)

consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-consumer-group',
    'auto.offset.reset': 'earliest',
    'value.deserializer': order_deserializer
})

consumer.subscribe(['order-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    
    order = msg.value()
    print(f"order_id={order['order_id']}, amount={order['amount']}")

7. Hands-On Cases

7.1 Order System Schema Evolution

Version 1

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

Version 2 (adds a user ID)

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "user_id", "type": ["null", "string"], "default": null}
  ]
}

Version 3 (adds an order status)

{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "user_id", "type": ["null", "string"], "default": null},
    {"name": "status", "type": {"type": "enum", "name": "OrderStatus", "symbols": ["CREATED", "PAID", "SHIPPED"]}, "default": "CREATED"}
  ]
}
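These versions stay BACKWARD-compatible because every added field carries a default: a consumer compiled against version 3 can still read version-1 records by filling in the defaults. A simplified, dict-based sketch of that resolution step (the real Avro reader additionally matches and promotes types):

```python
def read_with_schema(old_data: dict, reader_fields: list) -> dict:
    """Simplified schema resolution: for each field the reader expects,
    take the writer's value if present, else fall back to the default."""
    result = {}
    for field in reader_fields:
        if field["name"] in old_data:
            result[field["name"]] = old_data[field["name"]]
        elif "default" in field:
            result[field["name"]] = field["default"]
        else:
            raise ValueError(f"no value or default for {field['name']}")
    return result

# Reader fields from version 3 of the Order schema above
v3_fields = [
    {"name": "order_id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "user_id", "type": ["null", "string"], "default": None},
    {"name": "status",
     "type": {"type": "enum", "name": "OrderStatus",
              "symbols": ["CREATED", "PAID", "SHIPPED"]},
     "default": "CREATED"},
]

# A record written with version 1 of the schema
v1_record = {"order_id": "order-123", "amount": 99.99}
print(read_with_schema(v1_record, v3_fields))
```

This also shows why adding a field *without* a default breaks BACKWARD compatibility: the resolution step would have nothing to fill in for old records.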

7.2 Managing Schemas Across Multiple Topics

#!/bin/bash
# Batch schema registration script

SCHEMA_REGISTRY="http://localhost:8081"
SCHEMAS_DIR="./schemas"

# Topic 列表
declare -A topics=(
    ["order-value"]="Order"
    ["user-value"]="User"
    ["payment-value"]="Payment"
)

for topic in "${!topics[@]}"; do
    entity=${topics[$topic]}
    schema_file="$SCHEMAS_DIR/${entity}.avsc"
    
    echo "Registering schema for $topic..."
    
    # Let jq handle the JSON-string escaping of the schema file
    curl -X POST "$SCHEMA_REGISTRY/subjects/${topic}/versions" \
      -H "Content-Type: application/vnd.schemaregistry.v1+json" \
      -d "$(jq -n --arg s "$(cat "$schema_file")" '{schema: $s}')"
    
    echo ""
done

echo "Schema registration complete"

7.3 Rolling Back a Schema Version

#!/bin/bash
# Schema version rollback script

SCHEMA_REGISTRY="http://localhost:8081"
SUBJECT=$1
TARGET_VERSION=$2

# Fetch the target version's schema; omit jq's -r so it stays a
# JSON-escaped string suitable for embedding in the request body
schema=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects/$SUBJECT/versions/$TARGET_VERSION" | jq '.schema')

# Re-register it. Note: the registry deduplicates identical schemas per
# subject, so this returns the existing version rather than creating a
# new one; to truly roll back, soft-delete the newer versions instead.
curl -X POST "$SCHEMA_REGISTRY/subjects/$SUBJECT/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d "{\"schema\": $schema}"

echo "Schema rolled back to version $TARGET_VERSION"

8. Monitoring and Operations

8.1 Monitoring Metrics

| Metric | Description |
|---|---|
| schema-registry:jetty-metrics:connections | Active connections |
| schema-registry:jetty-metrics:requests | Request count |
| schema-registry:jetty-metrics:errors | Error count |
| schema-registry:storage:adapter-kafka:latency | Latency of the backing Kafka store |

8.2 Backup and Restore

#!/bin/bash
# Schema backup script

SCHEMA_REGISTRY="http://localhost:8081"
BACKUP_DIR="/backup/schema-registry"
DATE=$(date +%Y%m%d)
mkdir -p "$BACKUP_DIR"

# Fetch all subjects
subjects=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects" | jq -r '.[]')

# Back up every subject
for subject in $subjects; do
    echo "Backing up $subject..."
    
    # Fetch all versions of the subject
    versions=$(curl -s -X GET "$SCHEMA_REGISTRY/subjects/$subject/versions" | jq -r '.[]')
    
    for version in $versions; do
        curl -s -X GET "$SCHEMA_REGISTRY/subjects/$subject/versions/$version" > \
            "$BACKUP_DIR/${subject}_v${version}_$DATE.json"
    done
done

echo "Backup complete"

Summary

Key takeaways for Schema Registry:

  1. Data formats: Avro, Protobuf, and JSON Schema
  2. Compatibility: BACKWARD, FORWARD, and FULL levels
  3. REST API: registering, querying, and deleting schemas
  4. Clients: Java, Python, and other languages
  5. Operations: monitoring, backup, and version management
