feat(official-bots): implement king-relative (HalfKP) encoding in NNUE (NCS-109) #82

Closed
Janis wants to merge 1 commits from feat/NCS-109-king-relative-encoding into main
5 changed files with 137 additions and 103 deletions
+35 -22
View File
@@ -61,38 +61,51 @@ class NNUEDataset(Dataset):
return features, target return features, target
# King-relative (HalfKP) encoding: two perspectives, one per side's king.
# Each piece is encoded as: kingSq * 768 + pieceIdx * 64 + sq
# White perspective uses white king square; black perspective uses black king square.
# Total input dimension = 2 × 64 × 12 × 64 = 98304.
_HALF_SIZE = 64 * 12 * 64 # 49152 features per perspective
INPUT_SIZE = _HALF_SIZE * 2 # 98304
_PIECE_TO_IDX = {
'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11,
}
def fen_to_features(fen): def fen_to_features(fen):
"""Convert FEN to 768-dimensional binary feature vector.""" """Convert FEN to 98304-dim king-relative (HalfKP) feature vector."""
# Piece type to index: pawn=0, knight=1, bishop=2, rook=3, queen=4, king=5 features = torch.zeros(INPUT_SIZE, dtype=torch.float32)
piece_to_idx = {'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11}
features = torch.zeros(768, dtype=torch.float32)
try: try:
board = chess.Board(fen) board = chess.Board(fen)
wk = board.king(chess.WHITE)
# 12 piece types × 64 squares = 768 bk = board.king(chess.BLACK)
for square in chess.SQUARES: if wk is None or bk is None:
piece = board.piece_at(square) return features
if piece is not None: for sq in chess.SQUARES:
piece_char = piece.symbol() piece = board.piece_at(sq)
if piece_char in piece_to_idx: if piece is None:
piece_idx = piece_to_idx[piece_char] continue
feature_idx = piece_idx * 64 + square pidx = _PIECE_TO_IDX[piece.symbol()]
features[feature_idx] = 1.0 # White-king perspective (indices 0 .. _HALF_SIZE-1)
except: features[wk * 768 + pidx * 64 + sq] = 1.0
# Black-king perspective (indices _HALF_SIZE .. INPUT_SIZE-1)
features[_HALF_SIZE + bk * 768 + pidx * 64 + sq] = 1.0
except Exception:
pass pass
return features return features
DEFAULT_HIDDEN_SIZES = [1536, 1024, 512, 256] # Smaller hidden layers are appropriate: the L1 input is very sparse (~64 active
# features out of 98304) so the L1 itself is cheap to update incrementally; the
# larger capacity comes from the wider perspective encoding, not deeper layers.
DEFAULT_HIDDEN_SIZES = [512, 256, 128]
class NNUE(nn.Module): class NNUE(nn.Module):
"""NNUE neural network with configurable hidden layers. """NNUE neural network with configurable hidden layers.
Architecture: 768 → hidden_sizes[0] → ... → hidden_sizes[-1] → 1 Architecture: INPUT_SIZE → hidden_sizes[0] → ... → hidden_sizes[-1] → 1
Layer attributes follow the naming l1, l2, ..., lN so export.py can Layer attributes follow the naming l1, l2, ..., lN so export.py can
infer the architecture directly from the state_dict. infer the architecture directly from the state_dict.
""" """
@@ -102,7 +115,7 @@ class NNUE(nn.Module):
if hidden_sizes is None: if hidden_sizes is None:
hidden_sizes = DEFAULT_HIDDEN_SIZES hidden_sizes = DEFAULT_HIDDEN_SIZES
self.hidden_sizes = list(hidden_sizes) self.hidden_sizes = list(hidden_sizes)
sizes = [768] + self.hidden_sizes + [1] sizes = [INPUT_SIZE] + self.hidden_sizes + [1]
num_hidden = len(self.hidden_sizes) num_hidden = len(self.hidden_sizes)
for i in range(num_hidden): for i in range(num_hidden):
@@ -23,9 +23,9 @@ object EvaluationNNUE extends Evaluation:
nnue.copyAccumulator(parentPly, childPly) nnue.copyAccumulator(parentPly, childPly)
override def pushAccumulator(childPly: Int, move: Move, parent: GameContext, child: GameContext): Unit = override def pushAccumulator(childPly: Int, move: Move, parent: GameContext, child: GameContext): Unit =
// Use incremental updates, but recompute from scratch every 10 plies to prevent accumulation errors // Recompute every 10 plies to prevent floating-point drift; king moves always recompute internally
if childPly % 10 == 0 then nnue.recomputeAccumulator(childPly, child.board) if childPly % 10 == 0 then nnue.recomputeAccumulator(childPly, child.board)
else nnue.pushAccumulator(childPly, move, parent.board) else nnue.pushAccumulator(childPly, move, parent.board, child.board)
override def evaluateAccumulator(ply: Int, context: GameContext, hash: Long): Int = override def evaluateAccumulator(ply: Int, context: GameContext, hash: Long): Int =
nnue.evaluateAtPlyWithValidation(ply, context.turn, hash, context.board) nnue.evaluateAtPlyWithValidation(ply, context.turn, hash, context.board)
@@ -1,17 +1,17 @@
package de.nowchess.bot.bots.nnue package de.nowchess.bot.bots.nnue
import de.nowchess.api.board.{Board, Color, File, Piece, PieceType, Square} import de.nowchess.api.board.{Board, Color, Piece, PieceType, Square}
import de.nowchess.api.game.GameContext import de.nowchess.api.game.GameContext
import de.nowchess.api.move.{Move, MoveType, PromotionPiece} import de.nowchess.api.move.{Move, MoveType, PromotionPiece}
class NNUE(model: NbaiModel): class NNUE(model: NbaiModel):
private val featureSize = model.layers(0).inputSize private val HALF_SIZE = 49152 // 64 king-squares × 12 piece-types × 64 piece-squares
private val featureSize = model.layers(0).inputSize // 98304 (= HALF_SIZE * 2) for king-relative
private val accSize = model.layers(0).outputSize private val accSize = model.layers(0).outputSize
private val validateAccum = sys.env.contains("NNUE_VALIDATE") // Enable with NNUE_VALIDATE=1 private val validateAccum = sys.env.contains("NNUE_VALIDATE")
// Column-major L1 weights for cache-friendly sparse & incremental updates. // Column-major L1 weights: l1WeightsT(featureIdx * accSize + outputIdx)
// l1WeightsT(featureIdx * accSize + outputIdx) = l1Weights(outputIdx * featureSize + featureIdx)
private val l1WeightsT: Array[Float] = private val l1WeightsT: Array[Float] =
val w = model.weights(0).weights val w = model.weights(0).weights
val t = new Array[Float](featureSize * accSize) val t = new Array[Float](featureSize * accSize)
@@ -23,7 +23,6 @@ class NNUE(model: NbaiModel):
private val MAX_PLY = 128 private val MAX_PLY = 128
private val l1Stack: Array[Array[Float]] = Array.fill(MAX_PLY + 1)(new Array[Float](accSize)) private val l1Stack: Array[Array[Float]] = Array.fill(MAX_PLY + 1)(new Array[Float](accSize))
// Shared evaluation buffers: index i holds the output of layers(i) (all except the scalar output layer).
private val evalBuffers: Array[Array[Float]] = model.layers.init.map(l => new Array[Float](l.outputSize)) private val evalBuffers: Array[Array[Float]] = model.layers.init.map(l => new Array[Float](l.outputSize))
// ── Eval cache ─────────────────────────────────────────────────────────── // ── Eval cache ───────────────────────────────────────────────────────────
@@ -36,9 +35,26 @@ class NNUE(model: NbaiModel):
private def squareNum(sq: Square): Int = sq.rank.ordinal * 8 + sq.file.ordinal private def squareNum(sq: Square): Int = sq.rank.ordinal * 8 + sq.file.ordinal
private def featureIndex(piece: Piece, sqNum: Int): Int = private def pieceIdx(piece: Piece): Int =
val colorOffset = if piece.color == Color.White then 6 else 0 if piece.color == Color.White then 6 + piece.pieceType.ordinal else piece.pieceType.ordinal
(colorOffset + piece.pieceType.ordinal) * 64 + sqNum
// White-king perspective: index in [0, HALF_SIZE)
private def featureIdxWhite(piece: Piece, sqNum: Int, wkSq: Int): Int =
wkSq * 768 + pieceIdx(piece) * 64 + sqNum
// Black-king perspective: index in [HALF_SIZE, featureSize)
private def featureIdxBlack(piece: Piece, sqNum: Int, bkSq: Int): Int =
HALF_SIZE + bkSq * 768 + pieceIdx(piece) * 64 + sqNum
private def wkSqOf(board: Board): Int =
board.pieces
.collectFirst { case (sq, p) if p.pieceType == PieceType.King && p.color == Color.White => squareNum(sq) }
.getOrElse(0)
private def bkSqOf(board: Board): Int =
board.pieces
.collectFirst { case (sq, p) if p.pieceType == PieceType.King && p.color == Color.Black => squareNum(sq) }
.getOrElse(0)
private def addColumn(l1Pre: Array[Float], featureIdx: Int): Unit = private def addColumn(l1Pre: Array[Float], featureIdx: Int): Unit =
val offset = featureIdx * accSize val offset = featureIdx * accSize
@@ -48,92 +64,96 @@ class NNUE(model: NbaiModel):
val offset = featureIdx * accSize val offset = featureIdx * accSize
for i <- 0 until accSize do l1Pre(i) -= l1WeightsT(offset + i) for i <- 0 until accSize do l1Pre(i) -= l1WeightsT(offset + i)
private def addPiece(l1: Array[Float], piece: Piece, sqNum: Int, wkSq: Int, bkSq: Int): Unit =
addColumn(l1, featureIdxWhite(piece, sqNum, wkSq))
addColumn(l1, featureIdxBlack(piece, sqNum, bkSq))
private def removePiece(l1: Array[Float], piece: Piece, sqNum: Int, wkSq: Int, bkSq: Int): Unit =
subtractColumn(l1, featureIdxWhite(piece, sqNum, wkSq))
subtractColumn(l1, featureIdxBlack(piece, sqNum, bkSq))
// ── Accumulator init ───────────────────────────────────────────────────── // ── Accumulator init ─────────────────────────────────────────────────────
def initAccumulator(board: Board): Unit = def initAccumulator(board: Board): Unit =
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, l1Stack(0), 0, accSize) System.arraycopy(model.weights(0).bias, 0, l1Stack(0), 0, accSize)
for (sq, piece) <- board.pieces do addColumn(l1Stack(0), featureIndex(piece, squareNum(sq))) for (sq, piece) <- board.pieces do addPiece(l1Stack(0), piece, squareNum(sq), wkSq, bkSq)
// ── Accumulator push (incremental updates) ─────────────────────────────── // ── Accumulator push (incremental updates) ───────────────────────────────
def pushAccumulator(childPly: Int, move: Move, board: Board): Unit = def pushAccumulator(childPly: Int, move: Move, parentBoard: Board, childBoard: Board): Unit =
System.arraycopy(l1Stack(childPly - 1), 0, l1Stack(childPly), 0, accSize) System.arraycopy(l1Stack(childPly - 1), 0, l1Stack(childPly), 0, accSize)
val l1 = l1Stack(childPly) if isKingMove(move, parentBoard) then recomputeAccumulatorInto(l1Stack(childPly), childBoard)
move.moveType match else applyNonKingDelta(l1Stack(childPly), move, parentBoard)
case MoveType.Normal(_) => applyNormalDelta(l1, move, board)
case MoveType.EnPassant => applyEnPassantDelta(l1, move, board) private def isKingMove(move: Move, board: Board): Boolean =
case MoveType.CastleKingside | MoveType.CastleQueenside => applyCastleDelta(l1, move, board) move.moveType == MoveType.CastleKingside ||
case MoveType.Promotion(p) => applyPromotionDelta(l1, move, p, board) move.moveType == MoveType.CastleQueenside ||
board.pieceAt(move.from).exists(_.pieceType == PieceType.King)
def copyAccumulator(parentPly: Int, childPly: Int): Unit = def copyAccumulator(parentPly: Int, childPly: Int): Unit =
System.arraycopy(l1Stack(parentPly), 0, l1Stack(childPly), 0, accSize) System.arraycopy(l1Stack(parentPly), 0, l1Stack(childPly), 0, accSize)
def recomputeAccumulator(ply: Int, board: Board): Unit = def recomputeAccumulator(ply: Int, board: Board): Unit =
System.arraycopy(model.weights(0).bias, 0, l1Stack(ply), 0, accSize) recomputeAccumulatorInto(l1Stack(ply), board)
for (sq, piece) <- board.pieces do addColumn(l1Stack(ply), featureIndex(piece, squareNum(sq)))
private def recomputeAccumulatorInto(l1: Array[Float], board: Board): Unit =
val wkSq = wkSqOf(board)
val bkSq = bkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, l1, 0, accSize)
for (sq, piece) <- board.pieces do addPiece(l1, piece, squareNum(sq), wkSq, bkSq)
def validateAccumulator(ply: Int, board: Board): Boolean = def validateAccumulator(ply: Int, board: Board): Boolean =
// Compute what L1 should be from scratch val expected = new Array[Float](accSize)
val expectedL1 = new Array[Float](accSize) val wkSq = wkSqOf(board)
System.arraycopy(model.weights(0).bias, 0, expectedL1, 0, accSize) val bkSq = bkSqOf(board)
for (sq, piece) <- board.pieces do addColumn(expectedL1, featureIndex(piece, squareNum(sq))) System.arraycopy(model.weights(0).bias, 0, expected, 0, accSize)
for (sq, piece) <- board.pieces do addPiece(expected, piece, squareNum(sq), wkSq, bkSq)
// Compare with actual L1
val actual = l1Stack(ply) val actual = l1Stack(ply)
val maxError = (0 until accSize).forall(i => math.abs(actual(i) - expected(i)) < 0.001f)
(0 until accSize).foldLeft(0f) { (currentMax, i) =>
val error = math.abs(actual(i) - expectedL1(i))
math.max(currentMax, error)
}
maxError < 0.001f // Allow small floating-point errors // ── Non-king incremental deltas ──────────────────────────────────────────
private def applyNormalDelta(l1: Array[Float], move: Move, board: Board): Unit = private def applyNonKingDelta(l1: Array[Float], move: Move, board: Board): Unit =
// Extract source and destination square indices early val wkSq = wkSqOf(board)
val fromNum = squareNum(move.from) val bkSq = bkSqOf(board)
val toNum = squareNum(move.to) move.moveType match
case MoveType.Normal(_) => applyNormalDelta(l1, move, board, wkSq, bkSq)
case MoveType.EnPassant => applyEnPassantDelta(l1, move, board, wkSq, bkSq)
case MoveType.Promotion(p) => applyPromotionDelta(l1, move, p, board, wkSq, bkSq)
case _ => () // king moves handled before this point
// Get the moving piece private def applyNormalDelta(l1: Array[Float], move: Move, board: Board, wkSq: Int, bkSq: Int): Unit =
board.pieceAt(move.from).foreach { mover => board.pieceAt(move.from).foreach { mover =>
subtractColumn(l1, featureIndex(mover, fromNum)) val fromNum = squareNum(move.from)
val toNum = squareNum(move.to)
// If there's a capture, subtract the captured piece removePiece(l1, mover, fromNum, wkSq, bkSq)
board.pieceAt(move.to).foreach { cap => board.pieceAt(move.to).foreach(cap => removePiece(l1, cap, toNum, wkSq, bkSq))
subtractColumn(l1, featureIndex(cap, toNum)) addPiece(l1, mover, toNum, wkSq, bkSq)
}
// Add the piece to its new location
addColumn(l1, featureIndex(mover, toNum))
} }
private def applyEnPassantDelta(l1: Array[Float], move: Move, board: Board): Unit = private def applyEnPassantDelta(l1: Array[Float], move: Move, board: Board, wkSq: Int, bkSq: Int): Unit =
board.pieceAt(move.from).foreach { pawn => board.pieceAt(move.from).foreach { pawn =>
val capturedSq = Square(move.to.file, move.from.rank) val capturedSq = Square(move.to.file, move.from.rank)
subtractColumn(l1, featureIndex(pawn, squareNum(move.from))) removePiece(l1, pawn, squareNum(move.from), wkSq, bkSq)
board.pieceAt(capturedSq).foreach(cap => subtractColumn(l1, featureIndex(cap, squareNum(capturedSq)))) board.pieceAt(capturedSq).foreach(cap => removePiece(l1, cap, squareNum(capturedSq), wkSq, bkSq))
addColumn(l1, featureIndex(pawn, squareNum(move.to))) addPiece(l1, pawn, squareNum(move.to), wkSq, bkSq)
} }
private def applyCastleDelta(l1: Array[Float], move: Move, board: Board): Unit = private def applyPromotionDelta(
board.pieceAt(move.from).foreach { king => l1: Array[Float],
val rank = move.from.rank move: Move,
val kingside = move.moveType == MoveType.CastleKingside promo: PromotionPiece,
val (rookFrom, rookTo) = board: Board,
if kingside then (Square(File.H, rank), Square(File.F, rank)) wkSq: Int,
else (Square(File.A, rank), Square(File.D, rank)) bkSq: Int,
val rook = Piece(king.color, PieceType.Rook) ): Unit =
subtractColumn(l1, featureIndex(king, squareNum(move.from)))
addColumn(l1, featureIndex(king, squareNum(move.to)))
subtractColumn(l1, featureIndex(rook, squareNum(rookFrom)))
addColumn(l1, featureIndex(rook, squareNum(rookTo)))
}
private def applyPromotionDelta(l1: Array[Float], move: Move, promo: PromotionPiece, board: Board): Unit =
board.pieceAt(move.from).foreach { pawn => board.pieceAt(move.from).foreach { pawn =>
val toNum = squareNum(move.to) val toNum = squareNum(move.to)
subtractColumn(l1, featureIndex(pawn, squareNum(move.from))) removePiece(l1, pawn, squareNum(move.from), wkSq, bkSq)
board.pieceAt(move.to).foreach(cap => subtractColumn(l1, featureIndex(cap, toNum))) board.pieceAt(move.to).foreach(cap => removePiece(l1, cap, toNum, wkSq, bkSq))
addColumn(l1, featureIndex(Piece(pawn.color, promotedType(promo)), toNum)) addPiece(l1, Piece(pawn.color, promotedType(promo)), toNum, wkSq, bkSq)
} }
private def promotedType(promo: PromotionPiece): PieceType = promo match private def promotedType(promo: PromotionPiece): PieceType = promo match
@@ -154,7 +174,6 @@ class NNUE(model: NbaiModel):
score score
def evaluateAtPlyWithValidation(ply: Int, turn: Color, hash: Long, board: Board): Int = def evaluateAtPlyWithValidation(ply: Int, turn: Color, hash: Long, board: Board): Int =
// For debugging: validate that incremental accumulator matches recomputation
if validateAccum && ply > 0 && ply % 10 != 0 then if validateAccum && ply > 0 && ply % 10 != 0 then
val isValid = validateAccumulator(ply, board) val isValid = validateAccumulator(ply, board)
if !isValid then System.err.println(s"WARNING: NNUE accumulator diverged at ply $ply") if !isValid then System.err.println(s"WARNING: NNUE accumulator diverged at ply $ply")
@@ -206,8 +225,10 @@ class NNUE(model: NbaiModel):
private val legacyL1 = new Array[Float](accSize) private val legacyL1 = new Array[Float](accSize)
def evaluate(context: GameContext): Int = def evaluate(context: GameContext): Int =
val wkSq = wkSqOf(context.board)
val bkSq = bkSqOf(context.board)
System.arraycopy(model.weights(0).bias, 0, legacyL1, 0, accSize) System.arraycopy(model.weights(0).bias, 0, legacyL1, 0, accSize)
for (sq, piece) <- context.board.pieces do addColumn(legacyL1, featureIndex(piece, squareNum(sq))) for (sq, piece) <- context.board.pieces do addPiece(legacyL1, piece, squareNum(sq), wkSq, bkSq)
runL2toOutput(legacyL1, context.turn) runL2toOutput(legacyL1, context.turn)
def benchmark(): Unit = def benchmark(): Unit =
@@ -85,17 +85,17 @@ class HybridBotTest extends AnyFunSuite with Matchers:
private val altMove = Move(Square(File.E, Rank.R2), Square(File.E, Rank.R3), MoveType.Normal()) private val altMove = Move(Square(File.E, Rank.R2), Square(File.E, Rank.R3), MoveType.Normal())
private def vetoRules: RuleSet = new RuleSet: private def vetoRules: RuleSet = new RuleSet:
private def fresh(ctx: GameContext): Boolean = ctx.moves.isEmpty private def fresh(ctx: GameContext): Boolean = ctx.moves.isEmpty
def candidateMoves(context: GameContext)(square: Square): List[Move] = Nil def candidateMoves(context: GameContext)(square: Square): List[Move] = Nil
def legalMoves(context: GameContext)(square: Square): List[Move] = Nil def legalMoves(context: GameContext)(square: Square): List[Move] = Nil
def allLegalMoves(context: GameContext): List[Move] = def allLegalMoves(context: GameContext): List[Move] =
if fresh(context) then List(mateMove, altMove) else Nil if fresh(context) then List(mateMove, altMove) else Nil
def isCheck(context: GameContext): Boolean = false def isCheck(context: GameContext): Boolean = false
def isCheckmate(context: GameContext): Boolean = context.moves.lastOption.contains(mateMove) def isCheckmate(context: GameContext): Boolean = context.moves.lastOption.contains(mateMove)
def isStalemate(context: GameContext): Boolean = context.moves.lastOption.contains(altMove) def isStalemate(context: GameContext): Boolean = context.moves.lastOption.contains(altMove)
def isInsufficientMaterial(context: GameContext): Boolean = false def isInsufficientMaterial(context: GameContext): Boolean = false
def isFiftyMoveRule(context: GameContext): Boolean = false def isFiftyMoveRule(context: GameContext): Boolean = false
def isThreefoldRepetition(context: GameContext): Boolean = false def isThreefoldRepetition(context: GameContext): Boolean = false
def applyMove(context: GameContext)(move: Move): GameContext = def applyMove(context: GameContext)(move: Move): GameContext =
context.copy(turn = context.turn.opposite, moves = context.moves :+ move) context.copy(turn = context.turn.opposite, moves = context.moves :+ move)