spring-cloud-dataflow-samples/function-samples at main · spring-cloud/spring-cloud-dataflow-samples

Kafka Streams application samples

Kafka Streams word count application as a processor

When you have a streaming data pipeline that uses Kafka Streams, it can be used as a Processor application in the Spring Cloud Data Flow streaming pipeline. In the following example, you will see how Kafka Streams application can be registered as a Spring Cloud Data Flow processor application and subsequently in streaming data pipeline.

To demonstrate this use case, we have a source application http-ingest which is an extension (with some additional functionalities needed for our examples) of the existing out-of-the-box http source application.

cd http-ingest
./mvnw clean package

At this step, the artifact http-ingest-2.0.0-SNAPSHOT.jar is available.

You can register this artifact as the Spring Cloud Data Flow Source application as follows, From the Spring Cloud Data Flow shell,

dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-2.0.0-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'

The application kstreams-word-count is a Kafka Streams application written using Spring Cloud Stream framework.

cd kstreams-word-count
./mvnw clean package

After this step, the artifact kstreams-word-count-2.0.0-SNAPSHOT.jar is available.

You can register this artifact as the Spring Cloud Data Flow Processor application as follows:

From the Spring Cloud Data Flow shell,

dataflow:>app register --name kstream-word-count --type processor --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-word-count/target/kstreams-word-count-2.0.0-SNAPSHOT.jar
Successfully registered application 'processor:kstream-word-count'

Let’s register the out-of-the-box log application to display the results from the kstream-word-count processor (if this was not registered already from the previous app import).

dataflow:>app register --name log --type sink --uri maven://org.springframework.cloud.stream.app:log-sink-kafka:2.1.1.RELEASE
Successfully registered application 'sink:log'

Create the following stream that listens at HTTP web endpoint http://localhost:9100 and sends the data to the kstream-word-count processor registered in the above step. The KStream processor’s output is then pipelined to the log sink application.

dataflow:>stream create kstream-wc-sample --definition "http-ingest --server.port=9100 | kstream-word-count | log"
Created new stream 'kstream-wc-sample'

dataflow:>stream deploy kstream-wc-sample --properties "deployer.log.local.inheritLogging=true"
Deployment request has been sent for stream 'kstream-wc-sample'

dataflow:>http post --target "http://localhost:9100" --data "Baby shark, doo doo doo doo doo doo"
> POST (text/plain) http://localhost:9100 Baby shark, doo doo doo doo doo doo
> 202 ACCEPTED

You can see the log sink application now has:

