Kafka Connect REST Interface for Confluent Platform

Since Kafka Connect is intended to be run as a service, it also supports a REST API for managing connectors. By default, this service runs on port 8083. When executed in distributed mode, the REST API is the primary interface to the cluster. You can make requests to any cluster member—the REST API forwards requests if required.

Although standalone mode is typically used by submitting a connector on the command line, it also runs the REST interface. This is useful for getting status information, adding and removing connectors without stopping the process, and so forth.

You can learn more about the REST API in the Kafka Connect REST API module of the free Kafka Connect 101 course.

Currently, the top-level resources are connector and connector-plugins. The sub-resources for connector list configuration settings and tasks, and the sub-resource for connector-plugins provides configuration validation and recommendation.

Note that if you try to modify, update, or delete a resource under connector, the request may need to be forwarded to the leader. While a worker group rebalance is in progress, Connect returns HTTP 409, because the leader may change during the rebalance.

Tip

For common activities that you can do using the REST API and curl, see Common REST examples.

Content Types

Currently the REST API only supports application/json as both the request and response entity content type. Your requests should specify the expected content type of the response using the HTTP Accept header:

Accept: application/json

Your requests should also specify the content type of the request entity (if one is included) using the Content-Type header:

Content-Type: application/json
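As a sketch of setting these headers from a client, the helper below builds a request with both headers; the function name and URL are illustrative, not part of the Connect API:

```python
import urllib.request

def connect_request(url, payload=None):
    """Build a request with the JSON content-type headers Connect expects.

    This is an illustrative helper, not part of the REST API itself.
    """
    headers = {"Accept": "application/json"}
    data = None
    if payload is not None:
        # Only requests that carry a body need Content-Type
        headers["Content-Type"] = "application/json"
        data = payload.encode("utf-8")
    return urllib.request.Request(
        url, data=data, headers=headers, method="POST" if data else "GET"
    )
```

A GET built this way carries only the Accept header; a request with a body also carries Content-Type.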

Log levels

You can check log levels and change log levels using Connect API endpoints. For details, see Changing log levels using the Connect API.

Status and Errors

The REST API returns standards-compliant HTTP status codes. Clients should check the HTTP status, especially before attempting to parse and use response entities. Currently the API does not use redirects (statuses in the 300 range), but these codes are reserved for future use, so clients should handle them.

When possible, all endpoints use a standard error message format for all errors (HTTP 400 and HTTP 500 ranges). For example, a request entity that omits a required field may generate the following response:

HTTP/1.1 422 Unprocessable Entity
Content-Type: application/json

{
    "error_code": 422,
    "message": "config may not be empty"
}
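A client can check the status range before trusting the body; a minimal sketch (the helper name is illustrative):

```python
import json

def parse_connect_error(status, body):
    """Return (error_code, message) for an error response, else None.

    Connect error bodies follow {"error_code": <int>, "message": <str>}.
    """
    if 400 <= status < 600:
        err = json.loads(body)
        return err["error_code"], err["message"]
    return None
```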

Connect Cluster

GET /

Top-level (root) request that gets the version of the Connect worker that serves the REST request, the git commit ID of the source code, and the Kafka cluster ID that the worker is connected to.

Response JSON Object:
 
  • version (string) – Connect worker version
  • commit ID (string) – git commit ID
  • cluster ID (string) – Kafka cluster ID

Example request:

GET / HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
  "version":"5.5.0",
  "commit":"e5741b90cde98052",
  "kafka_cluster_id":"I4ZmrWqfT2e-upky_4fdPA"
}

Connectors

GET /connectors

Get a list of active connectors

Response JSON Object:
 
  • connectors (array) – List of connector names

Example request:

GET /connectors HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

["my-jdbc-source", "my-hdfs-sink"]

Query parameters:

  • ?expand=status (Map, optional) – Retrieves additional state information for each of the connectors returned in the API call. The endpoint also returns the status of each of the connectors and its tasks, as shown in the ?expand=status example below.
  • ?expand=info (Map, optional) – Returns metadata for each of the connectors, such as the configuration, task information, and connector type, as in the ?expand=info example below.

?expand=status example

 {
        "FileStreamSinkConnectorConnector_0": {
            "status": {
            "name": "FileStreamSinkConnectorConnector_0",
            "connector": {
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
            },
            "tasks": [
                {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
                }
            ],
            "type": "sink"
            }
        },
        "DatagenConnectorConnector_0": {
            "status": {
            "name": "DatagenConnectorConnector_0",
            "connector": {
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
            },
            "tasks": [
                {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "10.0.0.162:8083"
                }
            ],
            "type": "source"
            }
        }
}

?expand=info example

{
     "FileStreamSinkConnectorConnector_0": {
           "info": {
           "name": "FileStreamSinkConnectorConnector_0",
           "config": {
               "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
               "file": "/Users/smogili/file.txt",
               "tasks.max": "1",
               "topics": "datagen",
               "name": "FileStreamSinkConnectorConnector_0"
           },
           "tasks": [
               {
               "connector": "FileStreamSinkConnectorConnector_0",
               "task": 0
               }
           ],
           "type": "sink"
           }
       },
       "DatagenConnectorConnector_0": {
           "info": {
           "name": "DatagenConnectorConnector_0",
           "config": {
               "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
               "quickstart": "clickstream",
               "tasks.max": "1",
               "name": "DatagenConnectorConnector_0",
               "kafka.topic": "datagen"
           },
           "tasks": [
               {
               "connector": "DatagenConnectorConnector_0",
               "task": 0
               }
           ],
           "type": "source"
           }
       }
  }

Users can also combine the status and info expands by appending both to the endpoint (for example, http://localhost:8083/connectors?expand=status&expand=info). This will return the metadata for the connectors and the current status of the connector and its tasks as shown in the following example:

Note

Without ?expand=status and/or ?expand=info, the connectors endpoint only returns a list of the names of launched connectors.

{
     "FileStreamSinkConnectorConnector_0": {
         "info": {
         "name": "FileStreamSinkConnectorConnector_0",
         "config": {
             "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
             "file": "/Users/smogili/file.txt",
             "tasks.max": "1",
             "topics": "datagen",
             "name": "FileStreamSinkConnectorConnector_0"
         },
         "tasks": [
             {
             "connector": "FileStreamSinkConnectorConnector_0",
             "task": 0
             }
         ],
         "type": "sink"
         },
         "status": {
         "name": "FileStreamSinkConnectorConnector_0",
         "connector": {
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
         },
         "tasks": [
             {
             "id": 0,
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
             }
         ],
         "type": "sink"
         }
     },
     "DatagenConnectorConnector_0": {
         "info": {
         "name": "DatagenConnectorConnector_0",
         "config": {
             "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
             "quickstart": "clickstream",
             "tasks.max": "1",
             "name": "DatagenConnectorConnector_0",
             "kafka.topic": "datagen"
         },
         "tasks": [
             {
             "connector": "DatagenConnectorConnector_0",
             "task": 0
             }
         ],
         "type": "source"
         },
         "status": {
         "name": "DatagenConnectorConnector_0",
         "connector": {
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
         },
         "tasks": [
             {
             "id": 0,
             "state": "RUNNING",
             "worker_id": "10.0.0.162:8083"
             }
         ],
         "type": "source"
         }
     }
     }
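The expand parameters can be combined by repeating the expand key in the query string. As a sketch, a hypothetical URL-building helper (the function name and base address are assumptions):

```python
def connectors_url(base, expand=()):
    """Build the GET /connectors URL, repeating expand= for each value.

    `base` is the worker address, e.g. "http://localhost:8083".
    This helper is illustrative, not part of the REST API.
    """
    query = "&".join(f"expand={value}" for value in expand)
    return f"{base}/connectors" + (f"?{query}" if query else "")
```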
POST /connectors

Create a new connector, returning the current connector info if successful. Returns 409 (Conflict) if a rebalance is in process, or if the connector already exists.

Request JSON Object:
 
  • name (string) – Name of the connector to create
  • config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

POST /connectors HTTP/1.1
Host: connect.example.com
Content-Type: application/json
Accept: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    }
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}
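Because all config values must be strings, a client can normalize the payload before POSTing it. A sketch, where the helper name is an assumption:

```python
import json

def connector_create_body(name, config):
    """Serialize the POST /connectors body: {"name": ..., "config": {...}}.

    All config values are coerced to strings, as the API requires.
    """
    return json.dumps({
        "name": name,
        "config": {key: str(value) for key, value in config.items()},
    })
```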
GET /connectors/(string: name)

Get information about the connector.

Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

GET /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}
GET /connectors/(string: name)/config

Get the configuration for the connector.

Response JSON Object:
 
  • config (map) – Configuration parameters for the connector

Example request:

GET /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Accept: application/json

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
}
PUT /connectors/(string: name)/config

Create a new connector using the given configuration, or update the configuration for an existing connector. Returns information about the connector after the change has been made. Returns 409 (Conflict) if a rebalance is in process.

Note

The payload is not wrapped in {"config": {}} as in the POST request. The config is directly provided.

Request JSON Object:
 
  • config (map) – Configuration parameters for the connector. All values should be strings.
Response JSON Object:
 
  • name (string) – Name of the created connector
  • config (map) – Configuration parameters for the connector
  • tasks (array) – List of active tasks generated by the connector
  • tasks[i].connector (string) – The name of the connector the task belongs to
  • tasks[i].task (int) – Task ID within the connector

Example request:

PUT /connectors/hdfs-sink-connector/config HTTP/1.1
Host: connect.example.com
Accept: application/json

{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "10",
    "topics": "test-topic",
    "hdfs.url": "hdfs://fakehost:9000",
    "hadoop.conf.dir": "/opt/hadoop/conf",
    "hadoop.home": "/opt/hadoop",
    "flush.size": "100",
    "rotate.interval.ms": "1000"
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "hdfs-sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "tasks.max": "10",
        "topics": "test-topic",
        "hdfs.url": "hdfs://fakehost:9000",
        "hadoop.conf.dir": "/opt/hadoop/conf",
        "hadoop.home": "/opt/hadoop",
        "flush.size": "100",
        "rotate.interval.ms": "1000"
    },
    "tasks": [
        { "connector": "hdfs-sink-connector", "task": 1 },
        { "connector": "hdfs-sink-connector", "task": 2 },
        { "connector": "hdfs-sink-connector", "task": 3 }
    ]
}

Note that in this example the return status, 201 Created, indicates that the connector was created. For a configuration update of an existing connector, the status would have been 200 OK.

GET /connectors/(string: name)/status

Gets the current status of the connector, including:

  • Whether it is running, restarting, or paused, or whether it has failed
  • Which worker it is assigned to
  • Error information if it has failed
  • The state of all its tasks
Response JSON Object:
 
  • name (string) – The name of the connector
  • connector (map) – The map containing connector status
  • tasks[i] (map) – The map containing the task status

Example request:

GET /connectors/hdfs-sink-connector/status HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

{
    "name": "hdfs-sink-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "fakehost:8083"
    },
    "tasks":
    [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "fakehost:8083"
        },
        {
            "id": 1,
            "state": "FAILED",
            "worker_id": "fakehost:8083",
            "trace": "org.apache.kafka.common.errors.RecordTooLargeException\n"
        }
    ]
}
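The per-task state field makes it easy to find tasks that need attention. An illustrative helper over the already-parsed status JSON (the function name is an assumption):

```python
def failed_task_ids(status):
    """Return the IDs of tasks in FAILED state from a
    GET /connectors/<name>/status response (parsed from JSON)."""
    return [task["id"] for task in status.get("tasks", [])
            if task["state"] == "FAILED"]
```

Applied to the response above, it would report task 1 as failed.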
POST /connectors/(string: name)/restart

Restart the connector. You may use the following query parameters to restart any combination of the Connector and/or Task instances for the connector.

Example request:

POST /connectors/hdfs-sink-connector/restart HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 200 OK

Query parameters:

  • ?includeTasks=<true|false> (Boolean, optional; default false) – Specifies whether to restart the connector instance and task instances (includeTasks=true) or just the connector instance (includeTasks=false).
  • ?onlyFailed=<true|false> (Boolean, optional; default false) – Specifies whether to restart just the instances with a FAILED status (onlyFailed=true) or all instances (onlyFailed=false).

The API returns one of the following responses:

  • 200 OK: When the named connector exists and the server has successfully stopped and begun restarting only the Connector object (for example, includeTasks=false and onlyFailed=false). No response body will be returned.
  • 202 ACCEPTED: When the named connector exists and the server has successfully and durably recorded the request to stop and begin restarting at least one failed or running Connector object and Task instances (for example, includeTasks=true or onlyFailed=true). A response body will be returned, and it is similar to the GET /connector/{connectorName}/status response except that the state field is set to RESTARTING for all instances that will eventually be restarted.
  • 204 NO CONTENT: When the operation succeeded, but there is no content in the response.
  • 404 NOT FOUND: When the named connector does not exist.
  • 409 CONFLICT: When a rebalance is needed, forthcoming, or underway while restarting any of the connector and/or task objects; the reason may mention that the Connect cluster’s leader is not known, or that the worker assigned the connector can’t be found.
  • 500 INTERNAL SERVER ERROR: When the request timed out (took more than 90 seconds), which means the request could not be durably recorded, perhaps because the worker or cluster is shutting down or because the worker receiving the request has temporarily lost contact with the Kafka cluster.

Example request:

POST /connectors/my-connector/restart?includeTasks=true&onlyFailed=true
Host: connect.example.com

Example response:

HTTP/1.1 202 ACCEPTED
{
      "name": "my-connector",
      "connector": {
          "state": "RUNNING",
          "worker_id": "fakehost1:8083"
      },
      "tasks":
      [
          {
              "id": 0,
              "state": "RUNNING",
              "worker_id": "fakehost2:8083"
          },
          {
              "id": 1,
              "state": "RESTARTING",
              "worker_id": "fakehost3:8083"
          },
          {
              "id": 2,
              "state": "RESTARTING",
              "worker_id": "fakehost1:8083"
          }
      ]
}

Important

The Connector instance and task 0 were not restarted, since they were RUNNING when this call was made. The user can monitor the progress of the restart with subsequent calls to the GET /connector/{connectorName}/status method.
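A client restarting only failed instances, as in the example above, would set both query parameters. A hypothetical URL-building helper:

```python
from urllib.parse import urlencode

def restart_url(base, name, include_tasks=False, only_failed=False):
    """Build the POST /connectors/<name>/restart URL.

    Boolean parameters are rendered lowercase, as in
    ?includeTasks=true&onlyFailed=true. Illustrative helper only.
    """
    params = {
        "includeTasks": str(include_tasks).lower(),
        "onlyFailed": str(only_failed).lower(),
    }
    return f"{base}/connectors/{name}/restart?{urlencode(params)}"
```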

PUT /connectors/(string: name)/pause

Pause the connector and its tasks, which stops message processing until the connector is resumed. This call is asynchronous, and the tasks will not all transition to the PAUSED state at the same time.

Example request:

PUT /connectors/hdfs-sink-connector/pause HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
PUT /connectors/(string: name)/resume

Resume a paused connector, or do nothing if the connector is not paused. This call is asynchronous, and the tasks will not all transition to the RUNNING state at the same time.

Example request:

PUT /connectors/hdfs-sink-connector/resume HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
PUT /connectors/(string: name)/stop

Stops the connector without deleting it. All tasks for the connector are shut down completely. When you resume a stopped connector, the connector starts on its assigned worker.

Example request:

PUT /connectors/hdfs-sink-connector/stop HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 202 Accepted
DELETE /connectors/(string: name)/

Delete a connector, halting all tasks and deleting its configuration. Returns 409 (Conflict) if a rebalance is in process.

Example request:

DELETE /connectors/hdfs-sink-connector HTTP/1.1
Host: connect.example.com

Example response:

HTTP/1.1 204 No Content

Tasks

GET