pytest: adding support for dnsd test server

Add tests to test_21 with checks for 'overrride-dns' from curlinfo.

Closes #21289
This commit is contained in:
Stefan Eissing 2026-04-10 13:31:02 +02:00 committed by Daniel Stenberg
parent 7c8f11e953
commit a5542c23e7
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
6 changed files with 214 additions and 2 deletions

View File

@ -28,6 +28,7 @@ TESTENV = \
testenv/certs.py \
testenv/client.py \
testenv/curl.py \
testenv/dnsd.py \
testenv/dante.py \
testenv/env.py \
testenv/httpd.py \

View File

@ -27,9 +27,10 @@
import logging
import os
from datetime import timedelta
from typing import Generator
import pytest
from testenv import CurlClient, Env, LocalClient
from testenv import CurlClient, Env, LocalClient, Dnsd
log = logging.getLogger(__name__)
@ -38,6 +39,13 @@ log = logging.getLogger(__name__)
@pytest.mark.skipif(condition=not Env.curl_has_feature('AsynchDNS'), reason="needs AsynchDNS")
class TestResolve:
@pytest.fixture(scope='class')
def dnsd(self, env: Env) -> Generator[Dnsd, None, None]:
dnsd = Dnsd(env=env)
assert dnsd.initial_start()
yield dnsd
dnsd.stop()
@pytest.fixture(autouse=True, scope='class')
def _class_scope(self, env, httpd):
indir = httpd.docs_dir
@ -117,6 +125,45 @@ class TestResolve:
r.check_stats(count=count, http_status=0, exitcode=6)
assert r.duration > timedelta(milliseconds=count * delay_ms), f'{r}'
# dnsd with no answers
@pytest.mark.skipif(condition=not Env.curl_override_dns(), reason="no DNS override")
def test_21_06_dnsd_empty(self, env: Env, httpd, dnsd):
dnsd.set_answers()
run_env = os.environ.copy()
run_env['CURL_DNS_SERVER'] = f'127.0.0.1:{dnsd.port}'
curl = CurlClient(env=env, run_env=run_env, force_resolv=False)
url = f'https://test-dnsd.http.curl.invalid/'
r = curl.http_download(urls=[url], with_stats=True)
r.check_exit_code(6) # could not resolve host
r.check_stats(count=1, http_status=0, exitcode=6)
# dnsd with one answer for A
@pytest.mark.skipif(condition=not Env.curl_override_dns(), reason="no DNS override")
def test_21_07_dnsd_a(self, env: Env, httpd, dnsd):
dnsd.set_answers(addr_a=['127.0.0.1'])
run_env = os.environ.copy()
run_env['CURL_DNS_SERVER'] = f'127.0.0.1:{dnsd.port}'
curl = CurlClient(env=env, run_env=run_env, force_resolv=False)
url = f'https://{env.authority_for(env.domain1, "http/1.1")}/data.json'
r = curl.http_download(urls=[url], with_stats=True)
r.check_exit_code(0)
r.check_stats(count=1, http_status=200, exitcode=0)
assert r.stats[0]['remote_ip'] == '127.0.0.1'
# dnsd with one answer for AAAA
@pytest.mark.skipif(condition=not Env.curl_override_dns(), reason="no DNS override")
@pytest.mark.skipif(condition=not Env.curl_has_feature('IPv6'), reason="no IPv6")
def test_21_08_dnsd_aaaa(self, env: Env, httpd, dnsd):
dnsd.set_answers(addr_aaaa=['[::1]'])
run_env = os.environ.copy()
run_env['CURL_DNS_SERVER'] = f'127.0.0.1:{dnsd.port}'
curl = CurlClient(env=env, run_env=run_env, force_resolv=False)
url = f'https://{env.authority_for(env.domain1, "http/1.1")}/data.json'
r = curl.http_download(urls=[url], with_stats=True)
r.check_exit_code(0)
r.check_stats(count=1, http_status=200, exitcode=0)
assert r.stats[0]['remote_ip'] == '::1'
def _clean_files(self, files):
for file in files:
if os.path.exists(file):

View File

@ -36,6 +36,7 @@ from .caddy import Caddy
from .certs import Credentials, TestCA
from .client import LocalClient
from .dante import Dante
from .dnsd import Dnsd
from .env import Env
from .httpd import Httpd
from .nghttpx import Nghttpx, NghttpxFwd, NghttpxQuic

157
tests/http/testenv/dnsd.py Normal file
View File

