Version: FireSquid

Filesystem store

Overview

The Squid SDK provides append-only Store interface implementations for saving the data generated by squids to files. The store is primarily designed for offline analytics. It supports CSV and Parquet files, persisted either locally or to S3-compatible storage. See the list of Packages below.

The file-based stores provide the same failover guarantees as Postgres-based stores: after a restart or shutdown, the store rolls back to the last successfully persisted state. The stores always partition the produced dataset along the block height dimension, even when block height is not explicitly present in the data schema. The number of blocks in a partition varies, as partition files are written either when the in-memory buffer reaches a specified byte size or when the buffered data becomes old enough. Thus, depending on the use case, one may choose a suitable trade-off between the lag of the indexed data and the number of partition files produced by the store. See Filesystem Syncs and Dataset Partitioning for details and configuration options.

The core component used for storing squid data to files is the Database class from @subsquid/file-store. Using it is as simple as constructing an instance and passing it to processor.run() as the first argument. This results in ctx.store exposing table writers that can be used to store rows of data.

Example

Save all ERC20 Transfer events retrieved by an EVM processor to transfers.csv files:

import {EvmBatchProcessor} from '@subsquid/evm-processor'
import * as erc20abi from './abi/erc20'
import {Database, LocalDest} from '@subsquid/file-store'
import {Column, Table, Types} from '@subsquid/file-store-csv'

const processor = /* processor definition */

const dbOptions = {
  tables: {
    TransfersTable: new Table('transfers.csv', {
      from: Column(Types.String()),
      to: Column(Types.String()),
      value: Column(Types.Numeric())
    })
  },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10
}

processor.run(new Database(dbOptions), async (ctx) => {
  for (let c of ctx.blocks) {
    for (let i of c.items) {
      if (i.kind === 'evmLog') {
        let {from, to, value} = erc20abi.events.Transfer.decode(i.evmLog)
        ctx.store.TransfersTable.write({
          from,
          to,
          value: value.toBigInt()
        })
      }
    }
  }
})

The resulting ./data folder may look like this:

./data/
├── 0000000000-0007688959
│   └── transfers.csv
├── 0007688960-0007861589
│   └── transfers.csv
...
├── 0016753040-0016762029
│   └── transfers.csv
└── status.txt

Each of the folders here contains a little over 10 MB of data. status.txt contains the height of the last indexed block, 16762029 in this case.

Packages

@subsquid/file-store is the core package that contains the implementation of Database for filesystems. At least one file format add-on must be installed alongside it. Available formats:

  • CSV: supported via @subsquid/file-store-csv.
  • Parquet: supported via @subsquid/file-store-parquet.

Data in either of these formats can be written to

  • A local filesystem: Supported by @subsquid/file-store out of the box.
  • A bucket in an Amazon S3-compatible cloud: Supported via @subsquid/file-store-s3.
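
For example, a bucket destination can be configured roughly as follows. This is a minimal sketch assuming the FireSquid-era S3Dest constructor signature of (pathInBucket, bucketName, s3ClientOptions?) and credentials supplied via environment variables; consult the @subsquid/file-store-s3 page for the exact API.

import {Database} from '@subsquid/file-store'
import {S3Dest} from '@subsquid/file-store-s3'

// Sketch only: write the dataset to the 'transfers-data' folder of an S3 bucket.
// Bucket name, region, endpoint and credentials are read from environment variables.
const db = new Database({
  tables: { /* same Table definitions as in the example above */ },
  dest: new S3Dest(
    'transfers-data',                  // path inside the bucket
    process.env.S3_BUCKET_NAME!,       // bucket name
    {                                  // options for the underlying S3 client
      region: process.env.S3_REGION,
      endpoint: process.env.S3_ENDPOINT,
      credentials: {
        accessKeyId: process.env.S3_ACCESS_KEY_ID!,
        secretAccessKey: process.env.S3_SECRET_ACCESS_KEY!
      }
    }
  )
})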

Database Options

The constructor of the Database class from file-store accepts a configuration object as its only argument. Its format is as follows:

DatabaseOptions {
  tables: Record<string, Table>
  dest: Dest
  chunkSizeMb?: number
  syncIntervalBlocks?: number
  hooks?: DatabaseHooks<Dest>
}

Here,

  • Table is an interface for classes that make table writers, objects that convert in-memory tabular data into format-specific file contents. An implementation of Table is available for every file format supported by file-store. Consult pages about specific output formats to find out how to define Tables.
  • tables is a mapping from developer-defined string handles to Table instances. A table writer will be created for each Table in this mapping. It will be exposed at ctx.store.<tableHandle>.
  • dest is an instance of Dest, an interface for classes that take the properly formatted file contents and write them onto a particular filesystem. An implementation of Dest is available for every filesystem supported by file-store. For local filesystems use the LocalDest class from the @subsquid/file-store package and supply new LocalDest(outputDirectoryName) here. For other targets consult documentation pages specific to your filesystem choice.
  • chunkSizeMb, syncIntervalBlocks and hooks are optional parameters that tune the behavior of the dataset partitioning algorithm.

Table Writer Interface

For each Table supplied via the tables field of the constructor argument, Database adds a table writer property to ctx.store. The writer is exposed at ctx.store.<tableHandle>, where <tableHandle> is the key of the Table instance in the tables mapping. It has the following methods:

ctx.store.<tableHandle>.write(record: T)
ctx.store.<tableHandle>.writeMany(records: T[])

Here, T is a Table-specific data row type. See the documentation pages on support for specific file formats for details.

These synchronous methods add rows of data to an in-memory buffer and perform no actual filesystem writes. Instead, the write happens automatically when a new dataset partition is created. The methods return the table writer instance and can be chained.
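
For instance, rows can be buffered one at a time or in bulk, and the calls can be chained. A sketch using the TransfersTable handle from the example above, with placeholder values:

// Both calls only append rows to the in-memory buffer; no files are written yet
ctx.store.TransfersTable
  .write({from: '0xaaaa', to: '0xbbbb', value: 100n})  // placeholder addresses and amounts
  .writeMany([
    {from: '0xbbbb', to: '0xcccc', value: 200n},
    {from: '0xcccc', to: '0xdddd', value: 300n}
  ])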

Filesystem Syncs and Dataset Partitioning

As indexing proceeds, the processor keeps appending new data to the in-memory buffer of Database. At the end of each batch, Database decides whether to write a new dataset partition. A partition is written when either

  • the amount of data stored in the buffer across all tables exceeds chunkSizeMb (20 MB by default), or
  • (1) syncIntervalBlocks is finite (it is infinite by default), (2) the blockchain head has been reached and (3) there is at least one row of data in the buffer, or
  • (1) syncIntervalBlocks is finite, (2) at least syncIntervalBlocks blocks have been processed since the last write and (3) there is at least one row of data in the buffer.

This approach keeps the number of files at a reasonable level while ensuring that the dataset stays up to date, and it does so for both high and low data rates:

  • For high, stable data rates (e.g. events from the USDC contract), it suffices to keep syncIntervalBlocks at the default value (infinity). Once the processor catches up with the blockchain, it will keep producing partitions of roughly the same size at block intervals much shorter than the full chain height, thus keeping the dataset up to date. The balance between dataset latency and the number of files is tuned with the chunkSizeMb parameter.
  • For low data rates (e.g. LiquidationCall events from the AAVE V2 LendingPool, emitted once every few hours) syncIntervalBlocks lets you govern the dataset latency directly. Once the processor reaches the blockchain head, it keeps the dataset up to date to within syncIntervalBlocks blocks behind the chain. The absence of new partitions beyond that range has a clear interpretation: no new data is available.
Warning

If syncIntervalBlocks is kept at its default value and the data rate is low, the resulting dataset may never catch up with the chain in any practical sense. In extreme cases the processor may write no data at all. For example, all the aforementioned LiquidationCall events fit into a CSV file of less than 20 MB. With the default values of syncIntervalBlocks and chunkSizeMb the buffer never reaches the threshold size and no data is written to the filesystem.
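
A simple way to bound the latency in such cases is to set syncIntervalBlocks explicitly. A sketch with illustrative values; the table definitions are the same as in the example above:

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* Table definitions as in the example above */ },
  dest: new LocalDest('./data'),
  chunkSizeMb: 10,
  // Write a partition once at least 500 blocks have passed since the last write
  // and there is buffered data, bounding the dataset lag to roughly 500 blocks
  syncIntervalBlocks: 500
})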

By default, Database maintains a record of the syncing progress in the status.txt file. When a processor using a Database instance starts, it calls the onConnect() function, which reads the highest reached block from status.txt on the target filesystem and returns -1 if the file does not exist. Syncing then resumes from the next block (the genesis block if status.txt does not exist).

The syncing status record is updated every time a new partition is written to the dataset: the processor calls onFlush(), which overwrites status.txt with the new highest reached block.

As a result, the integrity of the dataset is guaranteed with respect to the blockchain history up to the block recorded in status.txt.
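
Since status.txt holds just that block height, downstream consumers can use it to check how fresh the dataset is. A sketch, assuming the local ./data destination from the example above:

import * as fs from 'fs'

// Read the height of the last indexed block from the store's status file
const lastIndexedBlock = Number(fs.readFileSync('./data/status.txt', 'utf-8').trim())
console.log(`The dataset is complete up to block ${lastIndexedBlock}`)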

The onConnect() and onFlush() functions can be overridden via the hooks field of the constructor argument. To do that, set the field to an object of the following shape:

DatabaseHooks<Dest> {
  onConnect(dest: Dest): Promise<number>
  onFlush(dest: Dest, range: {from: number, to: number}, isHead: boolean): Promise<void>
}

Parameters:

  • dest: the Dest object used by Database. Use it to access the filesystem.
  • range: the range of blocks covered by the current dataset partition.
  • isHead: true if the blockchain head has been reached.

An example of using hooks can be found here.
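
For illustration, here is a minimal sketch of custom hooks that keep the progress record in a JSON file instead of status.txt. It assumes that the Dest implementations expose exists(), readFile() and writeFile() helpers; check the Dest interface of your file-store version before relying on them.

import {Database, LocalDest} from '@subsquid/file-store'

const db = new Database({
  tables: { /* Table definitions as in the example above */ },
  dest: new LocalDest('./data'),
  hooks: {
    // Read the last indexed height from status.json, or report a fresh start with -1
    async onConnect(dest) {
      if (await dest.exists('status.json')) {
        let {height} = JSON.parse(await dest.readFile('status.json'))
        return height
      }
      return -1
    },
    // Record the top block of the partition that was just written, plus the head flag
    async onFlush(dest, range, isHead) {
      await dest.writeFile('status.json', JSON.stringify({height: range.to, isHead}))
    }
  }
})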