Apache SkyWalking - Python Agent Performance Enhancement Plan

Proposal for Google Summer of Code 2023

About Me

Personal Information

Why Me

My primary programming languages are Python, Java and Go, with Python being my best. I am interested in microservices and cloud-native, especially for cloud-native observability.

Even though I’m new to open source, I’m passionate about code. I often write small programs using different technology stacks and upload them to my Github repository. This year is the first time I learned about Google Summer of Code, and I think participating in this event will definitely allow me to get more technically advanced and create cooler code!

I had experience with OpenTelemetry during one of my previous corporate internships, and since OpenTelemetry’s Python agent implementation is very similar to Skywalking’s Python agent, I believe I was able to quickly become familiar with Skywalking’s Python agent code architecture and operating principles.

I’m also familiar with the async concept. I am currently pursuing a Master’s degree in Computer Science and Technology at the Information Networking Center at Beijing University of Posts and Telecommunications. In my daily projects, I often need to implement large-scale network IO tasks, which gives me some experience with parallel programming.

Project Description

  • Project Name: Apache SkyWalking - Python Agent Performance Enhancement Plan
  • Mentor: @Superskyyy, @kezhenxu94
  • Abstract: Currently, SkyWalking Python agent is implemented with the Threading module to provide data reporters. Yet with the growth of the Python agent, it is now fully capable and requires more resources than when only tracing was supported.
  • Goal:
    • Deprecate or provide an alternative implementation of data reporters (Trace/Log/Meter), maybe also for profilers.
    • The alternative implementation should also work for gRPC/HTTP/Kafka using corresponding async clients.
    • A simple but reliable performance test job in CI/local.

Homework

Familiarity with the Skywalking Ecosystem

  • Build from source and run Skywalking Python agent, make dev environment and dependencies ready on my machine.
  • Try to finish the task in issue#10447, currently waiting for mentor to reproduce the problem with my code.

Investigate Asynchronous Protocol API Implementations

For gRPC

The more clear solution is to use the official grpc.aio, which obviously has the best support.

For Kafka

confluent kafka and aiokafka can both be used for asynchronous api.

  • confluent kafka has a more active community, but currently does not support out-of-the-box use of asynio and requires more code to implement asynchronous functionality.
  • aiokafka’s API is more convenient and intuitive to use, and it seamlessly supports the asyncio library. But it has a relatively small community and may not have an advantage in terms of performance.

Here is a simple benchmark of the two libraries (Skywalking Python agent always plays a role of kafka producer):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
from confluent_kafka import KafkaException
from threading import Thread
from aiokafka import AIOKafkaProducer

import asyncio
import confluent_kafka
import time


class AIOProducer:
def __init__(self, configs, loop=None):
self._loop = loop or asyncio.get_event_loop()
self._producer = confluent_kafka.Producer(configs)
self._cancelled = False
self._poll_thread = Thread(target=self._poll_loop)
self._poll_thread.start()

def _poll_loop(self):
while not self._cancelled:
self._producer.poll(0)

def close(self):
self._cancelled = True
self._poll_thread.join()

def produce_with_callback(self, topic, value, on_delivery=None):
result = self._loop.create_future()

def ack(err, msg):
if err:
self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
else:
self._loop.call_soon_threadsafe(result.set_result, msg)
if on_delivery:
self._loop.call_soon_threadsafe(on_delivery, err, msg)
self._producer.produce(topic, value, on_delivery=ack)
return result


DATA_SIZE = 5000

async def send_confluent():
producer = AIOProducer({"bootstrap.servers": "localhost:9094"})
for _ in range(DATA_SIZE):
try:
# Produce messages
await producer.produce_with_callback("test-topic", "message")
except:
raise
producer.close()

async def send_aio():
producer = AIOKafkaProducer(bootstrap_servers='localhost:9094')
await producer.start()
for _ in range(DATA_SIZE):
try:
# Produce messages
await producer.send_and_wait("test-topic", b"message")
except:
await producer.stop()
raise
await producer.stop()

def send_normal(on_delivery=None):
producer = confluent_kafka.Producer({"bootstrap.servers": "localhost:9094"})
for _ in range(DATA_SIZE):
try:
# Produce messages
producer.poll(0)
producer.produce("test-topic", "message", on_delivery=on_delivery)
except:
raise
producer.flush()

start_time = time.perf_counter()
asyncio.run(send_confluent())
end_time = time.perf_counter()
print(f"confluent_kafka async: {end_time - start_time:.2f} seconds")

start_time = time.perf_counter()
asyncio.run(send_aio())
end_time = time.perf_counter()
print(f"aiokafka async: {end_time - start_time:.2f} seconds")


cnt = 0
def ack(err, msg):
global cnt
cnt = cnt + 1

