Skip to content
This repository was archived by the owner on May 10, 2024. It is now read-only.

Commit c2d0f4a

Browse files
committed
k8s-scheduler: run observer on a single thread
Signed-off-by: Lalith Suresh <lsuresh@vmware.com>
1 parent 750b87d commit c2d0f4a

File tree

1 file changed

+18
-13
lines changed

1 file changed

+18
-13
lines changed

k8s-scheduler/src/main/java/com/vmware/dcm/PodEventsToDatabase.java

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.fabric8.kubernetes.api.model.Quantity;
3030
import io.fabric8.kubernetes.api.model.ResourceRequirements;
3131
import io.fabric8.kubernetes.api.model.Toleration;
32+
import io.reactivex.schedulers.Schedulers;
3233
import io.reactivex.subjects.PublishSubject;
3334
import org.jooq.DSLContext;
3435
import org.jooq.Insert;
@@ -80,19 +81,23 @@ private enum Operators {
8081

8182
PodEventsToDatabase(final IConnectionPool dbConnectionPool) {
8283
this.dbConnectionPool = dbConnectionPool;
83-
eventStream.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT).subscribe(podEvents -> {
84-
if (podEvents.isEmpty()) {
85-
return;
86-
}
87-
final List<Query> queries = new ArrayList<>();
88-
for (final BatchedTask task: podEvents) {
89-
queries.addAll(task.queries());
90-
}
91-
LOG.info("Inserting {} queries from a batch of {} events", queries.size(), podEvents.size());
92-
dbConnectionPool.getConnectionToDb().batch(queries).execute();
93-
for (final BatchedTask task: podEvents) {
94-
task.future().set(true);
95-
}
84+
eventStream.subscribeOn(Schedulers.single())
85+
.buffer(BATCH_INTERVAL_IN_MS, TimeUnit.MILLISECONDS, BATCH_COUNT)
86+
.subscribe(podEvents -> {
87+
if (podEvents.isEmpty()) {
88+
return;
89+
}
90+
final List<Query> queries = new ArrayList<>();
91+
for (final BatchedTask task: podEvents) {
92+
queries.addAll(task.queries());
93+
}
94+
final long now = System.nanoTime();
95+
dbConnectionPool.getConnectionToDb().batch(queries).execute();
96+
LOG.info("Inserted {} queries from a batch of {} events in time {}", queries.size(), podEvents.size(),
97+
System.nanoTime() - now);
98+
for (final BatchedTask task: podEvents) {
99+
task.future().set(true);
100+
}
96101
});
97102
}
98103

0 commit comments

Comments
 (0)