Identity Resolution API: How to Resolve Entities Programmatically with Zingg

Engineering
April 23, 2026

Identity resolution is the process of recognizing that records across different systems represent the same real-world entity — the same customer, patient, supplier, or organization — and linking them together. Done well, it is the foundation that makes Customer 360, fraud detection, MDM, and reliable AI all possible.

This post is the practical implementation guide: how to use Zingg’s Python API to configure, train, and run identity resolution on your data, from installation through production-ready output. It covers the core API, then shows exactly how to adapt it for Databricks and Microsoft Fabric notebooks.

If you want the conceptual background first — why rules-based approaches fail at scale, what blocking is, why probabilistic matching beats exact matching for real-world data — start with The What and Why of Entity Resolution. For the strategic context on why identity resolution has become the prerequisite for agentic AI, the Learning from Data newsletter covers this from a founder’s perspective.

What the Python API Does

Zingg’s Python API lets you configure and execute identity resolution pipelines as PySpark programs. You define which fields to match and how, point Zingg at your data, and call the relevant phase. Zingg handles the ML model, the blocking that makes large-scale matching tractable, and the output format.

The full pipeline has five phases:

  • findTrainingData: samples informative candidate pairs from your data for labeling
  • label: interactive labeler; you mark pairs as match, non-match, or unsure
  • train: builds the ML model from your labeled pairs
  • match: applies the model to your full dataset and outputs clusters with scores
  • runIncremental: processes new/updated records against existing clusters (Enterprise)

For a new model, run the first four in sequence. Once the model is trained, only match (or, in Zingg Enterprise, runIncremental) runs in production.
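As a quick sketch of that lifecycle (the helper below is illustrative, not part of the Zingg API, and it simplifies away the fact that findTrainingData and label are typically repeated in cycles):

```python
def phases_to_run(model_trained: bool, enterprise: bool = False) -> list:
    """Illustrative helper: which Zingg phases to run, in order.

    A new model needs the full training loop; a trained model only
    needs match (or runIncremental on Zingg Enterprise).
    """
    if not model_trained:
        return ["findTrainingData", "label", "train", "match"]
    return ["runIncremental"] if enterprise else ["match"]

print(phases_to_run(model_trained=False))
# ['findTrainingData', 'label', 'train', 'match']
print(phases_to_run(model_trained=True))
# ['match']
```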

Installation

python -m pip install zingg

Zingg also ships as a Docker image for the quickest start:

docker pull zingg/zingg:0.5.0
docker run -it zingg/zingg:0.5.0 bash

Because Zingg Python programs are PySpark programs, they execute via the Zingg CLI rather than directly with python:

./scripts/zingg.sh --run my_zingg_program.py

Full installation guide: docs.zingg.ai/latest/stepbystep/installation


Core Python API

Step 1: Define Your Fields and Data

Every Zingg program starts with an Arguments object. You use it to define which fields to match, where your data lives, and where to write output.

from zingg.client import *
from zingg.pipes import *

# Initialise arguments
args = Arguments()

# Define fields and how to match them
rec_id   = FieldDefinition("rec_id",   "string", MatchType.DONT_USE)
fname    = FieldDefinition("fname",    "string", MatchType.FUZZY)
lname    = FieldDefinition("lname",    "string", MatchType.FUZZY)
stNo     = FieldDefinition("stNo",     "string", MatchType.NUMERIC)
add1     = FieldDefinition("add1",     "string", MatchType.ONLY_ALPHABETS_FUZZY)
add2     = FieldDefinition("add2",     "string", MatchType.FUZZY)
city     = FieldDefinition("city",     "string", MatchType.FUZZY)
state    = FieldDefinition("state",    "string", MatchType.EXACT)
areacode = FieldDefinition("areacode", "string", MatchType.EXACT)
dob      = FieldDefinition("dob",      "string", MatchType.FUZZY)
ssn      = FieldDefinition("ssn",      "string", MatchType.FUZZY)

args.setFieldDefinition([rec_id, fname, lname, stNo, add1, add2,
                         city, state, areacode, dob, ssn])

# Set model location
args.setModelId("100")
args.setZinggDir("models")
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

# Input data
schema = "rec_id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string"
inputPipe = CsvPipe("customers", "data/customers.csv", schema)
args.setData(inputPipe)

# Output location
outputPipe = CsvPipe("output", "/tmp/output")
args.setOutput(outputPipe)

Choosing match types

The MatchType you assign to each field directly shapes what the model learns:

  • FUZZY: names and free text; handles typos, abbreviations, and variations. Example: fname, lname, company_name
  • EXACT: categorical fields with no expected variation. Example: state_code, country
  • DONT_USE: fields needed in the output but not for matching. Example: rec_id, source_system
  • EMAIL: email addresses; matches only the part before the @. Example: email
  • NUMERIC: street numbers, apartment numbers. Example: stNo, apt_number
  • ONLY_ALPHABETS_FUZZY: addresses; ignores numbers and fuzzy-matches the street name. Example: add1 when stNo is a separate field
  • TEXT: descriptive fields; compares word overlap rather than character similarity. Example: product_description
  • NULL_OR_BLANK: sparse fields where nulls should be learned rather than assumed to match. Example: any high-null field

Full reference: docs.zingg.ai/latest/stepbystep/configuration/field-definitions

DONT_USE passes the field through to output without contributing to matching — the right choice for any ID column you need downstream but don’t want influencing the model.

Step 2: Build Training Data

Zingg uses active learning to select the most informative record pairs from your data. You do not build a training set manually — you label what Zingg shows you.

# Sample candidate pairs
options = ClientOptions([ClientOptions.PHASE, "findTrainingData"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Then run the interactive labeler:

options = ClientOptions([ClientOptions.PHASE, "label"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

The labeler displays each pair. Mark it:

  • 1 — match (same real-world entity)
  • 0 — non-match (different entities)
  • 2 — unsure

Run findTrainingData and label in cycles; each round adds to the cumulative training set. As a rule of thumb, aim for 40+ labeled matches and 40+ labeled non-matches for a production-quality model on datasets of around 100k records.
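That rule of thumb can be encoded as a simple stopping check between labeling rounds (a hypothetical helper, not part of Zingg; the target of 40 is the guideline above, not a hard API limit):

```python
def needs_more_labeling(n_matches: int, n_non_matches: int,
                        target: int = 40) -> bool:
    """True if another findTrainingData/label cycle is recommended,
    based on the ~40 matches / ~40 non-matches rule of thumb."""
    return n_matches < target or n_non_matches < target

# After each label round, check the accumulated counts:
if needs_more_labeling(n_matches=25, n_non_matches=50):
    print("Run another findTrainingData/label cycle")
```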

Labeling documentation: docs.zingg.ai/latest/stepbystep/createtrainingdata

Step 3: Train the Model

options = ClientOptions([ClientOptions.PHASE, "train"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

The model saves to zinggDir. You only need to train once per schema — the same model handles ongoing production runs without retraining, unless you change field definitions or want to improve accuracy with more labeled pairs.

Step 4: Run Identity Resolution

options = ClientOptions([ClientOptions.PHASE, "match"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Or combine training and matching in a single call:

options = ClientOptions([ClientOptions.PHASE, "trainMatch"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Understanding the Output

The match output adds three columns to your original data:

  • z_cluster: shared by all records representing the same entity
  • z_minScore: the lowest match probability within the cluster
  • z_maxScore: the highest match probability within the cluster

Matching is transitive — if A matches B and B matches C, all three land in the same cluster. Clusters where z_minScore is near 0 are worth manual review; clusters with size above 4–5 are worth inspecting for potential over-merging.
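Both review heuristics can be applied directly to the match output. A sketch using pandas, with illustrative thresholds (0.8 and 5 are assumptions to tune, not Zingg defaults):

```python
import pandas as pd

def clusters_to_review(df: pd.DataFrame, min_score: float = 0.8,
                       max_size: int = 5) -> pd.DataFrame:
    """Flag clusters whose weakest pairwise link is low-confidence,
    or whose size suggests possible over-merging."""
    stats = (df.groupby("z_cluster")
               .agg(size=("z_cluster", "size"),
                    min_score=("z_minScore", "min"))
               .reset_index())
    return stats[(stats["min_score"] < min_score) | (stats["size"] > max_size)]

matches = pd.DataFrame({
    "z_cluster":  [1, 1, 2, 2, 2],
    "z_minScore": [0.95, 0.95, 0.42, 0.42, 0.42],
})
print(clusters_to_review(matches))  # cluster 2 is flagged: weak minimum score
```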

Full scoring documentation: docs.zingg.ai/latest/scoring


Running on Databricks

Databricks is one of the most common environments for Zingg. The Python API is identical — the differences are in setup, file paths, and the labeling widget.

1. Cluster setup

Create a Databricks cluster using a current LTS runtime (tested with Runtime 15.4, Spark 3.5). Once it’s running:

  • Download the latest Zingg JAR from Zingg releases
  • In your cluster’s Libraries tab, choose Install New → Upload JAR and upload the file

2. Install the Python package

%pip install zingg
%pip install tabulate

3. Set up directories and helpers

# Change these to your preferred locations
zinggDir = "/models"
modelId  = "zingg_customer_match"

MARKED_DIR   = zinggDir + "/" + modelId + "/trainingData/marked/"
UNMARKED_DIR = zinggDir + "/" + modelId + "/trainingData/unmarked/"
MARKED_DIR_DBFS   = "/dbfs" + MARKED_DIR
UNMARKED_DIR_DBFS = "/dbfs" + UNMARKED_DIR

import pandas as pd
import numpy as np
from tabulate import tabulate
from ipywidgets import widgets
import base64
import pyspark.sql.functions as fn
from zingg.client import *
from zingg.pipes import *

def count_labeled_pairs(marked_pd):
    n_total    = len(np.unique(marked_pd['z_cluster']))
    n_positive = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster']))
    n_negative = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster']))
    return n_positive, n_negative, n_total

4. Configure arguments and load data

On Databricks, upload your CSV via the Data tab. The file path uses the DBFS /FileStore/tables/ prefix:

args = Arguments()
args.setModelId(modelId)
args.setZinggDir(zinggDir)

schema = "rec_id string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, dob string, ssn string"
inputPipe = CsvPipe("customers", "/FileStore/tables/customers.csv", schema)
args.setData(inputPipe)

outputPipe = CsvPipe("output", "/tmp/zingg_output")
args.setOutput(outputPipe)

# Field definitions (same pattern as the core API; match types tuned for this dataset)
rec_id = FieldDefinition("rec_id", "string", MatchType.DONT_USE)
fname  = FieldDefinition("fname",  "string", MatchType.FUZZY)
lname  = FieldDefinition("lname",  "string", MatchType.FUZZY)
stNo   = FieldDefinition("stNo",   "string", MatchType.FUZZY)
add1   = FieldDefinition("add1",   "string", MatchType.FUZZY)
add2   = FieldDefinition("add2",   "string", MatchType.FUZZY)
city   = FieldDefinition("city",   "string", MatchType.FUZZY)
state  = FieldDefinition("state",  "string", MatchType.FUZZY)
dob    = FieldDefinition("dob",    "string", MatchType.EXACT)
ssn    = FieldDefinition("ssn",    "string", MatchType.EXACT)

args.setFieldDefinition([rec_id, fname, lname, stNo, add1, add2, city, state, dob, ssn])
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.5)

5. Find training data, label, and save

# Find candidate pairs
options = ClientOptions([ClientOptions.PHASE, "findTrainingData"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

# Init the labeler
options = ClientOptions([ClientOptions.PHASE, "label"])
zingg = ZinggWithSpark(args, options)
zingg.init()

# Check how many pairs are available
candidate_pairs_pd = getPandasDfFromDs(zingg.getUnmarkedRecords())
if candidate_pairs_pd.shape[0] == 0:
    print("No unlabeled pairs found. Re-run findTrainingData.")
else:
    z_clusters = list(np.unique(candidate_pairs_pd['z_cluster']))
    print(f"{len(z_clusters)} candidate pairs found for labeling")

The full ipywidgets labeling UI is in the Databricks identity resolution notebook. After labeling, save with:

dbutils.fs.mkdirs(MARKED_DIR)
zingg.writeLabelledOutputFromPandas(candidate_pairs_pd, args)

marked_pd_df = getPandasDfFromDs(zingg.getMarkedRecords())
n_pos, n_neg, n_tot = count_labeled_pairs(marked_pd_df)
print(f"Accumulated {n_pos} matches and {n_neg} non-matches out of {n_tot} total.")

6. Train, match, and read output

# Train and match
options = ClientOptions([ClientOptions.PHASE, "trainMatch"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

# Read and display results
col_names = ["z_minScore", "z_maxScore", "z_cluster", "rec_id",
             "fname", "lname", "stNo", "add1", "add2", "city", "state", "dob", "ssn"]
outputDF = spark.read.csv("/tmp/zingg_output")
outputDF.toDF(*col_names).show(50)

Full Databricks guide with screenshots: Open Source Identity Resolution on Databricks for Customer 360


Running on Microsoft Fabric

Fabric runs Zingg in Lakehouse notebooks using the Spark runtime. The API is the same — the differences are file paths (ABFSS), directory creation (using notebookutils instead of dbutils), and checkpoint setup.

1. Set up your Fabric workspace

  • Create a workspace in Fabric and name it (e.g., “Zingg-Fabric”)
  • Create a Lakehouse inside the workspace to store your data and model outputs
  • Download the Zingg notebook from Zingg’s GitHub and import it to your workspace
  • Download the Zingg JAR from Zingg releases, create a new Environment, and upload the JAR under Custom Libraries

2. Install Zingg and set checkpoint

%pip install zingg
%pip show zingg  # verify installation

# Fabric-specific: set checkpoint directory in OneLake
spark.sparkContext.setCheckpointDir("Files")

3. Set up directories

On Fabric, paths use the ABFSS scheme pointing to your OneLake workspace. Replace the IDs with your own:

# Replace with your actual OneLake workspace and lakehouse IDs
zinggDir = "abfss://<workspace-id>@onelake.dfs.fabric.microsoft.com/<lakehouse-id>/Files"
modelId  = "zingg_customer_match"

MARKED_DIR   = zinggDir + "/" + modelId + "/trainingData/marked/"
UNMARKED_DIR = zinggDir + "/" + modelId + "/trainingData/unmarked/"

import pandas as pd
import numpy as np
from zingg.client import *
from zingg.pipes import *

def count_labeled_pairs(marked_pd):
    n_total     = len(np.unique(marked_pd['z_cluster']))
    n_positive  = len(np.unique(marked_pd[marked_pd['z_isMatch']==1]['z_cluster']))
    n_negative  = len(np.unique(marked_pd[marked_pd['z_isMatch']==0]['z_cluster']))
    n_uncertain = len(np.unique(marked_pd[marked_pd['z_isMatch']==2]['z_cluster']))
    return n_positive, n_negative, n_uncertain, n_total

4. Configure arguments and load data

args = Arguments()
args.setModelId(modelId)
args.setZinggDir(zinggDir)

schema = "rec_id string, fname string, lname string, stNo string, add1 string, add2 string, city string, areacode string, state string, dob string, ssn string"

input_path  = zinggDir + "/Test.csv"
output_path = zinggDir + "/Output/" + modelId

inputPipe  = CsvPipe("inputpipe",    input_path,  schema)
outputPipe = CsvPipe("resultOutput", output_path)

args.setData(inputPipe)
args.setOutput(outputPipe)

rec_id   = FieldDefinition("rec_id",   "string", MatchType.DONT_USE)
fname    = FieldDefinition("fname",    "string", MatchType.FUZZY)
lname    = FieldDefinition("lname",    "string", MatchType.FUZZY)
stNo     = FieldDefinition("stNo",     "string", MatchType.FUZZY)
add1     = FieldDefinition("add1",     "string", MatchType.FUZZY)
add2     = FieldDefinition("add2",     "string", MatchType.FUZZY)
city     = FieldDefinition("city",     "string", MatchType.FUZZY)
areacode = FieldDefinition("areacode", "string", MatchType.FUZZY)
state    = FieldDefinition("state",    "string", MatchType.FUZZY)
dob      = FieldDefinition("dob",      "string", MatchType.EXACT)
ssn      = FieldDefinition("ssn",      "string", MatchType.EXACT)

args.setFieldDefinition([rec_id, fname, lname, stNo, add1, add2,
                         city, areacode, state, dob, ssn])
args.setNumPartitions(4)
args.setLabelDataSampleSize(0.4)

If findTrainingData is taking too long, reduce labelDataSampleSize by a factor of 10 or more and retry.

5. Find training data, label, and save

# Find candidate pairs
options = ClientOptions([ClientOptions.PHASE, "findTrainingData"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

# Init labeler
options = ClientOptions([ClientOptions.PHASE, "label"])
zingg = ZinggWithSpark(args, options)
zingg.init()

# Retrieve pairs
candidate_pairs_pd = getPandasDfFromDs(zingg.getUnmarkedRecords())
if candidate_pairs_pd.shape[0] == 0:
    print("No unlabeled pairs found. Re-run findTrainingData.")
else:
    z_clusters = list(np.unique(candidate_pairs_pd['z_cluster']))
    print(f"{len(z_clusters)} candidate pairs found for labeling")

The full labeling widget is in the Fabric identity resolution guide. After labeling, save with notebookutils.fs (Fabric’s equivalent of dbutils.fs):

notebookutils.fs.mkdirs(MARKED_DIR)
zingg.writeLabelledOutputFromPandas(candidate_pairs_pd, args)

marked_pd_df = getPandasDfFromDs(zingg.getMarkedRecords())
n_pos, n_neg, n_uncer, n_tot = count_labeled_pairs(marked_pd_df)
print(f"{n_pos} matches, {n_neg} non-matches, {n_uncer} uncertain out of {n_tot} total")

6. Train, match, and read output

options = ClientOptions([ClientOptions.PHASE, "trainMatch"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

col_names = ["z_minScore", "z_maxScore", "z_cluster", "rec_id",
             "fname", "lname", "stNo", "add1", "add2",
             "city", "areacode", "state", "dob", "ssn"]
outputDF = spark.read.csv(output_path)
outputDF = outputDF.toDF(*col_names)
display(outputDF)

Full Fabric guide with screenshots: Step by Step Identity Resolution with Zingg on Fabric


Linking Across Multiple Source Systems

If your data comes from two separate systems, use the link phase to match records across sources rather than deduplicating within one:

crmPipe     = CsvPipe("crm",     "data/crm.csv",     schema)
billingPipe = CsvPipe("billing", "data/billing.csv", schema)
args.setData([crmPipe, billingPipe])

options = ClientOptions([ClientOptions.PHASE, "link"])
zingg = ZinggWithSpark(args, options)
zingg.initAndExecute()

Link documentation: docs.zingg.ai/latest/stepbystep/link

Incremental Identity Resolution (Enterprise)

For production systems where data changes continuously, Zingg Enterprise’s runIncremental phase processes only new and updated records against the existing identity graph, preserving everything else and keeping ZINGG_IDs stable throughout.

from zinggEC.enterprise.common.IncrementalArguments import *
from zinggEC.enterprise.common.epipes import *
from zinggEC.enterprise.common.EArguments import *
from zinggEC.enterprise.common.EFieldDefinition import EFieldDefinition
from zinggES.enterprise.spark.ESparkClient import *

# Base arguments — same field definitions as initial match
args = EArguments()
recId = EFieldDefinition("recId", "string", MatchType.DONT_USE)
recId.setPrimaryKey(True)
# ... remaining field definitions as above

args.setFieldDefinition([recId, ...])
args.setModelId("100")
args.setZinggDir("/tmp/models")

inputPipe = ECsvPipe("customers", "data/customers.csv", schema)
args.setData(inputPipe)
outputPipe = ECsvPipe("output", "/tmp/output")
outputPipe.setHeader("true")
args.setOutput(outputPipe)

# Configure incremental run — only the delta
incrArgs = IncrementalArguments()
incrArgs.setParentArgs(args)

incrPipe = ECsvPipe("customers_delta", "data/customers_delta.csv", schema)
incrArgs.setIncrementalData(incrPipe)

outputTmpPipe = ECsvPipe("output_tmp", "/tmp/zingg_incremental_tmp")
outputTmpPipe.setHeader("true")
incrArgs.setOutputTmp(outputTmpPipe)

options = ClientOptions([ClientOptions.PHASE, "runIncremental"])
zingg = EZingg(incrArgs, options)
zingg.initAndExecute()

For the engineering story behind incremental resolution, see the Zingg incremental flow post on the newsletter.

Incremental documentation: docs.zingg.ai/latest/stepbystep/runincremental


Platform Reference

  • Databricks: DBFS paths (/FileStore/tables/), dbutils.fs.mkdirs(). Guide: Full Databricks guide
  • Microsoft Fabric: ABFSS paths, notebookutils.fs.mkdirs(), checkpoint setup required. Guide: Full Fabric guide
  • Snowflake (Enterprise): native Snowpark execution, no Spark cluster needed. Guide: Snowflake product page
  • BigQuery: BigQueryPipe connector. Guide: BigQuery product page
  • AWS Glue / EMR: standard Spark execution. Guide: Cloud running docs

All platform step-by-step notebooks: zingg.ai/resources/guides


Get Started

docker pull zingg/zingg:0.5.0
docker run -it zingg/zingg:0.5.0 bash
# or
python -m pip install zingg

Zingg Enterprise (persistent ZINGG_ID, incremental flow, native Snowflake, deterministic matching): contact us


Further reading:

  • The What and Why of Entity Resolution
  • Deterministic vs. Probabilistic Matching: Why You Need Both
  • The ZINGG_ID: A Persistent Identifier for Your Entity Graph
  • Incremental Identity Resolution: Keeping Your Entity Graph Current
  • Learning from Data newsletter — founder POV on entity resolution as data infrastructure
