Skip to content

Commit

Permalink
refactor: distinguish between awaiting Promise and waiting for subtas…
Browse files Browse the repository at this point in the history
…k, simplify route rebuilding tasks (#7203)
  • Loading branch information
AlCalzone authored Sep 29, 2024
1 parent c29d22e commit d1ddbaf
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 125 deletions.
15 changes: 5 additions & 10 deletions packages/cc/src/lib/Values.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import {
ValueMetadata,
} from "@zwave-js/core";
import type { ZWaveApplicationHost } from "@zwave-js/host";
import {
type FnOrStatic,
type ReturnTypeOrStatic,
evalOrStatic,
} from "@zwave-js/shared/safe";
import type { Overwrite } from "alcalzone-shared/types";
import type { ValueIDProperties } from "./API";

Expand Down Expand Up @@ -128,23 +133,13 @@ type ToDynamicCCValues<
}
>;

type FnOrStatic<TArgs extends any[], TReturn> =
| ((...args: TArgs) => TReturn)
| TReturn;

type ReturnTypeOrStatic<T> = T extends (...args: any[]) => infer R ? R : T;

type InferArgs<T extends FnOrStatic<any, any>[]> = T extends [
(...args: infer A) => any,
...any,
] ? A
: T extends [any, ...infer R] ? InferArgs<R>
: [];

function evalOrStatic<T>(fnOrConst: T, ...args: any[]): ReturnTypeOrStatic<T> {
return typeof fnOrConst === "function" ? fnOrConst(...args) : fnOrConst;
}

