ksqlDB ships with a lightweight Java client that makes it easy to send
requests to a ksqlDB server from within your Java application, as an
alternative to using the REST API. The client supports pull and push
queries; inserting new rows of data into existing ksqlDB streams;
creation and management of new streams, tables, and persistent queries;
and admin operations such as listing streams, tables, and topics.
The client sends requests to the server's HTTP/2 endpoints. Pull and
push queries are served by the /query-stream endpoint, and inserts are
served by the /inserts-stream endpoint. All other requests are served by
the /ksql endpoint.
The client is compatible only with ksqlDB deployments that are on
version 0.10.0 or later.
If you’re using ksqlDB for Confluent Platform (CP), use the Confluent Platform-specific
modules from https://packages.confluent.io/maven/ by replacing the
repositories in the example POM with a repository that points to this URL.
Also set ksqldb.version to a Confluent Platform version, such as 7.9.1.
Note
The with-dependencies artifact was introduced in ksqlDB
version 0.29 and Confluent Platform version 7.4.0. This jar includes
all the necessary dependencies and relocates most of them to reduce the
risk of classpath conflicts. Using this jar is the easiest way to get
started. If you want more control over the dependencies on the
classpath, you can depend directly on the client using this
dependency block instead:
If you do this, you will need to add all of the transitive dependencies
for ksqldb-api-client.
Create your example app at
src/main/java/my/ksqldb/app/ExampleApp.java:
package my.ksqldb.app;

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

public class ExampleApp {

  public static final String KSQLDB_SERVER_HOST = "localhost";
  public static final int KSQLDB_SERVER_HOST_PORT = 8088;

  public static void main(String[] args) {
    ClientOptions options = ClientOptions.create()
        .setHost(KSQLDB_SERVER_HOST)
        .setPort(KSQLDB_SERVER_HOST_PORT);
    Client client = Client.create(options);

    // Send requests with the client by following the other examples

    // Terminate any open connections and close the client
    client.close();
  }
}
Receive query results one row at a time (streamQuery())
The streamQuery() method enables client apps to receive query
results one row at a time, either asynchronously via a Reactive Streams
subscriber or synchronously in a polling fashion.
You can use this method to issue both push and pull queries, but the
usage pattern is better for push queries. For pull queries, consider
using the
executeQuery()
method instead.
Query properties can be passed as an optional second argument. For more
information, see the streamQuery() method in the
Client.java class.
By default, push queries return only newly arriving rows. To start from
the beginning of the stream or table, set the auto.offset.reset
property to earliest.
public interface Client {

  /**
   * Executes a query (push or pull) and returns the results one row at a time.
   *
   * <p>This method may be used to issue both push and pull queries, but the usage
   * pattern is better for push queries. For pull queries, consider using the
   * {@link Client#executeQuery(String)} method instead.
   *
   * <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
   * failed.
   *
   * @param sql statement of query to execute
   * @return a future that completes once the server response is received, and contains the query
   *     result if successful
   */
  CompletableFuture<StreamedQueryResult> streamQuery(String sql);

  ...
}
To consume records asynchronously, create a
Reactive Streams subscriber to receive
query result rows:
import io.confluent.ksql.api.client.Row;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class RowSubscriber implements Subscriber<Row> {

  private Subscription subscription;

  public RowSubscriber() {
  }

  @Override
  public synchronized void onSubscribe(Subscription subscription) {
    System.out.println("Subscriber is subscribed.");
    this.subscription = subscription;

    // Request the first row
    subscription.request(1);
  }

  @Override
  public synchronized void onNext(Row row) {
    System.out.println("Received a row!");
    System.out.println("Row: " + row.values());

    // Request the next row
    subscription.request(1);
  }

  @Override
  public synchronized void onError(Throwable t) {
    System.out.println("Received an error: " + t);
  }

  @Override
  public synchronized void onComplete() {
    System.out.println("Query has ended.");
  }
}
Use the Java client to send the query to the server and stream the
results to the subscriber:
client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;")
    .thenAccept(streamedQueryResult -> {
      System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());

      RowSubscriber subscriber = new RowSubscriber();
      streamedQueryResult.subscribe(subscriber);
    }).exceptionally(e -> {
      System.out.println("Request failed: " + e);
      return null;
    });
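Reactive Streams and the JDK's java.util.concurrent.Flow interfaces describe the same Subscriber/Subscription contract, so the one-row-at-a-time request(1) pattern used by RowSubscriber can be tried without a ksqlDB server. The sketch below swaps in Flow.Subscriber and the JDK's SubmissionPublisher in place of org.reactivestreams and the client's result publisher, and uses strings in place of Row objects. The class name OneAtATimeSubscriber is hypothetical; this illustrates the backpressure pattern only, not how the client itself delivers rows.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for RowSubscriber, built on java.util.concurrent.Flow
// instead of org.reactivestreams, with String items instead of Row objects.
class OneAtATimeSubscriber implements Flow.Subscriber<String> {
  private final List<String> received = new CopyOnWriteArrayList<>();
  private final CountDownLatch done = new CountDownLatch(1);
  private Flow.Subscription subscription;

  @Override
  public synchronized void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    // Request the first item only
    subscription.request(1);
  }

  @Override
  public synchronized void onNext(String item) {
    received.add(item);
    // Request the next item only after this one has been handled
    subscription.request(1);
  }

  @Override
  public void onError(Throwable t) {
    done.countDown();
  }

  @Override
  public void onComplete() {
    done.countDown();
  }

  // Publishes the items, closes the publisher so it signals onComplete once the
  // submitted items are delivered, then waits up to 5 seconds for completion.
  static List<String> consume(List<String> items) {
    OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
    try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      items.forEach(publisher::submit);
    }
    try {
      subscriber.done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return subscriber.received;
  }
}
```

Requesting a single item per onNext call keeps at most one row in flight; a subscriber that can absorb bursts could request larger batches instead.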
To consume records one at a time in a synchronous fashion, use the
poll() method on the query result object. If poll() is called
with no arguments, it blocks until a new row becomes available or the
query is terminated. You can also pass a Duration argument to
poll(), which causes poll() to return null if no new rows
are received by the time the duration has elapsed. For more information,
see the
StreamedQueryResult.java class.
StreamedQueryResult streamedQueryResult =
    client.streamQuery("SELECT * FROM MY_STREAM EMIT CHANGES;").get();

for (int i = 0; i < 10; i++) {
  // Block until a new row is available
  Row row = streamedQueryResult.poll();
  if (row != null) {
    System.out.println("Received a row!");
    System.out.println("Row: " + row.values());
  } else {
    // poll() returns null once the query has terminated, so stop polling
    System.out.println("Query has ended.");
    break;
  }
}
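The timed poll(Duration) variant described above lets a loop stop waiting instead of blocking indefinitely. The sketch below reproduces that control flow with a java.util.concurrent.BlockingQueue standing in for StreamedQueryResult, because BlockingQueue.poll(timeout, unit) likewise returns null when nothing arrives in time. The queue and the TimedPollLoop.drain() helper are hypothetical stand-ins, not part of the client; with the real client, the rows.poll(...) call would become streamedQueryResult.poll(timeout).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class TimedPollLoop {
  private TimedPollLoop() {}

  // Collects up to maxRows items, waiting at most `timeout` for each one.
  // A null result means no row arrived within the timeout, so the loop stops early.
  static List<String> drain(BlockingQueue<String> rows, int maxRows, Duration timeout) {
    List<String> received = new ArrayList<>();
    for (int i = 0; i < maxRows; i++) {
      String row;
      try {
        row = rows.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      }
      if (row == null) {
        break; // timed out waiting for the next row
      }
      received.add(row);
    }
    return received;
  }
}
```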
Receive query results in a single batch (executeQuery())
The executeQuery() method enables client apps to receive query
results as a single batch that’s returned when the query completes.
This method is suitable for both pull queries and for terminating push
queries, for example, queries that have a LIMIT clause. For
non-terminating push queries, use the
streamQuery()
method instead.
Query properties can be passed as an optional second argument. For more
information, see the executeQuery() method in the
Client.java class.
By default, push queries return only newly arriving rows. To start from
the beginning of the stream or table, set the auto.offset.reset
property to earliest.
public interface Client {

  /**
   * Executes a query (push or pull) and returns all result rows in a single batch, once the query
   * has completed.
   *
   * <p>This method is suitable for both pull queries and for terminating push queries,
   * for example, queries that have a {@code LIMIT} clause. For non-terminating push queries,
   * use the {@link Client#streamQuery(String)} method instead.
   *
   * @param sql statement of query to execute
   * @return query result
   */
  BatchedQueryResult executeQuery(String sql);

  ...
}
String pullQuery = "SELECT * FROM MY_MATERIALIZED_TABLE WHERE KEY_FIELD='some_key';";
BatchedQueryResult batchedQueryResult = client.executeQuery(pullQuery);

// Wait for query result
List<Row> resultRows = batchedQueryResult.get();

System.out.println("Received results. Num rows: " + resultRows.size());
for (Row row : resultRows) {
  System.out.println("Row: " + row.values());
}
The terminatePushQuery() method enables client apps to terminate
push queries.
public interface Client {

  /**
   * Terminates a push query with the specified query ID.
   *
   * <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
   * failed.
   *
   * @param queryId ID of the query to terminate
   * @return a future that completes once the server response is received
   */
  CompletableFuture<Void> terminatePushQuery(String queryId);

  ...
}
You can obtain the query ID from the query result object that is
returned when the client issues a push query with either the
streamQuery() or
executeQuery()
method.
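Obtaining the query ID and terminating the query are both asynchronous, so the two steps compose with CompletableFuture.thenCompose. In the sketch below, both steps are passed in from outside so the flow runs without a server. In an application, queryIdFuture might be built as client.streamQuery(sql).thenApply(r -> r.queryID()) and terminate might be id -> client.terminatePushQuery(id); the TerminateFlow class and its method name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class TerminateFlow {
  private TerminateFlow() {}

  // Waits for the query ID, then calls `terminate` with it. The returned future
  // completes with that ID once termination completes, and fails if either step fails.
  static CompletableFuture<String> startThenTerminate(
      CompletableFuture<String> queryIdFuture,
      Function<String, CompletableFuture<Void>> terminate) {
    return queryIdFuture.thenCompose(id -> terminate.apply(id).thenApply(ignored -> id));
  }
}
```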
Client apps can insert a new row of data into an existing ksqlDB stream
by using the insertInto() method. To insert multiple rows in a
streaming fashion, see
streamInserts()
instead.
public interface Client {

  /**
   * Inserts a row into a ksqlDB stream.
   *
   * <p>The {@code CompletableFuture} will be failed if a non-200 response is received from the
   * server, or if the server encounters an error while processing the insertion.
   *
   * @param streamName name of the target stream
   * @param row the row to insert. Keys are column names and values are column values.
   * @return a future that completes once the server response is received
   */
  CompletableFuture<Void> insertInto(String streamName, KsqlObject row);

  ...
}
Rows for insertion are represented as KsqlObject instances. A
KsqlObject represents a map of strings (in this case, column names)
to values (column values).
Here’s an example of using the client to insert a new row into an
existing stream ORDERS with schema
(ORDER_ID BIGINT, PRODUCT_ID VARCHAR, USER_ID VARCHAR).
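Because a KsqlObject is essentially a map from column names to values, a row for ORDERS can be sanity-checked as a plain java.util.Map before it is handed to insertInto(). The OrdersRowCheck helper below is hypothetical and not part of the client API. It assumes that BIGINT columns take Java Long values and VARCHAR columns take String values, and it only rejects unknown column names and mistyped values; it does not require every column to be present.

```java
import java.util.Map;

final class OrdersRowCheck {
  private OrdersRowCheck() {}

  // Assumed Java types for the ORDERS columns: BIGINT -> Long, VARCHAR -> String.
  static final Map<String, Class<?>> ORDERS_SCHEMA = Map.of(
      "ORDER_ID", Long.class,
      "PRODUCT_ID", String.class,
      "USER_ID", String.class);

  // Returns true if every key is a known column and every non-null value has the expected type.
  static boolean matchesSchema(Map<String, Object> row) {
    for (Map.Entry<String, Object> entry : row.entrySet()) {
      Class<?> expected = ORDERS_SCHEMA.get(entry.getKey());
      if (expected == null) {
        return false; // unknown column name
      }
      Object value = entry.getValue();
      if (value != null && !expected.isInstance(value)) {
        return false; // value has the wrong Java type for this column
      }
    }
    return true;
  }
}
```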
Insert new rows in a streaming fashion (streamInserts())
Starting with ksqlDB 0.11.0, the streamInserts() method enables
client apps to insert new rows of data into an existing ksqlDB stream in
a streaming fashion. This is in contrast to the
insertInto() method
which inserts a single row per request.
public interface Client {

  /**
   * Inserts rows into a ksqlDB stream. Rows to insert are supplied by a
   * {@code org.reactivestreams.Publisher} and server acknowledgments are exposed similarly.
   *
   * <p>The {@code CompletableFuture} will be failed if a non-200 response is received from the
   * server.
   *
   * <p>See {@link InsertsPublisher} for an example publisher that may be passed as an argument to
   * this method.
   *
   * @param streamName name of the target stream
   * @param insertsPublisher the publisher to provide rows to insert
   * @return a future that completes once the initial server response is received, and contains a
   *     publisher that publishes server acknowledgments for inserted rows.
   */
  CompletableFuture<AcksPublisher> streamInserts(
      String streamName,
      Publisher<KsqlObject> insertsPublisher);

  ...
}
Rows for insertion are represented as KsqlObject instances. A
KsqlObject represents a map of strings (in this case, column names)
to values (column values).
The rows to be inserted are supplied via a
Reactive Streams publisher. For
convenience, the Java client for ksqlDB ships with a simple publisher
implementation suitable for use with the streamInserts() method out
of the box. This implementation is the
InsertsPublisher
class.
As the specified rows are inserted by the ksqlDB server, the server
responds with acknowledgments that may be consumed from the
AcksPublisher
returned by the streamInserts() method. The AcksPublisher is a
Reactive Streams publisher.
Here’s an example of using the client to insert new rows into an
existing stream ORDERS, in a streaming fashion. The ORDERS
stream has schema
(ORDER_ID BIGINT, PRODUCT_ID VARCHAR, USER_ID VARCHAR).
To consume server acknowledgments for the stream of inserts, implement a
Reactive Streams subscriber to receive the acknowledgments:
import io.confluent.ksql.api.client.InsertAck;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class AcksSubscriber implements Subscriber<InsertAck> {

  private Subscription subscription;

  public AcksSubscriber() {
  }

  @Override
  public synchronized void onSubscribe(Subscription subscription) {
    System.out.println("Subscriber is subscribed.");
    this.subscription = subscription;

    // Request the first ack
    subscription.request(1);
  }

  @Override
  public synchronized void onNext(InsertAck ack) {
    System.out.println("Received an ack for insert number: " + ack.seqNum());

    // Request the next ack
    subscription.request(1);
  }

  @Override
  public synchronized void onError(Throwable t) {
    System.out.println("Received an error: " + t);
  }

  @Override
  public synchronized void onComplete() {
    System.out.println("Inserts stream has been closed.");
  }
}
Then subscribe to the AcksPublisher returned by streamInserts():
acksPublisher.subscribe(new AcksSubscriber());
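Each InsertAck carries a sequence number, as printed by AcksSubscriber above through ack.seqNum(). One way to use these acknowledgments is to track which inserts are still outstanding, for example by collecting seqNum() values in onNext. The AckTracker helper below is hypothetical, and it assumes, without confirmation from this page, that sequence numbers start at 0 and increase by one per inserted row.

```java
import java.util.Collection;
import java.util.SortedSet;
import java.util.TreeSet;

final class AckTracker {
  private AckTracker() {}

  // Returns the sequence numbers in [0, sent) that are missing from ackedSeqNums.
  // Assumption: seqNum values start at 0 and increase by 1 for each inserted row.
  static SortedSet<Long> unacked(long sent, Collection<Long> ackedSeqNums) {
    SortedSet<Long> missing = new TreeSet<>();
    for (long seq = 0; seq < sent; seq++) {
      if (!ackedSeqNums.contains(seq)) {
        missing.add(seq);
      }
    }
    return missing;
  }
}
```

Passing a HashSet (or a concurrent set filled from onNext) keeps each contains() lookup constant-time.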
Create and manage new streams, tables, and persistent queries (executeStatement())
Starting with ksqlDB 0.11.0, the executeStatement() method enables
client apps to:
Create new ksqlDB streams and tables
Drop existing ksqlDB streams and tables
Create new persistent queries, i.e., CREATE ... AS SELECT and
INSERT INTO ... AS SELECT statements
Pause, Resume, and Terminate persistent queries
public interface Client {

  /**
   * Sends a SQL request to the ksqlDB server. This method supports 'CREATE', 'CREATE ... AS
   * SELECT', 'DROP', 'TERMINATE', and 'INSERT INTO ... AS SELECT' statements.
   *
   * <p>Each request should contain exactly one statement. Requests that contain multiple statements
   * will be rejected by the client, in the form of failing the {@code CompletableFuture}, and the
   * request will not be sent to the server.
   *
   * <p>The {@code CompletableFuture} is completed once a response is received from the server.
   * Note that the actual execution of the submitted statement is asynchronous, so the statement
   * may not have been executed by the time the {@code CompletableFuture} is completed.
   *
   * <p>If a non-200 response is received from the server, the {@code CompletableFuture} will be
   * failed.
   *
   * @param sql the request to be executed
   * @return a future that completes once the server response is received, and contains the query ID
   *     for statements that start new persistent queries
   */
  CompletableFuture<ExecuteStatementResult> executeStatement(String sql);

  ...
}
To use this method, pass in the SQL for the command to be executed.
Query properties can be passed as an optional second argument. For more
information, see the executeStatement() method in the
Client.java class.
As explained in the Javadocs for the method above, the
CompletableFuture returned by the executeStatement() method is
completed as soon as the ksqlDB server has accepted the statement and a
response is received by the client. In most situations, the ksqlDB
server will have already executed the statement by this time, but this
is not guaranteed.
For statements that create new persistent queries, the query ID may be
retrieved from the returned ExecuteStatementResult, as long as the
ksqlDB server version is at least 0.11.0 and the statement has been
executed by the time the server response is received.
Start a persistent query that reads from the earliest offset, assuming
the stream ORDERS exists:
String sql = "CREATE TABLE ORDERS_BY_USER AS "
    + "SELECT USER_ID, COUNT(*) as COUNT "
    + "FROM ORDERS GROUP BY USER_ID EMIT CHANGES;";
Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");
ExecuteStatementResult result = client.executeStatement(sql, properties).get();
System.out.println("Query ID: " + result.queryId().orElse(""));
Terminate a persistent query, assuming a query with ID
CTAS_ORDERS_BY_USER_0 exists:
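Per the executeStatement() Javadoc above, TERMINATE statements are supported, so terminating the query comes down to sending TERMINATE followed by the query ID through executeStatement(). The hypothetical TerminateStatement helper below only builds that statement text. It conservatively assumes that query IDs consist of letters, digits, and underscores, as in CTAS_ORDERS_BY_USER_0, and rejects anything else rather than splicing it into SQL. The call that needs a running server is shown as a comment.

```java
final class TerminateStatement {
  private TerminateStatement() {}

  // Builds a TERMINATE statement for a persistent query ID.
  // Assumption: valid IDs contain only letters, digits, and underscores.
  static String forQuery(String queryId) {
    if (queryId == null || !queryId.matches("[A-Za-z0-9_]+")) {
      throw new IllegalArgumentException("Unexpected query ID: " + queryId);
    }
    return "TERMINATE " + queryId + ";";
  }

  // With a connected client, the statement would then be sent with:
  //   client.executeStatement(TerminateStatement.forQuery("CTAS_ORDERS_BY_USER_0")).get();
}
```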
Starting with ksqlDB 0.12.0, the describeSource() method enables
client apps to fetch metadata for existing ksqlDB streams and tables.
The metadata returned from this method includes the stream or table’s
underlying topic name, column names and associated types, serialization
formats, queries that read and write from the stream or table, and more.
For more information, see the
Client.java class.
Fetch metadata for the stream or table with name my_source:
SourceDescription description = client.describeSource("my_source").get();
System.out.println("This source is a " + description.type());
System.out.println("This stream/table has " + description.fields().size() + " columns.");
System.out.println(description.writeQueries().size() + " queries write to this stream/table.");
System.out.println(description.readQueries().size() + " queries read from this stream/table.");
Starting with ksqlDB 0.16.0, the serverInfo() method enables client
apps to fetch metadata about the ksqlDB cluster. The metadata returned
from this method includes the version of ksqlDB the server is running,
the Kafka cluster ID, and the ksqlDB service ID. For more information,
see the Client.java class.
ServerInfo serverInfo = client.serverInfo().get();
System.out.println("The ksqlDB version running on this server is "
    + serverInfo.getServerVersion());
System.out.println("The Kafka cluster this server is using is "
    + serverInfo.getKafkaClusterId());
System.out.println("The id of this ksqlDB service is "
    + serverInfo.getKsqlServiceId());
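Several methods in this guide carry a minimum server version (for example, describeSource() from 0.12.0 and serverInfo() from 0.16.0), and the client as a whole needs 0.10.0 or later. A minimal, hypothetical helper like VersionCheck below could compare the string returned by serverInfo.getServerVersion() against such a minimum before calling a newer method. It assumes a dotted major.minor.patch string; the ksqlDB thresholds quoted here should not be compared directly against Confluent Platform version numbers such as 7.9.1.

```java
final class VersionCheck {
  private VersionCheck() {}

  // Parses the leading major.minor.patch numbers of a version string such as "0.29.0".
  // Missing components count as 0, and a non-numeric suffix on a component
  // (for example "1-rc2") is ignored.
  static int[] parse(String version) {
    String[] parts = version.trim().split("\\.");
    int[] nums = new int[3];
    for (int i = 0; i < 3 && i < parts.length; i++) {
      String digits = parts[i].replaceFirst("^(\\d+).*$", "$1");
      nums[i] = digits.matches("\\d+") ? Integer.parseInt(digits) : 0;
    }
    return nums;
  }

  // Returns true if `version` is greater than or equal to `minimum`.
  static boolean isAtLeast(String version, String minimum) {
    int[] v = parse(version);
    int[] m = parse(minimum);
    for (int i = 0; i < 3; i++) {
      if (v[i] != m[i]) {
        return v[i] > m[i];
      }
    }
    return true;
  }
}
```

For example, VersionCheck.isAtLeast(serverInfo.getServerVersion(), "0.12.0") could guard a call to describeSource().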
ConnectorDescription description = client.describeConnector("jdbc-connector").get();
System.out.println(description.name()
    + " is a " + description.type() + " connector.\n"
    + " The connector's class is " + description.className() + ".\n"
    + " The connector is currently " + description.state() + ".\n"
    + " It reads/writes to " + description.sources().size() + " ksqlDB sources"
    + " and uses " + description.topics().size() + " topics.");
Starting with ksqlDB 0.18.0, users can define session variables by
calling the define() method and reference them in other functions by
wrapping the variable name in ${}. The undefine() method undefines
a session variable, and getVariables() returns a map of the currently
defined variables and their values.
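To illustrate what ${} substitution does to a statement, the sketch below performs a purely client-side replacement, with a plain java.util.Map standing in for the session variables. The VariableSubstitution class and its variable-name rule are hypothetical; in the client, substitution is handled by ksqlDB itself, and its exact rules (for example, quoting or case handling) may differ from this regex.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class VariableSubstitution {
  private VariableSubstitution() {}

  // Matches ${name} and captures the name (hypothetical naming rule).
  private static final Pattern VARIABLE =
      Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  // Replaces each ${name} with its value from `variables`; undefined names are left as-is.
  static String substitute(String sql, Map<String, String> variables) {
    Matcher matcher = VARIABLE.matcher(sql);
    StringBuilder out = new StringBuilder();
    while (matcher.find()) {
      String value = variables.get(matcher.group(1));
      String replacement = value != null ? value : matcher.group(0);
      matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    matcher.appendTail(out);
    return out.toString();
  }
}
```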
Substitution is supported for the following functions:
Sometimes, you need to execute requests directly against the ksqlDB
server for reasons including, but not limited to, accessing features in
ksqlDB REST API that are not available in the API client, or
deserializing responses into different classes that are more native to
your application.
For this purpose, the Client provides HttpRequest and HttpResponse
interfaces that you can use to send direct requests.
HttpResponse response = client.buildRequest("GET", "/info")
    .send()
    .get();

// Check the status
assert response.status() == 200;

// The body is a byte[]; parse it with your JSON library of choice
byte[] body = response.body();

// Or use the helper method to read the body into a map
Map<String, Map<String, Object>> info = response.bodyAsMap();
Add query properties:
HttpResponse response = client.buildRequest("POST", "/ksql")
    .payload("ksql", "CREATE STREAM FOO AS SELECT CONCAT(A, `wow;`) FROM `BAR`;")
    .propertiesKey("streamsProperties")
    .property("auto.offset.reset", "earliest")
    .send()
    .get();

assert response.status() == 200;
Or build the entire payload manually:
HttpResponse response = client.buildRequest("POST", "/ksql")
    .payload("ksql", "CREATE STREAM FOO AS SELECT CONCAT(A, `wow;`) FROM `BAR`;")
    .payload("streamsProperties", Collections.singletonMap("auto.offset.reset", "earliest"))
    .send()
    .get();

assert response.status() == 200;
The send() method adds authentication headers as specified in
ClientOptions.
Starting with ksqlDB 0.27, users can use the assertSchema and
assertTopic methods to assert the existence of resources. If the
assertion fails, then the method will return a failed
CompletableFuture.
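Because the assertion methods above report a failed assertion as a failed CompletableFuture, the caller decides how to react. The hypothetical AssertionResult helper below maps any such future to a boolean with CompletableFuture.handle. A stand-in future replaces an actual assertTopic() or assertSchema() call, whose parameters are not shown on this page.

```java
import java.util.concurrent.CompletableFuture;

final class AssertionResult {
  private AssertionResult() {}

  // Maps an assertion future to true (completed normally) or false (completed exceptionally),
  // so a failed assertion can be handled as a value instead of an exception.
  static CompletableFuture<Boolean> asBoolean(CompletableFuture<?> assertion) {
    return assertion.handle((ignored, error) -> error == null);
  }
}
```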
// Create the rows to insert
List<KsqlObject> insertRows = new ArrayList<>();
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "358579699410099")
    .put("TX_ID", "f88c5ebb-699c-4a7b-b544-45b30681cc39")
    .put("TIMESTAMP", "2020-04-22T03:19:58")
    .put("AMOUNT", new BigDecimal("50.25")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "352642227248344")
    .put("TX_ID", "0cf100ca-993c-427f-9ea5-e892ef350363")
    .put("TIMESTAMP", "2020-04-25T12:50:30")
    .put("AMOUNT", new BigDecimal("18.97")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "373913272311617")
    .put("TX_ID", "de9831c0-7cf1-4ebf-881d-0415edec0d6b")
    .put("TIMESTAMP", "2020-04-19T09:45:15")
    .put("AMOUNT", new BigDecimal("12.50")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "358579699410099")
    .put("TX_ID", "044530c0-b15d-4648-8f05-940acc321eb7")
    .put("TIMESTAMP", "2020-04-22T03:19:54")
    .put("AMOUNT", new BigDecimal("103.43")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "352642227248344")
    .put("TX_ID", "5d916e65-1af3-4142-9fd3-302dd55c512f")
    .put("TIMESTAMP", "2020-04-25T12:50:25")
    .put("AMOUNT", new BigDecimal("3200.80")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "352642227248344")
    .put("TX_ID", "d7d47fdb-75e9-46c0-93f6-d42ff1432eea")
    .put("TIMESTAMP", "2020-04-25T12:51:55")
    .put("AMOUNT", new BigDecimal("154.32")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "358579699410099")
    .put("TX_ID", "c5719d20-8d4a-47d4-8cd3-52ed784c89dc")
    .put("TIMESTAMP", "2020-04-22T03:19:32")
    .put("AMOUNT", new BigDecimal("78.73")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "373913272311617")
    .put("TX_ID", "2360d53e-3fad-4e9a-b306-b166b7ca4f64")
    .put("TIMESTAMP", "2020-04-19T09:45:35")
    .put("AMOUNT", new BigDecimal("234.65")));
insertRows.add(new KsqlObject()
    .put("EMAIL_ADDRESS", "[email protected]")
    .put("CARD_NUMBER", "373913272311617")
    .put("TX_ID", "de9831c0-7cf1-4ebf-881d-0415edec0d6b")
    .put("TIMESTAMP", "2020-04-19T09:44:03")
    .put("AMOUNT", new BigDecimal("150.00")));

// Insert the rows
List<CompletableFuture<Void>> insertFutures = new ArrayList<>();
for (KsqlObject row : insertRows) {
  insertFutures.add(client.insertInto("TRANSACTIONS", row));
}

// Print a message once all of the inserts have completed
CompletableFuture<Void> allInsertsFuture =
    CompletableFuture.allOf(insertFutures.toArray(new CompletableFuture<?>[0]));
allInsertsFuture.thenRun(() -> System.out.println("Seeded transaction events."));
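The seeding code above fires all inserts and registers a callback. If you instead need to block until every insert has finished and learn how many failed, one option is the hypothetical InsertResults helper below. CompletableFuture.allOf completes only after all of its inputs finish, but it surfaces a single failure, so the helper absorbs that aggregate error and inspects each future individually.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class InsertResults {
  private InsertResults() {}

  // Blocks until every future has completed, then counts the ones that failed.
  static long countFailures(List<? extends CompletableFuture<?>> futures) {
    CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
        // allOf fails if any input fails; absorb that so the per-future check below runs
        .exceptionally(error -> null)
        .join();
    return futures.stream().filter(CompletableFuture::isCompletedExceptionally).count();
  }
}
```

With the seeding code, InsertResults.countFailures(insertFutures) would return 0 when every row was accepted.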
Create the anomalies table:
String sql = "CREATE TABLE possible_anomalies WITH ("
    + " kafka_topic = 'possible_anomalies',"
    + " VALUE_AVRO_SCHEMA_FULL_NAME = 'io.ksqldb.tutorial.PossibleAnomaly'"
    + ") AS"
    + " SELECT card_number AS `card_number_key`,"
    + " as_value(card_number) AS `card_number`,"
    + " latest_by_offset(email_address) AS `email_address`,"
    + " count(*) AS `n_attempts`,"
    + " sum(amount) AS `total_amount`,"
    + " collect_list(tx_id) AS `tx_ids`,"
    + " WINDOWSTART as `start_boundary`,"
    + " WINDOWEND as `end_boundary`"
    + " FROM transactions"
    + " WINDOW TUMBLING (SIZE 30 SECONDS, RETENTION 1000 DAYS)"
    + " GROUP BY card_number"
    + " HAVING count(*) >= 3"
    + " EMIT CHANGES;";

Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");
client.executeStatement(sql, properties).get();
Check the contents of the anomalies table with a push query:
In the example above, RowSubscriber is the example subscriber
implementation introduced in the
section on the streamQuery() method
above. The RowSubscriber implementation
can be adapted to adjust how the received rows are printed, or to pass
them to a downstream application.
String sql1 = "SELECT name, total_calls, minutes_engaged FROM lifetime_view WHERE name = 'derek';";
String sql2 = "SELECT name, total_calls, minutes_engaged FROM lifetime_view WHERE name = 'michael';";

// Execute two pull queries and compare the results
client.executeQuery(sql1).thenCombine(
    client.executeQuery(sql2),
    (queryResult1, queryResult2) -> {
      // One row is returned from each query, as long as the queried keys exist
      Row result1 = queryResult1.get(0);
      Row result2 = queryResult2.get(0);
      if (result1.getLong("TOTAL_CALLS") > result2.getLong("TOTAL_CALLS")) {
        System.out.println(result1.getString("NAME") + " made more calls.");
      } else {
        System.out.println(result2.getString("NAME") + " made more calls.");
      }
      return null;
    });