Implement WorkerPool, use webkitdirectory input

WorkerPool speeds up processing of plugins by using more cores on separate threads. It terminates and recreates a worker after it finishes a task since web assembly seems to slow down a ton when a worker is reused (maybe memory becomes to large / unfragmented, idk).

I switched to the deprecated input tag with webkitdirectory since it is the only way to load a folder under the program files folder. Drag and drop might be another alternative, but I'd rather have a picker.
This commit is contained in:
Tyler Hallada 2022-03-04 23:49:14 -05:00
parent 3a34f48e6c
commit eebae85a65
6 changed files with 143 additions and 79 deletions

View File

@ -1,14 +1,8 @@
import React, { useContext, useEffect, useRef } from "react"; import React, { useContext } from "react";
import { WorkersContext } from "../pages/index"; import { WorkerPoolContext } from "../pages/index";
import { useAppSelector, useAppDispatch } from "../lib/hooks"; import { useAppDispatch } from "../lib/hooks";
import { import { clearPlugins, setPending } from "../slices/plugins";
addPluginInOrder,
clearPlugins,
setPending,
decrementPending,
PluginFile,
} from "../slices/plugins";
import styles from "../styles/DataDirPicker.module.css"; import styles from "../styles/DataDirPicker.module.css";
export const excludedPlugins = [ export const excludedPlugins = [
@ -22,49 +16,39 @@ export const excludedPlugins = [
type Props = {}; type Props = {};
const DataDirPicker: React.FC<Props> = () => { const DataDirPicker: React.FC<Props> = () => {
const workers = useContext(WorkersContext); const workerPool = useContext(WorkerPoolContext);
const dispatch = useAppDispatch(); const dispatch = useAppDispatch();
const plugins = useAppSelector((state) => state.plugins.plugins);
const onDataDirButtonClick = async () => { const onDataDirButtonClick = async (event: {
if (workers.length === 0) { target: { files: FileList | null };
return alert("Worker not loaded yet"); }) => {
if (!workerPool) {
return alert("Workers not loaded yet");
} }
const dirHandle = await ( const files = event.target.files ?? [];
window as Window & typeof globalThis & { showDirectoryPicker: () => any }
).showDirectoryPicker();
dispatch(clearPlugins()); dispatch(clearPlugins());
const values = dirHandle.values();
const plugins = []; const plugins = [];
while (true) { for (let i = 0; i < files.length; i++) {
const next = await values.next(); const file = files[i];
if (next.done) {
break;
}
if ( if (
next.value.kind == "file" && file.name.endsWith(".esp") ||
(next.value.name.endsWith(".esp") || file.name.endsWith(".esm") ||
next.value.name.endsWith(".esm") || file.name.endsWith(".esl")
next.value.name.endsWith(".esl"))
) { ) {
plugins.push(next.value); plugins.push(file);
} }
} }
dispatch(setPending(plugins.length)); dispatch(setPending(plugins.length));
plugins.forEach(async (plugin, index) => { plugins.forEach(async (plugin, index) => {
const file = await plugin.getFile(); const contents = new Uint8Array(await plugin.arrayBuffer());
const contents = new Uint8Array(await file.arrayBuffer());
try { try {
workers[index % workers.length].postMessage( workerPool.pushTask({
{ skipParsing: excludedPlugins.includes(plugin.name),
skipParsing: excludedPlugins.includes(plugin.name), filename: plugin.name,
filename: plugin.name, lastModified: plugin.lastModified,
lastModified: file.lastModified, contents,
contents, });
},
[contents.buffer]
);
} catch (error) { } catch (error) {
console.error(error); console.error(error);
} }
@ -77,9 +61,12 @@ const DataDirPicker: React.FC<Props> = () => {
To see all of the cell edits and conflicts for your current mod load To see all of the cell edits and conflicts for your current mod load
order select your <code>Data</code> directory below to load the plugins. order select your <code>Data</code> directory below to load the plugins.
</p> </p>
<button onClick={onDataDirButtonClick} disabled={workers.length === 0}> <input
{plugins.length === 0 ? "Open" : "Reload"} Skyrim Data directory type="file"
</button> webkitdirectory=""
onChange={onDataDirButtonClick}
disabled={!workerPool}
/>
</> </>
); );
}; };

81
lib/WorkerPool.ts Normal file
View File

@ -0,0 +1,81 @@
import {
addPluginInOrder,
decrementPending,
PluginFile,
} from "../slices/plugins";
import store from "./store";
import { default as Worker } from "worker-loader?filename=static/[fullhash].worker.js!../workers/PluginsLoader.worker";
export interface Task {
skipParsing: boolean,
filename: string,
lastModified: number,
contents: Uint8Array,
}
export class WorkerPool {
public taskQueue: Task[];
public availableWorkers: Worker[];
public constructor() {
this.taskQueue = [];
this.availableWorkers = [];
}
public async init(count: number = 8): Promise<WorkerPool> {
this.availableWorkers = [];
for (let i = 0; i < count; i++) {
this.availableWorkers.push(await this.createWorker());
}
return this;
}
public async addWorker() {
const worker = await this.createWorker();
this.availableWorkers.push(worker);
this.assignWorker();
}
public async createWorker(): Promise<Worker> {
return new Promise((resolve) => {
const worker = new Worker();
worker.onmessage = (evt: {
data: string | PluginFile & { timeHashEnd: number };
}) => {
const { data } = evt;
if (typeof data === "string" && data === "ready") {
resolve(worker);
} else if (typeof data !== "string") {
store.dispatch(decrementPending(1));
store.dispatch(addPluginInOrder(data));
// Since web assembly memory cannot be shrunk, replace worker with a fresh one to avoid slow repeated
// invocations on the same worker instance. Repeated invocations are so slow that the delay in creating a
// new worker is worth it. In practice, there are usually more workers than tasks, so the delay does not slow
// down processing.
worker.terminate();
this.addWorker();
}
};
});
}
public pushTask(task: Task) {
this.taskQueue.push(task);
this.assignWorker();
}
public async assignWorker() {
if (this.taskQueue.length > 0 && this.availableWorkers.length > 0) {
const task = this.taskQueue.shift()!;
const worker = this.availableWorkers.shift()!;
worker.postMessage(task, [task.contents.buffer]);
}
}
public async terminateAll() {
for (const worker of this.availableWorkers) {
worker.terminate();
}
this.availableWorkers = [];
}
}

View File

@ -1,52 +1,38 @@
import { createContext, useEffect, useRef, useState } from "react"; import { createContext, useEffect, useCallback, useState } from "react";
import type { NextPage } from "next"; import type { NextPage } from "next";
import Head from "next/head"; import Head from "next/head";
import "mapbox-gl/dist/mapbox-gl.css"; import "mapbox-gl/dist/mapbox-gl.css";
import Map from "../components/Map"; import Map from "../components/Map";
import { useAppDispatch, useAppSelector } from "../lib/hooks"; import { useAppDispatch } from "../lib/hooks";
import {
addPluginInOrder,
decrementPending,
PluginFile,
} from "../slices/plugins";
import { setPluginsTxtAndApplyLoadOrder } from "../slices/pluginsTxt"; import { setPluginsTxtAndApplyLoadOrder } from "../slices/pluginsTxt";
import { WorkerPool } from "../lib/WorkerPool";
export const WorkersContext = createContext<Worker[]>([]); export const WorkerPoolContext = createContext<WorkerPool | null>(null);
const Home: NextPage = () => { const Home: NextPage = () => {
const [workers, setWorkers] = useState<Worker[]>([]); const [workerPool, setWorkerPool] = useState<WorkerPool | null>(null);
const pluginsPending = useAppSelector((state) => state.plugins.pending);
const dispatch = useAppDispatch(); const dispatch = useAppDispatch();
const createWorkerPool = useCallback(async () => {
setWorkerPool(
await new WorkerPool().init(window.navigator.hardwareConcurrency)
);
}, []);
useEffect(() => { useEffect(() => {
async function loadWorkers(count: number) {
const { default: Worker } = await import(
"worker-loader?filename=static/[fullhash].worker.js!../workers/PluginsLoader.worker"
);
const newWorkers = [];
for (let i = 0; i < count; i++) {
const worker = new Worker();
worker.onmessage = (evt: { data: PluginFile }) => {
const { data } = evt;
dispatch(decrementPending(1));
dispatch(addPluginInOrder(data));
};
newWorkers.push(worker);
}
setWorkers(newWorkers);
}
loadWorkers(window.navigator.hardwareConcurrency ?? 8);
return () => { return () => {
if (workers) { if (workerPool) {
for (const worker of workers) { workerPool.terminateAll();
worker.terminate();
}
} }
}; };
// eslint-disable-next-line react-hooks/exhaustive-deps // eslint-disable-next-line react-hooks/exhaustive-deps
}, [dispatch]); }, [dispatch]);
useEffect(() => {
createWorkerPool();
}, [createWorkerPool]);
return ( return (
<> <>
<Head> <Head>
@ -95,7 +81,7 @@ const Home: NextPage = () => {
<meta name="twitter:site" content="@tyhallada" /> <meta name="twitter:site" content="@tyhallada" />
<meta name="twitter:creator" content="@tyhallada" /> <meta name="twitter:creator" content="@tyhallada" />
</Head> </Head>
<WorkersContext.Provider value={workers}> <WorkerPoolContext.Provider value={workerPool}>
<div <div
style={{ style={{
margin: 0, margin: 0,
@ -119,7 +105,7 @@ const Home: NextPage = () => {
> >
<Map /> <Map />
</div> </div>
</WorkersContext.Provider> </WorkerPoolContext.Provider>
</> </>
); );
}; };

View File

@ -15,6 +15,6 @@
"jsx": "preserve", "jsx": "preserve",
"incremental": true "incremental": true
}, },
"include": ["next-env.d.ts", "**/*.ts", "**/*.tsx"], "include": ["next-env.d.ts", "webkitdirectory.d.ts", "**/*.ts", "**/*.tsx"],
"exclude": ["node_modules"] "exclude": ["node_modules"]
} }

8
webkitdirectory.d.ts vendored Normal file
View File

@ -0,0 +1,8 @@
import { AriaAttributes, DOMAttributes } from "react";
declare module 'react' {
interface HTMLAttributes<T> extends AriaAttributes, DOMAttributes<T> {
// extends React's HTMLAttributes
webkitdirectory?: boolean | string;
}
}

View File

@ -1,6 +1,8 @@
import { hash_plugin, parse_plugin } from "skyrim-cell-dump-wasm"; import { hash_plugin, parse_plugin } from "skyrim-cell-dump-wasm";
self.addEventListener('message', async (event: MessageEvent<{ skipParsing?: boolean; filename: string; lastModified: number; contents: Uint8Array}>) => { self.postMessage("ready");
self.addEventListener('message', async (event: MessageEvent<{ skipParsing?: boolean; filename: string; lastModified: number; contents: Uint8Array }>) => {
const { skipParsing, filename, lastModified, contents } = event.data; const { skipParsing, filename, lastModified, contents } = event.data;
let parsed = undefined; let parsed = undefined;
let parseError = undefined; let parseError = undefined;
@ -17,7 +19,7 @@ self.addEventListener('message', async (event: MessageEvent<{ skipParsing?: bool
} }
} }
const hash = hash_plugin(contents).toString(36); const hash = hash_plugin(contents).toString(36);
self.postMessage({ filename, lastModified, parsed, hash, parseError, enabled: parsed && !parseError }); self.postMessage({ filename, lastModified, parsed, hash, parseError, enabled: parsed && !parseError, timeHashEnd: Date.now() });
} catch (error) { } catch (error) {
console.error(error); console.error(error);
self.postMessage(error); self.postMessage(error);