Building an Order Delivery Analytics Application with FastAPI, Kafka, Apache Pinot, and Dash, Part 2
In the last article, we delved into the creation of a FastAPI application designed to dispatch orders to a Kafka topic. Building upon that foundation, our journey now takes us deeper into the realm of data infrastructure as we explore the essential setup of both Apache Kafka and Apache Pinot.
As we embark on the path to constructing the Order Delivery Analytics Application, we encounter a critical juncture — the establishment of Apache Kafka. Renowned for its robust data streaming capabilities, Kafka is the crucial bridge connecting our Order Service (responsible for receiving incoming orders) to Apache Pinot, our chosen real-time analytics store. This pivotal step ensures the seamless flow of data within our application, setting the stage for comprehensive analytics and insights.
Installing Kafka
To embark on this journey, we must first get Apache Kafka running on our system. The official Apache Kafka documentation provides detailed installation instructions tailored to various platforms, but for local development the docker-compose file below will spin up Kafka and its supporting services for you.
version: '3.7'

services:
  zookeeper:
    image: zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - order-services

  ### KAFKA ###
  broker:
    image: confluentinc/cp-server:7.4.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - order-services

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      - order-services

  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.4.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    networks:
      - order-services

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    networks:
      - order-services

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.4.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
    networks:
      - order-services

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.4.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    networks:
      - order-services

  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.4.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && cub kafka-ready -b broker:29092 1 40 && echo Waiting for Confluent Schema Registry to be ready... && cub sr-ready schema-registry 8081 40 && echo Waiting a few seconds for topic creation to finish... && sleep 11 && tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
    networks:
      - order-services

  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.4.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
    networks:
      - order-services
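Save the file as docker-compose.yml (the file name is an assumption; use whatever name fits your project) and bring the stack up before continuing. A minimal sketch of the workflow:

docker compose up -d        # or docker-compose up -d with the legacy CLI
docker compose ps           # wait until the services report "running" / "healthy"
# Confluent Control Center should then be reachable at http://localhost:9021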
Creating a Kafka Topic
In the realm of Kafka, data is structured into topics, each representing a specific data stream. For our Order Delivery Analytics Application, we will craft a topic christened “orders” to handle the incoming flow of order data. Once the containers are up and running, execute the following command inside the broker container to bring this topic to life:
docker exec -it broker \
  kafka-topics \
  --create \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --partitions 1
This command weaves the “orders” topic into existence, furnishing it with a single partition and a replication factor of 1.
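If you want a quick sanity check (optional, not part of the original walkthrough), the standard Kafka CLI tools bundled in the broker container can describe the topic and tail any messages arriving from the Order Service:

docker exec -it broker kafka-topics \
  --describe --topic orders --bootstrap-server localhost:9092

docker exec -it broker kafka-console-consumer \
  --bootstrap-server localhost:9092 --topic orders --from-beginning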
Setting Up Apache Pinot for Data Storage and Querying
As we continue our journey, the spotlight shifts to Apache Pinot. This distributed, real-time analytics database is the cornerstone of our data storage and querying efforts. Apache Pinot boasts low-latency query capabilities and will house our treasure trove of order data. Configuration in Pinot is orchestrated through two primary files — one for the schema and another for the table. First, let’s add the Pinot services to our docker-compose.yml.
  ### PINOT ###
  pinot-controller:
    image: apachepinot/pinot:0.12.0-arm64
    command: "StartController -zkAddress zookeeper:2181"
    container_name: pinot-controller
    restart: unless-stopped
    ports:
      - "9000:9000"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms1G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-controller.log"
    networks:
      - order-services

  pinot-broker:
    image: apachepinot/pinot:0.12.0-arm64
    command: "StartBroker -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx4G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-broker.log"
    depends_on:
      - pinot-controller
    networks:
      - order-services

  pinot-server:
    image: apachepinot/pinot:0.12.0-arm64
    command: "StartServer -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    ports:
      - "8098:8098"
    environment:
      JAVA_OPTS: "-Dplugins.dir=/opt/pinot/plugins -Xms4G -Xmx16G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xloggc:gc-pinot-server.log"
    depends_on:
      - pinot-broker
    networks:
      - order-services

networks:
  order-services:
    name: order-services
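After another docker compose up -d, a quick way to confirm Pinot is alive (again an optional check, assuming the port mappings above) is to hit the controller’s health endpoint or open its web UI at http://localhost:9000:

curl http://localhost:9000/health   # should print OK once the controller has started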
Pinot Schema Configuration
Let’s begin by crafting the schema that will define the structure of our order data. Create a schema configuration file (e.g., schema.json) to elucidate the fields and their data types. Feast your eyes on this example schema:
{
  "schemaName": "orders",
  "dimensionFieldSpecs": [
    {"name": "id", "dataType": "STRING"},
    {"name": "user_id", "dataType": "INT"},
    {"name": "delivery_lat", "dataType": "DOUBLE"},
    {"name": "delivery_lon", "dataType": "DOUBLE"},
    {"name": "items", "dataType": "JSON"}
  ],
  "metricFieldSpecs": [
    {"name": "products_ordered", "dataType": "INT"},
    {"name": "total_quantity", "dataType": "INT"},
    {"name": "total_price", "dataType": "DOUBLE"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "ts",
      "dataType": "TIMESTAMP",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
This schema configuration delineates dimension fields such as “id,” “user_id,” “delivery_lat,” “delivery_lon,” and “items,” metric fields such as “total_quantity” and “total_price,” and a “ts” timestamp field, along with their associated data types.
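To make the mapping concrete, here is a hypothetical order message as it might land on the “orders” topic; every field name and value below is invented for illustration, so match it against what your Order Service from Part 1 actually produces:

# Hypothetical payload: "createdAt" feeds the "ts" transform defined in the next section,
# and "items" carries the per-product quantities used by the other transforms.
echo '{"id": "order-1", "user_id": 42, "createdAt": 1700000000000, "delivery_lat": 40.7128, "delivery_lon": -74.006, "total_price": 21.5, "items": [{"product_id": "p-1", "quantity": 2, "price": 5.5}, {"product_id": "p-2", "quantity": 1, "price": 10.5}]}' \
  | docker exec -i broker kafka-console-producer --bootstrap-server localhost:9092 --topic orders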
Pinot Table Configuration
With the schema in place, proceed to create a table configuration file (e.g., table.json) that references the schema and spells out the table's storage, indexing, and querying attributes. Here's a condensed version:
{
  "tableName": "orders",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "ts",
    "timeType": "MILLISECONDS",
    "schemaName": "orders",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowLevel",
      "stream.kafka.topic.name": "orders",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "broker:29092",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "ingestionConfig": {
    "transformConfigs": [
      {
        "columnName": "ts",
        "transformFunction": "createdAt"
      },
      {
        "columnName": "total_quantity",
        "transformFunction": "JSONPATHLONG(items, '$.sum($.[*].quantity)')"
      },
      {
        "columnName": "products_ordered",
        "transformFunction": "JSONPATHARRAY(items, '$.length()')"
      }
    ]
  },
  "metadata": {}
}
This table configuration declares a real-time table that consumes the “orders” Kafka topic, ties it to the “orders” schema and its “ts” time column, and uses ingestion transforms to derive “ts,” “total_quantity,” and “products_ordered” from the incoming message.
Starting Apache Pinot
Armed with your schema and table configurations, it’s time to wire them into Apache Pinot. With the Pinot containers running, use the following command to register the schema and create the table on the controller:
docker run \
  -v $PWD/pinot/config:/config \
  --network order-services \
  apachepinot/pinot:0.12.0-arm64 \
  AddTable \
  -schemaFile /config/orders/schema.json \
  -tableConfigFile /config/orders/table.json \
  -controllerHost pinot-controller \
  -exec
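Once the table is registered and the Order Service (or the hypothetical test message shown earlier) has produced something to the topic, you can query the broker’s SQL endpoint, exposed on port 8099 in our compose file, to confirm rows are being ingested; this check is a suggestion rather than part of the original setup:

curl -s -X POST http://localhost:8099/query/sql \
  -H "Content-Type: application/json" \
  -d '{"sql": "SELECT COUNT(*) FROM orders"}'
# The same query can be run interactively in the Pinot UI at http://localhost:9000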
With Apache Kafka orchestrating the flow of data and Apache Pinot acting as the guardian of our valuable order information, we’re well-prepared for efficient data handling and real-time analytics. The next article will dive headfirst into crafting interactive data visualizations using Dash, transforming raw data into actionable insights.
Check out the entire code on my GitHub.