/** Defines a single static CC values that belong to a CC */
function defineStaticCCValue<
TCommandClass extends CommandClasses,
Expand Down
2 changes: 1 addition & 1 deletion packages/serial/src/message/Constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export enum FunctionType {
GetProtocolStatus = 0xbf, // Request the current status of the protocol running on the Z-Wave module

FUNC_ID_ZW_SET_PROMISCUOUS_MODE = 0xd0, // Set controller into promiscuous mode to listen to all messages
FUNC_ID_PROMISCUOUS_APPLICATION_COMMAND_HANDLER = 0xd1,
FUNC_ID_PROMISCUOUS_APPLICATION_COMMAND_HANDLER = 0xd1, // deprecated, replaced with a flag for the ApplicationCommandHandler

StartWatchdog = 0xd2, // Start Hardware Watchdog (700 series and newer)
StopWatchdog = 0xd3, // Stop Hardware Watchdog (700 series and newer)
Expand Down
14 changes: 14 additions & 0 deletions packages/shared/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,17 @@ export function sum(values: number[]): number {
export function noop(): void {
// intentionally empty
}

export type FnOrStatic<TArgs extends any[], TReturn> =
| ((...args: TArgs) => TReturn)
| TReturn;

export type ReturnTypeOrStatic<T> = T extends (...args: any[]) => infer R ? R
: T;

export function evalOrStatic<T>(
fnOrConst: T,
...args: any[]
): ReturnTypeOrStatic<T> {
return typeof fnOrConst === "function" ? fnOrConst(...args) : fnOrConst;
}
99 changes: 60 additions & 39 deletions packages/zwave-js/src/lib/controller/Controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4688,10 +4688,14 @@ export class ZWaveController

async function* doRebuildRoutes(nodeId: number) {
// Await the process for each node and convert errors to a non-successful result
const result: boolean = yield () =>
self.rebuildNodeRoutesInternal(nodeId).catch(() =>
false
);
let result: boolean;
try {
const node = self.nodes.getOrThrow(nodeId);
result = yield () =>
self.getRebuildNodeRoutesTask(node);
} catch {
result = false;
}

// Track the success in a map
self._rebuildRoutesProgress.set(
Expand Down Expand Up @@ -4736,29 +4740,42 @@ export class ZWaveController
"Rebuilding routes for sleeping nodes when they wake up",
);

const tasks = todoSleeping.map((nodeId) =>
const sleepingNodes = todoSleeping.map((nodeId) =>
self.nodes.get(nodeId)
).filter((node) => node != undefined)
.map((node) => {
const sleepingNodeTask: TaskBuilder<void> = {
priority: TaskPriority.Lower - 1,
tag: {
id: "rebuild-node-routes-wakeup",
nodeId: node.id,
},
task:
async function* rebuildSleepingNodeRoutesTask() {
// Pause the task until the node wakes up
yield () => node.waitForWakeup();
yield* doRebuildRoutes(node.id);
},
};
return self.driver.scheduler.queueTask(
sleepingNodeTask,
).filter((node) => node != undefined);

const wakeupPromises = new Map(
sleepingNodes.map((node) =>
[
node.id,
node.waitForWakeup().then(() => node),
] as const
),
);

// As long as there are sleeping nodes that haven't had their routes rebuilt yet,
// wait for any of them to wake up
while (wakeupPromises.size > 0) {
const wakeUpPromise = Promise.race(
wakeupPromises.values(),
);
const wokenUpNode = (
yield () => wakeUpPromise
) as Awaited<typeof wakeUpPromise>;
if (wokenUpNode.status === NodeStatus.Asleep) {
// The node has gone to sleep again since the promise was resolved. Wait again
wakeupPromises.set(
wokenUpNode.id,
wokenUpNode.waitForWakeup().then(() =>
wokenUpNode
),
);
});
// Pause until all sleeping nodes have been processed
yield () => Promise.all(tasks);
continue;
}
// Once the node has woken up, remove it from the list and rebuild its routes
wakeupPromises.delete(wokenUpNode.id);
yield* doRebuildRoutes(wokenUpNode.id);
}
}

self.driver.controllerLog.print(
Expand Down Expand Up @@ -4857,36 +4874,40 @@ export class ZWaveController
private rebuildNodeRoutesInternal(
nodeId: number,
): Promise<boolean> {
// Don't start the process twice
const existingTask = this.driver.scheduler.findTask<boolean>((t) =>
t.tag?.id === "rebuild-node-routes" && t.tag.nodeId === nodeId
);
if (existingTask) return existingTask;

const node = this.nodes.getOrThrow(nodeId);
return this.driver.scheduler.queueTask(
this.getRebuildNodeRoutesTask(node),
);
const task = this.getRebuildNodeRoutesTask(node);
if (task instanceof Promise) return task;

return this.driver.scheduler.queueTask(task);
}

private getRebuildNodeRoutesTask(
node: ZWaveNode,
): TaskBuilder<boolean> {
let keepAwake: boolean;
): Promise<boolean> | TaskBuilder<boolean> {
// This task should only run once at a time
const existingTask = this.driver.scheduler.findTask<boolean>((t) =>
t.tag?.id === "rebuild-node-routes" && t.tag.nodeId === node.id
);
if (existingTask) return existingTask;

const self = this;
let keepAwake: boolean;

return {
// This task is executed by users and by the network-wide route rebuilding process.
// Since it can possibly be spawned by a "wait for wakeup" task aswell, we need to
// increment the priority by 2 here to avoid blocking.
priority: TaskPriority.Lower - 2,
priority: TaskPriority.Lower,
tag: { id: "rebuild-node-routes", nodeId: node.id },
task: async function* rebuildNodeRoutesTask() {
// Keep battery powered nodes awake during the process
keepAwake = node.keepAwake;
node.keepAwake = true;

if (
node.canSleep && node.supportsCC(CommandClasses["Wake Up"])
) {
yield () => node.waitForWakeup();
}

self.driver.controllerLog.logNode(node.id, {
message: `Rebuilding routes...`,
direction: "none",
Expand Down
3 changes: 1 addition & 2 deletions packages/zwave-js/src/lib/controller/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,5 @@ export function sdkVersionLte(
/** Checks if a task belongs to a route rebuilding process */
export function isRebuildRoutesTask(t: Task<unknown>): boolean {
return t.tag?.id === "rebuild-routes"
|| t.tag?.id === "rebuild-node-routes"
|| t.tag?.id === "rebuild-node-routes-wakeup";
|| t.tag?.id === "rebuild-node-routes";
}
96 changes: 76 additions & 20 deletions packages/zwave-js/src/lib/driver/Task.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type TaskBuilder,
TaskInterruptBehavior,
TaskPriority,
type TaskReturnType,
TaskScheduler,
} from "./Task";

Expand Down Expand Up @@ -686,8 +687,7 @@ test("Tasks can yield-queue higher-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});
Expand Down Expand Up @@ -715,8 +715,7 @@ test("Tasks can yield-queue same-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});
Expand Down Expand Up @@ -744,15 +743,71 @@ test.failing("Tasks cannot yield-queue lower-priority tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("outer");
},
});

await outer;
});

test("Yielding tasks multiple levels deep works", async (t) => {
const scheduler = new TaskScheduler();
scheduler.start();

const order: string[] = [];
const yieldedPromise = createDeferredPromise<void>();

const innerinnerBuilder: TaskBuilder<void> = {
name: "innerinner",
priority: TaskPriority.Normal,
task: async function*() {
yield;
order.push("innerinner1");
yield () => yieldedPromise;
order.push("innerinner2");
},
};

const innerBuilder: TaskBuilder<void> = {
name: "inner",
priority: TaskPriority.Normal,
task: async function*() {
yield;
order.push("inner1");
yield innerinnerBuilder;
order.push("inner2");
},
};

const outer = scheduler.queueTask({
name: "outer",
priority: TaskPriority.Normal,
task: async function*() {
order.push("outer1");
yield innerBuilder;
order.push("outer2");
},
});

// Wait long enough that the task is definitely waiting for the promise
await wait(10);
t.deepEqual(order, ["outer1", "inner1", "innerinner1"]);

// Run to completion
yieldedPromise.resolve();
await outer;

t.deepEqual(order, [
"outer1",
"inner1",
"innerinner1",
"innerinner2",
"inner2",
"outer2",
]);
});

test("Tasks receive the result of yielded tasks", async (t) => {
const scheduler = new TaskScheduler();
scheduler.start();
Expand All @@ -769,8 +824,9 @@ test("Tasks receive the result of yielded tasks", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
const result = (yield () => inner) as Awaited<typeof inner>;
const result = (yield innerBuilder) as TaskReturnType<
typeof innerBuilder
>;
return result;
},
});
Expand Down Expand Up @@ -803,11 +859,13 @@ test("Tasks receive the result of yielded tasks, part 2", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner1 = scheduler.queueTask(inner1Builder);
const result1 = (yield () => inner1) as Awaited<typeof inner1>;
const result1 = (yield inner1Builder) as TaskReturnType<
typeof inner1Builder
>;
const result2 = (yield) as any;
const inner3 = scheduler.queueTask(inner3Builder);
const result3 = (yield () => inner3) as Awaited<typeof inner3>;
const result3 = (yield inner3Builder) as TaskReturnType<
typeof inner3Builder
>;
return result1 + (result2 ?? "") + result3;
},
});
Expand All @@ -830,12 +888,11 @@ test("Tasks receive the result of yielded tasks, part 3", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
try {
const ret = (yield () => inner) as any;
return ret;
yield innerBuilder;
throw new Error("This should not happen");
} catch (e) {
return e;
return e as Error;
}
},
});
Expand Down Expand Up @@ -1186,8 +1243,7 @@ test("Canceling nested tasks works", async (t) => {
priority: TaskPriority.Normal,
task: async function*() {
order.push("1a");
const inner = scheduler.queueTask(innerBuilder);
yield () => inner;
yield innerBuilder;
order.push("1b");
},
}).catch(noop);
Expand Down Expand Up @@ -1219,9 +1275,8 @@ test("Canceling nested tasks works, part 2", async (t) => {
const outer = scheduler.queueTask({
priority: TaskPriority.Normal,
task: async function*() {
const inner = scheduler.queueTask(innerBuilder);
try {
yield () => inner;
yield innerBuilder;
} catch (e) {
return "canceled";
}
Expand All @@ -1232,6 +1287,7 @@ test("Canceling nested tasks works, part 2", async (t) => {
await wait(10);

// Cancel all tasks
// FIXME: Restore parent tasks when removing nested tasks
await scheduler.removeTasks((t) => t.name === "inner");

t.is(await outer, "canceled");
Expand Down
Loading

0 comments on commit d1ddbaf

Please sign in to comment.