Queuewars es un ejercicio diseñado para mostrar como distintas configuraciones de tópicos y particiones en Kafka afectan a la velocidad de procesado de series temporales.
El objetivo es apoderarse de la mayor cantidad de bloques, antes de que el resto de equipos lo haga. La vista general de todos los bloques se muestra en queuewars.luisbelloch.es. Cada bloque sólo puede ser reclamado por un equipo.
Los bloques se han fraccionado en N pedazos (chunks) con un identificador para cada trozo, así como un peso. La suma de todos los pesos de los trozos, para un mismo bloque, es igual a 1.
Se pide desarrollar un productor y un consumidor de Kafka que ordene los distintos pedazos hasta componer un bloque. Cuando se disponga de un bloque entero, se ha de enviar al servidor con los identificadores de todos los pedazos que lo componen.
El productor debe recoger los pedazos aleatorios que devuelve el servidor y publicarlo en el Kafka local que habréis levantado mediante make up
. Para recoger los pedazos del servidor, puede utilizarse la función api.fetch()
:
for chunk in api.fetch():
print('Chunk', chunk)
Cada pedazo (chunk), tiene la siguiente estructura:
{ "id": "7bb5e0", "parent": 1023, "weight": 0.13 }
Donde id
es el identificador del pedazo, parent
es el identificador del bloque y weight
es el peso del pedazo en relación al bloque. El productor debe publicar cada pedazo en uno o varios tópicos de Kafka. La elección de las particiones, el número de brokers y los topics es crucial para aumentar o reducir la velocidad de procesado.
El consumidor debe componer los pedazos de cada bloque hasta que la suma sea igual a 1, momento en el que enviará la solución al servidor:
parent = 42
chunk_ids = ['9e100b', 'd31cc4', '90e756']
statusCode = api.confirm(parent, chunk_ids)
Cuando la solución sea correcta, y el bloque no haya sido reclamado por ningún otro equipo, statusCode será igual a 200. Recordad que los bloques sólo pueden ser reclamados una sola vez.
Este es el diagrama completo del proceso:
Hay un esqueleto de ejemplo para el consumidor (consumer.rb
) y el productor (producer.rb
) que puede descargarse desde aquí.
Para levantar Kafka puede usarse el comando make up
. Al final de este documento hay una lista de todos los comandos disponibles.
Para ejecutar el código de ejemplo se requiere Python 3.11.4 y la librería cliente de Kafka. Hay dos opciones: realizar una instalación local con pyenv y venv o usar un contenedor de Docker con todo precargado.
Una vez instalado pyenv y descomprimido el código de ejemplo, ejecutar en el terminal:
mkdir -p queuewars
curl -s http://bigdata.luisbelloch.es/queuewars.tgz | tar xvf - -C queuewars
cd queuewars
pyenv install
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
Eso instalará una versión de python con todas sus dependencias en la carpeta .venv
. Recordad que en cada terminal nueva debéis activar la instalación mediante source .venv/bin/activate
.
El método actual creará un contenedor junto con el resto de contenedores de Kafka, conteniendo Python y todas las dependencias creadas. Los archivos locales se montan dentro del contenedor, por lo que cualquier modificación se refleja internamente.
mkdir -p queuewars
curl -s http://bigdata.luisbelloch.es/queuewars.tgz | tar xvf - -C queuewars
cd queuewars
make up
make shell
Por ejemplo, para ejecutar el consumidor de ejemplo:
root@74c2b8804962:/opt/queuewars# python demo_consumer.py
Received: {"foo": 1, "bar": "penguin"}
Received: {"hello": 1, "world": "bar"}
Es posible hacer make shell
tantas veces como sea necesario.
Para gestionar multiples terminales, puedes utilizar tmux, tal como vimos en clase.
Para reiniciar el tablero y devolver todos los bloques a su estado original, puede lanzarse la siguiente petición:
curl -L -X DELETE -d "{}" http://queuewars.luisbelloch.es/api/block
Ten en cuenta que otros alumnos pueden estar haciendo pruebas en el mismo momento.
Puedes utilizar el siguiente contenedor para no depender de la URL en internet:
docker run -p 9000:9000 -ti luisbelloch/queuewars
La interfaz de usuario estará disponible en localhost:9000.
Recuerda cambiar la configuración en config.py
:
OWNER = 'one' # one, two, three, four, five, six
# BASE_URL = "https://queuewars.luisbelloch.es"
BASE_URL = "http://localhost:9000"
# Levantar Kafka y sus dependencias
make up
# Destruir todos los contenedores
make down
# Crear un topic de ejemplo, llamado 'test'
make create-topic
# Crear un topic con un nombre determinado
TOPIC_NAME=precios make create-topic
# Listar todos los topics
make list-topics
# Subscribirse por consola a un topic llamado precios
TOPIC_NAME=precios make consumer
# Subscribirse a todos los mensajes desde el principio
TOPIC_NAME=precios make consumer-from-beginning
# Publicar por consola en un topic llamado precios
TOPIC_NAME=precios make producer
# Entrar en la consola de ksqlDB
make ksqldb
# Limpiar contenedores antiguos
docker container prune
docker volume prune
En el caso de que no dispongas de make
puedes ejecutar los siguientes comandos directamente:
# make up
docker compose up
# make down
docker compose down --remove-orphans --rmi local --volumes
# make shell
docker exec -it shell bash
# make ksqldb
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
# make up-cluster
docker compose -f docker-compose.yml -f docker-compose.cluster.yml up
# make down-cluster
docker compose -f docker-compose.yml -f docker-compose.cluster.yml down --remove-orphans --rmi local --volumes
# make create-topic
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --create --topic test --replication-factor 1 --partitions 1
# make list-topics
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --list
# make delete-topic
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --delete --topic test
# make describe-topic
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --describe --topic test
# make producer
docker compose exec broker kafka-console-producer --bootstrap-server broker:9092 --topic test
# make producer-keys
docker compose exec broker kafka-console-producer --bootstrap-server broker:9092 --topic test --property parse.key=true --property key.separator=":"
# make consumer
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test
# make consumer-keys
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test --property print.key=true --property key.separator=":"
# make consumer-from-beginning
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test --from-beginning
# make consumer-group
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test --consumer-property group.id=work_queue_name
# make describe-group
docker compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --describe --group work_queue_name
# PARTITION_COUNT=3 TOPIC_NAME=patatas make create-topic-many-partitions
docker compose exec broker kafka-topics --bootstrap-server broker:9092 --create --topic patatas --replication-factor 1 --partitions 3
# make consumer-partition-0
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test --from-beginning --partition 0
# make consumer-partition-1
docker compose exec broker kafka-console-consumer --bootstrap-server broker:9092 --topic test --from-beginning --partition 1