PDNS API Code Examples
Introduction
To help with querying the PDNS API this page has more advanced code examples. These code examples show how to login, save the response with the token to a file for future queries and use the token to make a query. These examples can be altered to allow more complex behavior. If your program is intended for continuous operation you can put in a check against the ‘expires’ field to retrieve a token before it expires.
Bash
This code requires the usage of curl
and jq
programs.
USER=""
PASSWORD=""
REALM="pdns"
LOGIN_URL="https://api.spamhaus.org/api/v1/login"
API_URL="https://pdnsapi.deteque.com/api/query/v2/"
TOKEN_FILE="token.json"
DEFAULT_OUTPUT="csv"
DEFAULT_QUERY="ip"
load_token() {
local current_time=$(date +%s)
if [ ! -f $TOKEN_FILE ]; then
get_new_token
echo "No token file. Getting new token..."
else
expires="$(jq -r '.expires' "$TOKEN_FILE")"
token="$(jq -r '.token' "$TOKEN_FILE")"
fi
if [ $token = "null" ]; then
echo "Invalid token. Getting new token..."
get_new_token
fi
# expires - 5 minutes to prevent token expiring during program runtime
if [ $current_time -gt $(($expires - 300)) ]; then
get_new_token
echo "Token is expired. Getting new token..."
else
return 1 # Token is not expired
fi
}
get_new_token() {
response=$(curl -s -d "
{
\"username\":\"${USER}\",
\"password\":\"${PASSWORD}\",
\"realm\":\"${REALM}\"
}" ${LOGIN_URL}
)
echo $response > $TOKEN_FILE
expires="$(echo $response | jq -r '.expires')"
token="$(echo $response | jq -r '.token')"
code="$(echo $response | jq -r '.code')"
if [ "$code" -ne 200 ]; then
echo "Invalid login request $response"
exit 1
fi
}
load_token
if [ $# -eq 3 ]; then
querytype=$1
output=$2
target=$3
elif [ $# -eq 2 ]; then
querytype=$1
output=$DEFAULT_OUTPUT
target=$2
elif [ $# -eq 1 ]; then
querytype=$DEFAULT_QUERY
output=$DEFAULT_OUTPUT
target=$1
else
echo -e "\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n"
exit 0
fi
/usr/bin/curl -H "Authorization: Bearer ${token}" ${API_URL}"?queryType=${querytype}&target=${target}&output=${output}"
Python
import json
import os.path
import time
import requests
import sys
USER = ""
PASSWORD = ""
REALM = "pdns"
LOGIN_URL = "https://api.spamhaus.org/api/v1/login"
API_URL = "https://pdnsapi.deteque.com/api/query/v2/"
TOKEN_FILE = "token.json"
DEFAULT_OUTPUT = "csv"
DEFAULT_QUERY = "ip"
def load_token():
if not os.path.exists(TOKEN_FILE):
expires, token = get_new_token()
return expires, token
fp = open(TOKEN_FILE, "r")
try:
data = json.load(fp)
except json.JSONDecodeError:
expires, token = get_new_token()
fp.close()
return expires, token
fp.close()
if "expires" not in data:
print("Invalid JSON format, getting new token")
expires, token = get_new_token()
return expires, token
if "token" not in data:
print("Invalid JSON format, getting new token")
expires, token = get_new_token()
return expires, token
# expires minus 5 minutes to prevent token expiring during program runtime
if data["expires"] < (time.time() - 300) :
print("Token expired, getting new token")
expires, token = get_new_token()
return expires, token
return data["expires"], data["token"]
def get_new_token():
r = requests.post(LOGIN_URL, json={"username": USER, "password": PASSWORD, "realm": REALM})
if r.status_code != 200:
print("Login bad status code", r.json())
exit(1)
fp = open(TOKEN_FILE, "w")
fp.write(r.text)
fp.close()
data = r.json()
return data["expires"], data["token"]
expires, token = load_token()
if len(sys.argv) == 4:
querytype = sys.argv[1]
output = sys.argv[2]
target = sys.argv[3]
elif len(sys.argv) == 3:
querytype = sys.argv[1]
output = DEFAULT_OUTPUT
target = sys.argv[2]
elif len(sys.argv) == 2:
querytype = DEFAULT_QUERY
output = DEFAULT_OUTPUT
target = sys.argv[1]
else:
print("\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n")
sys.exit(0)
PARAMS = {'queryType' : querytype,
'target' : target,
'output': output,
}
r = requests.get(API_URL, params = PARAMS, headers={'Authorization': 'Bearer {}'.format(token)})
print(r.text)
PHP
This code requires the php-curl and php-json extensions.
<?php
define("USER", "");
define("PASSWORD", "");
define("REALM", "pdns");
define("TOKEN_FILE", "token.json");
define("LOGIN_URL","https://api.spamhaus.org/api/v1/login");
define("API_URL","https://pdnsapi.deteque.com/api/query/v2/");
define("DEFAULT_QUERY","ip");
define("DEFAULT_FORMAT","csv");
function load_token() {
if (!file_exists(TOKEN_FILE)) {
list($expires, $token) = get_new_token();
return array($expires, $token);
}
$fp = fopen(TOKEN_FILE, "r");
$data = fread($fp, filesize(TOKEN_FILE));
fclose($fp);
$data = json_decode($data, true);
if ($data === null) {
list($expires, $token) = get_new_token();
return array($expires, $token);
}
if (!isset($data["expires"]) || !isset($data["token"])) {
echo "Invalid JSON format, getting new token\n";
list($expires, $token) = get_new_token();
return array($expires, $token);
}
# expires minus 5 minutes to prevent token expiring during program runtime
if ($data["expires"] < (time() - 300)) {
echo "Token expired, getting new token\n";
list($expires, $token) = get_new_token();
return array($expires, $token);
}
return array($data["expires"], $data["token"]);
}
function get_new_token() {
$ch = curl_init();
$data = array(
'username' => USER,
'password' => PASSWORD,
'realm' => REALM,
);
$payload = json_encode($data);
curl_setopt($ch, CURLOPT_URL, LOGIN_URL);
curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_PROTOCOLS,CURLPROTO_HTTPS);
$response = curl_exec($ch);
$return_code = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if($return_code != "200") {
die("Login bad status code " . $response);
}
$data = json_decode($response, true);
$fp = fopen(TOKEN_FILE, "w");
fwrite($fp, $response);
fclose($fp);
$data = json_decode($response, true);
return array($data["expires"], $data["token"]);
}
function get_data($token, $querytype, $format, $target){
$ch = curl_init();
$href = sprintf("%s?queryType=%s&target=%s&output=%s",API_URL,$querytype,$target, $format);
$header = array();
$header[] = 'Authorization: Bearer '. $token;
curl_setopt($ch, CURLOPT_URL, $href);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
curl_setopt($ch, CURLOPT_HTTPGET, true);
curl_setopt($ch, CURLOPT_HTTPHEADER, $header);
curl_setopt($ch, CURLOPT_PROTOCOLS,CURLPROTO_HTTPS);
$response = curl_exec($ch);
curl_close($ch);
return($response);
}
list($expires, $token) = load_token();
switch($argc){
case 4: $page = get_data($token, $argv[1], $argv[2], $argv[3]);
echo $page;
break;
case 3: $page = get_data($token, $argv[1], DEFAULT_FORMAT, $argv[2]);
echo $page;
break;
case 2: $page = get_data($token, DEFAULT_QUERY, DEFAULT_FORMAT, $argv[1]);
echo $page;
break;
default: echo "\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n\n";
}
?>
Go
package main
import (
"fmt"
"os"
"encoding/json"
"net/http"
"time"
"bytes"
"io"
)
const (
USER = ""
PASSWORD = ""
REALM = "pdns"
LOGIN_URL = "https://api.spamhaus.org/api/v1/login"
API_URL = "https://pdnsapi.deteque.com/api/query/v2/"
TOKEN_FILE = "token.json"
DEFAULT_OUTPUT = "csv"
DEFAULT_QUERY = "ip"
)
func main() {
_, token, err := loadToken()
if err != nil {
fmt.Println(err)
return
}
var querytype, output, target string
switch len(os.Args) {
case 4:
querytype = os.Args[1]
output = os.Args[2]
target = os.Args[3]
case 3:
querytype = os.Args[1]
output = DEFAULT_OUTPUT
target = os.Args[2]
case 2:
querytype = DEFAULT_QUERY
output = DEFAULT_OUTPUT
target = os.Args[1]
default:
fmt.Println("\nSyntax - lookup [host|hostdom|ip|nameserver|domain|domsearch|cname|chost|mxd|mx|new_domain|txt|soa] [csv|json|jobject] target \n")
os.Exit(0)
}
body, err := getData(token, querytype, target, output)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(body)
}
func getData(token string, querytype string, target string, output string) (body string, fErr error) {
client := &http.Client{}
req, _ := http.NewRequest("GET", fmt.Sprintf("%s?queryType=%s&target=%s&output=%s", API_URL, querytype, target, output), nil)
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
resp, err := client.Do(req)
if err != nil {
fErr = fmt.Errorf("AUTH URL Broken %v", err)
return
}
defer resp.Body.Close()
rBody, err := io.ReadAll(resp.Body)
if err != nil {
fErr = fmt.Errorf("AUTH URL INVALID return %v\n", err)
return
}
return string(rBody), nil
}
func loadToken() (expires int64, token string, fErr error) {
if _, err := os.Stat(TOKEN_FILE); err != nil {
expires, token, fErr = getNewToken()
return
}
data, err := os.ReadFile(TOKEN_FILE)
if err != nil {
expires, token, fErr = getNewToken()
return
}
var recAuth JsonReceiveAuth
err = json.Unmarshal(data, &recAuth)
if err != nil {
fmt.Println("Invalid JSON format, getting new token")
expires, token, fErr = getNewToken()
return
}
if recAuth.Token == "" {
fmt.Println("Invalid JSON format, getting new token")
expires, token, fErr = getNewToken()
return
}
// expires minus 5 minutes to prevent token expiring during program runtime
if recAuth.Expires < (time.Now().Unix() - 300) {
fmt.Println("Token expired, getting new token")
expires, token, fErr = getNewToken()
return
}
return recAuth.Expires, recAuth.Token, nil
}
func getNewToken() (expires int64, token string, fErr error) {
var sendAuth JsonSendAuth
var recAuth JsonReceiveAuth
sendAuth.Username = USER
sendAuth.Password = PASSWORD
sendAuth.Realm = REALM
bSendAuth, _ := json.Marshal(sendAuth)
client := &http.Client{}
req, _ := http.NewRequest("POST", LOGIN_URL, bytes.NewBuffer(bSendAuth))
resp, err := client.Do(req)
if err != nil {
fErr = fmt.Errorf("AUTH URL Broken %v", err)
return
}
defer resp.Body.Close()
rBody, err := io.ReadAll(resp.Body)
if err != nil {
fErr = fmt.Errorf("AUTH URL INVALID return %v\n", err)
return
}
if resp.StatusCode != http.StatusOK {
fErr = fmt.Errorf("AUTH URL bad status code %v %v\n", resp.StatusCode, string(rBody))
return
}
json.Unmarshal(rBody, &recAuth)
if recAuth.Code != 200 {
fErr = fmt.Errorf("AUTH URL bad status code in JSON body: %v %v\n", recAuth.Code, recAuth.Message)
return
}
token = recAuth.Token
expires = recAuth.Expires
err = os.WriteFile(TOKEN_FILE, rBody, 0600)
if err != nil {
fErr = fmt.Errorf("Error writing credential file: %v %v", TOKEN_FILE, err)
return
}
return
}
type JsonSendAuth struct {
Username string `json:"username"`
Password string `json:"password"`
Realm string `json:"realm"`
}
type JsonReceiveAuth struct {
Code int `json:"code"`
Token string `json:"token"`
Expires int64 `json:"expires"`
Message string `json:"message"`
}