An event-driven microservice follows a pattern in which a piece of code
communicates with the outside world only through messages called events. This
technique can dramatically simplify an architecture because each
microservice only receives and emits information from clearly defined
communication channels. Because state is localized within each
microservice, complexity is tightly contained.
[Diagram: an event-driven microservice implemented the “hard way”, without ksqlDB]
A common way that you might implement this architecture is to feed event
streams into Kafka, read them with a stream processing framework,
and trigger side-effects whenever something of interest happens — like
sending an email with Twilio SendGrid. This
works, but it’s up to you to blend your stream processing, state, and
side-effects logic in a maintainable way. Is there a better approach?
Scaling stateful services is challenging. Coupling a stateful service
with the responsibility of triggering side-effects makes it even harder.
It’s up to you to manage both as if they were one, even though they
might have completely different needs. If you want to change how
side-effects behave, you also need to redeploy your stateful stream
processor. ksqlDB helps simplify this by splitting things up: stateful
stream processing is managed on a cluster of servers, while side-effects
run inside your stateless microservice.
[Diagram: an event-driven microservice implemented the “easy way”, with ksqlDB]
Using ksqlDB, you can isolate complex stateful operations within
ksqlDB’s runtime. Your app stays simple because it is stateless. It
merely reads events from a Kafka topic and executes side-effects as
needed.
Imagine that you work at a financial services company which clears many
credit card transactions each day. You want to prevent malicious
activity in your customer base. When a high number of transactions
occurs in a narrow window of time, you want to notify the cardholder of
suspicious activity.
This tutorial shows how to create an event-driven microservice that
identifies suspicious activity and notifies customers. It demonstrates
finding anomalies with ksqlDB and sending alert emails using a simple
Kafka consumer with SendGrid.
Before you issue more commands, tell ksqlDB to start all queries from the
earliest point in each topic:
SET 'auto.offset.reset' = 'earliest';
We want to model a stream of credit card transactions from which we’ll
look for anomalous activity. To do that, create a ksqlDB stream to
represent the transactions. Each transaction has a few key pieces of
information, like the card number, amount, and email address that it’s
associated with. Because the specified topic (transactions) does not
exist yet, ksqlDB creates it on your behalf.
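The original CREATE STREAM statement isn't reproduced in this section, but a sketch along the following lines fits the description; the column names, partition count, and timestamp format are assumptions chosen to match the notes that follow:

CREATE STREAM transactions (
    tx_id VARCHAR,           -- unique id for each transaction
    email_address VARCHAR,   -- cardholder contact address
    card_number VARCHAR,
    timestamp VARCHAR,       -- event time, stored as a string
    amount DECIMAL(12, 2)
) WITH (
    kafka_topic = 'transactions',      -- created automatically because it doesn't exist yet
    partitions = 1,
    value_format = 'avro',             -- schema is registered in Schema Registry
    timestamp = 'timestamp',           -- measure time by the event's own timestamp
    timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss'
);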
Notice that this stream is configured with a custom timestamp to
signal that
event-time
should be used instead of
processing-time.
What this means is that when ksqlDB does time-related operations over
the stream, it uses the timestamp column to measure time, not the
current time of the operating system. This makes it possible to handle
out-of-order events.
The stream is also configured to use the Avro format for the value
part of the underlying Kafka records that it generates. Because
ksqlDB has been configured with Schema Registry (as part of the Docker
Compose file), the schemas of each stream and table are centrally
tracked. We’ll make use of this in our microservice later.
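The sample transactions themselves aren't shown in this section. If you want rows to work with, you can insert a few yourself; the values below are purely illustrative (swap in an email address you control if you want to receive the alert later), and what matters is that three transactions for the same card fall inside one 30-second window:

INSERT INTO transactions (email_address, card_number, tx_id, timestamp, amount)
    VALUES ('michael@example.com', '358579699410099', 'tx-1001', '2020-04-22T03:19:32', 50.25);
INSERT INTO transactions (email_address, card_number, tx_id, timestamp, amount)
    VALUES ('michael@example.com', '358579699410099', 'tx-1002', '2020-04-22T03:19:41', 18.97);
INSERT INTO transactions (email_address, card_number, tx_id, timestamp, amount)
    VALUES ('michael@example.com', '358579699410099', 'tx-1003', '2020-04-22T03:19:58', 215.50);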
If a single credit card is transacted many times within a short
duration, there’s probably something suspicious going on. A table is an
ideal choice to model this because you want to aggregate events over
time and find activity that spans multiple events. Run the following
statement:
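The exact statement isn't reproduced here; the sketch below is inferred from the notes that follow and from the field names the microservice reads later (n_attempts, total_amount, start_boundary, and so on), so treat the column aliases as assumptions:

CREATE TABLE possible_anomalies WITH (
    kafka_topic = 'possible_anomalies',                                  -- explicit topic name
    VALUE_AVRO_SCHEMA_FULL_NAME = 'io.ksqldb.tutorial.PossibleAnomaly'   -- Avro schema namespace
) AS
    SELECT card_number AS `card_number_key`,              -- grouping column becomes the record key
           as_value(card_number) AS `card_number`,        -- also keep the card number in the value
           latest_by_offset(email_address) AS `email_address`,
           count(*) AS `n_attempts`,
           sum(amount) AS `total_amount`,
           collect_list(tx_id) AS `tx_ids`,
           collect_list(amount) AS `amounts`,
           WINDOWSTART AS `start_boundary`,
           WINDOWEND AS `end_boundary`
    FROM transactions
    WINDOW TUMBLING (SIZE 30 SECONDS, RETENTION 1000 DAYS)
    GROUP BY card_number
    HAVING count(*) >= 3
    EMIT CHANGES;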
For each credit card number, 30-second
tumbling windows
are created to group activity. A new row is inserted into the table
when at least three transactions take place inside a given window.
The window retains data for the last 1000 days based on each
row's timestamp. In general, you should choose your retention
carefully: it is a trade-off between keeping data around longer and
maintaining larger state. The very long retention period used in this
tutorial is convenient because the sample timestamps are fixed at the
time of writing, so they won't fall outside the retention window and
need frequent adjustment.
The credit card number is selected twice. In the first instance, it
becomes part of the underlying Kafka record key, because it's
present in the GROUP BY clause, which is used for sharding. In
the second instance, the as_value function makes it available in
the value, too. This is generally for convenience.
The individual transaction IDs and amounts that make up the window
are collected as lists.
The last transaction’s email address is “carried forward” with
latest_by_offset.
Column aliases are surrounded by backticks, which tells ksqlDB to use
exactly that case. ksqlDB uppercases identifier names by default.
The underlying Kafka topic for this table is explicitly set to
possible_anomalies.
The Avro schema that ksqlDB generates for the value portion of its
records is recorded under the namespace
io.ksqldb.tutorial.PossibleAnomaly. You’ll use this later in the
microservice.
Check what anomalies the table picked up. Run the following statement to
select a stream of events emitted from the table:
SELECT * FROM possible_anomalies EMIT CHANGES;
This should yield a single row. Three transactions for card number
358579699410099 were recorded with timestamps within a single
30-second tumbling window:
Notice that so far, all the heavy lifting happens inside of ksqlDB.
ksqlDB takes care of the stateful stream processing. Triggering
side-effects will be delegated to a light-weight service that consumes
from a Kafka topic. You want to send an email each time an anomaly is
found. To do that, you’ll implement a simple, scalable microservice. In
practice, you might use Kafka Streams to handle
this piece, but to keep things simple, just use a Kafka consumer
client.
Start by creating a pom.xml file for your microservice. This simple
microservice will run a loop, reading from the possible_anomalies
Kafka topic and sending an email for each event it receives.
Dependencies are declared on Kafka, Avro, SendGrid, and a few
other things:
Before you can begin coding your microservice, you’ll need access to the
Avro schemas that the Kafka topic is serialized with. Confluent
has a Maven plugin
that makes this simple, which you might have already noticed is present
in the pom.xml file. Run the following command, which downloads the
required Avro schema out of Schema Registry to your local machine:
mvn schema-registry:download
You should now have a file called
src/main/avro/possible_anomalies-value.avsc. This is the Avro schema
generated by ksqlDB for the value portion of the Kafka records of
the possible_anomalies topic.
Next, compile the Avro schema into a Java file. The
Avro Maven plugin
(already added to the pom.xml file, too) makes this simple:
mvn generate-sources
You should now have a file called
target/generated-sources/io/ksqldb/tutorial/PossibleAnomaly.java
containing the compiled Java code.
Now we can write the code that triggers side effects when anomalies are
found. Add the following Java file at
src/main/java/io/ksqldb/tutorial/EmailSender.java. This is a simple
program that consumes events from Kafka and sends an email with
SendGrid for each one it finds. There are a few constants to fill in,
including a SendGrid API key. You can get one by signing up for
SendGrid.
package io.ksqldb.tutorial;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import com.sendgrid.SendGrid;
import com.sendgrid.Request;
import com.sendgrid.Response;
import com.sendgrid.Method;
import com.sendgrid.helpers.mail.Mail;
import com.sendgrid.helpers.mail.objects.Email;
import com.sendgrid.helpers.mail.objects.Content;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import java.util.Collections;
import java.util.Properties;
import java.util.Locale;
import java.io.IOException;

public class EmailSender {
    // Matches the broker port specified in the Docker Compose file.
    private final static String BOOTSTRAP_SERVERS = "localhost:29092";
    // Matches the Schema Registry port specified in the Docker Compose file.
    private final static String SCHEMA_REGISTRY_URL = "http://localhost:8081";
    // Matches the topic name specified in the ksqlDB CREATE TABLE statement.
    private final static String TOPIC = "possible_anomalies";

    // For you to fill in: which address SendGrid should send from.
    private final static String FROM_EMAIL = "<< FILL ME IN >>";
    // For you to fill in: the SendGrid API key to use their service.
    private final static String SENDGRID_API_KEY = "<< FILL ME IN >>";

    private final static SendGrid sg = new SendGrid(SENDGRID_API_KEY);
    private final static DateTimeFormatter formatter = DateTimeFormatter
            .ofLocalizedDateTime(FormatStyle.SHORT)
            .withLocale(Locale.US)
            .withZone(ZoneId.systemDefault());

    public static void main(final String[] args) throws IOException {
        final Properties props = new Properties();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "email-sender");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

        try (final KafkaConsumer<String, PossibleAnomaly> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (true) {
                final ConsumerRecords<String, PossibleAnomaly> records = consumer.poll(Duration.ofMillis(100));

                for (final ConsumerRecord<String, PossibleAnomaly> record : records) {
                    final PossibleAnomaly value = record.value();

                    if (value != null) {
                        sendEmail(value);
                    }
                }
            }
        }
    }

    private static void sendEmail(PossibleAnomaly anomaly) throws IOException {
        Email from = new Email(FROM_EMAIL);
        Email to = new Email(anomaly.getEmailAddress().toString());
        String subject = makeSubject(anomaly);
        Content content = new Content("text/plain", makeContent(anomaly));
        Mail mail = new Mail(from, subject, to, content);

        Request request = new Request();
        try {
            request.setMethod(Method.POST);
            request.setEndpoint("mail/send");
            request.setBody(mail.build());
            Response response = sg.api(request);

            System.out.println("Attempted to send email!\n");
            System.out.println("Status code: " + response.getStatusCode());
            System.out.println("Body: " + response.getBody());
            System.out.println("Headers: " + response.getHeaders());
            System.out.println("======================");
        } catch (IOException ex) {
            throw ex;
        }
    }

    private static String makeSubject(PossibleAnomaly anomaly) {
        return "Suspicious activity detected for card " + anomaly.getCardNumber();
    }

    private static String makeContent(PossibleAnomaly anomaly) {
        return String.format(
                "Found suspicious activity for card number %s. %s transactions were made for a total of %s between %s and %s",
                anomaly.getCardNumber(),
                anomaly.getNAttempts(),
                anomaly.getTotalAmount(),
                formatter.format(Instant.ofEpochMilli(anomaly.getStartBoundary())),
                formatter.format(Instant.ofEpochMilli(anomaly.getEndBoundary())));
    }
}
If everything is configured correctly, emails will be sent whenever an
anomaly is detected. There are a few things to note with this simple
implementation.
First, if you start more instances of this microservice, the partitions
of the possible_anomalies topic will be load balanced across them.
This takes advantage of the standard
Kafka consumer groups
behavior.
Second, this microservice is configured to commit its progress
automatically every second through the ENABLE_AUTO_COMMIT_CONFIG and
AUTO_COMMIT_INTERVAL_MS_CONFIG settings. That means messages processed
before the most recent committed offset will not be reprocessed if the
microservice is taken down and started again.
Finally, note that ksqlDB emits a new event every time a tumbling window
changes. ksqlDB uses a model called “refinements” to continually emit
new changes to stateful aggregations. For example, if an anomaly was
detected because three credit card transactions were found in a given
interval, an event would be emitted from the table. If a fourth is
detected in the same interval, another event is emitted. Because
SendGrid does not (at the time of writing) support idempotent email
submission, you would need to have a small piece of logic in your
program to prevent sending an email multiple times for the same period.
This is omitted for brevity.
If you wish, you can continue the example by inserting more events into
the transactions topic.
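For example, a statement like the following (values again illustrative) appends one more transaction; three or more such rows for a single card inside one 30-second window will produce a new anomaly and, in turn, another email:

INSERT INTO transactions (email_address, card_number, tx_id, timestamp, amount)
    VALUES ('michael@example.com', '358579699410099', 'tx-2001', '2020-04-22T03:24:02', 87.40);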