SubstrateBatchProcessor in action
An end-to-end, idiomatic usage of SubstrateBatchProcessor can be inspected in the squid-substrate-template and in more elaborate examples.
Here we highlight the key steps, putting together a configuration and a data handler definition to illustrate the concepts covered so far.
1. Configuration
See the Configuration section for more details.
import {lookupArchive} from "@subsquid/archive-registry"
import {SubstrateBatchProcessor} from "@subsquid/substrate-processor"

const processor = new SubstrateBatchProcessor()
    // set the data source
    .setDataSource({
        archive: lookupArchive("kusama")
    })
    // set subscriptions and define the data
    // to be fetched for each log item
    .addEvent('Balances.Transfer', {
        data: {
            event: {
                args: true,
                extrinsic: {
                    hash: true,
                    fee: true
                }
            }
        }
    })
    .addCall('Balances.transfer_keep_alive', {
        data: {
            call: {
                success: true,
                args: true,
                extrinsic: {
                    args: true
                }
            }
        }
    })
2. Define a custom data facade and extract normalized data from BatchContext
The following code snippet illustrates how the data is extracted and normalized into a user-defined facade interface, TransferData. Note how the array of Item objects is processed based on the kind and the name of each item, and that the resulting normalized data respects the canonical ordering of ctx.blocks and of block.items within each block.
Type-safe access and decoding of the call and event data is done with the help of the classes generated by a suitable typegen tool.
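import * as ss58 from "@subsquid/ss58"
import {
    BatchContext,
    BatchProcessorItem,
    SubstrateBatchProcessor
} from "@subsquid/substrate-processor"
import {Store, TypeormDatabase} from "@subsquid/typeorm-store"
// class generated by typegen; the path depends on the typegen configuration
import {BalancesTransferEvent} from "./types/events"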
// ... processor configuration from the previous step
type Item = BatchProcessorItem<typeof processor>
type Ctx = BatchContext<Store, Item>
// some normalized user-defined data
interface TransferData {
    id: string
    blockNumber: number
    timestamp: Date
    extrinsicHash?: string
    from: string
    to: string
    amount: bigint
    fee?: bigint
}

// extract and normalize
function getTransfers(ctx: Ctx): TransferData[] {
    let transfers: TransferData[] = []
    for (let block of ctx.blocks) {
        for (let item of block.items) {
            if (item.kind === "event" &&
                item.name === "Balances.Transfer") {
                // instantiate the class generated by typegen
                // to decode the item data
                let e = new BalancesTransferEvent(ctx, item.event)
                let rec: {from: Uint8Array, to: Uint8Array, amount: bigint}
                // normalize the historical versions of the data
                // using the version getters; read an `asVxxxx` getter
                // only after the matching `isVxxxx` check has passed
                if (e.isV1020) {
                    let [from, to, amount] = e.asV1020
                    rec = {from, to, amount}
                } else if (e.isV1050) {
                    let [from, to, amount] = e.asV1050
                    rec = {from, to, amount}
                } else {
                    rec = e.asV9130
                }
                // extract and normalize the `item.event` data
                transfers.push({
                    id: item.event.id,
                    blockNumber: block.header.height,
                    timestamp: new Date(block.header.timestamp),
                    extrinsicHash: item.event.extrinsic?.hash,
                    from: ss58.codec('kusama').encode(rec.from),
                    to: ss58.codec('kusama').encode(rec.to),
                    amount: rec.amount,
                    fee: item.event.extrinsic?.fee || 0n
                })
            }
            if (item.kind === "call" &&
                item.name === "Balances.transfer_keep_alive") {
                // extract and normalize the `item.call` data
                transfers.push({
                    /* some data manipulation on the `item.call` data */
                })
            }
        }
    }
    return transfers
}
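To try the two ideas from this step in isolation, here is a minimal, self-contained sketch. It does not use the real processor types: SketchBlock, SketchItem, TransferArgs, normalizeArgs and collectTransfers are hypothetical stand-ins invented for illustration, and the "tuple"/"struct" tags only mimic the way typegen classes expose historical versions of the same event. Under those assumptions, it shows filtering items by kind and name while keeping the block and item order, and collapsing two payload shapes into one record.

```typescript
// Hypothetical stand-ins for the processor types; NOT the real library API.
type TransferArgs =
    | {version: "tuple"; value: [string, string, bigint]}
    | {version: "struct"; value: {from: string; to: string; amount: bigint}}

type SketchItem =
    | {kind: "event"; name: string; id: string; args: TransferArgs}
    | {kind: "call"; name: string; id: string}

interface SketchBlock {
    height: number
    items: SketchItem[]
}

interface NormalizedTransfer {
    id: string
    blockNumber: number
    from: string
    to: string
    amount: bigint
}

// Collapse both payload shapes into a single record shape.
function normalizeArgs(args: TransferArgs): {from: string; to: string; amount: bigint} {
    if (args.version === "tuple") {
        const [from, to, amount] = args.value
        return {from, to, amount}
    }
    return args.value
}

// Walk blocks and items in the order given, so the output keeps that order.
function collectTransfers(blocks: SketchBlock[]): NormalizedTransfer[] {
    const out: NormalizedTransfer[] = []
    for (const block of blocks) {
        for (const item of block.items) {
            if (item.kind === "event" && item.name === "Balances.Transfer") {
                out.push({
                    id: item.id,
                    blockNumber: block.height,
                    ...normalizeArgs(item.args)
                })
            }
        }
    }
    return out
}

// Made-up sample data: two blocks, two payload versions, one unrelated call.
const sample: SketchBlock[] = [
    {
        height: 1,
        items: [
            {
                kind: "event",
                name: "Balances.Transfer",
                id: "1-0",
                args: {version: "tuple", value: ["alice", "bob", 10n]}
            },
            {kind: "call", name: "Balances.transfer_keep_alive", id: "1-1"}
        ]
    },
    {
        height: 2,
        items: [
            {
                kind: "event",
                name: "Balances.Transfer",
                id: "2-0",
                args: {version: "struct", value: {from: "carol", to: "dave", amount: 7n}}
            }
        ]
    }
]

const result = collectTransfers(sample)
```

Because the loops simply follow the input arrays, the output order matches the input order without any separate sorting step.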
3. Run the processor and store the transformed data into the target database
The snippet below assumes that we are using TypeormDatabase and that the database schema, together with the model entity types, has already been prepared. Consult [/basics/schema-file] for more details.
import {Store, TypeormDatabase} from "@subsquid/typeorm-store"
// an entity type generated from the schema file
import {Transfer} from "./model"

// ... processor configuration
// ... `getTransfers(Ctx)` definition

processor.run(new TypeormDatabase(), async ctx => {
    // get the normalized data from the context
    let transfersData = getTransfers(ctx)
    // an array of entity class instances
    let transfers: Transfer[] = []
    for (let t of transfersData) {
        let {id, blockNumber, timestamp, extrinsicHash, amount, fee} = t
        // join some extra data
        // (`accounts` and `getAccount()` are not defined in this snippet;
        // they are assumed to be a collection of account entities and
        // a helper that resolves an entity by address)
        let from = getAccount(accounts, t.from)
        let to = getAccount(accounts, t.to)
        // populate entity class instances
        // with the extracted data
        transfers.push(new Transfer({
            id,
            blockNumber,
            timestamp,
            extrinsicHash,
            from,
            to,
            amount,
            fee
        }))
    }
    // persist an array of entities
    // in a single statement
    await ctx.store.insert(transfers)
})
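The handler calls getAccount(accounts, ...) without showing either name. One plausible shape, assuming accounts is a Map keyed by account id, is a get-or-create lookup like the sketch below. The Account class here is a hypothetical stand-in for an entity generated from the schema file, and the helper body is an illustration rather than the template's actual code.

```typescript
// Hypothetical stand-in for an entity class generated from the schema file.
class Account {
    id: string
    constructor(id: string) {
        this.id = id
    }
}

// Return the cached Account for `id`, creating and caching it on first use,
// so that every transfer touching the same address shares one instance.
function getAccount(accounts: Map<string, Account>, id: string): Account {
    let account = accounts.get(id)
    if (account == null) {
        account = new Account(id)
        accounts.set(id, account)
    }
    return account
}

// Usage: repeated lookups for the same id return the same object.
const accounts = new Map<string, Account>()
const first = getAccount(accounts, "alice")
const second = getAccount(accounts, "alice")
const other = getAccount(accounts, "bob")
```

In a real squid the map would presumably be pre-populated with the accounts already present in the database, and any newly created accounts would need to be persisted before the transfers that reference them.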