DMTN-310

Reducing Butler database contention in Prompt Processing#

Abstract

During Operations Rehearsal 5, the scalability of Prompt Processing was limited by the Butler Postgres database. We propose creating a service to batch the outputs from the individual Prompt Processing pods, reducing contention for the Butler database. Instead of writing directly to the Butler database, pods would send an event to the service using Kafka.

The problem#

During Operations Rehearsal 5, the throughput of Prompt Processing was limited when Butler Postgres query times increased significantly under high concurrency, in some cases taking up to 10 minutes to update the database with information about the datasets generated by processing. DM-49293 has additional details.

The current Butler queries for inserting dataset records are inefficient for the Prompt Processing use case. They do a large amount of sanity-checking that is reasonably efficient when writing many datasets in a batch, but use an excessive number of queries and temporary tables when writing a small number of datasets of many dataset types.

However, having hundreds of connections from a large cluster simultaneously writing to a Postgres database may be problematic even if queries are improved. The Embargo Postgres database used by Prompt Processing is also used by other services and for ad-hoc queries by staff. As the database grows, it will be difficult to guarantee that sufficient resources will be available to meet Prompt Processing time limits.

Besides the queries that caused problems during the Operations Rehearsal, there are other potential issues lurking. For example, collection chain updates take a lock that could cause one stalled pod to block completion of all other work.

Proposed solution#

We propose decoupling the update of the Embargo Butler database from the prompt processing pods by creating a separate writer service to handle the updates. Pods would send events to the writer service via Kafka to notify it that processing has been completed.

By having the database updates occur outside the Prompt Processing pods, we can batch them together to reduce the number of connections and queries hitting the database. Pods could immediately be re-used for other processing without waiting for the database update to complete.
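
As a rough sketch of how this batching could work, the loop below accumulates events from Kafka and applies each batch with a single Butler update. The broker address, topic and consumer-group names, batch thresholds, and the apply_batch() helper are all placeholders rather than part of the proposal.

```python
# Sketch of the writer service's consume loop using confluent_kafka.
# Offsets are committed only after the batched database write succeeds,
# so events are not lost if the writer crashes mid-batch.
import json
import time

from confluent_kafka import Consumer

BATCH_SIZE = 100      # flush after this many events...
MAX_WAIT_SECONDS = 5  # ...or after this much time, whichever comes first

consumer = Consumer({
    "bootstrap.servers": "kafka:9092",        # placeholder broker address
    "group.id": "prompt-processing-writer",   # placeholder consumer group
    "enable.auto.commit": False,              # commit manually after the DB write
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["prompt-processing-outputs"])  # placeholder topic name


def apply_batch(events: list[dict]) -> None:
    """Placeholder for one combined Butler write covering the whole batch."""
    raise NotImplementedError


def run() -> None:
    events: list[dict] = []
    deadline = time.monotonic() + MAX_WAIT_SECONDS
    while True:
        msg = consumer.poll(1.0)
        if msg is not None and msg.error() is None:
            events.append(json.loads(msg.value()))
        if events and (len(events) >= BATCH_SIZE or time.monotonic() >= deadline):
            apply_batch(events)                  # one set of queries per batch
            consumer.commit(asynchronous=False)  # mark the batch as processed
            events.clear()
            deadline = time.monotonic() + MAX_WAIT_SECONDS
```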

Kafka messages would be formatted as JSON, to re-use existing Butler code for serializing the records that need to be inserted in the database.
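
For concreteness, a single event could carry everything one pod produces for an exposure, so the writer can apply it (or a batch of such events) in one pass. The field names and collection names below are purely illustrative; the record and dataset entries would use the Butler serializations discussed in the following sections.

```python
# Illustrative event payload; field names are not a fixed schema.
import json

event = {
    "dimension_records": [],   # Butler's existing JSON serialization of dimension records
    "datasets": [],            # serialized FileDataset entries (see "Inserting dataset records")
    "output_run": "Prompt/output/run",  # hypothetical run collection written by the pod
    "chain": "Prompt/output/chain",     # hypothetical chained collection to update
}
payload = json.dumps(event).encode("utf-8")  # value of the Kafka message
```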

For operations, log messages and Prometheus metrics for Kafka consumer lag would be added to identify issues and delays in ingesting the Kafka messages.
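
For example, per-partition consumer lag can be derived from the broker's high watermark and the consumer's current position and exported with prometheus_client; the metric name is a placeholder and the consumer is the one from the sketch above.

```python
# Sketch of exporting writer-service consumer lag for Prometheus to scrape.
from prometheus_client import Gauge, start_http_server

CONSUMER_LAG = Gauge(
    "prompt_processing_writer_consumer_lag",  # placeholder metric name
    "Messages not yet processed by the writer service",
    ["partition"],
)
start_http_server(8000)  # expose /metrics


def update_lag_metrics(consumer) -> None:
    for tp in consumer.assignment():
        low, high = consumer.get_watermark_offsets(tp)  # broker offsets for the partition
        position = consumer.position([tp])[0].offset    # writer's current position
        if position >= 0:  # position is negative before the first message is read
            CONSUMER_LAG.labels(partition=str(tp.partition)).set(high - position)
```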

Transferring Butler data to the writer service#

At the end of execution, each Prompt Processing pod does the following writes to the central Butler database:

  1. Inserts dimension records (metadata) associated with the processing that occurred

  2. Inserts dataset records for the data products created by Prompt Processing

  3. Updates a chained collection to include the collection containing the output datasets

The following sections explain how these updates can be transferred to the writer service.

Inserting dimension records#

Transferring and inserting dimension records is straightforward. Prompt Processing already generates a list of the records, and Butler already provides a JSON serialization for these records that can be used to transfer them to the writer service.
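
On the writer side, once the records have been deserialized from the event (deserialize_records() below stands in for Butler's existing JSON machinery), they can be grouped by dimension element and inserted with Registry.insertDimensionData(). This is a minimal sketch, with conflict handling elided.

```python
# Sketch of batched dimension-record insertion in the writer service.
from collections import defaultdict

from lsst.daf.butler import Butler, DimensionRecord


def deserialize_records(event: dict, butler: Butler) -> list[DimensionRecord]:
    """Placeholder for Butler's existing JSON deserialization of dimension records."""
    raise NotImplementedError


def insert_dimension_records(butler: Butler, records: list[DimensionRecord]) -> None:
    by_element: dict[str, list[DimensionRecord]] = defaultdict(list)
    for record in records:
        by_element[record.definition.name].append(record)
    for element, element_records in by_element.items():
        # Handling of records already present (e.g. from an earlier event) is elided.
        butler.registry.insertDimensionData(element, *element_records)
```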

Inserting dataset records#

Currently, Prompt Processing inserts dataset records using the Butler.transfer_from() method. transfer_from() is a compound operation that both copies the artifact files and updates the database.

In the new scheme, the Prompt Processing pod is still required to copy the artifact files to the central S3 repository, but it does not write the associated dataset records to the Butler Postgres database.

Instead of calling transfer_from(), we can use Datastore.export(). Datastore.export() generates a FileDataset object containing all the information necessary to insert a dataset record into the database. FileDataset does not currently have a convenient JSON serialization, but one could be added easily. The writer service can call Butler.ingest() to insert the dataset records into the database.
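
A rough sketch of both halves follows. It assumes the staging-area variant described below, and serialize_file_dataset() stands in for the FileDataset JSON serialization that would need to be added; the transfer modes shown are one plausible choice, not a settled decision.

```python
# Pod side: export artifacts to a staging area and describe them as FileDatasets.
from lsst.daf.butler import Butler, DatasetRef, Datastore, FileDataset


def export_outputs(datastore: Datastore, refs: list[DatasetRef], staging_root: str) -> list[dict]:
    # Datastore.export() copies the artifacts to the staging area and returns
    # FileDataset objects describing them.
    file_datasets = datastore.export(refs, directory=staging_root, transfer="copy")
    return [serialize_file_dataset(fd) for fd in file_datasets]


def serialize_file_dataset(fd: FileDataset) -> dict:
    """Placeholder for the JSON serialization proposed for FileDataset."""
    raise NotImplementedError


# Writer side: move the staged artifacts into the datastore and insert the records.
def ingest_outputs(central_butler: Butler, file_datasets: list[FileDataset]) -> None:
    central_butler.ingest(*file_datasets, transfer="move")
```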

In addition, Datastore.export() copies the files to a specified location. Prompt Processing pods could write the files to a staging area separate from the central Butler datastore directory, and the writer service could move them to their final location. This has the downside of potentially doubling the bandwidth requirements for the file copy, depending on the S3 implementation.

Alternatively, the pods could write the files directly to their final destination in the datastore, but this increases the chances of consistency problems between the database and the files on disk.

Chained collection update#

The name of the collection can be transferred in JSON as a plain string, and the writer service can use the same function that the pods are currently using to update the database.
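
A minimal sketch of that update, written here with Registry.setCollectionChain() for concreteness; whichever helper the pods use today would be called in its place, and whether the new run is prepended or appended would follow the current behaviour.

```python
# Sketch of the chained-collection update in the writer service.
from lsst.daf.butler import Butler


def update_chain(butler: Butler, chain: str, output_run: str) -> None:
    current = list(butler.registry.getCollectionChain(chain))
    if output_run not in current:
        # Prepend so the newest run is searched first (assumed ordering).
        butler.registry.setCollectionChain(chain, [output_run] + current)
```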

Kafka setup#

Phalanx with the Strimzi operator would be used to install Kafka into the existing Butler development and production USDF vClusters. A Kafka topic and username for Prompt Processing would be added. A retention period would be configured on the topic to retain messages for a number of days, allowing events to be processed later if the Butler database or the writer service is down.

If we partition the Kafka topic for scalability, we need to be careful with event ordering, because Kafka only guarantees a consistent ordering within each partition, not across partitions. One option would be to partition by a key derived from Butler metadata (e.g. detector number). Alternatively, if there are no dependencies between events, we can use round-robin partitioning; sending the dimension records, dataset records, and collection chain update together in a single event would guarantee that for the immediate use case.
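
For example, keying each message by detector would keep all events for a given detector in a single partition and therefore in order; the topic name and broker address are the same placeholders used earlier.

```python
# Sketch of publishing an event with a detector-derived key.
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "kafka:9092"})  # placeholder broker address


def publish_event(event: dict, detector: int) -> None:
    producer.produce(
        "prompt-processing-outputs",        # placeholder topic name
        key=str(detector).encode("utf-8"),  # same detector -> same partition
        value=json.dumps(event).encode("utf-8"),
    )
    producer.flush()  # block until the broker acknowledges delivery
```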

For communication with Prompt Processing and other applications, a load balancer with a DNS name would be added.

Fringe Benefits#

As a side-effect, Kafka orders the events in a way that makes it easy to create a log of the data products that were generated. This may make it easier to incrementally populate downstream databases when data is moved out of the embargo rack and made available to end-users.

The messages retained on the Prompt Processing Kafka topic can also serve as a transaction log, allowing event processing to be replayed in the event of database corruption.
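
For illustration, a replay could be performed by assigning the writer's consumer to every partition at the earliest retained offset and letting the normal batching loop reprocess the events; names are the placeholders used earlier.

```python
# Sketch of rewinding the writer service to the start of the retained messages.
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING


def rewind_to_beginning(consumer: Consumer, topic: str, num_partitions: int) -> None:
    partitions = [TopicPartition(topic, p, OFFSET_BEGINNING) for p in range(num_partitions)]
    consumer.assign(partitions)  # start from the oldest retained offset in each partition
```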

Disadvantages to this approach#

Adding an external service will complicate build and deployment and adds additional infrastructure requirements. It also makes it harder for unit tests to fully cover the process of writing the outputs, since that process is split between services.

The other half of the problem#

Removing the writes to the database for the outputs will not fully remove the need for Prompt Processing pods to communicate with the central Butler database. The database is also queried for inputs needed by the pipelines executed on the pods.

However, with writes eliminated, we no longer need to connect to the primary database. We can set up a dedicated read replica for Prompt Processing, insulating time-critical processing from other users/services that may consume resources unpredictably on the primary database. Using physical replication, the replication lag should be less than a second when things are operating normally, though it could become unbounded when the system is under stress.

Testing in Operations Rehearsal 5 showed that with only the input queries, the database access from the pods was able to scale acceptably. We may not need to do anything to address the input queries until we start seeing issues from growth in the database or increased complexity of queries.