Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle kill signal #123

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/blockstore/fs.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import fs from 'fs'
import os from 'os'

import { CID } from 'multiformats'
import { BaseBlockstore } from 'blockstore-core'
import { Blockstore } from './index'
import * as os from "os";

export class FsBlockStore extends BaseBlockstore implements Blockstore {
path: string
_opened: boolean
_opening?: Promise<void>

constructor () {
constructor (path?: string) {
super()
this.path = `${os.tmpdir()}/${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
this.path = path ? path : `${os.tmpdir()}/${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
this._opened = false
}

Expand Down
9 changes: 5 additions & 4 deletions src/pack/fs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import fs from 'fs'
import os from 'os'
import path from 'path'
import moveFile from 'move-file'

Expand All @@ -13,8 +12,10 @@ export interface PackToFsProperties extends PackProperties {
}

export async function packToFs ({ input, output, blockstore: userBlockstore, hasher, maxChunkSize, maxChildrenPerNode, wrapWithDirectory, rawLeaves }: PackToFsProperties) {
const blockstore = userBlockstore ? userBlockstore : new FsBlockStore()
const location = output || `${os.tmpdir()}/${(parseInt(String(Math.random() * 1e9), 10)).toString() + Date.now()}`
const realpath = path.basename(await fs.promises.realpath(input as string))
const inputBasename = realpath === "/" ? "file" : realpath
const blockstore = userBlockstore ? userBlockstore : new FsBlockStore(`/tmp/${inputBasename}.tmp.${process.pid}`)
const location = output || `${process.cwd()}/.${inputBasename}.car.tmp.${process.pid}`
const writable = fs.createWriteStream(location)

const { root } = await packToStream({
Expand All @@ -35,7 +36,7 @@ export async function packToFs ({ input, output, blockstore: userBlockstore, has
// Move to work dir
if (!output) {
const basename = typeof input === 'string' ? path.parse(path.basename(input)).name : root.toString()
const filename = `${basename}.car`
const filename = basename === "/" ? "file.car" : `${basename}.car`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file.car will be problematic for multiple packs in parallel as they will use same filename. Can we generate a random name like we used to do in blockstore ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file.car is meant to be used when someone wants to pack up to his root (so someone wouldn't be packing its entire root at the same time multiple times).
But otherwise it was meant to not use ..car when . is passed as the input directory but resolve the name of this directory instead.

Copy link
Contributor Author

@Breigner01 Breigner01 Jul 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the state of this discussion?
I believe that it shouldn't be an issue but LMK.

await moveFile(location, `${process.cwd()}/${filename}`)

return {root, filename}
Expand Down
57 changes: 41 additions & 16 deletions src/pack/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { MemoryBlockStore } from '../blockstore/memory'
import { unixfsImporterOptionsDefault } from './constants'

import type { PackProperties } from './index'
import {Blockstore} from "../blockstore";
import {FsBlockStore} from "../blockstore/fs";

export interface PackToStreamProperties extends PackProperties {
input: string | Iterable<string> | AsyncIterable<string>,
Expand All @@ -27,21 +29,32 @@ export async function packToStream ({ input, writable, blockstore: userBlockstor
}
input = typeof input === 'string' ? [input] : input

const blockstore = userBlockstore ? userBlockstore : new MemoryBlockStore()
if (userBlockstore) {
process.on("SIGINT", async (signal) => await handleSignal(signal, userBlockstore));
process.on("SIGTERM", async (signal) => await handleSignal(signal, userBlockstore));
}

// Consume the source
const rootEntry = await last(pipe(
legacyGlobSource(input),
source => normaliseInput(source),
(source: any) => importer(source, blockstore, {
...unixfsImporterOptionsDefault,
hasher: hasher || unixfsImporterOptionsDefault.hasher,
maxChunkSize: maxChunkSize || unixfsImporterOptionsDefault.maxChunkSize,
maxChildrenPerNode: maxChildrenPerNode || unixfsImporterOptionsDefault.maxChildrenPerNode,
wrapWithDirectory: wrapWithDirectory === false ? false : unixfsImporterOptionsDefault.wrapWithDirectory,
rawLeaves: rawLeaves == null ? unixfsImporterOptionsDefault.rawLeaves : rawLeaves
})
))
const blockstore = userBlockstore ? userBlockstore : new MemoryBlockStore()
let rootEntry

try {
// Consume the source
rootEntry = await last(pipe(
legacyGlobSource(input),
source => normaliseInput(source),
(source: any) => importer(source, blockstore, {
...unixfsImporterOptionsDefault,
hasher: hasher || unixfsImporterOptionsDefault.hasher,
maxChunkSize: maxChunkSize || unixfsImporterOptionsDefault.maxChunkSize,
maxChildrenPerNode: maxChildrenPerNode || unixfsImporterOptionsDefault.maxChildrenPerNode,
wrapWithDirectory: wrapWithDirectory === false ? false : unixfsImporterOptionsDefault.wrapWithDirectory,
rawLeaves: rawLeaves == null ? unixfsImporterOptionsDefault.rawLeaves : rawLeaves
})
))
} catch (err) {
// tslint:disable-next-line:no-console
console.log("Error while importing")
}

if (!rootEntry || !rootEntry.cid) {
throw new Error('given input could not be parsed correctly')
Expand All @@ -52,8 +65,13 @@ export async function packToStream ({ input, writable, blockstore: userBlockstor
const { writer, out } = await CarWriter.create([root])
Readable.from(out).pipe(writable)

for await (const block of blockstore.blocks()) {
await writer.put(block)
try {
for await (const block of blockstore.blocks()) {
await writer.put(block)
}
} catch (err) {
// tslint:disable-next-line:no-console
console.log("Error while writing blocks")
}

await writer.close()
Expand Down Expand Up @@ -86,3 +104,10 @@ async function * legacyGlobSource (input: Iterable<string> | AsyncIterable<strin
}
}
}

async function handleSignal(signal: NodeJS.Signals, blockstore: Blockstore) {
// tslint:disable-next-line:no-console
console.log(`Received ${signal} signal, cleaning up...`)
fs.rmSync((blockstore as FsBlockStore).path, {recursive: true, force: true})
fs.rmSync((blockstore as FsBlockStore).path.replace(".tmp", ".car.tmp").replace("/tmp/", `${process.cwd()}/.`), {recursive: true, force: true})
}
4 changes: 2 additions & 2 deletions src/unpack/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { FsBlockStore } from '../blockstore/fs'
import toIterable from 'stream-to-it'

import { unpack, unpackStream } from './index'
import { Blockstore } from '../blockstore/index'
import { Blockstore } from '../blockstore'

// Node only, read a car from fs, write files to fs
export async function unpackToFs ({input, roots, output}: {input: string, roots?: CID[], output?: string}) {
Expand All @@ -22,7 +22,7 @@ export async function unpackToFs ({input, roots, output}: {input: string, roots?

// Node only, read a stream, write files to fs
export async function unpackStreamToFs ({input, roots, output, blockstore: userBlockstore}: {input: AsyncIterable<Uint8Array>, roots?: CID[], output?: string, blockstore?: Blockstore}) {
const blockstore = userBlockstore ? userBlockstore : new FsBlockStore()
const blockstore = userBlockstore ? userBlockstore : new FsBlockStore(output ? output : (roots ? roots[0].toString() : "output"))
await writeFiles(unpackStream(input, { roots, blockstore }), output)
if (!userBlockstore) {
await blockstore.close()
Expand Down