2019-03-18 15:40:46.195 INFO 34974 --- [container-0-C-1] log-sink : {"word":"baby","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"} 2019-03-18 15:40:46.198 INFO 34974 --- [container-0-C-1] log-sink : {"word":"shark","count":1,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"} 2019-03-18 15:40:46.199 INFO 34974 --- [container-0-C-1] log-sink : {"word":"doo","count":6,"start":"2019-03-18T10:10:00.000+0000","end":"2019-03-18T10:10:30.000+0000"}

Kafka Streams application that has multiple inputs and single output

Let’s use the http-ingest application to create a couple of streaming pipelines that produce the input data (user regions and user clicks).

To build http-ingest application:

cd http-ingest
./mvnw clean package

You can register this artifact as the Spring Cloud Data Flow Source application as follows, From the Spring Cloud Data Flow shell,

dataflow:>app register --name http-ingest --type source --uri file://<Your local parent directory to the forked github repo>/spring-cloud-dataflow-samples/kafka-samples/http-ingest/target/http-ingest-2.0.0-SNAPSHOT.jar
Successfully registered application 'source:http-ingest'

Create a stream that ingests user regions:

stream create ingest-user-regions --definition "http-ingest --server.port=9000 --http.mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] > :userRegions" --deploy

The above stream uses the named destination to send the user regions HTTP events into the Kafka topic named userRegions. We use the http header username to extract and set it as the key for the Kafka message and the user region String is extracted from the http payload String.

This sample application also showcases how we can make use function composition support in the streaming application.

The http-ingest has a function bean definition that looks like:

@Bean
public Function<String, Long> sendAsUserClicks() {
  return value -> Long.parseLong(value);
}

When this function bean is enabled, the incoming String value is converted to Long value.

When sending user clicks count as Long we can enable this function for the http-ingest while the same application can be used for sending the user region String value.

Creating a stream that ingests user clicks:

stream create ingest-user-clicks --definition "http-ingest --server.port=9001 --mapped-request-headers=username --spring.cloud.stream.kafka.bindings.output.producer.messageKeyExpression=headers['username'] --spring.cloud.stream.kafka.binder.configuration.value.serializer=org.apache.kafka.common.serialization.LongSerializer --spring.cloud.stream.function.definition=sendAsUserClicks > :userClicks" --deploy

The above stream uses the named destination to send the user click HTTP events into the Kafka topic named userClicks. We use the http header username to extract and set it as the key for the Kafka message and the click count is extracted from the http payload. The function bean sendAsUserClicks is enabled using the spring.cloud.stream.function.definition property to send the http payload String as Long and using the value serializer as LongSerializer.

Since the Kafka Streams processor has multiple inputs (one for user click events and another one for user regions events), this application needs to be deployed as app type and you need to explicitly configure the bindings to the appropriate Kafka topics and other related configurations.

To build this application:

cd kstreams-join-user-clicks-and-region
./mvnw clean package

Register this Kafka Streams application as app type:

dataflow:> app register --name join-user-clicks-and-regions --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-join-user-clicks-and-region/target/kstreams-join-user-clicks-and-region-2.0.0-SNAPSHOT.jar

We also have a demo application log-user-clicks-per-region that logs the result of the Kafka Streams application. Since the app type is not compatible with other stream application types source, sink and processor, this logger application also needs to registered as app type to work with the above KStream application.

To build this application:

cd kstreams-log-user-clicks-per-region
./mvnw clean package
dataflow:> app register --name log-user-clicks-per-region --type app --uri file://<local parent directory of this git repo>/spring-cloud-dataflow-samples/kafka-samples/kstreams-log-user-clicks-per-region/target/kstreams-log-user-clicks-per-region-2.0.0-SNAPSHOT.jar

Let’s create the stream that pipes the Kafka Streams application’s output into the logger input.

stream create compute-user-clicks-per-region --definition "join-user-clicks-and-regions || log-user-clicks-per-region"

stream deploy compute-user-clicks-per-region --properties "deployer.log-user-clicks-per-region.local.inheritLogging=true"

Now, you can confirm all the three streams (ingest-user-regions, ingest-user-clicks and compute-user-clicks-per-region) are deployed successfully using stream list command from Spring Cloud Data Flow shell.

You can send the following sample user regions using cURL commands:

The http-ingest application in the ingest-user-regions stream accepts user regions data at http://localhost:9000

curl -X POST http://localhost:9000 -H "username: Glenn" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Soby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Janne" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Mark" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Sabby" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Gunnar" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Ilaya" -d "asia" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Chris" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Damien" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Michael" -d "americas" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Christian" -d "europe" -H "Content-Type: text/plain"
curl -X POST http://localhost:9000 -H "username: Oleg" -d "europe" -H "Content-Type: text/plain"

The http-ingest application in the ingest-user-clicks stream accepts user clicks data at http://localhost:9001

curl -X POST http://localhost:9001 -H "username: Glenn" -d 9 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Soby" -d 15 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Janne" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Mark" -d 7 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Sabby" -d 20 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Gunnar" -d 18 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Ilaya" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Chris" -d 5 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Damien" -d 21 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Michael" -d 10 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Christian" -d 12 -H "Content-Type: text/plain"
curl -X POST http://localhost:9001 -H "username: Oleg" -d 10 -H "Content-Type: text/plain"

Once the above data is published, you will see the KStream application outputs the processed result into the logger application and it has the following result:

2019-03-15 15:50:39.251  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : europe : 53
2019-03-15 15:50:39.252  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : asia : 10
2019-03-15 15:50:39.252  INFO 49790 --- [container-0-C-1] ksPerRegion$Logger$1 : americas : 84

You can keep publishing some click data and see the results at the logger application.