PDNS API Code Examples

Introduction

To help with querying the PDNS API, this page provides more advanced code examples. These code examples show how to log in, save the response containing the token to a file for future queries, and use the token to make a query. These examples can be altered to allow more complex behavior. If your program is intended for continuous operation, you can add a check against the ‘expires’ field to retrieve a new token before the current one expires.

Bash

This code requires the curl and jq programs.

# Credentials sent to the login endpoint; fill these in before running.
USER=""
PASSWORD=""
REALM="pdns"
# Endpoint that get_new_token posts the credentials to.
LOGIN_URL="https://api.spamhaus.org/api/v1/login"

# Base URL of the query endpoint used for the lookup.
API_URL="https://pdnsapi.deteque.com/api/query/v2/"
# File in which the login response (including the token) is cached.
TOKEN_FILE="token.json"

# Output format and query type used when they are not given on the command line.
DEFAULT_OUTPUT="csv"
DEFAULT_QUERY="ip"

#######################################
# Load the cached token from TOKEN_FILE, fetching a new one when the file is
# missing, the token is absent, or the token expires within five minutes.
# Globals:   TOKEN_FILE (read), token (written), expires (written)
# Arguments: none
# Outputs:   status messages on stdout
# Returns:   1 when the cached token is still valid (kept from the original
#            convention), otherwise the status of get_new_token
#######################################
load_token() {
  # Declared separately so a failing `date` is not masked by `local`.
  local current_time
  current_time=$(date +%s)

  if [[ ! -f "$TOKEN_FILE" ]]; then
    echo "No token file. Getting new token..."
    get_new_token
  else
    expires="$(jq -r '.expires' "$TOKEN_FILE")"
    token="$(jq -r '.token' "$TOKEN_FILE")"
  fi

  # jq prints "null" for a missing key and nothing when the file is not JSON.
  if [[ -z "$token" || "$token" == "null" ]]; then
    echo "Invalid token. Getting new token..."
    get_new_token
  fi

  # Only feed a plain integer to arithmetic: the value comes from a file and
  # must never be evaluated as an expression. Anything else counts as expired.
  # expires - 5 minutes to prevent token expiring during program runtime
  if [[ ! "$expires" =~ ^[0-9]+$ ]] || (( current_time > 10#$expires - 300 )); then
    echo "Token is expired. Getting new token..."
    get_new_token
  else
    return 1  # Token is not expired
  fi
}

#######################################
# Log in with USER/PASSWORD/REALM and cache the response in TOKEN_FILE.
# Globals:   USER, PASSWORD, REALM, LOGIN_URL, TOKEN_FILE (read);
#            token, expires (written)
# Arguments: none
# Outputs:   an error message on stdout when the login is rejected
# Returns:   0 on success; exits the script with status 1 when the response
#            does not carry code 200
#######################################
get_new_token() {
  local payload response code

  # Let jq build the body so quotes or backslashes in the credentials are
  # escaped instead of producing invalid JSON.
  payload=$(jq -n \
    --arg username "$USER" \
    --arg password "$PASSWORD" \
    --arg realm "$REALM" \
    '{username: $username, password: $password, realm: $realm}')

  response=$(curl -s -d "$payload" "$LOGIN_URL")

  # Compare as a string: an empty or non-numeric code (curl failure, non-JSON
  # reply) must be treated as a failed login, not as a test syntax error.
  code="$(jq -r '.code' <<<"$response")"
  if [[ "$code" != "200" ]]; then
    echo "Invalid login request $response"
    exit 1
  fi

  expires="$(jq -r '.expires' <<<"$response")"
  token="$(jq -r '.token' <<<"$response")"

  # Cache only validated responses; a newly created file is owner-only
  # because it holds a bearer token.
  ( umask 077; printf '%s\n' "$response" > "$TOKEN_FILE" )
}

# Parse the command line first so that a usage error does not trigger a login.
# Accepted forms: "querytype output target", "querytype target", or "target".
if [[ $# -eq 3 ]]; then
    querytype=$1
    output=$2
    target=$3
elif [[ $# -eq 2 ]]; then
    querytype=$1
    output=$DEFAULT_OUTPUT
    target=$2
elif [[ $# -eq 1 ]]; then
    querytype=$DEFAULT_QUERY
    output=$DEFAULT_OUTPUT
    target=$1
else
    echo -e "\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n"
    exit 0
fi

# Sets the global "token" used in the Authorization header below.
load_token

# -G appends the --data-urlencode pairs to the URL as a query string, so
# special characters in the arguments are percent-encoded.
curl -G \
    -H "Authorization: Bearer ${token}" \
    --data-urlencode "queryType=${querytype}" \
    --data-urlencode "target=${target}" \
    --data-urlencode "output=${output}" \
    "${API_URL}"

Python

import json
import os.path
import time
import requests
import sys

# Credentials sent to the login endpoint; fill these in before running.
USER = ""
PASSWORD = ""
REALM = "pdns"
# Endpoint that get_new_token() posts the credentials to.
LOGIN_URL = "https://api.spamhaus.org/api/v1/login"
# Base URL of the query endpoint used for the lookup.
API_URL = "https://pdnsapi.deteque.com/api/query/v2/"

# File in which the login response (including the token) is cached.
TOKEN_FILE = "token.json"

# Output format and query type used when they are not given on the command line.
DEFAULT_OUTPUT = "csv"
DEFAULT_QUERY = "ip"

def load_token():
    """Return ``(expires, token)`` from the cache file, logging in when needed.

    A new token is requested through ``get_new_token()`` when TOKEN_FILE is
    missing, is not valid JSON, lacks a usable "expires" or "token" field, or
    holds a token that expires within the next five minutes.

    Returns:
        tuple: the expiry value and the token string.
    """
    if not os.path.exists(TOKEN_FILE):
        expires, token = get_new_token()
        return expires, token

    try:
        # The context manager closes the file on every path.
        with open(TOKEN_FILE, "r") as fp:
            data = json.load(fp)
    except json.JSONDecodeError:
        expires, token = get_new_token()
        return expires, token

    # Valid JSON may still be a list, null or a scalar, and "expires" may not
    # be a number; treat all of these as a broken cache file.
    if (
        not isinstance(data, dict)
        or "token" not in data
        or isinstance(data.get("expires"), bool)
        or not isinstance(data.get("expires"), (int, float))
    ):
        print("Invalid JSON format, getting new token")
        expires, token = get_new_token()
        return expires, token

    # expires minus 5 minutes to prevent token expiring during program runtime
    if data["expires"] - 300 < time.time():
        print("Token expired, getting new token")
        expires, token = get_new_token()
        return expires, token

    return data["expires"], data["token"]

def get_new_token():
    """Log in, cache the raw response in TOKEN_FILE and return the token.

    Posts USER, PASSWORD and REALM as JSON to LOGIN_URL. On any HTTP status
    other than 200 the response is printed and the program exits with
    status 1.

    Returns:
        tuple: ``(expires, token)`` taken from the login response.
    """
    r = requests.post(LOGIN_URL, json={"username": USER, "password": PASSWORD, "realm": REALM})
    if r.status_code != 200:
        # An error page is not necessarily JSON; fall back to the raw text so
        # the real failure is reported instead of a decode error.
        try:
            detail = r.json()
        except ValueError:
            detail = r.text
        print("Login bad status code", detail)
        sys.exit(1)

    # Extract the fields before writing so an unusable response is not cached.
    data = r.json()
    expires, token = data["expires"], data["token"]

    with open(TOKEN_FILE, "w") as fp:
        fp.write(r.text)

    return expires, token

# Parse the command line before logging in, so that a usage error does not
# trigger a login request or touch the token file.
# Accepted forms: "querytype output target", "querytype target", or "target".
if len(sys.argv) == 4:
    querytype = sys.argv[1]
    output = sys.argv[2]
    target = sys.argv[3]
elif len(sys.argv) == 3:
    querytype = sys.argv[1]
    output = DEFAULT_OUTPUT
    target = sys.argv[2]
elif len(sys.argv) == 2:
    querytype = DEFAULT_QUERY
    output = DEFAULT_OUTPUT
    target = sys.argv[1]
else:
    print("\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n")
    sys.exit(0)

expires, token = load_token()

# Passed via "params" so requests builds and encodes the query string.
PARAMS = {'queryType' : querytype,
    'target' : target,
    'output': output,
}

r = requests.get(API_URL, params = PARAMS, headers={'Authorization': 'Bearer {}'.format(token)})
print(r.text)

PHP

This code requires the php-curl and php-json extensions.

<?php

	# Credentials sent to the login endpoint; fill these in before running.
	define("USER", "");
	define("PASSWORD", "");
	define("REALM", "pdns");
	# File in which the login response (including the token) is cached.
	define("TOKEN_FILE", "token.json");

	# Login endpoint and base URL of the query endpoint.
	define("LOGIN_URL","https://api.spamhaus.org/api/v1/login");
	define("API_URL","https://pdnsapi.deteque.com/api/query/v2/");

	# Query type and output format used when they are not given on the command line.
	define("DEFAULT_QUERY","ip");
	define("DEFAULT_FORMAT","csv");

	/**
	 * Return the cached token, logging in again when the cache is unusable.
	 *
	 * A new token is requested through get_new_token() when TOKEN_FILE is
	 * missing, unreadable, not valid JSON, lacks a usable "expires" or "token"
	 * field, or holds a token that expires within the next five minutes.
	 *
	 * @return array array($expires, $token)
	 */
	function load_token() {
		if (!file_exists(TOKEN_FILE)) {
			list($expires, $token) = get_new_token();
			return array($expires, $token);
		}

		# file_get_contents copes with an empty file, whereas fread() with a
		# length of 0 is an error.
		$raw = file_get_contents(TOKEN_FILE);
		$data = ($raw === false) ? null : json_decode($raw, true);
		if ($data === null) {
			list($expires, $token) = get_new_token();
			return array($expires, $token);
		}

		# Valid JSON may still be a scalar, and "expires" may not be numeric.
		if (!is_array($data) || !isset($data["expires"]) || !isset($data["token"]) || !is_numeric($data["expires"])) {
			echo "Invalid JSON format, getting new token\n";
			list($expires, $token) = get_new_token();
			return array($expires, $token);
		}
		# expires minus 5 minutes to prevent token expiring during program runtime
		if (($data["expires"] - 300) < time()) {
			echo "Token expired, getting new token\n";
			list($expires, $token) = get_new_token();
			return array($expires, $token);
		}

		return array($data["expires"], $data["token"]);
	}

	/**
	 * Log in, cache the raw response in TOKEN_FILE and return the token.
	 *
	 * Posts USER, PASSWORD and REALM as a JSON body to LOGIN_URL. The script
	 * is terminated with die() when the request fails, the HTTP status is not
	 * 200, or the response lacks the "expires" or "token" field.
	 *
	 * @return array array($expires, $token)
	 */
	function get_new_token() {
		$payload = json_encode(array(
			'username' => USER,
			'password' => PASSWORD,
			'realm' => REALM,
		));

		$ch = curl_init();
		curl_setopt($ch, CURLOPT_URL, LOGIN_URL);
		curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
		curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
		curl_setopt($ch, CURLOPT_PROTOCOLS,CURLPROTO_HTTPS);
		$response = curl_exec($ch);
		if ($response === false) {
			# Transport failure (DNS, TLS, connection): report the curl error
			# rather than an empty "bad status code" message.
			$error = curl_error($ch);
			curl_close($ch);
			die("Login request failed: " . $error);
		}
		$return_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
		curl_close($ch);
		if($return_code != "200") {
			die("Login bad status code " . $response);
		}

		# Validate before caching so an unusable response is never written.
		$data = json_decode($response, true);
		if (!is_array($data) || !isset($data["expires"]) || !isset($data["token"])) {
			die("Login response missing token data " . $response);
		}

		$fp = fopen(TOKEN_FILE, "w");
		fwrite($fp, $response);
		fclose($fp);

		return array($data["expires"], $data["token"]);
	}

	/**
	 * Run one query against API_URL and return the raw response body.
	 *
	 * @param string $token     Bearer token sent in the Authorization header.
	 * @param string $querytype Value of the "queryType" parameter.
	 * @param string $format    Value of the "output" parameter.
	 * @param string $target    Value of the "target" parameter.
	 * @return string|false Response body, or false when curl_exec() fails.
	 */
	function get_data($token, $querytype, $format, $target){
		# http_build_query percent-encodes the values, so special characters
		# in the arguments cannot corrupt the query string.
		$query = http_build_query(array(
			'queryType' => $querytype,
			'target' => $target,
			'output' => $format,
		));
		$href = API_URL . "?" . $query;

		$header = array();
		$header[] = 'Authorization: Bearer '. $token;

		$ch = curl_init();
		curl_setopt($ch, CURLOPT_URL, $href);
		curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
		curl_setopt($ch, CURLOPT_HTTPGET, true);
		curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
		curl_setopt($ch, CURLOPT_PROTOCOLS,CURLPROTO_HTTPS);
		$response = curl_exec($ch);
		curl_close($ch);
		return($response);
	}

	# Work out the query from the command line first, so that a usage error
	# does not trigger a login. Accepted forms: "querytype format target",
	# "querytype target", or "target".
	switch($argc){
		case 4:	$query = array($argv[1], $argv[2], $argv[3]);
			break;
		case 3:	$query = array($argv[1], DEFAULT_FORMAT, $argv[2]);
			break;
		case 2: $query = array(DEFAULT_QUERY, DEFAULT_FORMAT, $argv[1]);
			break;
		default: $query = null;
	}

	if ($query === null) {
		echo "\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n\n";
	} else {
		list($expires, $token) = load_token();
		$page = get_data($token, $query[0], $query[1], $query[2]);
		echo $page;
	}

?>

Go

package main

import (
	"fmt"
	"os"
	"encoding/json"
	"net/http"
	"time"
	"bytes"
	"io"
)

const (
	// Credentials sent to the login endpoint; fill these in before building.
	USER = ""
	PASSWORD = ""
	REALM = "pdns"

	// Login endpoint and base URL of the query endpoint.
	LOGIN_URL = "https://api.spamhaus.org/api/v1/login"
	API_URL = "https://pdnsapi.deteque.com/api/query/v2/"

	// File in which the login response (including the token) is cached.
	TOKEN_FILE = "token.json"

	// Output format and query type used when they are not given on the command line.
	DEFAULT_OUTPUT = "csv"
	DEFAULT_QUERY = "ip"
)

func main() {
	_, token, err := loadToken()
	if err != nil {
		fmt.Println(err)
		return
	}

	var querytype, output, target string

	switch len(os.Args) {
	case 4:
		querytype = os.Args[1]
		output = os.Args[2]
		target = os.Args[3]
	case 3:
		querytype = os.Args[1]
		output = DEFAULT_OUTPUT
		target = os.Args[2]
	case 2:
		querytype = DEFAULT_QUERY
		output = DEFAULT_OUTPUT
		target = os.Args[1]
	default:
		fmt.Println("\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n")
		os.Exit(0)
	}

	body, err := getData(token, querytype, target, output)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(body)
}

// getData sends one GET request to API_URL with the given query type, target
// and output format, authenticated with the bearer token, and returns the
// response body as a string. The HTTP status code is not inspected.
func getData(token string, querytype string, target string, output string) (body string, fErr error) {
	req, err := http.NewRequest("GET", API_URL, nil)
	if err != nil {
		fErr = fmt.Errorf("API URL invalid %v", err)
		return
	}

	// Build the query string through url.Values so the arguments are
	// percent-encoded; a malformed target can no longer make NewRequest fail
	// and leave req nil.
	query := req.URL.Query()
	query.Set("queryType", querytype)
	query.Set("target", target)
	query.Set("output", output)
	req.URL.RawQuery = query.Encode()

	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fErr = fmt.Errorf("API URL Broken %v", err)
		return
	}
	defer resp.Body.Close()

	rBody, err := io.ReadAll(resp.Body)
	if err != nil {
		fErr = fmt.Errorf("API URL INVALID return %v", err)
		return
	}
	return string(rBody), nil
}

// loadToken returns the expiry time and token cached in TOKEN_FILE.
//
// It falls back to getNewToken when the file cannot be read, is not valid
// JSON, holds no token, or holds a token that expires within the next five
// minutes.
func loadToken() (expires int64, token string, fErr error) {
	// A missing file surfaces as a read error, so no separate Stat is needed.
	data, err := os.ReadFile(TOKEN_FILE)
	if err != nil {
		return getNewToken()
	}

	var recAuth JsonReceiveAuth
	if err := json.Unmarshal(data, &recAuth); err != nil || recAuth.Token == "" {
		fmt.Println("Invalid JSON format, getting new token")
		return getNewToken()
	}

	// expires minus 5 minutes to prevent token expiring during program runtime
	if recAuth.Expires-300 < time.Now().Unix() {
		fmt.Println("Token expired, getting new token")
		return getNewToken()
	}

	return recAuth.Expires, recAuth.Token, nil
}

// getNewToken logs in with USER, PASSWORD and REALM, writes the raw response
// to TOKEN_FILE with mode 0600 and returns the expiry time and token.
//
// An error is returned when the request cannot be built or sent, the HTTP
// status or the "code" field of the response is not 200, the response is not
// valid JSON, or the token file cannot be written. When only the file write
// fails, the token and expiry are still returned alongside the error.
func getNewToken() (expires int64, token string, fErr error) {
	sendAuth := JsonSendAuth{
		Username: USER,
		Password: PASSWORD,
		Realm:    REALM,
	}

	bSendAuth, err := json.Marshal(sendAuth)
	if err != nil {
		fErr = fmt.Errorf("AUTH request could not be encoded %v", err)
		return
	}

	req, err := http.NewRequest("POST", LOGIN_URL, bytes.NewBuffer(bSendAuth))
	if err != nil {
		fErr = fmt.Errorf("AUTH URL invalid %v", err)
		return
	}

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fErr = fmt.Errorf("AUTH URL Broken %v", err)
		return
	}
	defer resp.Body.Close()

	rBody, err := io.ReadAll(resp.Body)
	if err != nil {
		fErr = fmt.Errorf("AUTH URL INVALID return %v", err)
		return
	}
	if resp.StatusCode != http.StatusOK {
		fErr = fmt.Errorf("AUTH URL bad status code %v %v", resp.StatusCode, string(rBody))
		return
	}

	// Report a malformed body explicitly instead of letting it show up as a
	// zero "code" in the check below.
	var recAuth JsonReceiveAuth
	if err := json.Unmarshal(rBody, &recAuth); err != nil {
		fErr = fmt.Errorf("AUTH URL returned invalid JSON %v", err)
		return
	}
	if recAuth.Code != 200 {
		fErr = fmt.Errorf("AUTH URL bad status code in JSON body: %v %v", recAuth.Code, recAuth.Message)
		return
	}
	token = recAuth.Token
	expires = recAuth.Expires

	err = os.WriteFile(TOKEN_FILE, rBody, 0600)
	if err != nil {
		fErr = fmt.Errorf("Error writing credential file: %v %v", TOKEN_FILE, err)
		return
	}
	return
}

// JsonSendAuth is the JSON body that getNewToken posts to the login endpoint.
type JsonSendAuth struct {
	Username string `json:"username"`
	Password string `json:"password"`
	Realm	 string `json:"realm"`
}

// JsonReceiveAuth holds the fields read from a login response; the same
// structure is used to decode the cached token file.
type JsonReceiveAuth struct {
	// Code is compared against 200 after a login to detect failures.
	Code	int    `json:"code"`
	// Token is the bearer token sent with each query.
	Token	string `json:"token"`
	// Expires is compared against time.Now().Unix() when loading the cache.
	Expires int64	 `json:"expires"`
	// Message is included in the error text when Code is not 200.
	Message string `json:"message"`
}