Manage Schemas
This page only shows some frequently used operations.
- For the latest and complete information about Pulsar admin, including commands, flags, descriptions, and more, see Pulsar admin docs.
- For the latest and complete information about REST API, including parameters, responses, samples, and more, see REST API doc.
- For the latest and complete information about Java admin API, including classes, methods, descriptions, and more, see Java admin API doc.
Manage schema
Upload a schema
To upload (register) a new schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the upload subcommand.
pulsar-admin schemas upload --filename <schema-definition-file> <topic-name>
The schema-definition-file is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
Send a POST request to this endpoint: POST /admin/v2/schemas/:tenant/:namespace/:topic/schema (operation SchemasResource_postSchema in the REST API doc).
The following curl example assumes that the payload is stored in the schema.json file, the Pulsar broker runs on localhost, and the topic is my-tenant/my-ns/my-topic:
curl -X POST -H 'Content-Type: application/json' -d @schema.json http://localhost:8080/admin/v2/schemas/my-tenant/my-ns/my-topic/schema
The post payload is in JSON format.
{
"type": "<schema-type>",
"schema": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
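The same request can also be prepared from Java with the JDK's built-in HTTP client. The following is a minimal sketch, assuming Java 11 or later; the class name `UploadSchemaRequest` and its `build` helper are hypothetical, and the broker address and topic are the ones used in the curl example. The sketch only constructs the request; sending it requires a running broker.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class UploadSchemaRequest {

    // Builds (but does not send) the POST request that uploads a schema.
    // baseUrl is the broker's HTTP address; topic has the form tenant/namespace/topic.
    static HttpRequest build(String baseUrl, String topic, String payloadJson) {
        URI uri = URI.create(baseUrl + "/admin/v2/schemas/" + topic + "/schema");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(payloadJson))
                .build();
    }

    public static void main(String[] args) {
        // Payload for a primitive schema: the schema field is left blank.
        String payload = "{\"type\": \"INT8\", \"schema\": \"\", \"properties\": {}}";
        HttpRequest request = build("http://localhost:8080", "my-tenant/my-ns/my-topic", payload);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually send the request, pass it to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.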
The method on the PulsarAdmin client, available through admin.schemas(), is:
void createSchema(String topic, PostSchemaPayload schemaPayload)
Here is an example of PostSchemaPayload:
PulsarAdmin admin = …;
PostSchemaPayload payload = new PostSchemaPayload();
payload.setType("INT8");
payload.setSchema("");
admin.schemas().createSchema("my-tenant/my-ns/my-topic", payload);
If the schema is a primitive schema, the schema field must be blank.
If the schema is a struct schema, this field must be a JSON string of the Avro schema definition.
The payload includes the following fields:
| Field | Description |
|---|---|
| type | The schema type. |
| schema | The schema definition data, which is encoded in UTF-8 charset. |
| properties | The additional properties associated with the schema. |
The following is an example for a JSON schema.
{
"type": "JSON",
"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\",\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file2\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"file3\",\"type\":[\"string\",\"null\"],\"default\":\"dfdf\"}]}",
"properties": {}
}
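Because the schema field carries the Avro definition as a string, every quote and backslash inside the definition has to be escaped, as in the payload above. The following is a minimal sketch of producing such a payload by hand with only the JDK. `SchemaPayloadBuilder`, `escapeJson`, and `buildPayload` are hypothetical names, not part of the Pulsar API, and a JSON library would normally handle the escaping.

```java
class SchemaPayloadBuilder {

    // Escapes the characters that may not appear raw inside a JSON string.
    static String escapeJson(String raw) {
        StringBuilder out = new StringBuilder();
        for (char c : raw.toCharArray()) {
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n");  break;
                case '\r': out.append("\\r");  break;
                case '\t': out.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the six-character hex escape.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    // Wraps a schema type and definition in the upload payload structure.
    static String buildPayload(String type, String schemaDefinition) {
        return "{\"type\": \"" + escapeJson(type) + "\", "
                + "\"schema\": \"" + escapeJson(schemaDefinition) + "\", "
                + "\"properties\": {}}";
    }

    public static void main(String[] args) {
        String avro = "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"com.foo\","
                + "\"fields\":[{\"name\":\"file1\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        System.out.println(buildPayload("JSON", avro));
    }
}
```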
Get the latest schema
To get the latest schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name>
Example output:
{
"version": 0,
"type": "String",
"timestamp": 0,
"data": "string",
"properties": {
"property1": "string",
"property2": "string"
}
}
Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema (operation SchemasResource_getSchema in the REST API doc).
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The method on the PulsarAdmin client, available through admin.schemas(), is:
SchemaInfo getSchema(String topic)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchema("my-tenant/my-ns/my-topic");
Get a specific schema
To get a specific version of a schema, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the get subcommand.
pulsar-admin schemas get <topic-name> --version=<version>
Send a GET request to this endpoint: GET /admin/v2/schemas/:tenant/:namespace/:topic/schema/:version (operation SchemasResource_getSchema in the REST API doc).
Here is an example of a response, which is returned in JSON format.
{
"version": "<the-version-number-of-the-schema>",
"type": "<the-schema-type>",
"timestamp": "<the-creation-timestamp-of-the-version-of-the-schema>",
"data": "<an-utf8-encoded-string-of-schema-definition-data>",
"properties": {} // the properties associated with the schema
}
The method on the PulsarAdmin client, available through admin.schemas(), is:
SchemaInfo getSchema(String topic, long version)
Here is an example of SchemaInfo:
PulsarAdmin admin = …;
SchemaInfo si = admin.schemas().getSchema("my-tenant/my-ns/my-topic", 1L);
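The REST paths for the latest schema and for a specific version differ only in the trailing version segment. The following is a small sketch of building either URI, assuming the path layout shown in the REST API descriptions above. `SchemaUris` and `schemaUri` are hypothetical helper names, and the base URL stands in for your broker's HTTP address.

```java
import java.net.URI;
import java.util.OptionalLong;

class SchemaUris {

    // Returns the schema endpoint for a topic; an empty version means "latest".
    static URI schemaUri(String baseUrl, String topic, OptionalLong version) {
        String path = "/admin/v2/schemas/" + topic + "/schema";
        if (version.isPresent()) {
            path += "/" + version.getAsLong();
        }
        return URI.create(baseUrl + path);
    }

    public static void main(String[] args) {
        String base = "http://localhost:8080";
        String topic = "my-tenant/my-ns/my-topic";
        System.out.println(schemaUri(base, topic, OptionalLong.empty()));
        System.out.println(schemaUri(base, topic, OptionalLong.of(1L)));
    }
}
```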
Extract a schema
To extract (provide) a schema via a topic, use the following method.
- Admin CLI
Use the extract subcommand.
pulsar-admin schemas extract --classname <class-name> --jar <jar-path> --type <type-name>
Delete a schema
In any case, the delete action deletes all versions of a schema registered for a topic.
To delete a schema for a topic, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the delete subcommand.
pulsar-admin schemas delete <topic-name>
Send a DELETE request to this endpoint: DELETE /admin/v2/schemas/:tenant/:namespace/:topic/schema (operation SchemasResource_deleteSchema in the REST API doc).
Here is an example of a response returned in JSON format.
{
"version": "<the-latest-version-number-of-the-schema>"
}
The method on the PulsarAdmin client, available through admin.schemas(), is:
void deleteSchema(String topic)
Here is an example of deleting a schema.
PulsarAdmin admin = …;
admin.schemas().deleteSchema("my-tenant/my-ns/my-topic");
Manage schema AutoUpdate
Enable schema AutoUpdate
To enable/enforce schema auto-update at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable tenant/namespace
Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema (operation Namespaces_setIsAllowAutoUpdateSchema in the REST API doc).
The post payload is in JSON format.
{
"isAllowAutoUpdateSchema": "true"
}
Here is an example to enable schema auto-update for a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", true);
Disable schema AutoUpdate
When schema auto-update is disabled, you can only register a new schema.
To disable schema auto-update at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-is-allow-auto-update-schema subcommand.
bin/pulsar-admin namespaces set-is-allow-auto-update-schema --disable tenant/namespace
Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/isAllowAutoUpdateSchema (operation Namespaces_setIsAllowAutoUpdateSchema in the REST API doc).
The post payload is in JSON format.
{
"isAllowAutoUpdateSchema": "false"
}
Here is an example to disable schema auto-update for a tenant/namespace.
admin.namespaces().setIsAllowAutoUpdateSchema("my-tenant/my-namespace", false);
Manage schema validation enforcement
Enable schema validation enforcement
To enable schema validation enforcement at the cluster level, set isSchemaValidationEnforced to true in the conf/broker.conf file.
To enable schema validation enforcement at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --enable tenant/namespace
Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced (operation Namespaces_setSchemaValidationEnforced in the REST API doc).
The post payload is in JSON format.
{
"schemaValidationEnforced": "true"
}
Here is an example to enable schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", true);
Disable schema validation enforcement
To disable schema validation enforcement at the namespace level, you can use one of the following commands.
- Admin CLI
- REST API
- Java
Use the set-schema-validation-enforce subcommand.
bin/pulsar-admin namespaces set-schema-validation-enforce --disable tenant/namespace
Send a POST request to this endpoint: POST /admin/v2/namespaces/:tenant/:namespace/schemaValidationEnforced (operation Namespaces_setSchemaValidationEnforced in the REST API doc).
The post payload is in JSON format.
{
"schemaValidationEnforced": "false"
}
Here is an example to disable schema validation enforcement for a tenant/namespace.
admin.namespaces().setSchemaValidationEnforced("my-tenant/my-namespace", false);
Manage schema compatibility strategy
The schema compatibility check strategy configured at different levels has priority: topic level > namespace level > cluster level. In other words:
- If you set the strategy at both topic and namespace levels, the topic-level strategy is used.
- If you set the strategy at both namespace and cluster levels, the namespace-level strategy is used.
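The precedence rules above amount to taking the first level, from most to least specific, that has a strategy set. The following is a minimal sketch of that lookup, assuming an unset level is modeled as an empty Optional. The `StrategyResolution` class and its local `Strategy` enum are hypothetical stand-ins that list only a few values; they are not Pulsar's own SchemaCompatibilityStrategy type or the broker's actual logic.

```java
import java.util.Optional;

class StrategyResolution {

    // Illustrative stand-in for a few SchemaCompatibilityStrategy values.
    enum Strategy { ALWAYS_INCOMPATIBLE, BACKWARD, FORWARD, FULL }

    // Picks the effective strategy: topic level first, then namespace, then cluster.
    static Strategy resolve(Optional<Strategy> topicLevel,
                            Optional<Strategy> namespaceLevel,
                            Strategy clusterLevel) {
        return topicLevel.or(() -> namespaceLevel).orElse(clusterLevel);
    }

    public static void main(String[] args) {
        // Topic and namespace both set: the topic-level strategy is used.
        System.out.println(resolve(Optional.of(Strategy.FULL),
                Optional.of(Strategy.BACKWARD), Strategy.ALWAYS_INCOMPATIBLE)); // FULL
        // Only namespace and cluster set: the namespace-level strategy is used.
        System.out.println(resolve(Optional.empty(),
                Optional.of(Strategy.BACKWARD), Strategy.ALWAYS_INCOMPATIBLE)); // BACKWARD
    }
}
```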
Set schema compatibility strategy
Set topic-level schema compatibility strategy
To set a schema compatibility check strategy at the topic level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the pulsar-admin topicPolicies set-schema-compatibility-strategy command.
pulsar-admin topicPolicies set-schema-compatibility-strategy <strategy> <topicName>
Send a PUT request to this endpoint: PUT /admin/v2/topics/:tenant/:namespace/:topic/schemaCompatibilityStrategy (operation PersistentTopics_setSchemaCompatibilityStrategy in the REST API doc).
void setSchemaCompatibilityStrategy(String topic, SchemaCompatibilityStrategy strategy)
Here is an example of setting a schema compatibility check strategy at the topic level.
PulsarAdmin admin = …;
admin.topicPolicies().setSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
Set namespace-level schema compatibility strategy
To set schema compatibility check strategy at the namespace level, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the pulsar-admin namespaces set-schema-compatibility-strategy command.
pulsar-admin namespaces set-schema-compatibility-strategy options
Send a PUT request to this endpoint: PUT /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy (operation Namespaces_setSchemaCompatibilityStrategy in the REST API doc).
Use the setSchemaCompatibilityStrategy method.
admin.namespaces().setSchemaCompatibilityStrategy("test", SchemaCompatibilityStrategy.FULL);
Set cluster-level schema compatibility strategy
To set schema compatibility check strategy at the cluster level, set schemaCompatibilityStrategy in the conf/broker.conf file.
The following is an example:
schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE
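Since conf/broker.conf consists of key=value lines, a deployment script can check which strategy is configured before a broker is restarted. The following is a minimal sketch that reads the setting with java.util.Properties. `BrokerConfCheck` and `configuredStrategy` are hypothetical names, and the conf text is an inline excerpt rather than a real file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class BrokerConfCheck {

    // Reads schemaCompatibilityStrategy from broker.conf-style text;
    // returns the fallback when the key is absent.
    static String configuredStrategy(String confText, String fallback) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty("schemaCompatibilityStrategy", fallback).trim();
    }

    public static void main(String[] args) {
        String conf = "# excerpt of conf/broker.conf\n"
                + "schemaCompatibilityStrategy=ALWAYS_INCOMPATIBLE\n";
        System.out.println(configuredStrategy(conf, "(not set)"));
    }
}
```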
Get schema compatibility strategy
Get topic-level schema compatibility strategy
To get the topic-level schema compatibility check strategy, you can use one of the following methods.
- Admin CLI
- REST API
- Java
Use the pulsar-admin topicPolicies get-schema-compatibility-strategy command.
pulsar-admin topicPolicies get-schema-compatibility-strategy <topicName>
Send a GET request to this endpoint: GET /admin/v2/topics/:tenant/:namespace/:topic/schemaCompatibilityStrategy (operation PersistentTopics_getSchemaCompatibilityStrategy in the REST API doc).
SchemaCompatibilityStrategy getSchemaCompatibilityStrategy(String topic, boolean applied)
Here is an example of getting the topic-level schema compatibility check strategy.
PulsarAdmin admin = …;
// get the current applied schema compatibility strategy
admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", true);
// only get the schema compatibility strategy from topic policies
admin.topicPolicies().getSchemaCompatibilityStrategy("my-tenant/my-ns/my-topic", false);
Get namespace-level schema compatibility strategy
You can get schema compatibility check strategy at namespace level using one of the following methods.
- Admin CLI
- REST API
- Java
Use the pulsar-admin namespaces get-schema-compatibility-strategy command.
pulsar-admin namespaces get-schema-compatibility-strategy options
Send a GET request to this endpoint: GET /admin/v2/namespaces/:tenant/:namespace/schemaCompatibilityStrategy (operation Namespaces_getSchemaCompatibilityStrategy in the REST API doc).
Use the getSchemaCompatibilityStrategy method.
admin.namespaces().getSchemaCompatibilityStrategy("test");