@ -0,0 +1,157 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#***************************************************************************
# _ _ ____ _
# Project ___| | | | _ \| |
# / __| | | | |_) | |
# | (__| |_| | _ <| |___
# \___|\___/|_| \_\_____|
#
# Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
#
# This software is licensed as described in the file COPYING, which
# you should have received as part of this distribution. The terms
# are also available at https://curl.se/docs/copyright.html.
#
# You may opt to use, copy, modify, merge, publish, distribute and/or sell
# copies of the Software, and permit persons to whom the Software is
# furnished to do so, under the terms of the COPYING file.
#
# This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
# KIND, either express or implied.
#
# SPDX-License-Identifier: curl
#
###########################################################################
#
import logging
import os
import socket
import subprocess
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from .env import Env
from .ports import alloc_ports_and_do
log = logging.getLogger(__name__)
class Dnsd:
def __init__(self, env: Env):
self.env = env
self._cmd = os.path.join(env.build_dir, 'tests/server/servers')
self._port = 0
self.name = 'dnsd'
self._port_skey = 'dnsd'
self._port_specs = {
'dnsd': socket.SOCK_DGRAM,
}
self._dnsd_dir = os.path.join(env.gen_dir, self.name)
self._log_dir = self._dnsd_dir
self._lock_dir = os.path.join(self._dnsd_dir, 'lock')
self._log_file = os.path.join(self._log_dir, 'dnsd.log')
self._conf_file = os.path.join(self._log_dir, 'dnsd.cmd')
self._pid_file = os.path.join(self._log_dir, 'dante.pid')
self._error_log = os.path.join(self._log_dir, 'dnsd.err.log')
self._process = None
self.clear_logs()
@property
def port(self) -> int:
return self._port
def clear_logs(self):
self._rmf(self._log_file)
self._rmf(self._error_log)
def exists(self):
return os.path.exists(self._cmd)
def is_running(self):
if self._process:
self._process.poll()
return self._process.returncode is None
return False
def start_if_needed(self):
if not self.is_running():
return self.start()
return True
def stop(self, wait_dead=True):
if self._process:
self._process.terminate()
self._process.wait(timeout=2)
self._process = None
return not wait_dead or True
return True
def restart(self):
self.stop()
return self.start()
def initial_start(self):
self._mkpath(self._lock_dir)
def startup(ports: Dict[str, int]) -> bool:
self._port = ports[self._port_skey]
if self.start():
self.env.update_ports(ports)
return True
self.stop()
self._port = 0
return False
return alloc_ports_and_do(self._port_specs, startup,
self.env.gen_root, max_tries=3)
def start(self, wait_live=True):
assert self._port > 0
if self._process:
self.stop()
self.set_answers() # empty answers by default
self._rmf(self._log_file)
args = [
self._cmd, 'dnsd',
'--port', str(self._port),
'--logdir', f'{self._log_dir}',
'--logfile', f'{self._log_file}',
'--pidfile', f'{self._pid_file}',
]
procerr = open(self._error_log, 'a')
self._process = subprocess.Popen(args=args, stderr=procerr)
if self._process.returncode is not None:
return False
return self.wait_live(timeout=timedelta(seconds=Env.SERVER_TIMEOUT))
def wait_live(self, timeout: timedelta):
try_until = datetime.now() + timeout
while datetime.now() < try_until:
if os.path.exists(self._log_file):
return True
time.sleep(.1)
log.error(f"Server still not responding after {timeout}")
return False
def _rmf(self, path):
if os.path.exists(path):
os.remove(path)
def _mkpath(self, path):
if not os.path.exists(path):
os.makedirs(path)
def set_answers(self, addr_a: Optional[List[str]] = None,
addr_aaaa: Optional[List[str]] = None):
conf = []
if addr_a:
conf.extend([f'A: {addr}' for addr in addr_a])
if addr_aaaa:
conf.extend([f'AAAA: {addr}' for addr in addr_aaaa])
conf.append('\n')
with open(self._conf_file, 'w') as fd:
fd.write("\n".join(conf))

View File

@ -164,6 +164,7 @@ class EnvConfig:
raise RuntimeError(f'{self.curlinfo} failed with exit code: {p.returncode}')
self.curl_is_verbose = 'verbose-strings: ON' in p.stdout
self.curl_can_cert_status = 'cert-status: ON' in p.stdout
self.curl_override_dns = 'override-dns: ON' in p.stdout
self.ports = {}
@ -510,6 +511,10 @@ class Env:
def curl_can_cert_status() -> bool:
return Env.CONFIG.curl_can_cert_status
@staticmethod
def curl_override_dns() -> bool:
return Env.CONFIG.curl_override_dns
@staticmethod
def curl_can_early_data() -> bool:
if Env.curl_uses_lib('gnutls'):

View File

@ -365,7 +365,8 @@ static void read_instructions(void)
}
}
else {
rc = 0;
/* accept empty line */
rc = buf[0] ? 0 : 1;
}
if(rc != 1) {
logmsg("Bad line in %s: '%s'\n", file, buf);