Processor
The SubstrateProcessor is the main actor in transforming and loading on-chain data according to a pre-defined database model.

Overview

The Architecture explanation clarified the relationship between the Squid Archive and the Squid query node, and mentioned that raw chain data is decoded so that it is readily available for consumption.
The Substrate section explained what decoded data is, what it consists of, and what information it brings.
Next, the Typegen section explained how automated tools provide a way to conveniently wrap these entities with TypeScript objects.
The real Squid developer experience starts with defining your own data schema, modeling the Entities you want to keep tabs on, and tracking how on-chain information affects them.

Entities and Schema definition

Defining a schema, and specifically knowing which Entities to identify in it, requires a level of domain knowledge that is beyond the scope of this page. Refer to the related Recipe for operational guidance; here, the Squid template serves as the example.
In the template, the Account and HistoricalBalance Entities are defined in schema.graphql, and a TypeScript model has been automatically generated for each of them. These can be found in two files in src/model/generated/.
Although not central to the description of the Processor, this matters because these are the Entities affected by the code defined in the Processor itself. Most importantly, they are persisted in the database and made available to API clients via the GraphQL server.
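To make the shape of these Entities concrete, here is a simplified sketch of the two classes, inferred only from how the event handler on this page uses them (an id and a balance on Account; an id, account, balance and date on HistoricalBalance). The real generated files also carry ORM decorators and may define further fields; those are left out so the snippet runs on its own. Treat it as an illustration, not a copy of the generated code.

```typescript
// Simplified, decorator-free stand-ins for the generated models.
// Field names are taken from the handler shown on this page.
class Account {
    id!: string
    balance!: bigint

    constructor(props?: Partial<Account>) {
        Object.assign(this, props)
    }
}

class HistoricalBalance {
    id!: string
    account!: Account
    balance!: bigint
    date!: Date

    constructor(props?: Partial<HistoricalBalance>) {
        Object.assign(this, props)
    }
}

// Build an account and a snapshot of its balance at a point in time.
const alice = new Account({id: 'alice', balance: 10n})
const snapshot = new HistoricalBalance({
    id: 'event-1-to',
    account: alice,
    balance: alice.balance,
    date: new Date(0)
})
```

Passing a partial object to the constructor is what allows the handler to write new HistoricalBalance({...}) in a single expression.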

Processor customization

Processor customization starts with the processor.ts file, where a SubstrateProcessor is instantiated and configured.
It's worth noting that the Account and HistoricalBalance classes mentioned in the previous section are imported at the top of the file.
import * as ss58 from "@subsquid/ss58"
import {EventHandlerContext, Store, SubstrateProcessor} from "@subsquid/substrate-processor"
import {Account, HistoricalBalance} from "./model"
import {BalancesTransferEvent} from "./types/events"

const processor = new SubstrateProcessor('kusama_balances')

processor.setTypesBundle('kusama')
processor.setBatchSize(500)

processor.setDataSource({
    archive: 'https://kusama.indexer.gc.subsquid.io/v4/graphql',
    chain: 'wss://kusama-rpc.polkadot.io'
})
The SubstrateProcessor class accomplishes a few tasks:
  • set up and start a monitoring system
  • connect to the database (using environment variables for connection info)
  • start a loop that processes all incoming blocks from the data source in batches
    • while processing a batch, trigger all relevant hooks, event handlers, and extrinsic handlers for each block
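The loop described in the list above can be sketched as follows. This is a deliberately tiny, hypothetical stand-in: MiniProcessor, MiniBlock, MiniEvent and the method names are invented for illustration and are not part of @subsquid/substrate-processor. It skips monitoring, the database and extrinsic handlers, and the order shown (pre-hooks, then event handlers, then post-hooks) is an assumption about how such a loop is typically arranged.

```typescript
// Hypothetical, heavily simplified model of a batch-processing loop.
interface MiniEvent { name: string }
interface MiniBlock { height: number; events: MiniEvent[] }
type BlockHook = (block: MiniBlock) => Promise<void>
type EventHandler = (block: MiniBlock, event: MiniEvent) => Promise<void>

class MiniProcessor {
    private readonly batchSize: number
    private preHooks: BlockHook[] = []
    private postHooks: BlockHook[] = []
    private eventHandlers = new Map<string, EventHandler[]>()

    constructor(batchSize: number) {
        this.batchSize = batchSize
    }

    addPreHook(fn: BlockHook): void { this.preHooks.push(fn) }
    addPostHook(fn: BlockHook): void { this.postHooks.push(fn) }

    addEventHandler(name: string, fn: EventHandler): void {
        const handlers = this.eventHandlers.get(name) ?? []
        handlers.push(fn)
        this.eventHandlers.set(name, handlers)
    }

    // Walk the blocks batch by batch. Within each block run the pre-hooks,
    // then the handlers registered for each event name, then the post-hooks.
    async run(blocks: MiniBlock[]): Promise<void> {
        for (let i = 0; i < blocks.length; i += this.batchSize) {
            const batch = blocks.slice(i, i + this.batchSize)
            for (const block of batch) {
                for (const hook of this.preHooks) await hook(block)
                for (const event of block.events) {
                    for (const handler of this.eventHandlers.get(event.name) ?? []) {
                        await handler(block, event)
                    }
                }
                for (const hook of this.postHooks) await hook(block)
            }
        }
    }
}

// Register a handler by event name; events with no handler are skipped.
const mini = new MiniProcessor(500)
mini.addEventHandler('balances.Transfer', async (block, event) => {
    console.log(`block ${block.height}: ${event.name}`)
})
```

Keying handlers by event name is what gives the pub-sub feel: a handler only runs for the events it subscribed to, while hooks run for every block.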
What's more, the class exposes methods to attach custom functions as pre- and post-block hooks (comparable to how middleware processes requests in a web server), event handlers, and extrinsic handlers, which, as mentioned, are triggered when necessary. Here's an example:
processor.addEventHandler('balances.Transfer', async ctx => {
    // Decode the event and encode both addresses in Kusama's SS58 format
    let transfer = getTransferEvent(ctx)
    let tip = ctx.extrinsic?.tip || 0n
    let from = ss58.codec('kusama').encode(transfer.from)
    let to = ss58.codec('kusama').encode(transfer.to)

    // Debit the sender: the transferred amount plus any extrinsic tip.
    // getOrCreate is a helper that is not shown in this excerpt.
    let fromAcc = await getOrCreate(ctx.store, Account, from)
    fromAcc.balance = fromAcc.balance || 0n
    fromAcc.balance -= transfer.amount
    fromAcc.balance -= tip
    await ctx.store.save(fromAcc)

    // Credit the recipient
    const toAcc = await getOrCreate(ctx.store, Account, to)
    toAcc.balance = toAcc.balance || 0n
    toAcc.balance += transfer.amount
    await ctx.store.save(toAcc)

    // Record the sender's balance after the transfer
    await ctx.store.save(new HistoricalBalance({
        id: ctx.event.id + '-from',
        account: fromAcc,
        balance: fromAcc.balance,
        date: new Date(ctx.block.timestamp)
    }))

    // Record the recipient's balance after the transfer
    await ctx.store.save(new HistoricalBalance({
        id: ctx.event.id + '-to',
        account: toAcc,
        balance: toAcc.balance,
        date: new Date(ctx.block.timestamp)
    }))
})

processor.run()

interface TransferEvent {
    from: Uint8Array
    to: Uint8Array
    amount: bigint
}

// Normalize the event payload across runtime versions into one shape
function getTransferEvent(ctx: EventHandlerContext): TransferEvent {
    let event = new BalancesTransferEvent(ctx)
    if (event.isV1020) {
        let [from, to, amount] = event.asV1020
        return {from, to, amount}
    } else if (event.isV1050) {
        let [from, to, amount] = event.asV1050
        return {from, to, amount}
    } else {
        return event.asLatest
    }
}
This code attaches an asynchronous function to the processor that, much like a subscriber in a pub-sub system, is triggered whenever a 'balances.Transfer' event is encountered.
The business logic itself is outside the scope of this page. What is worth noting is that ctx is an EventHandlerContext (the type that getTransferEvent accepts), which carries not only the event, its block and, where present, its extrinsic, but also the Store, which in this case is the database. So when the following line is executed, the Account Entity is created or updated with the relevant information.
await ctx.store.save(fromAcc)
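The handler also calls getOrCreate, which does not appear in the excerpt. A minimal sketch of what such a helper could look like is given here, assuming a store that can look an entity up by id and save it. MiniStore and InMemoryStore are hypothetical stand-ins written for this example; they are not the Store exported by @subsquid/substrate-processor, whose API is richer.

```typescript
// Hypothetical minimal store interface, for illustration only.
interface MiniStore {
    get<T extends {id: string}>(ctor: new () => T, id: string): Promise<T | undefined>
    save<T extends {id: string}>(entity: T): Promise<void>
}

// In-memory stand-in for the database so the sketch can run on its own.
class InMemoryStore implements MiniStore {
    private rows = new Map<string, {id: string}>()

    async get<T extends {id: string}>(ctor: new () => T, id: string): Promise<T | undefined> {
        return this.rows.get(`${ctor.name}:${id}`) as T | undefined
    }

    async save<T extends {id: string}>(entity: T): Promise<void> {
        this.rows.set(`${entity.constructor.name}:${entity.id}`, entity)
    }
}

// Return the stored entity with this id, or a fresh, not yet saved instance.
async function getOrCreate<T extends {id: string}>(
    store: MiniStore,
    ctor: new () => T,
    id: string
): Promise<T> {
    let entity = await store.get(ctor, id)
    if (entity == null) {
        entity = new ctor()
        entity.id = id
    }
    return entity
}
```

Note that in this sketch a newly created entity is not persisted until save is called, which matches the handler above: it adjusts the balance first and only then calls ctx.store.save.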
The logic in getTransferEvent, and how it relies on the BalancesTransferEvent wrapper for the event, was described in the previous section; it is repeated here because the added context may clarify it further.