start_time = time.perf_counter()
send_normal(ack)
end_time = time.perf_counter()
print(f"confluent_kafka sync: {end_time - start_time:.2f} seconds")

The result is:

1
2
3
confluent_kafka async: 27.60 seconds
aiokafka async: 3.66 seconds
confluent_kafka sync: 0.03 seconds

Let me explain this result in detail.

In confluent kafka, the main blocking operations are the poll()/flush() functions, which are used to receive callback events for messages produced by the producer (see this issue). To enable confluent kafka to run on the asyncio model, the official example (and the code I wrote) implements the asyncio interface by binding the poll() operation to an additional thread and making the produce() operation mandatory for the callback function ack(). This results in having to execute a callback function involving a cross-thread call after each production data, which I think is the main reason why confluent kafka performs very poorly in the asyncio model.

In addition, aiokafka performs normally under asynchronous tasks, but I also found that confluent kafka performs exceptionally well under the traditional synchronous model (aiokafka does not have an interface for synchronous tasks).

On the one hand, this is because my callback function is relatively simple, on the other hand, I set the timeout parameter of the poll() function to 0, which means that the producer will not have any waits for the callback of the sent information. In my case it worked fine, but in more heavy workloads, this is likely to cause confluent kafka’s local buffer queue to overflow and cause catastrophic consequences such as data loss. I actually ran into this problem while working on issue#10447. If we try to slightly increase the poll() timeout parameter in the synchronous task, for example, set it to 0.1 (seconds), we can see what the result is:

1
confluent_kafka sync: 27.92 seconds

As expected, its performance plummeted.

So in summary, aiokafka seems to be better than confluent kafka in all aspects in terms of support for the asyncio model

For HTTP

httpx and aiohttp can both be used for asynchronous api.

There is a related blog for reference.

  • httpx is a very comprehensive python networking library that greatly extends python’s traditional requests and other libraries to support many new features, including, of course, asyncio asynchronous support, which we need in particular. Its API is almost similar to that of the traditional requests library, so it’s less difficult to get started and has a more active community. However, its powerful and comprehensive features maybe also its downside, as its performance for asynchronous tasks does not seem to be as good as the aiohttp library, which focuses on asynchronous optimization.
  • The aiohttp library is another available option. It is focused on optimizing python’s asynchronous network io performance and does not have many additional feature extensions. Given that the goal of this project is performance optimization rather than feature extensions, aiohttp is probably the better choice.

Here is a simple benchmark of the two libraries (Windows WSL platform, 12-core Intel CPU):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time
import httpx
import aiohttp

REQUEST_NUMBER = 1000
TARGET_HTTP_URL = "http://10.3.242.223:9999/"

async def httpx_test():
httpx_client = httpx.AsyncClient()

try:
# Send 100 asynchronous GET requests using HTTPX
start_time = time.perf_counter()
tasks = [httpx_client.get(TARGET_HTTP_URL) for _ in range(REQUEST_NUMBER)]
await asyncio.gather(*tasks)
end_time = time.perf_counter()
print(f"HTTPX: {end_time - start_time:.2f} seconds")
finally:
await httpx_client.aclose()

async def aiohttp_test():
aiohttp_client = aiohttp.ClientSession()

try:
# Send 100 asynchronous GET requests using AIOHTTP
start_time = time.perf_counter()
tasks = [aiohttp_client.get(TARGET_HTTP_URL) for _ in range(REQUEST_NUMBER)]
await asyncio.gather(*tasks)
end_time = time.perf_counter()
print(f"AIOHTTP: {end_time - start_time:.2f} seconds")
finally:
await aiohttp_client.close()

# asyncio.run(main())
asyncio.run(httpx_test())
time.sleep(1)
asyncio.run(aiohttp_test())

I deployed the flask service on port 9999 on a server on the campus LAN and made asynchronous requests to port 9999 locally via httpx and aiohttp respectively. The results are as follows:

1
2
HTTPX: 2.75 seconds
AIOHTTP: 1.47 seconds

So far it looks like aiohttp has a real performance advantage, as mentioned in that blog. However, after I set the TARGET_HTTP_URL parameter to a domain name on the Internet, the result is the exact opposite:

1
2
3
4
5
6
7
8
9
10
11
# 100 requests https://www.baidu.com
HTTPX: 0.41 seconds
AIOHTTP: 1.07 seconds

# 100 requests https://www.microsoft.com
HTTPX: 3.76 seconds
AIOHTTP: 4.42 seconds

# 100 requests https://www.bupt.edu.cn
HTTPX: 0.38 seconds
AIOHTTP: 1.10 seconds

Also, httpx is more stable. When I use a proxy or access some URLs that are very slow to respond, httpx always returns correctly, while aiohttp is sometimes slower or wrong.

