# Copyright 2024-2024, Pulumi Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os
import tempfile
import time
import unittest

from pulumi.automation._stack import _watch_logs
from pulumi.automation import EngineEvent, StdoutEngineEvent


class TestStack(unittest.IsolatedAsyncioTestCase):
    """Tests for the ``_watch_logs`` helper of the automation stack module."""

    async def test_always_read_complete_lines(self):
        """Feed ``_watch_logs`` a file that is written in partial-line chunks.

        The writer emits each JSON event line in several fragments, flushing
        and pausing between fragments, while ``_watch_logs`` reads the same
        file concurrently. The callback checks that the first two events it
        receives are the fully assembled ``stdoutEvent`` payloads.
        """
        # Only the path is needed: close the handle right away so it is not
        # leaked, and make sure the file is removed once the test finishes.
        tmp = tempfile.NamedTemporaryFile(delete=False)
        tmp.close()
        self.addCleanup(os.remove, tmp.name)

        # Every event line is deliberately split so that a reader polling the
        # file can observe a line that does not yet end in a newline.
        parts = [
            '{"stdoutEvent": ',
            '{"message": "hello", "color": "blue"}',
            "}\n",
            '{"stdoutEvent": ',
            '{"message": "world"',
            ', "color": "red"}}\n',
            '{"cancelEvent": {}}\n',
        ]

        async def write_lines(path):
            """Write the fragments one by one, yielding between writes."""
            with open(path, "w") as f:
                for part in parts:
                    f.write(part)
                    f.flush()
                    # Must be a non-blocking sleep: time.sleep() here would
                    # stall the event loop until the whole file is written,
                    # so the watcher would never see a partial line.
                    await asyncio.sleep(0.2)

        event_num = 0

        def callback(event: EngineEvent) -> None:
            """Validate the events in arrival order and count them."""
            nonlocal event_num
            if event_num == 0:
                self.assertTrue(event.stdout_event)
                self.assertEqual(event.stdout_event.message, "hello")
                self.assertEqual(event.stdout_event.color, "blue")
            elif event_num == 1:
                self.assertTrue(event.stdout_event)
                self.assertEqual(event.stdout_event.message, "world")
                self.assertEqual(event.stdout_event.color, "red")
            event_num += 1

        async def watch_async():
            """Run the synchronous ``_watch_logs`` without blocking the loop."""
            # _watch_logs is a plain (non-awaitable) call; running it in the
            # default executor lets the writer coroutine make progress while
            # it reads. Exceptions raised by the callback's assertions
            # propagate through the returned future and fail the test.
            # NOTE(review): presumably the trailing cancelEvent line is what
            # makes _watch_logs return - confirm against its implementation.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, _watch_logs, tmp.name, callback)

        write_task = asyncio.create_task(write_lines(tmp.name))
        watch_task = asyncio.create_task(watch_async())
        await asyncio.gather(write_task, watch_task)

        # Guard against a vacuous pass: both stdout events must have reached
        # the callback (whether the cancel event is also delivered is left
        # to _watch_logs, so only a lower bound is asserted).
        self.assertGreaterEqual(event_num, 2)