feat(official-bots): implement king-relative (HalfKP) encoding in NNUE (NCS-109) (#80)

Co-authored-by: Janis Eccarius <eccariusjanis@gmail.com>
Reviewed-on: #80
This commit was merged in pull request #80.
This commit is contained in:
2026-06-24 19:33:12 +02:00
parent 7372867a82
commit 44f376f032
5 changed files with 165 additions and 103 deletions
+47 -21
View File
@@ -53,6 +53,11 @@ class NNUEDataset(Dataset):
eval_val = self.evals[idx]
features = fen_to_features(fen)
# Board is flipped for Black-to-move in fen_to_features; negate eval
# so the label still means "good for the side shown as White after flip"
if ' b ' in fen:
eval_val = -eval_val
# Use evaluation as-is if normalized, otherwise apply sigmoid scaling
if self.is_normalized:
target = torch.tensor(eval_val, dtype=torch.float32)
@@ -61,38 +66,59 @@ class NNUEDataset(Dataset):
return features, target
# King-relative (HalfKP) encoding: two perspectives, one per side's king.
# Each piece is encoded as: kingSq * 768 + pieceIdx * 64 + sq
# White perspective uses white king square; black perspective uses black king square.
# Total input dimension = 2 × 64 × 12 × 64 = 98304.
_HALF_SIZE = 64 * 12 * 64 # 49152 features per perspective
INPUT_SIZE = _HALF_SIZE * 2 # 98304
_PIECE_TO_IDX = {
'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11,
}
def fen_to_features(fen):
"""Convert FEN to 768-dimensional binary feature vector."""
# Piece type to index: pawn=0, knight=1, bishop=2, rook=3, queen=4, king=5
piece_to_idx = {'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11}
features = torch.zeros(768, dtype=torch.float32)
"""Convert FEN to 98304-dim king-relative (HalfKP) feature vector.
For Black-to-move positions the board is mirrored (ranks flipped, colours
swapped) so the network always sees the position from the side-to-move's
perspective. The caller is responsible for negating the eval label to match.
"""
features = torch.zeros(INPUT_SIZE, dtype=torch.float32)
try:
board = chess.Board(fen)
# 12 piece types × 64 squares = 768
for square in chess.SQUARES:
piece = board.piece_at(square)
if piece is not None:
piece_char = piece.symbol()
if piece_char in piece_to_idx:
piece_idx = piece_to_idx[piece_char]
feature_idx = piece_idx * 64 + square
features[feature_idx] = 1.0
except:
# Perspective flip: present all positions as if White is to move
if board.turn == chess.BLACK:
board = board.mirror()
wk = board.king(chess.WHITE)
bk = board.king(chess.BLACK)
if wk is None or bk is None:
return features
for sq in chess.SQUARES:
piece = board.piece_at(sq)
if piece is None:
continue
pidx = _PIECE_TO_IDX[piece.symbol()]
# White-king perspective (indices 0 .. _HALF_SIZE-1)
features[wk * 768 + pidx * 64 + sq] = 1.0
# Black-king perspective (indices _HALF_SIZE .. INPUT_SIZE-1)
features[_HALF_SIZE + bk * 768 + pidx * 64 + sq] = 1.0
except Exception:
pass
return features
DEFAULT_HIDDEN_SIZES = [1536, 1024, 512, 256]
# Smaller hidden layers are appropriate: the L1 input is very sparse (~64 active
# features out of 98304) so the L1 itself is cheap to update incrementally; the
# larger capacity comes from the wider perspective encoding, not deeper layers.
DEFAULT_HIDDEN_SIZES = [512, 256, 128]
class NNUE(nn.Module):
"""NNUE neural network with configurable hidden layers.
Architecture: 768 → hidden_sizes[0] → ... → hidden_sizes[-1] → 1
Architecture: INPUT_SIZE → hidden_sizes[0] → ... → hidden_sizes[-1] → 1
Layer attributes follow the naming l1, l2, ..., lN so export.py can
infer the architecture directly from the state_dict.
"""
@@ -102,7 +128,7 @@ class NNUE(nn.Module):
if hidden_sizes is None:
hidden_sizes = DEFAULT_HIDDEN_SIZES
self.hidden_sizes = list(hidden_sizes)
sizes = [768] + self.hidden_sizes + [1]
sizes = [INPUT_SIZE] + self.hidden_sizes + [1]
num_hidden = len(self.hidden_sizes)
for i in range(num_hidden):
@@ -23,9 +23,9 @@ object EvaluationNNUE extends Evaluation:
nnue.copyAccumulator(parentPly, childPly)
override def pushAccumulator(childPly: Int, move: Move, parent: GameContext, child: GameContext): Unit =
// Use incremental updates, but recompute from scratch every 10 plies to prevent accumulation errors
// Recompute every 10 plies to prevent floating-point drift; king moves always recompute internally
if childPly % 10 == 0 then nnue.recomputeAccumulator(childPly, child.board)
else nnue.pushAccumulator(childPly, move, parent.board)
else nnue.pushAccumulator(childPly, move, parent.board, child.board)
override def evaluateAccumulator(ply: Int, context: GameContext, hash: Long): Int =
nnue.evaluateAtPlyWithValidation(ply, context.turn, hash, context.board)
@@ -1,17 +1,17 @@
package de.nowchess.bot.bots.nnue
import de.nowchess.api.board.{Board, Color, File, Piece, PieceType, Square}
import de.nowchess.api.board.{Board, Color, Piece, PieceType, Square}
import de.nowchess.api.game.GameContext
import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
class NNUE(model: NbaiModel):
private val featureSize = model.layers(0).inputSize
private val HALF_SIZE = 49152 // 64 king-squares × 12 piece-types × 64 piece-squares
private val featureSize = model.layers(0).inputSize // 98304 (= HALF_SIZE * 2) for king-relative
private val accSize = model.layers(0).outputSize
private val validateAccum = sys.env.contains("NNUE_VALIDATE") // Enable with NNUE_VALIDATE=1
private val validateAccum = sys.env.contains("NNUE_VALIDATE")
// Column-major L1 weights for cache-friendly sparse & incremental updates.
// l1WeightsT(featureIdx * accSize + outputIdx) = l1Weights(outputIdx * featureSize + featureIdx)
// Column-major L1 weights: l1WeightsT(featureIdx * accSize + outputIdx)
private val l1WeightsT: Array[Float] =
val w = model.weights(0).weights
val t = new Array[Float](featureSize * accSize)
@@ -23,7 +23,6 @@ class NNUE(model: NbaiModel):
private val MAX_PLY = 128
private val l1Stack: Array[Array[Float]] = Array.fill(MAX_PLY + 1)(new Array[Float](accSize))
// Shared evaluation buffers: index i holds the output of layers(i) (all except the scalar output layer).
private val evalBuffers: Array[Array[Float]] = model.layers.init.map(l => new Array[Float](l.outputSize))
// ── Eval cache ───────────────────────────────────────────────────────────
@@ -36,9 +35,29 @@ class NNUE(model: NbaiModel):
private def squareNum(sq: Square): Int = sq.rank.ordinal * 8 + sq.file.ordinal
private def featureIndex(piece: Piece, sqNum: Int): Int =
val colorOffset = if piece.color == Color.White then 6 else 0
(colorOffset + piece.pieceType.ordinal) * 64 + sqNum
// Mirror square vertically (rank 0 ↔ rank 7) for the perspective flip
private def flipSqNum(sqNum: Int): Int = (7 - sqNum / 8) * 8 + sqNum % 8
private def pieceIdx(piece: Piece): Int =
if piece.color == Color.White then 6 + piece.pieceType.ordinal else piece.pieceType.ordinal
// White-king perspective: index in [0, HALF_SIZE)
private def featureIdxWhite(piece: Piece, sqNum: Int, wkSq: Int): Int =
wkSq * 768 + pieceIdx(piece) * 64 + sqNum
// Black-king perspective: index in [HALF_SIZE, featureSize)
private def featureIdxBlack(piece: Piece, sqNum: Int, bkSq: Int): Int =
HALF_SIZE + bkSq * 768 + pieceIdx(piece) * 64 + sqNum
private def wkSqOf(board: Board): Int =
board.pieces
.collectFirst { case (sq, p) if p.pieceType == PieceType.King && p.color == Color.White => squareNum(sq) }
.getOrElse(0)
private def bkSqOf(board: Board): Int =
board.pieces
.collectFirst { case (sq, p) if p.pieceType == PieceType.King && p.color == Color.Black => squareNum(sq) }
.getOrElse(0)
private def addColumn(l1Pre: Array[Float], featureIdx: Int): Unit =
val offset = featureIdx * accSize
@@ -48,92 +67,96 @@ class NNUE(model: NbaiModel):
val offset = featureIdx * accSize
for i <- 0 until accSize do l1Pre(i) -= l1WeightsT(offset + i)
private def addPiece(l1: Array[Float], piece: Piece, sqNum: Int, wkSq: Int, bkSq: Int): Unit =
addColumn(l1, featureIdxWhite(piece, sqNum, wkSq))
addColumn(l1, featureIdxBlack(piece, sqNum, bkSq))
private def removePiece(l1: Array[Float], piece: Piece, sqNum: Int, wkSq: Int, bkSq: Int): Unit =
subtractColumn(l1, featureIdxWhite(piece, sqNum, wkSq))
subtractColumn(l1, featureIdxBlack(piece, sqNum, bkSq))
// ── Accumulator init ─────────────────────────────────────────────────────
def initAccumulator(board: Board): Unit =
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, l1Stack(0), 0, accSize)
for (sq, piece) <- board.pieces do addColumn(l1Stack(0), featureIndex(piece, squareNum(sq)))
for (sq, piece) <- board.pieces do addPiece(l1Stack(0), piece, squareNum(sq), wkSq, bkSq)
// ── Accumulator push (incremental updates) ───────────────────────────────
def pushAccumulator(childPly: Int, move: Move, board: Board): Unit =
def pushAccumulator(childPly: Int, move: Move, parentBoard: Board, childBoard: Board): Unit =
System.arraycopy(l1Stack(childPly - 1), 0, l1Stack(childPly), 0, accSize)
val l1 = l1Stack(childPly)
move.moveType match
case MoveType.Normal(_) => applyNormalDelta(l1, move, board)
case MoveType.EnPassant => applyEnPassantDelta(l1, move, board)
case MoveType.CastleKingside | MoveType.CastleQueenside => applyCastleDelta(l1, move, board)
case MoveType.Promotion(p) => applyPromotionDelta(l1, move, p, board)
if isKingMove(move, parentBoard) then recomputeAccumulatorInto(l1Stack(childPly), childBoard)
else applyNonKingDelta(l1Stack(childPly), move, parentBoard)
private def isKingMove(move: Move, board: Board): Boolean =
move.moveType == MoveType.CastleKingside ||
move.moveType == MoveType.CastleQueenside ||
board.pieceAt(move.from).exists(_.pieceType == PieceType.King)
def copyAccumulator(parentPly: Int, childPly: Int): Unit =
System.arraycopy(l1Stack(parentPly), 0, l1Stack(childPly), 0, accSize)
def recomputeAccumulator(ply: Int, board: Board): Unit =
System.arraycopy(model.weights(0).bias, 0, l1Stack(ply), 0, accSize)
for (sq, piece) <- board.pieces do addColumn(l1Stack(ply), featureIndex(piece, squareNum(sq)))
recomputeAccumulatorInto(l1Stack(ply), board)
private def recomputeAccumulatorInto(l1: Array[Float], board: Board): Unit =
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, l1, 0, accSize)
for (sq, piece) <- board.pieces do addPiece(l1, piece, squareNum(sq), wkSq, bkSq)
def validateAccumulator(ply: Int, board: Board): Boolean =
// Compute what L1 should be from scratch
val expectedL1 = new Array[Float](accSize)
System.arraycopy(model.weights(0).bias, 0, expectedL1, 0, accSize)
for (sq, piece) <- board.pieces do addColumn(expectedL1, featureIndex(piece, squareNum(sq)))
// Compare with actual L1
val expected = new Array[Float](accSize)
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, expected, 0, accSize)
for (sq, piece) <- board.pieces do addPiece(expected, piece, squareNum(sq), wkSq, bkSq)
val actual = l1Stack(ply)
val maxError =
(0 until accSize).foldLeft(0f) { (currentMax, i) =>
val error = math.abs(actual(i) - expectedL1(i))
math.max(currentMax, error)
}
(0 until accSize).forall(i => math.abs(actual(i) - expected(i)) < 0.001f)
maxError < 0.001f // Allow small floating-point errors
// ── Non-king incremental deltas ──────────────────────────────────────────
private def applyNormalDelta(l1: Array[Float], move: Move, board: Board): Unit =
// Extract source and destination square indices early
val fromNum = squareNum(move.from)
val toNum = squareNum(move.to)
private def applyNonKingDelta(l1: Array[Float], move: Move, board: Board): Unit =
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
move.moveType match
case MoveType.Normal(_) => applyNormalDelta(l1, move, board, wkSq, bkSq)
case MoveType.EnPassant => applyEnPassantDelta(l1, move, board, wkSq, bkSq)
case MoveType.Promotion(p) => applyPromotionDelta(l1, move, p, board, wkSq, bkSq)
case _ => () // king moves handled before this point
// Get the moving piece
private def applyNormalDelta(l1: Array[Float], move: Move, board: Board, wkSq: Int, bkSq: Int): Unit =
board.pieceAt(move.from).foreach { mover =>
subtractColumn(l1, featureIndex(mover, fromNum))
// If there's a capture, subtract the captured piece
board.pieceAt(move.to).foreach { cap =>
subtractColumn(l1, featureIndex(cap, toNum))
}
// Add the piece to its new location
addColumn(l1, featureIndex(mover, toNum))
val fromNum = squareNum(move.from)
val toNum = squareNum(move.to)
removePiece(l1, mover, fromNum, wkSq, bkSq)
board.pieceAt(move.to).foreach(cap => removePiece(l1, cap, toNum, wkSq, bkSq))
addPiece(l1, mover, toNum, wkSq, bkSq)
}
private def applyEnPassantDelta(l1: Array[Float], move: Move, board: Board): Unit =
private def applyEnPassantDelta(l1: Array[Float], move: Move, board: Board, wkSq: Int, bkSq: Int): Unit =
board.pieceAt(move.from).foreach { pawn =>
val capturedSq = Square(move.to.file, move.from.rank)
subtractColumn(l1, featureIndex(pawn, squareNum(move.from)))
board.pieceAt(capturedSq).foreach(cap => subtractColumn(l1, featureIndex(cap, squareNum(capturedSq))))
addColumn(l1, featureIndex(pawn, squareNum(move.to)))
removePiece(l1, pawn, squareNum(move.from), wkSq, bkSq)
board.pieceAt(capturedSq).foreach(cap => removePiece(l1, cap, squareNum(capturedSq), wkSq, bkSq))
addPiece(l1, pawn, squareNum(move.to), wkSq, bkSq)
}
private def applyCastleDelta(l1: Array[Float], move: Move, board: Board): Unit =
board.pieceAt(move.from).foreach { king =>
val rank = move.from.rank
val kingside = move.moveType == MoveType.CastleKingside
val (rookFrom, rookTo) =
if kingside then (Square(File.H, rank), Square(File.F, rank))
else (Square(File.A, rank), Square(File.D, rank))
val rook = Piece(king.color, PieceType.Rook)
subtractColumn(l1, featureIndex(king, squareNum(move.from)))
addColumn(l1, featureIndex(king, squareNum(move.to)))
subtractColumn(l1, featureIndex(rook, squareNum(rookFrom)))
addColumn(l1, featureIndex(rook, squareNum(rookTo)))
}
private def applyPromotionDelta(l1: Array[Float], move: Move, promo: PromotionPiece, board: Board): Unit =
private def applyPromotionDelta(
l1: Array[Float],
move: Move,
promo: PromotionPiece,
board: Board,
wkSq: Int,
bkSq: Int,
): Unit =
board.pieceAt(move.from).foreach { pawn =>
val toNum = squareNum(move.to)
subtractColumn(l1, featureIndex(pawn, squareNum(move.from)))
board.pieceAt(move.to).foreach(cap => subtractColumn(l1, featureIndex(cap, toNum)))
addColumn(l1, featureIndex(Piece(pawn.color, promotedType(promo)), toNum))
removePiece(l1, pawn, squareNum(move.from), wkSq, bkSq)
board.pieceAt(move.to).foreach(cap => removePiece(l1, cap, toNum, wkSq, bkSq))
addPiece(l1, Piece(pawn.color, promotedType(promo)), toNum, wkSq, bkSq)
}
private def promotedType(promo: PromotionPiece): PieceType = promo match
@@ -154,7 +177,6 @@ class NNUE(model: NbaiModel):
score
def evaluateAtPlyWithValidation(ply: Int, turn: Color, hash: Long, board: Board): Int =
// For debugging: validate that incremental accumulator matches recomputation
if validateAccum && ply > 0 && ply % 10 != 0 then
val isValid = validateAccumulator(ply, board)
if !isValid then System.err.println(s"WARNING: NNUE accumulator diverged at ply $ply")
@@ -206,9 +228,23 @@ class NNUE(model: NbaiModel):
private val legacyL1 = new Array[Float](accSize)
def evaluate(context: GameContext): Int =
// Match training: for Black-to-move positions, mirror the board (ranks flipped,
// colours swapped) so the model always sees from the side-to-move's perspective.
// The scoreFromOutput negation then converts back to White's absolute perspective.
val (wkSq, bkSq, pieces, turn) =
if context.turn == Color.Black then
val wk = flipSqNum(bkSqOf(context.board)) // flipped Black king → new "White" king
val bk = flipSqNum(wkSqOf(context.board)) // flipped White king → new "Black" king
val flipped = context.board.pieces.map { case (sq, p) =>
(sq, Piece(p.color.opposite, p.pieceType))
}
(wk, bk, flipped, Color.Black) // pass Black so scoreFromOutput negates the result
else (wkSqOf(context.board), bkSqOf(context.board), context.board.pieces, context.turn)
System.arraycopy(model.weights(0).bias, 0, legacyL1, 0, accSize)
for (sq, piece) <- context.board.pieces do addColumn(legacyL1, featureIndex(piece, squareNum(sq)))
runL2toOutput(legacyL1, context.turn)
for (sq, piece) <- pieces do
val sqNum = if turn == Color.Black then flipSqNum(squareNum(sq)) else squareNum(sq)
addPiece(legacyL1, piece, sqNum, wkSq, bkSq)
runL2toOutput(legacyL1, turn)
def benchmark(): Unit =
val context = GameContext.initial
@@ -85,17 +85,17 @@ class HybridBotTest extends AnyFunSuite with Matchers:
private val altMove = Move(Square(File.E, Rank.R2), Square(File.E, Rank.R3), MoveType.Normal())
private def vetoRules: RuleSet = new RuleSet:
private def fresh(ctx: GameContext): Boolean = ctx.moves.isEmpty
def candidateMoves(context: GameContext)(square: Square): List[Move] = Nil
def legalMoves(context: GameContext)(square: Square): List[Move] = Nil
def allLegalMoves(context: GameContext): List[Move] =
private def fresh(ctx: GameContext): Boolean = ctx.moves.isEmpty
def candidateMoves(context: GameContext)(square: Square): List[Move] = Nil
def legalMoves(context: GameContext)(square: Square): List[Move] = Nil
def allLegalMoves(context: GameContext): List[Move] =
if fresh(context) then List(mateMove, altMove) else Nil
def isCheck(context: GameContext): Boolean = false
def isCheckmate(context: GameContext): Boolean = context.moves.lastOption.contains(mateMove)
def isStalemate(context: GameContext): Boolean = context.moves.lastOption.contains(altMove)
def isInsufficientMaterial(context: GameContext): Boolean = false
def isFiftyMoveRule(context: GameContext): Boolean = false
def isThreefoldRepetition(context: GameContext): Boolean = false
def isCheck(context: GameContext): Boolean = false
def isCheckmate(context: GameContext): Boolean = context.moves.lastOption.contains(mateMove)
def isStalemate(context: GameContext): Boolean = context.moves.lastOption.contains(altMove)
def isInsufficientMaterial(context: GameContext): Boolean = false
def isFiftyMoveRule(context: GameContext): Boolean = false
def isThreefoldRepetition(context: GameContext): Boolean = false
def applyMove(context: GameContext)(move: Move): GameContext =
context.copy(turn = context.turn.opposite, moves = context.moves :+ move)