1
2
3
4
5
6
7
# 100 requests https://google.github.io/gsocguides/ Frist try
HTTPX: 1.25 seconds
AIOHTTP: 0.95 seconds

# 100 requests https://google.github.io/gsocguides/ Second try
HTTPX: 1.18 seconds
AIOHTTP: 1.77 seconds

I guess this is probably because httpx is optimized for production deployments, e.g. in terms of DNS or retransmission policies, while aiohttp does not do a good job in this area and lacks stability.

In addition, aiohttp needs more codes in implementation, at this point httpx is more convenient.

In summary, aiohttp has a performance advantage over httpx, but this is in theory. In a broader network environment, httpx is more stable, has better availability, and has smoother performance (aiohttp’s performance fluctuates more). And httpx requires less code than aiohttp, which can also increase code readability and development efficiency. Finally, httpx additionally supports many new features not supported by aiohttp, such as http/2.0, which may also benefit the Skywalking Python agent for future expansion.

However, given that the main goal of this proposal is just IO performance improvement, the choice between these two options needs to be explored in more depth.

Investigate queues used between both synchronous and asynchronous code

Due to the highly invasive nature of making asyncio changes, code refactoring will be a recursive process, which may result in python agent users being overly aware of async code. For this, we may need a bridge to properly buffer the difference between synchronous code and asynchronous code, instead of turning all the code into asynchronous implementation. It’s a good idea to retrofit on the queue.

A more suitable option is to use the janus queue. It is simple enough, light enough, and can meet the current requirements. aioprocessing‘s queue is also an alternative, but it is more complex and has many features we don’t use

It must be made clear that janus still has limit with usability. For example, in a thread where there is no asyncio event loop, we can’t create janus queues directly (see this issue).

In addition, using a cross-synchronous-asynchronous queue will definitely cause performance loss. janus’s issue#419 mentions this problem. The reason is probably that the underlying implementation is basically based on calling asyncio.run_in_executor() , which leads to the overhead of internal threads.

Therefore, it is also necessary to use asyncio queue when implementing asyncio refactoring. Once it is confirmed that no thread safety issues will be raised, asyncio queue is clearly our first choice.

Plan

  1. Provide asyncio implementation of data reporters and protocol clients

    • Replace the Threading module of Trace/Log/Meter with asyncio.
    • Replace the protocol(gRPC/Kafka/HTTP) client implementation so that it supports the new concurrent model.
    • If the above tasks go well, the next refactoring may also be done for profilers. (Optional)
    • Considering that asyncio cannot support fork, additional multi-processing work may be required to make the Python agent run on an independent process. (Optional)

Extra info: According to this issue, asyncio will fully support fork() in Python 3.12

  1. Provide a simple but reliable performance test

    • The performance test should be able to test the performance of the new implementation and compare the performance of the new implementation with the performance of the old implementation.
    • If I find that some asyncio implementations are even difficult to surpass the previous thread implementation, I will focus on optimizing my code and consult my mentor; if the performance is still not satisfactory, I may still need to keep the old implementation.
  2. Keep in Touch

    I will keep in touch with my mentor and the community through the mailing list and some instant messaging tools such as WeChat. If I run into a problem that I really have trouble solving on my own or have questions about Skywalking Python’s architecture and concepts, I’m very proactive in asking mentor questions.

  3. Output

    • Submit a PR to Apache Skywalking’s code repository.
    • Improve the relevant documentation.
    • Provide relevant quantitative data for performance optimization, such as the results of benchmark tests.

Timeline

Depending on the progress of the project and the requirements of the mentor, this part may undergo some changes after coding officially begins.

  • now to May 4: before Accepted GSoC contributor projects announced
    • Learn more about the Skywalking ecosystem
    • Try to submit the code related to issue#10447 to PR
    • Learn which modules to focus on for improvement or refactoring in the Skywalking python agent. Run and test some of the threaded models in the code in my local environment by converting them to asyncio models
    • Before introducing the asyncio model, try replacing the protocol client implementation
  • May 4 to May 29: before Coding officially begins
    • Rough replacement of data reports from the threaded model to the asyncio model(Rough version, may lack usability)
    • Complete the basic replacement of the protocol client
  • May 29 to July 14: before Midterm evaluation
    • Iteratively optimize my code to further improve its usability/performance
    • Complete some of the usability and performance testing, further discussion with Mentor may be required on what metrics are needed for testing
    • If the above tasks go well, a further asynchronous modification of the profilers module and multi-process refactoring is also possible
  • July 14 to August 21: before Final week
    • Complete the final replacement of the protocol client
    • Complete the final replacement of the data reporter
    • Complete the final performance testing
    • Code style and quality review, write documentation under the guidance of mentor